2012-07-15 29 views
6

Zacząłem grać z Kafką. Ustawiłem konfigurację zookeepera i udało mi się wysłać i zużyć ciągi wiadomości. Teraz próbuję przekazać obiekt (w java), ale z jakiegoś powodu podczas analizowania wiadomości w kliencie mam problemy z nagłówkiem. Próbowałem kilka opcji serializacji (używając dekodera/enkodera), a wszystkie zwracają ten sam problem z nagłówkiem.Kafka Serializacja obiektu

Oto mój kod Producent:

 Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config); 
     ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails()); 
     try { 
      producer.send(data); 
     } finally { 
      producer.close(); 
     } 

i konsumenta:

 Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("zk.connectiontimeout.ms", "1000000"); 
     props.put("groupid", "test_group"); 

     // Create the connection to the cluster 
     ConsumerConfig consumerConfig = new ConsumerConfig(props); 
     ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); 

     // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume 
     Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams = 
       consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer()); 
     List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3"); 

     // create list of 4 threads to consume from each of the partitions 
     ExecutorService executor = Executors.newFixedThreadPool(4); 

     // consume the messages in the threads 
     for (final KafkaMessageStream<EventDetails> stream: streams) { 
      executor.submit(new Runnable() { 
       public void run() { 
        for(EventDetails event: stream) { 
         System.err.println("********** Got message" + event.toString());   
        } 
       } 
      }); 
     } 

i mój serializatora:

public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> { 
    public Message toMessage(EventDetails eventDetails) { 
     try { 
      ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
      byte[] serialized = mapper.writeValueAsBytes(eventDetails); 
      return new Message(serialized); 
} catch (IOException e) { 
      e.printStackTrace(); 
      return null; // TODO 
     } 
} 
    public EventDetails toEvent(Message message) { 
     EventDetails event = new EventDetails(); 

     ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
     try { 
      //TODO handle error 
      return mapper.readValue(message.payload().array(), EventDetails.class); 
     } catch (IOException e) { 
      e.printStackTrace(); 
      return null; 
     } 

    } 
} 

A to błąd pojawia się:

org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse 
at [Source: N/A; line: -1, column: -1] 

Kiedy pracowałem z i zwykłym pisaniem do ObjectOutputStream, otrzymałem podobny problem z nagłówkiem. Próbowałem również dodać CRC32 ładunku do wiadomości, ale to też nie pomogło.

Co ja tu robię źle?

Odpowiedz

1

Metoda Bajbuffers .array() nie jest bardzo niezawodna. To zależy od konkretnej implementacji. Może chcesz spróbować

ByteBuffer bb = message.payload() 

byte[] b = new byte[bb.remaining()] 
bb.get(b, 0, b.length); 
return mapper.readValue(b, EventDetails.class) 
+0

Dzięki temu rozwiązałem bardzo podobny problem, jaki miałem! – Jarmex

3

Hm, nie mam biegać w tym samym numerze nagłówka, które napotykają ale mój projekt nie został poprawnie opracowującym kiedy nie zapewniają VerifiableProperties konstruktora w moim koder/dekoder . Wydaje się dziwne, że brakujący konstruktor mógłby uszkodzić deserializację Jacksona.

Być może spróbuj podzielić swój koder i dekoder i uwzględnić konstruktor VerifiableProperties w obu; nie powinieneś potrzebować implementacji Decoder[T] dla serializacji. Udało mi się z powodzeniem zaimplementować json de/serialization przy użyciu ObjectMapper zgodnie z formatem w this post.

Powodzenia!