From 7dec041e0f236958eec5b5e32b0b575d0a6aff2b Mon Sep 17 00:00:00 2001 From: Andrey Meshkov Date: Fri, 30 Dec 2022 15:10:23 +0300 Subject: [PATCH] Sync v2.1.4 part 2 --- internal/metrics/usercount.go | 123 +++++++++++--------- internal/metrics/usercount_internal_test.go | 42 +++---- 2 files changed, 89 insertions(+), 76 deletions(-) diff --git a/internal/metrics/usercount.go b/internal/metrics/usercount.go index ec042e7..b49d2b3 100644 --- a/internal/metrics/usercount.go +++ b/internal/metrics/usercount.go @@ -35,102 +35,119 @@ const dayMinutes = 24 * 60 // userCounter is used to save estimated counts of active users per hour and per // day. type userCounter struct { - // lock protects all fields below. - lock *sync.Mutex + // currentMinuteCounterMu protects currentMinute and currentMinuteCounter. + currentMinuteCounterMu *sync.Mutex - // dailyMinuteCounters contains HyperLogLog counters for each minute of the + // currentMinuteCounter is a counter for the current minute of a day. + currentMinuteCounter *hyperloglog.Sketch + + // dayMinuteCountersMu protects dayMinuteCounters. + dayMinuteCountersMu *sync.Mutex + + // dayMinuteCounters contains HyperLogLog counters for each minute of the // day. The index of the slice is the minute of the day in the [0, 1440) // interval. - dailyMinuteCounters []*hyperloglog.Sketch + dayMinuteCounters []*hyperloglog.Sketch - // currentUnixSecond is used to check if the hourly and daily user counts - // need updating. - currentUnixSecond int64 - - // currentMinute is used to check if the current minute counter of - // dailyMinuteCounts requires resetting. - currentMinute int64 + // currentMinute is the current minute of the day in the [0, 1440) interval. + currentMinute int } // newUserCounter initializes and returns a *userCounter. func newUserCounter() (c *userCounter) { return &userCounter{ - lock: &sync.Mutex{}, - dailyMinuteCounters: make([]*hyperloglog.Sketch, dayMinutes), - currentUnixSecond: -1, - currentMinute: -1, + currentMinuteCounterMu: &sync.Mutex{}, + currentMinuteCounter: nil, + dayMinuteCountersMu: &sync.Mutex{}, + dayMinuteCounters: make([]*hyperloglog.Sketch, dayMinutes), + // Use -1 to trigger the initialization of currentMinuteCounter + // regardless of the actual current minute of the day. + currentMinute: -1, } } -// record updates the values of the hourly and daily counters. -func (c *userCounter) record(ip netip.Addr) { - now := time.Now().UTC() - unixSecond := now.Unix() - minuteOfTheDay := int64(now.Hour()*60 + now.Minute()) +// record updates the current minute-of-the-day counter as well as sets the +// values of the hourly and daily metric counters, if necessary. now is the +// time for which to record the IP address, typically the current time. If +// syncUpdate is true, record performs the metric counter updates syncrhonously. +// syncUpdate is currently only used in tests. +func (c *userCounter) record(now time.Time, ip netip.Addr, syncUpdate bool) { + minuteOfTheDay := now.Hour()*60 + now.Minute() // Assume that ip is the remote IP address, which has already been unmapped // by [netutil.NetAddrToAddrPort]. b := ip.As16() - c.lock.Lock() - defer c.lock.Unlock() + c.currentMinuteCounterMu.Lock() + defer c.currentMinuteCounterMu.Unlock() - var counter *hyperloglog.Sketch if c.currentMinute != minuteOfTheDay { + prevMinute := c.currentMinute + prevMinuteCounter := c.currentMinuteCounter + c.currentMinute = minuteOfTheDay - counter = hyperloglog.New() - c.dailyMinuteCounters[minuteOfTheDay] = counter - } else { - counter = c.dailyMinuteCounters[minuteOfTheDay] + c.currentMinuteCounter = hyperloglog.New() + + // If this is the first iteration and prevMinute is -1, don't update the + // counters, since there are none. + if prevMinute != -1 { + if syncUpdate { + c.updateCounters(prevMinute, prevMinuteCounter) + } else { + go c.updateCounters(prevMinute, prevMinuteCounter) + } + } } - counter.Insert(b[:]) - - // Only update the hourly and daily counters once per second, since this - // operation is assumed to take significant amount of time, and so the lock - // contention should be minimized. Do that in a separate goroutine to - // return quicker and not stall the request processing. - if c.currentUnixSecond != unixSecond { - c.currentUnixSecond = unixSecond - go c.update(minuteOfTheDay) - } + c.currentMinuteCounter.Insert(b[:]) } -// update sets hourly and daily gauges to the estimated values of user counters. -// -// It is expected to be used in a goroutine. -func (c *userCounter) update(m int64) { - defer log.OnPanic("metrics.userCounter.update") +// updateCounters adds prevCounter to c.dayMinuteCounters and then merges the +// daily counters and updates the metrics. +func (c *userCounter) updateCounters(prevMinute int, prevCounter *hyperloglog.Sketch) { + defer log.OnPanic("metrics.userCounter.updateCounters") + c.dayMinuteCountersMu.Lock() + defer c.dayMinuteCountersMu.Unlock() + + // Insert the previous counter into the rolling counters collection. + c.dayMinuteCounters[prevMinute] = prevCounter + + // Calculate the estimated numbers of hourly and daily users. + hourly, daily := c.estimate(prevMinute) + + dnsSvcUsersCount.Set(float64(hourly)) + dnsSvcUsersDailyCount.Set(float64(daily)) +} + +// estimate uses HyperLogLog counters to estimate the hourly and daily users +// count, starting with the minute of the day m. +func (c *userCounter) estimate(m int) (hourly, daily uint64) { hourlyCounter, dailyCounter := hyperloglog.New(), hyperloglog.New() - c.lock.Lock() - defer c.lock.Unlock() - // Go through all minutes in a day while decreasing the current minute m. // Decreasing m, as opposed to increasing it or using i as the minute, is // required to make summing the hourly statistics within the same loop // easier. for i := 0; i < dayMinutes; i++ { - counter := c.dailyMinuteCounters[m] + minCounter := c.dayMinuteCounters[m] m = decrMod(m, dayMinutes) - if counter == nil { + if minCounter == nil { continue } // Use [mustMerge], since the only reason an error may be returned here // is when the two sketches do not have the same precisions. - mustMerge(dailyCounter, counter) + mustMerge(dailyCounter, minCounter) // Only include the first 60 minutes into the hourly statistics. if i < 60 { - mustMerge(hourlyCounter, counter) + mustMerge(hourlyCounter, minCounter) } } - dnsSvcUsersCount.Set(float64(hourlyCounter.Estimate())) - dnsSvcUsersDailyCount.Set(float64(dailyCounter.Estimate())) + return hourlyCounter.Estimate(), dailyCounter.Estimate() } // mustMerge panics if a.Merge(b) returns an error. @@ -143,7 +160,7 @@ func mustMerge(a, b *hyperloglog.Sketch) { // decrMod decreases n by one using modulus m. That is, for n = 0 and m = 100 // it returns 99. -func decrMod(n, m int64) (res int64) { +func decrMod(n, m int) (res int) { if n == 0 { return m - 1 } @@ -157,5 +174,5 @@ var defaultUserCounter = newUserCounter() // DNSSvcUsersCountUpdate records a visit by ip and updates the values of the // [dnsSvcUsersCount] and [dnsSvcUsersDailyCount] gauges every second. func DNSSvcUsersCountUpdate(ip netip.Addr) { - defaultUserCounter.record(ip) + defaultUserCounter.record(time.Now(), ip, false) } diff --git a/internal/metrics/usercount_internal_test.go b/internal/metrics/usercount_internal_test.go index 8dd92d9..641a378 100644 --- a/internal/metrics/usercount_internal_test.go +++ b/internal/metrics/usercount_internal_test.go @@ -3,44 +3,40 @@ package metrics import ( "math/rand" "net/netip" + "strconv" "testing" "time" - "github.com/axiomhq/hyperloglog" "github.com/stretchr/testify/assert" ) func TestUserCounter(t *testing.T) { - const n = 100_000 - - now := time.Now().UTC() - minuteOfTheDay := int64(now.Hour()*60 + now.Minute()) + const ipsPerMinute = 2 // Use a constant seed to make the test reproducible. src := rand.NewSource(1234) r := rand.New(src) - ip := randIP(r) c := newUserCounter() - for i := 0; i < n; i++ { - c.record(ip) - ip = randIP(r) + + now := time.Unix(0, 0).UTC() + for h := 0; h < 24; h++ { + t.Run(strconv.Itoa(h), func(t *testing.T) { + for m := 0; m < 60; m++ { + for i := 0; i < ipsPerMinute; i++ { + c.record(now, randIP(r), true) + } + + now = now.Add(1 * time.Minute) + } + + hourly, _ := c.estimate(h*60 + 59) + assert.InEpsilon(t, uint64(ipsPerMinute*60), hourly, 0.02) + }) } - // Use the next minute as a starting point, since it could change during the - // test run. - m := minuteOfTheDay + 1 - hourlyCounter := hyperloglog.New() - for i := 0; i < 60; i++ { - counter := c.dailyMinuteCounters[m] - m = decrMod(m, dayMinutes) - - if counter != nil { - mustMerge(hourlyCounter, counter) - } - } - - assert.InEpsilon(t, uint64(n), hourlyCounter.Estimate(), 0.01) + _, daily := c.estimate(23*60 + 59) + assert.InEpsilon(t, uint64(ipsPerMinute*24*60), daily, 0.02) } // randIP returns a pseudorandomly generated IP address.