Because Sieve makes it easy to push to function queues, you can parallelize data processing operations in just a few lines of code. In this example, we’ll build an app that performs face detection on a video using Sieve.
Let’s start with a simple function that has ffmpeg installed. This will allow us to split a video into many frames.
import sieve

@sieve.function(
    name="video_face_detector",
    system_packages=["ffmpeg"],
)
def detector(video: sieve.File):
    """
    :param video: a video to process
    :return: a bunch of face coordinates, yielded
    """
    return
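Before wiring ffmpeg into the function, it may help to look at the splitting step on its own. The sketch below only builds the command rather than running it, so it works without ffmpeg installed. The helper names build_frame_command and frame_index are hypothetical and not part of Sieve; the sketch also assumes ffmpeg's default image sequence numbering, which starts at 1.

```python
def build_frame_command(video_path: str, out_dir: str) -> list:
    """Build (but do not run) an ffmpeg command that writes one JPEG per frame."""
    return [
        "ffmpeg",
        "-loglevel", "error",      # only print errors
        "-i", video_path,          # input video
        "-qscale:v", "2",          # JPEG quality; lower values mean higher quality
        f"{out_dir}/%06d.jpg",     # zero-padded names: 000001.jpg, 000002.jpg, ...
    ]


def frame_index(frame_path: str) -> int:
    """Map a frame file such as temp/000001.jpg to a zero-based frame index."""
    file_name = frame_path.rsplit("/", 1)[-1]
    stem = file_name.rsplit(".", 1)[0]
    # Assumption: numbering starts at 1 (the ffmpeg default), so subtract 1.
    return int(stem) - 1


cmd = build_frame_command("input.mp4", "temp")

# Zero-padded names sort correctly as plain strings, which keeps frames in order.
names = sorted(["temp/000010.jpg", "temp/000002.jpg", "temp/000001.jpg"])
indices = [frame_index(name) for name in names]
```

Passing cmd to subprocess.run would perform the actual split. The zero padding is what lets a plain sort put the frames back in playback order.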
Now let’s write code that splits the video into frames and passes each frame to a face detector. In this case, we’ll use a face detection model that is already available on Sieve, though you could also deploy your own.
import sieve
import concurrent.futures

@sieve.function(
    name="video_face_detector",
    system_packages=["ffmpeg"],
)
def detector(video: sieve.File):
    """
    :param video: a video to process
    :return: a bunch of face coordinates, yielded
    """
    import os
    import subprocess
    import glob
    import shutil
    import time

    print("Starting video face detection...")
    video_path = video.path
    face_detector = sieve.function.get("sieve/mediapipe_face_detector")

    # Start from an empty directory for the extracted frames
    temp_dir = 'temp'
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(temp_dir, exist_ok=True)

    # Split the video into zero-padded, numbered JPEG frames
    subprocess.run(["ffmpeg", "-i", video_path, "-qscale:v", "2", f"{temp_dir}/%06d.jpg", '-loglevel', 'error'])
    frames = glob.glob(f"{temp_dir}/*.jpg")
    frames.sort()

    output_dir = 'output'
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    imgs = [sieve.File(path=frame) for frame in frames]

    # Push every frame to the face detector, then yield results in frame order
    with concurrent.futures.ThreadPoolExecutor() as executor:
        count = 0
        for job in executor.map(face_detector.push, imgs):
            t = time.time()
            res = list(job.result())
            yield {
                "frame_number": count,
                "boxes": res[0]["boxes"],
            }
            print(f"Detected faces in frame {count} in {time.time() - t} seconds")
            count += 1

    # Clean up the extracted frames
    shutil.rmtree(temp_dir)

if __name__ == "__main__":
    video = sieve.File(url="https://storage.googleapis.com/mango-public-models/david.mp4")
    for detection in detector.run(video):
        print(detection)
Once defined, you can run this as shown at the end of the script above. You’ll notice the code calling detector.run directly, where detector is the actual function annotated with @sieve.function. In this case, the function will first deploy and then be called remotely.
You can view this job on your dashboard, or watch the results stream to your terminal.
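To get a feel for the fan-out pattern used in the detector without deploying anything, here is a minimal local sketch. It assumes that push returns a future-like job right away, as the job.result() call in the detector suggests. The names fake_detect, fake_push, and detect_all are hypothetical stand-ins, and a local thread pool plays the role of the remote function queue.

```python
import concurrent.futures
import random
import time

# Stand-in for the remote queue: a pool that runs "detections" in the background.
_remote = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def fake_detect(frame_id: int) -> dict:
    """Pretend to detect faces; the random sleep mimics uneven latency."""
    time.sleep(random.uniform(0.01, 0.05))
    return {"frame_id": frame_id, "boxes": []}


def fake_push(frame_id: int) -> concurrent.futures.Future:
    """Hypothetical analogue of push: enqueue the work and return a future."""
    return _remote.submit(fake_detect, frame_id)


def detect_all(frame_ids):
    """Push every frame, then yield results in submission order."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # executor.map hands back the jobs in input order, so frame numbers
        # stay aligned even when individual jobs finish out of order.
        for count, job in enumerate(executor.map(fake_push, frame_ids)):
            yield {"frame_number": count, "result": job.result()}


results = list(detect_all(range(8)))
_remote.shutdown()
```

Because the jobs are pushed concurrently while results are read back in order, the caller can start consuming early frames while later ones are still being processed.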