2015-01-23 10 views
8

Próbuję utworzyć wiersz (org.apache.spark.sql.catalyst.expressions.Row) na podstawie danych wprowadzonych przez użytkownika. Nie jestem w stanie losowo utworzyć wiersza.Jak utworzyć wiersz z listy lub tablicy w Spark przy użyciu Scala

Czy istnieje możliwość utworzenia wiersza od List lub Array.

Dla np. Jeśli mam plik .csv z następującym formacie

"91xxxxxxxxxx,21.31,15,0,0" 

Jeśli wejście użytkownik [1, 2] następnie trzeba podjąć tylko 2nd kolumny i 3 kolumny wraz z customer_id który jest pierwsza kolumna

staram się analizować za pomocą kodu:

val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => (foo(input,r(0)))) ` 

gdzie foo jest defi Ned jako

def f(n: List[Int], s: String) : Row = { 
    val n = input.length 
    var out = new Array[Any](n+1) 
    var r = s.split(",") 
    out(0) = r(0) 
    for (i <- 1 to n) 
     out(i) = r(input(i-1)).toDouble 
    Row(out) 
} 

i wejście jest Lista powiedzieć

val input = List(1,2) 

Wykonanie tego kodu uzyskać L3 jak:

Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916]) 

Ale co chcę jest:

Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([9xxxxxxxxxx,21.31,15])` 

To musi być przekazane, aby utworzyć schemat w Spark SQL

Odpowiedz

13

coś jak następuje powinno działać:

import org.apache.spark.sql._ 

def f(n: List[Int], s: String) : Row = 
    Row.fromSeq(s.split(",").zipWithIndex.collect{case (a,b) if n.contains(b) => a}.toSeq) 
+5

Działa to dobrze, jeśli chcę przeanalizować go jako pojedynczy wiersz z 3 wartościami ciągu. Ale jak z niego korzystać, jeśli pierwsza wartość jest ciągiem, druga i trzecia wartość są podwójne? Czy to możliwe? – Anju

2

Brakuje utworzenie StructField i StructType. Zapoznać się z oficjalnym przewodniku http://spark.apache.org/docs/latest/sql-programming-guide.html, część Programowo Określanie schematu

Nie jestem specjalistą Scala, ale w Pythonie to będzie wyglądać następująco:

from pyspark.sql import * 
sqlContext = SQLContext(sc) 

input = [1,2] 

def parse(line): 
    global input 
    l = line.split(',') 
    res = [l[0]] 
    for ind in input: 
     res.append(l[ind]) 
    return res 

csv = sc.textFile("file:///tmp/inputfile.csv") 
rows = csv.map(lambda x: parse(x)) 

fieldnum = len(input) + 1 
fields = [StructField("col"+str(i), StringType(), True) for i in range(fieldnum)] 
schema = StructType(fields) 

csvWithSchema = sqlContext.applySchema(rows, schema) 
csvWithSchema.registerTempTable("test") 
sqlContext.sql("SELECT * FROM test").collect() 

Mówiąc krótko, nie należy bezpośrednio konwertować je do obiektów wiersz, po prostu zostawić jak RDD i zastosować schemat do niego z applySchema

0

można też spróbować:

Row.fromSeq(line(0).toString ++ line(1).toDouble ++ line(2).toDouble ++ line.slice(2, line.size).map(value => value.toString))