Concurrent update of Neighbours and Routes #49
2
.gitignore
vendored
2
.gitignore
vendored
@ -33,3 +33,5 @@ etc/alice-lg/alice.conf
|
|||||||
.DS_Store
|
.DS_Store
|
||||||
|
|
||||||
*coverage*
|
*coverage*
|
||||||
|
|
||||||
|
.idea
|
||||||
|
@ -98,67 +98,102 @@ func (self *NeighboursStore) SourceState(sourceId string) int {
|
|||||||
|
|
||||||
// Update all neighbors
|
// Update all neighbors
|
||||||
func (self *NeighboursStore) update() {
|
func (self *NeighboursStore) update() {
|
||||||
|
type ResultType string
|
||||||
|
const(
|
||||||
|
Success = "Success"
|
||||||
|
Failure = "Failure"
|
||||||
|
Skipped = "Skipped"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetNeighboursResult struct {
|
||||||
|
sourceId string
|
||||||
|
result ResultType
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make(chan GetNeighboursResult)
|
||||||
|
|
||||||
successCount := 0
|
successCount := 0
|
||||||
errorCount := 0
|
errorCount := 0
|
||||||
t0 := time.Now()
|
t0 := time.Now()
|
||||||
for sourceId, _ := range self.neighboursMap {
|
for sourceIdentifier, _ := range self.neighboursMap {
|
||||||
// Get current state
|
// Get current state
|
||||||
if self.statusMap[sourceId].State == STATE_UPDATING {
|
go func(sourceId string) {
|
||||||
continue // nothing to do here. really.
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start updating
|
|
||||||
self.Lock()
|
|
||||||
self.statusMap[sourceId] = StoreStatus{
|
|
||||||
State: STATE_UPDATING,
|
|
||||||
}
|
|
||||||
self.Unlock()
|
|
||||||
|
|
||||||
sourceConfig := self.configMap[sourceId]
|
|
||||||
source := sourceConfig.getInstance()
|
|
||||||
|
|
||||||
neighboursRes, err := source.Neighbours()
|
|
||||||
if err != nil {
|
|
||||||
log.Println(
|
|
||||||
"Refreshing the neighbors store failed for:",
|
|
||||||
sourceConfig.Name, "(", sourceConfig.Id, ")",
|
|
||||||
"with:", err,
|
|
||||||
"- NEXT STATE: ERROR",
|
|
||||||
)
|
|
||||||
// That's sad.
|
|
||||||
self.Lock()
|
self.Lock()
|
||||||
self.statusMap[sourceId] = StoreStatus{
|
if self.statusMap[sourceId].State == STATE_UPDATING {
|
||||||
State: STATE_ERROR,
|
self.Unlock()
|
||||||
LastError: err,
|
result <- GetNeighboursResult{
|
||||||
LastRefresh: time.Now(),
|
sourceId: sourceId,
|
||||||
|
result: Skipped,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Start updating
|
||||||
|
self.statusMap[sourceId] = StoreStatus{
|
||||||
|
State: STATE_UPDATING,
|
||||||
|
}
|
||||||
|
sourceConfig := self.configMap[sourceId]
|
||||||
|
instance := sourceConfig.getInstance()
|
||||||
|
self.Unlock()
|
||||||
|
|
||||||
|
neighboursRes, err := instance.Neighbours()
|
||||||
|
if err != nil {
|
||||||
|
log.Println(
|
||||||
|
"Refreshing the neighbors store failed for:",
|
||||||
|
sourceConfig.Name, "(", sourceConfig.Id, ")",
|
||||||
|
"with:", err,
|
||||||
|
"- NEXT STATE: ERROR",
|
||||||
|
)
|
||||||
|
// That's sad.
|
||||||
|
self.Lock()
|
||||||
|
self.statusMap[sourceId] = StoreStatus{
|
||||||
|
State: STATE_ERROR,
|
||||||
|
LastError: err,
|
||||||
|
LastRefresh: time.Now(),
|
||||||
|
}
|
||||||
|
self.Unlock()
|
||||||
|
|
||||||
|
result <- GetNeighboursResult{
|
||||||
|
sourceId: sourceId,
|
||||||
|
result: Failure,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
neighbours := neighboursRes.Neighbours
|
||||||
|
|
||||||
|
// Update data
|
||||||
|
// Make neighbours index
|
||||||
|
index := make(NeighboursIndex)
|
||||||
|
for _, neighbour := range neighbours {
|
||||||
|
index[neighbour.Id] = neighbour
|
||||||
|
}
|
||||||
|
|
||||||
|
self.Lock()
|
||||||
|
self.neighboursMap[sourceId] = index
|
||||||
|
// Update state
|
||||||
|
self.statusMap[sourceId] = StoreStatus{
|
||||||
|
LastRefresh: time.Now(),
|
||||||
|
State: STATE_READY,
|
||||||
|
}
|
||||||
|
self.lastRefresh = time.Now().UTC()
|
||||||
|
self.Unlock()
|
||||||
|
|
||||||
|
result <- GetNeighboursResult{
|
||||||
|
sourceId: sourceId,
|
||||||
|
result: Success,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
self.Unlock()
|
} (sourceIdentifier)
|
||||||
|
|
||||||
errorCount++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
neighbours := neighboursRes.Neighbours
|
|
||||||
|
|
||||||
// Update data
|
|
||||||
// Make neighbours index
|
|
||||||
index := make(NeighboursIndex)
|
|
||||||
for _, neighbour := range neighbours {
|
|
||||||
index[neighbour.Id] = neighbour
|
|
||||||
}
|
|
||||||
|
|
||||||
self.Lock()
|
|
||||||
self.neighboursMap[sourceId] = index
|
|
||||||
// Update state
|
|
||||||
self.statusMap[sourceId] = StoreStatus{
|
|
||||||
LastRefresh: time.Now(),
|
|
||||||
State: STATE_READY,
|
|
||||||
}
|
|
||||||
self.lastRefresh = time.Now().UTC()
|
|
||||||
self.Unlock()
|
|
||||||
successCount++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(self.neighboursMap); i++ {
|
||||||
|
switch (<- result).result {
|
||||||
|
case Success:
|
||||||
|
successCount++
|
||||||
|
case Failure:
|
||||||
|
errorCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(result)
|
||||||
|
|
||||||
refreshDuration := time.Since(t0)
|
refreshDuration := time.Since(t0)
|
||||||
log.Println(
|
log.Println(
|
||||||
"Refreshed neighbors store for", successCount, "of", successCount+errorCount,
|
"Refreshed neighbors store for", successCount, "of", successCount+errorCount,
|
||||||
|
@ -77,61 +77,96 @@ func (self *RoutesStore) init() {
|
|||||||
|
|
||||||
// Update all routes
|
// Update all routes
|
||||||
func (self *RoutesStore) update() {
|
func (self *RoutesStore) update() {
|
||||||
|
type ResultType string
|
||||||
|
const(
|
||||||
|
Success = "Success"
|
||||||
|
Failure = "Failure"
|
||||||
|
Skipped = "Skipped"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetRoutesResult struct {
|
||||||
|
sourceId string
|
||||||
|
result ResultType
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make(chan GetRoutesResult)
|
||||||
successCount := 0
|
successCount := 0
|
||||||
errorCount := 0
|
errorCount := 0
|
||||||
t0 := time.Now()
|
t0 := time.Now()
|
||||||
|
|
||||||
for sourceId, _ := range self.routesMap {
|
for sourceIdentifier, _ := range self.routesMap {
|
||||||
sourceConfig := self.configMap[sourceId]
|
go func(sourceId string) {
|
||||||
source := sourceConfig.getInstance()
|
from := time.Now()
|
||||||
|
|
||||||
// Get current update state
|
|
||||||
if self.statusMap[sourceId].State == STATE_UPDATING {
|
|
||||||
continue // nothing to do here
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set update state
|
|
||||||
self.Lock()
|
|
||||||
self.statusMap[sourceId] = StoreStatus{
|
|
||||||
State: STATE_UPDATING,
|
|
||||||
}
|
|
||||||
self.Unlock()
|
|
||||||
|
|
||||||
routes, err := source.AllRoutes()
|
|
||||||
if err != nil {
|
|
||||||
log.Println(
|
|
||||||
"Refreshing the routes store failed for:", sourceConfig.Name,
|
|
||||||
"(", sourceConfig.Id, ")",
|
|
||||||
"with:", err,
|
|
||||||
"- NEXT STATE: ERROR",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
// Get current update state
|
||||||
self.Lock()
|
self.Lock()
|
||||||
self.statusMap[sourceId] = StoreStatus{
|
if self.statusMap[sourceId].State == STATE_UPDATING {
|
||||||
State: STATE_ERROR,
|
self.Unlock()
|
||||||
LastError: err,
|
// Nothing to do here
|
||||||
LastRefresh: time.Now(),
|
result <- GetRoutesResult{
|
||||||
|
sourceId: sourceId,
|
||||||
|
result: Skipped}
|
||||||
|
} else {
|
||||||
|
// Set update state
|
||||||
|
self.statusMap[sourceId] = StoreStatus{
|
||||||
|
State: STATE_UPDATING,
|
||||||
|
}
|
||||||
|
sourceConfig := self.configMap[sourceId]
|
||||||
|
instance := sourceConfig.getInstance()
|
||||||
|
self.Unlock()
|
||||||
|
|
||||||
|
routes, err := instance.AllRoutes()
|
||||||
|
if err != nil {
|
||||||
|
log.Println(
|
||||||
|
"Refreshing the routes store failed for:", sourceConfig.Name,
|
||||||
|
"(", sourceConfig.Id, ")",
|
||||||
|
"with:", err,
|
||||||
|
"- NEXT STATE: ERROR",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.Lock()
|
||||||
|
self.statusMap[sourceId] = StoreStatus{
|
||||||
|
State: STATE_ERROR,
|
||||||
|
LastError: err,
|
||||||
|
LastRefresh: time.Now(),
|
||||||
|
}
|
||||||
|
self.Unlock()
|
||||||
|
|
||||||
|
result <- GetRoutesResult{
|
||||||
|
sourceId: sourceId,
|
||||||
|
result: Failure}
|
||||||
|
} else {
|
||||||
|
self.Lock()
|
||||||
|
// Update data
|
||||||
|
self.routesMap[sourceId] = routes
|
||||||
|
// Update state
|
||||||
|
self.statusMap[sourceId] = StoreStatus{
|
||||||
|
LastRefresh: time.Now(),
|
||||||
|
State: STATE_READY,
|
||||||
|
}
|
||||||
|
self.lastRefresh = time.Now().UTC()
|
||||||
|
self.Unlock()
|
||||||
|
log.Println("Refreshed: ", sourceId, " filtered: ", len(routes.Filtered), " imported: ", len(routes.Imported), " not exported: ", len(routes.NotExported),
|
||||||
|
" from: ", from, " to: ", time.Now())
|
||||||
|
|
||||||
|
result <- GetRoutesResult{
|
||||||
|
sourceId: sourceId,
|
||||||
|
result: Success}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
self.Unlock()
|
}(sourceIdentifier)
|
||||||
|
|
||||||
errorCount++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
self.Lock()
|
|
||||||
// Update data
|
|
||||||
self.routesMap[sourceId] = routes
|
|
||||||
// Update state
|
|
||||||
self.statusMap[sourceId] = StoreStatus{
|
|
||||||
LastRefresh: time.Now(),
|
|
||||||
State: STATE_READY,
|
|
||||||
}
|
|
||||||
self.lastRefresh = time.Now().UTC()
|
|
||||||
self.Unlock()
|
|
||||||
|
|
||||||
successCount++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(self.routesMap); i++ {
|
||||||
|
switch (<- result).result {
|
||||||
|
case Success:
|
||||||
|
successCount++
|
||||||
|
case Failure:
|
||||||
|
errorCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(result)
|
||||||
|
|
||||||
refreshDuration := time.Since(t0)
|
refreshDuration := time.Since(t0)
|
||||||
log.Println(
|
log.Println(
|
||||||
"Refreshed routes store for", successCount, "of", successCount+errorCount,
|
"Refreshed routes store for", successCount, "of", successCount+errorCount,
|
||||||
|
@ -108,15 +108,7 @@ func (self *MultiTableBirdwatcher) fetchReceivedRoutes(neighborId string) (*api.
|
|||||||
return &apiStatus, received, nil
|
return &apiStatus, received, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *MultiTableBirdwatcher) fetchFilteredRoutes(neighborId string) (*api.ApiStatus, api.Routes, error) {
|
func (self *MultiTableBirdwatcher) fetchFilteredRoutes(neighborId string, protocols map[string]interface{}) (*api.ApiStatus, api.Routes, error) {
|
||||||
// Query birdwatcher
|
|
||||||
_, birdProtocols, err := self.fetchProtocols()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
protocols := birdProtocols["protocols"].(map[string]interface{})
|
|
||||||
|
|
||||||
if _, ok := protocols[neighborId]; !ok {
|
if _, ok := protocols[neighborId]; !ok {
|
||||||
return nil, nil, fmt.Errorf("Invalid Neighbor")
|
return nil, nil, fmt.Errorf("Invalid Neighbor")
|
||||||
}
|
}
|
||||||
@ -229,8 +221,16 @@ func (self *MultiTableBirdwatcher) fetchRequiredRoutes(neighborId string) (*api.
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Query birdwatcher
|
||||||
|
_, birdProtocols, err := self.fetchProtocols()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
protocols := birdProtocols["protocols"].(map[string]interface{})
|
||||||
|
|
||||||
// Second: get routes filtered
|
// Second: get routes filtered
|
||||||
_, filteredRoutes, err := self.fetchFilteredRoutes(neighborId)
|
_, filteredRoutes, err := self.fetchFilteredRoutes(neighborId, protocols)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -481,6 +481,8 @@ func (self *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protocols := birdProtocols["protocols"].(map[string]interface{})
|
||||||
|
|
||||||
// Fetch received routes first
|
// Fetch received routes first
|
||||||
birdImported, err := self.client.GetJson("/routes/table/master")
|
birdImported, err := self.client.GetJson("/routes/table/master")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -510,7 +512,7 @@ func (self *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
|
|||||||
learntFrom := mustString(protocolsData.(map[string]interface{})["learnt_from"], peer)
|
learntFrom := mustString(protocolsData.(map[string]interface{})["learnt_from"], peer)
|
||||||
|
|
||||||
// Fetch filtered routes
|
// Fetch filtered routes
|
||||||
_, filtered, err := self.fetchFilteredRoutes(protocolId)
|
_, filtered, err := self.fetchFilteredRoutes(protocolId, protocols)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
2
go.mod
2
go.mod
@ -1,5 +1,7 @@
|
|||||||
module github.com/alice-lg/alice-lg
|
module github.com/alice-lg/alice-lg
|
||||||
|
|
||||||
|
go 1.14
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/GeertJohan/go.rice v0.0.0-20181229193832-0af3f3b09a0a
|
github.com/GeertJohan/go.rice v0.0.0-20181229193832-0af3f3b09a0a
|
||||||
github.com/daaku/go.zipexe v0.0.0-20150329023125-a5fe2436ffcb // indirect
|
github.com/daaku/go.zipexe v0.0.0-20150329023125-a5fe2436ffcb // indirect
|
||||||
|
Loading…
x
Reference in New Issue
Block a user