2015-02-02 13 views
5

Próbuję zrobić Redshift COPY w SQLAlchemy.Redshift COPY operacja nie działa w SQLAlchemy

Poniższy SQL prawidłowo kopie obiektów z mojego S3 wiadra na moim stole redshifcie kiedy go wykonać w psql:

COPY posts FROM 's3://mybucket/the/key/prefix' 
WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' 
JSON AS 'auto'; 

Mam kilka plików o nazwie

s3://mybucket/the/key/prefix.001.json 
s3://mybucket/the/key/prefix.002.json 
etc. 

mogę zweryfikować, że nowy wiersze zostały dodane do tabeli za pomocą select count(*) from posts.

Jednak, gdy wykonuję dokładnie to samo wyrażenie SQL w SQLAlchemy, wykonanie kończy się bezbłędnie, ale do mojej tabeli nie zostaną dodane żadne wiersze.

session = get_redshift_session() 
session.bind.execute("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';") 
session.commit() 

To nie ma znaczenia, czy robię powyżej lub

from sqlalchemy.sql import text 
session = get_redshift_session() 
session.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';")) 
session.commit() 

Odpowiedz

6

I w zasadzie ten sam problem, choć w moim przypadku było więcej:

engine = create_engine('...') 
engine.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';")) 

poprzez intensyfikację przez WPB, problem był oczywiście brak .commit() powołano. Nie wiem, dlaczego session.commit() nie działa w twoim przypadku (może sesja "straciła ścieżkę" wysłanych poleceń?), Więc może to faktycznie nie naprawić twojego problemu.

Tak czy inaczej, jak explained in the sqlalchemy docs

Biorąc pod uwagę ten wymóg, SQLAlchemy realizuje swój „AUTOCOMMIT” cecha, która działa zupełnie konsekwentnie we wszystkich backendów. Osiąga się to przez wykrywanie instrukcji, które reprezentują operacje zmieniające dane, tj. INSERT, UPDATE, DELETE [...] Jeśli instrukcja jest instrukcją tekstową i flaga nie jest ustawiona, używane jest wyrażenie regularne do wykrywania INSERT, UPDATE , DELETE, a także wiele innych poleceń dla określonego backendu.

Tak, są 2 rozwiązania, albo:

  • text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';").execution_options(autocommit=True).
  • Albo dostać stałą wersji dialektu przesunięcia ku czerwieni ... Właśnie opened a PR o tym
0

miałem sukces przy użyciu języka wyrażeń rdzeń i Connection.execute() (w przeciwieństwie do ORM i sesjach) do skopiowania plików do rozdzielany Redshift z kodem poniżej. Być może mógłbyś go dostosować do JSON.

def copy_s3_to_redshift(conn, s3path, table, aws_access_key, aws_secret_key, delim='\t', uncompress='auto', ignoreheader=None): 
    """Copy a TSV file from S3 into redshift. 

    Note the CSV option is not used, so quotes and escapes are ignored. Empty fields are loaded as null. 
    Does not commit a transaction. 
    :param Connection conn: SQLAlchemy Connection 
    :param str uncompress: None, 'gzip', 'lzop', or 'auto' to autodetect from `s3path` extension. 
    :param int ignoreheader: Ignore this many initial rows. 
    :return: Whatever a copy command returns. 
    """ 
    if uncompress == 'auto': 
     uncompress = 'gzip' if s3path.endswith('.gz') else 'lzop' if s3path.endswith('.lzo') else None 

    copy = text(""" 
     copy "{table}" 
     from :s3path 
     credentials 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}' 
     delimiter :delim 
     emptyasnull 
     ignoreheader :ignoreheader 
     compupdate on 
     comprows 1000000 
     {uncompress}; 
     """.format(uncompress=uncompress or '', table=text(table), aws_access_key=aws_access_key, aws_secret_key=aws_secret_key)) # copy command doesn't like table name or keys single-quoted 
    return conn.execute(copy, s3path=s3path, delim=delim, ignoreheader=ignoreheader or 0)