栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

将OpenCV框架写入gstreamer rtsp服务器管道

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

将OpenCV框架写入gstreamer rtsp服务器管道

我找到了解决方案,很多东西都丢失了。

  • 我使用了需求数据信号,例如gst-rtsp-server的示例。

  • 我更改了appsrc的一些默认参数,例如is-live,block和format。

  • appsrc元素上的大写字母。

  • 我没有正确设置缓冲区的偏移量。

这是遇到相同问题或有类似问题的任何人的代码。

    #!/usr/bin/env python3    import cv2    import gi    gi.require_version('Gst', '1.0')    gi.require_version('GstRtspServer', '1.0')    from gi.repository import Gst, GstRtspServer, GObject    class SensorFactory(GstRtspServer.RTSPMediaFactory):        def __init__(self, **properties): super(SensorFactory, self).__init__(**properties) self.cap = cv2.VideoCapture(0) self.number_frames = 0 self.fps = 30 self.duration = 1 / self.fps * Gst.SECOND  # duration of a frame in nanoseconds self.launch_string = 'appsrc name=source is-live=true block=true format=GST_FORMAT_TIME ' 'caps=video/x-raw,format=BGR,width=640,height=480,framerate={}/1 ' '! videoconvert ! video/x-raw,format=I420 ' '! x264enc speed-preset=ultrafast tune=zerolatency ' '! rtph264pay config-interval=1 name=pay0 pt=96'.format(self.fps)        def on_need_data(self, src, lenght): if self.cap.isOpened():     ret, frame = self.cap.read()     if ret:         data = frame.tostring()         buf = Gst.Buffer.new_allocate(None, len(data), None)         buf.fill(0, data)         buf.duration = self.duration         timestamp = self.number_frames * self.duration         buf.pts = buf.dts = int(timestamp)         buf.offset = timestamp         self.number_frames += 1         retval = src.emit('push-buffer', buf)         print('pushed buffer, frame {}, duration {} ns, durations {} s'.format(self.number_frames,   self.duration,   self.duration / Gst.SECOND))         if retval != Gst.FlowReturn.OK:  print(retval)        def do_create_element(self, url): return Gst.parse_launch(self.launch_string)        def do_configure(self, rtsp_media): self.number_frames = 0 appsrc = rtsp_media.get_element().get_child_by_name('source') appsrc.connect('need-data', self.on_need_data)    class GstServer(GstRtspServer.RTSPServer):        def __init__(self, **properties): super(GstServer, self).__init__(**properties) self.factory = SensorFactory() self.factory.set_shared(True) self.get_mount_points().add_factory("/test", self.factory) self.attach(None)    GObject.threads_init()    Gst.init(None)    server = GstServer()    loop = GObject.MainLoop()    loop.run()


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/638563.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号