hip_data_tools.apache package

Submodules

hip_data_tools.apache.cassandra module

Utility for connecting to and transforming data in Cassandra clusters

class hip_data_tools.apache.cassandra.CassandraConnectionManager(settings: hip_data_tools.apache.cassandra.CassandraConnectionSettings)

Bases: object

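Creates and manages the connection to the Cassandra cluster.

Example:

>>> from cassandra.policies import DCAwareRoundRobinPolicy
>>> from cassandra.cqlengine import columns
>>> from cassandra.cqlengine.management import sync_table
>>> from cassandra.cqlengine.models import Model
>>> from hip_data_tools.apache.cassandra import (
...     CassandraConnectionManager,
...     CassandraConnectionSettings,
...     CassandraSecretsManager,
... )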

>>> load_balancing_policy = DCAwareRoundRobinPolicy(local_dc='AWS_VPC_AP_SOUTHEAST_2')
>>> conn = CassandraConnectionManager(
...     settings = CassandraConnectionSettings(
...         cluster_ips=["1.1.1.1", "2.2.2.2"],
...         port=9042,
...         load_balancing_policy=load_balancing_policy,
...     )
... )
>>> conn = CassandraConnectionManager(
...     CassandraConnectionSettings(
...         cluster_ips=["1.1.1.1", "2.2.2.2"],
...         port=9042,
...         load_balancing_policy=load_balancing_policy,
...         secrets_manager=CassandraSecretsManager(
...             username_var="MY_CUSTOM_USERNAME_ENV_VAR",
...         ),
...     )
... )

For running Cassandra model operations:

>>> conn.setup_connection("dev_space")
>>> class ExampleModel(Model):
...     example_type = columns.Integer(primary_key=True)
...     created_at = columns.DateTime()
...     description = columns.Text(required=False)
>>> sync_table(ExampleModel)

get_cluster() → cassandra.cluster.Cluster

Get the cassandra Cluster object if it already exists, or create a new one

Returns: Cluster

get_session(keyspace) → cassandra.cluster.Session

Get the cassandra Cluster’s Session object if it already exists, or create a new one

Returns: Session

setup_connection(default_keyspace) → None

Sets up an implicit connection object for cassandra using the cassandra settings in the connection manager

Parameters:
  • default_keyspace (str) – the keyspace to use as default for this implicit connection

Returns: None

class hip_data_tools.apache.cassandra.CassandraConnectionSettings(cluster_ips: list, port: int, load_balancing_policy: cassandra.policies.LoadBalancingPolicy, secrets_manager: hip_data_tools.apache.cassandra.CassandraSecretsManager, ssl_options: dict = None)

Bases: object

Encapsulates the Cassandra connection settings

class hip_data_tools.apache.cassandra.CassandraSecretsManager(source: hip_data_tools.common.KeyValueSource = <hip_data_tools.common.EnvironmentKeyValueSource object>, username_var: str = 'CASSANDRA_USERNAME', password_var: str = 'CASSANDRA_PASSWORD')

Bases: hip_data_tools.common.SecretsManager

Secrets manager for Cassandra configuration

Parameters:
  • source (KeyValueSource) – a kv source that has secrets
  • username_var (str) – the variable name or the key for finding username
  • password_var (str) – the variable name or the key for finding password

class hip_data_tools.apache.cassandra.CassandraUtil(keyspace: str, conn: hip_data_tools.apache.cassandra.CassandraConnectionManager)

Bases: object

Class to connect to and then retrieve, transform and upload data from and to cassandra

create_table_from_dataframe(data_frame: pandas.core.frame.DataFrame, table_name: str, primary_key_column_list: List[str], table_options_statement='')

Create a new table in cassandra based on a pandas DataFrame

Parameters:
  • data_frame (DataFrame) – the data frame to be synced
  • table_name (str) – name of the table to create
  • primary_key_column_list (list[str]) – list of columns in the data frame that constitute the primary key for the new table
  • table_options_statement (str) – a valid CQL WITH statement to specify table options, as specified in https://docs.datastax.com/en/dse/6.0/cql/cql/cql_reference/cql_commands/cqlCreateTable.html#table_options

Returns: ResultSet

create_table_from_model(model_class)

Create a table, if it does not exist, from the Model class

Parameters:
  • model_class (class) – class for the Model

Returns: None

execute(query: str, row_factory: callable, **kwargs) → cassandra.datastax.graph.query.Result

Execute a cql command and retrieve data with the row factory

Parameters:
  • query (str) –
  • row_factory (callable) –
  • **kwargs – Kwargs to match the session.execute command in cassandra

Returns: ResultSet

prepare_batches(prepared_statement: cassandra.query.PreparedStatement, tuples: List[tuple], batch_size: int) → List[T]

Prepares a list of cassandra batched Statements out of a list of tuples and a prepared statement

Parameters:
  • prepared_statement (PreparedStatement) – the statement to be used for batching.
  • tuples (list[tuple]) – the data to be inserted.
  • batch_size (int) – limit on the number of prepared statements in the batch.

Returns: list[BatchStatement]
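
The batching described above can be pictured as slicing the list of tuples into consecutive chunks of at most batch_size rows. The following is a minimal standalone sketch using plain Python lists. `chunk_tuples` is a hypothetical helper, not part of hip_data_tools, and the real method returns cassandra BatchStatement objects rather than lists.

```python
from typing import List


def chunk_tuples(tuples: List[tuple], batch_size: int) -> List[List[tuple]]:
    """Split rows into consecutive chunks of at most batch_size rows.

    Hypothetical helper for illustration only.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    return [tuples[i:i + batch_size] for i in range(0, len(tuples), batch_size)]


rows = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")]
batches = chunk_tuples(rows, batch_size=2)
# The last chunk holds the remainder when the row count is not a multiple
# of batch_size.
print([len(batch) for batch in batches])
```

Each chunk would then be bound to the prepared statement and sent as one batch, so a smaller batch_size means more round trips but smaller individual requests.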

read_as_dataframe(query: str, **kwargs) → pandas.core.frame.DataFrame

Read the result of a query in the form of a pandas DataFrame

Parameters:
  • query (str) –

Returns: DataFrame

read_as_dictonary_list(query: str, **kwargs) → List[dict]

Read the results of a query in the form of a list of dict

Parameters:
  • query (str) –

Returns: list[dict]

upsert_dataframe(dataframe: pandas.core.frame.DataFrame, table: str) → List[cassandra.datastax.graph.query.Result]

Upload all data from a DataFrame onto a cassandra table

Parameters:
  • dataframe (DataFrame) – a DataFrame to upsert
  • table (str) – the table to upsert data into. If None then the DataFrame column names that match the cassandra table name will be upserted, else ignored
  • batch_size (int) – limit on the number of prepared statements in the batch

Returns: ResultSet

upsert_dataframe_in_batches(dataframe: pandas.core.frame.DataFrame, table: str, batch_size: int = 2) → List[cassandra.datastax.graph.query.Result]

Upload all data from a DataFrame onto a cassandra table

Parameters:
  • dataframe (DataFrame) – a DataFrame to upsert
  • table (str) – the table to upsert data into. If None then the DataFrame column names that match the cassandra table name will be upserted, else ignored
  • batch_size (int) – limit on the number of prepared statements in the batch

Returns: ResultSet

upsert_dictonary_list(data: List[dict], table: str) → List[cassandra.datastax.graph.query.Result]

Upsert a row into the cassandra table based on the dictionary key values

Parameters:
  • data (list[dict]) – the data to be upserted
  • table (str) – the table to upsert data into

Returns: None

upsert_dictonary_list_in_batches(data: List[dict], table: str, batch_size: int = 2) → List[cassandra.datastax.graph.query.Result]

Upsert a row into the cassandra table based on the dictionary key values

Parameters:
  • data (list[dict]) – the data to be upserted
  • table (str) – the table to upsert data into
  • batch_size (int) – limit on the number of prepared statements in the batch

Returns: None

exception hip_data_tools.apache.cassandra.ValidationError(message)

Bases: Exception

Exception class to raise validation issues

Parameters:
  • message (str) – the error message

hip_data_tools.apache.cassandra.dataclass(maybe_cls=None, these=None, repr_ns=None, repr=None, cmp=None, hash=None, init=None, slots=False, frozen=False, weakref_slot=True, str=False, *, auto_attribs=True, kw_only=False, cache_hash=False, auto_exc=False, eq=None, order=None, auto_detect=False, collect_by_mro=False, getstate_setstate=None, on_setattr=None, field_transformer=None)

A class decorator that adds dunder-methods according to the specified attributes using attr.ib or the these argument.

Parameters:
  • these (dict of str to attr.ib) –

    A dictionary of name to attr.ib mappings. This is useful to avoid the definition of your attributes within the class body because you can’t (e.g. if you want to add __repr__ methods to Django models) or don’t want to.

    If these is not None, attrs will not search the class body for attributes and will not remove any attributes from it.

    If these is an ordered dict (dict on Python 3.6+, collections.OrderedDict otherwise), the order is deduced from the order of the attributes inside these. Otherwise the order of the definition of the attributes is used.

  • repr_ns (str) – When using nested classes, there’s no way in Python 2 to automatically detect that. Therefore it’s possible to set the namespace explicitly for a more meaningful repr output.
  • auto_detect (bool) –

    Instead of setting the init, repr, eq, order, and hash arguments explicitly, assume they are set to True unless any of the involved methods for one of the arguments is implemented in the current class (i.e. it is not inherited from some base class).

    So for example by implementing __eq__ on a class yourself, attrs will deduce eq=False and will create neither __eq__ nor __ne__ (but Python classes come with a sensible __ne__ by default, so it should be enough to only implement __eq__ in most cases).

    Warning

    If you prevent attrs from creating the ordering methods for you (order=False, e.g. by implementing __le__), it becomes your responsibility to make sure its ordering is sound. The best way is to use the functools.total_ordering decorator.

    Passing True or False to init, repr, eq, order, cmp, or hash overrides whatever auto_detect would determine.

    auto_detect requires Python 3. Setting it True on Python 2 raises a PythonTooOldError.

  • repr (bool) – Create a __repr__ method with a human readable representation of attrs attributes.
  • str (bool) – Create a __str__ method that is identical to __repr__. This is usually not necessary except for Exceptions.
  • eq (Optional[bool]) –

    If True or None (default), add __eq__ and __ne__ methods that check two instances for equality.

    They compare the instances as if they were tuples of their attrs attributes if and only if the types of both classes are identical!

  • order (Optional[bool]) – If True, add __lt__, __le__, __gt__, and __ge__ methods that behave like eq above and allow instances to be ordered. If None (default) mirror value of eq.
  • cmp (Optional[bool]) – Setting to True is equivalent to setting eq=True, order=True. Deprecated in favor of eq and order, has precedence over them for backward-compatibility though. Must not be mixed with eq or order.
  • hash (Optional[bool]) –

    If None (default), the __hash__ method is generated according to how eq and frozen are set.

    1. If both are True, attrs will generate a __hash__ for you.
    2. If eq is True and frozen is False, __hash__ will be set to None, marking it unhashable (which it is).
    3. If eq is False, __hash__ will be left untouched meaning the __hash__ method of the base class will be used (if base class is object, this means it will fall back to id-based hashing.).

    Although not recommended, you can decide for yourself and force attrs to create one (e.g. if the class is immutable even though you didn’t freeze it programmatically) by passing True or not. Both of these cases are rather special and should be used carefully.

    See our documentation on hashing, Python’s documentation on object.__hash__, and the GitHub issue that led to the default behavior for more details.

  • init (bool) – Create a __init__ method that initializes the attrs attributes. Leading underscores are stripped for the argument name. If a __attrs_post_init__ method exists on the class, it will be called after the class is fully initialized.
  • slots (bool) – Create a slotted class that’s more memory-efficient. Slotted classes are generally superior to the default dict classes, but have some gotchas you should know about, so we encourage you to read the glossary entry on slotted classes.
  • frozen (bool) –

    Make instances immutable after initialization. If someone attempts to modify a frozen instance, attr.exceptions.FrozenInstanceError is raised.

    Note

    1. This is achieved by installing a custom __setattr__ method on your class, so you can’t implement your own.
    2. True immutability is impossible in Python.
    3. This does have a minor runtime performance impact when initializing new instances. In other words: __init__ is slightly slower with frozen=True.
    4. If a class is frozen, you cannot modify self in __attrs_post_init__ or a self-written __init__. You can circumvent that limitation by using object.__setattr__(self, "attribute_name", value).
    5. Subclasses of a frozen class are frozen too.
  • weakref_slot (bool) – Make instances weak-referenceable. This has no effect unless slots is also enabled.
  • auto_attribs (bool) –

    If True, collect PEP 526-annotated attributes (Python 3.6 and later only) from the class body.

    In this case, you must annotate every field. If attrs encounters a field that is set to an attr.ib but lacks a type annotation, an attr.exceptions.UnannotatedAttributeError is raised. Use field_name: typing.Any = attr.ib(...) if you don’t want to set a type.

    If you assign a value to those attributes (e.g. x: int = 42), that value becomes the default value like if it were passed using attr.ib(default=42). Passing an instance of Factory also works as expected.

    Attributes annotated as typing.ClassVar, and attributes that are neither annotated nor set to an attr.ib are ignored.

  • kw_only (bool) – Make all attributes keyword-only (Python 3+) in the generated __init__ (if init is False, this parameter is ignored).
  • cache_hash (bool) – Ensure that the object’s hash code is computed only once and stored on the object. If this is set to True, hashing must be either explicitly or implicitly enabled for this class. If the hash code is cached, avoid any reassignments of fields involved in hash code computation or mutations of the objects those fields point to after object creation. If such changes occur, the behavior of the object’s hash code is undefined.
  • auto_exc (bool) –

    If the class subclasses BaseException (which implicitly includes any subclass of any exception), the following happens to behave like a well-behaved Python exceptions class:

    • the values for eq, order, and hash are ignored and the instances compare and hash by the instance’s ids (N.B. attrs will not remove existing implementations of __hash__ or the equality methods. It just won’t add own ones.),
    • all attributes that are either passed into __init__ or have a default value are additionally available as a tuple in the args attribute,
    • the value of str is ignored leaving __str__ to base classes.
  • collect_by_mro (bool) –

    Setting this to True fixes the way attrs collects attributes from base classes. The default behavior is incorrect in certain cases of multiple inheritance. It should be on by default but is kept off for backward-compatibility.

    See issue #428 for more details.

  • getstate_setstate (Optional[bool]) –

    Note

    This is usually only interesting for slotted classes and you should probably just set auto_detect to True.

    If True, __getstate__ and __setstate__ are generated and attached to the class. This is necessary for slotted classes to be pickleable. If left None, it’s True by default for slotted classes and False for dict classes.

    If auto_detect is True, and getstate_setstate is left None, and either __getstate__ or __setstate__ is detected directly on the class (i.e. not inherited), it is set to False (this is usually what you want).

  • on_setattr

    A callable that is run whenever the user attempts to set an attribute (either by assignment like i.x = 42 or by using setattr like setattr(i, "x", 42)). It receives the same arguments as validators: the instance, the attribute that is being modified, and the new value.

    If no exception is raised, the attribute is set to the return value of the callable.

    If a list of callables is passed, they’re automatically wrapped in an attr.setters.pipe.

  • field_transformer (Optional[callable]) – A function that is called with the original class object and all fields right before attrs finalizes the class. You can use this, e.g., to automatically add converters or validators to fields based on their types. See transform-fields for more details.

New in version 16.0.0: slots

New in version 16.1.0: frozen

New in version 16.3.0: str

New in version 16.3.0: Support for __attrs_post_init__.

Changed in version 17.1.0: hash supports None as value which is also the default now.

New in version 17.3.0: auto_attribs

Changed in version 18.1.0: If these is passed, no attributes are deleted from the class body.

Changed in version 18.1.0: If these is ordered, the order is retained.

New in version 18.2.0: weakref_slot

Deprecated since version 18.2.0: __lt__, __le__, __gt__, and __ge__ now raise a DeprecationWarning if the classes compared are subclasses of each other. __eq__ and __ne__ never tried to compare subclasses to each other.

Changed in version 19.2.0: __lt__, __le__, __gt__, and __ge__ now do not consider subclasses comparable anymore.

New in version 18.2.0: kw_only

New in version 18.2.0: cache_hash

New in version 19.1.0: auto_exc

Deprecated since version 19.2.0: cmp Removal on or after 2021-06-01.

New in version 19.2.0: eq and order

New in version 20.1.0: auto_detect

New in version 20.1.0: collect_by_mro

New in version 20.1.0: getstate_setstate

New in version 20.1.0: on_setattr

New in version 20.3.0: field_transformer

hip_data_tools.apache.cassandra.dataframe_to_cassandra_tuples(dataframe: pandas.core.frame.DataFrame) → list

The Cassandra API uses tuples to send data for its prepared statements. This method converts the rows of a dataframe into a list of tuples. It also converts pandas-specific datatypes like Timestamp and NaN to Python datatypes like datetime and None.

Parameters:
  • dataframe (DataFrame) – the dataframe to be converted to a list of tuples

Returns: list[tuple]

hip_data_tools.apache.cassandra.dicts_to_cassandra_tuples(data: list) → list

The Cassandra API uses tuples to send data for its prepared statements. This method converts the list of dictionaries into a list of tuples.

Parameters:
  • data (list[dict]) – the list of dictionaries to be converted into a list of tuples

Returns: list[tuple]
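
To illustrate why a dictionary-to-tuple conversion matters for prepared statements, here is a minimal sketch in which every tuple follows one fixed column order, so the values line up with the statement's placeholders. `dicts_to_tuples` and its `columns` argument are hypothetical and are not the library's implementation. Filling missing keys with None is also an assumption made for this example.

```python
from typing import Dict, List, Sequence, Tuple


def dicts_to_tuples(data: List[Dict], columns: Sequence[str]) -> List[Tuple]:
    """Return one tuple per dict, with values ordered by `columns`.

    Hypothetical helper: keys missing from a dict become None (an assumption).
    """
    return [tuple(row.get(col) for col in columns) for row in data]


records = [
    {"id": 1, "name": "alpha"},
    {"name": "beta", "id": 2},  # key order differs, tuple order does not
    {"id": 3},                  # missing key becomes None
]
print(dicts_to_tuples(records, columns=["id", "name"]))
```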

hip_data_tools.apache.cassandra.get_cql_columns_from_dataframe(data_frame)

Extracts a dictionary of column names and their cassandra data types from the dataframe

Parameters:
  • data_frame (DataFrame) – the dataframe whose columns need to be extracted

Returns: dict

hip_data_tools.apache.kafka module

This module provides a simple wrapper to enable the instantiation of kafka producers and consumers

class hip_data_tools.apache.kafka.BatchS3UploaderConfig(export_path=None, bucket=None, ts_col_nm=None, partition_key_nm=None)

Bases: object

Configurations for the BatchS3Uploader

Parameters:
  • export_path (str) – base path within the s3 bucket to export to
  • bucket (str) – name of the bucket to export results to
  • ts_col_nm (str) – name given to the timestamp column
  • partition_key_nm (str) – name of the column used to store the temporal partitioning keys
export_path

base path within the s3 bucket to export to

Type:str
bucket

name of the bucket to export results to

Type:str
ts_col_nm

name given to the timestamp column

Type:str
partition_key_nm

name of the column used to store the temporal partitioning keys

Type:str
class hip_data_tools.apache.kafka.ConsumerConfig(bootstrap_servers=None, conf=None, group_id=None)

Bases: object

Encapsulation of the Kafka Consumer Configurations

Parameters:
  • bootstrap_servers (string) – address and port of bootstrap server
  • conf (string(dictionary)) – string of dictionary of configurations
  • group_id (string) – Group Id for the consumer
configs

Dictionary of the kafka configurations

Type:dict
hip_data_tools.apache.kafka.DEFAULT_CONSUMER_CONF = "{'auto.offset.reset': 'earliest'}"

Default configurations for the Kafka consumer, as string representing a python dictionary of configurations

hip_data_tools.apache.kafka.DEFAULT_KAFKA_TIMESTAMP_COLUMN_NAME = 'kafka_timestamp'

Default name of the column in which we store a message’s timestamp provided by Kafka

hip_data_tools.apache.kafka.DEFAULT_PRODUCER_CONF = "{'queue.buffering.max.messages': 10000,\n 'queue.buffering.max.ms' : 1000}"

Default configurations for the Kafka producer, as string representing a python dictionary of configurations
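
Because these defaults are strings that represent Python dictionaries, they must be parsed before use. The reference does not say how the library parses them. The sketch below uses `ast.literal_eval` from the standard library as one safe option; `parse_conf` is a hypothetical helper. Note that `json.loads` would reject these strings because they use single quotes.

```python
import ast

# Values copied from the defaults documented above.
DEFAULT_CONSUMER_CONF = "{'auto.offset.reset': 'earliest'}"
DEFAULT_PRODUCER_CONF = (
    "{'queue.buffering.max.messages': 10000,\n 'queue.buffering.max.ms' : 1000}"
)


def parse_conf(conf: str) -> dict:
    """Parse a string holding a Python dict literal into a dict.

    Hypothetical helper; literal_eval only evaluates literals, never code.
    """
    parsed = ast.literal_eval(conf)
    if not isinstance(parsed, dict):
        raise ValueError("configuration string must describe a dictionary")
    return parsed


consumer_conf = parse_conf(DEFAULT_CONSUMER_CONF)
producer_conf = parse_conf(DEFAULT_PRODUCER_CONF)
print(consumer_conf["auto.offset.reset"])
print(producer_conf["queue.buffering.max.ms"])
```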

hip_data_tools.apache.kafka.DEFAULT_TIMESTAMP_PARTITION_KEY = 'partition_key_ts'

Default column name for partitioning the message batches uploaded to S3

class hip_data_tools.apache.kafka.KafkaConduit(kafka_poller, kafka_exporter, polling_interval)

Bases: object

This class polls the Kafka topic at set intervals and exports the results to an s3 bucket

Parameters:
  • kafka_poller (KafkaPoller) – Instantiated Kafka Poller
  • kafka_exporter (KafkaS3BatchExporter) – Exporter to parse and export Kafka messages
  • polling_interval (int) – Interval in seconds at which to poll the Kafka topic and export the messages to s3
create_events_snapshot()

Get Kafka messages from a topic and export to s3

Returns: None

poll_topic_and_upload_to_s3()

Poll the Kafka topic at set intervals, then parse and export the messages to S3

Returns: None
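
The poll-then-export cycle can be sketched with plain callables standing in for the poller and the exporter. This is only an illustration of the control flow: `run_conduit`, `max_cycles` and the injectable `sleep` are hypothetical. Whether the real class skips empty batches or loops indefinitely is not stated in this reference.

```python
import time
from typing import Callable, List


def run_conduit(
    poll: Callable[[], List[str]],
    export: Callable[[List[str]], None],
    polling_interval: float,
    max_cycles: int,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll, export non-empty batches, then wait; return batches exported.

    Hypothetical sketch: skipping empty batches is an assumption.
    """
    exported_count = 0
    for _ in range(max_cycles):
        msgs = poll()
        if msgs:
            export(msgs)
            exported_count += 1
        sleep(polling_interval)
    return exported_count


# Stand-ins for a poller and an exporter (illustration only).
pending = iter([["msg-1"], [], ["msg-2", "msg-3"]])
exported_batches = []
count = run_conduit(
    poll=lambda: next(pending),
    export=exported_batches.append,
    polling_interval=0,
    max_cycles=3,
)
print(count, exported_batches)
```

Passing `sleep` as an argument keeps the loop testable without real waiting.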

class hip_data_tools.apache.kafka.KafkaPoller(kafka_consumer, topic, timeout_interval)

Bases: object

This class provides a wrapper around the Kafka consumer to enable polling of a kafka queue and aggregation of all messages into a single list

Parameters:
  • kafka_consumer (Consumer) – Kafka Consumer
  • topic (str) – Topic to subscribe to
  • timeout_interval (int) – Time in seconds before the consumer times out
kafka_consumer

Kafka Consumer

Type:Consumer
topic

Topic to subscribe to

Type:str
timeout_interval

Time in seconds before the consumer times out

Type:int
get_msgs()

Get the latest messages from the Kafka topic

Returns (list(Message)): list of Kafka messages

poll_kafka_for_messages()

Poll the kafka topic for messages

Returns (list(Message)): list of Kafka messages

class hip_data_tools.apache.kafka.KafkaPollerConfig(consumer=None, topic=None, timeout_interval=None)

Bases: object

Configuration for the poller class

Parameters:
  • consumer (Consumer) – Consumer configurations object
  • topic (str) – The topic to poll messages from
  • timeout_interval (int) – Timeout interval for reading the messages
class hip_data_tools.apache.kafka.KafkaProducer(topic=None, raise_exception_on_failed_connection=False, bootstrap_servers=None, config=None)

Bases: object

This class provides a wrapper around the lower-level Kafka producer, and extends this base functionality with commonly used methods or approaches to dealing with a message queue

Parameters:
  • topic (str) – Name of Kafka topic to publish the message to
  • raise_exception_on_failed_connection (bool) – raise exception if the Kafka broker cannot be polled
  • bootstrap_servers (str) – String list of bootstrap servers and their ports, e.g. ‘localhost:9092’ or ‘[broker1:9092, broker2:9092]’
  • config (str) – string representation of a python dictionary which encapsulates the required settings for the producer
topic

Name of Kafka topic to publish the message to

Type:str
bootstrap_servers

String list of bootstrap servers and their ports, e.g. ‘localhost:9092’ or ‘[broker1:9092, broker2:9092]’

Type:str
config

string representation of a python dictionary which encapsulates the required settings for the producer

Type:str
producer

Kafka producer class

Type:Producer
instantiate_producer()

Try to connect to the Kafka bootstrap server. We include functionality to allow failure to connect to the queue to happen silently

Returns (Producer): Kafka Producer

produce_msg(msg)

Produces a message to the Kafka topic

Parameters:msg (str) – String to be pushed to the Kafka topic

Returns: None

publish_dict_as_json(in_dict)

Converts a dictionary to json and pushes it to the Kafka topic

Parameters:in_dict (dict) – Dictionary to be pushed to the topic

Returns (string): Message published to kafka

class hip_data_tools.apache.kafka.KafkaS3BatchExporter(root_path, s3_client, ts_col_nm, partition_key_nm)

Bases: object

This class enables the deposition of a batch of Kafka messages into a s3 bucket at set intervals

generate_partitioned_dataframes(df_msgs_and_meta_data)

Generates the batched dataframes to upload to s3

Parameters:
  • df_msgs_and_meta_data (DataFrame) – dataframe to be partitioned

Returns (List[DataFrame, str, str]): list of triples (df, s3_dir, file_name)

parse_and_export_msgs(list_of_msgs, interval)

Converts messages to a pandas dataframe and then exports to s3

Parameters:
  • list_of_msgs (list(Kafka Message Object)) – List of msg objects
  • interval (int) – Rounding interval for the temporal partitioning

Returns: None

partition_msgs_by_kafka_ts(list_of_dicts, interval)

Partitions messages by their Kafka timestamps and uses these timestamps to generate their relevant paths on s3

Parameters:
  • list_of_dicts (list(dict)) – list of dictionaries to upload to s3
  • interval (int) – Rounding interval for the temporal partitioning

Returns list((DataFrame, string, string)): Returns the dataframe for each temporal partition, the path to upload it to and file name

exception hip_data_tools.apache.kafka.NoProducerInstantiatedError

Bases: Exception

Exception raised when no producer has been instantiated

class hip_data_tools.apache.kafka.ProducerConfig(bootstrap_servers=None, conf=None)

Bases: object

Encapsulation of the kafka configurations, reading from environment if not supplied, and defaulting where necessary

Parameters:
  • bootstrap_servers (string) – address and port of bootstrap server
  • conf (string(dictionary)) – string of dictionary of configurations
configs

Dictionary of the kafka configurations

Type:dict
hip_data_tools.apache.kafka.add_interval_partitioning_column(msgs_df, time_column, partitioning_key, interval)

Adding column to a dataframe with a timestamp column rounded to a time interval.

Parameters:
  • msgs_df (DataFrame) – Pandas dataframe with at least one timestamp column
  • time_column (str) – Name of the timestamp column to be rounded
  • partitioning_key (str) – Name of the column in which we create the rounded timestamps
  • interval (int) – Time interval (in seconds) to round timestamps to

Returns (DataFrame): Returns a dataframe with an appended column of rounded timestamps
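
The rounding of a timestamp to a time interval can be illustrated on a single value with the standard library. This is a sketch under assumptions: `floor_to_interval` is a hypothetical helper, and it rounds down, whereas this reference does not say whether the library rounds down, up or to the nearest interval.

```python
from datetime import datetime, timezone


def floor_to_interval(ts: datetime, interval: int) -> datetime:
    """Round a timezone-aware timestamp down to a multiple of `interval` seconds.

    Hypothetical helper; rounding down is an assumption.
    """
    epoch_seconds = int(ts.timestamp())
    floored = epoch_seconds - (epoch_seconds % interval)
    return datetime.fromtimestamp(floored, tz=timezone.utc)


stamp = datetime(2020, 1, 1, 12, 7, 42, tzinfo=timezone.utc)
# With a 300 second interval, all timestamps from 12:05:00 up to but not
# including 12:10:00 share one partition key.
print(floor_to_interval(stamp, interval=300).isoformat())
```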

hip_data_tools.apache.kafka.convert_msgs_to_dictionary(list_of_msgs)

Converts JSON messages into dictionaries, collecting any badly formatted strings in their own list

Parameters:list_of_msgs (list(Message Object)) – list of json strings

Returns (list(dict), list(dict)): Returns correctly formatted dictionaries in one list and any ill-formatted strings in a second list

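
The split into well-formed and ill-formed messages can be sketched with the standard `json` module. The sketch works on plain strings rather than Kafka Message objects. `split_json_messages` and the `{"raw": ...}` wrapper used for rejected input are hypothetical choices made so that both lists contain dictionaries.

```python
import json
from typing import List, Tuple


def split_json_messages(raw_msgs: List[str]) -> Tuple[List[dict], List[dict]]:
    """Parse JSON strings; collect anything unparseable in a second list.

    Hypothetical helper: values that parse but are not JSON objects are
    also treated as ill-formed (an assumption).
    """
    good: List[dict] = []
    bad: List[dict] = []
    for raw in raw_msgs:
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):  # JSONDecodeError subclasses ValueError
            bad.append({"raw": raw})
            continue
        if isinstance(parsed, dict):
            good.append(parsed)
        else:
            bad.append({"raw": raw})
    return good, bad


good, bad = split_json_messages(['{"a": 1}', "not json", "[1, 2]"])
print(good)
print(bad)
```
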
hip_data_tools.apache.kafka.create_batch_s3_uploader(batch_s3_uploader_config=None)

Factory method to generate a kafka batch uploader

Parameters:batch_s3_uploader_config (BatchS3UploaderConfig) – Configuration object for the s3 uploader

Returns (KafkaS3BatchExporter): Instantiated Exporter

hip_data_tools.apache.kafka.create_conduit(kafka_poller=None, kafka_exporter=None, polling_interval=None)

Factory method to create Kafka S3 Conduit, this polls the Kafka queue after a set interval and deposits the results onto s3

Parameters:
  • kafka_poller (KafkaPoller) – Kafka Poller used to poll results from a topic at set intervals
  • kafka_exporter (KafkaS3BatchExporter) – Batch Exporter
  • polling_interval (int) – Time in seconds between polling events

Returns (KafkaS3Conduit): Instantiated KafkaS3Conduit

hip_data_tools.apache.kafka.create_consumer(kafka_consumer_config=None)

Creates a Kafka consumer

Parameters:kafka_consumer_config (ConsumerConfig) – Instantiated kafka configuration object

Returns: Instantiated Kafka Consumer

hip_data_tools.apache.kafka.create_poller(kafka_poller_conf=None)

Factory method to create a Kafka Poller and take values from environment if they aren’t provided

Parameters:kafka_poller_conf (KafkaPollerConfig) – Kafka Consumer

Returns (KafkaPoller): Instantiated KafkaPoller

hip_data_tools.apache.kafka.create_producer(kafka_producer_config=None)

Creates a Kafka producer

Parameters:
  • kafka_producer_config (ProducerConfig) – Encapsulated configurations for the KafkaProducer

Returns (Producer): Instantiated Kafka Producer for a specified topic

hip_data_tools.apache.kafka.generate_snapshot_file_name_with_timestamp()

Generates the name of the snapshot folder using both the current time and date

Returns (str): String containing the date and time stamped path
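
A date and time stamped name of this kind can be built with `strftime`. The exact format and prefix the library uses are not given in this reference, so the `snapshot` prefix, the `%Y%m%d_%H%M%S` layout and the `snapshot_name` helper below are all assumptions chosen for illustration.

```python
from datetime import datetime, timezone
from typing import Optional


def snapshot_name(now: Optional[datetime] = None, prefix: str = "snapshot") -> str:
    """Build a name stamped with the date and time.

    Hypothetical helper; the prefix and timestamp layout are assumptions.
    """
    now = now or datetime.now(timezone.utc)
    return f"{prefix}_{now.strftime('%Y%m%d_%H%M%S')}"


# A fixed timestamp keeps the example reproducible.
print(snapshot_name(datetime(2020, 1, 1, 12, 5, 0, tzinfo=timezone.utc)))
```

A layout that runs from year down to second makes the names sort chronologically as plain strings.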

Module contents