Live video using Youtube Live Streaming Api [Python]

87 views Asked by At

I'm trying to create a Python script that will create a live broadcast using the YouTube Live Streaming API and OAuth. In the first part of the code, authentication via OAuth is performed, which works fine.

In the next part, I create a live stream from my public video on YouTube and initiate the live broadcast. Then I bind the stream to the live broadcast and start the live broadcast.

After running the script, it prints the message "Live broadcast has been successfully started."

However, when I check YouTube, the live broadcast is created but nothing is playing in it. It still shows the message that it's waiting for the broadcast.

I'm completely lost because I'm a beginner. Can you give me any idea how to solve this?

import os
import pickle
import time
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from datetime import datetime, timedelta

# Function for OAuth authentication
def authenticate():
    SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl', 
              'https://www.googleapis.com/auth/youtube', 
              'https://www.googleapis.com/auth/youtube.readonly', 
              'https://www.googleapis.com/auth/youtubepartner', 
              'https://www.googleapis.com/auth/youtube.upload']

    credentials_file = 'token.pickle'

    if os.path.exists(credentials_file):
        with open(credentials_file, 'rb') as token:
            credentials = pickle.load(token)
    else:
        flow = InstalledAppFlow.from_client_secrets_file('client_secrets.json', scopes=SCOPES)
        credentials = flow.run_local_server(port=8080)

        with open(credentials_file, 'wb') as token:
            pickle.dump(credentials, token)

    return credentials

# Create an authorized YouTube API client
def create_youtube_client(credentials):
    return build('youtube', 'v3', credentials=credentials)

# Get the video ID from your channel
def get_video_id():
    # Here you can implement code to get the video ID, for example using the YouTube Data API
    return "my video ID from https"

# Create a live stream
def create_live_stream(youtube):
    request = youtube.liveStreams().insert(
        part="snippet,cdn,contentDetails,status",
        body={
            "snippet": {
                "title": "Your new video stream's name",
                "description": "A description of your video stream. This field is optional."
            },
            "cdn": {
                "frameRate": "variable",
                "ingestionType": "rtmp",
                "resolution": "variable"
            },
            "contentDetails": {
                "enableAutoStart": True,
                "isReusable": True
            }
        }
    )
    response = request.execute()
    return response["id"]

# Create a live broadcast
def create_live_broadcast(youtube, video_id, stream_id):
    request = youtube.liveBroadcasts().insert(
        part="snippet,status,contentDetails",
        body={
            "snippet": {
                "title": "Your live broadcast title",
                "description": "Your live broadcast description",
                "scheduledStartTime": (datetime.now() + timedelta(minutes=1)).isoformat(),  # Schedule the broadcast for 1 minutes later
                "thumbnails": {
                    "default": {
                        "url": "https://example.com/live-thumbnail.jpg"
                    }
                }
            },
            "status": {
                "privacyStatus": "private"
            },
            "contentDetails": {
                "enableAutoStart": True
            }
        }
    )
    response = request.execute()
    broadcast_id = response["id"]

    # Bind the stream to the live broadcast
    bind_request = youtube.liveBroadcasts().bind(
        part="id,snippet",
        id=broadcast_id,
        streamId=stream_id
    )
    bind_response = bind_request.execute()

    return broadcast_id

# Start the live broadcast
def start_live_broadcast(youtube, broadcast_id):
    request = youtube.liveBroadcasts().update(
        part="id,status",
        body={
            "id": broadcast_id,
            "status": {
                "lifeCycleStatus": "live"
            }
        }
    )
    response = request.execute()
    return response["id"]

# Main function
def main():
    # Authenticate using OAuth
    credentials = authenticate()

    # Create an authorized client for the YouTube API
    youtube = create_youtube_client(credentials)

    # Get the video ID
    video_id = get_video_id()

    # Create a live stream
    stream_id = create_live_stream(youtube)

    # Create a live broadcast
    broadcast_id = create_live_broadcast(youtube, video_id, stream_id)

    # Wait for 10 seconds to initialize the broadcast
    time.sleep(10)

    # Start the live broadcast
    start_live_broadcast(youtube, broadcast_id)

    print("Live broadcast has been successfully started.")

if __name__ == "__main__":
    main()

I've tried different video IDs from my channel, tried upload new video using "videos.insert" from youtube api, but nothing seems to work. There must be some error that my amateur eye can't catch, but a developer would spot right away.

It seems like the live broadcast is not receiving the video source for streaming, but I don't know why.

1

There are 1 answers

2
zaibe_.x On

The issue stems from the incorrect parameter used to update the broadcast status to "live" in the start_live_broadcast function. The correct parameter is status.lifeCycleStatus instead of status.

def start_live_broadcast(youtube, broadcast_id):
request = youtube.liveBroadcasts().transition(
    broadcastStatus="live",
    id=broadcast_id,
    part="id,status"
)
response = request.execute()
return response["id"]