hip_data_tools.aws package
Submodules
hip_data_tools.aws.athena module
Utility to connect to AWS Athena and perform DML and DDL operations on it.
-
class hip_data_tools.aws.athena.AthenaUtil(database: str, conn: hip_data_tools.aws.common.AwsConnectionManager, output_key: str = None, output_bucket: str = None)
Bases: hip_data_tools.aws.common.AwsUtil
Utility class for connecting to Athena and manipulating data in a pythonic way.
Parameters: - database (string) – the Athena database to run queries on
- conn (AwsConnectionManager) – the AWS connection manager to use
- output_key (string) – the S3 key where the results of Athena queries will be stored
- output_bucket (string) – the S3 bucket where the results of Athena queries will be stored
-
add_partitions(table, partition_keys, partition_values)
Add a new partition to the given table.
Parameters: - table (string) – name of the table to which the new partition is added
- partition_keys (list) – the partition columns (keys)
- partition_values (list) – the values for the partition columns
Returns: None
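For orientation, adding a partition in Athena corresponds to an `ALTER TABLE ... ADD PARTITION` statement. The sketch below builds such a statement from keys and values. It is not the library's implementation. The helper name `build_add_partition_ddl`, the `IF NOT EXISTS` clause and quoting every value as a string literal are all assumptions.

```python
from typing import List


def build_add_partition_ddl(table: str, partition_keys: List[str], partition_values: List[str]) -> str:
    """Hypothetical helper: build an Athena ALTER TABLE ... ADD PARTITION statement."""
    if len(partition_keys) != len(partition_values):
        raise ValueError("partition_keys and partition_values must have the same length")
    # Pair each partition column with its value. Values are quoted as string literals
    # (an assumption), with embedded single quotes escaped by doubling them.
    assignments = ", ".join(
        "{} = '{}'".format(key, str(value).replace("'", "''"))
        for key, value in zip(partition_keys, partition_values)
    )
    return "ALTER TABLE {} ADD IF NOT EXISTS PARTITION ({})".format(table, assignments)


print(build_add_partition_ddl("events", ["year", "month"], ["2020", "06"]))
# ALTER TABLE events ADD IF NOT EXISTS PARTITION (year = '2020', month = '06')
```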
-
create_table(table_settings)
Create a table from the given settings.
Parameters: table_settings (dict) – dictionary of settings used to create the table
Returns: None
-
drop_table(table_name)
Drop the given Athena table.
Parameters: table_name (string) – name of the table to be dropped
Returns: None
-
get_glue_table_metadata(table: str) → dict
Get the table metadata from Glue.
Parameters: table (str) – Athena table name
Returns: a dict of table metadata
-
get_table_columns(table: str) → Tuple[List[dict], List[dict]]
Retrieve the table's columns using the Glue metastore.
Parameters: table (str) – name of the table
Returns: a tuple of regular and partition column dictionaries
-
get_table_data_location(table: str) → tuple
Retrieve the table's S3 data location using the Glue metastore.
Parameters: table (str) – name of the table
Returns: a tuple of S3 bucket and key
-
get_table_ddl(table)
Retrieve the table DDL as a string.
Parameters: table (string) – name of the table for which the DDL needs to be generated
Returns: a string containing the Athena table DDL
-
repair_table_partitions(table)
Repair the partitions of the given table.
Parameters: table (string) – name of the table whose partitions need to be scanned and refilled
Returns: None
-
run_query(query_string, return_result=False)
General-purpose query executor. It submits an Athena query, then uses the execution id to poll and monitor the success of the query, and optionally returns the result.
Parameters: - query_string (string) – a string containing a valid Athena query
- return_result (boolean) – flag to turn on returning the results
Returns: the result dictionary if return_result is True, else None
-
watch_query(execution_id, poll_frequency=10)
Watch the execution of the Athena query with the given execution id.
Parameters: - execution_id – the execution id of an Athena query
- poll_frequency (int) – frequency, in seconds, at which to poll the Athena API for the query status
Returns: a dictionary of status from Athena
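The submit-then-poll pattern that run_query and watch_query describe can be sketched as a loop that stops at a terminal state. The function `wait_for_query` and its injected `fetch_state` and `sleep` callables are hypothetical. They let the example run without AWS. With boto3, `fetch_state` could wrap `client.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]["State"]`.

```python
import time
from typing import Callable

# Terminal states reported by the Athena API for a query execution.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(fetch_state: Callable[[], str], poll_frequency: float = 10,
                   sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until the query reaches a terminal state, then return that state."""
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        # Still QUEUED or RUNNING: wait before asking again.
        sleep(poll_frequency)


# Usage with a fake status sequence and a no-op sleep, so no AWS access is needed.
states = iter(["QUEUED", "RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_query(lambda: next(states), poll_frequency=0, sleep=lambda _: None))
# SUCCEEDED
```

Injecting `sleep` keeps the loop testable. A production version would usually also enforce an overall timeout.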
-
hip_data_tools.aws.athena.extract_athena_type_from_value(val: Any) → str
Extract an Athena-compatible data type from a given Python object.
Parameters: val (Any) – any value of a primitive Python type
Returns: str
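Here is a minimal sketch of mapping Python values to Athena column types. The function name `athena_type_for` and the exact mapping (for example int to `bigint`, with `string` as the fallback) are assumptions, not the library's table. One detail matters in any such mapping: `bool` is a subclass of `int`, and `datetime` is a subclass of `date`. The more specific checks must therefore come first.

```python
import datetime
from typing import Any


def athena_type_for(val: Any) -> str:
    """Hypothetical mapping from a Python value to an Athena column type."""
    # bool must be tested before int, because bool is a subclass of int in Python.
    if isinstance(val, bool):
        return "boolean"
    if isinstance(val, int):
        return "bigint"
    if isinstance(val, float):
        return "double"
    # datetime is a subclass of date, so it is tested first.
    if isinstance(val, datetime.datetime):
        return "timestamp"
    if isinstance(val, datetime.date):
        return "date"
    # Fall back to string for everything else (assumption).
    return "string"
```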
-
hip_data_tools.aws.athena.generate_csv_ctas(select_query, destination_table, destination_bucket, destination_key, partition_fields='')
Generate a CTAS query string that produces CSV output.
Parameters: - select_query (string) – the query to be used for table generation
- destination_table (string) – name of the new table being created
- destination_bucket (string) – the S3 bucket where the data from the select query will be stored
- destination_key (string) – the S3 directory where the data from the select query will be stored
- partition_fields (string) – partition field names
Returns (string): the CTAS query
-
hip_data_tools.aws.athena.generate_parquet_ctas(select_query, destination_table, destination_bucket, destination_key, partition_fields='')
Generate a CTAS query string that produces Parquet output.
Parameters: - select_query (string) – the query to be used for table generation
- destination_table (string) – name of the new table being created
- destination_bucket (string) – the S3 bucket where the data from the select query will be stored
- destination_key (string) – the S3 directory where the data from the select query will be stored
- partition_fields (string) – partition field names
Returns (string): the CTAS query
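Athena CTAS statements pass their storage settings as table properties in a `WITH` clause. The sketch below shows the general shape that CSV and Parquet CTAS generators produce. `build_ctas` is a hypothetical helper, not the library's code, and it departs from the library in one way: it takes partition fields as a list of names rather than the single string that generate_csv_ctas and generate_parquet_ctas accept. The property names `format`, `external_location`, `field_delimiter` and `partitioned_by` are standard Athena CTAS properties.

```python
from typing import List, Optional


def build_ctas(select_query: str, destination_table: str, destination_bucket: str,
               destination_key: str, output_format: str = "PARQUET",
               partition_fields: Optional[List[str]] = None) -> str:
    """Hypothetical sketch of an Athena CTAS statement writing to s3://bucket/key/."""
    properties = [
        "format = '{}'".format(output_format),
        "external_location = 's3://{}/{}/'".format(destination_bucket, destination_key.strip("/")),
    ]
    if output_format == "TEXTFILE":
        # CSV output is written as comma-delimited text.
        properties.append("field_delimiter = ','")
    if partition_fields:
        quoted = ", ".join("'{}'".format(field) for field in partition_fields)
        properties.append("partitioned_by = ARRAY[{}]".format(quoted))
    return "CREATE TABLE {} WITH ({}) AS {}".format(destination_table, ", ".join(properties), select_query)


print(build_ctas("SELECT id, dt FROM src", "dst", "my-bucket", "data/dst/", partition_fields=["dt"]))
# CREATE TABLE dst WITH (format = 'PARQUET', external_location = 's3://my-bucket/data/dst/', partitioned_by = ARRAY['dt']) AS SELECT id, dt FROM src
```

Athena expects partition columns to be the last columns in the SELECT list, which is why `dt` comes last in the example query.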
-
hip_data_tools.aws.athena.get_athena_columns_from_dataframe(data_frame: pandas.core.frame.DataFrame) → List[dict]
Extract a list of column names and their Athena data types from the dataframe.
Parameters: data_frame (DataFrame) – the dataframe from which the columns are extracted
Returns: list of dict
-
hip_data_tools.aws.athena.get_partitions_from_partitions_dict(partitions: dict)
Get the list of partition dictionaries for the Athena table settings.
Parameters: partitions (dict) – dictionary of partition column names and values
Returns: list of dictionaries
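As an illustration, a partitions dictionary such as `{"year": 2020, "country": "AU"}` could be turned into a list of column dictionaries as sketched below. The output shape, with keys `column` and `type`, is an assumption borrowed from the zip_columns description. The names `partitions_to_columns` and `_athena_type` and the simplified type inference are hypothetical.

```python
from typing import Any, Dict, List


def _athena_type(value: Any) -> str:
    # Simplified, hypothetical type inference; bool is checked before int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    return "string"


def partitions_to_columns(partitions: Dict[str, Any]) -> List[Dict[str, str]]:
    """Turn {partition column: value} into [{"column": name, "type": athena_type}, ...]."""
    return [{"column": name, "type": _athena_type(value)} for name, value in partitions.items()]


print(partitions_to_columns({"year": 2020, "country": "AU"}))
# [{'column': 'year', 'type': 'bigint'}, {'column': 'country', 'type': 'string'}]
```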
-
hip_data_tools.aws.athena.get_table_settings_for_dataframe(dataframe: pandas.core.frame.DataFrame, partitions: dict, s3_bucket: str, s3_dir: str, table: str) → dict
Get the Athena table settings.
Parameters: - dataframe (DataFrame) – data frame with column types and names
- partitions (dict) – dictionary of partition column names and values
- s3_bucket (str) – S3 bucket name in which to store the Athena table's data
- s3_dir (str) – S3 directory in which to store the Athena table's data
- table (str) – name of the table
Returns: table settings
-
hip_data_tools.aws.athena.zip_columns(column_list)
Combine the column list into a comma-separated list of column names and data types.
Parameters: column_list (list) – a list of dictionaries with the keys column and type
Returns (string): a comma-separated list of column names and data types
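Here is a minimal sketch of the zipping described above. It assumes that each entry becomes "name type" and that entries are joined with ", "; the exact separator and any identifier quoting in the library may differ. `zip_columns_sketch` is a hypothetical name.

```python
from typing import Dict, List


def zip_columns_sketch(column_list: List[Dict[str, str]]) -> str:
    """Join [{"column": ..., "type": ...}, ...] into "name type, name type"."""
    return ", ".join("{} {}".format(col["column"], col["type"]) for col in column_list)


print(zip_columns_sketch([{"column": "id", "type": "bigint"}, {"column": "name", "type": "string"}]))
# id bigint, name string
```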
hip_data_tools.aws.common module
-
class hip_data_tools.aws.common.AwsConnectionManager(settings: hip_data_tools.aws.common.AwsConnectionSettings)
Bases: object
Utility class to connect to AWS and perform some basic operations.
Example: to connect using an AWS CLI profile:
>>> conn = AwsConnectionManager(
...     AwsConnectionSettings(region="ap-southeast-2", profile="default", secrets_manager=None))
# OR, to connect using the standard AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY):
>>> conn = AwsConnectionManager(settings=AwsConnectionSettings(region="ap-southeast-2", profile=None, secrets_manager=AwsSecretsManager()))
# OR, to connect using a custom set of environment variables:
>>> conn = AwsConnectionManager(
...     settings=AwsConnectionSettings(
...         region="ap-southeast-2",
...         secrets_manager=AwsSecretsManager(
...             access_key_id_var="SOME_CUSTOM_AWS_ACCESS_KEY_ID",
...             secret_access_key_var="SOME_CUSTOM_AWS_SECRET_ACCESS_KEY",
...             use_session_token=True,
...             aws_session_token_var="SOME_CUSTOM_AWS_SESSION_TOKEN"
...         ),
...         profile=None,
...     )
... )
Parameters: settings (AwsConnectionSettings) – settings to use for connecting to AWS
-
client(client_type)
Get a client for a specific AWS service.
Parameters: client_type (string) – the AWS service to get a client for, such as s3 or athena, as named in boto3
Returns (client): boto3 client
-
resource(resource_type)
Get a resource for a specific AWS service.
Parameters: resource_type (string) – the AWS service to get a resource for, such as s3, as named in boto3
Returns (resource): boto3 resource of type resource_type
-
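class hip_data_tools.aws.common.AwsConnectionSettings(region: str, profile: Optional[str], secrets_manager: Optional[hip_data_tools.aws.common.AwsSecretsManager])
Bases: object
Encapsulates the AWS connection settings.
Parameters: - region (str) – the AWS region to connect to, e.g. ap-southeast-2
- profile (str or None) – the AWS CLI profile to use, or None
- secrets_manager (AwsSecretsManager or None) – the secrets manager supplying credentials, or None when a profile is used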
-
class hip_data_tools.aws.common.AwsSecretsManager(source: hip_data_tools.common.KeyValueSource = <hip_data_tools.common.EnvironmentKeyValueSource object>, access_key_id_var: str = 'AWS_ACCESS_KEY_ID', secret_access_key_var: str = 'AWS_SECRET_ACCESS_KEY', use_session_token: bool = False, aws_session_token_var: str = 'AWS_SESSION_TOKEN')
Bases: hip_data_tools.common.SecretsManager
Parameters: - source (KeyValueSource) – a key-value source that holds the secrets
- access_key_id_var (str) – the variable name or key under which to find the access_key_id
- secret_access_key_var (str) – the variable name or key under which to find the secret_access_key
- use_session_token (bool) – whether a session token is required
- aws_session_token_var (str) – the variable name or key under which to find the aws_session_token
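To illustrate what the configurable variable names do, the sketch below looks up credentials in a key-value mapping under caller-chosen key names. It returns them as keyword arguments that `boto3.Session` accepts. `resolve_credentials` is a hypothetical function, not the class's implementation. A plain dict stands in for the environment source; `os.environ` could be passed the same way.

```python
from typing import Dict, Mapping


def resolve_credentials(source: Mapping[str, str],
                        access_key_id_var: str = "AWS_ACCESS_KEY_ID",
                        secret_access_key_var: str = "AWS_SECRET_ACCESS_KEY",
                        use_session_token: bool = False,
                        aws_session_token_var: str = "AWS_SESSION_TOKEN") -> Dict[str, str]:
    """Hypothetical sketch: look up AWS credentials under configurable key names.

    The returned keys match keyword arguments of boto3.Session.
    """
    credentials = {
        "aws_access_key_id": source[access_key_id_var],
        "aws_secret_access_key": source[secret_access_key_var],
    }
    # The session token is only looked up when explicitly requested.
    if use_session_token:
        credentials["aws_session_token"] = source[aws_session_token_var]
    return credentials


# A plain dict plays the role of the key-value source here.
creds = resolve_credentials({"MY_KEY": "example-key-id", "MY_SECRET": "example-secret"},
                            access_key_id_var="MY_KEY", secret_access_key_var="MY_SECRET")
# boto3.Session(region_name="ap-southeast-2", **creds) would then open the session.
```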
-
class hip_data_tools.aws.common.AwsUtil(conn: hip_data_tools.aws.common.AwsConnectionManager, boto_type: str)
Bases: abc.ABC
Common AWS base class that uses a boto connection and resources.
Parameters: - conn (AwsConnectionManager) – connection to use for accessing AWS resources
- boto_type (str) – the type of boto client / resource to instantiate
-
get_client() → botocore.client.BaseClient
Return a boto client, creating one if it does not exist yet.
Returns: BaseClient
-
get_resource() → Any
Return a boto resource, creating one if it does not exist yet.
Returns: Any
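The "create one if not present" behaviour of get_client and get_resource is a lazy, cached construction. Here is a minimal sketch of that pattern, assuming nothing about the library's internals. `LazyBotoHolder` is a hypothetical class, and `factory` stands in for a call such as `boto3.Session(...).client("s3")`.

```python
from typing import Any, Callable, Optional


class LazyBotoHolder:
    """Hypothetical sketch of create-on-first-use caching for a client or resource."""

    def __init__(self, factory: Callable[[], Any]):
        # factory stands in for something like boto3.Session(...).client("s3").
        self._factory = factory
        self._client: Optional[Any] = None

    def get_client(self) -> Any:
        # Build the client on first access and reuse the same instance afterwards.
        if self._client is None:
            self._client = self._factory()
        return self._client


calls = []


def make_client() -> object:
    calls.append("created")
    return object()


holder = LazyBotoHolder(make_client)
first = holder.get_client()
second = holder.get_client()
```

Caching avoids repeating session and client setup on every call. The trade-off is that one instance is shared by all callers of the utility object.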
-
hip_data_tools.aws.common.dataclass(maybe_cls=None, these=None, repr_ns=None, repr=True, cmp=None, hash=None, init=True, slots=False, frozen=False, weakref_slot=True, str=False, *, auto_attribs=True, kw_only=False, cache_hash=False, auto_exc=False, eq=None, order=None)
A class decorator that adds dunder methods according to the specified attributes using attr.ib or the these argument.
Parameters: - these (dict of str to attr.ib) – A dictionary of name to attr.ib mappings. This is useful to avoid the definition of your attributes within the class body because you can't (e.g. if you want to add __repr__ methods to Django models) or don't want to.
If these is not None, attrs will not search the class body for attributes and will not remove any attributes from it.
If these is an ordered dict (dict on Python 3.6+, collections.OrderedDict otherwise), the order is deduced from the order of the attributes inside these. Otherwise the order of the definition of the attributes is used.
- repr_ns (str) – When using nested classes, there's no way in Python 2 to automatically detect that. Therefore it's possible to set the namespace explicitly for a more meaningful repr output.
- repr (bool) – Create a __repr__ method with a human-readable representation of attrs attributes.
- str (bool) – Create a __str__ method that is identical to __repr__. This is usually not necessary except for Exceptions.
- eq (bool or None) – If True or None (default), add __eq__ and __ne__ methods that check two instances for equality.
They compare the instances as if they were tuples of their attrs attributes, but only if the types of both classes are identical!
- order (bool or None) – If True, add __lt__, __le__, __gt__, and __ge__ methods that behave like eq above and allow instances to be ordered. If None (default), mirror the value of eq.
- cmp (bool or None) – Setting to True is equivalent to setting eq=True, order=True. Deprecated in favor of eq and order, but takes precedence over them for backward compatibility. Must not be mixed with eq or order.
- hash (bool or None) – If None (default), the __hash__ method is generated according to how eq and frozen are set:
  - If both are True, attrs will generate a __hash__ for you.
  - If eq is True and frozen is False, __hash__ will be set to None, marking it unhashable (which it is).
  - If eq is False, __hash__ will be left untouched, meaning the __hash__ method of the base class will be used (if the base class is object, this means it will fall back to id-based hashing).
Although not recommended, you can decide for yourself and force attrs to create one (e.g. if the class is immutable even though you didn't freeze it programmatically) by passing True or not. Both of these cases are rather special and should be used carefully.
See the attrs documentation on hashing, Python's documentation on object.__hash__, and the GitHub issue that led to the default behavior for more details.
- init (bool) – Create an __init__ method that initializes the attrs attributes. Leading underscores are stripped for the argument name. If an __attrs_post_init__ method exists on the class, it will be called after the class is fully initialized.
- slots (bool) – Create a slotted class that's more memory-efficient.
- frozen (bool) – Make instances immutable after initialization. If someone attempts to modify a frozen instance, attr.exceptions.FrozenInstanceError is raised.
Please note:
  - This is achieved by installing a custom __setattr__ method on your class, so you can't implement your own.
  - True immutability is impossible in Python.
  - This does have a minor runtime performance impact when initializing new instances. In other words: __init__ is slightly slower with frozen=True.
  - If a class is frozen, you cannot modify self in __attrs_post_init__ or a self-written __init__. You can circumvent that limitation by using object.__setattr__(self, "attribute_name", value).
- weakref_slot (bool) – Make instances weak-referenceable. This has no effect unless slots is also enabled.
- auto_attribs (bool) – If True, collect PEP 526-annotated attributes (Python 3.6 and later only) from the class body.
In this case, you must annotate every field. If attrs encounters a field that is set to an attr.ib but lacks a type annotation, an attr.exceptions.UnannotatedAttributeError is raised. Use field_name: typing.Any = attr.ib(...) if you don't want to set a type.
If you assign a value to those attributes (e.g. x: int = 42), that value becomes the default value, as if it were passed using attr.ib(default=42). Passing an instance of Factory also works as expected.
Attributes annotated as typing.ClassVar, and attributes that are neither annotated nor set to an attr.ib, are ignored.
- kw_only (bool) – Make all attributes keyword-only (Python 3+) in the generated __init__ (if init is False, this parameter is ignored).
- cache_hash (bool) – Ensure that the object's hash code is computed only once and stored on the object. If this is set to True, hashing must be either explicitly or implicitly enabled for this class. If the hash code is cached, avoid any reassignments of fields involved in hash code computation or mutations of the objects those fields point to after object creation. If such changes occur, the behavior of the object's hash code is undefined.
- auto_exc (bool) – If the class subclasses BaseException (which implicitly includes any subclass of any exception), the following happens so that it behaves like a well-behaved Python exception class:
  - the values for eq, order, and hash are ignored and the instances compare and hash by the instance's ids (N.B. attrs will not remove existing implementations of __hash__ or the equality methods. It just won't add its own.),
  - all attributes that are either passed into __init__ or have a default value are additionally available as a tuple in the args attribute,
  - the value of str is ignored, leaving __str__ to base classes.
New in version 16.0.0: slots
New in version 16.1.0: frozen
New in version 16.3.0: str
New in version 16.3.0: Support for __attrs_post_init__.
Changed in version 17.1.0: hash supports None as value, which is also the default now.
New in version 17.3.0: auto_attribs
Changed in version 18.1.0: If these is passed, no attributes are deleted from the class body.
Changed in version 18.1.0: If these is ordered, the order is retained.
New in version 18.2.0: weakref_slot
Deprecated since version 18.2.0: __lt__, __le__, __gt__, and __ge__ now raise a DeprecationWarning if the classes compared are subclasses of each other. __eq__ and __ne__ never tried to compare subclasses to each other.
Changed in version 19.2.0: __lt__, __le__, __gt__, and __ge__ now do not consider subclasses comparable anymore.
New in version 18.2.0: kw_only
New in version 18.2.0: cache_hash
New in version 19.1.0: auto_exc
Deprecated since version 19.2.0: cmp (removal on or after 2021-06-01).
New in version 19.2.0: eq and order
hip_data_tools.aws.s3 module
Utility to connect to and interact with the S3 file storage system.
-
class hip_data_tools.aws.s3.S3Util(conn: hip_data_tools.aws.common.AwsConnectionManager, bucket: str)
Bases: hip_data_tools.aws.common.AwsUtil
Utility class for connecting to S3 and manipulating data in a pythonic way.
Parameters: - conn (AwsConnectionManager) – the AWS connection manager to use
- bucket (string) – S3 bucket name where these operations will take place
-
create_bucket() → None
Create the S3 bucket.
Returns: None
-
delete_recursive(key_prefix: str) → None
Recursively delete all keys with the given prefix from the bucket.
Parameters: key_prefix (str) – key prefix under which all files will be deleted
Returns: None
-
delete_recursive_match_suffix(key_prefix: str, suffix: str) → None
Recursively delete all keys with the given key prefix and suffix from the bucket.
Parameters: - key_prefix (str) – key prefix under which the matching files will be deleted
- suffix (str) – suffix identifying the subset of files under the given prefix to be deleted
Returns: None
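Selecting what a prefix-and-suffix delete removes is a simple filter over the listed keys. The sketch below shows that filter and the batching step. Batching is needed because S3's DeleteObjects API accepts at most 1000 keys per request, and each batch could be passed to boto3 as `client.delete_objects(Bucket=..., Delete={"Objects": [{"Key": k} for k in batch]})`. The helper names `keys_to_delete` and `batches` are hypothetical, and this is not the library's code.

```python
from typing import Iterable, List


def keys_to_delete(all_keys: Iterable[str], key_prefix: str, suffix: str) -> List[str]:
    """Select the keys that sit under key_prefix and end with suffix."""
    return [key for key in all_keys if key.startswith(key_prefix) and key.endswith(suffix)]


def batches(keys: List[str], size: int = 1000) -> List[List[str]]:
    """Split keys into chunks; S3's DeleteObjects accepts at most 1000 keys per request."""
    return [keys[i:i + size] for i in range(0, len(keys), size)]


keys = ["logs/2020/a.csv", "logs/2020/b.json", "logs/2021/c.csv", "other/d.csv"]
print(keys_to_delete(keys, "logs/", ".csv"))
# ['logs/2020/a.csv', 'logs/2021/c.csv']
```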
-
download_directory(source_key: str, file_suffix: str, local_directory: str) → None
Download an entire directory from S3 onto the local file system.
Parameters: - source_key (str) – key prefix of the directory to be downloaded from S3
- file_suffix (str) – suffix used to filter the subset of files under source_key to be downloaded
- local_directory (str) – absolute local path in which to store all the files
Returns: None
-
download_file(local_file_path: str, key: str) → None
Download a file from a path on S3 to a local path on disk.
Parameters: - local_file_path (str) – absolute local path to save the file to
- key (str) – absolute path of the file on S3
Returns: None
-
download_json(key: str, encoding: str = 'utf-8') → dict
Read a file in JSON format from S3.
Parameters: - key – location of the file to read
- encoding – the character encoding to use for encoding / decoding content
Returns: dict
-
download_object_and_deserialise(key: str, local_file_path: str = None)
Download a serialised object from S3 and deserialise it.
Parameters: - key (str) – absolute path of the file on S3
- local_file_path (str) – optional local path for the downloaded file
Returns: the deserialised object
-
download_parquet_as_dataframe(key: str, engine: str = 'auto', columns: List[str] = None, **kwargs) → pandas.core.frame.DataFrame
Download a parquet file from S3 as a dataframe.
Parameters: - key (str) – the absolute path on S3 of the parquet file to download
- engine (str) – parquet engine
- columns (list[str]) – the columns to read; if None (default), all columns are read
Returns: DataFrame
-
download_strings(key: str, encoding: str = 'utf-8') → List[str]
Read lines from an S3 file.
Parameters: - key – the key of the file that contains the strings
- encoding – the character encoding to use for encoding / decoding content
Returns: List[str]
-
download_strings_from_directory(key_prefix: str, encoding: str = 'utf-8') → List[str]
Read lines from all S3 files under a key prefix.
Parameters: - key_prefix – the key prefix under which all files will be read
- encoding – the character encoding to use for encoding / decoding content
Returns: List[str]
-
get_all_keys(key_prefix: str) → List[str]
Find all keys under the given key prefix.
Parameters: key_prefix (str) – the key prefix under which all keys will be found
Returns: List[str]
-
get_keys(key_prefix: str) → List[str]
Return a list of the keys of all objects under the given key prefix.
Parameters: key_prefix (str) – key prefix under which all objects are to be listed
Returns: list[str]
-
get_keys_modified_in_range(key_prefix: str, start_date: arrow, end_date: arrow) → List[str]
Find any files changed or added under the given key prefix within the given time period, and return their keys.
Parameters: - key_prefix – the key prefix under which all files will be checked
- start_date – start of the period in which the S3 objects were modified
- end_date – end of the period in which the S3 objects were modified
Returns: List[str]
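The time-window check can be sketched over the object listing that S3 returns. In boto3's `list_objects_v2` response, each entry in `Contents` carries a `Key` and a timezone-aware `LastModified` datetime. `keys_modified_between` is a hypothetical helper. Treating both bounds as inclusive is an assumption. Arrow values can be converted with their `.datetime` attribute before comparison.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, List, Mapping


def keys_modified_between(objects: Iterable[Mapping[str, Any]], start: datetime, end: datetime) -> List[str]:
    """Return keys whose LastModified falls within [start, end] (inclusive bounds are an assumption)."""
    return [obj["Key"] for obj in objects if start <= obj["LastModified"] <= end]


utc = timezone.utc
listing = [
    {"Key": "in/a.csv", "LastModified": datetime(2020, 6, 1, 12, tzinfo=utc)},
    {"Key": "in/b.csv", "LastModified": datetime(2020, 6, 3, 8, tzinfo=utc)},
]
print(keys_modified_between(listing, datetime(2020, 6, 1, tzinfo=utc), datetime(2020, 6, 2, tzinfo=utc)))
# ['in/a.csv']
```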
-
get_object_metadata(key_prefix: str) → List[T]
Get metadata for all objects under a key prefix.
Parameters: key_prefix – the key prefix under which all objects will be found
Returns: List[metadata]
-
move_recursive(source_dir: str, destination_dir: str, delete_after_copy: bool = True) → None
Recursively move files on S3 to a new location in the same bucket.
Parameters: - source_dir – source key prefix representing the directory to move
- destination_dir – destination key prefix
- delete_after_copy – if True, remove the files from the source after a successful copy
Returns: None
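A recursive move on S3 is a copy of every key under the source prefix to the same relative path under the destination prefix, optionally followed by deleting the originals. The sketch below computes that mapping. Each pair could then be copied with boto3's `copy_object(Bucket=..., CopySource={"Bucket": ..., "Key": src}, Key=dst)`. `plan_move` is a hypothetical helper, not the library's implementation.

```python
from typing import Dict, Iterable


def plan_move(keys: Iterable[str], source_dir: str, destination_dir: str) -> Dict[str, str]:
    """Map each key under source_dir to the same relative path under destination_dir."""
    # Normalising to a trailing slash stops "raw" from also matching "rawer/...".
    source = source_dir.rstrip("/") + "/"
    destination = destination_dir.rstrip("/") + "/"
    return {key: destination + key[len(source):] for key in keys if key.startswith(source)}


print(plan_move(["raw/2020/a.csv", "raw/b.csv", "rawer/c.csv"], "raw", "archive/raw"))
# {'raw/2020/a.csv': 'archive/raw/2020/a.csv', 'raw/b.csv': 'archive/raw/b.csv'}
```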
-
read_lines_as_list(key_prefix: str, encoding: str = 'utf-8') → List[str]
Read lines from all S3 files under a key prefix.
Parameters: - key_prefix (str) – the key prefix under which all files will be read
- encoding – the character encoding to use for encoding / decoding content
Returns: list[str] lines read from all files
-
rename_file(key: str, new_file_name: str) → None
Rename a file on S3.
Parameters: - key – current key of the file
- new_file_name – target file name
Returns: None
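S3 has no native rename, so renaming a file amounts to copying the object to a new key and deleting the old one. The sketch below shows only the key arithmetic. It assumes that the file stays in the same "directory" and only its last path component changes. `renamed_key` is a hypothetical helper.

```python
import posixpath


def renamed_key(key: str, new_file_name: str) -> str:
    """Keep the key's "directory" and swap only the final path component (assumption)."""
    return posixpath.join(posixpath.dirname(key), new_file_name)


print(renamed_key("exports/2020/part-0000.csv", "daily.csv"))
# exports/2020/daily.csv
```

`posixpath` is used instead of `os.path` because S3 keys always use forward slashes, whatever the local operating system.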
-
serialise_and_upload_object(obj: Any, key: str) → None
Serialise any object to disk, then upload it to S3.
Parameters: - obj (Any) – any serialisable object
- key (str) – the absolute path on S3 to upload the file to
Returns: None
-
upload_binary_stream(stream: bytes, key: str) → None
Upload a binary stream of data as an S3 object's body.
Parameters: - stream – the stream of bytes to be uploaded
- key – S3 key at which this stream is to be uploaded
Returns: None
-
upload_dataframe_as_parquet(dataframe: pandas.core.frame.DataFrame, key: str, file_name: str = 'data') → None
Export a dataframe to a parquet file on S3.
Parameters: - dataframe (DataFrame) – dataframe to export
- key (str) – the path on S3 to upload the file to (excluding the bucket name and file name)
- file_name (str) – the name of the file at the destination
Returns: None
-
upload_directory(source_directory: str, extension: str, target_key: str, overwrite: bool = True, rename: bool = True) → None
Upload a local file directory to S3.
Parameters: - source_directory (str) – absolute path of the local source directory
- extension (str) – the file extension of the files in that directory to be uploaded
- target_key (str) – target location on the S3 bucket for the files to be uploaded
- overwrite (bool) – overwrite files on S3 if set to True
- rename (bool) – rename the files when uploading to S3 if set to True
Returns: None
-
upload_file(local_file_path: str, key: str, remove_local: bool = True) → None
Upload a file from the local file system to S3.
Parameters: - local_file_path (str) – absolute local path of the file to upload
- key (str) – absolute path within the S3 bucket to upload the file to
- remove_local (bool) – remove the file from the local file system after the transfer
Returns: None
-
upload_json(key: str, json_list: List[dict], encoding: str = 'utf-8') → None
Save the JSON/dict data structure onto S3 as a file without using temporary local files.
Parameters: - key – target key of the file on S3
- json_list – a list of dictionaries that are saved as newline-delimited JSON in a file
- encoding – the character encoding to use for encoding / decoding content
Returns: None
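Newline-delimited JSON stores one JSON object per line. Because the whole payload can be built in memory as bytes, no temporary local file is needed. The bytes could be uploaded with boto3's `put_object(Bucket=..., Key=..., Body=payload)`. Here is a minimal round-trip sketch. `to_newline_json` and `from_newline_json` are hypothetical helpers, not the library's API.

```python
import json
from typing import List


def to_newline_json(json_list: List[dict], encoding: str = "utf-8") -> bytes:
    """Serialise a list of dicts as newline-delimited JSON, one object per line."""
    return "\n".join(json.dumps(record) for record in json_list).encode(encoding)


def from_newline_json(body: bytes, encoding: str = "utf-8") -> List[dict]:
    """Parse newline-delimited JSON back into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in body.decode(encoding).splitlines() if line.strip()]


payload = to_newline_json([{"id": 1}, {"id": 2, "name": "ä"}])
# Decoding the same bytes restores the original list of dictionaries.
records = from_newline_json(payload)
```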
-