Camera Fake

Opencv and GST Fake cameras for testing

https://mavlink.io/en/services/camera.html https://github.com/mavlink/mavlink-camera-manager

import time, os, sys

from UAV.logging import logging
from UAV.mavlink.mavcom import MAVCom, time_since_boot_ms, time_UTC_usec, date_time_str
from UAV.mavlink.component import Component, mavutil, mavlink, MAVLink

import cv2

from UAV.utils.general import boot_time_str, With, find_config_dir, read_camera_dict_from_toml
from UAV.camera.gst_cam import *
# show_doc(create_toml_file)
import types


def get_all_methods(cls):
    methods = []
    for name in dir(cls):
        attr = getattr(cls, name)
        if isinstance(attr, types.FunctionType):
            methods.append(name)
    return methods
methods = get_all_methods(GSTCamera)
print (methods)
['__enter__', '__exit__', '__init__', '__repr__', '__str__', '_open', 'calculate_memory_usage', 'camera_capture_status_send', 'camera_image_captured_send', 'camera_information_send', 'camera_settings_send', 'close', 'get_camera_info', 'image_capture_thread_is_running', 'image_start_capture', 'image_stop_capture', 'list_files', 'load_image_from_memoryfs', 'on_capture_image', 'on_start_image_capture', 'on_status_video_capture', 'on_stop_image_capture', 'on_video_callback', 'pause', 'play', 'save_image_to_memoryfs', 'set_source_compenent', 'show_image', 'storage_information_send', 'time_UTC_usec', 'video_start_capture', 'video_start_streaming', 'video_stop_capture', 'video_stop_streaming']
for method in methods:
    s = f"GSTCamera.{method}"
    print (s)
    show_doc(s)
GSTCamera.__enter__
GSTCamera.__exit__
GSTCamera.__init__
GSTCamera.__repr__
GSTCamera.__str__
GSTCamera._open
GSTCamera.calculate_memory_usage
GSTCamera.camera_capture_status_send
GSTCamera.camera_image_captured_send
GSTCamera.camera_information_send
GSTCamera.camera_settings_send
GSTCamera.close
GSTCamera.get_camera_info
GSTCamera.image_capture_thread_is_running
GSTCamera.image_start_capture
GSTCamera.image_stop_capture
GSTCamera.list_files
GSTCamera.load_image_from_memoryfs
GSTCamera.on_capture_image
GSTCamera.on_start_image_capture
GSTCamera.on_status_video_capture
GSTCamera.on_stop_image_capture
GSTCamera.on_video_callback
GSTCamera.pause
GSTCamera.play
GSTCamera.save_image_to_memoryfs
GSTCamera.set_source_compenent
GSTCamera.show_image
GSTCamera.storage_information_send
GSTCamera.time_UTC_usec
GSTCamera.video_start_capture
GSTCamera.video_start_streaming
GSTCamera.video_stop_capture
GSTCamera.video_stop_streaming

GSTCamera.save_image_to_memoryfs

 GSTCamera.save_image_to_memoryfs (data:bytes, filename:str)

Save image to memory filesystem.

Type Details
data bytes jpeg encoded image to save
filename str filename to save image
import inspect
from nbdev.showdoc import show_doc

class Example:
    def method_a(self):
        "This is method A"
        pass
    
    def method_b(self):
        "This is method B"
        pass

# To show docs for all methods in a class
for name, func in inspect.getmembers(Example, inspect.isfunction):
    print(name)
    show_doc(func)
method_a
method_b
from nbdev.doclinks import NbdevLookup
from nbdev.showdoc import _fmt_sig


def _html_link(url, txt): return f'{txt}'

class BasicHtmlRenderer(ShowDocRenderer):
    "Simple HTML renderer for `show_doc`"
    def _repr_html_(self):
        doc = '\n'
        doc += f'{self.nm}\n'
        doc += f'{self.nm}{_fmt_sig(self.sig)}'
        if self.docs: doc += f"{self.docs}"
        return doc

    def doc(self):
        "Show `show_doc` info along with link to docs"
        from IPython.display import display,HTML
        res = self._repr_html_()
        docs = NbdevLookup().doc(self.fn)
        if docs is not None: res += '\n' +_html_link(docs, "Show in docs") + ''
        display(HTML(res))
def doc(elt):
    "Show `show_doc` info along with link to docs"
    BasicHtmlRenderer(elt).doc()
doc(GSTCamera.save_image_to_memoryfs)
GSTCamera.save_image_to_memoryfs GSTCamera.save_image_to_memoryfs(data:bytes, filename:str)Save image to memory filesystem.

GSTCamera.save_image_to_memoryfs

 GSTCamera.save_image_to_memoryfs (data:bytes, filename:str)

