Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 247 additions & 25 deletions src/mysql-util/src/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,34 +325,71 @@ fn pack_val_as_datum(
}
}
Some(MySqlColumnMeta::Timestamp(precision)) => {
// Some MySQL dates are invalid in chrono/NaiveDate (e.g. 0000-00-00), so
// we need to handle them directly as strings
if let Value::Date(y, m, d, h, mm, s, ms) = value {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking that I understand:

We used to only handle Value::Date from the MySQL binlog, and now we handle Value::Int and Value::Bytes also.

if *precision > 0 {
let precision: usize = (*precision).try_into()?;
packer.push(Datum::String(&format!(
"{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:0precision$}",
y,
m,
d,
h,
mm,
s,
ms,
precision = precision
)));
} else {
packer.push(Datum::String(&format!(
"{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
y, m, d, h, mm, s
)));
// TIMESTAMP arrives as three mysql_common::Value variants
// (refs: mysql_common v0.35.5):
// Value::Date — binary query response + binlog DATETIME[2]
// (value/mod.rs:443-445, binlog/value.rs:109-161)
// Value::Int — legacy binlog MYSQL_TYPE_TIMESTAMP, pre-5.6,
// 4-byte unix epoch (binlog/value.rs:87-90)
// Value::Bytes — binlog MYSQL_TYPE_TIMESTAMP2, 5.6+,
// "<sec>" or "<sec>.<usec>" (binlog/value.rs:145-154)
let str_timestamp = match value {
Value::Date(y, m, d, h, mm, s, ms) => {
if *precision > 0 {
let precision: usize = (*precision).try_into()?;
format!(
"{:04}-{:02}-{:02} {:02}:{:02}:{:02}.{:0precision$}",
y,
m,
d,
h,
mm,
s,
ms,
precision = precision
)
} else {
format!(
"{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
y, m, d, h, mm, s
)
}
}
} else {
Err(anyhow::anyhow!(
// Pre-5.6 unix epoch, no fractional seconds.
Value::Int(val) => chrono::DateTime::from_timestamp(val, 0)
.ok_or_else(|| {
anyhow::anyhow!("received invalid timestamp value: {}", val)
})?
.naive_utc()
.format("%Y-%m-%d %H:%M:%S")
.to_string(),
// 5.6+ epoch string; parse + reformat so all variants emit the
// same canonical YYYY-MM-DD HH:MM:SS[.ffff] text.
Value::Bytes(data) => {
let s = std::str::from_utf8(&data).map_err(|_| {
anyhow::anyhow!("received invalid timestamp value: {:?}", data)
})?;
let dt = if s.contains('.') {
chrono::NaiveDateTime::parse_from_str(s, "%s%.6f")
} else {
chrono::NaiveDateTime::parse_from_str(s, "%s")
}
.map_err(|_| {
anyhow::anyhow!("received invalid timestamp value: {:?}", s)
})?;
if *precision > 0 {
let p: usize = (*precision).try_into()?;
dt.format(&format!("%Y-%m-%d %H:%M:%S.%{p}f")).to_string()
} else {
dt.format("%Y-%m-%d %H:%M:%S").to_string()
}
}
_ => Err(anyhow::anyhow!(
"received unexpected value for timestamp type: {:?}",
value
))?;
}
))?,
};
packer.push(Datum::String(&str_timestamp));
}
Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
None => {
Expand Down Expand Up @@ -481,3 +518,188 @@ fn check_char_length(
}
Ok(())
}

#[cfg(test)]
mod tests {
//! Unit tests for the TEXT-COLUMNS decoding of MySQL TIMESTAMP values.
//!
//! These cover the regression where a MySQL TIMESTAMP column declared as
//! a TEXT COLUMN fails to decode when the wire value arrives as
//! `Value::Bytes("<unix-epoch>")` or `Value::Int(<unix-epoch>)` instead
//! of `Value::Date(..)`. The integration test in
//! `test/mysql-cdc/text-columns-timestamp.td` exercises this through
//! a real MySQL container but is non-deterministic: which `Value`
//! variant `mysql-async` produces depends on connection-state timing.
//! These unit tests pin each variant down directly.
//!
//! The wire-variant matrix exercised below is derived from mysql_common
//! v0.35.5:
//!
//! * Value::Int(epoch) — binlog MYSQL_TYPE_TIMESTAMP (pre-5.6):
//! https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L87-L90
//! * Value::Bytes("<sec>"/"<sec>.<usec>") — binlog MYSQL_TYPE_TIMESTAMP2 (5.6+):
//! https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L145-L154
//! * Value::Date(...) — binary query response + binlog DATETIME[2]:
//! https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/value/mod.rs#L443-L445
//! https://github.com/blackbeam/rust_mysql_common/blob/v0.35.5/src/binlog/value.rs#L109-L161
//!
//! MySQL semantics referenced by the zero-date and fractional-precision
//! cases:
//!
//! * Zero-date allowed when sql_mode disables NO_ZERO_DATE:
//! https://dev.mysql.com/doc/refman/8.0/en/sql-mode.html#sqlmode_no_zero_date
//! * TIMESTAMP(p) / DATETIME(p) fractional seconds:
//! https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
use super::*;
use mz_repr::{SqlColumnType, SqlScalarType};

fn timestamp_text_col(precision: u32) -> MySqlColumnDesc {
MySqlColumnDesc {
name: "created_at".to_string(),
column_type: Some(SqlColumnType {
scalar_type: SqlScalarType::String,
nullable: true,
}),
meta: Some(MySqlColumnMeta::Timestamp(precision)),
}
}

fn pack_one(value: Value, col: &MySqlColumnDesc) -> Result<String, anyhow::Error> {
let mut row = Row::default();
pack_val_as_datum(value, col, &mut row.packer())?;
Ok(row.unpack_first().unwrap_str().to_string())
}

#[mz_ore::test]
fn timestamp_value_date_no_precision() {
let col = timestamp_text_col(0);
let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 0), &col).unwrap();
assert_eq!(s, "2024-04-03 10:15:13");
}

