Source code for boto3_helpers.cloudwatch

from boto3 import client as boto3_client

from boto3_helpers.pagination import yield_all_items


[docs]def yield_metric_data( namespace, metric_name, dimension_map, period, stat, start_time, end_time, cw_client=None, **kwargs, ): """Yield all the data associated with a single metric. Each item yielded is a ``(dt, value)`` pair. * *namespace* is the namespace for the metric. * *metric_name* is the name of the metric. * *dimension_map* is a ``dict`` that maps dimension names to values, e.g. ``{'Name': 'Value'}``. If the metric has no dimension, supply an empty ``dict``. * *period* is the granularity of the returned data points in seconds. * *stat* is the name of the statistic to evaluate, e.g. ``Maximum``. * *start_time* is a ``datetime.datetime`` object that specifies that beginning of the query. * *end_time* is a ``datetime.datetime`` object that specifies that end of the query. * *cw_client* is a ``boto3.client('cloudwatch')`` instance. If not given, is created with ``boto3.client('cloudwatch')`` * *kwargs* can include ``Unit``, ``Expression``, ``Period``, ``AccountId``, ``ScanBy``, ``LabelOptions``, and ``PaginationConfig``. These are inserted at the appropriate place in the ``get_paginator('get_metric_data').paginate`` call. Usage: .. code-block:: python from datetime import datetime, timezone from boto3_helpers.cloudwatch import yield_metric_data for dt, value in yield_metric_data( 'AWS/S3', 'NumberOfObjects', {'StorageType': 'AllStorageTypes', 'BucketName': 'example-bucket'}, 86400, 'Maximum', datetime(2022, 9, 1, 0, 0, 0, timezone.utc), datetime(2022, 9, 16, 0, 0, 0, timezone.utc), ): print(dt.isoformat(), value, sep=' ') This function is designed to simplify the common case of pulling all data for a single metric, which is cumbersome with the normal CloudWatch ``get_metric_data`` method. ``ScanBy`` is set to ``TimestampAscending`` by default, so data should be emitted in sorted order. """ cw_client = cw_client or boto3_client('cloudwatch') metric_stat = { 'Metric': { 'Namespace': namespace, 'MetricName': metric_name, 'Dimensions': [{'Name': k, 'Value': v} for k, v in dimension_map.items()], }, 'Period': period, 'Stat': stat, } unit = kwargs.pop('Unit', None) if unit is not None: metric_stat['Unit'] = unit query = {'Id': 'query0', 'MetricStat': metric_stat} for key in ('Expression', 'Period', 'AccountId'): value = kwargs.pop(key, None) if value is not None: query[key] = value get_kwargs = { 'MetricDataQueries': [query], 'StartTime': start_time, 'EndTime': end_time, 'ScanBy': 'TimestampAscending', } get_kwargs.update(kwargs) for item in yield_all_items( cw_client, 'get_metric_data', 'MetricDataResults', **get_kwargs ): yield from zip(item['Timestamps'], item['Values'])