Chrysalis Edge Proxy Python gRPC Service Connector

Installation

Prerequisites (Python)

Install gRPC and Protocol Buffers:

pip install grpcio grpcio-tools

Download the Chrysalis Edge Proxy protocol specification file:

wget https://raw.githubusercontent.com/chryscloud/video-edge-ai-proxy/annotate/proto/video_streaming.proto

Prepare the code:

python3 -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. video_streaming.proto

This will generate two files in the same folder as the downloaded video_streaming.proto:

  • video_streaming_pb2_grpc.py
  • video_streaming_pb2.py

Chrysalis Edge Proxy Services

Checking video_streaming.proto, we can see a couple of defined services:

service Image {
    rpc VideoLatestImage(stream VideoFrameRequest) returns (stream VideoFrame) {}
    rpc ListStreams(ListStreamRequest) returns (stream ListStream) {}
    rpc Annotate(AnnotateRequest) returns (AnnotateResponse) {}
    rpc Proxy(ProxyRequest) returns (ProxyResponse) {} // start stop rtmp passthrough
    rpc Storage(StorageRequest) returns (StorageResponse) {} // start stop storage request on the Chrysalis servers
}

  • VideoLatestImage returns the latest video frame
  • ListStreams lists all cameras connected to the Chrysalis Edge Proxy

Other services require a premium account on https://cloud.chrysalis.com:

  • Annotate sends Computer Vision annotation events to the Chrysalis Cloud Streaming API
  • Proxy starts and stops RTMP stream pass-through to Chrysalis Cloud (e.g. to control bandwidth consumption)
  • Storage enables/disables video storage on Chrysalis Cloud

The Proxy and Storage services require a Chrysalis RTMP endpoint to be part of the specific RTSP camera's settings. RTMP endpoints can be defined on Chrysalis Cloud.

Establish gRPC channel

The default port is 50001:

    # grpc connection to video-edge-ai-proxy
    import grpc
    import video_streaming_pb2_grpc

    channel = grpc.insecure_channel('127.0.0.1:50001')
    stub = video_streaming_pb2_grpc.ImageStub(channel)
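
Before issuing any RPCs, it can help to confirm the channel actually connects. A minimal sketch using grpc.channel_ready_future from grpcio (the address and timeout here are illustrative, not required values):

```python
import grpc

# Open an insecure channel to the edge proxy (default port 50001)
channel = grpc.insecure_channel('127.0.0.1:50001')

# Wait briefly for the channel to become ready, so connection problems
# surface immediately instead of failing on the first RPC
try:
    grpc.channel_ready_future(channel).result(timeout=2)
    ready = True
except grpc.FutureTimeoutError:
    ready = False  # no proxy listening yet; retry or report as appropriate

print("channel ready:", ready)
```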

Calling a service

Listing all RTSP streams:


import grpc
import video_streaming_pb2_grpc, video_streaming_pb2

def send_list_stream_request(stub):
    """ Create a list-of-streams request and yield each response """
    stream_request = video_streaming_pb2.ListStreamRequest()
    responses = stub.ListStreams(stream_request)
    for stream_resp in responses:
        yield stream_resp

# grpc connection to video-edge-ai-proxy
channel = grpc.insecure_channel('127.0.0.1:50001')
stub = video_streaming_pb2_grpc.ImageStub(channel)

list_streams = send_list_stream_request(stub)
for stream in list_streams:
    print(stream)
Get the latest video frame from a live camera stream:


import grpc
import video_streaming_pb2_grpc, video_streaming_pb2
import numpy as np

def gen_image_request(device_name, keyframe_only):
    """ Create an object to request a video frame """
    req = video_streaming_pb2.VideoFrameRequest()
    req.device_id = device_name
    req.key_frame_only = keyframe_only
    yield req

# grpc connection to video-edge-ai-proxy
channel = grpc.insecure_channel('127.0.0.1:50001')
stub = video_streaming_pb2_grpc.ImageStub(channel)

while True:
    # set keyframe_only to True for keyframes only, False for every frame
    for frame in stub.VideoLatestImage(gen_image_request(device_name="myrtspcameraname", keyframe_only=True)):
        img_bytes = frame.data
        re_img = np.frombuffer(img_bytes, dtype=np.uint8)
        reshape = tuple([int(dim.size) for dim in frame.shape.dim])
        latest_video_frame = np.reshape(re_img, reshape) # bgr24 format

What does this code do?

As before, we connect to gRPC over port 50001 and send a request for a video frame (the VideoLatestImage RPC with the VideoFrameRequest request object).

The service returns a VideoFrame message with the following structure:

message VideoFrame {
    int64 width = 1; // image width
    int64 height = 2; // image height
    bytes data = 3; // byte data of the frame itself
    int64 timestamp = 4; // timestamp of frame creation
    bool is_keyframe = 5; // if the frame is a keyframe
    int64 pts = 6; // presentation timestamp
    int64 dts = 7; // decoding timestamp
    string frame_type = 8; // (I, P, B)
    bool is_corrupt = 9; // if the frame has been corrupted in transfer
    double time_base = 10; // time base (info usually only for decoders)
    ShapeProto shape = 11; // frame dimensions
    string device_id = 12; // device_id / device name
    int64 packet = 13; // sequential packet number within the current GOP
    int64 keyframe = 14; // sequential keyframe number within the current GOP
}

To get the image out, we are interested in only two fields: data and shape.

re_img = np.frombuffer(img_bytes, dtype=np.uint8)

interprets the buffer as a one-dimensional array. We then need to convert that one-dimensional array back into the original image shape (e.g. 512x512x3), so we build a reshape tuple from the dimensions in the ShapeProto message.

Finally, we can reshape the image into its original array dimensions (the pixel data is always in bgr24 format).
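
The frombuffer/reshape step can be illustrated with a tiny synthetic buffer standing in for frame.data (a hypothetical 2x2 bgr24 image, so 2*2*3 = 12 bytes):

```python
import numpy as np

# 12 bytes standing in for frame.data of a 2x2 bgr24 image
img_bytes = bytes(range(12))

# Interpret the raw buffer as a flat array of unsigned bytes
re_img = np.frombuffer(img_bytes, dtype=np.uint8)

# Shape as it would be reported by ShapeProto: (height, width, channels)
reshape = (2, 2, 3)
latest_video_frame = np.reshape(re_img, reshape)

print(latest_video_frame.shape)   # (2, 2, 3)
print(latest_video_frame[0, 0])   # BGR triple of the top-left pixel: [0 1 2]
```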

Asynchronous annotation service

Checking video_streaming.proto, you can inspect the Annotate service messages:

// Annotation messages
message AnnotateRequest {

    string device_name = 1; // required: device name / identity of the device
    string remote_stream_id = 2; //optional: if associated with storage, the ID of Chrysalis Cloud deviceID
    string type = 3; // required: event type: e.g. moving, exit, entry, stopped, parked, ...
    int64 start_timestamp = 4; //required: start of the event
    int64 end_timestamp = 5; // optional: end of the event
    string object_type = 6; // optional: e.g. person, car, face, bag, roadsign,...
    string object_id = 7; // optional: e.g. object id from the ML model
    string object_tracking_id = 8; // optional: tracking id of the object
    double confidence = 9; // confidence of inference [0-1.0]
    BoudingBox object_bouding_box = 10; // optional: object bounding box
    Location location = 11; // optional: object GEO location
    Coordinate object_coordinate = 12; // optional: object coordinates within the image
    repeated Coordinate mask = 13; // optional: object mask (polygon)
    repeated double object_signature = 14; // optional: signature of the detected item
    string ml_model = 15; // optional: description of the module that generated this event
    string ml_model_version = 16; // optional: version of the ML model
    int32 width = 17; // optional: image width
    int32 height = 18; // optional: image height
    bool is_keyframe = 19; // optional: true/false if this annotation is from keyframe
    string video_type = 20; // optional: e.g. mp4 filename, live stream, ...
    int64 offset_timestamp = 21; // optional: offset from the beginning
    int64 offset_duration = 22; // optional: duration from the offset
    int64 offset_frame_id = 23; // optional: frame id of the 
    int64 offset_packet_id = 24; // optional: offset of the packet

    // extending the event message meta data (optional)
    string custom_meta_1 = 25; // e.g. gender, hair, car model, ...
    string custom_meta_2 = 26;
    string custom_meta_3 = 27;
    string custom_meta_4 = 28;
    string custom_meta_5 = 29;
}

message AnnotateResponse {
    string device_name = 1;
    string remote_stream_id = 2;
    string type = 3;
    int64 start_timestamp = 4;
}

The Annotation message is designed with Computer Vision in mind.

To use the Annotate service, we once again connect to the gRPC service on port 50001 and construct an annotation message:

import time

def annotate(stub, device_name, event_type):
    """ Send an annotation to Chrysalis Cloud """
    annotation_request = video_streaming_pb2.AnnotateRequest()
    annotation_request.device_name = device_name
    annotation_request.type = event_type
    annotation_request.start_timestamp = int(round(time.time() * 1000))
    annotation_request.end_timestamp = int(round(time.time() * 1000))
    try:
        resp = stub.Annotate(annotation_request)
        print(resp)
    except grpc.RpcError as rpc_error_call:
        print("annotate failed with", rpc_error_call.code(), rpc_error_call.details())

What does this code do?

The minimum required fields to send an Annotation message are:

  • device_name (the id/name of the camera the annotation is created for)
  • type (event type, e.g. moving, exit, entry, stopped, parked,…)
  • start_timestamp (UTC timestamp in milliseconds).

It’s recommended that you sync your clock with NTP servers on the edge.
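
For reference, a start_timestamp in the expected millisecond resolution can be generated with Python's standard time module, as in the annotate() example above:

```python
import time

# Current UTC timestamp in milliseconds, the format expected by
# start_timestamp and end_timestamp
start_timestamp = int(round(time.time() * 1000))
print(start_timestamp)
```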

Enable/Disable storage on Chrysalis Cloud

Storage for every stream coming through the Chrysalis Edge Proxy can optionally be turned on or off. By default, Chrysalis Cloud does not store any incoming live video.

def storage(stub, device_name, onoff=False):
    """ Enable or disable storage for a live RTMP stream """
    storage_request = video_streaming_pb2.StorageRequest()
    storage_request.device_id = device_name
    storage_request.start = onoff
    try:
        resp = stub.Storage(storage_request)
        print(resp)
    except grpc.RpcError as rpc_error_call:
        print("storage request failed with", rpc_error_call.code(), rpc_error_call.details())
Start/Stop pass-through streaming to Chrysalis Cloud

Pass-through streaming allows you to start and stop streaming to the Chrysalis Cloud RTMP endpoint to preserve bandwidth on the edge. If an RTMP endpoint was added during RTSP camera setup, RTMP pass-through streaming is enabled by default.

Coming soon.

What’s next?

  • Explore the samples. The Chrysalis Edge Proxy client is accessible via its gRPC service. Samples for Python are available on GitHub: