# Source code for mpu.aws

"""Convenience functions for AWS interactions."""

# Core Library
import enum
import os
from collections import namedtuple
from tempfile import mkstemp
from typing import List, Optional

# Third party
import boto3.session


def list_files(
    bucket: str, prefix: str = "", profile_name: Optional[str] = None
) -> List[str]:
    """
    List up to 1000 files in a bucket.

    Parameters
    ----------
    bucket : str
    prefix : str
        Only list keys starting with this prefix.
    profile_name : str, optional
        AWS profile

    Returns
    -------
    s3_paths : List[str]
        Paths of the form 's3://bucket/key'.
    """
    session = boto3.session.Session(profile_name=profile_name)
    conn = session.client("s3")
    ret = conn.list_objects_v2(Bucket=bucket, Prefix=prefix)
    if "Contents" not in ret:
        return []
    # Reuse the response above instead of issuing a second, redundant
    # list_objects_v2 call (the original made the request twice).
    # TODO: make this a generator and follow ContinuationToken to page
    # past the 1000-key limit:
    # https://boto3.readthedocs.io/en/latest/reference/services/
    # s3.html#S3.Client.list_objects
    return [f"s3://{bucket}/{obj['Key']}" for obj in ret["Contents"]]
def s3_read(source: str, profile_name: Optional[str] = None) -> bytes:
    """
    Read a file from an S3 source.

    Parameters
    ----------
    source : str
        Path starting with s3://, e.g. 's3://bucket-name/key/foo.bar'
    profile_name : str, optional
        AWS profile

    Returns
    -------
    content : bytes

    Raises
    ------
    botocore.exceptions.NoCredentialsError
        Botocore is not able to find your credentials. Either specify
        profile_name or add the environment variables AWS_ACCESS_KEY_ID,
        AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN.
        See https://boto3.readthedocs.io/en/latest/guide/configuration.html
    """
    # Resolve the s3:// URL first, then fetch the object body in one go.
    bucket_name, key = _s3_path_split(source)
    session = boto3.session.Session(profile_name=profile_name)
    client = session.client("s3")
    response = client.get_object(Bucket=bucket_name, Key=key)
    return response["Body"].read()
class ExistsStrategy(enum.Enum):
    """Strategies what to do when a file already exists."""

    # Raise a RuntimeError when the destination file exists.
    RAISE = "raise"
    # Overwrite the existing destination file.
    REPLACE = "replace"
    # Skip the operation and leave the existing file untouched.
    ABORT = "abort"
def s3_download(
    source: str,
    destination: Optional[str] = None,
    exists_strategy: ExistsStrategy = ExistsStrategy.RAISE,
    profile_name: Optional[str] = None,
) -> Optional[str]:
    """
    Copy a file from an S3 source to a local destination.

    Parameters
    ----------
    source : str
        Path starting with s3://, e.g. 's3://bucket-name/key/foo.bar'
    destination : str, optional
        If none is given, a temporary file is created
    exists_strategy : {'raise', 'replace', 'abort'}
        What is done when the destination already exists?
        * `ExistsStrategy.RAISE` means a RuntimeError is raised,
        * `ExistsStrategy.REPLACE` means the local file is replaced,
        * `ExistsStrategy.ABORT` means the download is not done.
    profile_name : str, optional
        AWS profile

    Returns
    -------
    download_path : Optional[str]
        Path of the downloaded file, if any was downloaded.

    Raises
    ------
    botocore.exceptions.NoCredentialsError
        Botocore is not able to find your credentials. Either specify
        profile_name or add the environment variables AWS_ACCESS_KEY_ID,
        AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN.
        See https://boto3.readthedocs.io/en/latest/guide/configuration.html
    """
    # Guard against raw strings like "raise" being passed instead of the enum.
    if not isinstance(exists_strategy, ExistsStrategy):
        raise ValueError(
            f"exists_strategy '{exists_strategy}' is not in {ExistsStrategy}"
        )
    bucket_name, key = _s3_path_split(source)
    if destination is None:
        # Derive a temp-file name that keeps the source's name and extension.
        filename = os.path.split(source)[1]
        prefix, suffix = os.path.splitext(filename)
        destination = mkstemp(prefix=prefix, suffix=suffix)[1]
    elif os.path.isfile(destination):
        if exists_strategy is ExistsStrategy.ABORT:
            return None
        if exists_strategy is ExistsStrategy.RAISE:
            raise RuntimeError(f"File '{destination}' already exists.")
        # ExistsStrategy.REPLACE falls through and overwrites the file.
    session = boto3.session.Session(profile_name=profile_name)
    session.resource("s3").Bucket(bucket_name).download_file(key, destination)
    return destination
def s3_upload(
    source: str, destination: str, profile_name: Optional[str] = None
) -> None:
    """
    Copy a file from a local source to an S3 destination.

    Parameters
    ----------
    source : str
        Path of the local file to upload.
    destination : str
        Path starting with s3://, e.g. 's3://bucket-name/key/foo.bar'
    profile_name : str, optional
        AWS profile
    """
    bucket_name, key = _s3_path_split(destination)
    session = boto3.session.Session(profile_name=profile_name)
    bucket = session.resource("s3").Bucket(bucket_name)
    # Stream the file handle directly; boto3 reads it in chunks.
    with open(source, "rb") as data:
        bucket.put_object(Key=key, Body=data)
S3Path = namedtuple("S3Path", ["bucket_name", "key"]) def _s3_path_split(s3_path: str) -> S3Path: """ Split an S3 path into bucket and key. Parameters ---------- s3_path : str Returns ------- splitted : S3Path Examples -------- >>> _s3_path_split('s3://my-bucket/foo/bar.jpg') S3Path(bucket_name='my-bucket', key='foo/bar.jpg') """ if not s3_path.startswith("s3://"): raise ValueError( f"s3_path is expected to start with 's3://', but was {s3_path}" ) bucket_key = s3_path[len("s3://") :] bucket_name, key = bucket_key.split("/", 1) return S3Path(bucket_name, key)