From cae4b26277cd9cfa178177b504544218987e9b2a Mon Sep 17 00:00:00 2001 From: Don Browne Date: Wed, 26 Jun 2024 08:45:36 +0100 Subject: [PATCH] Start using new eval history table (#3703) This PR wires the eval history service into the evaluation engine. It is gated behind a feature flag while we experiment with it. A follow on PR will add logging for remediation and alerting. Relates to #3556 --- database/query/eval_history.sql | 2 +- internal/controlplane/server.go | 3 +- internal/db/eval_history.sql.go | 2 +- internal/engine/eval_status.go | 53 ++++++++++++++++++++++++----- internal/engine/executor.go | 11 +++++- internal/engine/executor_test.go | 14 ++++++++ internal/engine/metrics.go | 23 +++---------- internal/flags/constants.go | 2 ++ internal/history/mock/service.go | 57 ++++++++++++++++++++++++++++++++ internal/history/service.go | 35 ++++++++++++-------- internal/history/service_test.go | 4 ++- internal/service/service.go | 6 ++++ 12 files changed, 167 insertions(+), 45 deletions(-) create mode 100644 internal/history/mock/service.go diff --git a/database/query/eval_history.sql b/database/query/eval_history.sql index b3481cc8ce..b9e6cd1744 100644 --- a/database/query/eval_history.sql +++ b/database/query/eval_history.sql @@ -65,5 +65,5 @@ INSERT INTO latest_evaluation_statuses( $1, $2 ) -ON CONFLICT (rule_entity_id, evaluation_history_id) DO UPDATE +ON CONFLICT (rule_entity_id) DO UPDATE SET evaluation_history_id = $2; \ No newline at end of file diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index ab2e3bb1c3..27390020a9 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -141,6 +141,7 @@ func NewServer( sessionService session.ProviderSessionService, projectDeleter projects.ProjectDeleter, projectCreator projects.ProjectCreator, + featureFlagClient *openfeature.Client, ) *Server { return &Server{ store: store, @@ -152,7 +153,7 @@ func NewServer( profiles: profileService, ruleTypes: ruleService, providerStore: providerStore, - featureFlags: openfeature.NewClient(cfg.Flags.AppName), + featureFlags: featureFlagClient, ghClient: &ghprov.ClientServiceImplementation{}, providerManager: providerManager, providerAuthManager: providerAuthManager, diff --git a/internal/db/eval_history.sql.go b/internal/db/eval_history.sql.go index cc0bda6003..1af9d96203 100644 --- a/internal/db/eval_history.sql.go +++ b/internal/db/eval_history.sql.go @@ -152,7 +152,7 @@ INSERT INTO latest_evaluation_statuses( $1, $2 ) -ON CONFLICT (rule_entity_id, evaluation_history_id) DO UPDATE +ON CONFLICT (rule_entity_id) DO UPDATE SET evaluation_history_id = $2 ` diff --git a/internal/engine/eval_status.go b/internal/engine/eval_status.go index 610ebe2369..690c347d69 100644 --- a/internal/engine/eval_status.go +++ b/internal/engine/eval_status.go @@ -28,6 +28,7 @@ import ( evalerrors "github.com/stacklok/minder/internal/engine/errors" engif "github.com/stacklok/minder/internal/engine/interfaces" ent "github.com/stacklok/minder/internal/entities" + "github.com/stacklok/minder/internal/flags" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" ) @@ -126,8 +127,21 @@ func (e *Executor) createOrUpdateEvalStatus( return nil } + // Get rule instance ID + // TODO: we should use the rule instance table in the evaluation process + // it should not be necessary to query it here. + ruleID, err := e.querier.GetIDByProfileEntityName(ctx, db.GetIDByProfileEntityNameParams{ + ProfileID: params.ProfileID, + EntityType: params.EntityType, + Name: params.Rule.Name, + }) + if err != nil { + return fmt.Errorf("unable to retrieve rule instance ID from DB: %w", err) + } + // Upsert evaluation - id, err := e.querier.UpsertRuleEvaluations(ctx, db.UpsertRuleEvaluationsParams{ + // TODO: replace this table with the evaluation statuses table + evalID, err := e.querier.UpsertRuleEvaluations(ctx, db.UpsertRuleEvaluationsParams{ ProfileID: params.ProfileID, RepositoryID: params.RepoID, ArtifactID: params.ArtifactID, @@ -136,7 +150,6 @@ func (e *Executor) createOrUpdateEvalStatus( PullRequestID: params.PullRequestID, RuleName: params.Rule.Name, }) - if err != nil { logger.Err(err).Msg("error upserting rule evaluation") return err @@ -148,9 +161,9 @@ func (e *Executor) createOrUpdateEvalStatus( return err } status := evalerrors.ErrorAsEvalStatus(params.GetEvalErr()) - e.metrics.CountEvalStatus(ctx, status, params.ProfileID, params.ProjectID, entityID, entityType) + e.metrics.CountEvalStatus(ctx, status, entityType) _, err = e.querier.UpsertRuleDetailsEval(ctx, db.UpsertRuleDetailsEvalParams{ - RuleEvalID: id, + RuleEvalID: evalID, Status: evalerrors.ErrorAsEvalStatus(params.GetEvalErr()), Details: evalerrors.ErrorAsEvalDetails(params.GetEvalErr()), }) @@ -161,9 +174,12 @@ func (e *Executor) createOrUpdateEvalStatus( } // Upsert remediation details + remediationStatus := evalerrors.ErrorAsRemediationStatus(params.GetActionsErr().RemediateErr) + e.metrics.CountRemediationStatus(ctx, remediationStatus) + _, err = e.querier.UpsertRuleDetailsRemediate(ctx, db.UpsertRuleDetailsRemediateParams{ - RuleEvalID: id, - Status: evalerrors.ErrorAsRemediationStatus(params.GetActionsErr().RemediateErr), + RuleEvalID: evalID, + Status: remediationStatus, Details: errorAsActionDetails(params.GetActionsErr().RemediateErr), Metadata: params.GetActionsErr().RemediateMeta, }) @@ -172,9 +188,12 @@ func (e *Executor) createOrUpdateEvalStatus( } // Upsert alert details + alertStatus := evalerrors.ErrorAsAlertStatus(params.GetActionsErr().AlertErr) + e.metrics.CountAlertStatus(ctx, alertStatus) + _, err = e.querier.UpsertRuleDetailsAlert(ctx, db.UpsertRuleDetailsAlertParams{ - RuleEvalID: id, - Status: evalerrors.ErrorAsAlertStatus(params.GetActionsErr().AlertErr), + RuleEvalID: evalID, + Status: alertStatus, Details: errorAsActionDetails(params.GetActionsErr().AlertErr), Metadata: params.GetActionsErr().AlertMeta, }) @@ -182,6 +201,24 @@ func (e *Executor) createOrUpdateEvalStatus( logger.Err(err).Msg("error upserting rule alert details") } + if flags.Bool(ctx, e.featureFlags, flags.EvalHistory) { + // Log in the evaluation history tables + _, err = db.WithTransaction(e.querier, func(qtx db.ExtendQuerier) (uuid.UUID, error) { + return e.historyService.StoreEvaluationStatus( + ctx, + qtx, + ruleID, + params.EntityType, + entityID, + params.GetEvalErr(), + ) + }) + if err != nil { + logger.Err(err).Msg("error logging evaluation status") + return err + } + } + return err } diff --git a/internal/engine/executor.go b/internal/engine/executor.go index 9b61ad1991..05cf885500 100644 --- a/internal/engine/executor.go +++ b/internal/engine/executor.go @@ -22,6 +22,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/google/uuid" + "github.com/open-feature/go-sdk/openfeature" "github.com/rs/zerolog" "github.com/stacklok/minder/internal/db" @@ -33,6 +34,7 @@ import ( "github.com/stacklok/minder/internal/engine/ingestcache" engif "github.com/stacklok/minder/internal/engine/interfaces" "github.com/stacklok/minder/internal/events" + "github.com/stacklok/minder/internal/history" minderlogger "github.com/stacklok/minder/internal/logger" "github.com/stacklok/minder/internal/profiles" "github.com/stacklok/minder/internal/providers/manager" @@ -61,6 +63,8 @@ type Executor struct { terminationcontext context.Context providerManager manager.ProviderManager metrics *ExecutorMetrics + historyService history.EvaluationHistoryService + featureFlags openfeature.IClient } // NewExecutor creates a new executor @@ -71,6 +75,8 @@ func NewExecutor( providerManager manager.ProviderManager, handlerMiddleware []message.HandlerMiddleware, metrics *ExecutorMetrics, + historyService history.EvaluationHistoryService, + featureFlags openfeature.IClient, ) *Executor { return &Executor{ querier: querier, @@ -80,6 +86,8 @@ func NewExecutor( handlerMiddleware: handlerMiddleware, providerManager: providerManager, metrics: metrics, + historyService: historyService, + featureFlags: featureFlags, } } @@ -390,7 +398,8 @@ func (e *Executor) releaseLockAndFlush( func logEval( ctx context.Context, inf *entities.EntityInfoWrapper, - params *engif.EvalStatusParams) { + params *engif.EvalStatusParams, +) { evalLog := params.DecorateLogger( zerolog.Ctx(ctx).With(). Str("eval_status", string(evalerrors.ErrorAsEvalStatus(params.GetEvalErr()))). diff --git a/internal/engine/executor_test.go b/internal/engine/executor_test.go index 8583b6e7ce..f55fee1bbd 100644 --- a/internal/engine/executor_test.go +++ b/internal/engine/executor_test.go @@ -42,6 +42,8 @@ import ( "github.com/stacklok/minder/internal/engine/actions/remediate" "github.com/stacklok/minder/internal/engine/entities" "github.com/stacklok/minder/internal/events" + "github.com/stacklok/minder/internal/flags" + mock_history "github.com/stacklok/minder/internal/history/mock" "github.com/stacklok/minder/internal/logger" "github.com/stacklok/minder/internal/metrics/meters" "github.com/stacklok/minder/internal/providers" @@ -255,6 +257,15 @@ default allow = true`, Details: "", }).Return(ruleEvalDetailsId, nil) + ruleInstanceID := uuid.New() + mockStore.EXPECT(). + GetIDByProfileEntityName(gomock.Any(), db.GetIDByProfileEntityNameParams{ + ProfileID: profileID, + EntityType: db.EntitiesRepository, + Name: passthroughRuleType, + }). + Return(ruleInstanceID, nil) + // Mock upserting remediate status ruleEvalRemediationId := uuid.New() mockStore.EXPECT(). @@ -350,6 +361,7 @@ default allow = true`, execMetrics, err := engine.NewExecutorMetrics(&meters.NoopMeterFactory{}) require.NoError(t, err) + historyService := mock_history.NewMockEvaluationHistoryService(ctrl) e := engine.NewExecutor( ctx, @@ -358,6 +370,8 @@ default allow = true`, providerManager, []message.HandlerMiddleware{}, execMetrics, + historyService, + &flags.FakeClient{}, ) require.NoError(t, err, "expected no error") diff --git a/internal/engine/metrics.go b/internal/engine/metrics.go index cf99d1a116..085468798f 100644 --- a/internal/engine/metrics.go +++ b/internal/engine/metrics.go @@ -18,7 +18,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -68,15 +67,9 @@ func NewExecutorMetrics(meterFactory meters.MeterFactory) (*ExecutorMetrics, err func (e *ExecutorMetrics) CountEvalStatus( ctx context.Context, status db.EvalStatusTypes, - profileID uuid.UUID, - projectID uuid.UUID, - entityID uuid.UUID, entityType db.Entities, ) { e.evalCounter.Add(ctx, 1, metric.WithAttributes( - attribute.String("profile_id", profileID.String()), - attribute.String("project_id", projectID.String()), - attribute.String("entity_id", entityID.String()), attribute.String("entity_type", string(entityType)), attribute.String("status", string(status)), )) @@ -85,27 +78,19 @@ func (e *ExecutorMetrics) CountEvalStatus( // CountRemediationStatus counts remediation events by status. func (e *ExecutorMetrics) CountRemediationStatus( ctx context.Context, - status string, - evalID uuid.UUID, - projectID uuid.UUID, + status db.RemediationStatusTypes, ) { e.evalCounter.Add(ctx, 1, metric.WithAttributes( - attribute.String("profile_id", evalID.String()), - attribute.String("project_id", projectID.String()), - attribute.String("status", status), + attribute.String("status", string(status)), )) } // CountAlertStatus counts alert events by status. func (e *ExecutorMetrics) CountAlertStatus( ctx context.Context, - status string, - evalID uuid.UUID, - projectID uuid.UUID, + status db.AlertStatusTypes, ) { e.evalCounter.Add(ctx, 1, metric.WithAttributes( - attribute.String("profile_id", evalID.String()), - attribute.String("project_id", projectID.String()), - attribute.String("status", status), + attribute.String("status", string(status)), )) } diff --git a/internal/flags/constants.go b/internal/flags/constants.go index b14d8d45e5..2546590022 100644 --- a/internal/flags/constants.go +++ b/internal/flags/constants.go @@ -18,4 +18,6 @@ package flags const ( // UserManagement enables user management, i.e. invitations, role assignments, etc. UserManagement Experiment = "user_management" + // EvalHistory enables logging of evaluation history in the new tables. + EvalHistory Experiment = "eval_history" ) diff --git a/internal/history/mock/service.go b/internal/history/mock/service.go new file mode 100644 index 0000000000..4dd2249204 --- /dev/null +++ b/internal/history/mock/service.go @@ -0,0 +1,57 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./service.go +// +// Generated by this command: +// +// mockgen -package mock_history -destination=./mock/service.go -source=./service.go +// + +// Package mock_history is a generated GoMock package. +package mock_history + +import ( + context "context" + reflect "reflect" + + uuid "github.com/google/uuid" + db "github.com/stacklok/minder/internal/db" + gomock "go.uber.org/mock/gomock" +) + +// MockEvaluationHistoryService is a mock of EvaluationHistoryService interface. +type MockEvaluationHistoryService struct { + ctrl *gomock.Controller + recorder *MockEvaluationHistoryServiceMockRecorder +} + +// MockEvaluationHistoryServiceMockRecorder is the mock recorder for MockEvaluationHistoryService. +type MockEvaluationHistoryServiceMockRecorder struct { + mock *MockEvaluationHistoryService +} + +// NewMockEvaluationHistoryService creates a new mock instance. +func NewMockEvaluationHistoryService(ctrl *gomock.Controller) *MockEvaluationHistoryService { + mock := &MockEvaluationHistoryService{ctrl: ctrl} + mock.recorder = &MockEvaluationHistoryServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEvaluationHistoryService) EXPECT() *MockEvaluationHistoryServiceMockRecorder { + return m.recorder +} + +// StoreEvaluationStatus mocks base method. +func (m *MockEvaluationHistoryService) StoreEvaluationStatus(ctx context.Context, qtx db.Querier, ruleID uuid.UUID, entityType db.Entities, entityID uuid.UUID, evalError error) (uuid.UUID, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StoreEvaluationStatus", ctx, qtx, ruleID, entityType, entityID, evalError) + ret0, _ := ret[0].(uuid.UUID) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StoreEvaluationStatus indicates an expected call of StoreEvaluationStatus. +func (mr *MockEvaluationHistoryServiceMockRecorder) StoreEvaluationStatus(ctx, qtx, ruleID, entityType, entityID, evalError any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreEvaluationStatus", reflect.TypeOf((*MockEvaluationHistoryService)(nil).StoreEvaluationStatus), ctx, qtx, ruleID, entityType, entityID, evalError) +} diff --git a/internal/history/service.go b/internal/history/service.go index 35e096436e..c38e834049 100644 --- a/internal/history/service.go +++ b/internal/history/service.go @@ -28,8 +28,12 @@ import ( evalerrors "github.com/stacklok/minder/internal/engine/errors" ) -// EvaluationHistoryService contains methods to access the eval history log +//go:generate go run go.uber.org/mock/mockgen -package mock_$GOPACKAGE -destination=./mock/$GOFILE -source=./$GOFILE + +// EvaluationHistoryService contains methods to add/query data in the history table. type EvaluationHistoryService interface { + // StoreEvaluationStatus stores the result of this evaluation in the history table. + // Returns the UUID of the evaluation status. StoreEvaluationStatus( ctx context.Context, qtx db.Querier, @@ -37,7 +41,7 @@ type EvaluationHistoryService interface { entityType db.Entities, entityID uuid.UUID, evalError error, - ) error + ) (uuid.UUID, error) } // NewEvaluationHistoryService creates a new instance of EvaluationHistoryService @@ -54,14 +58,14 @@ func (e *evaluationHistoryService) StoreEvaluationStatus( entityType db.Entities, entityID uuid.UUID, evalError error, -) error { +) (uuid.UUID, error) { var ruleEntityID, evaluationID uuid.UUID status := evalerrors.ErrorAsEvalStatus(evalError) details := evalerrors.ErrorAsEvalDetails(evalError) params, err := paramsFromEntity(ruleID, entityID, entityType) if err != nil { - return err + return uuid.Nil, err } // find the latest record for this rule/entity pair @@ -85,10 +89,10 @@ func (e *evaluationHistoryService) StoreEvaluationStatus( }, ) if err != nil { - return fmt.Errorf("error while creating new rule/entity in database: %w", err) + return uuid.Nil, fmt.Errorf("error while creating new rule/entity in database: %w", err) } } else { - return fmt.Errorf("error while querying DB: %w", err) + return uuid.Nil, fmt.Errorf("error while querying DB: %w", err) } } else { ruleEntityID = latestRecord.RuleEntityID @@ -101,16 +105,16 @@ func (e *evaluationHistoryService) StoreEvaluationStatus( if evaluationID == uuid.Nil || previousDetails != details || previousStatus != status { // if there is no prior state for this rule/entity, or the previous state // differs from the current one, create a new status record. - if err = e.createNewStatus(ctx, qtx, ruleEntityID, status, details); err != nil { - return fmt.Errorf("error while creating new evaluation status for rule/entity %s: %w", ruleEntityID, err) + if evaluationID, err = e.createNewStatus(ctx, qtx, ruleEntityID, status, details); err != nil { + return uuid.Nil, fmt.Errorf("error while creating new evaluation status for rule/entity %s: %w", ruleEntityID, err) } } else { if err = e.updateExistingStatus(ctx, qtx, entityID, latestRecord.EvaluationTimes); err != nil { - return fmt.Errorf("error while updating existing evaluation status for rule/entity %s: %w", ruleEntityID, err) + return uuid.Nil, fmt.Errorf("error while updating existing evaluation status for rule/entity %s: %w", ruleEntityID, err) } } - return nil + return evaluationID, nil } func (_ *evaluationHistoryService) createNewStatus( @@ -119,7 +123,7 @@ func (_ *evaluationHistoryService) createNewStatus( ruleEntityID uuid.UUID, status db.EvalStatusTypes, details string, -) error { +) (uuid.UUID, error) { newEvaluationID, err := qtx.InsertEvaluationStatus(ctx, db.InsertEvaluationStatusParams{ RuleEntityID: ruleEntityID, @@ -128,16 +132,21 @@ func (_ *evaluationHistoryService) createNewStatus( }, ) if err != nil { - return err + return uuid.Nil, err } // mark this as the latest status for this rule/entity - return qtx.UpsertLatestEvaluationStatus(ctx, + err = qtx.UpsertLatestEvaluationStatus(ctx, db.UpsertLatestEvaluationStatusParams{ RuleEntityID: ruleEntityID, EvaluationHistoryID: newEvaluationID, }, ) + if err != nil { + return uuid.Nil, err + } + + return newEvaluationID, err } func (_ *evaluationHistoryService) updateExistingStatus( diff --git a/internal/history/service_test.go b/internal/history/service_test.go index 446e68e090..7619f5c8fc 100644 --- a/internal/history/service_test.go +++ b/internal/history/service_test.go @@ -142,10 +142,12 @@ func TestStoreEvaluationStatus(t *testing.T) { } service := history.NewEvaluationHistoryService() - err := service.StoreEvaluationStatus(ctx, store, ruleID, scenario.EntityType, entityID, errTest) + id, err := service.StoreEvaluationStatus(ctx, store, ruleID, scenario.EntityType, entityID, errTest) if scenario.ExpectedError == "" { + require.Equal(t, evaluationID, id) require.NoError(t, err) } else { + require.Equal(t, uuid.Nil, id) require.ErrorContains(t, err, scenario.ExpectedError) } }) diff --git a/internal/service/service.go b/internal/service/service.go index 37b368068d..6878acbacd 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/ThreeDotsLabs/watermill/message" + "github.com/open-feature/go-sdk/openfeature" "golang.org/x/sync/errgroup" "github.com/stacklok/minder/internal/auth" @@ -34,6 +35,7 @@ import ( "github.com/stacklok/minder/internal/engine" "github.com/stacklok/minder/internal/events" "github.com/stacklok/minder/internal/flags" + "github.com/stacklok/minder/internal/history" "github.com/stacklok/minder/internal/marketplaces" "github.com/stacklok/minder/internal/metrics/meters" "github.com/stacklok/minder/internal/profiles" @@ -134,6 +136,7 @@ func AllInOneServerService( repos := github.NewRepositoryService(whManager, store, evt, providerManager) projectDeleter := projects.NewProjectDeleter(authzClient, providerManager) sessionsService := session.NewProviderSessionService(providerManager, providerStore, store) + featureFlagClient := openfeature.NewClient(cfg.Flags.AppName) s := controlplane.NewServer( store, @@ -154,6 +157,7 @@ func AllInOneServerService( sessionsService, projectDeleter, projectCreator, + featureFlagClient, ) // Subscribe to events from the identity server @@ -181,6 +185,8 @@ func AllInOneServerService( providerManager, executorMiddleware, executorMetrics, + history.NewEvaluationHistoryService(), + featureFlagClient, ) evt.ConsumeEvents(exec)