Sieve makes it easy to push work to function queues, so you can parallelize data processing operations in just a few lines of code. In this example, we’ll build an app that runs face detection on every frame of a video using Sieve.

Let’s start with a simple stub function whose environment has ffmpeg installed. We’ll use ffmpeg to split a video into individual frames.

import sieve

@sieve.function(
    name="video_face_detector",
    system_packages=["ffmpeg"],
)
def detector(video: sieve.Video):
    """
    :param video: a video to process
    :return: a bunch of face coordinates, yielded
    """
    return

Now let’s write code that splits the video into frames and passes them to a face detector. In this case, we’ll use a face detection model that is already available on Sieve, though you could also deploy your own.

import sieve
import concurrent.futures

@sieve.function(
    name="video_face_detector",
    system_packages=["ffmpeg"],
)
def detector(video: sieve.Video):
    """
    :param video: a video to process
    :return: a bunch of face coordinates, yielded
    """

    import os
    import subprocess
    import glob
    import shutil
    import time

    print("Starting video face detection...")
    video_path = video.path

    face_detector = sieve.function.get("sieve/mediapipe_face_detector")

    temp_dir = 'temp'
    # delete temp dir if it exists
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(temp_dir, exist_ok=True)

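    # extract frames from video as numbered JPEGs; -loglevel is a global option,
    # so it goes before the output path, and check=True raises if ffmpeg fails
    subprocess.run(["ffmpeg", "-loglevel", "error", "-i", video_path, "-qscale:v", "2", f"{temp_dir}/%06d.jpg"], check=True)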

    # get list of frames
    frames = glob.glob(f"{temp_dir}/*.jpg")
    frames.sort()

    # make directory for output
    output_dir = 'output'
    # delete output dir if it exists
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    # push every frame to the face detector in parallel and yield results in frame order
    imgs = [sieve.File(path=frame) for frame in frames]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        count = 0
        for job in executor.map(face_detector.push, imgs):
            t = time.time()
            res = list(job.result())
            yield {
                "frame_number": count,
                "boxes": res[0]["boxes"],
            }
            print(f"Detected faces in frame {count} in {time.time() - t} seconds")
            count += 1

    # delete temp dir
    shutil.rmtree(temp_dir)

if __name__ == "__main__":
    video = sieve.Video(url="https://storage.googleapis.com/mango-public-models/david.mp4")

    # Calling detector.run both deploys and runs the current version of your code
    for detection in detector.run(video):
        print(detection)

Once the function is defined, you can run the file as follows. Note that the code calls detector.run directly, where detector is the function decorated with @sieve.function. In this case, the function is deployed first and then called remotely.

$ python file.py

You can view this job on your dashboard, or watch the detections stream to your terminal as each frame finishes.
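
The fan-out step in the detector relies on a property of `concurrent.futures.ThreadPoolExecutor.map`: the calls run concurrently, but results are returned in input order, which is why a simple counter can serve as the frame number. The sketch below shows that pattern locally, without Sieve. `FakeJob` and `fake_push` are hypothetical stand-ins for the job object and `face_detector.push`; they are not part of the Sieve API, and the payload shape is assumed for illustration only.

```python
import concurrent.futures
import time


class FakeJob:
    """Hypothetical stand-in for the job object returned by a remote push."""

    def __init__(self, payload):
        self._payload = payload

    def result(self):
        # A real job would block here until the remote function finishes.
        return [self._payload]


def fake_push(frame_path):
    """Hypothetical stand-in for face_detector.push; not a Sieve API."""
    # The first frame is made the slowest, so completion order
    # differs from input order.
    delay = 0.05 if frame_path.endswith("1.jpg") else 0.01
    time.sleep(delay)
    return FakeJob({"frame": frame_path, "boxes": []})


def detect_all(frame_paths):
    """Yield one result per frame, in input order, while pushes run in parallel."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # executor.map preserves input order, so enumerate gives the frame index.
        for frame_number, job in enumerate(executor.map(fake_push, frame_paths)):
            res = job.result()
            yield {"frame_number": frame_number, "frame": res[0]["frame"]}


frames = ["000001.jpg", "000002.jpg", "000003.jpg"]
results = list(detect_all(frames))
```

Even though the first frame takes the longest to finish, `results` lists the frames in their original order. The trade-off is that a slow early frame delays every result queued behind it; if ordering does not matter for your use case, `concurrent.futures.as_completed` is the usual alternative.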