Używam nadawania zmiennej około 100 MB marynowanych w rozmiarze, który mam zbliżoną z:Wskazówki dotyczące poprawnego używania dużych zmiennych rozgłoszeniowych?
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
bieganie w klastrze z 3 wykonawców c3.2xlarge oraz sterownik m3.large, z następujące polecenia rozpoczęcia sesji interaktywnej:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
W RDD, jeśli utrzymują się odniesienie do tej zmiennej transmisji, eksploduje wykorzystania pamięci. W przypadku 100 odniesień do zmiennej o wielkości 100 MB, nawet jeśli zostałyby skopiowane 100 razy, można by się spodziewać, że wykorzystanie danych nie będzie większe niż 10 GB (nie mówiąc już o 30 GB na 3 węzłach). Jednak widzę z błędami pamięci podczas uruchamiania następujący test:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
ślad stosu:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
MemoryError
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
7 joined_rdd.persist()
8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))
/usr/lib/spark/python/pyspark/rdd.py in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/usr/lib/spark/python/pyspark/rdd.py in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/usr/lib/spark/python/pyspark/rdd.py in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
widziałem poprzednich wątków o wykorzystaniu pamięci Peklowanie deserializacji będącej problemem. Oczekuję jednak, że zmienna rozgłaszania będzie tylko deserializowana (i ładowana do pamięci na executorze), a kolejne odniesienia do .value
będą odwoływać się do tego adresu w pamięci. Jednak wydaje się, że tak nie jest. Czy czegoś brakuje?
Przykłady, które widziałem ze zmiennymi rozgłoszeniowymi, mają je jako słowniki, użyte jednorazowo do przekształcenia zestawu danych (tj. Zastąpienie akronimów lotniskowych nazwami portów lotniczych). Motywacją do utrzymania ich w tym miejscu jest tworzenie obiektów z wiedzą o zmiennej rozgłaszanej i jak z nią współdziałać, utrzymywanie tych obiektów i wykonywanie wielu obliczeń za ich pomocą (z iskrą dbającą o trzymanie ich w pamięci).
Jakie są wskazówki dotyczące korzystania z dużych (100 MB +) zmiennych rozgłoszeniowych? Czy wprowadzanie zmiennej programowej jest błędne? Czy jest to problem, który może być specyficzny dla PySpark?
Dziękujemy! Twoja pomoc jest doceniana.
Uwaga, ja też pisał to pytanie na databricks forums
Edycja - nawiązanie pytanie:
Zasugerowano, że serializer domyślny Spark ma wielkość partii 65337. obiektów w różnych odcinkach partie nie są identyfikowane jako takie same i mają przypisane różne adresy pamięci, sprawdzane tutaj za pomocą wbudowanej funkcji id
. Jednak nawet przy większej zmiennej rozgłaszania, która teoretycznie wymaga 256 serializacji do serializowania, nadal widzę tylko 2 odrębne kopie. Czy nie powinienem widzieć o wiele więcej? Czy rozumiem, w jaki sposób serializacja partii działa nieprawidłowo?
>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))}))/sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2
Czy możesz wypowiedzieć się na temat kompromisu między serializacjami? Przy zwiększonym rozmiarze wsadu, czy powinniśmy oczekiwać więcej pamięci wymaganej do serializacji? Jak wpłynie to na szybkość serializacji? Dlaczego warto wybrać serializer, który nie może serializować arbitralnych obiektów? – captaincapsaicin
Cóż, pełna partycja może nie pasować do pamięci, więc jeśli partia jest nieskończona, nie ma gwarancji, że się uda. To na początek. Wyższe zużycie pamięci może prowadzić do różnych problemów z GC. Jeśli chodzi o twoje ostatnie pytania, żaden serializator nie może przetworzyć arbitralnego obiektu, szczególnie jeśli jest on powiązany z natywnym kodem. Istnieje kilka konstrukcji językowych, które nie mogą być serializowane domyślnie według projektu (jak wyrażenia lambda) i wymagają specjalistycznych narzędzi. Z drugiej strony serializacja złożonych zamknięć może być powolna. – zero323
Edytowałem również w odpowiedzi na moje oryginalne pytanie. Czy możesz rzucić okiem i wyjaśnić, w jaki sposób wielkość partii koreluje z liczbą różnych obiektów, które widzimy w pamięci Sparka? – captaincapsaicin