summaryrefslogtreecommitdiff
path: root/src/libcamera/thread.cpp
diff options
context:
space:
mode:
authorLaurent Pinchart <laurent.pinchart@ideasonboard.com>2019-03-24 03:21:28 +0200
committerLaurent Pinchart <laurent.pinchart@ideasonboard.com>2019-07-11 10:20:15 +0300
commit01b930964acdd9475d46044c459396f8c3cf8a79 (patch)
treee7f4625cd315426dba6da7caa2d559eca458b14c /src/libcamera/thread.cpp
parent525b19c4101235385148ff9358b7b6e778a1f148 (diff)
libcamera: thread: Add a messaging passing API
Create a new Message class to model a message that can be passed to an object living in another thread. Only an invalid message type is currently defined, more messages will be added in the future. The Thread class is extended with a messages queue, and the Object class with thread affinity. Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> Reviewed-by: Niklas Söderlund <niklas.soderlund@ragnatech.se>
Diffstat (limited to 'src/libcamera/thread.cpp')
-rw-r--r--src/libcamera/thread.cpp147
1 files changed, 146 insertions, 1 deletions
diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp
index 95636eca..5d46eeb8 100644
--- a/src/libcamera/thread.cpp
+++ b/src/libcamera/thread.cpp
@@ -8,11 +8,13 @@
#include "thread.h"
#include <atomic>
+#include <list>
#include <libcamera/event_dispatcher.h>
#include "event_dispatcher_poll.h"
#include "log.h"
+#include "message.h"
/**
* \file thread.h
@@ -26,6 +28,22 @@ LOG_DEFINE_CATEGORY(Thread)
class ThreadMain;
/**
+ * \brief A queue of posted messages
+ */
+class MessageQueue
+{
+public:
+ /**
+ * \brief List of queued Message instances
+ */
+ std::list<std::unique_ptr<Message>> list_;
+ /**
+ * \brief Protects the \ref list_
+ */
+ Mutex mutex_;
+};
+
+/**
* \brief Thread-local internal data
*/
class ThreadData
@@ -51,6 +69,8 @@ private:
std::atomic<bool> exit_;
int exitCode_;
+
+ MessageQueue messages_;
};
/**
@@ -192,8 +212,10 @@ int Thread::exec()
locker.unlock();
- while (!data_->exit_.load(std::memory_order_acquire))
+ while (!data_->exit_.load(std::memory_order_acquire)) {
+ dispatchMessages();
dispatcher->processEvents();
+ }
locker.lock();
@@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher()
return data_->dispatcher_.load(std::memory_order_relaxed);
}
+/**
+ * \brief Post a message to the thread for the \a receiver
+ * \param[in] msg The message
+ * \param[in] receiver The receiver
+ *
+ * This method stores the message \a msg in the message queue of the thread for
+ * the \a receiver and wake up the thread's event loop. Message ownership is
+ * passed to the thread, and the message will be deleted after being delivered.
+ *
+ * Messages are delivered through the thread's event loop. If the thread is not
+ * running its event loop the message will not be delivered until the event
+ * loop gets started.
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ *
+ * \sa exec()
+ */
+void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver)
+{
+ msg->receiver_ = receiver;
+
+ ASSERT(data_ == receiver->thread()->data_);
+
+ MutexLocker locker(data_->messages_.mutex_);
+ data_->messages_.list_.push_back(std::move(msg));
+ receiver->pendingMessages_++;
+ locker.unlock();
+
+ EventDispatcher *dispatcher =
+ data_->dispatcher_.load(std::memory_order_acquire);
+ if (dispatcher)
+ dispatcher->interrupt();
+}
+
+/**
+ * \brief Remove all posted messages for the \a receiver
+ * \param[in] receiver The receiver
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ */
+void Thread::removeMessages(Object *receiver)
+{
+ ASSERT(data_ == receiver->thread()->data_);
+
+ MutexLocker locker(data_->messages_.mutex_);
+ if (!receiver->pendingMessages_)
+ return;
+
+ std::vector<std::unique_ptr<Message>> toDelete;
+ for (std::unique_ptr<Message> &msg : data_->messages_.list_) {
+ if (!msg)
+ continue;
+ if (msg->receiver_ != receiver)
+ continue;
+
+ /*
+ * Move the message to the pending deletion list to delete it
+ * after releasing the lock. The messages list element will
+ * contain a null pointer, and will be removed when dispatching
+ * messages.
+ */
+ toDelete.push_back(std::move(msg));
+ receiver->pendingMessages_--;
+ }
+
+ ASSERT(!receiver->pendingMessages_);
+ locker.unlock();
+
+ toDelete.clear();
+}
+
+/**
+ * \brief Dispatch all posted messages for this thread
+ */
+void Thread::dispatchMessages()
+{
+ MutexLocker locker(data_->messages_.mutex_);
+
+ while (!data_->messages_.list_.empty()) {
+ std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front());
+ data_->messages_.list_.pop_front();
+ if (!msg)
+ continue;
+
+ Object *receiver = msg->receiver_;
+ ASSERT(data_ == receiver->thread()->data_);
+
+ locker.unlock();
+ receiver->message(msg.get());
+ locker.lock();
+
+ receiver->pendingMessages_--;
+ }
+}
+
+/**
+ * \brief Move an \a object to the thread
+ * \param[in] object The object
+ */
+void Thread::moveObject(Object *object)
+{
+ ThreadData *currentData = object->thread_->data_;
+ ThreadData *targetData = data_;
+
+ MutexLocker lockerFrom(currentData->mutex_, std::defer_lock);
+ MutexLocker lockerTo(targetData->mutex_, std::defer_lock);
+ std::lock(lockerFrom, lockerTo);
+
+ /* Move pending messages to the message queue of the new thread. */
+ if (object->pendingMessages_) {
+ for (std::unique_ptr<Message> &msg : currentData->messages_.list_) {
+ if (!msg)
+ continue;
+ if (msg->receiver_ != object)
+ continue;
+
+ targetData->messages_.list_.push_back(std::move(msg));
+ }
+ }
+
+ object->thread_ = this;
+}
+
}; /* namespace libcamera */