-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(parquet): support object versions in ParquetObjectReader #9753
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,10 @@ use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch}; | |
| use crate::errors::{ParquetError, Result}; | ||
| use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; | ||
| use bytes::Bytes; | ||
| use futures::{FutureExt, TryFutureExt, future::BoxFuture}; | ||
| use futures::{ | ||
| FutureExt, TryFutureExt, | ||
| future::{BoxFuture, try_join_all}, | ||
| }; | ||
| use object_store::ObjectStoreExt; | ||
| use object_store::{GetOptions, GetRange}; | ||
| use object_store::{ObjectStore, path::Path}; | ||
|
|
@@ -55,6 +58,7 @@ use tokio::runtime::Handle; | |
| pub struct ParquetObjectReader { | ||
| store: Arc<dyn ObjectStore>, | ||
| path: Path, | ||
| version: Option<String>, | ||
| file_size: Option<u64>, | ||
| metadata_size_hint: Option<usize>, | ||
| preload_column_index: bool, | ||
|
|
@@ -68,6 +72,7 @@ impl ParquetObjectReader { | |
| Self { | ||
| store, | ||
| path, | ||
| version: None, | ||
| file_size: None, | ||
| metadata_size_hint: None, | ||
| preload_column_index: false, | ||
|
|
@@ -101,6 +106,14 @@ impl ParquetObjectReader { | |
| } | ||
| } | ||
|
|
||
| /// Request a specific object version from the underlying [`ObjectStore`]. | ||
| pub fn with_version(self, version: impl Into<String>) -> Self { | ||
| Self { | ||
| version: Some(version.into()), | ||
| ..self | ||
| } | ||
| } | ||
|
|
||
| /// Whether to load the Column Index as part of [`Self::get_metadata`] | ||
| /// | ||
| /// Note: This setting may be overridden by [`ArrowReaderOptions`] `page_index_policy`. | ||
|
|
@@ -166,14 +179,17 @@ impl ParquetObjectReader { | |
| None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(), | ||
| } | ||
| } | ||
|
|
||
| fn get_opts(&self, range: Option<GetRange>) -> GetOptions { | ||
| GetOptions::new() | ||
| .with_range(range) | ||
| .with_version(self.version.clone()) | ||
| } | ||
| } | ||
|
|
||
| impl MetadataSuffixFetch for &mut ParquetObjectReader { | ||
| fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> { | ||
| let options = GetOptions { | ||
| range: Some(GetRange::Suffix(suffix as u64)), | ||
| ..Default::default() | ||
| }; | ||
| let options = self.get_opts(Some(GetRange::Suffix(suffix as u64))); | ||
| self.spawn(|store, path| { | ||
| async move { | ||
| let resp = store.get_opts(path, options).await?; | ||
|
|
@@ -186,14 +202,42 @@ impl MetadataSuffixFetch for &mut ParquetObjectReader { | |
|
|
||
| impl AsyncFileReader for ParquetObjectReader { | ||
| fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> { | ||
| self.spawn(|store, path| store.get_range(path, range).boxed()) | ||
| if self.version.is_some() { | ||
| let options = self.get_opts(Some(GetRange::from(range))); | ||
| self.spawn(|store, path| { | ||
| async move { | ||
| let resp = store.get_opts(path, options).await?; | ||
| Ok::<_, ParquetError>(resp.bytes().await?) | ||
| } | ||
| .boxed() | ||
|
Comment on lines
+207
to
+212
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we need the async closure here? Can we simplify this to something like this: store.get_opts(path, options).await.boxed() |
||
| }) | ||
| } else { | ||
| self.spawn(|store, path| store.get_range(path, range).boxed()) | ||
| } | ||
| } | ||
|
|
||
| fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> | ||
| where | ||
| Self: Send, | ||
| { | ||
| self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) | ||
| if self.version.is_some() { | ||
| let options = ranges | ||
| .into_iter() | ||
| .map(|range| self.get_opts(Some(GetRange::from(range)))) | ||
| .collect::<Vec<_>>(); | ||
| self.spawn(|store, path| { | ||
| async move { | ||
| try_join_all(options.into_iter().map(|options| async move { | ||
| let resp = store.get_opts(path, options).await?; | ||
| Ok::<_, ParquetError>(resp.bytes().await?) | ||
| })) | ||
| .await | ||
| } | ||
| .boxed() | ||
| }) | ||
| } else { | ||
| self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) | ||
| } | ||
| } | ||
|
|
||
| // This method doesn't directly call `self.spawn` because all of the IO that is done down the | ||
|
|
@@ -257,9 +301,12 @@ mod tests { | |
|
|
||
| use futures::TryStreamExt; | ||
|
|
||
| use crate::arrow::ParquetRecordBatchStreamBuilder; | ||
| use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; | ||
| use crate::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}; | ||
| use crate::errors::ParquetError; | ||
| use arrow::array::{ArrayRef, Int32Array}; | ||
| use arrow::datatypes::{DataType, Field, Schema}; | ||
| use arrow::record_batch::RecordBatch; | ||
| use arrow::util::test_util::parquet_test_data; | ||
| use futures::FutureExt; | ||
| use object_store::local::LocalFileSystem; | ||
|
|
@@ -278,6 +325,32 @@ mod tests { | |
| (meta, Arc::new(store) as Arc<dyn ObjectStore>) | ||
| } | ||
|
|
||
| async fn get_generated_meta_store() -> (tempfile::TempDir, ObjectMeta, Arc<dyn ObjectStore>) { | ||
| let dir = tempfile::tempdir().unwrap(); | ||
| let path = dir.path().join("generated.parquet"); | ||
|
|
||
| let schema = Arc::new(Schema::new(vec![Field::new( | ||
| "value", | ||
| DataType::Int32, | ||
| true, | ||
| )])); | ||
| let batch = RecordBatch::try_new( | ||
| Arc::clone(&schema), | ||
| vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef], | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let file = std::fs::File::create(&path).unwrap(); | ||
| let mut writer = ArrowWriter::try_new(file, schema, None).unwrap(); | ||
| writer.write(&batch).unwrap(); | ||
| writer.close().unwrap(); | ||
|
|
||
| let store = LocalFileSystem::new_with_prefix(dir.path()).unwrap(); | ||
| let meta = store.head(&Path::from("generated.parquet")).await.unwrap(); | ||
|
|
||
| (dir, meta, Arc::new(store) as Arc<dyn ObjectStore>) | ||
| } | ||
|
|
||
| async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) { | ||
| let res = parquet_test_data(); | ||
| let store = LocalFileSystem::new_with_prefix(res).unwrap(); | ||
|
|
@@ -319,6 +392,21 @@ mod tests { | |
| assert_eq!(batches[0].num_rows(), 8); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_simple_with_version() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How do these verify the version is passed through? Do they fail if you revert the code changes? |
||
| let (_dir, meta, store) = get_generated_meta_store().await; | ||
|
|
||
| let object_reader = ParquetObjectReader::new(store, meta.location).with_version("v1"); | ||
|
|
||
| let builder = ParquetRecordBatchStreamBuilder::new(object_reader) | ||
| .await | ||
| .unwrap(); | ||
| let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap(); | ||
|
|
||
| assert_eq!(batches.len(), 1); | ||
| assert_eq!(batches[0].num_rows(), 3); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_not_found() { | ||
| let (mut meta, store) = get_meta_store().await; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I am missing something but the version is not passed to the options... So how does it make it into the request?
I also think it would be easier to read this code if you use the same async closure and just change the options.