Save image to memory filesystem.

Type Details
data bytes jpeg encoded image to save
filename str filename to save image
# config_path = Path("../../config")
# create_toml_file(find_config_dir()/"____test_camera_info.toml")
# assert False, "stop here"
# assert False, "stop here"
from gstreamer import GstVidSrcValve,  GstVideoSave, GstJpegEnc
import gstreamer.utils as gst_utils

GSTCamera

 GSTCamera (camera_dict=None, udp_encoder='H264', loglevel=20)

Create a fake camera component for testing using GStreamer

Type Default Details
camera_dict NoneType None camera_info dict
udp_encoder str H264 encoder for video streaming
loglevel int 20 log flag

GSTCamera.image_start_capture

 GSTCamera.image_start_capture (interval, count)

Start image capture sequence.

Details
interval Image capture interval
count Number of images to capture (0 for unlimited)

BaseCamera.camera_information_send

 BaseCamera.camera_information_send ()

Information about a camera. Can be requested with a MAV_CMD_REQUEST_MESSAGE command.


CV2Camera.camera_settings_send

 CV2Camera.camera_settings_send ()

Information about a camera. Can be requested with a MAV_CMD_REQUEST_MESSAGE command.


read_camera_dict_from_toml

 read_camera_dict_from_toml (toml_file_path)

Read MAVLink camera info from a TOML file.

Type Details
toml_file_path path to TOML file
Returns dict camera_info dict
# with GstContext(loglevel=LogLevels.CRITICAL):  # GST main loop in thread
#     with GstPipeline(command_h264_display, loglevel=LogLevels.INFO) as disp_pipeline:
#         with GstPipeline(command_src, loglevel=10) as src_pipeline:
#             with GstJpegEnc(command_jpg, max_count=5, on_jpeg_capture=on_capture, loglevel=LogLevels.INFO) as jpg_pipeline:
#                 while not jpg_pipeline.is_done:
#                     time.sleep(.1)
# 
#             with GstStreamUDP(command_udp, on_callback=on_video_callback, loglevel=LogLevels.INFO) as udp_pipeline:
#                 time.sleep(5)
print (f"{boot_time_str =}")
# connection_string = 'udp:127.0.0.1:14550'
# mav = mavutil.mavlink_connection(connection_string)


with  GSTCamera( camera_dict=read_camera_dict_from_toml(find_config_dir()/"test_camera_info.toml")) as cam_gst_1:
    cam_gst_1.image_start_capture(interval=.1, count=5)
    time.sleep(1)
    # while cam_gst_1.pipeline.is_done is False:
    #     # if cam_gst_1.last_image is not None:
    #     time.sleep(0.1)
            # cv2.imshow('image', cam_gst_1.last_image)
            # cam_gst_1.last_image = None
            # cv2.waitKey(10)
            
    # cv2.waitKey(500)
    # cv2.destroyAllWindows()
INFO   | uav.GSTCamera   | 29.670 | gst_cam.py:350 | MainThread         | GSTCamera Started
INFO   | pygst.GstPipeli | 29.673 | gst_tools.py:223 | MainThread         | Starting GstPipeline: videotestsrc pattern=ball is-live=true ! capsfilter caps=video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tee name=t t. ! queue ! videoconvert ! autovideosink t. ! queue leaky=2 ! intervideosink channel=channel_0  sync=false t. ! queue leaky=2 ! intervideosink channel=channel_1  sync=false t. ! queue leaky=2 ! intervideosink channel=channel_2  sync=false
INFO   | pygst.GstJpegEn | 29.686 | gst_tools.py:223 | MainThread         | Starting GstJpegEnc: intervideosrc channel=channel_1   ! videoconvert ! videoscale ! video/x-raw,width=640,height=480,framerate=10/1 ! queue ! jpegenc quality=85 ! appsink name=mysink emit-signals=True max-buffers=1 drop=True
INFO   | uav.GSTCamera   | 29.710 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:29_0000.jpg
INFO   | uav.GSTCamera   | 29.810 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:29_0001.jpg
INFO   | uav.GSTCamera   | 29.910 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:29_0002.jpg
INFO   | uav.GSTCamera   | 30.011 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:30_0003.jpg
INFO   | uav.GSTCamera   | 30.110 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:30_0004.jpg
INFO   | pygst.GstJpegEn | 30.111 | gst_tools.py:884 | Thread-13 (_launch | Sending EOS event, to trigger shutdown of pipeline
INFO   | pygst.GstPipeli | 30.829 | gst_tools.py:306 | MainThread         | GstPipeline Shutdown
INFO   | uav.GSTCamera   | 30.830 | gst_cam.py:518 | MainThread         | GSTCamera closed
boot_time_str ='2023-10-21|10:29:09'
John Doe