6

Mam robotę iskrową, gdy robię zewnętrzne sprzężenie między dwiema ramkami danych. Rozmiar pierwszej ramki danych wynosi 260 GB, format pliku to pliki tekstowe podzielone na 2200 plików, a rozmiar drugiej ramki danych to 2 GB.Jak dostroić iskrownik do EMR, aby szybko pisać duże ilości danych na S3

Ładowanie tych dwóch plików do samej ramki danych zajmuje 10 minut.

Następnie zapisanie danych ramki wyjściowej, która wynosi około 260 GB na S3 trwa około 1 godziny.

Oto moje informacje o klastrze.

emr-5.9.0 
Master:1m3.2xlarge 
Core:c3.4large 5 machines 

o to szczegóły każdej maszyny c3.4xlarge

CPU:16 
RAM:30 
DISK:2 × 160 GB SSD 

Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2 

To jest mój config klaster, że jestem ustawienie

[ 
    { 
     "Classification": "spark-defaults" 
     , "Properties": { 
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer" 
     } 
    },{ 
    "Classification": "emrfs-site", 
    "Properties": { 
     "fs.s3.maxConnections": "200" 
    } 
    } 
] 

Oto mój kod

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    import org.apache.spark.{ SparkConf, SparkContext } 
    import java.sql.{Date, Timestamp} 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.sql.functions.udf 
    import java.io.File 
    import org.apache.hadoop.fs._ 



import org.apache.spark.sql.functions.input_file_name 
import org.apache.spark.sql.functions.regexp_extract 


val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3)) 
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4)) 


val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN")) 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 

val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name)) 
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name)) 


val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR") 
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) 
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) 

import org.apache.spark.sql.expressions._ 
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") 


val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer") 
     .select($"FundamentalSeriesId", 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"), 
     when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"), 
     when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"), 
     when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"), 
     when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"), 
     when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"), 
     $"AuditID_1").otherwise($"AuditID").as("AuditID"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) 
     .filter(!$"FFAction".contains("D")) 



    val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated")) 

    val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated")) 

    val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq 

    val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|") 

    val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header) 
    dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear") 
     .format("csv") 
     .option("nullValue", "") 
     .option("header", "true") 
     .option("codec", "gzip") 
     .save("s3://trfsdisu/SPARK/FundamentalAnalytic/output") 

I Poza tym próbowałem najpierw zapisywać dane w HDFS, ale tam też zajmuje to samo (4 minuty mniej niż S3) zapisanie w katalogu HDFS.

Dodawanie SQL planu Physical

enter image description here

Wszystko Stage DAG enter image description here

Dodawanie DAG

enter image description here

pamięć używana ostatnia godzina mojej pracy

enter image description here

Oto niektóre dzienniki pracy

I looking into the job execution on the below clusters .Here were some on my observations : 

Majority time is being consumed at RDD getting spilling in the disk . 

######### 

17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far) 

######### 

This is causing spilling of 700MB memory pages constantly on the disk and then reading it back before shuffle phase . The same is see in all the containers ran for the job . The reason why lot of spilling is happening is because the executor are launched in a container with size : 

######################### 
    17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead) 
######################### 

Which means each containers are on 5GB and hence they are getting full very quickly .and because of memory pressure they are getting spilled . 

You will notice the same in the nodemanager Logs : 

2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used 
+0

Pokaż plany logiczne z interfejsu WWW. Zrób zrzuty ekranu, aby dać mi/ludziom wyobrażenie o tym, co robi kod. Dzięki. –

+0

Pytałem o fizyczne plany dotyczące zapytań z interfejsu internetowego Sparka. To powinno pomóc mi dowiedzieć się, co dokładnie robią twoje zapytania. –

+0

@JacekLaskowski No dobrze .. Czy mogę zobaczyć te w Gangelii? – SUDARSHAN

Odpowiedz

1

używasz instancji EC2 pięć c3.4large, który ma 30 GB pamięci RAM każdego. A więc to tylko 150 GB, czyli znacznie mniej niż twoja ramka> 200 GB do dołączenia. Stąd wiele rozlania dysku. Być może zamiast tego możesz uruchomić instancje r2 typu EC2 (zoptymalizowana pamięć przeciwstawiona typowi c, która jest zoptymalizowana pod kątem obliczeń) i sprawdzić, czy nastąpiła poprawa wydajności.

+0

Nawet jeśli używam r4.4xlarge, to również zajmuje to prawie tyle samo czasu ...Używany rdzeń V jest nadal 1 – SUDARSHAN

+1

@UDUDSZANAN dodaj tę własność do konfiguracji klastra '[{" klasyfikacja ":" iskra "," właściwości ": {" maximResourceAllocation ":" true "}}] – Will