How to stream downloads using Scalaj-Http and Hadoop HttpFs

518 views Asked by At

My question is how to use a Buffered stream when using Scalaj-Http.

I have written the following code which is a complete working example that will download a file from Hadoop HDFS using HttpFS. My goal is to handle very large files and this will require using a buffered approach with multiple I/O writes to a local file.

I have not been able to find documentation on how to use a stream with the ScalaJ-Http interface. I am interested in an example for both download and upload that can handle large multi GB files. My code below uses in memory buffering which is appropriate for only prototyping.

import scalaj.http._
import ujson.Js
import java.text.SimpleDateFormat
import java.net.SocketTimeoutException
import java.io.InputStream
import java.io.BufferedOutputStream
import java.io.FileOutputStream
import java.io.FileNotFoundException

object CopyFileFromHdfs {
    def main(args: Array[String]) {
        val host = "hadoop.example.com"
        val user = "root"
        var dstFile = ""
        var srcFile = ""
        val operation = "OPEN"
        val port = 14000

        System.setProperty("sun.net.http.allowRestrictedHeaders", "true")

        if (args.length != 2)
        {
            println("Error: Missing or too many arguments")
            println("Usage: CopyFileFromHdfs <srcfile> <dstfile>")

            System.exit(1)
        }

        srcFile = args(0)
        dstFile = args(1)


        // ********************************************************************************
        // Create the URL string that we will use to connect to Hadoop HttpFS
        //
        // The string will look like this:
        // http://[email protected]:14000/webhdfs/v1/?user.name=root&op=OPEN
        // ********************************************************************************

        val url = makeHttpfsUrl(host, user, srcFile, operation, port)

        // ********************************************************************************
        // Using HTTP, call the HttpFS server
        //
        // Exceptions:
        //  java.net.SocketTimeoutException
        //  java.net.UnknownHostException
        //  java.lang.IllegalArgumentException
        // Remote Exceptions:
        //  java.io.FileNotFoundException
        //  com.sun.jersey.api.NotFoundException
        // ********************************************************************************

        try {
            var response = Http(url)
                .timeout(connTimeoutMs = 1000, readTimeoutMs = 5000)
                .asBytes

            // ********************************************************************************
            // Check for an error. We are expecting an HTTP 200 response
            // ********************************************************************************

            if (response.code < 200 || response.code > 299)
            {
                val data = ujson.read(response.body)

                printf("Error: Cannot download file: %s\n", dstFile)
                println(removeQuotes(data("RemoteException")("message").str))
                println(removeQuotes(data("RemoteException")("exception").str))

                System.exit(1)
            }

            val is = new FileOutputStream(dstFile)
            val bs = new BufferedOutputStream(is)

            bs.write(response.body, 0, response.body.length)

            bs.close()
            is.close()
        } catch {
            case e: SocketTimeoutException => {
                printf("Error: Cannot connect to host %s on port %d\n", host, port)
                println(e)
                System.exit(1);
            }
            case e: Exception => {
                printf("Error (other): Cannot download file %s\n", srcFile)
                println(e)
                System.exit(1);
            }
        }

        printf("Success: File downloaded. %s -> %s\n", srcFile, dstFile)

        System.exit(0)
    }

    // ********************************************************************************
    // The Json strings are surrounded by quotes.
    // This function will remove them (only at the start and the end).
    // ********************************************************************************

    def removeQuotes(str: String): String = {
        // This expression will delete quotes at the beginning and end of a string
        return str.replaceAll("^\"|\"$", "");
    }

    // ********************************************************************************
    // Create the URL string that we will use to connect to Hadoop HttpFS
    //
    // The string will look like this:
    // http://[email protected]:14000/webhdfs/v1/?user.name=root&op=LISTSTATUS
    // ********************************************************************************

    def makeHttpfsUrl(
            host: String,
            user: String,
            hdfsPath: String,
            operation: String,
            port: Integer) : String = {

        var url = "http://" + user + "@" + host + ":" + port.toString + "/webhdfs/v1"

        if (hdfsPath(0) == '/')
            url += hdfsPath
        else
            url += "/" + hdfsPath

        url += "?user.name=" + user + "&op=" + operation

        return url
    }
}
0

There are 0 answers