2015-12-14 11 views
7

Czy możliwe jest dynamicznie deserializowania się zewnętrznego, o nieznanej długości ByteString strumienia z Akka HTTP do obiektów domeny?Akka HTTP Streaming JSON deserializacji


Kontekst

nazywam nieskończenie długo HTTP końcowy, który wysyła JSON Array że rośnie:

[ 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    ... 
] <- Never sees the daylight 
+0

Dla wyjaśnienia, próbujesz odebrać ten strumień JSON lub transmitować ten strumień? Jeśli transmitujesz, jaka jest twoja wewnętrzna reprezentacja (np. Iterator, scala Stream, ...)? Czy komunikacja musi być tablicą lub może być strumieniem pojedynczych obiektów domeny? –

+0

@RamonJRomeroyVigil Ten strumień będzie całkowicie zewnętrzny. – Martijn

+0

W twoim konkretnym przypadku możesz poczekać na zamknięcie '}' i wywołać deserializator do wyboru dla tekstu pomiędzy. Wymaga to pewnych operacji i prawdopodobnie buforowania na ByteString, ale są one dość proste. –

Odpowiedz

0

myślę, że play-iteratees-extras musi pomóc. Ta biblioteka pozwala analizować Json za pomocą wzoru Enumerator/Iteratee i, oczywiście, nie czeka na otrzymanie wszystkich danych.

Na przykład, nie buduj "nieskończonego" strumienia bajtów, który reprezentuje "nieskończoną" tablicę Json.

import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee} 

var i = 0 
var isFirstWas = false 

val max = 10000 

val stream = Enumerator("[".getBytes) andThen Enumerator.generateM { 
    Future { 
    i += 1 
    if (i < max) { 
     val json = Json.stringify(Json.obj(
     "prop" -> Random.nextBoolean(), 
     "prop2" -> Random.nextBoolean(), 
     "prop3" -> Random.nextInt(), 
     "prop4" -> Random.alphanumeric.take(5).mkString("") 
    )) 

     val string = if (isFirstWas) { 
     "," + json 
     } else { 
     isFirstWas = true 
     json 
     } 


     Some(Codec.utf_8.encode(string)) 
    } else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag 
    else None 

    } 
} 

OK, ta wartość zawiera jsArray z 10000 (lub więcej) obiektów. Pozwala zdefiniować klasę case, która będzie zawierała dane każdego obiektu w naszej tablicy.

case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String) 

Teraz napisać parser, który będzie analizować każdy artykuł

import play.extras.iteratees._  
import JsonBodyParser._ 
import JsonIteratees._ 
import JsonEnumeratees._ 

val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json => 
    for { 
    prop <- json.\("prop").asOpt[Boolean] 
    prop2 <- json.\("prop2").asOpt[Boolean] 
    prop3 <- json.\("prop3").asOpt[Int] 
    prop4 <- json.\("prop4").asOpt[String] 
    } yield Props(prop, prop2, prop3, prop4) 
} 

Proszę zobaczyć doc dla jsArray, jsValues i jsSimpleObject. Aby zbudować producent wynik:

val result = stream &> Encoding.decode() ><> parser 

Encoding.decode() z JsonIteratees pakiet dekodowania bajtów jako CharString. result wartość ma typ Enumerator[Option[Item]] i można zastosować pewne iteratee do tego modułu wyliczającego, aby rozpocząć proces analizowania.

W sumie nie wiem, jak odbierasz bajty (rozwiązanie zależy w dużym stopniu od tego), ale myślę, że pokazują jedno z możliwych rozwiązań twojego problemu.

0

Miałem bardzo podobny problem, próbując przetworzyć strumień Twitter (ciąg nieskończony) w obiekt domeny. że rozwiązany za pomocą Json4s w następujący sposób:

case class Tweet(username: String, geolocation: Option[Geo]) 
case class Geo(latitude: Float, longitude: Float) 
object Tweet{ 
    def apply(s: String): Tweet = { 
     parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet] 
    } 
} 

Wtedy tylko bufor strumienia i mapowane do tweeta:

val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8")) 
var line = reader.readLine() 
while(line != null){ 
    store(Tweet.apply(line)) 
    line = reader.readLine() 
} 

Json4s ma pełną obsługę nad opcji (lub niestandardowych obiektów wewnątrz obiekt, na przykład Geo w przykładzie). Dlatego możesz ustawić opcję taką jak ja, a jeśli pole nie jest w Jsonie, zostanie ustawione na Brak.

Mam nadzieję, że pomoże!