Interfacing the RPi camera module 2 with Python on BY-AI

Hello,

The default tutorial in the documentation file did not really work for me, so I wanted to share some of experience interfacing Raspberry Pi v2 camera modules (CSI, Sony IMX219 8-megapixel sensor) with a Python program running on the BeagleY-AI platform here.

The first steps were the same as in the Using IMX219 CSI Cameras tutorial, i.e. adding a device tree overlay to /boot/firmware/extlinux/extlinux.conf, running sudo beagle-camera-setup etc. After the camera appeared in the /dev/ folder, I’ve used the following command to test it and obtain several frames:

~$ gst-launch-1.0 -v \
    v4l2src num-buffers=5 device=/dev/video3 io-mode=dmabuf ! \
    video/x-bayer, width=1920, height=1080, framerate=30/1, format=rggb ! \
    bayer2rgb ! videoconvert ! jpegenc ! \
    multifilesink location="imx219-image-%d.jpg"

Although the shell command worked correctly, obtaining frames within a Python script proved to be challenging. What worked in the end was building opencv from sources with gstreamer support.

Initially, I tried to make a minimalistic opencv configuration with only the modules that I needed using -D BUILD_LIST option, but faced problems due to opencv internal bugs (e.g. the flag to not build Python documentation was ignored, and stubs generation failed because it referenced some dnn functions despite them being disabled too).

Eventually, the following full build command worked:

~$ cmake \
  -D CMAKE_BUILD_TYPE=Release \
  -D CMAKE_INSTALL_PREFIX=$CONDA_PREFIX \
  -D BUILD_TESTS=OFF \
  -D BUILD_PERF_TESTS=OFF \
  -D BUILD_EXAMPLES=OFF \
  -D BUILD_opencv_apps=OFF \
  -D BUILD_JAVA=OFF \
  -D PYTHON3_EXECUTABLE="$(which python3)" \
  -D PYTHON3_INCLUDE_DIR="$(python3 -c 'import sysconfig; print(sysconfig.get_paths()["include"])')" \
  -D PYTHON3_LIBRARY="$(python3 -c 'import sysconfig; print(sysconfig.get_config_var("LIBDIR"))')/libpython3.9.so" \
  -D PYTHON3_NUMPY_INCLUDE_DIRS="$(python3 -c 'import numpy; print(numpy.get_include())')" \
  -D OPENCV_PYTHON_INSTALL_PATH="lib/python3.9/site-packages" \
  -D WITH_GSTREAMER=ON \
  -D WITH_PNG=ON \
  -D WITH_JPEG=OFF \
  -D WITH_TIFF=OFF \
  -D WITH_WEBP=OFF \
  -D WITH_OPENJPEG=OFF \
  -D WITH_JASPER=OFF \
  -D WITH_OPENEXR=OFF \
  ../opencv

In cmake output, one needs to make sure that gstreamer is ON and python3 is on, like:

Video I/O:
--     FFMPEG:                      YES
--       avcodec:                   YES (59.37.100)
--       avformat:                  YES (59.27.100)
--       avutil:                    YES (57.28.100)
--       swscale:                   YES (6.7.100)
--       avresample:                NO
--     GStreamer:                   YES (1.22.0)
--     v4l/v4l2:                    YES (linux/videodev2.h)
...
--   Python 3:
--     Interpreter:                 /home/uname/miniforge3/envs/tfl/bin/python3 (ver 3.9.21)
--     Libraries:                   /home/uname/miniforge3/envs/tfl/lib/libpython3.9.so (ver 3.9.21)
--     Limited API:                 NO
--     numpy:                       /home/uname/miniforge3/envs/tfl/lib/python3.9/site-packages/numpy/_core/include (ver 2.0.2)
--     install path:                lib/python3.9/site-packages/cv2/python-3.9

Here, for instance, is how to install developer gstreamer libs:

~$ sudo apt-get install \
  libgstreamer1.0-dev \
  libgstreamer-plugins-base1.0-dev \
  gstreamer1.0-tools \
  gstreamer1.0-plugins-base \
  gstreamer1.0-plugins-good \
  gstreamer1.0-plugins-bad \
  gstreamer1.0-plugins-ugly

Then, if cmake output is correct, after

