2016-12-13 64 views
8

Jestem całkiem początkującym dla Akka Streams i Akka HTTP.Jak korzystać z HTTP Akka do generowania treści przez strumień wyjściowy

Chciałbym wygenerować prosty serwer HTTP, który może wygenerować plik zip z zawartości folderu i wysłać go do klienta.

Plik org.zeroturnaround.zip.ZipUtil bardzo ułatwia tworzenie pliku zip, ale potrzebuje on outputStream.

Oto moje rozwiązanie (w języku Scala):

  val os = new ByteArrayOutputStream() 
      ZipUtil.pack(myFolder, os) 
      HttpResponse(entity = HttpEntity(
       MediaTypes.`application/zip`, 
       os.toByteArray)) 

To rozwiązanie działa, ale zachowuje całą zawartość do pamięci, więc nie jest skalowalne.

Myślę, że kluczem do rozwiązania tego problemu jest użycie w tym:

val source = StreamConverters.asOutputStream() 

ale nie wiem, jak go używać. :-(

Każda pomoc proszę?

Odpowiedz

3

miałem ten sam problem. W celu uczynienia go przeciwciśnienie kompatybilne miałem napisać sztuczną InputStream która później przekształca się Source poprzez StreamConverters.fromInputStream(() => input) co z kolei powrocie z Akka -http DSL complete dyrektywa.

Oto co napisałem.

import java.io.{File, IOException, InputStream} 
import java.nio.charset.StandardCharsets 
import java.time.LocalDate 
import java.time.format.DateTimeFormatter 

import org.apache.commons.compress.archivers.sevenz.{SevenZArchiveEntry, SevenZFile} 

import scala.annotation.tailrec 
import scala.util.{Failure, Success, Try} 

class DownloadStatsZipReader(path: String, password: String) extends InputStream { 

    private val (archive, targetDate) = { 
    val inputFile = new SevenZFile(new File(path), password.getBytes(StandardCharsets.UTF_16LE.displayName())) 

    @tailrec 
    def findValidEntry(): Option[(LocalDate, SevenZArchiveEntry)] = 
     Option(inputFile.getNextEntry) match { 
     case Some(entry) => 
      if (!entry.isDirectory) { 
      val parts = entry.getName.toLowerCase.split("\\.(?=[^\\.]+$)") 
      if (parts(1) == "tab" && entry.getSize > 0) 
       Try(LocalDate.parse(parts(0), DateTimeFormatter.ISO_LOCAL_DATE)) match { 
       case Success(localDate) => 
        Some(localDate -> entry) 
       case Failure(_) => 
        findValidEntry() 
       } 
      else 
       findValidEntry() 
      } else 
      findValidEntry() 
     case None => None 
     } 

    val (date, _) = findValidEntry().getOrElse { 
     throw new RuntimeException(s"$path has no files named as `YYYY-MM-DD.tab`") 
    } 
    inputFile -> date 
    } 

    private val buffer = new Array[Byte](1024) 
    private var offsetBuffer: Int = 0 
    private var sizeBuffer: Int = 0 

    def getTargetDate: LocalDate = targetDate 

    override def read(): Int = 
    sizeBuffer match { 
     case -1 => 
     -1 
     case 0 => 
     loadNextChunk() 
     read() 
     case _ => 
     if (offsetBuffer < sizeBuffer) { 
      val result = buffer(offsetBuffer) 
      offsetBuffer += 1 
      result 
     } else { 
      sizeBuffer = 0 
      read() 
     } 
    } 

    @throws[IOException] 
    override def close(): Unit = { 
    archive.close() 
    } 

    private def loadNextChunk(): Unit = try { 
    val bytesRead = archive.read(buffer) 
    if (bytesRead >= 0) { 
     offsetBuffer = 0 
     sizeBuffer = bytesRead 
    } else { 
     offsetBuffer = -1 
     sizeBuffer = -1 
    } 
    } catch { 
    case ex: Throwable => 
     ex.printStackTrace() 
     throw ex 
    } 
} 

Jeśli znajdziesz błędy w moim kodu proszę dać mi znać.

9

Spróbuj

val byteSource: Source[ByteString, Unit] = StreamConverters.asOutputStream() 
    .mapMaterializedValue(os => ZipUtil.pack(myFolder, os)) 
HttpResponse(entity = HttpEntity(
      MediaTypes.`application/zip`, 
      byteSource)) 

Można mieć tylko dostęp do OutputStream raz źródłem dostaje zmaterializowanej, który może nie zdarzyć się natychmiast. Teoretycznie źródło mogło się również zmaterializować wiele razy, więc powinieneś być w stanie sobie z tym poradzić.

+2

Chciałbym wiedzieć przed :) – expert

+1

pisząc ten sam także był dość pouczające i zabawne ... –

+0

wygląda na to łatwe, ale to nie w moim przypadku: 'java.lang.IllegalStateException: jeszcze nie zainicjowany: tylko SetHandler jest dozwolone w konstruktorze GraphStageLogic ' –