2016-07-02 16 views
5

Dokumentacja Gumicsaerch obejmuje tylko ładowanie pełnego indeksu do Sparka.Jak zapytać o indeks Elasticsearch za pomocą Pyspark i Dataframes

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") 
df.printSchema() 

Jak można wykonać zapytanie do powrotu danych z indeksem Elasticsearch i załadować je do Spark jako DataFrame wykorzystaniem pyspark?

Odpowiedz

4

Oto jak to zrobić.

ustawienia ogólne środowisko i komenda:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6 
export PYSPARK_DRIVER_PYTHON=ipython2 

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar 

Kod:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

conf = SparkConf().setAppName("ESTest") 
sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

q ="""{ 
    "query": { 
    "filtered": { 
     "filter": { 
     "exists": { 
      "field": "label" 
     } 
     }, 
     "query": { 
     "match_all": {} 
     } 
    } 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "titanic/passenger", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

Można również zdefiniować kolumny danych klatce. Aby uzyskać więcej informacji, patrz Here.

Mam nadzieję, że to pomoże!

+0

Właśnie to robiłem teraz, miałem nadzieję, że istnieje sposób na bezpośrednie pobranie przefiltrowanej DataFrame –

+1

Nie jestem pewien, czy jest to możliwe z najnowszym API złącza ES-Hadoop Spark. –

+1

Czy istnieje sposób na zapisanie ramek danych w elastycznym wyglądzie również przy użyciu tego interfejsu API? –

0

Uruchomiłem mój kod w klastrze EMR z Amazon za pomocą pyspark. Następnie droga zrobiłem to prace następujące kroki:

1) Umieść tę akcję bootstrap w tworzeniu klastrów (w celu utworzenia localhost serwer elasticsearch):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb 

2) i uruchomić te polecenia do wypełnienia baza elasticsearch z niektórych danych:

curl -XPUT "http://localhost:9200/movies/movie/1" -d' { 
    "title": "The Godfather", 
    "director": "Francis Ford Coppola", 
    "year": 1972 
    }' 

można również uruchomić inne polecenia curl jeśli chcesz, jak:

curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}} 

3) I inited pyspark stosując następujące parametry:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar 

ja ściągnąłem klienta elasticsearch Pythona wcześniej

4) uruchomić następujący kod:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

q ="""{ 
    "query": { 
    "match_all": {} 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "movies/movie", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

Potem wreszcie pomyślny wynik z polecenia.

0

Napotkałem problem podobny do tego, aby uzyskać dane geo-filtrowane w DataSpray PySpark. Używam elasticsearch-spark-20_2.11-5.2.2.jar ze Spark w wersji 2.1.1 i ES w wersji 5.2. Udało mi się załadować dane do DataFrame określając moje zapytanie jako opcja podczas tworzenia DataFrame

Moja geo-query

q ="""{ 
    "query": { 
     "bool" : { 
      "must" : { 
       "match_all" : {} 
      }, 
      "filter" : { 
       "geo_distance" : { 
        "distance" : "100km", 
        "location" : { 
         "lat" : 35.825, 
         "lon" : -87.99 
        } 
       } 
      } 
     } 
    } 
}""" 

użyłem następujące polecenie, aby załadować dane do DataFrame

spark_df = spark.read.format("es").option("es.query", q).load("index_name")