Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: changed transaction handling related to Environment package and Auth package #1501

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions pkg/auth/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type authService struct {
signer token.Signer
config *auth.OAuthConfig
mysqlClient mysql.Client
organizationStorage envstotage.OrganizationStorage
projectStorage envstotage.ProjectStorage
environmentStorage envstotage.EnvironmentStorage
accountClient accountclient.Client
verifier token.Verifier
googleAuthenticator auth.Authenticator
Expand All @@ -112,13 +115,16 @@ func NewAuthService(
}
logger := options.logger.Named("api")
service := &authService{
issuer: issuer,
audience: audience,
signer: signer,
config: config,
mysqlClient: mysqlClient,
accountClient: accountClient,
verifier: verifier,
issuer: issuer,
audience: audience,
signer: signer,
config: config,
mysqlClient: mysqlClient,
organizationStorage: envstotage.NewOrganizationStorage(mysqlClient),
environmentStorage: envstotage.NewEnvironmentStorage(mysqlClient),
projectStorage: envstotage.NewProjectStorage(mysqlClient),
accountClient: accountClient,
verifier: verifier,
googleAuthenticator: google.NewAuthenticator(
&config.GoogleConfig, signer, logger,
),
Expand Down Expand Up @@ -602,19 +608,14 @@ func (s *authService) PrepareDemoUser() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error("Create mysql tx error", zap.Error(err))
return
}
now := time.Now()
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
var err error
err = s.mysqlClient.RunInTransactionV2(ctx, func(contextWithTx context.Context, _ mysql.Transaction) error {
// Create a demo organization if not exists
organizationStorage := envstotage.NewOrganizationStorage(tx)
_, err = organizationStorage.GetOrganization(ctx, config.OrganizationId)
_, err = s.organizationStorage.GetOrganization(contextWithTx, config.OrganizationId)
if err != nil {
if errors.Is(err, envstotage.ErrOrganizationNotFound) {
err = organizationStorage.CreateOrganization(ctx, &envdomain.Organization{
err = s.organizationStorage.CreateOrganization(contextWithTx, &envdomain.Organization{
Organization: &envproto.Organization{
Id: config.OrganizationId,
Name: "Demo organization",
Expand All @@ -635,11 +636,10 @@ func (s *authService) PrepareDemoUser() {
}
}
// Create a demo project if not exists
projectStorage := envstotage.NewProjectStorage(tx)
_, err = projectStorage.GetProject(ctx, config.ProjectId)
_, err = s.projectStorage.GetProject(contextWithTx, config.ProjectId)
if err != nil {
if errors.Is(err, envstotage.ErrProjectNotFound) {
err = projectStorage.CreateProject(ctx, &envdomain.Project{
err = s.projectStorage.CreateProject(contextWithTx, &envdomain.Project{
Project: &envproto.Project{
Id: config.ProjectId,
Description: "This project is for demo users",
Expand All @@ -659,11 +659,10 @@ func (s *authService) PrepareDemoUser() {
}
}
// Create a demo environment if not exists
environmentStorage := envstotage.NewEnvironmentStorage(tx)
_, err = environmentStorage.GetEnvironmentV2(ctx, config.EnvironmentId)
_, err = s.environmentStorage.GetEnvironmentV2(contextWithTx, config.EnvironmentId)
if err != nil {
if errors.Is(err, envstotage.ErrEnvironmentNotFound) {
err = environmentStorage.CreateEnvironmentV2(ctx, &envdomain.EnvironmentV2{
err = s.environmentStorage.CreateEnvironmentV2(contextWithTx, &envdomain.EnvironmentV2{
EnvironmentV2: &envproto.EnvironmentV2{
Id: config.EnvironmentId,
Name: "Demo",
Expand Down
27 changes: 17 additions & 10 deletions pkg/environment/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/status"

accountclient "github.com/bucketeer-io/bucketeer/pkg/account/client"
v2 "github.com/bucketeer-io/bucketeer/pkg/environment/storage/v2"
"github.com/bucketeer-io/bucketeer/pkg/locale"
"github.com/bucketeer-io/bucketeer/pkg/log"
"github.com/bucketeer-io/bucketeer/pkg/pubsub/publisher"
Expand All @@ -47,11 +48,14 @@ func WithLogger(l *zap.Logger) Option {
}

type EnvironmentService struct {
accountClient accountclient.Client
mysqlClient mysql.Client
publisher publisher.Publisher
opts *options
logger *zap.Logger
accountClient accountclient.Client
mysqlClient mysql.Client
projectStorage v2.ProjectStorage
orgStorage v2.OrganizationStorage
environmentStorage v2.EnvironmentStorage
publisher publisher.Publisher
opts *options
logger *zap.Logger
}

func NewEnvironmentService(
Expand All @@ -67,11 +71,14 @@ func NewEnvironmentService(
opt(dopts)
}
return &EnvironmentService{
accountClient: ac,
mysqlClient: mysqlClient,
publisher: publisher,
opts: dopts,
logger: dopts.logger.Named("api"),
accountClient: ac,
mysqlClient: mysqlClient,
projectStorage: v2.NewProjectStorage(mysqlClient),
orgStorage: v2.NewOrganizationStorage(mysqlClient),
environmentStorage: v2.NewEnvironmentStorage(mysqlClient),
publisher: publisher,
opts: dopts,
logger: dopts.logger.Named("api"),
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/environment/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"

acmock "github.com/bucketeer-io/bucketeer/pkg/account/client/mock"
storagemock "github.com/bucketeer-io/bucketeer/pkg/environment/storage/v2/mock"
"github.com/bucketeer-io/bucketeer/pkg/log"
publishermock "github.com/bucketeer-io/bucketeer/pkg/pubsub/publisher/mock"
"github.com/bucketeer-io/bucketeer/pkg/rpc"
Expand Down Expand Up @@ -77,9 +78,12 @@ func newEnvironmentService(t *testing.T, mockController *gomock.Controller, s st
logger, err := log.NewLogger()
require.NoError(t, err)
return &EnvironmentService{
accountClient: acmock.NewMockClient(mockController),
mysqlClient: mysqlmock.NewMockClient(mockController),
publisher: publishermock.NewMockPublisher(mockController),
logger: logger.Named("api"),
accountClient: acmock.NewMockClient(mockController),
mysqlClient: mysqlmock.NewMockClient(mockController),
orgStorage: storagemock.NewMockOrganizationStorage(mockController),
projectStorage: storagemock.NewMockProjectStorage(mockController),
environmentStorage: storagemock.NewMockEnvironmentStorage(mockController),
publisher: publishermock.NewMockPublisher(mockController),
logger: logger.Named("api"),
}
}
100 changes: 13 additions & 87 deletions pkg/environment/api/environment_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func (s *EnvironmentService) GetEnvironmentV2(
if err := validateGetEnvironmentV2Request(req, localizer); err != nil {
return nil, err
}
environmentStorage := v2es.NewEnvironmentStorage(s.mysqlClient)
environment, err := environmentStorage.GetEnvironmentV2(ctx, req.Id)
environment, err := s.environmentStorage.GetEnvironmentV2(ctx, req.Id)
if err != nil {
if err == v2es.ErrEnvironmentNotFound {
dt, err := statusEnvironmentNotFound.WithDetails(&errdetails.LocalizedMessage{
Expand Down Expand Up @@ -147,8 +146,7 @@ func (s *EnvironmentService) ListEnvironmentsV2(
}
return nil, dt.Err()
}
environmentStorage := v2es.NewEnvironmentStorage(s.mysqlClient)
environments, nextCursor, totalCount, err := environmentStorage.ListEnvironmentsV2(
environments, nextCursor, totalCount, err := s.environmentStorage.ListEnvironmentsV2(
ctx,
whereParts,
orders,
Expand Down Expand Up @@ -340,33 +338,15 @@ func (s *EnvironmentService) createEnvironmentV2(
editor *eventproto.Editor,
localizer locale.Localizer,
) error {
tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error(
"Failed to begin transaction",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return statusInternal.Err()
}
return dt.Err()
}
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
environmentStorage := v2es.NewEnvironmentStorage(tx)
err := s.mysqlClient.RunInTransactionV2(ctx, func(contextWithTx context.Context, _ mysql.Transaction) error {
handler, err := command.NewEnvironmentV2CommandHandler(editor, environment, s.publisher)
if err != nil {
return err
}
if err := handler.Handle(ctx, cmd); err != nil {
return err
}
return environmentStorage.CreateEnvironmentV2(ctx, environment)
return s.environmentStorage.CreateEnvironmentV2(contextWithTx, environment)
})
if err != nil {
if err == v2es.ErrEnvironmentAlreadyExists {
Expand Down Expand Up @@ -421,26 +401,8 @@ func (s *EnvironmentService) updateEnvironmentV2(
editor *eventproto.Editor,
localizer locale.Localizer,
) error {
tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error(
"Failed to begin transaction",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return statusInternal.Err()
}
return dt.Err()
}
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
environmentStorage := v2es.NewEnvironmentStorage(tx)
environment, err := environmentStorage.GetEnvironmentV2(ctx, envId)
err := s.mysqlClient.RunInTransactionV2(ctx, func(contextWithTx context.Context, _ mysql.Transaction) error {
environment, err := s.environmentStorage.GetEnvironmentV2(contextWithTx, envId)
if err != nil {
return err
}
Expand All @@ -453,7 +415,7 @@ func (s *EnvironmentService) updateEnvironmentV2(
return err
}
}
return environmentStorage.UpdateEnvironmentV2(ctx, environment)
return s.environmentStorage.UpdateEnvironmentV2(contextWithTx, environment)
})
if err != nil {
if err == v2es.ErrEnvironmentNotFound || err == v2es.ErrEnvironmentUnexpectedAffectedRows {
Expand Down Expand Up @@ -548,26 +510,8 @@ func (s *EnvironmentService) ArchiveEnvironmentV2(
if err := validateArchiveEnvironmentV2Request(req, localizer); err != nil {
return nil, err
}
tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error(
"Failed to begin transaction",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return nil, statusInternal.Err()
}
return nil, dt.Err()
}
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
environmentStorage := v2es.NewEnvironmentStorage(tx)
environment, err := environmentStorage.GetEnvironmentV2(ctx, req.Id)
err = s.mysqlClient.RunInTransactionV2(ctx, func(contextWithTx context.Context, _ mysql.Transaction) error {
environment, err := s.environmentStorage.GetEnvironmentV2(contextWithTx, req.Id)
if err != nil {
return err
}
Expand All @@ -578,7 +522,7 @@ func (s *EnvironmentService) ArchiveEnvironmentV2(
if err := handler.Handle(ctx, req.Command); err != nil {
return err
}
return environmentStorage.UpdateEnvironmentV2(ctx, environment)
return s.environmentStorage.UpdateEnvironmentV2(contextWithTx, environment)
})
if err != nil {
if err == v2es.ErrEnvironmentNotFound || err == v2es.ErrEnvironmentUnexpectedAffectedRows {
Expand Down Expand Up @@ -637,26 +581,8 @@ func (s *EnvironmentService) UnarchiveEnvironmentV2(
if err := validateUnarchiveEnvironmentV2Request(req, localizer); err != nil {
return nil, err
}
tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error(
"Failed to begin transaction",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return nil, statusInternal.Err()
}
return nil, dt.Err()
}
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
environmentStorage := v2es.NewEnvironmentStorage(tx)
environment, err := environmentStorage.GetEnvironmentV2(ctx, req.Id)
err = s.mysqlClient.RunInTransactionV2(ctx, func(contextWithTx context.Context, _ mysql.Transaction) error {
environment, err := s.environmentStorage.GetEnvironmentV2(contextWithTx, req.Id)
if err != nil {
return err
}
Expand All @@ -667,7 +593,7 @@ func (s *EnvironmentService) UnarchiveEnvironmentV2(
if err := handler.Handle(ctx, req.Command); err != nil {
return err
}
return environmentStorage.UpdateEnvironmentV2(ctx, environment)
return s.environmentStorage.UpdateEnvironmentV2(contextWithTx, environment)
})
if err != nil {
if err == v2es.ErrEnvironmentNotFound || err == v2es.ErrEnvironmentUnexpectedAffectedRows {
Expand Down
Loading
Loading