API Documentation¶
ARN Helpers¶
- boto3_helpers.arn.construct_arn(existing=None, *, sts_client=None, **kwargs)[source]¶
Construct an ARN from an existing one.
existing is used as a template. If not provided, one will be derived from your IAM user or role.
sts_client is a
boto3.client('sts')
instance. If not given, is created withboto3.client('sts')
. This will only be used if existing is not supplied.kwargs can include any of the following:
partition
,service
,region
,account_id
,resource_type
,resource_separator
,resource_id
.
Some ARNs end with
resource_id
. You can construct these by supplyingresource_type=''
andresource_id='DesiredID'
.Other ARNs end with
resource_type:resource_id
. You can construct these by supplyingresource_type='DesiredType'
,resource_separator=':'
, andresource_id='DesiredID'
Still other ARNs end with
resource_type/resource_id
. You can construct these by supplyingresource_type='DesiredType'
,resource_separator='/'
, andresource_id='DesiredID'
Converting one format to another:
from boto3_helpers.arn import construct_arn existing = 'arn:aws:dynamodb:us-east-2:00000000:table/demo' new = construct_arn( existing, service='lambda', resource_type='function', resource_separator=':', resource_id='example' ) print(new) # arn:aws:lambda:us-east-2:00000000:function:example
Starting from your IAM user or role:
from boto3_helpers.arn import construct_arn new = construct_arn( service='dynamodb', resource_type='table', resource_id='demo' ) print(new) # arn:aws:dynamodb:us-east-2:00000000:table/demo
CloudWatch Helpers¶
- boto3_helpers.cloudwatch.yield_metric_data(namespace, metric_name, dimension_map, period, stat, start_time, end_time, cw_client=None, **kwargs)[source]¶
Yield all the data associated with a single metric. Each item yielded is a
(dt, value)
pair.namespace is the namespace for the metric.
metric_name is the name of the metric.
dimension_map is a
dict
that maps dimension names to values, e.g.{'Name': 'Value'}
. If the metric has no dimension, supply an emptydict
.period is the granularity of the returned data points in seconds.
stat is the name of the statistic to evaluate, e.g.
Maximum
.start_time is a
datetime.datetime
object that specifies that beginning of the query.end_time is a
datetime.datetime
object that specifies that end of the query.cw_client is a
boto3.client('cloudwatch')
instance. If not given, is created withboto3.client('cloudwatch')
kwargs can include
Unit
,Expression
,Period
,AccountId
,ScanBy
,LabelOptions
, andPaginationConfig
. These are inserted at the appropriate place in theget_paginator('get_metric_data').paginate
call.
Usage:
from datetime import datetime, timezone from boto3_helpers.cloudwatch import yield_metric_data for dt, value in yield_metric_data( 'AWS/S3', 'NumberOfObjects', {'StorageType': 'AllStorageTypes', 'BucketName': 'example-bucket'}, 86400, 'Maximum', datetime(2022, 9, 1, 0, 0, 0, timezone.utc), datetime(2022, 9, 16, 0, 0, 0, timezone.utc), ): print(dt.isoformat(), value, sep=' ')
This function is designed to simplify the common case of pulling all data for a single metric, which is cumbersome with the normal CloudWatch
get_metric_data
method.ScanBy
is set toTimestampAscending
by default, so data should be emitted in sorted order.
DynamoDB Helpers¶
- boto3_helpers.dynamodb.query_table(ddb_table, **kwargs)[source]¶
Yield all of the items that match the DynamoDB query:
ddb_table is a table name or a
boto3.resource('dynamodb').Table
instance.kwargs are passed directly to the
Table.query
method.
Usage:
from boto3 import resource as boto3_resource from boto3.dynamodb.conditions import Key from boto3_helpers.dynamodb import query_table ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') condition = Key('username').eq('johndoe') all_items = list( query_table(ddb_table, KeyConditionExpression=condition) )
- boto3_helpers.dynamodb.scan_table(ddb_table, **kwargs)[source]¶
Yield all of the items that match the DynamoDB query:
ddb_table is a table name or a
boto3.resource('dynamodb').Table
instance.kwargs are passed directly to the
Table.scan
method.
Usage:
from boto3 import resource as boto3_resource from boto3.dynamodb.conditions import Attr from boto3_helpers.dynamodb import scan_table ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') condition = Attr('username').eq('johndoe') all_items = list( scan_table(ddb_table, FilterExpression=condition) )
- boto3_helpers.dynamodb.update_attributes(ddb_table, key, update_map, **kwargs)[source]¶
Update a DyanmoDB table item and return the
update_item
response:ddb_table is a table name or a
boto3.resource('dynamodb').Table
instance.key is a mapping that identifies the item to update.
update_map is a mapping of top-level item attributes to target values.
kwargs are passed directly to the the
Table.update_item
method.
Usage:
from boto3 import resource as boto3_resource from boto3_helpers.dynamodb import update_attributes ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') key = {'username': 'janedoe', 'last_name': 'Doe'} update_map = {'age': 26} resp = update_attributes(ddb_table, key, update_map)
In the past, the
boto3
DynamoDB library provided a simple means of updating items withAttributeUpdates
. However, this parameter is deprecated. This function constructs equivalentUpdateExpression
andExpressionAttributeValues
parameters.Equivalent to:
ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') key = {'username': 'janedoe', 'last_name': 'Doe'} resp = ddb_table.update_item( Key=key, UpdateExpression='SET age = :val1', ExpressionAttributeValues={':val1': 26}, )
Note that nested attributes (i.e. map attributes) can be updated, but that you need to provide values for the entire map.
# Suppose that DDB had this before: # { # "username": "janedoe", # "parameters": { # "age": 25, # "weight_kg": 70 # } # } # After this call, `parameters.age` will be 26, but there will # no longer be a `paramters.weight_kg`. ddb_resource = boto3_resource('dynamodb') ddb_table = ddb_resource.Table('example-table') key = {'username': 'janedoe', 'last_name': 'Doe'} update_map = {'parameters': {'age': 26}} resp = update_attributes(ddb_table, key, update_map)
EventBridge Helpers¶
- boto3_helpers.events.describe_rule_with_targets(*, events_client=None, **kwargs)[source]¶
Return a
dict
with the information from thedescribe_rule
call combined with the information from thelist_targets_by_rule
call.events_client is a
boto3.client('events')
instance. If not given, one will be created withboto3.client('events')
.Name is the name of the rule to be passed to
describe_rule
. This is required.EventBusName is the name or ARN of the event bus associated with the rule. If omitted, the default event bus is used.
Sample output:
{ 'Name': 'example-rule', 'Arn': 'arn:aws:events:us-east-2:00000000:rule/example-rule', 'ScheduleExpression': 'rate(5 minutes)', 'State': 'ENABLED', 'EventBusName': 'default', 'CreatedBy': '00000000', 'Targets': [ { 'Id': 'Id1c4e7db5-6dd2-47e9-b7bc-98561ae038f7', 'Arn': 'arn:aws:lambda:us-east-2:00000000:function:example-func', 'Input': '{"hello": "world"}', } ], }
Usage:
from boto3 import client as boto3_client from boto3_helpers.events import describe_rule_with_targets events_client = boto3_client('events') rule_data = describe_rule_with_targets( Name='example-rule', EventBusName='example-bus' ) for target in rule['Targets']: print( rule_data['Arn'], rule_data['ScheduleExpression'], target['Arn'], sep=' ' )
Note
It’s possible for another API user to make changes in between the calls to
describe_rule
andlist_targets_by_rule
. Govern yourself accordingly. This function is here to save you the trouble of making these calls manually.
- boto3_helpers.events.yield_rules_by_target(*, events_client=None, **kwargs)[source]¶
Yield a
dict
with information about each rule in thelist_rule_names_by_target
response.events_client is a
boto3.client('events')
instance. If not given, one will be created withboto3.client('events')
.TargetArn is the ARN of the target. This is required
EventBusName is the name or ARN of the event bus to list rules for. If omitted, the default event bus is used.
See
describe_rule_with_targets()
for the output format.Usage:
from boto3 import client as boto3_client from boto3_helpers.events import yield_rules_by_target events_client = boto3_client('events') for rule_data in yield_rules_by_target( TargetArn='arn:aws:lambda:us-east-2:00000000:function:example-func' ): print( rule_data['Arn'], rule_data['ScheduleExpression'], len(rule_data['Targets']), sep=' ' )
Note
It’s possible for another API user to make changes in between the calls to
list_rule_names_by_target
,describe_rule
, andlist_targets_by_rule
. Govern yourself accordingly. This function is here to save you the trouble of making these calls manually.
- boto3_helpers.events.yield_rules_with_targets(*, events_client=None, **kwargs)[source]¶
Yield a
dict
with the information from thedescribe_rule
call combined with the information from thelist_targets_by_rule
call for each rule in thelist_rules
response.events_client is a
boto3.client('events')
instance. If not given, one will be created withboto3.client('events')
.NamePrefix is an optional filtering prefix for rule name
EventBusName is the name or ARN of the event bus to list rules for. If omitted, the default event bus is used.
See
describe_rule_with_targets()
for the output format.Usage:
from boto3 import client as boto3_client from boto3_helpers.events import yield_rules_with_targets events_client = boto3_client('events') for rule_data in yield_rules_with_targets(): print( rule_data['Name'], rule_data['ScheduleExpression'], len(rule_data['Targets']), sep=' ' )
Note
It’s possible for another API user to make changes in between the calls to
list_rules
andlist_targets_by_rule
. Govern yourself accordingly. This function is here to save you the trouble of making these calls manually.
Lambda Helpers¶
- boto3_helpers.awslambda.update_environment_variables(function_name, new_env, *, lambda_client=None)[source]¶
Do a partial update of a Lambda function’s environment variables. Return the resulting environment.
function_name is the Lambda function name.
new_env is a mapping with the new environment variables.
lambda_client is a
boto3.client('lambda')
instance. If not given, one will be created withboto3.client('lambda')
.
Usage:
from boto3_helpers.awslambda import update_environment_variables new_env = {'LOG_LEVEL': 'INFO', 'LOG_SERVER': '198.51.100.1'} result_env = update_environment_variables( 'ExamplePlaybackConfig', AdDecisionServerUrl='https://198.51.100.1:24601/ads/', ) assert result_env['LOG_LEVEL'] == 'INFO' assert result_env['LOG_SERVER'] == '198.51.100.1' assert result_env['LOG_PORT'] == '24601' # Or whatever it was before
The function’s existing environment variables will be fetched, merged with the new_env, and sent to the Lambda API.
Note
It’s possible for another API user to change environment variables in between this function’s calls to
get_function_configuration
andupdate_function_configuration
. The Lambda API doesn’t allow for atomic updates.
Kinesis Helpers¶
- boto3_helpers.kinesis.yield_all_shards(kinesis_client=None, **kwargs)[source]¶
Due to a bug in
botocore
, thelist_shards
paginator does not work correctly. This function yields the information from all shards in a Kinesis stream.kinesis_client is a
boto3.client('kinesis_client')
instance. If not given, one will be created withboto3.client('kinesis_client')
.kwargs are passed directly to the
list_shards
method. You’ll need to supply at least StreamARN or StreamName.
Usage:
from boto3_helpers.kinesis import yield_all_shards for shard in yield_all_shards(StreamName='example-stream'): print(shard['ShardId'])
- boto3_helpers.kinesis.yield_available_shard_records(kinesis_client=None, **kwargs)[source]¶
Yield all available records from the given Kinesis stream shard. Records will be pulled from until
MillisBehindLatest
is zero.ShardId is the ID of the shard.
kinesis_client is a
boto3.client('kinesis_client')
instance. If not given, one will be created withboto3.client('kinesis_client')
.kwargs are passed directly to the
get_shard_iterator
method. You’ll need to supply at least StreamARN (or StreamName) and ShardId. By default you’ll get records from the stream’sTRIM_HORIZON
.
Reading from the earliest available record:
from datetime import datetime, timedelta, timezone from boto3_helpers.kinesis import yield_available_shard_records for record in yield_available_shard_records('example-stream', 'shard-0001'): print(record['SequenceNumber], record['Data], sep=' ')
- boto3_helpers.kinesis.yield_available_stream_records(kinesis_client=None, **kwargs)[source]¶
Yield all available records from the given Kinesis stream. Records will be pulled from each of the stream’s shards until
MillisBehindLatest
is zero. The shards’ records will be interleaved together (example: if a stream has three shards, the first record yielded will be from shard A, the second will be from shard B, the third will be from shard, the fourth will be from shard A, etc.).kinesis_client is a
boto3.client('kinesis_client')
instance. If not given, one will be created withboto3.client('kinesis_client')
.kwargs are passed directly to the
get_shard_iterator
method. You’ll need to supply at least StreamARN or StreamName. By default you’ll get records from the stream’sTRIM_HORIZON
.
Reading from the earliest available record:
from datetime import datetime, timedelta, timezone from boto3_helpers.kinesis import yield_available_stream_records for record in yield_available_stream_records(StreamName='example-stream'): print(record['SequenceNumber], record['Data], sep=' ')
Reading from a particular timestamp:
from datetime import datetime, timedelta, timezone from boto3_helpers.kinesis import yield_available_stream_records for record in yield_available_stream_records( 'example-stream', ShardIteratorType='AT_TIMESTAMP', Timestamp=datetime.now(timezone.utc) - timedelta(hours=1), ): print(record['SequenceNumber], record['Data], sep=' ')
Note
This is a synchronous function, and may not be fast enough for real-time processing of high volume streams.
MediaLive Helpers¶
- boto3_helpers.medialive.delete_schedule_action_chain(channel_id, delete_action_name, dry_run=False, eml_client=None)[source]¶
Delete a MediaLive scheduled action, plus any actions that depend on it. Return the names of the actions that were deleted.
channel_id is the MediaLive channel ID.
delete_action_name is the name of the scheduled action to delete.
dry_run determines whether the delete actions are actually executed. Set to
False
to return the names of the actions that _would_ have been deleted.eml_client (optional) is a
boto3.client('medialive')
instance.
Usage:
from boto3_helpers.medialive import delete_schedule_action_chain deleted_actions = delete_schedule_action_chain( '24601', 'switch-immediate' )
MediaLive’s deletion rules still apply: you can’t delete an action chain associated with the most recent input switch.
MediaTailor Helpers¶
- boto3_helpers.mediatailor.update_playback_configuration(config_name, emt_client=None, **config_kwargs)[source]¶
Do a partial update of a MediaTailor configuration and return the result:
config_name is the name of the playback configuration that will be updated.
emt_client is a
boto3.client('mediatailor')
instance. If not given, is created withboto3.client('mediatailor')
.config_kwargs are passed directly to the
put_playback_configuration
method.
Usage:
from boto3_helpers.mediatailor import update_playback_configuration new_config = update_playback_configuration( 'ExamplePlaybackConfig', AdDecisionServerUrl='https://198.51.100.1:24601/ads/', )
The existing playback configuration will be fetched, merged with the new values, and then sent to the MediaTailor API.
Note
It’s possible for another API user to change the playback configuration in between this function’s calls to
get_playback_configuration
andput_playback_configuration
. The MediaTailor API doesn’t allow for atomic updates.
Pagination Helpers¶
- boto3_helpers.pagination.yield_all_items(boto_client, method_name, list_key, **kwargs)[source]¶
A helper function that simplifies retrieving items from API endpoints that require paging. Yields each item from every page:
boto_client is a
boto3.client()
instance for the relevant service.method_name is the name of the client method that requires paging.
list_key is the name of the top-level key in the method’s response that corresponds to the desired list of items. If the method’s response has multiple levels, use a
jmespath
expression.kwargs are passed through to the appropriate
paginate
method.
EC2 example:
from boto3 import client as boto3_client from boto3_helpers.pagination import yield_all_items ec2_client = boto3_client('ec2') for item in yield_all_items( ec2_client, 'describe_instances', 'Reservations' ): print(item['ReservationId'])
In this example, the
list_key
for EC2’sdescribe_instances
is'Reservations'
.S3 example:
from boto3 import client as boto3_client from boto3_helpers.pagination import yield_all_items s3_client = boto3_client('s3') for item in yield_all_items( s3_client, 'list_objects_v2', 'Contents', Bucket='example-bucket', Prefix='example-prefix/' ): print(item['Key'])
In this example, the
list_key
for S3’slist_objects_v2
is'Contents'
.CloudFront example:
from boto3 import client as boto3_client from boto3_helpers.pagination import yield_all_items cloudfront_client = boto3_client('cloudfront') for item in yield_all_items( cloudfront_client, 'list_distributions', 'DistributionList.Items' ): print(item['Id'])
S3 Helpers¶
- boto3_helpers.s3.query_object(bucket, key, query, input_format, *, s3_client=None, **kwargs)[source]¶
Runs an S3 Select query on the given object and yields each of the matching records.
bucket is the S3 bucket to use
key is the key to query
query is the S3 Select SQL query to use
input_format this can be
json
,json.gz
,jsonl
,jsonl.gz
,csv
,csv.gz
,tsv
,tsv.gz
, orNone
.s3_client is a
boto3.client('s3')
instance. If not given, one will be created withboto3.client('s3')
.kwargs are passed to the
select_object_content
method.
The
csv
,csv.gz
,tsv
, andtsv.gz
input formats assume a usable header row. To customize the input format, set input_format toNone
and specifyInputSerialization
in kwargs.Each of the output records will be decoded with
json.loads
before being yielded. The function takes care of combining partial records from S3’s event stream.from boto3_helpers.s3 import query_object for record in query_object( 'ExampleBucket', 'ExamplePath/ExampleKey.jsonl.gz', 'SELECT * FROM s3object s', 'jsonl.gz', ): print(record['SomeField'], record['OtherField'], sep=' ')
Signed Request Helpers¶
- exception boto3_helpers.signed_requests.SigV4RequestException(status_code, content)[source]¶
Exception raised by
sigv4_request()
when an HTTP response indicates an error.
- boto3_helpers.signed_requests.sigv4_request(service, method, endpoint, client=None, base_url=None, operation_name=None, **kwargs)[source]¶
Make a signed request to the AWS API and return the JSON payload.
service is the AWS API service code
method is an HTTP method like
'GET'
or'POST'
endpoint is the target API endpoint. If you need to supply parameters, put supply them as a query string here (e.g.,
?MaxResults=1
)client is a
boto3.client
instance for the same account and region as your target. If not given, is created withboto3.client('sts')
base_url is the URL for the target AWS API. If not given, a guess will be made based on the service name and client region.
operation_name is the name of the API operation to use when signing the request
kwargs are passed on to an
AWSRequest
object.
If the API response indicates an error,
boto3_helpers.signed_requests.SigV4RequestException
will be raised.This function is useful for accessing endpoints that aren’t supported by
boto3
. For example,botocore
introduced support for the'scheduler'
service in version 1.29.7. You could have used this function to interact with that API before this new version was available:from boto3_helpers.signed_requests import sigv4_request schedule_data = sigv4_request('scheduler', 'GET', 'schedules')
Explanation:
The EventBridge Scheduler API Reference describes the
ListSchedules
API action.The service code for EventBridge Scheduler is
'scheduler'
.The method for
ListSchedules
is'GET'
.The endpoint is
'/schedules'
. This function will strip off leading slashes.
We could haved optionally supplied the
operation_name
as'ListSchedules'
.Example: Invoking a Lambda function URL that uses the
AWS_IAM
auth type:from boto3_helpers.signed_requests import sigv4_request resp = sigv4_request( 'lambda', 'POST', '/', base_url='https://660d26cd.lambda-url.test-region-1.on.aws', data=dumps({'payload_key_1': 'payload_value_1'}) )
SQS Helpers¶
- boto3_helpers.sqs.send_batches(queue_url, all_messages, sqs_client=None, message_limit=10, size_limit=262144)[source]¶
Call
send_message_batch
as many times as necessary to deliver the messages in all_messages, creating batches that fit SQS limits automatically.queue_url is the URL of the SQS queue.
all_messages is an iterable of message entries, like what you would use for
send_message
orsend_message_batch
.sqs_client is a
boto3.client('sqs')
instance. If not given, is created withboto3.client('sqs')
.message_limit is
10
by default. This is the maximum number of messages to be sent per batch.size_limit is
262_144
(256 KiB) by default. This is the maximum batch payload size.
Return value:
{ 'Successful': [ { 'Id': 'string', 'MessageId': 'string', 'MD5OfMessageBody': 'string', 'MD5OfMessageAttributes': 'string', 'MD5OfMessageSystemAttributes': 'string', 'SequenceNumber': 'string' }, ], 'Failed': [ { 'Id': 'string', 'SenderFault': bool, 'Code': 'string', 'Message': 'string' }, ] }
If you don’t supply an
Id
parameter in your messages, one will be inserted automatically.Messages from all_messages are collected in order. If the number of message reaches the message_limit or the combined payload size of the messages reaches size_limit, a new batch will be started. The size calculation includes message attributes.
Usage:
from boto3_helpers.sqs import send_batches queue_url = 'https://sqs.test-region-1.amazonaws.com/000000000000/test-queue' all_messages = [ {'MessageBody': 'Beautiful is better than ugly'}, {'MessageBody': 'Explicit is better than implicit', 'DelaySeconds': 120}, {'MessageBody': 'Simple is better than complex'}, # Fill this in with an arbitrary number of messages ] send_batches(queue_url, all_messages)
- boto3_helpers.sqs.delete_batches(queue_url, all_messages, sqs_client=None, message_limit=10)[source]¶
Call
delete_message_batch
as many times as necessary to delete the messages in all_messages, creating batches that fit SQS limits automatically.queue_url is the URL of the SQS queue.
all_messages is an iterable of message entries, like what you would use for
delete_message
ordelete_message_batch
.sqs_client is a
boto3.client('sqs')
instance. If not given, is created withboto3.client('sqs')
.message_limit is
10
by default. This is the maximum number of messages to delete per batch.
Return value:
{ 'Successful': [ { 'Id': 'string', 'MessageId': 'string', 'MD5OfMessageBody': 'string', 'MD5OfMessageAttributes': 'string', 'MD5OfMessageSystemAttributes': 'string', 'SequenceNumber': 'string' }, ], 'Failed': [ { 'Id': 'string', 'SenderFault': bool, 'Code': 'string', 'Message': 'string' }, ] }
The items in all_messages only need to have a
ReceiptHandle
key in them. This means you can pass in messages you get from thereceive_messages
method directly.Usage:
from boto3_helpers.sqs import delete_batches queue_url = 'https://sqs.test-region-1.amazonaws.com/000000000000/test-queue' all_messages = [ {'ReceiptHandle': 'UmVjZWlwdCBoYW5kbGUgMQ=='}, {'ReceiptHandle': 'U2Vjb25kIHJlY2VpcHQgaGFuZGxl'}, {'Id': '24601', 'ReceiptHandle': 'VGhpcyBvbmUgaGFzIGl0cyBvd24gSUQ='}, # Fill this in with an arbitrary number of messages ] delete_batches(queue_url, all_messages)
STS Helpers¶
- boto3_helpers.sts.assumed_role_session(sts_client=None, session_kwargs=None, **assume_role_kwargs)[source]¶
Return a
boto3.Session
object for an assumed role:sts_client is a
boto3.client('sts')
instance. If not given, one will be created withboto3.client('sts')
.session_kwargs are the keyword arguments you want to pass to the
boto3.Session()
constructor.assume_role_kwargs are the arguments for the
assume_role
operation, which at least includeRoleArn
. IfRoleSessionName
is not given, a randomly-generated one will be used.
Usage:
from boto3_helpers.sts import assumed_role_session role_arn = 'arn:aws:iam::000000000000:role/TargetRole' session = assumed_role_session(RoleArn=role_arn)
This is equivalent to:
from boto3 import ( client as boto3_client, Session as boto3_session, ) sts_client = boto3_client('sts') role_arn = 'arn:aws:iam::000000000000:role/TargetRole' session_name = 'AssumedRoleSession1' resp = sts_client.assume_role( RoleArn=role_arn, RoleSessionName=session_name ) credentials = resp['credentials'] session = boto3_session( aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'], )
- boto3_helpers.sts.assumed_role_client(service_name, *, sts_client=None, client_kwargs=None, **assume_role_kwargs)[source]¶
Return a
boto3.client
object for an assumed role:service_name is the name of a service.
sts_client is a
boto3.client('sts')
instance. If not given, one will be created withboto3.client('sts')
.client_kwargs are the keyword arguments you want to pass to the
boto3.client()
constructor.assume_role_kwargs are the arguments for the
assume_role
operation, which at least includeRoleArn
. IfRoleSessionName
is not given, a randomly-generated one will be used.
Usage:
from boto3_helpers.sts import assumed_role_client client_kwargs = {'region_name': 'us-east-2'} role_arn = 'arn:aws:iam::000000000000:role/TargetRole' sqs_client = assumed_role_client( 'sqs', client_kwargs, RoleArn=role_arn )
- boto3_helpers.sts.assumed_role_resource(service_name, *, sts_client=None, resource_kwargs=None, **assume_role_kwargs)[source]¶
Return a
boto3.resource
object for an assumed role:service_name is the name of a service.
sts_client is a
boto3.client('sts')
instance. If not given, one will be created withboto3.client('sts')
.resource_kwargs are the keyword arguments you want to pass to the
boto3.resource()
constructor.assume_role_kwargs are the arguments for the
assume_role
operation, which at least includeRoleArn
. IfRoleSessionName
is not given, a randomly-generated one will be used.
Usage:
from boto3_helpers.sts import assumed_role_resource resource_kwargs = {'region_name': 'us-east-2'} role_arn = 'arn:aws:iam::000000000000:role/TargetRole' dynamodb_resource = assumed_role_resource( 'dynamodb', resource_kwargs, RoleArn=role_arn )