Próbuję użyć CloseableHttpAsyncClient
do odczytania z punktu końcowego, przesunięcia łańcucha do obiektu (przy użyciu javax.json), a następnie przekształcenia tablicy na obiekcie w jej poszczególne komponenty:Konwertuj ciąg do tablicy na obiekty w obserwowalnym
CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build();
client.start();
Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client)
.toObservable();
Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> {
String stringVal = new String(bb);
StringReader reader = new StringReader(stringVal);
JsonObject jobj = Json.createReader(reader).readObject();
return jobj.getJsonArray("elements");
})).share();
muszę Array JSON, a następnie filtrować obiektów tablicy:
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
Jak przekonwertować Observable<JsonArray>
w ObservableJsonObject>
?
Ponieważ jest to asynchroniczne, nie mogę użyć forEach do utworzenia jakiejś tablicy do buforowania danych.
UPDATE:
Więc Patrząc pomocą CloseableHttpAsyncClient może nie być najlepszym rozwiązaniem dla co usiłuję osiągnąć - Zdałem sobie sprawę, dziś rano (pod prysznicem wszystkich rzeczy), że staram się proces dane asynchronicznie, aby następnie wywoływać asynchroniczne.
Najlepiej byłoby, gdyby nawiązywanie połączenia z CloseableHttpClient (synchronizacja) i przekazywanie danych do obserwowalnego w celu filtrowania byłoby bardziej idealne (nie potrzebuję pierwszego połączenia do zarządzania więcej niż jednym połączeniem http).
CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();
StringBuffer result = new StringBuffer();
try {
HttpGet request = new HttpGet(uri);
HttpResponse response = client.execute(request);
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
} catch(ClientProtocolException cpe) { } catch(IOException ioe) { }
StringReader reader = new StringReader(result.toString());
JsonObject jobj = Json.createReader(reader).readObject();
JsonArray elements = jobj.getJsonArray("elements");
List<JsonObject> objects = elements.getValuesAs(JsonObject.class);
Observable<JsonObject> shareable = Observable.from(objects).share();
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
firstStream.subscribe(record -> {
//connect to SOTS/Facebook and store the results
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
secondStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
thirdStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
Można wpaść w kłopoty z tym jednym ponieważ flatMap wymaga danych wejściowych do obsługi przeciwciśnienia i twojego Observable.create stat ement nie obsługuje przeciwciśnienia. Sugerowałbym dla tego bitu użyć Observable.create (new AbstractOnSubscribe() {...}), który oferuje wsparcie przeciwprężne. –
Dla Observable.create można również po prostu zbudować listę i zwrócić Observable.from (list). –
Używając powyższego rozwiązania, musiałem zdematerializować wychodzący Observable (zwraca Observerable> więc jest blisko, ale użycie dematerializacji kończy się return Observable