2016-01-05 27 views
6

Jestem całkowicie nowym użytkownikiem Kafki i Avro i próbuję użyć pakietu konfluentującego. Mamy istniejące POJO, których używamy do JPA i chciałbym móc po prostu utworzyć instancję moich POJO bez konieczności ręcznego odzwierciedlania każdej wartości w generycznym rekordzie. Wygląda na to, że brakuje mi tego w dokumentacji.Konwertowanie danych na generyczne zapisy w pliku confluent.io i przesyłanie za pośrednictwem produktu KafkaProducer

Przykłady użyć rodzajowe rekord i ustawić każdą wartość jeden po drugim tak:

String key = "key1"; 
String userSchema = "{\"type\":\"record\"," + 
        "\"name\":\"myrecord\"," + 
        "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"; 
Schema.Parser parser = new Schema.Parser(); 
Schema schema = parser.parse(userSchema); 
GenericRecord avroRecord = new GenericData.Record(schema); 
avroRecord.put("f1", "value1"); 

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord); 
try { 
    producer.send(record); 
} catch(SerializationException e) { 
    // may need to do something with it 
} 

Istnieje kilka przykładów dla uzyskania schematu z klasy i znalazłem adnotacje do zmiany tego schematu jest to konieczne. Teraz jak wziąć instancję POJO i po prostu wysłać ją do serializera tak jak jest i czy biblioteka wykonuje pracę polegającą na dopasowywaniu schematu do klasy, a następnie kopiowaniu wartości do ogólnego rekordu? Czy to wszystko źle? To, co chcę zrobić, to coś takiego:

String key = "key1"; 
Schema schema = ReflectData.get().getSchema(myObject.getClass()); 
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema); 

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord); 
try { 
    producer.send(record); 
} catch(SerializationException e) { 
    // may need to do something with it 
} 

Dzięki!

Odpowiedz

1

I skończyłem tworzyć własne serializatora w tym przypadku:

public class KafkaAvroReflectionSerializer extends KafkaAvroSerializer { 
    private final EncoderFactory encoderFactory = EncoderFactory.get(); 

    @Override 
    protected byte[] serializeImpl(String subject, Object object) throws SerializationException { 
     //TODO: consider caching schemas 
     Schema schema = null; 

     if(object == null) { 
     return null; 
     } else { 
     try { 
      schema = ReflectData.get().getSchema(object.getClass()); 
      int e = this.schemaRegistry.register(subject, schema); 
      ByteArrayOutputStream out = new ByteArrayOutputStream(); 
      out.write(0); 
      out.write(ByteBuffer.allocate(4).putInt(e).array()); 

      BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null); 
      DatumWriter<Object> writer = new ReflectDatumWriter<>(schema); 
      writer.write(object, encoder); 
      encoder.flush(); 
      out.close(); 

      byte[] bytes = out.toByteArray(); 
      return bytes; 
     } catch (IOException ioe) { 
      throw new SerializationException("Error serializing Avro message", ioe); 
     } catch (RestClientException rce) { 
      throw new SerializationException("Error registering Avro schema: " + schema, rce); 
     } catch (RuntimeException re) { 
      throw new SerializationException("Error serializing Avro message", re); 
     } 
     } 
    } 
}