API Documentation¶
ARN Helpers¶
- boto3_helpers.arn.construct_arn(existing=None, *, sts_client=None, **kwargs)[source]¶
Construct an ARN from an existing one.
existing is used as a template. If not provided, one will be derived from your IAM user or role.
sts_client is a
boto3.client('sts')instance. If not given, is created withboto3.client('sts'). This will only be used if existing is not supplied.kwargs can include any of the following:
partition,service,region,account_id,resource_type,resource_separator,resource_id.
Some ARNs end with
resource_id. You can construct these by supplyingresource_type=''andresource_id='DesiredID'.Other ARNs end with
resource_type:resource_id. You can construct these by supplyingresource_type='DesiredType',resource_separator=':', andresource_id='DesiredID'Still other ARNs end with
resource_type/resource_id. You can construct these by supplyingresource_type='DesiredType',resource_separator='/', andresource_id='DesiredID'Converting one format to another:
from boto3_helpers.arn import construct_arn existing = 'arn:aws:dynamodb:us-east-2:00000000:table/demo' new = construct_arn( existing, service='lambda', resource_type='function', resource_separator=':', resource_id='example' ) print(new) # arn:aws:lambda:us-east-2:00000000:function:example
Starting from your IAM user or role:
from boto3_helpers.arn import construct_arn new = construct_arn( service='dynamodb', resource_type='table', resource_id='demo' ) print(new) # arn:aws:dynamodb:us-east-2:00000000:table/demo
CloudWatch Helpers¶
- boto3_helpers.cloudwatch.yield_metric_data(namespace, metric_name, dimension_map, period, stat, start_time, end_time, cw_client=None, **kwargs)[source]¶
Yield all the data associated with a single metric. Each item yielded is a
(dt, value)pair.namespace is the namespace for the metric.
metric_name is the name of the metric.
dimension_map is a
dictthat maps dimension names to values, e.g.{'Name': 'Value'}. If the metric has no dimension, supply an emptydict.period is the granularity of the returned data points in seconds.
stat is the name of the statistic to evaluate, e.g.
Maximum.start_time is a
datetime.datetimeobject that specifies that beginning of the query.end_time is a
datetime.datetimeobject that specifies that end of the query.cw_client is a
boto3.client('cloudwatch')instance. If not given, is created withboto3.client('cloudwatch')kwargs can include
Unit,Expression,Period,AccountId,ScanBy,LabelOptions, andPaginationConfig. These are inserted at the appropriate place in theget_paginator('get_metric_data').paginatecall.
Usage:
from datetime import datetime, timezone from boto3_helpers.cloudwatch import yield_metric_data for dt, value in yield_metric_data( 'AWS/S3', 'NumberOfObjects', {'StorageType': 'AllStorageTypes', 'BucketName': 'example-bucket'}, 86400, 'Maximum', datetime(2022, 9, 1, 0, 0, 0, timezone.utc), datetime(2022, 9, 16, 0, 0, 0, timezone.utc), ): print(dt.isoformat(), value, sep=' ')
This function is designed to simplify the common case of pulling all data for a single metric, which is cumbersome with the normal CloudWatch
get_metric_datamethod.ScanByis set toTimestampAscendingby default, so data should be emitted in sorted order.
DynamoDB Helpers¶
- boto3_helpers.dynamodb.query_table(ddb_table, **kwargs)[source]¶
Yield all of the items that match the DynamoDB query:
ddb_table is a table name or a
boto3.resource('dynamodb').Tableinstance.kwargs are passed directly to the
Table.querymethod.
Usage:
from boto3 import resource as boto3_resource from boto3.dynamodb.conditions import Key from boto3_helpers.dynamodb import query_table ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') condition = Key('username').eq('johndoe') all_items = list( query_table(ddb_table, KeyConditionExpression=condition) )
- boto3_helpers.dynamodb.scan_table(ddb_table, **kwargs)[source]¶
Yield all of the items that match the DynamoDB query:
ddb_table is a table name or a
boto3.resource('dynamodb').Tableinstance.kwargs are passed directly to the
Table.scanmethod.
Usage:
from boto3 import resource as boto3_resource from boto3.dynamodb.conditions import Attr from boto3_helpers.dynamodb import scan_table ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') condition = Attr('username').eq('johndoe') all_items = list( scan_table(ddb_table, FilterExpression=condition) )
- boto3_helpers.dynamodb.update_attributes(ddb_table, key, update_map, **kwargs)[source]¶
Update a DyanmoDB table item and return the
update_itemresponse:ddb_table is a table name or a
boto3.resource('dynamodb').Tableinstance.key is a mapping that identifies the item to update.
update_map is a mapping of top-level item attributes to target values.
kwargs are passed directly to the the
Table.update_itemmethod.
Usage:
from boto3 import resource as boto3_resource from boto3_helpers.dynamodb import update_attributes ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') key = {'username': 'janedoe', 'last_name': 'Doe'} update_map = {'age': 26} resp = update_attributes(ddb_table, key, update_map)
In the past, the
boto3DynamoDB library provided a simple means of updating items withAttributeUpdates. However, this parameter is deprecated. This function constructs equivalentUpdateExpressionandExpressionAttributeValuesparameters.Equivalent to:
ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') key = {'username': 'janedoe', 'last_name': 'Doe'} resp = ddb_table.update_item( Key=key, UpdateExpression='SET age = :val1', ExpressionAttributeValues={':val1': 26}, )
Note that nested attributes (i.e. map attributes) can be updated, but that you need to provide values for the entire map.
# Suppose that DDB had this before: # { # "username": "janedoe", # "parameters": { # "age": 25, # "weight_kg": 70 # } # } # After this call, `parameters.age` will be 26, but there will # no longer be a `paramters.weight_kg`. ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') key = {'username': 'janedoe', 'last_name': 'Doe'} update_map = {'parameters': {'age': 26}} resp = update_attributes(ddb_table, key, update_map)
- boto3_helpers.dynamodb.batch_yield_items(table_name, all_keys, ddb_resource=None, batch_size=100, backoff_base=0.1, backoff_max=5, **kwargs)[source]¶
Do a series a DyanmoDB
batch_get_itemqueries against a single table, taking care of retries and paging. Yield the returned items as they are available.table_name is the name of the table.
all_keys is an iterable of dictionaries with the keys for the
batch_get_itemoperation.ddb_resource is a
boto3.resource('dynamodb')instance. If not supplied, one will be created.batch_size is the number of items to request per page (default: 100).
backoff_base is the value, in seconds, of the exponential backoff base for retries.
backoff_max is the value, in seconds, of the maximum time to wait between retries.
kwargs are passed directly to the the
batch_get_itemmethod.
Usage:
from boto3_helpers.dynamodb import batch_yield_items all_keys = [ {'primary_key': '1', 'sort_key', 'a'}, {'primary_key': '1', 'sort_key', 'b'}, {'primary_key': '2', 'sort_key', 'a'}, {'primary_key': '2', 'sort_key', 'b'}, ] all_items = list('example-table', all_keys)
- boto3_helpers.dynamodb.fix_numbers(item)[source]¶
boto3infamously deserializes numeric types from DynamoDB to PythonDecimalobjects. This function changes these objects intointobjects andfloatobjects.from boto3 import resource as boto3_resource from boto3_helpers.dynamodb import fix_numbers ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') resp = ddb_table.get_item(Key={'primary_key': 'FirstKey'}) item = resp['Item'] fixed_item = fix_numbers(item)
Note that
floatobjects may not be appropriate for all numeric computing needs, so think about what your application needs before using this function.
- boto3_helpers.dynamodb.load_dynamodb_json(text, use_decimal=False)[source]¶
The DynamoDB API returns JSON data with typing information. This function deserializes this JSON format into standard Python types.
from boto3 import resource as load_dynamodb_json text = '{"Item": {"some_number": {"N": "100"}}}' info = load_dynamodb_json(text) assert info['Item']['some_number'] == 100
JSON from the
GetItem,Query, andScanAPI endpoints is supported.If
use_decimalisTrue, numeric types will be deserialized todecimal.Decimalobjects. This matches theboto3client behavior, but is often inconvenient.
EventBridge Helpers¶
- boto3_helpers.events.describe_rule_with_targets(*, events_client=None, **kwargs)[source]¶
Return a
dictwith the information from thedescribe_rulecall combined with the information from thelist_targets_by_rulecall.events_client is a
boto3.client('events')instance. If not given, one will be created withboto3.client('events').Name is the name of the rule to be passed to
describe_rule. This is required.EventBusName is the name or ARN of the event bus associated with the rule. If omitted, the default event bus is used.
Sample output:
{ 'Name': 'example-rule', 'Arn': 'arn:aws:events:us-east-2:00000000:rule/example-rule', 'ScheduleExpression': 'rate(5 minutes)', 'State': 'ENABLED', 'EventBusName': 'default', 'CreatedBy': '00000000', 'Targets': [ { 'Id': 'Id1c4e7db5-6dd2-47e9-b7bc-98561ae038f7', 'Arn': 'arn:aws:lambda:us-east-2:00000000:function:example-func', 'Input': '{"hello": "world"}', } ], }
Usage:
from boto3 import client as boto3_client from boto3_helpers.events import describe_rule_with_targets events_client = boto3_client('events') rule_data = describe_rule_with_targets( Name='example-rule', EventBusName='example-bus' ) for target in rule['Targets']: print( rule_data['Arn'], rule_data['ScheduleExpression'], target['Arn'], sep=' ' )
Note
It’s possible for another API user to make changes in between the calls to
describe_ruleandlist_targets_by_rule. Govern yourself accordingly. This function is here to save you the trouble of making these calls manually.
- boto3_helpers.events.yield_rules_by_target(*, events_client=None, **kwargs)[source]¶
Yield a
dictwith information about each rule in thelist_rule_names_by_targetresponse.events_client is a
boto3.client('events')instance. If not given, one will be created withboto3.client('events').TargetArn is the ARN of the target. This is required
EventBusName is the name or ARN of the event bus to list rules for. If omitted, the default event bus is used.
See
describe_rule_with_targets()for the output format.Usage:
from boto3 import client as boto3_client from boto3_helpers.events import yield_rules_by_target events_client = boto3_client('events') for rule_data in yield_rules_by_target( TargetArn='arn:aws:lambda:us-east-2:00000000:function:example-func' ): print( rule_data['Arn'], rule_data['ScheduleExpression'], len(rule_data['Targets']), sep=' ' )
Note
It’s possible for another API user to make changes in between the calls to
list_rule_names_by_target,describe_rule, andlist_targets_by_rule. Govern yourself accordingly. This function is here to save you the trouble of making these calls manually.
- boto3_helpers.events.yield_rules_with_targets(*, events_client=None, **kwargs)[source]¶
Yield a
dictwith the information from thedescribe_rulecall combined with the information from thelist_targets_by_rulecall for each rule in thelist_rulesresponse.events_client is a
boto3.client('events')instance. If not given, one will be created withboto3.client('events').NamePrefix is an optional filtering prefix for rule name
EventBusName is the name or ARN of the event bus to list rules for. If omitted, the default event bus is used.
See
describe_rule_with_targets()for the output format.Usage:
from boto3 import client as boto3_client from boto3_helpers.events import yield_rules_with_targets events_client = boto3_client('events') for rule_data in yield_rules_with_targets(): print( rule_data['Name'], rule_data['ScheduleExpression'], len(rule_data['Targets']), sep=' ' )
Note
It’s possible for another API user to make changes in between the calls to
list_rulesandlist_targets_by_rule. Govern yourself accordingly. This function is here to save you the trouble of making these calls manually.
Lambda Helpers¶
- boto3_helpers.awslambda.update_environment_variables(function_name, new_env, *, lambda_client=None)[source]¶
Do a partial update of a Lambda function’s environment variables. Return the resulting environment.
function_name is the Lambda function name.
new_env is a mapping with the new environment variables.
lambda_client is a
boto3.client('lambda')instance. If not given, one will be created withboto3.client('lambda').
Usage:
from boto3_helpers.awslambda import update_environment_variables new_env = {'LOG_LEVEL': 'INFO', 'LOG_SERVER': '198.51.100.1'} result_env = update_environment_variables( 'ExamplePlaybackConfig', AdDecisionServerUrl='https://198.51.100.1:24601/ads/', ) assert result_env['LOG_LEVEL'] == 'INFO' assert result_env['LOG_SERVER'] == '198.51.100.1' assert result_env['LOG_PORT'] == '24601' # Or whatever it was before
The function’s existing environment variables will be fetched, merged with the new_env, and sent to the Lambda API.
Note
It’s possible for another API user to change environment variables in between this function’s calls to
get_function_configurationandupdate_function_configuration. The Lambda API doesn’t allow for atomic updates.
Kinesis Helpers¶
- boto3_helpers.kinesis.yield_all_shards(kinesis_client=None, **kwargs)[source]¶
Due to a bug in
botocore, thelist_shardspaginator does not work correctly. This function yields the information from all shards in a Kinesis stream.kinesis_client is a
boto3.client('kinesis_client')instance. If not given, one will be created withboto3.client('kinesis_client').kwargs are passed directly to the
list_shardsmethod. You’ll need to supply at least StreamARN or StreamName.
Usage:
from boto3_helpers.kinesis import yield_all_shards for shard in yield_all_shards(StreamName='example-stream'): print(shard['ShardId'])
- boto3_helpers.kinesis.yield_available_shard_records(kinesis_client=None, **kwargs)[source]¶
Yield all available records from the given Kinesis stream shard. Records will be pulled from until
MillisBehindLatestis zero.ShardId is the ID of the shard.
kinesis_client is a
boto3.client('kinesis_client')instance. If not given, one will be created withboto3.client('kinesis_client').kwargs are passed directly to the
get_shard_iteratormethod. You’ll need to supply at least StreamARN (or StreamName) and ShardId. By default you’ll get records from the stream’sTRIM_HORIZON.
Reading from the earliest available record:
from datetime import datetime, timedelta, timezone from boto3_helpers.kinesis import yield_available_shard_records for record in yield_available_shard_records( StreamName='example-stream', 'shard-0001' ): print(record['SequenceNumber'], record['Data'], sep=' ')
- boto3_helpers.kinesis.yield_available_stream_records(kinesis_client=None, **kwargs)[source]¶
Yield all available records from the given Kinesis stream. Records will be pulled from each of the stream’s shards until
MillisBehindLatestis zero. The shards’ records will be interleaved together (example: if a stream has three shards, the first record yielded will be from shard A, the second will be from shard B, the third will be from shard, the fourth will be from shard A, etc.).kinesis_client is a
boto3.client('kinesis_client')instance. If not given, one will be created withboto3.client('kinesis_client').kwargs are passed directly to the
get_shard_iteratormethod. You’ll need to supply at least StreamARN or StreamName. By default you’ll get records from the stream’sTRIM_HORIZON.
Reading from the earliest available record:
from datetime import datetime, timedelta, timezone from boto3_helpers.kinesis import yield_available_stream_records for record in yield_available_stream_records(StreamName='example-stream'): print(record['SequenceNumber'], record['Data'], sep=' ')
Reading from a particular timestamp:
from datetime import datetime, timedelta, timezone from boto3_helpers.kinesis import yield_available_stream_records for record in yield_available_stream_records( StreamName='example-stream', ShardIteratorType='AT_TIMESTAMP', Timestamp=datetime.now(timezone.utc) - timedelta(hours=1), ): print(record['SequenceNumber'], record['Data'], sep=' ')
Note
This is a synchronous function, and may not be fast enough for real-time processing of high volume streams.
MediaLive Helpers¶
- boto3_helpers.medialive.delete_schedule_action_chain(channel_id, delete_action_name, dry_run=False, eml_client=None)[source]¶
Delete a MediaLive scheduled action, plus any actions that depend on it. Return the names of the actions that were deleted.
channel_id is the MediaLive channel ID.
delete_action_name is the name of the scheduled action to delete.
dry_run determines whether the delete actions are actually executed. Set to
Falseto return the names of the actions that _would_ have been deleted.eml_client (optional) is a
boto3.client('medialive')instance.
Usage:
from boto3_helpers.medialive import delete_schedule_action_chain deleted_actions = delete_schedule_action_chain( '24601', 'switch-immediate' )
MediaLive’s deletion rules still apply: you can’t delete an action chain associated with the most recent input switch.
MediaTailor Helpers¶
- boto3_helpers.mediatailor.update_playback_configuration(config_name, emt_client=None, **config_kwargs)[source]¶
Do a partial update of a MediaTailor configuration and return the result:
config_name is the name of the playback configuration that will be updated.
emt_client is a
boto3.client('mediatailor')instance. If not given, is created withboto3.client('mediatailor').config_kwargs are passed directly to the
put_playback_configurationmethod.
Usage:
from boto3_helpers.mediatailor import update_playback_configuration new_config = update_playback_configuration( 'ExamplePlaybackConfig', AdDecisionServerUrl='https://198.51.100.1:24601/ads/', )
The existing playback configuration will be fetched, merged with the new values, and then sent to the MediaTailor API.
Note
It’s possible for another API user to change the playback configuration in between this function’s calls to
get_playback_configurationandput_playback_configuration. The MediaTailor API doesn’t allow for atomic updates.
Pagination Helpers¶
- boto3_helpers.pagination.yield_all_items(boto_client, method_name, list_key, **kwargs)[source]¶
A helper function that simplifies retrieving items from API endpoints that require paging. Yields each item from every page:
boto_client is a
boto3.client()instance for the relevant service.method_name is the name of the client method that requires paging.
list_key is the name of the top-level key in the method’s response that corresponds to the desired list of items. If the method’s response has multiple levels, use a
jmespathexpression.kwargs are passed through to the appropriate
paginatemethod.
EC2 example:
from boto3 import client as boto3_client from boto3_helpers.pagination import yield_all_items ec2_client = boto3_client('ec2') for item in yield_all_items( ec2_client, 'describe_instances', 'Reservations' ): print(item['ReservationId'])
In this example, the
list_keyfor EC2’sdescribe_instancesis'Reservations'.S3 example:
from boto3 import client as boto3_client from boto3_helpers.pagination import yield_all_items s3_client = boto3_client('s3') for item in yield_all_items( s3_client, 'list_objects_v2', 'Contents', Bucket='example-bucket', Prefix='example-prefix/' ): print(item['Key'])
In this example, the
list_keyfor S3’slist_objects_v2is'Contents'.CloudFront example:
from boto3 import client as boto3_client from boto3_helpers.pagination import yield_all_items cloudfront_client = boto3_client('cloudfront') for item in yield_all_items( cloudfront_client, 'list_distributions', 'DistributionList.Items' ): print(item['Id'])
S3 Helpers¶
- boto3_helpers.s3.query_object(bucket, key, query, input_format, *, s3_client=None, **kwargs)[source]¶
Runs an S3 Select query on the given object and yields each of the matching records.
bucket is the S3 bucket to use
key is the key to query
query is the S3 Select SQL query to use
input_format this can be
json,json.gz,jsonl,jsonl.gz,csv,csv.gz,tsv,tsv.gz, orNone.s3_client is a
boto3.client('s3')instance. If not given, one will be created withboto3.client('s3').kwargs are passed to the
select_object_contentmethod.
The
csv,csv.gz,tsv, andtsv.gzinput formats assume a usable header row. To customize the input format, set input_format toNoneand specifyInputSerializationin kwargs.Each of the output records will be decoded with
json.loadsbefore being yielded. The function takes care of combining partial records from S3’s event stream.from boto3_helpers.s3 import query_object for record in query_object( 'ExampleBucket', 'ExamplePath/ExampleKey.jsonl.gz', 'SELECT * FROM s3object s', 'jsonl.gz', ): print(record['SomeField'], record['OtherField'], sep=' ')
- boto3_helpers.s3.head_bucket(bucket, s3_client=None, **kwargs)[source]¶
Perform a
HeadBucketAPI call and return the response. If the given bucket does not exist, raises3_client.exceptions.NoSuchBucketbucket is the S3 bucket to use
s3_client is a
boto3.client('s3')instance. If not given, one will be created withboto3.client('s3').kwargs are passed to the
head_bucketmethod.
The
boto3docs infamously claim that the head_bucket method can raiseS3.Client.exceptions.NoSuchBucket, leading reasonable people to assume that this exception will be raised if the requested bucket does not exist. Alas, it raises a genericClientErrorinstead. This function fixes the problem.from boto3 import client as boto3_client from boto3_helpers.s3 import head_bucket s3_client = boto3_client('s3') try: head_bucket('ExampleBucket', s3_client=s3_client) except s3_client.exceptions.NoSuchBucket: print('No such bucket') else: print('That bucket exists')
- boto3_helpers.s3.create_bucket(bucket, region_name='us-east-1', s3_client=None, **kwargs)[source]¶
Create a bucket in the given region_name.
bucket is the S3 bucket to use
region_name is the region to use.
s3_client is a
boto3.client('s3')instance. If not given, one will be created withboto3.client('s3').kwargs are passed to the
create_bucketmethod.
This helper smooths out a quirk in the S3 API. To create buckets outside of the us-east-1 region, you must specify a
LocationConstraint. But to create a bucket in us-east-1, you must not use aLocationConstraint.from boto3 import client as boto3_client from boto3_helpers.s3 import create_bucket s3_client = boto3_client('s3', region_name='us-west-1') create_bucket( 'ExampleBucket', region_name='us-west-1', s3_client=s3_client )
Signed Request Helpers¶
- exception boto3_helpers.signed_requests.SigV4RequestException(status_code, content)[source]¶
Exception raised by
sigv4_request()when an HTTP response indicates an error.
- boto3_helpers.signed_requests.sigv4_request(service, method, endpoint, client=None, base_url=None, operation_name=None, **kwargs)[source]¶
Make a signed request to the AWS API and return the JSON payload.
service is the AWS API service code
method is an HTTP method like
'GET'or'POST'endpoint is the target API endpoint. If you need to supply parameters, put supply them as a query string here (e.g.,
?MaxResults=1)client is a
boto3.clientinstance for the same account and region as your target. If not given, is created withboto3.client('sts')base_url is the URL for the target AWS API. If not given, a guess will be made based on the service name and client region.
operation_name is the name of the API operation to use when signing the request
kwargs are passed on to an
AWSRequestobject.
If the API response indicates an error,
boto3_helpers.signed_requests.SigV4RequestExceptionwill be raised.This function is useful for accessing endpoints that aren’t supported by
boto3. For example,botocoreintroduced support for the'scheduler'service in version 1.29.7. You could have used this function to interact with that API before this new version was available:from boto3_helpers.signed_requests import sigv4_request schedule_data = sigv4_request('scheduler', 'GET', 'schedules')
Explanation:
The EventBridge Scheduler API Reference describes the
ListSchedulesAPI action.The service code for EventBridge Scheduler is
'scheduler'.The method for
ListSchedulesis'GET'.The endpoint is
'/schedules'. This function will strip off leading slashes.
We could haved optionally supplied the
operation_nameas'ListSchedules'.Example: Invoking a Lambda function URL that uses the
AWS_IAMauth type:from boto3_helpers.signed_requests import sigv4_request resp = sigv4_request( 'lambda', 'POST', '/', base_url='https://660d26cd.lambda-url.test-region-1.on.aws', data=dumps({'payload_key_1': 'payload_value_1'}) )
SQS Helpers¶
- boto3_helpers.sqs.send_batches(queue_url, all_messages, sqs_client=None, message_limit=10, size_limit=262144)[source]¶
Call
send_message_batchas many times as necessary to deliver the messages in all_messages, creating batches that fit SQS limits automatically.queue_url is the URL of the SQS queue.
all_messages is an iterable of message entries, like what you would use for
send_messageorsend_message_batch.sqs_client is a
boto3.client('sqs')instance. If not given, is created withboto3.client('sqs').message_limit is
10by default. This is the maximum number of messages to be sent per batch.size_limit is
262_144(256 KiB) by default. This is the maximum batch payload size.
Return value:
{ 'Successful': [ { 'Id': 'string', 'MessageId': 'string', 'MD5OfMessageBody': 'string', 'MD5OfMessageAttributes': 'string', 'MD5OfMessageSystemAttributes': 'string', 'SequenceNumber': 'string' }, ], 'Failed': [ { 'Id': 'string', 'SenderFault': bool, 'Code': 'string', 'Message': 'string' }, ] }
If you don’t supply an
Idparameter in your messages, one will be inserted automatically.Messages from all_messages are collected in order. If the number of message reaches the message_limit or the combined payload size of the messages reaches size_limit, a new batch will be started. The size calculation includes message attributes.
Usage:
from boto3_helpers.sqs import send_batches queue_url = 'https://sqs.test-region-1.amazonaws.com/000000000000/test-queue' all_messages = [ {'MessageBody': 'Beautiful is better than ugly'}, {'MessageBody': 'Explicit is better than implicit', 'DelaySeconds': 120}, {'MessageBody': 'Simple is better than complex'}, # Fill this in with an arbitrary number of messages ] send_batches(queue_url, all_messages)
- boto3_helpers.sqs.delete_batches(queue_url, all_messages, sqs_client=None, message_limit=10)[source]¶
Call
delete_message_batchas many times as necessary to delete the messages in all_messages, creating batches that fit SQS limits automatically.queue_url is the URL of the SQS queue.
all_messages is an iterable of message entries, like what you would use for
delete_messageordelete_message_batch.sqs_client is a
boto3.client('sqs')instance. If not given, is created withboto3.client('sqs').message_limit is
10by default. This is the maximum number of messages to delete per batch.
Return value:
{ 'Successful': [ { 'Id': 'string', 'MessageId': 'string', 'MD5OfMessageBody': 'string', 'MD5OfMessageAttributes': 'string', 'MD5OfMessageSystemAttributes': 'string', 'SequenceNumber': 'string' }, ], 'Failed': [ { 'Id': 'string', 'SenderFault': bool, 'Code': 'string', 'Message': 'string' }, ] }
The items in all_messages only need to have a
ReceiptHandlekey in them. This means you can pass in messages you get from thereceive_messagesmethod directly.Usage:
from boto3_helpers.sqs import delete_batches queue_url = 'https://sqs.test-region-1.amazonaws.com/000000000000/test-queue' all_messages = [ {'ReceiptHandle': 'UmVjZWlwdCBoYW5kbGUgMQ=='}, {'ReceiptHandle': 'U2Vjb25kIHJlY2VpcHQgaGFuZGxl'}, {'Id': '24601', 'ReceiptHandle': 'VGhpcyBvbmUgaGFzIGl0cyBvd24gSUQ='}, # Fill this in with an arbitrary number of messages ] delete_batches(queue_url, all_messages)
STS Helpers¶
- boto3_helpers.sts.assumed_role_session(sts_client=None, session_kwargs=None, **assume_role_kwargs)[source]¶
Return a
boto3.Sessionobject for an assumed role:sts_client is a
boto3.client('sts')instance. If not given, one will be created withboto3.client('sts').session_kwargs are the keyword arguments you want to pass to the
boto3.Session()constructor.assume_role_kwargs are the arguments for the
assume_roleoperation, which at least includeRoleArn. IfRoleSessionNameis not given, a randomly-generated one will be used.
Usage:
from boto3_helpers.sts import assumed_role_session role_arn = 'arn:aws:iam::000000000000:role/TargetRole' session = assumed_role_session(RoleArn=role_arn)
This is equivalent to:
from boto3 import ( client as boto3_client, Session as boto3_session, ) sts_client = boto3_client('sts') role_arn = 'arn:aws:iam::000000000000:role/TargetRole' session_name = 'AssumedRoleSession1' resp = sts_client.assume_role( RoleArn=role_arn, RoleSessionName=session_name ) credentials = resp['credentials'] session = boto3_session( aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'], )
- boto3_helpers.sts.assumed_role_client(service_name, *, sts_client=None, client_kwargs=None, **assume_role_kwargs)[source]¶
Return a
boto3.clientobject for an assumed role:service_name is the name of a service.
sts_client is a
boto3.client('sts')instance. If not given, one will be created withboto3.client('sts').client_kwargs are the keyword arguments you want to pass to the
boto3.client()constructor.assume_role_kwargs are the arguments for the
assume_roleoperation, which at least includeRoleArn. IfRoleSessionNameis not given, a randomly-generated one will be used.
Usage:
from boto3_helpers.sts import assumed_role_client client_kwargs = {'region_name': 'us-east-2'} role_arn = 'arn:aws:iam::000000000000:role/TargetRole' sqs_client = assumed_role_client( 'sqs', client_kwargs, RoleArn=role_arn )
- boto3_helpers.sts.assumed_role_resource(service_name, *, sts_client=None, resource_kwargs=None, **assume_role_kwargs)[source]¶
Return a
boto3.resourceobject for an assumed role:service_name is the name of a service.
sts_client is a
boto3.client('sts')instance. If not given, one will be created withboto3.client('sts').resource_kwargs are the keyword arguments you want to pass to the
boto3.resource()constructor.assume_role_kwargs are the arguments for the
assume_roleoperation, which at least includeRoleArn. IfRoleSessionNameis not given, a randomly-generated one will be used.
Usage:
from boto3_helpers.sts import assumed_role_resource resource_kwargs = {'region_name': 'us-east-2'} role_arn = 'arn:aws:iam::000000000000:role/TargetRole' dynamodb_resource = assumed_role_resource( 'dynamodb', resource_kwargs, RoleArn=role_arn )