diff options
Diffstat (limited to 'src/libcamera/thread.cpp')
-rw-r--r-- | src/libcamera/thread.cpp | 147 |
1 files changed, 146 insertions, 1 deletions
diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp index 95636eca..5d46eeb8 100644 --- a/src/libcamera/thread.cpp +++ b/src/libcamera/thread.cpp @@ -8,11 +8,13 @@ #include "thread.h" #include <atomic> +#include <list> #include <libcamera/event_dispatcher.h> #include "event_dispatcher_poll.h" #include "log.h" +#include "message.h" /** * \file thread.h @@ -26,6 +28,22 @@ LOG_DEFINE_CATEGORY(Thread) class ThreadMain; /** + * \brief A queue of posted messages + */ +class MessageQueue +{ +public: + /** + * \brief List of queued Message instances + */ + std::list<std::unique_ptr<Message>> list_; + /** + * \brief Protects the \ref list_ + */ + Mutex mutex_; +}; + +/** * \brief Thread-local internal data */ class ThreadData @@ -51,6 +69,8 @@ private: std::atomic<bool> exit_; int exitCode_; + + MessageQueue messages_; }; /** @@ -192,8 +212,10 @@ int Thread::exec() locker.unlock(); - while (!data_->exit_.load(std::memory_order_acquire)) + while (!data_->exit_.load(std::memory_order_acquire)) { + dispatchMessages(); dispatcher->processEvents(); + } locker.lock(); @@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher() return data_->dispatcher_.load(std::memory_order_relaxed); } +/** + * \brief Post a message to the thread for the \a receiver + * \param[in] msg The message + * \param[in] receiver The receiver + * + * This method stores the message \a msg in the message queue of the thread for + * the \a receiver and wake up the thread's event loop. Message ownership is + * passed to the thread, and the message will be deleted after being delivered. + * + * Messages are delivered through the thread's event loop. If the thread is not + * running its event loop the message will not be delivered until the event + * loop gets started. + * + * If the \a receiver is not bound to this thread the behaviour is undefined. + * + * \sa exec() + */ +void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver) +{ + msg->receiver_ = receiver; + + ASSERT(data_ == receiver->thread()->data_); + + MutexLocker locker(data_->messages_.mutex_); + data_->messages_.list_.push_back(std::move(msg)); + receiver->pendingMessages_++; + locker.unlock(); + + EventDispatcher *dispatcher = + data_->dispatcher_.load(std::memory_order_acquire); + if (dispatcher) + dispatcher->interrupt(); +} + +/** + * \brief Remove all posted messages for the \a receiver + * \param[in] receiver The receiver + * + * If the \a receiver is not bound to this thread the behaviour is undefined. + */ +void Thread::removeMessages(Object *receiver) +{ + ASSERT(data_ == receiver->thread()->data_); + + MutexLocker locker(data_->messages_.mutex_); + if (!receiver->pendingMessages_) + return; + + std::vector<std::unique_ptr<Message>> toDelete; + for (std::unique_ptr<Message> &msg : data_->messages_.list_) { + if (!msg) + continue; + if (msg->receiver_ != receiver) + continue; + + /* + * Move the message to the pending deletion list to delete it + * after releasing the lock. The messages list element will + * contain a null pointer, and will be removed when dispatching + * messages. + */ + toDelete.push_back(std::move(msg)); + receiver->pendingMessages_--; + } + + ASSERT(!receiver->pendingMessages_); + locker.unlock(); + + toDelete.clear(); +} + +/** + * \brief Dispatch all posted messages for this thread + */ +void Thread::dispatchMessages() +{ + MutexLocker locker(data_->messages_.mutex_); + + while (!data_->messages_.list_.empty()) { + std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front()); + data_->messages_.list_.pop_front(); + if (!msg) + continue; + + Object *receiver = msg->receiver_; + ASSERT(data_ == receiver->thread()->data_); + + locker.unlock(); + receiver->message(msg.get()); + locker.lock(); + + receiver->pendingMessages_--; + } +} + +/** + * \brief Move an \a object to the thread + * \param[in] object The object + */ +void Thread::moveObject(Object *object) +{ + ThreadData *currentData = object->thread_->data_; + ThreadData *targetData = data_; + + MutexLocker lockerFrom(currentData->mutex_, std::defer_lock); + MutexLocker lockerTo(targetData->mutex_, std::defer_lock); + std::lock(lockerFrom, lockerTo); + + /* Move pending messages to the message queue of the new thread. */ + if (object->pendingMessages_) { + for (std::unique_ptr<Message> &msg : currentData->messages_.list_) { + if (!msg) + continue; + if (msg->receiver_ != object) + continue; + + targetData->messages_.list_.push_back(std::move(msg)); + } + } + + object->thread_ = this; +} + }; /* namespace libcamera */ |