Używam Javy i kompilatora protoc 3.0, a mój plik proto jest zamieszczony poniżej. https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
Jak odczytać metadane w gRPC, używając Javy po stronie klienta?
// This schema is written in proto3 syntax.
syntax = "proto3";
// Every definition in this file is declared in the Telemetry package.
package Telemetry;
// RPC interface exported by the telemetry agent.
service OpenConfigTelemetry {
  // Opens an inline subscription for the data at the requested paths.
  // The device streams telemetry data back over the same connection that
  // carried the subscription request.
  rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData);

  // Terminates an existing telemetry subscription and removes it.
  rpc cancelTelemetrySubscription(CancelSubscriptionRequest)
      returns (CancelSubscriptionReply);

  // Lists the telemetry subscriptions currently held by the target.
  // Subscriptions established via configuration are not included.
  rpc getTelemetrySubscriptions(GetSubscriptionsRequest)
      returns (GetSubscriptionsReply);

  // Retrieves the operational states of the telemetry agent.
  rpc getTelemetryOperationalState(GetOperationalStateRequest)
      returns (GetOperationalStateReply);

  // Reports which data encodings the device supports for telemetry data.
  rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply);
}
// Request message for creating a telemetry subscription.
message SubscriptionRequest {
  // Data associated with the telemetry subscription.
  SubscriptionInput input = 1;

  // Data model paths, with their filters, used in the telemetry
  // operation.
  repeated Path path_list = 2;

  // Proposed extension that is not part of the OpenConfig RPC
  // definition: configures additional subscription request features.
  SubscriptionAdditionalConfig additional_config = 3;
}
// Subscription-level data attached to a telemetry subscription.
message SubscriptionInput {
  // Optional collector endpoints that should receive the data for this
  // subscription. When the list is empty, the requester on the RPC
  // channel is assumed to be the collector destination.
  repeated Collector collector_list = 1;
}
// A collector endpoint, given as an IP address plus port, to which data
// is sent.
message Collector {
  // IP address of the collector endpoint.
  string address = 1;

  // Transport protocol port number of the collector destination.
  uint32 port = 2;
}
// A data model path together with its per-path reporting options.
message Path {
  // Data model path of interest: a path specification for elements of
  // OpenConfig data models.
  string path = 1;

  // Regular expression used when filtering state leaves.
  string filter = 2;

  // When true, the target device sends updates to the collector only
  // when a data value changes.
  bool suppress_unchanged = 3;

  // Longest time, in milliseconds, the target device may go without
  // sending a message to the collector. Once it expires with
  // suppress_unchanged set, the device must send an update message
  // whether or not the data values have changed.
  uint32 max_silent_interval = 4;

  // Time, in milliseconds, between collection of the specified data and
  // its transmission to the collector platform. The target device
  // samples the corresponding data (e.g., a counter) and immediately
  // sends it to the collector destination.
  //
  // A value of 0 means the network device must emit an update on every
  // datum change.
  uint32 sample_frequency = 5;
}
// Additional, optional features of a subscription request.
message SubscriptionAdditionalConfig {
  // Upper bound on the number of records sent in the stream.
  int32 limit_records = 1;

  // Upper bound, in seconds, on how long the stream remains open.
  int32 limit_time_seconds = 2;
}
// The reply to an inline subscription for data at the specified path
// comes in two parts:
//   1. A reply data message sent out using an out-of-band channel.
//   2. Telemetry data sent back on the same connection as the
//      subscription request.
//
// SubscriptionReply is part 1: the reply data message sent out using the
// out-of-band channel.
message SubscriptionReply {
  // Response to a telemetry subscription creation or get request.
  SubscriptionResponse response = 1;

  // Data model paths, with their filters, used in the telemetry
  // operation.
  repeated Path path_list = 2;
}
// Response to a telemetry subscription creation or get request.
message SubscriptionResponse {
  // Device-generated unique id of the subscription on the device. It is
  // returned in a subscription request or when existing subscriptions
  // are listed.
  uint32 subscription_id = 1;
}
// Telemetry data sent back on the same connection as the subscription
// request (the second part of a subscription reply).
message OpenConfigData {
  // System identifier, given as router name:export IP address.
  string system_id = 1;

  // Line card / RE (slot number).
  uint32 component_id = 2;

  // PFE, if applicable.
  uint32 sub_component_id = 3;

  // Path specification for elements of OpenConfig data models.
  string path = 4;

  // Sequence number that increases monotonically for each combination
  // of system_id, component_id, sub_component_id and path.
  uint64 sequence_number = 5;

  // Timestamp in milliseconds since the epoch.
  uint64 timestamp = 6;

  // The reported data as a list of key-value pairs.
  repeated KeyValue kv = 7;
}
// A simple key-value pair whose value is one of several scalar types.
message KeyValue {
  // Key of the pair.
  string key = 1;

  // The value; at most one of the alternatives below is set.
  oneof value {
    double double_value = 5;
    int64 int_value = 6;
    uint64 uint_value = 7;
    sint64 sint_value = 8;
    bool bool_value = 9;
    string str_value = 10;
    bytes bytes_value = 11;
  }
}
// Request message for cancelling a telemetry subscription.
message CancelSubscriptionRequest {
  // Identifier of the subscription to cancel, as returned by the device
  // when the subscription was requested.
  uint32 subscription_id = 1;
}
// Reply to a telemetry subscription cancellation request.
message CancelSubscriptionReply {
  // Return code describing the result of the cancellation.
  ReturnCode code = 1;

  // Return code rendered as a string.
  string code_str = 2;
}
// Result of an operation.
// SUCCESS is numbered 0 and is therefore the proto3 default value.
enum ReturnCode {
  SUCCESS = 0;
  NO_SUBSCRIPTION_ENTRY = 1;
  UNKNOWN_ERROR = 2;
}
// Request message for a telemetry "get subscriptions" call.
message GetSubscriptionsRequest {
  // Either the subscription identifier returned by the device when the
  // subscription was requested, or 0xFFFFFFFF to select all
  // subscription identifiers.
  uint32 subscription_id = 1;
}
// Reply to a telemetry "get subscriptions" request.
message GetSubscriptionsReply {
  // The current telemetry subscriptions.
  repeated SubscriptionReply subscription_list = 1;
}
// Request message for the telemetry agent's operational states.
message GetOperationalStateRequest {
  // Selects which operational state is returned; state can be requested
  // per subscription_id. One of:
  //   - the subscription identifier returned by the device when the
  //     subscription was requested;
  //   - 0xFFFFFFFF for all subscription identifiers, including
  //     agent-level operational stats;
  //   - not present, in which case only agent-level operational stats
  //     are sent.
  uint32 subscription_id = 1;

  // Controls the verbosity of the output.
  VerbosityLevel verbosity = 2;
}
// Verbosity level of operational state output.
// DETAIL is numbered 0 and is therefore the proto3 default value.
enum VerbosityLevel {
  DETAIL = 0;
  TERSE = 1;
  BRIEF = 2;
}
// Reply to a telemetry agent operational states request.
message GetOperationalStateReply {
  // Key-value pairs in which the key is an operational state definition
  // and the value is the corresponding operational state value.
  repeated KeyValue kv = 1;
}
// Request message for a data encoding query; it carries no fields.
message DataEncodingRequest {}
// Reply to a "supported data encodings" request.
message DataEncodingReply {
  // The encodings reported in answer to the request.
  repeated EncodingType encoding_list = 1;
}
// Supported encoding types.
// UNDEFINED is numbered 0 and is therefore the proto3 default value.
enum EncodingType {
  UNDEFINED = 0;
  XML = 1;
  JSON_IETF = 2;
  PROTO3 = 3;
}
Przy wywołaniu usługi (RPC telemetrySubscribe) muszę najpierw odczytać nagłówek, który zawiera identyfikator subskrypcji, a dopiero potem rozpocząć czytanie wiadomości. Używając Javy, mogę połączyć się z usługą i dodałem interceptor, ale gdy wypisuję/pobieram nagłówek, jest on pusty. Mój kod podłączający interceptor znajduje się poniżej:
// Interceptor instance that is attached to the channel below.
ClientInterceptor interceptor = new HeaderClientInterceptor();
// Underlying OkHttp-based channel to host:port.
// NOTE(review): usePlaintext(true) presumably disables TLS — confirm against the grpc-java version in use.
originChannel = OkHttpChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
// Wrap the underlying channel with the interceptor; the stub must use this wrapped channel.
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
// Stub for the OpenConfigTelemetry service, bound to the intercepted channel (not originChannel).
telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);
To jest kod interceptora służący do odczytu metadanych.
// Intercepts a client call and wraps its response listener so that the
// response headers can be inspected in onHeaders().
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// Create the real call on the next channel and wrap it; only start() is overridden here.
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Wrap the caller's listener rather than replacing it.
// NOTE(review): SimpleForwardingClientCallListener presumably forwards all other callbacks to responseListener — confirm.
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
// Key used for the lookup: name "responseKEY", value marshalled as an ASCII string.
Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);
// Print whether the received response headers contain that key.
System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));
// NOTE(review): the snippet is cut off here; the rest of onHeaders(), the second
// argument of super.start(...) and all closing braces are not shown.
Zastanawiam się, czy istnieje inny sposób odczytania metadanych lub pierwszej wiadomości zawierającej identyfikator subskrypcji. Potrzebuję jedynie odczytać pierwszą wiadomość, która zawiera identyfikator subskrypcji, i odesłać ten sam identyfikator do serwera, aby mogło rozpocząć się przesyłanie strumieniowe. Mam równoważny kod w Pythonie, który korzysta z tego samego pliku proto i komunikuje się z serwerem; zamieszczam go poniżej wyłącznie w celach informacyjnych:
# Open a telemetry subscription and extract the subscription id from the
# initial metadata of the call.
sub_req = SubscribeRequestMsg("host", port)
data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
metadata = data_itr.initial_metadata()

