2017-04-18 56 views
5

Używam kompilatora Java i Protoc 3.0, a mój plik proto jest wymieniony poniżej. https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yangJak odczytywać dane metadane w gRPC używając Java po stronie klienta

syntax = "proto3"; 

package Telemetry; 

// Interface exported by Agent 
service OpenConfigTelemetry { 
    // Request an inline subscription for data at the specified path. 
    // The device should send telemetry data back on the same 
    // connection as the subscription request. 
    rpc telemetrySubscribe(SubscriptionRequest)      returns (stream OpenConfigData) {} 

    // Terminates and removes an exisiting telemetry subscription 
    rpc cancelTelemetrySubscription(CancelSubscriptionRequest)  returns (CancelSubscriptionReply) {} 

    // Get the list of current telemetry subscriptions from the 
    // target. This command returns a list of existing subscriptions 
    // not including those that are established via configuration. 
    rpc getTelemetrySubscriptions(GetSubscriptionsRequest)   returns (GetSubscriptionsReply) {} 

    // Get Telemetry Agent Operational States 
    rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {} 

    // Return the set of data encodings supported by the device for 
    // telemetry data 
    rpc getDataEncodings(DataEncodingRequest)      returns (DataEncodingReply) {} 
} 

// Message sent for a telemetry subscription request 
message SubscriptionRequest { 
    // Data associated with a telemetry subscription 
    SubscriptionInput input         = 1; 

    // List of data models paths and filters 
    // which are used in a telemetry operation. 
    repeated Path path_list         = 2; 

    // The below configuration is not defined in Openconfig RPC. 
    // It is a proposed extension to configure additional 
    // subscription request features. 
    SubscriptionAdditionalConfig additional_config   = 3; 
} 

// Data associated with a telemetry subscription 
message SubscriptionInput { 
    // List of optional collector endpoints to send data for 
    // this subscription. 
    // If no collector destinations are specified, the collector 
    // destination is assumed to be the requester on the rpc channel. 
    repeated Collector collector_list      = 1; 
} 

// Collector endpoints to send data specified as an ip+port combination. 
message Collector { 
    // IP address of collector endpoint 
    string address           = 1; 

    // Transport protocol port number for the collector destination. 
    uint32 port            = 2; 
} 

// Data model path 
message Path { 
    // Data model path of interest 
    // Path specification for elements of OpenConfig data models 
    string path            = 1; 

    // Regular expression to be used in filtering state leaves 
    string filter           = 2; 

    // If this is set to true, the target device will only send 
    // updates to the collector upon a change in data value 
    bool suppress_unchanged         = 3; 

    // Maximum time in ms the target device may go without sending 
    // a message to the collector. If this time expires with 
    // suppress-unchanged set, the target device must send an update 
    // message regardless if the data values have changed. 
    uint32 max_silent_interval        = 4; 

    // Time in ms between collection and transmission of the 
    // specified data to the collector platform. The target device 
    // will sample the corresponding data (e.g,. a counter) and 
    // immediately send to the collector destination. 
    // 
    // If sample-frequency is set to 0, then the network device 
    // must emit an update upon every datum change. 
    uint32 sample_frequency         = 5; 
} 

// Configure subscription request additional features. 
message SubscriptionAdditionalConfig { 
    // limit the number of records sent in the stream 
    int32 limit_records          = 1; 

    // limit the time the stream remains open 
    int32 limit_time_seconds        = 2; 
} 

// Reply to inline subscription for data at the specified path is done in 
// two-folds. 
// 1. Reply data message sent out using out-of-band channel. 
// 2. Telemetry data send back on the same connection as the 
// subscription request. 

// 1. Reply data message sent out using out-of-band channel. 
message SubscriptionReply { 
    // Response message to a telemetry subscription creation or 
    // get request. 
    SubscriptionResponse response       = 1; 

    // List of data models paths and filters 
    // which are used in a telemetry operation. 
    repeated Path path_list         = 2; 
} 

// Response message to a telemetry subscription creation or get request. 
message SubscriptionResponse { 
    // Unique id for the subscription on the device. This is 
    // generated by the device and returned in a subscription 
    // request or when listing existing subscriptions 
    uint32 subscription_id = 1; 
} 

// 2. Telemetry data send back on the same connection as the 
// subscription request. 
message OpenConfigData { 
    // router name:export IP address 
    string system_id          = 1; 

    // line card/RE (slot number) 
    uint32 component_id          = 2; 

    // PFE (if applicable) 
    uint32 sub_component_id         = 3; 

    // Path specification for elements of OpenConfig data models 
    string path            = 4; 

    // Sequence number, monotonically increasing for each 
    // system_id, component_id, sub_component_id + path. 
    uint64 sequence_number         = 5; 

    // timestamp (milliseconds since epoch) 
    uint64 timestamp          = 6; 

    // List of key-value pairs 
    repeated KeyValue kv         = 7; 
} 

