import time, os, sys
from UAV.logging import logging
from UAV.mavlink.mavcom import MAVCom, time_since_boot_ms, time_UTC_usec, date_time_str
from UAV.mavlink.component import Component, mavutil, mavlink, MAVLink
import cv2
from UAV.utils.general import boot_time_str, With, find_config_dir, read_camera_dict_from_toml
from UAV.camera.gst_cam import *Camera Fake
Opencv and GST Fake cameras for testing
https://mavlink.io/en/services/camera.html https://github.com/mavlink/mavlink-camera-manager
# show_doc(create_toml_file)import types
def get_all_methods(cls):
methods = []
for name in dir(cls):
attr = getattr(cls, name)
if isinstance(attr, types.FunctionType):
methods.append(name)
return methodsmethods = get_all_methods(GSTCamera)
print (methods)['__enter__', '__exit__', '__init__', '__repr__', '__str__', '_open', 'calculate_memory_usage', 'camera_capture_status_send', 'camera_image_captured_send', 'camera_information_send', 'camera_settings_send', 'close', 'get_camera_info', 'image_capture_thread_is_running', 'image_start_capture', 'image_stop_capture', 'list_files', 'load_image_from_memoryfs', 'on_capture_image', 'on_start_image_capture', 'on_status_video_capture', 'on_stop_image_capture', 'on_video_callback', 'pause', 'play', 'save_image_to_memoryfs', 'set_source_compenent', 'show_image', 'storage_information_send', 'time_UTC_usec', 'video_start_capture', 'video_start_streaming', 'video_stop_capture', 'video_stop_streaming']
for method in methods:
s = f"GSTCamera.{method}"
print (s)
show_doc(s)GSTCamera.__enter__
GSTCamera.__exit__
GSTCamera.__init__
GSTCamera.__repr__
GSTCamera.__str__
GSTCamera._open
GSTCamera.calculate_memory_usage
GSTCamera.camera_capture_status_send
GSTCamera.camera_image_captured_send
GSTCamera.camera_information_send
GSTCamera.camera_settings_send
GSTCamera.close
GSTCamera.get_camera_info
GSTCamera.image_capture_thread_is_running
GSTCamera.image_start_capture
GSTCamera.image_stop_capture
GSTCamera.list_files
GSTCamera.load_image_from_memoryfs
GSTCamera.on_capture_image
GSTCamera.on_start_image_capture
GSTCamera.on_status_video_capture
GSTCamera.on_stop_image_capture
GSTCamera.on_video_callback
GSTCamera.pause
GSTCamera.play
GSTCamera.save_image_to_memoryfs
GSTCamera.set_source_compenent
GSTCamera.show_image
GSTCamera.storage_information_send
GSTCamera.time_UTC_usec
GSTCamera.video_start_capture
GSTCamera.video_start_streaming
GSTCamera.video_stop_capture
GSTCamera.video_stop_streaming
GSTCamera.save_image_to_memoryfs
GSTCamera.save_image_to_memoryfs (data:bytes, filename:str)
Save image to memory filesystem.
| Type | Details | |
|---|---|---|
| data | bytes | jpeg encoded image to save |
| filename | str | filename to save image |
import inspect
from nbdev.showdoc import show_doc
class Example:
def method_a(self):
"This is method A"
pass
def method_b(self):
"This is method B"
pass
# To show docs for all methods in a class
for name, func in inspect.getmembers(Example, inspect.isfunction):
print(name)
show_doc(func)method_a
method_b
from nbdev.doclinks import NbdevLookup
from nbdev.showdoc import _fmt_sig
def _html_link(url, txt): return f'{txt}'
class BasicHtmlRenderer(ShowDocRenderer):
"Simple HTML renderer for `show_doc`"
def _repr_html_(self):
doc = '\n'
doc += f'{self.nm}\n'
doc += f'{self.nm}{_fmt_sig(self.sig)}'
if self.docs: doc += f"{self.docs}"
return doc
def doc(self):
"Show `show_doc` info along with link to docs"
from IPython.display import display,HTML
res = self._repr_html_()
docs = NbdevLookup().doc(self.fn)
if docs is not None: res += '\n' +_html_link(docs, "Show in docs") + ''
display(HTML(res))def doc(elt):
"Show `show_doc` info along with link to docs"
BasicHtmlRenderer(elt).doc()doc(GSTCamera.save_image_to_memoryfs)
GSTCamera.save_image_to_memoryfs
GSTCamera.save_image_to_memoryfs(data:bytes, filename:str)Save image to memory filesystem.
GSTCamera.save_image_to_memoryfs
GSTCamera.save_image_to_memoryfs (data:bytes, filename:str)
Save image to memory filesystem.
| Type | Details | |
|---|---|---|
| data | bytes | jpeg encoded image to save |
| filename | str | filename to save image |
# config_path = Path("../../config")
# create_toml_file(find_config_dir()/"____test_camera_info.toml")# assert False, "stop here"# assert False, "stop here"from gstreamer import GstVidSrcValve, GstVideoSave, GstJpegEnc
import gstreamer.utils as gst_utilsGSTCamera
GSTCamera (camera_dict=None, udp_encoder='H264', loglevel=20)
Create a fake camera component for testing using GStreamer
| Type | Default | Details | |
|---|---|---|---|
| camera_dict | NoneType | None | camera_info dict |
| udp_encoder | str | H264 | encoder for video streaming |
| loglevel | int | 20 | log flag |
GSTCamera.image_start_capture
GSTCamera.image_start_capture (interval, count)
Start image capture sequence.
| Details | |
|---|---|
| interval | Image capture interval |
| count | Number of images to capture (0 for unlimited) |
BaseCamera.camera_information_send
BaseCamera.camera_information_send ()
Information about a camera. Can be requested with a MAV_CMD_REQUEST_MESSAGE command.
CV2Camera.camera_settings_send
CV2Camera.camera_settings_send ()
Information about a camera. Can be requested with a MAV_CMD_REQUEST_MESSAGE command.
read_camera_dict_from_toml
read_camera_dict_from_toml (toml_file_path)
Read MAVLink camera info from a TOML file.
| Type | Details | |
|---|---|---|
| toml_file_path | path to TOML file | |
| Returns | dict | camera_info dict |
# with GstContext(loglevel=LogLevels.CRITICAL): # GST main loop in thread
# with GstPipeline(command_h264_display, loglevel=LogLevels.INFO) as disp_pipeline:
# with GstPipeline(command_src, loglevel=10) as src_pipeline:
# with GstJpegEnc(command_jpg, max_count=5, on_jpeg_capture=on_capture, loglevel=LogLevels.INFO) as jpg_pipeline:
# while not jpg_pipeline.is_done:
# time.sleep(.1)
#
# with GstStreamUDP(command_udp, on_callback=on_video_callback, loglevel=LogLevels.INFO) as udp_pipeline:
# time.sleep(5)print (f"{boot_time_str =}")
# connection_string = 'udp:127.0.0.1:14550'
# mav = mavutil.mavlink_connection(connection_string)
with GSTCamera( camera_dict=read_camera_dict_from_toml(find_config_dir()/"test_camera_info.toml")) as cam_gst_1:
cam_gst_1.image_start_capture(interval=.1, count=5)
time.sleep(1)
# while cam_gst_1.pipeline.is_done is False:
# # if cam_gst_1.last_image is not None:
# time.sleep(0.1)
# cv2.imshow('image', cam_gst_1.last_image)
# cam_gst_1.last_image = None
# cv2.waitKey(10)
# cv2.waitKey(500)
# cv2.destroyAllWindows()INFO | uav.GSTCamera | 29.670 | gst_cam.py:350 | MainThread | GSTCamera Started
INFO | pygst.GstPipeli | 29.673 | gst_tools.py:223 | MainThread | Starting GstPipeline: videotestsrc pattern=ball is-live=true ! capsfilter caps=video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tee name=t t. ! queue ! videoconvert ! autovideosink t. ! queue leaky=2 ! intervideosink channel=channel_0 sync=false t. ! queue leaky=2 ! intervideosink channel=channel_1 sync=false t. ! queue leaky=2 ! intervideosink channel=channel_2 sync=false
INFO | pygst.GstJpegEn | 29.686 | gst_tools.py:223 | MainThread | Starting GstJpegEnc: intervideosrc channel=channel_1 ! videoconvert ! videoscale ! video/x-raw,width=640,height=480,framerate=10/1 ! queue ! jpegenc quality=85 ! appsink name=mysink emit-signals=True max-buffers=1 drop=True
INFO | uav.GSTCamera | 29.710 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:29_0000.jpg
INFO | uav.GSTCamera | 29.810 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:29_0001.jpg
INFO | uav.GSTCamera | 29.910 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:29_0002.jpg
INFO | uav.GSTCamera | 30.011 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:30_0003.jpg
INFO | uav.GSTCamera | 30.110 | gst_cam.py:577 | Thread-13 (_launch | Image saved to memory filesystem with name: 2023-10-21|10:37:30_0004.jpg
INFO | pygst.GstJpegEn | 30.111 | gst_tools.py:884 | Thread-13 (_launch | Sending EOS event, to trigger shutdown of pipeline
INFO | pygst.GstPipeli | 30.829 | gst_tools.py:306 | MainThread | GstPipeline Shutdown
INFO | uav.GSTCamera | 30.830 | gst_cam.py:518 | MainThread | GSTCamera closed
boot_time_str ='2023-10-21|10:29:09'
John Doe