Skip to content

Commit

Permalink
Move events from internal to pkg (#4812)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdimitrov authored Oct 25, 2024
1 parent 2f4770a commit 30decca
Show file tree
Hide file tree
Showing 53 changed files with 378 additions and 329 deletions.
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/engcontext"
"github.com/mindersec/minder/internal/entities/properties"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/logger"
"github.com/mindersec/minder/internal/providers"
"github.com/mindersec/minder/internal/reconcilers/messages"
"github.com/mindersec/minder/internal/util"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
)

// ReconcileEntityRegistration reconciles the registration of an entity.
Expand Down Expand Up @@ -100,7 +100,7 @@ func (s *Server) ReconcileEntityRegistration(

func (s *Server) publishEntityMessage(l *zerolog.Logger, msg *message.Message) error {
l.Info().Str("messageID", msg.UUID).Msg("publishing register entities message for execution")
return s.evt.Publish(events.TopicQueueReconcileEntityAdd, msg)
return s.evt.Publish(constants.TopicQueueReconcileEntityAdd, msg)
}

func createEntityMessage(
Expand Down
2 changes: 1 addition & 1 deletion internal/controlplane/handlers_entities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (

"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/engcontext"
mockevents "github.com/mindersec/minder/internal/events/mock"
mockgh "github.com/mindersec/minder/internal/providers/github/mock"
"github.com/mindersec/minder/internal/providers/manager"
mockmanager "github.com/mindersec/minder/internal/providers/manager/mock"
rf "github.com/mindersec/minder/internal/repositories/mock/fixtures"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
mockevents "github.com/mindersec/minder/pkg/eventer/interfaces/mock"
)

func TestServer_ReconcileEntityRegistration(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions internal/controlplane/handlers_oauth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/engcontext"
mockprops "github.com/mindersec/minder/internal/entities/properties/service/mock"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/providers"
"github.com/mindersec/minder/internal/providers/dockerhub"
mockclients "github.com/mindersec/minder/internal/providers/github/clients/mock"
Expand All @@ -51,6 +50,7 @@ import (
"github.com/mindersec/minder/internal/providers/session"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
provinfv1 "github.com/mindersec/minder/pkg/providers/v1"
)

Expand Down Expand Up @@ -378,7 +378,7 @@ func TestGetAuthorizationURL(t *testing.T) {
store := mockdb.NewMockStore(ctrl)
tc.buildStubs(store)

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down Expand Up @@ -890,7 +890,7 @@ func TestHandleGitHubAppCallback(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func TestVerifyProviderCredential(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_reconciliationtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (

"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/engcontext"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/logger"
reconcilers "github.com/mindersec/minder/internal/reconcilers/messages"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
)

// CreateEntityReconciliationTask creates a task to reconcile the state of an entity
Expand Down Expand Up @@ -49,7 +49,7 @@ func (s *Server) CreateEntityReconciliationTask(ctx context.Context,
if err != nil {
return nil, err
}
topic = events.TopicQueueReconcileRepoInit
topic = constants.TopicQueueReconcileRepoInit
} else {
return nil, status.Errorf(codes.InvalidArgument, "entity type %s is not supported", entity.GetType())
}
Expand Down
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import (
"github.com/mindersec/minder/internal/authz/mock"
mockcrypto "github.com/mindersec/minder/internal/crypto/mock"
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/flags"
"github.com/mindersec/minder/internal/marketplaces"
"github.com/mindersec/minder/internal/projects"
"github.com/mindersec/minder/internal/providers"
mockprov "github.com/mindersec/minder/internal/providers/github/service/mock"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
)

const (
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestDeleteUser_gRPC(t *testing.T) {
}))
defer testServer.Close()

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down
6 changes: 3 additions & 3 deletions internal/controlplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/mindersec/minder/internal/crypto"
"github.com/mindersec/minder/internal/db"
propSvc "github.com/mindersec/minder/internal/entities/properties/service"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/history"
"github.com/mindersec/minder/internal/invites"
"github.com/mindersec/minder/internal/logger"
Expand All @@ -63,6 +62,7 @@ import (
"github.com/mindersec/minder/internal/util"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/interfaces"
"github.com/mindersec/minder/pkg/profiles"
"github.com/mindersec/minder/pkg/ruletypes"
)
Expand All @@ -81,7 +81,7 @@ var (
type Server struct {
store db.Store
cfg *serverconfig.Config
evt events.Publisher
evt interfaces.Publisher
mt metrics.Metrics
grpcServer *grpc.Server
jwt jwt.Validator
Expand Down Expand Up @@ -126,7 +126,7 @@ type Server struct {
// NewServer creates a new server instance
func NewServer(
store db.Store,
evt events.Publisher,
evt interfaces.Publisher,
cfg *serverconfig.Config,
serverMetrics metrics.Metrics,
jwtValidator jwt.Validator,
Expand Down
4 changes: 2 additions & 2 deletions internal/controlplane/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ import (
"github.com/mindersec/minder/internal/controlplane/metrics"
"github.com/mindersec/minder/internal/crypto"
mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/providers"
ghclient "github.com/mindersec/minder/internal/providers/github/clients"
ghService "github.com/mindersec/minder/internal/providers/github/service"
mock_reposvc "github.com/mindersec/minder/internal/repositories/mock"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
)

const bufSize = 1024 * 1024
Expand Down Expand Up @@ -78,7 +78,7 @@ func newDefaultServer(
) *Server {
t.Helper()

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down
11 changes: 6 additions & 5 deletions internal/eea/eea.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,25 @@ import (
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/entities"
"github.com/mindersec/minder/internal/entities/properties/service"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/providers/manager"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/constants"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

// EEA is the Event Execution Aggregator
type EEA struct {
querier db.Store
evt events.Publisher
evt interfaces.Publisher
cfg *serverconfig.AggregatorConfig

entityFetcher service.PropertiesService
provMan manager.ProviderManager
}

// NewEEA creates a new EEA
func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.AggregatorConfig,
func NewEEA(querier db.Store, evt interfaces.Publisher, cfg *serverconfig.AggregatorConfig,
ef service.PropertiesService, provMan manager.ProviderManager) *EEA {
return &EEA{
querier: querier,
Expand All @@ -49,8 +50,8 @@ func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.Aggregator
}

// Register implements the Consumer interface.
func (e *EEA) Register(r events.Registrar) {
r.Register(events.TopicQueueEntityFlush, e.FlushMessageHandler)
func (e *EEA) Register(r interfaces.Registrar) {
r.Register(constants.TopicQueueEntityFlush, e.FlushMessageHandler)
}

// AggregateMiddleware will pass on the event to the executor engine
Expand Down
25 changes: 13 additions & 12 deletions internal/eea/eea_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
"github.com/mindersec/minder/internal/entities/properties"
psvc "github.com/mindersec/minder/internal/entities/properties/service"
propsvcmock "github.com/mindersec/minder/internal/entities/properties/service/mock"
"github.com/mindersec/minder/internal/events"
mockmanager "github.com/mindersec/minder/internal/providers/manager/mock"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
"github.com/mindersec/minder/pkg/eventer/constants"
)

const (
Expand All @@ -55,7 +56,7 @@ func TestAggregator(t *testing.T) {

projectID, repoID := createNeededEntities(ctx, t, testQueries)

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{
BufferSize: concurrentEvents,
Expand Down Expand Up @@ -86,7 +87,7 @@ func TestAggregator(t *testing.T) {
aggr.Register(evt)

// This tests that flushing sends messages to the executor engine
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware)

go func() {
t.Log("Running eventer")
Expand Down Expand Up @@ -133,7 +134,7 @@ func TestAggregator(t *testing.T) {
msg, err := inf.BuildMessage()
require.NoError(t, err, "expected no error when building message")

err = evt.Publish(events.TopicQueueEntityFlush, msg.Copy())
err = evt.Publish(constants.TopicQueueEntityFlush, msg.Copy())
require.NoError(t, err, "expected no error when publishing message")
}()
}
Expand Down Expand Up @@ -345,14 +346,14 @@ func TestFlushAll(t *testing.T) {
propsvc := propsvcmock.NewMockPropertiesService(ctrl)
provman := mockmanager.NewMockProviderManager(ctrl)

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
require.NoError(t, err)

flushedMessages := newTestPubSub()
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add)

go func() {
t.Log("Running eventer")
Expand Down Expand Up @@ -394,7 +395,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) {
require.NoError(t, err, "expected no error when creating embedded store")
t.Cleanup(td)

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand All @@ -412,7 +413,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) {
flushedMessages := newTestPubSub()

// This tests that flushing sends messages to the executor engine
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware)

t.Log("Flushing all")
require.NoError(t, aggr.FlushAll(ctx), "expected no error")
Expand All @@ -433,14 +434,14 @@ func TestFlushAllListFlushFails(t *testing.T) {

flushedMessages := newTestPubSub()

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
require.NoError(t, err)

// This tests that flushing sends messages to the executor engine
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add)

go func() {
t.Log("Running eventer")
Expand Down Expand Up @@ -483,14 +484,14 @@ func TestFlushAllListFlushListsARepoThatGetsDeletedLater(t *testing.T) {

flushedMessages := newTestPubSub()

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, nil, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
require.NoError(t, err)

// This tests that flushing sends messages to the executor engine
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add)

go func() {
t.Log("Running eventer")
Expand Down
4 changes: 2 additions & 2 deletions internal/email/awsses/awsses.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/email"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

const (
Expand Down Expand Up @@ -47,7 +47,7 @@ func New(ctx context.Context, sender, region string) (*awsSES, error) {
}

// Register implements the Consumer interface.
func (a *awsSES) Register(reg events.Registrar) {
func (a *awsSES) Register(reg interfaces.Registrar) {
reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error {
var e email.MailEventPayload

Expand Down
4 changes: 2 additions & 2 deletions internal/email/noop/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/email"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

type noop struct {
Expand All @@ -24,7 +24,7 @@ func New() *noop {
}

// Register implements the Consumer interface.
func (_ *noop) Register(reg events.Registrar) {
func (_ *noop) Register(reg interfaces.Registrar) {
reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error {
var e email.MailEventPayload

Expand Down
7 changes: 4 additions & 3 deletions internal/engine/entities/entity_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/mindersec/minder/internal/events"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

// EntityInfoWrapper is a helper struct to gather information
Expand Down Expand Up @@ -180,13 +181,13 @@ func (eiw *EntityInfoWrapper) BuildMessage() (*message.Message, error) {
}

// Publish builds a message.Message and publishes it to the event bus
func (eiw *EntityInfoWrapper) Publish(evt events.Publisher) error {
func (eiw *EntityInfoWrapper) Publish(evt interfaces.Publisher) error {
msg, err := eiw.BuildMessage()
if err != nil {
return err
}

if err := evt.Publish(events.TopicQueueEntityEvaluate, msg); err != nil {
if err := evt.Publish(constants.TopicQueueEntityEvaluate, msg); err != nil {
return fmt.Errorf("error publishing entity event: %w", err)
}

Expand Down
Loading

0 comments on commit 30decca

Please sign in to comment.