Decode video frames in real time using GStreamer, modify them with OpenCV, and write them back
import os
import gi
import numpy as np
import cv2
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
gi.require_version('GstApp', '1.0')
from gi.repository import Gst, GLib, GstApp
# Ensure a runtime dir exists in case XDG_RUNTIME_DIR is unset in the environment
os.environ['XDG_RUNTIME_DIR'] = '/tmp/runtime-dir'
# Global debug level 3, with extra verbosity for the VAAPI encoder/decoder
os.environ['GST_DEBUG'] = '3,vaapih264enc:5,vaapih264dec:5'
Gst.init(None)
class RTSPToMP4:
    def __init__(self, rtsp_url, output_file, duration):
        self.rtsp_url = rtsp_url
        self.output_file = output_file
        self.duration = duration * 1000  # convert seconds to milliseconds
        self.pipeline = self.create_pipeline()
        self.loop = GLib.MainLoop()

    def on_pad_added(self, src, new_pad):
        print(f"New pad {new_pad.get_name()} added to {src.get_name()}")
        new_pad_caps = new_pad.get_current_caps()
        new_pad_struct = new_pad_caps.get_structure(0)
        new_pad_type = new_pad_struct.get_name()
        if new_pad_type.startswith("application/x-rtp"):
            if "media=(string)audio" in new_pad_struct.to_string():
                sink_pad = self.pipeline.get_by_name("audio_queue").get_static_pad("sink")
            elif "media=(string)video" in new_pad_struct.to_string():
                sink_pad = self.pipeline.get_by_name("video_queue").get_static_pad("sink")
            else:
                return
            if sink_pad.is_linked():
                print(f"Sink pad {sink_pad.get_name()} is already linked. Skipping.")
                return
            result = new_pad.link(sink_pad)
            if result != Gst.PadLinkReturn.OK:
                print(f"Failed to link {new_pad.get_name()} to {sink_pad.get_name()}: {result}")

    def create_pipeline(self):
        # Three branches: RTSP video -> VAAPI decode -> appsink (frames into Python),
        # appsrc (modified frames back) -> VAAPI encode -> mp4mux -> file,
        # and RTSP audio -> decode -> re-encode -> mp4mux.
        pipeline_str = (
            "rtspsrc location={} latency=0 name=src "
            "src. ! queue name=video_queue ! rtpjitterbuffer ! rtph264depay ! h264parse ! "
            "vaapih264dec ! videoconvert ! video/x-raw,format=RGB ! appsink name=video_sink "
            "appsrc name=video_src format=time is-live=true do-timestamp=true ! queue ! "
            "videoconvert ! vaapih264enc ! h264parse ! queue ! mp4mux name=mux ! filesink location={} "
            "src. ! queue name=audio_queue ! rtpjitterbuffer ! rtpmp4gdepay ! avdec_aac ! "
            "audioconvert ! queue ! avenc_aac ! queue ! mux.".format(self.rtsp_url, self.output_file)
        )
        self.pipeline = Gst.parse_launch(pipeline_str)
        src = self.pipeline.get_by_name('src')
        src.connect("pad-added", self.on_pad_added)
        self.audio_queue = self.pipeline.get_by_name('audio_queue')
        self.video_queue = self.pipeline.get_by_name('video_queue')
        video_sink = self.pipeline.get_by_name('video_sink')
        video_sink.set_property("emit-signals", True)
        video_sink.connect("new-sample", self.on_new_sample)
        self.appsrc = self.pipeline.get_by_name('video_src')
        return self.pipeline

    def on_new_sample(self, sink):
        sample = sink.emit("pull-sample")
        buf = sample.get_buffer()
        caps = sample.get_caps()
        # Forward the negotiated caps to the appsrc once, so the re-encode
        # branch knows the raw format, size and framerate it will receive
        if self.appsrc.get_property("caps") is None:
            self.appsrc.set_property("caps", caps)
        arr = np.ndarray(
            (
                caps.get_structure(0).get_value('height'),
                caps.get_structure(0).get_value('width'),
                3
            ),
            buffer=buf.extract_dup(0, buf.get_size()),
            dtype=np.uint8
        )
        arr = arr.copy()  # make a writable copy
        # Modify the frame (for example, add text)
        cv2.putText(arr, 'Hello World', (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1,
                    (255, 255, 255), 2, cv2.LINE_AA)
        # Create a new buffer and write the modified frame back
        new_buf = Gst.Buffer.new_wrapped(arr.tobytes())
        self.appsrc.emit("push-buffer", new_buf)
        return Gst.FlowReturn.OK

    def on_message(self, bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End-of-stream")
            self.pipeline.set_state(Gst.State.NULL)
            self.loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print("Error: %s: %s" % (err, debug))
            self.pipeline.set_state(Gst.State.NULL)
            self.loop.quit()

    def run(self):
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.on_message)
        self.pipeline.set_state(Gst.State.PLAYING)
        GLib.timeout_add(self.duration, self.stop_pipeline)
        try:
            self.loop.run()
        except Exception as e:
            print("Exception: ", e)
        finally:
            self.pipeline.set_state(Gst.State.NULL)

    def stop_pipeline(self):
        # Send EOS so mp4mux can finalize the file before shutdown
        self.pipeline.send_event(Gst.Event.new_eos())
        return False  # returning False removes the GLib timeout

if __name__ == "__main__":
    # replace() percent-encodes the '@' inside the password so it is not
    # mistaken for the userinfo/host separator of the URL
    rtsp_url = "rtsp://admin:abc@[email protected]:554/h264/ch1/main/av_stream".replace("abc@12345", "abc%4012345")
    output_file = "output.mp4"
    duration = 20  # seconds
    rtsp_to_mp4 = RTSPToMP4(rtsp_url, output_file, duration)
    rtsp_to_mp4.run()
My requirement is to accelerate encoding and decoding through VAAPI, so that the GPU carries that load, while still being able to modify each video frame inside the pipeline.
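One thing worth guarding against: Gst.parse_launch() fails outright when the VAAPI plugins are missing or the driver is not loaded. Below is a minimal sketch (not from the original post) that probes for the VAAPI elements with Gst.ElementFactory.find() and falls back to software codecs; avdec_h264 and x264enc are assumed fallback choices, not something the question prescribes.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

def pick_h264_codecs():
    # Prefer the VAAPI elements when the plugin is available; otherwise
    # fall back to software codecs (assumed choice: avdec_h264 / x264enc)
    if Gst.ElementFactory.find("vaapih264dec") and Gst.ElementFactory.find("vaapih264enc"):
        return "vaapih264dec", "vaapih264enc"
    return "avdec_h264", "x264enc"

decoder, encoder = pick_h264_codecs()
print("Using decoder={} encoder={}".format(decoder, encoder))

The two returned names could then be substituted into the pipeline string in create_pipeline() instead of hard-coding vaapih264dec and vaapih264enc.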