2016-10-07 15 views
7

Próbuję odczytać plik csv do ramki danych. Wiem, jaki powinien być schemat mojej ramki danych, ponieważ znam mój plik CSV. Używam również pakietu csv do odczytywania pliku. Próbuję określić schemat jak poniżej.Podaj schemat podczas odczytu pliku csv jako ramka danych

val pagecount = sqlContext.read.format("csv") 
      .option("delimiter"," ").option("quote","") 
      .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long") 
      .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000") 

Ale kiedy sprawdzam schemat utworzonej ramki danych, wygląda na to, że przyjęła ona własny schemat. Czy robię coś złego? jak zrobić iskrę, aby podnieść schemat, o którym wspomniałem?

> pagecount.printSchema 
root 
|-- _c0: string (nullable = true) 
|-- _c1: string (nullable = true) 
|-- _c2: string (nullable = true) 
|-- _c3: string (nullable = true) 
+0

która wersja iskry używasz? –

Odpowiedz

12

Spróbuj poniżej, nie musisz określać schematu. kiedy podasz inferSchema jako true, powinieneś pobrać ją z pliku csv.

val pagecount = sqlContext.read.format("csv") 
    .option("delimiter"," ").option("quote","") 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000") 

jeśli chcesz ręcznie określić schematu, trzeba zrobić jak poniżej

import org.apache.spark.sql.types._ 

val customSchema = StructType(Array(
     StructField("project", StringType, true), 
     StructField("article", StringType, true), 
     StructField("requests", IntegerType, true), 
     StructField("bytes_served", DoubleType, true))) 

    val pagecount = sqlContext.read.format("csv") 
      .option("delimiter"," ").option("quote","") 
      .option("header", "true") 
      .schema(customSchema) 
      .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000") 
+0

Próbowałem wykonać kod, ale dało mi to błąd poniżej. val customSchema = StructType (Array ( StructField ("projekt", StringType, true), StructField ("article", StringType, true), StructField ("requests", IntegerType, true), StructField ("bytes_served", DoubleType, true))) : 30: błąd: nieznalezny: wartość StructType val customSchema = StructType (Array ( – Pa1

+0

Teoretycznie wiem, że możemy wspomnieć o schemacie, ale zagubiłem się, mówiąc o schemacie pod względem składni, czy mogę uzyskać pomoc? Przekazałem oficjalny dokument, ale nie wspominam o tej sprawie i nie mam zbyt wielu przykładów. – Pa1

+0

Czy możesz załączyć zrzut ekranu błędu po tym, jak –

0

Dzięki odpowiedź przez @Nulu, że pracuje dla pyspark z minimalnym szczypanie

from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType 

customSchema = StructType(Array(
    StructField("project", StringType, true), 
    StructField("article", StringType, true), 
    StructField("requests", IntegerType, true), 
    StructField("bytes_served", DoubleType, true))) 

pagecount = sc.read.format("com.databricks.spark.csv") 
     .option("delimiter"," ") 
     .option("quote","") 
     .option("header", "false") 
     .schema(customSchema) 
     .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000") 
-1

Oto jak można pracować z niestandardowym schematem, pełnym demo:

$> numer seryjny,

echo " 
Slingo, iOS 
Slingo, Android 
" > game.csv 

kod Scala:

import org.apache.spark.sql.types._ 

val customSchema = StructType(Array(
    StructField("game_id", StringType, true), 
    StructField("os_id", StringType, true) 
)) 

val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv") 
csv_df.show 

csv_df.orderBy(asc("game_id"), desc("os_id")).show 
csv_df.createOrReplaceTempView("game_view") 
val sort_df = sql("select * from game_view order by game_id, os_id desc") 
sort_df.show 
+0

http://mujiang.blogspot.ca/2018/01/ work-with-custom-schema-with-spark.html –