How to stream uploaded video with AWS?

The main task is to protect the video from being downloaded.

To achieve this, we decided to set up video streaming from S3.

The project has a PHP API and a client. The API generates a pre-signed URL to which the video is uploaded in an S3 bucket. The client can then request the video via a CDN URL. But with signed URLs, the video can be downloaded from the client.
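
For illustration, this is roughly what the API side does, sketched here with boto3 (our API is PHP, where the SDK's S3Client::createPresignedRequest is the equivalent; bucket and key names are placeholders):

    import boto3

    s3 = boto3.client('s3')

    # Pre-signed PUT URL the client uploads the video to (placeholder names)
    upload_url = s3.generate_presigned_url(
        'put_object',
        Params={'Bucket': 'video-upload-bucket', 'Key': 'videos/input.mp4'},
        ExpiresIn=3600,  # valid for one hour
    )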

We found an approach where the video is converted to MPEG-DASH with AWS Elemental MediaConvert. The MediaConvert job can be created via the API. The output is then streamed via AWS Elemental MediaPackage and CloudFront.

The problems are:

  • How do we know when the video upload has finished, so that the MediaConvert job can be started?
  • The MPEG-DASH output has an .mpd manifest, but MediaPackage requires a .smil manifest. How can this file be auto-generated from the .mpd?

P.S. If I'm wrong somewhere, please correct me.

1 Answer

Answer by MichaelTam:

How do we know when the video upload has finished, so that the MediaConvert job can be started? This can be achieved with the following workflow:

  1. The ingest user uploads a video to the watchfolder bucket in S3.
  2. The s3:ObjectCreated event triggers a Lambda function that calls MediaConvert to convert the video.
  3. The converted videos are stored in S3 by MediaConvert.
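
For reference, this is the relevant shape of the S3 event the Lambda receives (abridged; bucket and key are examples):

      {
          "Records": [
              {
                  "eventSource": "aws:s3",
                  "eventName": "ObjectCreated:Put",
                  "s3": {
                      "bucket": { "name": "vod-watchfolder-firstname-lastname" },
                      "object": { "key": "inputs/test.mp4" }
                  }
              }
          ]
      }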

High-level instructions follow.

  • Create an Amazon S3 bucket to use for uploading videos to be converted. Bucket name example: vod-watchfolder-firstname-lastname

  • Create an Amazon S3 bucket to use for storing converted video outputs from MediaConvert (enable public read, static website hosting, and CORS). A boto3 sketch of both bucket steps follows the CORS configuration below.

      <?xml version="1.0" encoding="UTF-8"?>
      <CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
      <CORSRule>
          <AllowedOrigin>*</AllowedOrigin>
          <AllowedMethod>GET</AllowedMethod>
          <MaxAgeSeconds>3000</MaxAgeSeconds>
          <AllowedHeader>*</AllowedHeader>
      </CORSRule>
      </CORSConfiguration>
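
    A minimal boto3 sketch of both bucket steps (bucket names are placeholders; outside us-east-1, create_bucket also needs a CreateBucketConfiguration with a LocationConstraint):

      import boto3

      s3 = boto3.client('s3')

      # Watchfolder bucket for uploads, media bucket for converted outputs
      s3.create_bucket(Bucket='vod-watchfolder-firstname-lastname')
      s3.create_bucket(Bucket='vod-mediabucket-firstname-lastname')

      # Equivalent of the XML CORS configuration above
      s3.put_bucket_cors(
          Bucket='vod-mediabucket-firstname-lastname',
          CORSConfiguration={
              'CORSRules': [{
                  'AllowedOrigins': ['*'],
                  'AllowedMethods': ['GET'],
                  'MaxAgeSeconds': 3000,
                  'AllowedHeaders': ['*'],
              }]
          },
      )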
    
  • Create an IAM role to pass to MediaConvert. Use the IAM console to create a new role. Name it MediaConvertRole and choose MediaConvert as the service that will use the role, so that MediaConvert can assume it (see the trust policy sketch below). Use inline policies to grant the role access to the S3 buckets the jobs read from and write to.
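
    The role's trust policy must allow the MediaConvert service to assume it; a minimal example:

      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Principal": { "Service": "mediaconvert.amazonaws.com" },
                  "Action": "sts:AssumeRole"
              }
          ]
      }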

  • Create an IAM role for your Lambda function. Use the IAM console to create a role. Name it VODLambdaRole and select AWS Lambda for the role type. Attach the managed policy called AWSLambdaBasicExecutionRole to this role to grant the necessary CloudWatch Logs permissions. Use inline policies to grant permissions to the other resources the lambda needs, for example:

      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Action": [
                      "logs:CreateLogGroup",
                      "logs:CreateLogStream",
                      "logs:PutLogEvents"
                  ],
                  "Resource": "*",
                  "Effect": "Allow",
                  "Sid": "Logging"
              },
              {
                  "Action": [
                      "iam:PassRole"
                  ],
                  "Resource": [
                      "ARNforMediaConvertRole"
                  ],
                  "Effect": "Allow",
                  "Sid": "PassRole"
              },
              {
                  "Action": [
                      "mediaconvert:*"
                  ],
                  "Resource": [
                      "*"
                  ],
                  "Effect": "Allow",
                  "Sid": "MediaConvertService"
              },
              {
                  "Action": [
                      "s3:*"
                  ],
                  "Resource": [
                      "*"
                  ],
                  "Effect": "Allow",
                  "Sid": "S3Service"
              }
          ]
      }
    
  • Create a Lambda function for converting videos. Use the AWS Lambda console to create a new Lambda function called VODLambdaConvert that will process the S3 trigger events. Use the provided convert.py example implementation for your function code.

      #!/usr/bin/env python

      import json
      import os
      import uuid
      import logging
      from urllib.parse import urlparse

      import boto3

      logger = logging.getLogger()
      logger.setLevel(logging.INFO)

      S3 = boto3.resource('s3')

      def handler(event, context):
          '''
          Watchfolder handler - this lambda is triggered when video objects are uploaded to the
          SourceS3Bucket/inputs folder.

          It will look for two sets of file inputs:

              SourceS3Bucket/inputs/SourceS3Key:
                  the input video to be converted

              SourceS3Bucket/jobs/*.json:
                  job settings for MediaConvert jobs to be run against the input video. If
                  there are no settings files in the jobs folder, then the default job will be
                  run from the job.json file in the lambda environment.

          Output paths stored in outputGroup['OutputGroupSettings']['DashIsoGroupSettings']['Destination']
          are constructed from the name of the job settings files as follows:

              s3://<MediaBucket>/<basename(job settings filename)>/<basename(input)>/<Destination value from job settings file>
          '''

          assetID = str(uuid.uuid4())
          sourceS3Bucket = event['Records'][0]['s3']['bucket']['name']
          sourceS3Key = event['Records'][0]['s3']['object']['key']
          sourceS3 = 's3://' + sourceS3Bucket + '/' + sourceS3Key
          mediaConvertRole = os.environ['MediaConvertRole']
          application = os.environ['Application']
          region = os.environ['AWS_DEFAULT_REGION']
          statusCode = 200
          jobs = []
          job = {}

          # Use MediaConvert SDK UserMetadata to tag jobs with the assetID.
          # Events from MediaConvert will have the assetID in UserMetadata.
          jobMetadata = {}
          jobMetadata['assetID'] = assetID
          jobMetadata['application'] = application
          jobMetadata['input'] = sourceS3

          try:
              # Build a list of jobs to run against the input.  Use the settings files in
              # WatchFolder/jobs if any exist.  Otherwise, use the default job.

              # Iterate through all the objects in the jobs folder of the WatchFolder
              # bucket (the resource API handles pagination).  Each object contains a
              # jobSettings JSON.
              bucket = S3.Bucket(sourceS3Bucket)
              for obj in bucket.objects.filter(Prefix='jobs/'):
                  if obj.key != "jobs/":
                      jobInput = {}
                      jobInput['filename'] = obj.key
                      logger.info('jobInput: %s', jobInput['filename'])

                      jobInput['settings'] = json.loads(obj.get()['Body'].read())
                      logger.info(json.dumps(jobInput['settings']))

                      jobs.append(jobInput)

              # Use the default job settings from the job.json file packaged in the
              # lambda zip, in the current working directory
              if not jobs:
                  with open('job.json') as json_data:
                      jobInput = {}
                      jobInput['filename'] = 'Default'
                      logger.info('jobInput: %s', jobInput['filename'])

                      jobInput['settings'] = json.load(json_data)
                      logger.info(json.dumps(jobInput['settings']))

                      jobs.append(jobInput)

              # Get the account-specific MediaConvert endpoint for this region
              mediaconvert_client = boto3.client('mediaconvert', region_name=region)
              endpoints = mediaconvert_client.describe_endpoints()

              # Add the account-specific endpoint to the client session
              client = boto3.client('mediaconvert', region_name=region,
                                    endpoint_url=endpoints['Endpoints'][0]['Url'])

              for j in jobs:
                  jobSettings = j['settings']
                  jobFilename = j['filename']

                  # Save the name of the settings file in the job userMetadata
                  jobMetadata['settings'] = jobFilename

                  # Update the job settings with the source video from the S3 event
                  jobSettings['Inputs'][0]['FileInput'] = sourceS3

                  # Update the job settings with the destination paths for converted
                  # videos.  We want to replace the destination bucket of the output
                  # paths in the job settings, but keep the rest of the path.
                  destinationS3 = 's3://' + os.environ['DestinationBucket'] + '/' \
                      + os.path.splitext(os.path.basename(sourceS3Key))[0] + '/' \
                      + os.path.splitext(os.path.basename(jobFilename))[0]

                  for outputGroup in jobSettings['OutputGroups']:

                      logger.info("outputGroup['OutputGroupSettings']['Type'] == %s",
                                  outputGroup['OutputGroupSettings']['Type'])

                      if outputGroup['OutputGroupSettings']['Type'] == 'FILE_GROUP_SETTINGS':
                          templateDestination = outputGroup['OutputGroupSettings']['FileGroupSettings']['Destination']
                          templateDestinationKey = urlparse(templateDestination).path
                          logger.info("templateDestinationKey == %s", templateDestinationKey)
                          outputGroup['OutputGroupSettings']['FileGroupSettings']['Destination'] = destinationS3 + templateDestinationKey

                      elif outputGroup['OutputGroupSettings']['Type'] == 'HLS_GROUP_SETTINGS':
                          templateDestination = outputGroup['OutputGroupSettings']['HlsGroupSettings']['Destination']
                          templateDestinationKey = urlparse(templateDestination).path
                          logger.info("templateDestinationKey == %s", templateDestinationKey)
                          outputGroup['OutputGroupSettings']['HlsGroupSettings']['Destination'] = destinationS3 + templateDestinationKey

                      elif outputGroup['OutputGroupSettings']['Type'] == 'DASH_ISO_GROUP_SETTINGS':
                          templateDestination = outputGroup['OutputGroupSettings']['DashIsoGroupSettings']['Destination']
                          templateDestinationKey = urlparse(templateDestination).path
                          logger.info("templateDestinationKey == %s", templateDestinationKey)
                          outputGroup['OutputGroupSettings']['DashIsoGroupSettings']['Destination'] = destinationS3 + templateDestinationKey

                      elif outputGroup['OutputGroupSettings']['Type'] == 'MS_SMOOTH_GROUP_SETTINGS':
                          templateDestination = outputGroup['OutputGroupSettings']['MsSmoothGroupSettings']['Destination']
                          templateDestinationKey = urlparse(templateDestination).path
                          logger.info("templateDestinationKey == %s", templateDestinationKey)
                          outputGroup['OutputGroupSettings']['MsSmoothGroupSettings']['Destination'] = destinationS3 + templateDestinationKey

                      elif outputGroup['OutputGroupSettings']['Type'] == 'CMAF_GROUP_SETTINGS':
                          templateDestination = outputGroup['OutputGroupSettings']['CmafGroupSettings']['Destination']
                          templateDestinationKey = urlparse(templateDestination).path
                          logger.info("templateDestinationKey == %s", templateDestinationKey)
                          outputGroup['OutputGroupSettings']['CmafGroupSettings']['Destination'] = destinationS3 + templateDestinationKey

                      else:
                          logger.error("Exception: Unknown Output Group Type %s",
                                       outputGroup['OutputGroupSettings']['Type'])
                          statusCode = 500

                  logger.info(json.dumps(jobSettings))

                  # Convert the video using AWS Elemental MediaConvert
                  job = client.create_job(Role=mediaConvertRole, UserMetadata=jobMetadata,
                                          Settings=jobSettings)

          except Exception as e:
              logger.error('Exception: %s', e)
              statusCode = 500

          # Report errors as a 500 response rather than an unhandled exception
          return {
              'statusCode': statusCode,
              'body': json.dumps(job, indent=4, sort_keys=True, default=str),
              'headers': {'Content-Type': 'application/json', 'Access-Control-Allow-Origin': '*'}
          }
    
  • Make sure to configure your function to use the VODLambdaRole IAM role you created in the previous section, and to set the DestinationBucket, MediaConvertRole, and Application environment variables that convert.py reads.

  • Create an S3 event trigger for your convert lambda. Use the AWS Lambda console to add an ObjectCreated (PUT) trigger from the vod-watchfolder-firstname-lastname S3 bucket to the VODLambdaConvert lambda. The same trigger can also be created programmatically, as sketched below.
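
    A boto3 sketch of the same trigger (the function ARN is a placeholder, and S3 must separately be granted lambda:InvokeFunction on the function, e.g. via Lambda's add_permission):

      import boto3

      s3 = boto3.client('s3')

      s3.put_bucket_notification_configuration(
          Bucket='vod-watchfolder-firstname-lastname',
          NotificationConfiguration={
              'LambdaFunctionConfigurations': [{
                  'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:111122223333:function:VODLambdaConvert',
                  'Events': ['s3:ObjectCreated:*'],
                  # Only react to uploads under the inputs/ prefix
                  'Filter': {'Key': {'FilterRules': [
                      {'Name': 'prefix', 'Value': 'inputs/'}
                  ]}},
              }]
          },
      )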

  • Test the watchfolder automation. You can use your own video or the test.mp4 video included in the tutorial repository to test the workflow (see the upload sketch below).
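
    A minimal boto3 upload sketch, assuming the trigger watches the inputs/ prefix as in convert.py's docstring:

      import boto3

      s3 = boto3.client('s3')

      # Uploading into the watched inputs/ prefix kicks off the conversion workflow
      s3.upload_file('test.mp4', 'vod-watchfolder-firstname-lastname', 'inputs/test.mp4')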

  • For details, please refer to this document: https://github.com/aws-samples/aws-media-services-vod-automation/blob/master/MediaConvert-WorkflowWatchFolderAndNotification/README-tutorial.md

The MPEG-DASH output has an .mpd manifest, but MediaPackage requires a .smil manifest. How can this file be auto-generated from the .mpd?