diff --git a/pkg/api/store_stats.go b/pkg/api/store_stats.go index 5bc591b..852407e 100644 --- a/pkg/api/store_stats.go +++ b/pkg/api/store_stats.go @@ -8,8 +8,8 @@ import ( // RoutesStats provides number of filtered and // imported prefixes in the store type RoutesStats struct { - Filtered int `json:"filtered"` - Imported int `json:"imported"` + Filtered uint `json:"filtered"` + Imported uint `json:"imported"` } // RouteServerRoutesStats provides the number of diff --git a/pkg/store/neighbors_store.go b/pkg/store/neighbors_store.go index 691afa1..e313f3e 100644 --- a/pkg/store/neighbors_store.go +++ b/pkg/store/neighbors_store.go @@ -111,6 +111,13 @@ func (s *NeighborsStore) SourceStatus(sourceID string) (*Status, error) { return s.sources.GetStatus(sourceID) } +// IsReady retrieves the status for a route server +// and checks if it is ready. +func (s *NeighborsStore) IsReady(sourceID string) bool { + rdy, _ := s.sources.IsReady(sourceID) + return rdy +} + // updateSource will update a single source. This // function may crash or return errors. func (s *NeighborsStore) updateSource( diff --git a/pkg/store/routes_store.go b/pkg/store/routes_store.go index 47256f0..fef9a7d 100644 --- a/pkg/store/routes_store.go +++ b/pkg/store/routes_store.go @@ -9,7 +9,6 @@ import ( "github.com/alice-lg/alice-lg/pkg/api" "github.com/alice-lg/alice-lg/pkg/config" - "github.com/alice-lg/alice-lg/pkg/sources" ) // RoutesStoreBackend interface @@ -18,7 +17,7 @@ type RoutesStoreBackend interface { SetRoutes( ctx context.Context, sourceID string, - routes *api.RoutesResponse, + routes api.LookupRoutes, ) error // CountRoutesAt returns the number of imported @@ -27,7 +26,7 @@ type RoutesStoreBackend interface { CountRoutesAt( ctx context.Context, sourceID string, - ) (uint, error) + ) (uint, uint, error) // GetNeighborPrefixesAt retrieves the prefixes // announced by the neighbor at a given source @@ -42,15 +41,15 @@ type RoutesStoreBackend interface { // status and cfgs and will be queried instead // of a backend by the API type RoutesStore struct { - backend RoutesStoreBackend - sources *SourcesStore + backend RoutesStoreBackend + sources *SourcesStore neighbors *NeighborsStore } // NewRoutesStore makes a new store instance // with a cfg. func NewRoutesStore( - neighborsStore *NeighborsStore, + neighbors *NeighborsStore, cfg *config.Config, backend RoutesStoreBackend, ) *RoutesStore { @@ -67,9 +66,9 @@ func NewRoutesStore( // Store refresh information per store sources := NewSourcesStore(cfg, refreshInterval) store := &RoutesStore{ - backend: backend, - sources: sources, - neighborsStore: neighborsStore, + backend: backend, + sources: sources, + neighbors: neighbors, } return store } @@ -115,22 +114,21 @@ func (s *RoutesStore) safeUpdateSource(id string) { // TODO: Make configurable time.Sleep(time.Duration(rand.Intn(30)) * time.Second) - src := s.sources.GetInstance(id) - srcName := s.sources.GetName(id) + src := s.sources.Get(id) // Prepare for impact. defer func() { if err := recover(); err != nil { log.Println( "Recovering after failed routes refresh of", - srcName, "from:", err) + src.Name, "from:", err) s.sources.RefreshError(id, err) } }() - if err := s.updateSource(ctx, src, id); err != nil { + if err := s.updateSource(ctx, src); err != nil { log.Println( - "Refeshing routes of", srcName, "failed:", err) + "Refeshing routes of", src.Name, "failed:", err) s.sources.RefreshError(id, err) } } @@ -138,17 +136,42 @@ func (s *RoutesStore) safeUpdateSource(id string) { // Update all routes func (s *RoutesStore) updateSource( ctx context.Context, - src* config.SourceConfig, + src *config.SourceConfig, ) error { - routesRes, err := src.AllRoutes() + if err := s.awaitNeighborStore(ctx, src.ID); err != nil { + return err + } + + rs := src.GetInstance() + res, err := rs.AllRoutes() if err != nil { return err } // Prepare imported routes for lookup + imported := s.routesToLookupRoutes("imported", src, res.Imported) + filtered := s.routesToLookupRoutes("filtered", src, res.Filtered) + lookupRoutes := append(imported, filtered...) + + if err = s.backend.SetRoutes(ctx, src.ID, lookupRoutes); err != nil { + return err + } + + return s.sources.RefreshSuccess(src.ID) +} + +func (s *RoutesStore) routesToLookupRoutes( + state string, + src *config.SourceConfig, + routes api.Routes, +) api.LookupRoutes { lookupRoutes := make(api.LookupRoutes, 0, len(routes)) for _, route := range routes { - neighbor := nStore.GetNeighborAt(source.ID, route.NeighborID) + neighbor, err := s.neighbors.GetNeighborAt(src.ID, route.NeighborID) + if err != nil { + log.Println("prepare route, neighbor lookup failed:", err) + continue + } lr := &api.LookupRoute{ Route: route, State: state, @@ -160,25 +183,33 @@ func (s *RoutesStore) updateSource( } lookupRoutes = append(lookupRoutes, lr) } - - if err = s.backend.SetRoutes(ctx, src.ID, lookupRoutes); err != nil { - return err - } - - return s.sources.RefreshSuccess(src.ID) + return lookupRoutes } -func (s* RoutesStore) routesToLookupRoutes( - src sources.Source, - routes api.Routes, - +func (s *RoutesStore) awaitNeighborStore( + ctx context.Context, + srcID string, +) error { + // Poll the neighbor store state for the sourceID + // until the context is not longer valid + for { + err := ctx.Err() + if err != nil { + return err + } + if s.neighbors.IsReady(srcID) { + return nil + } + time.Sleep(100 * time.Millisecond) + } +} // Stats calculates some store insights func (s *RoutesStore) Stats() *api.RoutesStoreStats { ctx := context.TODO() - totalImported := 0 - totalFiltered := 0 + totalImported := uint(0) + totalFiltered := uint(0) rsStats := []api.RouteServerRoutesStats{} @@ -189,7 +220,9 @@ func (s *RoutesStore) Stats() *api.RoutesStoreStats { continue } - nImported, nFiltered, err := s.backend.CountRoutes(ctx, sourceID) + src := s.sources.Get(sourceID) + + nImported, nFiltered, err := s.backend.CountRoutesAt(ctx, sourceID) if err != nil { log.Println("error during routes count:", err) } @@ -198,7 +231,7 @@ func (s *RoutesStore) Stats() *api.RoutesStoreStats { totalFiltered += nFiltered serverStats := api.RouteServerRoutesStats{ - Name: s.cfgMap[sourceID].Name, + Name: src.Name, Routes: api.RoutesStats{ Imported: nImported, Filtered: nFiltered, @@ -243,7 +276,7 @@ func routeToLookupRoute( route *api.Route, ) *api.LookupRoute { // Get neighbor and make route - neighbor := nStore.GetNeighborAt(source.ID, route.NeighborID) + neighbor, _ := nStore.GetNeighborAt(source.ID, route.NeighborID) lookup := &api.LookupRoute{ Route: route, State: state, @@ -282,13 +315,13 @@ func (s *RoutesStore) LookupPrefixAt( s.RUnlock() filtered := filterRoutesByPrefix( - s.neighborsStore, + s.neighbors, cfg, routes.Filtered, prefix, "filtered") imported := filterRoutesByPrefix( - s.neighborsStore, + s.neighbors, cfg, routes.Imported, prefix, diff --git a/pkg/store/sources_store.go b/pkg/store/sources_store.go index e807869..4bf7337 100644 --- a/pkg/store/sources_store.go +++ b/pkg/store/sources_store.go @@ -86,6 +86,18 @@ func (s *SourcesStore) GetStatus(sourceID string) (*Status, error) { return s.getStatus(sourceID) } +// IsReady will retrieve the status of the source +// and check if the state is ready. +func (s *SourcesStore) IsReady(sourceID string) (bool, error) { + s.Lock() + defer s.Unlock() + status, err := getStatus(sourceID) + if err != nil { + return false, err + } + return status.State == StateReady +} + // Internal getStatus func (s *SourcesStore) getStatus(sourceID string) (*Status, error) { status, ok := s.status[sourceID] @@ -145,6 +157,13 @@ func (s *SourcesStore) GetName(sourceID string) string { return s.sources[sourceID].Name } +// Get retrieves the source +func (s *SourcesStore) Get(sourceID string) *config.SourceConfig { + s.Lock() + defer s.Unlock() + return s.sources[sourceID] +} + // GetSourceIDs returns a list of registered source ids. func (s *SourcesStore) GetSourceIDs() []string { s.Lock()