~$ make -j$(nproc)
~$ make install

the cv2 module should be installed in the environment that was activated during cmake configuration.

Here is the test program to verify the gstreamer in Python functionality:

import cv2
import time

print("cv2 imported")

pipeline = (
    "v4l2src device=/dev/video3 io-mode=dmabuf ! "
    "video/x-bayer, width=1920, height=1080, framerate=30/1, format=rggb ! "
    "bayer2rgb ! "
    "videoconvert ! "
    "video/x-raw, format=BGR ! "
    "appsink drop=true"
)

cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
if not cap.isOpened():
    raise RuntimeError("Failed to open capture device.")

print("pre-sleep")

# Allow some time for the camera to initialize
time.sleep(3)

# Capture and write 5 frames to disk
for i in range(2):
    print("inside for loop")
    ret, frame = cap.read()
    if not ret:
        print(f"Failed to capture frame {i}")
        continue
    cv2.imwrite(f"frame_{i}.png", frame)
    print(f"Captured frame {i}")

cap.release()

And here is the full modified object detection pipeline from the TF Lite object detection tutorial. It was tested in headless setup, so only prints in console, and does not have a good termination aside of Ctrl-C, but it works.

import cv2
from threading import Thread

class VideoStream:
    """Handles video streaming via GStreamer pipeline."""
    def __init__(self, pipeline):
        # Important: specify CAP_GSTREAMER when using a GStreamer pipeline with OpenCV
        self.stream = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
        if not self.stream.isOpened():
            raise RuntimeError("Failed to open video stream with the provided GStreamer pipeline.")

        self.grabbed, self.frame = self.stream.read()
        self.stopped = False

    def start(self):
        """Starts the thread that reads frames from the video stream."""
        Thread(target=self.update, args=(), daemon=True).start()
        return self

    def update(self):
        """Continuously updates the frame from the video stream."""
        while not self.stopped:
            # Keep grabbing frames as long as the stream is open
            self.grabbed, self.frame = self.stream.read()
            if not self.grabbed:
                # If frame grabbing failed, optionally you can break or set self.stopped = True
                break

        self.stream.release()

    def read(self):
        """Returns the most recent frame."""
        return self.frame

    def stop(self):
        """Stops the video stream and closes resources."""
        self.stopped = True

pipeline = (
    "v4l2src device=/dev/video3 io-mode=dmabuf ! "
    "video/x-bayer, width=1920, height=1080, framerate=30/1, format=rggb ! "
    "bayer2rgb ! "
    "videoconvert ! "
    "video/x-raw, format=BGR ! "
    "appsink drop=true"
)

