2016-07-17 7 views
5

Próbuję użyć SQL na ramce danych iskry. Ale ramka danych ma 1 wartość ma ciąg (co jest JSON jak struktury):Jak wyszukiwać w ramce danych, gdzie 1 pole StringType ma wartość json w Spark SQL

Uratowałem moich danych ramki do tabeli temp: TestTable

Kiedy zrobiłem desc:

col_name      data_type 
requestId      string 
name       string 
features      string 

ale dysponuje wartości jest json:

{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}} 

chcę tylko zapytać na TestTable gdzie totalSpent> 10. Czy niektóre mi powiedzieć jak to zrobić?

Mój plik JSON wygląda następująco:

{ 
     "requestId": 232323, 
     "name": "ravi", 
     "features": "{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}" 
    } 

funkcji jest ciągiem. Potrzebuję tylko TotalSpent w tym. Próbowałem z:

val features = StructType( 
Array(StructField("totalSpent",LongType,true), 
StructField("movies",LongType,true) 
)) 

val schema = StructType(Array( 
StructField("requestId",StringType,true), 
StructField("name",StringType,true), 
StructField("features",features,true), 
) 
) 

val records = sqlContext.read.schema(schema).json(filePath) 

Ponieważ każde żądanie ma jeden ciąg znaków JSON. Ale to daje mi błąd.

Kiedy próbowałem z

val records = sqlContext.jsonFile(filePath) 

records.printSchema 

pokazuje mi:

root 
|-- requestId: string (nullable = true) 
|-- features: string (nullable = true) 
|-- name: string (nullable = true) 

mogę wykorzystać parallelize wewnątrz StructField podczas tworzenia schematu? Próbowałem z:

I first tried with : 

val customer = StructField("features",StringType,true) 
val events = sc.parallelize(customer :: Nil) 


val schema = StructType(Array( 
    StructField("requestId",StringType,true), 
    StructField("name", StructType(events, true),true), 
    StructField("features",features,true), 
    ) 
    ) 

Daje mi to również błąd. Próbowałem też:

import net.liftweb.json.parse 

case class KV(k: String, v: Int) 

val parseJson = udf((s: String) => { 
    implicit val formats = net.liftweb.json.DefaultFormats 
    parse(s).extract[KV] 
}) 

val parsed = records.withColumn("parsedJSON", parseJson($"features")) 
parsed.show 

This gives me : 
<console>:78: error: object liftweb is not a member of package net 
     import net.liftweb.json.parse 

Tried:

Próbowałem z:

val parseJson = udf((s: String) => { 
    sqlContext.read.json(s) 
}) 

val parsed = records.withColumn("parsedJSON", parseJson($"features")) 
parsed.show 

ale znowu błędu.

Tried:

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 

val parseJson = udf((s: String) => { 
parse(s) 
}) 

val parsed = records.withColumn("parsedJSON", parseJson($"features")) 
parsed.show 

ale daje mi:

java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported 
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153) 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) 
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64) 
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) 

To daje mi właściwego schematu (na podstawie odpowiedzi udzielonej przez zero323:

val extractFeatures = udf((features: String) => Try { 
implicit val formats = DefaultFormats 
    parse(features).extract[Features] 
}.toOption) 

val parsed = records.withColumn("features", extractFeatures($"features")) 

parsed.printSchema

Ale kiedy zapytanie:

val value = parsed.filter($"requestId" === "232323").select($"features.totalSpent") 

value.show gives null.

+0

@ zero323: Cześć, ja poszedłem za pośrednictwem innej kwestii u odpowiedział, ale ja nie całkiem to rozumiem. Czy możesz mi wyjaśnić? Jestem bardzo nowy, by iskrzyć. – Swetha

+0

Która część jest niewyraźna? Jest to długa odpowiedź pokazująca różne metody w zależności od wymagań i wersji. Czy rozważasz jakieś konkretne rozwiązanie? – zero323

+0

@ zero323 Próbowałem z użyciem paralise na moim schemacie, ponieważ używam schematu, aby uzyskać inne wartości, więc wolę używać go w schemacie, ale daje błąd. Jestem skłonny do UDF do parsowania JSON, gdzie wspomniałeś o klasie przypadku KV (k: String, v: Int) Czy to ze względu na sposób sformatowania jego json? – Swetha

Odpowiedz

2

Podczas zwracania danych z UDF musi być reprezentowany jako typy SQL, a JSON AST nie.Jednym ze sposobów jest utworzenie klasy przypadek podobny do tego:

case class Features(
    places: Integer, 
    movies: Integer, 
    totalPlacesVisited: Integer, 
    totalSpent: Integer, 
    SpentMap: Map[String, Integer], 
    benefits: Map[String, Integer] 
) 

i użyć go do extract obiektów:

val df = Seq((
    232323, "ravi", 
    """{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}""" 
)).toDF("requestId", "name", "features") 

val extractFeatures = udf((features: String) => 
    parse(features).extract[Features]) 

val parsed = df.withColumn("features", extractFeatures($"features")) 
parsed.show(false) 

// +---------+----+-----------------------------------------------------------------+ 
// |requestId|name|features               | 
// +---------+----+-----------------------------------------------------------------+ 
// |232323 |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]| 
// +---------+----+-----------------------------------------------------------------+ 

parsed.printSchema 

// root 
// |-- requestId: integer (nullable = false) 
// |-- name: string (nullable = true) 
// |-- features: struct (nullable = true) 
// | |-- places: integer (nullable = true) 
// | |-- movies: integer (nullable = true) 
// | |-- totalPlacesVisited: integer (nullable = true) 
// | |-- totalSpent: integer (nullable = true) 
// | |-- SpentMap: map (nullable = true) 
// | | |-- key: string 
// | | |-- value: integer (valueContainsNull = true) 
// | |-- benefits: map (nullable = true) 
// | | |-- key: string 
// | | |-- value: integer (valueContainsNull = true) 

zależności od innych zapisów i oczekiwane wykorzystanie należy dostosować reprezentacji i dodać odpowiedni błąd logika obsługi.

Można również użyć DSL dostęp do poszczególnych pól jako ciągi:

val getMovieSpent = udf((s: String) => 
    compact(render(parse(s) \\ "SpentMap" \\ "Movie"))) 

df.withColumn("movie_spent", getMovieSpent($"features").cast("bigint")).show 
// +---------+----+--------------------+-----------+ 
// |requestId|name|   features|movie_spent| 
// +---------+----+--------------------+-----------+ 
// | 232323|ravi|{"places":11,"mov...|   2| 
// +---------+----+--------------------+-----------+ 

Dla alternatywne podejścia zobaczyć How to query JSON data column using Spark DataFrames?

+0

Tak. Mój zły :) Dziękuję za szczegółowe wyjaśnienie. – Swetha

+0

Działa dobrze. Ale próbuję parse.show To daje mi: java.lang.NullPointerExceptio – Swetha

+0

Tak jak powiedziałem wcześniej, będziesz potrzebował właściwej obsługi wyjątków i prawdopodobnie pewnych korekt w zależności od tego, jak regularne są Twoje dane. Na początek możesz użyć 'scala.util.Try (...). ToOption', ale jest to bardzo prymitywne podejście. – zero323