Concurrent update of Neighbours and Routes #49
2
.gitignore
vendored
2
.gitignore
vendored
@ -33,3 +33,5 @@ etc/alice-lg/alice.conf
|
||||
.DS_Store
|
||||
|
||||
*coverage*
|
||||
|
||||
.idea
|
||||
|
@ -98,67 +98,102 @@ func (self *NeighboursStore) SourceState(sourceId string) int {
|
||||
|
||||
// Update all neighbors
|
||||
func (self *NeighboursStore) update() {
|
||||
type ResultType string
|
||||
const(
|
||||
Success = "Success"
|
||||
Failure = "Failure"
|
||||
Skipped = "Skipped"
|
||||
)
|
||||
|
||||
type GetNeighboursResult struct {
|
||||
sourceId string
|
||||
result ResultType
|
||||
}
|
||||
|
||||
result := make(chan GetNeighboursResult)
|
||||
|
||||
successCount := 0
|
||||
errorCount := 0
|
||||
t0 := time.Now()
|
||||
for sourceId, _ := range self.neighboursMap {
|
||||
for sourceIdentifier, _ := range self.neighboursMap {
|
||||
// Get current state
|
||||
if self.statusMap[sourceId].State == STATE_UPDATING {
|
||||
continue // nothing to do here. really.
|
||||
}
|
||||
|
||||
// Start updating
|
||||
self.Lock()
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
State: STATE_UPDATING,
|
||||
}
|
||||
self.Unlock()
|
||||
|
||||
sourceConfig := self.configMap[sourceId]
|
||||
source := sourceConfig.getInstance()
|
||||
|
||||
neighboursRes, err := source.Neighbours()
|
||||
if err != nil {
|
||||
log.Println(
|
||||
"Refreshing the neighbors store failed for:",
|
||||
sourceConfig.Name, "(", sourceConfig.Id, ")",
|
||||
"with:", err,
|
||||
"- NEXT STATE: ERROR",
|
||||
)
|
||||
// That's sad.
|
||||
go func(sourceId string) {
|
||||
self.Lock()
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
State: STATE_ERROR,
|
||||
LastError: err,
|
||||
LastRefresh: time.Now(),
|
||||
if self.statusMap[sourceId].State == STATE_UPDATING {
|
||||
self.Unlock()
|
||||
result <- GetNeighboursResult{
|
||||
sourceId: sourceId,
|
||||
result: Skipped,
|
||||
}
|
||||
} else {
|
||||
// Start updating
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
State: STATE_UPDATING,
|
||||
}
|
||||
sourceConfig := self.configMap[sourceId]
|
||||
instance := sourceConfig.getInstance()
|
||||
self.Unlock()
|
||||
|
||||
neighboursRes, err := instance.Neighbours()
|
||||
if err != nil {
|
||||
log.Println(
|
||||
"Refreshing the neighbors store failed for:",
|
||||
sourceConfig.Name, "(", sourceConfig.Id, ")",
|
||||
"with:", err,
|
||||
"- NEXT STATE: ERROR",
|
||||
)
|
||||
// That's sad.
|
||||
self.Lock()
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
State: STATE_ERROR,
|
||||
LastError: err,
|
||||
LastRefresh: time.Now(),
|
||||
}
|
||||
self.Unlock()
|
||||
|
||||
result <- GetNeighboursResult{
|
||||
sourceId: sourceId,
|
||||
result: Failure,
|
||||
}
|
||||
} else {
|
||||
neighbours := neighboursRes.Neighbours
|
||||
|
||||
// Update data
|
||||
// Make neighbours index
|
||||
index := make(NeighboursIndex)
|
||||
for _, neighbour := range neighbours {
|
||||
index[neighbour.Id] = neighbour
|
||||
}
|
||||
|
||||
self.Lock()
|
||||
self.neighboursMap[sourceId] = index
|
||||
// Update state
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
LastRefresh: time.Now(),
|
||||
State: STATE_READY,
|
||||
}
|
||||
self.lastRefresh = time.Now().UTC()
|
||||
self.Unlock()
|
||||
|
||||
result <- GetNeighboursResult{
|
||||
sourceId: sourceId,
|
||||
result: Success,
|
||||
}
|
||||
}
|
||||
}
|
||||
self.Unlock()
|
||||
|
||||
errorCount++
|
||||
continue
|
||||
}
|
||||
|
||||
neighbours := neighboursRes.Neighbours
|
||||
|
||||
// Update data
|
||||
// Make neighbours index
|
||||
index := make(NeighboursIndex)
|
||||
for _, neighbour := range neighbours {
|
||||
index[neighbour.Id] = neighbour
|
||||
}
|
||||
|
||||
self.Lock()
|
||||
self.neighboursMap[sourceId] = index
|
||||
// Update state
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
LastRefresh: time.Now(),
|
||||
State: STATE_READY,
|
||||
}
|
||||
self.lastRefresh = time.Now().UTC()
|
||||
self.Unlock()
|
||||
successCount++
|
||||
} (sourceIdentifier)
|
||||
}
|
||||
|
||||
for i := 0; i < len(self.neighboursMap); i++ {
|
||||
switch (<- result).result {
|
||||
case Success:
|
||||
successCount++
|
||||
case Failure:
|
||||
errorCount++
|
||||
}
|
||||
}
|
||||
close(result)
|
||||
|
||||
refreshDuration := time.Since(t0)
|
||||
log.Println(
|
||||
"Refreshed neighbors store for", successCount, "of", successCount+errorCount,
|
||||
|
@ -77,61 +77,96 @@ func (self *RoutesStore) init() {
|
||||
|
||||
// Update all routes
|
||||
func (self *RoutesStore) update() {
|
||||
type ResultType string
|
||||
const(
|
||||
Success = "Success"
|
||||
Failure = "Failure"
|
||||
Skipped = "Skipped"
|
||||
)
|
||||
|
||||
type GetRoutesResult struct {
|
||||
sourceId string
|
||||
result ResultType
|
||||
}
|
||||
|
||||
result := make(chan GetRoutesResult)
|
||||
successCount := 0
|
||||
errorCount := 0
|
||||
t0 := time.Now()
|
||||
|
||||
for sourceId, _ := range self.routesMap {
|
||||
sourceConfig := self.configMap[sourceId]
|
||||
source := sourceConfig.getInstance()
|
||||
|
||||
// Get current update state
|
||||
if self.statusMap[sourceId].State == STATE_UPDATING {
|
||||
continue // nothing to do here
|
||||
}
|
||||
|
||||
// Set update state
|
||||
self.Lock()
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
State: STATE_UPDATING,
|
||||
}
|
||||
self.Unlock()
|
||||
|
||||
routes, err := source.AllRoutes()
|
||||
if err != nil {
|
||||
log.Println(
|
||||
"Refreshing the routes store failed for:", sourceConfig.Name,
|
||||
"(", sourceConfig.Id, ")",
|
||||
"with:", err,
|
||||
"- NEXT STATE: ERROR",
|
||||
)
|
||||
for sourceIdentifier, _ := range self.routesMap {
|
||||
go func(sourceId string) {
|
||||
from := time.Now()
|
||||
|
||||
// Get current update state
|
||||
self.Lock()
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
State: STATE_ERROR,
|
||||
LastError: err,
|
||||
LastRefresh: time.Now(),
|
||||
if self.statusMap[sourceId].State == STATE_UPDATING {
|
||||
self.Unlock()
|
||||
// Nothing to do here
|
||||
result <- GetRoutesResult{
|
||||
sourceId: sourceId,
|
||||
result: Skipped}
|
||||
} else {
|
||||
// Set update state
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
State: STATE_UPDATING,
|
||||
}
|
||||
sourceConfig := self.configMap[sourceId]
|
||||
instance := sourceConfig.getInstance()
|
||||
self.Unlock()
|
||||
|
||||
routes, err := instance.AllRoutes()
|
||||
if err != nil {
|
||||
log.Println(
|
||||
"Refreshing the routes store failed for:", sourceConfig.Name,
|
||||
"(", sourceConfig.Id, ")",
|
||||
"with:", err,
|
||||
"- NEXT STATE: ERROR",
|
||||
)
|
||||
|
||||
self.Lock()
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
State: STATE_ERROR,
|
||||
LastError: err,
|
||||
LastRefresh: time.Now(),
|
||||
}
|
||||
self.Unlock()
|
||||
|
||||
result <- GetRoutesResult{
|
||||
sourceId: sourceId,
|
||||
result: Failure}
|
||||
} else {
|
||||
self.Lock()
|
||||
// Update data
|
||||
self.routesMap[sourceId] = routes
|
||||
// Update state
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
LastRefresh: time.Now(),
|
||||
State: STATE_READY,
|
||||
}
|
||||
self.lastRefresh = time.Now().UTC()
|
||||
self.Unlock()
|
||||
log.Println("Refreshed: ", sourceId, " filtered: ", len(routes.Filtered), " imported: ", len(routes.Imported), " not exported: ", len(routes.NotExported),
|
||||
" from: ", from, " to: ", time.Now())
|
||||
|
||||
result <- GetRoutesResult{
|
||||
sourceId: sourceId,
|
||||
result: Success}
|
||||
}
|
||||
}
|
||||
self.Unlock()
|
||||
|
||||
errorCount++
|
||||
continue
|
||||
}
|
||||
|
||||
self.Lock()
|
||||
// Update data
|
||||
self.routesMap[sourceId] = routes
|
||||
// Update state
|
||||
self.statusMap[sourceId] = StoreStatus{
|
||||
LastRefresh: time.Now(),
|
||||
State: STATE_READY,
|
||||
}
|
||||
self.lastRefresh = time.Now().UTC()
|
||||
self.Unlock()
|
||||
|
||||
successCount++
|
||||
}(sourceIdentifier)
|
||||
}
|
||||
|
||||
for i := 0; i < len(self.routesMap); i++ {
|
||||
switch (<- result).result {
|
||||
case Success:
|
||||
successCount++
|
||||
case Failure:
|
||||
errorCount++
|
||||
}
|
||||
}
|
||||
close(result)
|
||||
|
||||
refreshDuration := time.Since(t0)
|
||||
log.Println(
|
||||
"Refreshed routes store for", successCount, "of", successCount+errorCount,
|
||||
|
@ -108,15 +108,7 @@ func (self *MultiTableBirdwatcher) fetchReceivedRoutes(neighborId string) (*api.
|
||||
return &apiStatus, received, nil
|
||||
}
|
||||
|
||||
func (self *MultiTableBirdwatcher) fetchFilteredRoutes(neighborId string) (*api.ApiStatus, api.Routes, error) {
|
||||
// Query birdwatcher
|
||||
_, birdProtocols, err := self.fetchProtocols()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
protocols := birdProtocols["protocols"].(map[string]interface{})
|
||||
|
||||
func (self *MultiTableBirdwatcher) fetchFilteredRoutes(neighborId string, protocols map[string]interface{}) (*api.ApiStatus, api.Routes, error) {
|
||||
if _, ok := protocols[neighborId]; !ok {
|
||||
return nil, nil, fmt.Errorf("Invalid Neighbor")
|
||||
}
|
||||
@ -229,8 +221,16 @@ func (self *MultiTableBirdwatcher) fetchRequiredRoutes(neighborId string) (*api.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Query birdwatcher
|
||||
_, birdProtocols, err := self.fetchProtocols()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
protocols := birdProtocols["protocols"].(map[string]interface{})
|
||||
|
||||
// Second: get routes filtered
|
||||
_, filteredRoutes, err := self.fetchFilteredRoutes(neighborId)
|
||||
_, filteredRoutes, err := self.fetchFilteredRoutes(neighborId, protocols)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -481,6 +481,8 @@ func (self *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
protocols := birdProtocols["protocols"].(map[string]interface{})
|
||||
|
||||
// Fetch received routes first
|
||||
birdImported, err := self.client.GetJson("/routes/table/master")
|
||||
if err != nil {
|
||||
@ -510,7 +512,7 @@ func (self *MultiTableBirdwatcher) AllRoutes() (*api.RoutesResponse, error) {
|
||||
learntFrom := mustString(protocolsData.(map[string]interface{})["learnt_from"], peer)
|
||||
|
||||
// Fetch filtered routes
|
||||
_, filtered, err := self.fetchFilteredRoutes(protocolId)
|
||||
_, filtered, err := self.fetchFilteredRoutes(protocolId, protocols)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user