diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 7f644c703e..878d644e79 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -485,6 +485,62 @@ def _get_column_def_sql(self, column: TColumnSchema, table: PreparedTableSchema column_def_sql += option_str return column_def_sql + def _get_dest_column_descriptions(self, table_name: str) -> Dict[str, Optional[str]]: + """Fetch current column descriptions from BigQuery.""" + try: + bq_table = self.sql_client.native_connection.get_table( + self.sql_client.make_qualified_table_name(table_name, quote=False), + retry=self.sql_client._default_retry, + timeout=self.config.http_timeout, + ) + return {field.name: field.description for field in bq_table.schema} + except gcp_exceptions.NotFound: + return {} + + def _alter_existing_column_hints_sql( + self, table_name: str, storage_columns: TTableSchemaColumns + ) -> List[str]: + """Emit ALTER COLUMN SET OPTIONS for columns whose descriptions differ. + + Compares schema descriptions against the current BigQuery state and only + emits statements when they differ. Handles both adding/updating and + removing descriptions (via SET OPTIONS(description=NULL)). + """ + schema_columns = self.schema.get_table_columns(table_name) + if not schema_columns: + return [] + + dest_descriptions = self._get_dest_column_descriptions(table_name) + qualified_name = self.sql_client.make_qualified_table_name(table_name) + sql_updates: List[str] = [] + + for col_name, schema_col in schema_columns.items(): + if col_name not in storage_columns: + # New column — handled by _get_table_update_sql, not here. + continue + + # Normalize empty strings to None for comparison + schema_desc = schema_col.get("description") or None + dest_desc = dest_descriptions.get(col_name) or None + + if schema_desc == dest_desc: + continue + + escaped_col = self.sql_client.escape_column_name(col_name) + if schema_desc: + escaped_desc = escape_bigquery_literal(schema_desc) + sql_updates.append( + f"ALTER TABLE {qualified_name}\n" + f"ALTER COLUMN {escaped_col} SET OPTIONS(description={escaped_desc})" + ) + else: + # Description removed from schema — clear it in BigQuery + sql_updates.append( + f"ALTER TABLE {qualified_name}\n" + f"ALTER COLUMN {escaped_col} SET OPTIONS(description=NULL)" + ) + return sql_updates + def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigquery.LoadJob: # append to table for merge loads (append to stage) and regular appends. table_name = table["name"] diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 4730249161..9fc2e9d0d0 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -676,11 +676,29 @@ def _build_schema_update_sql( for sql in post_sql_statements: post_sql_updates.append(sql) + # For existing tables, update column hints (e.g. descriptions) on columns + # that already exist in the destination but have changed in the schema. + if generate_alter: + sql_updates.extend( + self._alter_existing_column_hints_sql(table_name, storage_columns) + ) + # add post sql updates at the end sql_updates.extend(post_sql_updates) return sql_updates, schema_update + def _alter_existing_column_hints_sql( + self, table_name: str, storage_columns: TTableSchemaColumns + ) -> List[str]: + """Generates SQL to update hints (e.g. descriptions) on existing columns. + + Called for tables that already exist in the destination. Override in + destination-specific clients to emit ALTER COLUMN statements for hint + changes. The base implementation returns an empty list. + """ + return [] + def _make_add_column_sql( self, new_columns: Sequence[TColumnSchema], table: PreparedTableSchema = None ) -> List[str]: diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py index 4aa55dd8ff..aef8e2f524 100644 --- a/tests/load/bigquery/test_bigquery_table_builder.py +++ b/tests/load/bigquery/test_bigquery_table_builder.py @@ -682,6 +682,113 @@ def some_data() -> Iterator[Dict[str, int]]: assert column_info["rounding_mode"] == "ROUND_HALF_AWAY_FROM_ZERO" # type: ignore[call-overload] +def _stub_dest_descriptions(gcp_client: BigQueryClient, descriptions: Dict[str, Any]) -> None: + """Replace _get_dest_column_descriptions with a stub returning fixed data.""" + gcp_client._get_dest_column_descriptions = lambda table_name: descriptions # type: ignore[assignment] + + +def test_alter_column_descriptions_when_changed(gcp_client: BigQueryClient) -> None: + """Descriptions should be updated when they differ from the destination.""" + columns = deepcopy(TABLE_UPDATE[:2]) + columns[0]["description"] = "New description" + columns[1]["description"] = "Another description" + + gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns)) + storage_columns = {c["name"]: {"name": c["name"], "data_type": c["data_type"]} for c in columns} + + # Destination has no descriptions — both should be updated + _stub_dest_descriptions(gcp_client, {columns[0]["name"]: None, columns[1]["name"]: None}) + + sql_updates = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns) + assert len(sql_updates) == 2 + for sql in sql_updates: + sqlfluff.parse(sql, dialect="bigquery") + assert "ALTER COLUMN" in sql + assert "SET OPTIONS(description=" in sql + + +def test_alter_column_descriptions_skips_unchanged(gcp_client: BigQueryClient) -> None: + """No ALTER should be emitted when destination descriptions already match.""" + columns = deepcopy(TABLE_UPDATE[:2]) + columns[0]["description"] = "Same description" + columns[1]["description"] = "Also same" + + gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns)) + storage_columns = {c["name"]: {"name": c["name"], "data_type": c["data_type"]} for c in columns} + + # Destination already has matching descriptions + _stub_dest_descriptions( + gcp_client, + { + columns[0]["name"]: "Same description", + columns[1]["name"]: "Also same", + }, + ) + + sql_updates = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns) + assert len(sql_updates) == 0 + + +def test_alter_column_descriptions_handles_removal(gcp_client: BigQueryClient) -> None: + """Removing a description from schema should emit SET OPTIONS(description=NULL).""" + columns = deepcopy(TABLE_UPDATE[:2]) + # No descriptions in schema + + gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns)) + storage_columns = {c["name"]: {"name": c["name"], "data_type": c["data_type"]} for c in columns} + + # Destination has descriptions that should be cleared + _stub_dest_descriptions( + gcp_client, + { + columns[0]["name"]: "Old description", + columns[1]["name"]: "Another old one", + }, + ) + + sql_updates = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns) + assert len(sql_updates) == 2 + for sql in sql_updates: + sqlfluff.parse(sql, dialect="bigquery") + assert "SET OPTIONS(description=NULL)" in sql + + +def test_alter_column_descriptions_skips_new_columns(gcp_client: BigQueryClient) -> None: + """Columns not yet in storage should not get ALTER statements (handled by ADD COLUMN).""" + columns = deepcopy(TABLE_UPDATE[:2]) + columns[0]["description"] = "First column description" + columns[1]["description"] = "Second column description" + + gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns)) + + # Only col1 exists in storage — col2 is new + storage_columns = { + columns[0]["name"]: {"name": columns[0]["name"], "data_type": columns[0]["data_type"]} + } + _stub_dest_descriptions(gcp_client, {columns[0]["name"]: None}) + + sql_updates = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns) + assert len(sql_updates) == 1 + assert columns[0]["name"] in sql_updates[0] + + +def test_alter_column_descriptions_escapes_special_characters(gcp_client: BigQueryClient) -> None: + """Descriptions with quotes and special characters should be properly escaped.""" + columns = deepcopy(TABLE_UPDATE[:1]) + columns[0]["description"] = "It's a 'test' with \"quotes\" and \\ backslashes" + + gcp_client.schema.update_table(utils.new_table("event_test_table", columns=columns)) + storage_columns = { + columns[0]["name"]: {"name": columns[0]["name"], "data_type": columns[0]["data_type"]} + } + _stub_dest_descriptions(gcp_client, {columns[0]["name"]: None}) + + sql_updates = gcp_client._alter_existing_column_hints_sql("event_test_table", storage_columns) + assert len(sql_updates) == 1 + sqlfluff.parse(sql_updates[0], dialect="bigquery") + assert "SET OPTIONS(description=" in sql_updates[0] + + def test_adapter_no_hints_parsing() -> None: @dlt.resource(columns=[{"name": "int_col", "data_type": "bigint"}]) def some_data() -> Iterator[Dict[str, str]]: