Hello,
The default tutorial in the documentation did not really work for me, so I wanted to share my experience interfacing a Raspberry Pi v2 camera module (CSI, Sony IMX219 8-megapixel sensor) with a Python program running on the BeagleY-AI platform.
The first steps were the same as in the Using IMX219 CSI Cameras tutorial, i.e. adding a device tree overlay to /boot/firmware/extlinux/extlinux.conf, running sudo beagle-camera-setup, etc.
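For reference, the overlay is enabled by adding an fdtoverlays line to the boot entry in extlinux.conf; the exact .dtbo name below is from my memory of the tutorial, so double-check it against the docs for your image:
fdtoverlays /overlays/k3-am67a-beagley-ai-csi0-imx219.dtbo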
After the camera appeared in the /dev/ folder, I used the following command to test it and grab several frames:
~$ gst-launch-1.0 -v \
v4l2src num-buffers=5 device=/dev/video3 io-mode=dmabuf ! \
video/x-bayer, width=1920, height=1080, framerate=30/1, format=rggb ! \
bayer2rgb ! videoconvert ! jpegenc ! \
multifilesink location="imx219-image-%d.jpg"
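If the camera shows up under a different /dev/videoN node on your system, v4l2-ctl from the v4l-utils package can help identify the right one (a general tip, not part of the tutorial):
~$ sudo apt-get install v4l-utils
~$ v4l2-ctl --list-devices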
Although the gst-launch command worked correctly, obtaining frames from within a Python script proved to be challenging. What worked in the end was building OpenCV from source with GStreamer support.
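For completeness, fetching the OpenCV sources and creating a sibling build directory (so that the ../opencv path in the cmake command below resolves) might look like this; the 4.x branch is an assumption, use whatever version you need:
~$ git clone --branch 4.x --depth 1 https://github.com/opencv/opencv.git
~$ mkdir build && cd build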
Initially, I tried to make a minimalistic OpenCV configuration with only the modules I needed, using the -D BUILD_LIST option, but ran into OpenCV-internal bugs (e.g. the flag to skip building the Python documentation was ignored, and stub generation failed because it referenced some dnn functions even though that module was disabled too).
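For reference, such a minimal configuration would have looked roughly like this (the module list here is illustrative, not my exact command):
~$ cmake \
-D CMAKE_BUILD_TYPE=Release \
-D BUILD_LIST=core,imgproc,imgcodecs,videoio,python3 \
-D WITH_GSTREAMER=ON \
../opencv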
Eventually, the following full build command worked:
~$ cmake \
-D CMAKE_BUILD_TYPE=Release \
-D CMAKE_INSTALL_PREFIX=$CONDA_PREFIX \
-D BUILD_TESTS=OFF \
-D BUILD_PERF_TESTS=OFF \
-D BUILD_EXAMPLES=OFF \
-D BUILD_opencv_apps=OFF \
-D BUILD_JAVA=OFF \
-D PYTHON3_EXECUTABLE="$(which python3)" \
-D PYTHON3_INCLUDE_DIR="$(python3 -c 'import sysconfig; print(sysconfig.get_paths()["include"])')" \
-D PYTHON3_LIBRARY="$(python3 -c 'import sysconfig; print(sysconfig.get_config_var("LIBDIR"))')/libpython3.9.so" \
-D PYTHON3_NUMPY_INCLUDE_DIRS="$(python3 -c 'import numpy; print(numpy.get_include())')" \
-D OPENCV_PYTHON_INSTALL_PATH="lib/python3.9/site-packages" \
-D WITH_GSTREAMER=ON \
-D WITH_PNG=ON \
-D WITH_JPEG=OFF \
-D WITH_TIFF=OFF \
-D WITH_WEBP=OFF \
-D WITH_OPENJPEG=OFF \
-D WITH_JASPER=OFF \
-D WITH_OPENEXR=OFF \
../opencv
In the cmake output, one needs to make sure that GStreamer and Python 3 are both reported as found, like:
Video I/O:
-- FFMPEG: YES
-- avcodec: YES (59.37.100)
-- avformat: YES (59.27.100)
-- avutil: YES (57.28.100)
-- swscale: YES (6.7.100)
-- avresample: NO
-- GStreamer: YES (1.22.0)
-- v4l/v4l2: YES (linux/videodev2.h)
...
-- Python 3:
-- Interpreter: /home/uname/miniforge3/envs/tfl/bin/python3 (ver 3.9.21)
-- Libraries: /home/uname/miniforge3/envs/tfl/lib/libpython3.9.so (ver 3.9.21)
-- Limited API: NO
-- numpy: /home/uname/miniforge3/envs/tfl/lib/python3.9/site-packages/numpy/_core/include (ver 2.0.2)
-- install path: lib/python3.9/site-packages/cv2/python-3.9
Here, for instance, is how to install the GStreamer development libraries (they need to be present when cmake runs):
~$ sudo apt-get install \
libgstreamer1.0-dev \
libgstreamer-plugins-base1.0-dev \
gstreamer1.0-tools \
gstreamer1.0-plugins-base \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly
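It is also worth checking that the pipeline elements used above are actually available; bayer2rgb, for example, comes from the plugins-bad set:
~$ gst-inspect-1.0 v4l2src
~$ gst-inspect-1.0 bayer2rgb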
Then, if the cmake output is correct, after
~$ make -j$(nproc)
~$ make install
the cv2 module should be installed in the environment that was activated during cmake configuration.
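A quick way to double-check the installed module, before trying an actual capture, is to query its build information from Python (a small verification snippet, not from the tutorial):
import cv2

# Print only the lines relevant to video capture support
for line in cv2.getBuildInformation().splitlines():
    if "GStreamer" in line or "v4l" in line.lower():
        print(line.strip())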
Here is a test program to verify GStreamer functionality from Python:
import cv2
import time

print("cv2 imported")

pipeline = (
    "v4l2src device=/dev/video3 io-mode=dmabuf ! "
    "video/x-bayer, width=1920, height=1080, framerate=30/1, format=rggb ! "
    "bayer2rgb ! "
    "videoconvert ! "
    "video/x-raw, format=BGR ! "
    "appsink drop=true"
)

cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
if not cap.isOpened():
    raise RuntimeError("Failed to open capture device.")
print("pre-sleep")

# Allow some time for the camera to initialize
time.sleep(3)

# Capture and write a couple of frames to disk
for i in range(2):
    print("inside for loop")
    ret, frame = cap.read()
    if not ret:
        print(f"Failed to capture frame {i}")
        continue
    cv2.imwrite(f"frame_{i}.png", frame)
    print(f"Captured frame {i}")

cap.release()
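Assuming the script is saved as test_camera.py (the filename is mine), run it from the environment where cv2 was installed; it should leave frame_0.png and frame_1.png in the current directory:
~$ python3 test_camera.py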
And here is the full modified object detection pipeline from the TF Lite object detection tutorial. It was tested in a headless setup, so it only prints to the console, and it has no graceful termination other than Ctrl-C (see the sketch after the code), but it works.
import cv2
from threading import Thread


class VideoStream:
    """Handles video streaming via a GStreamer pipeline."""

    def __init__(self, pipeline):
        # Important: specify CAP_GSTREAMER when using a GStreamer pipeline with OpenCV
        self.stream = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
        if not self.stream.isOpened():
            raise RuntimeError("Failed to open video stream with the provided GStreamer pipeline.")
        self.grabbed, self.frame = self.stream.read()
        self.stopped = False

    def start(self):
        """Starts the thread that reads frames from the video stream."""
        Thread(target=self.update, args=(), daemon=True).start()
        return self

    def update(self):
        """Continuously updates the frame from the video stream."""
        while not self.stopped:
            # Keep grabbing frames as long as the stream is open
            self.grabbed, self.frame = self.stream.read()
            if not self.grabbed:
                # If frame grabbing failed, stop reading
                break
        self.stream.release()

    def read(self):
        """Returns the most recent frame."""
        return self.frame

    def stop(self):
        """Stops the video stream and closes resources."""
        self.stopped = True
def main():
    import argparse
    import os
    import time

    import numpy as np
    from tflite_runtime.interpreter import Interpreter

    parser = argparse.ArgumentParser()
    parser.add_argument('--modeldir', required=True, help='Folder where the .tflite file is located')
    parser.add_argument('--graph', default='detect.tflite', help='Name of the .tflite file')
    parser.add_argument('--labels', default='labelmap.txt', help='Name of the labelmap file')
    parser.add_argument('--threshold', default='0.5', help='Minimum confidence threshold')
    parser.add_argument('--resolution', default='1920x1080',
                        help='Desired resolution in WxH. (NOTE: GStreamer pipeline may override)')
    args = parser.parse_args()

    # -------------------------------------------------------------------------
    # Build the mostly-hardcoded pipeline string
    # -------------------------------------------------------------------------
    resW, resH = map(int, args.resolution.split('x'))
    pipeline = (
        f"v4l2src device=/dev/video3 io-mode=dmabuf ! "
        f"video/x-bayer, width={resW}, height={resH}, framerate=30/1, format=rggb ! "
        f"bayer2rgb ! "
        f"videoconvert ! "
        f"video/x-raw, format=BGR ! "
        f"appsink drop=true"
    )

    # -------------------------------------------------------------------------
    # Initialize the VideoStream using the GStreamer pipeline
    # -------------------------------------------------------------------------
    videostream = VideoStream(pipeline).start()
    time.sleep(2)  # Warm-up delay for camera

    # -------------------------------------------------------------------------
    # Load custom labels and TFLite model
    # -------------------------------------------------------------------------
    model_path = os.path.join(os.getcwd(), args.modeldir, args.graph)
    labelmap_path = os.path.join(os.getcwd(), args.modeldir, args.labels)

    # Load labels
    def load_labels(path):
        with open(path, 'r') as f:
            lines = [line.strip() for line in f.readlines()]
        # Some labelmaps start with a '???' placeholder; drop it
        if lines[0] == '???':
            lines.pop(0)
        return lines

    labels = load_labels(labelmap_path)

    # Load TFLite model
    interpreter = Interpreter(model_path=model_path)
    interpreter.allocate_tensors()

    # -------------------------------------------------------------------------
    # Get input & output tensor details
    # -------------------------------------------------------------------------
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # Model expects input shape: [1, height, width, 3]
    imH, imW = map(int, input_details[0]['shape'][1:3])

    # Indices for boxes, classes, and scores can differ in certain models
    outname = output_details[0]['name']
    if 'StatefulPartitionedCall' in outname:
        # typical with some TF2 exported models
        boxes_idx, classes_idx, scores_idx = 1, 3, 0
    else:
        boxes_idx, classes_idx, scores_idx = 0, 1, 2

    # Floating model?
    floating_model = (input_details[0]['dtype'] == np.float32)
    min_conf_threshold = float(args.threshold)
    frame_rate_calc = 1
    freq = cv2.getTickFrequency()

    # -------------------------------------------------------------------------
    # Main loop
    # -------------------------------------------------------------------------
    while True:
        t1 = cv2.getTickCount()
        frame = videostream.read()
        if frame is None:
            # Handle the case where no frame is retrieved
            print("No frame grabbed; stopping.")
            break

        # Convert and resize for model input
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_resized = cv2.resize(frame_rgb, (imW, imH))
        input_data = np.expand_dims(frame_resized, axis=0)

        # Floating model needs normalization
        if floating_model:
            input_data = (np.float32(input_data) - 127.5) / 127.5

        # Run inference
        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()

        # Gather results
        boxes = interpreter.get_tensor(output_details[boxes_idx]['index'])[0]
        classes = interpreter.get_tensor(output_details[classes_idx]['index'])[0]
        scores = interpreter.get_tensor(output_details[scores_idx]['index'])[0]

        # Print labels
        for i in range(len(scores)):
            if (scores[i] > min_conf_threshold) and (scores[i] <= 1.0):
                object_name = labels[int(classes[i])] if int(classes[i]) < len(labels) else 'N/A'
                label = f"{object_name}: {int(scores[i]*100)}%"
                print(label)

        # Print FPS
        t2 = cv2.getTickCount()
        time1 = (t2 - t1) / freq
        frame_rate_calc = 1 / time1
        print(frame_rate_calc)

    # Clean up (only reached if frame grabbing fails; TODO: graceful exit, see sketch below)
    videostream.stop()


if __name__ == "__main__":
    main()
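As a possible fix for the missing graceful termination (a sketch I have not tested on the board), the main loop could be wrapped so that Ctrl-C releases the stream instead of killing the reader thread mid-frame:

def run_forever(videostream, process_frame):
    """Run process_frame on each frame until Ctrl-C, then release the stream."""
    try:
        while True:
            frame = videostream.read()
            if frame is None:
                break
            process_frame(frame)
    except KeyboardInterrupt:
        print("Interrupted; shutting down.")
    finally:
        # stop() makes the update() thread exit its loop and release the capture
        videostream.stop()

Here process_frame would contain the inference and printing code from the while loop above.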