Concurrent update of Neighbours and Routes #49

Closed
zromi18 wants to merge 1 commits from develop into develop
5 changed files with 186 additions and 110 deletions

2
.gitignore vendored
View File

@ -33,3 +33,5 @@ etc/alice-lg/alice.conf
.DS_Store
*coverage*
.idea

View File

@ -98,67 +98,102 @@ func (self *NeighboursStore) SourceState(sourceId string) int {
// Update all neighbors
func (self *NeighboursStore) update() {
type ResultType string
const(
Success = "Success"
Failure = "Failure"
Skipped = "Skipped"
)
type GetNeighboursResult struct {
sourceId string
result ResultType
}
result := make(chan GetNeighboursResult)
successCount := 0
errorCount := 0
t0 := time.Now()
for sourceId, _ := range self.neighboursMap {
for sourceIdentifier, _ := range self.neighboursMap {
// Get current state
if self.statusMap[sourceId].State == STATE_UPDATING {
continue // nothing to do here. really.
}
// Start updating
self.Lock()
self.statusMap[sourceId] = StoreStatus{
State: STATE_UPDATING,
}
self.Unlock()
sourceConfig := self.configMap[sourceId]
source := sourceConfig.getInstance()
neighboursRes, err := source.Neighbours()
if err != nil {
log.Println(
"Refreshing the neighbors store failed for:",
sourceConfig.Name, "(", sourceConfig.Id, ")",
"with:", err,
"- NEXT STATE: ERROR",
)
// That's sad.
go func(sourceId string) {
self.Lock()
self.statusMap[sourceId] = StoreStatus{
State: STATE_ERROR,
LastError: err,
LastRefresh: time.Now(),
if self.statusMap[sourceId].State == STATE_UPDATING {
self.Unlock()
result <- GetNeighboursResult{
sourceId: sourceId,
result: Skipped,
}
} else {
// Start updating
self.statusMap[sourceId] = StoreStatus{
State: STATE_UPDATING,
}
sourceConfig := self.configMap[sourceId]
instance := sourceConfig.getInstance()
self.Unlock()
neighboursRes, err := instance.Neighbours()
if err != nil {
log.Println(
"Refreshing the neighbors store failed for:",
sourceConfig.Name, "(", sourceConfig.Id, ")",
"with:", err,
"- NEXT STATE: ERROR",
)
// That's sad.
self.Lock()
self.statusMap[sourceId] = StoreStatus{
State: STATE_ERROR,
LastError: err,
LastRefresh: time.Now(),
}
self.Unlock()
result <- GetNeighboursResult{
sourceId: sourceId,
result: Failure,
}
} else {
neighbours := neighboursRes.Neighbours
// Update data
// Make neighbours index
index := make(NeighboursIndex)
for _, neighbour := range neighbours {
index[neighbour.Id] = neighbour
}
self.Lock()
self.neighboursMap[sourceId] = index
// Update state
self.statusMap[sourceId] = StoreStatus{
LastRefresh: time.Now(),
State: STATE_READY,
}
self.lastRefresh = time.Now().UTC()
self.Unlock()
result <- GetNeighboursResult{
sourceId: sourceId,
result: Success,
}
}
}
self.Unlock()
errorCount++
continue
}
neighbours := neighboursRes.Neighbours
// Update data
// Make neighbours index
index := make(NeighboursIndex)
for _, neighbour := range neighbours {
index[neighbour.Id] = neighbour
}
self.Lock()
self.neighboursMap[sourceId] = index
// Update state
self.statusMap[sourceId] = StoreStatus{
LastRefresh: time.Now(),
State: STATE_READY,
}
self.lastRefresh = time.Now().UTC()
self.Unlock()
successCount++
} (sourceIdentifier)
}
for i := 0; i < len(self.neighboursMap); i++ {
switch (<- result).result {
case Success:
successCount++
case Failure:
errorCount++
}
}
close(result)
refreshDuration := time.Since(t0)
log.Println(
"Refreshed neighbors store for", successCount, "of", successCount+errorCount,

View File

@ -77,61 +77,96 @@ func (self *RoutesStore) init() {
// Update all routes
func (self *RoutesStore) update() {
type ResultType string
const(
Success = "Success"
Failure = "Failure"
Skipped = "Skipped"
)
type GetRoutesResult struct {
sourceId string
result ResultType
}
result := make(chan GetRoutesResult)
successCount := 0
errorCount := 0
t0 := time.Now()
for sourceId, _ := range self.routesMap {
sourceConfig := self.configMap[sourceId]
source := sourceConfig.getInstance()
// Get current update state
if self.statusMap[sourceId].State == STATE_UPDATING {
continue // nothing to do here
}
// Set update state
self.Lock()
self.statusMap[sourceId] = StoreStatus{
State: STATE_UPDATING,
}
self.Unlock()
routes, err := source.AllRoutes()
if err != nil {
log.Println(
"Refreshing the routes store failed for:", sourceConfig.Name,
"(", sourceConfig.Id, ")",
"with:", err,
"- NEXT STATE: ERROR",
)
for sourceIdentifier, _ := range self.routesMap {
go func(sourceId string) {
from := time.Now()
// Get current update state
self.Lock()
self.statusMap[sourceId] = StoreStatus{
State: STATE_ERROR,
LastError: err,
LastRefresh: time.Now(),
if self.statusMap[sourceId].State == STATE_UPDATING {
self.Unlock()
// Nothing to do here
result <- GetRoutesResult{
sourceId: sourceId,
result: Skipped}
} else {
// Set update state
self.statusMap[sourceId] = StoreStatus{
State: STATE_UPDATING,
}
sourceConfig := self.configMap[sourceId]
instance := sourceConfig.getInstance()
self.Unlock()
routes, err := instance.AllRoutes()
if err != nil {
log.Println(
"Refreshing the routes store failed for:", sourceConfig.Name,
"(", sourceConfig.Id, ")",
"with:", err,
"- NEXT STATE: ERROR",
)
self.Lock()
self.statusMap[sourceId] = StoreStatus{
State: STATE_ERROR,
LastError: err,
LastRefresh: time.Now(),
}
self.Unlock()
result <- GetRoutesResult{
sourceId: sourceId,
result: Failure}
} else {
self.Lock()
// Update data
self.routesMap[sourceId] = routes
// Update state
self.statusMap[sourceId] = StoreStatus{
LastRefresh: time.Now(),
State: STATE_READY,
}
self.lastRefresh = time.Now().UTC()
self.Unlock()
log.Println("Refreshed: ", sourceId, " filtered: ", len(routes.Filtered), " imported: ", len(routes.Imported), " not exported: ", len(routes.NotExported),
" from: ", from, " to: ", time.Now())
result <- GetRoutesResult{
sourceId: sourceId,
result: Success}
}
}
self.Unlock()
errorCount++
continue
}
self.Lock()
// Update data
self.routesMap[sourceId] = routes
// Update state
self.statusMap[sourceId] = StoreStatus{
LastRefresh: time.Now(),
State: STATE_READY,
}
self.lastRefresh = time.Now().UTC()
self.Unlock()
successCount++
}(sourceIdentifier)
}
for i := 0; i < len(self.routesMap); i++ {
switch (<- result).result {
case Success:
successCount++
case Failure:
errorCount++
}
}
close(result)
refreshDuration := time.Since(t0)
log.Println(
"Refreshed routes store for", successCount, "of", successCount+errorCount,

View File

@ -108,15 +108,7 @@ func (self *MultiTableBirdwatcher) fetchReceivedRoutes(neighborId string) (*api.
return &apiStatus, received, nil
}
func (self *MultiTableBirdwatcher) fetchFilteredRoutes(neighborId string) (*api.ApiStatus, api.Routes, error) {
// Query birdwatcher
_, birdProtocols, err := self.fetchProtocols()
if err != nil {
return nil, nil, err
}
protocols := birdProtocols["protocols"].(map[string]interface{})
func (self *MultiTableBirdwatcher) fetchFilteredRoutes(neighborId string, protocols map[string]interface{}) (*api.ApiStatus, api.Routes, error) {
if _, ok := protocols[neighborId]; !ok {
return nil, nil, fmt.Errorf("Invalid Neighbor")
}
@ -229,8 +221,16 @@ func (self *MultiTableBirdwatcher) fetchRequiredRoutes(neighborId string) (*api.
return nil, err
}
// Query birdwatcher
_, birdProtocols, err := self.fetchProtocols()
if err != nil {
return nil, err
}
protocols := birdProtocols["protocols"].(map[string]interface{})
// Second: get routes filtered
_, filteredRoutes, err := self.fetchFilteredRoutes(neighborId)
_, filteredRoutes, err := self.fetchFilteredRoutes(neighborId, protocols)
if err != nil {
return nil, err
}
@ -481,6 +481,8 @@ func (self *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
return nil, err
}
protocols := birdProtocols["protocols"].(map[string]interface{})
// Fetch received routes first
birdImported, err := self.client.GetJson("/routes/table/master")
if err != nil {
@ -510,7 +512,7 @@ func (self *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
learntFrom := mustString(protocolsData.(map[string]interface{})["learnt_from"], peer)
// Fetch filtered routes
_, filtered, err := self.fetchFilteredRoutes(protocolId)
_, filtered, err := self.fetchFilteredRoutes(protocolId, protocols)
if err != nil {
continue
}

2
go.mod
View File

@ -1,5 +1,7 @@
module github.com/alice-lg/alice-lg
go 1.14
require (
github.com/GeertJohan/go.rice v0.0.0-20181229193832-0af3f3b09a0a
github.com/daaku/go.zipexe v0.0.0-20150329023125-a5fe2436ffcb // indirect