summaryrefslogtreecommitdiff
path: root/src/libcamera/base/thread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcamera/base/thread.cpp')
-rw-r--r--src/libcamera/base/thread.cpp680
1 files changed, 680 insertions, 0 deletions
diff --git a/src/libcamera/base/thread.cpp b/src/libcamera/base/thread.cpp
new file mode 100644
index 00000000..c7c2d6b2
--- /dev/null
+++ b/src/libcamera/base/thread.cpp
@@ -0,0 +1,680 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2019, Google Inc.
+ *
+ * thread.cpp - Thread support
+ */
+
+#include <libcamera/base/thread.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <list>
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <libcamera/base/event_dispatcher.h>
+#include <libcamera/base/event_dispatcher_poll.h>
+#include <libcamera/base/log.h>
+#include <libcamera/base/message.h>
+
+/**
+ * \page thread Thread Support
+ *
+ * libcamera supports multi-threaded applications through a threading model that
+ * sets precise rules to guarantee thread-safe usage of the API. Additionally,
+ * libcamera makes internal use of threads, and offers APIs that simplify
+ * interactions with application threads. Careful compliance with the threading
+ * model will ensure avoidance of race conditions.
+ *
+ * Every thread created by libcamera is associated with an instance of the
+ * Thread class. Those threads run an internal event loop by default to
+ * dispatch events to objects. Additionally, the main thread of the application
+ * (defined as the thread that calls CameraManager::start()) is also associated
+ * with a Thread instance, but has no event loop accessible to libcamera. Other
+ * application threads are not visible to libcamera.
+ *
+ * \section thread-objects Threads and Objects
+ *
+ * Instances of the Object class and all its derived classes are thread-aware
+ * and are bound to the thread they are created in. They are said to *live* in
+ * a thread, and they interact with the event loop of their thread for the
+ * purpose of message passing and signal delivery. Messages posted to the
+ * object with Object::postMessage() will be delivered from the event loop of
+ * the thread that the object lives in. Signals delivered to the object, unless
+ * explicitly connected with ConnectionTypeDirect, will also be delivered from
+ * the object thread's event loop.
+ *
+ * All Object instances created internally by libcamera are bound to internal
+ * threads. As objects interact with thread event loops for proper operation,
+ * creating an Object instance in a thread that has no internal event loop (such
+ * as the main application thread, or libcamera threads that have a custom main
+ * loop), prevents some features of the Object class from being used. See
+ * Thread::exec() for more details.
+ *
+ * \section thread-signals Threads and Signals
+ *
+ * When sent to a receiver that does not inherit from the Object class, signals
+ * are delivered synchronously in the thread of the sender. When the receiver
+ * inherits from the Object class, delivery is by default asynchronous if the
+ * sender and receiver live in different threads. In that case, the signal is
+ * posted to the receiver's message queue and will be delivered from the
+ * receiver's event loop, running in the receiver's thread. This mechanism can
+ * be overridden by selecting a different connection type when calling
+ * Signal::connect().
+ *
+ * \section thread-reentrancy Reentrancy and Thread-Safety
+ *
+ * Through the documentation, several terms are used to define how classes and
+ * their member functions can be used from multiple threads.
+ *
+ * - A **reentrant** function may be called simultaneously from multiple
+ * threads if and only if each invocation uses a different instance of the
+ * class. This is the default for all member functions not explictly marked
+ * otherwise.
+ *
+ * - \anchor thread-safe A **thread-safe** function may be called
+ * simultaneously from multiple threads on the same instance of a class. A
+ * thread-safe function is thus reentrant. Thread-safe functions may also be
+ * called simultaneously with any other reentrant function of the same class
+ * on the same instance.
+ *
+ * - \anchor thread-bound A **thread-bound** function may be called only from
+ * the thread that the class instances lives in (see section \ref
+ * thread-objects). For instances of classes that do not derive from the
+ * Object class, this is the thread in which the instance was created. A
+ * thread-bound function is not thread-safe, and may or may not be reentrant.
+ *
+ * Neither reentrancy nor thread-safety, in this context, mean that a function
+ * may be called simultaneously from the same thread, for instance from a
+ * callback invoked by the function. This may deadlock and isn't allowed unless
+ * separately documented.
+ *
+ * A class is defined as reentrant, thread-safe or thread-bound if all its
+ * member functions are reentrant, thread-safe or thread-bound respectively.
+ * Some member functions may additionally be documented as having additional
+ * thread-related attributes.
+ *
+ * Most classes are reentrant but not thread-safe, as making them fully
+ * thread-safe would incur locking costs considered prohibitive for the
+ * expected use cases.
+ */
+
+/**
+ * \file base/thread.h
+ * \brief Thread support
+ */
+
+namespace libcamera {
+
+LOG_DEFINE_CATEGORY(Thread)
+
+class ThreadMain;
+
+/**
+ * \brief A queue of posted messages
+ */
+class MessageQueue
+{
+public:
+ /**
+ * \brief List of queued Message instances
+ */
+ std::list<std::unique_ptr<Message>> list_;
+ /**
+ * \brief Protects the \ref list_
+ */
+ Mutex mutex_;
+};
+
+/**
+ * \brief Thread-local internal data
+ */
+class ThreadData
+{
+public:
+ ThreadData()
+ : thread_(nullptr), running_(false), dispatcher_(nullptr)
+ {
+ }
+
+ static ThreadData *current();
+
+private:
+ friend class Thread;
+ friend class ThreadMain;
+
+ Thread *thread_;
+ bool running_;
+ pid_t tid_;
+
+ Mutex mutex_;
+
+ std::atomic<EventDispatcher *> dispatcher_;
+
+ std::condition_variable cv_;
+ std::atomic<bool> exit_;
+ int exitCode_;
+
+ MessageQueue messages_;
+};
+
+/**
+ * \brief Thread wrapper for the main thread
+ */
+class ThreadMain : public Thread
+{
+public:
+ ThreadMain()
+ {
+ data_->running_ = true;
+ }
+
+protected:
+ void run() override
+ {
+ LOG(Thread, Fatal) << "The main thread can't be restarted";
+ }
+};
+
+static thread_local ThreadData *currentThreadData = nullptr;
+static ThreadMain mainThread;
+
+/**
+ * \brief Retrieve thread-local internal data for the current thread
+ * \return The thread-local internal data for the current thread
+ */
+ThreadData *ThreadData::current()
+{
+ if (currentThreadData)
+ return currentThreadData;
+
+ /*
+ * The main thread doesn't receive thread-local data when it is
+ * started, set it here.
+ */
+ ThreadData *data = mainThread.data_;
+ data->tid_ = syscall(SYS_gettid);
+ currentThreadData = data;
+ return data;
+}
+
+/**
+ * \typedef Mutex
+ * \brief An alias for std::mutex
+ */
+
+/**
+ * \typedef MutexLocker
+ * \brief An alias for std::unique_lock<std::mutex>
+ */
+
+/**
+ * \class Thread
+ * \brief A thread of execution
+ *
+ * The Thread class is a wrapper around std::thread that handles integration
+ * with the Object, Signal and EventDispatcher classes.
+ *
+ * Thread instances by default run an event loop until the exit() method is
+ * called. The event loop dispatches events (messages, notifiers and timers)
+ * sent to the objects living in the thread. This behaviour can be modified by
+ * overriding the run() function.
+ *
+ * \section thread-stop Stopping Threads
+ *
+ * Threads can't be forcibly stopped. Instead, a thread user first requests the
+ * thread to exit and then waits for the thread's main function to react to the
+ * request and return, at which points the thread will stop.
+ *
+ * For threads running exec(), the exit() function is used to request the thread
+ * to exit. For threads subclassing the Thread class and implementing a custom
+ * run() function, a subclass-specific mechanism shall be provided. In either
+ * case, the wait() function shall be called to wait for the thread to stop.
+ *
+ * Due to their asynchronous nature, threads are subject to race conditions when
+ * they stop. This is of particular importance for messages posted to the thread
+ * with postMessage() (and the other mechanisms that rely on it, such as
+ * Object::invokeMethod() or asynchronous signal delivery). To understand the
+ * issues, three contexts need to be considered:
+ *
+ * - The worker is the Thread performing work and being instructed to stop.
+ * - The controller is the context which instructs the worker thread to stop.
+ * - The other contexts are any threads other than the worker and controller
+ * that interact with the worker thread.
+ *
+ * Messages posted to the worker thread from the controller context before
+ * calling exit() are queued to the thread's message queue, and the Thread class
+ * offers no guarantee that those messages will be processed before the thread
+ * stops. This allows threads to stop fast.
+ *
+ * A thread that requires delivery of messages posted from the controller
+ * context before exit() should reimplement the run() function and call
+ * dispatchMessages() after exec().
+ *
+ * Messages posted to the worker thread from the other contexts are asynchronous
+ * with respect to the exit() call from the controller context. There is no
+ * guarantee as to whether those messages will be processed or not before the
+ * thread stops.
+ *
+ * Messages that are not processed will stay in the queue, in the exact same way
+ * as messages posted after the thread has stopped. They will be processed when
+ * the thread is restarted. If the thread is never restarted, they will be
+ * deleted without being processed when the Thread instance is destroyed.
+ */
+
+/**
+ * \brief Create a thread
+ */
+Thread::Thread()
+{
+ data_ = new ThreadData;
+ data_->thread_ = this;
+}
+
+Thread::~Thread()
+{
+ delete data_->dispatcher_.load(std::memory_order_relaxed);
+ delete data_;
+}
+
+/**
+ * \brief Start the thread
+ */
+void Thread::start()
+{
+ MutexLocker locker(data_->mutex_);
+
+ if (data_->running_)
+ return;
+
+ data_->running_ = true;
+ data_->exitCode_ = -1;
+ data_->exit_.store(false, std::memory_order_relaxed);
+
+ thread_ = std::thread(&Thread::startThread, this);
+}
+
+void Thread::startThread()
+{
+ struct ThreadCleaner {
+ ThreadCleaner(Thread *thread, void (Thread::*cleaner)())
+ : thread_(thread), cleaner_(cleaner)
+ {
+ }
+ ~ThreadCleaner()
+ {
+ (thread_->*cleaner_)();
+ }
+
+ Thread *thread_;
+ void (Thread::*cleaner_)();
+ };
+
+ /*
+ * Make sure the thread is cleaned up even if the run method exits
+ * abnormally (for instance via a direct call to pthread_cancel()).
+ */
+ thread_local ThreadCleaner cleaner(this, &Thread::finishThread);
+
+ data_->tid_ = syscall(SYS_gettid);
+ currentThreadData = data_;
+
+ run();
+}
+
+/**
+ * \brief Enter the event loop
+ *
+ * This method enters an event loop based on the event dispatcher instance for
+ * the thread, and blocks until the exit() method is called. It is meant to be
+ * called within the thread from the run() method and shall not be called
+ * outside of the thread.
+ *
+ * \return The exit code passed to the exit() method
+ */
+int Thread::exec()
+{
+ MutexLocker locker(data_->mutex_);
+
+ EventDispatcher *dispatcher = eventDispatcher();
+
+ locker.unlock();
+
+ while (!data_->exit_.load(std::memory_order_acquire))
+ dispatcher->processEvents();
+
+ locker.lock();
+
+ return data_->exitCode_;
+}
+
+/**
+ * \brief Main method of the thread
+ *
+ * When the thread is started with start(), it calls this method in the context
+ * of the new thread. The run() method can be overridden to perform custom
+ * work, either custom initialization and cleanup before and after calling the
+ * Thread::exec() function, or a custom thread loop altogether. When this
+ * method returns the thread execution is stopped, and the \ref finished signal
+ * is emitted.
+ *
+ * Note that if this function is overridden and doesn't call Thread::exec(), no
+ * events will be dispatched to the objects living in the thread. These objects
+ * will not be able to use the EventNotifier, Timer or Message facilities. This
+ * includes functions that rely on message dispatching, such as
+ * Object::deleteLater().
+ *
+ * The base implementation just calls exec().
+ */
+void Thread::run()
+{
+ exec();
+}
+
+void Thread::finishThread()
+{
+ data_->mutex_.lock();
+ data_->running_ = false;
+ data_->mutex_.unlock();
+
+ finished.emit(this);
+ data_->cv_.notify_all();
+}
+
+/**
+ * \brief Stop the thread's event loop
+ * \param[in] code The exit code
+ *
+ * This method interrupts the event loop started by the exec() method, causing
+ * exec() to return \a code.
+ *
+ * Calling exit() on a thread that reimplements the run() method and doesn't
+ * call exec() will likely have no effect.
+ *
+ * \context This function is \threadsafe.
+ */
+void Thread::exit(int code)
+{
+ data_->exitCode_ = code;
+ data_->exit_.store(true, std::memory_order_release);
+
+ EventDispatcher *dispatcher = data_->dispatcher_.load(std::memory_order_relaxed);
+ if (!dispatcher)
+ return;
+
+ dispatcher->interrupt();
+}
+
+/**
+ * \brief Wait for the thread to finish
+ * \param[in] duration Maximum wait duration
+ *
+ * This function waits until the thread finishes or the \a duration has
+ * elapsed, whichever happens first. If \a duration is equal to
+ * utils::duration::max(), the wait never times out. If the thread is not
+ * running the function returns immediately.
+ *
+ * \context This function is \threadsafe.
+ *
+ * \return True if the thread has finished, or false if the wait timed out
+ */
+bool Thread::wait(utils::duration duration)
+{
+ bool hasFinished = true;
+
+ {
+ MutexLocker locker(data_->mutex_);
+
+ if (duration == utils::duration::max())
+ data_->cv_.wait(locker, [&]() { return !data_->running_; });
+ else
+ hasFinished = data_->cv_.wait_for(locker, duration,
+ [&]() { return !data_->running_; });
+ }
+
+ if (thread_.joinable())
+ thread_.join();
+
+ return hasFinished;
+}
+
+/**
+ * \brief Check if the thread is running
+ *
+ * A Thread instance is considered as running once the underlying thread has
+ * started. This method guarantees that it returns true after the start()
+ * method returns, and false after the wait() method returns.
+ *
+ * \context This function is \threadsafe.
+ *
+ * \return True if the thread is running, false otherwise
+ */
+bool Thread::isRunning()
+{
+ MutexLocker locker(data_->mutex_);
+ return data_->running_;
+}
+
+/**
+ * \var Thread::finished
+ * \brief Signal the end of thread execution
+ */
+
+/**
+ * \brief Retrieve the Thread instance for the current thread
+ * \context This function is \threadsafe.
+ * \return The Thread instance for the current thread
+ */
+Thread *Thread::current()
+{
+ ThreadData *data = ThreadData::current();
+ return data->thread_;
+}
+
+/**
+ * \brief Retrieve the ID of the current thread
+ *
+ * The thread ID corresponds to the Linux thread ID (TID) as returned by the
+ * gettid system call.
+ *
+ * \context This function is \threadsafe.
+ *
+ * \return The ID of the current thread
+ */
+pid_t Thread::currentId()
+{
+ ThreadData *data = ThreadData::current();
+ return data->tid_;
+}
+
+/**
+ * \brief Retrieve the event dispatcher
+ *
+ * This function retrieves the internal event dispatcher for the thread. The
+ * returned event dispatcher is valid until the thread is destroyed.
+ *
+ * \context This function is \threadsafe.
+ *
+ * \return Pointer to the event dispatcher
+ */
+EventDispatcher *Thread::eventDispatcher()
+{
+ if (!data_->dispatcher_.load(std::memory_order_relaxed))
+ data_->dispatcher_.store(new EventDispatcherPoll(),
+ std::memory_order_release);
+
+ return data_->dispatcher_.load(std::memory_order_relaxed);
+}
+
+/**
+ * \brief Post a message to the thread for the \a receiver
+ * \param[in] msg The message
+ * \param[in] receiver The receiver
+ *
+ * This method stores the message \a msg in the message queue of the thread for
+ * the \a receiver and wake up the thread's event loop. Message ownership is
+ * passed to the thread, and the message will be deleted after being delivered.
+ *
+ * Messages are delivered through the thread's event loop. If the thread is not
+ * running its event loop the message will not be delivered until the event
+ * loop gets started.
+ *
+ * When the thread is stopped, posted messages may not have all been processed.
+ * See \ref thread-stop for additional information.
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ *
+ * \sa exec()
+ */
+void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver)
+{
+ msg->receiver_ = receiver;
+
+ ASSERT(data_ == receiver->thread()->data_);
+
+ MutexLocker locker(data_->messages_.mutex_);
+ data_->messages_.list_.push_back(std::move(msg));
+ receiver->pendingMessages_++;
+ locker.unlock();
+
+ EventDispatcher *dispatcher =
+ data_->dispatcher_.load(std::memory_order_acquire);
+ if (dispatcher)
+ dispatcher->interrupt();
+}
+
+/**
+ * \brief Remove all posted messages for the \a receiver
+ * \param[in] receiver The receiver
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ */
+void Thread::removeMessages(Object *receiver)
+{
+ ASSERT(data_ == receiver->thread()->data_);
+
+ MutexLocker locker(data_->messages_.mutex_);
+ if (!receiver->pendingMessages_)
+ return;
+
+ std::vector<std::unique_ptr<Message>> toDelete;
+ for (std::unique_ptr<Message> &msg : data_->messages_.list_) {
+ if (!msg)
+ continue;
+ if (msg->receiver_ != receiver)
+ continue;
+
+ /*
+ * Move the message to the pending deletion list to delete it
+ * after releasing the lock. The messages list element will
+ * contain a null pointer, and will be removed when dispatching
+ * messages.
+ */
+ toDelete.push_back(std::move(msg));
+ receiver->pendingMessages_--;
+ }
+
+ ASSERT(!receiver->pendingMessages_);
+ locker.unlock();
+
+ toDelete.clear();
+}
+
+/**
+ * \brief Dispatch posted messages for this thread
+ * \param[in] type The message type
+ *
+ * This function immediately dispatches all the messages previously posted for
+ * this thread with postMessage() that match the message \a type. If the \a type
+ * is Message::Type::None, all messages are dispatched.
+ *
+ * Messages shall only be dispatched from the current thread, typically within
+ * the thread from the run() function. Calling this function outside of the
+ * thread results in undefined behaviour.
+ */
+void Thread::dispatchMessages(Message::Type type)
+{
+ ASSERT(data_ == ThreadData::current());
+
+ MutexLocker locker(data_->messages_.mutex_);
+
+ std::list<std::unique_ptr<Message>> &messages = data_->messages_.list_;
+
+ for (auto iter = messages.begin(); iter != messages.end(); ) {
+ std::unique_ptr<Message> &msg = *iter;
+
+ if (!msg) {
+ iter = data_->messages_.list_.erase(iter);
+ continue;
+ }
+
+ if (type != Message::Type::None && msg->type() != type) {
+ ++iter;
+ continue;
+ }
+
+ std::unique_ptr<Message> message = std::move(msg);
+ iter = data_->messages_.list_.erase(iter);
+
+ Object *receiver = message->receiver_;
+ ASSERT(data_ == receiver->thread()->data_);
+ receiver->pendingMessages_--;
+
+ locker.unlock();
+ receiver->message(message.get());
+ message.reset();
+ locker.lock();
+ }
+}
+
+/**
+ * \brief Move an \a object and all its children to the thread
+ * \param[in] object The object
+ */
+void Thread::moveObject(Object *object)
+{
+ ThreadData *currentData = object->thread_->data_;
+ ThreadData *targetData = data_;
+
+ MutexLocker lockerFrom(currentData->messages_.mutex_, std::defer_lock);
+ MutexLocker lockerTo(targetData->messages_.mutex_, std::defer_lock);
+ std::lock(lockerFrom, lockerTo);
+
+ moveObject(object, currentData, targetData);
+}
+
+void Thread::moveObject(Object *object, ThreadData *currentData,
+ ThreadData *targetData)
+{
+ /* Move pending messages to the message queue of the new thread. */
+ if (object->pendingMessages_) {
+ unsigned int movedMessages = 0;
+
+ for (std::unique_ptr<Message> &msg : currentData->messages_.list_) {
+ if (!msg)
+ continue;
+ if (msg->receiver_ != object)
+ continue;
+
+ targetData->messages_.list_.push_back(std::move(msg));
+ movedMessages++;
+ }
+
+ if (movedMessages) {
+ EventDispatcher *dispatcher =
+ targetData->dispatcher_.load(std::memory_order_acquire);
+ if (dispatcher)
+ dispatcher->interrupt();
+ }
+ }
+
+ object->thread_ = this;
+
+ /* Move all children. */
+ for (auto child : object->children_)
+ moveObject(child, currentData, targetData);
+}
+
+} /* namespace libcamera */