Topics on this page
Sample Python script for re-scanning files in AWS
import boto3
import json
import logging
import re
import time
import urllib.parse
from datetime import datetime, timezone
# ARN of the Lambda function that trigger_scans() invokes.
# Template placeholder: replace it with the real ARN as a quoted string.
BUCKET_LISTENER_ARN = <My_Bucket_Listener_ARN>
# URLs of the S3 objects to re-scan.
# Template placeholders: replace them with quoted URL strings.
URLS = [<My_File_URL_1>, <My_File_URL_2>, <My_File_URL_3>, ...]
# The fourth colon-separated field of the ARN is used as the AWS region.
REGION = BUCKET_LISTENER_ARN.split(':')[3]
# Root logger threshold is INFO, so DEBUG-level messages are filtered out
# unless this level is lowered.
logging.getLogger().setLevel(logging.INFO)
# Lambda client bound to the region taken from the listener ARN.
lambda_client = boto3.client('lambda', region_name=REGION)
def parse_url(url):
    """Split an S3 object URL into its bucket name and object key.

    Both addressing styles are recognised:

    * path-style: ``https://s3.<region>.amazonaws.com/<bucket>/<key>``
    * virtual-hosted-style: ``https://<bucket>.s3.<region>.amazonaws.com/<key>``

    Any query string (for example pre-signed URL parameters) is ignored.

    Args:
        url: The object URL, optionally pre-signed.

    Returns:
        A ``(bucket, object_key)`` tuple. The key is percent-decoded and
        ``+`` is decoded as a space.

    Raises:
        IndexError: If a path-style URL has no path component at all.
    """
    # Raw strings with every literal dot escaped: the previous non-raw
    # pattern used an invalid "\." escape and let "amazonaws.com" match
    # any character in place of the dot.
    path_style_host = r's3(\..+)?\.amazonaws\.com'
    virtual_host = r'(?P<bucket>.+?)\.s3([.-].+)?\.amazonaws\.com'

    parsed_url = urllib.parse.urlparse(url)
    host = parsed_url.netloc

    # check pre-signed URL type, path or virtual
    if re.fullmatch(path_style_host, host):
        segments = parsed_url.path.split('/')
        bucket = segments[1]
        s3_object = '/'.join(segments[2:])
    else:
        # Bucket names may contain dots, so take everything in front of the
        # ".s3" endpoint suffix rather than only the first host label.
        # Hosts that are not S3 endpoints keep the first-label fallback.
        virtual_match = re.fullmatch(virtual_host, host)
        if virtual_match:
            bucket = virtual_match.group('bucket')
        else:
            bucket = host.split('.')[0]
        s3_object = parsed_url.path[1:]

    object_key = urllib.parse.unquote_plus(s3_object)
    return (bucket, object_key)
def get_s3_event(url):
    """Build an S3 event record for the object addressed by *url*.

    The previous signature took ``(bucket, object_key)`` but the body read
    an undefined ``url`` and the caller passes a single URL, so every call
    failed. The function now takes the URL it actually parses.

    Args:
        url: URL of the S3 object; it is split with ``parse_url``.

    Returns:
        A dict with ``eventTime``, ``responseElements``, ``awsRegion`` and
        ``s3`` (bucket name, object key and an empty ``eTag``) entries.
    """
    bucket, object_key = parse_url(url)
    return {
        # Timezone-aware UTC timestamp with an explicit "Z" suffix; a naive
        # datetime renders "%Z" as an empty string and uses local time.
        'eventTime': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
        'responseElements': {
            'x-amz-request-id': 'some-uuid'
        },
        'awsRegion': REGION,
        's3': {
            'bucket': {
                'name': bucket
            },
            'object': {
                'key': object_key,
                'eTag': ''  # Use S3 HeadObject to get the object's eTag value
            }
        },
    }
def trigger_scans(urls):
    """Invoke the bucket listener once with an S3 event record per URL.

    Args:
        urls: Iterable of S3 object URLs; each is converted to a record
            with ``get_s3_event``.

    The Lambda function is invoked with ``InvocationType='Event'`` and the
    invoke response is logged at DEBUG level.
    """
    event_records = [get_s3_event(item) for item in urls]
    payload = json.dumps({'Records': event_records}).encode('utf8')
    response = lambda_client.invoke(
        FunctionName=BUCKET_LISTENER_ARN,
        InvocationType='Event',
        Payload=payload,
    )
    logging.debug(f'Trigger response: {response}')
def main():
    """Trigger a re-scan for every URL listed in the module-level URLS."""
    urls_to_rescan = URLS
    trigger_scans(urls_to_rescan)


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Click here to return to the previous page.