Skip to content

S3 Integration

S3-compatible object storage integration utilities.

s3

Utilities for interacting with S3-compatible storage.

Classes

S3Object

Bases: TypedDict

Source code in diracx-core/src/diracx/core/s3.py
class S3Object(TypedDict):
    """Reference to a single S3 object, as used by `s3_bulk_delete_with_retry`."""
    Key: str  # the object's key within its bucket
Attributes

Key (str) — instance attribute

S3PresignedPostInfo

Bases: TypedDict

Source code in diracx-core/src/diracx/core/s3.py
class S3PresignedPostInfo(TypedDict):
    """Result of a presigned POST request: the upload URL plus the form fields
    that must accompany the upload (see `generate_presigned_upload`)."""
    url: str  # URL the client should POST the upload to
    fields: dict[str, str]  # form fields required for the POST to be accepted
Attributes

url (str) — instance attribute
fields (dict[str, str]) — instance attribute

Functions

s3_bucket_exists(s3_client, bucket_name) async

Check if a bucket exists in S3.

Source code in diracx-core/src/diracx/core/s3.py
async def s3_bucket_exists(s3_client: S3Client, bucket_name: str) -> bool:
    """Return whether *bucket_name* exists in S3.

    Delegates to ``_s3_exists`` using a HEAD-bucket probe.
    """
    probe = s3_client.head_bucket
    return await _s3_exists(probe, Bucket=bucket_name)

s3_object_exists(s3_client, bucket_name, key) async

Check if an object exists in an S3 bucket.

Source code in diracx-core/src/diracx/core/s3.py
async def s3_object_exists(s3_client: S3Client, bucket_name: str, key: str) -> bool:
    """Return whether the object *key* exists in bucket *bucket_name*.

    Delegates to ``_s3_exists`` using a HEAD-object probe.
    """
    probe = s3_client.head_object
    return await _s3_exists(probe, Bucket=bucket_name, Key=key)

generate_presigned_upload(s3_client, bucket_name, key, checksum_algorithm, checksum, size, validity_seconds) async

Generate a presigned URL and fields for uploading a file to S3.

The signature is restricted to only accept data with the given checksum and size.

Source code in diracx-core/src/diracx/core/s3.py
async def generate_presigned_upload(
    s3_client: S3Client,
    bucket_name: str,
    key: str,
    checksum_algorithm: ChecksumAlgorithm,
    checksum: str,
    size: int,
    validity_seconds: int,
) -> S3PresignedPostInfo:
    """Create presigned POST data (URL + form fields) for one S3 upload.

    The signing policy is restricted so that only data matching the given
    checksum and exact size is accepted.
    """
    checksum_field = f"x-amz-checksum-{checksum_algorithm}"
    fields = {
        "x-amz-checksum-algorithm": checksum_algorithm,
        checksum_field: b16_to_b64(checksum),
    }
    # Pin the content length to exactly `size` and require every field
    # above to be echoed back in the POST policy.
    conditions: list = [["content-length-range", size, size]]
    conditions.extend({name: value} for name, value in fields.items())
    result = await s3_client.generate_presigned_post(
        Bucket=bucket_name,
        Key=key,
        Fields=fields,
        Conditions=conditions,
        ExpiresIn=validity_seconds,
    )
    return cast(S3PresignedPostInfo, result)

b16_to_b64(hex_string)

Convert hexadecimal encoded data to base64 encoded data.

Source code in diracx-core/src/diracx/core/s3.py
def b16_to_b64(hex_string: str) -> str:
    """Re-encode a hexadecimal (base16) string as base64.

    The input is upper-cased first, since ``base64.b16decode`` is strict
    about case by default.
    """
    raw_bytes = base64.b16decode(hex_string.upper())
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()

s3_bulk_delete_with_retry(s3_client, bucket, objects) async

Delete a batch of objects from an S3 bucket, retrying failed requests with exponential backoff.

Source code in diracx-core/src/diracx/core/s3.py
async def s3_bulk_delete_with_retry(
    s3_client: S3Client, bucket: str, objects: list[S3Object]
) -> None:
    """Delete *objects* from *bucket*, retrying transient failures.

    Up to ``max_attempts`` ``delete_objects`` calls are made, sleeping with
    exponential backoff (1s, 2s, 4s, ...) between attempts. A response that
    reports per-key errors is treated the same as a request failure and
    retried.

    Raises:
        RuntimeError: if the deletion still fails on the final attempt
            (chained from the last underlying error).
    """
    if not objects:
        # S3 rejects a Delete request with an empty Objects list, and there
        # is nothing to do anyway — bail out instead of retrying a doomed call.
        return
    # NOTE(review): delete_objects accepts at most 1000 keys per call;
    # callers are presumably expected to chunk larger lists — TODO confirm.
    max_attempts = 5
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        try:
            response = await s3_client.delete_objects(
                Bucket=bucket,
                Delete={"Objects": objects, "Quiet": True},
            )
            # With Quiet=True, S3 only reports the keys that failed.
            if response.get("Errors"):
                raise RuntimeError(f"S3 deletion error: {response['Errors']}")
            return
        except (ClientError, RuntimeError) as e:
            if attempt == max_attempts:
                raise RuntimeError(f"Failed to delete objects in {bucket}") from e
            await asyncio.sleep(delay)
            delay *= 2