Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions dlt/destinations/impl/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,62 @@ def _get_column_def_sql(self, column: TColumnSchema, table: PreparedTableSchema
column_def_sql += option_str
return column_def_sql

def _get_dest_column_descriptions(self, table_name: str) -> Dict[str, Optional[str]]:
    """Return the column descriptions currently stored in BigQuery.

    Maps each column name of *table_name* to its description (``None``
    when unset). A table that does not exist yet yields an empty mapping
    instead of raising.
    """
    unquoted_name = self.sql_client.make_qualified_table_name(table_name, quote=False)
    try:
        remote_table = self.sql_client.native_connection.get_table(
            unquoted_name,
            retry=self.sql_client._default_retry,
            timeout=self.config.http_timeout,
        )
    except gcp_exceptions.NotFound:
        # Nothing deployed yet — no descriptions to report.
        return {}
    return {field.name: field.description for field in remote_table.schema}

def _alter_existing_column_hints_sql(
    self, table_name: str, storage_columns: TTableSchemaColumns
) -> List[str]:
    """Emit ALTER COLUMN SET OPTIONS for columns whose descriptions differ.

    Compares schema descriptions against the current BigQuery state and only
    emits statements when they differ. Handles both adding/updating and
    removing descriptions (via SET OPTIONS(description=NULL)).
    """
    schema_columns = self.schema.get_table_columns(table_name)
    if not schema_columns:
        return []

    current_descriptions = self._get_dest_column_descriptions(table_name)
    table_sql_name = self.sql_client.make_qualified_table_name(table_name)
    statements: List[str] = []

    for name, column in schema_columns.items():
        # Columns absent from storage are brand new; their description is
        # applied by _get_table_update_sql (ADD COLUMN), not by ALTER here.
        if name not in storage_columns:
            continue

        # Treat empty strings and None as equivalent when comparing.
        wanted = column.get("description") or None
        existing = current_descriptions.get(name) or None
        if wanted == existing:
            continue

        # NULL clears a description that was removed from the schema;
        # otherwise set the (escaped) new text.
        option_value = escape_bigquery_literal(wanted) if wanted else "NULL"
        statements.append(
            f"ALTER TABLE {table_sql_name}\n"
            f"ALTER COLUMN {self.sql_client.escape_column_name(name)}"
            f" SET OPTIONS(description={option_value})"
        )
    return statements

def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigquery.LoadJob:
# append to table for merge loads (append to stage) and regular appends.
table_name = table["name"]
Expand Down
18 changes: 18 additions & 0 deletions dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,11 +676,29 @@ def _build_schema_update_sql(
for sql in post_sql_statements:
post_sql_updates.append(sql)

# For existing tables, update column hints (e.g. descriptions) on columns
# that already exist in the destination but have changed in the schema.
if generate_alter:
sql_updates.extend(
self._alter_existing_column_hints_sql(table_name, storage_columns)
)

# add post sql updates at the end
sql_updates.extend(post_sql_updates)

return sql_updates, schema_update

def _alter_existing_column_hints_sql(
    self, table_name: str, storage_columns: TTableSchemaColumns
) -> List[str]:
    """Hook: SQL that syncs hints (e.g. descriptions) on existing columns.

    Invoked only for tables already present in the destination. The base
    implementation emits nothing; destination-specific clients override it
    to produce ALTER COLUMN statements when hint values drift.
    """
    return list()

def _make_add_column_sql(
self, new_columns: Sequence[TColumnSchema], table: PreparedTableSchema = None
) -> List[str]:
Expand Down
107 changes: 107 additions & 0 deletions tests/load/bigquery/test_bigquery_table_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,113 @@ def some_data() -> Iterator[Dict[str, int]]:
assert column_info["rounding_mode"] == "ROUND_HALF_AWAY_FROM_ZERO" # type: ignore[call-overload]


def _stub_dest_descriptions(gcp_client: BigQueryClient, descriptions: Dict[str, Any]) -> None:
    """Monkeypatch _get_dest_column_descriptions to return *descriptions* verbatim."""

    def _fixed(table_name: str) -> Dict[str, Any]:
        # Ignore the table name — tests control the returned mapping directly.
        return descriptions

    gcp_client._get_dest_column_descriptions = _fixed  # type: ignore[assignment]


def test_alter_column_descriptions_when_changed(gcp_client: BigQueryClient) -> None:
    """Descriptions should be updated when they differ from the destination."""
    columns = deepcopy(TABLE_UPDATE[:2])
    columns[0]["description"] = "New description"
    columns[1]["description"] = "Another description"

    gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns))
    storage_columns = {
        col["name"]: {"name": col["name"], "data_type": col["data_type"]} for col in columns
    }

    # Destination has no descriptions — both should be updated
    _stub_dest_descriptions(gcp_client, {col["name"]: None for col in columns})

    statements = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns)
    assert len(statements) == 2
    for statement in statements:
        sqlfluff.parse(statement, dialect="bigquery")
        assert "ALTER COLUMN" in statement
        assert "SET OPTIONS(description=" in statement