// Simple Key-value, where value could be one of scalar types 
message KeyValue { 
    // Key 
    string key            = 1; 

    // One of possible values 
    oneof value { 
     double double_value         = 5; 
     int64 int_value         = 6; 
     uint64 uint_value         = 7; 
     sint64 sint_value         = 8; 
     bool bool_value         = 9; 
     string str_value         = 10; 
     bytes bytes_value         = 11; 
    } 
} 

// Message sent for a telemetry subscription cancellation request 
message CancelSubscriptionRequest { 
    // Subscription identifier as returned by the device when 
    // subscription was requested 
    uint32 subscription_id         = 1; 
} 

// Reply to telemetry subscription cancellation request 
message CancelSubscriptionReply { 
    // Return code 
    ReturnCode code           = 1; 

    // Return code string 
    string  code_str          = 2; 
}; 

// Result of the operation 
enum ReturnCode { 
    SUCCESS             = 0; 
    NO_SUBSCRIPTION_ENTRY         = 1; 
    UNKNOWN_ERROR           = 2; 
} 

// Message sent for a telemetry get request 
message GetSubscriptionsRequest { 
    // Subscription identifier as returned by the device when 
    // subscription was requested 
    // --- or --- 
    // 0xFFFFFFFF for all subscription identifiers 
    uint32 subscription_id         = 1; 
} 

// Reply to telemetry subscription get request 
message GetSubscriptionsReply { 
    // List of current telemetry subscriptions 
    repeated SubscriptionReply subscription_list   = 1; 
} 

// Message sent for telemetry agent operational states request 
message GetOperationalStateRequest { 
    // Per-subscription_id level operational state can be requested. 
    // 
    // Subscription identifier as returned by the device when 
    // subscription was requested 
    // --- or --- 
    // 0xFFFFFFFF for all subscription identifiers including agent-level 
    // operational stats 
    // --- or --- 
    // If subscription_id is not present then sent only agent-level 
    // operational stats 
    uint32 subscription_id         = 1; 

    // Control verbosity of the output 
    VerbosityLevel verbosity        = 2; 
} 

// Verbosity Level 
enum VerbosityLevel { 
    DETAIL             = 0; 
    TERSE             = 1; 
    BRIEF             = 2; 
} 

// Reply to telemetry agent operational states request 
message GetOperationalStateReply { 
    // List of key-value pairs where 
    //  key  = operational state definition 
    //  value = operational state value 
    repeated KeyValue kv         = 1; 
} 

// Message sent for a data encoding request 
message DataEncodingRequest { 
} 

// Reply to data encodings supported request 
message DataEncodingReply { 
    repeated EncodingType encoding_list     = 1; 
} 

// Encoding Type Supported 
enum EncodingType { 
    UNDEFINED            = 0; 
    XML              = 1; 
    JSON_IETF            = 2; 
    PROTO3             = 3; 
} 

W tym celu wywołanie usług (RPC TelemetrySubscribe) najpierw muszę przeczytać nagłówek, które mają Subskrypcja a następnie rozpocząć czytanie wiadomości. Teraz, używając Java, mogę połączyć się z usługą, wprowadziłem przechwytywacz, ale gdy drukuję/odzyskuję nagłówek, jest on pusty. Mój kod wywołujący przechwytywania jest poniżej,

ClientInterceptor interceptor = new HeaderClientInterceptor(); 
     originChannel = OkHttpChannelBuilder.forAddress(host, port) 
     .usePlaintext(true) 
     .build(); 
    Channel channel = ClientInterceptors.intercept(originChannel, interceptor); 
     telemetryStub = OpenConfigTelemetryGrpc.newStub(channel); 

To jest kod przechwytywania do odczytu danych meta.

@Override 
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, 
     CallOptions callOptions, Channel next) { 
    return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { 

     @Override 
     public void start(Listener<RespT> responseListener, Metadata headers) { 

     super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) { 
      @Override 
      public void onHeaders(Metadata headers) { 

      Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER); 

      System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY)); 

Zastanawiasz się, czy istnieje inny sposób odczytu meta-danych lub pierwszej wiadomości z identyfikatorem subskrypcji? Wszystko, czego potrzebuję, aby przeczytać pierwszą wiadomość, która ma identyfikator subskrypcji, i zwrócić ten sam identyfikator subskrypcji do serwera, aby można było rozpocząć przesyłanie strumieniowe Mam równoważny kod Pythona przy użyciu tego samego pliku proto i komunikuję się z serwerem za pomocą kodu wymienionego poniżej tylko w celach informacyjnych:

 sub_req = SubscribeRequestMsg("host",port) 
    data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS) 
    metadata = data_itr.initial_metadata() 

        if metadata[0][0] == "responseKey": 
        metainfo = metadata[0][1] 
        print metainfo 

        subreply = agent_pb2.SubscriptionReply() 
        subreply.SetInParent() 
        google.protobuf.text_format.Merge(metainfo, subreply) 

        if subreply.response.subscription_id: 
        SUB_ID = subreply.response.subscription_id 

