Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/replica-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ jobs:
- name: Run tests
run: script/docker-gh-ost-replica-tests run

- name: Set artifact name
if: failure()
run: |
ARTIFACT_NAME=$(echo "${{ matrix.image }}" | tr '/:' '-')
echo "ARTIFACT_NAME=test-logs-${ARTIFACT_NAME}" >> $GITHUB_ENV

- name: Upload test logs on failure
if: failure()
uses: actions/upload-artifact@v4
with:
name: ${{ env.ARTIFACT_NAME }}
path: /tmp/gh-ost-test.*
retention-days: 7

- name: Teardown environment
if: always()
run: script/docker-gh-ost-replica-tests down
3 changes: 3 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ func main() {
if migrationContext.CheckpointIntervalSeconds < 10 {
migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10")
}
if migrationContext.CountTableRows && migrationContext.PanicOnWarnings {
migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection")
}

switch *cutOver {
case "atomic", "default", "":
Expand Down
232 changes: 162 additions & 70 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
}
}

// compileMigrationKeyWarningRegex builds the regular expression used to recognise
// duplicate-key warnings that refer to the migration's unique key on the ghost table.
// The ghost table name appears as an optional prefix in the pattern because duplicate
// warnings are formatted differently across MySQL versions. Both the table name and the
// key name go through regexp.QuoteMeta so that any regex metacharacters they contain
// are matched literally instead of breaking the pattern.
//
// Returns the compiled expression, or a wrapped error if compilation fails.
func (this *Applier) compileMigrationKeyWarningRegex() (*regexp.Regexp, error) {
	pattern := fmt.Sprintf(
		`for key '(%s\.)?%s'`,
		regexp.QuoteMeta(this.migrationContext.GetGhostTableName()),
		regexp.QuoteMeta(this.migrationContext.UniqueKey.NameInGhostTable),
	)

	compiled, compileErr := regexp.Compile(pattern)
	if compileErr == nil {
		return compiled, nil
	}
	return nil, fmt.Errorf("failed to compile migration key pattern: %w", compileErr)
}

func (this *Applier) InitDBConnections() (err error) {
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
uriWithMulti := fmt.Sprintf("%s&multiStatements=true", applierUri)
Expand Down Expand Up @@ -917,6 +932,12 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
return nil, err
}

// Compile regex once before loop to avoid performance penalty and handle errors properly
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
if err != nil {
return nil, err
}

var sqlWarnings []string
for rows.Next() {
var level, message string
Expand All @@ -925,10 +946,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
if strings.Contains(message, "Duplicate entry") && matched {
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
continue
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
Expand Down Expand Up @@ -1468,6 +1486,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
}

// executeBatchWithWarningChecking executes a batch of DML statements inside tx as one
// multi-statement query, with a SHOW WARNINGS interleaved after every statement:
//
//	stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ...
//
// It is used when PanicOnWarnings is enabled so that warnings raised by any statement in
// the batch are detected (a single trailing SHOW WARNINGS only reports warnings from the
// last statement of a multi-statement batch).
//
// "Duplicate entry" warnings on the migration's unique key are ignored; any other warning
// makes the call fail. The function neither commits nor rolls back tx; that is left to
// the caller.
//
// Returns the sum of the theoretical row deltas of the statements (buildResult.rowsDelta).
// This is an approximation: the actual rows affected are not available on this path.
// On any failure it returns 0 and a non-nil error. An empty batch is a no-op and returns
// (0, nil) without contacting the server.
func (this *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gosql.Tx, buildResults []*dmlBuildResult) (int64, error) {
	// Nothing to execute: do not send an empty query to the server.
	if len(buildResults) == 0 {
		return 0, nil
	}

	// Compile the regex once, and before anything is sent, so that a compile failure
	// aborts the call without having executed any statement.
	migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
	if err != nil {
		return 0, err
	}

	// Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ...
	nArgs := 0
	for _, buildResult := range buildResults {
		nArgs += len(buildResult.args)
	}
	args := make([]any, 0, nArgs)
	var queryBuilder strings.Builder
	for _, buildResult := range buildResults {
		queryBuilder.WriteString(buildResult.query)
		queryBuilder.WriteString(";\nSHOW WARNINGS;\n")
		args = append(args, buildResult.args...)
	}
	query := queryBuilder.String()

	// Execute the multi-statement query
	rows, err := tx.QueryContext(ctx, query, args...)
	if err != nil {
		return 0, fmt.Errorf("%w; query=%s; args=%+v", err, query, args)
	}
	defer rows.Close()

	// QueryContext with multi-statement queries is expected to return rows positioned at the
	// first result set that produces rows (i.e., the first SHOW WARNINGS), skipping DML results.
	// Verify we're at a result set with columns (SHOW WARNINGS has: Level, Code, Message).
	cols, err := rows.Columns()
	if err != nil {
		return 0, fmt.Errorf("failed to get columns: %w", err)
	}

	// If somehow we're not at a result set with columns, try to advance
	if len(cols) == 0 {
		if !rows.NextResultSet() {
			// Surface the underlying driver error, if any, instead of masking it
			// behind the generic "expected result set" message.
			if err := rows.Err(); err != nil {
				return 0, fmt.Errorf("error moving to SHOW WARNINGS for statement 1: %w", err)
			}
			return 0, fmt.Errorf("expected SHOW WARNINGS result set after first statement")
		}
	}

	var totalDelta int64

	// Iterate through SHOW WARNINGS result sets.
	// DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS.
	// Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ...
	for i := 0; i < len(buildResults); i++ {
		// We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS).
		// Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation.
		// This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1).
		totalDelta += buildResults[i].rowsDelta

		// Read warnings from this statement's SHOW WARNINGS result set
		var sqlWarnings []string
		for rows.Next() {
			var level, message string
			var code int
			if err := rows.Scan(&level, &code, &message); err != nil {
				// Scan failure means we cannot reliably read warnings.
				// Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip.
				return 0, fmt.Errorf("failed to scan SHOW WARNINGS for statement %d: %w", i+1, err)
			}

			if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
				// Duplicate entry on migration unique key is expected during binlog replay
				// (row was already copied during bulk copy phase)
				continue
			}
			sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
		}

		// Check for errors that occurred while iterating through warnings
		if err := rows.Err(); err != nil {
			return 0, fmt.Errorf("error reading SHOW WARNINGS result set for statement %d: %w", i+1, err)
		}

		if len(sqlWarnings) > 0 {
			return 0, fmt.Errorf("warnings detected in statement %d of %d: %v", i+1, len(buildResults), sqlWarnings)
		}

		// Move to the next statement's SHOW WARNINGS result set.
		// For the last statement there is no next result set, so no advance is attempted.
		// DML statements don't create result sets, so one NextResultSet call moves
		// from SHOW WARNINGS #N to SHOW WARNINGS #(N+1).
		if i < len(buildResults)-1 {
			if !rows.NextResultSet() {
				if err := rows.Err(); err != nil {
					return 0, fmt.Errorf("error moving to SHOW WARNINGS for statement %d: %w", i+2, err)
				}
				return 0, fmt.Errorf("expected SHOW WARNINGS result set for statement %d", i+2)
			}
		}
	}

	return totalDelta, nil
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
Expand Down Expand Up @@ -1507,79 +1626,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
}
}

