Skip to content

Commit

Permalink
Merge branch 'master' into concurrent-rowcount-defaults-true
Browse files Browse the repository at this point in the history
  • Loading branch information
Shlomi Noach authored Nov 1, 2016
2 parents 10850e4 + 3435539 commit 02e9287
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 33 deletions.
1 change: 1 addition & 0 deletions RELEASE_VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.0.28
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
#

RELEASE_VERSION="1.0.26"
RELEASE_VERSION=$(cat RELEASE_VERSION)

function build {
osname=$1
Expand Down
20 changes: 16 additions & 4 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,28 @@ const (
CutOverTwoStep = iota
)

type ThrottleReasonHint string

const (
NoThrottleReasonHint ThrottleReasonHint = "NoThrottleReasonHint"
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
)

var (
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
)

type ThrottleCheckResult struct {
ShouldThrottle bool
Reason string
ReasonHint ThrottleReasonHint
}

func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult {
func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleReasonHint) *ThrottleCheckResult {
return &ThrottleCheckResult{
ShouldThrottle: throttle,
Reason: reason,
ReasonHint: reasonHint,
}
}

Expand Down Expand Up @@ -138,13 +147,15 @@ type MigrationContext struct {
TotalDMLEventsApplied int64
isThrottled bool
throttleReason string
throttleReasonHint ThrottleReasonHint
throttleGeneralCheckResult ThrottleCheckResult
throttleMutex *sync.Mutex
IsPostponingCutOver int64
CountingRowsFlag int64
AllEventsUpToLockProcessedInjectedFlag int64
CleanupImminentFlag int64
UserCommandedUnpostponeFlag int64
CutOverCompleteFlag int64
PanicAbort chan error

OriginalTableColumnsOnApplier *sql.ColumnList
Expand Down Expand Up @@ -416,17 +427,18 @@ func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResu
return &result
}

func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonHint ThrottleReasonHint) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.isThrottled = throttle
this.throttleReason = reason
this.throttleReasonHint = reasonHint
}

func (this *MigrationContext) IsThrottled() (bool, string) {
func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
return this.isThrottled, this.throttleReason
return this.isThrottled, this.throttleReason, this.throttleReasonHint
}

func (this *MigrationContext) GetReplicationLagQuery() string {
Expand Down
8 changes: 8 additions & 0 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven

// StreamEvents
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
if canStopStreaming() {
return nil
}
for {
if canStopStreaming() {
break
Expand Down Expand Up @@ -148,3 +151,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha

return nil
}

func (this *GoMySQLReader) Close() error {
this.binlogSyncer.Close()
return nil
}
3 changes: 3 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ func (this *Applier) InitiateHeartbeat() {
// Generally speaking, we would issue a goroutine, but I'd actually rather
// have this block the loop rather than spam the master in the event something
// goes wrong
if throttle, _, reasonHint := this.migrationContext.IsThrottled(); throttle && (reasonHint == base.UserCommandThrottleReasonHint) {
continue
}
if err := injectHeartbeat(); err != nil {
return
}
Expand Down
23 changes: 19 additions & 4 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,27 @@ func (this *Inspector) validateLogSlaveUpdates() error {
if err := this.db.QueryRow(query).Scan(&logSlaveUpdates); err != nil {
return err
}
if !logSlaveUpdates && !this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.IsTungsten {
return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)

if logSlaveUpdates {
log.Infof("log_slave_updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil
}

log.Infof("binary logs updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil
if this.migrationContext.IsTungsten {
log.Warning("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil
}

if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
return fmt.Errorf("%s:%d must have log_slave_updates enabled for testing/migrating on replica", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
}

if this.migrationContext.InspectorIsAlsoApplier() {
log.Warning("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil
}

return fmt.Errorf("%s:%d must have log_slave_updates enabled for executing migration", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
}

// validateTable makes sure the table we need to operate on actually exists
Expand Down
8 changes: 6 additions & 2 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (this *Migrator) consumeRowCopyComplete() {
}

func (this *Migrator) canStopStreaming() bool {
return false
return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0
}

// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
Expand Down Expand Up @@ -345,6 +345,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.cutOver(); err != nil {
return err
}
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)

if err := this.finalCleanup(); err != nil {
return nil
Expand Down Expand Up @@ -803,7 +804,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
eta = "due"
state = "postponing cut-over"
} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
state = fmt.Sprintf("throttled, %s", throttleReason)
}

Expand Down Expand Up @@ -1058,6 +1059,9 @@ func (this *Migrator) finalCleanup() error {
log.Errore(err)
}
}
if err := this.eventsStreamer.Close(); err != nil {
log.Errore(err)
}

if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions go/logic/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,9 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
}
}
}

func (this *EventsStreamer) Close() (err error) {
err = this.binlogReader.Close()
log.Infof("Closed streamer connection. err=%+v", err)
return err
}
40 changes: 20 additions & 20 deletions go/logic/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
// shouldThrottle performs checks to see whether we should currently be throttling.
// It merely observes the metrics collected by other components, it does not issue
// its own metric collection.
func (this *Throttler) shouldThrottle() (result bool, reason string) {
func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) {
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
if generalCheckResult.ShouldThrottle {
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
}
// Replication lag throttle
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()), base.NoThrottleReasonHint
}
checkThrottleControlReplicas := true
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
Expand All @@ -52,14 +52,14 @@ func (this *Throttler) shouldThrottle() (result bool, reason string) {
if checkThrottleControlReplicas {
lagResult := this.migrationContext.GetControlReplicasLagResult()
if lagResult.Err != nil {
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err), base.NoThrottleReasonHint
}
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()), base.NoThrottleReasonHint
}
}
// Got here? No metrics indicates we need throttling.
return false, ""
return false, "", base.NoThrottleReasonHint
}

