hip_data_tools.aws package

Submodules

hip_data_tools.aws.athena module

Utility to connect to, and perform DML and DDL operations on, AWS Athena

class hip_data_tools.aws.athena.AthenaUtil(database: str, conn: hip_data_tools.aws.common.AwsConnectionManager, output_key: str = None, output_bucket: str = None, work_group: str = 'primary')

Bases: hip_data_tools.aws.common.AwsUtil

Utility class for connecting to Athena and manipulating data in a Pythonic way

Parameters:
  • database (string) – the athena database to run queries on
  • conn (AwsConnectionManager) – AwsConnectionManager object used to access AWS
  • output_key (string) – the s3 key where the results of athena queries will be stored
  • output_bucket (string) – the s3 bucket where the results of athena queries will be stored
  • work_group (string) – Athena workGroup name

add_partitions(table, partition_keys, partition_values)

Add a new partition to a given table.

Parameters:
  • table (string) – name of the table to which the new partition is added
  • partition_keys (list) – the partition columns
  • partition_values (list) – the values for the partition columns, in the same order as partition_keys

Returns: None
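
add_partitions takes parallel lists of partition columns and values. The sketch below shows the kind of Athena DDL statement such a call could issue; build_add_partition_ddl is a hypothetical helper, not part of hip_data_tools, and the statement the library actually generates may differ (for example in quoting, or by adding a LOCATION clause).

```python
from typing import List


def build_add_partition_ddl(table: str, partition_keys: List[str], partition_values: List[str]) -> str:
    """Build an Athena ALTER TABLE ... ADD PARTITION statement (hypothetical helper)."""
    if len(partition_keys) != len(partition_values):
        raise ValueError("partition_keys and partition_values must have the same length")
    # Pair each partition column with its value, quoting values as string
    # literals and escaping embedded single quotes by doubling them.
    spec = ", ".join(
        "{}='{}'".format(key, str(value).replace("'", "''"))
        for key, value in zip(partition_keys, partition_values)
    )
    return "ALTER TABLE {} ADD IF NOT EXISTS PARTITION ({})".format(table, spec)


print(build_add_partition_ddl("events", ["year", "month"], ["2020", "01"]))
# ALTER TABLE events ADD IF NOT EXISTS PARTITION (year='2020', month='01')
```

IF NOT EXISTS makes the statement safe to re-run when the partition is already registered.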

create_table(table_settings)

Create a table from the given settings.

Parameters: table_settings (dict) – dictionary of settings used to create the table

Returns: None

drop_table(table_name)

Drop a given athena table

Parameters: table_name (string) – name of the table to be dropped

Returns: None

get_glue_table_metadata(table: str) → dict

Get table metadata from Glue.

Parameters: table (str) – Athena table name

Returns: A dict of table metadata

get_table_columns(table: str) → Tuple[List[dict], List[dict]]

Retrieve the table’s columns using the Glue metastore.

Parameters: table (str) – name of the table

Returns: tuple of regular and partition column dictionaries

get_table_data_location(table: str) → tuple

Retrieve the table’s S3 data location using the Glue metastore.

Parameters: table (str) – name of the table

Returns: tuple of s3 bucket and key

get_table_ddl(table)

Retrieve the table DDL as a string.

Parameters: table (string) – name of the table for which the DDL is generated

Returns: string containing the athena table DDL

repair_table_partitions(table)

Repair the partitions of the given table.

Parameters: table (string) – name of the table whose partitions need to be scanned and refreshed

Returns: None

run_query(query_string, return_result=False)

General-purpose query executor that submits an Athena query, then uses the execution id to poll and monitor the success of the query, and optionally returns the result.

Parameters:
  • query_string (string) – a string containing a valid Athena query
  • return_result (boolean) – flag to return the query results

Returns (dict or None): the result dictionary if return_result is True, otherwise None

watch_query(execution_id, poll_frequency=10)

Watch the query execution for a given execution id in Athena.

Parameters:
  • execution_id (string) – the execution id of an Athena query
  • poll_frequency (int) – interval, in seconds, between polls of the Athena API for the query status

Returns: dictionary of status from Athena
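
The polling that run_query and watch_query describe can be sketched with boto3's Athena client, whose get_query_execution(QueryExecutionId=...) response carries the state under QueryExecution.Status.State. This is an illustrative sketch, not the library's code: watch_query_sketch is a hypothetical name, and the injectable sleep argument is an assumption added so the loop can be tested without AWS.

```python
import time
from typing import Any, Callable

# Athena query states after which the status no longer changes.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def watch_query_sketch(athena_client: Any, execution_id: str, poll_frequency: int = 10,
                       sleep: Callable[[float], None] = time.sleep) -> dict:
    """Poll an Athena query execution until it reaches a terminal state."""
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=execution_id)
        status = response["QueryExecution"]["Status"]
        if status["State"] in TERMINAL_STATES:
            return status
        # Still QUEUED or RUNNING: wait before asking again.
        sleep(poll_frequency)
```

With a real client, the execution id would come from boto3.client("athena").start_query_execution(...)["QueryExecutionId"].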

hip_data_tools.aws.athena.extract_athena_type_from_value(val: Any) → str

Extract an Athena-compatible data type from a given Python object.

Parameters: val (Any) – any value of a primitive Python type

Returns: str
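
A minimal sketch of this kind of type inference, assuming a mapping of bool to boolean, int to bigint, float to double, datetime to timestamp, date to date and everything else to string. The mapping and the name athena_type_sketch are assumptions for illustration; the library's own mapping may differ.

```python
import datetime
from typing import Any


def athena_type_sketch(val: Any) -> str:
    """Map a primitive Python value to an Athena type name (assumed mapping)."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(val, bool):
        return "boolean"
    if isinstance(val, int):
        return "bigint"
    if isinstance(val, float):
        return "double"
    # datetime.datetime is a subclass of datetime.date, so test it first.
    if isinstance(val, datetime.datetime):
        return "timestamp"
    if isinstance(val, datetime.date):
        return "date"
    # Fall back to string for anything else.
    return "string"
```

The ordering of the isinstance checks matters more than the mapping itself: testing int first would classify True as bigint.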

hip_data_tools.aws.athena.generate_csv_ctas(select_query, destination_table, destination_bucket, destination_key, partition_fields='')

Method to generate a CTAS query string for creating csv output

Parameters:
  • select_query (string) – the query to be used for table generation
  • destination_table (string) – name of the new table being created
  • destination_bucket (string) – the s3 bucket where the data from select query will be stored
  • destination_key (string) – the s3 directory where the data from select query will be stored
  • partition_fields (string) – partition field names

Returns (string): CTAS Query in a string

hip_data_tools.aws.athena.generate_parquet_ctas(select_query, destination_table, destination_bucket, destination_key, partition_fields='')

Method to generate a CTAS query string for creating parquet output

Parameters:
  • select_query (string) – the query to be used for table generation
  • destination_table (string) – name of the new table being created
  • destination_bucket (string) – the s3 bucket where the data from select query will be stored
  • destination_key (string) – the s3 directory where the data from select query will be stored
  • partition_fields (string) – partition field names

Returns (string): CTAS Query in a string
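
Both generators produce an Athena CREATE TABLE AS SELECT statement. The sketch below shows the general shape of such a query using Athena's CTAS table properties (format, external_location, field_delimiter, partitioned_by). ctas_sketch is hypothetical, and treating partition_fields as a comma-separated string of column names is an assumption; the exact text hip_data_tools emits may differ.

```python
def ctas_sketch(select_query: str, destination_table: str, destination_bucket: str,
                destination_key: str, partition_fields: str = "",
                output_format: str = "PARQUET") -> str:
    """Build an Athena CTAS query writing PARQUET or TEXTFILE (CSV) output."""
    properties = [
        "format = '{}'".format(output_format),
        "external_location = 's3://{}/{}/'".format(destination_bucket, destination_key.strip("/")),
    ]
    if output_format == "TEXTFILE":
        # Comma-delimited text gives CSV-style output.
        properties.append("field_delimiter = ','")
    if partition_fields:
        # Assumed: partition_fields is a comma-separated list of column names.
        columns = ", ".join("'{}'".format(f.strip()) for f in partition_fields.split(","))
        properties.append("partitioned_by = ARRAY[{}]".format(columns))
    return "CREATE TABLE {} WITH ({}) AS {}".format(
        destination_table, ", ".join(properties), select_query)
```

Athena requires partition columns to appear last in the SELECT list of a partitioned CTAS, and the external_location must not already contain data.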

hip_data_tools.aws.athena.get_athena_columns_from_dataframe(data_frame: pandas.core.frame.DataFrame) → List[dict]

Extract a list of dictionaries of column names and their Athena data types from the dataframe.

Parameters: data_frame (DataFrame) – the dataframe from which the columns are extracted

Returns: list of dict

hip_data_tools.aws.athena.get_partitions_from_partitions_dict(partitions: dict)

Get the Athena table settings partitions as a list of dictionaries.

Parameters: partitions (dict) – dictionary of partition column names and values

Returns: list of dict

hip_data_tools.aws.athena.get_table_settings_for_dataframe(dataframe: pandas.core.frame.DataFrame, partitions: dict, s3_bucket: str, s3_dir: str, table: str) → dict

Get the Athena table settings.

Parameters:
  • dataframe (DataFrame) – dataframe with the column types and names
  • partitions (dict) – dictionary of partition column names and values
  • s3_bucket (str) – S3 bucket name in which to store the Athena table’s data
  • s3_dir (str) – S3 directory in which to store the Athena table’s data
  • table (str) – name of the table

Returns: table settings

hip_data_tools.aws.athena.zip_columns(column_list)

Combine the column list into a comma-separated list of column names and data types.

Parameters: column_list (list) – a list of dictionaries with the keys column and type

Returns (string): a string containing comma separated list of column name and data type
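
As a sketch of what this combination produces, the helper below joins each {"column": ..., "type": ...} dictionary into "name type" and separates the pairs with commas, the form used inside a CREATE TABLE column list. zip_columns_sketch is a hypothetical name, and the real function may add quoting around column names.

```python
from typing import Dict, List


def zip_columns_sketch(column_list: List[Dict[str, str]]) -> str:
    """Join [{'column': ..., 'type': ...}, ...] into 'name type, name type'."""
    return ", ".join("{} {}".format(col["column"], col["type"]) for col in column_list)


print(zip_columns_sketch([{"column": "id", "type": "bigint"},
                          {"column": "name", "type": "string"}]))
# id bigint, name string
```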

hip_data_tools.aws.common module

class hip_data_tools.aws.common.AwsConnectionManager(settings: hip_data_tools.aws.common.AwsConnectionSettings)

Bases: object

Utility class to connect to AWS and create boto3 clients and resources. For example, to connect using an AWS CLI profile:

>>> conn = AwsConnectionManager(
...     AwsConnectionSettings(region="ap-southeast-2", profile="default", secrets_manager=None))

Or, to connect using the standard AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY):

>>> conn = AwsConnectionManager(
...     settings=AwsConnectionSettings(region="ap-southeast-2", profile=None, secrets_manager=AwsSecretsManager()))

Or, to connect using a custom set of environment variables:

>>> conn = AwsConnectionManager(
...     settings=AwsConnectionSettings(
...         region="ap-southeast-2",
...         secrets_manager=AwsSecretsManager(
...             access_key_id_var="SOME_CUSTOM_AWS_ACCESS_KEY_ID",
...             secret_access_key_var="SOME_CUSTOM_AWS_SECRET_ACCESS_KEY",
...             use_session_token=True,
...             aws_session_token_var="SOME_CUSTOM_AWS_SESSION_TOKEN"
...         ),
...         profile=None,
...     )
... )

Parameters: settings (AwsConnectionSettings) – settings to use for connecting to AWS

client(client_type)

Get a client for a specific AWS service.

Parameters: client_type (string) – the AWS service, e.g. s3 or athena, as accepted by boto3 session.client

Returns (client): boto3 client

resource(resource_type)

Get a resource for a specific AWS service.

Parameters: resource_type (string) – the AWS service, e.g. s3, as accepted by boto3 session.resource

Returns (resource): boto3 resource of type resource_type

class hip_data_tools.aws.common.AwsConnectionSettings(region: str, profile: Optional[str], secrets_manager: Optional[hip_data_tools.aws.common.AwsSecretsManager])

Bases: object

Encapsulates the AWS connection settings

class hip_data_tools.aws.common.AwsSecretsManager(source: hip_data_tools.common.KeyValueSource = <hip_data_tools.common.EnvironmentKeyValueSource object>, access_key_id_var: str = 'AWS_ACCESS_KEY_ID', secret_access_key_var: str = 'AWS_SECRET_ACCESS_KEY', use_session_token: bool = False, aws_session_token_var: str = 'AWS_SESSION_TOKEN')

Bases: hip_data_tools.common.SecretsManager

Parameters:
  • source (KeyValueSource) – a kv source that has secrets
  • access_key_id_var (str) – the variable name or the key for finding access_key_id
  • secret_access_key_var (str) – the variable name or the key for finding secret_access_key
  • use_session_token (bool) – flag to check the session token is required or not
  • aws_session_token_var (str) – the variable name or the key for finding aws_session_token
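
The parameters above amount to a lookup of credential values by variable name, with the session token only required when use_session_token is set. The sketch below mirrors that behaviour over a plain mapping (os.environ by default); read_aws_credentials is a hypothetical helper, not the library's SecretsManager implementation. The returned keys match the keyword arguments of boto3.Session.

```python
import os
from typing import Dict, Mapping, Optional


def read_aws_credentials(source: Optional[Mapping[str, str]] = None,
                         access_key_id_var: str = "AWS_ACCESS_KEY_ID",
                         secret_access_key_var: str = "AWS_SECRET_ACCESS_KEY",
                         use_session_token: bool = False,
                         aws_session_token_var: str = "AWS_SESSION_TOKEN") -> Dict[str, str]:
    """Look up AWS credentials by variable name (hypothetical helper)."""
    # Default to the process environment variables.
    source = os.environ if source is None else source
    required = [access_key_id_var, secret_access_key_var]
    if use_session_token:
        required.append(aws_session_token_var)
    missing = [name for name in required if name not in source]
    if missing:
        raise KeyError("missing credential variables: {}".format(", ".join(missing)))
    credentials = {
        "aws_access_key_id": source[access_key_id_var],
        "aws_secret_access_key": source[secret_access_key_var],
    }
    if use_session_token:
        credentials["aws_session_token"] = source[aws_session_token_var]
    return credentials
```

The result could then be passed as boto3.Session(region_name="ap-southeast-2", **read_aws_credentials()).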
class hip_data_tools.aws.common.AwsUtil(conn: hip_data_tools.aws.common.AwsConnectionManager, boto_type: str)

Bases: abc.ABC

Common AWS base class that uses a boto connection to create clients and resources.

Parameters:
  • conn (AwsConnectionManager) – connection to use for accessing AWS resources
  • boto_type (str) – the type of boto client / resource to instantiate

get_client() → botocore.client.BaseClient

Return the boto client, creating one if it is not already present.

Returns: BaseClient

get_resource() → Any

Return the boto resource, creating one if it is not already present.

Returns: Any
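
get_client and get_resource describe lazy creation: the boto object is built on first use and reused afterwards. A minimal sketch of that caching pattern, assuming only that conn exposes client(name) and resource(name) as AwsConnectionManager does; LazyAwsUtilSketch is a hypothetical stand-in, not the library class.

```python
from abc import ABC
from typing import Any, Optional


class LazyAwsUtilSketch(ABC):
    """Create a boto client/resource on first use and reuse it afterwards."""

    def __init__(self, conn: Any, boto_type: str):
        self.conn = conn            # expected to expose client(name) and resource(name)
        self.boto_type = boto_type  # e.g. "s3" or "athena"
        self._client: Optional[Any] = None
        self._resource: Optional[Any] = None

    def get_client(self) -> Any:
        # Only ask the connection for a client the first time.
        if self._client is None:
            self._client = self.conn.client(self.boto_type)
        return self._client

    def get_resource(self) -> Any:
        # Same lazy pattern for the higher-level resource interface.
        if self._resource is None:
            self._resource = self.conn.resource(self.boto_type)
        return self._resource
```

Caching avoids rebuilding a client on every call, which matters because boto3 client construction is comparatively expensive.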

hip_data_tools.aws.common.dataclass(maybe_cls=None, these=None, repr_ns=None, repr=None, cmp=None, hash=None, init=None, slots=False, frozen=False, weakref_slot=True, str=False, *, auto_attribs=True, kw_only=False, cache_hash=False, auto_exc=False, eq=None, order=None, auto_detect=False, collect_by_mro=False, getstate_setstate=None, on_setattr=None, field_transformer=None)

A class decorator that adds dunder-methods according to the specified attributes using attr.ib or the these argument.

Parameters:
  • these (dict of str to attr.ib) –

    A dictionary of name to attr.ib mappings. This is useful to avoid the definition of your attributes within the class body because you can’t (e.g. if you want to add __repr__ methods to Django models) or don’t want to.

    If these is not None, attrs will not search the class body for attributes and will not remove any attributes from it.

    If these is an ordered dict (dict on Python 3.6+, collections.OrderedDict otherwise), the order is deduced from the order of the attributes inside these. Otherwise the order of the definition of the attributes is used.

  • repr_ns (str) – When using nested classes, there’s no way in Python 2 to automatically detect that. Therefore it’s possible to set the namespace explicitly for a more meaningful repr output.
  • auto_detect (bool) –

    Instead of setting the init, repr, eq, order, and hash arguments explicitly, assume they are set to True unless any of the involved methods for one of the arguments is implemented in the current class (i.e. it is not inherited from some base class).

    So for example by implementing __eq__ on a class yourself, attrs will deduce eq=False and will create neither __eq__ nor __ne__ (but Python classes come with a sensible __ne__ by default, so it should be enough to only implement __eq__ in most cases).

    Warning

    If you prevent attrs from creating the ordering methods for you (order=False, e.g. by implementing __le__), it becomes your responsibility to make sure its ordering is sound. The best way is to use the functools.total_ordering decorator.

    Passing True or False to init, repr, eq, order, cmp, or hash overrides whatever auto_detect would determine.

    auto_detect requires Python 3. Setting it True on Python 2 raises a PythonTooOldError.

  • repr (bool) – Create a __repr__ method with a human readable representation of attrs attributes.
  • str (bool) – Create a __str__ method that is identical to __repr__. This is usually not necessary except for Exceptions.
  • eq (Optional[bool]) –

    If True or None (default), add __eq__ and __ne__ methods that check two instances for equality.

    They compare the instances as if they were tuples of their attrs attributes if and only if the types of both classes are identical!

  • order (Optional[bool]) – If True, add __lt__, __le__, __gt__, and __ge__ methods that behave like eq above and allow instances to be ordered. If None (default) mirror value of eq.
  • cmp (Optional[bool]) – Setting to True is equivalent to setting eq=True, order=True. Deprecated in favor of eq and order, has precedence over them for backward-compatibility though. Must not be mixed with eq or order.
  • hash (Optional[bool]) –

    If None (default), the __hash__ method is generated according to how eq and frozen are set.

    1. If both are True, attrs will generate a __hash__ for you.
    2. If eq is True and frozen is False, __hash__ will be set to None, marking it unhashable (which it is).
    3. If eq is False, __hash__ will be left untouched meaning the __hash__ method of the base class will be used (if base class is object, this means it will fall back to id-based hashing).

    Although not recommended, you can decide for yourself and force attrs to create one (e.g. if the class is immutable even though you didn’t freeze it programmatically) by passing True, or prevent it by passing False. Both of these cases are rather special and should be used carefully.

    See our documentation on hashing, Python’s documentation on object.__hash__, and the GitHub issue that led to the default behavior for more details.

  • init (bool) – Create a __init__ method that initializes the attrs attributes. Leading underscores are stripped for the argument name. If a __attrs_post_init__ method exists on the class, it will be called after the class is fully initialized.
  • slots (bool) – Create a slotted class that’s more memory-efficient. Slotted classes are generally superior to the default dict classes, but have some gotchas you should know about, so we encourage you to read the glossary entry on slotted classes.
  • frozen (bool) –

    Make instances immutable after initialization. If someone attempts to modify a frozen instance, attr.exceptions.FrozenInstanceError is raised.

    Note

    1. This is achieved by installing a custom __setattr__ method on your class, so you can’t implement your own.
    2. True immutability is impossible in Python.
    3. This does have a minor runtime performance impact when initializing new instances. In other words: __init__ is slightly slower with frozen=True.
    4. If a class is frozen, you cannot modify self in __attrs_post_init__ or a self-written __init__. You can circumvent that limitation by using object.__setattr__(self, "attribute_name", value).
    5. Subclasses of a frozen class are frozen too.
  • weakref_slot (bool) – Make instances weak-referenceable. This has no effect unless slots is also enabled.
  • auto_attribs (bool) –

    If True, collect PEP 526-annotated attributes (Python 3.6 and later only) from the class body.

    In this case, you must annotate every field. If attrs encounters a field that is set to an attr.ib but lacks a type annotation, an attr.exceptions.UnannotatedAttributeError is raised. Use field_name: typing.Any = attr.ib(...) if you don’t want to set a type.

    If you assign a value to those attributes (e.g. x: int = 42), that value becomes the default value like if it were passed using attr.ib(default=42). Passing an instance of Factory also works as expected.

    Attributes annotated as typing.ClassVar, and attributes that are neither annotated nor set to an attr.ib are ignored.

  • kw_only (bool) – Make all attributes keyword-only (Python 3+) in the generated __init__ (if init is False, this parameter is ignored).
  • cache_hash (bool) – Ensure that the object’s hash code is computed only once and stored on the object. If this is set to True, hashing must be either explicitly or implicitly enabled for this class. If the hash code is cached, avoid any reassignments of fields involved in hash code computation or mutations of the objects those fields point to after object creation. If such changes occur, the behavior of the object’s hash code is undefined.
  • auto_exc (bool) –

    If the class subclasses BaseException (which implicitly includes any subclass of any exception), the following happens to make it behave like a well-behaved Python exception class:

    • the values for eq, order, and hash are ignored and the instances compare and hash by the instance’s ids (N.B. attrs will not remove existing implementations of __hash__ or the equality methods. It just won’t add its own.),
    • all attributes that are either passed into __init__ or have a default value are additionally available as a tuple in the args attribute,
    • the value of str is ignored leaving __str__ to base classes.
  • collect_by_mro (bool) –

    Setting this to True fixes the way attrs collects attributes from base classes. The default behavior is incorrect in certain cases of multiple inheritance. It should be on by default but is kept off for backward-compatibility.

    See issue #428 for more details.

  • getstate_setstate (Optional[bool]) –

    Note

    This is usually only interesting for slotted classes and you should probably just set auto_detect to True.

    If True, __getstate__ and __setstate__ are generated and attached to the class. This is necessary for slotted classes to be pickleable. If left None, it’s True by default for slotted classes and False for dict classes.

    If auto_detect is True, and getstate_setstate is left None, and either __getstate__ or __setstate__ is detected directly on the class (i.e. not inherited), it is set to False (this is usually what you want).

  • on_setattr (callable, or a list of callables) –

    A callable that is run whenever the user attempts to set an attribute (either by assignment like i.x = 42 or by using setattr like setattr(i, "x", 42)). It receives the same arguments as validators: the instance, the attribute that is being modified, and the new value.

    If no exception is raised, the attribute is set to the return value of the callable.

    If a list of callables is passed, they’re automatically wrapped in an attr.setters.pipe.

  • field_transformer (Optional[callable]) – A function that is called with the original class object and all fields right before attrs finalizes the class. You can use this, e.g., to automatically add converters or validators to fields based on their types. See transform-fields for more details.

New in version 16.0.0: slots

New in version 16.1.0: frozen

New in version 16.3.0: str

New in version 16.3.0: Support for __attrs_post_init__.

Changed in version 17.1.0: hash supports None as value which is also the default now.

New in version 17.3.0: auto_attribs

Changed in version 18.1.0: If these is passed, no attributes are deleted from the class body.

Changed in version 18.1.0: If these is ordered, the order is retained.

New in version 18.2.0: weakref_slot

Deprecated since version 18.2.0: __lt__, __le__, __gt__, and __ge__ now raise a DeprecationWarning if the classes compared are subclasses of each other. __eq__ and __ne__ never tried to compare subclasses to each other.

Changed in version 19.2.0: __lt__, __le__, __gt__, and __ge__ now do not consider subclasses comparable anymore.

New in version 18.2.0: kw_only

New in version 18.2.0: cache_hash

New in version 19.1.0: auto_exc

Deprecated since version 19.2.0: cmp. Removal on or after 2021-06-01.

New in version 19.2.0: eq and order

New in version 20.1.0: auto_detect

New in version 20.1.0: collect_by_mro

New in version 20.1.0: getstate_setstate

New in version 20.1.0: on_setattr

New in version 20.3.0: field_transformer

hip_data_tools.aws.s3 module

Utility to connect to, and interact with the s3 file storage system

class hip_data_tools.aws.s3.S3Util(conn: hip_data_tools.aws.common.AwsConnectionManager, bucket: str)

Bases: hip_data_tools.aws.common.AwsUtil

Utility class for connecting to s3 and manipulating data in a Pythonic way.

Parameters:
  • conn (AwsConnectionManager) – AwsConnectionManager object used to access AWS
  • bucket (string) – S3 bucket name where these operations will take place

create_bucket() → None

Create the s3 bucket.

Returns: None

delete_recursive(key_prefix: str) → None

Recursively delete all keys with the given prefix from the bucket.

Parameters: key_prefix (str) – key prefix under which all files will be deleted

Returns: None

delete_recursive_match_suffix(key_prefix: str, suffix: str) → None

Recursively delete all keys with the given key prefix and suffix from the bucket.

Parameters:
  • key_prefix (str) – key prefix under which files will be deleted
  • suffix (str) – suffix identifying the subset of files under the prefix to be deleted

Returns: None

download_directory(source_key: str, file_suffix: str, local_directory: str) → None

Download an entire directory from s3 onto the local file system.

Parameters:
  • source_key (str) – key prefix of the directory to be downloaded from s3
  • file_suffix (str) – suffix used to filter the subset of files under source_key to be downloaded
  • local_directory (str) – absolute local path in which to store all the files

Returns: None

download_file(local_file_path: str, key: str) → None

Download a file from a path on s3 to a local path on disk.

Parameters:
  • local_file_path (str) – absolute local path to save the file to
  • key (str) – absolute path (key) of the file on s3

Returns: None

download_json(key: str, encoding: str = 'utf-8') → dict

Read a file in JSON format from s3.

Parameters:
  • key (str) – location of the file to read
  • encoding (str) – the character encoding to use for decoding the content

Returns: dict

download_object_and_deserialise(key: str, local_file_path: str = None)

Download a serialised object from s3 and deserialise it.

Parameters:
  • key (str) – absolute path of the file on s3
  • local_file_path (str) – local path to download the serialised file to before deserialising it

Returns: object – the deserialised object

download_parquet_as_dataframe(key: str, engine: str = 'auto', columns: List[str] = None, **kwargs) → pandas.core.frame.DataFrame

Download a parquet file from s3 as a dataframe.

Parameters:
  • key (str) – the absolute path of the parquet file on s3
  • engine (str) – parquet engine
  • columns (List[str]) – the columns to read; if None (the default), all columns are read

Returns: DataFrame

download_strings(key: str, encoding: str = 'utf-8') → List[str]

Read lines from an s3 file.

Parameters:
  • key (str) – the key of the file that contains the strings
  • encoding (str) – the character encoding to use for decoding the content

Returns: List[str]

download_strings_from_directory(key_prefix: str, encoding: str = 'utf-8') → List[str]

Read lines from all s3 files under a key prefix.

Parameters:
  • key_prefix (str) – the key prefix under which all files will be read
  • encoding (str) – the character encoding to use for decoding the content

Returns: List[str]

get_all_keys(key_prefix: str) → List[str]

List all keys under a given key prefix.

Parameters: key_prefix (str) – the key prefix under which all files will be listed

Returns: List[str]

get_keys(key_prefix: str) → List[str]

Return a list of all objects under a given key prefix.

Parameters: key_prefix (str) – key prefix under which all objects are to be listed

Returns: list[str]
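
Listing every key under a prefix has to follow pagination, because a single S3 list_objects_v2 call returns at most 1,000 objects. The sketch uses boto3's real paginator API (get_paginator("list_objects_v2").paginate(Bucket=..., Prefix=...)); list_keys_sketch is a hypothetical helper and not necessarily how get_keys is implemented.

```python
from typing import Any, List


def list_keys_sketch(s3_client: Any, bucket: str, key_prefix: str) -> List[str]:
    """Return every key under key_prefix, following list_objects_v2 pagination."""
    keys: List[str] = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=key_prefix):
        # Pages with no matching objects omit the "Contents" entry entirely.
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys
```

Suffix filtering, as in delete_recursive_match_suffix or download_directory, can then be applied to the returned list, e.g. [k for k in keys if k.endswith(".csv")].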

get_keys_modified_in_range(key_prefix: str, start_date: arrow.Arrow, end_date: arrow.Arrow) → List[str]

Find any files changed or added in the given time period under the given key prefix, and return a list of their keys.

Parameters:
  • key_prefix (str) – the key prefix under which all files will be checked
  • start_date (arrow.Arrow) – start of the period in which the s3 objects were modified
  • end_date (arrow.Arrow) – end of the period in which the s3 objects were modified

Returns: List[str]
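
The range check itself can be sketched as a filter over object listings shaped like boto3's list_objects_v2 "Contents" entries, whose LastModified values are timezone-aware datetimes. keys_modified_in_range is hypothetical, and treating both bounds as inclusive is an assumption; an arrow.Arrow bound can be converted with its .datetime attribute.

```python
import datetime
from typing import Any, Iterable, List, Mapping


def keys_modified_in_range(objects: Iterable[Mapping[str, Any]],
                           start: datetime.datetime,
                           end: datetime.datetime) -> List[str]:
    """Keep keys whose LastModified falls within [start, end] (inclusive, assumed)."""
    # Both bounds must be timezone-aware: comparing naive and aware
    # datetimes raises TypeError.
    return [obj["Key"] for obj in objects if start <= obj["LastModified"] <= end]
```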

get_object_metadata(key_prefix: str) → List[T]

Get metadata for all objects under a key prefix.

Parameters: key_prefix (str) – the key prefix under which all objects will be inspected

Returns: List[metadata]

move_recursive(source_dir: str, destination_dir: str, delete_after_copy: bool = True) → None

Recursively move files on s3 to a new location in the same bucket.

Parameters:
  • source_dir (str) – source key prefix representing the directory to move
  • destination_dir (str) – destination key prefix
  • delete_after_copy (bool) – if True, remove the files from the source after a successful copy

Returns: None
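
S3 has no native move or rename, so a move is a copy followed by an optional delete. The sketch re-roots each key by swapping the source prefix for the destination prefix and uses boto3's real copy_object and delete_object calls; move_recursive_sketch is hypothetical and takes the key list explicitly rather than listing the bucket itself.

```python
from typing import Any, List


def move_recursive_sketch(s3_client: Any, bucket: str, keys: List[str], source_dir: str,
                          destination_dir: str, delete_after_copy: bool = True) -> List[str]:
    """Copy each key under source_dir to destination_dir in the same bucket."""
    moved = []
    for key in keys:
        if not key.startswith(source_dir):
            continue
        # Keep the part of the key below source_dir and re-root it.
        new_key = destination_dir + key[len(source_dir):]
        s3_client.copy_object(Bucket=bucket, Key=new_key,
                              CopySource={"Bucket": bucket, "Key": key})
        if delete_after_copy:
            s3_client.delete_object(Bucket=bucket, Key=key)
        moved.append(new_key)
    return moved
```

Include the trailing slash in both prefixes ("in/" rather than "in") so that keys such as "input/..." are not matched. copy_object is limited to objects of up to 5 GB; larger objects need a multipart copy.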

read_lines_as_list(key_prefix: str, encoding: str = 'utf-8') → List[str]

Read lines from all s3 files under a key prefix.

Parameters:
  • key_prefix (str) – the key prefix under which all files will be read
  • encoding (str) – the character encoding to use for decoding the content

Returns: list[str] – lines read from all files

rename_file(key: str, new_file_name: str) → None

Rename a file on s3.

Parameters:
  • key (str) – current key of the file
  • new_file_name (str) – target file name

Returns: None
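
Renaming keeps the file in the same "directory" and only replaces the last path segment of its key. The sketch computes that new key; renamed_key is a hypothetical helper, and the actual move would still be a copy followed by a delete.

```python
import posixpath


def renamed_key(key: str, new_file_name: str) -> str:
    """Return the key obtained by replacing the last path segment of key."""
    # S3 keys use "/" as the conventional separator, so posixpath is used
    # regardless of the local operating system.
    return posixpath.join(posixpath.dirname(key), new_file_name)


print(renamed_key("exports/2020/data.csv", "final.csv"))
# exports/2020/final.csv
```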

serialise_and_upload_object(obj: Any, key: str) → None

Serialise any object to disk, and then upload it to S3.

Parameters:
  • obj (Any) – any serialisable object
  • key (str) – the absolute path on s3 to upload the file to

Returns: None

upload_binary_stream(stream: bytes, key: str) → None

Upload a binary stream of data as an s3 object’s body.

Parameters:
  • stream (bytes) – the stream of bytes to be uploaded
  • key (str) – s3 key at which the stream is to be uploaded

Returns: None

upload_dataframe_as_parquet(dataframe: pandas.core.frame.DataFrame, key: str, file_name: str = 'data', **kwargs) → None

Export a dataframe to a parquet file on s3.

Parameters:
  • dataframe (DataFrame) – dataframe to export
  • key (str) – the path on s3 to upload the file to (excluding the bucket name and file name)
  • file_name (str) – the name of the file at the destination

Returns: None

upload_directory(source_directory: str, extension: str, target_key: str, overwrite: bool = True, rename: bool = True) → None

Upload a local file directory to s3.

Parameters:
  • source_directory (str) – absolute path of the local source directory
  • extension (str) – the file extension of the files in that directory to be uploaded
  • target_key (str) – target location on the s3 bucket for the files to be uploaded
  • overwrite (bool) – overwrites files on s3 if set to True
  • rename (bool) – renames the files when uploading to s3 if set to True

Returns: None

upload_file(local_file_path: str, key: str, remove_local: bool = True) → None

Upload a file from the local file system to s3.

Parameters:
  • local_file_path (str) – absolute local path to the file to upload
  • key (str) – absolute path within the s3 bucket to upload the file to
  • remove_local (bool) – remove the file from the local file system after the transfer

Returns: None

upload_json(key: str, json_list: List[dict], encoding: str = 'utf-8') → None

Save the JSON/dict data structure onto s3 as a file without using temporary local files.

Parameters:
  • key (str) – target key of the file on s3
  • json_list (List[dict]) – a list of dictionaries that are saved as newline-delimited JSON in a file
  • encoding (str) – the character encoding to use for encoding the content

Returns: None
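
Writing newline-delimited JSON without temporary files means building the payload in memory and sending it as the object body. The sketch shows the encoding step; to_newline_json_bytes is a hypothetical helper, and the upload itself could be boto3's put_object(Bucket=..., Key=..., Body=payload).

```python
import json
from typing import List


def to_newline_json_bytes(json_list: List[dict], encoding: str = "utf-8") -> bytes:
    """Serialise dicts as newline-delimited JSON entirely in memory."""
    # One JSON document per line, encoded with the requested character set.
    return "\n".join(json.dumps(record) for record in json_list).encode(encoding)


payload = to_newline_json_bytes([{"a": 1}, {"b": "x"}])
print(payload)
# b'{"a": 1}\n{"b": "x"}'
```

Reading the object back is the reverse: decode the body and parse each line with json.loads.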

Module contents