Sync v2.1.4 part 2

This commit is contained in:
Andrey Meshkov 2022-12-30 15:10:23 +03:00
parent 85bca36f57
commit 7dec041e0f
2 changed files with 89 additions and 76 deletions

View File

@ -35,102 +35,119 @@ const dayMinutes = 24 * 60
// userCounter is used to save estimated counts of active users per hour and per // userCounter is used to save estimated counts of active users per hour and per
// day. // day.
type userCounter struct { type userCounter struct {
// lock protects all fields below. // currentMinuteCounterMu protects currentMinute and currentMinuteCounter.
lock *sync.Mutex currentMinuteCounterMu *sync.Mutex
// dailyMinuteCounters contains HyperLogLog counters for each minute of the // currentMinuteCounter is a counter for the current minute of a day.
currentMinuteCounter *hyperloglog.Sketch
// dayMinuteCountersMu protects dayMinuteCounters.
dayMinuteCountersMu *sync.Mutex
// dayMinuteCounters contains HyperLogLog counters for each minute of the
// day. The index of the slice is the minute of the day in the [0, 1440) // day. The index of the slice is the minute of the day in the [0, 1440)
// interval. // interval.
dailyMinuteCounters []*hyperloglog.Sketch dayMinuteCounters []*hyperloglog.Sketch
// currentUnixSecond is used to check if the hourly and daily user counts // currentMinute is the current minute of the day in the [0, 1440) interval.
// need updating. currentMinute int
currentUnixSecond int64
// currentMinute is used to check if the current minute counter of
// dailyMinuteCounts requires resetting.
currentMinute int64
} }
// newUserCounter initializes and returns a *userCounter. // newUserCounter initializes and returns a *userCounter.
func newUserCounter() (c *userCounter) { func newUserCounter() (c *userCounter) {
return &userCounter{ return &userCounter{
lock: &sync.Mutex{}, currentMinuteCounterMu: &sync.Mutex{},
dailyMinuteCounters: make([]*hyperloglog.Sketch, dayMinutes), currentMinuteCounter: nil,
currentUnixSecond: -1, dayMinuteCountersMu: &sync.Mutex{},
currentMinute: -1, dayMinuteCounters: make([]*hyperloglog.Sketch, dayMinutes),
// Use -1 to trigger the initialization of currentMinuteCounter
// regardless of the actual current minute of the day.
currentMinute: -1,
} }
} }
// record updates the values of the hourly and daily counters. // record updates the current minute-of-the-day counter as well as sets the
func (c *userCounter) record(ip netip.Addr) { // values of the hourly and daily metric counters, if necessary. now is the
now := time.Now().UTC() // time for which to record the IP address, typically the current time. If
unixSecond := now.Unix() // syncUpdate is true, record performs the metric counter updates syncrhonously.
minuteOfTheDay := int64(now.Hour()*60 + now.Minute()) // syncUpdate is currently only used in tests.
func (c *userCounter) record(now time.Time, ip netip.Addr, syncUpdate bool) {
minuteOfTheDay := now.Hour()*60 + now.Minute()
// Assume that ip is the remote IP address, which has already been unmapped // Assume that ip is the remote IP address, which has already been unmapped
// by [netutil.NetAddrToAddrPort]. // by [netutil.NetAddrToAddrPort].
b := ip.As16() b := ip.As16()
c.lock.Lock() c.currentMinuteCounterMu.Lock()
defer c.lock.Unlock() defer c.currentMinuteCounterMu.Unlock()
var counter *hyperloglog.Sketch
if c.currentMinute != minuteOfTheDay { if c.currentMinute != minuteOfTheDay {
prevMinute := c.currentMinute
prevMinuteCounter := c.currentMinuteCounter
c.currentMinute = minuteOfTheDay c.currentMinute = minuteOfTheDay
counter = hyperloglog.New() c.currentMinuteCounter = hyperloglog.New()
c.dailyMinuteCounters[minuteOfTheDay] = counter
} else { // If this is the first iteration and prevMinute is -1, don't update the
counter = c.dailyMinuteCounters[minuteOfTheDay] // counters, since there are none.
if prevMinute != -1 {
if syncUpdate {
c.updateCounters(prevMinute, prevMinuteCounter)
} else {
go c.updateCounters(prevMinute, prevMinuteCounter)
}
}
} }
counter.Insert(b[:]) c.currentMinuteCounter.Insert(b[:])
// Only update the hourly and daily counters once per second, since this
// operation is assumed to take significant amount of time, and so the lock
// contention should be minimized. Do that in a separate goroutine to
// return quicker and not stall the request processing.
if c.currentUnixSecond != unixSecond {
c.currentUnixSecond = unixSecond
go c.update(minuteOfTheDay)
}
} }
// update sets hourly and daily gauges to the estimated values of user counters. // updateCounters adds prevCounter to c.dayMinuteCounters and then merges the
// // daily counters and updates the metrics.
// It is expected to be used in a goroutine. func (c *userCounter) updateCounters(prevMinute int, prevCounter *hyperloglog.Sketch) {
func (c *userCounter) update(m int64) { defer log.OnPanic("metrics.userCounter.updateCounters")
defer log.OnPanic("metrics.userCounter.update")
c.dayMinuteCountersMu.Lock()
defer c.dayMinuteCountersMu.Unlock()
// Insert the previous counter into the rolling counters collection.
c.dayMinuteCounters[prevMinute] = prevCounter
// Calculate the estimated numbers of hourly and daily users.
hourly, daily := c.estimate(prevMinute)
dnsSvcUsersCount.Set(float64(hourly))
dnsSvcUsersDailyCount.Set(float64(daily))
}
// estimate uses HyperLogLog counters to estimate the hourly and daily users
// count, starting with the minute of the day m.
func (c *userCounter) estimate(m int) (hourly, daily uint64) {
hourlyCounter, dailyCounter := hyperloglog.New(), hyperloglog.New() hourlyCounter, dailyCounter := hyperloglog.New(), hyperloglog.New()
c.lock.Lock()
defer c.lock.Unlock()
// Go through all minutes in a day while decreasing the current minute m. // Go through all minutes in a day while decreasing the current minute m.
// Decreasing m, as opposed to increasing it or using i as the minute, is // Decreasing m, as opposed to increasing it or using i as the minute, is
// required to make summing the hourly statistics within the same loop // required to make summing the hourly statistics within the same loop
// easier. // easier.
for i := 0; i < dayMinutes; i++ { for i := 0; i < dayMinutes; i++ {
counter := c.dailyMinuteCounters[m] minCounter := c.dayMinuteCounters[m]
m = decrMod(m, dayMinutes) m = decrMod(m, dayMinutes)
if counter == nil { if minCounter == nil {
continue continue
} }
// Use [mustMerge], since the only reason an error may be returned here // Use [mustMerge], since the only reason an error may be returned here
// is when the two sketches do not have the same precisions. // is when the two sketches do not have the same precisions.
mustMerge(dailyCounter, counter) mustMerge(dailyCounter, minCounter)
// Only include the first 60 minutes into the hourly statistics. // Only include the first 60 minutes into the hourly statistics.
if i < 60 { if i < 60 {
mustMerge(hourlyCounter, counter) mustMerge(hourlyCounter, minCounter)
} }
} }
dnsSvcUsersCount.Set(float64(hourlyCounter.Estimate())) return hourlyCounter.Estimate(), dailyCounter.Estimate()
dnsSvcUsersDailyCount.Set(float64(dailyCounter.Estimate()))
} }
// mustMerge panics if a.Merge(b) returns an error. // mustMerge panics if a.Merge(b) returns an error.
@ -143,7 +160,7 @@ func mustMerge(a, b *hyperloglog.Sketch) {
// decrMod decreases n by one using modulus m. That is, for n = 0 and m = 100 // decrMod decreases n by one using modulus m. That is, for n = 0 and m = 100
// it returns 99. // it returns 99.
func decrMod(n, m int64) (res int64) { func decrMod(n, m int) (res int) {
if n == 0 { if n == 0 {
return m - 1 return m - 1
} }
@ -157,5 +174,5 @@ var defaultUserCounter = newUserCounter()
// DNSSvcUsersCountUpdate records a visit by ip and updates the values of the // DNSSvcUsersCountUpdate records a visit by ip and updates the values of the
// [dnsSvcUsersCount] and [dnsSvcUsersDailyCount] gauges every second. // [dnsSvcUsersCount] and [dnsSvcUsersDailyCount] gauges every second.
func DNSSvcUsersCountUpdate(ip netip.Addr) { func DNSSvcUsersCountUpdate(ip netip.Addr) {
defaultUserCounter.record(ip) defaultUserCounter.record(time.Now(), ip, false)
} }

View File

@ -3,44 +3,40 @@ package metrics
import ( import (
"math/rand" "math/rand"
"net/netip" "net/netip"
"strconv"
"testing" "testing"
"time" "time"
"github.com/axiomhq/hyperloglog"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestUserCounter(t *testing.T) { func TestUserCounter(t *testing.T) {
const n = 100_000 const ipsPerMinute = 2
now := time.Now().UTC()
minuteOfTheDay := int64(now.Hour()*60 + now.Minute())
// Use a constant seed to make the test reproducible. // Use a constant seed to make the test reproducible.
src := rand.NewSource(1234) src := rand.NewSource(1234)
r := rand.New(src) r := rand.New(src)
ip := randIP(r)
c := newUserCounter() c := newUserCounter()
for i := 0; i < n; i++ {
c.record(ip) now := time.Unix(0, 0).UTC()
ip = randIP(r) for h := 0; h < 24; h++ {
t.Run(strconv.Itoa(h), func(t *testing.T) {
for m := 0; m < 60; m++ {
for i := 0; i < ipsPerMinute; i++ {
c.record(now, randIP(r), true)
}
now = now.Add(1 * time.Minute)
}
hourly, _ := c.estimate(h*60 + 59)
assert.InEpsilon(t, uint64(ipsPerMinute*60), hourly, 0.02)
})
} }
// Use the next minute as a starting point, since it could change during the _, daily := c.estimate(23*60 + 59)
// test run. assert.InEpsilon(t, uint64(ipsPerMinute*24*60), daily, 0.02)
m := minuteOfTheDay + 1
hourlyCounter := hyperloglog.New()
for i := 0; i < 60; i++ {
counter := c.dailyMinuteCounters[m]
m = decrMod(m, dayMinutes)
if counter != nil {
mustMerge(hourlyCounter, counter)
}
}
assert.InEpsilon(t, uint64(n), hourlyCounter.Estimate(), 0.01)
} }
// randIP returns a pseudorandomly generated IP address. // randIP returns a pseudorandomly generated IP address.