From 04f1f2033724f038ab1152e8135292770a33f97a Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Wed, 5 Jun 2024 15:41:20 -0400 Subject: gstreamer: pool: Replace GstAtomicQueue with deque and mutex The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We pop and push on error cases and we may have multiple threads downstream returning buffer (using tee), which breaks this assumption. On top of which, the release function, that notifies when the queue goes from empty to not-empty relies on a racy empty check. The downstream thread that does this check is effectively concurrent with our thread calling acquire(). Fix this by replacing the GstAtomicQueue with a std::deque, and protect access to that using the object lock. Bug: https://bugs.libcamera.org/show_bug.cgi?id=201 Signed-off-by: Nicolas Dufresne Reviewed-by: Kieran Bingham Acked-by: Jacopo Mondi Signed-off-by: Kieran Bingham --- src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) (limited to 'src/gstreamer') diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp index 9661c67a..0b1a5689 100644 --- a/src/gstreamer/gstlibcamerapool.cpp +++ b/src/gstreamer/gstlibcamerapool.cpp @@ -8,6 +8,7 @@ #include "gstlibcamerapool.h" +#include #include #include "gstlibcamera-utils.h" @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS]; struct _GstLibcameraPool { GstBufferPool parent; - GstAtomicQueue *queue; + std::deque *queue; GstLibcameraAllocator *allocator; Stream *stream; }; G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL) +static GstBuffer * +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self) +{ + GLibLocker lock(GST_OBJECT(self)); + GstBuffer *buf; + + if (self->queue->empty()) + return nullptr; + + buf = self->queue->front(); + self->queue->pop_front(); + + return buf; +} + static GstFlowReturn gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer, [[maybe_unused]] GstBufferPoolAcquireParams *params) { GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); - GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)); + GstBuffer *buf = gst_libcamera_pool_pop_buffer(self); + if (!buf) return GST_FLOW_ERROR; if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) { - gst_atomic_queue_push(self->queue, buf); + GLibLocker lock(GST_OBJECT(self)); + self->queue->push_back(buf); return GST_FLOW_ERROR; } @@ -64,9 +82,13 @@ static void gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) { GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); - bool do_notify = gst_atomic_queue_length(self->queue) == 0; + bool do_notify; - gst_atomic_queue_push(self->queue, buffer); + { + GLibLocker lock(GST_OBJECT(self)); + do_notify = self->queue->empty(); + self->queue->push_back(buffer); + } if (do_notify) g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0); @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) static void gst_libcamera_pool_init(GstLibcameraPool *self) { - self->queue = gst_atomic_queue_new(4); + self->queue = new std::deque(); } static void @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object) GstLibcameraPool *self = GST_LIBCAMERA_POOL(object); GstBuffer *buf; - while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)))) + while ((buf = gst_libcamera_pool_pop_buffer(self))) gst_buffer_unref(buf); - gst_atomic_queue_unref(self->queue); + delete self->queue; g_object_unref(self->allocator); G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object); @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream) gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream); for (gsize i = 0; i < pool_size; i++) { GstBuffer *buffer = gst_buffer_new(); - gst_atomic_queue_push(pool->queue, buffer); + pool->queue->push_back(buffer); } return pool; -- cgit v1.2.1