From 01b930964acdd9475d46044c459396f8c3cf8a79 Mon Sep 17 00:00:00 2001 From: Laurent Pinchart Date: Sun, 24 Mar 2019 03:21:28 +0200 Subject: libcamera: thread: Add a messaging passing API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create a new Message class to model a message that can be passed to an object living in another thread. Only an invalid message type is currently defined, more messages will be added in the future. The Thread class is extended with a messages queue, and the Object class with thread affinity. Signed-off-by: Laurent Pinchart Reviewed-by: Niklas Söderlund --- src/libcamera/thread.cpp | 147 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 146 insertions(+), 1 deletion(-) (limited to 'src/libcamera/thread.cpp') diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp index 95636eca..5d46eeb8 100644 --- a/src/libcamera/thread.cpp +++ b/src/libcamera/thread.cpp @@ -8,11 +8,13 @@ #include "thread.h" #include +#include #include #include "event_dispatcher_poll.h" #include "log.h" +#include "message.h" /** * \file thread.h @@ -25,6 +27,22 @@ LOG_DEFINE_CATEGORY(Thread) class ThreadMain; +/** + * \brief A queue of posted messages + */ +class MessageQueue +{ +public: + /** + * \brief List of queued Message instances + */ + std::list> list_; + /** + * \brief Protects the \ref list_ + */ + Mutex mutex_; +}; + /** * \brief Thread-local internal data */ @@ -51,6 +69,8 @@ private: std::atomic exit_; int exitCode_; + + MessageQueue messages_; }; /** @@ -192,8 +212,10 @@ int Thread::exec() locker.unlock(); - while (!data_->exit_.load(std::memory_order_acquire)) + while (!data_->exit_.load(std::memory_order_acquire)) { + dispatchMessages(); dispatcher->processEvents(); + } locker.lock(); @@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher() return data_->dispatcher_.load(std::memory_order_relaxed); } +/** + * \brief Post a message to the thread for the \a receiver + * \param[in] msg The message + * \param[in] receiver The receiver + * + * This method stores the message \a msg in the message queue of the thread for + * the \a receiver and wake up the thread's event loop. Message ownership is + * passed to the thread, and the message will be deleted after being delivered. + * + * Messages are delivered through the thread's event loop. If the thread is not + * running its event loop the message will not be delivered until the event + * loop gets started. + * + * If the \a receiver is not bound to this thread the behaviour is undefined. + * + * \sa exec() + */ +void Thread::postMessage(std::unique_ptr msg, Object *receiver) +{ + msg->receiver_ = receiver; + + ASSERT(data_ == receiver->thread()->data_); + + MutexLocker locker(data_->messages_.mutex_); + data_->messages_.list_.push_back(std::move(msg)); + receiver->pendingMessages_++; + locker.unlock(); + + EventDispatcher *dispatcher = + data_->dispatcher_.load(std::memory_order_acquire); + if (dispatcher) + dispatcher->interrupt(); +} + +/** + * \brief Remove all posted messages for the \a receiver + * \param[in] receiver The receiver + * + * If the \a receiver is not bound to this thread the behaviour is undefined. + */ +void Thread::removeMessages(Object *receiver) +{ + ASSERT(data_ == receiver->thread()->data_); + + MutexLocker locker(data_->messages_.mutex_); + if (!receiver->pendingMessages_) + return; + + std::vector> toDelete; + for (std::unique_ptr &msg : data_->messages_.list_) { + if (!msg) + continue; + if (msg->receiver_ != receiver) + continue; + + /* + * Move the message to the pending deletion list to delete it + * after releasing the lock. The messages list element will + * contain a null pointer, and will be removed when dispatching + * messages. + */ + toDelete.push_back(std::move(msg)); + receiver->pendingMessages_--; + } + + ASSERT(!receiver->pendingMessages_); + locker.unlock(); + + toDelete.clear(); +} + +/** + * \brief Dispatch all posted messages for this thread + */ +void Thread::dispatchMessages() +{ + MutexLocker locker(data_->messages_.mutex_); + + while (!data_->messages_.list_.empty()) { + std::unique_ptr msg = std::move(data_->messages_.list_.front()); + data_->messages_.list_.pop_front(); + if (!msg) + continue; + + Object *receiver = msg->receiver_; + ASSERT(data_ == receiver->thread()->data_); + + locker.unlock(); + receiver->message(msg.get()); + locker.lock(); + + receiver->pendingMessages_--; + } +} + +/** + * \brief Move an \a object to the thread + * \param[in] object The object + */ +void Thread::moveObject(Object *object) +{ + ThreadData *currentData = object->thread_->data_; + ThreadData *targetData = data_; + + MutexLocker lockerFrom(currentData->mutex_, std::defer_lock); + MutexLocker lockerTo(targetData->mutex_, std::defer_lock); + std::lock(lockerFrom, lockerTo); + + /* Move pending messages to the message queue of the new thread. */ + if (object->pendingMessages_) { + for (std::unique_ptr &msg : currentData->messages_.list_) { + if (!msg) + continue; + if (msg->receiver_ != object) + continue; + + targetData->messages_.list_.push_back(std::move(msg)); + } + } + + object->thread_ = this; +} + }; /* namespace libcamera */ -- cgit v1.2.1