2015-02-04 9 views
28

Jak mogę zapytać RDD o typy złożone, takie jak mapy/tablice? na przykład, kiedy pisałem ten kod testowy:Zapytanie Spark SQL DataFrame z typami złożonymi

case class Test(name: String, map: Map[String, String]) 
val map = Map("hello" -> "world", "hey" -> "there") 
val map2 = Map("hello" -> "people", "hey" -> "you") 
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2))) 

I choć składnia byłoby coś takiego:

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world") 

lub

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world") 

ale mam

Can't access nested field in type MapType(StringType,StringType,true)

a nd

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes

odpowiednio.

+1

Co powiesz na przyjęcie * tomu * odpowiedzi z @ zero323? – javadba

Odpowiedz

79

To zależy od typu kolumny.Zacznijmy od jakiegoś manekina danych:

import org.apache.spark.sql.functions.{udf, lit} 
import scala.util.Try 

case class SubRecord(x: Int) 
case class ArrayElement(foo: String, bar: Int, vals: Array[Double]) 
case class Record(
    an_array: Array[Int], a_map: Map[String, String], 
    a_struct: SubRecord, an_array_of_structs: Array[ArrayElement]) 


val df = sc.parallelize(Seq(
    Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1), 
     Array(
      ArrayElement("foo", 1, Array(1.0, 2.0)), 
      ArrayElement("bar", 2, Array(3.0, 4.0)))), 
    Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2), 
     Array(ArrayElement("foz", 3, Array(5.0, 6.0)), 
       ArrayElement("baz", 4, Array(7.0, 8.0)))) 
)).toDF 
df.registerTempTable("df") 
df.printSchema 

// root 
// |-- an_array: array (nullable = true) 
// | |-- element: integer (containsNull = false) 
// |-- a_map: map (nullable = true) 
// | |-- key: string 
// | |-- value: string (valueContainsNull = true) 
// |-- a_struct: struct (nullable = true) 
// | |-- x: integer (nullable = false) 
// |-- an_array_of_structs: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- foo: string (nullable = true) 
// | | |-- bar: integer (nullable = false) 
// | | |-- vals: array (nullable = true) 
// | | | |-- element: double (containsNull = false) 
  • kolumny tablicy:

    • Column.getItem metoda

      df.select($"an_array".getItem(1)).show 
      
      // +-----------+ 
      // |an_array[1]| 
      // +-----------+ 
      // |   2| 
      // |   5| 
      // +-----------+ 
      
    • Hive brac kets składnia:

      sqlContext.sql("SELECT an_array[1] FROM df").show 
      
      // +---+ 
      // |_c0| 
      // +---+ 
      // | 2| 
      // | 5| 
      // +---+ 
      
    • UDF

      val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption) 
      
      df.select(get_ith($"an_array", lit(1))).show 
      
      // +---------------+ 
      // |UDF(an_array,1)| 
      // +---------------+ 
      // |    2| 
      // |    5| 
      // +---------------+ 
      
  • map kolumny

    • Column.getField wykorzystujące metody:

      df.select($"a_map".getField("foo")).show 
      
      // +----------+ 
      // |a_map[foo]| 
      // +----------+ 
      // |  bar| 
      // |  null| 
      // +----------+ 
      
    • użyciu Hive wsporniki składnia:

      sqlContext.sql("SELECT a_map['foz'] FROM df").show 
      
      // +----+ 
      // | _c0| 
      // +----+ 
      // |null| 
      // | baz| 
      // +----+ 
      
    • stosując pełną ścieżkę z kropką składnię:

      df.select($"a_map.foo").show 
      
      // +----+ 
      // | foo| 
      // +----+ 
      // | bar| 
      // |null| 
      // +----+ 
      
    • używając UDF

      val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k)) 
      
      df.select(get_field($"a_map", lit("foo"))).show 
      
      // +--------------+ 
      // |UDF(a_map,foo)| 
      // +--------------+ 
      // |   bar| 
      // |   null| 
      // +--------------+ 
      
  • struct kolumny wykorzystujące pełne ścieżka ze składnią kropkową:

    • z DataFrame API

      df.select($"a_struct.x").show 
      
      // +---+ 
      // | x| 
      // +---+ 
      // | 1| 
      // | 2| 
      // +---+ 
      
    • z surowego SQL

      sqlContext.sql("SELECT a_struct.x FROM df").show 
      
      // +---+ 
      // | x| 
      // +---+ 
      // | 1| 
      // | 2| 
      // +---+ 
      
  • pola wewnątrz tablicy structs można uzyskać za pomocą kropki-składni, nazwy i standardowych Column metody :

    df.select($"an_array_of_structs.foo").show 
    
    // +----------+ 
    // |  foo| 
    // +----------+ 
    // |[foo, bar]| 
    // |[foz, baz]| 
    // +----------+ 
    
    sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show 
    
    // +---+ 
    // |_c0| 
    // +---+ 
    // |foo| 
    // |foz| 
    // +---+ 
    
    df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show 
    
    // +------------------------------+ 
    // |an_array_of_structs.vals[1][1]| 
    // +------------------------------+ 
    // |       4.0| 
    // |       8.0| 
    // +------------------------------+ 
    
  • Pola zdefiniowane przez użytkownika (UDT) mogą być dostępne za pomocą funkcji UDF. Aby uzyskać szczegółowe informacje, patrz SparkSQL referencing attributes of UDT.

