2017-02-13 29 views
7

Próbuję mieć logstash wyjście elasticsearch ale nie jestem pewien, w jaki sposób korzystać z mapowania I zdefiniowanego w elasticsearch ...wyjście logstash do elasticsearch indeksu i mapowanie

W Kibana, zrobiłem to:

Utworzono indeks i odwzorowanie takiego:

PUT /kafkajmx2 
{ 
    "mappings": { 
    "kafka_mbeans": { 
     "properties": { 
     "@timestamp": { 
      "type": "date" 
     }, 
     "@version": { 
      "type": "integer" 
     }, 
     "host": { 
      "type": "keyword" 
     }, 
     "metric_path": { 
      "type": "text" 
     }, 
     "type": { 
      "type": "keyword" 
     }, 
     "path": { 
      "type": "text" 
     }, 
     "metric_value_string": { 
      "type": "keyword" 
     }, 
     "metric_value_number": { 
      "type": "float" 
     } 
     } 
    } 
    } 

} 

można zapisać danych na to tak:

POST /kafkajmx2/kafka_mbeans 
{ 
    "metric_value_number":159.03478490788203, 
    "path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf", 
    "@timestamp":"2017-02-12T23:08:40.934Z", 
    "@version":"1","host":"localhost", 
    "metric_path":"node1.kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec.FifteenMinuteRate", 
    "type":null 


} 

teraz moje wyjście logstash wygląda następująco:

input { 
     kafka { 
       kafka details here 
     } 

} 
output { 

    elasticsearch { 
      hosts => "http://elasticsearch:9050" 
      index => "kafkajmx2" 

    } 

} 

i po prostu zapisuje je do indeksu kafkajmx2 ale nie korzystać z mapy, gdy kwerendy takich jak to w Kibana:

get /kafkajmx2/kafka_mbeans/_search?q=* 
{ 


} 

I dostać ten powrotem:

 { 
     "_index": "kafkajmx2", 
     "_type": "logs", 
     "_id": "AVo34xF_j-lM6k7wBavd", 
     "_score": 1, 
     "_source": { 
      "@timestamp": "2017-02-13T14:31:53.337Z", 
      "@version": "1", 
      "message": """ 
{"metric_value_number":0,"path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf","@timestamp":"2017-02-13T14:31:52.654Z","@version":"1","host":"localhost","metric_path":"node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count","type":null} 

""" 
     } 
     } 

jak mogę powiedzieć, że aby korzystać z mapy kafka_mbeans na wyjściu logstash?

----- EDIT -----

Próbowałem moje wyjście tak, ale wciąż te same wyniki:

output { 

     elasticsearch { 
       hosts => "http://10.204.93.209:9050" 
       index => "kafkajmx2" 
       template_name => "kafka_mbeans" 
       codec => plain { 
         format => "%{message}" 
       } 

     } 

} 

dane w poszukiwaniu elastycznego powinna wyglądać następująco:

{ 
    "@timestamp": "2017-02-13T14:31:52.654Z", 
    "@version": "1", 
    "host": "localhost", 
    "metric_path": "node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count", 
    "metric_value_number": 0, 
    "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf", 
    "type": null 
} 

-------- EDIT 2 --------------

ja conajmniej dostał wiadomość do analizowania w json przez addin ga filtr tak:

input { 
     kafka { 
       ...kafka details.... 
     } 

} 
filter { 
     json { 
       source => "message" 
       remove_field => ["message"] 
     } 
} 
output { 

     elasticsearch { 
       hosts => "http://node1:9050" 
       index => "kafkajmx2" 
       template_name => "kafka_mbeans" 
     } 

} 

Nie używa szablonu nadal ale to conajmniej analizuje json poprawnie ... więc teraz mam to:

{ 
    "_index": "kafkajmx2", 
    "_type": "logs", 
    "_id": "AVo4a2Hzj-lM6k7wBcMS", 
    "_score": 1, 
    "_source": { 
     "metric_value_number": 0.9967205071482902, 
     "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf", 
     "@timestamp": "2017-02-13T16:54:16.701Z", 
     "@version": "1", 
     "host": "localhost", 
     "metric_path": "kafka1.kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent.Value", 
     "type": null 
    } 
    } 

Odpowiedz

4

To, co należy zmienić, jest bardzo proste. Najpierw użyj kodeka json na swoim wejściu kafka. Nie potrzebujesz filtra json, możesz go usunąć.

kafka { 
      ...kafka details.... 
      codec => "json" 
    } 

Następnie w swoim wyjściu elasticsearch tracisz typ odwzorowania (parametr document_type poniżej), co jest ważne w przeciwnym razie domyślne do logs (jak widać) i że nie pasuje do Twojego typu kafka_mbeans mapowania. Co więcej, nie musisz używać szablonu, ponieważ Twój indeks już istnieje. Wprowadzić następujące zmiany:

elasticsearch { 
      hosts => "http://node1:9050" 
      index => "kafkajmx2" 
      document_type => "kafka_mbeans" 
    } 
+0

Smutno, że zrobiłeś 't dostarczyć żadnej opinii na temat, czy rozwiązał twój problem – Val

+0

przepraszam za spóźnioną bardzo późną odpowiedź ... Musiałem odłączyć od tego na chwilę, ale to działało! – user2061886

+0

Niesamowite, cieszę się, że w końcu udało się! – Val

0

ta jest zdefiniowana the template_name parameter na elasticsearch wyjście.

elasticsearch { 
     hosts   => "http://elasticsearch:9050" 
     index   => "kafkajmx2" 
     template_name => "kafka_mbeans" 
} 

Jedno ostrzeżenie, chociaż. Jeśli chcesz rozpocząć tworzenie indeksów umieszczonych w odpowiednim czasie, takich jak jeden indeks w tygodniu, musisz wykonać jeszcze kilka kroków, aby upewnić się, że mapowanie pozostało z każdym z nich. Masz kilka opcji:

  • Utwórz szablon elasticsearch i zdefiniuj go, aby zastosować go do indeksów za pomocą globu. Takich jak kafkajmx2-*
  • Użyj na wyjściu, który określa plik JSON, który definiuje twoje mapowanie, które będzie używane ze wszystkimi indeksami utworzonymi przez to wyjście.
+0

więc starałem się dodać, że, ale kiedy to zrobić w Kibana ... 'GET kafkajmx2/kafka_mbeans/_search q = * {}' mam nic z powrotem .... wszystko? jest sitll wchodząc w "kafkajmx2" nadal jak w pytaniu ... – user2061886

+0

@ user2061886 Będziesz potrzebował 'type =>" kafka_mbeans "' na swoim wejściu. Logstash 'type' jest równoważny z tym, co nazywasz' mapowaniem'. – sysadmin1138

+0

'type' nie jest już dostępny w logstash 5.2 już ... – user2061886