This is a sample starter project that showcases how to replicate objects from AWS S3 storage to Akamai NetStorage based on object events (only create/update events for now).
This project uses a Lambda function, triggered by S3 object events, that adds the events to an SQS queue. This queue is read by an ECS cluster task that processes the events and updates NetStorage.
The scope of this POC is to provide an example of how to get started; additional changes will be needed, such as adding auto scaling of the task based on the SQS queue or creating a task schedule.
Functionality:
-
Creates AWS infrastructure:
- IAM Roles
- IAM Policy Attachments
- VPC Subnets
- Security Group
- Secret (Secret Manager)
- SQS FIFO Queue
- Lambda Function
- S3 Lambda Event Trigger
- ECR Container Repository
- ECS Container Cluster
- ECS Container Service
- ECS Container Task
-
Other:
- Builds Docker Image
- Tags Docker Image
- Pushes Docker Image to ECR
Argument | Purpose |
---|---|
bucket | [Required] S3 Bucket to be replicated. |
accountid | [Required] AWS Account ID |
region | [Required] AWS region |
cpcode | [Required] Akamai NetStorage CPCODE |
secret | [Required] NetStorage API credentials in JSON format. Expects the keys NS_HOSTNAME, NS_USER and NS_KEY. |
terraform plan -var='bucket=CHANGE_ME' -var='accountid=CHANGE_ME' -var='cpcode=CHANGE_ME' -var='region=CHANGE_ME' -var='secret={"NS_HOSTNAME":"CHANGE_ME","NS_USER":"CHANGE_ME","NS_KEY":"CHANGE_ME"}'
terraform apply -var='bucket=CHANGE_ME' -var='accountid=CHANGE_ME' -var='cpcode=CHANGE_ME' -var='region=CHANGE_ME' -var='secret={"NS_HOSTNAME":"CHANGE_ME","NS_USER":"CHANGE_ME","NS_KEY":"CHANGE_ME"}'
# Base image: official Python 3 runtime.
FROM python:3
# Install both dependencies in a single layer; --no-cache-dir keeps pip's
# download cache out of the image.
RUN pip3 install --no-cache-dir netstorageapi boto3
# Copy the worker script to the filesystem root and run it on container start.
COPY sync.py /
ENTRYPOINT [ "python", "./sync.py" ]
Lightweight Python script that reads S3 events and adds them to the SQS queue.
def lambda_handler(event, context):
    """
    Description: Lambda entry point. Extracts the relevant fields of every S3
    event record and sends them, as a single batch, to the SQS queue.
    Expects:
        event: S3 event notification; must contain a 'Records' list.
        context: Lambda context object (unused).
    Returns: dict with 'statusCode' (200 on success, 500 when the batch is
        empty or could not be queued) and a human readable 'body'.
    """
    configure_logging()
    records = event['Records']
    logger.info("Events {0} received.".format(len(records)))

    # NOTE(review): S3 event notifications URL-encode the object key; confirm
    # that the consumer decodes 'key' before using it as an S3 key / file path.
    record_lst = [
        {
            'eventName': record['eventName'],
            'bucket': record['s3']['bucket']['name'],
            'key': record['s3']['object']['key'],
            'etag': record['s3']['object']['eTag'],
            'sequencer': record['s3']['object']['sequencer'],
        }
        for record in records
    ]

    # An empty batch is reported as an error, but is never sent to the queue:
    # the `and` short-circuits before addToQueue is called.
    if record_lst and addToQueue(record_lst):
        return {
            'statusCode': 200,
            'body': 'Success, {0}/{1} Added to queue!'.format(len(record_lst), len(records))
        }

    # The batch is sent as one message, so on failure nothing was queued.
    return {
        'statusCode': 500,
        'body': 'Error, {0}/{1} added to queue!'.format(0, len(records))
    }
def addToQueue(record):
    """
    Description: Serialises the processed event(s) to JSON and sends them as
    one message to the SQS queue named by the 'queueUrl' environment variable.
    Links:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.send_message
    Expects:
        record -- JSON-serialisable payload to enqueue.
    Returns: Bool, True when the message was sent, False on any error.
    """
    client = boto3.client('sqs')
    try:
        queue_url = os.environ['queueUrl']
        payload = json.dumps(record)
        client.send_message(
            QueueUrl=queue_url,
            DelaySeconds=0,
            MessageBody=payload,
            MessageGroupId='S3-NS-SYNC',
        )
        logger.info("Record added to queue: {0}".format(record))
    except Exception as e:
        # Any failure (missing env var, serialisation, SQS error) is logged
        # and reported to the caller as False.
        logger.error("Error adding record '{0}' to queue: {1}".format(record, e))
        return False
    return True
