summaryrefslogtreecommitdiff
path: root/src/py/cam/cam.py
diff options
context:
space:
mode:
authorTomi Valkeinen <tomi.valkeinen@ideasonboard.com>2022-05-09 13:10:23 +0300
committerKieran Bingham <kieran.bingham@ideasonboard.com>2022-05-10 13:53:43 +0200
commit74ba01121a61bd02bc8f09abbf3fd04db3561ab0 (patch)
tree7b31e06de6a00c0580733ffa7befe5ac6ac98fe8 /src/py/cam/cam.py
parent06cb7130c4fadac3bde6e695b0a6842656c9a5d4 (diff)
py: Add cam.py
Add cam.py, which mimics the 'cam' tool. Four rendering backends are added: * null - Do nothing * kms - Use KMS with dmabufs * qt - SW render on a Qt window * qtgl - OpenGL render on a Qt window All the renderers handle only a few pixel formats, and especially the GL renderer is just a prototype. Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com> Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> Signed-off-by: Kieran Bingham <kieran.bingham@ideasonboard.com>
Diffstat (limited to 'src/py/cam/cam.py')
-rwxr-xr-xsrc/py/cam/cam.py475
1 files changed, 475 insertions, 0 deletions
diff --git a/src/py/cam/cam.py b/src/py/cam/cam.py
new file mode 100755
index 00000000..012b191c
--- /dev/null
+++ b/src/py/cam/cam.py
@@ -0,0 +1,475 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+# \todo Convert ctx and state dicts to proper classes, and move relevant
+# functions to those classes.
+
+import argparse
+import binascii
+import libcamera as libcam
+import os
+import sys
+
+
+class CustomAction(argparse.Action):
+ def __init__(self, option_strings, dest, **kwargs):
+ super().__init__(option_strings, dest, default={}, **kwargs)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ if len(namespace.camera) == 0:
+ print(f'Option {option_string} requires a --camera context')
+ sys.exit(-1)
+
+ if self.type == bool:
+ values = True
+
+ current = namespace.camera[-1]
+
+ data = getattr(namespace, self.dest)
+
+ if self.nargs == '+':
+ if current not in data:
+ data[current] = []
+
+ data[current] += values
+ else:
+ data[current] = values
+
+
+def do_cmd_list(cm):
+ print('Available cameras:')
+
+ for idx, c in enumerate(cm.cameras):
+ print(f'{idx + 1}: {c.id}')
+
+
+def do_cmd_list_props(ctx):
+ camera = ctx['camera']
+
+ print('Properties for', ctx['id'])
+
+ for name, prop in camera.properties.items():
+ print('\t{}: {}'.format(name, prop))
+
+
+def do_cmd_list_controls(ctx):
+ camera = ctx['camera']
+
+ print('Controls for', ctx['id'])
+
+ for name, prop in camera.controls.items():
+ print('\t{}: {}'.format(name, prop))
+
+
+def do_cmd_info(ctx):
+ camera = ctx['camera']
+
+ print('Stream info for', ctx['id'])
+
+ roles = [libcam.StreamRole.Viewfinder]
+
+ camconfig = camera.generate_configuration(roles)
+ if camconfig is None:
+ raise Exception('Generating config failed')
+
+ for i, stream_config in enumerate(camconfig):
+ print('\t{}: {}'.format(i, stream_config))
+
+ formats = stream_config.formats
+ for fmt in formats.pixel_formats:
+ print('\t * Pixelformat:', fmt, formats.range(fmt))
+
+ for size in formats.sizes(fmt):
+ print('\t -', size)
+
+
+def acquire(ctx):
+ camera = ctx['camera']
+
+ camera.acquire()
+
+
+def release(ctx):
+ camera = ctx['camera']
+
+ camera.release()
+
+
+def parse_streams(ctx):
+ streams = []
+
+ for stream_desc in ctx['opt-stream']:
+ stream_opts = {'role': libcam.StreamRole.Viewfinder}
+
+ for stream_opt in stream_desc.split(','):
+ if stream_opt == 0:
+ continue
+
+ arr = stream_opt.split('=')
+ if len(arr) != 2:
+ print('Bad stream option', stream_opt)
+ sys.exit(-1)
+
+ key = arr[0]
+ value = arr[1]
+
+ if key in ['width', 'height']:
+ value = int(value)
+ elif key == 'role':
+ rolemap = {
+ 'still': libcam.StreamRole.StillCapture,
+ 'raw': libcam.StreamRole.Raw,
+ 'video': libcam.StreamRole.VideoRecording,
+ 'viewfinder': libcam.StreamRole.Viewfinder,
+ }
+
+ role = rolemap.get(value.lower(), None)
+
+ if role is None:
+ print('Bad stream role', value)
+ sys.exit(-1)
+
+ value = role
+ elif key == 'pixelformat':
+ pass
+ else:
+ print('Bad stream option key', key)
+ sys.exit(-1)
+
+ stream_opts[key] = value
+
+ streams.append(stream_opts)
+
+ return streams
+
+
+def configure(ctx):
+ camera = ctx['camera']
+
+ streams = parse_streams(ctx)
+
+ roles = [opts['role'] for opts in streams]
+
+ camconfig = camera.generate_configuration(roles)
+ if camconfig is None:
+ raise Exception('Generating config failed')
+
+ for idx, stream_opts in enumerate(streams):
+ stream_config = camconfig.at(idx)
+
+ if 'width' in stream_opts and 'height' in stream_opts:
+ stream_config.size = (stream_opts['width'], stream_opts['height'])
+
+ if 'pixelformat' in stream_opts:
+ stream_config.pixel_format = stream_opts['pixelformat']
+
+ stat = camconfig.validate()
+
+ if stat == libcam.CameraConfiguration.Status.Invalid:
+ print('Camera configuration invalid')
+ exit(-1)
+ elif stat == libcam.CameraConfiguration.Status.Adjusted:
+ if ctx['opt-strict-formats']:
+ print('Adjusting camera configuration disallowed by --strict-formats argument')
+ exit(-1)
+
+ print('Camera configuration adjusted')
+
+ r = camera.configure(camconfig)
+ if r != 0:
+ raise Exception('Configure failed')
+
+ ctx['stream-names'] = {}
+ ctx['streams'] = []
+
+ for idx, stream_config in enumerate(camconfig):
+ stream = stream_config.stream
+ ctx['streams'].append(stream)
+ ctx['stream-names'][stream] = 'stream' + str(idx)
+ print('{}-{}: stream config {}'.format(ctx['id'], ctx['stream-names'][stream], stream.configuration))
+
+
+def alloc_buffers(ctx):
+ camera = ctx['camera']
+
+ allocator = libcam.FrameBufferAllocator(camera)
+
+ for idx, stream in enumerate(ctx['streams']):
+ ret = allocator.allocate(stream)
+ if ret < 0:
+ print('Cannot allocate buffers')
+ exit(-1)
+
+ allocated = len(allocator.buffers(stream))
+
+ print('{}-{}: Allocated {} buffers'.format(ctx['id'], ctx['stream-names'][stream], allocated))
+
+ ctx['allocator'] = allocator
+
+
+def create_requests(ctx):
+ camera = ctx['camera']
+
+ ctx['requests'] = []
+
+ # Identify the stream with the least number of buffers
+ num_bufs = min([len(ctx['allocator'].buffers(stream)) for stream in ctx['streams']])
+
+ requests = []
+
+ for buf_num in range(num_bufs):
+ request = camera.create_request(ctx['idx'])
+
+ if request is None:
+ print('Can not create request')
+ exit(-1)
+
+ for stream in ctx['streams']:
+ buffers = ctx['allocator'].buffers(stream)
+ buffer = buffers[buf_num]
+
+ ret = request.add_buffer(stream, buffer)
+ if ret < 0:
+ print('Can not set buffer for request')
+ exit(-1)
+
+ requests.append(request)
+
+ ctx['requests'] = requests
+
+
+def start(ctx):
+ camera = ctx['camera']
+
+ camera.start()
+
+
+def stop(ctx):
+ camera = ctx['camera']
+
+ camera.stop()
+
+
+def queue_requests(ctx):
+ camera = ctx['camera']
+
+ for request in ctx['requests']:
+ camera.queue_request(request)
+ ctx['reqs-queued'] += 1
+
+ del ctx['requests']
+
+
+def capture_init(contexts):
+ for ctx in contexts:
+ acquire(ctx)
+
+ for ctx in contexts:
+ configure(ctx)
+
+ for ctx in contexts:
+ alloc_buffers(ctx)
+
+ for ctx in contexts:
+ create_requests(ctx)
+
+
+def capture_start(contexts):
+ for ctx in contexts:
+ start(ctx)
+
+ for ctx in contexts:
+ queue_requests(ctx)
+
+
+# Called from renderer when there is a libcamera event
+def event_handler(state):
+ cm = state['cm']
+ contexts = state['contexts']
+
+ os.read(cm.efd, 8)
+
+ reqs = cm.get_ready_requests()
+
+ for req in reqs:
+ ctx = next(ctx for ctx in contexts if ctx['idx'] == req.cookie)
+ request_handler(state, ctx, req)
+
+ running = any(ctx['reqs-completed'] < ctx['opt-capture'] for ctx in contexts)
+ return running
+
+
+def request_handler(state, ctx, req):
+ if req.status != libcam.Request.Status.Complete:
+ raise Exception('{}: Request failed: {}'.format(ctx['id'], req.status))
+
+ buffers = req.buffers
+
+ # Compute the frame rate. The timestamp is arbitrarily retrieved from
+ # the first buffer, as all buffers should have matching timestamps.
+ ts = buffers[next(iter(buffers))].metadata.timestamp
+ last = ctx.get('last', 0)
+ fps = 1000000000.0 / (ts - last) if (last != 0 and (ts - last) != 0) else 0
+ ctx['last'] = ts
+ ctx['fps'] = fps
+
+ for stream, fb in buffers.items():
+ stream_name = ctx['stream-names'][stream]
+
+ crcs = []
+ if ctx['opt-crc']:
+ with fb.mmap() as mfb:
+ plane_crcs = [binascii.crc32(p) for p in mfb.planes]
+ crcs.append(plane_crcs)
+
+ meta = fb.metadata
+
+ print('{:.6f} ({:.2f} fps) {}-{}: seq {}, bytes {}, CRCs {}'
+ .format(ts / 1000000000, fps,
+ ctx['id'], stream_name,
+ meta.sequence, meta.bytesused,
+ crcs))
+
+ if ctx['opt-metadata']:
+ reqmeta = req.metadata
+ for ctrl, val in reqmeta.items():
+ print(f'\t{ctrl} = {val}')
+
+ if ctx['opt-save-frames']:
+ with fb.mmap() as mfb:
+ filename = 'frame-{}-{}-{}.data'.format(ctx['id'], stream_name, ctx['reqs-completed'])
+ with open(filename, 'wb') as f:
+ for p in mfb.planes:
+ f.write(p)
+
+ state['renderer'].request_handler(ctx, req)
+
+ ctx['reqs-completed'] += 1
+
+
+# Called from renderer when it has finished with a request
+def request_prcessed(ctx, req):
+ camera = ctx['camera']
+
+ if ctx['reqs-queued'] < ctx['opt-capture']:
+ req.reuse()
+ camera.queue_request(req)
+ ctx['reqs-queued'] += 1
+
+
+def capture_deinit(contexts):
+ for ctx in contexts:
+ stop(ctx)
+
+ for ctx in contexts:
+ release(ctx)
+
+
+def do_cmd_capture(state):
+ capture_init(state['contexts'])
+
+ renderer = state['renderer']
+
+ renderer.setup()
+
+ capture_start(state['contexts'])
+
+ renderer.run()
+
+ capture_deinit(state['contexts'])
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ # global options
+ parser.add_argument('-l', '--list', action='store_true', help='List all cameras')
+ parser.add_argument('-c', '--camera', type=int, action='extend', nargs=1, default=[], help='Specify which camera to operate on, by index')
+ parser.add_argument('-p', '--list-properties', action='store_true', help='List cameras properties')
+ parser.add_argument('--list-controls', action='store_true', help='List cameras controls')
+ parser.add_argument('-I', '--info', action='store_true', help='Display information about stream(s)')
+ parser.add_argument('-R', '--renderer', default='null', help='Renderer (null, kms, qt, qtgl)')
+
+ # per camera options
+ parser.add_argument('-C', '--capture', nargs='?', type=int, const=1000000, action=CustomAction, help='Capture until interrupted by user or until CAPTURE frames captured')
+ parser.add_argument('--crc', nargs=0, type=bool, action=CustomAction, help='Print CRC32 for captured frames')
+ parser.add_argument('--save-frames', nargs=0, type=bool, action=CustomAction, help='Save captured frames to files')
+ parser.add_argument('--metadata', nargs=0, type=bool, action=CustomAction, help='Print the metadata for completed requests')
+ parser.add_argument('--strict-formats', type=bool, nargs=0, action=CustomAction, help='Do not allow requested stream format(s) to be adjusted')
+ parser.add_argument('-s', '--stream', nargs='+', action=CustomAction)
+ args = parser.parse_args()
+
+ cm = libcam.CameraManager.singleton()
+
+ if args.list:
+ do_cmd_list(cm)
+
+ contexts = []
+
+ for cam_idx in args.camera:
+ camera = next((c for i, c in enumerate(cm.cameras) if i + 1 == cam_idx), None)
+
+ if camera is None:
+ print('Unable to find camera', cam_idx)
+ return -1
+
+ contexts.append({
+ 'camera': camera,
+ 'idx': cam_idx,
+ 'id': 'cam' + str(cam_idx),
+ 'reqs-queued': 0,
+ 'reqs-completed': 0,
+ 'opt-capture': args.capture.get(cam_idx, False),
+ 'opt-crc': args.crc.get(cam_idx, False),
+ 'opt-save-frames': args.save_frames.get(cam_idx, False),
+ 'opt-metadata': args.metadata.get(cam_idx, False),
+ 'opt-strict-formats': args.strict_formats.get(cam_idx, False),
+ 'opt-stream': args.stream.get(cam_idx, ['role=viewfinder']),
+ })
+
+ for ctx in contexts:
+ print('Using camera {} as {}'.format(ctx['camera'].id, ctx['id']))
+
+ for ctx in contexts:
+ if args.list_properties:
+ do_cmd_list_props(ctx)
+ if args.list_controls:
+ do_cmd_list_controls(ctx)
+ if args.info:
+ do_cmd_info(ctx)
+
+ if args.capture:
+
+ state = {
+ 'cm': cm,
+ 'contexts': contexts,
+ 'event_handler': event_handler,
+ 'request_prcessed': request_prcessed,
+ }
+
+ if args.renderer == 'null':
+ import cam_null
+ renderer = cam_null.NullRenderer(state)
+ elif args.renderer == 'kms':
+ import cam_kms
+ renderer = cam_kms.KMSRenderer(state)
+ elif args.renderer == 'qt':
+ import cam_qt
+ renderer = cam_qt.QtRenderer(state)
+ elif args.renderer == 'qtgl':
+ import cam_qtgl
+ renderer = cam_qtgl.QtRenderer(state)
+ else:
+ print('Bad renderer', args.renderer)
+ return -1
+
+ state['renderer'] = renderer
+
+ do_cmd_capture(state)
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())