bolster.utils.aws

AWS based Asset handling.

Includes helpers for S3, Kinesis, SSM, SQS, SNS, Lambda self-invocation, and Redshift querying.

Attributes

logger

session

Classes

KinesisLoader

Kinesis batchwise insertion handler with chunking and retry.

Functions

chunks(iterable[, size])

Outputs <list> chunks of size N from an iterable (generator).

start_session(*args[, restart])

Initialize or return existing AWS session.

get_s3_client()

Get configured S3 client with path-style addressing and retry settings.

put_s3(obj, key, bucket[, keys, gzip, client])

Take either a list of dicts (and dump them as CSV to S3) or a StringIO buffer (and dump it as-is to S3).

get_s3(key, bucket[, gzip, log_exception, client])

Get Object from S3, generally with gzip decompression included.

check_s3(key, bucket[, client])

Check whether a key exists in an S3 bucket (approach from https://www.peterbe.com/plog/fastest-way-to-find-out-if-a-file-exists-in-s3).

get_matching_s3_objects(bucket[, prefix, suffix, client])

Generate objects in an S3 bucket.

get_matching_s3_keys(bucket, **kwargs)

Generate the keys in an S3 bucket.

select_from_csv(bucket, key, fields[, client])

Query specific fields from a CSV file in S3 using S3 Select.

get_latest_key(prefix, bucket[, key, client])

Walk a given S3 bucket for the lexicographically highest key under the given prefix.

get_sqs_client()

Get configured SQS client with timeout and retry settings.

send_to_sqs(records, queue[, chunksize, client])

Send records in chunks of chunksize for a given sqs queue in json-serialised format.

get_ssm_client()

Get configured SSM client for parameter store access.

get_ssm_param(param_name[, client])

Locally memoized getter for configuration parameters stored in the AWS Systems Manager (formerly "Simple Systems Manager") Parameter Store.

fh_json_decode(content)

Customised JSON Decoder for consuming Firehose batched records.

decapsulate_kinesis_payloads(event)

Decapsulate base64 encoded kinesis data records to a list.

iterate_kinesis_payloads(event)

Iterate over a base64 encoded kinesis data record, yielding entries.

send_to_kinesis(records, stream[, partition_key])

Convenience wrapper around the KinesisLoader class.

get_sns_client()

Get configured SNS client for notifications.

invoke_self_async(event, context)

Have the Lambda invoke itself asynchronously, passing the same event it received originally.

query(q, redshift_conn_dict[, named_cursor])

Helper for making queries to redshift (or any postgres compatible backend).

SQSWrapper(event, context, queuename, function[, ...])

Process SQS messages with automatic reinvocation and error handling.

Package Contents

bolster.utils.aws.chunks(iterable, size=10)[source]

Outputs <list> chunks of size N from an iterable (generator).

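Parameters:
  • iterable – Iterable to split into chunks

  • size – Maximum number of items per chunk (Default value = 10)

Returns: generator of <list> chunks

>>> next((b for b in chunks(range(10), 2)))
[0, 1]
>>> [b for b in chunks(list(range(10)), 2)]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]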
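The doctest behaviour above can be reproduced with itertools.islice. This is a minimal sketch under that assumption, not necessarily bolster's own implementation; the name chunks_sketch is hypothetical.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunks_sketch(iterable: Iterable[T], size: int = 10) -> Iterator[List[T]]:
    """Yield successive lists of up to `size` items (hypothetical stand-in for chunks)."""
    if size < 1:
        raise ValueError("size must be a positive integer")
    iterator = iter(iterable)
    while True:
        # Pull the next `size` items; an empty list means the input is exhausted.
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk
```

Because the sketch pulls lazily from a single iterator, it works on generators as well as lists, and its final chunk may be shorter than size.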

bolster.utils.aws.logger[source]

bolster.utils.aws.session: boto3.Session | None = None[source]

bolster.utils.aws.start_session(*args, restart=False, **kwargs)[source]

Initialize or return existing AWS session.
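start_session pairs with the module-level session attribute. A minimal sketch of that initialise-or-reuse pattern follows; FakeSession is a hypothetical stand-in for boto3.Session so the example runs without AWS credentials, and the exact caching rules in bolster are an assumption.

```python
from typing import Any, Optional


class FakeSession:
    """Hypothetical stand-in for boto3.Session so the sketch runs offline."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self.args = args
        self.kwargs = kwargs


_session: Optional[FakeSession] = None  # module-level cache, like `session` above


def start_session_sketch(*args: Any, restart: bool = False, **kwargs: Any) -> FakeSession:
    """Return the cached session, creating a new one on first use or when restart=True."""
    global _session
    if _session is None or restart:
        _session = FakeSession(*args, **kwargs)
    # Without restart, any new args/kwargs are ignored and the cached session is reused.
    return _session
```

Note that in this sketch, passing new arguments without restart=True silently returns the existing session.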

bolster.utils.aws.get_s3_client()[source]

Get configured S3 client with path-style addressing and retry settings.

bolster.utils.aws.put_s3(obj, key, bucket, keys=None, gzip=True, client=None)[source]

Take either a list of dicts (and dump them as CSV to S3) or a StringIO buffer (and dump it as-is to S3).

Parameters:
  • obj (collections.abc.Sequence[dict] | io.StringIO) – List of records to be written to CSV (or a StringIO buffer for direct upload)

  • key (str) – Destination key

  • bucket (str) – Destination bucket

  • keys – List of expected keys; can be used to filter the columns or set their order in the resulting file (Default value = None)

  • gzip (bool) – Gzip-compress the object (Default value = True)

  • client – Optional S3 client instance, created automatically if not provided
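For the list-of-dicts case, the serialisation step before upload can be sketched with the standard library. This is an assumption about what put_s3 does internally: the helper name records_to_csv_bytes is hypothetical and the upload itself is omitted (with boto3 it would be client.put_object(Bucket=bucket, Key=key, Body=body)). The keys parameter maps naturally onto csv.DictWriter's fieldnames, which both orders and filters columns when extrasaction="ignore".

```python
import csv
import gzip
import io
from typing import Optional, Sequence


def records_to_csv_bytes(records: Sequence[dict], keys: Optional[Sequence[str]] = None,
                         compress: bool = True) -> bytes:
    """Serialise dicts to CSV bytes, optionally gzip-compressed (hypothetical helper)."""
    if keys is not None:
        fieldnames = list(keys)
    elif records:
        # Default the column order to the keys of the first record.
        fieldnames = list(records[0].keys())
    else:
        fieldnames = []
    buffer = io.StringIO()
    # extrasaction="ignore" drops any fields that are not listed in fieldnames.
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(records)
    body = buffer.getvalue().encode("utf-8")
    return gzip.compress(body) if compress else body
```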

bolster.utils.aws.get_s3(key, bucket, gzip=True, log_exception=True, client=None)[source]

Get Object from S3, generally with gzip decompression included.

Parameters:
  • key (str) – Key of the object to fetch

  • bucket (str) – Bucket containing the object

  • gzip (bool) – Gzip-decompress the object (Default value = True)

  • log_exception – Whether to log exceptions that occur during retrieval

  • client – Optional S3 client instance, created automatically if not provided

bolster.utils.aws.check_s3(key, bucket, client=None)[source]

Check whether a key exists in an S3 bucket (approach from https://www.peterbe.com/plog/fastest-way-to-find-out-if-a-file-exists-in-s3).

Parameters:
  • key (str) – Key to check for

  • bucket (str) – Bucket to check

  • client – Optional S3 client instance, created automatically if not provided

bolster.utils.aws.get_matching_s3_objects(bucket, prefix='', suffix='', client=None)[source]

Generate objects in an S3 bucket.

https://alexwlchan.net/2018/01/listing-s3-keys-redux/

Parameters:
  • bucket (AnyStr) – Name of the S3 bucket.

  • prefix – Only fetch objects whose key starts with this prefix (optional).

  • suffix – Only fetch objects whose keys end with this suffix (optional).

  • client – Optional S3 client instance, created automatically if not provided
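The pattern from the linked post pages through list_objects_v2 results with continuation tokens and filters on the suffix client-side. A minimal sketch of that loop follows; to keep it runnable offline it uses a hypothetical FakeS3Client that returns responses shaped like boto3's list_objects_v2 (Contents, IsTruncated, NextContinuationToken). With a real boto3 S3 client the same function should work unchanged, though boto3's list_objects_v2 paginator is the more idiomatic choice there.

```python
from typing import Any, Dict, Iterator, List


def iter_matching_objects(client: Any, bucket: str, prefix: str = "",
                          suffix: str = "") -> Iterator[Dict[str, Any]]:
    """Yield object summaries whose keys start with prefix and end with suffix."""
    kwargs: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = client.list_objects_v2(**kwargs)
        # A page with no matches has no "Contents" key at all.
        for obj in response.get("Contents", []):
            if obj["Key"].endswith(suffix):
                yield obj
        if not response.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = response["NextContinuationToken"]


class FakeS3Client:
    """Hypothetical in-memory stand-in that pages results `page_size` keys at a time."""

    def __init__(self, keys: List[str], page_size: int = 2) -> None:
        self.keys = sorted(keys)
        self.page_size = page_size

    def list_objects_v2(self, Bucket: str, Prefix: str = "",
                        ContinuationToken: str = "0") -> Dict[str, Any]:
        matching = [k for k in self.keys if k.startswith(Prefix)]
        start = int(ContinuationToken)
        page = matching[start:start + self.page_size]
        end = start + len(page)
        response: Dict[str, Any] = {"IsTruncated": end < len(matching)}
        if page:
            response["Contents"] = [{"Key": k} for k in page]
        if response["IsTruncated"]:
            response["NextContinuationToken"] = str(end)
        return response
```

get_matching_s3_keys corresponds to taking obj["Key"] from each yielded summary.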

bolster.utils.aws.get_matching_s3_keys(bucket, **kwargs)[source]

Generate the keys in an S3 bucket.

https://alexwlchan.net/2018/01/listing-s3-keys-redux/

Parameters:
  • bucket (AnyStr) – Name of the S3 bucket.

  • prefix – Only fetch keys that start with this prefix (optional).

  • suffix – Only fetch keys that end with this suffix (optional).

bolster.utils.aws.select_from_csv(bucket, key, fields, client=None)[source]

Query specific fields from a CSV file in S3 using S3 Select.
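One plausible shape for the underlying request is boto3's select_object_content with a SQL expression over S3Object. The sketch below only builds the keyword arguments; the helper name build_select_request is hypothetical, and bolster's actual expression and serialisation settings may differ.

```python
from typing import Any, Dict, Sequence


def build_select_request(bucket: str, key: str, fields: Sequence[str]) -> Dict[str, Any]:
    """Build kwargs for an S3 Select query over a CSV file with a header row."""
    if not fields:
        raise ValueError("at least one field is required")
    # Reference each column by its header name through the table alias `s`.
    columns = ", ".join(f's."{field}"' for field in fields)
    return {
        "Bucket": bucket,
        "Key": key,
        "ExpressionType": "SQL",
        "Expression": f"SELECT {columns} FROM S3Object s",
        # FileHeaderInfo="USE" lets the expression refer to columns by header name.
        "InputSerialization": {"CSV": {"FileHeaderInfo": "USE"}},
        "OutputSerialization": {"CSV": {}},
    }
```

With a real client, client.select_object_content(**build_select_request(...)) returns a response whose Payload is an event stream; its Records events carry the selected rows as bytes.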

bolster.utils.aws.get_latest_key(prefix, bucket, key=None, client=None)[source]

Walk a given S3 bucket for the lexicographically highest key under the given prefix.

Accepts a key callable that can be used to decide how the candidate keys are sorted.

For example, to use loose versioning, distutils.version.LooseVersion can be passed as the key argument (note that distutils was removed from the standard library in Python 3.12).

Parameters:
  • prefix (str) – Key prefix to search under

  • bucket (str) – Bucket to search

  • key (collections.abc.Callable | None) – Optional sort key applied to the candidate keys (Default value = None)

  • client – Optional S3 client instance, created automatically if not provided
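Plain lexicographic ordering misranks multi-digit version components, which is why the key argument is useful. The illustration below covers only the final selection step (no S3 listing) and uses a hypothetical version_key built on the standard library in place of LooseVersion; latest is likewise a hypothetical name.

```python
import re
from typing import Any, Callable, Iterable, Optional, Tuple


def latest(keys: Iterable[str], key: Optional[Callable[[str], Any]] = None) -> str:
    """Return the highest key, lexicographically unless a sort key is given."""
    return max(keys, key=key) if key is not None else max(keys)


def version_key(s3_key: str) -> Tuple[int, ...]:
    """Hypothetical sort key: compare the integers embedded in the key numerically."""
    return tuple(int(part) for part in re.findall(r"\d+", s3_key))
```

With keys such as releases/v1.9.csv and releases/v1.10.csv, the lexicographic maximum is v1.9 because "9" sorts after "1", while version_key picks v1.10.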

bolster.utils.aws.get_sqs_client()[source]

Get configured SQS client with timeout and retry settings.

bolster.utils.aws.send_to_sqs(records, queue, chunksize=1, client=None)[source]

Send records in chunks of chunksize for a given sqs queue in json-serialised format.

Parameters:
  • records (collections.abc.Iterator) – Records to send; each must be JSON-serialisable

  • queue (str) – Target SQS queue

  • chunksize (int) – Number of records per chunk (Default value = 1)

  • client – Optional SQS client instance, created automatically if not provided
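The exact message layout is not documented here. A minimal sketch, assuming each chunk becomes one message whose body is the JSON-encoded list of its records (build_sqs_bodies is a hypothetical name):

```python
import json
from itertools import islice
from typing import Any, Iterable, Iterator


def build_sqs_bodies(records: Iterable[Any], chunksize: int = 1) -> Iterator[str]:
    """Yield one JSON message body per chunk of up to `chunksize` records (assumed layout)."""
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, chunksize))
        if not chunk:
            return
        yield json.dumps(chunk)
```

With boto3, each body would then be sent with client.send_message(QueueUrl=queue_url, MessageBody=body), resolving a queue name to its URL via client.get_queue_url(QueueName=queue)["QueueUrl"] if needed.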

bolster.utils.aws.get_ssm_client()[source]

Get configured SSM client for parameter store access.

bolster.utils.aws.get_ssm_param(param_name, client=None)[source]

Locally memoized getter for configuration parameters stored in the AWS Systems Manager (formerly “Simple Systems Manager”) Parameter Store.

Parameters:
  • param_name (str) – Name of the parameter to fetch

  • client – Optional SSM client instance, created automatically if not provided
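A minimal sketch of local memoization using functools.lru_cache. fake_fetch is a hypothetical stand-in for the Parameter Store call (with boto3 that call would be client.get_parameter(Name=name, WithDecryption=True)["Parameter"]["Value"]); whether bolster caches exactly this way is an assumption.

```python
from functools import lru_cache

FETCH_COUNT = {"calls": 0}  # counts how often the "remote" lookup actually runs


def fake_fetch(name: str) -> str:
    """Hypothetical stand-in for an SSM GetParameter call."""
    FETCH_COUNT["calls"] += 1
    return f"value-for-{name}"


@lru_cache(maxsize=None)
def get_param_cached(name: str) -> str:
    """Fetch a parameter once per process; later calls are served from memory."""
    return fake_fetch(name)
```

The client is kept out of the cached function's signature so that passing a different client object does not create separate cache entries. Cached values persist for the life of the process (for example a warm Lambda container) unless get_param_cached.cache_clear() is called.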

bolster.utils.aws.fh_json_decode(content)[source]

Customised JSON Decoder for consuming Firehose batched records.

Firehose doesn’t include separators between entries, so we intercept the raw decoder on JSONDecodeError, ‘skip’ over the ‘where is my comma?’ issue, and continue parsing the rest of the content until we reach the end of the given string.

Parameters:
  • content (AnyStr) – Concatenated JSON records as delivered by Firehose

>>> list(fh_json_decode('{"test":"value"}{"test":"othervalue"}'))
[{'test': 'value'}, {'test': 'othervalue'}]

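The same result can be obtained without catching JSONDecodeError by using json.JSONDecoder.raw_decode, which returns both the decoded value and the index where it stopped. This is a swapped-in technique for illustration, not the intercept-on-error approach described above; the name fh_json_decode_sketch is hypothetical, and it expects str input (bytes would need decoding first).

```python
import json
from typing import Any, Iterator

_DECODER = json.JSONDecoder()


def fh_json_decode_sketch(content: str) -> Iterator[Any]:
    """Yield each JSON value from a string of back-to-back JSON documents."""
    idx, end = 0, len(content)
    while idx < end:
        # raw_decode does not skip leading whitespace, so step over it first.
        while idx < end and content[idx].isspace():
            idx += 1
        if idx == end:
            break
        obj, idx = _DECODER.raw_decode(content, idx)
        yield obj
```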
bolster.utils.aws.decapsulate_kinesis_payloads(event)[source]

Decapsulate base64 encoded kinesis data records to a list.

Parameters:
  • event (dict) – Lambda event containing base64-encoded Kinesis records

bolster.utils.aws.iterate_kinesis_payloads(event)[source]

Iterate over a base64 encoded kinesis data record, yielding entries.

Parameters:
  • event (dict) – Lambda event containing base64-encoded Kinesis records
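In a Lambda event from a Kinesis trigger, each entry of event["Records"] carries its base64-encoded payload under ["kinesis"]["data"]. A minimal sketch of both helpers, assuming each payload is a single JSON document (bolster may instead parse payloads with fh_json_decode); the function names are hypothetical.

```python
import base64
import json
from typing import Any, Dict, Iterator, List


def iterate_payloads_sketch(event: Dict[str, Any]) -> Iterator[Any]:
    """Yield the JSON-decoded payload of each Kinesis record in a Lambda event."""
    for record in event.get("Records", []):
        raw = base64.b64decode(record["kinesis"]["data"])
        yield json.loads(raw)  # json.loads accepts UTF-8 bytes directly


def decapsulate_payloads_sketch(event: Dict[str, Any]) -> List[Any]:
    """List-returning counterpart of iterate_payloads_sketch."""
    return list(iterate_payloads_sketch(event))
```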

class bolster.utils.aws.KinesisLoader(batch_size=500, maximum_records=None, stream=None)[source]

Kinesis batchwise insertion handler with chunking and retry.

The default batch_size here is to match the maximum allowed by Kinesis in a PutRecords request.

batch_size[source]

maximum_records = None[source]

kinesis_client[source]

stream = None[source]

generate_and_submit(items, partition_key=None)[source]

Submit batches of items to the configured stream.

Parameters:
  • items – Records to submit

  • partition_key – Optional partition key for the submitted records (Default value = None)

submit_batch_until_successful(this_batch, response)[source]

If needed, retry a batch of records, backing off exponentially until it goes through.

Parameters:
  • this_batch (list) – Batch of records that was submitted

  • response (dict) – Response to the previous submission of this_batch
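The documented retry behaviour can be sketched from the shape of the PutRecords response: per-record results come back in request order, and failed ones carry an ErrorCode. This is illustrative, not bolster's code; the put callable, max_attempts, and the choice of full-jitter exponential backoff are assumptions.

```python
import random
import time
from typing import Any, Callable, Dict, List

Entry = Dict[str, Any]


def failed_entries(batch: List[Entry], response: Dict[str, Any]) -> List[Entry]:
    """Return the request entries whose result carries an ErrorCode."""
    if not response.get("FailedRecordCount"):
        return []
    # PutRecords results are returned in the same order as the request entries.
    return [entry for entry, result in zip(batch, response["Records"]) if "ErrorCode" in result]


def backoff_delay(attempt: int, base: float = 0.1, cap: float = 10.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * 2 ** attempt))


def submit_until_successful(put: Callable[[List[Entry]], Dict[str, Any]], batch: List[Entry],
                            max_attempts: int = 5,
                            sleep: Callable[[float], None] = time.sleep) -> int:
    """Send a batch via `put`, resending only failed entries; return attempts used."""
    response = put(batch)
    attempts = 1
    while True:
        batch = failed_entries(batch, response)
        if not batch:
            return attempts
        if attempts >= max_attempts:
            raise RuntimeError(f"{len(batch)} records still failing after {attempts} attempts")
        sleep(backoff_delay(attempts - 1))
        response = put(batch)
        attempts += 1
```

With boto3, put could be lambda b: kinesis_client.put_records(StreamName=stream, Records=b), where each entry has Data and PartitionKey keys.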

bolster.utils.aws.send_to_kinesis(records, stream, partition_key=None)[source]

Convenience wrapper around the KinesisLoader class.

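Parameters:
  • records – Records to send to the stream

  • stream – Target Kinesis stream

  • partition_key – Optional partition key (Default value = None)

bolster.utils.aws.get_sns_client()[source]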

Get configured SNS client for notifications.

bolster.utils.aws.invoke_self_async(event, context)[source]

Have the Lambda invoke itself asynchronously, passing the same event it received originally.

Tags the event as ‘async’ so it’s actually processed.

THIS DOES NOT WORK FROM WITHIN A VPC! (There is no lambda-invoke endpoint accessible without poking lots of holes in the VPC.)

Parameters:
  • event (dict) – Event originally received by the Lambda

  • context (Any) – Lambda context object
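A minimal sketch of the request such a self-invocation needs: boto3's Lambda invoke with InvocationType="Event" queues the event and returns immediately (status 202). The helper name build_self_invoke_kwargs and the "async" marker key are hypothetical; bolster's actual tag may differ.

```python
import json
from typing import Any, Dict


def build_self_invoke_kwargs(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Build boto3 Lambda.invoke kwargs to re-run this function asynchronously."""
    # "async" is a hypothetical marker so the re-invoked handler knows to do the work.
    tagged = {**event, "async": True}
    return {
        "FunctionName": context.function_name,
        "InvocationType": "Event",  # asynchronous invocation
        "Payload": json.dumps(tagged),
    }
```

The resulting dict would be passed as boto3.client("lambda").invoke(**kwargs).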

bolster.utils.aws.query(q, redshift_conn_dict, named_cursor='bolster_query_cursor', **kwargs)[source]

Helper for making queries to redshift (or any postgres compatible backend).

redshift_conn_dict is expected to look like:

{
  "user":"USERNAME",
  "host":"HOSTNAME",
  "connect_timeout":3,
  "dbname":"DATABASE",
  "port":5439,
  "password":"SUPERSECRETPASSWORD1111"
}

If the configuration dictionary comes from the parameter store, this function applies the ‘is_local’ check and overwrites the ‘host’ from the store with a resolvable hostname for the ALDS datastore.

In short, if you’re not working on ALDS, are in one of a few very specific locations, or are outside the ALDS VPC, pass this an explicit, sensible dictionary.

kwargs are passed through as variables to the SQL execution, i.e. for use in substitution queries, e.g.:

query("select * from table where id = %(my_id)s", redshift_conn_dict, my_id=14228)

NOTE! Literal % wildcards (e.g. LIKE ‘%string’) can clash with the %(name)s substitution syntax, so you’re gonna have a bad time… Use POSIX regular expressions instead: https://docs.aws.amazon.com/redshift/latest/dg/pattern-matching-conditions-posix.html
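To see why the NOTE matters: with %-style (pyformat) parameters, every literal % in the query text is parsed as the start of a placeholder. Plain Python %-formatting uses the same syntax, so it can illustrate the problem offline; this is an illustration only, since the database driver performs the real substitution.

```python
params = {"my_id": 14228}

# A LIKE wildcard: "%f" inside '%foo' is parsed as a float conversion and fails.
like_query = "select * from table where name like '%foo' and id = %(my_id)s"
try:
    like_query % params
    like_failed = False
except (TypeError, ValueError):
    like_failed = True

# The POSIX regex equivalent (~ operator) contains no stray % characters.
regex_query = "select * from table where name ~ 'foo$' and id = %(my_id)s"
rendered = regex_query % params
```

Doubling the percent sign ('%%foo') is the usual escape with drivers such as psycopg2, but the regex form avoids the issue entirely.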

Parameters:
  • q (str) – SQL query to execute

  • redshift_conn_dict (dict) – Connection parameters, as in the example above

  • named_cursor – Name for the database cursor (Default value = 'bolster_query_cursor')

  • **kwargs – Additional parameters passed to SQL execution for query substitution

bolster.utils.aws.SQSWrapper(event, context, queuename, function, timeout=60000, reinvokelimit=10, maxmessages=1, raise_exceptions=True, deduplicate=False, fkwargs=None, client=None)[source]

Process SQS messages with automatic reinvocation and error handling.