5

Mam scenariusz, w którym będę otrzymywać dane strumieniowe, które są przetwarzane przez mój program strumieniowania iskier, a dane wyjściowe dla każdego interwału są dołączane do mojego istniejący stół z kandży.java.lang.UnsupportedOperationException: "Zapisywanie do pustej tabeli Cassandra jest niedozwolone

Obecnie mój program do strumieniowania iskier generuje ramkę danych, którą muszę zapisać w moim stole z kassandra. Problem Jestem obecnie stoi to nie jestem w stanie dołączyć dane/wierszy do mojego istniejącej tabeli Cassandra kiedy używam poniżej polecenia

dff.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "xxx", "yyy" -> "retail")).save() 

Czytałem w poniższy link http://rustyrazorblade.com/2015/08/migrating-from-mysql-to-cassandra-using-spark/ gdzie zdał mode = „dołączyć” do metody save ale jego błąd składni rzucanie

również byłem nt stanie zrozumieć, gdzie muszę ustalić z linku poniżej https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/rlGGWQF2wnM

potrzebujesz pomocy tak jak rozwiązać ten issue.I'm pisać moją iskrę przesyłanie strumieniowe zadań w scala

Odpowiedz

8

Myślę, że trzeba to zrobić w następujący sposób:

dff.write.format("org.apache.spark.sql.cassandra").mode(SaveMode.Append).options(Map("table" -> "xxx", "yyy" -> "retail")).save() 

Sposób Cassandra obsługuje siły danych, aby zrobić tak zwanych „upserts” - trzeba pamiętać, że wkładka może zastąpić niektóre z wierszy gdzie klucz podstawowy już zapisanego rekordu jest taki sam jak klucz podstawowy wstawionego rekordu. Cassandra jest bazą do szybkiego zapisu, więc nie sprawdza istnienia danych przed ich zapisaniem.