Table of contents
Topics on this page

Sample Python script for re-scanning files in AWS

    import boto3
    import json
    import logging
    import re
    import time
    import urllib.parse

    from datetime import datetime, timezone

    # Template values: replace each <...> placeholder before running.
    # BUCKET_LISTENER_ARN must be a string, because the region is extracted from it below.
    BUCKET_LISTENER_ARN = <My_Bucket_Listener_ARN>
    # URLs of the S3 objects to re-scan; each one is passed to parse_url().
    URLS = [<My_File_URL_1>, <My_File_URL_2>, <My_File_URL_3>, ...]
    # Fourth colon-separated field of the ARN (index 3), used as the AWS region.
    REGION = BUCKET_LISTENER_ARN.split(':')[3]

    # Root logger at INFO: the logging.debug() call in trigger_scans() is not
    # emitted unless this level is lowered to DEBUG.
    logging.getLogger().setLevel(logging.INFO)

    # Lambda client bound to the same region as the bucket listener function.
    lambda_client = boto3.client('lambda', region_name=REGION)

    def parse_url(url):
        """Split an S3 object URL into its bucket name and object key.

        Handles both S3 URL styles:

        * virtual-hosted-style: ``https://<bucket>.s3[.<region>].amazonaws.com/<key>``
        * path-style: ``https://s3[.<region>].amazonaws.com/<bucket>/<key>``

        Legacy dash-separated regional hosts (``s3-<region>``) are recognized
        as well. Any other host falls back to treating its first label as the
        bucket and the whole path as the key.

        Args:
            url: The object URL; a query string (e.g. a pre-signed signature)
                is ignored.

        Returns:
            A ``(bucket, object_key)`` tuple. The key is percent-decoded and
            ``+`` is decoded to a space.
        """
        # Raw strings so the regex escapes are not read as string escapes.
        # Every literal dot is escaped so it cannot match an arbitrary character.
        virtual_hosted_pattern = r'(?P<bucket>.+)\.s3([.-].+)?\.amazonaws\.com'
        path_style_pattern = r's3([.-].+)?\.amazonaws\.com'

        parsed_url = urllib.parse.urlparse(url)
        host = parsed_url.netloc

        # Check virtual-hosted-style first: a bucket name may itself contain
        # dots (or start with "s3"), so the bucket is everything before the
        # last ".s3" endpoint suffix rather than just the first host label.
        virtual_hosted = re.fullmatch(virtual_hosted_pattern, host)
        if virtual_hosted:
            bucket = virtual_hosted.group('bucket')
            s3_object = parsed_url.path[1:]
        elif re.fullmatch(path_style_pattern, host):
            # Path-style: the first path segment is the bucket, the rest is the key.
            path_segments = parsed_url.path.split('/')
            bucket = path_segments[1]
            s3_object = '/'.join(path_segments[2:])
        else:
            # Unrecognized endpoint: keep the original best-effort guess.
            bucket = host.split('.')[0]
            s3_object = parsed_url.path[1:]

        object_key = urllib.parse.unquote_plus(s3_object)
        return (bucket, object_key)

    def get_s3_event(url):
        """Build a minimal S3 event record for the object that a URL points to.

        Args:
            url: URL of the S3 object; it is resolved to a bucket and key with
                ``parse_url``.

        Returns:
            A dict with ``eventTime``, ``responseElements``, ``awsRegion`` and
            ``s3`` (bucket name, object key, empty eTag) entries, intended to
            be one element of a ``Records`` list.
        """
        # The function takes the URL itself: the only caller passes a single
        # URL, and the bucket and key are derived from it here.
        bucket, object_key = parse_url(url)

        # Timezone-aware UTC timestamp in ISO-8601 form with millisecond
        # precision and a trailing "Z". A naive datetime.now() would emit
        # local time with no zone marker at all.
        now_utc = datetime.now(timezone.utc)
        event_time = now_utc.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

        return {
            'eventTime': event_time,
            'responseElements': {
                'x-amz-request-id': 'some-uuid'
            },
            'awsRegion': REGION,
            's3': {
                'bucket': {
                    'name': bucket
                },
                'object': {
                    'key': object_key,
                    'eTag': ''  # Use S3 HeadObject to get the object's eTag value
                }
            },
        }

    def trigger_scans(urls):
        """Send one invocation to the bucket listener covering every URL.

        An S3 event record is built for each URL with ``get_s3_event``; the
        records are wrapped in a ``{'Records': [...]}`` JSON payload and passed
        to the Lambda function identified by ``BUCKET_LISTENER_ARN`` using the
        ``Event`` invocation type.

        Args:
            urls: Iterable of S3 object URLs to re-scan.
        """
        event_records = [get_s3_event(file_url) for file_url in urls]
        payload = json.dumps({'Records': event_records}).encode('utf8')

        response = lambda_client.invoke(
            FunctionName=BUCKET_LISTENER_ARN,
            InvocationType='Event',
            Payload=payload,
        )
        # Only emitted when the logger level is DEBUG or lower.
        logging.debug(f'Trigger response: {response}')

    def main():
        """Script entry point: trigger a re-scan for every URL in ``URLS``."""
        trigger_scans(urls=URLS)

    # Run the re-scan only when this file is executed as a script, not when it is imported.
    if __name__ == '__main__':
        main()

Click here to return to previous page.