// We batch together the DML queries into multi-statements to minimize network trips.
// We have to use the raw driver connection to access the rows affected
// for each statement in the multi-statement.
execErr := conn.Raw(func(driverConn any) error {
ex := driverConn.(driver.ExecerContext)
nvc := driverConn.(driver.NamedValueChecker)

multiArgs := make([]driver.NamedValue, 0, nArgs)
multiQueryBuilder := strings.Builder{}
for _, buildResult := range buildResults {
for _, arg := range buildResult.args {
nv := driver.NamedValue{Value: driver.Value(arg)}
nvc.CheckNamedValue(&nv)
multiArgs = append(multiArgs, nv)
}

multiQueryBuilder.WriteString(buildResult.query)
multiQueryBuilder.WriteString(";\n")
}

res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
return err
}

mysqlRes := res.(drivermysql.Result)

// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
totalDelta += buildResults[i].rowsDelta * rowsAffected
}
return nil
})

if execErr != nil {
return rollback(execErr)
}

// Check for warnings when PanicOnWarnings is enabled
// When PanicOnWarnings is enabled, we need to check warnings after each statement
// in the batch. SHOW WARNINGS only shows warnings from the last statement in a
// multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
if this.migrationContext.PanicOnWarnings {
//nolint:execinquery
rows, err := tx.Query("SHOW WARNINGS")
totalDelta, err = this.executeBatchWithWarningChecking(ctx, tx, buildResults)
if err != nil {
return rollback(err)
}
defer rows.Close()
if err = rows.Err(); err != nil {
return rollback(err)
}
} else {
// Fast path: batch together DML queries into multi-statements to minimize network trips.
// We use the raw driver connection to access the rows affected for each statement.
execErr := conn.Raw(func(driverConn any) error {
ex := driverConn.(driver.ExecerContext)
nvc := driverConn.(driver.NamedValueChecker)

multiArgs := make([]driver.NamedValue, 0, nArgs)
multiQueryBuilder := strings.Builder{}
for _, buildResult := range buildResults {
for _, arg := range buildResult.args {
nv := driver.NamedValue{Value: driver.Value(arg)}
nvc.CheckNamedValue(&nv)
multiArgs = append(multiArgs, nv)
}

multiQueryBuilder.WriteString(buildResult.query)
multiQueryBuilder.WriteString(";\n")
}

var sqlWarnings []string
for rows.Next() {
var level, message string
var code int
if err := rows.Scan(&level, &code, &message); err != nil {
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
return err
}
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
if strings.Contains(message, "Duplicate entry") && matched {
// Duplicate entry on migration unique key is expected during binlog replay
// (row was already copied during bulk copy phase)
continue

mysqlRes := res.(drivermysql.Result)

// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
totalDelta += buildResults[i].rowsDelta * rowsAffected
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
if len(sqlWarnings) > 0 {
warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings)
return rollback(errors.New(warningMsg))
return nil
})

if execErr != nil {
return rollback(execErr)
}
}

Expand Down
Loading