10

Mam tabeli Cassandra że dla uproszczenia wyglądał:Jak wysłać zapytanie do kolumny danych JSON za pomocą Spark DataFrames?

key: text 
jsonData: text 
blobData: blob 

mogę utworzyć podstawowy ramkę danych dla tej użyciem iskry i Iskra-Cassandra złącza za pomocą:

val df = sqlContext.read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "mytable", "keyspace" -> "ks1")) 
    .load() 

I m walczy jednak o rozszerzenie danych JSON do swojej podstawowej struktury. Ostatecznie chcę móc filtrować w oparciu o atrybuty w ciągu JSON i zwracać dane typu blob. Coś jak jsonData.foo = "bar" i zwróć blobData. Czy to jest obecnie możliwe?

+0

Czy 'klucz' jest niepowtarzalnym identyfikatorem? – zero323

+0

tak, klucz jest kluczem podstawowym tabeli – JDesuv

Odpowiedz

27

Spark 2.1+

Można użyć from_json funkcję:

import org.apache.spark.sql.functions.from_json 
import org.apache.spark.sql.types._ 

val schema = StructType(Seq(
    StructField("k", StringType, true), StructField("v", DoubleType, true) 
)) 

df.withColumn("jsonData", from_json($"jsonData", schema)) 

Spark 1.6+

Można użyć get_json_object która zaczyna kolumnę i ścieżkę:

import org.apache.spark.sql.functions.get_json_object 

val exprs = Seq("k", "v").map(
    c => get_json_object($"jsonData", s"$$.$c").alias(c)) 

df.select($"*" +: exprs: _*) 

i wyodrębnia pola do poszczególnych ciągów, które można dalej rzutować na oczekiwane typy.

Spark < = 1,5:

Jest to obecnie możliwe?

O ile mi wiadomo, nie jest to bezpośrednio możliwe. Można spróbować coś podobnego do tego:

val df = sc.parallelize(Seq(
    ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), 
    ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") 
)).toDF("key", "jsonData", "blobData") 

Zakładam, że blob pole nie może być reprezentowane w JSON. W przeciwnym razie taksówki pominąć dzielenia i łączenia:

import org.apache.spark.sql.Row 

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") 
val jsons = sqlContext.read.json(df.drop("blobData").map{ 
    case Row(key: String, json: String) => 
    s"""{"key": "$key", "jsonData": $json}""" 
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") 
parsed.printSchema 

// root 
// |-- jsonData: struct (nullable = true) 
// | |-- k: string (nullable = true) 
// | |-- v: double (nullable = true) 
// |-- key: long (nullable = true) 
// |-- blobData: string (nullable = true) 

Alternatywą (taniej, chociaż bardziej skomplikowane) podejście jest użycie UDF do analizowania JSON i wyjście kolumnę struct lub map. Na przykład coś takiego:

import net.liftweb.json.parse 

case class KV(k: String, v: Int) 

val parseJson = udf((s: String) => { 
    implicit val formats = net.liftweb.json.DefaultFormats 
    parse(s).extract[KV] 
}) 

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) 
parsed.show 

// +---+--------------------+------------------+----------+ 
// |key|   jsonData|   blobData|parsedJSON| 
// +---+--------------------+------------------+----------+ 
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| 
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| 
// +---+--------------------+------------------+----------+ 

parsed.printSchema 

// root 
// |-- key: string (nullable = true) 
// |-- jsonData: string (nullable = true) 
// |-- blobData: string (nullable = true) 
// |-- parsedJSON: struct (nullable = true) 
// | |-- k: string (nullable = true) 
// | |-- v: integer (nullable = false) 
0

leżącej JSON ciąg jest

"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}"; 

Poniżej jest skrypt do filtrowania JSON i załadować wymaganych danych do Cassandry.

sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2") 
      .write.format("org.apache.spark.sql.cassandra") 
      .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name")) 
      .mode(SaveMode.Append) 
      .save() 
1

Funkcja from_json jest dokładnie tym, czego szukasz. Twój kod będzie wyglądać następująco:

val df = sqlContext.read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "mytable", "keyspace" -> "ks1")) 
    .load() 

//You can define whatever struct type that your json states 
val schema = StructType(Seq(
    StructField("key", StringType, true), 
    StructField("value", DoubleType, true) 
)) 

df.withColumn("jsonData", from_json(col("jsonData"), schema))