2016-05-26 45 views
5

Używam nadawania zmiennej około 100 MB marynowanych w rozmiarze, który mam zbliżoną z:Wskazówki dotyczące poprawnego używania dużych zmiennych rozgłoszeniowych?

>>> data = list(range(int(10*1e6))) 
>>> import cPickle as pickle 
>>> len(pickle.dumps(data)) 
98888896 

bieganie w klastrze z 3 wykonawców c3.2xlarge oraz sterownik m3.large, z następujące polecenia rozpoczęcia sesji interaktywnej:

IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g 

W RDD, jeśli utrzymują się odniesienie do tej zmiennej transmisji, eksploduje wykorzystania pamięci. W przypadku 100 odniesień do zmiennej o wielkości 100 MB, nawet jeśli zostałyby skopiowane 100 razy, można by się spodziewać, że wykorzystanie danych nie będzie większe niż 10 GB (nie mówiąc już o 30 GB na 3 węzłach). Jednak widzę z błędami pamięci podczas uruchamiania następujący test:

data = list(range(int(10*1e6))) 
metadata = sc.broadcast(data) 
ids = sc.parallelize(zip(range(100), range(100))) 
joined_rdd = ids.mapValues(lambda _: metadata.value) 
joined_rdd.persist() 
print('count: {}'.format(joined_rdd.count())) 

ślad stosu:

TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): 

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func 
    return f(iterator) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream 
    yield self._read_with_length(stream) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
MemoryError 


    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-1-7a262fdfa561> in <module>() 
     7 joined_rdd.persist() 
     8 print('persist called') 
----> 9 print('count: {}'.format(joined_rdd.count())) 

/usr/lib/spark/python/pyspark/rdd.py in count(self) 
    1004   3 
    1005   """ 
-> 1006   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    1007 
    1008  def stats(self): 

/usr/lib/spark/python/pyspark/rdd.py in sum(self) 
    995   6.0 
    996   """ 
--> 997   return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    998 
    999  def count(self): 

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op) 
    869   # zeroValue provided to each partition is unique from the one provided 
    870   # to the final reduce call 
--> 871   vals = self.mapPartitions(func).collect() 
    872   return reduce(op, vals, zeroValue) 
    873 

