alice-lg/pkg/store/neighbors_store.go

369 lines
9.4 KiB
Go
Raw Normal View History

2021-10-20 18:36:30 +00:00
package store
2017-06-23 16:27:09 +02:00
import (
2021-11-19 19:08:59 +01:00
"context"
2022-01-10 17:13:00 +01:00
"errors"
2017-06-23 16:27:09 +02:00
"log"
2021-11-19 19:08:59 +01:00
"math/rand"
"regexp"
"strconv"
2017-06-23 16:27:09 +02:00
"time"
2017-07-14 14:18:47 +02:00
2021-03-22 17:35:20 +01:00
"github.com/alice-lg/alice-lg/pkg/api"
2021-10-20 20:26:37 +00:00
"github.com/alice-lg/alice-lg/pkg/config"
2021-11-19 19:08:59 +01:00
"github.com/alice-lg/alice-lg/pkg/sources"
2017-06-23 16:27:09 +02:00
)
2017-06-23 17:40:19 +02:00
2021-10-22 22:17:04 +02:00
// ReMatchASLookup matches lookups with an 'AS' prefix
var ReMatchASLookup = regexp.MustCompile(`(?i)^AS(\d+)`)
2018-12-16 16:29:02 +01:00
2021-11-19 19:08:59 +01:00
// NeighborsStoreBackend interface
type NeighborsStoreBackend interface {
// SetNeighbors replaces all neighbors for a given
// route server identified by sourceID.
SetNeighbors(
ctx context.Context,
sourceID string,
neighbors api.Neighbors,
2021-11-19 22:00:09 +01:00
) error
// GetNeighborsAt retrieves all neighbors associated
// with a route server (source).
GetNeighborsAt(
ctx context.Context,
sourceID string,
) (api.Neighbors, error)
// GetNeighborsMapAt retrieve a map of neighbor ids
// to the neighbor for a given route server for quick
// consecutive lookup.
GetNeighborsMapAt(
2021-11-19 22:00:09 +01:00
ctx context.Context,
sourceID string,
) (map[string]*api.Neighbor, error)
2021-11-19 22:00:09 +01:00
// CountNeighborsAt retrieves the current number of
// stored neighbors.
CountNeighborsAt(
ctx context.Context,
sourceID string,
) (int, error)
2021-11-19 19:08:59 +01:00
}
2017-06-23 16:27:09 +02:00
2021-10-22 22:17:04 +02:00
// NeighborsStore is queryable for neighbor information
2021-10-15 21:24:24 +02:00
type NeighborsStore struct {
2021-11-19 22:00:09 +01:00
backend NeighborsStoreBackend
2021-11-19 19:08:59 +01:00
sources *SourcesStore
2017-07-14 14:18:47 +02:00
2021-11-19 19:08:59 +01:00
forceNeighborRefresh bool
2017-06-23 17:40:19 +02:00
}
2021-10-15 21:24:24 +02:00
// NewNeighborsStore creates a new store for neighbors
2021-11-19 19:08:59 +01:00
func NewNeighborsStore(
cfg *config.Config,
2021-11-19 22:00:09 +01:00
backend NeighborsStoreBackend,
2021-11-19 19:08:59 +01:00
) *NeighborsStore {
2018-07-07 11:39:46 +02:00
// Set refresh interval, default to 5 minutes when
// interval is set to 0
2021-11-19 19:08:59 +01:00
refreshInterval := time.Duration(
2021-10-20 20:26:37 +00:00
cfg.Server.NeighborsStoreRefreshInterval) * time.Minute
2021-11-19 19:08:59 +01:00
if refreshInterval == 0 {
refreshInterval = time.Duration(5) * time.Minute
2018-07-07 11:39:46 +02:00
}
2022-02-08 21:55:26 +01:00
refreshParallelism := cfg.Server.NeighborsStoreRefreshParallelism
if refreshParallelism <= 0 {
refreshParallelism = 1
}
2018-07-07 11:39:46 +02:00
2022-02-08 21:55:26 +01:00
log.Println("Neighbors refresh interval set to:", refreshInterval)
log.Println("Neighbors refresh parallelism:", refreshParallelism)
2021-11-19 22:21:18 +01:00
2021-11-19 22:00:09 +01:00
// Store refresh information per store
2022-02-08 21:55:26 +01:00
sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism)
2021-11-19 22:00:09 +01:00
2021-11-19 19:08:59 +01:00
// Neighbors will be refreshed on every GetNeighborsAt
// invocation. Why? I (Annika) don't know. I have to ask Patrick.
// TODO: This feels wrong here. Figure out reason why it
// was added and refactor.
// At least now the variable name is a bit more honest.
2021-11-19 22:00:09 +01:00
forceNeighborRefresh := cfg.Server.EnableNeighborsStatusRefresh
2021-10-15 21:24:24 +02:00
store := &NeighborsStore{
2021-11-19 19:08:59 +01:00
backend: backend,
sources: sources,
forceNeighborRefresh: forceNeighborRefresh,
2017-06-23 17:40:19 +02:00
}
return store
}
2021-07-02 14:30:43 +02:00
// Start the store's housekeeping.
2021-10-22 22:17:04 +02:00
func (s *NeighborsStore) Start() {
2021-10-15 21:24:24 +02:00
log.Println("Starting local neighbors store")
2021-10-22 22:17:04 +02:00
go s.init()
2017-06-23 17:40:19 +02:00
}
2021-10-22 22:17:04 +02:00
func (s *NeighborsStore) init() {
2021-11-19 19:08:59 +01:00
// Periodically trigger updates. Sources with an
// LastNeighborsRefresh < refreshInterval or currently
// updating will be skipped.
2017-06-23 17:40:19 +02:00
for {
2021-10-22 22:17:04 +02:00
s.update()
2021-11-19 19:08:59 +01:00
time.Sleep(time.Second)
2017-06-23 17:40:19 +02:00
}
}
2021-12-06 10:03:17 +01:00
// GetStatus retrievs the status for a route server
2021-10-22 22:17:04 +02:00
// identified by sourceID.
2021-12-06 10:03:17 +01:00
func (s *NeighborsStore) GetStatus(sourceID string) (*Status, error) {
2021-11-19 19:08:59 +01:00
return s.sources.GetStatus(sourceID)
}
2021-12-03 18:17:41 +01:00
// IsInitialized retrieves the status for a route server
2021-12-02 16:11:15 +01:00
// and checks if it is ready.
2021-12-03 18:17:41 +01:00
func (s *NeighborsStore) IsInitialized(sourceID string) bool {
rdy, _ := s.sources.IsInitialized(sourceID)
2021-12-02 16:11:15 +01:00
return rdy
}
2021-11-19 19:08:59 +01:00
// updateSource will update a single source. This
// function may crash or return errors.
2021-11-19 22:00:09 +01:00
func (s *NeighborsStore) updateSource(
ctx context.Context,
src sources.Source,
srcID string,
) error {
2021-11-19 19:08:59 +01:00
// Get neighbors form source instance and update backend
2021-11-19 22:00:09 +01:00
res, err := src.Neighbors()
2021-11-19 19:08:59 +01:00
if err != nil {
return err
}
2021-11-19 22:00:09 +01:00
if err = s.backend.SetNeighbors(ctx, srcID, res.Neighbors); err != nil {
2021-11-19 19:08:59 +01:00
return err
}
2021-11-19 22:00:09 +01:00
return s.sources.RefreshSuccess(srcID)
2021-11-19 19:08:59 +01:00
}
2017-06-23 17:40:19 +02:00
2021-11-19 19:08:59 +01:00
// safeUpdateSource will try to update a source but
// will recover from a panic if something goes wrong.
// In that case, the LastError and State will be updated.
func (s *NeighborsStore) safeUpdateSource(id string) {
2021-11-19 22:00:09 +01:00
ctx := context.TODO()
if !s.sources.ShouldRefresh(id) {
2021-11-19 19:08:59 +01:00
return // Nothing to do here
}
2017-06-23 17:40:19 +02:00
2021-11-19 19:08:59 +01:00
if err := s.sources.LockSource(id); err != nil {
log.Println("Cloud not start neighbor refresh:", err)
return
}
2017-06-23 18:01:49 +02:00
2021-11-19 19:08:59 +01:00
// Apply jitter so, we do not hit everything at once.
// TODO: Make configurable
2021-11-19 22:00:09 +01:00
time.Sleep(time.Duration(rand.Intn(30)) * time.Second)
2017-06-23 17:40:19 +02:00
2021-11-19 22:00:09 +01:00
src := s.sources.GetInstance(id)
2021-11-19 19:08:59 +01:00
srcName := s.sources.GetName(id)
2018-07-13 11:41:00 +02:00
2021-11-19 19:08:59 +01:00
// Prepare for impact.
defer func() {
if err := recover(); err != nil {
log.Println(
"Recovering after failed neighbors refresh of",
srcName, "from:", err)
2021-11-19 22:00:09 +01:00
s.sources.RefreshError(id, err)
2017-06-23 17:40:19 +02:00
}
2021-11-19 19:08:59 +01:00
}()
2017-06-23 17:40:19 +02:00
2021-11-19 22:00:09 +01:00
if err := s.updateSource(ctx, src, id); err != nil {
2021-11-19 19:08:59 +01:00
log.Println(
"Refeshing neighbors of", srcName, "failed:", err)
2021-11-19 22:00:09 +01:00
s.sources.RefreshError(id, err)
2017-06-23 17:40:19 +02:00
}
status, err := s.sources.GetStatus(id)
if err != nil {
log.Println(err)
} else {
log.Println("Refreshed neighbors of", srcName, "in", status.LastRefreshDuration)
}
2021-11-19 19:08:59 +01:00
}
2018-10-01 12:02:14 +02:00
2021-11-19 19:08:59 +01:00
// Update all neighbors from all sources, where the
// sources last neighbor refresh is longer ago
// than the configured refresh period.
func (s *NeighborsStore) update() {
2022-02-09 12:02:22 +01:00
for _, id := range s.sources.GetSourceIDsForRefresh() {
2021-11-19 22:00:09 +01:00
go s.safeUpdateSource(id)
2021-11-19 19:08:59 +01:00
}
2017-06-23 17:40:19 +02:00
}
2021-12-07 19:11:11 +01:00
// CachedAt returns the time of the oldest partial
// refresh of the dataset.
func (s *NeighborsStore) CachedAt(
ctx context.Context,
) time.Time {
return s.sources.CachedAt(ctx)
}
// CacheTTL returns the TTL time
func (s *NeighborsStore) CacheTTL(
ctx context.Context,
) time.Time {
return s.sources.NextRefresh(ctx)
}
2021-10-22 22:17:04 +02:00
// GetNeighborsAt gets all neighbors from a routeserver
2021-12-06 10:03:17 +01:00
func (s *NeighborsStore) GetNeighborsAt(
ctx context.Context,
sourceID string,
) (api.Neighbors, error) {
2021-11-19 19:08:59 +01:00
if s.forceNeighborRefresh {
2021-11-19 22:00:09 +01:00
src := s.sources.GetInstance(sourceID)
if src == nil {
return nil, sources.ErrSourceNotFound
}
if err := s.updateSource(ctx, src, sourceID); err != nil {
2021-11-19 19:08:59 +01:00
return nil, err
}
}
2021-11-19 19:08:59 +01:00
return s.backend.GetNeighborsAt(ctx, sourceID)
}
// GetNeighborsMapAt looks up a neighbor on a RS by ID.
func (s *NeighborsStore) GetNeighborsMapAt(
2021-12-08 14:15:55 +01:00
ctx context.Context,
sourceID string,
) (map[string]*api.Neighbor, error) {
return s.backend.GetNeighborsMapAt(ctx, sourceID)
2017-06-23 17:40:19 +02:00
}
2021-12-03 18:17:41 +01:00
// lookupNeighborsAt filters for neighbors at a route
2021-10-22 22:17:04 +02:00
// server matching a given query string.
2021-12-03 18:17:41 +01:00
func (s *NeighborsStore) lookupNeighborsAt(
ctx context.Context,
2021-10-20 20:26:37 +00:00
sourceID string,
2017-06-29 14:24:08 +02:00
query string,
2021-11-19 22:00:09 +01:00
) (api.Neighbors, error) {
2021-10-15 21:24:24 +02:00
results := api.Neighbors{}
2021-11-19 22:00:09 +01:00
neighbors, err := s.backend.GetNeighborsAt(ctx, sourceID)
if err != nil {
return nil, err
}
asn := -1
2021-10-22 22:51:11 +02:00
if ReMatchASLookup.MatchString(query) {
groups := ReMatchASLookup.FindStringSubmatch(query)
if a, err := strconv.Atoi(groups[1]); err == nil {
asn = a
}
}
2021-10-15 21:24:24 +02:00
for _, neighbor := range neighbors {
2021-10-20 20:26:37 +00:00
if asn >= 0 && neighbor.ASN == asn { // only executed if valid AS query is detected
2021-10-15 21:24:24 +02:00
results = append(results, neighbor)
} else if ContainsCi(neighbor.Description, query) {
results = append(results, neighbor)
} else {
continue
}
}
2021-11-19 22:00:09 +01:00
return results, nil
}
2021-10-22 22:17:04 +02:00
// LookupNeighbors filters for neighbors matching a query
// on all route servers.
func (s *NeighborsStore) LookupNeighbors(
2021-12-06 10:03:17 +01:00
ctx context.Context,
query string,
2021-12-06 10:03:17 +01:00
) (api.NeighborsLookupResults, error) {
// Create empty result set
2021-10-15 21:24:24 +02:00
results := make(api.NeighborsLookupResults)
2021-11-19 22:00:09 +01:00
for _, sourceID := range s.sources.GetSourceIDs() {
2021-12-03 18:17:41 +01:00
neighbors, err := s.lookupNeighborsAt(ctx, sourceID, query)
2021-11-19 22:00:09 +01:00
if err != nil {
2021-12-06 10:03:17 +01:00
return nil, err
2021-11-19 22:00:09 +01:00
}
results[sourceID] = neighbors
}
2021-11-19 22:00:09 +01:00
return results, nil
2019-10-07 18:13:08 +02:00
}
2021-10-22 22:17:04 +02:00
// FilterNeighbors retrieves neighbors by name or by ASN
// from all route servers.
func (s *NeighborsStore) FilterNeighbors(
2021-12-06 10:03:17 +01:00
ctx context.Context,
2019-10-07 18:13:08 +02:00
filter *api.NeighborFilter,
2021-12-06 10:03:17 +01:00
) (api.Neighbors, error) {
2021-10-15 21:24:24 +02:00
results := []*api.Neighbor{}
2019-10-07 18:13:08 +02:00
// Get neighbors from all routeservers
2021-11-19 22:00:09 +01:00
for _, sourceID := range s.sources.GetSourceIDs() {
2021-12-06 10:03:17 +01:00
neighbors, err := s.backend.GetNeighborsAt(ctx, sourceID)
2021-11-19 22:00:09 +01:00
if err != nil {
2021-12-06 10:03:17 +01:00
return nil, err
}
// Apply filters
for _, neighbor := range neighbors {
if filter.Match(neighbor) {
results = append(results, neighbor)
}
2021-11-19 22:00:09 +01:00
}
2019-10-07 18:13:08 +02:00
}
2021-12-06 10:03:17 +01:00
return results, nil
2019-10-07 18:13:08 +02:00
}
2021-10-22 22:17:04 +02:00
// Stats exports some statistics for monitoring.
2021-12-06 10:03:17 +01:00
func (s *NeighborsStore) Stats(
ctx context.Context,
) *api.NeighborsStoreStats {
2021-10-15 21:24:24 +02:00
totalNeighbors := 0
2021-10-22 22:51:11 +02:00
rsStats := []api.RouteServerNeighborsStats{}
2017-06-23 17:40:19 +02:00
2021-11-19 22:00:09 +01:00
for _, sourceID := range s.sources.GetSourceIDs() {
status, _ := s.sources.GetStatus(sourceID)
ncount, err := s.backend.CountNeighborsAt(ctx, sourceID)
if err != nil {
2022-01-10 17:13:00 +01:00
if !errors.Is(err, sources.ErrSourceNotFound) {
log.Println("error during neighbor count:", err)
}
2021-11-19 22:00:09 +01:00
}
totalNeighbors += ncount
2021-10-22 22:51:11 +02:00
serverStats := api.RouteServerNeighborsStats{
2021-11-19 22:00:09 +01:00
Name: s.sources.GetName(sourceID),
State: status.State.String(),
Neighbors: ncount,
UpdatedAt: s.SourceCachedAt(sourceID),
2017-06-23 17:40:19 +02:00
}
rsStats = append(rsStats, serverStats)
}
2021-10-22 22:51:11 +02:00
storeStats := &api.NeighborsStoreStats{
2021-10-15 21:24:24 +02:00
TotalNeighbors: totalNeighbors,
2021-10-22 22:17:04 +02:00
RouteServers: rsStats,
2017-06-23 17:40:19 +02:00
}
return storeStats
2017-06-23 16:27:09 +02:00
}
2019-10-07 18:29:25 +02:00
2021-11-19 19:08:59 +01:00
// SourceCachedAt returns the last time the store content
2021-10-22 22:17:04 +02:00
// was refreshed.
2021-11-19 19:08:59 +01:00
func (s *NeighborsStore) SourceCachedAt(sourceID string) time.Time {
status, err := s.sources.GetStatus(sourceID)
if err != nil {
log.Println("error while getting source cached at:", err)
return time.Time{}
}
2021-11-19 22:00:09 +01:00
return status.LastRefresh
2019-10-07 18:29:25 +02:00
}
2021-11-19 19:08:59 +01:00
// SourceCacheTTL returns the next time when a refresh
2021-10-22 22:17:04 +02:00
// will be started.
2021-12-07 19:11:11 +01:00
func (s *NeighborsStore) SourceCacheTTL(
ctx context.Context,
sourceID string,
) time.Time {
return s.sources.NextRefresh(ctx)
2019-10-07 18:29:25 +02:00
}