summaryrefslogtreecommitdiff
path: root/src/gstreamer/gstlibcamerasrc.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/gstreamer/gstlibcamerasrc.cpp')
-rw-r--r--src/gstreamer/gstlibcamerasrc.cpp192
1 files changed, 191 insertions, 1 deletions
diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index dfbf092e..0b4cae79 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -14,8 +14,11 @@
#include "gstlibcamerasrc.h"
+#include <queue>
#include <vector>
+#include <gst/base/base.h>
+
#include <libcamera/camera.h>
#include <libcamera/camera_manager.h>
@@ -29,12 +32,71 @@ using namespace libcamera;
GST_DEBUG_CATEGORY_STATIC(source_debug);
#define GST_CAT_DEFAULT source_debug
+struct RequestWrap {
+ RequestWrap(Request *request);
+ ~RequestWrap();
+
+ void attachBuffer(GstBuffer *buffer);
+ GstBuffer *detachBuffer(Stream *stream);
+
+ /* For ptr comparison only. */
+ Request *request_;
+ std::map<Stream *, GstBuffer *> buffers_;
+};
+
+RequestWrap::RequestWrap(Request *request)
+ : request_(request)
+{
+}
+
+RequestWrap::~RequestWrap()
+{
+ for (std::pair<Stream *const, GstBuffer *> &item : buffers_) {
+ if (item.second)
+ gst_buffer_unref(item.second);
+ }
+}
+
+void RequestWrap::attachBuffer(GstBuffer *buffer)
+{
+ FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
+ Stream *stream = gst_libcamera_buffer_get_stream(buffer);
+
+ request_->addBuffer(stream, fb);
+
+ auto item = buffers_.find(stream);
+ if (item != buffers_.end()) {
+ gst_buffer_unref(item->second);
+ item->second = buffer;
+ } else {
+ buffers_[stream] = buffer;
+ }
+}
+
+GstBuffer *RequestWrap::detachBuffer(Stream *stream)
+{
+ GstBuffer *buffer = nullptr;
+
+ auto item = buffers_.find(stream);
+ if (item != buffers_.end()) {
+ buffer = item->second;
+ item->second = nullptr;
+ }
+
+ return buffer;
+}
+
/* Used for C++ object with destructors. */
struct GstLibcameraSrcState {
+ GstLibcameraSrc *src_;
+
std::unique_ptr<CameraManager> cm_;
std::shared_ptr<Camera> cam_;
std::unique_ptr<CameraConfiguration> config_;
std::vector<GstPad *> srcpads_;
+ std::queue<std::unique_ptr<RequestWrap>> requests_;
+
+ void requestCompleted(Request *request);
};
struct _GstLibcameraSrc {
@@ -47,6 +109,7 @@ struct _GstLibcameraSrc {
GstLibcameraSrcState *state;
GstLibcameraAllocator *allocator;
+ GstFlowCombiner *flow_combiner;
};
enum {
@@ -70,6 +133,41 @@ GstStaticPadTemplate request_src_template = {
"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
};
+void
+GstLibcameraSrcState::requestCompleted(Request *request)
+{
+ GLibLocker lock(GST_OBJECT(src_));
+
+ GST_DEBUG_OBJECT(src_, "buffers are ready");
+
+ std::unique_ptr<RequestWrap> wrap = std::move(requests_.front());
+ requests_.pop();
+
+ g_return_if_fail(wrap->request_ == request);
+
+ if ((request->status() == Request::RequestCancelled)) {
+ GST_DEBUG_OBJECT(src_, "Request was cancelled");
+ return;
+ }
+
+ GstBuffer *buffer;
+ for (GstPad *srcpad : srcpads_) {
+ Stream *stream = gst_libcamera_pad_get_stream(srcpad);
+ buffer = wrap->detachBuffer(stream);
+ gst_libcamera_pad_queue_buffer(srcpad, buffer);
+ }
+
+ {
+ /* We only want to resume the task if it's paused. */
+ GstTask *task = src_->task;
+ GLibLocker lock(GST_OBJECT(task));
+ if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
+ GST_TASK_STATE(task) = GST_TASK_STARTED;
+ GST_TASK_SIGNAL(task);
+ }
+ }
+}
+
static bool
gst_libcamera_src_open(GstLibcameraSrc *self)
{
@@ -122,6 +220,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
return false;
}
+ cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
+
/* No need to lock here, we didn't start our threads yet. */
self->state->cm_ = std::move(cm);
self->state->cam_ = cam;
@@ -133,8 +233,77 @@ static void
gst_libcamera_src_task_run(gpointer user_data)
{
GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
+ GstLibcameraSrcState *state = self->state;
+
+ Request *request = state->cam_->createRequest();
+ auto wrap = std::make_unique<RequestWrap>(request);
+ for (GstPad *srcpad : state->srcpads_) {
+ GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
+ GstBuffer *buffer;
+ GstFlowReturn ret;
+
+ ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
+ &buffer, nullptr);
+ if (ret != GST_FLOW_OK) {
+ /*
+ * RequestWrap does not take ownership, and we won't be
+ * queueing this one due to lack of buffers.
+ */
+ delete request;
+ request = nullptr;
+ break;
+ }
+
+ wrap->attachBuffer(buffer);
+ }
+
+ if (request) {
+ GLibLocker lock(GST_OBJECT(self));
+ GST_TRACE_OBJECT(self, "Requesting buffers");
+ state->cam_->queueRequest(request);
+ state->requests_.push(std::move(wrap));
+ }
+
+ GstFlowReturn ret = GST_FLOW_OK;
+ gst_flow_combiner_reset(self->flow_combiner);
+ for (GstPad *srcpad : state->srcpads_) {
+ ret = gst_libcamera_pad_push_pending(srcpad);
+ ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
+ srcpad, ret);
+ }
- GST_DEBUG_OBJECT(self, "Streaming thread is now capturing");
+ {
+ /*
+ * Here we need to decide if we want to pause or stop the task. This
+ * needs to happen in lock step with the callback thread which may want
+ * to resume the task.
+ */
+ GLibLocker lock(GST_OBJECT(self));
+ if (ret != GST_FLOW_OK) {
+ if (ret == GST_FLOW_EOS) {
+ g_autoptr(GstEvent) eos = gst_event_new_eos();
+ guint32 seqnum = gst_util_seqnum_next();
+ gst_event_set_seqnum(eos, seqnum);
+ for (GstPad *srcpad : state->srcpads_)
+ gst_pad_push_event(srcpad, gst_event_ref(eos));
+ } else if (ret != GST_FLOW_FLUSHING) {
+ GST_ELEMENT_FLOW_ERROR(self, ret);
+ }
+ gst_task_stop(self->task);
+ return;
+ }
+
+ bool do_pause = true;
+ for (GstPad *srcpad : state->srcpads_) {
+ if (gst_libcamera_pad_has_pending(srcpad)) {
+ do_pause = false;
+ break;
+ }
+ }
+
+ if (do_pause)
+ gst_task_pause(self->task);
+ }
}
static void
@@ -233,12 +402,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
return;
}
+ self->flow_combiner = gst_flow_combiner_new();
for (gsize i = 0; i < state->srcpads_.size(); i++) {
GstPad *srcpad = state->srcpads_[i];
const StreamConfiguration &stream_cfg = state->config_->at(i);
GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
stream_cfg.stream());
gst_libcamera_pad_set_pool(srcpad, pool);
+ gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
+ }
+
+ ret = state->cam_->start();
+ if (ret) {
+ GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
+ ("Failed to start the camera: %s", g_strerror(-ret)),
+ ("Camera.start() failed with error code %i", ret));
+ gst_task_stop(task);
+ return;
}
done:
@@ -260,10 +440,14 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
GST_DEBUG_OBJECT(self, "Streaming thread is about to stop");
+ state->cam_->stop();
+
for (GstPad *srcpad : state->srcpads_)
gst_libcamera_pad_set_pool(srcpad, nullptr);
g_clear_object(&self->allocator);
+ g_clear_pointer(&self->flow_combiner,
+ (GDestroyNotify)gst_flow_combiner_free);
}
static void
@@ -343,6 +527,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
return GST_STATE_CHANGE_FAILURE;
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ gst_task_start(self->task);
+ break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
@@ -394,6 +581,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self)
state->srcpads_.push_back(gst_pad_new_from_template(templ, "src"));
gst_element_add_pad(GST_ELEMENT(self), state->srcpads_[0]);
+
+ /* C-style friend. */
+ state->src_ = self;
self->state = state;
}