diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index d47ca744d8f6..2979dfdaa210 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -22,7 +22,10 @@ use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
 use bytes::Bytes;
-use futures::{FutureExt, TryFutureExt, future::BoxFuture};
+use futures::{
+    FutureExt, TryFutureExt,
+    future::{BoxFuture, try_join_all},
+};
 use object_store::ObjectStoreExt;
 use object_store::{GetOptions, GetRange};
 use object_store::{ObjectStore, path::Path};
@@ -55,6 +58,7 @@ use tokio::runtime::Handle;
 pub struct ParquetObjectReader {
     store: Arc<dyn ObjectStore>,
     path: Path,
+    version: Option<String>,
     file_size: Option<u64>,
     metadata_size_hint: Option<usize>,
     preload_column_index: bool,
@@ -68,6 +72,7 @@ impl ParquetObjectReader {
         Self {
             store,
             path,
+            version: None,
             file_size: None,
             metadata_size_hint: None,
             preload_column_index: false,
@@ -101,6 +106,14 @@ impl ParquetObjectReader {
         }
     }
 
+    /// Request a specific object version from the underlying [`ObjectStore`].
+    pub fn with_version(self, version: impl Into<String>) -> Self {
+        Self {
+            version: Some(version.into()),
+            ..self
+        }
+    }
+
     /// Whether to load the Column Index as part of [`Self::get_metadata`]
     ///
    /// Note: This setting may be overridden by [`ArrowReaderOptions`] `page_index_policy`
@@ -166,14 +179,17 @@ impl ParquetObjectReader {
             None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
         }
     }
+
+    fn get_opts(&self, range: Option<GetRange>) -> GetOptions {
+        GetOptions::new()
+            .with_range(range)
+            .with_version(self.version.clone())
+    }
 }
 
 impl MetadataSuffixFetch for &mut ParquetObjectReader {
     fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
-        let options = GetOptions {
-            range: Some(GetRange::Suffix(suffix as u64)),
-            ..Default::default()
-        };
+        let options = self.get_opts(Some(GetRange::Suffix(suffix as u64)));
         self.spawn(|store, path| {
             async move {
                 let resp = store.get_opts(path, options).await?;
@@ -186,14 +202,42 @@
 
 impl AsyncFileReader for ParquetObjectReader {
     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
-        self.spawn(|store, path| store.get_range(path, range).boxed())
+        if self.version.is_some() {
+            let options = self.get_opts(Some(GetRange::from(range)));
+            self.spawn(|store, path| {
+                async move {
+                    let resp = store.get_opts(path, options).await?;
+                    Ok::<_, ParquetError>(resp.bytes().await?)
+                }
+                .boxed()
+            })
+        } else {
+            self.spawn(|store, path| store.get_range(path, range).boxed())
+        }
     }
 
     fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
     where
         Self: Send,
     {
-        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
+        if self.version.is_some() {
+            let options = ranges
+                .into_iter()
+                .map(|range| self.get_opts(Some(GetRange::from(range))))
+                .collect::<Vec<_>>();
+            self.spawn(|store, path| {
+                async move {
+                    try_join_all(options.into_iter().map(|options| async move {
+                        let resp = store.get_opts(path, options).await?;
+                        Ok::<_, ParquetError>(resp.bytes().await?)
+                    }))
+                    .await
+                }
+                .boxed()
+            })
+        } else {
+            self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
+        }
     }
 
     // This method doesn't directly call `self.spawn` because all of the IO that is done down the
@@ -257,9 +301,12 @@ mod tests {
 
     use futures::TryStreamExt;
 
-    use crate::arrow::ParquetRecordBatchStreamBuilder;
     use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+    use crate::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
     use crate::errors::ParquetError;
+    use arrow::array::{ArrayRef, Int32Array};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
     use arrow::util::test_util::parquet_test_data;
     use futures::FutureExt;
     use object_store::local::LocalFileSystem;
@@ -278,6 +325,32 @@ mod tests {
         (meta, Arc::new(store) as Arc<dyn ObjectStore>)
     }
 
+    async fn get_generated_meta_store() -> (tempfile::TempDir, ObjectMeta, Arc<dyn ObjectStore>) {
+        let dir = tempfile::tempdir().unwrap();
+        let path = dir.path().join("generated.parquet");
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            true,
+        )]));
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef],
+        )
+        .unwrap();
+
+        let file = std::fs::File::create(&path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, schema, None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let store = LocalFileSystem::new_with_prefix(dir.path()).unwrap();
+        let meta = store.head(&Path::from("generated.parquet")).await.unwrap();
+
+        (dir, meta, Arc::new(store) as Arc<dyn ObjectStore>)
+    }
+
     async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) {
         let res = parquet_test_data();
         let store = LocalFileSystem::new_with_prefix(res).unwrap();
@@ -319,6 +392,21 @@
         assert_eq!(batches[0].num_rows(), 8);
     }
 
+    #[tokio::test]
+    async fn test_simple_with_version() {
+        let (_dir, meta, store) = get_generated_meta_store().await;
+
+        let object_reader = ParquetObjectReader::new(store, meta.location).with_version("v1");
+
+        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
+            .await
+            .unwrap();
+        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 3);
+    }
+
     #[tokio::test]
     async fn test_not_found() {
         let (mut meta, store) = get_meta_store().await;