Source code for boto3_helpers.sqs

from secrets import token_hex

from boto3 import client as boto3_client

MESSAGE_LIMIT = 10
SIZE_LIMIT = 262144


def _get_size(message):
    # The size of the message body is the size of the UTF-8 representation
    ret = len(message['MessageBody'].encode('utf-8'))

    # All parts of the message attribute, including Name, DataType, and Value are part
    # of the message size restriction
    for attr_name, attr_data in message.get('MessageAttributes', {}).items():
        ret += len(attr_name.encode('utf-8'))
        ret += len(attr_data['DataType'].encode('utf-8'))
        if 'StringValue' in attr_data:
            ret += len(attr_data['StringValue'].encode('utf-8'))
        elif 'BinaryValue' in attr_data:
            ret += len(attr_data['BinaryValue'])

    # MessageSystemAttributes don't count towards the total size of a message.
    return ret


def _get_batches(all_messages, message_limit, size_limit):
    base_id = token_hex(4)
    current_batch = []
    current_size = 0
    current_count = 0
    for i, message in enumerate(all_messages, 1):
        if message.get('Id') is None:
            message['Id'] = f'{base_id}-{i}'

        if size_limit is None:
            message_size = 0
            reached_size = False
        if size_limit is not None:
            message_size = _get_size(message)
            reached_size = (current_size + message_size) > size_limit

        reached_count = current_count == message_limit
        if current_batch and (reached_size or reached_count):
            yield current_batch[:]
            del current_batch[:]
            current_size = 0
            current_count = 0

        current_batch.append(message)
        current_size += message_size
        current_count += 1

    if current_batch:
        yield current_batch[:]


[docs]def send_batches( queue_url, all_messages, sqs_client=None, message_limit=MESSAGE_LIMIT, size_limit=SIZE_LIMIT, ): """Call ``send_message_batch`` as many times as necessary to deliver the messages in *all_messages*, creating batches that fit SQS limits automatically. * *queue_url* is the URL of the SQS queue. * *all_messages* is an iterable of message entries, like what you would use for ``send_message`` or ``send_message_batch``. * *sqs_client* is a ``boto3.client('sqs')`` instance. If not given, is created with ``boto3.client('sqs')``. * *message_limit* is ``10`` by default. This is the maximum number of messages to be sent per batch. * *size_limit* is ``262_144`` (256 KiB) by default. This is the maximum batch payload size. Return value: .. code-block:: python { 'Successful': [ { 'Id': 'string', 'MessageId': 'string', 'MD5OfMessageBody': 'string', 'MD5OfMessageAttributes': 'string', 'MD5OfMessageSystemAttributes': 'string', 'SequenceNumber': 'string' }, ], 'Failed': [ { 'Id': 'string', 'SenderFault': bool, 'Code': 'string', 'Message': 'string' }, ] } If you don't supply an ``Id`` parameter in your messages, one will be inserted automatically. Messages from *all_messages* are collected in order. If the number of message reaches the *message_limit* or the combined payload size of the messages reaches *size_limit*, a new batch will be started. The size calculation includes message attributes. Usage: .. code-block:: python from boto3_helpers.sqs import send_batches queue_url = 'https://sqs.test-region-1.amazonaws.com/000000000000/test-queue' all_messages = [ {'MessageBody': 'Beautiful is better than ugly'}, {'MessageBody': 'Explicit is better than implicit', 'DelaySeconds': 120}, {'MessageBody': 'Simple is better than complex'}, # Fill this in with an arbitrary number of messages ] send_batches(queue_url, all_messages) """ sqs_client = sqs_client or boto3_client('sqs') ret = {'Successful': [], 'Failed': []} for batch in _get_batches(all_messages, message_limit, size_limit): resp = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=batch) ret['Successful'] += resp.get('Successful', []) ret['Failed'] += resp.get('Failed', []) return ret
[docs]def delete_batches( queue_url, all_messages, sqs_client=None, message_limit=MESSAGE_LIMIT ): """Call ``delete_message_batch`` as many times as necessary to delete the messages in *all_messages*, creating batches that fit SQS limits automatically. * *queue_url* is the URL of the SQS queue. * *all_messages* is an iterable of message entries, like what you would use for ``delete_message`` or ``delete_message_batch``. * *sqs_client* is a ``boto3.client('sqs')`` instance. If not given, is created with ``boto3.client('sqs')``. * *message_limit* is ``10`` by default. This is the maximum number of messages to delete per batch. Return value: .. code-block:: python { 'Successful': [ { 'Id': 'string', 'MessageId': 'string', 'MD5OfMessageBody': 'string', 'MD5OfMessageAttributes': 'string', 'MD5OfMessageSystemAttributes': 'string', 'SequenceNumber': 'string' }, ], 'Failed': [ { 'Id': 'string', 'SenderFault': bool, 'Code': 'string', 'Message': 'string' }, ] } The items in *all_messages* only need to have a ``ReceiptHandle`` key in them. This means you can pass in messages you get from the ``receive_messages`` method directly. Usage: .. code-block:: python from boto3_helpers.sqs import delete_batches queue_url = 'https://sqs.test-region-1.amazonaws.com/000000000000/test-queue' all_messages = [ {'ReceiptHandle': 'UmVjZWlwdCBoYW5kbGUgMQ=='}, {'ReceiptHandle': 'U2Vjb25kIHJlY2VpcHQgaGFuZGxl'}, {'Id': '24601', 'ReceiptHandle': 'VGhpcyBvbmUgaGFzIGl0cyBvd24gSUQ='}, # Fill this in with an arbitrary number of messages ] delete_batches(queue_url, all_messages) """ sqs_client = sqs_client or boto3_client('sqs') all_deletes = ({k: m.get(k) for k in ('Id', 'ReceiptHandle')} for m in all_messages) ret = {'Successful': [], 'Failed': []} for batch in _get_batches(all_deletes, message_limit, None): resp = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=batch) ret['Successful'] += resp.get('Successful', []) ret['Failed'] += resp.get('Failed', []) return ret