summaryrefslogtreecommitdiff
path: root/src/py
diff options
context:
space:
mode:
Diffstat (limited to 'src/py')
-rwxr-xr-xsrc/py/cam/cam.py472
-rw-r--r--src/py/cam/cam_kms.py184
-rw-r--r--src/py/cam/cam_null.py47
-rw-r--r--src/py/cam/cam_qt.py182
-rw-r--r--src/py/cam/cam_qtgl.py363
-rw-r--r--src/py/cam/gl_helpers.py66
-rw-r--r--src/py/cam/helpers.py158
-rwxr-xr-xsrc/py/examples/simple-cam.py340
-rwxr-xr-xsrc/py/examples/simple-capture.py163
-rwxr-xr-xsrc/py/examples/simple-continuous-capture.py185
-rw-r--r--src/py/libcamera/__init__.py4
-rwxr-xr-xsrc/py/libcamera/gen-py-controls.py111
-rwxr-xr-xsrc/py/libcamera/gen-py-formats.py56
-rw-r--r--src/py/libcamera/meson.build102
-rw-r--r--src/py/libcamera/py_camera_manager.cpp131
-rw-r--r--src/py/libcamera/py_camera_manager.h45
-rw-r--r--src/py/libcamera/py_color_space.cpp72
-rw-r--r--src/py/libcamera/py_controls_generated.cpp.in47
-rw-r--r--src/py/libcamera/py_enums.cpp47
-rw-r--r--src/py/libcamera/py_formats_generated.cpp.in27
-rw-r--r--src/py/libcamera/py_geometry.cpp121
-rw-r--r--src/py/libcamera/py_helpers.cpp101
-rw-r--r--src/py/libcamera/py_helpers.h13
-rw-r--r--src/py/libcamera/py_main.cpp523
-rw-r--r--src/py/libcamera/py_main.h24
-rw-r--r--src/py/libcamera/py_transform.cpp83
-rw-r--r--src/py/libcamera/utils/MappedFrameBuffer.py105
-rw-r--r--src/py/libcamera/utils/__init__.py4
-rw-r--r--src/py/meson.build3
29 files changed, 3779 insertions, 0 deletions
diff --git a/src/py/cam/cam.py b/src/py/cam/cam.py
new file mode 100755
index 00000000..ff4b7f66
--- /dev/null
+++ b/src/py/cam/cam.py
@@ -0,0 +1,472 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from typing import Any
+import argparse
+import binascii
+import libcamera as libcam
+import libcamera.utils
+import sys
+import traceback
+
+
+class CameraContext:
+ camera: libcam.Camera
+ id: str
+ idx: int
+
+ opt_stream: str
+ opt_strict_formats: bool
+ opt_crc: bool
+ opt_metadata: bool
+ opt_save_frames: bool
+ opt_capture: int
+ opt_orientation: str
+
+ stream_names: dict[libcam.Stream, str]
+ streams: list[libcam.Stream]
+ allocator: libcam.FrameBufferAllocator
+ requests: list[libcam.Request]
+ reqs_queued: int
+ reqs_completed: int
+ last: int = 0
+ fps: float
+
+ def __init__(self, camera, idx):
+ self.camera = camera
+ self.idx = idx
+ self.id = 'cam' + str(idx)
+ self.reqs_queued = 0
+ self.reqs_completed = 0
+
+ def do_cmd_list_props(self):
+ print('Properties for', self.id)
+
+ for cid, val in self.camera.properties.items():
+ print('\t{}: {}'.format(cid, val))
+
+ def do_cmd_list_controls(self):
+ print('Controls for', self.id)
+
+ for cid, info in self.camera.controls.items():
+ print('\t{}: {}'.format(cid, info))
+
+ def do_cmd_info(self):
+ print('Stream info for', self.id)
+
+ roles = [libcam.StreamRole.Viewfinder]
+
+ camconfig = self.camera.generate_configuration(roles)
+ if camconfig is None:
+ raise Exception('Generating config failed')
+
+ for i, stream_config in enumerate(camconfig):
+ print('\t{}: {}'.format(i, stream_config))
+
+ formats = stream_config.formats
+ for fmt in formats.pixel_formats:
+ print('\t * Pixelformat:', fmt, formats.range(fmt))
+
+ for size in formats.sizes(fmt):
+ print('\t -', size)
+
+ def acquire(self):
+ self.camera.acquire()
+
+ def release(self):
+ self.camera.release()
+
+ def __parse_streams(self):
+ streams = []
+
+ for stream_desc in self.opt_stream:
+ stream_opts: dict[str, Any]
+ stream_opts = {'role': libcam.StreamRole.Viewfinder}
+
+ for stream_opt in stream_desc.split(','):
+ if stream_opt == 0:
+ continue
+
+ arr = stream_opt.split('=')
+ if len(arr) != 2:
+ print('Bad stream option', stream_opt)
+ sys.exit(-1)
+
+ key = arr[0]
+ value = arr[1]
+
+ if key in ['width', 'height']:
+ value = int(value)
+ elif key == 'role':
+ rolemap = {
+ 'still': libcam.StreamRole.StillCapture,
+ 'raw': libcam.StreamRole.Raw,
+ 'video': libcam.StreamRole.VideoRecording,
+ 'viewfinder': libcam.StreamRole.Viewfinder,
+ }
+
+ role = rolemap.get(value.lower(), None)
+
+ if role is None:
+ print('Bad stream role', value)
+ sys.exit(-1)
+
+ value = role
+ elif key == 'pixelformat':
+ pass
+ else:
+ print('Bad stream option key', key)
+ sys.exit(-1)
+
+ stream_opts[key] = value
+
+ streams.append(stream_opts)
+
+ return streams
+
+ def configure(self):
+ streams = self.__parse_streams()
+
+ roles = [opts['role'] for opts in streams]
+
+ camconfig = self.camera.generate_configuration(roles)
+ if camconfig is None:
+ raise Exception('Generating config failed')
+
+ for idx, stream_opts in enumerate(streams):
+ stream_config = camconfig.at(idx)
+
+ if 'width' in stream_opts:
+ stream_config.size.width = stream_opts['width']
+
+ if 'height' in stream_opts:
+ stream_config.size.height = stream_opts['height']
+
+ if 'pixelformat' in stream_opts:
+ stream_config.pixel_format = libcam.PixelFormat(stream_opts['pixelformat'])
+
+ if self.opt_orientation is not None:
+ orientation_map = {
+ 'rot0': libcam.Orientation.Rotate0,
+ 'rot180': libcam.Orientation.Rotate180,
+ 'mirror': libcam.Orientation.Rotate0Mirror,
+ 'flip': libcam.Orientation.Rotate180Mirror,
+ }
+
+ orient = orientation_map.get(self.opt_orientation, None)
+ if orient is None:
+ print('Bad orientation: ', self.opt_orientation)
+ sys.exit(-1)
+
+ camconfig.orientation = orient
+
+ stat = camconfig.validate()
+
+ if stat == libcam.CameraConfiguration.Status.Invalid:
+ print('Camera configuration invalid')
+ exit(-1)
+ elif stat == libcam.CameraConfiguration.Status.Adjusted:
+ if self.opt_strict_formats:
+ print('Adjusting camera configuration disallowed by --strict-formats argument')
+ exit(-1)
+
+ print('Camera configuration adjusted')
+
+ self.camera.configure(camconfig)
+
+ self.stream_names = {}
+ self.streams = []
+
+ for idx, stream_config in enumerate(camconfig):
+ stream = stream_config.stream
+ self.streams.append(stream)
+ self.stream_names[stream] = 'stream' + str(idx)
+ print('{}-{}: stream config {}'.format(self.id, self.stream_names[stream], stream.configuration))
+
+ def alloc_buffers(self):
+ allocator = libcam.FrameBufferAllocator(self.camera)
+
+ for stream in self.streams:
+ allocated = allocator.allocate(stream)
+
+ print('{}-{}: Allocated {} buffers'.format(self.id, self.stream_names[stream], allocated))
+
+ self.allocator = allocator
+
+ def create_requests(self):
+ self.requests = []
+
+ # Identify the stream with the least number of buffers
+ num_bufs = min([len(self.allocator.buffers(stream)) for stream in self.streams])
+
+ requests = []
+
+ for buf_num in range(num_bufs):
+ request = self.camera.create_request(self.idx)
+
+ if request is None:
+ print('Can not create request')
+ exit(-1)
+
+ for stream in self.streams:
+ buffers = self.allocator.buffers(stream)
+ buffer = buffers[buf_num]
+
+ request.add_buffer(stream, buffer)
+
+ requests.append(request)
+
+ self.requests = requests
+
+ def start(self):
+ self.camera.start()
+
+ def stop(self):
+ self.camera.stop()
+
+ def queue_requests(self):
+ for request in self.requests:
+ self.camera.queue_request(request)
+ self.reqs_queued += 1
+
+ del self.requests
+
+
+class CaptureState:
+ cm: libcam.CameraManager
+ contexts: list[CameraContext]
+ renderer: Any
+
+ def __init__(self, cm, contexts):
+ self.cm = cm
+ self.contexts = contexts
+
+ # Called from renderer when there is a libcamera event
+ def event_handler(self):
+ try:
+ reqs = self.cm.get_ready_requests()
+
+ for req in reqs:
+ ctx = next(ctx for ctx in self.contexts if ctx.idx == req.cookie)
+ self.__request_handler(ctx, req)
+
+ running = any(ctx.reqs_completed < ctx.opt_capture for ctx in self.contexts)
+ return running
+ except Exception:
+ traceback.print_exc()
+ return False
+
+ def __request_handler(self, ctx, req):
+ if req.status != libcam.Request.Status.Complete:
+ raise Exception('{}: Request failed: {}'.format(ctx.id, req.status))
+
+ buffers = req.buffers
+
+ # Compute the frame rate. The timestamp is arbitrarily retrieved from
+ # the first buffer, as all buffers should have matching timestamps.
+ ts = buffers[next(iter(buffers))].metadata.timestamp
+ last = ctx.last
+ fps = 1000000000.0 / (ts - last) if (last != 0 and (ts - last) != 0) else 0
+ ctx.last = ts
+ ctx.fps = fps
+
+ if ctx.opt_metadata:
+ reqmeta = req.metadata
+ for ctrl, val in reqmeta.items():
+ print(f'\t{ctrl} = {val}')
+
+ for stream, fb in buffers.items():
+ stream_name = ctx.stream_names[stream]
+
+ crcs = []
+ if ctx.opt_crc:
+ with libcamera.utils.MappedFrameBuffer(fb) as mfb:
+ plane_crcs = [binascii.crc32(p) for p in mfb.planes]
+ crcs.append(plane_crcs)
+
+ meta = fb.metadata
+
+ print('{:.6f} ({:.2f} fps) {}-{}: seq {}, bytes {}, CRCs {}'
+ .format(ts / 1000000000, fps,
+ ctx.id, stream_name,
+ meta.sequence,
+ '/'.join([str(p.bytes_used) for p in meta.planes]),
+ crcs))
+
+ if ctx.opt_save_frames:
+ with libcamera.utils.MappedFrameBuffer(fb) as mfb:
+ filename = 'frame-{}-{}-{}.data'.format(ctx.id, stream_name, ctx.reqs_completed)
+ with open(filename, 'wb') as f:
+ for p in mfb.planes:
+ f.write(p)
+
+ self.renderer.request_handler(ctx, req)
+
+ ctx.reqs_completed += 1
+
+ # Called from renderer when it has finished with a request
+ def request_processed(self, ctx, req):
+ if ctx.reqs_queued < ctx.opt_capture:
+ req.reuse()
+ ctx.camera.queue_request(req)
+ ctx.reqs_queued += 1
+
+ def __capture_init(self):
+ for ctx in self.contexts:
+ ctx.acquire()
+
+ for ctx in self.contexts:
+ ctx.configure()
+
+ for ctx in self.contexts:
+ ctx.alloc_buffers()
+
+ for ctx in self.contexts:
+ ctx.create_requests()
+
+ def __capture_start(self):
+ for ctx in self.contexts:
+ ctx.start()
+
+ for ctx in self.contexts:
+ ctx.queue_requests()
+
+ def __capture_deinit(self):
+ for ctx in self.contexts:
+ ctx.stop()
+
+ for ctx in self.contexts:
+ ctx.release()
+
+ def do_cmd_capture(self):
+ self.__capture_init()
+
+ self.renderer.setup()
+
+ self.__capture_start()
+
+ self.renderer.run()
+
+ self.__capture_deinit()
+
+
+class CustomAction(argparse.Action):
+ def __init__(self, option_strings, dest, **kwargs):
+ super().__init__(option_strings, dest, default={}, **kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ if len(namespace.camera) == 0:
+ print(f'Option {option_string} requires a --camera context')
+ sys.exit(-1)
+
+ if self.type == bool:
+ values = True
+
+ current = namespace.camera[-1]
+
+ data = getattr(namespace, self.dest)
+
+ if self.nargs == '+':
+ if current not in data:
+ data[current] = []
+
+ data[current] += values
+ else:
+ data[current] = values
+
+
+def do_cmd_list(cm):
+ print('Available cameras:')
+
+ for idx, c in enumerate(cm.cameras):
+ print(f'{idx + 1}: {c.id}')
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ # global options
+ parser.add_argument('-l', '--list', action='store_true', help='List all cameras')
+ parser.add_argument('-c', '--camera', type=int, action='extend', nargs=1, default=[], help='Specify which camera to operate on, by index')
+ parser.add_argument('-p', '--list-properties', action='store_true', help='List cameras properties')
+ parser.add_argument('--list-controls', action='store_true', help='List cameras controls')
+ parser.add_argument('-I', '--info', action='store_true', help='Display information about stream(s)')
+ parser.add_argument('-R', '--renderer', default='null', help='Renderer (null, kms, qt, qtgl)')
+
+ # per camera options
+ parser.add_argument('-C', '--capture', nargs='?', type=int, const=1000000, action=CustomAction, help='Capture until interrupted by user or until CAPTURE frames captured')
+ parser.add_argument('--crc', nargs=0, type=bool, action=CustomAction, help='Print CRC32 for captured frames')
+ parser.add_argument('--save-frames', nargs=0, type=bool, action=CustomAction, help='Save captured frames to files')
+ parser.add_argument('--metadata', nargs=0, type=bool, action=CustomAction, help='Print the metadata for completed requests')
+ parser.add_argument('--strict-formats', type=bool, nargs=0, action=CustomAction, help='Do not allow requested stream format(s) to be adjusted')
+ parser.add_argument('-s', '--stream', nargs='+', action=CustomAction)
+ parser.add_argument('-o', '--orientation', help='Desired image orientation (rot0, rot180, mirror, flip)')
+ args = parser.parse_args()
+
+ cm = libcam.CameraManager.singleton()
+
+ if args.list:
+ do_cmd_list(cm)
+
+ contexts = []
+
+ for cam_idx in args.camera:
+ camera = next((c for i, c in enumerate(cm.cameras) if i + 1 == cam_idx), None)
+
+ if camera is None:
+ print('Unable to find camera', cam_idx)
+ return -1
+
+ ctx = CameraContext(camera, cam_idx)
+ ctx.opt_capture = args.capture.get(cam_idx, 0)
+ ctx.opt_crc = args.crc.get(cam_idx, False)
+ ctx.opt_save_frames = args.save_frames.get(cam_idx, False)
+ ctx.opt_metadata = args.metadata.get(cam_idx, False)
+ ctx.opt_strict_formats = args.strict_formats.get(cam_idx, False)
+ ctx.opt_stream = args.stream.get(cam_idx, ['role=viewfinder'])
+ ctx.opt_orientation = args.orientation
+ contexts.append(ctx)
+
+ for ctx in contexts:
+ print('Using camera {} as {}'.format(ctx.camera.id, ctx.id))
+
+ for ctx in contexts:
+ if args.list_properties:
+ ctx.do_cmd_list_props()
+ if args.list_controls:
+ ctx.do_cmd_list_controls()
+ if args.info:
+ ctx.do_cmd_info()
+
+ # Filter out capture contexts which are not marked for capture
+ contexts = [ctx for ctx in contexts if ctx.opt_capture > 0]
+
+ if contexts:
+ state = CaptureState(cm, contexts)
+
+ if args.renderer == 'null':
+ import cam_null
+ renderer = cam_null.NullRenderer(state)
+ elif args.renderer == 'kms':
+ import cam_kms
+ renderer = cam_kms.KMSRenderer(state)
+ elif args.renderer == 'qt':
+ import cam_qt
+ renderer = cam_qt.QtRenderer(state)
+ elif args.renderer == 'qtgl':
+ import cam_qtgl
+ renderer = cam_qtgl.QtRenderer(state)
+ else:
+ print('Bad renderer', args.renderer)
+ return -1
+
+ state.renderer = renderer
+
+ state.do_cmd_capture()
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/src/py/cam/cam_kms.py b/src/py/cam/cam_kms.py
new file mode 100644
index 00000000..38fc382d
--- /dev/null
+++ b/src/py/cam/cam_kms.py
@@ -0,0 +1,184 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+import pykms
+import selectors
+import sys
+
+
+class KMSRenderer:
+ def __init__(self, state):
+ self.state = state
+
+ self.cm = state.cm
+ self.contexts = state.contexts
+ self.running = False
+
+ card = pykms.Card()
+
+ res = pykms.ResourceManager(card)
+ conn = res.reserve_connector()
+ crtc = res.reserve_crtc(conn)
+ mode = conn.get_default_mode()
+ modeb = mode.to_blob(card)
+
+ req = pykms.AtomicReq(card)
+ req.add_connector(conn, crtc)
+ req.add_crtc(crtc, modeb)
+ r = req.commit_sync(allow_modeset=True)
+ assert(r == 0)
+
+ self.card = card
+ self.resman = res
+ self.crtc = crtc
+ self.mode = mode
+
+ self.bufqueue = []
+ self.current = None
+ self.next = None
+ self.cam_2_drm = {}
+
+ # KMS
+
+ def close(self):
+ req = pykms.AtomicReq(self.card)
+ for s in self.streams:
+ req.add_plane(s['plane'], None, None, dst=(0, 0, 0, 0))
+ req.commit()
+
+ def add_plane(self, req, stream, fb):
+ s = next(s for s in self.streams if s['stream'] == stream)
+ idx = s['idx']
+ plane = s['plane']
+
+ if idx % 2 == 0:
+ x = 0
+ else:
+ x = self.mode.hdisplay - fb.width
+
+ if idx // 2 == 0:
+ y = 0
+ else:
+ y = self.mode.vdisplay - fb.height
+
+ req.add_plane(plane, fb, self.crtc, dst=(x, y, fb.width, fb.height))
+
+ def apply_request(self, drmreq):
+
+ buffers = drmreq['camreq'].buffers
+
+ req = pykms.AtomicReq(self.card)
+
+ for stream, fb in buffers.items():
+ drmfb = self.cam_2_drm.get(fb, None)
+ self.add_plane(req, stream, drmfb)
+
+ req.commit()
+
+ def handle_page_flip(self, frame, time):
+ old = self.current
+ self.current = self.next
+
+ if len(self.bufqueue) > 0:
+ self.next = self.bufqueue.pop(0)
+ else:
+ self.next = None
+
+ if self.next:
+ drmreq = self.next
+
+ self.apply_request(drmreq)
+
+ if old:
+ req = old['camreq']
+ ctx = old['camctx']
+ self.state.request_processed(ctx, req)
+
+ def queue(self, drmreq):
+ if not self.next:
+ self.next = drmreq
+ self.apply_request(drmreq)
+ else:
+ self.bufqueue.append(drmreq)
+
+ # libcamera
+
+ def setup(self):
+ self.streams = []
+
+ idx = 0
+ for ctx in self.contexts:
+ for stream in ctx.streams:
+
+ cfg = stream.configuration
+ fmt = cfg.pixel_format
+ fmt = pykms.PixelFormat(fmt.fourcc)
+
+ plane = self.resman.reserve_generic_plane(self.crtc, fmt)
+ assert(plane is not None)
+
+ self.streams.append({
+ 'idx': idx,
+ 'stream': stream,
+ 'plane': plane,
+ 'fmt': fmt,
+ 'size': cfg.size,
+ })
+
+ for fb in ctx.allocator.buffers(stream):
+ w = cfg.size.width
+ h = cfg.size.height
+ fds = []
+ strides = []
+ offsets = []
+ for plane in fb.planes:
+ fds.append(plane.fd)
+ strides.append(cfg.stride)
+ offsets.append(plane.offset)
+
+ drmfb = pykms.DmabufFramebuffer(self.card, w, h, fmt,
+ fds, strides, offsets)
+ self.cam_2_drm[fb] = drmfb
+
+ idx += 1
+
+ def readdrm(self, fileobj):
+ for ev in self.card.read_events():
+ if ev.type == pykms.DrmEventType.FLIP_COMPLETE:
+ self.handle_page_flip(ev.seq, ev.time)
+
+ def readcam(self, fd):
+ self.running = self.state.event_handler()
+
+ def readkey(self, fileobj):
+ sys.stdin.readline()
+ self.running = False
+
+ def run(self):
+ print('Capturing...')
+
+ self.running = True
+
+ sel = selectors.DefaultSelector()
+ sel.register(self.card.fd, selectors.EVENT_READ, self.readdrm)
+ sel.register(self.cm.event_fd, selectors.EVENT_READ, self.readcam)
+ sel.register(sys.stdin, selectors.EVENT_READ, self.readkey)
+
+ print('Press enter to exit')
+
+ while self.running:
+ events = sel.select()
+ for key, mask in events:
+ callback = key.data
+ callback(key.fileobj)
+
+ print('Exiting...')
+
+ def request_handler(self, ctx, req):
+
+ drmreq = {
+ 'camctx': ctx,
+ 'camreq': req,
+ }
+
+ self.queue(drmreq)
diff --git a/src/py/cam/cam_null.py b/src/py/cam/cam_null.py
new file mode 100644
index 00000000..40dbd266
--- /dev/null
+++ b/src/py/cam/cam_null.py
@@ -0,0 +1,47 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+import selectors
+import sys
+
+
+class NullRenderer:
+ def __init__(self, state):
+ self.state = state
+
+ self.cm = state.cm
+ self.contexts = state.contexts
+
+ self.running = False
+
+ def setup(self):
+ pass
+
+ def run(self):
+ print('Capturing...')
+
+ self.running = True
+
+ sel = selectors.DefaultSelector()
+ sel.register(self.cm.event_fd, selectors.EVENT_READ, self.readcam)
+ sel.register(sys.stdin, selectors.EVENT_READ, self.readkey)
+
+ print('Press enter to exit')
+
+ while self.running:
+ events = sel.select()
+ for key, mask in events:
+ callback = key.data
+ callback(key.fileobj)
+
+ print('Exiting...')
+
+ def readcam(self, fd):
+ self.running = self.state.event_handler()
+
+ def readkey(self, fileobj):
+ sys.stdin.readline()
+ self.running = False
+
+ def request_handler(self, ctx, req):
+ self.state.request_processed(ctx, req)
diff --git a/src/py/cam/cam_qt.py b/src/py/cam/cam_qt.py
new file mode 100644
index 00000000..22d8c4da
--- /dev/null
+++ b/src/py/cam/cam_qt.py
@@ -0,0 +1,182 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from helpers import mfb_to_rgb
+from PyQt6 import QtCore, QtGui, QtWidgets
+import libcamera as libcam
+import libcamera.utils
+import sys
+
+
+# Loading MJPEG to a QPixmap produces corrupt JPEG data warnings. Ignore these.
+def qt_message_handler(msg_type, msg_log_context, msg_string):
+ if msg_string.startswith("Corrupt JPEG data"):
+ return
+
+ # For some reason qInstallMessageHandler returns None, so we won't
+ # call the old handler
+ if old_msg_handler is not None:
+ old_msg_handler(msg_type, msg_log_context, msg_string)
+ else:
+ print(msg_string)
+
+
+old_msg_handler = QtCore.qInstallMessageHandler(qt_message_handler)
+
+
+def rgb_to_pix(rgb):
+ w = rgb.shape[1]
+ h = rgb.shape[0]
+ qim = QtGui.QImage(rgb, w, h, QtGui.QImage.Format.Format_RGB888)
+ pix = QtGui.QPixmap.fromImage(qim)
+ return pix
+
+
+class QtRenderer:
+ def __init__(self, state):
+ self.state = state
+
+ self.cm = state.cm
+ self.contexts = state.contexts
+
+ def setup(self):
+ self.app = QtWidgets.QApplication([])
+
+ windows = []
+
+ for ctx in self.contexts:
+ for stream in ctx.streams:
+ window = MainWindow(ctx, stream)
+ window.show()
+ windows.append(window)
+
+ self.windows = windows
+
+ buf_mmap_map = {}
+
+ for ctx in self.contexts:
+ for stream in ctx.streams:
+ for buf in ctx.allocator.buffers(stream):
+ mfb = libcamera.utils.MappedFrameBuffer(buf).mmap()
+ buf_mmap_map[buf] = mfb
+
+ self.buf_mmap_map = buf_mmap_map
+
+ def run(self):
+ camnotif = QtCore.QSocketNotifier(self.cm.event_fd, QtCore.QSocketNotifier.Type.Read)
+ camnotif.activated.connect(lambda _: self.readcam())
+
+ keynotif = QtCore.QSocketNotifier(sys.stdin.fileno(), QtCore.QSocketNotifier.Type.Read)
+ keynotif.activated.connect(lambda _: self.readkey())
+
+ print('Capturing...')
+
+ self.app.exec()
+
+ print('Exiting...')
+
+ def readcam(self):
+ running = self.state.event_handler()
+
+ if not running:
+ self.app.quit()
+
+ def readkey(self):
+ sys.stdin.readline()
+ self.app.quit()
+
+ def request_handler(self, ctx, req):
+ buffers = req.buffers
+
+ for stream, fb in buffers.items():
+ wnd = next(wnd for wnd in self.windows if wnd.stream == stream)
+
+ mfb = self.buf_mmap_map[fb]
+
+ wnd.handle_request(stream, mfb)
+
+ self.state.request_processed(ctx, req)
+
+ def cleanup(self):
+ for w in self.windows:
+ w.close()
+
+
+class MainWindow(QtWidgets.QWidget):
+ def __init__(self, ctx, stream):
+ super().__init__()
+
+ self.ctx = ctx
+ self.stream = stream
+
+ self.label = QtWidgets.QLabel()
+
+ windowLayout = QtWidgets.QHBoxLayout()
+ self.setLayout(windowLayout)
+
+ windowLayout.addWidget(self.label)
+
+ controlsLayout = QtWidgets.QVBoxLayout()
+ windowLayout.addLayout(controlsLayout)
+
+ windowLayout.addStretch()
+
+ group = QtWidgets.QGroupBox('Info')
+ groupLayout = QtWidgets.QVBoxLayout()
+ group.setLayout(groupLayout)
+ controlsLayout.addWidget(group)
+
+ lab = QtWidgets.QLabel(ctx.id)
+ groupLayout.addWidget(lab)
+
+ self.frameLabel = QtWidgets.QLabel()
+ groupLayout.addWidget(self.frameLabel)
+
+ group = QtWidgets.QGroupBox('Properties')
+ groupLayout = QtWidgets.QVBoxLayout()
+ group.setLayout(groupLayout)
+ controlsLayout.addWidget(group)
+
+ camera = ctx.camera
+
+ for cid, cv in camera.properties.items():
+ lab = QtWidgets.QLabel()
+ lab.setText('{} = {}'.format(cid, cv))
+ groupLayout.addWidget(lab)
+
+ group = QtWidgets.QGroupBox('Controls')
+ groupLayout = QtWidgets.QVBoxLayout()
+ group.setLayout(groupLayout)
+ controlsLayout.addWidget(group)
+
+ for cid, cinfo in camera.controls.items():
+ lab = QtWidgets.QLabel()
+ lab.setText('{} = {}/{}/{}'
+ .format(cid, cinfo.min, cinfo.max, cinfo.default))
+ groupLayout.addWidget(lab)
+
+ controlsLayout.addStretch()
+
+ def buf_to_qpixmap(self, stream, mfb):
+ cfg = stream.configuration
+
+ if cfg.pixel_format == libcam.formats.MJPEG:
+ pix = QtGui.QPixmap(cfg.size.width, cfg.size.height)
+ pix.loadFromData(mfb.planes[0])
+ else:
+ rgb = mfb_to_rgb(mfb, cfg)
+ if rgb is None:
+ raise Exception('Format not supported: ' + cfg.pixel_format)
+
+ pix = rgb_to_pix(rgb)
+
+ return pix
+
+ def handle_request(self, stream, mfb):
+ ctx = self.ctx
+
+ pix = self.buf_to_qpixmap(stream, mfb)
+ self.label.setPixmap(pix)
+
+ self.frameLabel.setText('Queued: {}\nDone: {}\nFps: {:.2f}'
+ .format(ctx.reqs_queued, ctx.reqs_completed, ctx.fps))
diff --git a/src/py/cam/cam_qtgl.py b/src/py/cam/cam_qtgl.py
new file mode 100644
index 00000000..35b4b06b
--- /dev/null
+++ b/src/py/cam/cam_qtgl.py
@@ -0,0 +1,363 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from PyQt6 import QtCore, QtWidgets
+from PyQt6.QtCore import Qt
+
+import math
+import os
+import sys
+
+os.environ['PYOPENGL_PLATFORM'] = 'egl'
+
+from OpenGL.EGL.EXT.image_dma_buf_import import *
+from OpenGL.EGL.KHR.image import *
+from OpenGL.EGL.VERSION.EGL_1_0 import *
+from OpenGL.EGL.VERSION.EGL_1_2 import *
+from OpenGL.EGL.VERSION.EGL_1_3 import *
+
+from OpenGL.GLES2.OES.EGL_image import *
+from OpenGL.GLES2.OES.EGL_image_external import *
+from OpenGL.GLES2.VERSION.GLES2_2_0 import *
+from OpenGL.GLES3.VERSION.GLES3_3_0 import *
+
+from OpenGL.GL import shaders
+
+from gl_helpers import *
+
+
+class EglState:
+ def __init__(self):
+ self.create_display()
+ self.choose_config()
+ self.create_context()
+ self.check_extensions()
+
+ def create_display(self):
+ xdpy = getEGLNativeDisplay()
+ dpy = eglGetDisplay(xdpy)
+ self.display = dpy
+
+ def choose_config(self):
+ dpy = self.display
+
+ major, minor = EGLint(), EGLint()
+
+ b = eglInitialize(dpy, major, minor)
+ assert(b)
+
+ print('EGL {} {}'.format(
+ eglQueryString(dpy, EGL_VENDOR).decode(),
+ eglQueryString(dpy, EGL_VERSION).decode()))
+
+ check_egl_extensions(dpy, ['EGL_EXT_image_dma_buf_import'])
+
+ b = eglBindAPI(EGL_OPENGL_ES_API)
+ assert(b)
+
+ def print_config(dpy, cfg):
+
+ def getconf(a):
+ value = ctypes.c_long()
+ eglGetConfigAttrib(dpy, cfg, a, value)
+ return value.value
+
+ print('EGL Config {}: color buf {}/{}/{}/{} = {}, depth {}, stencil {}, native visualid {}, native visualtype {}'.format(
+ getconf(EGL_CONFIG_ID),
+ getconf(EGL_ALPHA_SIZE),
+ getconf(EGL_RED_SIZE),
+ getconf(EGL_GREEN_SIZE),
+ getconf(EGL_BLUE_SIZE),
+ getconf(EGL_BUFFER_SIZE),
+ getconf(EGL_DEPTH_SIZE),
+ getconf(EGL_STENCIL_SIZE),
+ getconf(EGL_NATIVE_VISUAL_ID),
+ getconf(EGL_NATIVE_VISUAL_TYPE)))
+
+ if False:
+ num_configs = ctypes.c_long()
+ eglGetConfigs(dpy, None, 0, num_configs)
+ print('{} configs'.format(num_configs.value))
+
+ configs = (EGLConfig * num_configs.value)()
+ eglGetConfigs(dpy, configs, num_configs.value, num_configs)
+ for config_id in configs:
+ print_config(dpy, config_id)
+
+ config_attribs = [
+ EGL_SURFACE_TYPE, EGL_WINDOW_BIT,
+ EGL_RED_SIZE, 8,
+ EGL_GREEN_SIZE, 8,
+ EGL_BLUE_SIZE, 8,
+ EGL_ALPHA_SIZE, 0,
+ EGL_RENDERABLE_TYPE, EGL_OPENGL_ES2_BIT,
+ EGL_NONE,
+ ]
+
+ n = EGLint()
+ configs = (EGLConfig * 1)()
+ b = eglChooseConfig(dpy, config_attribs, configs, 1, n)
+ assert(b and n.value == 1)
+ config = configs[0]
+
+ print('Chosen Config:')
+ print_config(dpy, config)
+
+ self.config = config
+
+ def create_context(self):
+ dpy = self.display
+
+ context_attribs = [
+ EGL_CONTEXT_CLIENT_VERSION, 2,
+ EGL_NONE,
+ ]
+
+ context = eglCreateContext(dpy, self.config, EGL_NO_CONTEXT, context_attribs)
+ assert(context)
+
+ b = eglMakeCurrent(dpy, EGL_NO_SURFACE, EGL_NO_SURFACE, context)
+ assert(b)
+
+ self.context = context
+
+ def check_extensions(self):
+ check_gl_extensions(['GL_OES_EGL_image'])
+
+ assert(eglCreateImageKHR)
+ assert(eglDestroyImageKHR)
+ assert(glEGLImageTargetTexture2DOES)
+
+
+class QtRenderer:
+ def __init__(self, state):
+ self.state = state
+
+ def setup(self):
+ self.app = QtWidgets.QApplication([])
+
+ window = MainWindow(self.state)
+ window.show()
+
+ self.window = window
+
+ def run(self):
+ camnotif = QtCore.QSocketNotifier(self.state.cm.event_fd, QtCore.QSocketNotifier.Type.Read)
+ camnotif.activated.connect(lambda _: self.readcam())
+
+ keynotif = QtCore.QSocketNotifier(sys.stdin.fileno(), QtCore.QSocketNotifier.Type.Read)
+ keynotif.activated.connect(lambda _: self.readkey())
+
+ print('Capturing...')
+
+ self.app.exec()
+
+ print('Exiting...')
+
+ def readcam(self):
+ running = self.state.event_handler()
+
+ if not running:
+ self.app.quit()
+
+ def readkey(self):
+ sys.stdin.readline()
+ self.app.quit()
+
+ def request_handler(self, ctx, req):
+ self.window.handle_request(ctx, req)
+
+ def cleanup(self):
+ self.window.close()
+
+
+class MainWindow(QtWidgets.QWidget):
+ def __init__(self, state):
+ super().__init__()
+
+ self.setAttribute(Qt.WidgetAttribute.WA_PaintOnScreen)
+ self.setAttribute(Qt.WidgetAttribute.WA_NativeWindow)
+
+ self.state = state
+
+ self.textures = {}
+ self.reqqueue = {}
+ self.current = {}
+
+ for ctx in self.state.contexts:
+
+ self.reqqueue[ctx.idx] = []
+ self.current[ctx.idx] = []
+
+ for stream in ctx.streams:
+ self.textures[stream] = None
+
+ num_tiles = len(self.textures)
+ self.num_columns = math.ceil(math.sqrt(num_tiles))
+ self.num_rows = math.ceil(num_tiles / self.num_columns)
+
+ self.egl = EglState()
+
+ self.surface = None
+
+ def paintEngine(self):
+ return None
+
+ def create_surface(self):
+ native_surface = c_void_p(self.winId().__int__())
+ surface = eglCreateWindowSurface(self.egl.display, self.egl.config,
+ native_surface, None)
+
+ b = eglMakeCurrent(self.egl.display, self.surface, self.surface, self.egl.context)
+ assert(b)
+
+ self.surface = surface
+
+ def init_gl(self):
+ self.create_surface()
+
+ vertShaderSrc = '''
+ attribute vec2 aPosition;
+ varying vec2 texcoord;
+
+ void main()
+ {
+ gl_Position = vec4(aPosition * 2.0 - 1.0, 0.0, 1.0);
+ texcoord.x = aPosition.x;
+ texcoord.y = 1.0 - aPosition.y;
+ }
+ '''
+ fragShaderSrc = '''
+ #extension GL_OES_EGL_image_external : enable
+ precision mediump float;
+ varying vec2 texcoord;
+ uniform samplerExternalOES texture;
+
+ void main()
+ {
+ gl_FragColor = texture2D(texture, texcoord);
+ }
+ '''
+
+ program = shaders.compileProgram(
+ shaders.compileShader(vertShaderSrc, GL_VERTEX_SHADER),
+ shaders.compileShader(fragShaderSrc, GL_FRAGMENT_SHADER)
+ )
+
+ glUseProgram(program)
+
+ glClearColor(0.5, 0.8, 0.7, 1.0)
+
+ vertPositions = [
+ 0.0, 0.0,
+ 1.0, 0.0,
+ 1.0, 1.0,
+ 0.0, 1.0
+ ]
+
+ inputAttrib = glGetAttribLocation(program, 'aPosition')
+ glVertexAttribPointer(inputAttrib, 2, GL_FLOAT, GL_FALSE, 0, vertPositions)
+ glEnableVertexAttribArray(inputAttrib)
+
+ def create_texture(self, stream, fb):
+ cfg = stream.configuration
+ fmt = cfg.pixel_format.fourcc
+ w = cfg.size.width
+ h = cfg.size.height
+
+ attribs = [
+ EGL_WIDTH, w,
+ EGL_HEIGHT, h,
+ EGL_LINUX_DRM_FOURCC_EXT, fmt,
+ EGL_DMA_BUF_PLANE0_FD_EXT, fb.planes[0].fd,
+ EGL_DMA_BUF_PLANE0_OFFSET_EXT, 0,
+ EGL_DMA_BUF_PLANE0_PITCH_EXT, cfg.stride,
+ EGL_NONE,
+ ]
+
+ image = eglCreateImageKHR(self.egl.display,
+ EGL_NO_CONTEXT,
+ EGL_LINUX_DMA_BUF_EXT,
+ None,
+ attribs)
+ assert(image)
+
+ textures = glGenTextures(1)
+ glBindTexture(GL_TEXTURE_EXTERNAL_OES, textures)
+ glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_MAG_FILTER, GL_LINEAR)
+ glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_MIN_FILTER, GL_LINEAR)
+ glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_WRAP_S, GL_CLAMP_TO_EDGE)
+ glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_WRAP_T, GL_CLAMP_TO_EDGE)
+ glEGLImageTargetTexture2DOES(GL_TEXTURE_EXTERNAL_OES, image)
+
+ return textures
+
+ def resizeEvent(self, event):
+ size = event.size()
+
+ print('Resize', size)
+
+ super().resizeEvent(event)
+
+ if self.surface is None:
+ return
+
+ glViewport(0, 0, size.width() // 2, size.height())
+
+ def paintEvent(self, event):
+ if self.surface is None:
+ self.init_gl()
+
+ for ctx_idx, queue in self.reqqueue.items():
+ if len(queue) == 0:
+ continue
+
+ ctx = next(ctx for ctx in self.state.contexts if ctx.idx == ctx_idx)
+
+ if self.current[ctx_idx]:
+ old = self.current[ctx_idx]
+ self.current[ctx_idx] = None
+ self.state.request_processed(ctx, old)
+
+ next_req = queue.pop(0)
+ self.current[ctx_idx] = next_req
+
+ stream, fb = next(iter(next_req.buffers.items()))
+
+ self.textures[stream] = self.create_texture(stream, fb)
+
+ self.paint_gl()
+
+ def paint_gl(self):
+ b = eglMakeCurrent(self.egl.display, self.surface, self.surface, self.egl.context)
+ assert(b)
+
+ glClear(GL_COLOR_BUFFER_BIT)
+
+ size = self.size()
+
+ for idx, ctx in enumerate(self.state.contexts):
+ for stream in ctx.streams:
+ if self.textures[stream] is None:
+ continue
+
+ w = size.width() // self.num_columns
+ h = size.height() // self.num_rows
+
+ x = idx % self.num_columns
+ y = idx // self.num_columns
+
+ x *= w
+ y *= h
+
+ glViewport(x, y, w, h)
+
+ glBindTexture(GL_TEXTURE_EXTERNAL_OES, self.textures[stream])
+ glDrawArrays(GL_TRIANGLE_FAN, 0, 4)
+
+ b = eglSwapBuffers(self.egl.display, self.surface)
+ assert(b)
+
+ def handle_request(self, ctx, req):
+ self.reqqueue[ctx.idx].append(req)
+ self.update()
diff --git a/src/py/cam/gl_helpers.py b/src/py/cam/gl_helpers.py
new file mode 100644
index 00000000..53b3e9df
--- /dev/null
+++ b/src/py/cam/gl_helpers.py
@@ -0,0 +1,66 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from OpenGL.EGL.VERSION.EGL_1_0 import EGLNativeDisplayType, eglGetProcAddress, eglQueryString, EGL_EXTENSIONS
+
+from OpenGL.raw.GLES2 import _types as _cs
+from OpenGL.GLES2.VERSION.GLES2_2_0 import *
+from OpenGL.GLES3.VERSION.GLES3_3_0 import *
+from OpenGL import GL as gl
+
+from ctypes import c_int, c_char_p, c_void_p, cdll, POINTER, util, \
+ pointer, CFUNCTYPE, c_bool
+
+
+def getEGLNativeDisplay():
+ _x11lib = cdll.LoadLibrary(util.find_library('X11'))
+ XOpenDisplay = _x11lib.XOpenDisplay
+ XOpenDisplay.argtypes = [c_char_p]
+ XOpenDisplay.restype = POINTER(EGLNativeDisplayType)
+
+ return XOpenDisplay(None)
+
+
+# Hack. PyOpenGL doesn't seem to manage to find glEGLImageTargetTexture2DOES.
+def getglEGLImageTargetTexture2DOES():
+ funcptr = eglGetProcAddress('glEGLImageTargetTexture2DOES')
+ prototype = CFUNCTYPE(None, _cs.GLenum, _cs.GLeglImageOES)
+ return prototype(funcptr)
+
+
+glEGLImageTargetTexture2DOES = getglEGLImageTargetTexture2DOES()
+
+
+def get_gl_extensions():
+ n = GLint()
+ glGetIntegerv(GL_NUM_EXTENSIONS, n)
+ gl_extensions = []
+ for i in range(n.value):
+ gl_extensions.append(gl.glGetStringi(GL_EXTENSIONS, i).decode())
+ return gl_extensions
+
+
+def check_gl_extensions(required_extensions):
+ extensions = get_gl_extensions()
+
+ if False:
+ print('GL EXTENSIONS: ', ' '.join(extensions))
+
+ for ext in required_extensions:
+ if ext not in extensions:
+ raise Exception(ext + ' missing')
+
+
+def get_egl_extensions(egl_display):
+ return eglQueryString(egl_display, EGL_EXTENSIONS).decode().split(' ')
+
+
+def check_egl_extensions(egl_display, required_extensions):
+ extensions = get_egl_extensions(egl_display)
+
+ if False:
+ print('EGL EXTENSIONS: ', ' '.join(extensions))
+
+ for ext in required_extensions:
+ if ext not in extensions:
+ raise Exception(ext + ' missing')
diff --git a/src/py/cam/helpers.py b/src/py/cam/helpers.py
new file mode 100644
index 00000000..2d906667
--- /dev/null
+++ b/src/py/cam/helpers.py
@@ -0,0 +1,158 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+#
+# Debayering code from PiCamera documentation
+
+from numpy.lib.stride_tricks import as_strided
+import libcamera as libcam
+import libcamera.utils
+import numpy as np
+
+
+def demosaic(data, r0, g0, g1, b0):
+ # Separate the components from the Bayer data to RGB planes
+
+ rgb = np.zeros(data.shape + (3,), dtype=data.dtype)
+ rgb[r0[1]::2, r0[0]::2, 0] = data[r0[1]::2, r0[0]::2] # Red
+ rgb[g0[1]::2, g0[0]::2, 1] = data[g0[1]::2, g0[0]::2] # Green
+ rgb[g1[1]::2, g1[0]::2, 1] = data[g1[1]::2, g1[0]::2] # Green
+ rgb[b0[1]::2, b0[0]::2, 2] = data[b0[1]::2, b0[0]::2] # Blue
+
+ # Below we present a fairly naive de-mosaic method that simply
+ # calculates the weighted average of a pixel based on the pixels
+ # surrounding it. The weighting is provided by a byte representation of
+ # the Bayer filter which we construct first:
+
+ bayer = np.zeros(rgb.shape, dtype=np.uint8)
+ bayer[r0[1]::2, r0[0]::2, 0] = 1 # Red
+ bayer[g0[1]::2, g0[0]::2, 1] = 1 # Green
+ bayer[g1[1]::2, g1[0]::2, 1] = 1 # Green
+ bayer[b0[1]::2, b0[0]::2, 2] = 1 # Blue
+
+ # Allocate an array to hold our output with the same shape as the input
+ # data. After this we define the size of window that will be used to
+ # calculate each weighted average (3x3). Then we pad out the rgb and
+ # bayer arrays, adding blank pixels at their edges to compensate for the
+ # size of the window when calculating averages for edge pixels.
+
+ output = np.empty(rgb.shape, dtype=rgb.dtype)
+ window = (3, 3)
+ borders = (window[0] - 1, window[1] - 1)
+ border = (borders[0] // 2, borders[1] // 2)
+
+ rgb = np.pad(rgb, [
+ (border[0], border[0]),
+ (border[1], border[1]),
+ (0, 0),
+ ], 'constant')
+ bayer = np.pad(bayer, [
+ (border[0], border[0]),
+ (border[1], border[1]),
+ (0, 0),
+ ], 'constant')
+
+ # For each plane in the RGB data, we use a nifty numpy trick
+ # (as_strided) to construct a view over the plane of 3x3 matrices. We do
+ # the same for the bayer array, then use Einstein summation on each
+ # (np.sum is simpler, but copies the data so it's slower), and divide
+ # the results to get our weighted average:
+
+ for plane in range(3):
+ p = rgb[..., plane]
+ b = bayer[..., plane]
+ pview = as_strided(p, shape=(
+ p.shape[0] - borders[0],
+ p.shape[1] - borders[1]) + window, strides=p.strides * 2)
+ bview = as_strided(b, shape=(
+ b.shape[0] - borders[0],
+ b.shape[1] - borders[1]) + window, strides=b.strides * 2)
+ psum = np.einsum('ijkl->ij', pview)
+ bsum = np.einsum('ijkl->ij', bview)
+ output[..., plane] = psum // bsum
+
+ return output
+
+
+def to_rgb(fmt, size, data):
+ w = size.width
+ h = size.height
+
+ if fmt == libcam.formats.YUYV:
+ # YUV422
+ yuyv = data.reshape((h, w // 2 * 4))
+
+ # YUV444
+ yuv = np.empty((h, w, 3), dtype=np.uint8)
+ yuv[:, :, 0] = yuyv[:, 0::2] # Y
+ yuv[:, :, 1] = yuyv[:, 1::4].repeat(2, axis=1) # U
+ yuv[:, :, 2] = yuyv[:, 3::4].repeat(2, axis=1) # V
+
+ m = np.array([
+ [1.0, 1.0, 1.0],
+ [-0.000007154783816076815, -0.3441331386566162, 1.7720025777816772],
+ [1.4019975662231445, -0.7141380310058594, 0.00001542569043522235]
+ ])
+
+ rgb = np.dot(yuv, m)
+ rgb[:, :, 0] -= 179.45477266423404
+ rgb[:, :, 1] += 135.45870971679688
+ rgb[:, :, 2] -= 226.8183044444304
+ rgb = rgb.astype(np.uint8)
+
+ elif fmt == libcam.formats.RGB888:
+ rgb = data.reshape((h, w, 3))
+ rgb[:, :, [0, 1, 2]] = rgb[:, :, [2, 1, 0]]
+
+ elif fmt == libcam.formats.BGR888:
+ rgb = data.reshape((h, w, 3))
+
+ elif fmt in [libcam.formats.ARGB8888, libcam.formats.XRGB8888]:
+ rgb = data.reshape((h, w, 4))
+ rgb = np.flip(rgb, axis=2)
+ # drop alpha component
+ rgb = np.delete(rgb, np.s_[0::4], axis=2)
+
+ elif str(fmt).startswith('S'):
+ fmt = str(fmt)
+ bayer_pattern = fmt[1:5]
+ bitspp = int(fmt[5:])
+
+ if bitspp == 8:
+ data = data.reshape((h, w))
+ data = data.astype(np.uint16)
+ elif bitspp in [10, 12]:
+ data = data.view(np.uint16)
+ data = data.reshape((h, w))
+ else:
+ raise Exception('Bad bitspp:' + str(bitspp))
+
+ idx = bayer_pattern.find('R')
+ assert(idx != -1)
+ r0 = (idx % 2, idx // 2)
+
+ idx = bayer_pattern.find('G')
+ assert(idx != -1)
+ g0 = (idx % 2, idx // 2)
+
+ idx = bayer_pattern.find('G', idx + 1)
+ assert(idx != -1)
+ g1 = (idx % 2, idx // 2)
+
+ idx = bayer_pattern.find('B')
+ assert(idx != -1)
+ b0 = (idx % 2, idx // 2)
+
+ rgb = demosaic(data, r0, g0, g1, b0)
+ rgb = (rgb >> (bitspp - 8)).astype(np.uint8)
+
+ else:
+ rgb = None
+
+ return rgb
+
+
+# A naive format conversion to 24-bit RGB
+def mfb_to_rgb(mfb: libcamera.utils.MappedFrameBuffer, cfg: libcam.StreamConfiguration):
+ data = np.array(mfb.planes[0], dtype=np.uint8)
+ rgb = to_rgb(cfg.pixel_format, cfg.size, data)
+ return rgb
diff --git a/src/py/examples/simple-cam.py b/src/py/examples/simple-cam.py
new file mode 100755
index 00000000..1cd1019d
--- /dev/null
+++ b/src/py/examples/simple-cam.py
@@ -0,0 +1,340 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+# A simple libcamera capture example
+#
+# This is a python version of simple-cam from:
+# https://git.libcamera.org/libcamera/simple-cam.git
+#
+# \todo Move to simple-cam repository when the Python API has stabilized more
+
+import libcamera as libcam
+import selectors
+import sys
+import time
+
+TIMEOUT_SEC = 3
+
+
+def handle_camera_event(cm):
+ # cm.get_ready_requests() returns the ready requests, which in our case
+ # should almost always return a single Request, but in some cases there
+ # could be multiple or none.
+
+ reqs = cm.get_ready_requests()
+
+ # Process the captured frames
+
+ for req in reqs:
+ process_request(req)
+
+
+def process_request(request):
+ global camera
+
+ print()
+
+ print(f'Request completed: {request}')
+
+ # When a request has completed, it is populated with a metadata control
+ # list that allows an application to determine various properties of
+ # the completed request. This can include the timestamp of the Sensor
+ # capture, or its gain and exposure values, or properties from the IPA
+ # such as the state of the 3A algorithms.
+ #
+ # To examine each request, print all the metadata for inspection. A custom
+ # application can parse each of these items and process them according to
+ # its needs.
+
+ requestMetadata = request.metadata
+ for id, value in requestMetadata.items():
+ print(f'\t{id.name} = {value}')
+
+ # Each buffer has its own FrameMetadata to describe its state, or the
+ # usage of each buffer. While in our simple capture we only provide one
+ # buffer per request, a request can have a buffer for each stream that
+ # is established when configuring the camera.
+ #
+ # This allows a viewfinder and a still image to be processed at the
+ # same time, or to allow obtaining the RAW capture buffer from the
+ # sensor along with the image as processed by the ISP.
+
+ buffers = request.buffers
+ for _, buffer in buffers.items():
+ metadata = buffer.metadata
+
+ # Print some information about the buffer which has completed.
+ print(f' seq: {metadata.sequence:06} timestamp: {metadata.timestamp} bytesused: ' +
+ '/'.join([str(p.bytes_used) for p in metadata.planes]))
+
+ # Image data can be accessed here, but the FrameBuffer
+ # must be mapped by the application
+
+ # Re-queue the Request to the camera.
+ request.reuse()
+ camera.queue_request(request)
+
+
+# ----------------------------------------------------------------------------
+# Camera Naming.
+#
+# Applications are responsible for deciding how to name cameras, and present
+# that information to the users. Every camera has a unique identifier, though
+# this string is not designed to be friendly for a human reader.
+#
+# To support human consumable names, libcamera provides camera properties
+# that allow an application to determine a naming scheme based on its needs.
+#
+# In this example, we focus on the location property, but also detail the
+# model string for external cameras, as this is more likely to be visible
+# information to the user of an externally connected device.
+#
+# The unique camera ID is appended for informative purposes.
+#
+def camera_name(camera):
+ props = camera.properties
+ location = props.get(libcam.properties.Location, None)
+
+ if location == libcam.properties.LocationEnum.Front:
+ name = 'Internal front camera'
+ elif location == libcam.properties.LocationEnum.Back:
+ name = 'Internal back camera'
+ elif location == libcam.properties.LocationEnum.External:
+ name = 'External camera'
+ if libcam.properties.Model in props:
+ name += f' "{props[libcam.properties.Model]}"'
+ else:
+ name = 'Undefined location'
+
+ name += f' ({camera.id})'
+
+ return name
+
+
+def main():
+ global camera
+
+ # --------------------------------------------------------------------
+ # Get the Camera Manager.
+ #
+ # The Camera Manager is responsible for enumerating all the Camera
+ # in the system, by associating Pipeline Handlers with media entities
+ # registered in the system.
+ #
+ # The CameraManager provides a list of available Cameras that
+ # applications can operate on.
+ #
+ # There can only be a single CameraManager within any process space.
+
+ cm = libcam.CameraManager.singleton()
+
+ # Just as a test, generate names of the Cameras registered in the
+ # system, and list them.
+
+ for camera in cm.cameras:
+ print(f' - {camera_name(camera)}')
+
+ # --------------------------------------------------------------------
+ # Camera
+ #
+ # Camera are entities created by pipeline handlers, inspecting the
+ # entities registered in the system and reported to applications
+ # by the CameraManager.
+ #
+ # In general terms, a Camera corresponds to a single image source
+ # available in the system, such as an image sensor.
+ #
+ # Application lock usage of Camera by 'acquiring' them.
+ # Once done with it, application shall similarly 'release' the Camera.
+ #
+ # As an example, use the first available camera in the system after
+ # making sure that at least one camera is available.
+ #
+ # Cameras can be obtained by their ID or their index, to demonstrate
+ # this, the following code gets the ID of the first camera; then gets
+ # the camera associated with that ID (which is of course the same as
+ # cm.cameras[0]).
+
+ if not cm.cameras:
+ print('No cameras were identified on the system.')
+ return -1
+
+ camera_id = cm.cameras[0].id
+ camera = cm.get(camera_id)
+ camera.acquire()
+
+ # --------------------------------------------------------------------
+ # Stream
+ #
+ # Each Camera supports a variable number of Stream. A Stream is
+ # produced by processing data produced by an image source, usually
+ # by an ISP.
+ #
+ # +-------------------------------------------------------+
+ # | Camera |
+ # | +-----------+ |
+ # | +--------+ | |------> [ Main output ] |
+ # | | Image | | | |
+ # | | |---->| ISP |------> [ Viewfinder ] |
+ # | | Source | | | |
+ # | +--------+ | |------> [ Still Capture ] |
+ # | +-----------+ |
+ # +-------------------------------------------------------+
+ #
+ # The number and capabilities of the Stream in a Camera are
+ # a platform dependent property, and it's the pipeline handler
+ # implementation that has the responsibility of correctly
+ # report them.
+
+ # --------------------------------------------------------------------
+ # Camera Configuration.
+ #
+ # Camera configuration is tricky! It boils down to assign resources
+ # of the system (such as DMA engines, scalers, format converters) to
+ # the different image streams an application has requested.
+ #
+ # Depending on the system characteristics, some combinations of
+ # sizes, formats and stream usages might or might not be possible.
+ #
+ # A Camera produces a CameraConfigration based on a set of intended
+ # roles for each Stream the application requires.
+
+ config = camera.generate_configuration([libcam.StreamRole.Viewfinder])
+
+ # The CameraConfiguration contains a StreamConfiguration instance
+ # for each StreamRole requested by the application, provided
+ # the Camera can support all of them.
+ #
+ # Each StreamConfiguration has default size and format, assigned
+ # by the Camera depending on the Role the application has requested.
+
+ stream_config = config.at(0)
+ print(f'Default viewfinder configuration is: {stream_config}')
+
+ # Each StreamConfiguration parameter which is part of a
+ # CameraConfiguration can be independently modified by the
+ # application.
+ #
+ # In order to validate the modified parameter, the CameraConfiguration
+ # should be validated -before- the CameraConfiguration gets applied
+ # to the Camera.
+ #
+ # The CameraConfiguration validation process adjusts each
+ # StreamConfiguration to a valid value.
+
+ # Validating a CameraConfiguration -before- applying it will adjust it
+ # to a valid configuration which is as close as possible to the one
+ # requested.
+
+ config.validate()
+ print(f'Validated viewfinder configuration is: {stream_config}')
+
+ # Once we have a validated configuration, we can apply it to the
+ # Camera.
+
+ camera.configure(config)
+
+ # --------------------------------------------------------------------
+ # Buffer Allocation
+ #
+ # Now that a camera has been configured, it knows all about its
+ # Streams sizes and formats. The captured images need to be stored in
+ # framebuffers which can either be provided by the application to the
+ # library, or allocated in the Camera and exposed to the application
+ # by libcamera.
+ #
+ # An application may decide to allocate framebuffers from elsewhere,
+ # for example in memory allocated by the display driver that will
+ # render the captured frames. The application will provide them to
+ # libcamera by constructing FrameBuffer instances to capture images
+ # directly into.
+ #
+ # Alternatively libcamera can help the application by exporting
+ # buffers allocated in the Camera using a FrameBufferAllocator
+ # instance and referencing a configured Camera to determine the
+ # appropriate buffer size and types to create.
+
+ allocator = libcam.FrameBufferAllocator(camera)
+
+ for cfg in config:
+ allocated = allocator.allocate(cfg.stream)
+ print(f'Allocated {allocated} buffers for stream')
+
+ # --------------------------------------------------------------------
+ # Frame Capture
+ #
+ # libcamera frames capture model is based on the 'Request' concept.
+ # For each frame a Request has to be queued to the Camera.
+ #
+ # A Request refers to (at least one) Stream for which a Buffer that
+ # will be filled with image data shall be added to the Request.
+ #
+ # A Request is associated with a list of Controls, which are tunable
+ # parameters (similar to v4l2_controls) that have to be applied to
+ # the image.
+ #
+ # Once a request completes, all its buffers will contain image data
+ # that applications can access and for each of them a list of metadata
+ # properties that reports the capture parameters applied to the image.
+
+ stream = stream_config.stream
+ buffers = allocator.buffers(stream)
+ requests = []
+ for i in range(len(buffers)):
+ request = camera.create_request()
+
+ buffer = buffers[i]
+ request.add_buffer(stream, buffer)
+
+ # Controls can be added to a request on a per frame basis.
+ request.set_control(libcam.controls.Brightness, 0.5)
+
+ requests.append(request)
+
+ # --------------------------------------------------------------------
+ # Start Capture
+ #
+ # In order to capture frames the Camera has to be started and
+ # Request queued to it. Enough Request to fill the Camera pipeline
+ # depth have to be queued before the Camera start delivering frames.
+ #
+ # When a Request has been completed, it will be added to a list in the
+ # CameraManager and an event will be raised using eventfd.
+ #
+ # The list of completed Requests can be retrieved with
+ # CameraManager.get_ready_requests(), which will also clear the list in the
+ # CameraManager.
+ #
+ # The eventfd can be retrieved from CameraManager.event_fd, and the fd can
+ # be waited upon using e.g. Python's selectors.
+
+ camera.start()
+ for request in requests:
+ camera.queue_request(request)
+
+ sel = selectors.DefaultSelector()
+ sel.register(cm.event_fd, selectors.EVENT_READ, lambda fd: handle_camera_event(cm))
+
+ start_time = time.time()
+
+ while time.time() - start_time < TIMEOUT_SEC:
+ events = sel.select()
+ for key, mask in events:
+ key.data(key.fileobj)
+
+ # --------------------------------------------------------------------
+ # Clean Up
+ #
+ # Stop the Camera, release resources and stop the CameraManager.
+ # libcamera has now released all resources it owned.
+
+ camera.stop()
+ camera.release()
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/src/py/examples/simple-capture.py b/src/py/examples/simple-capture.py
new file mode 100755
index 00000000..4b85408f
--- /dev/null
+++ b/src/py/examples/simple-capture.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+# A simple capture example showing:
+# - How to setup the camera
+# - Capture certain number of frames in a blocking manner
+# - How to stop the camera
+#
+# This simple example is, in many ways, too simple. The purpose of the example
+# is to introduce the concepts. A more realistic example is given in
+# simple-continuous-capture.py.
+
+import argparse
+import libcamera as libcam
+import selectors
+import sys
+
+# Number of frames to capture
+TOTAL_FRAMES = 30
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-c', '--camera', type=str, default='1',
+ help='Camera index number (starting from 1) or part of the name')
+ parser.add_argument('-f', '--format', type=str, help='Pixel format')
+ parser.add_argument('-s', '--size', type=str, help='Size ("WxH")')
+ args = parser.parse_args()
+
+ cm = libcam.CameraManager.singleton()
+
+ try:
+ if args.camera.isnumeric():
+ cam_idx = int(args.camera)
+ cam = next((cam for i, cam in enumerate(cm.cameras) if i + 1 == cam_idx))
+ else:
+ cam = next((cam for cam in cm.cameras if args.camera in cam.id))
+ except Exception:
+ print(f'Failed to find camera "{args.camera}"')
+ return -1
+
+ # Acquire the camera for our use
+
+ cam.acquire()
+
+ # Configure the camera
+
+ cam_config = cam.generate_configuration([libcam.StreamRole.Viewfinder])
+
+ stream_config = cam_config.at(0)
+
+ if args.format:
+ fmt = libcam.PixelFormat(args.format)
+ stream_config.pixel_format = fmt
+
+ if args.size:
+ w, h = [int(v) for v in args.size.split('x')]
+ stream_config.size = libcam.Size(w, h)
+
+ cam.configure(cam_config)
+
+ print(f'Capturing {TOTAL_FRAMES} frames with {stream_config}')
+
+ stream = stream_config.stream
+
+ # Allocate the buffers for capture
+
+ allocator = libcam.FrameBufferAllocator(cam)
+ ret = allocator.allocate(stream)
+ assert ret > 0
+
+ num_bufs = len(allocator.buffers(stream))
+
+ # Create the requests and assign a buffer for each request
+
+ reqs = []
+ for i in range(num_bufs):
+ # Use the buffer index as the cookie
+ req = cam.create_request(i)
+
+ buffer = allocator.buffers(stream)[i]
+ req.add_buffer(stream, buffer)
+
+ reqs.append(req)
+
+ # Start the camera
+
+ cam.start()
+
+ # frames_queued and frames_done track the number of frames queued and done
+
+ frames_queued = 0
+ frames_done = 0
+
+ # Queue the requests to the camera
+
+ for req in reqs:
+ cam.queue_request(req)
+ frames_queued += 1
+
+ # The main loop. Wait for the queued Requests to complete, process them,
+ # and re-queue them again.
+
+ sel = selectors.DefaultSelector()
+ sel.register(cm.event_fd, selectors.EVENT_READ)
+
+ while frames_done < TOTAL_FRAMES:
+ # cm.get_ready_requests() does not block, so we use a Selector to wait
+ # for a camera event. Here we should almost always get a single
+ # Request, but in some cases there could be multiple or none.
+
+ events = sel.select()
+ if not events:
+ continue
+
+ reqs = cm.get_ready_requests()
+
+ for req in reqs:
+ frames_done += 1
+
+ buffers = req.buffers
+
+ # A ready Request could contain multiple buffers if multiple streams
+ # were being used. Here we know we only have a single stream,
+ # and we use next(iter()) to get the first and only buffer.
+
+ assert len(buffers) == 1
+
+ stream, fb = next(iter(buffers.items()))
+
+ # Here we could process the received buffer. In this example we only
+ # print a few details below.
+
+ meta = fb.metadata
+
+ print("seq {:3}, bytes {}, frames queued/done {:3}/{:<3}"
+ .format(meta.sequence,
+ '/'.join([str(p.bytes_used) for p in meta.planes]),
+ frames_queued, frames_done))
+
+ # If we want to capture more frames we need to queue more Requests.
+ # We could create a totally new Request, but it is more efficient
+ # to reuse the existing one that we just received.
+ if frames_queued < TOTAL_FRAMES:
+ req.reuse()
+ cam.queue_request(req)
+ frames_queued += 1
+
+ # Stop the camera
+
+ cam.stop()
+
+ # Release the camera
+
+ cam.release()
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/src/py/examples/simple-continuous-capture.py b/src/py/examples/simple-continuous-capture.py
new file mode 100755
index 00000000..e1cb931e
--- /dev/null
+++ b/src/py/examples/simple-continuous-capture.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+# A simple capture example extending the simple-capture.py example:
+# - Capture frames using events from multiple cameras
+# - Listening events from stdin to exit the application
+# - Memory mapping the frames and calculating CRC
+
+import binascii
+import libcamera as libcam
+import libcamera.utils
+import selectors
+import sys
+
+
+# A container class for our state per camera
+class CameraCaptureContext:
+ idx: int
+ cam: libcam.Camera
+ reqs: list[libcam.Request]
+ mfbs: dict[libcam.FrameBuffer, libcamera.utils.MappedFrameBuffer]
+
+ def __init__(self, cam, idx):
+ self.idx = idx
+ self.cam = cam
+
+ # Acquire the camera for our use
+
+ cam.acquire()
+
+ # Configure the camera
+
+ cam_config = cam.generate_configuration([libcam.StreamRole.Viewfinder])
+
+ stream_config = cam_config.at(0)
+
+ cam.configure(cam_config)
+
+ stream = stream_config.stream
+
+ # Allocate the buffers for capture
+
+ allocator = libcam.FrameBufferAllocator(cam)
+ ret = allocator.allocate(stream)
+ assert ret > 0
+
+ num_bufs = len(allocator.buffers(stream))
+
+ print(f'cam{idx} ({cam.id}): capturing {num_bufs} buffers with {stream_config}')
+
+ # Create the requests and assign a buffer for each request
+
+ self.reqs = []
+ self.mfbs = {}
+
+ for i in range(num_bufs):
+ # Use the buffer index as the "cookie"
+ req = cam.create_request(idx)
+
+ buffer = allocator.buffers(stream)[i]
+ req.add_buffer(stream, buffer)
+
+ self.reqs.append(req)
+
+ # Save a mmapped buffer so we can calculate the CRC later
+ self.mfbs[buffer] = libcamera.utils.MappedFrameBuffer(buffer).mmap()
+
+ def uninit_camera(self):
+ # Stop the camera
+
+ self.cam.stop()
+
+ # Release the camera
+
+ self.cam.release()
+
+
+# A container class for our state
+class CaptureContext:
+ cm: libcam.CameraManager
+ camera_contexts: list[CameraCaptureContext] = []
+
+ def handle_camera_event(self):
+ # cm.get_ready_requests() returns the ready requests, which in our case
+ # should almost always return a single Request, but in some cases there
+ # could be multiple or none.
+
+ reqs = self.cm.get_ready_requests()
+
+ # Process the captured frames
+
+ for req in reqs:
+ self.handle_request(req)
+
+ return True
+
+ def handle_request(self, req: libcam.Request):
+ cam_ctx = self.camera_contexts[req.cookie]
+
+ buffers = req.buffers
+
+ assert len(buffers) == 1
+
+ # A ready Request could contain multiple buffers if multiple streams
+ # were being used. Here we know we only have a single stream,
+ # and we use next(iter()) to get the first and only buffer.
+
+ stream, fb = next(iter(buffers.items()))
+
+ # Use the MappedFrameBuffer to access the pixel data with CPU. We calculate
+ # the crc for each plane.
+
+ mfb = cam_ctx.mfbs[fb]
+ crcs = [binascii.crc32(p) for p in mfb.planes]
+
+ meta = fb.metadata
+
+ print('cam{:<6} seq {:<6} bytes {:10} CRCs {}'
+ .format(cam_ctx.idx,
+ meta.sequence,
+ '/'.join([str(p.bytes_used) for p in meta.planes]),
+ crcs))
+
+ # We want to re-queue the buffer we just handled. Instead of creating
+ # a new Request, we re-use the old one. We need to call req.reuse()
+ # to re-initialize the Request before queuing.
+
+ req.reuse()
+ cam_ctx.cam.queue_request(req)
+
+ def handle_key_event(self):
+ sys.stdin.readline()
+ print('Exiting...')
+ return False
+
+ def capture(self):
+ # Queue the requests to the camera
+
+ for cam_ctx in self.camera_contexts:
+ for req in cam_ctx.reqs:
+ cam_ctx.cam.queue_request(req)
+
+ # Use Selector to wait for events from the camera and from the keyboard
+
+ sel = selectors.DefaultSelector()
+ sel.register(sys.stdin, selectors.EVENT_READ, self.handle_key_event)
+ sel.register(self.cm.event_fd, selectors.EVENT_READ, lambda: self.handle_camera_event())
+
+ running = True
+
+ while running:
+ events = sel.select()
+ for key, mask in events:
+ # If the handler return False, we should exit
+ if not key.data():
+ running = False
+
+
+def main():
+ cm = libcam.CameraManager.singleton()
+
+ ctx = CaptureContext()
+ ctx.cm = cm
+
+ for idx, cam in enumerate(cm.cameras):
+ cam_ctx = CameraCaptureContext(cam, idx)
+ ctx.camera_contexts.append(cam_ctx)
+
+ # Start the cameras
+
+ for cam_ctx in ctx.camera_contexts:
+ cam_ctx.cam.start()
+
+ ctx.capture()
+
+ for cam_ctx in ctx.camera_contexts:
+ cam_ctx.uninit_camera()
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/src/py/libcamera/__init__.py b/src/py/libcamera/__init__.py
new file mode 100644
index 00000000..e234a5e4
--- /dev/null
+++ b/src/py/libcamera/__init__.py
@@ -0,0 +1,4 @@
+# SPDX-License-Identifier: LGPL-2.1-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from ._libcamera import *
diff --git a/src/py/libcamera/gen-py-controls.py b/src/py/libcamera/gen-py-controls.py
new file mode 100755
index 00000000..d43a7c1c
--- /dev/null
+++ b/src/py/libcamera/gen-py-controls.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Generate Python bindings controls from YAML
+
+import argparse
+import jinja2
+import sys
+import yaml
+
+from controls import Control
+
+
+def find_common_prefix(strings):
+ prefix = strings[0]
+
+ for string in strings[1:]:
+ while string[:len(prefix)] != prefix and prefix:
+ prefix = prefix[:len(prefix) - 1]
+ if not prefix:
+ break
+
+ return prefix
+
+
+def extend_control(ctrl, mode):
+ if ctrl.vendor != 'libcamera':
+ ctrl.klass = ctrl.vendor
+ ctrl.namespace = f'{ctrl.vendor}::'
+ else:
+ ctrl.klass = mode
+ ctrl.namespace = ''
+
+ if not ctrl.is_enum:
+ return ctrl
+
+ if mode == 'controls':
+ # Adjustments for controls
+ if ctrl.name == 'LensShadingMapMode':
+ prefix = 'LensShadingMapMode'
+ else:
+ prefix = find_common_prefix([e.name for e in ctrl.enum_values])
+ else:
+ # Adjustments for properties
+ prefix = find_common_prefix([e.name for e in ctrl.enum_values])
+
+ for enum in ctrl.enum_values:
+ enum.py_name = enum.name[len(prefix):]
+
+ return ctrl
+
+
+def main(argv):
+ headers = {
+ 'controls': 'control_ids.h',
+ 'properties': 'property_ids.h',
+ }
+
+ # Parse command line arguments
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--mode', '-m', type=str, required=True,
+ help='Mode is either "controls" or "properties"')
+ parser.add_argument('--output', '-o', metavar='file', type=str,
+ help='Output file name. Defaults to standard output if not specified.')
+ parser.add_argument('--template', '-t', type=str, required=True,
+ help='Template file name.')
+ parser.add_argument('input', type=str, nargs='+',
+ help='Input file name.')
+ args = parser.parse_args(argv[1:])
+
+ if not headers.get(args.mode):
+ print(f'Invalid mode option "{args.mode}"', file=sys.stderr)
+ return -1
+
+ controls = []
+ vendors = []
+
+ for input in args.input:
+ data = yaml.safe_load(open(input, 'rb').read())
+
+ vendor = data['vendor']
+ if vendor != 'libcamera':
+ vendors.append(vendor)
+
+ for ctrl in data['controls']:
+ ctrl = Control(*ctrl.popitem(), vendor, args.mode)
+ controls.append(extend_control(ctrl, args.mode))
+
+ data = {
+ 'mode': args.mode,
+ 'header': headers[args.mode],
+ 'vendors': vendors,
+ 'controls': controls,
+ }
+
+ env = jinja2.Environment()
+ template = env.from_string(open(args.template, 'r', encoding='utf-8').read())
+ string = template.render(data)
+
+ if args.output:
+ output = open(args.output, 'w', encoding='utf-8')
+ output.write(string)
+ output.close()
+ else:
+ sys.stdout.write(string)
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv))
diff --git a/src/py/libcamera/gen-py-formats.py b/src/py/libcamera/gen-py-formats.py
new file mode 100755
index 00000000..0ff1d12a
--- /dev/null
+++ b/src/py/libcamera/gen-py-formats.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Generate Python format definitions from YAML
+
+import argparse
+import string
+import sys
+import yaml
+
+
+def generate(formats):
+ fmts = []
+
+ for format in formats:
+ name, format = format.popitem()
+ fmts.append(f'\t\t.def_readonly_static("{name}", &libcamera::formats::{name})')
+
+ return {'formats': '\n'.join(fmts)}
+
+
+def fill_template(template, data):
+ with open(template, encoding='utf-8') as f:
+ template = f.read()
+
+ template = string.Template(template)
+ return template.substitute(data)
+
+
+def main(argv):
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-o', dest='output', metavar='file', type=str,
+ help='Output file name. Defaults to standard output if not specified.')
+ parser.add_argument('input', type=str,
+ help='Input file name.')
+ parser.add_argument('template', type=str,
+ help='Template file name.')
+ args = parser.parse_args(argv[1:])
+
+ with open(args.input, encoding='utf-8') as f:
+ formats = yaml.safe_load(f)['formats']
+
+ data = generate(formats)
+ data = fill_template(args.template, data)
+
+ if args.output:
+ with open(args.output, 'w', encoding='utf-8') as f:
+ f.write(data)
+ else:
+ sys.stdout.write(data)
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main(sys.argv))
diff --git a/src/py/libcamera/meson.build b/src/py/libcamera/meson.build
new file mode 100644
index 00000000..596a203c
--- /dev/null
+++ b/src/py/libcamera/meson.build
@@ -0,0 +1,102 @@
+# SPDX-License-Identifier: CC0-1.0
+
+py3_dep = dependency('python3', required : get_option('pycamera'))
+
+if not py3_dep.found()
+ pycamera_enabled = false
+ subdir_done()
+endif
+
+pybind11_dep = dependency('pybind11', required : get_option('pycamera'))
+
+if not pybind11_dep.found()
+ pycamera_enabled = false
+ subdir_done()
+endif
+
+pycamera_enabled = true
+
+pycamera_sources = files([
+ 'py_camera_manager.cpp',
+ 'py_color_space.cpp',
+ 'py_enums.cpp',
+ 'py_geometry.cpp',
+ 'py_helpers.cpp',
+ 'py_main.cpp',
+ 'py_transform.cpp',
+])
+
+# Generate controls and properties
+
+gen_py_controls_template = files('py_controls_generated.cpp.in')
+gen_py_controls = files('gen-py-controls.py')
+
+pycamera_sources += custom_target('py_gen_controls',
+ input : controls_files,
+ output : ['py_controls_generated.cpp'],
+ command : [gen_py_controls, '--mode', 'controls', '-o', '@OUTPUT@',
+ '-t', gen_py_controls_template, '@INPUT@'],
+ env : py_build_env)
+
+pycamera_sources += custom_target('py_gen_properties',
+ input : properties_files,
+ output : ['py_properties_generated.cpp'],
+ command : [gen_py_controls, '--mode', 'properties', '-o', '@OUTPUT@',
+ '-t', gen_py_controls_template, '@INPUT@'],
+ env : py_build_env)
+
+# Generate formats
+
+gen_py_formats_input_files = files([
+ '../../libcamera/formats.yaml',
+ 'py_formats_generated.cpp.in',
+])
+
+gen_py_formats = files('gen-py-formats.py')
+
+pycamera_sources += custom_target('py_gen_formats',
+ input : gen_py_formats_input_files,
+ output : ['py_formats_generated.cpp'],
+ command : [gen_py_formats, '-o', '@OUTPUT@', '@INPUT@'])
+
+pycamera_deps = [
+ libcamera_private,
+ py3_dep,
+ pybind11_dep,
+]
+
+pycamera_args = [
+ '-fvisibility=hidden',
+ '-Wno-shadow',
+ '-DPYBIND11_USE_SMART_HOLDER_AS_DEFAULT',
+]
+
+destdir = get_option('libdir') / ('python' + py3_dep.version()) / 'site-packages' / 'libcamera'
+
+pycamera = shared_module('_libcamera',
+ pycamera_sources,
+ install : true,
+ install_dir : destdir,
+ install_tag : 'python-runtime',
+ name_prefix : '',
+ dependencies : pycamera_deps,
+ cpp_args : pycamera_args)
+
+# Create symlinks from the build dir to the source dir so that we can use the
+# Python module directly from the build dir.
+
+run_command('ln', '-fsrT', files('__init__.py'),
+ meson.current_build_dir() / '__init__.py',
+ check : true)
+
+run_command('ln', '-fsrT', meson.current_source_dir() / 'utils',
+ meson.current_build_dir() / 'utils',
+ check : true)
+
+install_data(['__init__.py'],
+ install_dir : destdir,
+ install_tag : 'python-runtime')
+
+# \todo Generate stubs when building. See https://peps.python.org/pep-0484/#stub-files
+# Note: Depends on pybind11-stubgen. To generate pylibcamera stubs:
+# $ PYTHONPATH=build/src/py pybind11-stubgen --no-setup-py -o build/src/py libcamera
diff --git a/src/py/libcamera/py_camera_manager.cpp b/src/py/libcamera/py_camera_manager.cpp
new file mode 100644
index 00000000..9ccb7aad
--- /dev/null
+++ b/src/py/libcamera/py_camera_manager.cpp
@@ -0,0 +1,131 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ */
+
+#include "py_camera_manager.h"
+
+#include <errno.h>
+#include <memory>
+#include <sys/eventfd.h>
+#include <system_error>
+#include <unistd.h>
+#include <vector>
+
+#include "py_main.h"
+
+namespace py = pybind11;
+
+using namespace libcamera;
+
+PyCameraManager::PyCameraManager()
+{
+ LOG(Python, Debug) << "PyCameraManager()";
+
+ cameraManager_ = std::make_unique<CameraManager>();
+
+ int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+ if (fd == -1)
+ throw std::system_error(errno, std::generic_category(),
+ "Failed to create eventfd");
+
+ eventFd_ = UniqueFD(fd);
+
+ int ret = cameraManager_->start();
+ if (ret)
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to start CameraManager");
+}
+
+PyCameraManager::~PyCameraManager()
+{
+ LOG(Python, Debug) << "~PyCameraManager()";
+}
+
+py::list PyCameraManager::cameras()
+{
+ /*
+ * Create a list of Cameras, where each camera has a keep-alive to
+ * CameraManager.
+ */
+ py::list l;
+
+ for (auto &camera : cameraManager_->cameras()) {
+ py::object py_cm = py::cast(this);
+ py::object py_cam = py::cast(camera);
+ py::detail::keep_alive_impl(py_cam, py_cm);
+ l.append(py_cam);
+ }
+
+ return l;
+}
+
+std::vector<py::object> PyCameraManager::getReadyRequests()
+{
+ int ret = readFd();
+
+ if (ret == -EAGAIN)
+ return std::vector<py::object>();
+
+ if (ret != 0)
+ throw std::system_error(-ret, std::generic_category());
+
+ std::vector<py::object> py_reqs;
+
+ for (Request *request : getCompletedRequests()) {
+ py::object o = py::cast(request);
+ /* Decrease the ref increased in Camera.queue_request() */
+ o.dec_ref();
+ py_reqs.push_back(o);
+ }
+
+ return py_reqs;
+}
+
+/* Note: Called from another thread */
+void PyCameraManager::handleRequestCompleted(Request *req)
+{
+ pushRequest(req);
+ writeFd();
+}
+
+void PyCameraManager::writeFd()
+{
+ uint64_t v = 1;
+
+ size_t s = write(eventFd_.get(), &v, 8);
+ /*
+ * We should never fail, and have no simple means to manage the error,
+ * so let's log a fatal error.
+ */
+ if (s != 8)
+ LOG(Python, Fatal) << "Unable to write to eventfd";
+}
+
+int PyCameraManager::readFd()
+{
+ uint8_t buf[8];
+
+ ssize_t ret = read(eventFd_.get(), buf, 8);
+
+ if (ret == 8)
+ return 0;
+ else if (ret < 0)
+ return -errno;
+ else
+ return -EIO;
+}
+
+void PyCameraManager::pushRequest(Request *req)
+{
+ MutexLocker guard(completedRequestsMutex_);
+ completedRequests_.push_back(req);
+}
+
+std::vector<Request *> PyCameraManager::getCompletedRequests()
+{
+ std::vector<Request *> v;
+ MutexLocker guard(completedRequestsMutex_);
+ swap(v, completedRequests_);
+ return v;
+}
diff --git a/src/py/libcamera/py_camera_manager.h b/src/py/libcamera/py_camera_manager.h
new file mode 100644
index 00000000..3574db23
--- /dev/null
+++ b/src/py/libcamera/py_camera_manager.h
@@ -0,0 +1,45 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ */
+
+#pragma once
+
+#include <libcamera/base/mutex.h>
+
+#include <libcamera/libcamera.h>
+
+#include <pybind11/pybind11.h>
+
+using namespace libcamera;
+
+class PyCameraManager
+{
+public:
+ PyCameraManager();
+ ~PyCameraManager();
+
+ pybind11::list cameras();
+ std::shared_ptr<Camera> get(const std::string &name) { return cameraManager_->get(name); }
+
+ static const std::string &version() { return CameraManager::version(); }
+
+ int eventFd() const { return eventFd_.get(); }
+
+ std::vector<pybind11::object> getReadyRequests();
+
+ void handleRequestCompleted(Request *req);
+
+private:
+ std::unique_ptr<CameraManager> cameraManager_;
+
+ UniqueFD eventFd_;
+ libcamera::Mutex completedRequestsMutex_;
+ std::vector<Request *> completedRequests_
+ LIBCAMERA_TSA_GUARDED_BY(completedRequestsMutex_);
+
+ void writeFd();
+ int readFd();
+ void pushRequest(Request *req);
+ std::vector<Request *> getCompletedRequests();
+};
diff --git a/src/py/libcamera/py_color_space.cpp b/src/py/libcamera/py_color_space.cpp
new file mode 100644
index 00000000..fd5a5dab
--- /dev/null
+++ b/src/py/libcamera/py_color_space.cpp
@@ -0,0 +1,72 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ *
+ * Python bindings - Color Space classes
+ */
+
+#include <libcamera/color_space.h>
+#include <libcamera/libcamera.h>
+
+#include <pybind11/operators.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+#include "py_main.h"
+
+namespace py = pybind11;
+
+using namespace libcamera;
+
+void init_py_color_space(py::module &m)
+{
+ auto pyColorSpace = py::class_<ColorSpace>(m, "ColorSpace");
+ auto pyColorSpacePrimaries = py::enum_<ColorSpace::Primaries>(pyColorSpace, "Primaries");
+ auto pyColorSpaceTransferFunction = py::enum_<ColorSpace::TransferFunction>(pyColorSpace, "TransferFunction");
+ auto pyColorSpaceYcbcrEncoding = py::enum_<ColorSpace::YcbcrEncoding>(pyColorSpace, "YcbcrEncoding");
+ auto pyColorSpaceRange = py::enum_<ColorSpace::Range>(pyColorSpace, "Range");
+
+ pyColorSpace
+ .def(py::init([](ColorSpace::Primaries primaries,
+ ColorSpace::TransferFunction transferFunction,
+ ColorSpace::YcbcrEncoding ycbcrEncoding,
+ ColorSpace::Range range) {
+ return ColorSpace(primaries, transferFunction, ycbcrEncoding, range);
+ }), py::arg("primaries"), py::arg("transferFunction"),
+ py::arg("ycbcrEncoding"), py::arg("range"))
+ .def(py::init([](ColorSpace &other) { return other; }))
+ .def("__str__", [](ColorSpace &self) {
+ return "<libcamera.ColorSpace '" + self.toString() + "'>";
+ })
+ .def_readwrite("primaries", &ColorSpace::primaries)
+ .def_readwrite("transferFunction", &ColorSpace::transferFunction)
+ .def_readwrite("ycbcrEncoding", &ColorSpace::ycbcrEncoding)
+ .def_readwrite("range", &ColorSpace::range)
+ .def_static("Raw", []() { return ColorSpace::Raw; })
+ .def_static("Srgb", []() { return ColorSpace::Srgb; })
+ .def_static("Sycc", []() { return ColorSpace::Sycc; })
+ .def_static("Smpte170m", []() { return ColorSpace::Smpte170m; })
+ .def_static("Rec709", []() { return ColorSpace::Rec709; })
+ .def_static("Rec2020", []() { return ColorSpace::Rec2020; });
+
+ pyColorSpacePrimaries
+ .value("Raw", ColorSpace::Primaries::Raw)
+ .value("Smpte170m", ColorSpace::Primaries::Smpte170m)
+ .value("Rec709", ColorSpace::Primaries::Rec709)
+ .value("Rec2020", ColorSpace::Primaries::Rec2020);
+
+ pyColorSpaceTransferFunction
+ .value("Linear", ColorSpace::TransferFunction::Linear)
+ .value("Srgb", ColorSpace::TransferFunction::Srgb)
+ .value("Rec709", ColorSpace::TransferFunction::Rec709);
+
+ pyColorSpaceYcbcrEncoding
+ .value("Null", ColorSpace::YcbcrEncoding::None)
+ .value("Rec601", ColorSpace::YcbcrEncoding::Rec601)
+ .value("Rec709", ColorSpace::YcbcrEncoding::Rec709)
+ .value("Rec2020", ColorSpace::YcbcrEncoding::Rec2020);
+
+ pyColorSpaceRange
+ .value("Full", ColorSpace::Range::Full)
+ .value("Limited", ColorSpace::Range::Limited);
+}
diff --git a/src/py/libcamera/py_controls_generated.cpp.in b/src/py/libcamera/py_controls_generated.cpp.in
new file mode 100644
index 00000000..22a132d1
--- /dev/null
+++ b/src/py/libcamera/py_controls_generated.cpp.in
@@ -0,0 +1,47 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ *
+ * Python bindings - Auto-generated {{mode}}
+ *
+ * This file is auto-generated. Do not edit.
+ */
+
+#include <libcamera/{{header}}>
+
+#include <pybind11/pybind11.h>
+
+#include "py_main.h"
+
+namespace py = pybind11;
+
+class Py{{mode|capitalize}}
+{
+};
+
+{% for vendor in vendors -%}
+class Py{{vendor|capitalize}}{{mode|capitalize}}
+{
+};
+
+{% endfor -%}
+
+void init_py_{{mode}}_generated(py::module& m)
+{
+ auto {{mode}} = py::class_<Py{{mode|capitalize}}>(m, "{{mode}}");
+{%- for vendor in vendors %}
+ auto {{vendor}} = py::class_<Py{{vendor|capitalize}}{{mode|capitalize}}>({{mode}}, "{{vendor}}");
+{%- endfor %}
+
+{% for ctrl in controls %}
+ {{ctrl.klass}}.def_readonly_static("{{ctrl.name}}", static_cast<const libcamera::ControlId *>(&libcamera::{{mode}}::{{ctrl.namespace}}{{ctrl.name}}));
+{%- if ctrl.is_enum %}
+
+ py::enum_<libcamera::{{mode}}::{{ctrl.namespace}}{{ctrl.name}}Enum>({{ctrl.klass}}, "{{ctrl.name}}Enum")
+{%- for enum in ctrl.enum_values %}
+ .value("{{enum.py_name}}", libcamera::{{mode}}::{{ctrl.namespace}}{{enum.name}})
+{%- endfor %}
+ ;
+{%- endif %}
+{% endfor -%}
+}
diff --git a/src/py/libcamera/py_enums.cpp b/src/py/libcamera/py_enums.cpp
new file mode 100644
index 00000000..9e75ec1a
--- /dev/null
+++ b/src/py/libcamera/py_enums.cpp
@@ -0,0 +1,47 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ *
+ * Python bindings - Enumerations
+ */
+
+#include <libcamera/libcamera.h>
+
+#include <pybind11/pybind11.h>
+
+#include "py_main.h"
+
+namespace py = pybind11;
+
+using namespace libcamera;
+
+void init_py_enums(py::module &m)
+{
+ py::enum_<StreamRole>(m, "StreamRole")
+ .value("StillCapture", StreamRole::StillCapture)
+ .value("Raw", StreamRole::Raw)
+ .value("VideoRecording", StreamRole::VideoRecording)
+ .value("Viewfinder", StreamRole::Viewfinder);
+
+ py::enum_<ControlType>(m, "ControlType")
+ .value("Null", ControlType::ControlTypeNone)
+ .value("Bool", ControlType::ControlTypeBool)
+ .value("Byte", ControlType::ControlTypeByte)
+ .value("Integer32", ControlType::ControlTypeInteger32)
+ .value("Integer64", ControlType::ControlTypeInteger64)
+ .value("Float", ControlType::ControlTypeFloat)
+ .value("String", ControlType::ControlTypeString)
+ .value("Rectangle", ControlType::ControlTypeRectangle)
+ .value("Size", ControlType::ControlTypeSize)
+ .value("Point", ControlType::ControlTypePoint);
+
+ py::enum_<Orientation>(m, "Orientation")
+ .value("Rotate0", Orientation::Rotate0)
+ .value("Rotate0Mirror", Orientation::Rotate0Mirror)
+ .value("Rotate180", Orientation::Rotate180)
+ .value("Rotate180Mirror", Orientation::Rotate180Mirror)
+ .value("Rotate90Mirror", Orientation::Rotate90Mirror)
+ .value("Rotate270", Orientation::Rotate270)
+ .value("Rotate270Mirror", Orientation::Rotate270Mirror)
+ .value("Rotate90", Orientation::Rotate90);
+}
diff --git a/src/py/libcamera/py_formats_generated.cpp.in b/src/py/libcamera/py_formats_generated.cpp.in
new file mode 100644
index 00000000..c5fb9063
--- /dev/null
+++ b/src/py/libcamera/py_formats_generated.cpp.in
@@ -0,0 +1,27 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ *
+ * Python bindings - Auto-generated formats
+ *
+ * This file is auto-generated. Do not edit.
+ */
+
+#include <libcamera/formats.h>
+
+#include <pybind11/pybind11.h>
+
+#include "py_main.h"
+
+namespace py = pybind11;
+
+class PyFormats
+{
+};
+
+void init_py_formats_generated(py::module& m)
+{
+ py::class_<PyFormats>(m, "formats")
+${formats}
+ ;
+}
diff --git a/src/py/libcamera/py_geometry.cpp b/src/py/libcamera/py_geometry.cpp
new file mode 100644
index 00000000..c7e30360
--- /dev/null
+++ b/src/py/libcamera/py_geometry.cpp
@@ -0,0 +1,121 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ *
+ * Python bindings - Geometry classes
+ */
+
+#include <array>
+
+#include <libcamera/geometry.h>
+#include <libcamera/libcamera.h>
+
+#include <pybind11/operators.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+#include "py_main.h"
+
+namespace py = pybind11;
+
+using namespace libcamera;
+
+void init_py_geometry(py::module &m)
+{
+ auto pyPoint = py::class_<Point>(m, "Point");
+ auto pySize = py::class_<Size>(m, "Size");
+ auto pySizeRange = py::class_<SizeRange>(m, "SizeRange");
+ auto pyRectangle = py::class_<Rectangle>(m, "Rectangle");
+
+ pyPoint
+ .def(py::init<>())
+ .def(py::init<int, int>())
+ .def_readwrite("x", &Point::x)
+ .def_readwrite("y", &Point::y)
+ .def(py::self == py::self)
+ .def(-py::self)
+ .def("__str__", &Point::toString)
+ .def("__repr__", [](const Point &self) {
+ return py::str("libcamera.Point({}, {})")
+ .format(self.x, self.y);
+ });
+
+ pySize
+ .def(py::init<>())
+ .def(py::init<unsigned int, unsigned int>())
+ .def_readwrite("width", &Size::width)
+ .def_readwrite("height", &Size::height)
+ .def_property_readonly("is_null", &Size::isNull)
+ .def("align_down_to", &Size::alignDownTo)
+ .def("align_up_to", &Size::alignUpTo)
+ .def("bound_to", &Size::boundTo)
+ .def("expand_to", &Size::expandTo)
+ .def("grow_by", &Size::growBy)
+ .def("shrink_by", &Size::shrinkBy)
+ .def("aligned_up_to", &Size::alignedUpTo)
+ .def("aligned_up_to", &Size::alignedUpTo)
+ .def("bounded_to", &Size::boundedTo)
+ .def("expanded_to", &Size::expandedTo)
+ .def("grown_by", &Size::grownBy)
+ .def("shrunk_by", &Size::shrunkBy)
+ .def("bounded_to_aspect_ratio", &Size::boundedToAspectRatio)
+ .def("expanded_to_aspect_ratio", &Size::expandedToAspectRatio)
+ .def("centered_to", &Size::centeredTo)
+ .def(py::self == py::self)
+ .def(py::self < py::self)
+ .def(py::self <= py::self)
+ .def(py::self * float())
+ .def(py::self / float())
+ .def(py::self *= float())
+ .def(py::self /= float())
+ .def("__str__", &Size::toString)
+ .def("__repr__", [](const Size &self) {
+ return py::str("libcamera.Size({}, {})")
+ .format(self.width, self.height);
+ });
+
+ pySizeRange
+ .def(py::init<>())
+ .def(py::init<Size>())
+ .def(py::init<Size, Size>())
+ .def(py::init<Size, Size, unsigned int, unsigned int>())
+ .def_readwrite("min", &SizeRange::min)
+ .def_readwrite("max", &SizeRange::max)
+ .def_readwrite("hStep", &SizeRange::hStep)
+ .def_readwrite("vStep", &SizeRange::vStep)
+ .def("contains", &SizeRange::contains)
+ .def(py::self == py::self)
+ .def("__str__", &SizeRange::toString)
+ .def("__repr__", [](const SizeRange &self) {
+ return py::str("libcamera.SizeRange(({}, {}), ({}, {}), {}, {})")
+ .format(self.min.width, self.min.height,
+ self.max.width, self.max.height,
+ self.hStep, self.vStep);
+ });
+
+ pyRectangle
+ .def(py::init<>())
+ .def(py::init<int, int, Size>())
+ .def(py::init<int, int, unsigned int, unsigned int>())
+ .def(py::init<Size>())
+ .def_readwrite("x", &Rectangle::x)
+ .def_readwrite("y", &Rectangle::y)
+ .def_readwrite("width", &Rectangle::width)
+ .def_readwrite("height", &Rectangle::height)
+ .def_property_readonly("is_null", &Rectangle::isNull)
+ .def_property_readonly("center", &Rectangle::center)
+ .def_property_readonly("size", &Rectangle::size)
+ .def_property_readonly("topLeft", &Rectangle::topLeft)
+ .def("scale_by", &Rectangle::scaleBy)
+ .def("translate_by", &Rectangle::translateBy)
+ .def("bounded_to", &Rectangle::boundedTo)
+ .def("enclosed_in", &Rectangle::enclosedIn)
+ .def("scaled_by", &Rectangle::scaledBy)
+ .def("translated_by", &Rectangle::translatedBy)
+ .def(py::self == py::self)
+ .def("__str__", &Rectangle::toString)
+ .def("__repr__", [](const Rectangle &self) {
+ return py::str("libcamera.Rectangle({}, {}, {}, {})")
+ .format(self.x, self.y, self.width, self.height);
+ });
+}
diff --git a/src/py/libcamera/py_helpers.cpp b/src/py/libcamera/py_helpers.cpp
new file mode 100644
index 00000000..1ad1d4c1
--- /dev/null
+++ b/src/py/libcamera/py_helpers.cpp
@@ -0,0 +1,101 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ */
+
+#include "py_helpers.h"
+
+#include <libcamera/libcamera.h>
+
+#include <pybind11/functional.h>
+#include <pybind11/stl.h>
+#include <pybind11/stl_bind.h>
+
+namespace py = pybind11;
+
+using namespace libcamera;
+
+template<typename T>
+static py::object valueOrTuple(const ControlValue &cv)
+{
+ if (cv.isArray()) {
+ const T *v = reinterpret_cast<const T *>(cv.data().data());
+ auto t = py::tuple(cv.numElements());
+
+ for (size_t i = 0; i < cv.numElements(); ++i)
+ t[i] = v[i];
+
+ return std::move(t);
+ }
+
+ return py::cast(cv.get<T>());
+}
+
+py::object controlValueToPy(const ControlValue &cv)
+{
+ switch (cv.type()) {
+ case ControlTypeNone:
+ return py::none();
+ case ControlTypeBool:
+ return valueOrTuple<bool>(cv);
+ case ControlTypeByte:
+ return valueOrTuple<uint8_t>(cv);
+ case ControlTypeInteger32:
+ return valueOrTuple<int32_t>(cv);
+ case ControlTypeInteger64:
+ return valueOrTuple<int64_t>(cv);
+ case ControlTypeFloat:
+ return valueOrTuple<float>(cv);
+ case ControlTypeString:
+ return py::cast(cv.get<std::string>());
+ case ControlTypeSize: {
+ const Size *v = reinterpret_cast<const Size *>(cv.data().data());
+ return py::cast(v);
+ }
+ case ControlTypeRectangle:
+ return valueOrTuple<Rectangle>(cv);
+ case ControlTypePoint:
+ return valueOrTuple<Point>(cv);
+ default:
+ throw std::runtime_error("Unsupported ControlValue type");
+ }
+}
+
+template<typename T>
+static ControlValue controlValueMaybeArray(const py::object &ob)
+{
+ if (py::isinstance<py::list>(ob) || py::isinstance<py::tuple>(ob)) {
+ std::vector<T> vec = ob.cast<std::vector<T>>();
+ return ControlValue(Span<const T>(vec));
+ }
+
+ return ControlValue(ob.cast<T>());
+}
+
+ControlValue pyToControlValue(const py::object &ob, ControlType type)
+{
+ switch (type) {
+ case ControlTypeNone:
+ return ControlValue();
+ case ControlTypeBool:
+ return ControlValue(ob.cast<bool>());
+ case ControlTypeByte:
+ return controlValueMaybeArray<uint8_t>(ob);
+ case ControlTypeInteger32:
+ return controlValueMaybeArray<int32_t>(ob);
+ case ControlTypeInteger64:
+ return controlValueMaybeArray<int64_t>(ob);
+ case ControlTypeFloat:
+ return controlValueMaybeArray<float>(ob);
+ case ControlTypeString:
+ return ControlValue(ob.cast<std::string>());
+ case ControlTypeRectangle:
+ return controlValueMaybeArray<Rectangle>(ob);
+ case ControlTypeSize:
+ return ControlValue(ob.cast<Size>());
+ case ControlTypePoint:
+ return controlValueMaybeArray<Point>(ob);
+ default:
+ throw std::runtime_error("Control type not implemented");
+ }
+}
diff --git a/src/py/libcamera/py_helpers.h b/src/py/libcamera/py_helpers.h
new file mode 100644
index 00000000..983969df
--- /dev/null
+++ b/src/py/libcamera/py_helpers.h
@@ -0,0 +1,13 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ */
+
+#pragma once
+
+#include <libcamera/libcamera.h>
+
+#include <pybind11/pybind11.h>
+
+pybind11::object controlValueToPy(const libcamera::ControlValue &cv);
+libcamera::ControlValue pyToControlValue(const pybind11::object &ob, libcamera::ControlType type);
diff --git a/src/py/libcamera/py_main.cpp b/src/py/libcamera/py_main.cpp
new file mode 100644
index 00000000..441a70ab
--- /dev/null
+++ b/src/py/libcamera/py_main.cpp
@@ -0,0 +1,523 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ *
+ * Python bindings
+ */
+
+#include "py_main.h"
+
+#include <limits>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include <libcamera/base/log.h>
+
+#include <libcamera/libcamera.h>
+
+#include <pybind11/functional.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+#include <pybind11/stl_bind.h>
+
+#include "py_camera_manager.h"
+#include "py_helpers.h"
+
+namespace py = pybind11;
+
+using namespace libcamera;
+
+namespace libcamera {
+
+LOG_DEFINE_CATEGORY(Python)
+
+}
+
+/*
+ * This is a holder class used only for the Camera class, for the sole purpose
+ * of avoiding the compilation issue with Camera's private destructor.
+ *
+ * pybind11 requires a public destructor for classes held with shared_ptrs, even
+ * in cases where the public destructor is not strictly needed. The current
+ * understanding is that there are the following options to solve the problem:
+ *
+ * - Use pybind11 'smart_holder' branch. The downside is that 'smart_holder'
+ * is not the mainline branch, and not available in distributions.
+ * - https://github.com/pybind/pybind11/pull/2067
+ * - Make the Camera destructor public
+ * - Something like the PyCameraSmartPtr here, which adds a layer, hiding the
+ * issue.
+ */
+template<typename T>
+class PyCameraSmartPtr
+{
+public:
+ using element_type = T;
+
+ PyCameraSmartPtr()
+ {
+ }
+
+ explicit PyCameraSmartPtr(T *)
+ {
+ throw std::runtime_error("invalid SmartPtr constructor call");
+ }
+
+ explicit PyCameraSmartPtr(std::shared_ptr<T> p)
+ : ptr_(p)
+ {
+ }
+
+ T *get() const { return ptr_.get(); }
+
+ operator std::shared_ptr<T>() const { return ptr_; }
+
+private:
+ std::shared_ptr<T> ptr_;
+};
+
+PYBIND11_DECLARE_HOLDER_TYPE(T, PyCameraSmartPtr<T>)
+
+/*
+ * Note: global C++ destructors can be ran on this before the py module is
+ * destructed.
+ */
+static std::weak_ptr<PyCameraManager> gCameraManager;
+
+PYBIND11_MODULE(_libcamera, m)
+{
+ init_py_enums(m);
+ init_py_controls_generated(m);
+ init_py_geometry(m);
+ init_py_properties_generated(m);
+ init_py_color_space(m);
+ init_py_transform(m);
+
+ /* Forward declarations */
+
+ /*
+ * We need to declare all the classes here so that Python docstrings
+ * can be generated correctly.
+ * https://pybind11.readthedocs.io/en/latest/advanced/misc.html#avoiding-c-types-in-docstrings
+ */
+
+ auto pyCameraManager = py::class_<PyCameraManager, std::shared_ptr<PyCameraManager>>(m, "CameraManager");
+ auto pyCamera = py::class_<Camera, PyCameraSmartPtr<Camera>>(m, "Camera");
+ auto pySensorConfiguration = py::class_<SensorConfiguration>(m, "SensorConfiguration");
+ auto pyCameraConfiguration = py::class_<CameraConfiguration>(m, "CameraConfiguration");
+ auto pyCameraConfigurationStatus = py::enum_<CameraConfiguration::Status>(pyCameraConfiguration, "Status");
+ auto pyStreamConfiguration = py::class_<StreamConfiguration>(m, "StreamConfiguration");
+ auto pyStreamFormats = py::class_<StreamFormats>(m, "StreamFormats");
+ auto pyFrameBufferAllocator = py::class_<FrameBufferAllocator>(m, "FrameBufferAllocator");
+ auto pyFrameBuffer = py::class_<FrameBuffer>(m, "FrameBuffer");
+ auto pyFrameBufferPlane = py::class_<FrameBuffer::Plane>(pyFrameBuffer, "Plane");
+ auto pyStream = py::class_<Stream>(m, "Stream");
+ auto pyControlId = py::class_<ControlId>(m, "ControlId");
+ auto pyControlInfo = py::class_<ControlInfo>(m, "ControlInfo");
+ auto pyRequest = py::class_<Request>(m, "Request");
+ auto pyRequestStatus = py::enum_<Request::Status>(pyRequest, "Status");
+ auto pyRequestReuse = py::enum_<Request::ReuseFlag>(pyRequest, "Reuse");
+ auto pyFrameMetadata = py::class_<FrameMetadata>(m, "FrameMetadata");
+ auto pyFrameMetadataStatus = py::enum_<FrameMetadata::Status>(pyFrameMetadata, "Status");
+ auto pyFrameMetadataPlane = py::class_<FrameMetadata::Plane>(pyFrameMetadata, "Plane");
+ auto pyPixelFormat = py::class_<PixelFormat>(m, "PixelFormat");
+
+ init_py_formats_generated(m);
+
+ /* Global functions */
+ m.def("log_set_level", &logSetLevel);
+
+ /* Classes */
+ pyCameraManager
+ .def_static("singleton", []() {
+ std::shared_ptr<PyCameraManager> cm = gCameraManager.lock();
+
+ if (!cm) {
+ cm = std::make_shared<PyCameraManager>();
+ gCameraManager = cm;
+ }
+
+ return cm;
+ })
+
+ .def_property_readonly_static("version", [](py::object /* self */) { return PyCameraManager::version(); })
+ .def("get", &PyCameraManager::get, py::keep_alive<0, 1>())
+ .def_property_readonly("cameras", &PyCameraManager::cameras)
+
+ .def_property_readonly("event_fd", &PyCameraManager::eventFd)
+ .def("get_ready_requests", &PyCameraManager::getReadyRequests);
+
+ pyCamera
+ .def_property_readonly("id", &Camera::id)
+ .def("acquire", [](Camera &self) {
+ int ret = self.acquire();
+ if (ret)
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to acquire camera");
+ })
+ .def("release", [](Camera &self) {
+ int ret = self.release();
+ if (ret)
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to release camera");
+ })
+ .def("start", [](Camera &self,
+ const std::unordered_map<const ControlId *, py::object> &controls) {
+ /* \todo What happens if someone calls start() multiple times? */
+
+ auto cm = gCameraManager.lock();
+ ASSERT(cm);
+
+ self.requestCompleted.connect(cm.get(), &PyCameraManager::handleRequestCompleted);
+
+ ControlList controlList(self.controls());
+
+ for (const auto &[id, obj] : controls) {
+ auto val = pyToControlValue(obj, id->type());
+ controlList.set(id->id(), val);
+ }
+
+ int ret = self.start(&controlList);
+ if (ret) {
+ self.requestCompleted.disconnect();
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to start camera");
+ }
+ }, py::arg("controls") = std::unordered_map<const ControlId *, py::object>())
+
+ .def("stop", [](Camera &self) {
+ int ret = self.stop();
+
+ self.requestCompleted.disconnect();
+
+ if (ret)
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to stop camera");
+ })
+
+ .def("__str__", [](Camera &self) {
+ return "<libcamera.Camera '" + self.id() + "'>";
+ })
+
+ /* Keep the camera alive, as StreamConfiguration contains a Stream* */
+ .def("generate_configuration", [](Camera &self, const std::vector<StreamRole> &roles) {
+ return self.generateConfiguration(roles);
+ }, py::keep_alive<0, 1>())
+
+ .def("configure", [](Camera &self, CameraConfiguration *config) {
+ int ret = self.configure(config);
+ if (ret)
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to configure camera");
+ })
+
+ .def("create_request", [](Camera &self, uint64_t cookie) {
+ std::unique_ptr<Request> req = self.createRequest(cookie);
+ if (!req)
+ throw std::system_error(ENOMEM, std::generic_category(),
+ "Failed to create request");
+ return req;
+ }, py::arg("cookie") = 0)
+
+ .def("queue_request", [](Camera &self, Request *req) {
+ py::object py_req = py::cast(req);
+
+ /*
+ * Increase the reference count, will be dropped in
+ * CameraManager.get_ready_requests().
+ */
+
+ py_req.inc_ref();
+
+ int ret = self.queueRequest(req);
+ if (ret) {
+ py_req.dec_ref();
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to queue request");
+ }
+ })
+
+ .def_property_readonly("streams", [](Camera &self) {
+ py::set set;
+ for (auto &s : self.streams()) {
+ py::object py_self = py::cast(self);
+ py::object py_s = py::cast(s);
+ py::detail::keep_alive_impl(py_s, py_self);
+ set.add(py_s);
+ }
+ return set;
+ })
+
+ .def_property_readonly("controls", [](Camera &self) {
+ /* Convert ControlInfoMap to std container */
+
+ std::unordered_map<const ControlId *, ControlInfo> ret;
+
+ for (const auto &[k, cv] : self.controls())
+ ret[k] = cv;
+
+ return ret;
+ })
+
+ .def_property_readonly("properties", [](Camera &self) {
+ /* Convert ControlList to std container */
+
+ std::unordered_map<const ControlId *, py::object> ret;
+
+ for (const auto &[k, cv] : self.properties()) {
+ const ControlId *id = properties::properties.at(k);
+ py::object ob = controlValueToPy(cv);
+ ret[id] = ob;
+ }
+
+ return ret;
+ });
+
+ pySensorConfiguration
+ .def(py::init<>())
+ .def_readwrite("bit_depth", &SensorConfiguration::bitDepth)
+ .def_readwrite("analog_crop", &SensorConfiguration::analogCrop)
+ .def_property(
+ "binning",
+ [](SensorConfiguration &self) {
+ return py::make_tuple(self.binning.binX, self.binning.binY);
+ },
+ [](SensorConfiguration &self, py::object value) {
+ auto vec = value.cast<std::vector<unsigned int>>();
+ if (vec.size() != 2)
+ throw std::runtime_error("binning requires iterable of 2 values");
+ self.binning.binX = vec[0];
+ self.binning.binY = vec[1];
+ })
+ .def_property(
+ "skipping",
+ [](SensorConfiguration &self) {
+ return py::make_tuple(self.skipping.xOddInc, self.skipping.xEvenInc,
+ self.skipping.yOddInc, self.skipping.yEvenInc);
+ },
+ [](SensorConfiguration &self, py::object value) {
+ auto vec = value.cast<std::vector<unsigned int>>();
+ if (vec.size() != 4)
+ throw std::runtime_error("skipping requires iterable of 4 values");
+ self.skipping.xOddInc = vec[0];
+ self.skipping.xEvenInc = vec[1];
+ self.skipping.yOddInc = vec[2];
+ self.skipping.yEvenInc = vec[3];
+ })
+ .def_readwrite("output_size", &SensorConfiguration::outputSize)
+ .def("is_valid", &SensorConfiguration::isValid);
+
+ pyCameraConfiguration
+ .def("__iter__", [](CameraConfiguration &self) {
+ return py::make_iterator<py::return_value_policy::reference_internal>(self);
+ }, py::keep_alive<0, 1>())
+ .def("__len__", [](CameraConfiguration &self) {
+ return self.size();
+ })
+ .def("validate", &CameraConfiguration::validate)
+ .def("at", py::overload_cast<unsigned int>(&CameraConfiguration::at),
+ py::return_value_policy::reference_internal)
+ .def_property_readonly("size", &CameraConfiguration::size)
+ .def_property_readonly("empty", &CameraConfiguration::empty)
+ .def_readwrite("sensor_config", &CameraConfiguration::sensorConfig)
+ .def_readwrite("orientation", &CameraConfiguration::orientation);
+
+ pyCameraConfigurationStatus
+ .value("Valid", CameraConfiguration::Valid)
+ .value("Adjusted", CameraConfiguration::Adjusted)
+ .value("Invalid", CameraConfiguration::Invalid);
+
+ pyStreamConfiguration
+ .def("__str__", &StreamConfiguration::toString)
+ .def_property_readonly("stream", &StreamConfiguration::stream,
+ py::return_value_policy::reference_internal)
+ .def_readwrite("size", &StreamConfiguration::size)
+ .def_readwrite("pixel_format", &StreamConfiguration::pixelFormat)
+ .def_readwrite("stride", &StreamConfiguration::stride)
+ .def_readwrite("frame_size", &StreamConfiguration::frameSize)
+ .def_readwrite("buffer_count", &StreamConfiguration::bufferCount)
+ .def_property_readonly("formats", &StreamConfiguration::formats,
+ py::return_value_policy::reference_internal)
+ .def_readwrite("color_space", &StreamConfiguration::colorSpace);
+
+ pyStreamFormats
+ .def_property_readonly("pixel_formats", &StreamFormats::pixelformats)
+ .def("sizes", &StreamFormats::sizes)
+ .def("range", &StreamFormats::range);
+
+ pyFrameBufferAllocator
+ .def(py::init<PyCameraSmartPtr<Camera>>(), py::keep_alive<1, 2>())
+ .def("allocate", [](FrameBufferAllocator &self, Stream *stream) {
+ int ret = self.allocate(stream);
+ if (ret < 0)
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to allocate buffers");
+ return ret;
+ })
+ .def_property_readonly("allocated", &FrameBufferAllocator::allocated)
+ /* Create a list of FrameBuffers, where each FrameBuffer has a keep-alive to FrameBufferAllocator */
+ .def("buffers", [](FrameBufferAllocator &self, Stream *stream) {
+ py::object py_self = py::cast(self);
+ py::list l;
+ for (auto &ub : self.buffers(stream)) {
+ py::object py_buf = py::cast(ub.get(), py::return_value_policy::reference_internal, py_self);
+ l.append(py_buf);
+ }
+ return l;
+ });
+
+ pyFrameBuffer
+ .def(py::init<std::vector<FrameBuffer::Plane>, unsigned int>(),
+ py::arg("planes"), py::arg("cookie") = 0)
+ .def_property_readonly("metadata", &FrameBuffer::metadata, py::return_value_policy::reference_internal)
+ .def_property_readonly("planes", &FrameBuffer::planes)
+ .def_property("cookie", &FrameBuffer::cookie, &FrameBuffer::setCookie);
+
+ pyFrameBufferPlane
+ .def(py::init())
+ .def(py::init([](int fd, unsigned int offset, unsigned int length) {
+ auto p = FrameBuffer::Plane();
+ p.fd = SharedFD(fd);
+ p.offset = offset;
+ p.length = length;
+ return p;
+ }), py::arg("fd"), py::arg("offset"), py::arg("length"))
+ .def_property("fd",
+ [](const FrameBuffer::Plane &self) {
+ return self.fd.get();
+ },
+ [](FrameBuffer::Plane &self, int fd) {
+ self.fd = SharedFD(fd);
+ })
+ .def_readwrite("offset", &FrameBuffer::Plane::offset)
+ .def_readwrite("length", &FrameBuffer::Plane::length);
+
+ pyStream
+ .def_property_readonly("configuration", &Stream::configuration);
+
+ pyControlId
+ .def_property_readonly("id", &ControlId::id)
+ .def_property_readonly("name", &ControlId::name)
+ .def_property_readonly("vendor", &ControlId::vendor)
+ .def_property_readonly("type", &ControlId::type)
+ .def_property_readonly("isArray", &ControlId::isArray)
+ .def_property_readonly("size", &ControlId::size)
+ .def("__str__", [](const ControlId &self) { return self.name(); })
+ .def("__repr__", [](const ControlId &self) {
+ std::string sizeStr = "";
+ if (self.isArray()) {
+ sizeStr = "[";
+ size_t size = self.size();
+ if (size == std::numeric_limits<size_t>::max())
+ sizeStr += "n";
+ else
+ sizeStr += std::to_string(size);
+ sizeStr += "]";
+ }
+ return py::str("libcamera.ControlId({}, {}.{}{}, {})")
+ .format(self.id(), self.vendor(), self.name(), sizeStr, self.type());
+ })
+ .def("enumerators", &ControlId::enumerators);
+
+ pyControlInfo
+ .def_property_readonly("min", [](const ControlInfo &self) {
+ return controlValueToPy(self.min());
+ })
+ .def_property_readonly("max", [](const ControlInfo &self) {
+ return controlValueToPy(self.max());
+ })
+ .def_property_readonly("default", [](const ControlInfo &self) {
+ return controlValueToPy(self.def());
+ })
+ .def_property_readonly("values", [](const ControlInfo &self) {
+ py::list l;
+ for (const auto &v : self.values())
+ l.append(controlValueToPy(v));
+ return l;
+ })
+ .def("__str__", &ControlInfo::toString)
+ .def("__repr__", [](const ControlInfo &self) {
+ return py::str("libcamera.ControlInfo({})")
+ .format(self.toString());
+ });
+
+ pyRequest
+ /* \todo Fence is not supported, so we cannot expose addBuffer() directly */
+ .def("add_buffer", [](Request &self, const Stream *stream, FrameBuffer *buffer) {
+ int ret = self.addBuffer(stream, buffer);
+ if (ret)
+ throw std::system_error(-ret, std::generic_category(),
+ "Failed to add buffer");
+ }, py::keep_alive<1, 3>()) /* Request keeps Framebuffer alive */
+ .def_property_readonly("status", &Request::status)
+ .def_property_readonly("buffers", &Request::buffers)
+ .def_property_readonly("cookie", &Request::cookie)
+ .def_property_readonly("sequence", &Request::sequence)
+ .def_property_readonly("has_pending_buffers", &Request::hasPendingBuffers)
+ .def("set_control", [](Request &self, const ControlId &id, py::object value) {
+ self.controls().set(id.id(), pyToControlValue(value, id.type()));
+ })
+ .def_property_readonly("metadata", [](Request &self) {
+ /* Convert ControlList to std container */
+
+ std::unordered_map<const ControlId *, py::object> ret;
+
+ for (const auto &[key, cv] : self.metadata()) {
+ const ControlId *id = controls::controls.at(key);
+ py::object ob = controlValueToPy(cv);
+ ret[id] = ob;
+ }
+
+ return ret;
+ })
+ /*
+ * \todo As we add a keep_alive to the fb in addBuffers(), we
+ * can only allow reuse with ReuseBuffers.
+ */
+ .def("reuse", [](Request &self) { self.reuse(Request::ReuseFlag::ReuseBuffers); })
+ .def("__str__", &Request::toString);
+
+ pyRequestStatus
+ .value("Pending", Request::RequestPending)
+ .value("Complete", Request::RequestComplete)
+ .value("Cancelled", Request::RequestCancelled);
+
+ pyRequestReuse
+ .value("Default", Request::ReuseFlag::Default)
+ .value("ReuseBuffers", Request::ReuseFlag::ReuseBuffers);
+
+ pyFrameMetadata
+ .def_readonly("status", &FrameMetadata::status)
+ .def_readonly("sequence", &FrameMetadata::sequence)
+ .def_readonly("timestamp", &FrameMetadata::timestamp)
+ .def_property_readonly("planes", [](const FrameMetadata &self) {
+ /* Convert from Span<> to std::vector<> */
+ /* Note: this creates a copy */
+ std::vector<FrameMetadata::Plane> v(self.planes().begin(), self.planes().end());
+ return v;
+ });
+
+ pyFrameMetadataStatus
+ .value("Success", FrameMetadata::FrameSuccess)
+ .value("Error", FrameMetadata::FrameError)
+ .value("Cancelled", FrameMetadata::FrameCancelled);
+
+ pyFrameMetadataPlane
+ .def_readwrite("bytes_used", &FrameMetadata::Plane::bytesused);
+
+ pyPixelFormat
+ .def(py::init<>())
+ .def(py::init<uint32_t, uint64_t>())
+ .def(py::init<>([](const std::string &str) {
+ return PixelFormat::fromString(str);
+ }))
+ .def_property_readonly("fourcc", &PixelFormat::fourcc)
+ .def_property_readonly("modifier", &PixelFormat::modifier)
+ .def(py::self == py::self)
+ .def("__str__", &PixelFormat::toString)
+ .def("__repr__", [](const PixelFormat &self) {
+ return "libcamera.PixelFormat('" + self.toString() + "')";
+ });
+}
diff --git a/src/py/libcamera/py_main.h b/src/py/libcamera/py_main.h
new file mode 100644
index 00000000..4d594326
--- /dev/null
+++ b/src/py/libcamera/py_main.h
@@ -0,0 +1,24 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ */
+
+#pragma once
+
+#include <libcamera/base/log.h>
+
+#include <pybind11/pybind11.h>
+
+namespace libcamera {
+
+LOG_DECLARE_CATEGORY(Python)
+
+}
+
+void init_py_color_space(pybind11::module &m);
+void init_py_controls_generated(pybind11::module &m);
+void init_py_enums(pybind11::module &m);
+void init_py_formats_generated(pybind11::module &m);
+void init_py_geometry(pybind11::module &m);
+void init_py_properties_generated(pybind11::module &m);
+void init_py_transform(pybind11::module &m);
diff --git a/src/py/libcamera/py_transform.cpp b/src/py/libcamera/py_transform.cpp
new file mode 100644
index 00000000..768260ff
--- /dev/null
+++ b/src/py/libcamera/py_transform.cpp
@@ -0,0 +1,83 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+ *
+ * Python bindings - Transform class
+ */
+
+#include <libcamera/transform.h>
+#include <libcamera/libcamera.h>
+
+#include <pybind11/operators.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+#include "py_main.h"
+
+namespace py = pybind11;
+
+using namespace libcamera;
+
+void init_py_transform(py::module &m)
+{
+ auto pyTransform = py::class_<Transform>(m, "Transform");
+
+ pyTransform
+ .def(py::init([](int rotation, bool hflip, bool vflip, bool transpose) {
+ bool ok;
+
+ Transform t = transformFromRotation(rotation, &ok);
+ if (!ok)
+ throw std::invalid_argument("Invalid rotation");
+
+ if (hflip)
+ t ^= Transform::HFlip;
+ if (vflip)
+ t ^= Transform::VFlip;
+ if (transpose)
+ t ^= Transform::Transpose;
+ return t;
+ }), py::arg("rotation") = 0, py::arg("hflip") = false,
+ py::arg("vflip") = false, py::arg("transpose") = false)
+ .def(py::init([](Transform &other) { return other; }))
+ .def("__str__", [](Transform &self) {
+ return "<libcamera.Transform '" + std::string(transformToString(self)) + "'>";
+ })
+ .def_property("hflip",
+ [](Transform &self) {
+ return !!(self & Transform::HFlip);
+ },
+ [](Transform &self, bool hflip) {
+ if (hflip)
+ self |= Transform::HFlip;
+ else
+ self &= ~Transform::HFlip;
+ })
+ .def_property("vflip",
+ [](Transform &self) {
+ return !!(self & Transform::VFlip);
+ },
+ [](Transform &self, bool vflip) {
+ if (vflip)
+ self |= Transform::VFlip;
+ else
+ self &= ~Transform::VFlip;
+ })
+ .def_property("transpose",
+ [](Transform &self) {
+ return !!(self & Transform::Transpose);
+ },
+ [](Transform &self, bool transpose) {
+ if (transpose)
+ self |= Transform::Transpose;
+ else
+ self &= ~Transform::Transpose;
+ })
+ .def("inverse", [](Transform &self) { return -self; })
+ .def("invert", [](Transform &self) {
+ self = -self;
+ })
+ .def("compose", [](Transform &self, Transform &other) {
+ self = self * other;
+ });
+}
diff --git a/src/py/libcamera/utils/MappedFrameBuffer.py b/src/py/libcamera/utils/MappedFrameBuffer.py
new file mode 100644
index 00000000..329e51fa
--- /dev/null
+++ b/src/py/libcamera/utils/MappedFrameBuffer.py
@@ -0,0 +1,105 @@
+# SPDX-License-Identifier: LGPL-2.1-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+import libcamera
+from typing import Tuple
+
+class MappedFrameBuffer:
+ """
+ Provides memoryviews for the FrameBuffer's planes
+ """
+ def __init__(self, fb: libcamera.FrameBuffer):
+ self.__fb = fb
+ self.__planes = ()
+ self.__maps = ()
+
+ def __enter__(self):
+ return self.mmap()
+
+ def __exit__(self, exc_type, exc_value, exc_traceback):
+ self.munmap()
+
+ def mmap(self):
+ if self.__planes:
+ raise RuntimeError('MappedFrameBuffer already mmapped')
+
+ import os
+ import mmap
+
+ fb = self.__fb
+
+ # Collect information about the buffers
+
+ bufinfos = {}
+
+ for plane in fb.planes:
+ fd = plane.fd
+
+ if fd not in bufinfos:
+ buflen = os.lseek(fd, 0, os.SEEK_END)
+ bufinfos[fd] = {'maplen': 0, 'buflen': buflen}
+ else:
+ buflen = bufinfos[fd]['buflen']
+
+ if plane.offset > buflen or plane.offset + plane.length > buflen:
+ raise RuntimeError(f'plane is out of buffer: buffer length={buflen}, ' +
+ f'plane offset={plane.offset}, plane length={plane.length}')
+
+ bufinfos[fd]['maplen'] = max(bufinfos[fd]['maplen'], plane.offset + plane.length)
+
+ # mmap the buffers
+
+ maps = []
+
+ for fd, info in bufinfos.items():
+ map = mmap.mmap(fd, info['maplen'], mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)
+ info['map'] = map
+ maps.append(map)
+
+ self.__maps = tuple(maps)
+
+ # Create memoryviews for the planes
+
+ planes = []
+
+ for plane in fb.planes:
+ fd = plane.fd
+ info = bufinfos[fd]
+
+ mv = memoryview(info['map'])
+
+ start = plane.offset
+ end = plane.offset + plane.length
+
+ mv = mv[start:end]
+
+ planes.append(mv)
+
+ self.__planes = tuple(planes)
+
+ return self
+
+ def munmap(self):
+ if not self.__planes:
+ raise RuntimeError('MappedFrameBuffer not mmapped')
+
+ for p in self.__planes:
+ p.release()
+
+ for mm in self.__maps:
+ mm.close()
+
+ self.__planes = ()
+ self.__maps = ()
+
+ @property
+ def planes(self) -> Tuple[memoryview, ...]:
+ """memoryviews for the planes"""
+ if not self.__planes:
+ raise RuntimeError('MappedFrameBuffer not mmapped')
+
+ return self.__planes
+
+ @property
+ def fb(self):
+ return self.__fb
diff --git a/src/py/libcamera/utils/__init__.py b/src/py/libcamera/utils/__init__.py
new file mode 100644
index 00000000..4a23ce36
--- /dev/null
+++ b/src/py/libcamera/utils/__init__.py
@@ -0,0 +1,4 @@
+# SPDX-License-Identifier: LGPL-2.1-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from .MappedFrameBuffer import MappedFrameBuffer
diff --git a/src/py/meson.build b/src/py/meson.build
new file mode 100644
index 00000000..a4586b4a
--- /dev/null
+++ b/src/py/meson.build
@@ -0,0 +1,3 @@
+# SPDX-License-Identifier: CC0-1.0
+
+subdir('libcamera')