2016-07-28 32 views
7

Czy możliwe jest wykonywanie skryptów Pythona w logstash? Mogę zaimportować dane CSV do elastycznego wyszukiwania za pomocą logstash. Ale muszę użyć aktualizacji API zamiast po prostu indeksować wszystkie wiersze.Skrypty w logstash

Oto mój przykładowy plik CSV ...

vi /tmp/head.txt 
"Home","Home-66497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj","919359000000","HMSHOP","916265100000","2016-05-18 08:41:49" 
"Home","Home-26497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1V1","919359000001","HMSHOP","916265100000","2016-05-18 18:41:49" 
"Home","Home-36497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/SZj1","919359000001","HMSHOP","916265100000","2016-05-18 12:41:49" 
"Home","Home-46497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1","919359000000","HMSHOP","916265100000","2016-05-18 14:41:49" 
"Home","Home-56497273a5a83c99","Spice Xlife 350, 3.5inch Android, bit.ly/1VSZj1xc","919359000000","HMSHOP","916265100000","2016-05-18 16:41:49" 

Oto logstash plik konfiguracyjny ...

vi logstash.conf 
input { 
    file { 
     path => "/tmp/head.txt" 
     type => "csv" 
     start_position => beginning 
    } 
} 
filter { 
    csv { 
     columns => ["user", "messageid", "message", "destination", "code", "mobile", "mytimestamp"] 
     separator => "," 
    } 
} 

output { 
    elasticsearch { 
     action => "index" 
     hosts => ["172.17.0.1"] 
     index => "logstash-%{+YYYY.MM.dd}" 
     workers => 1 
    } 
} 

I potwierdziły, że powyższa konfiguracja działa zgodnie z oczekiwaniami, a wszystkie 5 rekordy są przechowywane jako 5 oddzielnych dokumentów.

tutaj jest dla mnie rozkazem doker ...

docker run -d -v "/tmp/logstash.conf":/usr/local/logstash/config/logstash.conf -v /tmp/:/tmp/ logstash -f /usr/local/logstash/config/logstash.conf 

Problem polega na tym, że trzeba łączyć dokumenty na podstawie numeru docelowego. Miejsce docelowe powinno być identyfikatorem dokumentu. Jest kilka wierszy z tym samym miejscem docelowym. Dla np. _id: 919359000001 Ten dokument powinien zawierać oba następujące rekordy jako obiekty zagnieżdżone.

"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49" 
"user": "Home", "messageid" "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/SZj1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp": "2016-05-18 12:41:49" 

Elasticsearch poprawnie konwertuje dane CSV do json, jak pokazano powyżej. Potrzebuję sformatować oświadczenie, aby skorzystać ze skryptów przy użyciu aktualizacji API Poniższy kod działa poprawnie.

