diff --git a/dm/syncer/data_validator.go b/dm/syncer/data_validator.go index 8f1bfcc814..5899a98013 100644 --- a/dm/syncer/data_validator.go +++ b/dm/syncer/data_validator.go @@ -858,27 +858,17 @@ func (v *DataValidator) genValidateTableInfo(sourceTable *filter.Table, columnCo // tableInfoForVisibleColumnCount returns the source TableInfo layout that matches // a binlog row image with columnCount visible columns. func tableInfoForVisibleColumnCount(tableInfo *model.TableInfo, columnCount int) (*model.TableInfo, bool) { - visibleCount := 0 - stripColumnCount := len(tableInfo.Columns) - for i, col := range tableInfo.Columns { - if col.Hidden { - continue - } - visibleCount++ - if visibleCount > columnCount { - stripColumnCount = i - break - } - } - if visibleCount < columnCount { + layout := sqlmodel.NewRowImageLayoutFromColumns(tableInfo.Columns, nil) + sourceColumnCount, ok := layout.SourceColumnCountForVisibleColumnCount(columnCount) + if !ok { return nil, false } - if visibleCount == columnCount { + if sourceColumnCount == len(tableInfo.Columns) { return tableInfo, true } clone := tableInfo.Clone() - clone.Columns = clone.Columns[:stripColumnCount] + clone.Columns = clone.Columns[:sourceColumnCount] return clone, true } diff --git a/dm/syncer/expr_filter_group.go b/dm/syncer/expr_filter_group.go index 9b46a926d7..88b9ee0d49 100644 --- a/dm/syncer/expr_filter_group.go +++ b/dm/syncer/expr_filter_group.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" + "github.com/pingcap/tiflow/pkg/sqlmodel" "go.uber.org/zap" ) @@ -193,13 +194,10 @@ func rowForExpressionFilter( values := row if len(row) != len(upstreamCols) { - visibleCols := visibleColumns(upstreamCols) - if len(row) != len(visibleCols) { - return chunk.Row{}, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(visibleCols), len(row)) - } - fullValues := make([]interface{}, len(upstreamCols)) - for i, col := range visibleCols { - fullValues[col.Offset] = row[i] + layout := sqlmodel.NewRowImageLayoutFromColumns(upstreamCols, nil) + fullValues, ok := layout.FullValues(row) + if !ok { + return chunk.Row{}, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(layout.VisibleColumnCount(), len(row)) } values = fullValues } diff --git a/dm/syncer/validate_worker.go b/dm/syncer/validate_worker.go index 4460d3c3df..1924051853 100644 --- a/dm/syncer/validate_worker.go +++ b/dm/syncer/validate_worker.go @@ -320,7 +320,7 @@ func (vw *validateWorker) batchValidateRowChanges(rows []*rowValidationJob, dele firstRow := rows[0].row cond := &Cond{ TargetTbl: firstRow.TargetTableID(), - Columns: visibleColumns(firstRow.SourceTableInfo().Columns), + Columns: sqlmodel.VisibleColumns(firstRow.SourceTableInfo().Columns), PK: firstRow.UniqueNotNullIdx(), PkValues: pkValues, } @@ -453,18 +453,6 @@ func (vw *validateWorker) resetErrorRows() { vw.errorRows = make([]*validateFailedRow, 0) } -// TODO(joechenrh): Share visible-row/source-column offset mapping with sqlmodel -// instead of maintaining validator-specific hidden column helpers. -func visibleColumns(columns []*model.ColumnInfo) []*model.ColumnInfo { - ret := make([]*model.ColumnInfo, 0, len(columns)) - for _, col := range columns { - if !col.Hidden { - ret = append(ret, col) - } - } - return ret -} - func (vw *validateWorker) newJobAdded(job *rowValidationJob) { tp := job.Tp vw.pendingRowCounts[tp]++ diff --git a/pkg/sqlmodel/row_change.go b/pkg/sqlmodel/row_change.go index 9f0d61e14c..e1cff05437 100644 --- a/pkg/sqlmodel/row_change.go +++ b/pkg/sqlmodel/row_change.go @@ -184,13 +184,7 @@ func (r *RowChange) TargetTableID() string { // TiDB TableInfo contains some internal columns like expression index, they // are not included in this count. func (r *RowChange) ColumnCount() int { - c := 0 - for _, col := range r.sourceTableInfo.Columns { - if !col.Hidden { - c++ - } - } - return c + return VisibleColumnCount(r.sourceTableInfo.Columns) } // SourceTableInfo returns the TableInfo of source table. @@ -242,15 +236,13 @@ func (r *RowChange) lazyInitWhereHandle() { // dmlRowMapping returns DML column/value mapping without initializing the full // where handle. CDC insert/update paths can use this without paying the WHERE // handle build cost, while DM paths reuse the injected cached handle. -func (r *RowChange) dmlRowMapping() (rowValueMapper, []*timodel.ColumnInfo) { +func (r *RowChange) dmlRowMapping() (RowImageLayout, []*timodel.ColumnInfo) { if r.whereHandle != nil { - return r.whereHandle.rowMapper, r.whereHandle.writableColumns + return r.whereHandle.rowMapper, r.whereHandle.rowMapper.WritableColumns() } - // Keep this fallback mapping consistent with GetWhereHandle until the row - // image mapping is centralized. - rowMapper := newRowValueMapper(r.sourceTableInfo.Columns) - return rowMapper, writableSourceColumns(rowMapper.visibleColumns, r.targetTableInfo.Columns) + layout := NewRowImageLayout(r.sourceTableInfo, r.targetTableInfo) + return layout, layout.WritableColumns() } // whereColumnsAndValues returns columns and values to identify the row, to form diff --git a/pkg/sqlmodel/row_image_layout.go b/pkg/sqlmodel/row_image_layout.go new file mode 100644 index 0000000000..62efe406c6 --- /dev/null +++ b/pkg/sqlmodel/row_image_layout.go @@ -0,0 +1,173 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlmodel + +import "github.com/pingcap/tidb/pkg/meta/model" + +// RowImageLayout describes how a binlog row image maps to source table columns. +type RowImageLayout struct { + columns []*model.ColumnInfo + visibleColumns []*model.ColumnInfo + visibleOffsetByColumnOffset []int + writableColumns []*model.ColumnInfo +} + +// NewRowImageLayout creates a RowImageLayout from source and target TableInfo. +func NewRowImageLayout(source, target *model.TableInfo) RowImageLayout { + return NewRowImageLayoutFromColumns(source.Columns, target.Columns) +} + +// NewRowImageLayoutFromColumns creates a RowImageLayout from source and target columns. +func NewRowImageLayoutFromColumns(sourceColumns, targetColumns []*model.ColumnInfo) RowImageLayout { + visibleColumns := VisibleColumns(sourceColumns) + visibleOffsetByColumnOffset := make([]int, len(sourceColumns)) + for i := range visibleOffsetByColumnOffset { + visibleOffsetByColumnOffset[i] = -1 + } + for i, column := range visibleColumns { + visibleOffsetByColumnOffset[column.Offset] = i + } + + layout := RowImageLayout{ + columns: sourceColumns, + visibleColumns: visibleColumns, + visibleOffsetByColumnOffset: visibleOffsetByColumnOffset, + } + if targetColumns != nil { + layout.writableColumns = writableSourceColumns(visibleColumns, targetColumns) + } + return layout +} + +// VisibleColumns returns the visible columns from the given table columns. +func VisibleColumns(columns []*model.ColumnInfo) []*model.ColumnInfo { + ret := make([]*model.ColumnInfo, 0, len(columns)) + for _, col := range columns { + if !col.Hidden { + ret = append(ret, col) + } + } + return ret +} + +// VisibleColumnCount returns the number of visible columns. +func VisibleColumnCount(columns []*model.ColumnInfo) int { + count := 0 + for _, col := range columns { + if !col.Hidden { + count++ + } + } + return count +} + +// VisibleColumns returns the visible source columns in the row image. +func (l RowImageLayout) VisibleColumns() []*model.ColumnInfo { + return l.visibleColumns +} + +// VisibleColumnCount returns the visible source column count in the row image. +func (l RowImageLayout) VisibleColumnCount() int { + return len(l.visibleColumns) +} + +// WritableColumns returns visible source columns that can be written to the target table. +func (l RowImageLayout) WritableColumns() []*model.ColumnInfo { + return l.writableColumns +} + +// FullValues expands a visible-only row image to the source table column layout. +func (l RowImageLayout) FullValues(row []any) ([]any, bool) { + if len(row) == len(l.columns) { + return row, true + } + if len(row) != len(l.visibleColumns) { + return nil, false + } + + fullValues := make([]any, len(l.columns)) + for i, col := range l.visibleColumns { + fullValues[col.Offset] = row[i] + } + return fullValues, true +} + +// SourceColumnCountForVisibleColumnCount returns the source-column prefix width +// that contains exactly columnCount visible columns. +func (l RowImageLayout) SourceColumnCountForVisibleColumnCount(columnCount int) (int, bool) { + visibleCount := 0 + sourceColumnCount := len(l.columns) + for i, col := range l.columns { + if col.Hidden { + continue + } + visibleCount++ + if visibleCount > columnCount { + sourceColumnCount = i + break + } + } + if visibleCount < columnCount { + return 0, false + } + return sourceColumnCount, true +} + +func (l RowImageLayout) isFullValues(values []any) bool { + return len(values) == len(l.columns) +} + +func (l RowImageLayout) columnsForValues(values []any) []*model.ColumnInfo { + if l.isFullValues(values) { + return l.columns + } + return l.visibleColumns +} + +func (l RowImageLayout) columnsAndValuesByIndex( + indexInfo *model.IndexInfo, + values []any, +) ([]*model.ColumnInfo, []any) { + cols := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)) + vals := make([]any, 0, len(indexInfo.Columns)) + for _, column := range indexInfo.Columns { + offset := l.valueOffset(column.Offset, values) + cols = append(cols, l.columns[column.Offset]) + vals = append(vals, values[offset]) + } + return cols, vals +} + +func (l RowImageLayout) valuesByIndex(indexInfo *model.IndexInfo, values []any) []any { + ret := make([]any, 0, len(indexInfo.Columns)) + if values == nil { + return ret + } + for _, column := range indexInfo.Columns { + offset := l.valueOffset(column.Offset, values) + ret = append(ret, values[offset]) + } + return ret +} + +func (l RowImageLayout) valueOffset(columnOffset int, values []any) int { + if l.isFullValues(values) { + return columnOffset + } + return l.visibleOffsetByColumnOffset[columnOffset] +} + +func (l RowImageLayout) valueByOffset(columnOffset int, values []any) any { + return values[l.valueOffset(columnOffset, values)] +} diff --git a/pkg/sqlmodel/row_image_layout_test.go b/pkg/sqlmodel/row_image_layout_test.go new file mode 100644 index 0000000000..3e6692149d --- /dev/null +++ b/pkg/sqlmodel/row_image_layout_test.go @@ -0,0 +1,60 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlmodel + +import ( + "testing" + + timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tiflow/pkg/util/testutil" + "github.com/stretchr/testify/require" +) + +func TestRowImageLayoutWithInterleavedHiddenColumn(t *testing.T) { + t.Parallel() + + tableInfo := mockTableInfo(t, "CREATE TABLE t ("+ + "id INT PRIMARY KEY, a INT, b INT, UNIQUE KEY idx_expr ((a + b)))") + hidden := testutil.HiddenColumnName(t, tableInfo) + testutil.ReorderColumnsByName(t, tableInfo, "id", "a", hidden, "b") + + layout := NewRowImageLayout(tableInfo, tableInfo) + require.Equal(t, 3, layout.VisibleColumnCount()) + require.Equal(t, []string{"id", "a", "b"}, columnNames(layout.VisibleColumns())) + require.Equal(t, 2, layout.valueOffset(3, []any{1, 2, 3})) + require.Equal(t, 3, layout.valueByOffset(3, []any{1, 2, 3})) + + fullValues, ok := layout.FullValues([]any{1, 2, 3}) + require.True(t, ok) + require.Equal(t, []any{1, 2, nil, 3}, fullValues) + + sourceColumnCount, ok := layout.SourceColumnCountForVisibleColumnCount(2) + require.True(t, ok) + require.Equal(t, 3, sourceColumnCount) + + sourceColumnCount, ok = layout.SourceColumnCountForVisibleColumnCount(3) + require.True(t, ok) + require.Equal(t, len(tableInfo.Columns), sourceColumnCount) + + _, ok = layout.SourceColumnCountForVisibleColumnCount(4) + require.False(t, ok) +} + +func columnNames(columns []*timodel.ColumnInfo) []string { + names := make([]string, 0, len(columns)) + for _, column := range columns { + names = append(names, column.Name.L) + } + return names +} diff --git a/pkg/sqlmodel/where_handle.go b/pkg/sqlmodel/where_handle.go index d135a2b81d..05f61963aa 100644 --- a/pkg/sqlmodel/where_handle.go +++ b/pkg/sqlmodel/where_handle.go @@ -41,85 +41,7 @@ type WhereHandle struct { // causalityIdxs is a superset of UniqueIdxs and also includes expression indexes. causalityIdxs []*model.IndexInfo hiddenGeneratedColumnExprCache *generatedColumnExprCache - rowMapper rowValueMapper - writableColumns []*model.ColumnInfo -} - -// TODO(joechenrh): Centralize visible-row to source-column-offset mapping into -// an explicit internal row-image representation, then remove the duplicated -// fallback mapping in RowChange.dmlRowMapping. -type rowValueMapper struct { - columns []*model.ColumnInfo - visibleColumns []*model.ColumnInfo - visibleOffsetByColumnOffset []int -} - -func newRowValueMapper(columns []*model.ColumnInfo) rowValueMapper { - visibleColumns := make([]*model.ColumnInfo, 0, len(columns)) - visibleOffsetByColumnOffset := make([]int, len(columns)) - for i := range visibleOffsetByColumnOffset { - visibleOffsetByColumnOffset[i] = -1 - } - for _, column := range columns { - if column.Hidden { - continue - } - visibleOffsetByColumnOffset[column.Offset] = len(visibleColumns) - visibleColumns = append(visibleColumns, column) - } - return rowValueMapper{ - columns: columns, - visibleColumns: visibleColumns, - visibleOffsetByColumnOffset: visibleOffsetByColumnOffset, - } -} - -func (m rowValueMapper) isFullValues(values []any) bool { - return len(values) == len(m.visibleOffsetByColumnOffset) -} - -func (m rowValueMapper) columnsForValues(values []any) []*model.ColumnInfo { - if m.isFullValues(values) { - return m.columns - } - return m.visibleColumns -} - -func (m rowValueMapper) columnsAndValuesByIndex( - indexInfo *model.IndexInfo, - values []any, -) ([]*model.ColumnInfo, []any) { - cols := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)) - vals := make([]any, 0, len(indexInfo.Columns)) - for _, column := range indexInfo.Columns { - offset := m.valueOffset(column.Offset, values) - cols = append(cols, m.columns[column.Offset]) - vals = append(vals, values[offset]) - } - return cols, vals -} - -func (m rowValueMapper) valuesByIndex(indexInfo *model.IndexInfo, values []any) []any { - ret := make([]any, 0, len(indexInfo.Columns)) - if values == nil { - return ret - } - for _, column := range indexInfo.Columns { - offset := m.valueOffset(column.Offset, values) - ret = append(ret, values[offset]) - } - return ret -} - -func (m rowValueMapper) valueOffset(columnOffset int, values []any) int { - if m.isFullValues(values) { - return columnOffset - } - return m.visibleOffsetByColumnOffset[columnOffset] -} - -func (m rowValueMapper) valueByOffset(columnOffset int, values []any) any { - return values[m.valueOffset(columnOffset, values)] + rowMapper RowImageLayout } type generatedColumnExprCache struct { @@ -199,9 +121,8 @@ func generatedColumnExprContext(tiSessionCtx sessionctx.Context) *exprstatic.Exp // columns and state. Other component can cache the result. func GetWhereHandle(source, target *model.TableInfo) *WhereHandle { ret := WhereHandle{ - rowMapper: newRowValueMapper(source.Columns), + rowMapper: NewRowImageLayout(source, target), } - ret.writableColumns = writableSourceColumns(ret.rowMapper.visibleColumns, target.Columns) indices := make([]*model.IndexInfo, 0, len(target.Indices)+1) indices = append(indices, target.Indices...) if idx := getPKIsHandleIdx(target); target.PKIsHandle && idx != nil {