2017-09-26 77 views
7

Uruchomiłem zadanie iskry w EMR z YARN jako menedżerem zasobów i na 2 węzłach. Muszę celowo nie wykonać kroku, jeśli mój warunek nie jest spełniony, więc następny krok nie zostanie wykonany zgodnie z konfiguracją. Aby to osiągnąć, wyrzucam niestandardowy wyjątek po wstawieniu komunikatu dziennika w dynamoDB.Spark, Niepoprawne zachowanie podczas rzucania wyjątku SparkException w EMR

Działa dobrze, ale rekord w Dynamo jest wstawiany dwukrotnie.

Poniżej znajduje się mój kod.

if(<condition>) { 
    <method call to insert in dynamo> 
    throw new SparkException(<msg>); 
    return; 
} 

Jeśli usunę linię, aby rzucić wyjątek, działa dobrze, ale krok jest zakończony.

W jaki sposób można uczynić krok niepowodzeniem, bez dwukrotnego otrzymywania komunikatu dziennika.

Dzięki za pomoc.

Pozdrawiam, Sorabh

Odpowiedz

2

Prawdopodobnie powodem wiadomość dynamo wstawiono dwa razy był błąd, ponieważ warunek został trafiony i przetwarzane przez dwóch różnych wykonawców. Spark dzieli dzieło, które należy wykonać między robotników, a ci pracownicy nie dzielą się wiedzą.

Nie jestem pewien, co napędza twój wymóg, aby Spark przestał działać FAIL, ale sugerowałbym zamiast tego śledzenie tego przypadku awarii w twoim kodzie aplikacji zamiast próbować mieć iskrę bezpośrednio. Innymi słowy, napisz kod, który wykrywa błąd i przekazuje go do sterownika iskrzenia, a następnie wykonaj odpowiednie czynności.

Jednym ze sposobów na to byłoby użycie akumulatora do policzenia błędów występujących podczas przetwarzania danych. Byłoby to wyglądać mniej więcej tak (jestem zakładając Scala i DataFrames, ale można dostosować do RDD i/lub Python jest to konieczne):

val accum = sc.longAccumulator("Error Counter") 
def doProcessing(a: String, b: String): String = { 
    if(condition) { 
    accum.add(1) 
    null 
    } 
    else { 
    doComputation(a, b) 
    } 
} 
val doProcessingUdf = udf(doProcessing _) 

df = df.withColumn("result", doProcessing($"a", $"b")) 

df.write.format(..).save(..) // Accumulator value not computed until an action occurs! 

if(accum.value > 0) { 
    // An error detected during computation! Do whatever needs to be done. 
    <insert dynamo message here> 
} 

Jeden Zaletą tego podejścia jest to, jeśli szukasz informacji zwrotnej w interfejsie Spark będziesz mógł zobaczyć wartości akumulatorów podczas pracy. Dla odniesienia, tutaj jest dokumentacja na akumulatorach: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators