2016-01-20 8 views
13

Próbuję wziąć mój danych wejściowych:Spark: Dodaj kolumnę dataframe warunkowo

A B  C 
-------------- 
4 blah 2 
2   3 
56 foo  3 

i dodać kolumnę do końca na podstawie tego, czy B jest pusta lub nie:

A B  C  D 
-------------------- 
4 blah 2  1 
2   3  0 
56 foo  3  1 

I można to łatwo zrobić, rejestrując wejściową ramkę danych jako tabelę tymczasową, a następnie wpisując zapytanie SQL.

Ale naprawdę chciałbym wiedzieć, jak to zrobić za pomocą tylko metod Scala i bez konieczności wypisywania zapytania SQL w Scali.

Próbowałem już .withColumn, ale nie mogę zrobić tego, co chcę.

Odpowiedz

47

Spróbuj withColumn z funkcją when następująco:

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._ // for `toDF` and $"" 
import org.apache.spark.sql.functions._ // for `when` 

val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5))) 
    .toDF("A", "B", "C") 

val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1)) 

newDf.show() przedstawia

+---+----+---+---+ 
| A| B| C| D| 
+---+----+---+---+ 
| 4|blah| 2| 1| 
| 2| | 3| 0| 
| 56| foo| 3| 1| 
|100|null| 5| 0| 
+---+----+---+---+ 

I dodaje (100, null, 5) wiersz testowania przypadku isNull.

Próbowałem tego kodu z Spark 1.6.0, ale jako komentarz w kodzie when, działa na wersjach po 1.4.0.

+0

To jest dokładnie to, czego szukałem. Próbowałem kilka różnych rzeczy z 'when' i' otherwise', ale myślę, że otrzymałem dokładny format źle. Nieco pomijam temat, ale czy wiesz, jak Spark radzi sobie z Column? Na przykład, jeśli dodaję ~ 20 kolumn, byłoby to szybciej zrobić 20 .withColumn i zachować je jako ramkę danych lub zmapować je do RDD i po prostu dodać je wszystkie na mapie, a następnie przekonwertować z powrotem na ramkę danych, aby zapisać do parkietu ? – mcmcmc

+1

Właśnie znalazłem [to] (http://stackoverflow.com/questions/33826495/spark-scala-2-10-tuple-limit). Uważam, że UDF są tym, czego szukam. – mcmcmc

+0

UDF jest o czym mówiłem poniżej ... –

0

Co powiesz na coś takiego?

val newDF = df.filter($"B" === "").take(1) match { 
    case Array() => df 
    case _ => df.withColumn("D", $"B" === "") 
} 

Korzystanie take(1) powinny mieć minimalny hit

2

My źle, przeoczył jedną część pytania.

Najlepszym, najczystszym sposobem jest użycie UDF. Wyjaśnienie w ramach kodu.

// create some example data...BY DataFrame 
// note, third record has an empty string 
case class Stuff(a:String,b:Int) 
val d= sc.parallelize(Seq(("a",1),("b",2), 
    ("",3) ,("d",4)).map { x => Stuff(x._1,x._2) }).toDF 

// now the good stuff. 
import org.apache.spark.sql.functions.udf 
// function that returns 0 is string empty 
val func = udf((s:String) => if(s.isEmpty) 0 else 1) 
// create new dataframe with added column named "notempty" 
val r = d.select($"a", $"b", func($"a").as("notempty")) 

    scala> r.show 
+---+---+--------+ 
| a| b|notempty| 
+---+---+--------+ 
| a| 1| 1111| 
| b| 2| 1111| 
| | 3|  0| 
| d| 4| 1111| 
+---+---+--------+ 
+0

Tutaj jest tylko jedna ramka danych. Możesz ponownie przeczytać pytanie: –