Włączyłem serializację Kryo dla mojej pracy Spark, włączyłem ustawienie wymagające rejestracji i upewniłem się, że wszystkie moje typy zostały zarejestrowane.Dlaczego funkcja Spark działa gorzej podczas korzystania z serializacji Kryo?
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
Wydajność czasowa w szafie czasowej pogorszyła się o około 20%, a liczba przetasowań bajtów wzrosła o prawie 400%.
Wydaje mi się to naprawdę zaskakujące, biorąc pod uwagę sugestię, że Kryo powinien być lepszy.
Kryo jest znacznie szybszy i bardziej zwarty niż serializacji Java (często aż 10x)
ręcznie wywoływane metody serialize
na wystąpień Sparka org.apache.spark.serializer.KryoSerializer
i org.apache.spark.serializer.JavaSerializer
na przykładzie moich danych. Wyniki były zgodne z sugestiami zawartymi w dokumentacji Spark: Kryo wyprodukował 98 bajtów; Java wyprodukowała 993 bajty. To naprawdę 10-krotna poprawa.
Prawdopodobnie mylący jest fakt, że obiekty, które są serializowane i tasowane, implementują interfejs Avro GenericRecord
. Próbowałem zarejestrować schematy Avro w SparkConf
, ale nie wykazały żadnej poprawy.
Próbowałem tworzyć nowe klasy, aby przetasować dane, które były proste Scala case class
es, bez żadnych maszyn Avro. Nie poprawiło to wydajności odtwarzania losowego ani liczby wymienianych bajtów.
Kod Spark kończy się aż do wrzenia następuje:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: Int) : AnyRef = ???
def put(f: Int, value: Any) : Unit = ???
def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
val SCHEMA$: org.apache.avro.Schema = ???
}
case class B(
f1: Long
f2: Long
f3: String
f4: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(field$ : Int) : AnyRef = ???
def getSchema() : org.apache.avro.Schema = B.SCHEMA$
def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
val SCHEMA$ : org.apache.avro.Schema = ???
}
def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = {
val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
joined.map { case (_, asAndBs) => asAndBs }
}
Czy masz jakiś pomysł co może być dzieje lub w jaki sposób mogę uzyskać lepszą wydajność, które powinny być dostępne od Kryo?
mógłbyś pisać przykładową klasę sprawy i pracę? O wiele łatwiej byłoby odpowiedzieć na to pytanie, –
Dobrze, @ T.Gawęd. Zaktualizowany za pomocą uproszczonego kodu. –
Jak zmierzyłeś swój kod? –