AdGuardDNS/internal/metrics/usercount.go
Andrey Meshkov da0cb6fd0e Sync v2.9.0
2024-10-14 17:44:24 +03:00

207 lines
6.2 KiB
Go

package metrics
import (
"fmt"
"sync"
"time"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/timeutil"
"github.com/axiomhq/hyperloglog"
"github.com/prometheus/client_golang/prometheus"
)
const (
// minutesPerHour is the number of minutes in an hour.
minutesPerHour = int(time.Hour / time.Minute)
// hoursPerDay is the number of hours in a day.
hoursPerDay = int(timeutil.Day / time.Hour)
)
// UserCounter is used to save estimated counts of active users per hour and per
// day by some data.
//
// TODO(a.garipov): Improve and move to golibs.
type UserCounter struct {
// lastHour is a gauge with an approximate number of DNS users for the
// last 1 hour.
lastHour prometheus.Gauge
// lastDay is a gauge with an approximate number of DNS users for
// the last 24 hours.
lastDay prometheus.Gauge
// currentMu protects currentMinute and currentMinuteCounter.
currentMu *sync.Mutex
// currentMinuteCounter is a counter for the current minute of a day.
currentMinuteCounter *hyperloglog.Sketch
// countersMu protects hourMinuteCounters and dayHourCounters.
countersMu *sync.Mutex
// hourMinuteCounters contains HyperLogLog counters for each minute of an
// hour. The index of the slice is the minute of the hour in the [0, 60)
// interval.
hourMinuteCounters *[minutesPerHour]*hyperloglog.Sketch
// dayHourCounters contains HyperLogLog counters for each hour of a day.
// The index of the slice is the hour of the day in the [0, 24) interval.
dayHourCounters *[hoursPerDay]*hyperloglog.Sketch
// currentMinute is the current minute of the day in the [0, 1440) interval.
currentMinute int
}
// NewUserCounter initializes and returns a properly initialized *UserCounter
// that uses the given gauges to estimate the user count.
func NewUserCounter(lastHour, lastDay prometheus.Gauge) (c *UserCounter) {
return &UserCounter{
lastHour: lastHour,
lastDay: lastDay,
currentMu: &sync.Mutex{},
currentMinuteCounter: nil,
countersMu: &sync.Mutex{},
hourMinuteCounters: &[minutesPerHour]*hyperloglog.Sketch{},
dayHourCounters: &[hoursPerDay]*hyperloglog.Sketch{},
// Use -1 to trigger the initialization of currentMinuteCounter
// regardless of the actual current minute of the day.
currentMinute: -1,
}
}
// Record updates the current minute-of-the-day counter as well as sets the
// values of the hourly and daily metric counters, if necessary. now is the
// time for which to Record the IP address or other data, typically the current
// time.
//
// If syncUpdate is true, Record performs the metric counter updates
// synchronously. It is currently only used in tests.
//
// It currently assumes that it will be called at least once per day.
func (c *UserCounter) Record(now time.Time, userData []byte, syncUpdate bool) {
hour, minute, _ := now.Clock()
minuteOfDay := hour*minutesPerHour + minute
c.currentMu.Lock()
defer c.currentMu.Unlock()
if c.currentMinute != minuteOfDay {
prevMinute := c.currentMinute
prevMinuteCounter := c.currentMinuteCounter
c.currentMinute = minuteOfDay
c.currentMinuteCounter = newHyperLogLog()
// If this is the first iteration and prevMinute is -1, don't update the
// counters, since there are none.
if prevMinute != -1 {
if syncUpdate {
c.updateCounters(prevMinute, hour, prevMinuteCounter)
} else {
go c.updateCounters(prevMinute, hour, prevMinuteCounter)
}
}
}
c.currentMinuteCounter.Insert(userData)
}
// updateCounters adds prevCounter to counters and then merges them and updates
// the metrics. It also clears all the stale hourly counters from the previous
// day.
func (c *UserCounter) updateCounters(
prevMinute int,
currentHour int,
prevMinuteCounter *hyperloglog.Sketch,
) {
defer log.OnPanic("metrics.userCounter.updateCounters")
prevMinuteOfHour := prevMinute % minutesPerHour
hourOfPrevMinute := prevMinute / minutesPerHour
c.countersMu.Lock()
defer c.countersMu.Unlock()
// Insert the previous counter into the rolling counters collection.
c.hourMinuteCounters[prevMinuteOfHour] = prevMinuteCounter
c.updateHours(currentHour, hourOfPrevMinute, prevMinuteCounter)
// Calculate the estimated numbers of hourly and daily users.
hourly, daily := c.Estimate()
c.lastHour.Set(float64(hourly))
c.lastDay.Set(float64(daily))
}
// updateHours adds the prevMinuteCounter to the hourly counter for prevHour
// hour, and clears all the counters between curHour and prevHour, since those
// may contain data for the previous day.
func (c *UserCounter) updateHours(curHour, prevHour int, prevMinuteCounter *hyperloglog.Sketch) {
for h := curHour; h != prevHour; h = decMod(h, hoursPerDay) {
c.dayHourCounters[h] = nil
}
if c.dayHourCounters[prevHour] == nil {
c.dayHourCounters[prevHour] = newHyperLogLog()
}
mustMerge(c.dayHourCounters[prevHour], prevMinuteCounter)
}
// Estimate uses HyperLogLog counters to return the number of users for the last
// hour and the last day. It doesn't include the data for current minute.
//
// It must not be called concurrently with [UserCounter.updateCounters].
//
// TODO(a.garipov): Unexport and use gauges instead?
func (c *UserCounter) Estimate() (hourly, daily uint64) {
hourlyCounter, dailyCounter := newHyperLogLog(), newHyperLogLog()
for _, c := range c.hourMinuteCounters {
if c != nil {
mustMerge(hourlyCounter, c)
}
}
for _, c := range c.dayHourCounters {
if c != nil {
mustMerge(dailyCounter, c)
}
}
return hourlyCounter.Estimate(), dailyCounter.Estimate()
}
// mustMerge panics if [hyperloglog.Sketch.Merge] returns an error.
func mustMerge(a, b *hyperloglog.Sketch) {
err := a.Merge(b)
if err != nil {
panic(err)
}
}
// newHyperLogLog creates a new instance of hyperloglog.Sketch with precision 18
// and sparse mode enabled.
func newHyperLogLog() (sk *hyperloglog.Sketch) {
sk, err := hyperloglog.NewSketch(18, true)
if err != nil {
// Should never happen, as NewSketch only returns an error when the
// precision is out of range.
panic(fmt.Errorf("metrics.UserCounter.Record: unexpected error: %w", err))
}
return sk
}
// decMod decreases n by one using modulus m. That is, for n = 0 and m = 100 it
// returns 99. n should be in the [0, m) interval.
func decMod(n, m int) (res int) {
if n == 0 {
return m - 1
}
return n - 1
}