summaryrefslogtreecommitdiff
path: root/src/libcamera/thread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcamera/thread.cpp')
-rw-r--r--src/libcamera/thread.cpp147
1 files changed, 146 insertions, 1 deletions
diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp
index 95636eca..5d46eeb8 100644
--- a/src/libcamera/thread.cpp
+++ b/src/libcamera/thread.cpp
@@ -8,11 +8,13 @@
#include "thread.h"
#include <atomic>
+#include <list>
#include <libcamera/event_dispatcher.h>
#include "event_dispatcher_poll.h"
#include "log.h"
+#include "message.h"
/**
* \file thread.h
@@ -26,6 +28,22 @@ LOG_DEFINE_CATEGORY(Thread)
class ThreadMain;
/**
+ * \brief A queue of posted messages
+ */
+class MessageQueue
+{
+public:
+ /**
+ * \brief List of queued Message instances
+ */
+ std::list<std::unique_ptr<Message>> list_;
+ /**
+ * \brief Protects the \ref list_
+ */
+ Mutex mutex_;
+};
+
+/**
* \brief Thread-local internal data
*/
class ThreadData
@@ -51,6 +69,8 @@ private:
std::atomic<bool> exit_;
int exitCode_;
+
+ MessageQueue messages_;
};
/**
@@ -192,8 +212,10 @@ int Thread::exec()
locker.unlock();
- while (!data_->exit_.load(std::memory_order_acquire))
+ while (!data_->exit_.load(std::memory_order_acquire)) {
+ dispatchMessages();
dispatcher->processEvents();
+ }
locker.lock();
@@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher()
return data_->dispatcher_.load(std::memory_order_relaxed);
}
+/**
+ * \brief Post a message to the thread for the \a receiver
+ * \param[in] msg The message
+ * \param[in] receiver The receiver
+ *
+ * This method stores the message \a msg in the message queue of the thread for
+ * the \a receiver and wake up the thread's event loop. Message ownership is
+ * passed to the thread, and the message will be deleted after being delivered.
+ *
+ * Messages are delivered through the thread's event loop. If the thread is not
+ * running its event loop the message will not be delivered until the event
+ * loop gets started.
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ *
+ * \sa exec()
+ */
+void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver)
+{
+ msg->receiver_ = receiver;
+
+ ASSERT(data_ == receiver->thread()->data_);
+
+ MutexLocker locker(data_->messages_.mutex_);
+ data_->messages_.list_.push_back(std::move(msg));
+ receiver->pendingMessages_++;
+ locker.unlock();
+
+ EventDispatcher *dispatcher =
+ data_->dispatcher_.load(std::memory_order_acquire);
+ if (dispatcher)
+ dispatcher->interrupt();
+}
+
+/**
+ * \brief Remove all posted messages for the \a receiver
+ * \param[in] receiver The receiver
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ */
+void Thread::removeMessages(Object *receiver)
+{
+ ASSERT(data_ == receiver->thread()->data_);
+
+ MutexLocker locker(data_->messages_.mutex_);
+ if (!receiver->pendingMessages_)
+ return;
+
+ std::vector<std::unique_ptr<Message>> toDelete;
+ for (std::unique_ptr<Message> &msg : data_->messages_.list_) {
+ if (!msg)
+ continue;
+ if (msg->receiver_ != receiver)
+ continue;
+
+ /*
+ * Move the message to the pending deletion list to delete it
+ * after releasing the lock. The messages list element will
+ * contain a null pointer, and will be removed when dispatching
+ * messages.
+ */
+ toDelete.push_back(std::move(msg));
+ receiver->pendingMessages_--;
+ }
+
+ ASSERT(!receiver->pendingMessages_);
+ locker.unlock();
+
+ toDelete.clear();
+}
+
+/**
+ * \brief Dispatch all posted messages for this thread
+ */
+void Thread::dispatchMessages()
+{
+ MutexLocker locker(data_->messages_.mutex_);
+
+ while (!data_->messages_.list_.empty()) {
+ std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front());
+ data_->messages_.list_.pop_front();
+ if (!msg)
+ continue;
+
+ Object *receiver = msg->receiver_;
+ ASSERT(data_ == receiver->thread()->data_);
+
+ locker.unlock();
+ receiver->message(msg.get());
+ locker.lock();
+
+ receiver->pendingMessages_--;
+ }
+}
+
+/**
+ * \brief Move an \a object to the thread
+ * \param[in] object The object
+ */
+void Thread::moveObject(Object *object)
+{
+ ThreadData *currentData = object->thread_->data_;
+ ThreadData *targetData = data_;
+
+ MutexLocker lockerFrom(currentData->mutex_, std::defer_lock);
+ MutexLocker lockerTo(targetData->mutex_, std::defer_lock);
+ std::lock(lockerFrom, lockerTo);
+
+ /* Move pending messages to the message queue of the new thread. */
+ if (object->pendingMessages_) {
+ for (std::unique_ptr<Message> &msg : currentData->messages_.list_) {
+ if (!msg)
+ continue;
+ if (msg->receiver_ != object)
+ continue;
+
+ targetData->messages_.list_.push_back(std::move(msg));
+ }
+ }
+
+ object->thread_ = this;
+}
+
}; /* namespace libcamera */