2016-12-21 37 views
5

Jestem migracji z Impala SparkSQL, stosując następujący kod do odczytu tabeli:SparkSQL - odczytać pliku parkiet bezpośrednio

my_data = sqlContext.read.parquet('hdfs://my_hdfs_path/my_db.db/my_table') 

Jak mogę powołać SparkSQL powyżej, więc może powrócić coś takiego:

'select col_A, col_B from my_table' 

Odpowiedz

4

Po utworzeniu ramki danych z pliku parkietu, należy zarejestrować ją jako tabelę tymczasową, aby uruchomić na niej sql queries.

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

val df = sqlContext.read.parquet("src/main/resources/peopleTwo.parquet") 

df.printSchema 

// after registering as a table you will be able to run sql queries 
df.registerTempTable("people") 

sqlContext.sql("select * from people").collect.foreach(println) 
+0

Czy konieczne jest zbieranie (lub dobry pomysł)? Ponieważ jeśli dane są duże, nie chcemy zbierać wszystkiego do sterownika? – Edamame

+1

to tylko przykład użycia sql. To zależy od Ciebie, w jaki sposób chcesz z niego korzystać. możesz zmienić zapytanie lub zrobić .take() również, aby uzyskać wymagane dane na temat sterownika –

5

Mamy można uruchomić SQL bezpośrednio na plikach, takich jak JSON, ORC, parkiet i CSV bez tworzenia tabeli.

//This Spark 2.x code you can do the same on sqlContext as well 
val spark: SparkSession = SparkSession.builder.master("set_the_master").getOrCreate 

spark.sql("select col_A, col_B from parquet.`hdfs://my_hdfs_path/my_db.db/my_table`") 
    .show() 
+0

Widzę ten błąd "Nie znaleziono pliku. Możliwe, że podstawowe pliki zostały zaktualizowane. Możesz jawnie unieważnić pamięć podręczną w Sparku przez uruchamianie polecenia "REFRESH TABLE tableName" w SQL lub poprzez ponowne utworzenie danych Dataset/DataFrame. " Jak rozwiązać ten problem? – Passionate

+0

Nie pomaga, jeśli robię iskry.sqlContext(). SetConf ("spark.sql.parquet.cacheMetadata", "false"); – Passionate

+1

Działa! Po prostu zastąp ścieżkę pliku 'hdfs: // my_hdfs_path/my_db.db/my_table'. :) – Cherry