I am trying to creat a live streaming video app, which uses websocket connection for retrieving json object containing a base64 encoded video chunk data. I am decoding that using it in CustomDataSource and utilizing it in ProgressiveMediaSource for exoplayer to play the live stream, but My exoplayer goes into buffering mode or what ! it just doesn't play the video. I do not see any evident error too in the logs. I am not sure if I have issue in DataSource or with configuration of exoplayer.

Below I have mentioned all the code I have done for this for creating a POC.

App Level build.gradle:

    dependencies {
    ...
      //exoplayer
    val mediaVersion = "1.1.1"
    implementation("androidx.media3:media3-exoplayer:$mediaVersion")
    implementation("androidx.media3:media3-ui:$mediaVersion")
    implementation("androidx.media3:media3-exoplayer-hls:$mediaVersion")

    //http
    implementation ("io.reactivex.rxjava2:rxjava:2.2.21") 
    implementation ("com.squareup.retrofit2:retrofit:2.9.0")
    implementation ("com.squareup.retrofit2:converter-gson:2.9.0")
    implementation ("com.squareup.okhttp3:okhttp:4.9.3")
    implementation ("com.squareup.okhttp3:logging-interceptor:4.9.3")
}

My layout file as below:

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:background="#FFFFFF"
    android:orientation="vertical">
    <androidx.media3.ui.PlayerView
        android:id="@+id/video_view"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        app:use_controller="true"
        app:shutter_background_color="@android:color/transparent"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toTopOf="parent"
        app:layout_constraintEnd_toEndOf="parent"
        app:layout_constraintBottom_toBottomOf="parent"/>
</androidx.constraintlayout.widget.ConstraintLayout>

My StreamingActivity class:

import android.annotation.SuppressLint
import android.app.Activity
import android.os.Build
import android.os.Bundle
import android.util.Log
import androidx.core.view.WindowCompat
import androidx.core.view.WindowInsetsCompat
import androidx.core.view.WindowInsetsControllerCompat
import androidx.media3.common.MediaItem
import androidx.media3.common.MimeTypes
import androidx.media3.common.PlaybackException
import androidx.media3.common.Player
import androidx.media3.common.util.UnstableApi
import androidx.media3.exoplayer.ExoPlayer
import androidx.media3.exoplayer.source.MediaSource
import androidx.media3.exoplayer.source.ProgressiveMediaSource
import androidx.media3.ui.PlayerView
import com.livestreamdemo.R
import com.livestreamdemo.ui.helper.CustomMediaExtractor
import com.livestreamdemo.ui.helper.UtilHelper
import com.livestreamdemo.ui.helper.WssDSFactory
import com.livestreamdemo.ui.helper.WssStreamDataSource


class StreamingActivity : Activity() {

