6

W jaki sposób mogę zebrać te dane na konsoli (Spark Shell lub Spark submit job) zaraz po wykonaniu zadania lub zadania.Jak pobierać dane takie jak rozmiar wyjściowy i zapisy napisane w interfejsie użytkownika Spark?

Używamy Sparka do ładowania danych z Mysql do Cassandry i jest dość duży (np .: ~ 200 GB i 600M wierszy). Kiedy zadanie zostanie wykonane, chcemy sprawdzić, ile dokładnie wierszy przebiegło? Możemy uzyskać numer z Spark UI, ale jak możemy pobrać ten numer ("Zapisane dane wyjściowe") z iskrzaka lub z pracy z iskrami.

Przykładowe polecenie do załadowania z MySQL do Cassandry.

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load() 

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map("table" -> "payment_types", "keyspace" -> "test")) 

Chcę pobrać wszystkie dane Spark UI w powyższym zadaniu głównie Rozmiar wyjściowy i zapisy w formie pisemnej.

Proszę o pomoc.

Dziękujemy za poświęcony czas!

Odpowiedz

3

Znaleziono odpowiedź. Możesz uzyskać statystyki za pomocą SparkListener.

Jeśli Twoje zadanie nie ma danych wejściowych ani wyjściowych, możesz uzyskać wyjątki None.get, które możesz bezpiecznie zignorować, podając, czy stmt.

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val metrics = taskEnd.taskMetrics 
    if(metrics.inputMetrics != None){ 
     inputRecords += metrics.inputMetrics.get.recordsRead} 
    if(metrics.outputMetrics != None){ 
     outputWritten += metrics.outputMetrics.get.recordsWritten } 
    } 
}) 

Proszę znaleźć poniższy przykład.

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import com.datastax.spark.connector._ 
import org.apache.spark.sql._ 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 

val conf = new SparkConf() 
.set("spark.cassandra.connection.host", "...") 
.set("spark.driver.allowMultipleContexts","true") 
.set("spark.master","spark://....:7077") 
.set("spark.driver.memory","1g") 
.set("spark.executor.memory","10g") 
.set("spark.shuffle.spill","true") 
.set("spark.shuffle.memoryFraction","0.2") 
.setAppName("CassandraTest") 
sc.stop 
val sc = new SparkContext(conf) 
val sqlcontext = new org.apache.spark.sql.SQLContext(sc) 

var outputWritten = 0L 

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val metrics = taskEnd.taskMetrics 
    if(metrics.inputMetrics != None){ 
     inputRecords += metrics.inputMetrics.get.recordsRead} 
    if(metrics.outputMetrics != None){ 
     outputWritten += metrics.outputMetrics.get.recordsWritten } 
    } 
}) 

val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load() 
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map("table" -> "bucks_payments", "keyspace" -> "test")) 

println("outputWritten",outputWritten) 

Wynik:

scala> println("outputWritten",outputWritten) 
(outputWritten,16383)