Z powyższego kodu Pythona można łatwo pobrać obiekt z meta danych, nie wiesz, jak odzyskać go za pomocą Java?

Po przeczytaniu metadane wszystko otrzymuję to: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})

ale wiem, że jest jeszcze jedna linia z meta danych do tego, co jest

response { 
    subscription_id: 2 
} 

Jak można wyodrębnić ostatnią odpowiedź z nagłówka, które mieć identyfikator subskrypcji. Spróbowałem wielu opcji i jestem tutaj zagubiony.

Odpowiedz

4

Wybrana metoda używana jest do prośba metadanych, nie odpowiedzi metadanych:

public void start(Listener<RespT> responseListener, Metadata headers) { 

metadanych odpowiedzi, trzeba będzie ClientCall.Listener i czekać na oddzwonienie onHeaders:

public void onHeaders(Metadata headers) 

Mam wrażenie, że użycie wspomnianych metadanych wydaje się dziwne. Metadane generalnie zawierają dodatkowe szczegóły błędów lub funkcje przekrojowe, które nie są specyficzne dla metody RPC (takie jak auth, śledzenie itp.).

+0

public void start (Listener responseListener, nagłówki metadanych) { ClientCall.Listener słuchacz = new ClientCall.Listener () { \t @Override \t public void onHeaders (nagłówki metadanych) { \t SubscriptionReply s = \t \t \t System.out.println ("Header found1" + nagłówki); \t super.onheaders (nagłówki); \t} }; super.start (słuchacz, nagłówki); } }; Zostało ustanowione zablokowane połączenie, ale dane nie płyną. – Ammad

+0

@Ammad, nigdy nie wywołasz przekazywanego odbiornika ('responseListener'). Rozszerzanie SimpleForwardingClientCallListener i przekazywanie do obiektu 'responseListener' do jego konstruktora. –

+0

Potrzebuję pomocy. Gdzie można zadzwonić do słuchacza? Widzę, że onHeaders() jest wywoływany z pewnymi początkowymi danymi. – Ammad

4

Często korzystanie z narzędzia ClientInterceptor jest niewygodne, ponieważ konieczne jest zachowanie odniesienia do niego w celu wycofania danych. W twoim przypadku dane są w rzeczywistości metadanymi. Jednym ze sposobów na łatwiejszy dostęp do metadanych jest umieszczenie ich wewnątrz Context. Na przykład można utworzyć Context.Key dla identyfikatora subskrypcji. W przechwytywaczu klienta można wyodrębnić żądany nagłówek Metadata i umieścić go wewnątrz Context, używając Context.current().withValue(key, metadata). Wewnątrz twojego StreamObserver możesz wyodrębnić to, dzwoniąc pod numer key.get(Context.current()). Zakłada to, że korzystasz z Async API, a nie z blokującego interfejsu API.

Powód jest trudniejszy, ponieważ zazwyczaj metadane są informacjami o połączeniu, ale nie są bezpośrednio związane z samym połączeniem.Dotyczy to takich czynności, jak śledzenie, kodowanie, statystyki, anulowanie i podobne rzeczy. Jeśli coś zmieni sposób, w jaki potraktujesz żądanie, prawdopodobnie musi przejść bezpośrednio do samego żądania, a nie być po stronie.

+0

Cześć Carl, jestem tylko abonent usługi. Nie mogę podać kluczowi, żeby klient mógł wyciągnąć. Wszystko, co mogę zrobić, to odebrać dane nagłówka i odczytać klucz nagłówka jako pierwszy krok, aby mógł rozpocząć się strumień. – Ammad

+0

Jak utworzyć Context.key dla subskrypcji? Zakładając, że subscription_id jest ciągiem? – Ammad

+1

Tworzenie nowego kontekstu wywołania na klienta po stronie klienta jest niebezpieczne, ponieważ może tworzyć nieskończenie długi łańcuch kontekstowy z interfejsami API Async i Future. Dokumenty dla ClientInterceptors [teraz wyraźnie to nazywają] (https://github.com/grpc/grpc-java/blob/c112a2c/core/src/main/java/io/grpc/ClientInterceptor.java#L56). Bezpieczniejszym sposobem po stronie klienta jest użycie 'AbstractStub.withOption' i pobranie konfiguracji poprzez' CallOptions.getOption'. –