POST /test_index/doc/_bulk 
{ "update" : { "_id" : "919359000001"} } 
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-26497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V1", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 18:41:49"}] }} 
{ "update" : { "_id" : "919359000001"} } 
{ "script" : { "inline": "ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]", "lang" : "groovy", "params" : {"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}}, "upsert": {"parent" : [{"user": "Home", "messageid": "Home-36497273a5a83c99", "message": "Spice Xlife 350, 3.5inch Android, bit.ly/1V13343", "code": "HMSHOP", "mobile": "916265100000", "mytimestamp" : "2016-05-18 12:41:49"}] }} 

Jak mogę kodować w logstash, aby przekonwertować moje dane CSV, aby wyglądały jak wyżej?


Aktualizacja

Mam kodu Pythona, który działa zgodnie z oczekiwaniami. Chciałbym wiedzieć, jak zmodyfikować ten kod, aby pasował do "wyjściowych" parametrów sugerowanych zgodnie z odpowiedzią. W poniższym przykładzie df_json jest obiektem python, który jest niczym innym jak pythonową ramką danych spłaszczoną do json.

import copy 
with open('myfile.txt', 'w') as f: 
    for doc1 in df_json: 
     import json 
     doc = mydict(doc1) 
     docnew = copy.deepcopy(doc) 
     del docnew['destination'] 
     action = '{ "update": {"_id": %s }}\n' % doc['destination'] 
     f.write(action) 
     entry = '{ "script" : { "inline": "ctx._source.parent += [\'user\': user, \'messageid\': messageid, \'message\': message, \'code\': code, \'mobile\': mobile, \'mytimestamp\': mytimestamp]", "lang" : "groovy", "params" : %s}, "upsert": {"parent" : [%s ] }}\n' % (doc, docnew) 
     f.write(entry) 

! curl -s -XPOST XXX.xx.xx.x:9200/test_index222/doc/_bulk --data-binary @myfile.txt; echo 

Aktualizacja 2

Próbowałem następującą konfigurację i jest zastąpienie (nie aktualizowanie zgodnie Script) dokumentów.

Kiedy zmieniono działanie na "update", pojawia się następujący błąd ...

:response=>{"update"=>{"_index"=>"logstash4-2016.07.29", "_type"=>"csv", "_id"=>"919359000000", 
"status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", 
"caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run in line script 
[ctx._source.parent += ['user': user, 'messageid': messageid, 'message': message, 'code': code, 'mobile': mobile, 'mytimestamp': mytimestamp]] 
using lang [groovy]", "caused_by"=>{"type"=>"missing_property_exception", "reason"=>"No such property: user for class: fe1b423dc4966b0f0b511b732474637705bf3bb1"}}}}}, :level=>:warn} 

Update 3

Jak na odpowiedź Vala dodałem imprezę i dostaję ten błąd ...

:response=>{"update"=>{"_index"=>"logstash4-2016.08.06", "_type"=>"csv", "_id"=>"%{destination}", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to execute script", "caused_by"=>{"type"=>"script_exception", "reason"=>"failed to run inline script [ctx._source.parent += ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]] using lang [groovy]", "caused_by"=>{"type"=>"null_pointer_exception", "reason"=>"Cannot execute null+{user=null, messageid=null, message=, code=null, mobile=null, mytimestamp=null}"}}}}} 

Aktualizacja 4

Zgodnie Vala zaktualizowaną odpowiedź Próbowałem to ...

script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]" 

I got ten błąd:

{:timestamp=>"2016-08-12T09:40:48.869000+0000", :message=>"Pipeline main started"} 
{:timestamp=>"2016-08-12T09:40:49.517000+0000", :message=>"Error parsing csv", :field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>, :level=>:warn} 

tylko 2 rekordy zostały dodane do bazy danych.

+0

Zmiana akcji => "aktualizacja", a nie "indeks". – alpert

Odpowiedz

1

elasticsearch plugin wyjście obsługuje parametry skryptu:

output { 
    elasticsearch { 
     action => "update" 
     hosts => ["172.17.0.1"] 
     index => "logstash-%{+YYYY.MM.dd}" 
     workers => 1 
     script => "<your script here>" 
     script_type => "inline" 
     # Set the language of the used script 
     # script_lang => 
     # if enabled, script is in charge of creating non-existent document (scripted update) 
     # scripted_upsert => (default is false) 
    } 
} 
+0

Zmiana akcji na aktualizację nie pomogła. Zobacz zaktualizowane pytanie. – shantanuo

1

Zdarzenie jest przekazywane do skryptu w wyjściu z użyciem nazwy zmiennej event (domyślnie, ale można go zmienić za pomocą ustawienia script_var_name).

Skrypt w wynikach musi to uwzględnić.

script => "ctx._source.parent = (ctx._source.parent ?: []) + ['user': event.user, 'messageid': event.messageid, 'message': event.message, 'code': event.code, 'mobile': event.mobile, 'mytimestamp': event.mytimestamp]" 
+0

Czy to szczęście? – Val

+0

Wystąpił błąd. Sprawdź zaktualizowane pytanie. – shantanuo

+0

Zaktualizowałem moją odpowiedź – Val