Skip to content

Commit e9cbabd

Browse files
authored
feat(parquet): batch consecutive null/empty rows in write_list (#9752)
# Which issue does this PR close?

- Spun off from #9653
- Contributes to #9731

# Rationale for this change

See #9731

# What changes are included in this PR?

Restructure `write_list()` to accumulate consecutive null and empty rows and flush them in a single `visit_leaves()` call using `extend(repeat_n(...))`, instead of calling `visit_leaves()` per row. With sparse data (99% nulls), a 4096-row batch previously triggered ~4000 individual tree traversals, each pushing a single value per leaf. Now consecutive null/empty runs are collapsed into one traversal that extends all leaf level buffers in bulk. This follows the same pattern already used by `write_struct()`. The `write_non_null_slice` path is unchanged since each non-null row has different offsets and cannot be batched.

# Are these changes tested?

All tests pass; existing tests give 100% coverage.

# Are there any user-facing changes?

N/A

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
1 parent 73ceb1d commit e9cbabd

1 file changed

Lines changed: 49 additions & 19 deletions

File tree

parquet/src/arrow/arrow_writer/levels.rs

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -336,51 +336,81 @@ impl LevelInfoBuilder {
336336
})
337337
};
338338

339-
let write_empty_slice = |child: &mut LevelInfoBuilder| {
340-
child.visit_leaves(|leaf| {
341-
let rep_levels = leaf.rep_levels.as_mut().unwrap();
342-
rep_levels.push(ctx.rep_level - 1);
343-
let def_levels = leaf.def_levels.as_mut().unwrap();
344-
def_levels.push(ctx.def_level - 1);
345-
})
339+
let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
340+
if count > 0 {
341+
child.visit_leaves(|leaf| {
342+
leaf.rep_levels
343+
.as_mut()
344+
.unwrap()
345+
.extend(std::iter::repeat_n(ctx.rep_level - 1, count));
346+
leaf.def_levels
347+
.as_mut()
348+
.unwrap()
349+
.extend(std::iter::repeat_n(ctx.def_level - 2, count));
350+
});
351+
}
346352
};
347353

348-
let write_null_slice = |child: &mut LevelInfoBuilder| {
349-
child.visit_leaves(|leaf| {
350-
let rep_levels = leaf.rep_levels.as_mut().unwrap();
351-
rep_levels.push(ctx.rep_level - 1);
352-
let def_levels = leaf.def_levels.as_mut().unwrap();
353-
def_levels.push(ctx.def_level - 2);
354-
})
354+
let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
355+
if count > 0 {
356+
child.visit_leaves(|leaf| {
357+
leaf.rep_levels
358+
.as_mut()
359+
.unwrap()
360+
.extend(std::iter::repeat_n(ctx.rep_level - 1, count));
361+
leaf.def_levels
362+
.as_mut()
363+
.unwrap()
364+
.extend(std::iter::repeat_n(ctx.def_level - 1, count));
365+
});
366+
}
355367
};
356368

357369
match nulls {
358370
Some(nulls) => {
359371
let null_offset = range.start;
372+
let mut pending_nulls: usize = 0;
373+
let mut pending_empties: usize = 0;
374+
360375
// TODO: Faster bitmask iteration (#1757)
361376
for (idx, w) in offsets.windows(2).enumerate() {
362377
let is_valid = nulls.is_valid(idx + null_offset);
363378
let start_idx = w[0].as_usize();
364379
let end_idx = w[1].as_usize();
380+
365381
if !is_valid {
366-
write_null_slice(child)
382+
write_empty_run(child, pending_empties);
383+
pending_empties = 0;
384+
pending_nulls += 1;
367385
} else if start_idx == end_idx {
368-
write_empty_slice(child)
386+
write_null_run(child, pending_nulls);
387+
pending_nulls = 0;
388+
pending_empties += 1;
369389
} else {
370-
write_non_null_slice(child, start_idx, end_idx)
390+
write_null_run(child, pending_nulls);
391+
pending_nulls = 0;
392+
write_empty_run(child, pending_empties);
393+
pending_empties = 0;
394+
write_non_null_slice(child, start_idx, end_idx);
371395
}
372396
}
397+
write_null_run(child, pending_nulls);
398+
write_empty_run(child, pending_empties);
373399
}
374400
None => {
401+
let mut pending_empties: usize = 0;
375402
for w in offsets.windows(2) {
376403
let start_idx = w[0].as_usize();
377404
let end_idx = w[1].as_usize();
378405
if start_idx == end_idx {
379-
write_empty_slice(child)
406+
pending_empties += 1;
380407
} else {
381-
write_non_null_slice(child, start_idx, end_idx)
408+
write_empty_run(child, pending_empties);
409+
pending_empties = 0;
410+
write_non_null_slice(child, start_idx, end_idx);
382411
}
383412
}
413+
write_empty_run(child, pending_empties);
384414
}
385415
}
386416
}

0 commit comments

Comments
 (0)