-
Notifications
You must be signed in to change notification settings - Fork 37
S3 protocol #895
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
S3 protocol #895
Changes from 30 commits
5ca8417
d1cbeeb
f66ff0a
fb4e2ca
73f6faa
c6c71dd
89742f6
6c2548b
607514b
e846991
f2dbc7c
9409a2e
63e0bad
55fe59b
c5cce8e
8789a53
107f0d9
d653133
7386f90
d9aa5b6
c306023
f8d2bee
e820701
3770688
c0a343f
1c2c452
514002b
4320190
f7e56af
3864a43
29545d3
ee135f1
4bb5ddb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,9 +10,10 @@ | |||||||||||||||||||||||||||||||||||||||||||
| import zstandard | ||||||||||||||||||||||||||||||||||||||||||||
| import lz4.frame as lz4 | ||||||||||||||||||||||||||||||||||||||||||||
| from ast import literal_eval | ||||||||||||||||||||||||||||||||||||||||||||
| from io import BytesIO | ||||||||||||||||||||||||||||||||||||||||||||
| from botocore.exceptions import ClientError | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| import strax | ||||||||||||||||||||||||||||||||||||||||||||
| from strax import RUN_METADATA_PATTERN | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| export, __all__ = strax.exporter() | ||||||||||||||||||||||||||||||||||||||||||||
| __all__.extend(["DECOMPRESS_BUFFER_SIZE"]) | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -80,22 +81,53 @@ def _lz4_decompress(f, buffer_size=DECOMPRESS_BUFFER_SIZE): | |||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| @export | ||||||||||||||||||||||||||||||||||||||||||||
| def load_file(f, compressor, dtype): | ||||||||||||||||||||||||||||||||||||||||||||
| """Read and return data from file. | ||||||||||||||||||||||||||||||||||||||||||||
| def load_file(f, compressor, dtype, bucket_name=None, is_s3_path=False): | ||||||||||||||||||||||||||||||||||||||||||||
| """Read and return data from file or S3. | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| :param f: file name or handle to read from | ||||||||||||||||||||||||||||||||||||||||||||
| :param compressor: compressor to use for decompressing. If not passed, will try to load it from | ||||||||||||||||||||||||||||||||||||||||||||
| json metadata file. | ||||||||||||||||||||||||||||||||||||||||||||
| :param dtype: numpy dtype of data to load | ||||||||||||||||||||||||||||||||||||||||||||
| :param is_s3_path: Boolean indicating if the file is stored in S3. | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||
| if isinstance(f, str): | ||||||||||||||||||||||||||||||||||||||||||||
| with open(f, mode="rb") as write_file: | ||||||||||||||||||||||||||||||||||||||||||||
| return _load_file(write_file, compressor, dtype) | ||||||||||||||||||||||||||||||||||||||||||||
| if is_s3_path: | ||||||||||||||||||||||||||||||||||||||||||||
| # Read from S3 | ||||||||||||||||||||||||||||||||||||||||||||
| return load_file_from_s3(f, compressor, dtype, bucket_name) | ||||||||||||||||||||||||||||||||||||||||||||
| elif isinstance(f, str): | ||||||||||||||||||||||||||||||||||||||||||||
| # Read from local file | ||||||||||||||||||||||||||||||||||||||||||||
| with open(f, mode="rb") as read_file: | ||||||||||||||||||||||||||||||||||||||||||||
| return _load_file(read_file, compressor, dtype) | ||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||
| # If f is already a file-like object, just use it | ||||||||||||||||||||||||||||||||||||||||||||
| return _load_file(f, compressor, dtype) | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| def load_file_from_s3(f, compressor, dtype, bucket_name): | ||||||||||||||||||||||||||||||||||||||||||||
| """Helper function to load data from S3. | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| Confirm file exists, then try to load and decompress it. | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||
| s3 = strax.S3Frontend().s3 | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||
| # data = COMPRESSORS[compressor]["_decompress"](f) | ||||||||||||||||||||||||||||||||||||||||||||
| # if not len(data): | ||||||||||||||||||||||||||||||||||||||||||||
| # return np.zeros(0, dtype=dtype) | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| # Retrieve the file from S3 and load into a BytesIO buffer | ||||||||||||||||||||||||||||||||||||||||||||
| response = s3.get_object(Bucket=bucket_name, Key=f) | ||||||||||||||||||||||||||||||||||||||||||||
| file_data = response["Body"].read() # Read the content of the file from S3 | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| # Create a file-like object from the binary data | ||||||||||||||||||||||||||||||||||||||||||||
| file_buffer = BytesIO(file_data) | ||||||||||||||||||||||||||||||||||||||||||||
| return _load_file(file_buffer, compressor, dtype) | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| except ClientError as e: | ||||||||||||||||||||||||||||||||||||||||||||
| raise RuntimeError(f"Failed to load {f} from bucket {bucket_name}: {e}") | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| def _load_file(f, compressor, dtype): | ||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||
| data = COMPRESSORS[compressor]["_decompress"](f) | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -105,6 +137,8 @@ def _load_file(f, compressor, dtype): | |||||||||||||||||||||||||||||||||||||||||||
| return np.frombuffer(data, dtype=dtype) | ||||||||||||||||||||||||||||||||||||||||||||
| except ValueError as e: | ||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError(f"ValueError while loading data with dtype =\n\t{dtype}") from e | ||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | |
| except (ValueError, KeyError) as e: |
Copilot
AI
Jun 4, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Inconsistent variable naming between s3_client and s3_interface can be confusing; pick one name for clarity.
| s3_interface = s3_client | |
| # Copy temp file to final file | |
| result = _save_file_to_s3(s3_interface, temp_fn, data, Bucket, compressor) | |
| s3_interface.copy_object( | |
| Bucket=Bucket, | |
| Key=final_fn, | |
| CopySource={"Bucket": Bucket, "Key": temp_fn}, | |
| ) | |
| # Delete the temporary file | |
| s3_interface.delete_object(Bucket=Bucket, Key=temp_fn) | |
| # Copy temp file to final file | |
| result = _save_file_to_s3(s3_client, temp_fn, data, Bucket, compressor) | |
| s3_client.copy_object( | |
| Bucket=Bucket, | |
| Key=final_fn, | |
| CopySource={"Bucket": Bucket, "Key": temp_fn}, | |
| ) | |
| # Delete the temporary file | |
| s3_client.delete_object(Bucket=Bucket, Key=temp_fn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Reading the entire S3 object into memory with
read()may not scale for large files. Consider streaming decompression or chunked reads to reduce memory usage.