Android Media 3 Custom Data Source - Writing bytes to a file with Kotlin Coroutine Channel


Overview

I have a problem writing bytes to a file from the read method of a custom Media3 DataSource.

The bytes end up stored on top of each other, but I need them written sequentially. I use a custom scope with the IO dispatcher.

My WriteMsg class, used to send the data that should be written to disk:

class WriteMsg (val byteArray: ByteArray, val off: Int, val length: Int, val fileName: String = "")

My channel, in my custom DataSource class:

class NetworkHaavangDataSourceStrategy(
    private val url: Uri,
    private val connectTimeout: Int,
    private val readTimeout: Int,
    private val defaultRequestProperties: HttpDataSource.RequestProperties,
    private val cacheSrc: String,
    private val storageSrc: String,
    private val context: Context,
    private val coroutineScope: CoroutineScope,
): BaseDataSource(true)  {

    private var dataSpec: DataSpec? = null
    private var opened: Boolean = false  
    private var bytesToRead: Long = 0
    private var bytesRead: Long = 0
    private var connection: HttpsURLConnection? = null
    private var responseCode: Int = 0
    private var responseMessage: String? = null

    private var responseByteStream: InputStream? = null
    private var cacheOutputStream: OutputStream? = null

    // Channel Object 
    private var cacheChannel = Channel<WriteMsg> (capacity = Channel.UNLIMITED)
   

    private fun createOrGetOutputStreamSegment(): OutputStream {
        val file = File(cacheSrc ,  "some name auto generate")
        try {
            if (!file.exists()) {
                val result = file.createNewFile()
            }

        } catch (err: IOException) {
            Log.i("Test" , "Exception createNewFile : ${err.message}")
        }
        return FileOutputStream(file)
    }

    // The consumer is started in the open method below
    private fun writeReceiverChannel() {
        coroutineScope.launch(Dispatchers.IO) {
             cacheChannel.consumeEach {
                cacheOutputStream?.write(it.byteArray , it.off , it.length)
           }
        }
    }

    // Producer: sends a message (runs every time the read method is called)
    private fun writeToSegmentFile(buffer: ByteArray, offset: Int, length: Int) {
         coroutineScope.launch(Dispatchers.IO) {
              cacheChannel.send(WriteMsg(buffer , offset , length))
         }
    }

}

My open method:


override fun open(dataSpec: DataSpec): Long {
        
        this.dataSpec = dataSpec
        bytesToRead = 0
        bytesRead = 0

        transferInitializing(dataSpec)
        val connection: HttpURLConnection
        try {
            this.connection = makeRequest(dataSpec)
            connection = this.connection!!
            responseCode = connection.responseCode
            responseMessage = connection.responseMessage
        } catch (err: IOException) {
            closeConnectionQuietly()
            throw HttpDataSource.HttpDataSourceException.createForIOException(
                err, dataSpec, HttpDataSource.HttpDataSourceException.TYPE_OPEN
            )
        }

        //   Check for a valid response code.
        //   Check for a valid Content Type
        //   Skip and Some Manage Position 

        opened = true

        transferStarted(dataSpec)
        
        cacheOutputStream = createOrGetOutputStreamSegment()
        writeReceiverChannel() // start our channel consumer 
        return bytesToRead
    }

My read method:

override fun read(buffer: ByteArray, offset: Int, length: Int): Int {
        var readLength = length
        if (readLength == 0) {
            return 0
        }

        if (bytesToRead != C.LENGTH_UNSET.toLong()) {
            val bytesRemaining = bytesToRead - bytesRead
            if (bytesRemaining == 0L) {                
                close()
                return C.RESULT_END_OF_INPUT
            }
            readLength = Integer.min(readLength, bytesRemaining.toInt())
        }

        val read = responseByteStream!!.read(buffer , offset , readLength)
        if (read == -1) {          
            close()
            return C.RESULT_END_OF_INPUT
        }
        writeToSegmentFile(buffer , offset , read) // our producer 
        bytesRead += read.toLong()
        bytesTransferred(read)
        return read
}
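
One thing that may matter here: writeToSegmentFile receives the same buffer that was passed to read, and the actual write happens later on another thread. If the caller reuses that array for the next read (an assumption about how the player drives the DataSource, not something shown above), a queued WriteMsg would see the newer bytes. A minimal sketch of that effect, using a hypothetical snapshot helper and plain lists standing in for the channel:

```kotlin
class WriteMsg(val byteArray: ByteArray, val off: Int, val length: Int, val fileName: String = "")

// Hypothetical helper: copies only the bytes just read, so that later
// reuse of `buffer` by the caller cannot change the queued message.
fun snapshot(buffer: ByteArray, offset: Int, length: Int): WriteMsg =
    WriteMsg(buffer.copyOfRange(offset, offset + length), 0, length)

fun main() {
    val buffer = ByteArray(4)                  // one array reused for every "read"
    val pendingShared = ArrayList<WriteMsg>()  // messages that point at `buffer`
    val pendingCopied = ArrayList<WriteMsg>()  // messages that own a copy

    for (chunk in listOf("abcd", "efgh")) {
        chunk.encodeToByteArray().copyInto(buffer)  // simulates the next read
        pendingShared += WriteMsg(buffer, 0, 4)
        pendingCopied += snapshot(buffer, 0, 4)
    }

    // The "write" happens only now, after both reads have finished.
    val shared = pendingShared.joinToString("") {
        it.byteArray.decodeToString(it.off, it.off + it.length)
    }
    val copied = pendingCopied.joinToString("") {
        it.byteArray.decodeToString(it.off, it.off + it.length)
    }
    println(shared) // efghefgh
    println(copied) // abcdefgh
}
```

The copy costs one allocation per read, which is the price of letting the write run after read has already returned.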

What I have tested:

  • With channel capacity 1 and with Channel.UNLIMITED: not working
  • With a normal coroutine, without any channel, using synchronized: not working
  • Without any coroutine or channel, writing directly in the read method (synchronous, obviously): working
  • With a Java LinkedBlockingQueue (actually with capacity 1): working
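
These results would be consistent with two things in writeToSegmentFile: each call launches its own coroutine, so the sends are not guaranteed to reach the channel in call order, and the message carries the caller's buffer rather than a copy of it. A possible way to keep the channel, sketched under those assumptions, is to copy the bytes and enqueue them with trySend directly from read, leaving a single consumer to do the writing. OrderedWriter, enqueue and finish are hypothetical names, and a ByteArrayOutputStream stands in for the file stream:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.ByteArrayOutputStream
import java.io.OutputStream

class WriteMsg(val byteArray: ByteArray, val off: Int, val length: Int, val fileName: String = "")

// Hypothetical wrapper around the channel from the question.
class OrderedWriter(private val out: OutputStream) {
    private val channel = Channel<WriteMsg>(capacity = Channel.UNLIMITED)

    // Single consumer: messages are written in the order they were enqueued.
    fun start(scope: CoroutineScope): Job = scope.launch(Dispatchers.IO) {
        for (msg in channel) {
            out.write(msg.byteArray, msg.off, msg.length)
        }
        out.flush()
    }

    // Non-suspending producer, callable from read(). No coroutine is launched
    // per call, so enqueue order equals call order. On an UNLIMITED channel
    // trySend only fails once the channel has been closed.
    fun enqueue(buffer: ByteArray, offset: Int, length: Int) {
        val copy = buffer.copyOfRange(offset, offset + length)
        channel.trySend(WriteMsg(copy, 0, length))
    }

    // Lets the consumer loop end after the remaining messages are written.
    fun finish() {
        channel.close()
    }
}

fun main() = runBlocking<Unit> {
    val sink = ByteArrayOutputStream()
    val writer = OrderedWriter(sink)
    val job = writer.start(this)

    val buffer = ByteArray(4) // reused for every chunk, like a read buffer
    for (chunk in listOf("abcd", "efgh", "ijkl")) {
        chunk.encodeToByteArray().copyInto(buffer)
        writer.enqueue(buffer, 0, 4)
    }

    writer.finish()
    job.join()
    println(sink.toString()) // abcdefghijkl
}
```

In the DataSource, enqueue would take the place of writeToSegmentFile, and finish would be called from close before the output stream itself is closed. An unlimited channel can grow without bound if the disk is slower than the network, so a bounded capacity may be worth considering.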
