I have a Spark job that does an outer join between two dataframes. The first dataframe is 260 GB of text files split into 2,200 files, and the second dataframe is 2 GB. How can I tune Spark on EMR so that it writes this amount of data to S3 quickly?
Loading these two inputs into dataframes alone takes 10 minutes.
Writing the resulting dataframe, which is about 260 GB, to S3 then takes about 1 hour.
Here is my cluster information.
emr-5.9.0
Master: 1 × m3.2xlarge
Core: 5 × c3.4xlarge
Here are the details of each c3.4xlarge machine:
CPU: 16
RAM: 30 GB
DISK: 2 × 160 GB SSD
Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2
This is the cluster configuration I am setting:
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    }
  },
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.maxConnections": "200"
    }
  }
]
Here is my code:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.io.File
import org.apache.hadoop.fs._
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4))
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN")
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name))
val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR")
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*)
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*)
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer")
.select($"FundamentalSeriesId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"),
when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"),
when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"),
when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"),
when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"),
$"AuditID_1").otherwise($"AuditID").as("AuditID"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
.filter(!$"FFAction".contains("D"))
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))
val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq
val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|")
val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear")
.format("csv")
.option("nullValue", "")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FundamentalAnalytic/output")
Apart from this, I also tried writing the data to HDFS first, but writing to an HDFS directory takes roughly the same time (about 4 minutes less than S3).
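One thing I have considered but not yet benchmarked is repartitioning by the same columns used in partitionBy before the write, so that each (DataPartition, PartitionYear) prefix on S3 ends up as fewer, larger gzip files instead of one small file per task. This is only a sketch built on the dataframe above, under the assumption that the extra shuffle pays for itself:

// Sketch only: shuffle the final dataframe by the partition columns so that
// all rows for a given (DataPartition, PartitionYear) land in one task and
// each S3 prefix is written as a few larger gzip files. Whether the added
// shuffle is worth it for this dataset is an assumption to be measured.
val dfRepartitioned = dfMainOutputFinalWithoutNull
  .repartition(col("DataPartition"), col("PartitionYear"))

dfRepartitioned.write.partitionBy("DataPartition", "PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsdisu/SPARK/FundamentalAnalytic/output")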
Adding the SQL physical plan
Adding the DAG
Memory used during the last hour of my job
Here are some job logs:
I looked into the job execution on this cluster. Here are some of my observations:
Most of the time is being consumed by RDD data spilling to disk.
#########
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far)
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far)
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far)
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far)
#########
This causes 700 MB memory pages to be spilled to disk constantly and then read back before the shuffle phase. The same pattern is seen in all the containers run for the job. The reason so much spilling happens is that the executors are launched in containers of this size:
#########################
17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead)
#########################
This means each container is only around 5.5 GB, so they fill up very quickly, and because of the memory pressure the data gets spilled. You will notice the same in the NodeManager logs:
2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used
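Given those numbers, one direction is to give each executor more memory so the sorter spills less. A rough sketch of what the spark-defaults classification above could look like with explicit executor sizing; the values below are assumptions for the 30 GB / 16-core c3.4xlarge core nodes (roughly three executors per node, leaving headroom for YARN and overhead) and would need to be checked against the cluster's actual YARN container limits:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
      "spark.executor.cores": "5",
      "spark.executor.memory": "6g",
      "spark.yarn.executor.memoryOverhead": "1024"
    }
  }
]

With roughly 7 GB per container instead of 5.5 GB, the 704 MB sort spills seen above should become less frequent, though the exact sizing depends on how much of the 30 GB YARN actually exposes on these nodes.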
Show the logical plans from the web UI. Take screenshots to give me/people an idea of what the code is doing. Thanks. –
I was asking about the physical plans of the queries from the Spark web UI. That should help me figure out what exactly your queries are doing. –
@JacekLaskowski OK... Can I see those in Ganglia? – SUDARSHAN