diff options
author | Nicolas Dufresne <nicolas.dufresne@collabora.com> | 2024-06-05 15:41:20 -0400 |
---|---|---|
committer | Kieran Bingham <kieran.bingham@ideasonboard.com> | 2024-07-25 09:54:56 +0100 |
commit | 04f1f2033724f038ab1152e8135292770a33f97a (patch) | |
tree | 81ddfe145cdc3cf1c4d4d346d5bd37e388489186 /src | |
parent | 01132257b9e70e6408e98e7df684c6db3aefe8e8 (diff) |
gstreamer: pool: Replace GstAtomicQueue with deque and mutex
The GstAtomicQueue only supports 2 threads, one pushing, and one
popping. We pop and push on error cases and we may have multiple threads
downstream returning buffer (using tee), which breaks this assumption.
On top of which, the release function, that notifies when the queue goes
from empty to not-empty relies on a racy empty check. The downstream
thread that does this check is effectively concurrent with our thread
calling acquire().
Fix this by replacing the GstAtomicQueue with a std::deque, and protect
access to that using the object lock.
Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
Reviewed-by: Kieran Bingham <kieran.bingham@ideasonboard.com>
Acked-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
Signed-off-by: Kieran Bingham <kieran.bingham@ideasonboard.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/gstreamer/gstlibcamerapool.cpp | 40 |
1 files changed, 31 insertions, 9 deletions
diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp index 9661c67a..0b1a5689 100644 --- a/src/gstreamer/gstlibcamerapool.cpp +++ b/src/gstreamer/gstlibcamerapool.cpp @@ -8,6 +8,7 @@ #include "gstlibcamerapool.h" +#include <deque> #include <libcamera/stream.h> #include "gstlibcamera-utils.h" @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS]; struct _GstLibcameraPool { GstBufferPool parent; - GstAtomicQueue *queue; + std::deque<GstBuffer *> *queue; GstLibcameraAllocator *allocator; Stream *stream; }; G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL) +static GstBuffer * +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self) +{ + GLibLocker lock(GST_OBJECT(self)); + GstBuffer *buf; + + if (self->queue->empty()) + return nullptr; + + buf = self->queue->front(); + self->queue->pop_front(); + + return buf; +} + static GstFlowReturn gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer, [[maybe_unused]] GstBufferPoolAcquireParams *params) { GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); - GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)); + GstBuffer *buf = gst_libcamera_pool_pop_buffer(self); + if (!buf) return GST_FLOW_ERROR; if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) { - gst_atomic_queue_push(self->queue, buf); + GLibLocker lock(GST_OBJECT(self)); + self->queue->push_back(buf); return GST_FLOW_ERROR; } @@ -64,9 +82,13 @@ static void gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) { GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); - bool do_notify = gst_atomic_queue_length(self->queue) == 0; + bool do_notify; - gst_atomic_queue_push(self->queue, buffer); + { + GLibLocker lock(GST_OBJECT(self)); + do_notify = self->queue->empty(); + self->queue->push_back(buffer); + } if (do_notify) g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0); @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) static void gst_libcamera_pool_init(GstLibcameraPool *self) { - self->queue = gst_atomic_queue_new(4); + self->queue = new std::deque<GstBuffer *>(); } static void @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object) GstLibcameraPool *self = GST_LIBCAMERA_POOL(object); GstBuffer *buf; - while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)))) + while ((buf = gst_libcamera_pool_pop_buffer(self))) gst_buffer_unref(buf); - gst_atomic_queue_unref(self->queue); + delete self->queue; g_object_unref(self->allocator); G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object); @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream) gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream); for (gsize i = 0; i < pool_size; i++) { GstBuffer *buffer = gst_buffer_new(); - gst_atomic_queue_push(pool->queue, buffer); + pool->queue->push_back(buffer); } return pool; |