2017-01-09 45 views
14

Włączyłem serializację Kryo dla mojej pracy Spark, włączyłem ustawienie wymagające rejestracji i upewniłem się, że wszystkie moje typy zostały zarejestrowane.Dlaczego funkcja Spark działa gorzej podczas korzystania z serializacji Kryo?

val conf = new SparkConf() 
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrationRequired", "true") 
conf.registerKryoClasses(classes) 
conf.registerAvroSchemas(avroSchemas: _*) 

Wydajność czasowa w szafie czasowej pogorszyła się o około 20%, a liczba przetasowań bajtów wzrosła o prawie 400%.

Wydaje mi się to naprawdę zaskakujące, biorąc pod uwagę sugestię, że Kryo powinien być lepszy.

Kryo jest znacznie szybszy i bardziej zwarty niż serializacji Java (często aż 10x)

ręcznie wywoływane metody serialize na wystąpień Sparka org.apache.spark.serializer.KryoSerializer i org.apache.spark.serializer.JavaSerializer na przykładzie moich danych. Wyniki były zgodne z sugestiami zawartymi w dokumentacji Spark: Kryo wyprodukował 98 bajtów; Java wyprodukowała 993 bajty. To naprawdę 10-krotna poprawa.

Prawdopodobnie mylący jest fakt, że obiekty, które są serializowane i tasowane, implementują interfejs Avro GenericRecord. Próbowałem zarejestrować schematy Avro w SparkConf, ale nie wykazały żadnej poprawy.

Próbowałem tworzyć nowe klasy, aby przetasować dane, które były proste Scala case class es, bez żadnych maszyn Avro. Nie poprawiło to wydajności odtwarzania losowego ani liczby wymienianych bajtów.

Kod Spark kończy się aż do wrzenia następuje:

case class A(
    f1: Long, 
    f2: Option[Long], 
    f3: Int, 
    f4: Int, 
    f5: Option[String], 
    f6: Option[Int], 
    f7: Option[String], 
    f8: Option[Int], 
    f9: Option[Int], 
    f10: Option[Int], 
    f11: Option[Int], 
    f12: String, 
    f13: Option[Double], 
    f14: Option[Int], 
    f15: Option[Double], 
    f16: Option[Double], 
    f17: List[String], 
    f18: String) extends org.apache.avro.specific.SpecificRecordBase { 
    def get(f: Int) : AnyRef = ??? 
    def put(f: Int, value: Any) : Unit = ??? 
    def getSchema(): org.apache.avro.Schema = A.SCHEMA$ 
} 
object A extends AnyRef with Serializable { 
    val SCHEMA$: org.apache.avro.Schema = ??? 
} 

case class B(
    f1: Long 
    f2: Long 
    f3: String 
    f4: String) extends org.apache.avro.specific.SpecificRecordBase { 
    def get(field$ : Int) : AnyRef = ??? 
    def getSchema() : org.apache.avro.Schema = B.SCHEMA$ 
    def put(field$ : Int, value : Any) : Unit = ??? 
} 
object B extends AnyRef with Serializable { 
    val SCHEMA$ : org.apache.avro.Schema = ??? 
} 

def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = { 
    val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b) 
    joined.map { case (_, asAndBs) => asAndBs } 
} 

Czy masz jakiś pomysł co może być dzieje lub w jaki sposób mogę uzyskać lepszą wydajność, które powinny być dostępne od Kryo?

+0

mógłbyś pisać przykładową klasę sprawy i pracę? O wiele łatwiej byłoby odpowiedzieć na to pytanie, –

+0

Dobrze, @ T.Gawęd. Zaktualizowany za pomocą uproszczonego kodu. –

+0

Jak zmierzyłeś swój kod? –

Odpowiedz

0

Ponieważ macie duże RDD o dużej liczności, łączenie emisji i emisji nie jest niestety niestety możliwe.

Twoja najlepsza szansa to coalesce() Twoje RDD przed przystąpieniem. Czy widzisz wysokie pochylenie w czasie tasowania? Jeśli tak, możesz chcieć połączyć się z shuffle = true.

Wreszcie, jeśli posiadasz RDD struktur zagnieżdżonych (np. JSON), to czasami pozwala ci ominąć tasowanie. Obejrzyj slajdy i/lub wideo here, aby uzyskać bardziej szczegółowe wyjaśnienie.

1

Jeśli pojedynczy rozmiar rekordu jest zbyt mały, a duża liczba rekordów może spowolnić pracę. Spróbuj zwiększyć rozmiar bufora i sprawdź, czy przynosi poprawę.

Spróbuj poniżej jednego, jeśli jeszcze tego nie zrobiłeś ..

val conf = new SparkConf() 
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    // Now it's 24 Mb of buffer by default instead of 0.064 Mb 
    .set("spark.kryoserializer.buffer.mb","24") 

Ref: https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/