5

Jestem nowicjuszem do iskrzenia i kassandra. Próbuję wstawić do tabeli Cassandra za pomocą łącznika zapłonem Cassandra jak poniżej:Wykonanie wkładki Cassandra przy użyciu złącza iskrobassandra

import java.util.UUID 

import org.apache.spark.{SparkContext, SparkConf} 
import org.joda.time.DateTime 
import com.datastax.spark.connector._ 

case class TestEntity(id:UUID, category:String, name:String,value:Double, createDate:DateTime, tag:Long) 

object SparkConnectorContext { 
    val conf = new SparkConf(true).setMaster("local") 
    .set("spark.cassandra.connection.host", "192.168.xxx.xxx") 
    val sc = new SparkContext(conf) 
} 
object TestRepo { 
    def insertList(list: List[TestEntity]) = { 
    SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily") 
    } 
} 
object TestApp extends App { 
    val start = System.currentTimeMillis() 
    TestRepo.insertList(Utility.generateRandomData()) 
    val end = System.currentTimeMillis() 
    val timeDiff = end-start 
    println("Difference (in millis)= "+timeDiff) 
} 

kiedy wstawić przy użyciu powyższej metody (listę 100 podmiotów), trwa 300-1100 milliseconds. Próbowałem te same dane do wstawienia przy użyciu biblioteki phantom. Zajmuje to mniej niż 20-40 milliseconds.

Czy ktoś może mi powiedzieć, dlaczego złącze świecowe zajmuje tyle czasu na wstawienie? Czy robię cokolwiek złego w moim kodzie lub czy nie zaleca się używania spark-cassandra connector dla operacji wstawiania?

+0

Ile masz węzłów Cassandra? Czy twoi iskrowcy działają na węzłach Cassandra? Nie widzę żadnych pomiarów czasu w kodzie, więc wygląda na to, że mierzysz o wiele więcej operacji niż tylko czas wstawiania. –

+0

Właśnie zacząłem z Kasandra i iskrą. Używam lokalnej iskry. Ale Kasandra znajduje się w innej maszynie, ale w tej samej sieci. Jeśli chodzi o pomiar czasu, zredagowałem kod. –

+0

@YaduKrishnan Spark rozprasza operacje w całej sieci, podczas gdy phantom jest wykonywany bezpośrednio zapisuje parallelised na JVM poprzez klasyczne wielowątkowość. Jest także znacznie szybszy w mapowaniu danych, ponieważ większość magii to czas kompilacji, nie ma boksa wykonawczego, ponieważ wszystkie konwersje są wyspecjalizowane, złącze iskrzenia tak naprawdę nie dotyka bitów Scala. – flavian

Odpowiedz

4

Wygląda na to, że włączyłeś operację równoległości w swoim czasie. Również dlatego, że twój robot iskrowy działa na innej maszynie niż Cassandra, operacja saveToCassandra będzie zapisana w sieci.

Spróbuj skonfigurować system do uruchamiania robotów iskrowych na węzłach Cassandra. Następnie utwórz RDD w oddzielnym kroku i wywołaj akcję taką jak count(), aby załadować dane do pamięci. Również możesz chcieć utrzymywać() lub cache() RDD, aby upewnić się, że pozostanie w pamięci do testu.

Później tylko saveToCassandra tego buforowanego RDD.

Możesz również zajrzeć do metody repartitionByCassandraReplica oferowanej przez złącze Cassandra. To podzieliłoby dane w RDD na podstawie tego, do którego węzła Cassandra potrzebne są zapisy. W ten sposób wykorzystujesz lokalność danych i często unikasz pisania i tasowania w sieci.

+0

Próbowałem z pamięci podręcznej rdd, i poprawia wydajność przetwarzania drastycznie. Ale mam wątpliwości, czy jeśli nowy rekord zostanie wstawiony do tabeli C *, która jest zapisana w pamięci podręcznej rdd za pomocą iskrownika, czy pamięć podręczna zostanie wyczyszczona lub zaktualizowana automatycznie? Lub pozostanie taki sam? –

+0

RDD są niezmienne, więc nie można ich zmienić. W Spark tworzysz nowe RDD zamiast modyfikować istniejące. Zbuforowane karty RDD zostaną eksmitowane, gdy Spark będzie potrzebował pamięci na inne rzeczy. –

1

Istnieją pewne poważne problemy z „benchmark”:

  1. Twój zbiór danych jest tak mały, że jesteś pomiaru przeważnie tylko raz skonfigurować zadanie. Zapisywanie 100 elementów powinno być rzędu pojedynczych milisekund na pojedynczym węźle, a nie na sekundę. Ponadto zapisanie 100 elementów daje JVM żadnej szansy na skompilowanie kodu, który uruchamiasz, do zoptymalizowanego kodu maszynowego.
  2. Zawarłeś inicjalizację kontekstu iskry w swoim pomiarze. JVM ładuje klasy leniwie, więc kod inicjalizacji iskry jest naprawdę wywoływany po rozpoczęciu pomiaru. Jest to niezwykle kosztowny element, zwykle wykonywany tylko raz na całą aplikację iskrzenia, nawet na jedno zadanie.
  3. Pomiar wykonuje się tylko raz na uruchomienie. Oznacza to, że nawet nieprawidłowo ty pomiaru konfiguracji iskra ctx i czas konfiguracji pracy, ponieważ JVM musi załadować wszystkie klasy po raz pierwszy i zapewne nie Hotspot ma szansę kopać.

Podsumowując, ty” najprawdopodobniej bardzo często mierzy czas ładowania klasy, który zależy od wielkości i liczby załadowanych klas. Spark to całkiem spora rzecz do załadowania, a kilkaset milisekund nie jest wcale zaskakujące.

Aby zmierzyć wydajność insert poprawnie:

  • zastosować większy zestaw danych
  • wykluczyć konfigurację jednorazową z pomiaru
  • zrobić wiele tras dzielące ten sam kontekst zapłonową i wyrzucić kilka początkowych tych, aż osiągasz stałą wydajność.

BTW Po włączeniu poziomu rejestrowania debugowania złącze rejestruje czasy wstawiania dla każdej partycji w dziennikach executorów.