2016-01-11 16 views
15

Używam iskrę 1.6 i uruchomić w kwestii powyższej kiedy uruchomić poniższy kod:Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id jest już ustawiony

// Imports 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SaveMode 
import scala.concurrent.ExecutionContext.Implicits.global 
import java.util.Properties 
import scala.concurrent.Future 

// Set up spark on local with 2 threads 
val conf = new SparkConf().setMaster("local[2]").setAppName("app") 
val sc = new SparkContext(conf) 
val sqlCtx = new HiveContext(sc) 

// Create fake dataframe 
import sqlCtx.implicits._ 
var df = sc.parallelize(1 to 50000).map { i => (i, i, i, i, i, i, i) }.toDF("a", "b", "c", "d", "e", "f", "g").repartition(2) 
// Write it as a parquet file 
df.write.parquet("/tmp/parquet1") 
df = sqlCtx.read.parquet("/tmp/parquet1") 

// JDBC connection 
val url = s"jdbc:postgresql://localhost:5432/tempdb" 
val prop = new Properties() 
prop.setProperty("user", "admin") 
prop.setProperty("password", "") 

// 4 futures - at least one of them has been consistently failing for 
val x1 = Future { df.write.jdbc(url, "temp1", prop) } 
val x2 = Future { df.write.jdbc(url, "temp2", prop) } 
val x3 = Future { df.write.jdbc(url, "temp3", prop) } 
val x4 = Future { df.write.jdbc(url, "temp4", prop) } 

Oto github sedno: https://gist.github.com/karanveerm/27d852bf311e39f05491

błąd pojawia się: na

org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1482) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:247) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:306) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at writer.SQLWriter$.writeDf(Writer.scala:75) ~[temple.temple-1.0-sans-externalized.jar:na] 
     at writer.Writer$.writeDf(Writer.scala:33) ~[temple.temple-1.0-sans-externalized.jar:na] 
     at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:460) ~[temple.temple-1.0-sans-externalized.jar:2.4.6] 
     at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:452) ~[temple.temple-1.0-sans-externalized.jar:2.4.6] 
     at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.7.jar:na] 

jest to błąd iskra lub robię coś źle/żadnego obejścia?

+0

czy mogę zapytać, na jakiej maszynie uruchomiłeś ten kod? Szczególnie interesuje mnie procesor (ile rdzeni)? –

+0

OSX El Capitan 10.11.1 | MacBook Air (13-calowy, początek 2014 r.) | 1,7 GHz Intel Core i7 | 8 GB 1600 MHz DDR3 (wierzę, że i7 to 4 rdzenie) – sparknoob

+0

ciekawe, nie mogę tego odtworzyć na podobnej konfiguracji (z iskry). To może być jakiś paskudny błąd, wcześniej mieli problemy z generowaniem identyfikatorów. Możesz stworzyć JIRA dla tego. –

Odpowiedz

0

Test 1: Czy to pomaga, jeśli uruchamiasz każdą operację df.write w sposób szeregowy zamiast równoległego w przyszłości?

Test 2: Czy pomaga, jeśli utrzymujesz ramkę danych, a następnie wykonujesz wszystkie operacje df.write równolegle i seralizuje się do unpersist, gdy wszystkie są zakończone, aby sprawdzić, czy to pomaga?

1

Po wypróbowaniu kilku rzeczy, stwierdziłem, że jeden z wątków utworzonych przez globalny numer ForkJoinPool otrzymuje właściwość spark.sql.execution.id ustawioną na losową wartość. Nie mogłem zidentyfikować procesu, który faktycznie to zrobił, ale mogłem obejść go, używając mojego własnego ExecutionContext.

import java.util.concurrent.Executors 
import concurrent.ExecutionContext 
val executorService = Executors.newFixedThreadPool(4) 
implicit val ec = ExecutionContext.fromExecutorService(executorService) 

Użyłem kodu z http://danielwestheide.com/blog/2013/01/16/the-neophytes-guide-to-scala-part-9-promises-and-futures-in-practice.html. Być może ForkJoinPool klonuje wątki podczas tworzenia nowych atrybutów, a jeśli dzieje się to podczas kontekstu wykonywania SQL, otrzyma wartość niepustą, podczas gdy FixedThreadPool utworzy wątki podczas tworzenia.

+0

Znalazłem ten sam problem. Ale to rozwiązanie nie wydaje się pomóc. Nadal widzę błąd 'spark.sql.execution.id already set'. – mottosan

+0

@Knshiro, nie powinno być Executors.newFixedThreadPool (1)? – smas

+0

@smas problem tkwi nie w liczbie wątków, ale w inicjalizacji tych wątków. Pula dołączania widełek inicjuje wątki losowo i inicjuje nowe wątki, które klonuje wszystkie atrybuty. Jeśli więc w momencie inicjowania nowego wątku istniejący wątek ma ustawiony identyfikator wykonania SQL, skopiuje go do nowego, zamiast pozwolić na wygenerowanie nowego. – Knshiro

1

Proszę sprawdzić SPARK-13747

rozważyć użycie Spark w wersji 2.2.0 lub wyższej jeśli ma zastosowanie w danym środowisku.