# Only the first metadata entry is inspected; it is treated as a
# (key, value) pair whose key must be "responseKey".
if metadata[0][0] == "responseKey":
    metainfo = metadata[0][1]
    # print(x) with a single argument behaves the same on Python 2 and 3.
    print(metainfo)

    # The metadata value is protobuf text format; merge it into an empty
    # SubscriptionReply message.
    subreply = agent_pb2.SubscriptionReply()
    subreply.SetInParent()
    google.protobuf.text_format.Merge(metainfo, subreply)

    # A zero/unset id is falsy, so SUB_ID is only assigned for a real id.
    if subreply.response.subscription_id:
        SUB_ID = subreply.response.subscription_id
W powyższym kodzie Pythona obiekt można łatwo pobrać z metadanych; nie wiem, jak uzyskać to samo w Javie.
Po odczytaniu metadanych otrzymuję tylko to: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})
ale wiem, że w metadanych jest jeszcze jeden wpis, a mianowicie:
response {
subscription_id: 2
}
Jak można wyodrębnić z nagłówka ten ostatni wpis z odpowiedzią, który zawiera identyfikator subskrypcji? Wypróbowałem wiele opcji i utknąłem.
public void start(Listener responseListener, Metadata headers) { ClientCall.Listener listener = new ClientCall.Listener() { @Override public void onHeaders(Metadata headers) { SubscriptionReply s = System.out.println("Header found1" + headers); super.onHeaders(headers); } }; super.start(listener, headers); } }; Połączenie blokujące zostało nawiązane, ale dane nie płyną. –
Ammad
@Ammad, nigdy nie wywołujesz przekazanego listenera (`responseListener`). Rozszerz SimpleForwardingClientCallListener i przekaż `responseListener` do jego konstruktora. –
Potrzebuję pomocy. W którym miejscu należy wywołać listener? Widzę, że onHeaders() jest wywoływane z pewnymi początkowymi danymi. – Ammad