feat: Databricks Zerobus loading #3904
Conversation
This PR is not ready to merge (it's missing docs, tests, and config options), but before polishing it I want to get feedback on a performance concern with the new Zerobus insert API. I've done perf tests using NYC taxi data, which identified three main bottlenecks:

1. **Record acknowledgment.** I think this one is out of our control, though we can skip waiting for acknowledgement at the cost of correctness guarantees (I've tested that and it makes things much faster, though still not faster than the standard `databricks` destination).
2. **Load job file reading (client side).** We have to read the JSON/Parquet load job file back into Python memory so we can encode the records and feed them into the Zerobus SDK.
3. **Load job record encoding (client side).** We have to do row-by-row processing to encode the values into a format that the Zerobus SDK accepts.

**Squeezing Zerobus into the wrong mold?**

Currently the resource emits dicts, dlt writes them to a load job file, and we read that file back to feed the Zerobus SDK. I think this doesn't fit the Zerobus streaming paradigm well. Something like "resource emits dicts straight into the Zerobus SDK" would be a more natural fit.

**Zerobus Arrow support**

Databricks is working on Arrow support, which looks like it might land soon. I think this will speed up or completely remove the need for (2) and (3). Together with skipping record acknowledgment, this might push Zerobus performance into a competitive range.

@zilto what are your thoughts?
Thanks for the performance analysis! Code looks good.

**My understanding**

To double-check my understanding, when comparing `databricks` vs `databricks zerobus` …
**Performance improvement avenues**

*Arrow support*

Zerobus Arrow support would be a quick win. We have a solid but incomplete PR (#3477) for Arrow IPC support that would skip a lot of the serialization costs in the extract and normalize phases.

*Parallelize load phase*
AFAIK, …

*Disabling normalize and load steps*

I agree with you that those features would be desirable more generally for `dlt`.
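The load-phase parallelization mentioned above could be sketched with a thread pool over load job files (hypothetical `load_one_file` worker; dlt's actual loader is structured differently):

```python
from concurrent.futures import ThreadPoolExecutor


def load_files_parallel(paths, load_one_file, max_workers=4):
    # one load job file per task; I/O-bound uploads overlap across threads
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(load_one_file, paths))
```

Since the per-file work is network-bound, threads (rather than processes) are usually enough to overlap the waits.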
Correct! Furthermore, I have tried a few more tweaks, but none of it led to a meaningful speedup.

@zilto I'm happy to polish the PR to finalize it, but the current Zerobus performance makes me doubt whether there is any merit to merging this right now. Let me know what you think.
Switched Zerobus serialization over from JSON to Arrow, which became available recently in the Zerobus Python SDK. Important to note that Arrow isn't officially supported yet in the Zerobus Python SDK: it's not documented, but there is an example in the repo.

@zilto Can you review this PR?
zilto
left a comment
We need to investigate:

- why `athena` tests are failing. Some of their configs were edited in this PR
- why `hf` tests are failing. We're hitting rate limits. Do you expect any behavior change from this PR (e.g., batching logic)?

If those are unrelated, we can merge. Otherwise, two minor questions / nits.
```python
TDataRecord = dict[str, Any]
"""Table row dictionary. Not guaranteed to be JSON serializable without custom encoding."""
TDataRecordBatch = list[TDataRecord]
"""List of table row dictionaries. Not guaranteed to be JSON serializable without custom encoding."""
```
I like the centralization of batching logic. Though, how do `TDataRecord` and `TDataRecordBatch` differ from `TDataItem` and `TDataItems`?

(it's a bit annoying that `TDataItems` is not guaranteed to be a list...)
I think they roughly represent the same concepts. My issue is that `TDataItem` is just `Any`, which makes development hard.
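To illustrate the "not guaranteed to be JSON serializable" caveat on `TDataRecord`: a row holding a `datetime` fails with plain `json.dumps` and needs a custom encoder (a minimal sketch, not the encoder dlt actually uses):

```python
import json
from datetime import datetime, timezone
from typing import Any

TDataRecord = dict[str, Any]

row: TDataRecord = {"id": 1, "ts": datetime(2026, 5, 13, tzinfo=timezone.utc)}


def _default(o: Any) -> str:
    # datetimes become ISO-8601 strings; anything else still fails loudly
    if isinstance(o, datetime):
        return o.isoformat()
    raise TypeError(f"not JSON serializable: {type(o).__name__}")


encoded = json.dumps(row, default=_default)
```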
```python
default_sql_configs_with_staging = [
    # Athena needs filesystem staging, which will be automatically set; we have to supply a bucket url though.
    cid_configs_by_cid["athena"],
    cid_configs_by_cid["athena-iceberg"],
    cid_configs_by_cid["athena-s3-tables"],
]
```
CI workflows related to these configurations seem to be failing now. I retried them and they failed again.
It's not related to changes in this PR. The failing tests also fail on origin/devel.
zilto
left a comment
The failing CI is also failing on devel branch, so seems unrelated.
Let's merge. Good job!
Description
Adds support for using Zerobus to load data into Databricks Delta tables.
API:

- `databricks_adapter(my_resource, insert_api="zerobus")`
- omit `insert_api` or set it to `copy_into` for the default behavior

Notes:

- Mirrors bigquery's API: `bigquery_adapter(my_resource, insert_api="streaming")`
- The `zerobus` insert API is only supported for the `append` write disposition

Related Issues
Closes #3874
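The append-only restriction noted above could be enforced with a small guard like this (illustrative names, not the actual adapter code from this PR):

```python
SUPPORTED_INSERT_APIS = {"copy_into", "zerobus"}


def validate_insert_api(insert_api: str, write_disposition: str) -> str:
    # reject unknown APIs and non-append use of zerobus
    if insert_api not in SUPPORTED_INSERT_APIS:
        raise ValueError(f"unknown insert_api: {insert_api!r}")
    if insert_api == "zerobus" and write_disposition != "append":
        raise ValueError("insert_api='zerobus' only supports write_disposition='append'")
    return insert_api
```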