    private var player: Player? = null
    private var mPlayWhenReady = true
    private lateinit var videoView: PlayerView
    private lateinit var wsDataSource: WssStreamDataSource



    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_streaming)
        videoView = findViewById<PlayerView>(R.id.video_view)
    }

    @androidx.annotation.OptIn(androidx.media3.common.util.UnstableApi::class)
    private fun initPlayer() {

        player = ExoPlayer.Builder(this).build()
            .also { exoPlayer ->

                videoView.player = exoPlayer

                // Update the track selection parameters to only pick standard definition tracks
                exoPlayer.trackSelectionParameters = exoPlayer.trackSelectionParameters
                    .buildUpon()
                    .setPreferredVideoMimeType(MimeTypes.VIDEO_MP4)//see if this actually needed
                    .setMaxVideoSizeSd()
                    .build()
                exoPlayer.playWhenReady = mPlayWhenReady

                // Create a Regular media source pointing to a playlist uri.
                wsDataSource = WssStreamDataSource()
                val factory = ProgressiveMediaSource.Factory(
                    WssDSFactory(wsDataSource),
                    CustomMediaExtractor()
                )


                val mediaItem = MediaItem.Builder().setUri(UtilHelper.wsUrl).setMimeType(
                    MimeTypes.VIDEO_MP4
                ).build()
                val progressiveMediaSource: MediaSource = factory.createMediaSource(mediaItem)

                // Set the media source to be played.
                exoPlayer.setMediaSource(progressiveMediaSource)
                // Prepare the player.

                exoPlayer.addListener(playerCallBacks)
                exoPlayer.prepare()
                exoPlayer.play()
            }

    }


    public override fun onStart() {
        super.onStart()
        initPlayer()
    }

    public override fun onResume() {
        super.onResume()
        hideSystemUi()
    }

    public override fun onPause() {
        super.onPause()
        if (Build.VERSION.SDK_INT <= 23) {
            releasePlayer()
        }
    }

    public override fun onStop() {
        super.onStop()
        if (Build.VERSION.SDK_INT > 23) {
            releasePlayer()
        }
    }


    private fun releasePlayer() {
        player?.let { player ->
            /* playbackPosition = player.currentPosition
             mediaItemIndex = player.currentMediaItemIndex
             playWhenReady = player.playWhenReady*/
            player.removeListener(playerCallBacks)
            player.release()
        }
        player = null
    }


    @SuppressLint("InlinedApi")
    private fun hideSystemUi() {
        WindowCompat.setDecorFitsSystemWindows(window, false)
        WindowInsetsControllerCompat(window, videoView).let { controller ->
            controller.hide(WindowInsetsCompat.Type.systemBars())
            controller.systemBarsBehavior =
                WindowInsetsControllerCompat.BEHAVIOR_SHOW_TRANSIENT_BARS_BY_SWIPE
        }
    }


    private val playerCallBacks = @UnstableApi object : Player.Listener {
        override fun onPlayerError(error: PlaybackException) {
            super.onPlayerError(error)
            Log.e("playerError", error.message.toString())
        }

        override fun onPlayerStateChanged(playWhenReady: Boolean, playbackState: Int) {
            val stateString: String = when (playbackState) {
                ExoPlayer.STATE_IDLE -> "ExoPlayer.STATE_IDLE      -"
                ExoPlayer.STATE_BUFFERING -> "ExoPlayer.STATE_BUFFERING -"
                ExoPlayer.STATE_READY -> "ExoPlayer.STATE_READY     -"
                ExoPlayer.STATE_ENDED -> "ExoPlayer.STATE_ENDED     -"
                else -> "UNKNOWN_STATE             -"
            }

            Log.d("onPlayerStateChanged", "changed state to $stateString")
        }
    }
}

My CustomDataSource class:

import android.net.Uri
import androidx.media3.common.C
import androidx.media3.common.util.UnstableApi
import androidx.media3.datasource.BaseDataSource
import androidx.media3.datasource.DataSource
import androidx.media3.datasource.DataSpec
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.WebSocket
import java.lang.Integer.min


@androidx.annotation.OptIn(androidx.media3.common.util.UnstableApi::class)
class WssStreamDataSource : BaseDataSource(true) {

    fun getUrl(): String{
        return wsUrl.replace("proto","json")
    }

    private lateinit var httpClient: OkHttpClient
    private lateinit var dataStreamCollector: WssDataStreamCollector

    private var webSocketClient: WebSocket? = null

    private var currentByteStream: ByteArray? = null
    private var currentPosition = 0;
    private var remainingBytes = 0;

    init {
        httpClient=initAndGetHttpClient()
        dataStreamCollector= WssDataStreamCollector()
    }

    override fun open(dataSpec: DataSpec): Long {
        // Form the request and open the socket.
        // Provide the listener
        // which collects the data for us (Previous class).

           webSocketClient = httpClient.newWebSocket(
            Request.Builder().apply {
                dataSpec.httpRequestHeaders.forEach { entry ->
                    addHeader(entry.key, entry.value)
                }
            }.url(dataSpec.uri.toString()).build(),
            dataStreamCollector)

        return C.LENGTH_UNSET.toLong() // Return -1 as the size is unknown (streaming)
    }

