Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,27 +858,17 @@ func (v *DataValidator) genValidateTableInfo(sourceTable *filter.Table, columnCo
// tableInfoForVisibleColumnCount returns the source TableInfo layout that matches
// a binlog row image with columnCount visible columns.
func tableInfoForVisibleColumnCount(tableInfo *model.TableInfo, columnCount int) (*model.TableInfo, bool) {
visibleCount := 0
stripColumnCount := len(tableInfo.Columns)
for i, col := range tableInfo.Columns {
if col.Hidden {
continue
}
visibleCount++
if visibleCount > columnCount {
stripColumnCount = i
break
}
}
if visibleCount < columnCount {
layout := sqlmodel.NewRowImageLayoutFromColumns(tableInfo.Columns, nil)
sourceColumnCount, ok := layout.SourceColumnCountForVisibleColumnCount(columnCount)
if !ok {
return nil, false
}
if visibleCount == columnCount {
if sourceColumnCount == len(tableInfo.Columns) {
return tableInfo, true
}

clone := tableInfo.Clone()
clone.Columns = clone.Columns[:stripColumnCount]
clone.Columns = clone.Columns[:sourceColumnCount]
return clone, true
}
Comment on lines 860 to 873

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function tableInfoForVisibleColumnCount does not check if the input tableInfo is nil. If a nil pointer is passed, calling tableInfo.Columns will cause a nil pointer dereference panic. Adding a defensive nil check at the beginning of the function will make it more robust.

func tableInfoForVisibleColumnCount(tableInfo *model.TableInfo, columnCount int) (*model.TableInfo, bool) {\n\tif tableInfo == nil {\n\t	return nil, false\n\t}\n\tlayout := sqlmodel.NewRowImageLayoutFromColumns(tableInfo.Columns, nil)\n\tsourceColumnCount, ok := layout.SourceColumnCountForVisibleColumnCount(columnCount)\n\tif !ok {\n\t	return nil, false\n\t}\n\tif sourceColumnCount == len(tableInfo.Columns) {\n\t	return tableInfo, true\n\t}\n\n\tclone := tableInfo.Clone()\n\tclone.Columns = clone.Columns[:sourceColumnCount]\n\treturn clone, true\n}


Expand Down
12 changes: 5 additions & 7 deletions dm/syncer/expr_filter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/sqlmodel"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -193,13 +194,10 @@ func rowForExpressionFilter(

values := row
if len(row) != len(upstreamCols) {
visibleCols := visibleColumns(upstreamCols)
if len(row) != len(visibleCols) {
return chunk.Row{}, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(visibleCols), len(row))
}
fullValues := make([]interface{}, len(upstreamCols))
for i, col := range visibleCols {
fullValues[col.Offset] = row[i]
layout := sqlmodel.NewRowImageLayoutFromColumns(upstreamCols, nil)
fullValues, ok := layout.FullValues(row)
if !ok {
return chunk.Row{}, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(layout.VisibleColumnCount(), len(row))
}
values = fullValues
}
Expand Down
14 changes: 1 addition & 13 deletions dm/syncer/validate_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (vw *validateWorker) batchValidateRowChanges(rows []*rowValidationJob, dele
firstRow := rows[0].row
cond := &Cond{
TargetTbl: firstRow.TargetTableID(),
Columns: visibleColumns(firstRow.SourceTableInfo().Columns),
Columns: sqlmodel.VisibleColumns(firstRow.SourceTableInfo().Columns),
PK: firstRow.UniqueNotNullIdx(),
PkValues: pkValues,
}
Expand Down Expand Up @@ -453,18 +453,6 @@ func (vw *validateWorker) resetErrorRows() {
vw.errorRows = make([]*validateFailedRow, 0)
}

// TODO(joechenrh): Share visible-row/source-column offset mapping with sqlmodel
// instead of maintaining validator-specific hidden column helpers.
func visibleColumns(columns []*model.ColumnInfo) []*model.ColumnInfo {
ret := make([]*model.ColumnInfo, 0, len(columns))
for _, col := range columns {
if !col.Hidden {
ret = append(ret, col)
}
}
return ret
}

func (vw *validateWorker) newJobAdded(job *rowValidationJob) {
tp := job.Tp
vw.pendingRowCounts[tp]++
Expand Down
18 changes: 5 additions & 13 deletions pkg/sqlmodel/row_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,7 @@ func (r *RowChange) TargetTableID() string {
// TiDB TableInfo contains some internal columns like expression index, they
// are not included in this count.
func (r *RowChange) ColumnCount() int {
c := 0
for _, col := range r.sourceTableInfo.Columns {
if !col.Hidden {
c++
}
}
return c
return VisibleColumnCount(r.sourceTableInfo.Columns)
}

// SourceTableInfo returns the TableInfo of source table.
Expand Down Expand Up @@ -242,15 +236,13 @@ func (r *RowChange) lazyInitWhereHandle() {
// dmlRowMapping returns DML column/value mapping without initializing the full
// where handle. CDC insert/update paths can use this without paying the WHERE
// handle build cost, while DM paths reuse the injected cached handle.
func (r *RowChange) dmlRowMapping() (rowValueMapper, []*timodel.ColumnInfo) {
func (r *RowChange) dmlRowMapping() (RowImageLayout, []*timodel.ColumnInfo) {
if r.whereHandle != nil {
return r.whereHandle.rowMapper, r.whereHandle.writableColumns
return r.whereHandle.rowMapper, r.whereHandle.rowMapper.WritableColumns()
}

// Keep this fallback mapping consistent with GetWhereHandle until the row
// image mapping is centralized.
rowMapper := newRowValueMapper(r.sourceTableInfo.Columns)
return rowMapper, writableSourceColumns(rowMapper.visibleColumns, r.targetTableInfo.Columns)
layout := NewRowImageLayout(r.sourceTableInfo, r.targetTableInfo)
return layout, layout.WritableColumns()
}

// whereColumnsAndValues returns columns and values to identify the row, to form
Expand Down
173 changes: 173 additions & 0 deletions pkg/sqlmodel/row_image_layout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sqlmodel

import "github.com/pingcap/tidb/pkg/meta/model"

// RowImageLayout describes how a binlog row image maps to source table columns.
type RowImageLayout struct {
columns []*model.ColumnInfo
visibleColumns []*model.ColumnInfo
visibleOffsetByColumnOffset []int
writableColumns []*model.ColumnInfo
}

// NewRowImageLayout creates a RowImageLayout from source and target TableInfo.
func NewRowImageLayout(source, target *model.TableInfo) RowImageLayout {
return NewRowImageLayoutFromColumns(source.Columns, target.Columns)
}

// NewRowImageLayoutFromColumns creates a RowImageLayout from source and target columns.
func NewRowImageLayoutFromColumns(sourceColumns, targetColumns []*model.ColumnInfo) RowImageLayout {
visibleColumns := VisibleColumns(sourceColumns)
visibleOffsetByColumnOffset := make([]int, len(sourceColumns))
for i := range visibleOffsetByColumnOffset {
visibleOffsetByColumnOffset[i] = -1
}
for i, column := range visibleColumns {
visibleOffsetByColumnOffset[column.Offset] = i
}

layout := RowImageLayout{
columns: sourceColumns,
visibleColumns: visibleColumns,
visibleOffsetByColumnOffset: visibleOffsetByColumnOffset,
}
if targetColumns != nil {
layout.writableColumns = writableSourceColumns(visibleColumns, targetColumns)
}
return layout
}
Comment on lines +32 to +51

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In NewRowImageLayoutFromColumns, visibleOffsetByColumnOffset is sized to len(sourceColumns). However, if sourceColumns is a sliced columns list (e.g., clone.Columns[:sourceColumnCount]), len(sourceColumns) will be smaller than the original column count, but the individual column.Offset values will still retain their original values. This can cause visibleOffsetByColumnOffset[column.Offset] = i to panic with an out-of-bounds index. Sizing the slice based on the maximum offset found in sourceColumns avoids this panic.

func NewRowImageLayoutFromColumns(sourceColumns, targetColumns []*model.ColumnInfo) RowImageLayout {\n\tvisibleColumns := VisibleColumns(sourceColumns)\n\tmaxOffset := -1\n\tfor _, column := range sourceColumns {\n\t\tif column.Offset > maxOffset {\n\t\t	maxOffset = column.Offset\n\t\t}\n\t}\n\tvisibleOffsetByColumnOffset := make([]int, maxOffset+1)\n\tfor i := range visibleOffsetByColumnOffset {\n\t\tvisibleOffsetByColumnOffset[i] = -1\n\t}\n\tfor i, column := range visibleColumns {\n\t\tif column.Offset >= 0 && column.Offset <= maxOffset {\n\t\t	visibleOffsetByColumnOffset[column.Offset] = i\n\t\t}\n\t}\n\n\tlayout := RowImageLayout{\n\t\tcolumns:                     sourceColumns,\n\t\tvisibleColumns:              visibleColumns,\n\t\tvisibleOffsetByColumnOffset: visibleOffsetByColumnOffset,\n\t}\n\tif targetColumns != nil {\n\t\tlayout.writableColumns = writableSourceColumns(visibleColumns, targetColumns)\n\t}\n\treturn layout\n}


// VisibleColumns returns the visible columns from the given table columns.
func VisibleColumns(columns []*model.ColumnInfo) []*model.ColumnInfo {
ret := make([]*model.ColumnInfo, 0, len(columns))
for _, col := range columns {
if !col.Hidden {
ret = append(ret, col)
}
}
return ret
}

// VisibleColumnCount returns the number of visible columns.
func VisibleColumnCount(columns []*model.ColumnInfo) int {
count := 0
for _, col := range columns {
if !col.Hidden {
count++
}
}
return count
}

// VisibleColumns returns the visible source columns in the row image.
func (l RowImageLayout) VisibleColumns() []*model.ColumnInfo {
return l.visibleColumns
}

// VisibleColumnCount returns the visible source column count in the row image.
func (l RowImageLayout) VisibleColumnCount() int {
return len(l.visibleColumns)
}

// WritableColumns returns visible source columns that can be written to the target table.
func (l RowImageLayout) WritableColumns() []*model.ColumnInfo {
return l.writableColumns
}

// FullValues expands a visible-only row image to the source table column layout.
func (l RowImageLayout) FullValues(row []any) ([]any, bool) {
if len(row) == len(l.columns) {
return row, true
}
if len(row) != len(l.visibleColumns) {
return nil, false
}

fullValues := make([]any, len(l.columns))
for i, col := range l.visibleColumns {
fullValues[col.Offset] = row[i]
}
return fullValues, true
}
Comment on lines +91 to +104

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In FullValues, fullValues is sized to len(l.columns). If col.Offset is greater than or equal to len(l.columns) (which can happen if the columns slice is sliced or has non-contiguous offsets), assigning fullValues[col.Offset] = row[i] will panic with an out-of-bounds index. Adding a defensive bounds check prevents this panic.

func (l RowImageLayout) FullValues(row []any) ([]any, bool) {\n\tif len(row) == len(l.columns) {\n\t	return row, true\n\t}\n\tif len(row) != len(l.visibleColumns) {\n\t	return nil, false\n\t}\n\n\tfullValues := make([]any, len(l.columns))\n\tfor i, col := range l.visibleColumns {\n\t\tif col.Offset >= 0 && col.Offset < len(fullValues) {\n\t\t	fullValues[col.Offset] = row[i]\n\t\t}\n\t}\n\treturn fullValues, true\n}


// SourceColumnCountForVisibleColumnCount returns the source-column prefix width
// that contains exactly columnCount visible columns.
func (l RowImageLayout) SourceColumnCountForVisibleColumnCount(columnCount int) (int, bool) {
visibleCount := 0
sourceColumnCount := len(l.columns)
for i, col := range l.columns {
if col.Hidden {
continue
}
visibleCount++
if visibleCount > columnCount {
sourceColumnCount = i
break
}
}
if visibleCount < columnCount {
return 0, false
}
return sourceColumnCount, true
}

func (l RowImageLayout) isFullValues(values []any) bool {
return len(values) == len(l.columns)
}

func (l RowImageLayout) columnsForValues(values []any) []*model.ColumnInfo {
if l.isFullValues(values) {
return l.columns
}
return l.visibleColumns
}

func (l RowImageLayout) columnsAndValuesByIndex(
indexInfo *model.IndexInfo,
values []any,
) ([]*model.ColumnInfo, []any) {
cols := make([]*model.ColumnInfo, 0, len(indexInfo.Columns))
vals := make([]any, 0, len(indexInfo.Columns))
for _, column := range indexInfo.Columns {
offset := l.valueOffset(column.Offset, values)
cols = append(cols, l.columns[column.Offset])
vals = append(vals, values[offset])
}
return cols, vals
}

func (l RowImageLayout) valuesByIndex(indexInfo *model.IndexInfo, values []any) []any {
ret := make([]any, 0, len(indexInfo.Columns))
if values == nil {
return ret
}
for _, column := range indexInfo.Columns {
offset := l.valueOffset(column.Offset, values)
ret = append(ret, values[offset])
}
return ret
}

func (l RowImageLayout) valueOffset(columnOffset int, values []any) int {
if l.isFullValues(values) {
return columnOffset
}
return l.visibleOffsetByColumnOffset[columnOffset]
}
Comment on lines +164 to +169

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In valueOffset, accessing l.visibleOffsetByColumnOffset[columnOffset] directly can panic if columnOffset is out of bounds (e.g., if it is negative or greater than or equal to the length of the slice). Adding a defensive bounds check makes the method safer.

func (l RowImageLayout) valueOffset(columnOffset int, values []any) int {\n\tif l.isFullValues(values) {\n\t	return columnOffset\n\t}\n\tif columnOffset < 0 || columnOffset >= len(l.visibleOffsetByColumnOffset) {\n\t	return -1\n\t}\n\treturn l.visibleOffsetByColumnOffset[columnOffset]\n}


func (l RowImageLayout) valueByOffset(columnOffset int, values []any) any {
return values[l.valueOffset(columnOffset, values)]
}
60 changes: 60 additions & 0 deletions pkg/sqlmodel/row_image_layout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sqlmodel

import (
"testing"

timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tiflow/pkg/util/testutil"
"github.com/stretchr/testify/require"
)

func TestRowImageLayoutWithInterleavedHiddenColumn(t *testing.T) {
t.Parallel()

tableInfo := mockTableInfo(t, "CREATE TABLE t ("+
"id INT PRIMARY KEY, a INT, b INT, UNIQUE KEY idx_expr ((a + b)))")
hidden := testutil.HiddenColumnName(t, tableInfo)
testutil.ReorderColumnsByName(t, tableInfo, "id", "a", hidden, "b")

layout := NewRowImageLayout(tableInfo, tableInfo)
require.Equal(t, 3, layout.VisibleColumnCount())
require.Equal(t, []string{"id", "a", "b"}, columnNames(layout.VisibleColumns()))
require.Equal(t, 2, layout.valueOffset(3, []any{1, 2, 3}))
require.Equal(t, 3, layout.valueByOffset(3, []any{1, 2, 3}))

fullValues, ok := layout.FullValues([]any{1, 2, 3})
require.True(t, ok)
require.Equal(t, []any{1, 2, nil, 3}, fullValues)

sourceColumnCount, ok := layout.SourceColumnCountForVisibleColumnCount(2)
require.True(t, ok)
require.Equal(t, 3, sourceColumnCount)

sourceColumnCount, ok = layout.SourceColumnCountForVisibleColumnCount(3)
require.True(t, ok)
require.Equal(t, len(tableInfo.Columns), sourceColumnCount)

_, ok = layout.SourceColumnCountForVisibleColumnCount(4)
require.False(t, ok)
}

func columnNames(columns []*timodel.ColumnInfo) []string {
names := make([]string, 0, len(columns))
for _, column := range columns {
names = append(names, column.Name.L)
}
return names
}
Loading
Loading