#[mz_ore::test]
fn timestamp_value_date_with_precision() {
let col = timestamp_text_col(6);
let s = pack_one(Value::Date(2024, 4, 3, 10, 15, 13, 123456), &col).unwrap();
assert_eq!(s, "2024-04-03 10:15:13.123456");
}

#[mz_ore::test]
fn timestamp_value_date_zero_date() {
// The whole reason TEXT COLUMNS exists for TIMESTAMP: a
// zero-date arriving as Value::Date(0,..) should round-trip as
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these round-trip tests?

// the literal MySQL zero-timestamp string.
let col = timestamp_text_col(0);
let s = pack_one(Value::Date(0, 0, 0, 0, 0, 0, 0), &col).unwrap();
assert_eq!(s, "0000-00-00 00:00:00");
}

/// Regression: Value::Int (pre-5.6 legacy temporal format, unix
/// epoch seconds) was previously rejected with
/// `received unexpected value for timestamp type: Int(..)`.
#[mz_ore::test]
fn timestamp_value_int_epoch() {
let col = timestamp_text_col(0);
// 1743661234 == 2025-04-03 06:20:34 UTC
let s = pack_one(Value::Int(1_743_661_234), &col).unwrap();
assert_eq!(s, "2025-04-03 06:20:34");
}

#[mz_ore::test]
fn timestamp_value_int_epoch_zero() {
// Unix epoch 0; legacy format has no fractional seconds.
let col = timestamp_text_col(0);
let s = pack_one(Value::Int(0), &col).unwrap();
assert_eq!(s, "1970-01-01 00:00:00");
}

/// Out-of-range epochs must error rather than silently producing
/// a zero-timestamp — they aren't the MySQL zero-date marker, just
/// garbage chrono can't represent.
#[mz_ore::test]
fn timestamp_value_int_out_of_range_errors() {
let col = timestamp_text_col(0);
let err = pack_one(Value::Int(i64::MAX), &col).unwrap_err();
assert!(
err.to_string().contains("invalid timestamp value"),
"unexpected error message: {err}"
);
}

/// Regression: Value::Bytes carrying a unix-epoch string is the
/// wire variant that triggered the production failure
/// received unexpected value for timestamp type: Bytes("17436613..")
#[mz_ore::test]
fn timestamp_value_bytes_epoch() {
let col = timestamp_text_col(0);
let s = pack_one(Value::Bytes(b"1743661234".to_vec()), &col).unwrap();
assert_eq!(s, "2025-04-03 06:20:34");
}

/// Regression: the zero-date can also surface as Value::Bytes("0")
/// from the binlog replication path; this was the variant the
/// local integration test triggered most often.
#[mz_ore::test]
fn timestamp_value_bytes_zero() {
let col = timestamp_text_col(0);
let s = pack_one(Value::Bytes(b"0".to_vec()), &col).unwrap();
// Treat literal "0" as the unix epoch, matching the non-TEXT
// path's behavior at `SqlScalarType::Timestamp` above.
assert_eq!(s, "1970-01-01 00:00:00");
}

/// Fractional form of the TIMESTAMP2 binlog encoding —
/// "<sec>.<usec>" wrapped in Value::Bytes (binlog/value.rs:151-153).
/// Hits the `s.contains('.')` branch and the precision-aware
/// reformat.
#[mz_ore::test]
fn timestamp_value_bytes_epoch_fractional() {
let col = timestamp_text_col(6);
let s = pack_one(Value::Bytes(b"1743661234.123456".to_vec()), &col).unwrap();
assert_eq!(s, "2025-04-03 06:20:34.123456");
}

/// Bytes that aren't valid UTF-8 should produce a meaningful error,
/// not a panic.
#[mz_ore::test]
fn timestamp_value_bytes_invalid_utf8_errors() {
let col = timestamp_text_col(0);
// 0xC3 0x28 is an invalid 2-byte UTF-8 sequence.
let err = pack_one(Value::Bytes(vec![0xC3, 0x28]), &col).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("invalid timestamp value"),
"unexpected error message: {msg}"
);
}

