2017-01-24 26 views
5

Próbuję zautomatyzować ładowanie csv do tabeli MySQL po jej otrzymaniu do wiadra S3.Czytanie csv z S3 i wstawianie do tabeli MySQL za pomocą AWS Lambda

Moja strategia polega na tym, że S3 uruchamia zdarzenie po otrzymaniu pliku do określonego zasobnika (nazwijmy go "plikiem-wiadrem"). To zdarzenie jest wysyłane do funkcji AWS Lambda, która pobiera i przetwarza plik wstawiając każdy wiersz do tabeli MySql (nazwijmy to "target_table").

Musimy wziąć pod uwagę, że RDS jest w VPC.

Obecna konfiguracja zgody wiadra jest:

{ 
    "Version": "2008-10-17", 
    "Statement": [ 
     { 
      "Sid": "PublicReadForGetBucketObjects", 
      "Effect": "Allow", 
      "Principal": { 
       "AWS": "*" 
      }, 
      "Action": "s3:GetObject", 
      "Resource": "arn:aws:s3:::bucket-file/*" 
     } 
    ] 
} 

Utworzyłem rolę z następującymi zasadami, AmazonS3FullAccess i AWSLambdaVPCAccessExecutionRole dołączonych do funkcji AWS Lambda.

Kod lambda:

from __future__ import print_function 
import boto3 
import logging 
import os 
import sys 
import uuid 
import pymysql 
import csv 
import rds_config 


rds_host = rds_config.rds_host 
name = rds_config.db_username 
password = rds_config.db_password 
db_name = rds_config.db_name 


logger = logging.getLogger() 
logger.setLevel(logging.INFO) 

try: 
    conn = pymysql.connect(rds_host, user=name, passwd=password, db=db_name, connect_timeout=5) 
except Exception as e: 
    logger.error("ERROR: Unexpected error: Could not connect to MySql instance.") 
    logger.error(e) 
    sys.exit() 

logger.info("SUCCESS: Connection to RDS mysql instance succeeded") 

s3_client = boto3.client('s3') 

def handler(event, context): 

    bucket = event['Records'][0]['s3']['bucket']['name'] 
    key = event['Records'][0]['s3']['object']['key'] 
    download_path = '/tmp/{}{}'.format(uuid.uuid4(), key) 

    s3_client.download_file(bucket, key,download_path) 

    csv_data = csv.reader(file(download_path)) 

    with conn.cursor() as cur: 
     for idx, row in enumerate(csv_data): 

      logger.info(row) 
      try: 
       cur.execute('INSERT INTO target_table(column1, column2, column3)' \ 
           'VALUES("%s", "%s", "%s")' 
           , row) 
      except Exception as e: 
       logger.error(e) 

      if idx % 100 == 0: 
       conn.commit() 

     conn.commit() 

    return 'File loaded into RDS:' + str(download_path) 

Byłem testowania funkcji i S3 wysyła zdarzenia, gdy plik zostanie przesłany, Lambda łączy się z wystąpieniem RDS i otrzymać powiadomienie. Sprawdziłem, że nazwa zasobnika to 'bucket-file', a nazwa pliku również jest właściwa. Problem polega na tym, że funkcja osiąga linię s3_client.download_file(bucket, key,download_path), gdzie blokuje się, dopóki nie zostanie osiągnięty czas wygaśnięcia lamdby.

Oglądając dzienniki mówi:

[INFO] 2017-01-24T14:36:52.102Z SUCCESS: Connection to RDS mysql instance succeeded 
[INFO] 2017-01-24T14:36:53.282Z Starting new HTTPS connection (1): bucket-files.s3.amazonaws.com 
[INFO] 2017-01-24T14:37:23.223Z Starting new HTTPS connection (2): bucket-files.s3.amazonaws.com 
2017-01-24T14:37:48.684Z Task timed out after 60.00 seconds 

Mam również przeczytać, że jeśli pracujesz w VPC, w celu uzyskania dostępu do S3 wiadra trzeba stworzyć VPC punkt końcowy, który daje dostęp do S3 dla ta podsieć. Próbowałem również tego rozwiązania, ale wynik jest taki sam.

Doceniam kilka pomysłów.

Z góry dziękuję!

Odpowiedz

2

W końcu to dostałem!

Problem polegał na problemie VPC. Jak już powiedziałem, stworzyłem punkt końcowy VPC, aby usługa S3 była dostępna z mojego VPC, ale mój stół trasy był źle skonfigurowany.

Podsumowując, jeśli pracujesz w VPC z lambda i chcesz uzyskać dostęp do S3, musisz utworzyć punkt końcowy VPC. Poza tym, jeśli chcesz uzyskać dostęp do innych usług internetowych poza VPC, musisz skonfigurować bramkę NAT.