alice-lg/pkg/store/routes_store.go
2024-01-26 14:19:18 +01:00

362 lines
9.0 KiB
Go

package store
import (
"context"
"errors"
"log"
"math/rand"
"time"
"github.com/alice-lg/alice-lg/pkg/api"
"github.com/alice-lg/alice-lg/pkg/config"
"github.com/alice-lg/alice-lg/pkg/pools"
"github.com/alice-lg/alice-lg/pkg/sources"
)
// newNeighborQuery creates a new NeighborQuery
func newNeighborQuery(neighborID string, sourceID string) *api.NeighborQuery {
ptrNeighborID := pools.Neighbors.Get(neighborID)
if ptrNeighborID == nil {
return nil
}
ptrSourceID := pools.RouteServers.Get(sourceID)
if ptrSourceID == nil {
return nil
}
return &api.NeighborQuery{
NeighborID: ptrNeighborID,
SourceID: ptrSourceID,
}
}
// RoutesStoreBackend interface
type RoutesStoreBackend interface {
// SetRoutes updates the routes in the store after a refresh.
SetRoutes(
ctx context.Context,
sourceID string,
routes api.LookupRoutes,
) error
// CountRoutesAt returns the number of imported
// and filtered routes for a given route server.
// Example: (imported, filtered, error)
CountRoutesAt(
ctx context.Context,
sourceID string,
) (uint, uint, error)
// FindByNeighbors retrieves the prefixes
// announced by the neighbor at a given source
FindByNeighbors(
ctx context.Context,
neighbors []*api.NeighborQuery,
filters *api.SearchFilters,
) (api.LookupRoutes, error)
// FindByPrefix
FindByPrefix(
ctx context.Context,
prefix string,
filters *api.SearchFilters,
limit uint,
) (api.LookupRoutes, error)
}
// The RoutesStore holds a mapping of routes,
// status and cfgs and will be queried instead
// of a backend by the API
type RoutesStore struct {
backend RoutesStoreBackend
sources *SourcesStore
neighbors *NeighborsStore
limit uint
}
// NewRoutesStore makes a new store instance
// with a cfg.
func NewRoutesStore(
neighbors *NeighborsStore,
cfg *config.Config,
backend RoutesStoreBackend,
) *RoutesStore {
// Set refresh interval as duration, fall back to
// five minutes if no interval is set.
refreshInterval := time.Duration(
cfg.Server.RoutesStoreRefreshInterval) * time.Minute
if refreshInterval == 0 {
refreshInterval = time.Duration(5) * time.Minute
}
refreshParallelism := cfg.Server.RoutesStoreRefreshParallelism
if refreshParallelism <= 0 {
refreshParallelism = 1
}
log.Println("Routes refresh interval set to:", refreshInterval)
log.Println("Routes refresh parallelism:", refreshParallelism)
log.Println("Routes store query limit:", cfg.Server.RoutesStoreQueryLimit)
// Store refresh information per store
sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism)
store := &RoutesStore{
backend: backend,
sources: sources,
neighbors: neighbors,
limit: cfg.Server.RoutesStoreQueryLimit,
}
return store
}
// Start starts the routes store
func (s *RoutesStore) Start(ctx context.Context) {
log.Println("Starting local routes store")
// Periodically trigger updates
for {
if err := ctx.Err(); err != nil {
return // context is done
}
s.update(ctx)
time.Sleep(time.Second)
}
}
// Update all routes from all sources, where the
// sources last refresh is longer ago than the configured
// refresh period. This is totally the same as the
// NeighborsStore.update and maybe these functions can be merged (TODO)
func (s *RoutesStore) update(ctx context.Context) {
for _, id := range s.sources.GetSourceIDsForRefresh() {
go s.safeUpdateSource(ctx, id)
}
}
// safeUpdateSource will try to update a source but
// will recover from a panic if something goes wrong.
// In that case, the LastError and State will be updated.
// Again. The similarity to the NeighborsStore is really sus.
func (s *RoutesStore) safeUpdateSource(ctx context.Context, id string) {
if !s.sources.ShouldRefresh(id) {
return // Nothing to do here
}
if err := s.sources.LockSource(id); err != nil {
log.Println("Cloud not start routes refresh:", err)
return
}
// Apply jitter so, we do not hit everything at once.
// TODO: Make configurable
time.Sleep(time.Duration(rand.Intn(30)) * time.Second)
src := s.sources.Get(id)
srcName := s.sources.GetName(id)
log.Println("[routes store] begin routes refresh of:", srcName)
// Prepare for impact.
defer func() {
if err := recover(); err != nil {
log.Println(
"Recovering after failed routes refresh of",
src.Name, "from:", err)
s.sources.RefreshError(id, err)
}
}()
if err := s.updateSource(ctx, src); err != nil {
log.Println(
"Refeshing routes of", src.Name, "failed:", err)
s.sources.RefreshError(id, err)
} else {
status, err := s.sources.GetStatus(id)
if err != nil {
log.Println(err)
} else {
log.Println("Refreshed routes of", srcName, "in", status.LastRefreshDuration)
}
}
}
// Update all routes
func (s *RoutesStore) updateSource(
ctx context.Context,
src *config.SourceConfig,
) error {
if err := s.awaitNeighborStore(ctx, src.ID); err != nil {
return err
}
rs := src.GetInstance()
res, err := rs.AllRoutes(ctx)
if err != nil {
return err
}
log.Println("[routes store] finished fetching routes dump from RS", src.Name)
neighbors, err := s.neighbors.GetNeighborsMapAt(ctx, src.ID)
if err != nil {
return err
}
log.Println(
"[routes store] retrieved", len(res.Imported),
"accepted and", len(res.Filtered), "filtered routes for:", src.Name)
// Prepare imported routes for lookup
srcRS := &api.LookupRouteServer{
ID: pools.RouteServers.Acquire(src.ID),
Name: src.Name,
}
imported := res.Imported.ToLookupRoutes("imported", srcRS, neighbors)
filtered := res.Filtered.ToLookupRoutes("filtered", srcRS, neighbors)
lookupRoutes := append(imported, filtered...)
log.Println("[routes store] importing", len(lookupRoutes), "into store from", src.Name)
if err = s.backend.SetRoutes(ctx, src.ID, lookupRoutes); err != nil {
return err
}
log.Println("[routes store] import success")
return s.sources.RefreshSuccess(src.ID)
}
// awaitNeighborStore polls the neighbor store state
// for the sourceID until the context is not longer valid.
func (s *RoutesStore) awaitNeighborStore(
ctx context.Context,
srcID string,
) error {
for {
if err := ctx.Err(); err != nil {
return err
}
if s.neighbors.IsInitialized(srcID) {
return nil
}
time.Sleep(100 * time.Millisecond)
}
}
// Status returns the store status meta
func (s *RoutesStore) Status(ctx context.Context) *api.StoreStatus {
initialized := true
sources := s.sources.GetSourcesStatus()
status := make(map[string]*api.SourceStatus)
for _, s := range sources {
if !s.Initialized {
initialized = false
}
status[s.SourceID] = &api.SourceStatus{
RefreshInterval: s.RefreshInterval,
LastRefresh: s.LastRefresh,
State: s.State.String(),
Initialized: s.Initialized,
}
}
meta := &api.StoreStatus{
Initialized: initialized,
Sources: status,
}
return meta
}
// Stats calculates some store insights
func (s *RoutesStore) Stats(ctx context.Context) *api.RoutesStoreStats {
totalImported := uint(0)
totalFiltered := uint(0)
rsStats := []api.RouteServerRoutesStats{}
for _, sourceID := range s.sources.GetSourceIDs() {
status, err := s.sources.GetStatus(sourceID)
if err != nil {
log.Println("error while getting source status:", err)
continue
}
src := s.sources.Get(sourceID)
nImported, nFiltered, err := s.backend.CountRoutesAt(ctx, sourceID)
if err != nil {
if !errors.Is(err, sources.ErrSourceNotFound) {
log.Println("error during routes count:", err)
}
}
totalImported += nImported
totalFiltered += nFiltered
serverStats := api.RouteServerRoutesStats{
Name: src.Name,
Routes: api.RoutesStats{
Imported: nImported,
Filtered: nFiltered,
},
State: status.State.String(),
UpdatedAt: status.LastRefresh,
}
rsStats = append(rsStats, serverStats)
}
// Make stats
storeStats := &api.RoutesStoreStats{
TotalRoutes: api.RoutesStats{
Imported: totalImported,
Filtered: totalFiltered,
},
RouteServers: rsStats,
}
return storeStats
}
// CachedAt returns the time of the oldest partial
// refresh of the dataset.
func (s *RoutesStore) CachedAt(
ctx context.Context,
) time.Time {
return s.sources.CachedAt(ctx)
}
// CacheTTL returns the TTL time
func (s *RoutesStore) CacheTTL(
ctx context.Context,
) time.Time {
return s.sources.NextRefresh(ctx)
}
// LookupPrefix performs a lookup over all route servers
func (s *RoutesStore) LookupPrefix(
ctx context.Context,
prefix string,
filters *api.SearchFilters,
) (api.LookupRoutes, error) {
return s.backend.FindByPrefix(ctx, prefix, filters, s.limit)
}
// LookupPrefixForNeighbors returns all routes for
// a set of neighbors.
func (s *RoutesStore) LookupPrefixForNeighbors(
ctx context.Context,
neighbors api.NeighborsLookupResults,
filters *api.SearchFilters,
) (api.LookupRoutes, error) {
query := make([]*api.NeighborQuery, 0, len(neighbors))
for sourceID, sourceNeighbors := range neighbors {
for _, neighbor := range sourceNeighbors {
q := newNeighborQuery(neighbor.ID, sourceID)
if q == nil {
continue
}
query = append(query, q)
}
}
return s.backend.FindByNeighbors(ctx, query, filters)
}