Lightweight python script that is run by ECS instance.
Functionality:
- Reads SQS Queue
def readQueue():
    """
    Description: Fetches up to five pending messages from the SQS queue.
    Links:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.receive_message
    Expects:
        Nothing
    Returns: list of messages as returned by SQS; empty list when the
        response carries no 'Messages' entry.
    """
    client = boto3.client('sqs', region_name=REGION_NAME)
    response = client.receive_message(
        QueueUrl=QUEUE_URL,
        AttributeNames=['All'],
        MaxNumberOfMessages=5,
        VisibilityTimeout=30,
        WaitTimeSeconds=0,
    )

    # The 'Messages' key is absent from the response when nothing was received.
    if 'Messages' not in response:
        logger.info('No messages on the queue!')
        return []

    pending = response['Messages']
    logger.info('{0} Messages on the queue!'.format(len(pending)))
    return pending
- Fetches s3 Object
def fetchS3Object(s3_key):
    """
    Description: Downloads an object from S3_BUCKET to a local file whose
    relative path is the same as the object key.
    Expects:
        s3_key (string) -- key of the object to download.
    Returns: Bool, True when the file exists locally after the download,
        False when it does not or when any error occurred.
    """
    logger.info("Downloading file: {0}".format(s3_key))
    try:
        s3 = boto3.client('s3')
        path = os.path.dirname(s3_key)
        logger.info("File directory : {0}".format(path))
        # Keys at the bucket root have no directory component, and
        # os.makedirs('') raises FileNotFoundError, so only create the
        # directory tree when there is one.
        if path:
            os.makedirs(path, exist_ok=True)
        s3.download_file(S3_BUCKET, s3_key, s3_key)
        success = os.path.isfile(s3_key)
        logger.info("File Downloaded: {0}".format(str(success)))
        return success
    except Exception as e:
        logger.error("Error downloading file: {0}".format(e))
        # Explicit False (instead of an implicit None) so callers always
        # receive a bool.
        return False
- Uploads/updates Akamai Netstorage objects.
def upload(CPCODE,path):
    """
    Description: Uploads a local file to Akamai NetStorage under
    "/<CPCODE>/<path>", using the credentials stored in SECRET.
    Links:
        https://learn.akamai.com/en-us/webhelp/netstorage/netstorage-http-api-developer-guide/GUID-C9CEE090-B272-4E47-ACD9-853D80C747BC.html
        https://github.com/akamai/NetStorageKit-Python
    Expects:
        CPCODE: NetStorage CP code used as the root of the destination path.
        path: s3 object key; used both as the local file path and as the
            path below the CP code on NetStorage.
    Returns: Bool based on task success.
    """
    # SECRET is a JSON document holding the NetStorage credentials.
    credentials = json.loads(SECRET)
    ns_user = credentials['NS_USER']
    ns_key = credentials['NS_KEY']
    ns_hostname = credentials['NS_HOSTNAME']

    logger.info("Uploading to Ns with user: '{0}'".format(ns_user))
    # NOTE(review): ssl=False is kept as in the original; confirm whether the
    # connection should use HTTPS instead.
    client = Netstorage(ns_hostname, ns_user, ns_key, ssl=False)

    netstorage_destination = "/{0}/{1}".format(CPCODE, path)
    logger.info("Upload Location: '{0}'.".format(netstorage_destination))

    ok, _ = client.upload(path, netstorage_destination)
    if not ok:
        logger.info("File: upload failed.")
        return False

    logger.info("File: uploaded.")
    return True
- Removes Event from Queue
def popQueue(receipt_handle):
    """
    Description: Deletes a processed message from the SQS fifo queue.
    Links:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.delete_message
    Expects:
        receipt_handle (string) -- The SQS Message's receipt_handle identifier.
    Returns: Nothing
    """
    client = boto3.client('sqs', region_name=REGION_NAME)
    client.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=receipt_handle)
    logger.info("Event removed from queue: {0}".format(receipt_handle))
Want to contribute? Sure, why not! Just let me know!
I am providing code and resources in this repository to you under an open-source license. Because this is my repository, the license you receive to my code and resources is from me and not my employer (Akamai).
Copyright 2019 Roy Martinez
Creative Commons Attribution 4.0 International License (CC BY 4.0)
http://creativecommons.org/licenses/by/4.0/