summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLaurent Pinchart <laurent.pinchart@ideasonboard.com>2019-07-02 00:58:53 +0300
committerLaurent Pinchart <laurent.pinchart@ideasonboard.com>2019-07-02 02:37:18 +0300
commitf137451817f47c0bfe59586afe5af7b51f8ccad4 (patch)
tree4fa3220f1a67a506c132bd308bcf97b66ef39e2b /src
parenta00fdabacdd093c3eccb3d44155e151f59d783bf (diff)
libcamera: ipc: unix: Make socket operation asynchronous
Blocking socket operation when receiving messages may lead to long delays, and possibly a complete deadlock, if the remote side delays sending of the payload after the header, or doesn't send the payload at all. To avoid this, make the socket non-blocking and implement a simple state machine to receive the header synchronously with the socket read notification. The payload read is still synchronous with the receive() method to avoid data copies. Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> Reviewed-by: Niklas Söderlund <niklas.soderlund@ragnatech.se>
Diffstat (limited to 'src')
-rw-r--r--src/libcamera/include/ipc_unixsocket.h2
-rw-r--r--src/libcamera/ipc_unixsocket.cpp88
2 files changed, 63 insertions, 27 deletions
diff --git a/src/libcamera/include/ipc_unixsocket.h b/src/libcamera/include/ipc_unixsocket.h
index ef166d74..03e9fe49 100644
--- a/src/libcamera/include/ipc_unixsocket.h
+++ b/src/libcamera/include/ipc_unixsocket.h
@@ -49,6 +49,8 @@ private:
void dataNotifier(EventNotifier *notifier);
int fd_;
+ bool headerReceived_;
+ struct Header header_;
EventNotifier *notifier_;
};
diff --git a/src/libcamera/ipc_unixsocket.cpp b/src/libcamera/ipc_unixsocket.cpp
index c11f1160..def08eef 100644
--- a/src/libcamera/ipc_unixsocket.cpp
+++ b/src/libcamera/ipc_unixsocket.cpp
@@ -7,6 +7,7 @@
#include "ipc_unixsocket.h"
+#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
@@ -49,10 +50,10 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket)
* transporting entire payloads with guaranteed ordering.
*
* The IPC design is asynchronous, a message is queued to a receiver which gets
- * notified that a message is ready to be consumed by a signal. The queuer of
- * the message gets no notification when a message is delivered nor processed.
- * If such interactions are needed a protocol specific to the users use-case
- * should be implemented on top of the IPC objects.
+ * notified that a message is ready to be consumed by the \ref readyRead
+ * signal. The sender of the message gets no notification when a message is
+ * delivered nor processed. If such interactions are needed a protocol specific
+ * to the users use-case should be implemented on top of the IPC objects.
*
* Establishment of an IPC channel is asymmetrical. The side that initiates
* communication first instantiates a local side socket and creates the channel
@@ -64,7 +65,7 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket)
*/
IPCUnixSocket::IPCUnixSocket()
- : fd_(-1), notifier_(nullptr)
+ : fd_(-1), headerReceived_(false), notifier_(nullptr)
{
}
@@ -89,7 +90,7 @@ int IPCUnixSocket::create()
int sockets[2];
int ret;
- ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets);
+ ret = socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sockets);
if (ret) {
ret = -errno;
LOG(IPCUnixSocket, Error)
@@ -142,6 +143,7 @@ void IPCUnixSocket::close()
::close(fd_);
fd_ = -1;
+ headerReceived_ = false;
}
/**
@@ -193,38 +195,38 @@ int IPCUnixSocket::send(const Payload &payload)
* \param[out] payload Payload where to write the received message
*
* This method receives the message payload from the IPC channel and writes it
- * to the \a payload. It blocks until one message is received, if an
- * asynchronous behavior is desired this method should be called when the
- * readyRead signal is emitted.
+ * to the \a payload. If no message payload is available, it returns
+ * immediately with -EAGAIN. The \ref readyRead signal shall be used to receive
+ * notification of message availability.
*
* \todo Add state machine to make sure we don't block forever and that
* a header is always followed by a payload.
*
* \return 0 on success or a negative error code otherwise
+ * \retval -EAGAIN No message payload is available
+ * \retval -ENOTCONN The socket is not connected (neither create() nor bind()
+ * has been called)
*/
int IPCUnixSocket::receive(Payload *payload)
{
- Header hdr;
- int ret;
-
if (!isBound())
return -ENOTCONN;
- if (!payload)
- return -EINVAL;
+ if (!headerReceived_)
+ return -EAGAIN;
- ret = ::recv(fd_, &hdr, sizeof(hdr), 0);
- if (ret < 0) {
- ret = -errno;
- LOG(IPCUnixSocket, Error)
- << "Failed to recv header: " << strerror(-ret);
+ payload->data.resize(header_.data);
+ payload->fds.resize(header_.fds);
+
+ int ret = recvData(payload->data.data(), header_.data,
+ payload->fds.data(), header_.fds);
+ if (ret < 0)
return ret;
- }
- payload->data.resize(hdr.data);
- payload->fds.resize(hdr.fds);
+ headerReceived_ = false;
+ notifier_->setEnabled(true);
- return recvData(payload->data.data(), hdr.data, payload->fds.data(), hdr.fds);
+ return 0;
}
/**
@@ -232,7 +234,8 @@ int IPCUnixSocket::receive(Payload *payload)
* \brief A Signal emitted when a message is ready to be read
*/
-int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fds, unsigned int num)
+int IPCUnixSocket::sendData(const void *buffer, size_t length,
+ const int32_t *fds, unsigned int num)
{
struct iovec iov[1];
iov[0].iov_base = const_cast<void *>(buffer);
@@ -266,7 +269,8 @@ int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fd
return 0;
}
-int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned int num)
+int IPCUnixSocket::recvData(void *buffer, size_t length,
+ int32_t *fds, unsigned int num)
{
struct iovec iov[1];
iov[0].iov_base = buffer;
@@ -291,8 +295,9 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
if (recvmsg(fd_, &msg, 0) < 0) {
int ret = -errno;
- LOG(IPCUnixSocket, Error)
- << "Failed to recvmsg: " << strerror(-ret);
+ if (ret != -EAGAIN)
+ LOG(IPCUnixSocket, Error)
+ << "Failed to recvmsg: " << strerror(-ret);
return ret;
}
@@ -303,6 +308,35 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
void IPCUnixSocket::dataNotifier(EventNotifier *notifier)
{
+ int ret;
+
+ if (!headerReceived_) {
+ /* Receive the header. */
+ ret = ::recv(fd_, &header_, sizeof(header_), 0);
+ if (ret < 0) {
+ ret = -errno;
+ LOG(IPCUnixSocket, Error)
+ << "Failed to receive header: " << strerror(-ret);
+ return;
+ }
+
+ headerReceived_ = true;
+ }
+
+ /*
+ * If the payload has arrived, disable the notifier and emit the
+ * readyRead signal. The notifier will be reenabled by the receive()
+ * method.
+ */
+ struct pollfd fds = { fd_, POLLIN, 0 };
+ ret = poll(&fds, 1, 0);
+ if (ret < 0)
+ return;
+
+ if (!(fds.revents & POLLIN))
+ return;
+
+ notifier_->setEnabled(false);
readyRead.emit(this);
}