def test_alter_column_descriptions_skips_unchanged(gcp_client: BigQueryClient) -> None:
    """No ALTER should be emitted when destination descriptions already match."""
    columns = deepcopy(TABLE_UPDATE[:2])
    descriptions = ["Same description", "Also same"]
    for col, text in zip(columns, descriptions):
        col["description"] = text

    gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns))
    storage_columns = {
        col["name"]: {"name": col["name"], "data_type": col["data_type"]} for col in columns
    }

    # Destination already has matching descriptions
    _stub_dest_descriptions(
        gcp_client, {col["name"]: text for col, text in zip(columns, descriptions)}
    )

    statements = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns)
    assert len(statements) == 0


def test_alter_column_descriptions_handles_removal(gcp_client: BigQueryClient) -> None:
    """Removing a description from schema should emit SET OPTIONS(description=NULL)."""
    # Schema columns deliberately carry no descriptions.
    columns = deepcopy(TABLE_UPDATE[:2])

    gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns))
    storage_columns = {
        col["name"]: {"name": col["name"], "data_type": col["data_type"]} for col in columns
    }

    # Destination has descriptions that should be cleared
    stale_descriptions = {
        columns[0]["name"]: "Old description",
        columns[1]["name"]: "Another old one",
    }
    _stub_dest_descriptions(gcp_client, stale_descriptions)

    statements = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns)
    assert len(statements) == 2
    for statement in statements:
        sqlfluff.parse(statement, dialect="bigquery")
        assert "SET OPTIONS(description=NULL)" in statement


def test_alter_column_descriptions_skips_new_columns(gcp_client: BigQueryClient) -> None:
    """Columns not yet in storage should not get ALTER statements (handled by ADD COLUMN)."""
    columns = deepcopy(TABLE_UPDATE[:2])
    columns[0]["description"] = "First column description"
    columns[1]["description"] = "Second column description"

    gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns))

    # Only col1 exists in storage — col2 is new
    existing_col = columns[0]
    storage_columns = {
        existing_col["name"]: {
            "name": existing_col["name"],
            "data_type": existing_col["data_type"],
        }
    }
    _stub_dest_descriptions(gcp_client, {existing_col["name"]: None})

    statements = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns)
    assert len(statements) == 1
    assert existing_col["name"] in statements[0]


def test_alter_column_descriptions_escapes_special_characters(gcp_client: BigQueryClient) -> None:
    """Descriptions with quotes and special characters should be properly escaped."""
    columns = deepcopy(TABLE_UPDATE[:1])
    columns[0]["description"] = "It's a 'test' with \"quotes\" and \\ backslashes"

    gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns))
    only_col = columns[0]
    storage_columns = {
        only_col["name"]: {"name": only_col["name"], "data_type": only_col["data_type"]}
    }
    _stub_dest_descriptions(gcp_client, {only_col["name"]: None})

    statements = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns)
    assert len(statements) == 1
    # The emitted statement must still parse — escaping was applied correctly.
    sqlfluff.parse(statements[0], dialect="bigquery")
    assert "SET OPTIONS(description=" in statements[0]


def test_adapter_no_hints_parsing() -> None:
@dlt.resource(columns=[{"name": "int_col", "data_type": "bigint"}])
def some_data() -> Iterator[Dict[str, str]]:
Expand Down