AWS Kinesis Video Streams: tips & tricks

If you've used AWS Kinesis Video Streams, you probably know that some scenarios aren't trivial. What's KVS and what does it do?

Amazon Kinesis Video Streams makes it easy to securely stream video from connected devices to AWS for analytics, machine learning (ML), playback, and other processing. Kinesis Video Streams automatically provisions and elastically scales all the infrastructure needed to ingest streaming video data from millions of devices. It also durably stores, encrypts, and indexes video data in your streams, and allows you to access your data through easy-to-use APIs. Kinesis Video Streams enables you to playback video for live and on-demand viewing, and quickly build applications that take advantage of computer vision and video analytics through integration with Amazon Rekognition Video, and libraries for ML frameworks such as Apache MxNet, TensorFlow, and OpenCV.

Let's look at some things that are non-obvious, poorly documented, or just hard to do.

Problem A: stream splitting (not to be confused with shard-splitting).

Sometimes, you want to create multiple Kinesis Video Streams from a single RTSP source.

Solution: build a GStreamer pipeline using a "tee" element:

This element splits data to multiple pads. Splitting the data flow is useful, for example, when capturing a video where the video is shown on the screen and also encoded and written to a file. Another example is playing music and hooking up a visualization module. One needs to use separate queue elements in each branch to provide separate threads for each branch. Otherwise a blocked dataflow in one branch would stall the other branches.

For more details, see here and here.

Example 1:

gst-launch-1.0 -v rtspsrc location=rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov short-header=TRUE ! rtph264depay ! tee name=t \
! queue ! h264parse ! kvssink stream-name=Stream1 storage-size=512 \
t. ! queue ! h264parse ! kvssink stream-name=Stream2 storage-size=512

Example 2:

gst-launch-1.0 -v rtspsrc location=rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov short-header=TRUE ! rtph264depay ! h264parse ! tee name=t \
! queue ! kvssink stream-name=Stream1 storage-size=512 \
t. ! queue ! kvssink stream-name=Stream2 storage-size=512

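Each will create two identical streams in Kinesis, named Stream1 and Stream2. The difference is in the location of the "tee" element, which defines where in the pipeline the split is performed: Example 1 splits right after rtph264depay, so each branch runs its own h264parse, while Example 2 parses once and splits afterwards.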
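If you need more than two output streams, writing the branches by hand gets tedious. Here is a minimal sketch that generates the Example 2 style command for any number of stream names. The function name build_tee_pipeline and its parameters are my own invention for illustration; only the GStreamer elements and properties come from the examples above.

```python
import shlex


def build_tee_pipeline(rtsp_url, stream_names, storage_size=512):
    """Return a gst-launch-1.0 command that fans one RTSP source out
    to one kvssink branch per stream name (hypothetical helper)."""
    if not stream_names:
        raise ValueError("at least one stream name is required")
    # Shared part of the pipeline: depayload and parse once, then split.
    head = (
        f"gst-launch-1.0 -v rtspsrc location={shlex.quote(rtsp_url)} "
        "short-header=TRUE ! rtph264depay ! h264parse ! tee name=t"
    )
    # Every branch gets its own queue so a stalled branch does not block the others.
    branches = [
        f"! queue ! kvssink stream-name={shlex.quote(name)} storage-size={storage_size}"
        for name in stream_names
    ]
    # The first branch links directly after the tee; the rest start from "t."
    return " ".join([head, branches[0]] + [f"t. {b}" for b in branches[1:]])


if __name__ == "__main__":
    print(build_tee_pipeline("rtsp://example.invalid/stream", ["Stream1", "Stream2", "Stream3"]))
```

The printed command can be pasted into a shell. It assumes kvssink is installed and that your AWS credentials are available to it, exactly as for the hand-written examples.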

Problem B: getting playable HLS URL from Kinesis

You want to view a Kinesis video stream using HTTP Live Streaming (HLS), but you aren't sure how to access it. All you have is the stream's ARN.

Solution: use AWS SDK for Python (boto3) and GetDataEndpoint API to retrieve an HLS URL.

# This example shows how to get a Kinesis HLS URL for the stream based on its StreamARN.

# Note that your AWS credentials (and the default region, us-west-2 in my case) need to be specified in ~/.aws/credentials (the region can also go in ~/.aws/config).
#
# This script will:
# 1. Get an endpoint with the GetDataEndpoint API
# 2. Use that endpoint to get an HLS streaming URL with the GetHLSStreamingSessionURL API

import boto3

hls_stream_ARN = "arn:aws:kinesisvideo:us-west-2:430343295543:stream/MyStream/1561431365342"

STREAM_NAME = "MyStream"
kvs = boto3.client("kinesisvideo")

print("Attempting to get an HLS streaming URL from AWS GetDataEndpoint API...")

# Grab the endpoint from GetDataEndpoint
endpoint = kvs.get_data_endpoint(
    APIName="GET_HLS_STREAMING_SESSION_URL",
    StreamARN=hls_stream_ARN
)['DataEndpoint']
# Grab the HLS Stream URL from the endpoint
kvam = boto3.client("kinesis-video-archived-media", endpoint_url=endpoint)
url = kvam.get_hls_streaming_session_url(
    StreamName=STREAM_NAME,
    PlaybackMode="LIVE"
)['HLSStreamingSessionURL']

print("HLS URL:", url)

For more information about get-hls-streaming-session-url, see Amazon's official documentation.
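Since the problem statement says all you have is the ARN, here is a sketch that works from the ARN alone. GetHLSStreamingSessionURL accepts StreamARN in place of StreamName, and the region can be read out of the ARN itself. The helper names parse_stream_arn and get_hls_url are hypothetical, and the ARN layout (arn:aws:kinesisvideo:region:account:stream/name/creation-time) is assumed from the sample ARNs in this post.

```python
def parse_stream_arn(stream_arn):
    """Split a Kinesis Video Streams ARN into (region, stream_name)."""
    parts = stream_arn.split(":", 5)
    if len(parts) != 6 or parts[2] != "kinesisvideo":
        raise ValueError(f"not a Kinesis Video Streams ARN: {stream_arn!r}")
    resource = parts[5].split("/")
    if len(resource) < 2 or resource[0] != "stream":
        raise ValueError(f"unexpected resource part in ARN: {stream_arn!r}")
    return parts[3], resource[1]


def get_hls_url(stream_arn, playback_mode="LIVE"):
    """Return an HLS session URL using only the stream's ARN."""
    import boto3  # imported lazily so parse_stream_arn works without boto3 installed

    region, _ = parse_stream_arn(stream_arn)
    kvs = boto3.client("kinesisvideo", region_name=region)
    endpoint = kvs.get_data_endpoint(
        APIName="GET_HLS_STREAMING_SESSION_URL",
        StreamARN=stream_arn,
    )["DataEndpoint"]
    kvam = boto3.client(
        "kinesis-video-archived-media", endpoint_url=endpoint, region_name=region
    )
    return kvam.get_hls_streaming_session_url(
        StreamARN=stream_arn,
        PlaybackMode=playback_mode,
    )["HLSStreamingSessionURL"]
```

Taking the region from the ARN means the script does not depend on the default region in your local AWS configuration matching the stream's region.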

Problem C: save Kinesis video fragments

You want to get Kinesis Video Streams segments and save them to a file.

Solution: use the AWS SDK for Python (boto3) and the GetDataEndpoint API to retrieve the stream's HLS URL, then fetch its video chunks with GetMedia and save them to a file (in MKV format).

import time

import boto3

# make sure to use the right ARN here:
hls_stream_ARN = "arn:aws:kinesisvideo:us-west-2:420343293343:stream/MyStream/1563421665582"

STREAM_NAME = "MyStream"
kvs = boto3.client("kinesisvideo")

print("Attempting to get an HLS streaming URL from AWS GetDataEndpoint API...")

# Grab the endpoint from GetDataEndpoint
endpoint = kvs.get_data_endpoint(
    APIName="GET_HLS_STREAMING_SESSION_URL",
    StreamARN=hls_stream_ARN
)['DataEndpoint']
# Grab the HLS Stream URL from the endpoint
kvam = boto3.client("kinesis-video-archived-media", endpoint_url=endpoint)
url = kvam.get_hls_streaming_session_url(
    StreamName=STREAM_NAME,
    PlaybackMode="LIVE"
)['HLSStreamingSessionURL']

print("HLS URL:", url)

# List every stream in the account and print its metadata and GET_MEDIA endpoint
response = kvs.list_streams()

print("Stream data:")

for stream_info in response['StreamInfoList']:
    for key, value in stream_info.items():
        print(key, ':', value)
    data_endpoint = kvs.get_data_endpoint(
        StreamName=stream_info['StreamName'], APIName='GET_MEDIA'
    )['DataEndpoint']
    print(data_endpoint)


# Now try getting video chunks using GetMedia

response = kvs.get_data_endpoint(
    #StreamName=STREAM_NAME,
    StreamARN=hls_stream_ARN,
    APIName='GET_MEDIA'
)

print("Getting data endpoint...", response)

endpoint_url_string = response['DataEndpoint']

streaming_client = boto3.client(
    'kinesis-video-media',
    endpoint_url=endpoint_url_string,
)

# EARLIEST starts from the oldest retained fragment; use 'NOW' to start from the live edge
kinesis_stream = streaming_client.get_media(
    StreamARN=hls_stream_ARN,
    StartSelector={'StartSelectorType': 'EARLIEST'}
)

stream_payload = kinesis_stream['Payload']

print("Received stream payload.")

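# read() returns only once the payload ends, so this may block for a long time on a live stream
output_path = "fragments_" + str(time.time()) + ".mkv"
with open(output_path, 'wb') as f:
    f.write(stream_payload.read())
print("Saved to a file.")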
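Reading the whole payload in one call holds everything in memory and may not return while the stream is still being fed. A minimal sketch of a chunked alternative is below. The helper save_payload and its parameters are hypothetical; it relies only on the payload having a file-like read(n) method, which the boto3 streaming body does. Note that stopping at max_bytes can cut the file in the middle of a fragment.

```python
def save_payload(payload, path, chunk_size=64 * 1024, max_bytes=None):
    """Copy a file-like payload to `path` in chunks.

    Stops at end of stream, or after `max_bytes` bytes if a limit is given.
    Returns the number of bytes written.
    """
    written = 0
    with open(path, "wb") as out:
        while max_bytes is None or written < max_bytes:
            # Never read past the byte limit, if one was requested.
            to_read = chunk_size if max_bytes is None else min(chunk_size, max_bytes - written)
            chunk = payload.read(to_read)
            if not chunk:
                break  # end of stream
            out.write(chunk)
            written += len(chunk)
    return written
```

In the script above this could be used as save_payload(stream_payload, "fragments.mkv", max_bytes=50 * 1024 * 1024) to cap the download at roughly 50 MB.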

Note: as you can tell from the resulting file's erratic playback, the fragments often arrive out of order (and are therefore saved out of order).

To put them in the correct order:

  • call the ListFragments API
  • sort the list by server timestamp
  • call GetMediaForFragmentList with the sorted fragment numbers
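The three steps above can be sketched roughly as follows. The function names sort_fragments and download_ordered, and the start/end time-range parameters, are hypothetical; the boto3 calls are list_fragments and get_media_for_fragment_list on the kinesis-video-archived-media client. I have not tuned this for long recordings: GetMediaForFragmentList limits how many fragments one call accepts, so large ranges would need batching.

```python
def sort_fragments(fragments):
    """Return fragment numbers ordered by server timestamp
    (ties broken by fragment number)."""
    ordered = sorted(fragments, key=lambda f: (f["ServerTimestamp"], f["FragmentNumber"]))
    return [f["FragmentNumber"] for f in ordered]


def download_ordered(stream_name, start, end, path):
    """List fragments between two datetimes, sort them, and save them to `path`."""
    import boto3  # imported lazily so sort_fragments works without boto3 installed

    kvs = boto3.client("kinesisvideo")

    def archived_client(api_name):
        # Each archived-media API needs its own data endpoint.
        endpoint = kvs.get_data_endpoint(StreamName=stream_name, APIName=api_name)["DataEndpoint"]
        return boto3.client("kinesis-video-archived-media", endpoint_url=endpoint)

    # Step 1: ListFragments, following NextToken until all pages are read.
    lister = archived_client("LIST_FRAGMENTS")
    fragments, token = [], None
    while True:
        kwargs = {
            "StreamName": stream_name,
            "FragmentSelector": {
                "FragmentSelectorType": "SERVER_TIMESTAMP",
                "TimestampRange": {"StartTimestamp": start, "EndTimestamp": end},
            },
        }
        if token:
            kwargs["NextToken"] = token
        page = lister.list_fragments(**kwargs)
        fragments.extend(page["Fragments"])
        token = page.get("NextToken")
        if not token:
            break

    # Steps 2 and 3: sort, then fetch the media in that order.
    getter = archived_client("GET_MEDIA_FOR_FRAGMENT_LIST")
    media = getter.get_media_for_fragment_list(
        StreamName=stream_name, Fragments=sort_fragments(fragments)
    )
    with open(path, "wb") as out:
        out.write(media["Payload"].read())
```

Keeping the sort in its own small function makes it easy to check on sample data without touching AWS.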

Keep in mind that you can only access chunk-level data with Python. Frame-level data can only be accessed in Java!