diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/libcamera/include/message.h | 37 | ||||
-rw-r--r-- | src/libcamera/include/thread.h | 9 | ||||
-rw-r--r-- | src/libcamera/meson.build | 2 | ||||
-rw-r--r-- | src/libcamera/message.cpp | 71 | ||||
-rw-r--r-- | src/libcamera/object.cpp | 77 | ||||
-rw-r--r-- | src/libcamera/thread.cpp | 147 |
6 files changed, 341 insertions, 2 deletions
diff --git a/src/libcamera/include/message.h b/src/libcamera/include/message.h new file mode 100644 index 00000000..97c9b80e --- /dev/null +++ b/src/libcamera/include/message.h @@ -0,0 +1,37 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +/* + * Copyright (C) 2019, Google Inc. + * + * message.h - Message queue support + */ +#ifndef __LIBCAMERA_MESSAGE_H__ +#define __LIBCAMERA_MESSAGE_H__ + +namespace libcamera { + +class Object; +class Thread; + +class Message +{ +public: + enum Type { + None = 0, + }; + + Message(Type type); + virtual ~Message(); + + Type type() const { return type_; } + Object *receiver() const { return receiver_; } + +private: + friend class Thread; + + Type type_; + Object *receiver_; +}; + +} /* namespace libcamera */ + +#endif /* __LIBCAMERA_MESSAGE_H__ */ diff --git a/src/libcamera/include/thread.h b/src/libcamera/include/thread.h index e881d90e..acae91cb 100644 --- a/src/libcamera/include/thread.h +++ b/src/libcamera/include/thread.h @@ -16,6 +16,8 @@ namespace libcamera { class EventDispatcher; +class Message; +class Object; class ThreadData; class ThreadMain; @@ -49,9 +51,16 @@ private: void startThread(); void finishThread(); + void postMessage(std::unique_ptr<Message> msg, Object *receiver); + void removeMessages(Object *receiver); + void dispatchMessages(); + + friend class Object; friend class ThreadData; friend class ThreadMain; + void moveObject(Object *object); + std::thread thread_; ThreadData *data_; }; diff --git a/src/libcamera/meson.build b/src/libcamera/meson.build index bad60da4..eda506b2 100644 --- a/src/libcamera/meson.build +++ b/src/libcamera/meson.build @@ -18,6 +18,7 @@ libcamera_sources = files([ 'log.cpp', 'media_device.cpp', 'media_object.cpp', + 'message.cpp', 'object.cpp', 'pipeline_handler.cpp', 'request.cpp', @@ -45,6 +46,7 @@ libcamera_headers = files([ 'include/log.h', 'include/media_device.h', 'include/media_object.h', + 'include/message.h', 'include/pipeline_handler.h', 'include/thread.h', 'include/utils.h', diff --git a/src/libcamera/message.cpp b/src/libcamera/message.cpp new file mode 100644 index 00000000..5bb17ae2 --- /dev/null +++ b/src/libcamera/message.cpp @@ -0,0 +1,71 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +/* + * Copyright (C) 2019, Google Inc. + * + * message.cpp - Message queue support + */ + +#include "message.h" + +#include "log.h" + +/** + * \file message.h + * \brief Message queue support + * + * The messaging API enables inter-thread communication through message + * posting. Messages can be sent from any thread to any recipient deriving from + * the Object class. + * + * To post a message, the sender allocates it dynamically as instance of a class + * derived from Message. It then posts the message to an Object recipient + * through Object::postMessage(). Message ownership is passed to the object, + * thus the message shall not store any temporary data. + * + * The message is delivered in the context of the object's thread, through the + * Object::message() virtual method. After delivery the message is + * automatically deleted. + */ + +namespace libcamera { + +LOG_DEFINE_CATEGORY(Message) + +/** + * \class Message + * \brief A message that can be posted to a Thread + */ + +/** + * \enum Message::Type + * \brief The message type + * \var Message::None + * \brief Invalid message type + */ + +/** + * \brief Construct a message object of type \a type + * \param[in] type The message type + */ +Message::Message(Message::Type type) + : type_(type) +{ +} + +Message::~Message() +{ +} + +/** + * \fn Message::type() + * \brief Retrieve the message type + * \return The message type + */ + +/** + * \fn Message::receiver() + * \brief Retrieve the message receiver + * \return The message receiver + */ + +}; /* namespace libcamera */ diff --git a/src/libcamera/object.cpp b/src/libcamera/object.cpp index a504ca2c..fa228483 100644 --- a/src/libcamera/object.cpp +++ b/src/libcamera/object.cpp @@ -9,6 +9,9 @@ #include <libcamera/signal.h> +#include "log.h" +#include "thread.h" + /** * \file object.h * \brief Base object to support automatic signal disconnection @@ -24,13 +27,85 @@ namespace libcamera { * slots. By inheriting from Object, an object is automatically disconnected * from all connected signals when it gets destroyed. * - * \sa Signal + * Object instances are bound to the thread in which they're created. When a + * message is posted to an object, its handler will run in the object's thread. + * This allows implementing easy message passing between threads by inheriting + * from the Object class. + * + * \sa Message, Signal, Thread */ +Object::Object() + : pendingMessages_(0) +{ + thread_ = Thread::current(); +} + Object::~Object() { for (SignalBase *signal : signals_) signal->disconnect(this); + + if (pendingMessages_) + thread()->removeMessages(this); +} + +/** + * \brief Post a message to the object's thread + * \param[in] msg The message + * + * This method posts the message \a msg to the message queue of the object's + * thread, to be delivered to the object through the message() method in the + * context of its thread. Message ownership is passed to the thread, and the + * message will be deleted after being delivered. + * + * Messages are delivered through the thread's event loop. If the thread is not + * running its event loop the message will not be delivered until the event + * loop gets started. + */ +void Object::postMessage(std::unique_ptr<Message> msg) +{ + thread()->postMessage(std::move(msg), this); +} + +/** + * \brief Message handler for the object + * \param[in] msg The message + * + * This virtual method receives messages for the object. It is called in the + * context of the object's thread, and can be overridden to process custom + * messages. The parent Object::message() method shall be called for any + * message not handled by the override method. + * + * The message \a msg is valid only for the duration of the call, no reference + * to it shall be kept after this method returns. + */ +void Object::message(Message *msg) +{ +} + +/** + * \fn Object::thread() + * \brief Retrieve the thread the object is bound to + * \return The thread the object is bound to + */ + +/** + * \brief Move the object to a different thread + * \param[in] thread The target thread + * + * This method moves the object from the current thread to the new \a thread. + * It shall be called from the thread in which the object currently lives, + * otherwise the behaviour is undefined. + */ +void Object::moveToThread(Thread *thread) +{ + ASSERT(Thread::current() == thread_); + + if (thread_ == thread) + return; + + thread->moveObject(this); } void Object::connect(SignalBase *signal) diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp index 95636eca..5d46eeb8 100644 --- a/src/libcamera/thread.cpp +++ b/src/libcamera/thread.cpp @@ -8,11 +8,13 @@ #include "thread.h" #include <atomic> +#include <list> #include <libcamera/event_dispatcher.h> #include "event_dispatcher_poll.h" #include "log.h" +#include "message.h" /** * \file thread.h @@ -26,6 +28,22 @@ LOG_DEFINE_CATEGORY(Thread) class ThreadMain; /** + * \brief A queue of posted messages + */ +class MessageQueue +{ +public: + /** + * \brief List of queued Message instances + */ + std::list<std::unique_ptr<Message>> list_; + /** + * \brief Protects the \ref list_ + */ + Mutex mutex_; +}; + +/** * \brief Thread-local internal data */ class ThreadData @@ -51,6 +69,8 @@ private: std::atomic<bool> exit_; int exitCode_; + + MessageQueue messages_; }; /** @@ -192,8 +212,10 @@ int Thread::exec() locker.unlock(); - while (!data_->exit_.load(std::memory_order_acquire)) + while (!data_->exit_.load(std::memory_order_acquire)) { + dispatchMessages(); dispatcher->processEvents(); + } locker.lock(); @@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher() return data_->dispatcher_.load(std::memory_order_relaxed); } +/** + * \brief Post a message to the thread for the \a receiver + * \param[in] msg The message + * \param[in] receiver The receiver + * + * This method stores the message \a msg in the message queue of the thread for + * the \a receiver and wake up the thread's event loop. Message ownership is + * passed to the thread, and the message will be deleted after being delivered. + * + * Messages are delivered through the thread's event loop. If the thread is not + * running its event loop the message will not be delivered until the event + * loop gets started. + * + * If the \a receiver is not bound to this thread the behaviour is undefined. + * + * \sa exec() + */ +void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver) +{ + msg->receiver_ = receiver; + + ASSERT(data_ == receiver->thread()->data_); + + MutexLocker locker(data_->messages_.mutex_); + data_->messages_.list_.push_back(std::move(msg)); + receiver->pendingMessages_++; + locker.unlock(); + + EventDispatcher *dispatcher = + data_->dispatcher_.load(std::memory_order_acquire); + if (dispatcher) + dispatcher->interrupt(); +} + +/** + * \brief Remove all posted messages for the \a receiver + * \param[in] receiver The receiver + * + * If the \a receiver is not bound to this thread the behaviour is undefined. + */ +void Thread::removeMessages(Object *receiver) +{ + ASSERT(data_ == receiver->thread()->data_); + + MutexLocker locker(data_->messages_.mutex_); + if (!receiver->pendingMessages_) + return; + + std::vector<std::unique_ptr<Message>> toDelete; + for (std::unique_ptr<Message> &msg : data_->messages_.list_) { + if (!msg) + continue; + if (msg->receiver_ != receiver) + continue; + + /* + * Move the message to the pending deletion list to delete it + * after releasing the lock. The messages list element will + * contain a null pointer, and will be removed when dispatching + * messages. + */ + toDelete.push_back(std::move(msg)); + receiver->pendingMessages_--; + } + + ASSERT(!receiver->pendingMessages_); + locker.unlock(); + + toDelete.clear(); +} + +/** + * \brief Dispatch all posted messages for this thread + */ +void Thread::dispatchMessages() +{ + MutexLocker locker(data_->messages_.mutex_); + + while (!data_->messages_.list_.empty()) { + std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front()); + data_->messages_.list_.pop_front(); + if (!msg) + continue; + + Object *receiver = msg->receiver_; + ASSERT(data_ == receiver->thread()->data_); + + locker.unlock(); + receiver->message(msg.get()); + locker.lock(); + + receiver->pendingMessages_--; + } +} + +/** + * \brief Move an \a object to the thread + * \param[in] object The object + */ +void Thread::moveObject(Object *object) +{ + ThreadData *currentData = object->thread_->data_; + ThreadData *targetData = data_; + + MutexLocker lockerFrom(currentData->mutex_, std::defer_lock); + MutexLocker lockerTo(targetData->mutex_, std::defer_lock); + std::lock(lockerFrom, lockerTo); + + /* Move pending messages to the message queue of the new thread. */ + if (object->pendingMessages_) { + for (std::unique_ptr<Message> &msg : currentData->messages_.list_) { + if (!msg) + continue; + if (msg->receiver_ != object) + continue; + + targetData->messages_.list_.push_back(std::move(msg)); + } + } + + object->thread_ = this; +} + }; /* namespace libcamera */ |