commit
This commit is contained in:
parent
db0583d581
commit
3b25d472f5
@ -8,8 +8,8 @@ import (
|
||||
// RoutesStats provides number of filtered and
|
||||
// imported prefixes in the store
|
||||
type RoutesStats struct {
|
||||
Filtered int `json:"filtered"`
|
||||
Imported int `json:"imported"`
|
||||
Filtered uint `json:"filtered"`
|
||||
Imported uint `json:"imported"`
|
||||
}
|
||||
|
||||
// RouteServerRoutesStats provides the number of
|
||||
|
@ -111,6 +111,13 @@ func (s *NeighborsStore) SourceStatus(sourceID string) (*Status, error) {
|
||||
return s.sources.GetStatus(sourceID)
|
||||
}
|
||||
|
||||
// IsReady retrieves the status for a route server
|
||||
// and checks if it is ready.
|
||||
func (s *NeighborsStore) IsReady(sourceID string) bool {
|
||||
rdy, _ := s.sources.IsReady(sourceID)
|
||||
return rdy
|
||||
}
|
||||
|
||||
// updateSource will update a single source. This
|
||||
// function may crash or return errors.
|
||||
func (s *NeighborsStore) updateSource(
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
|
||||
"github.com/alice-lg/alice-lg/pkg/api"
|
||||
"github.com/alice-lg/alice-lg/pkg/config"
|
||||
"github.com/alice-lg/alice-lg/pkg/sources"
|
||||
)
|
||||
|
||||
// RoutesStoreBackend interface
|
||||
@ -18,7 +17,7 @@ type RoutesStoreBackend interface {
|
||||
SetRoutes(
|
||||
ctx context.Context,
|
||||
sourceID string,
|
||||
routes *api.RoutesResponse,
|
||||
routes api.LookupRoutes,
|
||||
) error
|
||||
|
||||
// CountRoutesAt returns the number of imported
|
||||
@ -27,7 +26,7 @@ type RoutesStoreBackend interface {
|
||||
CountRoutesAt(
|
||||
ctx context.Context,
|
||||
sourceID string,
|
||||
) (uint, error)
|
||||
) (uint, uint, error)
|
||||
|
||||
// GetNeighborPrefixesAt retrieves the prefixes
|
||||
// announced by the neighbor at a given source
|
||||
@ -42,15 +41,15 @@ type RoutesStoreBackend interface {
|
||||
// status and cfgs and will be queried instead
|
||||
// of a backend by the API
|
||||
type RoutesStore struct {
|
||||
backend RoutesStoreBackend
|
||||
sources *SourcesStore
|
||||
backend RoutesStoreBackend
|
||||
sources *SourcesStore
|
||||
neighbors *NeighborsStore
|
||||
}
|
||||
|
||||
// NewRoutesStore makes a new store instance
|
||||
// with a cfg.
|
||||
func NewRoutesStore(
|
||||
neighborsStore *NeighborsStore,
|
||||
neighbors *NeighborsStore,
|
||||
cfg *config.Config,
|
||||
backend RoutesStoreBackend,
|
||||
) *RoutesStore {
|
||||
@ -67,9 +66,9 @@ func NewRoutesStore(
|
||||
// Store refresh information per store
|
||||
sources := NewSourcesStore(cfg, refreshInterval)
|
||||
store := &RoutesStore{
|
||||
backend: backend,
|
||||
sources: sources,
|
||||
neighborsStore: neighborsStore,
|
||||
backend: backend,
|
||||
sources: sources,
|
||||
neighbors: neighbors,
|
||||
}
|
||||
return store
|
||||
}
|
||||
@ -115,22 +114,21 @@ func (s *RoutesStore) safeUpdateSource(id string) {
|
||||
// TODO: Make configurable
|
||||
time.Sleep(time.Duration(rand.Intn(30)) * time.Second)
|
||||
|
||||
src := s.sources.GetInstance(id)
|
||||
srcName := s.sources.GetName(id)
|
||||
src := s.sources.Get(id)
|
||||
|
||||
// Prepare for impact.
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
log.Println(
|
||||
"Recovering after failed routes refresh of",
|
||||
srcName, "from:", err)
|
||||
src.Name, "from:", err)
|
||||
s.sources.RefreshError(id, err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := s.updateSource(ctx, src, id); err != nil {
|
||||
if err := s.updateSource(ctx, src); err != nil {
|
||||
log.Println(
|
||||
"Refeshing routes of", srcName, "failed:", err)
|
||||
"Refeshing routes of", src.Name, "failed:", err)
|
||||
s.sources.RefreshError(id, err)
|
||||
}
|
||||
}
|
||||
@ -138,17 +136,42 @@ func (s *RoutesStore) safeUpdateSource(id string) {
|
||||
// Update all routes
|
||||
func (s *RoutesStore) updateSource(
|
||||
ctx context.Context,
|
||||
src* config.SourceConfig,
|
||||
src *config.SourceConfig,
|
||||
) error {
|
||||
routesRes, err := src.AllRoutes()
|
||||
if err := s.awaitNeighborStore(ctx, src.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rs := src.GetInstance()
|
||||
res, err := rs.AllRoutes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Prepare imported routes for lookup
|
||||
imported := s.routesToLookupRoutes("imported", src, res.Imported)
|
||||
filtered := s.routesToLookupRoutes("filtered", src, res.Filtered)
|
||||
lookupRoutes := append(imported, filtered...)
|
||||
|
||||
if err = s.backend.SetRoutes(ctx, src.ID, lookupRoutes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.sources.RefreshSuccess(src.ID)
|
||||
}
|
||||
|
||||
func (s *RoutesStore) routesToLookupRoutes(
|
||||
state string,
|
||||
src *config.SourceConfig,
|
||||
routes api.Routes,
|
||||
) api.LookupRoutes {
|
||||
lookupRoutes := make(api.LookupRoutes, 0, len(routes))
|
||||
for _, route := range routes {
|
||||
neighbor := nStore.GetNeighborAt(source.ID, route.NeighborID)
|
||||
neighbor, err := s.neighbors.GetNeighborAt(src.ID, route.NeighborID)
|
||||
if err != nil {
|
||||
log.Println("prepare route, neighbor lookup failed:", err)
|
||||
continue
|
||||
}
|
||||
lr := &api.LookupRoute{
|
||||
Route: route,
|
||||
State: state,
|
||||
@ -160,25 +183,33 @@ func (s *RoutesStore) updateSource(
|
||||
}
|
||||
lookupRoutes = append(lookupRoutes, lr)
|
||||
}
|
||||
|
||||
if err = s.backend.SetRoutes(ctx, src.ID, lookupRoutes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.sources.RefreshSuccess(src.ID)
|
||||
return lookupRoutes
|
||||
}
|
||||
|
||||
func (s* RoutesStore) routesToLookupRoutes(
|
||||
src sources.Source,
|
||||
routes api.Routes,
|
||||
|
||||
func (s *RoutesStore) awaitNeighborStore(
|
||||
ctx context.Context,
|
||||
srcID string,
|
||||
) error {
|
||||
// Poll the neighbor store state for the sourceID
|
||||
// until the context is not longer valid
|
||||
for {
|
||||
err := ctx.Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s.neighbors.IsReady(srcID) {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Stats calculates some store insights
|
||||
func (s *RoutesStore) Stats() *api.RoutesStoreStats {
|
||||
ctx := context.TODO()
|
||||
|
||||
totalImported := 0
|
||||
totalFiltered := 0
|
||||
totalImported := uint(0)
|
||||
totalFiltered := uint(0)
|
||||
|
||||
rsStats := []api.RouteServerRoutesStats{}
|
||||
|
||||
@ -189,7 +220,9 @@ func (s *RoutesStore) Stats() *api.RoutesStoreStats {
|
||||
continue
|
||||
}
|
||||
|
||||
nImported, nFiltered, err := s.backend.CountRoutes(ctx, sourceID)
|
||||
src := s.sources.Get(sourceID)
|
||||
|
||||
nImported, nFiltered, err := s.backend.CountRoutesAt(ctx, sourceID)
|
||||
if err != nil {
|
||||
log.Println("error during routes count:", err)
|
||||
}
|
||||
@ -198,7 +231,7 @@ func (s *RoutesStore) Stats() *api.RoutesStoreStats {
|
||||
totalFiltered += nFiltered
|
||||
|
||||
serverStats := api.RouteServerRoutesStats{
|
||||
Name: s.cfgMap[sourceID].Name,
|
||||
Name: src.Name,
|
||||
Routes: api.RoutesStats{
|
||||
Imported: nImported,
|
||||
Filtered: nFiltered,
|
||||
@ -243,7 +276,7 @@ func routeToLookupRoute(
|
||||
route *api.Route,
|
||||
) *api.LookupRoute {
|
||||
// Get neighbor and make route
|
||||
neighbor := nStore.GetNeighborAt(source.ID, route.NeighborID)
|
||||
neighbor, _ := nStore.GetNeighborAt(source.ID, route.NeighborID)
|
||||
lookup := &api.LookupRoute{
|
||||
Route: route,
|
||||
State: state,
|
||||
@ -282,13 +315,13 @@ func (s *RoutesStore) LookupPrefixAt(
|
||||
s.RUnlock()
|
||||
|
||||
filtered := filterRoutesByPrefix(
|
||||
s.neighborsStore,
|
||||
s.neighbors,
|
||||
cfg,
|
||||
routes.Filtered,
|
||||
prefix,
|
||||
"filtered")
|
||||
imported := filterRoutesByPrefix(
|
||||
s.neighborsStore,
|
||||
s.neighbors,
|
||||
cfg,
|
||||
routes.Imported,
|
||||
prefix,
|
||||
|
@ -86,6 +86,18 @@ func (s *SourcesStore) GetStatus(sourceID string) (*Status, error) {
|
||||
return s.getStatus(sourceID)
|
||||
}
|
||||
|
||||
// IsReady will retrieve the status of the source
|
||||
// and check if the state is ready.
|
||||
func (s *SourcesStore) IsReady(sourceID string) (bool, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
status, err := getStatus(sourceID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return status.State == StateReady
|
||||
}
|
||||
|
||||
// Internal getStatus
|
||||
func (s *SourcesStore) getStatus(sourceID string) (*Status, error) {
|
||||
status, ok := s.status[sourceID]
|
||||
@ -145,6 +157,13 @@ func (s *SourcesStore) GetName(sourceID string) string {
|
||||
return s.sources[sourceID].Name
|
||||
}
|
||||
|
||||
// Get retrieves the source
|
||||
func (s *SourcesStore) Get(sourceID string) *config.SourceConfig {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
return s.sources[sourceID]
|
||||
}
|
||||
|
||||
// GetSourceIDs returns a list of registered source ids.
|
||||
func (s *SourcesStore) GetSourceIDs() []string {
|
||||
s.Lock()
|
||||
|
Loading…
x
Reference in New Issue
Block a user