Mimo że można do tego celu użyć Sink.foreach
(o czym wspomniał Ramon), bezpieczniej i prawdopodobnie szybciej (uruchamiając przekładki równolegle), można użyć mapAsync
Flow
. Problemem, który napotkasz przy użyciu Sink.foreach
jest to, że nie ma on wartości zwracanej. Wstawienie do bazy danych za pomocą slicks db.run
Metoda zwraca Future
, która następnie ucieknie z oparów zwrócony Future[Done]
, który kończy się, gdy kończy się Sink.foreach
.
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
Z drugiej strony def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])
Flow
pozwala na uruchamianie wkładki równolegle poprzez paramerter równoległości i przyjmuje funkcję z wartością upstream się do przyszłości jakiegoś typu. To pasuje do naszej funkcji i => db.run(numbers += i)
. Wielką zaletą tego Flow
jest to, że następnie podaje wynik tych Futures
w dół.
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done <-- stream Future[Done] returned after inserts finished
celu udowodnienia tej tezy można jeszcze powrócić prawdziwy wynik ze strumienia zamiast Future[Done]
(Z Gotowe reprezentujących jednostki).Strumień ten doda również wyższą wartość paralelizmu i grupowania dla dodatkowej wydajności. *
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
- Uwaga: Prawdopodobnie nie będzie widać lepszą wydajność dla takiego małego zbioru danych, ale kiedy mam do czynienia z 1,7 wstawić udało mi się uzyskać najlepszą wydajność na moim komputerze z wielkość partii 1000 i wartość równoległości 8, lokalnie z postgresql. Było to około dwa razy tyle, co nie równolegle. Jak zawsze, gdy masz do czynienia z wynikami, wyniki mogą się różnić i powinieneś mierzyć dla siebie.