/// Bytes that are valid UTF-8 but not parseable as a unix epoch
/// should produce the same structured error as invalid UTF-8 —
/// covers the chrono parse failure path that
/// `timestamp_value_bytes_invalid_utf8_errors` doesn't reach.
#[mz_ore::test]
fn timestamp_value_bytes_unparseable_errors() {
let col = timestamp_text_col(0);
for payload in [&b""[..], &b"not-an-epoch"[..], &b"2024-04-03 10:15:13"[..]] {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe my brain stopped working, but I can't tell why 2024-04-03 10:15:13 isn't a timestamp.

EDIT: found it

Value::Bytes("<sec>"/"<sec>.<usec>") — binlog MYSQL_TYPE_TIMESTAMP2 (5.6+)

let err = pack_one(Value::Bytes(payload.to_vec()), &col).unwrap_err();
assert!(
err.to_string().contains("invalid timestamp value"),
"payload {payload:?}: unexpected error message: {err}"
);
}
}

/// Variants that have no defined mapping for a TIMESTAMP column
/// must still produce the existing structured decode error so the
/// source health surface can flag them.
#[mz_ore::test]
fn timestamp_value_unsupported_variant_errors() {
let col = timestamp_text_col(0);
let err = pack_one(Value::Float(1.0), &col).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("unexpected value for timestamp"),
"unexpected error message: {msg}"
);
}
}
97 changes: 97 additions & 0 deletions test/mysql-cdc/text-columns-timestamp.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

#
# Regression test for MySQL TIMESTAMP columns ingested via TEXT COLUMNS,
# where the same upstream column contains a mix of valid timestamps and
# invalid zero-dates. TEXT COLUMNS is the documented workaround for
# columns that may hold values (e.g. '0000-00-00 00:00:00') that cannot
# be represented as a Materialize TIMESTAMP.
#
# The bug: MySQL TIMESTAMP values arrive as several mysql_common::Value
# variants depending on the protocol path (Value::Date for binlog events,
# Value::Bytes("<unix-epoch>") and Value::Int for some query/binlog
# combinations). The native TIMESTAMP decoder handles all three, but the
# TEXT COLUMNS branch in src/mysql-util/src/decoding.rs only handled
# Value::Date, so any valid TIMESTAMP routed through the Bytes/Int path
# failed with:
#
# error decoding value for '...' column '...': received unexpected
# value for timestamp type: Bytes("17436613..")
#
# and forced the source to be dropped and recreated.

> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}'

> CREATE CONNECTION mysqc TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)

$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}

# sql_mode = '' is required so MySQL accepts the zero-date that motivates
# the use of TEXT COLUMNS in the first place.
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
SET SESSION sql_mode = '';
CREATE TABLE products (id INT PRIMARY KEY, created_at TIMESTAMP NULL, updated_at TIMESTAMP(6) NULL);
START TRANSACTION;
INSERT INTO products VALUES (1, '2024-04-03 10:15:13', '2024-04-03 10:15:13.123456'), (2, '0000-00-00 00:00:00', '0000-00-00 00:00:00.000000'), (3, NULL, NULL);
COMMIT;

> BEGIN
> CREATE SOURCE da
FROM MYSQL CONNECTION mysqc;
> CREATE TABLE products FROM SOURCE da (REFERENCE public.products)
WITH (TEXT COLUMNS = (created_at, updated_at));
> COMMIT

# Post-snapshot rows exercise the replication / binlog decode path,
# which uses a different mysql_common::Value variant than the snapshot.
$ mysql-execute name=mysql
USE public;
SET SESSION sql_mode = '';
START TRANSACTION;
INSERT INTO products VALUES (4, '2025-04-03 09:01:53', '2025-04-03 09:01:53.987654'), (5, '0000-00-00 00:00:00', '0000-00-00 00:00:00.000000'), (6, NULL, NULL);
COMMIT;

> SELECT id, created_at FROM products ORDER BY id;
1 "2024-04-03 10:15:13"
2 "0000-00-00 00:00:00"
3 <null>
4 "2025-04-03 09:01:53"
5 "0000-00-00 00:00:00"
6 <null>

> SELECT id, updated_at FROM products ORDER BY id;
1 "2024-04-03 10:15:13.123456"
2 "0000-00-00 00:00:00.000000"
3 <null>
4 "2025-04-03 09:01:53.987654"
5 "0000-00-00 00:00:00.000000"
6 <null>

# Verify the column types were rewritten to text by TEXT COLUMNS.
> SELECT pg_typeof(created_at), pg_typeof(updated_at) FROM products LIMIT 1;
text text

# The decode error stalls the source even after the queries above appear to
# succeed (the snapshot rows are already in dataflow and remain queryable).
# Asserting on the source status is what makes this a hard regression check.
# We poll until either the source is fully healthy or surfaces the stall;
# this avoids a race where the binlog error has not yet been recorded.
Comment on lines +121 to +125
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having not seen the error myself, it's not obvious what this comment means. I feel like I'd have to do some Linear+Slack archaeology, especially if I find it sometime later.

> SELECT name, status, error IS NULL FROM mz_internal.mz_source_statuses WHERE name IN ('da', 'products') ORDER BY name;
da running true
products running true

> DROP SOURCE da CASCADE;
Loading