Skip to content

Commit

Permalink
Cancel any row count queries before attempting to cut over (#846)
Browse files Browse the repository at this point in the history
* Cancel any row count queries before attempting to cut over

Closes #830. Switches from using `QueryRow` to `QueryRowContext`, and
stores a context.CancelFunc in the migration context, which is called to
halt any running row count query before beginning the cut over.

* Make it threadsafe

* Kill the count query on the database side as well

* Explicitly grab a connection to run the count, store its connection id
* When the query context is canceled, run a `KILL QUERY ?` on that connection id

* Rewrite these to use the threadsafe functions, stop exporting the cancel func

* Update logger

* Update logger

Co-authored-by: Tim Vaillancourt <[email protected]>
Co-authored-by: Tim Vaillancourt <[email protected]>
Co-authored-by: dm-2 <[email protected]>
  • Loading branch information
4 people committed Jul 7, 2022
1 parent 614b379 commit 3e72f1b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 7 deletions.
32 changes: 32 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type MigrationContext struct {
AlterStatement string
AlterStatementOptions string // anything following the 'ALTER TABLE [schema.]table' from AlterStatement

countMutex sync.Mutex
countTableRowsCancelFunc func()
CountTableRows bool
ConcurrentCountTableRows bool
AllowedRunningOnMaster bool
Expand Down Expand Up @@ -429,6 +431,36 @@ func (this *MigrationContext) IsTransactionalTable() bool {
return false
}

// SetCountTableRowsCancelFunc sets the cancel function for the CountTableRows query context
func (this *MigrationContext) SetCountTableRowsCancelFunc(f func()) {
this.countMutex.Lock()
defer this.countMutex.Unlock()

this.countTableRowsCancelFunc = f
}

// IsCountingTableRows returns true if the migration has a table count query running
func (this *MigrationContext) IsCountingTableRows() bool {
this.countMutex.Lock()
defer this.countMutex.Unlock()

return this.countTableRowsCancelFunc != nil
}

// CancelTableRowsCount cancels the CountTableRows query context. It is safe to
// call function even when IsCountingTableRows is false.
func (this *MigrationContext) CancelTableRowsCount() {
this.countMutex.Lock()
defer this.countMutex.Unlock()

if this.countTableRowsCancelFunc == nil {
return
}

this.countTableRowsCancelFunc()
this.countTableRowsCancelFunc = nil
}

// ElapsedTime returns time since very beginning of the process
func (this *MigrationContext) ElapsedTime() time.Duration {
return time.Since(this.StartTime)
Expand Down
37 changes: 34 additions & 3 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package logic

import (
"context"
gosql "database/sql"
"fmt"
"reflect"
Expand Down Expand Up @@ -533,18 +534,48 @@ func (this *Inspector) estimateTableRowsViaExplain() error {
return nil
}

// Kill kills a query for connectionID.
// - @amason: this should go somewhere _other_ than `logic`, but I couldn't decide
// between `base`, `sql`, or `mysql`.
func Kill(db *gosql.DB, connectionID string) error {
_, err := db.Exec(`KILL QUERY %s`, connectionID)
return err
}

// CountTableRows counts exact number of rows on the original table
func (this *Inspector) CountTableRows() error {
func (this *Inspector) CountTableRows(ctx context.Context) error {
atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 0)

this.migrationContext.Log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")

conn, err := this.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()

var connectionID string
if err := conn.QueryRowContext(ctx, `SELECT /* gh-ost */ CONNECTION_ID()`).Scan(&connectionID); err != nil {
return err
}

query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
var rowsEstimate int64
if err := this.db.QueryRow(query).Scan(&rowsEstimate); err != nil {
return err
if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil {
switch err {
case context.Canceled, context.DeadlineExceeded:
this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err())
return Kill(this.db, connectionID)
default:
return err
}
}

// row count query finished. nil out the cancel func, so the main migration thread
// doesn't bother calling it after row copy is done.
this.migrationContext.SetCountTableRowsCancelFunc(nil)

atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate)
this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate

Expand Down
18 changes: 14 additions & 4 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package logic

import (
"context"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -295,8 +296,8 @@ func (this *Migrator) countTableRows() (err error) {
return nil
}

countRowsFunc := func() error {
if err := this.inspector.CountTableRows(); err != nil {
countRowsFunc := func(ctx context.Context) error {
if err := this.inspector.CountTableRows(ctx); err != nil {
return err
}
if err := this.hooksExecutor.onRowCountComplete(); err != nil {
Expand All @@ -306,12 +307,17 @@ func (this *Migrator) countTableRows() (err error) {
}

if this.migrationContext.ConcurrentCountTableRows {
// store a cancel func so we can stop this query before a cut over
rowCountContext, rowCountCancel := context.WithCancel(context.Background())
this.migrationContext.SetCountTableRowsCancelFunc(rowCountCancel)

this.migrationContext.Log.Infof("As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on")
go countRowsFunc()
go countRowsFunc(rowCountContext)

// and we ignore errors, because this turns to be a background job
return nil
}
return countRowsFunc()
return countRowsFunc(context.Background())
}

func (this *Migrator) createFlagFiles() (err error) {
Expand Down Expand Up @@ -415,6 +421,10 @@ func (this *Migrator) Migrate() (err error) {
}
this.printStatus(ForcePrintStatusRule)

if this.migrationContext.IsCountingTableRows() {
this.migrationContext.Log.Info("stopping query for exact row count, because that can accidentally lock out the cut over")
this.migrationContext.CancelTableRowsCount()
}
if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
return err
}
Expand Down

0 comments on commit 3e72f1b

Please sign in to comment.