9

Używam Apache Spark DataFrames, aby połączyć dwa źródła danych i uzyskać wynik jako inny DataFrame. Chcę zapisać wynik do innej tabeli PostgreSQL. Widzę tę opcję:Spark Dataframes UPSERT do Postgres Tabela

myDataFrame.write.jdbc(url, table, connectionProperties) 

Ale, co chcę zrobić, to upsert dataframe w tabeli na podstawie klucza podstawowego tabeli. Jak to zrobić? Używam Spark 1.6.0.

Odpowiedz

8

Nie jest obsługiwany. DataFrameWriter można albo dołączyć do istniejącej tabeli, albo ją zastąpić. Jeśli twoja aplikacja wymaga bardziej złożonej logiki, musisz sobie z tym poradzić ręcznie.

Jedną opcją jest użycie akcji (foreach, foreachPartition) ze standardowym połączeniem JDBC. Innym jest napisanie do pliku tymczasowego i obsłużenie reszty bezpośrednio w bazie danych.

+0

Również, jak zastąpić istniejącą tabelę jcbc? Widzę tylko opcję df.write.mode(). SaveAsTable() Ale to nie obsługuje tabel jdbc – void

+0

dataframe.write.mode (SaveMode.OverWrite) –

6

Jeśli masz zamiar zrobić to ręcznie i za pomocą opcji 1 wspomnianej przez zero323, należy przyjrzeć Spark source code for the insert statement here

def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = { 
    val columns = rddSchema.fields.map(_.name).mkString(",") 
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",") 
    val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)" 
    conn.prepareStatement(sql) 
    } 

The PreparedStatement jest part of java.sql i ma metody, takie jak execute() i executeUpdate(). Nadal musisz oczywiście zmodyfikować sql.

2

Aby wstawić JDBC można używać

dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)

Również Dataframe.write daje DataFrameWriter i ma kilka metod, aby wstawić dataframe.

def insertInto(tableName: String): Unit

Wstawia zawartość w DataFrame do podanej tabeli. Wymaga to, aby schemat ramki DataFrame był taki sam, jak schemat tabeli.

Ponieważ wstawia dane do istniejącej tabeli, format lub opcje będą ignorowane.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

Nic jeszcze zaktualizować poszczególne rekordy z pudełka od iskry chociaż

4

KrisP ma prawo do niego. Najlepszym sposobem, aby zrobić upsert nie jest przez przygotowane oświadczenie. Ważne jest, aby pamiętać, że ta metoda będzie wstawiać po jednym na raz z tak wieloma partycjami, jak liczba pracowników, których masz. Jeśli chcesz to zrobić w partii można także

import java.sql._ 
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch => 
val dbc: Connection = DriverManager.getConnection("JDBCURL") 
val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT") 

batch.grouped("# Of Rows you want per batch").foreach { session => 
    session.foreach { x => 
    st.setDouble(1, x.getDouble(1)) 
    st.addBatch() 
    } 
    st.executeBatch() 
} 
dbc.close() 
    } 

Spowoduje to wykonanie partii dla każdego pracownika i zamknąć połączenie DB. Daje ci kontrolę nad liczbą pracowników, ilością partii i pozwala pracować w tych granicach.