[AURON #2320] Fix PB deserialization bug & improve PB parsing performance #2339

Open
Tartarus0zm wants to merge 5 commits into apache:master from Tartarus0zm:pb-deserializer-opts

Conversation

@Tartarus0zm

Contributor

Which issue does this PR close?

Closes #2320

Rationale for this change

  • The Boolean type was not given a default value, which produced incorrect results.
  • Parsing complex nested PB structures can fail.

What changes are included in this PR?

  • Fix the missing default value for the Boolean type, which produced incorrect results.
  • Fix parsing errors on complex nested PB structures.
  • Improve PB parsing performance.

Are there any user-facing changes?

  • No

How was this patch tested?

  • No

@SteNicholas SteNicholas left a comment

Member

There are 2 critical correctness bugs that will cause panics in production with common protobuf schemas, plus 2 moderate issues. Cannot approve until the critical ones are addressed.


Critical 1: O3 bitmap optimization skips ensure_size, breaking List/Map/Struct builders

Location: Main deserialization loop, the if seen_tags.count_ones() < total_handlers guard

Problem: ensure_size(row_idx + 1) is the only mechanism that calls SharedListArrayBuilder::append(true) / SharedMapArrayBuilder::append(true) / SharedStructArrayBuilder::append(false) to finalize these builders' per-row offset/null-buffer entries. The individual value handlers (impl_for_repeated_builder, impl_for_message_builder) only append to the child values builder -- they never call the parent's append().

When all tags are present in a row (seen_tags.count_ones() == total_handlers), ensure_size is skipped entirely. List/Map/Struct builders never get their slot finalized for that row.

Reproduction: Any schema with a repeated field where every message populates all fields. After processing N rows, scalar builders have len = N, but SharedListArrayBuilder has len = 0 (no append(true) was ever called). At finish() time, the offsets array is empty while values exist -- corrupt ListArray or panic.

Fix suggestion: The optimization assumption ("if all handlers fired, all builders are correctly sized") is incorrect for composite builders. Either:

  • Remove the optimization, or
  • After each handler fires, have composite-type handlers also call their own append() internally (i.e., move the list/struct finalization INTO the handler instead of relying on ensure_size)
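
The failure mode described above can be reproduced with a toy model. The sketch below is not the PR's code: `ToyListBuilder` and `parse_rows` are hypothetical names, and the builder is reduced to a values vector plus a per-row offsets vector. It only assumes what the review states, namely that handlers push child values while row finalization happens solely in the `ensure_size` step.

```rust
/// Toy stand-in for a list builder: handlers push child values,
/// but only `append` closes a row by recording an offset.
#[derive(Default)]
struct ToyListBuilder {
    values: Vec<i64>,
    offsets: Vec<usize>, // one entry per finalized row
}

impl ToyListBuilder {
    /// What a repeated-field handler does: child values only.
    fn push_value(&mut self, v: i64) {
        self.values.push(v);
    }

    /// What the ensure-size step does for a composite builder: close the row.
    fn append(&mut self) {
        self.offsets.push(self.values.len());
    }

    fn len(&self) -> usize {
        self.offsets.len()
    }
}

/// Parses `rows`, each holding the values of one repeated field.
/// `skip_when_all_seen` mimics the reviewed guard: finalization is
/// skipped whenever every handler fired for the row.
fn parse_rows(rows: &[Vec<i64>], skip_when_all_seen: bool) -> ToyListBuilder {
    let total_handlers = 1u32;
    let mut builder = ToyListBuilder::default();
    for row in rows {
        let mut seen_tags: u64 = 0;
        for &v in row {
            builder.push_value(v);
            seen_tags |= 1; // mark the single field as seen
        }
        let all_seen = seen_tags.count_ones() == total_handlers;
        if !(skip_when_all_seen && all_seen) {
            builder.append();
        }
    }
    builder
}
```

With the guard enabled and every row populated, the toy builder ends up with values but zero finalized rows, which is the shape the reproduction describes. Disabling the guard yields one offset per row.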

Critical 2: Sub-message struct handler missing sub_ensure_size in empty-buf branch

Location: Struct handler's if buf.is_empty() branch where it only calls struct_builder.get_mut().append(false)

Problem: When a struct field is present on the wire with zero-length content, the handler calls struct_builder.append(false) (advancing the struct's null buffer to len N) but does NOT call sub_ensure_size to pad the struct's child builders. Combined with Critical 1 (top-level ensure_size may be skipped), the struct builder's null_buffer_builder.len() diverges from its children's lengths.

Reproduction: Messages where a struct field is always present but empty (tag + length 0), with all other fields also present. StructArray::new() panics: "all arrays in a StructArray must have the same length."

Fix: Call sub_ensure_size before struct_builder.append(false) in the empty branch, same as the non-empty branch.
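
A minimal sketch of the divergence, under the assumption that a struct builder is just a validity buffer plus child columns. `ToyStructBuilder`, `pad_children`, and the `pad_in_empty_branch` flag are hypothetical names used only for illustration; `pad_children` plays the role the review assigns to `sub_ensure_size`.

```rust
/// Toy struct builder: a validity buffer plus one child column.
#[derive(Default)]
struct ToyStructBuilder {
    validity: Vec<bool>,
    child: Vec<Option<i32>>,
}

impl ToyStructBuilder {
    /// Hypothetical analogue of `sub_ensure_size`: pad children to `len`.
    fn pad_children(&mut self, len: usize) {
        while self.child.len() < len {
            self.child.push(None);
        }
    }

    /// Handles one occurrence of the struct field. `payload` is the decoded
    /// child value, or `None` for a zero-length message on the wire.
    fn handle(&mut self, payload: Option<i32>, pad_in_empty_branch: bool) {
        let row = self.validity.len();
        match payload {
            None => {
                // The reviewed code skipped this padding step.
                if pad_in_empty_branch {
                    self.pad_children(row + 1);
                }
                self.validity.push(false);
            }
            Some(v) => {
                self.child.push(Some(v));
                self.pad_children(row + 1);
                self.validity.push(true);
            }
        }
    }

    /// The invariant a struct array needs: all buffers share one length.
    fn is_aligned(&self) -> bool {
        self.validity.len() == self.child.len()
    }
}
```

Without padding in the empty branch the validity buffer grows while the child column does not, so `is_aligned` is false after a single empty struct.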


Moderate: log::info with DEBUG prefix left in production code

Location: transfer_output_schema_to_pb_schema function

A debug log statement at INFO level will pollute production logs on every schema initialization. Should be log::debug! or removed.


Moderate: unsafe from_utf8_unchecked on raw wire bytes

Location: String field handlers (impl_for_bytes_builder usage)

While proto3 specifies string fields must be UTF-8, producers may be non-conformant. The unsafe trades a validity check for throughput, but creates undefined behavior (not just wrong output) on malformed input. Consider adding at minimum a debug_assert for UTF-8 validity.


The rest of the PR (error propagation replacing .expect(), skip_pb_value extraction, RefCell buffer reuse, ValueHandlers enum) looks good. The bug fixes (C1 unknown-tag skip, C3 error propagation) are valuable improvements. Just the O3 ensure_size interaction needs rethinking.

@github-actions github-actions Bot removed the flink label Jun 24, 2026
@Tartarus0zm

Contributor Author

@SteNicholas thanks for your review! I've fixed the issue you mentioned. PTAL

@Tartarus0zm Tartarus0zm requested a review from SteNicholas June 24, 2026 11:20

@SteNicholas SteNicholas left a comment

Member

@Tartarus0zm, thanks for the updates. The boolean-default fix and the unknown-tag/skip_pb_value cursor-sync fix (C1) are real correctness improvements, and the C2 empty-struct child padding is a good catch. CI is green, but note that the new unit tests always populate every field, so they don't exercise the input shape that breaks below.

The one must-fix before merge is the O10 optimization (inline comment on the short-circuit): it makes a whole-batch-absent column come out all-NULL, which silently re-introduces the exact #2320 boolean bug this PR fixes (and regresses int/string/float/binary/list/map the same way). Everything else is non-blocking — a cross-file side effect on the JSON deserializer worth confirming, a couple of behavior changes worth a conscious decision, the expect() panics the PR's own O7/C3 goal meant to remove, and minor cleanups.

Verified sound (did not flag): O3's ensure_size skip (with the C1 ensure_size_every_row guard), nested List/Map inside a top-level Struct, and C2 — all hold. from_utf8_unchecked is pre-existing (master already had it).

// O10: if the (top-level) column was never written, the entire
// resulting array is null — skip the lazy bitmap scan entirely.
let top_idx = mapping[0];
if mapping.len() == 1 && !top_builders_touched[top_idx] {

Member

[Blocker] O10 returns an all-NULL column for any top-level field absent from the entire batch — this re-introduces the #2320 bug.

top_builders_touched[idx] is set only when a field's tag is seen in some row. But absent rows are padded by ensure_size with non-null defaults (ensure_output_array_builders_size: false/0/""/0.0/b"", and empty non-null list/map). So when a field is absent in every message of a batch:

  • without O10 → builder is all-default, null_count() != len(), cast path emits the defaults (correct);
  • with O10 → touched == false → new_null_array → all null (wrong).

Concretely: a top-level Boolean field never set in a batch yields all null instead of all false — exactly the bug this PR fixes, just for the all-absent case. Same regression for Int/UInt/Float/String/Binary (→ should be 0/""/0.0/b"") and List/Map (→ should be empty non-null). It's also internally inconsistent: present-in-≥1-row → default, absent-in-all → null; and a nested field (mapping.len() > 1, O10 skipped) → default while the top-level field → null.

Suggest dropping O10 entirely: the existing array_ref.null_count() == array_ref.len() path (line ~209) already handles genuinely-all-null columns, so O10 adds no correctness-safe benefit. Removing it also lets you delete tag_to_top_idx_vec/tag_to_top_idx_map (lines 94-95) and the per-batch top_builders_touched allocation (line 152), which exist only to feed this short-circuit.
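
The difference between the two paths can be shown with a toy column. This is a sketch under stated assumptions, not the deserializer's code: `pad_with_defaults` and `finish_column` are hypothetical, a column is modeled as `Vec<Option<bool>>`, and each input row is `Some(value)` when the field is on the wire or `None` when it is absent.

```rust
/// Pads a boolean column to `num_rows` with the non-null default `false`,
/// the way the review describes padding for absent rows.
fn pad_with_defaults(col: &mut Vec<Option<bool>>, num_rows: usize) {
    while col.len() < num_rows {
        col.push(Some(false));
    }
}

/// Builds the output column. `short_circuit` mimics O10: an untouched
/// top-level column is replaced by an all-null array.
fn finish_column(rows: &[Option<bool>], short_circuit: bool) -> Vec<Option<bool>> {
    let mut col = Vec::new();
    let mut touched = false;
    for (i, field) in rows.iter().enumerate() {
        pad_with_defaults(&mut col, i); // fill gaps left by earlier rows
        if let Some(v) = field {
            col.push(Some(*v));
            touched = true;
        }
    }
    pad_with_defaults(&mut col, rows.len());
    if short_circuit && !touched {
        return vec![None; rows.len()];
    }
    col
}
```

For a batch where the field is absent in every row, the short-circuit yields all nulls while the padded path yields all `false`. A batch with the field present in at least one row is unaffected, which is the inconsistency the comment points out.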

Contributor Author

Good catch! Dropped O10 entirely as suggested. Removed tag_to_top_idx_vec/tag_to_top_idx_map, the per-batch top_builders_touched allocation, the mark-touched block in the parse loop, and the output-path short-circuit. The remaining array_ref.null_count() == array_ref.len() path already handles genuinely-all-null columns. Added a regression test test_parse_messages_top_level_boolean_absent_in_all_rows covering the exact "top-level field absent from every row" shape — verified red/green (re-introducing the short-circuit makes it fail, removing it passes).

Ok(match builder_type {
    BuilderType::Boolean => {
-       impl_for_builders!(BooleanBuilder, builders, |b| b.append_null())
+       impl_for_builders!(BooleanBuilder, builders, |b| b.append_value(false))

Member

This boolean padding change (append_null() → append_value(false)) is correct for the proto path, but ensure_output_array_builders_size is pub(crate) and also used by the JSON deserializer (json_deserializer.rs:133). JSON's own boolean handler appends null for an explicit JSON null (json_deserializer.rs:335), so after this change a JSON record that omits a boolean field → false, while an explicit null → null: a new absent-vs-null inconsistency in the JSON path that isn't mentioned or tested. Please confirm this side effect on the JSON deserializer is intended (and ideally add a JSON test pinning the behavior).

Contributor Author

Great suggestion! Added test_parse_json_boolean_omitted_vs_explicit_null to pin all three states (explicit true / explicit null / omitted).
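
The three states that the test pins can be summarized in a few lines. The `JsonBool` enum and `to_output` function are hypothetical and exist only to make the mapping explicit; they are not part of the JSON deserializer.

```rust
/// The three shapes a boolean field can take in one JSON record.
enum JsonBool {
    Value(bool),
    ExplicitNull,
    Omitted,
}

/// Maps each shape to the nullable output described in the thread:
/// an explicit null stays null, an omitted field is padded with `false`.
fn to_output(field: JsonBool) -> Option<bool> {
    match field {
        JsonBool::Value(v) => Some(v),
        JsonBool::ExplicitNull => None,
        JsonBool::Omitted => Some(false),
    }
}
```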

})?;
if let Some(sub_value_handler) = sub_value_handlers.get(&sub_tag) {
// O7/C3 fix: propagate error instead of expect()
(*sub_value_handler)(&mut sub_cursor, sub_tag, sub_wire_type)?;

Member

Behavior change worth a conscious decision: the old sub-message loops used if let Ok((tag, wt)) = decode_key { ... }, silently tolerating a malformed/truncated nested message. Propagating with ? here (and in the list/map twins at ~1584 and ~1660) removes a possible spin on a non-advancing decode error (good), but now a single bad nested record fails the entire parse_messages_with_kafka_meta batch instead of being skipped — which could stall a stream on one corrupt message. If best-effort tolerance was intentional upstream, consider skipping the offending sub-message instead of failing the batch.
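
The trade-off between the two policies can be sketched as follows. Everything here is a toy: `decode`, `parse_strict`, and `parse_lenient` are hypothetical, a record counts as corrupt when it is empty, and `String` stands in for the project's error type.

```rust
/// Toy decoder: a record is "corrupt" when it is empty.
fn decode(record: &[u8]) -> Result<u8, String> {
    record
        .first()
        .copied()
        .ok_or_else(|| "truncated record".to_string())
}

/// Strict mode: the first bad record fails the whole batch (the `?` behavior).
fn parse_strict(batch: &[&[u8]]) -> Result<Vec<u8>, String> {
    let mut out = Vec::with_capacity(batch.len());
    for rec in batch {
        out.push(decode(rec)?);
    }
    Ok(out)
}

/// Best-effort mode: bad records are counted and skipped.
fn parse_lenient(batch: &[&[u8]]) -> (Vec<u8>, usize) {
    let mut out = Vec::new();
    let mut skipped = 0;
    for rec in batch {
        match decode(rec) {
            Ok(v) => out.push(v),
            Err(_) => skipped += 1,
        }
    }
    (out, skipped)
}
```

Returning the skip count from the lenient variant keeps dropped records observable, so a caller could log or meter them instead of losing them silently.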

Contributor Author

good catch

nested_msg_mapping,
&skip_fields,
)
.expect("Failed to transfer output schema to pb schema");

Member

This .expect(...) (and the recursive one at line 465) still panics, which contradicts this PR's own O7/C3 goal of removing expect() so JNI callers don't abort the JVM via SIGABRT. Both the enclosing fn and the callee return Result, so this can just be ?. (Init-path, so lower severity than the per-row paths you already fixed — but it's the same crash mode.)

Contributor Author

Fixed. Both transfer_output_schema_to_pb_schema(...).expect(...) (the top-level call and the recursive one) are now ?. Consistent with the PR's O7/C3 goal of not aborting the JVM.
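
The shape of that change, in a self-contained sketch. `Schema`, `transfer_schema`, and `init_deserializer` are hypothetical stand-ins, and the real function's signature and error type differ. The point is only that when both the callee and the caller return `Result`, `?` forwards the error where `expect` would panic.

```rust
#[derive(Debug, PartialEq)]
struct Schema(Vec<String>);

/// Hypothetical stand-in for a fallible schema conversion.
fn transfer_schema(fields: &[&str]) -> Result<Schema, String> {
    if fields.iter().any(|f| f.is_empty()) {
        return Err("empty field name".to_string());
    }
    Ok(Schema(fields.iter().map(|f| f.to_string()).collect()))
}

/// Propagates the error to the caller instead of panicking with `expect`.
fn init_deserializer(fields: &[&str]) -> Result<Schema, String> {
    let schema = transfer_schema(fields)?;
    Ok(schema)
}
```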

// upstream message could surface invalid UTF-8 in the
// resulting StringArray (downstream Arrow consumers
// typically tolerate this).
debug_assert!(

Member

Note (pre-existing, not introduced here): str::from_utf8_unchecked on the proto string bytes a few lines down is UB if the payload isn't valid UTF-8, and this debug_assert! is compiled out in release builds — so corrupt/untrusted Kafka input can construct an invalid &str and a StringArray that violates Arrow's UTF-8 invariant. Since you're touching these sites, it'd be worth replacing the unchecked with a checked from_utf8 (error or lossy) on the release path rather than only asserting in debug. The added comment rationalizes the unchecked call as safe, which isn't true for non-conformant producers.

Contributor Author

Replaced both unchecked sites with checked std::str::from_utf8(value).map_err(...)?, and removed the misleading O11/SAFETY comment that rationalized the unchecked call as safe. The impl_for_bytes_builder! macro now propagates the handler's Result (res?), so invalid UTF-8 from a non-conformant producer surfaces as a clean error instead of UB.
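
For reference, the checked conversion looks roughly like this in isolation. `checked_str` is a hypothetical helper, and `String` is a placeholder for whatever error type the macro actually propagates; `std::str::from_utf8` is the standard-library call named in the reply.

```rust
/// Validates proto string bytes before they reach a string column.
/// The `String` error type is a placeholder for the project's own error.
fn checked_str(value: &[u8]) -> Result<&str, String> {
    std::str::from_utf8(value).map_err(|e| format!("invalid UTF-8 in string field: {e}"))
}
```

Valid input is borrowed without copying, so the cost over the unchecked call is the validation scan alone.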

nested_msg_mapping: &HashMap<String, String>,
skip_fields: &[String],
) -> Result<SchemaRef> {
log::debug!(

Member

Leftover dev-tracing log (was log::info!("[DEBUG] …"), now log::debug!). It dumps internal mapping state and re-derives a Vec of all field names per recursive nested level. Suggest removing it rather than just downgrading the level.

Contributor Author

done

}
}

fn len(&self) -> usize {

Member

Minor efficiency: the Vec variant of len() is an O(max_tag) scan (v.iter().filter(|h| h.is_some()).count()), and it's called once per batch as total_handlers (line 147) to recompute a value that's constant for the deserializer's lifetime. Precompute the handler count once in from_map/new() and store it as a field for an O(1) read.

Contributor Author

done. Precomputed once in try_new and stored as a handler_count field.
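
A minimal sketch of the cached count, assuming a dense table indexed by tag. `DenseHandlers` and the `Handler` alias are hypothetical, and only the `handler_count` field name comes from the reply above.

```rust
type Handler = fn(u64) -> u64;

/// Dense dispatch table indexed by tag, with the handler count cached.
struct DenseHandlers {
    slots: Vec<Option<Handler>>,
    handler_count: usize, // computed once at construction
}

impl DenseHandlers {
    fn new(slots: Vec<Option<Handler>>) -> Self {
        let handler_count = slots.iter().filter(|h| h.is_some()).count();
        Self { slots, handler_count }
    }

    /// O(1): no per-batch rescan of the slot vector.
    fn len(&self) -> usize {
        self.handler_count
    }

    /// Looks up the handler for `tag`; out-of-range tags yield `None`.
    fn get(&self, tag: usize) -> Option<Handler> {
        self.slots.get(tag).copied().flatten()
    }
}
```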

get_content_after_last_dot(enum_value_descriptor.name()).to_string(),
);
}
let enum_string_mapping = Arc::new(enum_string_mapping);

Member

The O8 comment claims this Arc<HashMap> is shared across multiple handlers, but enum_string_mapping is built fresh inside each create_value_handler call (per field) and the Arc is only cloned into that one field's closure — never shared across handlers. So the Arc adds an atomic refcount for no sharing benefit (a plain owned HashMap moved into the closure is equivalent), and the three let mapping = enum_string_mapping; rebindings in the mutually-exclusive branches are redundant. Either drop the Arc + rebindings, or actually hoist/share the map across fields of the same enum type if that was the intent.

Contributor Author

done. Dropped the Arc and the three redundant let mapping = enum_string_mapping; rebindings; the owned HashMap is now moved directly into the (mutually-exclusive) closure. Updated the comment to stop claiming cross-handler sharing that never happened.
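
The owned-map pattern can be sketched as below. `make_enum_handler` is a hypothetical simplification: the real handler writes into a builder, while this one just returns the enum name.

```rust
use std::collections::HashMap;

/// Builds a per-field enum handler. The map is created for this one
/// handler only, so it is moved into the closure; no `Arc` is needed.
fn make_enum_handler(names: &[(i32, &str)]) -> impl Fn(i32) -> Option<String> {
    let mapping: HashMap<i32, String> = names
        .iter()
        .map(|(number, name)| (*number, name.to_string()))
        .collect();
    move |number| mapping.get(&number).cloned()
}
```

An `Arc` would only pay off if the same map were built once per enum type and cloned into several handlers, which is the alternative the review mentions.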

struct_builder.get_mut().append(false);
} else {
let mut sub_cursor = Cursor::new(buf);
while sub_cursor.has_remaining() {

Member

Maintainability: this sub-message decode loop is duplicated almost verbatim three times (struct here, list-of-struct ~1583, map ~1660), as is the preceding for field in sub_message_descriptor.fields() { create_value_handler(...) } builder loop. The C1 "skip unknown sub-tags" and O7 error-propagation fixes each had to be applied in all three; a shared helper (e.g. decode_sub_message(buf, &sub_value_handlers)) would collapse them and prevent future drift.

Contributor Author

good catch! done
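
One possible shape for such a shared helper, as a self-contained sketch rather than the PR's implementation. It relies only on the protobuf wire format (a key is `(field_number << 3) | wire_type`; wire types 0, 1, 2, and 5 are varint, 64-bit, length-delimited, and 32-bit). The signatures of `read_varint`, `skip_value`, and `decode_sub_message` are hypothetical, and handlers are reduced to one `Vec<u64>` column per known varint tag.

```rust
use std::collections::HashMap;

/// Reads one base-128 varint from `buf` starting at `*pos`.
fn read_varint(buf: &[u8], pos: &mut usize) -> Result<u64, String> {
    let mut value = 0u64;
    for shift in (0..64).step_by(7) {
        let byte = *buf.get(*pos).ok_or("truncated varint")?;
        *pos += 1;
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }
    Err("varint too long".to_string())
}

/// Skips a field that has no handler, based on its wire type.
fn skip_value(buf: &[u8], pos: &mut usize, wire_type: u8) -> Result<(), String> {
    let len = match wire_type {
        0 => return read_varint(buf, pos).map(|_| ()),
        1 => 8,
        2 => read_varint(buf, pos)? as usize,
        5 => 4,
        other => return Err(format!("unsupported wire type {other}")),
    };
    let end = (*pos)
        .checked_add(len)
        .filter(|end| *end <= buf.len())
        .ok_or("truncated field")?;
    *pos = end;
    Ok(())
}

/// Hypothetical shared helper: walks one sub-message, appending varint
/// fields to their column and skipping every tag without one.
fn decode_sub_message(buf: &[u8], columns: &mut HashMap<u32, Vec<u64>>) -> Result<(), String> {
    let mut pos = 0;
    while pos < buf.len() {
        let key = read_varint(buf, &mut pos)?;
        let (tag, wire_type) = ((key >> 3) as u32, (key & 0x7) as u8);
        match columns.get_mut(&tag) {
            Some(col) if wire_type == 0 => col.push(read_varint(buf, &mut pos)?),
            _ => skip_value(buf, &mut pos, wire_type)?,
        }
    }
    Ok(())
}
```

Because the unknown-tag skip and the error propagation live in one loop, the struct, list-of-struct, and map branches would all inherit a fix applied here.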

)?;
} else {
// C1 fix: skip unknown sub-tags
skip_pb_value(&mut sub_cursor, sub_tag, sub_wire_type)?;

Contributor

Nit: This map branch picks up the same C1 unknown-sub-tag skip and O7 ? propagation as the struct and list-of-struct branches, and a top-level Map column relies on the top_level_has_list_or_map → ensure_size-every-row path to finalize its per-row offset/null slot. The new tests cover the List path (test_parse_messages_with_repeated_field_all_tags_present) and the Struct path (test_parse_messages_with_empty_struct_message_all_tags_present), but I don't see one that builds a DataType::Map schema, so this finalization, which is structurally different from List (append(true) advances the map's offset + null buffers rather than pushing to a child values builder), isn't exercised by any test.

Would it be worth adding a top-level-map case — one row with ≥1 entry and one row with the map absent, asserting map.len() == num_rows and the per-row entry counts? That pins the row-alignment invariant this PR is fixing: a regression where top_level_has_list_or_map ever stopped matching DataType::Map would desync the map column from the batch row count with no failing test. The same test would also cover the new top-level unknown-tag skip_pb_value fallback and the sparse-tag ValueHandlers::Map branch, both currently unexercised.

Contributor Author

Added test_parse_messages_with_top_level_map: one row with 2 entries and one row with the map absent, asserting map.len() == num_rows and the per-row entry counts. This pins the row-alignment invariant (the top_level_has_list_or_map → ensure-size-every-row finalization) and also exercises the top-level unknown-tag skip_pb_value fallback and the sparse-tag ValueHandlers::Map branch.

Copilot AI review requested due to automatic review settings July 3, 2026 09:01

Copilot AI left a comment

Contributor

Pull request overview

This PR addresses correctness and stability issues in the Flink Protobuf (PB) deserializer—especially row/column alignment failures with complex nested structures (issue #2320)—and introduces several parsing hot-path optimizations to reduce overhead during decoding.

Changes:

  • Fixes PB decoding alignment issues by properly skipping unknown tags, padding nested builders correctly, and ensuring top-level List/Map builders finalize per-row slots.
  • Fixes boolean default handling (proto3-style default false instead of all-NULL when absent) and avoids process aborts by replacing several expect() paths with error propagation.
  • Improves parsing performance via adaptive handler dispatch (Vec vs HashMap), avoiding intermediate StructArray construction, and reducing repeated allocations in packed repeated-field decoding.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs | Core fixes for PB row-alignment, unknown-tag skipping, boolean defaults, safer error propagation, plus multiple performance optimizations and new regression tests. |
| native-engine/datafusion-ext-plans/src/flink/serde/json_deserializer.rs | Adds a regression test pinning boolean omitted-vs-null behavior after the shared boolean default change. |

Comment on lines +66 to +70
// Heuristic: dense enough and within 64-tag bitmap range. We cap at
// 64 so it composes nicely with O3's seen_tags bitmap, but the cap
// is independent — the fallback HashMap remains correct.
if field_count > 0 && max_tag <= 64 && (max_tag as usize) <= field_count.saturating_mul(4) {
let mut vec: Vec<Option<ValueHandler>> = (0..=max_tag).map(|_| None).collect();
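
The condition quoted above can be isolated as a small predicate to see which schemas take the dense path. `use_dense_table` is a hypothetical name, and the parameter types are assumptions inferred from the casts in the excerpt.

```rust
/// Mirrors the quoted condition: use a tag-indexed Vec only when tags
/// are small (at most 64) and dense relative to the number of fields.
fn use_dense_table(field_count: usize, max_tag: u32) -> bool {
    field_count > 0 && max_tag <= 64 && (max_tag as usize) <= field_count.saturating_mul(4)
}
```

For example, 3 fields with a maximum tag of 5 qualify, while 2 fields with a maximum tag of 60 fall back to the HashMap because the table would be mostly empty.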
@Tartarus0zm

Contributor Author

hi @SteNicholas @weiqingy I have updated this PR, PTAL

Development

Successfully merging this pull request may close these issues.

Fix row-column misalignment with complex structures in PB deserializer

5 participants