5

Przygotowuję przykład zabawki spark.ml. Spark version 1.6.0, uruchomiony na szczycie Oracle JDK version 1.8.0_65, pyspark, notatnik ipython.spark.ml StringIndexer rzuca "Niewidoczna etykieta" na dopasowanie()

Po pierwsze, nie ma to nic wspólnego z Spark, ML, StringIndexer: handling unseen labels. Wyjątek jest zgłaszany podczas dopasowywania potoku do zestawu danych, a nie przekształcania go. A tłumienie wyjątku może nie być tutaj rozwiązaniem, ponieważ obawiam się, że zbiór danych jest w tym przypadku bardzo zły.

Mój zestaw danych ma około 800 MB nieskompresowanych, więc może być trudny do odtworzenia (mniejsze podzbiory wydają się unikać tego problemu).

Zbiór danych wygląda następująco:

+--------------------+-----------+-----+-------+-----+--------------------+ 
|     url|   ip| rs| lang|label|     txt| 
+--------------------+-----------+-----+-------+-----+--------------------+ 
|http://3d-detmold...|217.160.215|378.0|  de| 0.0|homwillkommskip c...| 
| http://3davto.ru/| 188.225.16|891.0|  id| 1.0|оформить заказ пе...| 
| http://404.szm.com/| 85.248.42| 58.0|  cs| 0.0|kliknite tu alebo...| 
| http://404.xls.hu/| 212.52.166|168.0|  hu| 0.0|honlapkészítés404...| 
|http://a--m--a--t...| 66.6.43|462.0|  en| 0.0|back top archiv r...| 
|http://a-wrf.ru/c...| 78.108.80|126.0|unknown| 1.0|     | 
|http://a-wrf.ru/s...| 78.108.80|214.0|  ru| 1.0|установк фаркопна...| 
+--------------------+-----------+-----+-------+-----+--------------------+ 

Wartość są przewidywane jest label. Cały rurociąg do niego stosować:

from pyspark.ml import Pipeline 
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF 
from pyspark.ml.classification import LogisticRegression 

train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345) 

pipe_stages = [ 
    StringIndexer(inputCol='lang', outputCol='lang_idx'), 
    OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'), 
    Tokenizer(inputCol='ip', outputCol='ip_tokens'), 
    HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'), 
    Tokenizer(inputCol='txt', outputCol='txt_tokens'), 
    HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'), 
    VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'), 
    LogisticRegression(labelCol='label', featuresCol='features') 
] 

pipe = Pipeline(stages=pipe_stages) 
pipemodel = pipe.fit(train) 

A oto StackTrace:

Py4JJavaError: An error occurred while calling o10793.fit. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL. 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) 
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) 
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113) 
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271) 
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159) 
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:90) 
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:71) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Unseen label: pl-PL. 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) 
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) 
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) 
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

Najbardziej interesujące jest to:

org.apache.spark.SparkException: Unseen label: pl-PL. 

Nie mam pojęcia, jak pl-PL która jest wartością od lang kolumna mogła zostać pomieszana w kolumnie label, która jest float, a nie string edycja: niektóre pochopnych coclusions, poprawione dzięki @ zero323

szukałem dalej do niego i stwierdził, że pl-PL jest wartością z części testowej zbioru danych, a nie szkolenia. Teraz nie wiem nawet, gdzie szukać winnego: może to być kod randomSplit, a nie StringIndexer i kto wie, co jeszcze.

Jak mogę to sprawdzić?

Odpowiedz

7

Unseen labelis a generic message which doesn't correspond to a specific column. Najprawdopodobniej problem jest z następnym etapie:

StringIndexer(inputCol='lang', outputCol='lang_idx') 

z pl-PL obecny w train("lang") i nie występuje w test("lang").

Można go skorygować za pomocą setHandleInvalid z skip:

from pyspark.ml.feature import StringIndexer 

train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"]) 
test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"]) 

indexer = StringIndexer(inputCol="v", outputCol="vi") 
indexer.fit(train).transform(test).show() 

## Py4JJavaError: An error occurred while calling o112.showString. 
## : org.apache.spark.SparkException: Job aborted due to stage failure: 
## ... 
## org.apache.spark.SparkException: Unseen label: foobar. 

indexer.setHandleInvalid("skip").fit(train).transform(test).show() 

## +---+---+---+ 
## | k| v| vi| 
## +---+---+---+ 
## | 3|foo|1.0| 
## +---+---+---+ 
+0

Gdzie umieścisz setHandleInvalid ("skip") w potoku? – mikeL

+0

@mikeL Gdziekolwiek zdefiniujesz 'StringIndexer'. Jest to "Param" indeksu, a nie "Pipeline". – zero323

2

Dobrze Myślę, że mam to. Przynajmniej to działa.

Caching theframe (w tym pociąg/test partes) rozwiązuje problem. To właśnie znalazłem w tym numerze JIRA: https://issues.apache.org/jira/browse/SPARK-12590.

To nie jest błąd, tylko fakt, że randomSample może przynieść inny wynik na tym samym, ale inaczej podzielonym na partycje zbiorze danych. I najwyraźniej niektóre z moich funkcji mungowania (lub Pipeline) wymagają ponownego dzielenia, dlatego wyniki rekomputera zestawu pociągowego z jego definicji mogą się różnić.

Co nadal mnie interesuje - to odtwarzalność: to zawsze wiersz "pl-PL", który miesza się w niewłaściwej części zbioru danych, tj. Nie jest przypadkowym partycjonowaniem. To deterministyczne, tylko niekonsekwentne. Zastanawiam się, jak to się dzieje.