Próbuję zautomatyzować ładowanie csv do tabeli MySQL po jej otrzymaniu do wiadra S3.Czytanie csv z S3 i wstawianie do tabeli MySQL za pomocą AWS Lambda
Moja strategia polega na tym, że S3 uruchamia zdarzenie po otrzymaniu pliku do określonego zasobnika (nazwijmy go "plikiem-wiadrem"). To zdarzenie jest wysyłane do funkcji AWS Lambda, która pobiera i przetwarza plik wstawiając każdy wiersz do tabeli MySql (nazwijmy to "target_table").
Musimy wziąć pod uwagę, że RDS jest w VPC.
Obecna konfiguracja zgody wiadra jest:
{
"Version": "2008-10-17",
"Statement": [
{
"Sid": "PublicReadForGetBucketObjects",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::bucket-file/*"
}
]
}
Utworzyłem rolę z następującymi zasadami, AmazonS3FullAccess i AWSLambdaVPCAccessExecutionRole dołączonych do funkcji AWS Lambda.
Kod lambda:
from __future__ import print_function
import boto3
import logging
import os
import sys
import uuid
import pymysql
import csv
import rds_config
rds_host = rds_config.rds_host
name = rds_config.db_username
password = rds_config.db_password
db_name = rds_config.db_name
logger = logging.getLogger()
logger.setLevel(logging.INFO)
try:
conn = pymysql.connect(rds_host, user=name, passwd=password, db=db_name, connect_timeout=5)
except Exception as e:
logger.error("ERROR: Unexpected error: Could not connect to MySql instance.")
logger.error(e)
sys.exit()
logger.info("SUCCESS: Connection to RDS mysql instance succeeded")
s3_client = boto3.client('s3')
def handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
download_path = '/tmp/{}{}'.format(uuid.uuid4(), key)
s3_client.download_file(bucket, key,download_path)
csv_data = csv.reader(file(download_path))
with conn.cursor() as cur:
for idx, row in enumerate(csv_data):
logger.info(row)
try:
cur.execute('INSERT INTO target_table(column1, column2, column3)' \
'VALUES("%s", "%s", "%s")'
, row)
except Exception as e:
logger.error(e)
if idx % 100 == 0:
conn.commit()
conn.commit()
return 'File loaded into RDS:' + str(download_path)
Byłem testowania funkcji i S3 wysyła zdarzenia, gdy plik zostanie przesłany, Lambda łączy się z wystąpieniem RDS i otrzymać powiadomienie. Sprawdziłem, że nazwa zasobnika to 'bucket-file', a nazwa pliku również jest właściwa. Problem polega na tym, że funkcja osiąga linię s3_client.download_file(bucket, key,download_path)
, gdzie blokuje się, dopóki nie zostanie osiągnięty czas wygaśnięcia lamdby.
Oglądając dzienniki mówi:
[INFO] 2017-01-24T14:36:52.102Z SUCCESS: Connection to RDS mysql instance succeeded
[INFO] 2017-01-24T14:36:53.282Z Starting new HTTPS connection (1): bucket-files.s3.amazonaws.com
[INFO] 2017-01-24T14:37:23.223Z Starting new HTTPS connection (2): bucket-files.s3.amazonaws.com
2017-01-24T14:37:48.684Z Task timed out after 60.00 seconds
Mam również przeczytać, że jeśli pracujesz w VPC, w celu uzyskania dostępu do S3 wiadra trzeba stworzyć VPC punkt końcowy, który daje dostęp do S3 dla ta podsieć. Próbowałem również tego rozwiązania, ale wynik jest taki sam.
Doceniam kilka pomysłów.
Z góry dziękuję!