summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/libcamera/object.h13
-rw-r--r--src/libcamera/include/message.h37
-rw-r--r--src/libcamera/include/thread.h9
-rw-r--r--src/libcamera/meson.build2
-rw-r--r--src/libcamera/message.cpp71
-rw-r--r--src/libcamera/object.cpp77
-rw-r--r--src/libcamera/thread.cpp147
7 files changed, 354 insertions, 2 deletions
diff --git a/include/libcamera/object.h b/include/libcamera/object.h
index eadd41f9..d61dfb1e 100644
--- a/include/libcamera/object.h
+++ b/include/libcamera/object.h
@@ -8,26 +8,39 @@
#define __LIBCAMERA_OBJECT_H__
#include <list>
+#include <memory>
namespace libcamera {
+class Message;
class SignalBase;
template<typename... Args>
class Signal;
+class Thread;
class Object
{
public:
+ Object();
virtual ~Object();
+ void postMessage(std::unique_ptr<Message> msg);
+ virtual void message(Message *msg);
+
+ Thread *thread() const { return thread_; }
+ void moveToThread(Thread *thread);
+
private:
template<typename... Args>
friend class Signal;
+ friend class Thread;
void connect(SignalBase *signal);
void disconnect(SignalBase *signal);
+ Thread *thread_;
std::list<SignalBase *> signals_;
+ unsigned int pendingMessages_;
};
}; /* namespace libcamera */
diff --git a/src/libcamera/include/message.h b/src/libcamera/include/message.h
new file mode 100644
index 00000000..97c9b80e
--- /dev/null
+++ b/src/libcamera/include/message.h
@@ -0,0 +1,37 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2019, Google Inc.
+ *
+ * message.h - Message queue support
+ */
+#ifndef __LIBCAMERA_MESSAGE_H__
+#define __LIBCAMERA_MESSAGE_H__
+
+namespace libcamera {
+
+class Object;
+class Thread;
+
+class Message
+{
+public:
+ enum Type {
+ None = 0,
+ };
+
+ Message(Type type);
+ virtual ~Message();
+
+ Type type() const { return type_; }
+ Object *receiver() const { return receiver_; }
+
+private:
+ friend class Thread;
+
+ Type type_;
+ Object *receiver_;
+};
+
+} /* namespace libcamera */
+
+#endif /* __LIBCAMERA_MESSAGE_H__ */
diff --git a/src/libcamera/include/thread.h b/src/libcamera/include/thread.h
index e881d90e..acae91cb 100644
--- a/src/libcamera/include/thread.h
+++ b/src/libcamera/include/thread.h
@@ -16,6 +16,8 @@
namespace libcamera {
class EventDispatcher;
+class Message;
+class Object;
class ThreadData;
class ThreadMain;
@@ -49,9 +51,16 @@ private:
void startThread();
void finishThread();
+ void postMessage(std::unique_ptr<Message> msg, Object *receiver);
+ void removeMessages(Object *receiver);
+ void dispatchMessages();
+
+ friend class Object;
friend class ThreadData;
friend class ThreadMain;
+ void moveObject(Object *object);
+
std::thread thread_;
ThreadData *data_;
};
diff --git a/src/libcamera/meson.build b/src/libcamera/meson.build
index bad60da4..eda506b2 100644
--- a/src/libcamera/meson.build
+++ b/src/libcamera/meson.build
@@ -18,6 +18,7 @@ libcamera_sources = files([
'log.cpp',
'media_device.cpp',
'media_object.cpp',
+ 'message.cpp',
'object.cpp',
'pipeline_handler.cpp',
'request.cpp',
@@ -45,6 +46,7 @@ libcamera_headers = files([
'include/log.h',
'include/media_device.h',
'include/media_object.h',
+ 'include/message.h',
'include/pipeline_handler.h',
'include/thread.h',
'include/utils.h',
diff --git a/src/libcamera/message.cpp b/src/libcamera/message.cpp
new file mode 100644
index 00000000..5bb17ae2
--- /dev/null
+++ b/src/libcamera/message.cpp
@@ -0,0 +1,71 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2019, Google Inc.
+ *
+ * message.cpp - Message queue support
+ */
+
+#include "message.h"
+
+#include "log.h"
+
+/**
+ * \file message.h
+ * \brief Message queue support
+ *
+ * The messaging API enables inter-thread communication through message
+ * posting. Messages can be sent from any thread to any recipient deriving from
+ * the Object class.
+ *
+ * To post a message, the sender allocates it dynamically as instance of a class
+ * derived from Message. It then posts the message to an Object recipient
+ * through Object::postMessage(). Message ownership is passed to the object,
+ * thus the message shall not store any temporary data.
+ *
+ * The message is delivered in the context of the object's thread, through the
+ * Object::message() virtual method. After delivery the message is
+ * automatically deleted.
+ */
+
+namespace libcamera {
+
+LOG_DEFINE_CATEGORY(Message)
+
+/**
+ * \class Message
+ * \brief A message that can be posted to a Thread
+ */
+
+/**
+ * \enum Message::Type
+ * \brief The message type
+ * \var Message::None
+ * \brief Invalid message type
+ */
+
+/**
+ * \brief Construct a message object of type \a type
+ * \param[in] type The message type
+ */
+Message::Message(Message::Type type)
+ : type_(type)
+{
+}
+
+Message::~Message()
+{
+}
+
+/**
+ * \fn Message::type()
+ * \brief Retrieve the message type
+ * \return The message type
+ */
+
+/**
+ * \fn Message::receiver()
+ * \brief Retrieve the message receiver
+ * \return The message receiver
+ */
+
+}; /* namespace libcamera */
diff --git a/src/libcamera/object.cpp b/src/libcamera/object.cpp
index a504ca2c..fa228483 100644
--- a/src/libcamera/object.cpp
+++ b/src/libcamera/object.cpp
@@ -9,6 +9,9 @@
#include <libcamera/signal.h>
+#include "log.h"
+#include "thread.h"
+
/**
* \file object.h
* \brief Base object to support automatic signal disconnection
@@ -24,13 +27,85 @@ namespace libcamera {
* slots. By inheriting from Object, an object is automatically disconnected
* from all connected signals when it gets destroyed.
*
- * \sa Signal
+ * Object instances are bound to the thread in which they're created. When a
+ * message is posted to an object, its handler will run in the object's thread.
+ * This allows implementing easy message passing between threads by inheriting
+ * from the Object class.
+ *
+ * \sa Message, Signal, Thread
*/
+Object::Object()
+ : pendingMessages_(0)
+{
+ thread_ = Thread::current();
+}
+
Object::~Object()
{
for (SignalBase *signal : signals_)
signal->disconnect(this);
+
+ if (pendingMessages_)
+ thread()->removeMessages(this);
+}
+
+/**
+ * \brief Post a message to the object's thread
+ * \param[in] msg The message
+ *
+ * This method posts the message \a msg to the message queue of the object's
+ * thread, to be delivered to the object through the message() method in the
+ * context of its thread. Message ownership is passed to the thread, and the
+ * message will be deleted after being delivered.
+ *
+ * Messages are delivered through the thread's event loop. If the thread is not
+ * running its event loop the message will not be delivered until the event
+ * loop gets started.
+ */
+void Object::postMessage(std::unique_ptr<Message> msg)
+{
+ thread()->postMessage(std::move(msg), this);
+}
+
+/**
+ * \brief Message handler for the object
+ * \param[in] msg The message
+ *
+ * This virtual method receives messages for the object. It is called in the
+ * context of the object's thread, and can be overridden to process custom
+ * messages. The parent Object::message() method shall be called for any
+ * message not handled by the override method.
+ *
+ * The message \a msg is valid only for the duration of the call, no reference
+ * to it shall be kept after this method returns.
+ */
+void Object::message(Message *msg)
+{
+}
+
+/**
+ * \fn Object::thread()
+ * \brief Retrieve the thread the object is bound to
+ * \return The thread the object is bound to
+ */
+
+/**
+ * \brief Move the object to a different thread
+ * \param[in] thread The target thread
+ *
+ * This method moves the object from the current thread to the new \a thread.
+ * It shall be called from the thread in which the object currently lives,
+ * otherwise the behaviour is undefined.
+ */
+void Object::moveToThread(Thread *thread)
+{
+ ASSERT(Thread::current() == thread_);
+
+ if (thread_ == thread)
+ return;
+
+ thread->moveObject(this);
}
void Object::connect(SignalBase *signal)
diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp
index 95636eca..5d46eeb8 100644
--- a/src/libcamera/thread.cpp
+++ b/src/libcamera/thread.cpp
@@ -8,11 +8,13 @@
#include "thread.h"
#include <atomic>
+#include <list>
#include <libcamera/event_dispatcher.h>
#include "event_dispatcher_poll.h"
#include "log.h"
+#include "message.h"
/**
* \file thread.h
@@ -26,6 +28,22 @@ LOG_DEFINE_CATEGORY(Thread)
class ThreadMain;
/**
+ * \brief A queue of posted messages
+ */
+class MessageQueue
+{
+public:
+ /**
+ * \brief List of queued Message instances
+ */
+ std::list<std::unique_ptr<Message>> list_;
+ /**
+ * \brief Protects the \ref list_
+ */
+ Mutex mutex_;
+};
+
+/**
* \brief Thread-local internal data
*/
class ThreadData
@@ -51,6 +69,8 @@ private:
std::atomic<bool> exit_;
int exitCode_;
+
+ MessageQueue messages_;
};
/**
@@ -192,8 +212,10 @@ int Thread::exec()
locker.unlock();
- while (!data_->exit_.load(std::memory_order_acquire))
+ while (!data_->exit_.load(std::memory_order_acquire)) {
+ dispatchMessages();
dispatcher->processEvents();
+ }
locker.lock();
@@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher()
return data_->dispatcher_.load(std::memory_order_relaxed);
}
+/**
+ * \brief Post a message to the thread for the \a receiver
+ * \param[in] msg The message
+ * \param[in] receiver The receiver
+ *
+ * This method stores the message \a msg in the message queue of the thread for
+ * the \a receiver and wake up the thread's event loop. Message ownership is
+ * passed to the thread, and the message will be deleted after being delivered.
+ *
+ * Messages are delivered through the thread's event loop. If the thread is not
+ * running its event loop the message will not be delivered until the event
+ * loop gets started.
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ *
+ * \sa exec()
+ */
+void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver)
+{
+ msg->receiver_ = receiver;
+
+ ASSERT(data_ == receiver->thread()->data_);
+
+ MutexLocker locker(data_->messages_.mutex_);
+ data_->messages_.list_.push_back(std::move(msg));
+ receiver->pendingMessages_++;
+ locker.unlock();
+
+ EventDispatcher *dispatcher =
+ data_->dispatcher_.load(std::memory_order_acquire);
+ if (dispatcher)
+ dispatcher->interrupt();
+}
+
+/**
+ * \brief Remove all posted messages for the \a receiver
+ * \param[in] receiver The receiver
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ */
+void Thread::removeMessages(Object *receiver)
+{
+ ASSERT(data_ == receiver->thread()->data_);
+
+ MutexLocker locker(data_->messages_.mutex_);
+ if (!receiver->pendingMessages_)
+ return;
+
+ std::vector<std::unique_ptr<Message>> toDelete;
+ for (std::unique_ptr<Message> &msg : data_->messages_.list_) {
+ if (!msg)
+ continue;
+ if (msg->receiver_ != receiver)
+ continue;
+
+ /*
+ * Move the message to the pending deletion list to delete it
+ * after releasing the lock. The messages list element will
+ * contain a null pointer, and will be removed when dispatching
+ * messages.
+ */
+ toDelete.push_back(std::move(msg));
+ receiver->pendingMessages_--;
+ }
+
+ ASSERT(!receiver->pendingMessages_);
+ locker.unlock();
+
+ toDelete.clear();
+}
+
+/**
+ * \brief Dispatch all posted messages for this thread
+ */
+void Thread::dispatchMessages()
+{
+ MutexLocker locker(data_->messages_.mutex_);
+
+ while (!data_->messages_.list_.empty()) {
+ std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front());
+ data_->messages_.list_.pop_front();
+ if (!msg)
+ continue;
+
+ Object *receiver = msg->receiver_;
+ ASSERT(data_ == receiver->thread()->data_);
+
+ locker.unlock();
+ receiver->message(msg.get());
+ locker.lock();
+
+ receiver->pendingMessages_--;
+ }
+}
+
+/**
+ * \brief Move an \a object to the thread
+ * \param[in] object The object
+ */
+void Thread::moveObject(Object *object)
+{
+ ThreadData *currentData = object->thread_->data_;
+ ThreadData *targetData = data_;
+
+ MutexLocker lockerFrom(currentData->mutex_, std::defer_lock);
+ MutexLocker lockerTo(targetData->mutex_, std::defer_lock);
+ std::lock(lockerFrom, lockerTo);
+
+ /* Move pending messages to the message queue of the new thread. */
+ if (object->pendingMessages_) {
+ for (std::unique_ptr<Message> &msg : currentData->messages_.list_) {
+ if (!msg)
+ continue;
+ if (msg->receiver_ != object)
+ continue;
+
+ targetData->messages_.list_.push_back(std::move(msg));
+ }
+ }
+
+ object->thread_ = this;
+}
+
}; /* namespace libcamera */