alice-lg/pkg/store/sources_store.go
2021-12-07 19:11:11 +01:00

256 lines
5.5 KiB
Go

package store
import (
"context"
"log"
"sync"
"time"
"github.com/alice-lg/alice-lg/pkg/config"
"github.com/alice-lg/alice-lg/pkg/sources"
)
// Store State Constants
const (
StateInit = iota
StateReady
StateBusy
StateError
)
// State is an enum of the above States
type State int
// String() converts a state into a string
func (s State) String() string {
switch s {
case StateInit:
return "INIT"
case StateReady:
return "READY"
case StateBusy:
return "BUSY"
case StateError:
return "ERROR"
}
return "INVALID"
}
// Status defines a status the store can be in
type Status struct {
RefreshInterval time.Duration
LastRefresh time.Time
LastRefreshDuration time.Duration
LastError interface{}
State State
Initialized bool
lastRefreshStart time.Time
}
// SourcesStore provides methods for retrieving
// the current status of a source.
type SourcesStore struct {
refreshInterval time.Duration
status map[string]*Status
sources map[string]*config.SourceConfig
sync.Mutex
}
// NewSourcesStore initializes a new source store
func NewSourcesStore(
cfg *config.Config,
refreshInterval time.Duration,
) *SourcesStore {
status := make(map[string]*Status)
sources := make(map[string]*config.SourceConfig)
// Add sources from config
for _, src := range cfg.Sources {
sourceID := src.ID
sources[sourceID] = src
status[sourceID] = &Status{
RefreshInterval: refreshInterval,
}
}
return &SourcesStore{
status: status,
sources: sources,
refreshInterval: refreshInterval,
}
}
// GetStatus will retrieve the status of a source
func (s *SourcesStore) GetStatus(sourceID string) (*Status, error) {
s.Lock()
defer s.Unlock()
return s.getStatus(sourceID)
}
// Internal getStatus
func (s *SourcesStore) getStatus(sourceID string) (*Status, error) {
status, ok := s.status[sourceID]
if !ok {
return nil, sources.ErrSourceNotFound
}
return status, nil
}
// IsInitialized will retrieve the status of the source
// and check if a successful refresh happend at least
// once.
func (s *SourcesStore) IsInitialized(sourceID string) (bool, error) {
s.Lock()
defer s.Unlock()
status, err := s.getStatus(sourceID)
if err != nil {
return false, err
}
return status.Initialized, nil
}
// NextRefresh calculates the next refresh time
// TODO: I doubt the usefulness of these numbers.
//
func (s *SourcesStore) NextRefresh(
ctx context.Context,
) time.Time {
s.Lock()
defer s.Unlock()
t := time.Time{}
for _, status := range s.status {
nextRefresh := status.LastRefresh.Add(
s.refreshInterval)
if nextRefresh.After(t) {
t = nextRefresh
}
}
return t
}
// ShouldRefresh checks if the source needs a
// new refresh according to the provided refreshInterval.
func (s *SourcesStore) ShouldRefresh(
sourceID string,
) bool {
status, err := s.GetStatus(sourceID)
if err != nil {
log.Println("get status error:", err)
return false
}
if status.State == StateBusy {
return false // Source is busy
}
nextRefresh := status.LastRefresh.Add(
s.refreshInterval)
if time.Now().UTC().Before(nextRefresh) {
return false // Too soon
}
return true // Go for it
}
// CachedAt retrievs the oldest refresh time
// from all sources. All data is then guaranteed to be older
// than the CachedAt date.
func (s *SourcesStore) CachedAt(ctx context.Context) time.Time {
s.Lock()
defer s.Unlock()
t := time.Now().UTC()
for _, status := range s.status {
if status.LastRefresh.Before(t) {
t = status.LastRefresh
}
}
return t
}
// GetInstance retrieves a source instance by ID
func (s *SourcesStore) GetInstance(sourceID string) sources.Source {
s.Lock()
defer s.Unlock()
return s.sources[sourceID].GetInstance()
}
// GetName retrieves a source name by ID
func (s *SourcesStore) GetName(sourceID string) string {
s.Lock()
defer s.Unlock()
return s.sources[sourceID].Name
}
// Get retrieves the source
func (s *SourcesStore) Get(sourceID string) *config.SourceConfig {
s.Lock()
defer s.Unlock()
return s.sources[sourceID]
}
// GetSourceIDs returns a list of registered source ids.
func (s *SourcesStore) GetSourceIDs() []string {
s.Lock()
defer s.Unlock()
ids := make([]string, 0, len(s.sources))
for id := range s.sources {
ids = append(ids, id)
}
return ids
}
// LockSource indicates the start of a refresh
func (s *SourcesStore) LockSource(sourceID string) error {
s.Lock()
defer s.Unlock()
status, err := s.getStatus(sourceID)
if err != nil {
return err
}
if status.State == StateBusy {
return sources.ErrSourceBusy
}
status.State = StateBusy
status.lastRefreshStart = time.Now()
return nil
}
// RefreshSuccess indicates a successfull update
// of the store's content.
func (s *SourcesStore) RefreshSuccess(sourceID string) error {
s.Lock()
defer s.Unlock()
status, err := s.getStatus(sourceID)
if err != nil {
return err
}
status.State = StateReady
status.LastRefresh = time.Now().UTC()
status.LastRefreshDuration = time.Now().Sub(
status.lastRefreshStart)
status.LastError = nil
status.Initialized = true // We now have data
return nil
}
// RefreshError indicates that the refresh has failed
func (s *SourcesStore) RefreshError(
sourceID string,
err interface{},
) {
s.Lock()
defer s.Unlock()
status, err := s.getStatus(sourceID)
if err != nil {
log.Println("error getting source status:", err)
return
}
status.State = StateError
status.LastRefresh = time.Now().UTC()
status.LastRefreshDuration = time.Now().Sub(
status.lastRefreshStart)
status.LastError = err
return
}