Mam bazę danych Cassandra, z której analizowałem dane przy użyciu SparkSQL za pomocą Apache Spark. Teraz chcę wstawić te analizowane dane do PostgreSQL. Czy jest jakiś sposób, aby to osiągnąć bezpośrednio poza używaniem sterownika PostgreSQL (osiągnąłem to za pomocą PostREST i sterownika, chcę wiedzieć, czy istnieją jakieś metody, takie jak saveToCassandra()
)?Wstawianie danych analitycznych ze Sparka do PostgreStu
Odpowiedz
W tej chwili nie ma natywnej implementacji zapisu RDD do dowolnego systemu DBMS. Oto linki do pokrewnych dyskusji na liście użytkowników Spark: one, two
Generalnie, najbardziej wydajnych podejście byłoby następujące:
- potwierdzenia liczby partycji w RDD, nie powinno być za nisko i za wysoko. 20-50 partycji powinno być w porządku, jeśli liczba jest niższa - wywołanie
repartition
z 20 partycjami, jeśli wyższe - wywołaniecoalesce
do 50 partycji - Wywołanie transformacji
mapPartition
, w jej obrębie wywołanie funkcji do wstawiania rekordów do DBMS przy użyciu JDBC. W tej funkcji otwarciu połączenia z bazą danych i użyj polecenia COPY z this API, to pozwala wyeliminować konieczność stosowania osobnego polecenia dla każdego rekordu - w ten sposób wkładka byłyby przetwarzane znacznie szybciej
ten sposób wstawiania danych do PostgreSQL w sposób równoległy, wykorzystując do 50 połączeń równoległych (zależy od rozmiaru klastra Sparka i jego konfiguracji). Całe podejście może być zaimplementowane jako funkcja Java/Scala akceptująca RDD, a ciąg połączenia
Odpowiedź od 0x0FFF jest dobra. Oto dodatkowy punkt, który byłby przydatny.
Używam foreachPartition
do utrzymywania w magazynie zewnętrznym. Jest to również inline z wzorca projektowego Design Patterns for using foreachRDD
podanej w dokumentacji Spark https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams
Przykład:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
Można używać Postgres skopiować api to napisać, jej znacznie szybciej w ten sposób. Zobacz następujące dwie metody - jedna iteruje nad RDD, aby wypełnić bufor, który może zostać zapisany przez api kopiowania. Jedyną rzeczą, którą musisz się zająć, jest utworzenie poprawnej instrukcji w formacie csv, która będzie używana przez api kopiowania.
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
val sb = mutable.StringBuilder.newBuilder
val now = System.currentTimeMillis()
rdd.collect().foreach(itr => {
itr.foreach(_.createCSV(sb, now).append("\n"))
})
copyIn("myTable", new StringReader(sb.toString), "statement")
sb.clear
}
def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
val conn = connectionPool.getConnection()
try {
conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => logWarning(se.getMessage)
case t: Throwable => logWarning(t.getMessage)
} finally {
conn.close()
}
}
Czy bufor SBB StringBuilder nie zwiększy się bez powiązania, jak na liczbę rekordów w RDD EventModel? dlaczego nie zabraknie Ci pamięci? – nont
Używam tego rozwiązania, które działa już od wielu miesięcy i nie widziałem jak do tej pory brakowało mu pamięci. Ilość danych, które posiadam, jest dość spora - 100 000/s. Co więcej, jeśli obawiasz się o to, zawsze możesz mieć inną kontrolę, na podstawie której wywołasz copyIn i wyczyścisz bufor. – smishra