summaryrefslogtreecommitdiff
path: root/src/py/cam
diff options
context:
space:
mode:
Diffstat (limited to 'src/py/cam')
-rwxr-xr-xsrc/py/cam/cam.py472
-rw-r--r--src/py/cam/cam_kms.py184
-rw-r--r--src/py/cam/cam_null.py47
-rw-r--r--src/py/cam/cam_qt.py182
-rw-r--r--src/py/cam/cam_qtgl.py363
-rw-r--r--src/py/cam/gl_helpers.py66
-rw-r--r--src/py/cam/helpers.py158
7 files changed, 1472 insertions, 0 deletions
diff --git a/src/py/cam/cam.py b/src/py/cam/cam.py
new file mode 100755
index 00000000..ff4b7f66
--- /dev/null
+++ b/src/py/cam/cam.py
@@ -0,0 +1,472 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from typing import Any
+import argparse
+import binascii
+import libcamera as libcam
+import libcamera.utils
+import sys
+import traceback
+
+
+# Per-camera state for one capture session: the camera handle, the parsed
+# per-camera command-line options, the configured streams, allocated buffers
+# and request bookkeeping.
+class CameraContext:
+    camera: libcam.Camera
+    id: str
+    idx: int
+
+    # NOTE(review): this holds the list of --stream descriptor strings for
+    # this camera (see __parse_streams), so 'list[str]' would be the accurate
+    # annotation — confirm and fix in a follow-up.
+    opt_stream: str
+    opt_strict_formats: bool
+    opt_crc: bool
+    opt_metadata: bool
+    opt_save_frames: bool
+    opt_capture: int
+    opt_orientation: str
+
+    stream_names: dict[libcam.Stream, str]
+    streams: list[libcam.Stream]
+    allocator: libcam.FrameBufferAllocator
+    requests: list[libcam.Request]
+    reqs_queued: int
+    reqs_completed: int
+    # Timestamp (ns) of the previous completed request, 0 before the first
+    # frame; used for the fps computation in CaptureState.
+    last: int = 0
+    fps: float
+
+    def __init__(self, camera, idx):
+        self.camera = camera
+        self.idx = idx
+        # Human-readable identifier used as a prefix in all printed output.
+        self.id = 'cam' + str(idx)
+        self.reqs_queued = 0
+        self.reqs_completed = 0
+
+    # Print all camera properties (--list-properties).
+    def do_cmd_list_props(self):
+        print('Properties for', self.id)
+
+        for cid, val in self.camera.properties.items():
+            print('\t{}: {}'.format(cid, val))
+
+    # Print all camera controls and their info (--list-controls).
+    def do_cmd_list_controls(self):
+        print('Controls for', self.id)
+
+        for cid, info in self.camera.controls.items():
+            print('\t{}: {}'.format(cid, info))
+
+    # Print the stream formats/sizes a default Viewfinder configuration
+    # supports (--info). Does not require the camera to be acquired.
+    def do_cmd_info(self):
+        print('Stream info for', self.id)
+
+        roles = [libcam.StreamRole.Viewfinder]
+
+        camconfig = self.camera.generate_configuration(roles)
+        if camconfig is None:
+            raise Exception('Generating config failed')
+
+        for i, stream_config in enumerate(camconfig):
+            print('\t{}: {}'.format(i, stream_config))
+
+            formats = stream_config.formats
+            for fmt in formats.pixel_formats:
+                print('\t * Pixelformat:', fmt, formats.range(fmt))
+
+                for size in formats.sizes(fmt):
+                    print('\t -', size)
+
+    def acquire(self):
+        self.camera.acquire()
+
+    def release(self):
+        self.camera.release()
+
+    # Parse the --stream descriptors ('key=value,key=value,...') into a list
+    # of option dicts, one per requested stream. Exits the process on any
+    # malformed option.
+    def __parse_streams(self):
+        streams = []
+
+        for stream_desc in self.opt_stream:
+            stream_opts: dict[str, Any]
+            stream_opts = {'role': libcam.StreamRole.Viewfinder}
+
+            for stream_opt in stream_desc.split(','):
+                # NOTE(review): stream_opt is a str, so comparing it with the
+                # int 0 is always False and this guard never triggers; the
+                # empty string '' was probably intended (to skip empty
+                # segments) — confirm upstream.
+                if stream_opt == 0:
+                    continue
+
+                arr = stream_opt.split('=')
+                if len(arr) != 2:
+                    print('Bad stream option', stream_opt)
+                    sys.exit(-1)
+
+                key = arr[0]
+                value = arr[1]
+
+                if key in ['width', 'height']:
+                    value = int(value)
+                elif key == 'role':
+                    rolemap = {
+                        'still': libcam.StreamRole.StillCapture,
+                        'raw': libcam.StreamRole.Raw,
+                        'video': libcam.StreamRole.VideoRecording,
+                        'viewfinder': libcam.StreamRole.Viewfinder,
+                    }
+
+                    role = rolemap.get(value.lower(), None)
+
+                    if role is None:
+                        print('Bad stream role', value)
+                        sys.exit(-1)
+
+                    value = role
+                elif key == 'pixelformat':
+                    # Kept as a string; converted to libcam.PixelFormat in
+                    # configure().
+                    pass
+                else:
+                    print('Bad stream option key', key)
+                    sys.exit(-1)
+
+                stream_opts[key] = value
+
+            streams.append(stream_opts)
+
+        return streams
+
+    # Generate, tweak and validate a camera configuration from the parsed
+    # stream options, then apply it. Populates self.streams and
+    # self.stream_names.
+    def configure(self):
+        streams = self.__parse_streams()
+
+        roles = [opts['role'] for opts in streams]
+
+        camconfig = self.camera.generate_configuration(roles)
+        if camconfig is None:
+            raise Exception('Generating config failed')
+
+        for idx, stream_opts in enumerate(streams):
+            stream_config = camconfig.at(idx)
+
+            if 'width' in stream_opts:
+                stream_config.size.width = stream_opts['width']
+
+            if 'height' in stream_opts:
+                stream_config.size.height = stream_opts['height']
+
+            if 'pixelformat' in stream_opts:
+                stream_config.pixel_format = libcam.PixelFormat(stream_opts['pixelformat'])
+
+        if self.opt_orientation is not None:
+            orientation_map = {
+                'rot0': libcam.Orientation.Rotate0,
+                'rot180': libcam.Orientation.Rotate180,
+                'mirror': libcam.Orientation.Rotate0Mirror,
+                'flip': libcam.Orientation.Rotate180Mirror,
+            }
+
+            orient = orientation_map.get(self.opt_orientation, None)
+            if orient is None:
+                print('Bad orientation: ', self.opt_orientation)
+                sys.exit(-1)
+
+            camconfig.orientation = orient
+
+        stat = camconfig.validate()
+
+        if stat == libcam.CameraConfiguration.Status.Invalid:
+            print('Camera configuration invalid')
+            exit(-1)
+        elif stat == libcam.CameraConfiguration.Status.Adjusted:
+            # validate() may tweak sizes/formats; with --strict-formats any
+            # adjustment is treated as an error.
+            if self.opt_strict_formats:
+                print('Adjusting camera configuration disallowed by --strict-formats argument')
+                exit(-1)
+
+            print('Camera configuration adjusted')
+
+        self.camera.configure(camconfig)
+
+        self.stream_names = {}
+        self.streams = []
+
+        for idx, stream_config in enumerate(camconfig):
+            stream = stream_config.stream
+            self.streams.append(stream)
+            self.stream_names[stream] = 'stream' + str(idx)
+            print('{}-{}: stream config {}'.format(self.id, self.stream_names[stream], stream.configuration))
+
+    # Allocate frame buffers for every configured stream. Must be called
+    # after configure().
+    def alloc_buffers(self):
+        allocator = libcam.FrameBufferAllocator(self.camera)
+
+        for stream in self.streams:
+            allocated = allocator.allocate(stream)
+
+            print('{}-{}: Allocated {} buffers'.format(self.id, self.stream_names[stream], allocated))
+
+        self.allocator = allocator
+
+    # Build one request per buffer set, each request carrying one buffer from
+    # every stream. The camera index is used as the request cookie so
+    # completed requests can be mapped back to this context.
+    def create_requests(self):
+        self.requests = []
+
+        # Identify the stream with the least number of buffers
+        num_bufs = min([len(self.allocator.buffers(stream)) for stream in self.streams])
+
+        requests = []
+
+        for buf_num in range(num_bufs):
+            request = self.camera.create_request(self.idx)
+
+            if request is None:
+                print('Can not create request')
+                exit(-1)
+
+            for stream in self.streams:
+                buffers = self.allocator.buffers(stream)
+                buffer = buffers[buf_num]
+
+                request.add_buffer(stream, buffer)
+
+            requests.append(request)
+
+        self.requests = requests
+
+    def start(self):
+        self.camera.start()
+
+    def stop(self):
+        self.camera.stop()
+
+    # Queue all initial requests to the camera. Requests are subsequently
+    # re-queued by CaptureState.request_processed after each completion.
+    def queue_requests(self):
+        for request in self.requests:
+            self.camera.queue_request(request)
+            self.reqs_queued += 1
+
+        # Drop our references; the queued requests are now owned by the
+        # capture loop.
+        del self.requests
+
+
+# Drives a capture session over one or more CameraContexts: reacts to
+# libcamera events, per-request processing (fps, CRC, metadata, saving),
+# and hands frames to the active renderer.
+class CaptureState:
+    cm: libcam.CameraManager
+    contexts: list[CameraContext]
+    # Renderer object (null/kms/qt/qtgl); set by main() after construction.
+    renderer: Any
+
+    def __init__(self, cm, contexts):
+        self.cm = cm
+        self.contexts = contexts
+
+    # Called from renderer when there is a libcamera event
+    # Returns True while at least one context still has frames to capture;
+    # returns False on completion or on any exception (which is printed).
+    def event_handler(self):
+        try:
+            reqs = self.cm.get_ready_requests()
+
+            for req in reqs:
+                # The request cookie is the camera index (see
+                # CameraContext.create_requests).
+                ctx = next(ctx for ctx in self.contexts if ctx.idx == req.cookie)
+                self.__request_handler(ctx, req)
+
+            running = any(ctx.reqs_completed < ctx.opt_capture for ctx in self.contexts)
+            return running
+        except Exception:
+            traceback.print_exc()
+            return False
+
+    # Process one completed request: update fps, optionally print metadata,
+    # compute CRCs and save frames, then pass the request to the renderer.
+    def __request_handler(self, ctx, req):
+        if req.status != libcam.Request.Status.Complete:
+            raise Exception('{}: Request failed: {}'.format(ctx.id, req.status))
+
+        buffers = req.buffers
+
+        # Compute the frame rate. The timestamp is arbitrarily retrieved from
+        # the first buffer, as all buffers should have matching timestamps.
+        ts = buffers[next(iter(buffers))].metadata.timestamp
+        last = ctx.last
+        fps = 1000000000.0 / (ts - last) if (last != 0 and (ts - last) != 0) else 0
+        ctx.last = ts
+        ctx.fps = fps
+
+        if ctx.opt_metadata:
+            reqmeta = req.metadata
+            for ctrl, val in reqmeta.items():
+                print(f'\t{ctrl} = {val}')
+
+        for stream, fb in buffers.items():
+            stream_name = ctx.stream_names[stream]
+
+            crcs = []
+            if ctx.opt_crc:
+                # Map the dma-buf planes into CPU memory to checksum them.
+                with libcamera.utils.MappedFrameBuffer(fb) as mfb:
+                    plane_crcs = [binascii.crc32(p) for p in mfb.planes]
+                    crcs.append(plane_crcs)
+
+            meta = fb.metadata
+
+            print('{:.6f} ({:.2f} fps) {}-{}: seq {}, bytes {}, CRCs {}'
+                  .format(ts / 1000000000, fps,
+                          ctx.id, stream_name,
+                          meta.sequence,
+                          '/'.join([str(p.bytes_used) for p in meta.planes]),
+                          crcs))
+
+            if ctx.opt_save_frames:
+                with libcamera.utils.MappedFrameBuffer(fb) as mfb:
+                    filename = 'frame-{}-{}-{}.data'.format(ctx.id, stream_name, ctx.reqs_completed)
+                    with open(filename, 'wb') as f:
+                        for p in mfb.planes:
+                            f.write(p)
+
+        # The renderer calls request_processed() back when it is done with
+        # the buffers.
+        self.renderer.request_handler(ctx, req)
+
+        ctx.reqs_completed += 1
+
+    # Called from renderer when it has finished with a request
+    def request_processed(self, ctx, req):
+        # Re-queue until the requested number of frames has been queued.
+        if ctx.reqs_queued < ctx.opt_capture:
+            req.reuse()
+            ctx.camera.queue_request(req)
+            ctx.reqs_queued += 1
+
+    # Acquire, configure and prepare buffers/requests for all contexts.
+    def __capture_init(self):
+        for ctx in self.contexts:
+            ctx.acquire()
+
+        for ctx in self.contexts:
+            ctx.configure()
+
+        for ctx in self.contexts:
+            ctx.alloc_buffers()
+
+        for ctx in self.contexts:
+            ctx.create_requests()
+
+    def __capture_start(self):
+        for ctx in self.contexts:
+            ctx.start()
+
+        for ctx in self.contexts:
+            ctx.queue_requests()
+
+    def __capture_deinit(self):
+        for ctx in self.contexts:
+            ctx.stop()
+
+        for ctx in self.contexts:
+            ctx.release()
+
+    # Full capture flow: init cameras, set up the renderer, start streaming,
+    # run the renderer's event loop, then tear everything down.
+    def do_cmd_capture(self):
+        self.__capture_init()
+
+        self.renderer.setup()
+
+        self.__capture_start()
+
+        self.renderer.run()
+
+        self.__capture_deinit()
+
+
+class CustomAction(argparse.Action):
+ def __init__(self, option_strings, dest, **kwargs):
+ super().__init__(option_strings, dest, default={}, **kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ if len(namespace.camera) == 0:
+ print(f'Option {option_string} requires a --camera context')
+ sys.exit(-1)
+
+ if self.type == bool:
+ values = True
+
+ current = namespace.camera[-1]
+
+ data = getattr(namespace, self.dest)
+
+ if self.nargs == '+':
+ if current not in data:
+ data[current] = []
+
+ data[current] += values
+ else:
+ data[current] = values
+
+
+# Print the cameras known to the camera manager, numbered from 1 to match
+# the indices accepted by --camera.
+def do_cmd_list(cm):
+    print('Available cameras:')
+
+    for idx, c in enumerate(cm.cameras):
+        print(f'{idx + 1}: {c.id}')
+
+
+# Entry point: parse arguments, build a CameraContext per selected camera,
+# run the informational commands, then capture with the chosen renderer.
+# Returns a process exit code.
+def main():
+    parser = argparse.ArgumentParser()
+    # global options
+    parser.add_argument('-l', '--list', action='store_true', help='List all cameras')
+    parser.add_argument('-c', '--camera', type=int, action='extend', nargs=1, default=[], help='Specify which camera to operate on, by index')
+    parser.add_argument('-p', '--list-properties', action='store_true', help='List cameras properties')
+    parser.add_argument('--list-controls', action='store_true', help='List cameras controls')
+    parser.add_argument('-I', '--info', action='store_true', help='Display information about stream(s)')
+    parser.add_argument('-R', '--renderer', default='null', help='Renderer (null, kms, qt, qtgl)')
+
+    # per camera options
+    parser.add_argument('-C', '--capture', nargs='?', type=int, const=1000000, action=CustomAction, help='Capture until interrupted by user or until CAPTURE frames captured')
+    parser.add_argument('--crc', nargs=0, type=bool, action=CustomAction, help='Print CRC32 for captured frames')
+    parser.add_argument('--save-frames', nargs=0, type=bool, action=CustomAction, help='Save captured frames to files')
+    parser.add_argument('--metadata', nargs=0, type=bool, action=CustomAction, help='Print the metadata for completed requests')
+    parser.add_argument('--strict-formats', type=bool, nargs=0, action=CustomAction, help='Do not allow requested stream format(s) to be adjusted')
+    parser.add_argument('-s', '--stream', nargs='+', action=CustomAction)
+    # NOTE(review): --orientation is grouped with the per-camera options but
+    # is a plain global argument (no CustomAction); it applies to every
+    # selected camera — confirm this is intended.
+    parser.add_argument('-o', '--orientation', help='Desired image orientation (rot0, rot180, mirror, flip)')
+    args = parser.parse_args()
+
+    cm = libcam.CameraManager.singleton()
+
+    if args.list:
+        do_cmd_list(cm)
+
+    contexts = []
+
+    for cam_idx in args.camera:
+        # --camera indices are 1-based, cm.cameras is 0-based.
+        camera = next((c for i, c in enumerate(cm.cameras) if i + 1 == cam_idx), None)
+
+        if camera is None:
+            print('Unable to find camera', cam_idx)
+            return -1
+
+        ctx = CameraContext(camera, cam_idx)
+        ctx.opt_capture = args.capture.get(cam_idx, 0)
+        ctx.opt_crc = args.crc.get(cam_idx, False)
+        ctx.opt_save_frames = args.save_frames.get(cam_idx, False)
+        ctx.opt_metadata = args.metadata.get(cam_idx, False)
+        ctx.opt_strict_formats = args.strict_formats.get(cam_idx, False)
+        ctx.opt_stream = args.stream.get(cam_idx, ['role=viewfinder'])
+        ctx.opt_orientation = args.orientation
+        contexts.append(ctx)
+
+    for ctx in contexts:
+        print('Using camera {} as {}'.format(ctx.camera.id, ctx.id))
+
+    for ctx in contexts:
+        if args.list_properties:
+            ctx.do_cmd_list_props()
+        if args.list_controls:
+            ctx.do_cmd_list_controls()
+        if args.info:
+            ctx.do_cmd_info()
+
+    # Filter out capture contexts which are not marked for capture
+    contexts = [ctx for ctx in contexts if ctx.opt_capture > 0]
+
+    if contexts:
+        state = CaptureState(cm, contexts)
+
+        # Renderer modules are imported lazily so their (optional) heavy
+        # dependencies (pykms, PyQt6, PyOpenGL) are only needed when used.
+        if args.renderer == 'null':
+            import cam_null
+            renderer = cam_null.NullRenderer(state)
+        elif args.renderer == 'kms':
+            import cam_kms
+            renderer = cam_kms.KMSRenderer(state)
+        elif args.renderer == 'qt':
+            import cam_qt
+            renderer = cam_qt.QtRenderer(state)
+        elif args.renderer == 'qtgl':
+            import cam_qtgl
+            renderer = cam_qtgl.QtRenderer(state)
+        else:
+            print('Bad renderer', args.renderer)
+            return -1
+
+        state.renderer = renderer
+
+        state.do_cmd_capture()
+
+    return 0
+
+
+# Standard script entry guard; propagate main()'s return value as the
+# process exit code.
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/src/py/cam/cam_kms.py b/src/py/cam/cam_kms.py
new file mode 100644
index 00000000..38fc382d
--- /dev/null
+++ b/src/py/cam/cam_kms.py
@@ -0,0 +1,184 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+import pykms
+import selectors
+import sys
+
+
+# Renderer that displays captured frames directly on a DRM/KMS display via
+# pykms, importing libcamera dma-buf frame buffers as DRM framebuffers and
+# flipping them onto hardware planes (up to four streams, one per quadrant).
+class KMSRenderer:
+    def __init__(self, state):
+        self.state = state
+
+        self.cm = state.cm
+        self.contexts = state.contexts
+        self.running = False
+
+        card = pykms.Card()
+
+        # Pick a connector/CRTC and apply its default mode.
+        res = pykms.ResourceManager(card)
+        conn = res.reserve_connector()
+        crtc = res.reserve_crtc(conn)
+        mode = conn.get_default_mode()
+        modeb = mode.to_blob(card)
+
+        req = pykms.AtomicReq(card)
+        req.add_connector(conn, crtc)
+        req.add_crtc(crtc, modeb)
+        r = req.commit_sync(allow_modeset=True)
+        assert(r == 0)
+
+        self.card = card
+        self.resman = res
+        self.crtc = crtc
+        self.mode = mode
+
+        # Pending requests waiting for a page flip, plus the currently
+        # displayed and next-to-display entries.
+        self.bufqueue = []
+        self.current = None
+        self.next = None
+        # Maps libcamera FrameBuffer -> pykms DmabufFramebuffer.
+        self.cam_2_drm = {}
+
+    # KMS
+
+    # Disable all planes used by the streams.
+    def close(self):
+        req = pykms.AtomicReq(self.card)
+        for s in self.streams:
+            req.add_plane(s['plane'], None, None, dst=(0, 0, 0, 0))
+        req.commit()
+
+    # Add the framebuffer for 'stream' to the atomic request, positioned in
+    # the screen quadrant assigned to that stream (by index parity).
+    def add_plane(self, req, stream, fb):
+        s = next(s for s in self.streams if s['stream'] == stream)
+        idx = s['idx']
+        plane = s['plane']
+
+        if idx % 2 == 0:
+            x = 0
+        else:
+            x = self.mode.hdisplay - fb.width
+
+        if idx // 2 == 0:
+            y = 0
+        else:
+            y = self.mode.vdisplay - fb.height
+
+        req.add_plane(plane, fb, self.crtc, dst=(x, y, fb.width, fb.height))
+
+    # Commit an atomic request showing every buffer of the capture request.
+    def apply_request(self, drmreq):
+
+        buffers = drmreq['camreq'].buffers
+
+        req = pykms.AtomicReq(self.card)
+
+        for stream, fb in buffers.items():
+            drmfb = self.cam_2_drm.get(fb, None)
+            self.add_plane(req, stream, drmfb)
+
+        # Non-blocking commit; completion arrives as a FLIP_COMPLETE event.
+        req.commit()
+
+    # Page-flip completion: advance current/next, display the next queued
+    # request, and return the now-displaced request to libcamera.
+    def handle_page_flip(self, frame, time):
+        old = self.current
+        self.current = self.next
+
+        if len(self.bufqueue) > 0:
+            self.next = self.bufqueue.pop(0)
+        else:
+            self.next = None
+
+        if self.next:
+            drmreq = self.next
+
+            self.apply_request(drmreq)
+
+        if old:
+            req = old['camreq']
+            ctx = old['camctx']
+            self.state.request_processed(ctx, req)
+
+    # Queue a completed capture request for display; show it immediately if
+    # nothing is pending.
+    def queue(self, drmreq):
+        if not self.next:
+            self.next = drmreq
+            self.apply_request(drmreq)
+        else:
+            self.bufqueue.append(drmreq)
+
+    # libcamera
+
+    # Reserve a DRM plane per stream and wrap every allocated libcamera
+    # buffer as a DRM dma-buf framebuffer.
+    def setup(self):
+        self.streams = []
+
+        idx = 0
+        for ctx in self.contexts:
+            for stream in ctx.streams:
+
+                cfg = stream.configuration
+                fmt = cfg.pixel_format
+                # libcamera and DRM fourccs are compatible here.
+                fmt = pykms.PixelFormat(fmt.fourcc)
+
+                plane = self.resman.reserve_generic_plane(self.crtc, fmt)
+                assert(plane is not None)
+
+                self.streams.append({
+                    'idx': idx,
+                    'stream': stream,
+                    'plane': plane,
+                    'fmt': fmt,
+                    'size': cfg.size,
+                })
+
+                for fb in ctx.allocator.buffers(stream):
+                    w = cfg.size.width
+                    h = cfg.size.height
+                    fds = []
+                    strides = []
+                    offsets = []
+                    for plane in fb.planes:
+                        fds.append(plane.fd)
+                        # NOTE(review): cfg.stride is the stream's (plane 0)
+                        # stride, reused for every plane; multi-planar
+                        # formats with differing per-plane strides may need
+                        # per-plane values — confirm.
+                        strides.append(cfg.stride)
+                        offsets.append(plane.offset)
+
+                    drmfb = pykms.DmabufFramebuffer(self.card, w, h, fmt,
+                                                    fds, strides, offsets)
+                    self.cam_2_drm[fb] = drmfb
+
+                idx += 1
+
+    # DRM fd is readable: drain page-flip events.
+    def readdrm(self, fileobj):
+        for ev in self.card.read_events():
+            if ev.type == pykms.DrmEventType.FLIP_COMPLETE:
+                self.handle_page_flip(ev.seq, ev.time)
+
+    # libcamera event fd is readable: process completed requests.
+    def readcam(self, fd):
+        self.running = self.state.event_handler()
+
+    # stdin is readable: any line stops the capture loop.
+    def readkey(self, fileobj):
+        sys.stdin.readline()
+        self.running = False
+
+    # Event loop multiplexing the DRM fd, the camera event fd and stdin.
+    def run(self):
+        print('Capturing...')
+
+        self.running = True
+
+        sel = selectors.DefaultSelector()
+        sel.register(self.card.fd, selectors.EVENT_READ, self.readdrm)
+        sel.register(self.cm.event_fd, selectors.EVENT_READ, self.readcam)
+        sel.register(sys.stdin, selectors.EVENT_READ, self.readkey)
+
+        print('Press enter to exit')
+
+        while self.running:
+            events = sel.select()
+            for key, mask in events:
+                callback = key.data
+                callback(key.fileobj)
+
+        print('Exiting...')
+
+    # CaptureState callback: hold the request for display; it is released
+    # via request_processed() after it leaves the screen.
+    def request_handler(self, ctx, req):
+
+        drmreq = {
+            'camctx': ctx,
+            'camreq': req,
+        }
+
+        self.queue(drmreq)
diff --git a/src/py/cam/cam_null.py b/src/py/cam/cam_null.py
new file mode 100644
index 00000000..40dbd266
--- /dev/null
+++ b/src/py/cam/cam_null.py
@@ -0,0 +1,47 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+import selectors
+import sys
+
+
+# Renderer that displays nothing: it only pumps libcamera events and returns
+# each request immediately. Useful for headless capture/CRC/save runs.
+class NullRenderer:
+    def __init__(self, state):
+        self.state = state
+
+        self.cm = state.cm
+        self.contexts = state.contexts
+
+        self.running = False
+
+    def setup(self):
+        pass
+
+    # Event loop multiplexing the camera event fd and stdin.
+    def run(self):
+        print('Capturing...')
+
+        self.running = True
+
+        sel = selectors.DefaultSelector()
+        sel.register(self.cm.event_fd, selectors.EVENT_READ, self.readcam)
+        sel.register(sys.stdin, selectors.EVENT_READ, self.readkey)
+
+        print('Press enter to exit')
+
+        while self.running:
+            events = sel.select()
+            for key, mask in events:
+                callback = key.data
+                callback(key.fileobj)
+
+        print('Exiting...')
+
+    # libcamera event fd is readable: process completed requests.
+    def readcam(self, fd):
+        self.running = self.state.event_handler()
+
+    # stdin is readable: any line stops the capture loop.
+    def readkey(self, fileobj):
+        sys.stdin.readline()
+        self.running = False
+
+    # No rendering: hand the request straight back for re-queuing.
+    def request_handler(self, ctx, req):
+        self.state.request_processed(ctx, req)
diff --git a/src/py/cam/cam_qt.py b/src/py/cam/cam_qt.py
new file mode 100644
index 00000000..22d8c4da
--- /dev/null
+++ b/src/py/cam/cam_qt.py
@@ -0,0 +1,182 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from helpers import mfb_to_rgb
+from PyQt6 import QtCore, QtGui, QtWidgets
+import libcamera as libcam
+import libcamera.utils
+import sys
+
+
+# Loading MJPEG to a QPixmap produces corrupt JPEG data warnings. Ignore these.
+# All other Qt messages are forwarded to the previously installed handler,
+# or printed if there was none.
+def qt_message_handler(msg_type, msg_log_context, msg_string):
+    if msg_string.startswith("Corrupt JPEG data"):
+        return
+
+    # For some reason qInstallMessageHandler returns None, so we won't
+    # call the old handler
+    if old_msg_handler is not None:
+        old_msg_handler(msg_type, msg_log_context, msg_string)
+    else:
+        print(msg_string)
+
+
+# Install the filter at import time; keep the previous handler for chaining.
+old_msg_handler = QtCore.qInstallMessageHandler(qt_message_handler)
+
+
+# Convert an RGB image (HxWx3 array-like with an RGB888 memory layout) into
+# a QPixmap for display.
+def rgb_to_pix(rgb):
+    w = rgb.shape[1]
+    h = rgb.shape[0]
+    qim = QtGui.QImage(rgb, w, h, QtGui.QImage.Format.Format_RGB888)
+    pix = QtGui.QPixmap.fromImage(qim)
+    return pix
+
+
+# Software Qt renderer: opens one window per stream, converts each frame to
+# a QPixmap on the CPU (see MainWindow) and shows it in a QLabel.
+class QtRenderer:
+    def __init__(self, state):
+        self.state = state
+
+        self.cm = state.cm
+        self.contexts = state.contexts
+
+    # Create one MainWindow per (context, stream) and pre-mmap every
+    # allocated buffer so frames can be read without per-frame mapping.
+    def setup(self):
+        self.app = QtWidgets.QApplication([])
+
+        windows = []
+
+        for ctx in self.contexts:
+            for stream in ctx.streams:
+                window = MainWindow(ctx, stream)
+                window.show()
+                windows.append(window)
+
+        self.windows = windows
+
+        buf_mmap_map = {}
+
+        for ctx in self.contexts:
+            for stream in ctx.streams:
+                for buf in ctx.allocator.buffers(stream):
+                    mfb = libcamera.utils.MappedFrameBuffer(buf).mmap()
+                    buf_mmap_map[buf] = mfb
+
+        self.buf_mmap_map = buf_mmap_map
+
+    # Qt event loop; camera events and stdin are bridged in via
+    # QSocketNotifier.
+    def run(self):
+        camnotif = QtCore.QSocketNotifier(self.cm.event_fd, QtCore.QSocketNotifier.Type.Read)
+        camnotif.activated.connect(lambda _: self.readcam())
+
+        keynotif = QtCore.QSocketNotifier(sys.stdin.fileno(), QtCore.QSocketNotifier.Type.Read)
+        keynotif.activated.connect(lambda _: self.readkey())
+
+        print('Capturing...')
+
+        self.app.exec()
+
+        print('Exiting...')
+
+    def readcam(self):
+        running = self.state.event_handler()
+
+        if not running:
+            self.app.quit()
+
+    # Any line on stdin quits the application.
+    def readkey(self):
+        sys.stdin.readline()
+        self.app.quit()
+
+    # CaptureState callback: dispatch each buffer to the window showing its
+    # stream, then immediately release the request.
+    def request_handler(self, ctx, req):
+        buffers = req.buffers
+
+        for stream, fb in buffers.items():
+            wnd = next(wnd for wnd in self.windows if wnd.stream == stream)
+
+            mfb = self.buf_mmap_map[fb]
+
+            wnd.handle_request(stream, mfb)
+
+        self.state.request_processed(ctx, req)
+
+    def cleanup(self):
+        for w in self.windows:
+            w.close()
+
+
+# Per-stream window: shows the latest frame in a QLabel alongside static
+# info panels (camera id, properties, controls) and live frame counters.
+class MainWindow(QtWidgets.QWidget):
+    def __init__(self, ctx, stream):
+        super().__init__()
+
+        self.ctx = ctx
+        self.stream = stream
+
+        # Label holding the rendered frame.
+        self.label = QtWidgets.QLabel()
+
+        windowLayout = QtWidgets.QHBoxLayout()
+        self.setLayout(windowLayout)
+
+        windowLayout.addWidget(self.label)
+
+        controlsLayout = QtWidgets.QVBoxLayout()
+        windowLayout.addLayout(controlsLayout)
+
+        windowLayout.addStretch()
+
+        group = QtWidgets.QGroupBox('Info')
+        groupLayout = QtWidgets.QVBoxLayout()
+        group.setLayout(groupLayout)
+        controlsLayout.addWidget(group)
+
+        lab = QtWidgets.QLabel(ctx.id)
+        groupLayout.addWidget(lab)
+
+        # Updated per frame with queued/done/fps counters.
+        self.frameLabel = QtWidgets.QLabel()
+        groupLayout.addWidget(self.frameLabel)
+
+        group = QtWidgets.QGroupBox('Properties')
+        groupLayout = QtWidgets.QVBoxLayout()
+        group.setLayout(groupLayout)
+        controlsLayout.addWidget(group)
+
+        camera = ctx.camera
+
+        for cid, cv in camera.properties.items():
+            lab = QtWidgets.QLabel()
+            lab.setText('{} = {}'.format(cid, cv))
+            groupLayout.addWidget(lab)
+
+        group = QtWidgets.QGroupBox('Controls')
+        groupLayout = QtWidgets.QVBoxLayout()
+        group.setLayout(groupLayout)
+        controlsLayout.addWidget(group)
+
+        # Controls shown as min/max/default triples.
+        for cid, cinfo in camera.controls.items():
+            lab = QtWidgets.QLabel()
+            lab.setText('{} = {}/{}/{}'
+                        .format(cid, cinfo.min, cinfo.max, cinfo.default))
+            groupLayout.addWidget(lab)
+
+        controlsLayout.addStretch()
+
+    # Convert a mapped frame buffer to a QPixmap: MJPEG decodes directly,
+    # other formats go through the software converter in helpers.py.
+    def buf_to_qpixmap(self, stream, mfb):
+        cfg = stream.configuration
+
+        if cfg.pixel_format == libcam.formats.MJPEG:
+            pix = QtGui.QPixmap(cfg.size.width, cfg.size.height)
+            pix.loadFromData(mfb.planes[0])
+        else:
+            rgb = mfb_to_rgb(mfb, cfg)
+            if rgb is None:
+                raise Exception('Format not supported: ' + cfg.pixel_format)
+
+            pix = rgb_to_pix(rgb)
+
+        return pix
+
+    # Display the new frame and refresh the counters.
+    def handle_request(self, stream, mfb):
+        ctx = self.ctx
+
+        pix = self.buf_to_qpixmap(stream, mfb)
+        self.label.setPixmap(pix)
+
+        self.frameLabel.setText('Queued: {}\nDone: {}\nFps: {:.2f}'
+                                .format(ctx.reqs_queued, ctx.reqs_completed, ctx.fps))
diff --git a/src/py/cam/cam_qtgl.py b/src/py/cam/cam_qtgl.py
new file mode 100644
index 00000000..35b4b06b
--- /dev/null
+++ b/src/py/cam/cam_qtgl.py
@@ -0,0 +1,363 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from PyQt6 import QtCore, QtWidgets
+from PyQt6.QtCore import Qt
+
+import math
+import os
+import sys
+
+# Select the EGL backend; must be set before the PyOpenGL imports below.
+os.environ['PYOPENGL_PLATFORM'] = 'egl'
+
+from OpenGL.EGL.EXT.image_dma_buf_import import *
+from OpenGL.EGL.KHR.image import *
+from OpenGL.EGL.VERSION.EGL_1_0 import *
+from OpenGL.EGL.VERSION.EGL_1_2 import *
+from OpenGL.EGL.VERSION.EGL_1_3 import *
+
+from OpenGL.GLES2.OES.EGL_image import *
+from OpenGL.GLES2.OES.EGL_image_external import *
+from OpenGL.GLES2.VERSION.GLES2_2_0 import *
+from OpenGL.GLES3.VERSION.GLES3_3_0 import *
+
+from OpenGL.GL import shaders
+
+from gl_helpers import *
+
+
+# Owns the EGL display, config and GLES2 context used by the qtgl renderer,
+# and verifies the dma-buf import extensions are available.
+class EglState:
+    def __init__(self):
+        self.create_display()
+        self.choose_config()
+        self.create_context()
+        self.check_extensions()
+
+    # Open the native (X11) display and get the matching EGL display.
+    def create_display(self):
+        xdpy = getEGLNativeDisplay()
+        dpy = eglGetDisplay(xdpy)
+        self.display = dpy
+
+    # Initialize EGL and choose an RGB888 window-renderable GLES2 config.
+    def choose_config(self):
+        dpy = self.display
+
+        major, minor = EGLint(), EGLint()
+
+        b = eglInitialize(dpy, major, minor)
+        assert(b)
+
+        print('EGL {} {}'.format(
+              eglQueryString(dpy, EGL_VENDOR).decode(),
+              eglQueryString(dpy, EGL_VERSION).decode()))
+
+        check_egl_extensions(dpy, ['EGL_EXT_image_dma_buf_import'])
+
+        b = eglBindAPI(EGL_OPENGL_ES_API)
+        assert(b)
+
+        # Debug helper: dump the attributes of an EGL config.
+        def print_config(dpy, cfg):
+
+            def getconf(a):
+                value = ctypes.c_long()
+                eglGetConfigAttrib(dpy, cfg, a, value)
+                return value.value
+
+            print('EGL Config {}: color buf {}/{}/{}/{} = {}, depth {}, stencil {}, native visualid {}, native visualtype {}'.format(
+                getconf(EGL_CONFIG_ID),
+                getconf(EGL_ALPHA_SIZE),
+                getconf(EGL_RED_SIZE),
+                getconf(EGL_GREEN_SIZE),
+                getconf(EGL_BLUE_SIZE),
+                getconf(EGL_BUFFER_SIZE),
+                getconf(EGL_DEPTH_SIZE),
+                getconf(EGL_STENCIL_SIZE),
+                getconf(EGL_NATIVE_VISUAL_ID),
+                getconf(EGL_NATIVE_VISUAL_TYPE)))
+
+        # Disabled debug dump of every available config; flip to True when
+        # diagnosing config selection problems.
+        if False:
+            num_configs = ctypes.c_long()
+            eglGetConfigs(dpy, None, 0, num_configs)
+            print('{} configs'.format(num_configs.value))
+
+            configs = (EGLConfig * num_configs.value)()
+            eglGetConfigs(dpy, configs, num_configs.value, num_configs)
+            for config_id in configs:
+                print_config(dpy, config_id)
+
+        config_attribs = [
+            EGL_SURFACE_TYPE, EGL_WINDOW_BIT,
+            EGL_RED_SIZE, 8,
+            EGL_GREEN_SIZE, 8,
+            EGL_BLUE_SIZE, 8,
+            EGL_ALPHA_SIZE, 0,
+            EGL_RENDERABLE_TYPE, EGL_OPENGL_ES2_BIT,
+            EGL_NONE,
+        ]
+
+        n = EGLint()
+        configs = (EGLConfig * 1)()
+        b = eglChooseConfig(dpy, config_attribs, configs, 1, n)
+        assert(b and n.value == 1)
+        config = configs[0]
+
+        print('Chosen Config:')
+        print_config(dpy, config)
+
+        self.config = config
+
+    # Create a GLES2 context and make it current without a surface
+    # (a window surface is created later by MainWindow.create_surface).
+    def create_context(self):
+        dpy = self.display
+
+        context_attribs = [
+            EGL_CONTEXT_CLIENT_VERSION, 2,
+            EGL_NONE,
+        ]
+
+        context = eglCreateContext(dpy, self.config, EGL_NO_CONTEXT, context_attribs)
+        assert(context)
+
+        b = eglMakeCurrent(dpy, EGL_NO_SURFACE, EGL_NO_SURFACE, context)
+        assert(b)
+
+        self.context = context
+
+    # Verify the GL/EGL image extensions needed for dma-buf texturing.
+    def check_extensions(self):
+        check_gl_extensions(['GL_OES_EGL_image'])
+
+        assert(eglCreateImageKHR)
+        assert(eglDestroyImageKHR)
+        assert(glEGLImageTargetTexture2DOES)
+
+
+# GPU Qt renderer: a single MainWindow draws all streams via EGL/GLES2
+# (zero-copy dma-buf import); this class only runs the Qt event loop and
+# forwards events.
+class QtRenderer:
+    def __init__(self, state):
+        self.state = state
+
+    def setup(self):
+        self.app = QtWidgets.QApplication([])
+
+        window = MainWindow(self.state)
+        window.show()
+
+        self.window = window
+
+    # Qt event loop; camera events and stdin are bridged in via
+    # QSocketNotifier.
+    def run(self):
+        camnotif = QtCore.QSocketNotifier(self.state.cm.event_fd, QtCore.QSocketNotifier.Type.Read)
+        camnotif.activated.connect(lambda _: self.readcam())
+
+        keynotif = QtCore.QSocketNotifier(sys.stdin.fileno(), QtCore.QSocketNotifier.Type.Read)
+        keynotif.activated.connect(lambda _: self.readkey())
+
+        print('Capturing...')
+
+        self.app.exec()
+
+        print('Exiting...')
+
+    def readcam(self):
+        running = self.state.event_handler()
+
+        if not running:
+            self.app.quit()
+
+    # Any line on stdin quits the application.
+    def readkey(self):
+        sys.stdin.readline()
+        self.app.quit()
+
+    # CaptureState callback: queue the request for display in the window.
+    def request_handler(self, ctx, req):
+        self.window.handle_request(ctx, req)
+
+    def cleanup(self):
+        self.window.close()
+
+
+# Native-window widget that renders every stream as an external-OES texture
+# in a grid layout, importing each dma-buf frame via EGLImage (zero-copy).
+class MainWindow(QtWidgets.QWidget):
+    def __init__(self, state):
+        super().__init__()
+
+        # We render with EGL directly into the native window, so Qt must not
+        # paint over it.
+        self.setAttribute(Qt.WidgetAttribute.WA_PaintOnScreen)
+        self.setAttribute(Qt.WidgetAttribute.WA_NativeWindow)
+
+        self.state = state
+
+        # Per-stream GL texture, per-context queues of pending requests and
+        # the request currently on screen (held until replaced).
+        self.textures = {}
+        self.reqqueue = {}
+        self.current = {}
+
+        for ctx in self.state.contexts:
+
+            self.reqqueue[ctx.idx] = []
+            self.current[ctx.idx] = []
+
+            for stream in ctx.streams:
+                self.textures[stream] = None
+
+        # Grid dimensions: roughly square tiling of all streams.
+        num_tiles = len(self.textures)
+        self.num_columns = math.ceil(math.sqrt(num_tiles))
+        self.num_rows = math.ceil(num_tiles / self.num_columns)
+
+        self.egl = EglState()
+
+        self.surface = None
+
+    # Tell Qt we paint this widget ourselves (no Qt paint engine).
+    def paintEngine(self):
+        return None
+
+    # Wrap the native window as an EGL window surface.
+    def create_surface(self):
+        native_surface = c_void_p(self.winId().__int__())
+        surface = eglCreateWindowSurface(self.egl.display, self.egl.config,
+                                         native_surface, None)
+
+        # NOTE(review): self.surface is still None here, so this makes the
+        # context current with no surface; the newly created 'surface' local
+        # was probably intended. paint_gl() re-makes current with the real
+        # surface afterwards, which may be why this goes unnoticed — confirm.
+        b = eglMakeCurrent(self.egl.display, self.surface, self.surface, self.egl.context)
+        assert(b)
+
+        self.surface = surface
+
+    # One-time GL setup: surface, shader program drawing an external-OES
+    # texture on a full-viewport quad, clear color and vertex attributes.
+    def init_gl(self):
+        self.create_surface()
+
+        vertShaderSrc = '''
+            attribute vec2 aPosition;
+            varying vec2 texcoord;
+
+            void main()
+            {
+                gl_Position = vec4(aPosition * 2.0 - 1.0, 0.0, 1.0);
+                texcoord.x = aPosition.x;
+                texcoord.y = 1.0 - aPosition.y;
+            }
+        '''
+        fragShaderSrc = '''
+            #extension GL_OES_EGL_image_external : enable
+            precision mediump float;
+            varying vec2 texcoord;
+            uniform samplerExternalOES texture;
+
+            void main()
+            {
+                gl_FragColor = texture2D(texture, texcoord);
+            }
+        '''
+
+        program = shaders.compileProgram(
+            shaders.compileShader(vertShaderSrc, GL_VERTEX_SHADER),
+            shaders.compileShader(fragShaderSrc, GL_FRAGMENT_SHADER)
+        )
+
+        glUseProgram(program)
+
+        glClearColor(0.5, 0.8, 0.7, 1.0)
+
+        # Unit quad; the vertex shader maps [0,1] to clip space.
+        vertPositions = [
+            0.0, 0.0,
+            1.0, 0.0,
+            1.0, 1.0,
+            0.0, 1.0
+        ]
+
+        inputAttrib = glGetAttribLocation(program, 'aPosition')
+        glVertexAttribPointer(inputAttrib, 2, GL_FLOAT, GL_FALSE, 0, vertPositions)
+        glEnableVertexAttribArray(inputAttrib)
+
+    # Import a frame buffer's dma-buf as an EGLImage and bind it to a new
+    # external-OES texture. Returns the texture id.
+    # NOTE(review): only plane 0 is imported — single-planar formats only;
+    # and neither the EGLImage nor the texture is ever destroyed although a
+    # new one is created per displayed frame (see paintEvent) — looks like a
+    # resource leak, confirm.
+    def create_texture(self, stream, fb):
+        cfg = stream.configuration
+        fmt = cfg.pixel_format.fourcc
+        w = cfg.size.width
+        h = cfg.size.height
+
+        attribs = [
+            EGL_WIDTH, w,
+            EGL_HEIGHT, h,
+            EGL_LINUX_DRM_FOURCC_EXT, fmt,
+            EGL_DMA_BUF_PLANE0_FD_EXT, fb.planes[0].fd,
+            EGL_DMA_BUF_PLANE0_OFFSET_EXT, 0,
+            EGL_DMA_BUF_PLANE0_PITCH_EXT, cfg.stride,
+            EGL_NONE,
+        ]
+
+        image = eglCreateImageKHR(self.egl.display,
+                                  EGL_NO_CONTEXT,
+                                  EGL_LINUX_DMA_BUF_EXT,
+                                  None,
+                                  attribs)
+        assert(image)
+
+        textures = glGenTextures(1)
+        glBindTexture(GL_TEXTURE_EXTERNAL_OES, textures)
+        glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_MAG_FILTER, GL_LINEAR)
+        glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_MIN_FILTER, GL_LINEAR)
+        glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_WRAP_S, GL_CLAMP_TO_EDGE)
+        glTexParameteri(GL_TEXTURE_EXTERNAL_OES, GL_TEXTURE_WRAP_T, GL_CLAMP_TO_EDGE)
+        glEGLImageTargetTexture2DOES(GL_TEXTURE_EXTERNAL_OES, image)
+
+        return textures
+
+    def resizeEvent(self, event):
+        size = event.size()
+
+        print('Resize', size)
+
+        super().resizeEvent(event)
+
+        if self.surface is None:
+            return
+
+        # NOTE(review): width is halved here for unclear reasons; paint_gl()
+        # recomputes the per-stream viewport anyway, so this looks like
+        # leftover debug code — confirm.
+        glViewport(0, 0, size.width() // 2, size.height())
+
+    # Paint: pop the newest pending request per context, release the
+    # previously displayed one back to libcamera, create textures for the
+    # new frames and redraw.
+    def paintEvent(self, event):
+        if self.surface is None:
+            self.init_gl()
+
+        for ctx_idx, queue in self.reqqueue.items():
+            if len(queue) == 0:
+                continue
+
+            ctx = next(ctx for ctx in self.state.contexts if ctx.idx == ctx_idx)
+
+            if self.current[ctx_idx]:
+                old = self.current[ctx_idx]
+                self.current[ctx_idx] = None
+                self.state.request_processed(ctx, old)
+
+            next_req = queue.pop(0)
+            self.current[ctx_idx] = next_req
+
+            # NOTE(review): only the first buffer of the request is shown;
+            # multi-stream requests would drop the other streams' frames
+            # here — confirm whether per-request multi-stream display is
+            # expected.
+            stream, fb = next(iter(next_req.buffers.items()))
+
+            self.textures[stream] = self.create_texture(stream, fb)
+
+        self.paint_gl()
+
+    # Draw every stream's texture into its grid cell and swap buffers.
+    def paint_gl(self):
+        b = eglMakeCurrent(self.egl.display, self.surface, self.surface, self.egl.context)
+        assert(b)
+
+        glClear(GL_COLOR_BUFFER_BIT)
+
+        size = self.size()
+
+        for idx, ctx in enumerate(self.state.contexts):
+            for stream in ctx.streams:
+                if self.textures[stream] is None:
+                    continue
+
+                w = size.width() // self.num_columns
+                h = size.height() // self.num_rows
+
+                x = idx % self.num_columns
+                y = idx // self.num_columns
+
+                x *= w
+                y *= h
+
+                glViewport(x, y, w, h)
+
+                glBindTexture(GL_TEXTURE_EXTERNAL_OES, self.textures[stream])
+                glDrawArrays(GL_TRIANGLE_FAN, 0, 4)
+
+        b = eglSwapBuffers(self.egl.display, self.surface)
+        assert(b)
+
+    # Queue a completed request and schedule a repaint; the request is
+    # released in paintEvent once its frame has been replaced.
+    def handle_request(self, ctx, req):
+        self.reqqueue[ctx.idx].append(req)
+        self.update()
diff --git a/src/py/cam/gl_helpers.py b/src/py/cam/gl_helpers.py
new file mode 100644
index 00000000..53b3e9df
--- /dev/null
+++ b/src/py/cam/gl_helpers.py
@@ -0,0 +1,66 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from OpenGL.EGL.VERSION.EGL_1_0 import EGLNativeDisplayType, eglGetProcAddress, eglQueryString, EGL_EXTENSIONS
+
+from OpenGL.raw.GLES2 import _types as _cs
+from OpenGL.GLES2.VERSION.GLES2_2_0 import *
+from OpenGL.GLES3.VERSION.GLES3_3_0 import *
+from OpenGL import GL as gl
+
+from ctypes import c_int, c_char_p, c_void_p, cdll, POINTER, util, \
+ pointer, CFUNCTYPE, c_bool
+
+
def getEGLNativeDisplay():
    """Open the default X11 display and return it as an EGL native display.

    Returns the pointer produced by XOpenDisplay(NULL), typed as
    POINTER(EGLNativeDisplayType). Raises Exception if libX11 cannot be
    located (previously cdll.LoadLibrary(None) would load the main program
    and fail later with a confusing AttributeError).
    """
    libname = util.find_library('X11')
    if libname is None:
        raise Exception('libX11 not found')

    _x11lib = cdll.LoadLibrary(libname)
    XOpenDisplay = _x11lib.XOpenDisplay
    XOpenDisplay.argtypes = [c_char_p]
    XOpenDisplay.restype = POINTER(EGLNativeDisplayType)

    # None selects the display named by the DISPLAY environment variable.
    return XOpenDisplay(None)
+
+
# Hack. PyOpenGL doesn't seem to manage to find glEGLImageTargetTexture2DOES.
def getglEGLImageTargetTexture2DOES():
    """Resolve glEGLImageTargetTexture2DOES via eglGetProcAddress and wrap it
    in a ctypes prototype so it is callable from Python."""
    prototype = CFUNCTYPE(None, _cs.GLenum, _cs.GLeglImageOES)
    addr = eglGetProcAddress('glEGLImageTargetTexture2DOES')
    return prototype(addr)


glEGLImageTargetTexture2DOES = getglEGLImageTargetTexture2DOES()
+
+
def get_gl_extensions():
    """Return the GL extensions of the current context as a list of names."""
    count = GLint()
    glGetIntegerv(GL_NUM_EXTENSIONS, count)
    # Query each extension string individually by index, as core GL requires.
    return [gl.glGetStringi(GL_EXTENSIONS, i).decode() for i in range(count.value)]
+
+
def check_gl_extensions(required_extensions, *, verbose=False):
    """Raise Exception if any of required_extensions is not advertised by
    the current GL context.

    verbose: print the full extension list before checking (replaces a dead
    'if False:' debug block).
    """
    extensions = get_gl_extensions()

    if verbose:
        print('GL EXTENSIONS: ', ' '.join(extensions))

    for ext in required_extensions:
        if ext not in extensions:
            raise Exception(ext + ' missing')
+
+
def get_egl_extensions(egl_display):
    """Return the EGL extensions of egl_display as a list of names."""
    ext_string = eglQueryString(egl_display, EGL_EXTENSIONS)
    return ext_string.decode().split(' ')
+
+
def check_egl_extensions(egl_display, required_extensions, *, verbose=False):
    """Raise Exception if any of required_extensions is not advertised by
    egl_display.

    verbose: print the full extension list before checking (replaces a dead
    'if False:' debug block).
    """
    extensions = get_egl_extensions(egl_display)

    if verbose:
        print('EGL EXTENSIONS: ', ' '.join(extensions))

    for ext in required_extensions:
        if ext not in extensions:
            raise Exception(ext + ' missing')
diff --git a/src/py/cam/helpers.py b/src/py/cam/helpers.py
new file mode 100644
index 00000000..2d906667
--- /dev/null
+++ b/src/py/cam/helpers.py
@@ -0,0 +1,158 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+#
+# Debayering code from PiCamera documentation
+
+from numpy.lib.stride_tricks import as_strided
+import libcamera as libcam
+import libcamera.utils
+import numpy as np
+
+
def demosaic(data, r0, g0, g1, b0):
    """Demosaic a single-plane Bayer image into an (H, W, 3) RGB image.

    data is a 2-D array of raw sensor samples. r0, g0, g1 and b0 are the
    (x, y) positions of the red, the two green, and the blue samples within
    the 2x2 Bayer tile. The result has the same dtype as data; each missing
    component is the weighted average of the matching samples in the
    surrounding 3x3 window. (Method from the PiCamera documentation.)
    """
    # Scatter every Bayer sample into its own colour plane, and build a
    # parallel 0/1 mask recording where each plane holds a real sample.
    rgb = np.zeros(data.shape + (3,), dtype=data.dtype)
    mask = np.zeros(rgb.shape, dtype=np.uint8)

    for (x, y), plane in ((r0, 0), (g0, 1), (g1, 1), (b0, 2)):
        rgb[y::2, x::2, plane] = data[y::2, x::2]
        mask[y::2, x::2, plane] = 1

    # Allocate the result before padding (same shape as the input), then pad
    # both arrays with a zero border so edge pixels get a full 3x3 window.
    window = (3, 3)
    edge = (window[0] // 2, window[1] // 2)

    result = np.empty(rgb.shape, dtype=rgb.dtype)

    padding = [
        (edge[0], edge[0]),
        (edge[1], edge[1]),
        (0, 0),
    ]
    rgb = np.pad(rgb, padding, 'constant')
    mask = np.pad(mask, padding, 'constant')

    # as_strided builds a zero-copy 4-D view of all 3x3 windows over each
    # plane; Einstein summation then sums every window without copying
    # (np.sum would be simpler but copies the data). The mask sum is the
    # count of real samples in the window, giving the weighted average.
    for plane in range(3):
        values = rgb[..., plane]
        weights = mask[..., plane]
        view_shape = (values.shape[0] - 2 * edge[0],
                      values.shape[1] - 2 * edge[1]) + window
        vwin = as_strided(values, shape=view_shape, strides=values.strides * 2)
        wwin = as_strided(weights, shape=view_shape, strides=weights.strides * 2)
        result[..., plane] = (np.einsum('ijkl->ij', vwin)
                              // np.einsum('ijkl->ij', wwin))

    return result
+
+
def to_rgb(fmt, size, data):
    """Convert one raw frame to a (height, width, 3) RGB888 ndarray.

    fmt is a libcamera pixel format, size a libcamera Size (width/height),
    and data a flat uint8 array holding the frame. Returns None for
    unsupported formats. The input array is never modified (the previous
    RGB888 path swapped channels in place, mutating the caller's buffer).
    """
    w = size.width
    h = size.height

    if fmt == libcam.formats.YUYV:
        # YUV422: each horizontal pixel pair shares one U and one V sample.
        yuyv = data.reshape((h, w // 2 * 4))

        # Expand to YUV444 by repeating the chroma samples.
        yuv = np.empty((h, w, 3), dtype=np.uint8)
        yuv[:, :, 0] = yuyv[:, 0::2]                    # Y
        yuv[:, :, 1] = yuyv[:, 1::4].repeat(2, axis=1)  # U
        yuv[:, :, 2] = yuyv[:, 3::4].repeat(2, axis=1)  # V

        # YUV -> RGB conversion matrix (coefficients match BT.601).
        m = np.array([
            [1.0, 1.0, 1.0],
            [-0.000007154783816076815, -0.3441331386566162, 1.7720025777816772],
            [1.4019975662231445, -0.7141380310058594, 0.00001542569043522235]
        ])

        rgb = np.dot(yuv, m)
        rgb[:, :, 0] -= 179.45477266423404
        rgb[:, :, 1] += 135.45870971679688
        rgb[:, :, 2] -= 226.8183044444304
        # NOTE(review): astype() wraps out-of-range values rather than
        # clipping; kept as-is to match the original conversion.
        rgb = rgb.astype(np.uint8)

    elif fmt == libcam.formats.RGB888:
        # Channels are stored in reverse order; reverse them into a fresh
        # contiguous copy instead of swapping in place.
        rgb = data.reshape((h, w, 3))[:, :, ::-1].copy()

    elif fmt == libcam.formats.BGR888:
        rgb = data.reshape((h, w, 3))

    elif fmt in [libcam.formats.ARGB8888, libcam.formats.XRGB8888]:
        rgb = data.reshape((h, w, 4))
        rgb = np.flip(rgb, axis=2)
        # drop alpha component
        rgb = np.delete(rgb, np.s_[0::4], axis=2)

    elif str(fmt).startswith('S'):
        # Raw Bayer format, e.g. 'SRGGB10': 'S' + 4-letter CFA order + depth.
        fmt = str(fmt)
        bayer_pattern = fmt[1:5]
        bitspp = int(fmt[5:])

        if bitspp == 8:
            data = data.reshape((h, w))
            data = data.astype(np.uint16)
        elif bitspp in [10, 12]:
            # assumes 10/12-bit samples arrive unpacked, 2 bytes per sample
            data = data.view(np.uint16)
            data = data.reshape((h, w))
        else:
            raise Exception('Bad bitspp:' + str(bitspp))

        # Locate each colour inside the 2x2 CFA tile as (x, y) offsets.
        idx = bayer_pattern.find('R')
        assert(idx != -1)
        r0 = (idx % 2, idx // 2)

        idx = bayer_pattern.find('G')
        assert(idx != -1)
        g0 = (idx % 2, idx // 2)

        idx = bayer_pattern.find('G', idx + 1)
        assert(idx != -1)
        g1 = (idx % 2, idx // 2)

        idx = bayer_pattern.find('B')
        assert(idx != -1)
        b0 = (idx % 2, idx // 2)

        rgb = demosaic(data, r0, g0, g1, b0)
        # Scale back down to 8 bits per channel.
        rgb = (rgb >> (bitspp - 8)).astype(np.uint8)

    else:
        rgb = None

    return rgb
+
+
# A naive format conversion to 24-bit RGB
def mfb_to_rgb(mfb: libcamera.utils.MappedFrameBuffer, cfg: libcam.StreamConfiguration):
    """Convert the first plane of a mapped frame buffer to RGB888 per cfg."""
    # np.array() copies the samples out of the mapped buffer.
    raw = np.array(mfb.planes[0], dtype=np.uint8)
    return to_rgb(cfg.pixel_format, cfg.size, raw)