async load filtered routes

This commit is contained in:
Annika Hannig 2022-03-16 18:50:29 +01:00
parent 2af2d18b85
commit e3044abbea

View File

@ -5,7 +5,7 @@ import (
"log"
"sort"
"strings"
"time"
"sync"
"github.com/alice-lg/alice-lg/pkg/api"
"github.com/alice-lg/alice-lg/pkg/decoders"
@ -175,6 +175,12 @@ func (src *MultiTableBirdwatcher) fetchFilteredRoutes(
filtered := parseRoutesData(
birdFiltered["routes"].([]interface{}), src.config, keepDetails)
// Try to free memory
for k := range birdFiltered {
delete(birdFiltered, k)
}
birdFiltered = nil
// Stage 2 filters
table := protocols[neighborID].(map[string]interface{})["table"].(string)
pipeName := src.getMasterPipeName(table)
@ -566,10 +572,6 @@ func (src *MultiTableBirdwatcher) RoutesNotExported(
// AllRoutes retrieves a routes dump from the server
func (src *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
t0 := time.Now()
log.Println("[RD] begin AllRoutes for:", src.config.ID, src.config.Name)
// Query birdwatcher
_, birdProtocols, err := src.fetchProtocols()
if err != nil {
@ -577,18 +579,12 @@ func (src *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
}
mainTable := src.GenericBirdwatcher.config.MainTable
t1 := time.Now()
log.Println("[RD] fetchProtocols:", t1.Sub(t0))
// Fetch received routes first
birdImported, err := src.client.GetJSON("/routes/table/" + mainTable)
if err != nil {
return nil, err
}
t2 := time.Now()
log.Println("[RD] fetch received from /routes/table/", mainTable, t2.Sub(t1))
// Use api status from first request
apiStatus, err := parseAPIStatus(birdImported, src.config)
if err != nil {
@ -603,41 +599,75 @@ func (src *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
// Parse the routes
imported := parseRoutesData(birdImported["routes"].([]interface{}), src.config, false)
// Try to free memory
for k := range birdImported {
delete(birdImported, k)
}
birdImported = nil
// Sort routes for deterministic ordering
// sort.Sort(imported)
response.Imported = imported
t3 := time.Now()
log.Println("[RD] parsed recieved in", t3.Sub(t2))
log.Println("[RD] begin fetching filtered for all neighbors")
// Iterate over all the protocols and fetch the filtered routes for everyone
protocolsBgp := src.filterProtocolsBgp(birdProtocols)
for protocolID, protocolsData := range protocolsBgp["protocols"].(map[string]interface{}) {
peer := protocolsData.(map[string]interface{})["neighbor_address"].(string)
learntFrom := decoders.String(protocolsData.(map[string]interface{})["learnt_from"], peer)
// Fetch filtered routes
t4 := time.Now()
_, filtered, err := src.fetchFilteredRoutes(protocolID, false)
if err != nil {
continue
}
// We load the filtered routes asynchronously with workers.
type fetchFilteredReq struct {
protocolID string
peer string
learntFrom string
}
reqQ := make(chan fetchFilteredReq, 1000)
resQ := make(chan api.Routes, 1000)
wg := &sync.WaitGroup{}
t5 := time.Now()
log.Println("[RD] fetched filtered for", protocolID, "in:", t5.Sub(t4))
// Start workers
for i := 0; i < 42; i++ {
wg.Add(1)
go func() {
// This is a worker for fetching filtered routes
defer wg.Done()
for req := range reqQ {
// Fetch filtered routes
_, filtered, err := src.fetchFilteredRoutes(req.protocolID, false)
if err != nil {
log.Println("error while fetching filtered routes:", err)
}
// Perform route deduplication
filtered = src.filterRoutesByPeerOrLearntFrom(
filtered, req.peer, req.learntFrom)
// Perform route deduplication
filtered = src.filterRoutesByPeerOrLearntFrom(filtered, peer, learntFrom)
response.Filtered = append(response.Filtered, filtered...)
t6 := time.Now()
log.Println("[RD] deduplication of", protocolID, "in:", t6.Sub(t5))
resQ <- filtered
}
}()
}
t7 := time.Now()
log.Println("[RD] total time filtered:", t7.Sub(t3))
log.Println("[RD] total time:", t7.Sub(t0))
// Fill request queue
go func() {
for protocolID, protocolsData := range protocolsBgp["protocols"].(map[string]interface{}) {
peer := protocolsData.(map[string]interface{})["neighbor_address"].(string)
learntFrom := decoders.String(protocolsData.(map[string]interface{})["learnt_from"], peer)
reqQ <- fetchFilteredReq{
protocolID: protocolID,
peer: peer,
learntFrom: learntFrom,
}
}
close(reqQ)
}()
// Await all workers done and close channel
go func() {
wg.Wait()
close(resQ)
}()
// Collect results
for filtered := range resQ {
response.Filtered = append(response.Filtered, filtered...)
}
return response, nil
}