diff --git a/internal/controlplane/handlers_entities.go b/internal/controlplane/handlers_entities.go index f05c3ca11c..a2b5439335 100644 --- a/internal/controlplane/handlers_entities.go +++ b/internal/controlplane/handlers_entities.go @@ -16,12 +16,12 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/providers" "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/internal/util" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // ReconcileEntityRegistration reconciles the registration of an entity. @@ -100,7 +100,7 @@ func (s *Server) ReconcileEntityRegistration( func (s *Server) publishEntityMessage(l *zerolog.Logger, msg *message.Message) error { l.Info().Str("messageID", msg.UUID).Msg("publishing register entities message for execution") - return s.evt.Publish(events.TopicQueueReconcileEntityAdd, msg) + return s.evt.Publish(constants.TopicQueueReconcileEntityAdd, msg) } func createEntityMessage( diff --git a/internal/controlplane/handlers_entities_test.go b/internal/controlplane/handlers_entities_test.go index a6533f22f8..b000c42d58 100644 --- a/internal/controlplane/handlers_entities_test.go +++ b/internal/controlplane/handlers_entities_test.go @@ -13,12 +13,12 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" - mockevents "github.com/mindersec/minder/internal/events/mock" mockgh "github.com/mindersec/minder/internal/providers/github/mock" "github.com/mindersec/minder/internal/providers/manager" mockmanager "github.com/mindersec/minder/internal/providers/manager/mock" rf "github.com/mindersec/minder/internal/repositories/mock/fixtures" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + mockevents "github.com/mindersec/minder/pkg/eventer/interfaces/mock" ) func TestServer_ReconcileEntityRegistration(t *testing.T) { diff --git a/internal/controlplane/handlers_oauth_test.go b/internal/controlplane/handlers_oauth_test.go index 4bb2d04054..d70440e016 100644 --- a/internal/controlplane/handlers_oauth_test.go +++ b/internal/controlplane/handlers_oauth_test.go @@ -38,7 +38,6 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" mockprops "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" "github.com/mindersec/minder/internal/providers/dockerhub" mockclients "github.com/mindersec/minder/internal/providers/github/clients/mock" @@ -51,6 +50,7 @@ import ( "github.com/mindersec/minder/internal/providers/session" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" provinfv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -378,7 +378,7 @@ func TestGetAuthorizationURL(t *testing.T) { store := mockdb.NewMockStore(ctrl) tc.buildStubs(store) - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -890,7 +890,7 @@ func TestHandleGitHubAppCallback(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -1046,7 +1046,7 @@ func TestVerifyProviderCredential(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/controlplane/handlers_reconciliationtasks.go b/internal/controlplane/handlers_reconciliationtasks.go index 9a0148bcd7..0842edf659 100644 --- a/internal/controlplane/handlers_reconciliationtasks.go +++ b/internal/controlplane/handlers_reconciliationtasks.go @@ -16,10 +16,10 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" reconcilers "github.com/mindersec/minder/internal/reconcilers/messages" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // CreateEntityReconciliationTask creates a task to reconcile the state of an entity @@ -49,7 +49,7 @@ func (s *Server) CreateEntityReconciliationTask(ctx context.Context, if err != nil { return nil, err } - topic = events.TopicQueueReconcileRepoInit + topic = constants.TopicQueueReconcileRepoInit } else { return nil, status.Errorf(codes.InvalidArgument, "entity type %s is not supported", entity.GetType()) } diff --git a/internal/controlplane/handlers_user_test.go b/internal/controlplane/handlers_user_test.go index 84fda12104..75d07224e0 100644 --- a/internal/controlplane/handlers_user_test.go +++ b/internal/controlplane/handlers_user_test.go @@ -29,7 +29,6 @@ import ( "github.com/mindersec/minder/internal/authz/mock" mockcrypto "github.com/mindersec/minder/internal/crypto/mock" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/flags" "github.com/mindersec/minder/internal/marketplaces" "github.com/mindersec/minder/internal/projects" @@ -37,6 +36,7 @@ import ( mockprov "github.com/mindersec/minder/internal/providers/github/service/mock" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) const ( @@ -439,7 +439,7 @@ func TestDeleteUser_gRPC(t *testing.T) { })) defer testServer.Close() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index a9fceea316..1c53847574 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -47,7 +47,6 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" propSvc "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/history" "github.com/mindersec/minder/internal/invites" "github.com/mindersec/minder/internal/logger" @@ -63,6 +62,7 @@ import ( "github.com/mindersec/minder/internal/util" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" "github.com/mindersec/minder/pkg/profiles" "github.com/mindersec/minder/pkg/ruletypes" ) @@ -81,7 +81,7 @@ var ( type Server struct { store db.Store cfg *serverconfig.Config - evt events.Publisher + evt interfaces.Publisher mt metrics.Metrics grpcServer *grpc.Server jwt jwt.Validator @@ -126,7 +126,7 @@ type Server struct { // NewServer creates a new server instance func NewServer( store db.Store, - evt events.Publisher, + evt interfaces.Publisher, cfg *serverconfig.Config, serverMetrics metrics.Metrics, jwtValidator jwt.Validator, diff --git a/internal/controlplane/server_test.go b/internal/controlplane/server_test.go index bd8879fb9a..e35dfb08cd 100644 --- a/internal/controlplane/server_test.go +++ b/internal/controlplane/server_test.go @@ -33,13 +33,13 @@ import ( "github.com/mindersec/minder/internal/controlplane/metrics" "github.com/mindersec/minder/internal/crypto" mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" ghclient "github.com/mindersec/minder/internal/providers/github/clients" ghService "github.com/mindersec/minder/internal/providers/github/service" mock_reposvc "github.com/mindersec/minder/internal/repositories/mock" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) const bufSize = 1024 * 1024 @@ -78,7 +78,7 @@ func newDefaultServer( ) *Server { t.Helper() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/eea/eea.go b/internal/eea/eea.go index a88cce4167..3fd6c50794 100644 --- a/internal/eea/eea.go +++ b/internal/eea/eea.go @@ -20,16 +20,17 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/entities" "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/manager" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // EEA is the Event Execution Aggregator type EEA struct { querier db.Store - evt events.Publisher + evt interfaces.Publisher cfg *serverconfig.AggregatorConfig entityFetcher service.PropertiesService @@ -37,7 +38,7 @@ type EEA struct { } // NewEEA creates a new EEA -func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.AggregatorConfig, +func NewEEA(querier db.Store, evt interfaces.Publisher, cfg *serverconfig.AggregatorConfig, ef service.PropertiesService, provMan manager.ProviderManager) *EEA { return &EEA{ querier: querier, @@ -49,8 +50,8 @@ func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.Aggregator } // Register implements the Consumer interface. -func (e *EEA) Register(r events.Registrar) { - r.Register(events.TopicQueueEntityFlush, e.FlushMessageHandler) +func (e *EEA) Register(r interfaces.Registrar) { + r.Register(constants.TopicQueueEntityFlush, e.FlushMessageHandler) } // AggregateMiddleware will pass on the event to the executor engine diff --git a/internal/eea/eea_test.go b/internal/eea/eea_test.go index dd1c19bd80..8b15ab62eb 100644 --- a/internal/eea/eea_test.go +++ b/internal/eea/eea_test.go @@ -27,10 +27,11 @@ import ( "github.com/mindersec/minder/internal/entities/properties" psvc "github.com/mindersec/minder/internal/entities/properties/service" propsvcmock "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" mockmanager "github.com/mindersec/minder/internal/providers/manager/mock" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/constants" ) const ( @@ -55,7 +56,7 @@ func TestAggregator(t *testing.T) { projectID, repoID := createNeededEntities(ctx, t, testQueries) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BufferSize: concurrentEvents, @@ -86,7 +87,7 @@ func TestAggregator(t *testing.T) { aggr.Register(evt) // This tests that flushing sends messages to the executor engine - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware) go func() { t.Log("Running eventer") @@ -133,7 +134,7 @@ func TestAggregator(t *testing.T) { msg, err := inf.BuildMessage() require.NoError(t, err, "expected no error when building message") - err = evt.Publish(events.TopicQueueEntityFlush, msg.Copy()) + err = evt.Publish(constants.TopicQueueEntityFlush, msg.Copy()) require.NoError(t, err, "expected no error when publishing message") }() } @@ -345,14 +346,14 @@ func TestFlushAll(t *testing.T) { propsvc := propsvcmock.NewMockPropertiesService(ctrl) provman := mockmanager.NewMockProviderManager(ctrl) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) require.NoError(t, err) flushedMessages := newTestPubSub() - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add) go func() { t.Log("Running eventer") @@ -394,7 +395,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) { require.NoError(t, err, "expected no error when creating embedded store") t.Cleanup(td) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -412,7 +413,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) { flushedMessages := newTestPubSub() // This tests that flushing sends messages to the executor engine - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware) t.Log("Flushing all") require.NoError(t, aggr.FlushAll(ctx), "expected no error") @@ -433,14 +434,14 @@ func TestFlushAllListFlushFails(t *testing.T) { flushedMessages := newTestPubSub() - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) require.NoError(t, err) // This tests that flushing sends messages to the executor engine - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add) go func() { t.Log("Running eventer") @@ -483,14 +484,14 @@ func TestFlushAllListFlushListsARepoThatGetsDeletedLater(t *testing.T) { flushedMessages := newTestPubSub() - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) require.NoError(t, err) // This tests that flushing sends messages to the executor engine - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add) go func() { t.Log("Running eventer") diff --git a/internal/email/awsses/awsses.go b/internal/email/awsses/awsses.go index a17c178719..0abfb2e097 100644 --- a/internal/email/awsses/awsses.go +++ b/internal/email/awsses/awsses.go @@ -17,7 +17,7 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) const ( @@ -47,7 +47,7 @@ func New(ctx context.Context, sender, region string) (*awsSES, error) { } // Register implements the Consumer interface. -func (a *awsSES) Register(reg events.Registrar) { +func (a *awsSES) Register(reg interfaces.Registrar) { reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error { var e email.MailEventPayload diff --git a/internal/email/noop/noop.go b/internal/email/noop/noop.go index 1f3474f78d..649fae8002 100644 --- a/internal/email/noop/noop.go +++ b/internal/email/noop/noop.go @@ -12,7 +12,7 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) type noop struct { @@ -24,7 +24,7 @@ func New() *noop { } // Register implements the Consumer interface. -func (_ *noop) Register(reg events.Registrar) { +func (_ *noop) Register(reg interfaces.Registrar) { reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error { var e email.MailEventPayload diff --git a/internal/engine/entities/entity_event.go b/internal/engine/entities/entity_event.go index 0ef6b4ebc8..d45e1d7c45 100644 --- a/internal/engine/entities/entity_event.go +++ b/internal/engine/entities/entity_event.go @@ -12,8 +12,9 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/reflect/protoreflect" - "github.com/mindersec/minder/internal/events" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // EntityInfoWrapper is a helper struct to gather information @@ -180,13 +181,13 @@ func (eiw *EntityInfoWrapper) BuildMessage() (*message.Message, error) { } // Publish builds a message.Message and publishes it to the event bus -func (eiw *EntityInfoWrapper) Publish(evt events.Publisher) error { +func (eiw *EntityInfoWrapper) Publish(evt interfaces.Publisher) error { msg, err := eiw.BuildMessage() if err != nil { return err } - if err := evt.Publish(events.TopicQueueEntityEvaluate, msg); err != nil { + if err := evt.Publish(constants.TopicQueueEntityEvaluate, msg); err != nil { return fmt.Errorf("error publishing entity event: %w", err) } diff --git a/internal/engine/handler.go b/internal/engine/handler.go index 65d87612bf..292d910d53 100644 --- a/internal/engine/handler.go +++ b/internal/engine/handler.go @@ -15,9 +15,10 @@ import ( "github.com/mindersec/minder/internal/engine/engcontext" "github.com/mindersec/minder/internal/engine/entities" - "github.com/mindersec/minder/internal/events" minderlogger "github.com/mindersec/minder/internal/logger" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) const ( @@ -32,7 +33,7 @@ const ( // ExecutorEventHandler is responsible for consuming entity events, passing // entities to the executor, and then publishing the results. type ExecutorEventHandler struct { - evt events.Publisher + evt interfaces.Publisher handlerMiddleware []message.HandlerMiddleware wgEntityEventExecution *sync.WaitGroup executor Executor @@ -46,7 +47,7 @@ type ExecutorEventHandler struct { // NewExecutorEventHandler creates the event handler for the executor func NewExecutorEventHandler( ctx context.Context, - evt events.Publisher, + evt interfaces.Publisher, handlerMiddleware []message.HandlerMiddleware, executor Executor, ) *ExecutorEventHandler { @@ -70,8 +71,8 @@ func NewExecutorEventHandler( } // Register implements the Consumer interface. -func (e *ExecutorEventHandler) Register(r events.Registrar) { - r.Register(events.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...) +func (e *ExecutorEventHandler) Register(r interfaces.Registrar) { + r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...) } // Wait waits for all the entity executions to finish. @@ -169,7 +170,7 @@ func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error { } // Publish the result of the entity evaluation - if err := e.evt.Publish(events.TopicQueueEntityFlush, msg); err != nil { + if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil { logger.Err(err).Msg("error publishing flush event") } }() diff --git a/internal/engine/handler_test.go b/internal/engine/handler_test.go index 2b3a8f822f..c5aa3cc4cc 100644 --- a/internal/engine/handler_test.go +++ b/internal/engine/handler_test.go @@ -16,10 +16,11 @@ import ( "github.com/mindersec/minder/internal/engine" "github.com/mindersec/minder/internal/engine/entities" mockengine "github.com/mindersec/minder/internal/engine/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/util/testqueue" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/constants" ) func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { @@ -38,7 +39,7 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { // -- end expectations - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BlockPublishUntilSubscriberAck: true, @@ -51,7 +52,7 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { go func() { t.Log("Running eventer") - evt.Register(events.TopicQueueEntityFlush, pq.Pass) + evt.Register(constants.TopicQueueEntityFlush, pq.Pass) err := evt.Run(context.Background()) require.NoError(t, err, "failed to run eventer") }() diff --git a/internal/entities/handlers/handler.go b/internal/entities/handlers/handler.go index 6913c6e491..b619e7e96c 100644 --- a/internal/entities/handlers/handler.go +++ b/internal/entities/handlers/handler.go @@ -19,10 +19,11 @@ import ( "github.com/mindersec/minder/internal/entities/models" "github.com/mindersec/minder/internal/entities/properties" propertyService "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/projects/features" "github.com/mindersec/minder/internal/providers/manager" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) var ( @@ -32,7 +33,7 @@ var ( ) type handleEntityAndDoBase struct { - evt events.Publisher + evt interfaces.Publisher store db.Store refreshEntity strategies.GetEntityStrategy @@ -45,7 +46,7 @@ type handleEntityAndDoBase struct { } // Register satisfies the events.Consumer interface. -func (b *handleEntityAndDoBase) Register(r events.Registrar) { +func (b *handleEntityAndDoBase) Register(r interfaces.Registrar) { r.Register(b.handlerName, b.handleRefreshEntityAndDo, b.handlerMiddleware...) } @@ -192,12 +193,12 @@ func (b *handleEntityAndDoBase) repoPrivateOrArchivedCheck( // NewRefreshByIDAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. func NewRefreshByIDAndEvaluateHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -205,8 +206,8 @@ func NewRefreshByIDAndEvaluateHandler( refreshEntity: entStrategies.NewRefreshEntityByIDStrategy(propSvc, provMgr, store), createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), - handlerName: events.TopicQueueRefreshEntityByIDAndEvaluate, - forwardHandlerName: events.TopicQueueEntityEvaluate, + handlerName: constants.TopicQueueRefreshEntityByIDAndEvaluate, + forwardHandlerName: constants.TopicQueueEntityEvaluate, handlerMiddleware: handlerMiddleware, } @@ -214,12 +215,12 @@ func NewRefreshByIDAndEvaluateHandler( // NewRefreshEntityAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. func NewRefreshEntityAndEvaluateHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -227,8 +228,8 @@ func NewRefreshEntityAndEvaluateHandler( refreshEntity: entStrategies.NewRefreshEntityByUpstreamPropsStrategy(propSvc, provMgr, store), createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), - handlerName: events.TopicQueueRefreshEntityAndEvaluate, - forwardHandlerName: events.TopicQueueEntityEvaluate, + handlerName: constants.TopicQueueRefreshEntityAndEvaluate, + forwardHandlerName: constants.TopicQueueEntityEvaluate, handlerMiddleware: handlerMiddleware, } @@ -236,11 +237,11 @@ func NewRefreshEntityAndEvaluateHandler( // NewGetEntityAndDeleteHandler creates a new handler that gets an entity and deletes it. func NewGetEntityAndDeleteHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -248,8 +249,8 @@ func NewGetEntityAndDeleteHandler( refreshEntity: entStrategies.NewGetEntityByUpstreamIDStrategy(propSvc), createMessage: msgStrategies.NewToMinderEntity(), - handlerName: events.TopicQueueGetEntityAndDelete, - forwardHandlerName: events.TopicQueueReconcileEntityDelete, + handlerName: constants.TopicQueueGetEntityAndDelete, + forwardHandlerName: constants.TopicQueueReconcileEntityDelete, handlerMiddleware: handlerMiddleware, } @@ -257,12 +258,12 @@ func NewGetEntityAndDeleteHandler( // NewAddOriginatingEntityHandler creates a new handler that adds an originating entity. func NewAddOriginatingEntityHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -270,8 +271,8 @@ func NewAddOriginatingEntityHandler( refreshEntity: entStrategies.NewAddOriginatingEntityStrategy(propSvc, provMgr, store), createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), - handlerName: events.TopicQueueOriginatingEntityAdd, - forwardHandlerName: events.TopicQueueEntityEvaluate, + handlerName: constants.TopicQueueOriginatingEntityAdd, + forwardHandlerName: constants.TopicQueueEntityEvaluate, handlerMiddleware: handlerMiddleware, } @@ -279,19 +280,19 @@ func NewAddOriginatingEntityHandler( // NewRemoveOriginatingEntityHandler creates a new handler that removes an originating entity. func NewRemoveOriginatingEntityHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, refreshEntity: entStrategies.NewDelOriginatingEntityStrategy(propSvc, provMgr, store), createMessage: msgStrategies.NewCreateEmpty(), - handlerName: events.TopicQueueOriginatingEntityDelete, + handlerName: constants.TopicQueueOriginatingEntityDelete, handlerMiddleware: handlerMiddleware, } diff --git a/internal/entities/handlers/handler_test.go b/internal/entities/handlers/handler_test.go index ef27c22ea4..afb40cf27c 100644 --- a/internal/entities/handlers/handler_test.go +++ b/internal/entities/handlers/handler_test.go @@ -24,7 +24,6 @@ import ( "github.com/mindersec/minder/internal/entities/properties" "github.com/mindersec/minder/internal/entities/properties/service" "github.com/mindersec/minder/internal/entities/properties/service/mock/fixtures" - "github.com/mindersec/minder/internal/events" stubeventer "github.com/mindersec/minder/internal/events/stubs" mockgithub "github.com/mindersec/minder/internal/providers/github/mock" ghprops "github.com/mindersec/minder/internal/providers/github/properties" @@ -33,6 +32,8 @@ import ( provManFixtures "github.com/mindersec/minder/internal/providers/manager/mock/fixtures" "github.com/mindersec/minder/internal/reconcilers/messages" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" provifv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -186,54 +187,54 @@ func checkPullRequestMessage(t *testing.T, msg *watermill.Message) { } type handlerBuilder func( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer +) interfaces.Consumer func refreshEntityHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewRefreshEntityAndEvaluateHandler(evt, store, propSvc, provMgr) } func refreshByIDHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewRefreshByIDAndEvaluateHandler(evt, store, propSvc, provMgr) } func addOriginatingEntityHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewAddOriginatingEntityHandler(evt, store, propSvc, provMgr) } func removeOriginatingEntityHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewRemoveOriginatingEntityHandler(evt, store, propSvc, provMgr) } func getAndDeleteEntityHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, _ manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewGetEntityAndDeleteHandler(evt, store, propSvc) } @@ -274,7 +275,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { df.WithTransaction(), ), expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -289,7 +290,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { }, mockStoreFunc: df.NewMockStore(), expectedPublish: false, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -319,7 +320,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { df.WithTransaction(), ), expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -354,7 +355,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { df.WithTransaction(), ), expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -418,7 +419,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { df.WithSuccessfulGetFeatureInProject(true), ), expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -618,7 +619,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { ) }, expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkPullRequestMessage, }, { @@ -683,7 +684,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { }, expectedPublish: true, checkWmMsg: checkRepoEntityMessage, - topic: events.TopicQueueReconcileEntityDelete, + topic: constants.TopicQueueReconcileEntityDelete, }, { name: "NewGetEntityAndDeleteHandler: failure to get entity does not publish", diff --git a/internal/events/eventer.go b/internal/events/events.go similarity index 84% rename from internal/events/eventer.go rename to internal/events/events.go index 7469ec11d8..55eb85eedf 100644 --- a/internal/events/eventer.go +++ b/internal/events/events.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: Copyright 2023 The Minder Authors // SPDX-License-Identifier: Apache-2.0 -// Package events provides the eventer object which is responsible for setting up the watermill router +// Package events provide the eventer object which is responsible for setting up the watermill router // and handling the incoming events package events @@ -18,18 +18,27 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/alexdrl/zerowater" + "github.com/open-feature/go-sdk/openfeature" promgo "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" "github.com/mindersec/minder/internal/events/common" - gochannel "github.com/mindersec/minder/internal/events/gochannel" + "github.com/mindersec/minder/internal/events/gochannel" "github.com/mindersec/minder/internal/events/nats" eventersql "github.com/mindersec/minder/internal/events/sql" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) +// Ensure that the eventer implements the interfaces +var _ interfaces.Publisher = (*eventer)(nil) +var _ interfaces.Service = (*eventer)(nil) +var _ interfaces.Registrar = (*eventer)(nil) +var _ message.Publisher = (*eventer)(nil) + // eventer is a wrapper over the relevant eventing objects in such // a way that they can be easily accessible and configurable. type eventer struct { @@ -44,19 +53,16 @@ type eventer struct { closer common.DriverCloser } -var _ Publisher = (*eventer)(nil) -var _ Service = (*eventer)(nil) - type messageInstruments struct { // message processing time duration histogram messageProcessingTimeHistogram metric.Int64Histogram } -var _ Registrar = (*eventer)(nil) -var _ message.Publisher = (*eventer)(nil) - -// Setup creates an eventer object which isolates the watermill setup code -func Setup(ctx context.Context, cfg *serverconfig.EventConfig) (Interface, error) { +// NewEventer creates an eventer object which isolates the watermill setup code +func NewEventer(ctx context.Context, _ openfeature.IClient, cfg *serverconfig.EventConfig) (interfaces.Interface, error) { + if cfg == nil { + return nil, errors.New("event config is nil") + } if cfg == nil { return nil, errors.New("event config is nil") } @@ -73,8 +79,8 @@ func Setup(ctx context.Context, cfg *serverconfig.EventConfig) (Interface, error metricsBuilder := metrics.NewPrometheusMetricsBuilder( promgo.DefaultRegisterer, - metricsNamespace, - metricsSubsystem) + constants.MetricsNamespace, + constants.MetricsSubsystem) metricsBuilder.AddPrometheusRouterMetrics(router) zerolog.Ctx(ctx).Info().Msg("Router Metrics registered") @@ -90,7 +96,7 @@ func Setup(ctx context.Context, cfg *serverconfig.EventConfig) (Interface, error return nil, fmt.Errorf("failed instantiating driver: %w", err) } - poisonQueueMiddleware, err := middleware.PoisonQueue(pub, DeadLetterQueueTopic) + poisonQueueMiddleware, err := middleware.PoisonQueue(pub, constants.DeadLetterQueueTopic) if err != nil { return nil, fmt.Errorf("failed instantiating poison queue: %w", err) } @@ -139,13 +145,13 @@ func instantiateDriver( cfg *serverconfig.EventConfig, ) (message.Publisher, message.Subscriber, common.DriverCloser, error) { switch driver { - case GoChannelDriver: + case constants.GoChannelDriver: zerolog.Ctx(ctx).Info().Msg("Using go-channel driver") return gochannel.BuildGoChannelDriver(ctx, cfg) - case SQLDriver: + case constants.SQLDriver: zerolog.Ctx(ctx).Info().Msg("Using SQL driver") return eventersql.BuildPostgreSQLDriver(ctx, cfg) - case NATSDriver: + case constants.NATSDriver: zerolog.Ctx(ctx).Info().Msg("Using NATS driver") return nats.BuildNatsChannelDriver(cfg) default: @@ -186,7 +192,7 @@ func (e *eventer) Publish(topic string, messages ...*message.Message) error { "component": "eventer", "function": "Publish", }) - msg.Metadata.Set(PublishedKey, time.Now().Format(time.RFC3339)) + msg.Metadata.Set(constants.PublishedKey, time.Now().Format(time.RFC3339)) } } @@ -234,7 +240,7 @@ func (e *eventer) Register( } // ConsumeEvents allows registration of multiple consumers easily -func (e *eventer) ConsumeEvents(consumers ...Consumer) { +func (e *eventer) ConsumeEvents(consumers ...interfaces.Consumer) { for _, c := range consumers { c.Register(e) } diff --git a/internal/events/eventer_test.go b/internal/events/events_test.go similarity index 90% rename from internal/events/eventer_test.go rename to internal/events/events_test.go index f7f7ac3c09..f5142ebe20 100644 --- a/internal/events/eventer_test.go +++ b/internal/events/events_test.go @@ -15,13 +15,15 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "github.com/mindersec/minder/internal/events" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) type fakeConsumer struct { topics []string - makeHandler func(string, chan eventPair) events.Handler + makeHandler func(string, chan eventPair) interfaces.Handler shouldFailHandler bool // Filled in by test later out chan eventPair @@ -39,13 +41,13 @@ func driverConfig() *serverconfig.EventConfig { } } -func (f *fakeConsumer) Register(r events.Registrar) { +func (f *fakeConsumer) Register(r interfaces.Registrar) { for _, t := range f.topics { r.Register(t, f.makeHandler(t, f.out)) } } -func fakeHandler(id string, out chan eventPair) events.Handler { +func fakeHandler(id string, out chan eventPair) interfaces.Handler { return func(msg *message.Message) error { ctx := msg.Context() select { @@ -56,7 +58,7 @@ func fakeHandler(id string, out chan eventPair) events.Handler { } } -func countFailuresHandler(counter *int) events.Handler { +func countFailuresHandler(counter *int) interfaces.Handler { return func(_ *message.Message) error { *counter++ return errors.New("handler always fails") @@ -116,7 +118,7 @@ func TestEventer(t *testing.T) { // This looks silly, but we need to generate a unique name for // the second handler on topic "a". In real usage, each Consumer // will register a different function. - makeHandler: func(_ string, out chan eventPair) events.Handler { + makeHandler: func(_ string, out chan eventPair) interfaces.Handler { return func(msg *message.Message) error { out <- eventPair{"other", msg.Copy()} return nil @@ -129,7 +131,7 @@ func TestEventer(t *testing.T) { name: "handler fails, message goes to DLQ", publish: []eventPair{{"test_dlq", &message.Message{Metadata: map[string]string{}}}}, want: map[string][]message.Message{ - events.DeadLetterQueueTopic: {{}}, + constants.DeadLetterQueueTopic: {{}}, }, consumers: []fakeConsumer{ { @@ -137,7 +139,7 @@ func TestEventer(t *testing.T) { shouldFailHandler: true, }, { - topics: []string{events.DeadLetterQueueTopic}, + topics: []string{constants.DeadLetterQueueTopic}, makeHandler: fakeHandler, }, }, @@ -227,15 +229,15 @@ var setupMu sync.Mutex // We currently use the global meter provider, so reset it for each test. // Since this is global, we use a global mutex to ensure we don't enter setup // concurrently. -func setupEventerWithMetricReader(ctx context.Context) (events.Interface, *metric.ManualReader, error) { +func setupEventerWithMetricReader(ctx context.Context) (interfaces.Interface, *metric.ManualReader, error) { setupMu.Lock() defer setupMu.Unlock() oldMeter := otel.GetMeterProvider() defer otel.SetMeterProvider(oldMeter) metricReader := metric.NewManualReader() otel.SetMeterProvider(metric.NewMeterProvider(metric.WithReader(metricReader))) - eventer, err := events.Setup(ctx, driverConfig()) - return eventer, metricReader, err + ev, err := eventer.New(ctx, nil, driverConfig()) + return ev, metricReader, err } func setupConsumer(c *fakeConsumer, out chan eventPair, failureCounter *int) { @@ -249,8 +251,8 @@ func setupConsumer(c *fakeConsumer, out chan eventPair, failureCounter *int) { } } -func makeFailingHandler(counter *int) func(_ string, _ chan eventPair) events.Handler { - return func(_ string, _ chan eventPair) events.Handler { +func makeFailingHandler(counter *int) func(_ string, _ chan eventPair) interfaces.Handler { + return func(_ string, _ chan eventPair) interfaces.Handler { return countFailuresHandler(counter) } } diff --git a/internal/events/metrics.go b/internal/events/metrics.go index 5c573a5b58..f8cfcc60e7 100644 --- a/internal/events/metrics.go +++ b/internal/events/metrics.go @@ -11,13 +11,15 @@ import ( "github.com/ThreeDotsLabs/watermill/message/router/middleware" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + + "github.com/mindersec/minder/pkg/eventer/constants" ) func recordMetrics(instruments *messageInstruments) func(h message.HandlerFunc) message.HandlerFunc { metricsFunc := func(h message.HandlerFunc) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { var processingTime time.Duration - if publishedAt := msg.Metadata.Get(PublishedKey); publishedAt != "" { + if publishedAt := msg.Metadata.Get(constants.PublishedKey); publishedAt != "" { if parsedTime, err := time.Parse(time.RFC3339, publishedAt); err == nil { processingTime = time.Since(parsedTime) } diff --git a/internal/events/stubs/eventer.go b/internal/events/stubs/eventer.go index 85b62c9161..ebc15d7e96 100644 --- a/internal/events/stubs/eventer.go +++ b/internal/events/stubs/eventer.go @@ -10,12 +10,12 @@ import ( "github.com/ThreeDotsLabs/watermill/message" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // StubEventer is a stub implementation of events.Interface and the events.Publisher interface -var _ events.Interface = (*StubEventer)(nil) -var _ events.Publisher = (*StubEventer)(nil) +var _ interfaces.Interface = (*StubEventer)(nil) +var _ interfaces.Publisher = (*StubEventer)(nil) // StubEventer is an eventer that's useful for testing. type StubEventer struct { @@ -29,7 +29,7 @@ func (*StubEventer) Close() error { } // ConsumeEvents implements events.Interface. -func (*StubEventer) ConsumeEvents(...events.Consumer) { +func (*StubEventer) ConsumeEvents(...interfaces.Consumer) { panic("unimplemented") } diff --git a/internal/invites/mock/service.go b/internal/invites/mock/service.go index f8218ad58c..420d8c1e6d 100644 --- a/internal/invites/mock/service.go +++ b/internal/invites/mock/service.go @@ -17,9 +17,9 @@ import ( auth "github.com/mindersec/minder/internal/auth" authz "github.com/mindersec/minder/internal/authz" db "github.com/mindersec/minder/internal/db" - events "github.com/mindersec/minder/internal/events" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" server "github.com/mindersec/minder/pkg/config/server" + interfaces "github.com/mindersec/minder/pkg/eventer/interfaces" gomock "go.uber.org/mock/gomock" ) @@ -48,7 +48,7 @@ func (m *MockInviteService) EXPECT() *MockInviteServiceMockRecorder { } // CreateInvite mocks base method. -func (m *MockInviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, emailConfig server.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string) (*v1.Invitation, error) { +func (m *MockInviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig server.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string) (*v1.Invitation, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateInvite", ctx, qtx, idClient, eventsPub, emailConfig, targetProject, authzRole, inviteeEmail) ret0, _ := ret[0].(*v1.Invitation) @@ -78,7 +78,7 @@ func (mr *MockInviteServiceMockRecorder) RemoveInvite(ctx, qtx, idClient, target } // UpdateInvite mocks base method. -func (m *MockInviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, emailConfig server.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string) (*v1.Invitation, error) { +func (m *MockInviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig server.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string) (*v1.Invitation, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateInvite", ctx, qtx, idClient, eventsPub, emailConfig, targetProject, authzRole, inviteeEmail) ret0, _ := ret[0].(*v1.Invitation) diff --git a/internal/invites/service.go b/internal/invites/service.go index 56f1089b70..dd7bb2609c 100644 --- a/internal/invites/service.go +++ b/internal/invites/service.go @@ -20,11 +20,11 @@ import ( "github.com/mindersec/minder/internal/authz" "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/projects" "github.com/mindersec/minder/internal/util" minder "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) //go:generate go run go.uber.org/mock/mockgen -package mock_$GOPACKAGE -destination=./mock/$GOFILE -source=./$GOFILE @@ -32,12 +32,12 @@ import ( // InviteService encapsulates the methods to manage user invites to a project type InviteService interface { // CreateInvite creates a new user invite - CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, + CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) // UpdateInvite updates the invite status - UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, + UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) @@ -55,7 +55,7 @@ func NewInviteService() InviteService { return &inviteService{} } -func (_ *inviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, +func (_ *inviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) { var userInvite db.UserInvite @@ -229,7 +229,7 @@ func (_ *inviteService) RemoveInvite(ctx context.Context, qtx db.Querier, idClie }, nil } -func (_ *inviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, +func (_ *inviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) { diff --git a/internal/invites/service_test.go b/internal/invites/service_test.go index 84701e003f..4edf0252b9 100644 --- a/internal/invites/service_test.go +++ b/internal/invites/service_test.go @@ -23,10 +23,10 @@ import ( "github.com/mindersec/minder/internal/db" dbf "github.com/mindersec/minder/internal/db/fixtures" "github.com/mindersec/minder/internal/email" - mockevents "github.com/mindersec/minder/internal/events/mock" "github.com/mindersec/minder/internal/projects" minder "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" "github.com/mindersec/minder/pkg/config/server" + mockevents "github.com/mindersec/minder/pkg/eventer/interfaces/mock" ) func TestCreateInvite(t *testing.T) { diff --git a/internal/logger/telemetry_store_watermill_test.go b/internal/logger/telemetry_store_watermill_test.go index 71ae0105ee..08cf4cc263 100644 --- a/internal/logger/telemetry_store_watermill_test.go +++ b/internal/logger/telemetry_store_watermill_test.go @@ -14,11 +14,11 @@ import ( "github.com/stretchr/testify/require" "github.com/mindersec/minder/internal/engine/entities" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/util/testqueue" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) func TestTelemetryStoreWMMiddlewareLogsRepositoryInfo(t *testing.T) { @@ -33,7 +33,7 @@ func TestTelemetryStoreWMMiddlewareLogsRepositoryInfo(t *testing.T) { pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BlockPublishUntilSubscriberAck: true, diff --git a/internal/providers/github/installations/installations.go b/internal/providers/github/installations/installations.go index 65f9220b75..cd4a03f4c9 100644 --- a/internal/providers/github/installations/installations.go +++ b/internal/providers/github/installations/installations.go @@ -16,8 +16,8 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/service" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) const ( @@ -55,7 +55,7 @@ func NewInstallationManager( } // Register implements the Consumer interface. -func (im *InstallationManager) Register(reg events.Registrar) { +func (im *InstallationManager) Register(reg interfaces.Registrar) { reg.Register(ProviderInstallationTopic, im.handleProviderInstallationEvent) } diff --git a/internal/providers/github/manager/manager.go b/internal/providers/github/manager/manager.go index 442f05a65e..8177ae252e 100644 --- a/internal/providers/github/manager/manager.go +++ b/internal/providers/github/manager/manager.go @@ -22,7 +22,6 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" propssvc "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" "github.com/mindersec/minder/internal/providers/credentials" "github.com/mindersec/minder/internal/providers/github/clients" @@ -31,6 +30,7 @@ import ( m "github.com/mindersec/minder/internal/providers/manager" "github.com/mindersec/minder/internal/providers/ratecache" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" v1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -47,7 +47,7 @@ func NewGitHubProviderClassManager( ghService service.GitHubProviderService, propSvc propssvc.PropertiesService, mt metrics.Metrics, - publisher events.Publisher, + publisher interfaces.Publisher, ) m.ProviderClassManager { return &githubProviderManager{ restClientCache: restClientCache, @@ -75,7 +75,7 @@ type githubProviderManager struct { store db.Store ghService service.GitHubProviderService mt metrics.Metrics - publisher events.Publisher + publisher interfaces.Publisher } var ( diff --git a/internal/providers/github/webhook/app.go b/internal/providers/github/webhook/app.go index 45df6cf61b..e7fff195b0 100644 --- a/internal/providers/github/webhook/app.go +++ b/internal/providers/github/webhook/app.go @@ -20,12 +20,13 @@ import ( "github.com/mindersec/minder/internal/controlplane/metrics" "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/clients" "github.com/mindersec/minder/internal/providers/github/installations" "github.com/mindersec/minder/internal/providers/github/service" "github.com/mindersec/minder/internal/reconcilers/messages" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // installationEvent are events related the GitHub App. Minder uses @@ -104,7 +105,7 @@ func HandleGitHubAppWebhook( store db.Store, ghService service.GitHubProviderService, mt metrics.Metrics, - publisher events.Publisher, + publisher interfaces.Publisher, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -129,15 +130,15 @@ func HandleGitHubAppWebhook( wes.Typ = github.WebHookType(r) m := message.NewMessage(uuid.New().String(), nil) - m.Metadata.Set(events.ProviderDeliveryIdKey, github.DeliveryID(r)) + m.Metadata.Set(constants.ProviderDeliveryIdKey, github.DeliveryID(r)) // TODO: handle other sources - m.Metadata.Set(events.ProviderSourceKey, "https://api.github.com/") - m.Metadata.Set(events.GithubWebhookEventTypeKey, wes.Typ) + m.Metadata.Set(constants.ProviderSourceKey, "https://api.github.com/") + m.Metadata.Set(constants.GithubWebhookEventTypeKey, wes.Typ) l = l.With(). - Str("webhook-event-type", m.Metadata[events.GithubWebhookEventTypeKey]). - Str("providertype", m.Metadata[events.ProviderTypeKey]). - Str("upstream-delivery-id", m.Metadata[events.ProviderDeliveryIdKey]). + Str("webhook-event-type", m.Metadata[constants.GithubWebhookEventTypeKey]). + Str("providertype", m.Metadata[constants.ProviderTypeKey]). + Str("upstream-delivery-id", m.Metadata[constants.ProviderDeliveryIdKey]). // This is added for consistency with how // watermill tracks message UUID when logging. Str("message_uuid", m.UUID). @@ -356,7 +357,7 @@ func processInstallationRepositoriesAppEvent( func repositoryRemoved( repo *repo, ) (*processingResult, error) { - return sendEvaluateRepoMessage(repo, events.TopicQueueGetEntityAndDelete) + return sendEvaluateRepoMessage(repo, constants.TopicQueueGetEntityAndDelete) } func repositoryAdded( @@ -383,7 +384,7 @@ func repositoryAdded( WithProperties(addRepoProps) return &processingResult{ - topic: events.TopicQueueReconcileEntityAdd, + topic: constants.TopicQueueReconcileEntityAdd, wrapper: event, }, nil } diff --git a/internal/providers/github/webhook/fuzz_test.go b/internal/providers/github/webhook/fuzz_test.go index 1200d2cf95..676a5ad33b 100644 --- a/internal/providers/github/webhook/fuzz_test.go +++ b/internal/providers/github/webhook/fuzz_test.go @@ -19,8 +19,8 @@ import ( "github.com/mindersec/minder/internal/controlplane/metrics" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" ) var eventTypes = [23]string{ @@ -89,10 +89,10 @@ func FuzzGitHubEventParsers(f *testing.F) { wes.Typ = github.WebHookType(req) m := message.NewMessage("", nil) - m.Metadata.Set(events.ProviderDeliveryIdKey, github.DeliveryID(req)) - m.Metadata.Set(events.ProviderTypeKey, string(db.ProviderTypeGithub)) - m.Metadata.Set(events.ProviderSourceKey, "") - m.Metadata.Set(events.GithubWebhookEventTypeKey, wes.Typ) + m.Metadata.Set(constants.ProviderDeliveryIdKey, github.DeliveryID(req)) + m.Metadata.Set(constants.ProviderTypeKey, string(db.ProviderTypeGithub)) + m.Metadata.Set(constants.ProviderSourceKey, "") + m.Metadata.Set(constants.GithubWebhookEventTypeKey, wes.Typ) // Create whConfig whSecretFile, err := os.CreateTemp("", "webhooksecret*") diff --git a/internal/providers/github/webhook/handlers_githubwebhooks_test.go b/internal/providers/github/webhook/handlers_githubwebhooks_test.go index ee712454cb..3894a5f487 100644 --- a/internal/providers/github/webhook/handlers_githubwebhooks_test.go +++ b/internal/providers/github/webhook/handlers_githubwebhooks_test.go @@ -41,7 +41,6 @@ import ( entMsg "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/installations" gf "github.com/mindersec/minder/internal/providers/github/mock/fixtures" ghprop "github.com/mindersec/minder/internal/providers/github/properties" @@ -50,6 +49,8 @@ import ( "github.com/mindersec/minder/internal/util/testqueue" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/constants" ) //go:embed test-payloads/installation-deleted.json @@ -112,7 +113,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -131,7 +132,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() { pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() - evt.Register(events.TopicQueueEntityEvaluate, pq.Pass) + evt.Register(constants.TopicQueueEntityEvaluate, pq.Pass) go func() { err := evt.Run(context.Background()) @@ -170,7 +171,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -181,7 +182,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() { defer pq.Close() queued := pq.GetQueue() - evt.Register(events.TopicQueueRefreshEntityAndEvaluate, pq.Pass) + evt.Register(constants.TopicQueueRefreshEntityAndEvaluate, pq.Pass) go func() { err := evt.Run(context.Background()) @@ -228,7 +229,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -241,7 +242,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() { // This changes because "meta" event can only trigger a // deletion - evt.Register(events.TopicQueueGetEntityAndDelete, pq.Pass) + evt.Register(constants.TopicQueueGetEntityAndDelete, pq.Pass) go func() { err := evt.Run(context.Background()) @@ -331,7 +332,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -341,7 +342,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() { pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() - evt.Register(events.TopicQueueEntityEvaluate, pq.Pass) + evt.Register(constants.TopicQueueEntityEvaluate, pq.Pass) go func() { err := evt.Run(context.Background()) @@ -390,7 +391,7 @@ func (s *UnitTestSuite) TestNoopWebhookHandler() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -460,7 +461,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/apps"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, { @@ -484,7 +485,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://example.com/random/url"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, { @@ -522,7 +523,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("login/package-name"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -545,7 +546,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -590,7 +591,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -608,7 +609,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, @@ -644,7 +645,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://example.com/random/url"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -679,7 +680,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -714,7 +715,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -746,7 +747,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -791,7 +792,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://example.com/random/url"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -826,7 +827,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -861,7 +862,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -893,7 +894,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -923,7 +924,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -959,7 +960,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -998,7 +999,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -1033,7 +1034,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1061,7 +1062,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1089,7 +1090,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1117,7 +1118,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1144,7 +1145,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1172,7 +1173,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1199,7 +1200,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1227,7 +1228,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1260,7 +1261,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1292,7 +1293,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -1324,7 +1325,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -1356,7 +1357,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1389,7 +1390,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1422,7 +1423,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1455,7 +1456,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1489,7 +1490,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -1520,7 +1521,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1555,7 +1556,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Private: github.Bool(true), }, }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, // the message is passed on to events.TopicQueueRefreshEntityAndEvaluate // which should discard it (see test there) @@ -1588,7 +1589,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Private: github.Bool(true), }, }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, // the message is passed on to events.TopicQueueRefreshEntityAndEvaluate // which should discard it (see test there) @@ -1624,7 +1625,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1652,7 +1653,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1680,7 +1681,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1708,7 +1709,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1736,7 +1737,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1764,7 +1765,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1791,7 +1792,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1819,7 +1820,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1847,7 +1848,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1875,7 +1876,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1903,7 +1904,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1931,7 +1932,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1959,7 +1960,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1987,7 +1988,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2015,7 +2016,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2043,7 +2044,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2070,7 +2071,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2097,7 +2098,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2117,7 +2118,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { event: "org_block", // https://pkg.go.dev/github.com/google/go-github/v62@v62.0.0/github#OrgBlockEvent payload: &github.OrgBlockEvent{}, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -2135,7 +2136,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2155,7 +2156,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { event: "push", // https://pkg.go.dev/github.com/google/go-github/v62@v62.0.0/github#PushEvent rawPayload: []byte(rawPushEvent), - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2184,7 +2185,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2210,7 +2211,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2229,7 +2230,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { // https://docs.github.com/en/webhooks/webhook-events-and-payloads#branch_protection_configuration event: "branch_protection_configuration", rawPayload: []byte(rawBranchProtectionConfigurationDisabledEvent), - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2255,7 +2256,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2281,7 +2282,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2307,7 +2308,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2333,7 +2334,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2359,7 +2360,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2385,7 +2386,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2429,7 +2430,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder/42"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2471,7 +2472,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder/42"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2513,7 +2514,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder/42"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2554,7 +2555,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder/42"), }, - topic: events.TopicQueueOriginatingEntityDelete, + topic: constants.TopicQueueOriginatingEntityDelete, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2618,7 +2619,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Login: github.String("stacklok"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusInternalServerError, queued: nil, }, @@ -2631,7 +2632,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Action: github.String("created"), Garbage: github.String("garbage"), }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, { @@ -2641,7 +2642,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Action: github.String("created"), Garbage: github.String("garbage"), }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, } @@ -2654,7 +2655,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -3021,7 +3022,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { }, 54321), ), - topic: events.TopicQueueReconcileEntityAdd, + topic: constants.TopicQueueReconcileEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -3108,7 +3109,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { }, 54321), ), - topic: events.TopicQueueReconcileEntityAdd, + topic: constants.TopicQueueReconcileEntityAdd, statusCode: http.StatusOK, //nolint:thelper queued: nil, @@ -3163,7 +3164,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { }, 54321), ), - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -3250,7 +3251,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { mockStore = mockdb.NewMockStore(ctrl) } - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/providers/github/webhook/handlers_packages.go b/internal/providers/github/webhook/handlers_packages.go index 9316c97541..7af5941e34 100644 --- a/internal/providers/github/webhook/handlers_packages.go +++ b/internal/providers/github/webhook/handlers_packages.go @@ -15,9 +15,9 @@ import ( "github.com/mindersec/minder/internal/db" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" ghprop "github.com/mindersec/minder/internal/providers/github/properties" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // packageEvent represent any event related to a repository and one of @@ -124,7 +124,7 @@ func processPackageEvent( WithOriginator(pb.Entity_ENTITY_REPOSITORIES, repoProps). WithProviderImplementsHint(string(db.ProviderTypeGithub)) - return &processingResult{topic: events.TopicQueueOriginatingEntityAdd, wrapper: pkgMsg}, nil + return &processingResult{topic: constants.TopicQueueOriginatingEntityAdd, wrapper: pkgMsg}, nil } // This routine assumes that all necessary validation is performed on diff --git a/internal/providers/github/webhook/handlers_pull_requests.go b/internal/providers/github/webhook/handlers_pull_requests.go index 769b71f4ea..a4061ef0a5 100644 --- a/internal/providers/github/webhook/handlers_pull_requests.go +++ b/internal/providers/github/webhook/handlers_pull_requests.go @@ -14,9 +14,9 @@ import ( "github.com/mindersec/minder/internal/db" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" ghprop "github.com/mindersec/minder/internal/providers/github/properties" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // pullRequestEvent are events related to pull requests issued around @@ -142,9 +142,9 @@ func processPullRequestEvent( case webhookActionEventOpened, webhookActionEventReopened, webhookActionEventSynchronize: - topic = events.TopicQueueOriginatingEntityAdd + topic = constants.TopicQueueOriginatingEntityAdd case webhookActionEventClosed: - topic = events.TopicQueueOriginatingEntityDelete + topic = constants.TopicQueueOriginatingEntityDelete default: zerolog.Ctx(ctx).Info().Msgf("action %s is not handled for pull requests", pullProps.GetProperty(ghprop.PullPropertyAction).GetString()) diff --git a/internal/providers/github/webhook/handlers_repos.go b/internal/providers/github/webhook/handlers_repos.go index 835e59c385..53eb6f0883 100644 --- a/internal/providers/github/webhook/handlers_repos.go +++ b/internal/providers/github/webhook/handlers_repos.go @@ -15,9 +15,9 @@ import ( "github.com/mindersec/minder/internal/db" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" ghprop "github.com/mindersec/minder/internal/providers/github/properties" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // repoEvent represents any event related to a repository. @@ -125,7 +125,7 @@ func processRepositoryEvent( l.Info().Msg("handling event for repository") - return sendEvaluateRepoMessage(event.GetRepo(), events.TopicQueueRefreshEntityAndEvaluate) + return sendEvaluateRepoMessage(event.GetRepo(), constants.TopicQueueRefreshEntityAndEvaluate) } func sendEvaluateRepoMessage( @@ -204,14 +204,14 @@ func processRelevantRepositoryEvent( } // For all other events exept deletions we issue a refresh event. - topic := events.TopicQueueRefreshEntityAndEvaluate + topic := constants.TopicQueueRefreshEntityAndEvaluate // For webhook deletions, repository deletions, and repository // transfers, we issue a delete event with the correct message // type. if event.GetAction() == webhookActionEventDeleted || event.GetAction() == webhookActionEventTransferred { - topic = events.TopicQueueGetEntityAndDelete + topic = constants.TopicQueueGetEntityAndDelete } return &processingResult{ diff --git a/internal/providers/github/webhook/hook.go b/internal/providers/github/webhook/hook.go index 304d97fac4..14e3236ec2 100644 --- a/internal/providers/github/webhook/hook.go +++ b/internal/providers/github/webhook/hook.go @@ -22,10 +22,11 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/entities" entMsg "github.com/mindersec/minder/internal/entities/handlers/message" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/installations" "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) const ( @@ -66,7 +67,7 @@ type processingResult struct { // HandleWebhookEvent is the main entry point for processing github webhook events func HandleWebhookEvent( mt metrics.Metrics, - publisher events.Publisher, + publisher interfaces.Publisher, whconfig *server.WebhookConfig, ) http.HandlerFunc { // the function handles incoming GitHub webhooks @@ -105,15 +106,15 @@ func HandleWebhookEvent( // TODO: extract sender and event time from payload portably m := message.NewMessage(uuid.New().String(), nil) - m.Metadata.Set(events.ProviderDeliveryIdKey, github.DeliveryID(r)) - m.Metadata.Set(events.ProviderTypeKey, string(db.ProviderTypeGithub)) - m.Metadata.Set(events.ProviderSourceKey, "https://api.github.com/") // TODO: handle other sources - m.Metadata.Set(events.GithubWebhookEventTypeKey, wes.Typ) + m.Metadata.Set(constants.ProviderDeliveryIdKey, github.DeliveryID(r)) + m.Metadata.Set(constants.ProviderTypeKey, string(db.ProviderTypeGithub)) + m.Metadata.Set(constants.ProviderSourceKey, "https://api.github.com/") // TODO: handle other sources + m.Metadata.Set(constants.GithubWebhookEventTypeKey, wes.Typ) l = l.With(). - Str("webhook-event-type", m.Metadata[events.GithubWebhookEventTypeKey]). - Str("providertype", m.Metadata[events.ProviderTypeKey]). - Str("upstream-delivery-id", m.Metadata[events.ProviderDeliveryIdKey]). + Str("webhook-event-type", m.Metadata[constants.GithubWebhookEventTypeKey]). + Str("providertype", m.Metadata[constants.ProviderTypeKey]). + Str("upstream-delivery-id", m.Metadata[constants.ProviderDeliveryIdKey]). // This is added for consistency with how // watermill tracks message UUID when logging. Str("message_uuid", m.UUID). diff --git a/internal/providers/gitlab/manager/manager.go b/internal/providers/gitlab/manager/manager.go index eb03fb3a18..c30fdda351 100644 --- a/internal/providers/gitlab/manager/manager.go +++ b/internal/providers/gitlab/manager/manager.go @@ -20,10 +20,10 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/credentials" "github.com/mindersec/minder/internal/providers/gitlab" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" v1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -38,7 +38,7 @@ type providerClassManager struct { glpcfg *server.GitLabConfig webhookURL string parentContext context.Context - pub events.Publisher + pub interfaces.Publisher // secrets for the webhook. These are stored in the // structure to allow efficient fetching. Rotation @@ -49,7 +49,7 @@ type providerClassManager struct { // NewGitLabProviderClassManager creates a new provider class manager for the dockerhub provider func NewGitLabProviderClassManager( - ctx context.Context, crypteng crypto.Engine, store db.Store, pub events.Publisher, + ctx context.Context, crypteng crypto.Engine, store db.Store, pub interfaces.Publisher, cfg *server.GitLabConfig, wgCfg server.WebhookConfig, ) (*providerClassManager, error) { webhookURLBase := wgCfg.ExternalWebhookURL diff --git a/internal/providers/gitlab/manager/webhook_handlers_merge_requests.go b/internal/providers/gitlab/manager/webhook_handlers_merge_requests.go index 42f9c83150..13c38c8234 100644 --- a/internal/providers/gitlab/manager/webhook_handlers_merge_requests.go +++ b/internal/providers/gitlab/manager/webhook_handlers_merge_requests.go @@ -14,9 +14,9 @@ import ( entmsg "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/gitlab" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) func (m *providerClassManager) handleMergeRequest(l zerolog.Logger, r *http.Request) error { @@ -47,13 +47,13 @@ func (m *providerClassManager) handleMergeRequest(l zerolog.Logger, r *http.Requ case mergeRequestEvent.ObjectAttributes.Action == "open", mergeRequestEvent.ObjectAttributes.Action == "reopen": return m.publishMergeRequestMessage(mrID, mrIID, rawProjectID, - events.TopicQueueOriginatingEntityAdd) + constants.TopicQueueOriginatingEntityAdd) case mergeRequestEvent.ObjectAttributes.Action == "close": return m.publishMergeRequestMessage(mrID, mrIID, rawProjectID, - events.TopicQueueOriginatingEntityDelete) + constants.TopicQueueOriginatingEntityDelete) case mergeRequestEvent.ObjectAttributes.Action == "update": return m.publishMergeRequestMessage(mrID, mrIID, rawProjectID, - events.TopicQueueRefreshEntityAndEvaluate) + constants.TopicQueueRefreshEntityAndEvaluate) default: return nil } diff --git a/internal/providers/gitlab/manager/webhook_handlers_releases.go b/internal/providers/gitlab/manager/webhook_handlers_releases.go index 92f0430586..a1c19838d5 100644 --- a/internal/providers/gitlab/manager/webhook_handlers_releases.go +++ b/internal/providers/gitlab/manager/webhook_handlers_releases.go @@ -14,9 +14,9 @@ import ( entmsg "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/gitlab" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) func (m *providerClassManager) handleRelease(l zerolog.Logger, r *http.Request) error { @@ -47,13 +47,13 @@ func (m *providerClassManager) handleRelease(l zerolog.Logger, r *http.Request) switch { case releaseEvent.Action == "create": return m.publishReleaseMessage(releaseID, tag, rawProjectID, - events.TopicQueueOriginatingEntityAdd) + constants.TopicQueueOriginatingEntityAdd) case releaseEvent.Action == "update": return m.publishReleaseMessage(releaseID, tag, rawProjectID, - events.TopicQueueRefreshEntityAndEvaluate) + constants.TopicQueueRefreshEntityAndEvaluate) case releaseEvent.Action == "delete": return m.publishReleaseMessage(releaseID, tag, rawProjectID, - events.TopicQueueOriginatingEntityDelete) + constants.TopicQueueOriginatingEntityDelete) default: return nil } diff --git a/internal/providers/gitlab/manager/webhook_handlers_repos.go b/internal/providers/gitlab/manager/webhook_handlers_repos.go index 01c46180df..3e2f1b7ba8 100644 --- a/internal/providers/gitlab/manager/webhook_handlers_repos.go +++ b/internal/providers/gitlab/manager/webhook_handlers_repos.go @@ -14,9 +14,9 @@ import ( entmsg "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/gitlab" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) func (m *providerClassManager) handleRepoPush(l zerolog.Logger, r *http.Request) error { @@ -83,7 +83,7 @@ func (m *providerClassManager) publishRefreshAndEvalForGitlabProject( // Publish message l.Debug().Str("msg_id", msgID).Msg("publishing refresh and eval message") - if err := m.pub.Publish(events.TopicQueueRefreshEntityAndEvaluate, msg); err != nil { + if err := m.pub.Publish(constants.TopicQueueRefreshEntityAndEvaluate, msg); err != nil { l.Error().Err(err).Msg("error publishing refresh and eval message") return fmt.Errorf("error publishing refresh and eval message: %w", err) } diff --git a/internal/reconcilers/entity_delete_test.go b/internal/reconcilers/entity_delete_test.go index e148dff875..b46b00020b 100644 --- a/internal/reconcilers/entity_delete_test.go +++ b/internal/reconcilers/entity_delete_test.go @@ -16,12 +16,12 @@ import ( mockdb "github.com/mindersec/minder/database/mock" df "github.com/mindersec/minder/database/mock/fixtures" "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/reconcilers/messages" mockrepo "github.com/mindersec/minder/internal/repositories/mock" rf "github.com/mindersec/minder/internal/repositories/mock/fixtures" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) var ( @@ -160,7 +160,7 @@ func setUp(t *testing.T, tt testCase, ctrl *gomock.Controller) *Reconciler { repoService = tt.mockReposFunc(ctrl) } - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/reconcilers/reconcilers.go b/internal/reconcilers/reconcilers.go index 9bdfab616e..cdef3778ad 100644 --- a/internal/reconcilers/reconcilers.go +++ b/internal/reconcilers/reconcilers.go @@ -8,15 +8,16 @@ package reconcilers import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/manager" "github.com/mindersec/minder/internal/repositories" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // Reconciler is a helper that reconciles entities type Reconciler struct { store db.Store - evt events.Publisher + evt interfaces.Publisher crypteng crypto.Engine providerManager manager.ProviderManager repos repositories.RepositoryService @@ -25,7 +26,7 @@ type Reconciler struct { // NewReconciler creates a new reconciler object func NewReconciler( store db.Store, - evt events.Publisher, + evt interfaces.Publisher, cryptoEngine crypto.Engine, providerManager manager.ProviderManager, repositoryService repositories.RepositoryService, @@ -40,9 +41,9 @@ func NewReconciler( } // Register implements the Consumer interface. -func (r *Reconciler) Register(reg events.Registrar) { - reg.Register(events.TopicQueueReconcileRepoInit, r.handleRepoReconcilerEvent) - reg.Register(events.TopicQueueReconcileProfileInit, r.handleProfileInitEvent) - reg.Register(events.TopicQueueReconcileEntityDelete, r.handleEntityDeleteEvent) - reg.Register(events.TopicQueueReconcileEntityAdd, r.handleEntityAddEvent) +func (r *Reconciler) Register(reg interfaces.Registrar) { + reg.Register(constants.TopicQueueReconcileRepoInit, r.handleRepoReconcilerEvent) + reg.Register(constants.TopicQueueReconcileProfileInit, r.handleProfileInitEvent) + reg.Register(constants.TopicQueueReconcileEntityDelete, r.handleEntityDeleteEvent) + reg.Register(constants.TopicQueueReconcileEntityAdd, r.handleEntityAddEvent) } diff --git a/internal/reconcilers/repository.go b/internal/reconcilers/repository.go index 708b3f1146..d414832d72 100644 --- a/internal/reconcilers/repository.go +++ b/internal/reconcilers/repository.go @@ -15,8 +15,8 @@ import ( "github.com/rs/zerolog/log" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/reconcilers/messages" + "github.com/mindersec/minder/pkg/eventer/constants" ) // handleRepoReconcilerEvent handles events coming from the reconciler topic @@ -64,7 +64,7 @@ func (r *Reconciler) handleRepositoryReconcilerEvent(ctx context.Context, evt *m } m.SetContext(ctx) - if err := r.evt.Publish(events.TopicQueueRefreshEntityByIDAndEvaluate, m); err != nil { + if err := r.evt.Publish(constants.TopicQueueRefreshEntityByIDAndEvaluate, m); err != nil { // we retry in case watermill is having a bad day return fmt.Errorf("error publishing message: %w", err) } diff --git a/internal/reconcilers/repository_test.go b/internal/reconcilers/repository_test.go index 26153e40c6..7f3987bcb1 100644 --- a/internal/reconcilers/repository_test.go +++ b/internal/reconcilers/repository_test.go @@ -10,9 +10,9 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "github.com/mindersec/minder/internal/events" stubeventer "github.com/mindersec/minder/internal/events/stubs" "github.com/mindersec/minder/internal/reconcilers/messages" + "github.com/mindersec/minder/pkg/eventer/constants" ) var ( @@ -33,7 +33,7 @@ func Test_handleRepoReconcilerEvent(t *testing.T) { }{ { name: "valid event", - topic: events.TopicQueueRefreshEntityByIDAndEvaluate, + topic: constants.TopicQueueRefreshEntityByIDAndEvaluate, entityID: testRepoID, expectedPublish: true, expectedErr: false, @@ -44,7 +44,7 @@ func Test_handleRepoReconcilerEvent(t *testing.T) { // just before reconciling artifacts - we verify that because if we hit the artifacts path, we would have // a bunch of other mocks to call name: "event with string as upstream ID does publish", - topic: events.TopicQueueRefreshEntityByIDAndEvaluate, + topic: constants.TopicQueueRefreshEntityByIDAndEvaluate, entityID: testRepoID, expectedPublish: true, expectedErr: false, diff --git a/internal/reconcilers/run_profile.go b/internal/reconcilers/run_profile.go index 896b8e594c..bb9eb13061 100644 --- a/internal/reconcilers/run_profile.go +++ b/internal/reconcilers/run_profile.go @@ -14,7 +14,7 @@ import ( "github.com/rs/zerolog" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/eventer/constants" ) // ProfileInitEvent is an event that is sent to the reconciler topic @@ -96,7 +96,7 @@ func (r *Reconciler) publishProfileInitEvents( return nil } - if err := r.evt.Publish(events.TopicQueueRefreshEntityByIDAndEvaluate, m); err != nil { + if err := r.evt.Publish(constants.TopicQueueRefreshEntityByIDAndEvaluate, m); err != nil { // we retry in case watermill is having a bad day return fmt.Errorf("error publishing message: %w", err) } diff --git a/internal/reconcilers/run_profile_test.go b/internal/reconcilers/run_profile_test.go index a6a29c805a..44f11dce88 100644 --- a/internal/reconcilers/run_profile_test.go +++ b/internal/reconcilers/run_profile_test.go @@ -14,8 +14,8 @@ import ( df "github.com/mindersec/minder/database/mock/fixtures" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" stubeventer "github.com/mindersec/minder/internal/events/stubs" + "github.com/mindersec/minder/pkg/eventer/constants" ) var ( @@ -94,7 +94,7 @@ func Test_handleProfileInitEvent(t *testing.T) { require.Equal(t, scenario.numPublish, len(stubEventer.Sent)) if scenario.numPublish > 0 { - require.Contains(t, stubEventer.Topics, events.TopicQueueRefreshEntityByIDAndEvaluate) + require.Contains(t, stubEventer.Topics, constants.TopicQueueRefreshEntityByIDAndEvaluate) } }) } diff --git a/internal/reminder/reminder.go b/internal/reminder/reminder.go index 51fb5908ec..7ec9a61974 100644 --- a/internal/reminder/reminder.go +++ b/internal/reminder/reminder.go @@ -16,10 +16,10 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/events/common" remindermessages "github.com/mindersec/minder/internal/reminder/messages" reminderconfig "github.com/mindersec/minder/pkg/config/reminder" + "github.com/mindersec/minder/pkg/eventer/constants" ) // Interface is an interface over the reminder service @@ -143,7 +143,7 @@ func (r *reminder) sendReminders(ctx context.Context) error { return fmt.Errorf("error creating reminder messages: %w", err) } - err = r.eventPublisher.Publish(events.TopicQueueRepoReminder, messages...) + err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...) if err != nil { return fmt.Errorf("error publishing messages: %w", err) } diff --git a/internal/reminderprocessor/reminder_processor.go b/internal/reminderprocessor/reminder_processor.go index c9e3b6fab1..2368296b1e 100644 --- a/internal/reminderprocessor/reminder_processor.go +++ b/internal/reminderprocessor/reminder_processor.go @@ -10,24 +10,25 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/rs/zerolog/log" - "github.com/mindersec/minder/internal/events" reconcilermessages "github.com/mindersec/minder/internal/reconcilers/messages" remindermessages "github.com/mindersec/minder/internal/reminder/messages" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // ReminderProcessor processes the incoming reminders type ReminderProcessor struct { - evt events.Interface + evt interfaces.Interface } // NewReminderProcessor creates a new ReminderProcessor -func NewReminderProcessor(evt events.Interface) *ReminderProcessor { +func NewReminderProcessor(evt interfaces.Interface) *ReminderProcessor { return &ReminderProcessor{evt: evt} } // Register implements the Consumer interface. -func (rp *ReminderProcessor) Register(r events.Registrar) { - r.Register(events.TopicQueueRepoReminder, rp.reminderMessageHandler) +func (rp *ReminderProcessor) Register(r interfaces.Registrar) { + r.Register(constants.TopicQueueRepoReminder, rp.reminderMessageHandler) } func (rp *ReminderProcessor) reminderMessageHandler(msg *message.Message) error { @@ -44,7 +45,7 @@ func (rp *ReminderProcessor) reminderMessageHandler(msg *message.Message) error } // This is a non-fatal error, so we'll just log it and continue with the next ones - if err := rp.evt.Publish(events.TopicQueueReconcileRepoInit, repoReconcileMsg); err != nil { + if err := rp.evt.Publish(constants.TopicQueueReconcileRepoInit, repoReconcileMsg); err != nil { log.Printf("error publishing reconciler event: %v", err) } return nil diff --git a/internal/repositories/service.go b/internal/repositories/service.go index 72f47a4ccf..69126ea69e 100644 --- a/internal/repositories/service.go +++ b/internal/repositories/service.go @@ -18,13 +18,14 @@ import ( "github.com/mindersec/minder/internal/entities/models" "github.com/mindersec/minder/internal/entities/properties" "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/projects/features" "github.com/mindersec/minder/internal/providers/manager" reconcilers "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/internal/util/ptr" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" provifv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -95,7 +96,7 @@ var ( type repositoryService struct { store db.Store - eventProducer events.Publisher + eventProducer interfaces.Publisher providerManager manager.ProviderManager propSvc service.PropertiesService } @@ -104,7 +105,7 @@ type repositoryService struct { func NewRepositoryService( store db.Store, propSvc service.PropertiesService, - eventProducer events.Publisher, + eventProducer interfaces.Publisher, providerManager manager.ProviderManager, ) RepositoryService { return &repositoryService{ @@ -394,7 +395,7 @@ func (r *repositoryService) pushReconcilerEvent(entityID uuid.UUID, projectID uu } // This is a non-fatal error, so we'll just log it and continue with the next ones - if err = r.eventProducer.Publish(events.TopicQueueReconcileRepoInit, msg); err != nil { + if err = r.eventProducer.Publish(constants.TopicQueueReconcileRepoInit, msg); err != nil { log.Printf("error publishing reconciler event: %v", err) } diff --git a/internal/repositories/service_test.go b/internal/repositories/service_test.go index a152db53ba..24eaf8b5b5 100644 --- a/internal/repositories/service_test.go +++ b/internal/repositories/service_test.go @@ -21,7 +21,6 @@ import ( "github.com/mindersec/minder/internal/entities/models" "github.com/mindersec/minder/internal/entities/properties" mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock" - mockevents "github.com/mindersec/minder/internal/events/mock" mockgithub "github.com/mindersec/minder/internal/providers/github/mock" ghprop "github.com/mindersec/minder/internal/providers/github/properties" "github.com/mindersec/minder/internal/providers/manager" @@ -29,6 +28,7 @@ import ( "github.com/mindersec/minder/internal/repositories" "github.com/mindersec/minder/internal/util/ptr" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + mockevents "github.com/mindersec/minder/pkg/eventer/interfaces/mock" provinfv1 "github.com/mindersec/minder/pkg/providers/v1" ) diff --git a/internal/service/service.go b/internal/service/service.go index 62b0da47a4..106589a19a 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -25,7 +25,6 @@ import ( "github.com/mindersec/minder/internal/engine" "github.com/mindersec/minder/internal/entities/handlers" propService "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/flags" "github.com/mindersec/minder/internal/history" "github.com/mindersec/minder/internal/invites" @@ -50,6 +49,8 @@ import ( "github.com/mindersec/minder/internal/roles" serverconfig "github.com/mindersec/minder/pkg/config/server" "github.com/mindersec/minder/pkg/engine/selectors" + "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/interfaces" "github.com/mindersec/minder/pkg/profiles" "github.com/mindersec/minder/pkg/ruletypes" ) @@ -73,7 +74,7 @@ func AllInOneServerService( ) error { errg, ctx := errgroup.WithContext(ctx) - evt, err := events.Setup(ctx, &cfg.Events) + evt, err := eventer.New(ctx, nil, &cfg.Events) if err != nil { return fmt.Errorf("unable to setup eventer: %w", err) } @@ -270,7 +271,7 @@ func AllInOneServerService( evt.ConsumeEvents(getAndDeleteEntity) // Register the email manager to handle email invitations - var mailClient events.Consumer + var mailClient interfaces.Consumer if cfg.Email.AWSSES.Region != "" && cfg.Email.AWSSES.Sender != "" { // If AWS SES is configured, use it to send emails mailClient, err = awsses.New(ctx, cfg.Email.AWSSES.Sender, cfg.Email.AWSSES.Region) diff --git a/internal/events/constants.go b/pkg/eventer/constants/constants.go similarity index 88% rename from internal/events/constants.go rename to pkg/eventer/constants/constants.go index b2b73a1087..cd7c0041e7 100644 --- a/internal/events/constants.go +++ b/pkg/eventer/constants/constants.go @@ -1,7 +1,8 @@ // SPDX-FileCopyrightText: Copyright 2024 The Minder Authors // SPDX-License-Identifier: Apache-2.0 -package events +// Package constants contains constants used by the eventer package. +package constants // Metadata added to Messages const ( @@ -19,8 +20,10 @@ const ( ) const ( - metricsNamespace = "minder" - metricsSubsystem = "eventer" + // MetricsNamespace is the namespace for all metrics emitted by the eventer + MetricsNamespace = "minder" + // MetricsSubsystem is the subsystem for all metrics emitted by the eventer + MetricsSubsystem = "eventer" ) const ( diff --git a/pkg/eventer/events.go b/pkg/eventer/events.go new file mode 100644 index 0000000000..d75ebf88c7 --- /dev/null +++ b/pkg/eventer/events.go @@ -0,0 +1,20 @@ +// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package eventer provides an interface for creating a new eventer +package eventer + +import ( + "context" + + "github.com/open-feature/go-sdk/openfeature" + + "github.com/mindersec/minder/internal/events" + serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" +) + +// New creates a new eventer +func New(ctx context.Context, flagClient openfeature.IClient, cfg *serverconfig.EventConfig) (interfaces.Interface, error) { + return events.NewEventer(ctx, flagClient, cfg) +} diff --git a/internal/events/interfaces.go b/pkg/eventer/interfaces/interfaces.go similarity index 96% rename from internal/events/interfaces.go rename to pkg/eventer/interfaces/interfaces.go index 822f35c26f..e2554fc6df 100644 --- a/internal/events/interfaces.go +++ b/pkg/eventer/interfaces/interfaces.go @@ -1,7 +1,8 @@ // SPDX-FileCopyrightText: Copyright 2024 The Minder Authors // SPDX-License-Identifier: Apache-2.0 -package events +// Package interfaces provides the interfaces for the eventer package. +package interfaces import ( "context" diff --git a/internal/events/mock/interfaces.go b/pkg/eventer/interfaces/mock/interfaces.go similarity index 94% rename from internal/events/mock/interfaces.go rename to pkg/eventer/interfaces/mock/interfaces.go index ffaa0c0b49..b758e8c79c 100644 --- a/internal/events/mock/interfaces.go +++ b/pkg/eventer/interfaces/mock/interfaces.go @@ -3,18 +3,18 @@ // // Generated by this command: // -// mockgen -package mock_events -destination=./mock/interfaces.go -source=./interfaces.go +// mockgen -package mock_interfaces -destination=./mock/interfaces.go -source=./interfaces.go // -// Package mock_events is a generated GoMock package. -package mock_events +// Package mock_interfaces is a generated GoMock package. +package mock_interfaces import ( context "context" reflect "reflect" message "github.com/ThreeDotsLabs/watermill/message" - events "github.com/mindersec/minder/internal/events" + interfaces "github.com/mindersec/minder/pkg/eventer/interfaces" gomock "go.uber.org/mock/gomock" ) @@ -43,7 +43,7 @@ func (m *MockRegistrar) EXPECT() *MockRegistrarMockRecorder { } // Register mocks base method. -func (m *MockRegistrar) Register(topic string, handler events.Handler, mdw ...message.HandlerMiddleware) { +func (m *MockRegistrar) Register(topic string, handler interfaces.Handler, mdw ...message.HandlerMiddleware) { m.ctrl.T.Helper() varargs := []any{topic, handler} for _, a := range mdw { @@ -84,7 +84,7 @@ func (m *MockConsumer) EXPECT() *MockConsumerMockRecorder { } // Register mocks base method. -func (m *MockConsumer) Register(arg0 events.Registrar) { +func (m *MockConsumer) Register(arg0 interfaces.Registrar) { m.ctrl.T.Helper() m.ctrl.Call(m, "Register", arg0) } @@ -215,7 +215,7 @@ func (mr *MockServiceMockRecorder) Close() *gomock.Call { } // ConsumeEvents mocks base method. -func (m *MockService) ConsumeEvents(consumers ...events.Consumer) { +func (m *MockService) ConsumeEvents(consumers ...interfaces.Consumer) { m.ctrl.T.Helper() varargs := []any{} for _, a := range consumers { @@ -297,7 +297,7 @@ func (mr *MockInterfaceMockRecorder) Close() *gomock.Call { } // ConsumeEvents mocks base method. -func (m *MockInterface) ConsumeEvents(consumers ...events.Consumer) { +func (m *MockInterface) ConsumeEvents(consumers ...interfaces.Consumer) { m.ctrl.T.Helper() varargs := []any{} for _, a := range consumers { @@ -332,7 +332,7 @@ func (mr *MockInterfaceMockRecorder) Publish(topic any, messages ...any) *gomock } // Register mocks base method. -func (m *MockInterface) Register(topic string, handler events.Handler, mdw ...message.HandlerMiddleware) { +func (m *MockInterface) Register(topic string, handler interfaces.Handler, mdw ...message.HandlerMiddleware) { m.ctrl.T.Helper() varargs := []any{topic, handler} for _, a := range mdw { diff --git a/pkg/profiles/service.go b/pkg/profiles/service.go index e2770fa011..f5dab7c177 100644 --- a/pkg/profiles/service.go +++ b/pkg/profiles/service.go @@ -22,7 +22,6 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/entities" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/marketplaces/namespaces" "github.com/mindersec/minder/internal/reconcilers" @@ -30,6 +29,8 @@ import ( "github.com/mindersec/minder/internal/util/ptr" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" "github.com/mindersec/minder/pkg/engine/selectors" + "github.com/mindersec/minder/pkg/eventer/constants" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) //go:generate go run go.uber.org/mock/mockgen -package mock_$GOPACKAGE -destination=./mock/$GOFILE -source=./$GOFILE @@ -72,13 +73,13 @@ type ProfileService interface { } type profileService struct { - publisher events.Publisher + publisher interfaces.Publisher validator *Validator } // NewProfileService creates an instance of ProfileService func NewProfileService( - publisher events.Publisher, + publisher interfaces.Publisher, selChecker selectors.SelectionChecker, ) ProfileService { return &profileService{ @@ -419,7 +420,7 @@ func (p *profileService) sendNewProfileEvent( } // This is a non-fatal error, so we'll just log it and continue with the next ones - if err := p.publisher.Publish(events.TopicQueueReconcileProfileInit, msg); err != nil { + if err := p.publisher.Publish(constants.TopicQueueReconcileProfileInit, msg); err != nil { log.Printf("error publishing reconciler event: %v", err) } }