package store import ( "context" "errors" "log" "math/rand" "regexp" "strconv" "time" "github.com/alice-lg/alice-lg/pkg/api" "github.com/alice-lg/alice-lg/pkg/config" "github.com/alice-lg/alice-lg/pkg/sources" ) // ReMatchASLookup matches lookups with an 'AS' prefix var ReMatchASLookup = regexp.MustCompile(`(?i)^AS(\d+)`) // NeighborsStoreBackend interface type NeighborsStoreBackend interface { // SetNeighbors replaces all neighbors for a given // route server identified by sourceID. SetNeighbors( ctx context.Context, sourceID string, neighbors api.Neighbors, ) error // GetNeighborsAt retrieves all neighbors associated // with a route server (source). GetNeighborsAt( ctx context.Context, sourceID string, ) (api.Neighbors, error) // GetNeighborsMapAt retrieve a map of neighbor ids // to the neighbor for a given route server for quick // consecutive lookup. GetNeighborsMapAt( ctx context.Context, sourceID string, ) (map[string]*api.Neighbor, error) // CountNeighborsAt retrieves the current number of // stored neighbors. CountNeighborsAt( ctx context.Context, sourceID string, ) (int, error) } // NeighborsStore is queryable for neighbor information type NeighborsStore struct { backend NeighborsStoreBackend sources *SourcesStore forceNeighborRefresh bool } // NewNeighborsStore creates a new store for neighbors func NewNeighborsStore( cfg *config.Config, backend NeighborsStoreBackend, ) *NeighborsStore { // Set refresh interval, default to 5 minutes when // interval is set to 0 refreshInterval := time.Duration( cfg.Server.NeighborsStoreRefreshInterval) * time.Minute if refreshInterval == 0 { refreshInterval = time.Duration(5) * time.Minute } refreshParallelism := cfg.Server.NeighborsStoreRefreshParallelism if refreshParallelism <= 0 { refreshParallelism = 1 } log.Println("Neighbors refresh interval set to:", refreshInterval) log.Println("Neighbors refresh parallelism:", refreshParallelism) // Store refresh information per store sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism) // Neighbors will be refreshed on every GetNeighborsAt // invocation. Why? I (Annika) don't know. I have to ask Patrick. // TODO: This feels wrong here. Figure out reason why it // was added and refactor. // At least now the variable name is a bit more honest. forceNeighborRefresh := cfg.Server.EnableNeighborsStatusRefresh store := &NeighborsStore{ backend: backend, sources: sources, forceNeighborRefresh: forceNeighborRefresh, } return store } // Start the store's housekeeping. func (s *NeighborsStore) Start(ctx context.Context) { log.Println("Starting local neighbors store") for { if err := ctx.Err(); err != nil { return // Context invalid } s.update(ctx) time.Sleep(time.Second) } } // GetStatus retrievs the status for a route server // identified by sourceID. func (s *NeighborsStore) GetStatus(sourceID string) (*Status, error) { return s.sources.GetStatus(sourceID) } // IsInitialized retrieves the status for a route server // and checks if it is ready. func (s *NeighborsStore) IsInitialized(sourceID string) bool { rdy, _ := s.sources.IsInitialized(sourceID) return rdy } // updateSource will update a single source. This // function may crash or return errors. func (s *NeighborsStore) updateSource( ctx context.Context, src sources.Source, srcID string, ) error { // Get neighbors form source instance and update backend res, err := src.Neighbors(ctx) if err != nil { return err } if err = s.backend.SetNeighbors(ctx, srcID, res.Neighbors); err != nil { return err } return s.sources.RefreshSuccess(srcID) } // safeUpdateSource will try to update a source but // will recover from a panic if something goes wrong. // In that case, the LastError and State will be updated. func (s *NeighborsStore) safeUpdateSource(ctx context.Context, id string) { if !s.sources.ShouldRefresh(id) { return // Nothing to do here } if err := s.sources.LockSource(id); err != nil { log.Println("Cloud not start neighbor refresh:", err) return } // Apply jitter so, we do not hit everything at once. // TODO: Make configurable time.Sleep(time.Duration(rand.Intn(30)) * time.Second) src := s.sources.GetInstance(id) srcName := s.sources.GetName(id) // Prepare for impact. defer func() { if err := recover(); err != nil { log.Println( "Recovering after failed neighbors refresh of", srcName, "from:", err) s.sources.RefreshError(id, err) } }() if err := s.updateSource(ctx, src, id); err != nil { log.Println( "Refeshing neighbors of", srcName, "failed:", err) s.sources.RefreshError(id, err) } status, err := s.sources.GetStatus(id) if err != nil { log.Println(err) } else { log.Println("Refreshed neighbors of", srcName, "in", status.LastRefreshDuration) } } // Update all neighbors from all sources, where the // sources last neighbor refresh is longer ago // than the configured refresh period. func (s *NeighborsStore) update(ctx context.Context) { for _, id := range s.sources.GetSourceIDsForRefresh() { go s.safeUpdateSource(ctx, id) } } // CachedAt returns the time of the oldest partial // refresh of the dataset. func (s *NeighborsStore) CachedAt( ctx context.Context, ) time.Time { return s.sources.CachedAt(ctx) } // CacheTTL returns the TTL time func (s *NeighborsStore) CacheTTL( ctx context.Context, ) time.Time { return s.sources.NextRefresh(ctx) } // GetNeighborsAt gets all neighbors from a routeserver func (s *NeighborsStore) GetNeighborsAt( ctx context.Context, sourceID string, ) (api.Neighbors, error) { if !s.IsInitialized(sourceID) { return nil, ErrSourceNotInitialized } if s.forceNeighborRefresh { src := s.sources.GetInstance(sourceID) if src == nil { return nil, sources.ErrSourceNotFound } if err := s.updateSource(ctx, src, sourceID); err != nil { return nil, err } } return s.backend.GetNeighborsAt(ctx, sourceID) } // GetNeighborsMapAt looks up a neighbor on a RS by ID. func (s *NeighborsStore) GetNeighborsMapAt( ctx context.Context, sourceID string, ) (map[string]*api.Neighbor, error) { if !s.IsInitialized(sourceID) { return nil, ErrSourceNotInitialized } return s.backend.GetNeighborsMapAt(ctx, sourceID) } // lookupNeighborsAt filters for neighbors at a route // server matching a given query string. func (s *NeighborsStore) lookupNeighborsAt( ctx context.Context, sourceID string, query string, ) (api.Neighbors, error) { neighbors, err := s.backend.GetNeighborsAt(ctx, sourceID) if err != nil { return nil, err } asn := -1 if ReMatchASLookup.MatchString(query) { groups := ReMatchASLookup.FindStringSubmatch(query) if a, err := strconv.Atoi(groups[1]); err == nil { asn = a } } results := api.Neighbors{} for _, neighbor := range neighbors { if asn >= 0 && neighbor.ASN == asn { // only executed if valid AS query is detected results = append(results, neighbor) } else if ContainsCi(neighbor.Description, query) { results = append(results, neighbor) } else { continue } } return results, nil } // LookupNeighbors filters for neighbors matching a query // on all route servers. func (s *NeighborsStore) LookupNeighbors( ctx context.Context, query string, ) (api.NeighborsLookupResults, error) { // Create empty result set results := make(api.NeighborsLookupResults) for _, sourceID := range s.sources.GetSourceIDs() { neighbors, err := s.lookupNeighborsAt(ctx, sourceID, query) if errors.Is(err, sources.ErrSourceNotFound) { continue // Skip neighbors from this source for now } else if err != nil { return nil, err } results[sourceID] = neighbors } return results, nil } // FilterNeighbors retrieves neighbors by name or by ASN // from all route servers. func (s *NeighborsStore) FilterNeighbors( ctx context.Context, filter *api.NeighborFilter, ) (api.Neighbors, error) { results := []*api.Neighbor{} // Get neighbors from all routeservers for _, sourceID := range s.sources.GetSourceIDs() { neighbors, err := s.backend.GetNeighborsAt(ctx, sourceID) if errors.Is(err, sources.ErrSourceNotFound) { continue // Skip neighbors from this source for now } else if err != nil { return nil, err } // Apply filters for _, neighbor := range neighbors { if filter.Match(neighbor) { results = append(results, neighbor) } } } return results, nil } // Stats exports some statistics for monitoring. func (s *NeighborsStore) Stats( ctx context.Context, ) *api.NeighborsStoreStats { totalNeighbors := 0 rsStats := []api.RouteServerNeighborsStats{} for _, sourceID := range s.sources.GetSourceIDs() { status, _ := s.sources.GetStatus(sourceID) ncount, err := s.backend.CountNeighborsAt(ctx, sourceID) if err != nil { if !errors.Is(err, sources.ErrSourceNotFound) { log.Println("error during neighbor count:", err) } } totalNeighbors += ncount serverStats := api.RouteServerNeighborsStats{ Name: s.sources.GetName(sourceID), State: status.State.String(), Neighbors: ncount, UpdatedAt: s.SourceCachedAt(sourceID), } rsStats = append(rsStats, serverStats) } storeStats := &api.NeighborsStoreStats{ TotalNeighbors: totalNeighbors, RouteServers: rsStats, } return storeStats } // Status returns the stores current status func (s *NeighborsStore) Status(ctx context.Context) *api.StoreStatus { initialized := true sources := s.sources.GetSourcesStatus() status := make(map[string]*api.SourceStatus) for _, s := range sources { if !s.Initialized { initialized = false } status[s.SourceID] = &api.SourceStatus{ RefreshInterval: s.RefreshInterval, LastRefresh: s.LastRefresh, State: s.State.String(), Initialized: s.Initialized, } } meta := &api.StoreStatus{ Initialized: initialized, Sources: status, } return meta } // SourceCachedAt returns the last time the store content // was refreshed. func (s *NeighborsStore) SourceCachedAt(sourceID string) time.Time { status, err := s.sources.GetStatus(sourceID) if err != nil { log.Println("error while getting source cached at:", err) return time.Time{} } return status.LastRefresh } // SourceCacheTTL returns the next time when a refresh // will be started. func (s *NeighborsStore) SourceCacheTTL( ctx context.Context, sourceID string, ) time.Time { return s.sources.NextRefresh(ctx) }