Reading very large files (~ 1 TB) in sequential blocks


I need to read a large file in Scala and process it in blocks of k bits (k would typically be 65536). As a simple example (though not exactly what I want to compute):

the file blocks are (f1, f2, ..., fk),

and I want to compute SHA256(f1) + SHA256(f2) + ... + SHA256(fk).

Such a computation can be done incrementally, using only constant storage plus the current block; no other blocks are needed.

What is the best way to read the file? (perhaps something that uses continuations?)
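
For concreteness, here is the shape of the computation I mean (a minimal sketch; blocks is a placeholder for exactly the block source I am asking about, and the "+" sums the digests as unsigned big integers):

import java.security.MessageDigest

// Sketch only: "blocks" stands in for whatever block source an answer
// would provide. Constant storage: one block plus one running BigInt.
def sumOfBlockHashes(blocks: Iterator[Array[Byte]]): BigInt =
  blocks.foldLeft(BigInt(0)) { (acc, block) =>
    val md = MessageDigest.getInstance("SHA-256")
    acc + BigInt(1, md.digest(block))
  }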

EDIT: The linked question kind of solves the problem but not always, as the file I am looking at contains binary data.


There are 2 answers

Answer from Brian

Here is an approach using Akka Streams. It uses constant memory and can process the file chunks as they are read.

See the "Streaming File IO" section at the bottom of this page for more info: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-io.html

Start with a simple build.sbt file:

scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
        "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC3"
)

The interesting parts are the Source, Flow, and Sink. The Source is a SynchronousFileSource that reads the large file in chunks of 65536 bytes. Each chunk is emitted from the Source as a ByteString and consumed by a Flow that calculates a SHA-256 hash over the chunk's bytes. Lastly, the Sink consumes the output from the Flow and prints the byte arrays. You'll want to convert these and sum them using a fold to get a total sum; a sketch of that step follows the sample output below.

import akka.stream.io._
import java.io.File
import scala.concurrent.Future
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import java.security.MessageDigest

object LargeFile extends App {
  implicit val system = ActorSystem("Sys")
  import system.dispatcher
  implicit val materializer = ActorFlowMaterializer()

  val file = new File("<path to large file>")

  // Emits the file as ByteString chunks of 65536 bytes each.
  val fileSource = SynchronousFileSource(file, 65536)

  // Hash the raw bytes of each chunk. (Hashing chunk.toString would
  // digest the ByteString's string rendering, not its contents.)
  val shaFlow = fileSource.map(chunk => sha256(chunk.toArray))

  shaFlow.to(Sink.foreach(println(_))).run() // TODO - convert the byte[]s and sum them using a fold

  def sha256(bytes: Array[Byte]) = {
    val messageDigest = MessageDigest.getInstance("SHA-256")
    messageDigest.digest(bytes)
  }
}

BYTE ARRAYS! (each line below is the default toString of a digest):

> run
[info] Running LargeFile 
[B@3d0587a6
[B@360cc296
[B@7fbb2192
...
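
For the summing, one option (assuming the "+" in the question means adding the digests as unsigned big integers) is a Sink.fold that accumulates a BigInt; something like:

// Materializes a Future holding the running total; assumes the digests
// should be summed as unsigned big integers (one reading of the question).
val sumSink: Sink[Array[Byte], Future[BigInt]] =
  Sink.fold[BigInt, Array[Byte]](BigInt(0))((acc, digest) => acc + BigInt(1, digest))

val total: Future[BigInt] = shaFlow.runWith(sumSink)
total.foreach(sum => println(s"sum of block hashes: $sum")) // uses system.dispatcher from above

BigInt(1, bytes) interprets the 32 digest bytes as a non-negative magnitude, so the fold needs only constant memory per step.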
Answer from Biswanath

Create the digest incrementally, using Stream.continually to turn the blocking reads into a lazy stream of chunks:

import java.io.File
import java.io.FileInputStream
import java.security.MessageDigest

val file = new File("test.in")
val is = new FileInputStream(file)

val md = MessageDigest.getInstance("SHA-256")

val bytes = new Array[Byte](65536)

// Each element is (number of bytes read, the shared buffer);
// stop once read returns -1 at end of file.
Stream
    .continually((is.read(bytes), bytes))
    .takeWhile(_._1 != -1)
    .foreach { x => md.update(x._2, 0, x._1) }

is.close()

println(md.digest()) // prints the array reference; see below for hex
// println(md.digest().map("%02X" format _).mkString) // if you want a hex string
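
Note that this produces a single SHA-256 over the whole file. To get the per-block sum from the question instead, the same read loop can digest each chunk separately and fold the digests together; a sketch (again treating the "+" as a sum of unsigned big integers):

import java.io.{File, FileInputStream}
import java.security.MessageDigest

val in = new FileInputStream(new File("test.in"))
val buf = new Array[Byte](65536)

// A fresh digest per block, folded into a running BigInt sum.
val total = Stream
    .continually(in.read(buf))
    .takeWhile(_ != -1)
    .foldLeft(BigInt(0)) { (acc, n) =>
      val md = MessageDigest.getInstance("SHA-256")
      md.update(buf, 0, n)
      acc + BigInt(1, md.digest())
    }

in.close()
println(total)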