def main():
    import argparse
    import os
    import time
    import numpy as np
    from tflite_runtime.interpreter import Interpreter
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--modeldir', required=True, help='Folder where the .tflite file is located')
    parser.add_argument('--graph', default='detect.tflite', help='Name of the .tflite file')
    parser.add_argument('--labels', default='labelmap.txt', help='Name of the labelmap file')
    parser.add_argument('--threshold', default='0.5', help='Minimum confidence threshold')
    parser.add_argument('--resolution', default='1920x1080',
                        help='Desired resolution in WxH. (NOTE: GStreamer pipeline may override)')
    args = parser.parse_args()

    # -------------------------------------------------------------------------
    # Build the mostly-hardcoded pipeline string
    # -------------------------------------------------------------------------
    resW, resH = map(int, args.resolution.split('x'))
    pipeline = (
        f"v4l2src device=/dev/video3 io-mode=dmabuf ! "
        f"video/x-bayer, width={resW}, height={resH}, framerate=30/1, format=rggb ! "
        f"bayer2rgb ! "
        f"videoconvert ! "
        f"video/x-raw, format=BGR ! "
        f"appsink drop=true"
    )
    
    # -------------------------------------------------------------------------
    # Initialize the VideoStream using the GStreamer pipeline
    # -------------------------------------------------------------------------
    videostream = VideoStream(pipeline).start()
    time.sleep(2)  # Warm-up delay for camera

    # -------------------------------------------------------------------------
    # Load custom labels and TFLite model
    # -------------------------------------------------------------------------
    model_path = os.path.join(os.getcwd(), args.modeldir, args.graph)
    labelmap_path = os.path.join(os.getcwd(), args.modeldir, args.labels)
    
    # Load labels
    def load_labels(path):
        with open(path, 'r') as f:
            lines = [line.strip() for line in f.readlines()]
        if lines[0] == '???':
            lines.pop(0)
        return lines

    labels = load_labels(labelmap_path)

    # Load TFLite model
    interpreter = Interpreter(model_path=model_path)
    interpreter.allocate_tensors()

    # -------------------------------------------------------------------------
    # Get input & output tensor details
    # -------------------------------------------------------------------------
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # Model expects input shape: [1, height, width, 3]
    imH, imW = map(int, input_details[0]['shape'][1:3])
    print(f"type imW {type(imW)}")
    # Indices for boxes, classes, and scores can differ in certain models
    outname = output_details[0]['name']
    if 'StatefulPartitionedCall' in outname:  
        # typical with some TF2 exported models
        boxes_idx, classes_idx, scores_idx = 1, 3, 0
    else:
        boxes_idx, classes_idx, scores_idx = 0, 1, 2

    # Floating model?
    floating_model = (input_details[0]['dtype'] == np.float32)
    min_conf_threshold = float(args.threshold)

    frame_rate_calc = 1
    freq = cv2.getTickFrequency()

    # -------------------------------------------------------------------------
    # Main loop
    # -------------------------------------------------------------------------
    while True:
        t1 = cv2.getTickCount()
        frame = videostream.read()
        if frame is None:
            # Handle the case where no frame is retrieved
            print("No frame grabbed; stopping.")
            break

        # Convert and resize for model input
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_resized = cv2.resize(frame_rgb, (imW, imH))
        input_data = np.expand_dims(frame_resized, axis=0)

        # Floating model needs normalization
        if floating_model:
            input_data = (np.float32(input_data) - 127.5) / 127.5

        # Run inference
        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()

        # Gather results
        boxes   = interpreter.get_tensor(output_details[boxes_idx]['index'])[0]
        classes = interpreter.get_tensor(output_details[classes_idx]['index'])[0]
        scores  = interpreter.get_tensor(output_details[scores_idx]['index'])[0]

        # Print labels
        for i in range(len(scores)):
            if (scores[i] > min_conf_threshold) and (scores[i] <= 1.0):
                object_name = labels[int(classes[i])] if int(classes[i]) < len(labels) else 'N/A'
                label = f"{object_name}: {int(scores[i]*100)}%"
                print(label)

        # Print FPS
        t2 = cv2.getTickCount()
        time1 = (t2 - t1)/freq
        frame_rate_calc = 1/time1
        print(frame_rate_calc)

    # Clean up # this code is never actually reached, todo: graceful exit
    videostream.stop()

if __name__ == "__main__":
    main()
3 Likes

Thats awesome!

Remember, the Documentation site is always grateful for MR’s
if you found anything to be wrong, incomplete or missing…

Documentation / docs.beagleboard.io · GitLab

2 Likes

I figured out a way to achieve this with the default V4L2 backend in a way that doesn’t require building OpenCV with GStreamer support.

The following can be used to accomplish this and the full code demos can be found here.

Bootstrap the Camera

NOTE: You MUST run this code EVERY time you wish to use the CSI camera after booting or rebooting!

sudo beagle-camera-setup
media-ctl -V '"imx219 5-0010":0[fmt:SRGGB8_1X8/640x480 field:none]'

Camera Settings

These settings will set the camera gain and exposure for the camera to a sensible default.

If you find that your image is washed out, too dark or even blank, you may need to tweak these parameters for your own lighting environment.

v4l2-ctl -d /dev/v4l-subdev2 --set-ctrl=digital_gain=2048
v4l2-ctl -d /dev/v4l-subdev2 --set-ctrl=analogue_gain=230
v4l2-ctl -d /dev/v4l-subdev2 --set-ctrl=exposure=1750

Python Code Changes

Change the following lines (beginning on line 17):

        self.stream.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
        self.stream.set(3, resolution[0])
        self.stream.set(4, resolution[1])

to

        self.stream.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'RGGB'))
#        self.stream.set(3, resolution[0])
#        self.stream.set(4, resolution[1])
1 Like