hip_data_tools.apache package¶
Submodules¶
hip_data_tools.apache.cassandra module¶
Utility for connecting to and transforming data in Cassandra clusters
-
class hip_data_tools.apache.cassandra.CassandraConnectionManager(settings: hip_data_tools.apache.cassandra.CassandraConnectionSettings)¶
Bases: object
Creates and manages the connection to the Cassandra cluster.
Example:
>>> from cassandra.policies import DCAwareRoundRobinPolicy
>>> from cassandra.cqlengine import columns
>>> from cassandra.cqlengine.management import sync_table
>>> from cassandra.cqlengine.models import Model
>>> from hip_data_tools.apache.cassandra import (
...     CassandraConnectionManager,
...     CassandraConnectionSettings,
...     CassandraSecretsManager,
... )
>>> load_balancing_policy = DCAwareRoundRobinPolicy(local_dc='AWS_VPC_AP_SOUTHEAST_2')
>>> conn = CassandraConnectionManager(
...     settings=CassandraConnectionSettings(
...         cluster_ips=["1.1.1.1", "2.2.2.2"],
...         port=9042,
...         load_balancing_policy=load_balancing_policy,
...     )
... )
>>> conn = CassandraConnectionManager(
...     CassandraConnectionSettings(
...         cluster_ips=["1.1.1.1", "2.2.2.2"],
...         port=9042,
...         load_balancing_policy=load_balancing_policy,
...         secrets_manager=CassandraSecretsManager(
...             username_var="MY_CUSTOM_USERNAME_ENV_VAR"),
...     )
... )
For running Cassandra model operations:
>>> conn.setup_connection("dev_space")
>>> class ExampleModel(Model):
...     example_type = columns.Integer(primary_key=True)
...     created_at = columns.DateTime()
...     description = columns.Text(required=False)
>>> sync_table(ExampleModel)
Parameters: settings (hip_data_tools.apache.cassandra.CassandraConnectionSettings) – settings to use for connecting to a cluster
-
get_cluster() → cassandra.cluster.Cluster¶ get the cassandra Cluster object if it already exists or create a new one Returns: Cluster
-
get_session(keyspace) → cassandra.cluster.Session¶ get the cassandra Cluster’s Session object if it already exists or create a new one Returns: Session
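Both get_cluster and get_session are described as returning an existing object or creating a new one. The snippet below is a minimal sketch of that get-or-create pattern, not the library's implementation; `LazyResource` and `make_cluster` are hypothetical names, and a plain dict stands in for the real Cluster object.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class LazyResource(Generic[T]):
    """Hypothetical helper: build a resource on first use, then reuse it."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._instance: Optional[T] = None

    def get(self) -> T:
        # Create the resource only if it does not exist yet.
        if self._instance is None:
            self._instance = self._factory()
        return self._instance


calls: List[str] = []


def make_cluster() -> dict:
    # Stand-in for an expensive constructor such as a cluster connection.
    calls.append("created")
    return {"name": "fake-cluster"}


cluster = LazyResource(make_cluster)
first = cluster.get()
second = cluster.get()  # reuses the object built by the first call
```

The factory runs once, so repeated calls share one connection-like object instead of opening a new one each time.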
-
setup_connection(default_keyspace) → None¶ Sets up an implicit connection object for Cassandra using the Cassandra settings in the connection manager
Parameters: default_keyspace (str) – the keyspace to use as default for this implicit connection
Returns: None
-
class hip_data_tools.apache.cassandra.CassandraConnectionSettings(cluster_ips: list, port: int, load_balancing_policy: cassandra.policies.LoadBalancingPolicy, secrets_manager: hip_data_tools.apache.cassandra.CassandraSecretsManager, ssl_options: dict = None)¶
Bases: object
Encapsulates the Cassandra connection settings
-
class hip_data_tools.apache.cassandra.CassandraSecretsManager(source: hip_data_tools.common.KeyValueSource = <hip_data_tools.common.EnvironmentKeyValueSource object>, username_var: str = 'CASSANDRA_USERNAME', password_var: str = 'CASSANDRA_PASSWORD')¶
Bases: hip_data_tools.common.SecretsManager
Secrets manager for Cassandra configuration
Parameters: - source (KeyValueSource) – a kv source that has secrets
- username_var (str) – the variable name or the key for finding username
- password_var (str) – the variable name or the key for finding password
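The idea of resolving credentials by variable name from a key-value source can be sketched as follows. This is an illustration under assumptions, not the library's code: `EnvSecrets` is a hypothetical class, and a plain dict stands in for the environment so the example has no side effects.

```python
import os
from typing import Mapping


class EnvSecrets:
    """Hypothetical stand-in for a secrets manager backed by a key-value source."""

    def __init__(
        self,
        source: Mapping[str, str] = os.environ,
        username_var: str = "CASSANDRA_USERNAME",
        password_var: str = "CASSANDRA_PASSWORD",
    ) -> None:
        self._source = source
        self._username_var = username_var
        self._password_var = password_var

    @property
    def username(self) -> str:
        # Raises KeyError if the variable is not set in the source.
        return self._source[self._username_var]

    @property
    def password(self) -> str:
        return self._source[self._password_var]


# A plain dict replaces the real environment for this demonstration.
fake_env = {"MY_CUSTOM_USERNAME_ENV_VAR": "alice", "CASSANDRA_PASSWORD": "s3cret"}
secrets = EnvSecrets(source=fake_env, username_var="MY_CUSTOM_USERNAME_ENV_VAR")
```

Only the username variable is overridden here; the password is still looked up under the default key.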
-
class hip_data_tools.apache.cassandra.CassandraUtil(keyspace: str, conn: hip_data_tools.apache.cassandra.CassandraConnectionManager)¶
Bases: object
Class to connect to Cassandra and then retrieve, transform and upload data from and to it
-
create_table_from_dataframe(data_frame: pandas.core.frame.DataFrame, table_name: str, primary_key_column_list: List[str], table_options_statement='')¶ Create a new table in Cassandra based on a pandas DataFrame
Parameters: - data_frame (DataFrame) – the data frame to be synced
- table_name (str) – name of the table to create
- primary_key_column_list (list[str]) – list of columns in the data frame that constitute the primary key for the new table
- table_options_statement (str) – a CQL-valid WITH statement to specify table options, as specified in https://docs.datastax.com/en/dse/6.0/cql/cql/cql_reference/cql_commands/cqlCreateTable.html#table_options
Returns: ResultSet
-
create_table_from_model(model_class)¶ Create a table from the Model class if it does not already exist
Parameters: model_class (class) – class for the Model
Returns: None
-
execute(query: str, row_factory: callable, **kwargs) → cassandra.datastax.graph.query.Result¶ Execute a CQL command and retrieve data with the row factory
Parameters: - query (str) – the CQL command to execute
- row_factory (callable) – the row factory used to retrieve the data
- **kwargs – kwargs to match the session.execute command in Cassandra
Returns: ResultSet
-
prepare_batches(prepared_statement: cassandra.query.PreparedStatement, tuples: List[tuple], batch_size: int) → List[T]¶ Prepares a list of Cassandra batched statements out of a list of tuples and a prepared statement
Parameters: - prepared_statement (PreparedStatement) – the statement to be used for batching.
- tuples (list[tuple]) – the data to be inserted.
- batch_size (int) – limit on the number of prepared statements in the batch.
Returns: list[BatchStatement]
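The batching step amounts to splitting the list of tuples into chunks of at most batch_size rows. The sketch below shows only that chunking on plain Python data; `chunk_rows` is a hypothetical helper. With the real driver, each chunk would presumably be bound to the prepared statement and added to a BatchStatement.

```python
from typing import List, Sequence, Tuple


def chunk_rows(rows: Sequence[Tuple], batch_size: int) -> List[List[Tuple]]:
    """Split rows into consecutive chunks of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [list(rows[i:i + batch_size]) for i in range(0, len(rows), batch_size)]


rows = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")]
batches = chunk_rows(rows, batch_size=2)
# Five rows with batch_size=2 give two full batches and one batch of a single row.
```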
-
read_as_dataframe(query: str, **kwargs) → pandas.core.frame.DataFrame¶ Read the result of a query in the form of a pandas DataFrame
Parameters: query (str) – the query to read results from
Returns: DataFrame
-
read_as_dictonary_list(query: str, **kwargs) → List[dict]¶ Read the results of a query in the form of a list of dicts
Parameters: query (str) – the query to read results from
Returns: list[dict]
-
upsert_dataframe(dataframe: pandas.core.frame.DataFrame, table: str) → List[cassandra.datastax.graph.query.Result]¶ Upload all data from a DataFrame onto a Cassandra table
Parameters: - dataframe (DataFrame) – a DataFrame to upsert
- table (str) – the table to upsert data into. DataFrame columns whose names match the Cassandra table's column names are upserted; others are ignored
- batch_size (int) – limit on the number of prepared statements in the batch
Returns: ResultSet
-
upsert_dataframe_in_batches(dataframe: pandas.core.frame.DataFrame, table: str, batch_size: int = 2) → List[cassandra.datastax.graph.query.Result]¶ Upload all data from a DataFrame onto a Cassandra table
Parameters: - dataframe (DataFrame) – a DataFrame to upsert
- table (str) – the table to upsert data into. DataFrame columns whose names match the Cassandra table's column names are upserted; others are ignored
- batch_size (int) – limit on the number of prepared statements in the batch
Returns: ResultSet
-
upsert_dictonary_list(data: List[dict], table: str) → List[cassandra.datastax.graph.query.Result]¶ Upsert rows into the Cassandra table based on the dictionary key values
Parameters: - data (list[dict]) – the data to be upserted
- table (str) – the table to upsert data into
Returns: None
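In Cassandra an INSERT on an existing primary key overwrites the row, so an upsert from a dictionary reduces to building an INSERT whose columns come from the dictionary keys. The following is a rough sketch of that statement construction, not the library's code; `build_insert` is a hypothetical helper and the sorted column order is an assumption made to keep columns and values aligned.

```python
from typing import Dict, List, Tuple


def build_insert(table: str, row: Dict[str, object]) -> Tuple[str, List[object]]:
    """Build a parameterised INSERT statement and its values from one dict."""
    columns = sorted(row)  # fixed order so columns and values line up
    placeholders = ", ".join("?" for _ in columns)
    cql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    return cql, [row[column] for column in columns]


cql, values = build_insert("events", {"id": 7, "description": "hello"})
```

The `?` markers are the bind placeholders used by prepared statements, so the same statement text can be reused for every dictionary that shares the same keys.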
-
upsert_dictonary_list_in_batches(data: List[dict], table: str, batch_size: int = 2) → List[cassandra.datastax.graph.query.Result]¶ Upsert rows into the Cassandra table based on the dictionary key values
Parameters: - data (list[dict]) – the data to be upserted
- table (str) – the table to upsert data into
- batch_size (int) – limit on the number of prepared statements in the batch
Returns: None
-
-
exception hip_data_tools.apache.cassandra.ValidationError(message)¶
Bases: Exception
Exception class to raise validation issues
Parameters: message (str) – the error message
-
hip_data_tools.apache.cassandra.dataclass(maybe_cls=None, these=None, repr_ns=None, repr=True, cmp=None, hash=None, init=True, slots=False, frozen=False, weakref_slot=True, str=False, *, auto_attribs=True, kw_only=False, cache_hash=False, auto_exc=False, eq=None, order=None)¶ A class decorator that adds dunder-methods according to the specified attributes using attr.ib or the these argument.
Parameters: - these (dict of str to attr.ib) –
A dictionary of name to attr.ib mappings. This is useful to avoid the definition of your attributes within the class body because you can’t (e.g. if you want to add __repr__ methods to Django models) or don’t want to.
If these is not None, attrs will not search the class body for attributes and will not remove any attributes from it.
If these is an ordered dict (dict on Python 3.6+, collections.OrderedDict otherwise), the order is deduced from the order of the attributes inside these. Otherwise the order of the definition of the attributes is used.
- repr_ns (str) – When using nested classes, there’s no way in Python 2 to automatically detect that. Therefore it’s possible to set the namespace explicitly for a more meaningful repr output.
- repr (bool) – Create a __repr__ method with a human readable representation of attrs attributes.
- str (bool) – Create a __str__ method that is identical to __repr__. This is usually not necessary except for Exceptions.
- eq (bool or None) –
If True or None (default), add __eq__ and __ne__ methods that check two instances for equality.
They compare the instances as if they were tuples of their attrs attributes, but only iff the types of both classes are identical!
- order (bool or None) – If True, add __lt__, __le__, __gt__, and __ge__ methods that behave like eq above and allow instances to be ordered. If None (default) mirror value of eq.
- cmp (bool or None) – Setting to True is equivalent to setting eq=True, order=True. Deprecated in favor of eq and order, has precedence over them for backward-compatibility though. Must not be mixed with eq or order.
- hash (bool or None) –
If None (default), the __hash__ method is generated according to how eq and frozen are set.
  - If both are True, attrs will generate a __hash__ for you.
  - If eq is True and frozen is False, __hash__ will be set to None, marking it unhashable (which it is).
  - If eq is False, __hash__ will be left untouched meaning the __hash__ method of the base class will be used (if base class is object, this means it will fall back to id-based hashing.).
Although not recommended, you can decide for yourself and force attrs to create one (e.g. if the class is immutable even though you didn’t freeze it programmatically) by passing True or not. Both of these cases are rather special and should be used carefully.
See our documentation on hashing, Python’s documentation on object.__hash__, and the GitHub issue that led to the default behavior for more details.
- init (bool) – Create a __init__ method that initializes the attrs attributes. Leading underscores are stripped for the argument name. If a __attrs_post_init__ method exists on the class, it will be called after the class is fully initialized.
- slots (bool) – Create a slotted class that’s more memory-efficient.
- frozen (bool) –
Make instances immutable after initialization. If someone attempts to modify a frozen instance, attr.exceptions.FrozenInstanceError is raised.
Please note:
  - This is achieved by installing a custom __setattr__ method on your class, so you can’t implement your own.
  - True immutability is impossible in Python.
  - This does have a minor runtime performance impact when initializing new instances. In other words: __init__ is slightly slower with frozen=True.
  - If a class is frozen, you cannot modify self in __attrs_post_init__ or a self-written __init__. You can circumvent that limitation by using object.__setattr__(self, "attribute_name", value).
- weakref_slot (bool) – Make instances weak-referenceable. This has no effect unless slots is also enabled.
- auto_attribs (bool) –
If True, collect PEP 526-annotated attributes (Python 3.6 and later only) from the class body.
In this case, you must annotate every field. If attrs encounters a field that is set to an attr.ib but lacks a type annotation, an attr.exceptions.UnannotatedAttributeError is raised. Use field_name: typing.Any = attr.ib(...) if you don’t want to set a type.
If you assign a value to those attributes (e.g. x: int = 42), that value becomes the default value like if it were passed using attr.ib(default=42). Passing an instance of Factory also works as expected.
Attributes annotated as typing.ClassVar, and attributes that are neither annotated nor set to an attr.ib are ignored.
- kw_only (bool) – Make all attributes keyword-only (Python 3+) in the generated __init__ (if init is False, this parameter is ignored).
- cache_hash (bool) – Ensure that the object’s hash code is computed only once and stored on the object. If this is set to True, hashing must be either explicitly or implicitly enabled for this class. If the hash code is cached, avoid any reassignments of fields involved in hash code computation or mutations of the objects those fields point to after object creation. If such changes occur, the behavior of the object’s hash code is undefined.
- auto_exc (bool) –
If the class subclasses BaseException (which implicitly includes any subclass of any exception), the following happens to behave like a well-behaved Python exceptions class:
  - the values for eq, order, and hash are ignored and the instances compare and hash by the instance’s ids (N.B. attrs will not remove existing implementations of __hash__ or the equality methods. It just won’t add own ones.),
  - all attributes that are either passed into __init__ or have a default value are additionally available as a tuple in the args attribute,
  - the value of str is ignored leaving __str__ to base classes.
New in version 16.0.0: slots
New in version 16.1.0: frozen
New in version 16.3.0: str
New in version 16.3.0: Support for __attrs_post_init__.
Changed in version 17.1.0: hash supports None as value which is also the default now.
New in version 17.3.0: auto_attribs
Changed in version 18.1.0: If these is passed, no attributes are deleted from the class body.
Changed in version 18.1.0: If these is ordered, the order is retained.
New in version 18.2.0: weakref_slot
Deprecated since version 18.2.0: __lt__, __le__, __gt__, and __ge__ now raise a DeprecationWarning if the classes compared are subclasses of each other. __eq__ and __ne__ never tried to compare subclasses to each other.
Changed in version 19.2.0: __lt__, __le__, __gt__, and __ge__ now do not consider subclasses comparable anymore.
New in version 18.2.0: kw_only
New in version 18.2.0: cache_hash
New in version 19.1.0: auto_exc
Deprecated since version 19.2.0: cmp Removal on or after 2021-06-01.
New in version 19.2.0: eq and order
-
hip_data_tools.apache.cassandra.dataframe_to_cassandra_tuples(dataframe: pandas.core.frame.DataFrame) → list¶ The Cassandra API uses tuples to send data for its prepared statements; this method converts the rows of a dataframe into a list of tuples. It also converts pandas-specific datatypes like Timestamp and NaN to Python datatypes like datetime and None
Parameters: dataframe (DataFrame) – the dataframe to be converted to a list of tuples
Returns: list[tuple]
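The NaN-to-None part of this conversion can be illustrated without pandas. The sketch below works on plain Python rows rather than a DataFrame and does not cover the Timestamp-to-datetime step; `clean_value` and `rows_to_tuples` are hypothetical names, not library functions.

```python
import math
from typing import Any, Iterable, List, Sequence, Tuple


def clean_value(value: Any) -> Any:
    """Replace a float NaN with None; return every other value unchanged."""
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def rows_to_tuples(rows: Iterable[Sequence[Any]]) -> List[Tuple[Any, ...]]:
    """Convert each row into a tuple, cleaning NaN values along the way."""
    return [tuple(clean_value(value) for value in row) for row in rows]


tuples = rows_to_tuples([[1, float("nan"), "x"], [2, 3.5, "y"]])
```

None is what the driver sends as a null column value, which is why missing values need to be mapped to it before binding.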
-
hip_data_tools.apache.cassandra.dicts_to_cassandra_tuples(data: list) → list¶ The Cassandra API uses tuples to send data for its prepared statements; this method converts the list of dictionaries into a list of tuples
Parameters: data (list[dict]) – the list of dictionaries to be converted into a list of tuples
Returns: list[tuple]
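A conversion from dictionaries to tuples needs a fixed column order so every tuple lines up with the same statement. The sketch below assumes the order is taken from the keys of the first dictionary, which may differ from what the library does; `dicts_to_tuples` is a hypothetical name.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple


def dicts_to_tuples(
    data: List[Dict[str, Any]],
    columns: Optional[Sequence[str]] = None,
) -> List[Tuple[Any, ...]]:
    """Turn each dict into a tuple whose values follow one shared column order."""
    if columns is None:
        # Assumption: the first dict defines the column order.
        columns = list(data[0]) if data else []
    # Missing keys become None so every tuple has the same length.
    return [tuple(row.get(column) for column in columns) for row in data]


records = [{"id": 1, "name": "a"}, {"name": "b", "id": 2}, {"id": 3}]
as_tuples = dicts_to_tuples(records)
```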
-
hip_data_tools.apache.cassandra.get_cql_columns_from_dataframe(data_frame)¶ Extracts a dictionary of column names and their Cassandra data types from the dataframe
Parameters: data_frame (DataFrame) – the dataframe whose columns need to be extracted
Returns: dict
hip_data_tools.apache.kafka module¶
This module provides a simple wrapper to enable the instantiation of kafka producers and consumers
-
class hip_data_tools.apache.kafka.BatchS3UploaderConfig(export_path=None, bucket=None, ts_col_nm=None, partition_key_nm=None)¶
Bases: object
Configurations for the BatchS3Uploader
Parameters: - export_path (str) – base path within the s3 bucket to export to
- bucket (str) – name of the bucket to export results to
- ts_col_nm (str) – name given to the timestamp column
- partition_key_nm (str) – name of the column used to store the temporal partitioning keys
-
export_path¶ base path within the s3 bucket to export to
Type: str
-
bucket¶ name of the bucket to export results to
Type: str
-
ts_col_nm¶ name given to the timestamp column
Type: str
-
partition_key_nm¶ name of the column used to store the temporal partitioning keys
Type: str
-
class hip_data_tools.apache.kafka.ConsumerConfig(bootstrap_servers=None, conf=None, group_id=None)¶
Bases: object
Encapsulation of the Kafka consumer configurations
Parameters: - bootstrap_servers (string) – address and port of bootstrap server
- conf (string(dictionary)) – string of dictionary of configurations
- group_id (string) – Group Id for the consumer
-
configs¶ Dictionary of the kafka configurations
Type: dict
-
hip_data_tools.apache.kafka.DEFAULT_CONSUMER_CONF= "{'auto.offset.reset': 'earliest'}"¶ Default configurations for the Kafka consumer, as string representing a python dictionary of configurations
-
hip_data_tools.apache.kafka.DEFAULT_KAFKA_TIMESTAMP_COLUMN_NAME= 'kafka_timestamp'¶ Default name of the column in which we store a message’s timestamp provided by Kafka
-
hip_data_tools.apache.kafka.DEFAULT_PRODUCER_CONF= "{'queue.buffering.max.messages': 10000,\n 'queue.buffering.max.ms' : 1000}"¶ Default configurations for the Kafka producer, as string representing a python dictionary of configurations
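Because these defaults are strings that represent Python dictionaries, they have to be parsed before use. One safe way to do that is ast.literal_eval, sketched below; whether the library parses them this way is an assumption, and `parse_conf` is a hypothetical helper.

```python
import ast

# Copied from the documented default above.
DEFAULT_PRODUCER_CONF = "{'queue.buffering.max.messages': 10000,\n 'queue.buffering.max.ms' : 1000}"


def parse_conf(conf: str) -> dict:
    """Parse a string holding a Python dict literal without executing code."""
    parsed = ast.literal_eval(conf)
    if not isinstance(parsed, dict):
        raise ValueError("configuration string must describe a dictionary")
    return parsed


producer_conf = parse_conf(DEFAULT_PRODUCER_CONF)
```

Unlike eval, ast.literal_eval only accepts literals, so a configuration string taken from the environment cannot run arbitrary code.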
-
hip_data_tools.apache.kafka.DEFAULT_TIMESTAMP_PARTITION_KEY= 'partition_key_ts'¶ Default column name for the partitioning message batches uploaded to S3
-
class hip_data_tools.apache.kafka.KafkaConduit(kafka_poller, kafka_exporter, polling_interval)¶
Bases: object
This class polls the Kafka topic at set intervals and exports the results to an s3 bucket
Parameters: - kafka_poller (KafkaPoller) – Instantiated Kafka Poller
- kafka_exporter (KafkaS3BatchExporter) – Exporter to parse and export Kafka messages
- polling_interval (int) – Interval in seconds at which to poll the Kafka topic and export the messages to s3
-
create_events_snapshot()¶ Get Kafka messages from a topic and export to s3
Returns: None
-
poll_topic_and_upload_to_s3()¶ Poll the Kafka topic at set intervals, then parse and export the messages to S3
Returns: None
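The poll-then-export cycle can be sketched with plain callables standing in for the poller and the exporter. This is an illustration only: `run_conduit` and its `max_cycles` bound are hypothetical and exist so the example terminates, whereas a real conduit would presumably keep running.

```python
import time
from typing import Callable, List


def run_conduit(
    poll: Callable[[], List[str]],
    export: Callable[[List[str]], None],
    polling_interval: float,
    max_cycles: int,
) -> int:
    """Poll, export any messages found, then wait; repeat max_cycles times."""
    exported = 0
    for cycle in range(max_cycles):
        messages = poll()
        if messages:
            export(messages)
            exported += len(messages)
        if cycle < max_cycles - 1:
            time.sleep(polling_interval)
    return exported


# Fake topic: each poll returns the next pre-built batch of messages.
queue = [["a", "b"], [], ["c"]]
uploaded: List[List[str]] = []
total = run_conduit(lambda: queue.pop(0), uploaded.append, polling_interval=0.0, max_cycles=3)
```

Empty polls are skipped, so no empty batch is exported for the second cycle.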
-
class hip_data_tools.apache.kafka.KafkaPoller(kafka_consumer, topic, timeout_interval)¶
Bases: object
This class provides a wrapper around the Kafka consumer to enable polling of a Kafka queue and aggregation of all messages into a single list
Parameters: - kafka_consumer (Consumer) – Kafka Consumer
- topic (str) – Topic to subscribe to
- timeout_interval (int) – Time in seconds before the consumer times out
-
kafka_consumer¶ Kafka Consumer
Type: Consumer
-
topic¶ Topic to subscribe to
Type: str
-
timeout_interval¶ Time in seconds before the consumer times out
Type: int
-
get_msgs()¶ Get the latest messages from the Kafka topic
Returns list(Message) : list of Kafka Messages
-
poll_kafka_for_messages()¶ Poll the kafka topic for messages
Returns list(Message): list of Kafka messages
-
class hip_data_tools.apache.kafka.KafkaPollerConfig(consumer=None, topic=None, timeout_interval=None)¶
Bases: object
Configuration for the poller class
Parameters: - consumer (Consumer) – Consumer configurations object
- topic (str) – The topic to poll messages from
- timeout_interval (int) – Timeout interval for reading the messages
-
class hip_data_tools.apache.kafka.KafkaProducer(topic=None, raise_exception_on_failed_connection=False, bootstrap_servers=None, config=None)¶
Bases: object
This class provides a wrapper around the lower-level Kafka producer, and extends this base functionality with commonly used methods or approaches to dealing with a message queue
Parameters: - topic (str) – Name of Kafka topic to publish the message to
- raise_exception_on_failed_connection (bool) – raise exception if the Kafka broker cannot be polled
- bootstrap_servers (str) – String list of bootstrap servers and their ports e.g. ‘localhost:9092’ or ‘[broker1:9092, broker2:9092]’
- config (str) – string representation of a python dictionary which encapsulates the required settings for the consumer
-
topic¶ Name of Kafka topic to publish the message to
Type: str
-
bootstrap_servers¶ String list of bootstrap servers and their ports e.g. ‘localhost:9092’ or ‘[broker1:9092, broker2:9092]’
Type: str
-
config¶ string representation of a python dictionary which encapsulates the required settings for the consumer
Type: str
-
producer¶ Kafka producer class
Type: Producer
-
instantiate_producer()¶ Try to connect to the Kafka bootstrap server. We include functionality to allow failure to connect to the queue to happen silently
Returns (Producer): Kafka Producer
-
produce_msg(msg)¶ Produces a message to the Kafka topic
Parameters: msg (str) – String to be pushed to the Kafka topic
Returns: None
-
publish_dict_as_json(in_dict)¶ Converts a dictionary to json and pushes it to the Kafka topic
Parameters: in_dict (dict) – Dictionary to be pushed to the topic Returns (string): Message published to kafka
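Serialising a dictionary and handing the resulting string to a producer can be sketched as below. The `produce` callable is a stand-in for the real Kafka producer and `publish_as_json` is a hypothetical name, so this shows the shape of the operation rather than the library's implementation.

```python
import json
from typing import Callable, Dict, List


def publish_as_json(in_dict: Dict, produce: Callable[[str], None]) -> str:
    """Serialise the dict to JSON, pass it to the producer, return the message."""
    msg = json.dumps(in_dict)
    produce(msg)
    return msg


# A list's append method plays the role of the producer in this example.
sent: List[str] = []
published = publish_as_json({"event": "signup", "user_id": 42}, sent.append)
```

Returning the serialised message, as the documented method does, lets the caller log or inspect exactly what was sent.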
-
class hip_data_tools.apache.kafka.KafkaS3BatchExporter(root_path, s3_client, ts_col_nm, partition_key_nm)¶
Bases: object
This class enables the deposition of a batch of Kafka messages into an s3 bucket at set intervals
-
generate_partitioned_dataframes(df_msgs_and_meta_data)¶ Generates the batched dataframes to upload to s3
Parameters: df_msgs_and_meta_data (dataframe) – dataframe to be partitioned
Returns (List(dataframe, string)): list of dataframes and their folder paths on s3
-
parse_and_export_msgs(list_of_msgs, interval)¶ Converts messages to a pandas dataframe and then exports to s3
Parameters: - list_of_msgs (list(Kafka Message Object)) – List of msg objects
- interval (int) – Rounding interval for the temporal partitioning
Returns: None
-
partition_msgs_by_kafka_ts(list_of_dicts, interval)¶ Partitions messages by their Kafka timestamps and uses these timestamps to generate their relevant paths on s3
Parameters: - list_of_dicts (list(dict)) – list of dictionaries to upload to s3
- interval (int) – Rounding interval for the temporal partitioning
Returns list((DataFrame, string)): Returns the dataframe for each temporal partition, and the path to upload it to
-
-
exception hip_data_tools.apache.kafka.NoProducerInstantiatedError¶
Bases: Exception
Exception raised when no producer has been instantiated
-
class hip_data_tools.apache.kafka.ProducerConfig(bootstrap_servers=None, conf=None)¶
Bases: object
Encapsulation of the Kafka configurations, reading from environment if not supplied, and defaulting where necessary
Parameters: - bootstrap_servers (string) – address and port of bootstrap server
- conf (string(dictionary)) – string of dictionary of configurations
-
configs¶ Dictionary of the kafka configurations
Type: dict
-
hip_data_tools.apache.kafka.add_interval_partitioning_column(msgs_df, time_column, partitioning_key, interval)¶ Adds a column to a dataframe containing the values of a timestamp column rounded to a time interval.
Parameters: - msgs_df (DataFrame) – Pandas dataframe with at least one timestamp column
- time_column (str) – Name of the timestamp column to be rounded
- partitioning_key (str) – Name of the column in which we create the rounded timestamps
- interval (int) – Time interval (in seconds) to round timestamps to
Returns (DataFrame): Returns a dataframe with an appended column of rounded timestamps
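Rounding a timestamp to an interval can be done on epoch seconds. The sketch below rounds down (floors) to the start of the interval, which is an assumption since the exact rounding rule is not stated here, and it uses plain dictionaries instead of a DataFrame. `floor_to_interval` is a hypothetical helper; the column names are the documented defaults.

```python
from datetime import datetime, timezone


def floor_to_interval(ts: datetime, interval: int) -> datetime:
    """Round a timezone-aware timestamp down to a multiple of interval seconds."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % interval, tz=timezone.utc)


ts = datetime(2020, 1, 1, 12, 34, 56, tzinfo=timezone.utc)
bucket = floor_to_interval(ts, 300)  # 300 seconds = 5-minute partitions

# Add the rounded value as a new "column" on each row.
rows = [{"kafka_timestamp": ts}]
for row in rows:
    row["partition_key_ts"] = floor_to_interval(row["kafka_timestamp"], 300)
```

Every message that falls in the same five-minute window receives the same partition key, which is what allows the messages to be grouped into one batch.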
-
hip_data_tools.apache.kafka.convert_msgs_to_dictionary(list_of_msgs)¶ Converts JSON messages into dictionaries, collecting any badly formatted strings into their own list
Parameters: list_of_msgs (list(Message Object)) – list of json strings
Returns (list(dict), list(dict)): Returns correctly formatted dictionaries in one list and any ill-formatted strings in a second list
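Separating well-formed JSON from malformed input is a try/except around the parser. The sketch below works on plain strings rather than Kafka Message objects, and wrapping each bad string in a dictionary under a "raw" key is an assumption made to match the documented return type; `split_json_messages` is a hypothetical name.

```python
import json
from typing import Any, Dict, List, Tuple


def split_json_messages(raw_msgs: List[Any]) -> Tuple[List[Dict], List[Dict]]:
    """Return (parsed dictionaries, records describing unparseable input)."""
    good: List[Dict] = []
    bad: List[Dict] = []
    for raw in raw_msgs:
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a subclass of ValueError.
            bad.append({"raw": raw})
            continue
        if isinstance(parsed, dict):
            good.append(parsed)
        else:
            # Valid JSON that is not an object, such as a bare number or list.
            bad.append({"raw": raw})
    return good, bad


good_msgs, bad_msgs = split_json_messages(['{"id": 1}', "not json", "[1, 2]"])
```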
-
hip_data_tools.apache.kafka.create_batch_s3_uploader(batch_s3_uploader_config=None)¶ Factory method to generate a Kafka batch uploader
Parameters: batch_s3_uploader_config (BatchS3UploaderConfig) – Configuration object for the s3 uploader Returns (KafkaS3BatchExporter): Instantiated Exporter
-
hip_data_tools.apache.kafka.create_conduit(kafka_poller=None, kafka_exporter=None, polling_interval=None)¶ Factory method to create Kafka S3 Conduit, this polls the Kafka queue after a set interval and deposits the results onto s3
Parameters: - kafka_poller (KafkaPoller) – Kafka Poller used to poll results from a topic at set intervals
- kafka_exporter (KafkaS3BatchExporter) – Batch Exporter
- polling_interval (int) – Time in seconds between polling events
Returns (KafkaS3Conduit): Instantiated KafkaS3Conduit
-
hip_data_tools.apache.kafka.create_consumer(kafka_consumer_config=None)¶ Creates a Kafka consumer
Parameters: kafka_consumer_config (ConsumerConfig) – Instantiated kafka configuration object Returns: Instantiated Kafka Consumer
-
hip_data_tools.apache.kafka.create_poller(kafka_poller_conf=None)¶ Factory method to create a Kafka Poller and take values from environment if they aren’t provided
Parameters: kafka_poller_conf (KafkaPollerConfig) – Kafka Consumer Returns (KafkaPoller): Instantiated KafkaPoller
-
hip_data_tools.apache.kafka.create_producer(kafka_producer_config=None)¶ Creates a Kafka producer
Parameters: kafka_producer_config (ProducerConfig) – Encapsulated configurations for the KafkaProducer
Returns (Producer): Instantiated Kafka Producer for a specified topic
-
hip_data_tools.apache.kafka.generate_snapshot_file_name_with_timestamp()¶ Generates the name of the snapshot folder using both the current time and date
Returns (str): String containing the date and time stamped path
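A date-and-time stamped path can be produced with strftime. The layout used below (a date folder followed by a time component) is purely an assumption, since the actual format is not documented here, and `snapshot_name` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Optional


def snapshot_name(now: Optional[datetime] = None) -> str:
    """Build a 'YYYY-MM-DD/HHMMSS' path fragment from the given or current time."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now.strftime("%Y-%m-%d/%H%M%S")


example = snapshot_name(datetime(2020, 1, 2, 3, 4, 5))
```

Accepting the time as an optional argument keeps the function testable, while calling it with no argument stamps the current time.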