398 lines
10 KiB
Go
398 lines
10 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log"
|
|
"math/rand"
|
|
"regexp"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/alice-lg/alice-lg/pkg/api"
|
|
"github.com/alice-lg/alice-lg/pkg/config"
|
|
"github.com/alice-lg/alice-lg/pkg/sources"
|
|
)
|
|
|
|
// ReMatchASLookup matches lookups with an 'AS' prefix
|
|
var ReMatchASLookup = regexp.MustCompile(`(?i)^AS(\d+)`)
|
|
|
|
// NeighborsStoreBackend interface
|
|
type NeighborsStoreBackend interface {
|
|
// SetNeighbors replaces all neighbors for a given
|
|
// route server identified by sourceID.
|
|
SetNeighbors(
|
|
ctx context.Context,
|
|
sourceID string,
|
|
neighbors api.Neighbors,
|
|
) error
|
|
|
|
// GetNeighborsAt retrieves all neighbors associated
|
|
// with a route server (source).
|
|
GetNeighborsAt(
|
|
ctx context.Context,
|
|
sourceID string,
|
|
) (api.Neighbors, error)
|
|
|
|
// GetNeighborsMapAt retrieve a map of neighbor ids
|
|
// to the neighbor for a given route server for quick
|
|
// consecutive lookup.
|
|
GetNeighborsMapAt(
|
|
ctx context.Context,
|
|
sourceID string,
|
|
) (map[string]*api.Neighbor, error)
|
|
|
|
// CountNeighborsAt retrieves the current number of
|
|
// stored neighbors.
|
|
CountNeighborsAt(
|
|
ctx context.Context,
|
|
sourceID string,
|
|
) (int, error)
|
|
}
|
|
|
|
// NeighborsStore is queryable for neighbor information
|
|
type NeighborsStore struct {
|
|
backend NeighborsStoreBackend
|
|
sources *SourcesStore
|
|
|
|
forceNeighborRefresh bool
|
|
}
|
|
|
|
// NewNeighborsStore creates a new store for neighbors
|
|
func NewNeighborsStore(
|
|
cfg *config.Config,
|
|
backend NeighborsStoreBackend,
|
|
) *NeighborsStore {
|
|
// Set refresh interval, default to 5 minutes when
|
|
// interval is set to 0
|
|
refreshInterval := time.Duration(
|
|
cfg.Server.NeighborsStoreRefreshInterval) * time.Minute
|
|
if refreshInterval == 0 {
|
|
refreshInterval = time.Duration(5) * time.Minute
|
|
}
|
|
refreshParallelism := cfg.Server.NeighborsStoreRefreshParallelism
|
|
if refreshParallelism <= 0 {
|
|
refreshParallelism = 1
|
|
}
|
|
|
|
log.Println("Neighbors refresh interval set to:", refreshInterval)
|
|
log.Println("Neighbors refresh parallelism:", refreshParallelism)
|
|
|
|
// Store refresh information per store
|
|
sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism)
|
|
|
|
// Neighbors will be refreshed on every GetNeighborsAt
|
|
// invocation. Why? I (Annika) don't know. I have to ask Patrick.
|
|
// TODO: This feels wrong here. Figure out reason why it
|
|
// was added and refactor.
|
|
// At least now the variable name is a bit more honest.
|
|
forceNeighborRefresh := cfg.Server.EnableNeighborsStatusRefresh
|
|
|
|
store := &NeighborsStore{
|
|
backend: backend,
|
|
sources: sources,
|
|
forceNeighborRefresh: forceNeighborRefresh,
|
|
}
|
|
return store
|
|
}
|
|
|
|
// Start the store's housekeeping.
|
|
func (s *NeighborsStore) Start(ctx context.Context) {
|
|
log.Println("Starting local neighbors store")
|
|
for {
|
|
if err := ctx.Err(); err != nil {
|
|
return // Context invalid
|
|
}
|
|
s.update(ctx)
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
|
|
// GetStatus retrievs the status for a route server
|
|
// identified by sourceID.
|
|
func (s *NeighborsStore) GetStatus(sourceID string) (*Status, error) {
|
|
return s.sources.GetStatus(sourceID)
|
|
}
|
|
|
|
// IsInitialized retrieves the status for a route server
|
|
// and checks if it is ready.
|
|
func (s *NeighborsStore) IsInitialized(sourceID string) bool {
|
|
rdy, _ := s.sources.IsInitialized(sourceID)
|
|
return rdy
|
|
}
|
|
|
|
// updateSource will update a single source. This
|
|
// function may crash or return errors.
|
|
func (s *NeighborsStore) updateSource(
|
|
ctx context.Context,
|
|
src sources.Source,
|
|
srcID string,
|
|
) error {
|
|
// Get neighbors form source instance and update backend
|
|
res, err := src.Neighbors(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = s.backend.SetNeighbors(ctx, srcID, res.Neighbors); err != nil {
|
|
return err
|
|
}
|
|
|
|
return s.sources.RefreshSuccess(srcID)
|
|
}
|
|
|
|
// safeUpdateSource will try to update a source but
|
|
// will recover from a panic if something goes wrong.
|
|
// In that case, the LastError and State will be updated.
|
|
func (s *NeighborsStore) safeUpdateSource(ctx context.Context, id string) {
|
|
if !s.sources.ShouldRefresh(id) {
|
|
return // Nothing to do here
|
|
}
|
|
|
|
if err := s.sources.LockSource(id); err != nil {
|
|
log.Println("Cloud not start neighbor refresh:", err)
|
|
return
|
|
}
|
|
|
|
// Apply jitter so, we do not hit everything at once.
|
|
// TODO: Make configurable
|
|
time.Sleep(time.Duration(rand.Intn(30)) * time.Second)
|
|
|
|
src := s.sources.GetInstance(id)
|
|
srcName := s.sources.GetName(id)
|
|
|
|
// Prepare for impact.
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Println(
|
|
"Recovering after failed neighbors refresh of",
|
|
srcName, "from:", err)
|
|
s.sources.RefreshError(id, err)
|
|
}
|
|
}()
|
|
|
|
if err := s.updateSource(ctx, src, id); err != nil {
|
|
log.Println(
|
|
"Refeshing neighbors of", srcName, "failed:", err)
|
|
s.sources.RefreshError(id, err)
|
|
}
|
|
|
|
status, err := s.sources.GetStatus(id)
|
|
if err != nil {
|
|
log.Println(err)
|
|
} else {
|
|
log.Println("Refreshed neighbors of", srcName, "in", status.LastRefreshDuration)
|
|
}
|
|
}
|
|
|
|
// Update all neighbors from all sources, where the
|
|
// sources last neighbor refresh is longer ago
|
|
// than the configured refresh period.
|
|
func (s *NeighborsStore) update(ctx context.Context) {
|
|
for _, id := range s.sources.GetSourceIDsForRefresh() {
|
|
go s.safeUpdateSource(ctx, id)
|
|
}
|
|
}
|
|
|
|
// CachedAt returns the time of the oldest partial
|
|
// refresh of the dataset.
|
|
func (s *NeighborsStore) CachedAt(
|
|
ctx context.Context,
|
|
) time.Time {
|
|
return s.sources.CachedAt(ctx)
|
|
}
|
|
|
|
// CacheTTL returns the TTL time
|
|
func (s *NeighborsStore) CacheTTL(
|
|
ctx context.Context,
|
|
) time.Time {
|
|
return s.sources.NextRefresh(ctx)
|
|
}
|
|
|
|
// GetNeighborsAt gets all neighbors from a routeserver
|
|
func (s *NeighborsStore) GetNeighborsAt(
|
|
ctx context.Context,
|
|
sourceID string,
|
|
) (api.Neighbors, error) {
|
|
if !s.IsInitialized(sourceID) {
|
|
return nil, ErrSourceNotInitialized
|
|
}
|
|
if s.forceNeighborRefresh {
|
|
src := s.sources.GetInstance(sourceID)
|
|
if src == nil {
|
|
return nil, sources.ErrSourceNotFound
|
|
}
|
|
if err := s.updateSource(ctx, src, sourceID); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return s.backend.GetNeighborsAt(ctx, sourceID)
|
|
}
|
|
|
|
// GetNeighborsMapAt looks up a neighbor on a RS by ID.
|
|
func (s *NeighborsStore) GetNeighborsMapAt(
|
|
ctx context.Context,
|
|
sourceID string,
|
|
) (map[string]*api.Neighbor, error) {
|
|
if !s.IsInitialized(sourceID) {
|
|
return nil, ErrSourceNotInitialized
|
|
}
|
|
return s.backend.GetNeighborsMapAt(ctx, sourceID)
|
|
}
|
|
|
|
// lookupNeighborsAt filters for neighbors at a route
|
|
// server matching a given query string.
|
|
func (s *NeighborsStore) lookupNeighborsAt(
|
|
ctx context.Context,
|
|
sourceID string,
|
|
query string,
|
|
) (api.Neighbors, error) {
|
|
neighbors, err := s.backend.GetNeighborsAt(ctx, sourceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
asn := -1
|
|
if ReMatchASLookup.MatchString(query) {
|
|
groups := ReMatchASLookup.FindStringSubmatch(query)
|
|
if a, err := strconv.Atoi(groups[1]); err == nil {
|
|
asn = a
|
|
}
|
|
}
|
|
|
|
results := api.Neighbors{}
|
|
for _, neighbor := range neighbors {
|
|
if asn >= 0 && neighbor.ASN == asn { // only executed if valid AS query is detected
|
|
results = append(results, neighbor)
|
|
} else if ContainsCi(neighbor.Description, query) {
|
|
results = append(results, neighbor)
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// LookupNeighbors filters for neighbors matching a query
|
|
// on all route servers.
|
|
func (s *NeighborsStore) LookupNeighbors(
|
|
ctx context.Context,
|
|
query string,
|
|
) (api.NeighborsLookupResults, error) {
|
|
// Create empty result set
|
|
results := make(api.NeighborsLookupResults)
|
|
for _, sourceID := range s.sources.GetSourceIDs() {
|
|
neighbors, err := s.lookupNeighborsAt(ctx, sourceID, query)
|
|
if errors.Is(err, sources.ErrSourceNotFound) {
|
|
continue // Skip neighbors from this source for now
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
results[sourceID] = neighbors
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
// FilterNeighbors retrieves neighbors by name or by ASN
|
|
// from all route servers.
|
|
func (s *NeighborsStore) FilterNeighbors(
|
|
ctx context.Context,
|
|
filter *api.NeighborFilter,
|
|
) (api.Neighbors, error) {
|
|
results := []*api.Neighbor{}
|
|
// Get neighbors from all routeservers
|
|
for _, sourceID := range s.sources.GetSourceIDs() {
|
|
neighbors, err := s.backend.GetNeighborsAt(ctx, sourceID)
|
|
if errors.Is(err, sources.ErrSourceNotFound) {
|
|
continue // Skip neighbors from this source for now
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Apply filters
|
|
for _, neighbor := range neighbors {
|
|
if filter.Match(neighbor) {
|
|
results = append(results, neighbor)
|
|
}
|
|
}
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
// Stats exports some statistics for monitoring.
|
|
func (s *NeighborsStore) Stats(
|
|
ctx context.Context,
|
|
) *api.NeighborsStoreStats {
|
|
totalNeighbors := 0
|
|
rsStats := []api.RouteServerNeighborsStats{}
|
|
|
|
for _, sourceID := range s.sources.GetSourceIDs() {
|
|
status, _ := s.sources.GetStatus(sourceID)
|
|
ncount, err := s.backend.CountNeighborsAt(ctx, sourceID)
|
|
if err != nil {
|
|
if !errors.Is(err, sources.ErrSourceNotFound) {
|
|
log.Println("error during neighbor count:", err)
|
|
}
|
|
}
|
|
totalNeighbors += ncount
|
|
serverStats := api.RouteServerNeighborsStats{
|
|
Name: s.sources.GetName(sourceID),
|
|
State: status.State.String(),
|
|
Neighbors: ncount,
|
|
UpdatedAt: s.SourceCachedAt(sourceID),
|
|
}
|
|
rsStats = append(rsStats, serverStats)
|
|
}
|
|
storeStats := &api.NeighborsStoreStats{
|
|
TotalNeighbors: totalNeighbors,
|
|
RouteServers: rsStats,
|
|
}
|
|
return storeStats
|
|
}
|
|
|
|
// Status returns the stores current status
|
|
func (s *NeighborsStore) Status(ctx context.Context) *api.StoreStatus {
|
|
initialized := true
|
|
sources := s.sources.GetSourcesStatus()
|
|
status := make(map[string]*api.SourceStatus)
|
|
|
|
for _, s := range sources {
|
|
if !s.Initialized {
|
|
initialized = false
|
|
}
|
|
status[s.SourceID] = &api.SourceStatus{
|
|
RefreshInterval: s.RefreshInterval,
|
|
LastRefresh: s.LastRefresh,
|
|
State: s.State.String(),
|
|
Initialized: s.Initialized,
|
|
}
|
|
}
|
|
|
|
meta := &api.StoreStatus{
|
|
Initialized: initialized,
|
|
Sources: status,
|
|
}
|
|
return meta
|
|
}
|
|
|
|
// SourceCachedAt returns the last time the store content
|
|
// was refreshed.
|
|
func (s *NeighborsStore) SourceCachedAt(sourceID string) time.Time {
|
|
status, err := s.sources.GetStatus(sourceID)
|
|
if err != nil {
|
|
log.Println("error while getting source cached at:", err)
|
|
return time.Time{}
|
|
}
|
|
return status.LastRefresh
|
|
}
|
|
|
|
// SourceCacheTTL returns the next time when a refresh
|
|
// will be started.
|
|
func (s *NeighborsStore) SourceCacheTTL(
|
|
ctx context.Context,
|
|
sourceID string,
|
|
) time.Time {
|
|
return s.sources.NextRefresh(ctx)
|
|
}
|