Uwagi:

  • w zależności od wersji Spark niektóre z tych metod mogą być dostępne tylko z HiveContext. UDF powinny działać niezależnie od wersji zarówno w standardzie SQLContext, jak i HiveContext.
  • ogólnie rzecz biorąc zagnieżdżone wartości są obywatelami drugiej kategorii. Nie wszystkie typowe operacje są obsługiwane w zagnieżdżonych polach. W zależności od kontekstu może być lepiej spłaszczyć schematu i/lub wybuch zbiory

    df.select(explode($"an_array_of_structs")).show 
    
    // +--------------------+ 
    // |     col| 
    // +--------------------+ 
    // |[foo,1,WrappedArr...| 
    // |[bar,2,WrappedArr...| 
    // |[foz,3,WrappedArr...| 
    // |[baz,4,WrappedArr...| 
    // +--------------------+ 
    
  • składnia Dot można łączyć z symbolu wieloznacznego (*), aby wybrać (ewentualnie wielokrotne) pól bez określania nazwy wyraźnie:

    df.select($"a_struct.*").show 
    // +---+ 
    // | x| 
    // +---+ 
    // | 1| 
    // | 2| 
    // +---+ 
    
+5

czy możesz podać jakieś szczegóły? lol –

+0

Czy można pobrać wszystkie elementy w tablicy struct? Czy coś takiego jest możliwe .. sqlContext.sql ("SELECT an_array_of_structs [0] .foo FROM df"). Show – user1384205

+1

To powinna być zaakceptowana odpowiedź. –

2

Po przekonwertować go do DF, u można po prostu pobrać dane jako

val rddRow= rdd.map(kv=>{ 
    val k = kv._1 
    val v = kv._2 
    Row(k, v) 
    }) 

val myFld1 = StructField("name", org.apache.spark.sql.types.StringType, true) 
val myFld2 = StructField("map", org.apache.spark.sql.types.MapType(StringType, StringType), true) 
val arr = Array(myFld1, myFld2) 
val schema = StructType(arr) 
val rowrddDF = sqc.createDataFrame(rddRow, schema) 
rowrddDF.registerTempTable("rowtbl") 
val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one")) 
or 
val rowrddDFFinal = rowrddDF.select("map.one") 
+0

kiedy próbuję tego, otrzymuję komunikat 'error: value _1 is not a member of org.apache.spark.sql.Row' – Paul

0

tutaj było to, co zrobiłem i zadziałało

case class Test(name: String, m: Map[String, String]) 
val map = Map("hello" -> "world", "hey" -> "there") 
val map2 = Map("hello" -> "people", "hey" -> "you") 
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2))) 
val rdddf = rdd.toDF 
rdddf.registerTempTable("mytable") 
sqlContext.sql("select m.hello from mytable").show 

Wyniki

+------+ 
| hello| 
+------+ 
| world| 
|people| 
+------+