    override fun read(buffer: ByteArray, offset: Int, length: Int): Int {
        // return 0 (nothing read) when no data present...
        if (currentByteStream == null && !dataStreamCollector.canStream()) {
            return 0
        }

        // parse one (data) ByteString at a time.
        // reset the current position and remaining bytes
        // for every new data
        if (currentByteStream == null) {
            currentByteStream = dataStreamCollector.getNextStream().toByteArray()
            currentPosition = 0
            remainingBytes = currentByteStream?.size ?: 0
        }

        val readSize = min(length, remainingBytes)

        currentByteStream?.copyInto(buffer, offset, currentPosition, currentPosition + readSize)
        currentPosition += readSize
        remainingBytes -= readSize


        // once the data is read set currentByteStream to null
        // so the next data would be collected to process in next
        // iteration.
        if (remainingBytes == 0) {
            currentByteStream = null
        }

        return readSize

    }

    override fun getUri(): Uri? {
        webSocketClient?.request()?.url?.let {
            return Uri.parse(it.toString())
        }

        return null
    }

    override fun close() {
        // close the socket and release the resources
        closeWebsocketConnection()
    }

    private fun closeWebsocketConnection() {
//        webSocketClient?.close(1000, "Closing connection")
        webSocketClient?.cancel()
    }

    private fun initAndGetHttpClient() : OkHttpClient{
        val builder = OkHttpClient.Builder()
        return builder.build();
    }
}

@UnstableApi // Factory class for DataSource
class WssDSFactory(private val dataSource: WssStreamDataSource) : DataSource.Factory {
    override fun createDataSource(): DataSource = dataSource
}

Websocket listener implemented class

import android.os.Build
import android.util.Log
import androidx.annotation.RequiresApi
import com.livestreamdemo.ui.models.StreamResponse
import com.google.gson.Gson
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import okio.ByteString
import java.util.concurrent.ConcurrentSkipListSet

    class WssDataStreamCollector : WebSocketListener() {
        companion object{
            private const val TAG="WssDataStreamCollector"
        }
        private val wssData = ConcurrentSkipListSet<ByteString>()
    
        override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
            Log.e(TAG, "onMessage: byteString")
            wssData.add(bytes)
        }
    
        @RequiresApi(Build.VERSION_CODES.O)
        override fun onMessage(webSocket: WebSocket, text: String) {
            val response: StreamResponse?=Gson().fromJson<StreamResponse>(text, StreamResponse::class.java)
            response?.let {
                val videoChunk: ByteString= UtilHelper.base64StringToByteString(it.dataChunk)!!//todo check add null safety
                wssData.add(videoChunk)
     Log.e(TAG, "video_chunk receiver: ${it.timestamp}")
            }
        }
    
        override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
            super.onClosing(webSocket, code, reason)
            wssData.removeAll(wssData)
            Log.e("WssDataStreamCollector","connection Closed")
        }
    
        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            Log.e("onFailure","reason ${response.toString()}")
            Log.e("onFailure","errMsg ${t.message.toString()}")
        }
    
        fun canStream(): Boolean {
            return wssData.size > 0
        }
    
        fun getNextStream(): ByteString {
            return wssData.pollFirst()
        }
    }

CustomMediaExtractor:

@UnstableApi
 internal class CustomMediaExtractor : ExtractorsFactory {
    override fun createExtractors(): Array<Extractor> {
        return arrayOf(FragmentedMp4Extractor())
    }
}

Helper Class:

import android.os.Build
import androidx.annotation.RequiresApi
import okio.ByteString
import java.text.SimpleDateFormat
import java.util.Base64
import java.util.Locale


class UtilHelper {
    companion object {

        const val wsUrl =
            "wss://some_webstream_url"

        val ddMMYYYhhmmssa = SimpleDateFormat("dd-MM-yyyy hh:mm:ss a", Locale.US)


        @RequiresApi(Build.VERSION_CODES.O)
        fun base64ToArrayBuffer(base64: String): ByteString {
            val decodedBytes = Base64.getDecoder().decode(base64)
            return ByteString.of(*decodedBytes)
        }


        @RequiresApi(Build.VERSION_CODES.O)
        fun base64StringToByteString(base64String: String): ByteString? {
            return base64ToArrayBuffer(base64String) 
        }
    }

}

My Model response class:

data class StreamResponse2(
    @SerializedName("data")
    val dataChunk: String,
    @SerializedName("timestamp")
    val timestamp: Long,
)

please follow this link to see the logs: Open this

Reference: how to play video stream from websocket url on exoplayer?

0

There are 0 answers