provide community search backend

This commit is contained in:
Annika Hannig 2024-01-26 14:19:18 +01:00
parent dff2773826
commit 3951704b92
7 changed files with 87 additions and 29 deletions

7
pkg/api/errors.go Normal file
View File

@ -0,0 +1,7 @@
package api
import "errors"
// ErrTooManyRoutes is returned when the result set
// of a route query exceeds the maximum allowed number of routes.
var ErrTooManyRoutes = errors.New("too many routes")

View File

@ -87,6 +87,7 @@ type ServerConfig struct {
NeighborsStoreRefreshParallelism int `ini:"neighbors_store_refresh_parallelism"`
RoutesStoreRefreshInterval int `ini:"routes_store_refresh_interval"`
RoutesStoreRefreshParallelism int `ini:"routes_store_refresh_parallelism"`
RoutesStoreQueryLimit uint `ini:"routes_store_query_limit"`
StoreBackend string `ini:"store_backend"`
DefaultAsn int `ini:"asn"`
EnableNeighborsStatusRefresh bool `ini:"enable_neighbors_status_refresh"`

View File

@ -50,10 +50,18 @@ func (s *Server) apiLookupPrefixGlobal(
// Merge query filters into applied filters
filtersApplied = filtersApplied.Combine(queryFilters)
// Check what we want to query
// Select the query strategy:
// Prefix -> fetch prefix
// _ -> fetch neighbors and routes
//
lookupPrefix := decoders.MaybePrefix(q)
lookupEmptyQuery := false
if q == "" && (filtersApplied.HasGroup(api.SearchKeyCommunities) ||
filtersApplied.HasGroup(api.SearchKeyExtCommunities) ||
filtersApplied.HasGroup(api.SearchKeyLargeCommunities)) {
lookupPrefix = true
lookupEmptyQuery = true
}
// Measure response time
t0 := time.Now()
@ -61,9 +69,11 @@ func (s *Server) apiLookupPrefixGlobal(
// Perform query
var routes api.LookupRoutes
if lookupPrefix {
q, err = validatePrefixQuery(q)
if err != nil {
return nil, err
if !lookupEmptyQuery {
q, err = validatePrefixQuery(q)
if err != nil {
return nil, err
}
}
routes, err = s.routesStore.LookupPrefix(ctx, q, filtersApplied)
if err != nil {

View File

@ -71,30 +71,38 @@ func apiErrorResponse(
tag := TagGenericError
status := StatusError
switch e := err.(type) {
case ErrTimeout:
tag = TagConnectionTimeout
code = CodeConnectionTimeout
status = TimeoutError
case *ErrResourceNotFoundError:
tag = TagResourceNotFound
code = CodeResourceNotFound
status = StatusResourceNotFound
case *url.Error:
if strings.Contains(message, "connection refused") {
tag = TagConnectionRefused
code = CodeConnectionRefused
message = "Connection refused while dialing the API"
} else if e.Timeout() {
tag = TagConnectionTimeout
code = CodeConnectionTimeout
message = "Connection timed out when connecting to the backend API"
}
case *ErrValidationFailed:
// TODO: This needs refactoring.
if err == api.ErrTooManyRoutes {
tag = TagValidationError
code = CodeValidationError
status = StatusValidationError
message = e.Reason
} else {
switch e := err.(type) {
case ErrTimeout:
tag = TagConnectionTimeout
code = CodeConnectionTimeout
status = TimeoutError
case *ErrResourceNotFoundError:
tag = TagResourceNotFound
code = CodeResourceNotFound
status = StatusResourceNotFound
case *url.Error:
if strings.Contains(message, "connection refused") {
tag = TagConnectionRefused
code = CodeConnectionRefused
message = "Connection refused while dialing the API"
} else if e.Timeout() {
tag = TagConnectionTimeout
code = CodeConnectionTimeout
message = "Connection timed out when connecting to the backend API"
}
case *ErrValidationFailed:
tag = TagValidationError
code = CodeValidationError
status = StatusValidationError
message = e.Reason
}
}
return api.ErrorResponse{

View File

@ -95,12 +95,22 @@ func (r *RoutesBackend) FindByPrefix(
ctx context.Context,
prefix string,
filters *api.SearchFilters,
limit uint,
) (api.LookupRoutes, error) {
// We make our compare case insensitive
var (
count uint
limitExceeded bool
)
prefix = strings.ToLower(prefix)
result := api.LookupRoutes{}
hasPrefix := prefix != ""
r.routes.Range(func(k, rs interface{}) bool {
if limit > 0 && count >= limit {
limitExceeded = true
return false
}
for _, route := range rs.(api.LookupRoutes) {
// Naiive string filtering:
if hasPrefix && !strings.HasPrefix(strings.ToLower(route.Network), prefix) {
@ -110,8 +120,16 @@ func (r *RoutesBackend) FindByPrefix(
continue
}
result = append(result, route)
count++
if limit > 0 && count >= limit {
limitExceeded = true
return false
}
}
return true
})
if limitExceeded {
return nil, api.ErrTooManyRoutes
}
return result, nil
}

View File

@ -249,7 +249,7 @@ func (b *RoutesBackend) FindByNeighbors(
return nil, err
}
return fetchRoutes(rows, filters)
return fetchRoutes(rows, filters, 0)
}
// FindByPrefix will return the prefixes matching a pattern
@ -257,6 +257,7 @@ func (b *RoutesBackend) FindByPrefix(
ctx context.Context,
prefix string,
filters *api.SearchFilters,
limit uint,
) (api.LookupRoutes, error) {
tx, err := b.pool.BeginTx(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
@ -280,11 +281,16 @@ func (b *RoutesBackend) FindByPrefix(
if err != nil {
return nil, err
}
return fetchRoutes(rows, filters)
return fetchRoutes(rows, filters, limit)
}
// Private fetchRoutes will load the queried result set
func fetchRoutes(rows pgx.Rows, filters *api.SearchFilters) (api.LookupRoutes, error) {
func fetchRoutes(
rows pgx.Rows,
filters *api.SearchFilters,
limit uint,
) (api.LookupRoutes, error) {
var count uint
cmd := rows.CommandTag()
results := make(api.LookupRoutes, 0, cmd.RowsAffected())
for rows.Next() {
@ -296,6 +302,10 @@ func fetchRoutes(rows pgx.Rows, filters *api.SearchFilters) (api.LookupRoutes, e
continue
}
results = append(results, route)
count++
if limit > 0 && count >= limit {
return nil, api.ErrTooManyRoutes
}
}
return results, nil
}

View File

@ -60,6 +60,7 @@ type RoutesStoreBackend interface {
ctx context.Context,
prefix string,
filters *api.SearchFilters,
limit uint,
) (api.LookupRoutes, error)
}
@ -70,6 +71,7 @@ type RoutesStore struct {
backend RoutesStoreBackend
sources *SourcesStore
neighbors *NeighborsStore
limit uint
}
// NewRoutesStore makes a new store instance
@ -93,6 +95,7 @@ func NewRoutesStore(
log.Println("Routes refresh interval set to:", refreshInterval)
log.Println("Routes refresh parallelism:", refreshParallelism)
log.Println("Routes store query limit:", cfg.Server.RoutesStoreQueryLimit)
// Store refresh information per store
sources := NewSourcesStore(cfg, refreshInterval, refreshParallelism)
@ -100,6 +103,7 @@ func NewRoutesStore(
backend: backend,
sources: sources,
neighbors: neighbors,
limit: cfg.Server.RoutesStoreQueryLimit,
}
return store
}
@ -332,7 +336,7 @@ func (s *RoutesStore) LookupPrefix(
prefix string,
filters *api.SearchFilters,
) (api.LookupRoutes, error) {
return s.backend.FindByPrefix(ctx, prefix, filters)
return s.backend.FindByPrefix(ctx, prefix, filters, s.limit)
}
// LookupPrefixForNeighbors returns all routes for