fix: [code-2912]: issue with pq sql (#3185)

* requested changes
* fix issue with pqsql
This commit is contained in:
Enver Biševac 2024-12-20 17:41:41 +00:00 committed by Harness
parent db38802e83
commit a178e629ea
2 changed files with 21 additions and 21 deletions

View File

@ -39,7 +39,7 @@ type SpaceStore interface {
FindByIDs(ctx context.Context, spaceIDs ...int64) ([]*types.Space, error) FindByIDs(ctx context.Context, spaceIDs ...int64) ([]*types.Space, error)
} }
type Store interface { type MetricStore interface {
UpsertOptimistic(ctx context.Context, in *types.UsageMetric) error UpsertOptimistic(ctx context.Context, in *types.UsageMetric) error
GetMetrics( GetMetrics(
ctx context.Context, ctx context.Context,
@ -66,8 +66,8 @@ type Mediator struct {
spaces map[string]Size spaces map[string]Size
workers []*worker workers []*worker
spaceStore SpaceStore spaceStore SpaceStore
usageMetricsStore Store metricsStore MetricStore
wg sync.WaitGroup wg sync.WaitGroup
@ -77,17 +77,17 @@ type Mediator struct {
func NewMediator( func NewMediator(
ctx context.Context, ctx context.Context,
spaceStore SpaceStore, spaceStore SpaceStore,
usageMetricsStore Store, usageMetricsStore MetricStore,
config Config, config Config,
) *Mediator { ) *Mediator {
m := &Mediator{ m := &Mediator{
queue: newQueue(), queue: newQueue(),
chunks: make(map[string]Size), chunks: make(map[string]Size),
spaces: make(map[string]Size), spaces: make(map[string]Size),
spaceStore: spaceStore, spaceStore: spaceStore,
usageMetricsStore: usageMetricsStore, metricsStore: usageMetricsStore,
workers: make([]*worker, config.MaxWorkers), workers: make([]*worker, config.MaxWorkers),
config: config, config: config,
} }
m.initialize(ctx) m.initialize(ctx)
@ -133,7 +133,7 @@ func (m *Mediator) initialize(ctx context.Context) {
now := time.Now() now := time.Now()
metrics, err := m.usageMetricsStore.List(ctx, now.Add(-m.days30()).UnixMilli(), now.UnixMilli()) metrics, err := m.metricsStore.List(ctx, now.Add(-m.days30()).UnixMilli(), now.UnixMilli())
if err != nil { if err != nil {
log.Ctx(ctx).Err(err).Msg("failed to list usage metrics") log.Ctx(ctx).Err(err).Msg("failed to list usage metrics")
return return
@ -187,7 +187,7 @@ func (m *Mediator) process(ctx context.Context, payload *Metric) {
return return
} }
if err = m.usageMetricsStore.UpsertOptimistic(ctx, &types.UsageMetric{ if err = m.metricsStore.UpsertOptimistic(ctx, &types.UsageMetric{
RootSpaceID: space.ID, RootSpaceID: space.ID,
Bandwidth: newSize.Bandwidth, Bandwidth: newSize.Bandwidth,
Storage: newSize.Storage, Storage: newSize.Storage,
@ -202,7 +202,7 @@ func (m *Mediator) process(ctx context.Context, payload *Metric) {
now := time.Now() now := time.Now()
metric, err := m.usageMetricsStore.GetMetrics(ctx, space.ID, now.Add(-m.days30()).UnixMilli(), now.UnixMilli()) metric, err := m.metricsStore.GetMetrics(ctx, space.ID, now.Add(-m.days30()).UnixMilli(), now.UnixMilli())
if err != nil { if err != nil {
log.Ctx(ctx).Err(err).Msg("failed to get usage metrics") log.Ctx(ctx).Err(err).Msg("failed to get usage metrics")
return return

View File

@ -51,7 +51,7 @@ func (s *UsageMetricsStore) getVersion(
SELECT SELECT
usage_metric_version usage_metric_version
FROM usage_metrics FROM usage_metrics
WHERE usage_metric_space_id = ? AND usage_metric_date = ? WHERE usage_metric_space_id = $1 AND usage_metric_date = $2
` `
var version int64 var version int64
err := s.db.QueryRowContext(ctx, sqlQuery, rootSpaceID, date).Scan(&version) err := s.db.QueryRowContext(ctx, sqlQuery, rootSpaceID, date).Scan(&version)
@ -85,9 +85,9 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e
SET SET
usage_metric_version = EXCLUDED.usage_metric_version usage_metric_version = EXCLUDED.usage_metric_version
,usage_metric_updated = EXCLUDED.usage_metric_updated ,usage_metric_updated = EXCLUDED.usage_metric_updated
,usage_metric_bandwidth = usage_metric_bandwidth + EXCLUDED.usage_metric_bandwidth ,usage_metric_bandwidth = usage_metrics.usage_metric_bandwidth + EXCLUDED.usage_metric_bandwidth
,usage_metric_storage = usage_metric_storage + EXCLUDED.usage_metric_storage ,usage_metric_storage = usage_metrics.usage_metric_storage + EXCLUDED.usage_metric_storage
WHERE usage_metric_version = EXCLUDED.usage_metric_version - 1` WHERE usage_metrics.usage_metric_version = EXCLUDED.usage_metric_version - 1`
db := dbtx.GetAccessor(ctx, s.db) db := dbtx.GetAccessor(ctx, s.db)
today := s.Date(time.Now()) today := s.Date(time.Now())
@ -146,8 +146,8 @@ func (s *UsageMetricsStore) GetMetrics(
COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage
FROM usage_metrics FROM usage_metrics
WHERE WHERE
usage_metric_space_id = ? AND usage_metric_space_id = $1 AND
usage_metric_date BETWEEN ? AND ?` usage_metric_date BETWEEN $2 AND $3`
result := &types.UsageMetric{ result := &types.UsageMetric{
RootSpaceID: rootSpaceID, RootSpaceID: rootSpaceID,
@ -185,7 +185,7 @@ func (s *UsageMetricsStore) List(
COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage
FROM usage_metrics FROM usage_metrics
WHERE WHERE
usage_metric_date BETWEEN ? AND ? usage_metric_date BETWEEN $1 AND $2
GROUP BY usage_metric_space_id GROUP BY usage_metric_space_id
ORDER BY usage_metric_bandwidth DESC, usage_metric_storage DESC` ORDER BY usage_metric_bandwidth DESC, usage_metric_storage DESC`