diff options
Diffstat (limited to 'src/py/cam/cam.py')
-rwxr-xr-x | src/py/cam/cam.py | 580 |
1 files changed, 281 insertions, 299 deletions
diff --git a/src/py/cam/cam.py b/src/py/cam/cam.py index 64f67e86..c8ffb084 100755 --- a/src/py/cam/cam.py +++ b/src/py/cam/cam.py @@ -6,6 +6,7 @@ # \todo Convert ctx and state dicts to proper classes, and move relevant # functions to those classes. +from typing import Any import argparse import binascii import libcamera as libcam @@ -14,379 +15,371 @@ import sys import traceback -class CustomAction(argparse.Action): - def __init__(self, option_strings, dest, **kwargs): - super().__init__(option_strings, dest, default={}, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - if len(namespace.camera) == 0: - print(f'Option {option_string} requires a --camera context') - sys.exit(-1) - - if self.type == bool: - values = True - - current = namespace.camera[-1] - - data = getattr(namespace, self.dest) - - if self.nargs == '+': - if current not in data: - data[current] = [] - - data[current] += values - else: - data[current] = values - - -def do_cmd_list(cm): - print('Available cameras:') - - for idx, c in enumerate(cm.cameras): - print(f'{idx + 1}: {c.id}') - - -def do_cmd_list_props(ctx): - camera = ctx['camera'] - - print('Properties for', ctx['id']) +class CameraContext: + camera: libcam.Camera + id: str + idx: int - for name, prop in camera.properties.items(): - print('\t{}: {}'.format(name, prop)) + opt_stream: str + opt_strict_formats: bool + opt_crc: bool + opt_metadata: bool + opt_save_frames: bool + opt_capture: int + stream_names: dict[libcam.Stream, str] + streams: list[libcam.Stream] + allocator: libcam.FrameBufferAllocator + requests: list[libcam.Request] + reqs_queued: int + reqs_completed: int + last: int = 0 + fps: float -def do_cmd_list_controls(ctx): - camera = ctx['camera'] + def __init__(self, camera, idx): + self.camera = camera + self.idx = idx + self.id = 'cam' + str(idx) + self.reqs_queued = 0 + self.reqs_completed = 0 - print('Controls for', ctx['id']) + def do_cmd_list_props(self): + print('Properties for', self.id) - for name, prop in camera.controls.items(): - print('\t{}: {}'.format(name, prop)) + for name, prop in self.camera.properties.items(): + print('\t{}: {}'.format(name, prop)) + def do_cmd_list_controls(self): + print('Controls for', self.id) -def do_cmd_info(ctx): - camera = ctx['camera'] + for name, prop in self.camera.controls.items(): + print('\t{}: {}'.format(name, prop)) - print('Stream info for', ctx['id']) + def do_cmd_info(self): + print('Stream info for', self.id) - roles = [libcam.StreamRole.Viewfinder] + roles = [libcam.StreamRole.Viewfinder] - camconfig = camera.generate_configuration(roles) - if camconfig is None: - raise Exception('Generating config failed') + camconfig = self.camera.generate_configuration(roles) + if camconfig is None: + raise Exception('Generating config failed') - for i, stream_config in enumerate(camconfig): - print('\t{}: {}'.format(i, stream_config)) + for i, stream_config in enumerate(camconfig): + print('\t{}: {}'.format(i, stream_config)) - formats = stream_config.formats - for fmt in formats.pixel_formats: - print('\t * Pixelformat:', fmt, formats.range(fmt)) + formats = stream_config.formats + for fmt in formats.pixel_formats: + print('\t * Pixelformat:', fmt, formats.range(fmt)) - for size in formats.sizes(fmt): - print('\t -', size) + for size in formats.sizes(fmt): + print('\t -', size) + def acquire(self): + self.camera.acquire() -def acquire(ctx): - camera = ctx['camera'] + def release(self): + self.camera.release() - camera.acquire() + def __parse_streams(self): + streams = [] + for stream_desc in self.opt_stream: + stream_opts: dict[str, Any] + stream_opts = {'role': libcam.StreamRole.Viewfinder} -def release(ctx): - camera = ctx['camera'] + for stream_opt in stream_desc.split(','): + if stream_opt == 0: + continue - camera.release() - - -def parse_streams(ctx): - streams = [] - - for stream_desc in ctx['opt-stream']: - stream_opts = {'role': libcam.StreamRole.Viewfinder} - - for stream_opt in stream_desc.split(','): - if stream_opt == 0: - continue - - arr = stream_opt.split('=') - if len(arr) != 2: - print('Bad stream option', stream_opt) - sys.exit(-1) - - key = arr[0] - value = arr[1] - - if key in ['width', 'height']: - value = int(value) - elif key == 'role': - rolemap = { - 'still': libcam.StreamRole.StillCapture, - 'raw': libcam.StreamRole.Raw, - 'video': libcam.StreamRole.VideoRecording, - 'viewfinder': libcam.StreamRole.Viewfinder, - } - - role = rolemap.get(value.lower(), None) - - if role is None: - print('Bad stream role', value) + arr = stream_opt.split('=') + if len(arr) != 2: + print('Bad stream option', stream_opt) sys.exit(-1) - value = role - elif key == 'pixelformat': - pass - else: - print('Bad stream option key', key) - sys.exit(-1) - - stream_opts[key] = value - - streams.append(stream_opts) + key = arr[0] + value = arr[1] + + if key in ['width', 'height']: + value = int(value) + elif key == 'role': + rolemap = { + 'still': libcam.StreamRole.StillCapture, + 'raw': libcam.StreamRole.Raw, + 'video': libcam.StreamRole.VideoRecording, + 'viewfinder': libcam.StreamRole.Viewfinder, + } + + role = rolemap.get(value.lower(), None) + + if role is None: + print('Bad stream role', value) + sys.exit(-1) + + value = role + elif key == 'pixelformat': + pass + else: + print('Bad stream option key', key) + sys.exit(-1) - return streams + stream_opts[key] = value + streams.append(stream_opts) -def configure(ctx): - camera = ctx['camera'] + return streams - streams = parse_streams(ctx) + def configure(self): + streams = self.__parse_streams() - roles = [opts['role'] for opts in streams] + roles = [opts['role'] for opts in streams] - camconfig = camera.generate_configuration(roles) - if camconfig is None: - raise Exception('Generating config failed') + camconfig = self.camera.generate_configuration(roles) + if camconfig is None: + raise Exception('Generating config failed') - for idx, stream_opts in enumerate(streams): - stream_config = camconfig.at(idx) + for idx, stream_opts in enumerate(streams): + stream_config = camconfig.at(idx) - if 'width' in stream_opts: - stream_config.size.width = stream_opts['width'] + if 'width' in stream_opts: + stream_config.size.width = stream_opts['width'] - if 'height' in stream_opts: - stream_config.size.height = stream_opts['height'] + if 'height' in stream_opts: + stream_config.size.height = stream_opts['height'] - if 'pixelformat' in stream_opts: - stream_config.pixel_format = libcam.PixelFormat(stream_opts['pixelformat']) + if 'pixelformat' in stream_opts: + stream_config.pixel_format = libcam.PixelFormat(stream_opts['pixelformat']) - stat = camconfig.validate() + stat = camconfig.validate() - if stat == libcam.CameraConfiguration.Status.Invalid: - print('Camera configuration invalid') - exit(-1) - elif stat == libcam.CameraConfiguration.Status.Adjusted: - if ctx['opt-strict-formats']: - print('Adjusting camera configuration disallowed by --strict-formats argument') + if stat == libcam.CameraConfiguration.Status.Invalid: + print('Camera configuration invalid') exit(-1) + elif stat == libcam.CameraConfiguration.Status.Adjusted: + if self.opt_strict_formats: + print('Adjusting camera configuration disallowed by --strict-formats argument') + exit(-1) - print('Camera configuration adjusted') - - r = camera.configure(camconfig) - if r != 0: - raise Exception('Configure failed') - - ctx['stream-names'] = {} - ctx['streams'] = [] - - for idx, stream_config in enumerate(camconfig): - stream = stream_config.stream - ctx['streams'].append(stream) - ctx['stream-names'][stream] = 'stream' + str(idx) - print('{}-{}: stream config {}'.format(ctx['id'], ctx['stream-names'][stream], stream.configuration)) - - -def alloc_buffers(ctx): - camera = ctx['camera'] - - allocator = libcam.FrameBufferAllocator(camera) + print('Camera configuration adjusted') - for idx, stream in enumerate(ctx['streams']): - ret = allocator.allocate(stream) - if ret < 0: - print('Cannot allocate buffers') - exit(-1) + r = self.camera.configure(camconfig) + if r != 0: + raise Exception('Configure failed') - allocated = len(allocator.buffers(stream)) + self.stream_names = {} + self.streams = [] - print('{}-{}: Allocated {} buffers'.format(ctx['id'], ctx['stream-names'][stream], allocated)) + for idx, stream_config in enumerate(camconfig): + stream = stream_config.stream + self.streams.append(stream) + self.stream_names[stream] = 'stream' + str(idx) + print('{}-{}: stream config {}'.format(self.id, self.stream_names[stream], stream.configuration)) - ctx['allocator'] = allocator + def alloc_buffers(self): + allocator = libcam.FrameBufferAllocator(self.camera) + for stream in self.streams: + ret = allocator.allocate(stream) + if ret < 0: + print('Cannot allocate buffers') + exit(-1) -def create_requests(ctx): - camera = ctx['camera'] + allocated = len(allocator.buffers(stream)) - ctx['requests'] = [] + print('{}-{}: Allocated {} buffers'.format(self.id, self.stream_names[stream], allocated)) - # Identify the stream with the least number of buffers - num_bufs = min([len(ctx['allocator'].buffers(stream)) for stream in ctx['streams']]) + self.allocator = allocator - requests = [] + def create_requests(self): + self.requests = [] - for buf_num in range(num_bufs): - request = camera.create_request(ctx['idx']) + # Identify the stream with the least number of buffers + num_bufs = min([len(self.allocator.buffers(stream)) for stream in self.streams]) - if request is None: - print('Can not create request') - exit(-1) + requests = [] - for stream in ctx['streams']: - buffers = ctx['allocator'].buffers(stream) - buffer = buffers[buf_num] + for buf_num in range(num_bufs): + request = self.camera.create_request(self.idx) - ret = request.add_buffer(stream, buffer) - if ret < 0: - print('Can not set buffer for request') + if request is None: + print('Can not create request') exit(-1) - requests.append(request) + for stream in self.streams: + buffers = self.allocator.buffers(stream) + buffer = buffers[buf_num] - ctx['requests'] = requests + ret = request.add_buffer(stream, buffer) + if ret < 0: + print('Can not set buffer for request') + exit(-1) + requests.append(request) -def start(ctx): - camera = ctx['camera'] + self.requests = requests - camera.start() + def start(self): + self.camera.start() + def stop(self): + self.camera.stop() -def stop(ctx): - camera = ctx['camera'] + def queue_requests(self): + for request in self.requests: + self.camera.queue_request(request) + self.reqs_queued += 1 - camera.stop() + del self.requests -def queue_requests(ctx): - camera = ctx['camera'] +class CaptureState: + cm: libcam.CameraManager + contexts: list[CameraContext] + renderer: Any - for request in ctx['requests']: - camera.queue_request(request) - ctx['reqs-queued'] += 1 + def __init__(self, cm, contexts): + self.cm = cm + self.contexts = contexts - del ctx['requests'] + # Called from renderer when there is a libcamera event + def event_handler(self): + try: + self.cm.read_event() + reqs = self.cm.get_ready_requests() -def capture_init(contexts): - for ctx in contexts: - acquire(ctx) + for req in reqs: + ctx = next(ctx for ctx in self.contexts if ctx.idx == req.cookie) + self.__request_handler(ctx, req) - for ctx in contexts: - configure(ctx) + running = any(ctx.reqs_completed < ctx.opt_capture for ctx in self.contexts) + return running + except Exception: + traceback.print_exc() + return False - for ctx in contexts: - alloc_buffers(ctx) + def __request_handler(self, ctx, req): + if req.status != libcam.Request.Status.Complete: + raise Exception('{}: Request failed: {}'.format(ctx.id, req.status)) - for ctx in contexts: - create_requests(ctx) + buffers = req.buffers + # Compute the frame rate. The timestamp is arbitrarily retrieved from + # the first buffer, as all buffers should have matching timestamps. + ts = buffers[next(iter(buffers))].metadata.timestamp + last = ctx.last + fps = 1000000000.0 / (ts - last) if (last != 0 and (ts - last) != 0) else 0 + ctx.last = ts + ctx.fps = fps -def capture_start(contexts): - for ctx in contexts: - start(ctx) + for stream, fb in buffers.items(): + stream_name = ctx.stream_names[stream] - for ctx in contexts: - queue_requests(ctx) + crcs = [] + if ctx.opt_crc: + with libcamera.utils.MappedFrameBuffer(fb) as mfb: + plane_crcs = [binascii.crc32(p) for p in mfb.planes] + crcs.append(plane_crcs) + meta = fb.metadata -# Called from renderer when there is a libcamera event -def event_handler(state): - try: - cm = state['cm'] - contexts = state['contexts'] + print('{:.6f} ({:.2f} fps) {}-{}: seq {}, bytes {}, CRCs {}' + .format(ts / 1000000000, fps, + ctx.id, stream_name, + meta.sequence, meta.bytesused, + crcs)) - cm.read_event() + if ctx.opt_metadata: + reqmeta = req.metadata + for ctrl, val in reqmeta.items(): + print(f'\t{ctrl} = {val}') - reqs = cm.get_ready_requests() + if ctx.opt_save_frames: + with libcamera.utils.MappedFrameBuffer(fb) as mfb: + filename = 'frame-{}-{}-{}.data'.format(ctx.id, stream_name, ctx.reqs_completed) + with open(filename, 'wb') as f: + for p in mfb.planes: + f.write(p) - for req in reqs: - ctx = next(ctx for ctx in contexts if ctx['idx'] == req.cookie) - request_handler(state, ctx, req) + self.renderer.request_handler(ctx, req) - running = any(ctx['reqs-completed'] < ctx['opt-capture'] for ctx in contexts) - return running - except Exception: - traceback.print_exc() - return False + ctx.reqs_completed += 1 + # Called from renderer when it has finished with a request + def request_processed(self, ctx, req): + if ctx.reqs_queued < ctx.opt_capture: + req.reuse() + ctx.camera.queue_request(req) + ctx.reqs_queued += 1 -def request_handler(state, ctx, req): - if req.status != libcam.Request.Status.Complete: - raise Exception('{}: Request failed: {}'.format(ctx['id'], req.status)) + def __capture_init(self): + for ctx in self.contexts: + ctx.acquire() - buffers = req.buffers + for ctx in self.contexts: + ctx.configure() - # Compute the frame rate. The timestamp is arbitrarily retrieved from - # the first buffer, as all buffers should have matching timestamps. - ts = buffers[next(iter(buffers))].metadata.timestamp - last = ctx.get('last', 0) - fps = 1000000000.0 / (ts - last) if (last != 0 and (ts - last) != 0) else 0 - ctx['last'] = ts - ctx['fps'] = fps + for ctx in self.contexts: + ctx.alloc_buffers() - for stream, fb in buffers.items(): - stream_name = ctx['stream-names'][stream] + for ctx in self.contexts: + ctx.create_requests() - crcs = [] - if ctx['opt-crc']: - with libcamera.utils.MappedFrameBuffer(fb) as mfb: - plane_crcs = [binascii.crc32(p) for p in mfb.planes] - crcs.append(plane_crcs) + def __capture_start(self): + for ctx in self.contexts: + ctx.start() - meta = fb.metadata + for ctx in self.contexts: + ctx.queue_requests() - print('{:.6f} ({:.2f} fps) {}-{}: seq {}, bytes {}, CRCs {}' - .format(ts / 1000000000, fps, - ctx['id'], stream_name, - meta.sequence, meta.bytesused, - crcs)) + def __capture_deinit(self): + for ctx in self.contexts: + ctx.stop() - if ctx['opt-metadata']: - reqmeta = req.metadata - for ctrl, val in reqmeta.items(): - print(f'\t{ctrl} = {val}') + for ctx in self.contexts: + ctx.release() - if ctx['opt-save-frames']: - with libcamera.utils.MappedFrameBuffer(fb) as mfb: - filename = 'frame-{}-{}-{}.data'.format(ctx['id'], stream_name, ctx['reqs-completed']) - with open(filename, 'wb') as f: - for p in mfb.planes: - f.write(p) + def do_cmd_capture(self): + self.__capture_init() - state['renderer'].request_handler(ctx, req) + self.renderer.setup() - ctx['reqs-completed'] += 1 + self.__capture_start() + self.renderer.run() -# Called from renderer when it has finished with a request -def request_prcessed(ctx, req): - camera = ctx['camera'] + self.__capture_deinit() - if ctx['reqs-queued'] < ctx['opt-capture']: - req.reuse() - camera.queue_request(req) - ctx['reqs-queued'] += 1 +class CustomAction(argparse.Action): + def __init__(self, option_strings, dest, **kwargs): + super().__init__(option_strings, dest, default={}, **kwargs) -def capture_deinit(contexts): - for ctx in contexts: - stop(ctx) + def __call__(self, parser, namespace, values, option_string=None): + if len(namespace.camera) == 0: + print(f'Option {option_string} requires a --camera context') + sys.exit(-1) - for ctx in contexts: - release(ctx) + if self.type == bool: + values = True + current = namespace.camera[-1] -def do_cmd_capture(state): - capture_init(state['contexts']) + data = getattr(namespace, self.dest) - renderer = state['renderer'] + if self.nargs == '+': + if current not in data: + data[current] = [] - renderer.setup() + data[current] += values + else: + data[current] = values - capture_start(state['contexts']) - renderer.run() +def do_cmd_list(cm): + print('Available cameras:') - capture_deinit(state['contexts']) + for idx, c in enumerate(cm.cameras): + print(f'{idx + 1}: {c.id}') def main(): @@ -422,39 +415,28 @@ def main(): print('Unable to find camera', cam_idx) return -1 - contexts.append({ - 'camera': camera, - 'idx': cam_idx, - 'id': 'cam' + str(cam_idx), - 'reqs-queued': 0, - 'reqs-completed': 0, - 'opt-capture': args.capture.get(cam_idx, False), - 'opt-crc': args.crc.get(cam_idx, False), - 'opt-save-frames': args.save_frames.get(cam_idx, False), - 'opt-metadata': args.metadata.get(cam_idx, False), - 'opt-strict-formats': args.strict_formats.get(cam_idx, False), - 'opt-stream': args.stream.get(cam_idx, ['role=viewfinder']), - }) + ctx = CameraContext(camera, cam_idx) + ctx.opt_capture = args.capture.get(cam_idx, 0) + ctx.opt_crc = args.crc.get(cam_idx, False) + ctx.opt_save_frames = args.save_frames.get(cam_idx, False) + ctx.opt_metadata = args.metadata.get(cam_idx, False) + ctx.opt_strict_formats = args.strict_formats.get(cam_idx, False) + ctx.opt_stream = args.stream.get(cam_idx, ['role=viewfinder']) + contexts.append(ctx) for ctx in contexts: - print('Using camera {} as {}'.format(ctx['camera'].id, ctx['id'])) + print('Using camera {} as {}'.format(ctx.camera.id, ctx.id)) for ctx in contexts: if args.list_properties: - do_cmd_list_props(ctx) + ctx.do_cmd_list_props() if args.list_controls: - do_cmd_list_controls(ctx) + ctx.do_cmd_list_controls() if args.info: - do_cmd_info(ctx) + ctx.do_cmd_info() if args.capture: - - state = { - 'cm': cm, - 'contexts': contexts, - 'event_handler': event_handler, - 'request_prcessed': request_prcessed, - } + state = CaptureState(cm, contexts) if args.renderer == 'null': import cam_null @@ -472,9 +454,9 @@ def main(): print('Bad renderer', args.renderer) return -1 - state['renderer'] = renderer + state.renderer = renderer - do_cmd_capture(state) + state.do_cmd_capture() return 0 |