# Source code for payloadcomputerdroneprojekt.camera.gazebo_sitl.gazebo_camera_lib

#!/usr/bin/env python

import cv2
import gi
import numpy as np

gi.require_version('Gst', '1.0')
from gi.repository import Gst  # noqa: E402


class Video():
    """BlueRov video capture class.

    Receives an H.264 RTP stream over UDP, decodes it with GStreamer and
    exposes the latest frame as an OpenCV-compatible BGR numpy array.

    Attributes:
        port (int): Video UDP port
        video_codec (string): Source h264 parser
        video_decode (string): Transform YUV (12bits) to BGR (24bits)
        video_pipe (object): GStreamer top-level pipeline
        video_sink (object): Gstreamer sink element
        video_sink_conf (string): Sink configuration
        video_source (string): Udp source ip and port
    """

    def __init__(self, port=5600):
        """Initialize GStreamer and start the receive pipeline.

        Args:
            port (int, optional): UDP port to listen on (default 5600)
        """
        Gst.init(None)

        self.port = port
        self._frame = None  # latest decoded frame; None until first sample

        # UDP video stream (:5600)
        self.video_source = 'udpsrc port={}'.format(self.port)
        # Cam -> CSI-2 -> H264 Raw (YUV 4-4-4 (12bits) I420)
        self.video_codec = \
            ('! application/x-rtp, payload=96 ! '
             'rtph264depay ! h264parse ! avdec_h264')
        # Python don't have nibble, convert YUV nibbles (4-4-4)
        # to OpenCV standard BGR bytes (8-8-8)
        self.video_decode = \
            ('! decodebin ! videoconvert ! '
             'video/x-raw,format=(string)BGR ! videoconvert')
        # Create a sink to get data
        self.video_sink_conf = \
            '! appsink emit-signals=true sync=false max-buffers=2 drop=true'

        self.video_pipe = None
        self.video_sink = None

        self.run()

    def start_gst(self, config=None):
        """Start the GStreamer pipeline and fetch the appsink element.

        Pipeline description list e.g:
            [
                'videotestsrc ! decodebin',
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink'
            ]

        Args:
            config (list, optional): GStreamer pipeline description list;
                a videotestsrc pipeline is used when omitted.
        """
        if not config:
            config = \
                [
                    'videotestsrc ! decodebin',
                    '! videoconvert ! video/x-raw,'
                    'format=(string)BGR ! videoconvert',
                    '! appsink'
                ]

        command = ' '.join(config)
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        # parse_launch auto-names the first appsink 'appsink0'
        self.video_sink = self.video_pipe.get_by_name('appsink0')

    @staticmethod
    def gst_to_opencv(sample):
        """Transform a GStreamer sample into a numpy BGR image array.

        Args:
            sample: Gst.Sample pulled from the appsink

        Returns:
            numpy.ndarray: uint8 image of shape (height, width, 3)
        """
        buf = sample.get_buffer()
        caps = sample.get_caps()
        array = np.ndarray(
            (
                caps.get_structure(0).get_value('height'),
                caps.get_structure(0).get_value('width'),
                3
            ),
            buffer=buf.extract_dup(0, buf.get_size()),
            dtype=np.uint8)
        return array

    def frame(self):
        """Get the latest frame.

        Returns:
            numpy.ndarray or None: latest decoded BGR frame, or None if
            no frame has been received yet.
        """
        return self._frame

    def frame_available(self):
        """Check if a frame is available.

        Returns:
            bool: True if a frame is available
        """
        return self._frame is not None

    def run(self):
        """Build the UDP receive pipeline and register the frame callback."""
        self.start_gst(
            [
                self.video_source,
                self.video_codec,
                self.video_decode,
                self.video_sink_conf
            ])

        self.video_sink.connect('new-sample', self.callback)

    def callback(self, sink):
        """appsink 'new-sample' handler: store the newest decoded frame.

        Args:
            sink: the Gst appsink element emitting the signal

        Returns:
            Gst.FlowReturn: OK to keep the stream flowing
        """
        sample = sink.emit('pull-sample')
        new_frame = self.gst_to_opencv(sample)
        self._frame = new_frame
        return Gst.FlowReturn.OK
if __name__ == '__main__':
    import time

    # Create the video object
    # Add port= if is necessary to use a different one
    video = Video()
    print("hello")

    while True:
        # Wait for the next frame; sleep briefly so the poll loop does
        # not busy-spin a full CPU core while no frame has arrived yet.
        if not video.frame_available():
            time.sleep(0.01)
            continue

        frame = video.frame()
        cv2.imwrite('frame.jpg', frame)