234 lines
6.4 KiB
Go
Raw Permalink Normal View History

2023-09-06 08:22:07 +03:00
package backendpb
import (
"context"
"fmt"
"io"
2024-10-14 17:44:24 +03:00
"log/slog"
2023-09-06 08:22:07 +03:00
"net/url"
"strconv"
"time"
"github.com/AdguardTeam/AdGuardDNS/internal/agd"
2024-01-04 19:22:32 +03:00
"github.com/AdguardTeam/AdGuardDNS/internal/errcoll"
2023-09-06 08:22:07 +03:00
"github.com/AdguardTeam/AdGuardDNS/internal/profiledb"
"github.com/AdguardTeam/golibs/errors"
2024-03-11 12:21:07 +03:00
"github.com/AdguardTeam/golibs/netutil"
2024-10-14 17:44:24 +03:00
"github.com/c2h5oh/datasize"
"google.golang.org/grpc"
2023-09-06 08:22:07 +03:00
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"
)
// ProfileStorageConfig is the configuration for the business logic backend
// profile storage.
type ProfileStorageConfig struct {
2024-03-11 12:21:07 +03:00
// BindSet is the subnet set created from DNS servers listening addresses.
2024-10-14 17:44:24 +03:00
// It must not be nil.
2024-03-11 12:21:07 +03:00
BindSet netutil.SubnetSet
2023-09-06 08:22:07 +03:00
// ErrColl is the error collector that is used to collect critical and
2024-10-14 17:44:24 +03:00
// non-critical errors. It must not be nil.
2024-01-04 19:22:32 +03:00
ErrColl errcoll.Interface
2023-09-06 08:22:07 +03:00
2024-12-05 14:19:25 +03:00
// Logger is used for logging the operation of the profile storage. It must
// not be nil.
2024-10-14 17:44:24 +03:00
Logger *slog.Logger
2024-12-05 14:19:25 +03:00
// GRPCMetrics is used for the collection of the protobuf communication
// statistics.
GRPCMetrics GRPCMetrics
// Metrics is used for the collection of the profiles storage statistics.
Metrics ProfileDBMetrics
2024-10-14 17:44:24 +03:00
2023-09-06 08:22:07 +03:00
// Endpoint is the backend API URL. The scheme should be either "grpc" or
2024-10-14 17:44:24 +03:00
// "grpcs". It must not be nil.
2023-09-06 08:22:07 +03:00
Endpoint *url.URL
2024-07-10 19:49:07 +03:00
2024-10-14 17:44:24 +03:00
// APIKey is the API key used for authentication, if any. If empty, no
// authentication is performed.
2024-07-10 19:49:07 +03:00
APIKey string
2024-10-14 17:44:24 +03:00
// ResponseSizeEstimate is the estimate of the size of one DNS response for
// the purposes of custom ratelimiting. Responses over this estimate are
// counted as several responses.
ResponseSizeEstimate datasize.ByteSize
// MaxProfilesSize is the maximum response size for the profiles endpoint.
MaxProfilesSize datasize.ByteSize
2023-09-06 08:22:07 +03:00
}
// ProfileStorage is the implementation of the [profiledb.Storage] interface
// that retrieves the profile and device information from the business logic
// backend. It is safe for concurrent use.
type ProfileStorage struct {
2024-10-14 17:44:24 +03:00
bindSet netutil.SubnetSet
errColl errcoll.Interface
client DNSServiceClient
logger *slog.Logger
2024-12-05 14:19:25 +03:00
grpcMetrics GRPCMetrics
metrics ProfileDBMetrics
2024-10-14 17:44:24 +03:00
apiKey string
respSzEst datasize.ByteSize
maxProfSize datasize.ByteSize
2023-09-06 08:22:07 +03:00
}
// NewProfileStorage returns a new [ProfileStorage] that retrieves information
// from the business logic backend.
func NewProfileStorage(c *ProfileStorageConfig) (s *ProfileStorage, err error) {
client, err := newClient(c.Endpoint)
if err != nil {
// Don't wrap the error, because it's informative enough as is.
return nil, err
}
return &ProfileStorage{
2024-10-14 17:44:24 +03:00
bindSet: c.BindSet,
errColl: c.ErrColl,
2024-11-08 16:26:22 +03:00
client: NewDNSServiceClient(client),
2024-10-14 17:44:24 +03:00
logger: c.Logger,
2024-12-05 14:19:25 +03:00
grpcMetrics: c.GRPCMetrics,
2024-10-14 17:44:24 +03:00
metrics: c.Metrics,
apiKey: c.APIKey,
respSzEst: c.ResponseSizeEstimate,
maxProfSize: c.MaxProfilesSize,
2023-09-06 08:22:07 +03:00
}, nil
}
// type check: compile-time assertion that *ProfileStorage implements
// [profiledb.Storage].
var _ profiledb.Storage = (*ProfileStorage)(nil)
2024-07-10 19:49:07 +03:00
// CreateAutoDevice implements the [profile.Storage] interface for
// *ProfileStorage.
func (s *ProfileStorage) CreateAutoDevice(
ctx context.Context,
req *profiledb.StorageCreateAutoDeviceRequest,
) (resp *profiledb.StorageCreateAutoDeviceResponse, err error) {
defer func() {
err = errors.Annotate(
err,
"creating auto device for profile %q and human id %q: %w",
req.ProfileID,
req.HumanID,
)
}()
ctx = ctxWithAuthentication(ctx, s.apiKey)
backendResp, err := s.client.CreateDeviceByHumanId(ctx, &CreateDeviceRequest{
DnsId: string(req.ProfileID),
HumanId: string(req.HumanID),
DeviceType: DeviceType(req.DeviceType),
})
if err != nil {
2024-12-05 14:19:25 +03:00
return nil, fmt.Errorf("calling backend: %w", fixGRPCError(ctx, s.grpcMetrics, err))
2024-07-10 19:49:07 +03:00
}
d, err := backendResp.Device.toInternal(s.bindSet)
if err != nil {
return nil, fmt.Errorf("converting device: %w", err)
}
return &profiledb.StorageCreateAutoDeviceResponse{
Device: d,
}, nil
}
2023-09-06 08:22:07 +03:00
// Profiles implements the [profiledb.Storage] interface for *ProfileStorage.
func (s *ProfileStorage) Profiles(
ctx context.Context,
2024-07-10 19:49:07 +03:00
req *profiledb.StorageProfilesRequest,
) (resp *profiledb.StorageProfilesResponse, err error) {
ctx = ctxWithAuthentication(ctx, s.apiKey)
2024-10-14 17:44:24 +03:00
// #nosec G115 -- The value of limit comes from validated environment
// variables.
respSzOpt := grpc.MaxCallRecvMsgSize(int(s.maxProfSize.Bytes()))
stream, err := s.client.GetDNSProfiles(ctx, toProtobuf(req), respSzOpt)
2023-09-06 08:22:07 +03:00
if err != nil {
2024-12-05 14:19:25 +03:00
return nil, fmt.Errorf("loading profiles: %w", fixGRPCError(ctx, s.grpcMetrics, err))
2023-09-06 08:22:07 +03:00
}
defer func() { err = errors.WithDeferred(err, stream.CloseSend()) }()
2024-07-10 19:49:07 +03:00
resp = &profiledb.StorageProfilesResponse{
2023-09-06 08:22:07 +03:00
Profiles: []*agd.Profile{},
Devices: []*agd.Device{},
}
stats := &profilesCallStats{
2024-10-14 17:44:24 +03:00
logger: s.logger,
2024-01-04 19:22:32 +03:00
isFullSync: req.SyncTime.IsZero(),
2023-09-06 08:22:07 +03:00
}
2024-07-10 19:49:07 +03:00
for n := 1; ; n++ {
2023-09-06 08:22:07 +03:00
stats.startRecv()
profile, profErr := stream.Recv()
if profErr != nil {
if errors.Is(profErr, io.EOF) {
break
}
2024-10-14 17:44:24 +03:00
return nil, fmt.Errorf(
"receiving profile #%d: %w",
n,
2024-12-05 14:19:25 +03:00
fixGRPCError(ctx, s.grpcMetrics, profErr),
2024-10-14 17:44:24 +03:00
)
2023-09-06 08:22:07 +03:00
}
stats.endRecv()
stats.startDec()
2024-10-14 17:44:24 +03:00
prof, devices, profErr := profile.toInternal(
ctx,
time.Now(),
s.bindSet,
s.errColl,
2024-12-05 14:19:25 +03:00
s.logger,
2024-10-14 17:44:24 +03:00
s.metrics,
s.respSzEst,
)
2023-09-06 08:22:07 +03:00
if profErr != nil {
2024-12-05 14:19:25 +03:00
errcoll.Collect(ctx, s.errColl, s.logger, "loading profile", profErr)
2023-09-06 08:22:07 +03:00
continue
}
stats.endDec()
resp.Profiles = append(resp.Profiles, prof)
resp.Devices = append(resp.Devices, devices...)
}
2024-10-14 17:44:24 +03:00
stats.report(ctx, s.metrics)
2023-09-06 08:22:07 +03:00
trailer := stream.Trailer()
resp.SyncTime, err = syncTimeFromTrailer(trailer)
if err != nil {
return nil, fmt.Errorf("retrieving sync_time: %w", err)
}
return resp, nil
}
// toProtobuf converts a storage request structure into the protobuf structure.
2024-07-10 19:49:07 +03:00
func toProtobuf(r *profiledb.StorageProfilesRequest) (req *DNSProfilesRequest) {
2023-09-06 08:22:07 +03:00
return &DNSProfilesRequest{
SyncTime: timestamppb.New(r.SyncTime),
}
}
// syncTimeFromTrailer returns sync time from trailer metadata. Trailer
// metadata must contain "sync_time" field with milliseconds presentation of
// sync time.
func syncTimeFromTrailer(trailer metadata.MD) (syncTime time.Time, err error) {
st := trailer.Get("sync_time")
if len(st) == 0 {
return syncTime, fmt.Errorf("empty value")
}
syncTimeMs, err := strconv.ParseInt(st[0], 10, 64)
if err != nil {
2024-07-10 19:49:07 +03:00
return syncTime, fmt.Errorf("bad value: %w", err)
2023-09-06 08:22:07 +03:00
}
return time.Unix(0, syncTimeMs*time.Millisecond.Nanoseconds()), nil
}