2015-07-21 12 views
5

Próbuję użyć CloseableHttpAsyncClient do odczytania z punktu końcowego, przesunięcia łańcucha do obiektu (przy użyciu javax.json), a następnie przekształcenia tablicy na obiekcie w jej poszczególne komponenty:Konwertuj ciąg do tablicy na obiekty w obserwowalnym

CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build(); 

client.start(); 

Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client) 
      .toObservable(); 

Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> { 
     String stringVal = new String(bb); 
     StringReader reader = new StringReader(stringVal); 
     JsonObject jobj = Json.createReader(reader).readObject(); 
     return jobj.getJsonArray("elements"); 
    })).share(); 

muszę Array JSON, a następnie filtrować obiektów tablicy:

Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); 
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); 
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); 

Jak przekonwertować Observable<JsonArray> w ObservableJsonObject>?

Ponieważ jest to asynchroniczne, nie mogę użyć forEach do utworzenia jakiejś tablicy do buforowania danych.

UPDATE:

Więc Patrząc pomocą CloseableHttpAsyncClient może nie być najlepszym rozwiązaniem dla co usiłuję osiągnąć - Zdałem sobie sprawę, dziś rano (pod prysznicem wszystkich rzeczy), że staram się proces dane asynchronicznie, aby następnie wywoływać asynchroniczne.

Najlepiej byłoby, gdyby nawiązywanie połączenia z CloseableHttpClient (synchronizacja) i przekazywanie danych do obserwowalnego w celu filtrowania byłoby bardziej idealne (nie potrzebuję pierwszego połączenia do zarządzania więcej niż jednym połączeniem http).

CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build(); 

    StringBuffer result = new StringBuffer(); 

    try { 
     HttpGet request = new HttpGet(uri); 
     HttpResponse response = client.execute(request); 

     BufferedReader rd = new BufferedReader(
       new InputStreamReader(response.getEntity().getContent())); 

     String line; 
     while ((line = rd.readLine()) != null) { 
      result.append(line); 
     } 
    } catch(ClientProtocolException cpe) { } catch(IOException ioe) { } 

    StringReader reader = new StringReader(result.toString()); 
    JsonObject jobj = Json.createReader(reader).readObject(); 
    JsonArray elements = jobj.getJsonArray("elements"); 

    List<JsonObject> objects = elements.getValuesAs(JsonObject.class); 


    Observable<JsonObject> shareable = Observable.from(objects).share(); 

    Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); 
    Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); 
    Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); 


    firstStream.subscribe(record -> { 
     //connect to SOTS/Facebook and store the results 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

    secondStream.subscribe(record -> { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

    thirdStream.subscribe(record -> { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

Odpowiedz

2

Spróbuj kod:

String myjson = "{\"elements\": [{\"text\":\"Obj1\"},{\"text\":\"Obj2\"}, {\"text\":\"Obj3\"}]}"; 

    Observable.just(myjson) 
      .map(jsonStr -> new StringReader(myjson)) 
      .map(reader -> Json.createReader(reader).readObject()) 
      .map(jobj -> jobj.getJsonArray("elements")) 
      .map(elements -> elements.toArray(new JsonObject[elements.size()])) 
      .flatMap(jsonObjects -> Observable.from(jsonObjects)) 
      .subscribe(
        (jsonObject) -> System.out.println(jsonObject.getString("text")), 
        throwable -> throwable.printStackTrace(), 
        () -> System.out.println("On complete")); 

Wynik:

07-22: 12: 19: 22.362 8032-8032/com.mediamanagment.app I/System.out: Obj1
07-22 12: 19: 22.362 8032-8032/com.mediamanagment.app I/System.out: Obj2
07-22 12: 19: 22.362 8032-8032/com .mediamanagment.app I/System.out: Obj3

UWAGA:
Należy użyć tej zależności:

compile 'org.glassfish:javax.json:1.0.4' 

Zamiast tego:

compile 'javax.json:javax.json-api:1.0' 

Jeśli użyje 'javax.json:javax.json-api:1.0' dostaniesz javax.json.JsonException: Provider org.glassfish.json.JsonProviderImpl not found w kroku :

.map(reader -> Json.createReader(reader).readObject()) 

zawiadomienia, proszę użyć 'org.glassfish:javax.json:1.0.4'

UPDATE: Również zamiast

.flatMap(jsonObjects -> Observable.from(jsonObjects)) 

Można użyć flatMapIterable( ):

.flatMapIterable(jsonObjects -> jsonObjects) 
0

powinny mieć możliwość korzystania z innego flatMap() połączenia zamiast rozmowy map() używasz. Następnie użyj Observable.create() emitować JsonObject s

Observable<JsonObject> shareable = 
     observable 
       .flatMap(response -> response.getContent() 
         .flatMap(bb -> { 
          String stringVal = new String(bb); 
          StringReader reader = new StringReader(stringVal); 
          JsonObject jobj = Json.createReader(reader).readObject(); 
          JsonArray elements = jobj.getJsonArray("elements"); 
          return Observable.create(subscriber -> { 
           for (int i = 0; i < elements.length(); i++) { 
            subscriber.onNext(elements.getJSONObject(i)); 
           } 
           subscriber.onCompleted(); 
          }); 
         })) 
       .share(); 
+2

Można wpaść w kłopoty z tym jednym ponieważ flatMap wymaga danych wejściowych do obsługi przeciwciśnienia i twojego Observable.create stat ement nie obsługuje przeciwciśnienia. Sugerowałbym dla tego bitu użyć Observable.create (new AbstractOnSubscribe() {...}), który oferuje wsparcie przeciwprężne. –

+0

Dla Observable.create można również po prostu zbudować listę i zwrócić Observable.from (list). –

+0

Używając powyższego rozwiązania, musiałem zdematerializować wychodzący Observable (zwraca Observerable > więc jest blisko, ale użycie dematerializacji kończy się return Observable (nie może wydawać się pracować, jak parsować obiekt poprawnie z powrotem) – user2266870