/usr/lib/spark/python/pyspark/rdd.py in collect(self) 
    771   """ 
    772   with SCCallSiteSync(self.context) as css: 
--> 773    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    774   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    775 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 

    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

widziałem poprzednich wątków o wykorzystaniu pamięci Peklowanie deserializacji będącej problemem. Oczekuję jednak, że zmienna rozgłaszania będzie tylko deserializowana (i ładowana do pamięci na executorze), a kolejne odniesienia do .value będą odwoływać się do tego adresu w pamięci. Jednak wydaje się, że tak nie jest. Czy czegoś brakuje?

Przykłady, które widziałem ze zmiennymi rozgłoszeniowymi, mają je jako słowniki, użyte jednorazowo do przekształcenia zestawu danych (tj. Zastąpienie akronimów lotniskowych nazwami portów lotniczych). Motywacją do utrzymania ich w tym miejscu jest tworzenie obiektów z wiedzą o zmiennej rozgłaszanej i jak z nią współdziałać, utrzymywanie tych obiektów i wykonywanie wielu obliczeń za ich pomocą (z iskrą dbającą o trzymanie ich w pamięci).

Jakie są wskazówki dotyczące korzystania z dużych (100 MB +) zmiennych rozgłoszeniowych? Czy wprowadzanie zmiennej programowej jest błędne? Czy jest to problem, który może być specyficzny dla PySpark?

Dziękujemy! Twoja pomoc jest doceniana.

Uwaga, ja też pisał to pytanie na databricks forums

Edycja - nawiązanie pytanie:

Zasugerowano, że serializer domyślny Spark ma wielkość partii 65337. obiektów w różnych odcinkach partie nie są identyfikowane jako takie same i mają przypisane różne adresy pamięci, sprawdzane tutaj za pomocą wbudowanej funkcji id. Jednak nawet przy większej zmiennej rozgłaszania, która teoretycznie wymaga 256 serializacji do serializowania, nadal widzę tylko 2 odrębne kopie. Czy nie powinienem widzieć o wiele więcej? Czy rozumiem, w jaki sposób serializacja partii działa nieprawidłowo?

>>> sc.serializer.bestSize 
65536 
>>> import cPickle as pickle 
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))} 
>>> len(pickle.dumps(broadcast_data)) 
16777786 
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))}))/sc.serializer.bestSize 
256 
>>> bd = sc.broadcast(broadcast_data) 
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value) 
>>> rdd.map(id).distinct().count() 
1 
>>> rdd.cache().count() 
100 
>>> rdd.map(id).distinct().count() 
2 

Odpowiedz

5

Cóż, diabeł tkwi w szczegółach. Aby zrozumieć, dlaczego tak się stało, musimy przyjrzeć się bliżej serializatorom PySpark.Po pierwsze pozwala tworzyć SparkContext z ustawieniami domyślnymi:

from pyspark import SparkContext 

sc = SparkContext("local", "foo") 

i sprawdzić, co jest domyślnym serializer:

sc.serializer 
## AutoBatchedSerializer(PickleSerializer()) 

sc.serializer.bestSize 
## 65536 

Mówi nam trzy różne rzeczy:

  • tego jest AutoBatchedSerializer serializer
  • używa on PickleSerializer do wykonywania rzeczywistej pracy
  • bestSize z odcinkach dozowane jest 65536 bajtów

szybki rzut oka at the source code pokaże, że to serialize dostosowuje liczbę rekordów w odcinkach w czasie na starcie i stara się utrzymać wielkość partii mniej niż 10 * bestSize. Ważne jest to, że nie wszystkie rekordy w jednej partycji są serializowane w tym samym czasie.

Możemy sprawdzić doświadczalnie, że w następujący sposób:

from operator import add 

bd = sc.broadcast({}) 

rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value) 
rdd.map(id).distinct().count() 
## 1 

rdd.cache().count() 
## 10 

rdd.map(id).distinct().count() 
## 2 

Jak widać nawet w tym prostym przykładzie po serializacji-deserializacji otrzymujemy dwa odrębne obiekty. można obserwować podobne zachowanie pracujących bezpośrednio z pickle:

v = {} 
vs = [v, v, v, v] 

v1, *_, v4 = pickle.loads(pickle.dumps(vs)) 
v1 is v4 
## True 

(v1_, v2_), (v3_, v4_) = (
    pickle.loads(pickle.dumps(vs[:2])), 
    pickle.loads(pickle.dumps(vs[2:])) 
) 

v1_ is v4_ 
## False 

v3_ is v4_ 
## True 

wartości szeregowane w tym samym odniesienie wsadowym, po unpickling, tego samego obiektu. Wartości z różnych partii wskazują na różne obiekty.

W praktyce Spark wielokrotne serializacje i różne strategie serializacji. Można na przykład użyć partii nieskończonej wielkości:

from pyspark.serializers import BatchedSerializer, PickleSerializer 

rdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value) 
    ._reserialize(BatchedSerializer(PickleSerializer()))) 
rdd_.cache().count() 

rdd_.map(id).distinct().count() 
## 1 

można zmienić serializatora przekazując serializer i/lub batchSize parametry SparkContext konstruktora:

sc = SparkContext(
    "local", "bar", 
    serializer=PickleSerializer(), # Default serializer 
    # Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer 
    batchSize=-1 
) 

sc.serializer 
## BatchedSerializer(PickleSerializer(), -1) 

wybierając różne serializers i dawkowania strategie wyniki w różnych handlu -offs (szybkość, możliwość przekształcania do postaci szeregowej dowolnych obiektów, wymagania dotyczące pamięci itp.).

Należy również pamiętać, że zmienne rozgłoszeniowe w Sparku nie są współużytkowane między wątkami executorów, więc na tym samym elemencie pracującym mogą istnieć jednocześnie wiele deserializowanych kopii.

Ponadto zobaczysz podobne zachowanie, jeśli wykonasz transformację, która wymaga tasowania.

+0

Czy możesz wypowiedzieć się na temat kompromisu między serializacjami? Przy zwiększonym rozmiarze wsadu, czy powinniśmy oczekiwać więcej pamięci wymaganej do serializacji? Jak wpłynie to na szybkość serializacji? Dlaczego warto wybrać serializer, który nie może serializować arbitralnych obiektów? – captaincapsaicin

+0

Cóż, pełna partycja może nie pasować do pamięci, więc jeśli partia jest nieskończona, nie ma gwarancji, że się uda. To na początek. Wyższe zużycie pamięci może prowadzić do różnych problemów z GC. Jeśli chodzi o twoje ostatnie pytania, żaden serializator nie może przetworzyć arbitralnego obiektu, szczególnie jeśli jest on powiązany z natywnym kodem. Istnieje kilka konstrukcji językowych, które nie mogą być serializowane domyślnie według projektu (jak wyrażenia lambda) i wymagają specjalistycznych narzędzi. Z drugiej strony serializacja złożonych zamknięć może być powolna. – zero323

+0

Edytowałem również w odpowiedzi na moje oryginalne pytanie. Czy możesz rzucić okiem i wyjaśnić, w jaki sposób wielkość partii koreluje z liczbą różnych obiektów, które widzimy w pamięci Sparka? – captaincapsaicin