Example: Parallelized Face Detection
Because Sieve makes it easy to push jobs to function queues, you can parallelize data processing operations in just a few lines of code. To demonstrate, we’ll build an app that performs face detection on a video using Sieve.
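The pattern that makes this possible: calling push on a Sieve function submits a job to that function’s queue and returns a handle right away, so many inputs can be in flight at once while you collect results afterward. Below is a minimal sketch of that pattern, not part of the final app; the image paths are placeholders, and it uses the public sieve/mediapipe_face_detector function that we’ll also use later in this example.

import sieve

face_detector = sieve.function.get("sieve/mediapipe_face_detector")

# placeholder inputs; in the real app these will be frames extracted from a video
images = [sieve.Image(path=p) for p in ["frame_0001.jpg", "frame_0002.jpg"]]

# push returns immediately with a job handle, so every input queues up without waiting
jobs = [face_detector.push(img) for img in images]

# result() blocks until the corresponding job has finished
for job in jobs:
    print(job.result())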
Let’s start with a simple function that has ffmpeg installed. This will allow us to split a video into many frames that get processed later on.
import sieve

@sieve.function(
    name="video_face_detector",
    python_packages=["ffmpeg-python==0.2.0"],
    system_packages=["libgl1-mesa-glx", "libglib2.0-0", "ffmpeg"],
)
def vfd(video: sieve.Video):
    """
    :param video: a video to process
    :return: a bunch of face coordinates
    """
Now let’s write code that splits the video into frames and passes them into a face detector. In this case, we’ll use a face detection model that is already available on Sieve, though you could also deploy your own using a similar setup.
import sieve
import concurrent.futures

@sieve.function(
    name="video_face_detector",
    system_packages=["ffmpeg"],
)
def detector(video: sieve.Video):
    print("Starting video face detection...")
    video_path = video.path
    face_detector = sieve.function.get("sieve/mediapipe_face_detector")

    # use ffmpeg to extract frames from the video
    import subprocess
    import os
    import shutil

    temp_dir = 'temp'
    # delete the temp dir if it already exists
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(temp_dir, exist_ok=True)

    # extract one JPEG per frame from the video
    subprocess.run(["ffmpeg", "-i", video_path, "-qscale:v", "2", f"{temp_dir}/%06d.jpg"])

    # get a sorted list of the extracted frames
    import glob
    frames = glob.glob(f"{temp_dir}/*.jpg")
    frames.sort()

    # wrap each frame as a sieve.Image so it can be pushed to the face detector
    imgs = []
    for frame in frames:
        imgs.append(sieve.Image(path=frame))

    # push every frame to the face detector in parallel and yield detections as they finish
    import time
    with concurrent.futures.ThreadPoolExecutor() as executor:
        count = 0
        for job in executor.map(face_detector.push, imgs):
            t = time.time()
            res = list(job.result())
            yield {
                "frame_number": count,
                "boxes": res[1]
            }
            print(f"Detected faces in frame {count} in {time.time() - t} seconds")
            count += 1

    # clean up the temp dir
    shutil.rmtree(temp_dir)

if __name__ == "__main__":
    video = sieve.Video(url="https://storage.googleapis.com/mango-public-models/david.mp4")
    for detection in detector.run(video):
        print(detection)
Once defined, you can run this as follows. You’ll notice the code has a main function; this is how we’re triggering a call to the function. When you run the file, the function will first deploy, and then the main function will be called.
$ python file.py
You can view this job on your dashboard, or start seeing data stream to your terminal.
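If you’d rather not keep a local process around for the whole job, you can also submit it asynchronously with push, the same call we used per frame above. The sketch below assumes the function has already been deployed, that your-org/video_face_detector is a placeholder for whatever name it was deployed under, and that a generator function’s result can be iterated the same way we iterate the per-frame detector’s result above.

import sieve

video = sieve.Video(url="https://storage.googleapis.com/mango-public-models/david.mp4")

# fetch the deployed function by name ("your-org" is a placeholder for your account)
detector = sieve.function.get("your-org/video_face_detector")

# push submits the job to the function's queue and returns a handle immediately
job = detector.push(video)

# result() blocks until the job completes; until then, you can track it on the dashboard
for detection in job.result():
    print(detection)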