Diffstat (limited to 'src/py/examples')
-rwxr-xr-x  src/py/examples/simple-cam.py                  340
-rwxr-xr-x  src/py/examples/simple-capture.py              163
-rwxr-xr-x  src/py/examples/simple-continuous-capture.py   185
3 files changed, 688 insertions(+), 0 deletions(-)
diff --git a/src/py/examples/simple-cam.py b/src/py/examples/simple-cam.py
new file mode 100755
index 00000000..1cd1019d
--- /dev/null
+++ b/src/py/examples/simple-cam.py
@@ -0,0 +1,340 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+# A simple libcamera capture example
+#
+# This is a Python version of simple-cam from:
+# https://git.libcamera.org/libcamera/simple-cam.git
+#
+# \todo Move to simple-cam repository when the Python API has stabilized more
+
+import libcamera as libcam
+import selectors
+import sys
+import time
+
+TIMEOUT_SEC = 3
+
+
+def handle_camera_event(cm):
+ # cm.get_ready_requests() returns the ready requests, which in our case
+ # should almost always return a single Request, but in some cases there
+ # could be multiple or none.
+
+ reqs = cm.get_ready_requests()
+
+ # Process the captured frames
+
+ for req in reqs:
+ process_request(req)
+
+
+def process_request(request):
+ global camera
+
+ print()
+
+ print(f'Request completed: {request}')
+
+ # When a request has completed, it is populated with a metadata control
+ # list that allows an application to determine various properties of
+ # the completed request. This can include the timestamp of the Sensor
+ # capture, or its gain and exposure values, or properties from the IPA
+ # such as the state of the 3A algorithms.
+ #
+ # To examine each request, print all the metadata for inspection. A custom
+ # application can parse each of these items and process them according to
+ # its needs.
+
+ requestMetadata = request.metadata
+ for id, value in requestMetadata.items():
+ print(f'\t{id.name} = {value}')
+
+ # Each buffer has its own FrameMetadata to describe its state and how
+ # the buffer was used. While in our simple capture we only provide one
+ # buffer per request, a request can have a buffer for each stream that
+ # is established when configuring the camera.
+ #
+ # This allows a viewfinder and a still image to be processed at the
+ # same time, or the RAW capture buffer from the sensor to be obtained
+ # along with the image as processed by the ISP.
+
+ buffers = request.buffers
+ for _, buffer in buffers.items():
+ metadata = buffer.metadata
+
+ # Print some information about the buffer which has completed.
+ print(f' seq: {metadata.sequence:06} timestamp: {metadata.timestamp} bytesused: ' +
+ '/'.join([str(p.bytes_used) for p in metadata.planes]))
+
+ # Image data can be accessed here, but the FrameBuffer
+ # must be mapped by the application
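+ # (for example with the libcamera.utils.MappedFrameBuffer helper used
+ # in simple-continuous-capture.py). An illustrative sketch, not
+ # executed by this example:
+ #
+ #   import libcamera.utils
+ #   mfb = libcamera.utils.MappedFrameBuffer(buffer).mmap()
+ #   print([len(p) for p in mfb.planes])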
+
+ # Re-queue the Request to the camera.
+ request.reuse()
+ camera.queue_request(request)
+
+
+# ----------------------------------------------------------------------------
+# Camera Naming.
+#
+# Applications are responsible for deciding how to name cameras, and for
+# presenting that information to the users. Every camera has a unique
+# identifier, though
+# this string is not designed to be friendly for a human reader.
+#
+# To support human consumable names, libcamera provides camera properties
+# that allow an application to determine a naming scheme based on its needs.
+#
+# In this example, we focus on the location property, but also detail the
+# model string for external cameras, as this is more likely to be visible
+# information to the user of an externally connected device.
+#
+# The unique camera ID is appended for informative purposes.
+#
+def camera_name(camera):
+ props = camera.properties
+ location = props.get(libcam.properties.Location, None)
+
+ if location == libcam.properties.LocationEnum.Front:
+ name = 'Internal front camera'
+ elif location == libcam.properties.LocationEnum.Back:
+ name = 'Internal back camera'
+ elif location == libcam.properties.LocationEnum.External:
+ name = 'External camera'
+ if libcam.properties.Model in props:
+ name += f' "{props[libcam.properties.Model]}"'
+ else:
+ name = 'Undefined location'
+
+ name += f' ({camera.id})'
+
+ return name
+
+
+def main():
+ global camera
+
+ # --------------------------------------------------------------------
+ # Get the Camera Manager.
+ #
+ # The Camera Manager is responsible for enumerating all the Cameras
+ # in the system, by associating Pipeline Handlers with media entities
+ # registered in the system.
+ #
+ # The CameraManager provides a list of available Cameras that
+ # applications can operate on.
+ #
+ # There can only be a single CameraManager within any process space.
+
+ cm = libcam.CameraManager.singleton()
+
+ # Just as a test, generate names of the Cameras registered in the
+ # system, and list them.
+
+ for camera in cm.cameras:
+ print(f' - {camera_name(camera)}')
+
+ # --------------------------------------------------------------------
+ # Camera
+ #
+ # Cameras are entities created by pipeline handlers by inspecting the
+ # media entities registered in the system, and are reported to
+ # applications by the CameraManager.
+ #
+ # In general terms, a Camera corresponds to a single image source
+ # available in the system, such as an image sensor.
+ #
+ # Applications lock usage of a Camera by 'acquiring' it.
+ # Once done with it, the application shall similarly 'release' the Camera.
+ #
+ # As an example, use the first available camera in the system after
+ # making sure that at least one camera is available.
+ #
+ # Cameras can be obtained by their ID or their index. To demonstrate
+ # this, the following code gets the ID of the first camera, then gets
+ # the camera associated with that ID (which is of course the same as
+ # cm.cameras[0]).
+
+ if not cm.cameras:
+ print('No cameras were identified on the system.')
+ return -1
+
+ camera_id = cm.cameras[0].id
+ camera = cm.get(camera_id)
+ camera.acquire()
+
+ # --------------------------------------------------------------------
+ # Stream
+ #
+ # Each Camera supports a variable number of Streams. A Stream is
+ # produced by processing data from an image source, usually
+ # by an ISP.
+ #
+ #   +-------------------------------------------------------+
+ #   | Camera                                                 |
+ #   |                +-----------+                           |
+ #   | +--------+     |           |------> [  Main output  ]  |
+ #   | | Image  |     |           |                           |
+ #   | |        |---->|    ISP    |------> [   Viewfinder  ]  |
+ #   | | Source |     |           |                           |
+ #   | +--------+     |           |------> [ Still Capture ]  |
+ #   |                +-----------+                           |
+ #   +-------------------------------------------------------+
+ #
+ # The number and capabilities of the Streams in a Camera are
+ # a platform-dependent property, and it is the pipeline handler
+ # implementation that has the responsibility of correctly
+ # reporting them.
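+ #
+ # For instance, a platform able to produce the streams in the diagram
+ # above could be asked for two of them in a single configuration. This
+ # is only an illustrative sketch, not used by this example; it assumes
+ # the still-capture role is exposed in the Python bindings as
+ # libcam.StreamRole.StillCapture and that the platform supports the
+ # combination:
+ #
+ #   multi_config = camera.generate_configuration(
+ #       [libcam.StreamRole.Viewfinder, libcam.StreamRole.StillCapture])
+ #   for cfg in multi_config:
+ #       print(cfg)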
+
+ # --------------------------------------------------------------------
+ # Camera Configuration.
+ #
+ # Camera configuration is tricky! It boils down to assigning resources
+ # of the system (such as DMA engines, scalers, format converters) to
+ # the different image streams an application has requested.
+ #
+ # Depending on the system characteristics, some combinations of
+ # sizes, formats and stream usages might or might not be possible.
+ #
+ # A Camera produces a CameraConfiguration based on a set of intended
+ # roles for each Stream the application requires.
+
+ config = camera.generate_configuration([libcam.StreamRole.Viewfinder])
+
+ # The CameraConfiguration contains a StreamConfiguration instance
+ # for each StreamRole requested by the application, provided
+ # the Camera can support all of them.
+ #
+ # Each StreamConfiguration has a default size and format, assigned
+ # by the Camera depending on the Role the application has requested.
+
+ stream_config = config.at(0)
+ print(f'Default viewfinder configuration is: {stream_config}')
+
+ # Each StreamConfiguration parameter which is part of a
+ # CameraConfiguration can be independently modified by the
+ # application.
+ #
+ # In order to validate the modified parameter, the CameraConfiguration
+ # should be validated -before- the CameraConfiguration gets applied
+ # to the Camera.
+ #
+ # The CameraConfiguration validation process adjusts each
+ # StreamConfiguration to a valid value.
+
+ # Validating a CameraConfiguration -before- applying it will adjust it
+ # to a valid configuration which is as close as possible to the one
+ # requested.
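+ #
+ # validate() also returns a status: in the C++ API this is
+ # CameraConfiguration::Status, one of Valid, Adjusted or Invalid. A
+ # stricter application could check it, as in this sketch (which
+ # assumes the binding exposes the enum as
+ # libcam.CameraConfiguration.Status):
+ #
+ #   status = config.validate()
+ #   if status == libcam.CameraConfiguration.Status.Invalid:
+ #       raise RuntimeError('Unsupported camera configuration')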
+
+ config.validate()
+ print(f'Validated viewfinder configuration is: {stream_config}')
+
+ # Once we have a validated configuration, we can apply it to the
+ # Camera.
+
+ camera.configure(config)
+
+ # --------------------------------------------------------------------
+ # Buffer Allocation
+ #
+ # Now that a camera has been configured, it knows all about its
+ # Streams' sizes and formats. The captured images need to be stored in
+ # framebuffers which can either be provided by the application to the
+ # library, or allocated in the Camera and exposed to the application
+ # by libcamera.
+ #
+ # An application may decide to allocate framebuffers from elsewhere,
+ # for example in memory allocated by the display driver that will
+ # render the captured frames. The application will provide them to
+ # libcamera by constructing FrameBuffer instances to capture images
+ # directly into.
+ #
+ # Alternatively libcamera can help the application by exporting
+ # buffers allocated in the Camera using a FrameBufferAllocator
+ # instance and referencing a configured Camera to determine the
+ # appropriate buffer size and types to create.
+
+ allocator = libcam.FrameBufferAllocator(camera)
+
+ for cfg in config:
+ allocated = allocator.allocate(cfg.stream)
+ print(f'Allocated {allocated} buffers for stream')
+
+ # --------------------------------------------------------------------
+ # Frame Capture
+ #
+ # libcamera's frame capture model is based on the 'Request' concept.
+ # For each frame, a Request has to be queued to the Camera.
+ #
+ # A Request refers to (at least) one Stream, for which a Buffer that
+ # will be filled with image data shall be added to the Request.
+ #
+ # A Request is associated with a list of Controls, which are tunable
+ # parameters (similar to v4l2_controls) that have to be applied to
+ # the image.
+ #
+ # Once a request completes, all its buffers will contain image data
+ # that applications can access, and each of them carries a list of
+ # metadata properties that report the capture parameters applied to
+ # the image.
+
+ stream = stream_config.stream
+ buffers = allocator.buffers(stream)
+ requests = []
+ for i in range(len(buffers)):
+ request = camera.create_request()
+
+ buffer = buffers[i]
+ request.add_buffer(stream, buffer)
+
+ # Controls can be added to a request on a per frame basis.
+ request.set_control(libcam.controls.Brightness, 0.5)
+
+ requests.append(request)
+
+ # --------------------------------------------------------------------
+ # Start Capture
+ #
+ # In order to capture frames the Camera has to be started and
+ # Requests queued to it. Enough Requests to fill the Camera pipeline
+ # depth have to be queued before the Camera starts delivering frames.
+ #
+ # When a Request has been completed, it will be added to a list in the
+ # CameraManager and an event will be raised using eventfd.
+ #
+ # The list of completed Requests can be retrieved with
+ # CameraManager.get_ready_requests(), which will also clear the list in the
+ # CameraManager.
+ #
+ # The eventfd can be retrieved from CameraManager.event_fd, and the fd can
+ # be waited upon using e.g. Python's selectors.
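+ #
+ # Any fd-based event loop works here. As an illustrative sketch (not
+ # used below, which relies on the selectors module instead), the
+ # lower-level select module could be used directly:
+ #
+ #   import select
+ #   select.select([cm.event_fd], [], [])  # blocks until an event is pending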
+
+ camera.start()
+ for request in requests:
+ camera.queue_request(request)
+
+ sel = selectors.DefaultSelector()
+ sel.register(cm.event_fd, selectors.EVENT_READ, lambda fd: handle_camera_event(cm))
+
+ start_time = time.time()
+
+ while time.time() - start_time < TIMEOUT_SEC:
+ events = sel.select()
+ for key, mask in events:
+ key.data(key.fileobj)
+
+ # --------------------------------------------------------------------
+ # Clean Up
+ #
+ # Stop the Camera and release the resources it was using, so that
+ # other applications can make use of the Camera.
+
+ camera.stop()
+ camera.release()
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/src/py/examples/simple-capture.py b/src/py/examples/simple-capture.py
new file mode 100755
index 00000000..4b85408f
--- /dev/null
+++ b/src/py/examples/simple-capture.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+# A simple capture example showing:
+# - How to set up the camera
+# - How to capture a certain number of frames in a blocking manner
+# - How to stop the camera
+#
+# This simple example is, in many ways, too simple. The purpose of the example
+# is to introduce the concepts. A more realistic example is given in
+# simple-continuous-capture.py.
+
+import argparse
+import libcamera as libcam
+import selectors
+import sys
+
+# Number of frames to capture
+TOTAL_FRAMES = 30
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-c', '--camera', type=str, default='1',
+ help='Camera index number (starting from 1) or part of the name')
+ parser.add_argument('-f', '--format', type=str, help='Pixel format')
+ parser.add_argument('-s', '--size', type=str, help='Size ("WxH")')
+ args = parser.parse_args()
+
+ cm = libcam.CameraManager.singleton()
+
+ try:
+ if args.camera.isnumeric():
+ cam_idx = int(args.camera)
+ cam = next((cam for i, cam in enumerate(cm.cameras) if i + 1 == cam_idx))
+ else:
+ cam = next((cam for cam in cm.cameras if args.camera in cam.id))
+ except Exception:
+ print(f'Failed to find camera "{args.camera}"')
+ return -1
+
+ # Acquire the camera for our use
+
+ cam.acquire()
+
+ # Configure the camera
+
+ cam_config = cam.generate_configuration([libcam.StreamRole.Viewfinder])
+
+ stream_config = cam_config.at(0)
+
+ if args.format:
+ fmt = libcam.PixelFormat(args.format)
+ stream_config.pixel_format = fmt
+
+ if args.size:
+ w, h = [int(v) for v in args.size.split('x')]
+ stream_config.size = libcam.Size(w, h)
+
+ cam.configure(cam_config)
+
+ print(f'Capturing {TOTAL_FRAMES} frames with {stream_config}')
+
+ stream = stream_config.stream
+
+ # Allocate the buffers for capture
+
+ allocator = libcam.FrameBufferAllocator(cam)
+ ret = allocator.allocate(stream)
+ assert ret > 0
+
+ num_bufs = len(allocator.buffers(stream))
+
+ # Create the requests and assign a buffer for each request
+
+ reqs = []
+ for i in range(num_bufs):
+ # Use the buffer index as the cookie
+ req = cam.create_request(i)
+
+ buffer = allocator.buffers(stream)[i]
+ req.add_buffer(stream, buffer)
+
+ reqs.append(req)
+
+ # Start the camera
+
+ cam.start()
+
+ # frames_queued and frames_done track the number of frames queued and done
+
+ frames_queued = 0
+ frames_done = 0
+
+ # Queue the requests to the camera
+
+ for req in reqs:
+ cam.queue_request(req)
+ frames_queued += 1
+
+ # The main loop. Wait for the queued Requests to complete, process them,
+ # and re-queue them.
+
+ sel = selectors.DefaultSelector()
+ sel.register(cm.event_fd, selectors.EVENT_READ)
+
+ while frames_done < TOTAL_FRAMES:
+ # cm.get_ready_requests() does not block, so we use a Selector to wait
+ # for a camera event. Here we should almost always get a single
+ # Request, but in some cases there could be multiple or none.
+
+ events = sel.select()
+ if not events:
+ continue
+
+ reqs = cm.get_ready_requests()
+
+ for req in reqs:
+ frames_done += 1
+
+ buffers = req.buffers
+
+ # A ready Request could contain multiple buffers if multiple streams
+ # were being used. Here we know we only have a single stream,
+ # and we use next(iter()) to get the first and only buffer.
+
+ assert len(buffers) == 1
+
+ stream, fb = next(iter(buffers.items()))
+
+ # Here we could process the received buffer. In this example we only
+ # print a few details below.
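+ #
+ # As a sketch (not executed here), the pixel data could be mapped for
+ # CPU access with the libcamera.utils.MappedFrameBuffer helper, as
+ # demonstrated in simple-continuous-capture.py:
+ #
+ #   import libcamera.utils
+ #   mfb = libcamera.utils.MappedFrameBuffer(fb).mmap()
+ #   first_plane = bytes(mfb.planes[0])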
+
+ meta = fb.metadata
+
+ print("seq {:3}, bytes {}, frames queued/done {:3}/{:<3}"
+ .format(meta.sequence,
+ '/'.join([str(p.bytes_used) for p in meta.planes]),
+ frames_queued, frames_done))
+
+ # If we want to capture more frames we need to queue more Requests.
+ # We could create a totally new Request, but it is more efficient
+ # to reuse the existing one that we just received.
+ if frames_queued < TOTAL_FRAMES:
+ req.reuse()
+ cam.queue_request(req)
+ frames_queued += 1
+
+ # Stop the camera
+
+ cam.stop()
+
+ # Release the camera
+
+ cam.release()
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/src/py/examples/simple-continuous-capture.py b/src/py/examples/simple-continuous-capture.py
new file mode 100755
index 00000000..e1cb931e
--- /dev/null
+++ b/src/py/examples/simple-continuous-capture.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+# A simple capture example extending the simple-capture.py example:
+# - Capturing frames from multiple cameras using events
+# - Listening to events from stdin to exit the application
+# - Memory mapping the frames and calculating CRCs
+
+import binascii
+import libcamera as libcam
+import libcamera.utils
+import selectors
+import sys
+
+
+# A container class for our state per camera
+class CameraCaptureContext:
+ idx: int
+ cam: libcam.Camera
+ reqs: list[libcam.Request]
+ mfbs: dict[libcam.FrameBuffer, libcamera.utils.MappedFrameBuffer]
+
+ def __init__(self, cam, idx):
+ self.idx = idx
+ self.cam = cam
+
+ # Acquire the camera for our use
+
+ cam.acquire()
+
+ # Configure the camera
+
+ cam_config = cam.generate_configuration([libcam.StreamRole.Viewfinder])
+
+ stream_config = cam_config.at(0)
+
+ cam.configure(cam_config)
+
+ stream = stream_config.stream
+
+ # Allocate the buffers for capture
+
+ allocator = libcam.FrameBufferAllocator(cam)
+ ret = allocator.allocate(stream)
+ assert ret > 0
+
+ num_bufs = len(allocator.buffers(stream))
+
+ print(f'cam{idx} ({cam.id}): capturing {num_bufs} buffers with {stream_config}')
+
+ # Create the requests and assign a buffer for each request
+
+ self.reqs = []
+ self.mfbs = {}
+
+ for i in range(num_bufs):
+ # Use the camera index as the "cookie", so that handle_request()
+ # can look up the matching CameraCaptureContext
+ req = cam.create_request(idx)
+
+ buffer = allocator.buffers(stream)[i]
+ req.add_buffer(stream, buffer)
+
+ self.reqs.append(req)
+
+ # Save the mmapped buffer so that we can calculate the CRC later
+ self.mfbs[buffer] = libcamera.utils.MappedFrameBuffer(buffer).mmap()
+
+ def uninit_camera(self):
+ # Stop the camera
+
+ self.cam.stop()
+
+ # Release the camera
+
+ self.cam.release()
+
+
+# A container class for our state
+class CaptureContext:
+ cm: libcam.CameraManager
+ camera_contexts: list[CameraCaptureContext] = []
+
+ def handle_camera_event(self):
+ # cm.get_ready_requests() returns the ready requests, which in our case
+ # should almost always return a single Request, but in some cases there
+ # could be multiple or none.
+
+ reqs = self.cm.get_ready_requests()
+
+ # Process the captured frames
+
+ for req in reqs:
+ self.handle_request(req)
+
+ return True
+
+ def handle_request(self, req: libcam.Request):
+ cam_ctx = self.camera_contexts[req.cookie]
+
+ buffers = req.buffers
+
+ assert len(buffers) == 1
+
+ # A ready Request could contain multiple buffers if multiple streams
+ # were being used. Here we know we only have a single stream,
+ # and we use next(iter()) to get the first and only buffer.
+
+ stream, fb = next(iter(buffers.items()))
+
+ # Use the MappedFrameBuffer to access the pixel data with the CPU. We
+ # calculate the CRC for each plane.
+
+ mfb = cam_ctx.mfbs[fb]
+ crcs = [binascii.crc32(p) for p in mfb.planes]
+
+ meta = fb.metadata
+
+ print('cam{:<6} seq {:<6} bytes {:10} CRCs {}'
+ .format(cam_ctx.idx,
+ meta.sequence,
+ '/'.join([str(p.bytes_used) for p in meta.planes]),
+ crcs))
+
+ # We want to re-queue the buffer we just handled. Instead of creating
+ # a new Request, we re-use the old one. We need to call req.reuse()
+ # to re-initialize the Request before queuing.
+
+ req.reuse()
+ cam_ctx.cam.queue_request(req)
+
+ def handle_key_event(self):
+ sys.stdin.readline()
+ print('Exiting...')
+ return False
+
+ def capture(self):
+ # Queue the requests to the camera
+
+ for cam_ctx in self.camera_contexts:
+ for req in cam_ctx.reqs:
+ cam_ctx.cam.queue_request(req)
+
+ # Use Selector to wait for events from the camera and from the keyboard
+
+ sel = selectors.DefaultSelector()
+ sel.register(sys.stdin, selectors.EVENT_READ, self.handle_key_event)
+ sel.register(self.cm.event_fd, selectors.EVENT_READ, lambda: self.handle_camera_event())
+
+ running = True
+
+ while running:
+ events = sel.select()
+ for key, mask in events:
+ # If the handler returns False, we should exit
+ if not key.data():
+ running = False
+
+
+def main():
+ cm = libcam.CameraManager.singleton()
+
+ ctx = CaptureContext()
+ ctx.cm = cm
+
+ for idx, cam in enumerate(cm.cameras):
+ cam_ctx = CameraCaptureContext(cam, idx)
+ ctx.camera_contexts.append(cam_ctx)
+
+ # Start the cameras
+
+ for cam_ctx in ctx.camera_contexts:
+ cam_ctx.cam.start()
+
+ ctx.capture()
+
+ for cam_ctx in ctx.camera_contexts:
+ cam_ctx.uninit_camera()
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())