// parseChangelogHeartbeat is called when a heartbeat event is intercepted
Expand Down Expand Up @@ -147,8 +147,8 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func (this *Throttler) collectGeneralThrottleMetrics() error {

setThrottle := func(throttle bool, reason string) error {
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason))
setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error {
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint))
return nil
}

Expand All @@ -161,7 +161,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {

criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet()
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
}
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
Expand All @@ -181,38 +181,38 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {

// User-based throttle
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
return setThrottle(true, "commanded by user")
return setThrottle(true, "commanded by user", base.UserCommandThrottleReasonHint)
}
if this.migrationContext.ThrottleFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
// Throttle file defined and exists!
return setThrottle(true, "flag-file")
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
}
}
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
// 2nd Throttle file defined and exists!
return setThrottle(true, "flag-file")
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
}
}

maxLoad := this.migrationContext.GetMaxLoad()
for variableName, threshold := range maxLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
}
if value >= threshold {
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold))
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold), base.NoThrottleReasonHint)
}
}
if this.migrationContext.GetThrottleQuery() != "" {
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
return setThrottle(true, "throttle-query")
return setThrottle(true, "throttle-query", base.NoThrottleReasonHint)
}
}

return setThrottle(false, "")
return setThrottle(false, "", base.NoThrottleReasonHint)
}

// initiateThrottlerMetrics initiates the various processes that collect measurements
Expand All @@ -237,8 +237,8 @@ func (this *Throttler) initiateThrottlerChecks() error {
throttlerTick := time.Tick(100 * time.Millisecond)

throttlerFunction := func() {
alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
shouldThrottle, throttleReason := this.shouldThrottle()
alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
shouldThrottle, throttleReason, throttleReasonHint := this.shouldThrottle()
if shouldThrottle && !alreadyThrottling {
// New throttling
this.applier.WriteAndLogChangelog("throttle", throttleReason)
Expand All @@ -249,7 +249,7 @@ func (this *Throttler) initiateThrottlerChecks() error {
// End of throttling
this.applier.WriteAndLogChangelog("throttle", "done throttling")
}
this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
}
throttlerFunction()
for range throttlerTick {
Expand All @@ -265,7 +265,7 @@ func (this *Throttler) throttle(onThrottled func()) {
for {
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
// Therefore calling IsThrottled() is cheap
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
if shouldThrottle, _, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
return
}
if onThrottled != nil {
Expand Down
28 changes: 28 additions & 0 deletions localtests/datetime-submillis/create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
dt0 datetime(6),
dt1 datetime(6),
ts2 timestamp(6),
updated tinyint unsigned default 0,
primary key(id),
key i_idx(i)
) auto_increment=1;

drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 11, now(), now(), now(), 0);
update gh_ost_test set dt1='2016-10-31 11:22:33.444', updated = 1 where i = 11 order by id desc limit 1;

insert into gh_ost_test values (null, 13, now(), now(), now(), 0);
update gh_ost_test set ts1='2016-11-01 11:22:33.444', updated = 1 where i = 13 order by id desc limit 1;
end ;;
6 changes: 5 additions & 1 deletion localtests/latin1/create.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
t varchar(128),
t varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1 charset latin1 collate latin1_swedish_ci;

Expand All @@ -17,5 +17,9 @@ create event gh_ost_test
begin
insert into gh_ost_test values (null, md5(rand()));
insert into gh_ost_test values (null, 'átesting');
insert into gh_ost_test values (null, 'ádelete');
insert into gh_ost_test values (null, 'testátest');
update gh_ost_test set t='áupdated' order by id desc limit 1;
update gh_ost_test set t='áupdated1' where t='áupdated' order by id desc limit 1;
delete from gh_ost_test where t='ádelete';
end ;;
8 changes: 8 additions & 0 deletions script/cibuild-gh-ost-build-deploy-tarball
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,11 @@ tar cvf $tarball --mode="ugo=rx" bin/
gzip $tarball
mkdir -p "$BUILD_ARTIFACT_DIR"/gh-ost
cp ${tarball}.gz "$BUILD_ARTIFACT_DIR"/gh-ost/

### HACK HACK HACK ###
# Blame @carlosmn. In the good way.
# We don't have any jessie machines for building, but a pure-Go binary depends
# on a version of libc and ld which are widely available, so we can copy the
# tarball over with jessie in its name so we can deploy it on jessie machines.
jessie_tarball_name=$(echo $(basename "${tarball}") | sed s/-precise-/-jessie-/)
cp ${tarball}.gz "$BUILD_ARTIFACT_DIR/gh-ost/${jessie_tarball_name}.gz"

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 02e9287

Please sign in to comment.