2015-06-10 23 views
6

Potrzebuję przeczytać duży plik w Scali i przetworzyć go w blokach k bitów (zazwyczaj k może być 65536). Jako prosty przykład (ale nie tego, czego chcę):Odczytywanie bardzo dużych plików (~ 1 TB) w blokach sekwencyjnych

bloki plików to (f1, f2, ... fk).

Chcę obliczyć SHA256(f1)+SHA256(f2)+...+ SHA256(fk)

Takie obliczenia mogą być wykonywane przy użyciu tylko stopniowo stały przechowywanie i aktualny blok bez potrzeby innych bloków.

Jaki jest najlepszy sposób na odczytanie pliku? (być może coś, co używa kontynuacji?)

EDYCJA: Połączony rodzaj pytania rozwiązuje problem, ale nie zawsze, ponieważ plik, którego szukam, zawiera dane binarne.

+0

@Christian Nie, to nie jest duplikatem pytanie przywołane. – Biswanath

+0

Nie mogę zrozumieć, jak jest to duplikat z przytoczonym pytaniem. Inne pytanie dotyczy pliku csv opartego na tekście, to pytanie dotyczy "pliku CSV opartego na braku tekstu". Odpowiedzi na drugie pytanie nie powinny mieć zastosowania. Naprawdę wątpię, kto kiedykolwiek zaznaczył to jako duplikat, jeśli w pełni przeczyta oba pytania. – Biswanath

Odpowiedz

4

Oto podejście wykorzystujące strumienie Akka. Wykorzystuje to stałą pamięć i może przetwarzać porcje plików podczas ich odczytu.

Zobacz "Plik strumieniowy IO" na dole tej strony, aby uzyskać więcej informacji. http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-io.html

zacząć od prostego build.sbt pliku:

scalaVersion := "2.11.6" 

libraryDependencies ++= Seq(
     "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC3" 
) 

Interesującymi częściami są Source, Flow i Sink. Numer to SynchronousFileSource, który odczytuje duży plik o wielkości porcji 65536. Rozmiar ByteString wielkości fragmentu jest emitowany z Source i jest zużywany przez Flow, który oblicza wartość skrótu SHA256 dla każdej porcji. Wreszcie, Sink zużywa dane wyjściowe z Flow i wypisuje tablice bajtów. Będziesz chciał je skonwertować i zsumować za pomocą fold, aby uzyskać łączną kwotę.

import akka.stream.io._ 
import java.io.File 
import scala.concurrent.Future 
import akka.stream.scaladsl._ 
import akka.actor.ActorSystem 
import akka.stream.ActorFlowMaterializer 
import java.security.MessageDigest 

object LargeFile extends App{ 
    implicit val system = ActorSystem("Sys") 
    import system.dispatcher 
    implicit val materializer = ActorFlowMaterializer() 

    val file = new File("<path to large file>") 

    val fileSource = SynchronousFileSource(file, 65536) 

    val shaFlow = fileSource.map(chunk => sha256(chunk.toString)) 

    shaFlow.to(Sink.foreach(println(_))).run//TODO - Convert the byte[] and sum them using fold 

    def sha256(s: String) = { 
    val messageDigest = MessageDigest.getInstance("SHA-256") 
    messageDigest.digest(s.getBytes("UTF-8")) 
    } 
} 

BYTE ARRAYS!

> run 
[info] Running LargeFile 
[[email protected] 
[[email protected] 
[[email protected] 
... 
0

Tworzenie likwidacji pomocą strumienia nieustannie, które wierzę, tworzy iterator

import java.File 
import java.FileInputStream 
import java.security.MessageDigest 

val file = new File("test.in") 
val is = new FileInputStream(file) 

val md = MessageDigest.getInstance("SHA-256") 

val bytes = Array.fill[Byte](65536)(0) 

Stream 
    .continually((is.read(bytes),bytes)) 
    .takeWhile(_._1 != -1) 
    .foreach{ x => md.update(x._2,0,x._1) } 

println(md.digest()) 
// prinln(md.digest().map("%02X" format _).mkString) // if you want hex string 
+0

cala.collection.immutable.Stream jest pamiętany, który będzie czytał całość w pamięci (zgodnie z http://stackoverflow.com/questions/4255021/how-do-i-read-a-large-csv-file- with-scala-stream-class # answer-4255338) – mikebridge