Overview
I have a problem in writing bytes in read method Data Source Media 3
My bytes are stored on top of each other something like this (I need write sequentially) I Have Custom Scope with IO Dispatcher
my WriteMsg class
for sending data for write on the disk
class WriteMsg (val byteArray: ByteArray, val off: Int, val length: Int, val fileName: String = "")
my channel in My CustomDataSource class
class NetworkHaavangDataSourceStrategy(
private val url: Uri,
private val connectTimeout: Int,
private val readTimeout: Int,
private val defaultRequestProperties: HttpDataSource.RequestProperties,
private val cacheSrc: String,
private val storageSrc: String,
private val context: Context,
private val coroutineScope: CoroutineScope,
): BaseDataSource(true) {
private var dataSpec: DataSpec? = null
private var opened: Boolean = false
private var bytesToRead: Long = 0
private var bytesRead: Long = 0
private var connection: HttpsURLConnection? = null
private var responseCode: Int = 0
private var responseMessage: String? = null
private var responseByteStream: InputStream? = null
private var cacheOutputStream: OutputStream? = null
// Channel Object
private var cacheChannel = Channel<WriteMsg> (capacity = Channel.UNLIMITED)
private fun createOrGetOutputStreamSegment(): OutputStream {
val file = File(cacheSrc , "some name auto generate")
try {
if (!file.exists()) {
val result = file.createNewFile()
}
} catch (err: IOException) {
Log.i("Test" , "Exception createNewFile : ${err.message}")
}
return FileOutputStream(file)
}
// starting consumer in open method below
private fun writeReceiverChannel() {
coroutineScope.launch(Dispatchers.IO) {
cacheChannel.consumeEach {
cacheOutputStream?.write(it.byteArray , it.off , it.length)
}
}
}
// produce msg for sending (This method is executed every time the read method is called )
private fun writeToSegmentFile(buffer: ByteArray, offset: Int, length: Int) {
coroutineScope.launch(Dispatchers.IO) {
cacheChannel.send(WriteMsg(buffer , offset , length))
}
}
}
my open method
override fun open(dataSpec: DataSpec): Long {
this.dataSpec = dataSpec
bytesToRead = 0
bytesRead = 0
transferStarted(dataSpec)
val connection: HttpURLConnection
try {
this.connection = makeRequest(dataSpec)
connection = this.connection!!
responseCode = connection.responseCode
responseMessage = connection.responseMessage
} catch (err: IOException) {
closeConnectionQuietly()
throw HttpDataSource.HttpDataSourceException.createForIOException(
err, dataSpec, HttpDataSource.HttpDataSourceException.TYPE_OPEN
)
}
// Check for a valid response code.
// Check for a valid Content Type
// Skip and Some Manage Position
opened = true
transferStarted(dataSpec)
cacheOutputStream = createOrGetOutputStreamSegment()
writeReceiverChannel() // start our channel consumer
return bytesToRead
}
my read method
override fun read(buffer: ByteArray, offset: Int, length: Int): Int {
var readLength = length
if (readLength == 0) {
return 0
}
if (bytesToRead != C.LENGTH_UNSET.toLong()) {
val bytesRemaining = bytesToRead - bytesRead
if (bytesRemaining == 0L) {
close()
return C.RESULT_END_OF_INPUT
}
readLength = Integer.min(readLength, bytesRemaining.toInt())
}
val read = responseByteStream!!.read(buffer , offset , readLength)
if (read == -1) {
close()
return C.RESULT_END_OF_INPUT
}
writeToSegmentFile(buffer , offset , read) // our producer
bytesRead += read.toLong()
bytesTransferred(read)
return read
}
I have tested these
- I tested With capacity 1 and unlimited and its not working (channel capacity)
- When I used normal Coroutine without any channel with Synchronized its not working
- When I used without any coroutine and any channel it's working (wrote in read method: it's sync obviously)
- When I used LinkedBlockingQueue in Java It's working (actually with capacity 1)