summaryrefslogtreecommitdiff
path: root/subprojects/gtest.wrap
href='#n1'>123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
AgeCommit message (Expand)Author
#!/usr/bin/env python3

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>

# A simple capture example showing:
# - How to set up the camera
# - How to capture a certain number of frames in a blocking manner
# - How to stop the camera
#
# This simple example is, in many ways, too simple. The purpose of the example
# is to introduce the concepts. A more realistic example is given in
# simple-continuous-capture.py.

import argparse
import libcamera as libcam
import sys

# Number of frames to capture. main() keeps processing completed requests
# until this many frames are done, then stops the camera.
TOTAL_FRAMES = 30


def _check(ret, action):
    """Raise RuntimeError unless a libcamera call returned 0.

    An explicit check is used instead of 'assert' so that failures are still
    detected when Python runs with -O, which strips assert statements.

    ret: the value returned by the libcamera call
    action: short description of the attempted operation, used in the message
    """
    if ret != 0:
        raise RuntimeError(f'Failed to {action} (returned {ret})')


def _find_camera(cm, selector):
    """Return the camera selected by 'selector', or None if nothing matches.

    A selector consisting only of decimal digits is treated as a camera index
    starting from 1. Any other selector is matched as a substring of the
    camera id, and the first matching camera is returned.
    """
    # isdecimal() (rather than isnumeric()) guarantees that int() accepts the
    # string; e.g. a superscript digit is "numeric" but int() rejects it.
    if selector.isdecimal():
        cam_idx = int(selector)
        return next((cam for i, cam in enumerate(cm.cameras, start=1)
                     if i == cam_idx), None)

    return next((cam for cam in cm.cameras if selector in cam.id), None)


def _parse_size(text):
    """Parse a "WxH" string into a (width, height) tuple of ints.

    Returns None if 'text' is not exactly two integers separated by 'x'.
    """
    try:
        # Both a non-integer field and a wrong number of fields raise
        # ValueError here.
        width, height = (int(v) for v in text.split('x'))
    except ValueError:
        return None

    return width, height


def _capture(cm, cam, requests):
    """Start the camera, capture TOTAL_FRAMES frames, and stop the camera.

    cm: the camera manager, used to wait for completed requests
    cam: an acquired and configured camera
    requests: list of requests, each with a buffer already attached

    Raises RuntimeError if a libcamera call reports a failure. The camera is
    stopped even if the capture loop fails.
    """
    # Start the camera

    _check(cam.start(), 'start the camera')

    try:
        # frames_queued and frames_done track the number of frames queued and
        # done

        frames_queued = 0
        frames_done = 0

        # Queue the requests to the camera. Never queue more requests than the
        # number of frames we want, in case there are more buffers than
        # TOTAL_FRAMES.

        for req in requests[:TOTAL_FRAMES]:
            _check(cam.queue_request(req), 'queue a request')
            frames_queued += 1

        # The main loop. Wait for the queued Requests to complete, process
        # them, and re-queue them again.

        while frames_done < TOTAL_FRAMES:
            # cm.get_ready_requests() blocks until there is an event and
            # returns all the ready requests. Here we should almost always get
            # a single Request, but in some cases there could be multiple or
            # none.

            for req in cm.get_ready_requests():
                frames_done += 1

                buffers = req.buffers

                # A ready Request could contain multiple buffers if multiple
                # streams were being used. Here we know we only have a single
                # stream, and we use next(iter()) to get the first and only
                # buffer.

                if len(buffers) != 1:
                    raise RuntimeError(
                        f'Expected 1 buffer in the request, got {len(buffers)}')

                _stream, fb = next(iter(buffers.items()))

                # Here we could process the received buffer. In this example
                # we only print a few details below.

                meta = fb.metadata

                print("seq {:3}, bytes {}, frames queued/done {:3}/{:<3}"
                      .format(meta.sequence,
                              '/'.join(str(p.bytes_used) for p in meta.planes),
                              frames_queued, frames_done))

                # If we want to capture more frames we need to queue more
                # Requests. We could create a totally new Request, but it is
                # more efficient to reuse the existing one that we just
                # received.
                if frames_queued < TOTAL_FRAMES:
                    req.reuse()
                    _check(cam.queue_request(req), 'queue a request')
                    frames_queued += 1
    finally:
        # Stop the camera, also on failure. The return value is checked after
        # the finally block so that it cannot mask an in-flight exception.
        ret = cam.stop()

    _check(ret, 'stop the camera')


def main():
    """Capture TOTAL_FRAMES frames from the selected camera.

    Parses the command line, acquires and configures the camera, allocates
    buffers, runs the capture loop and finally releases the camera.

    Returns 0 on success, or -1 if the requested camera cannot be found.
    Raises RuntimeError if a libcamera call reports a failure. Exits via
    argparse's error handling if --size is not of the form "WxH".
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--camera', type=str, default='1',
                        help='Camera index number (starting from 1) or part of the name')
    parser.add_argument('-f', '--format', type=str, help='Pixel format')
    parser.add_argument('-s', '--size', type=str, help='Size ("WxH")')
    args = parser.parse_args()

    # Validate the size before touching any camera, so that a malformed
    # argument gives a usage error instead of a traceback.
    size = None
    if args.size:
        size = _parse_size(args.size)
        if size is None:
            parser.error(f'invalid size "{args.size}", expected "WxH"')

    cm = libcam.CameraManager.singleton()

    cam = _find_camera(cm, args.camera)
    if cam is None:
        print(f'Failed to find camera "{args.camera}"')
        return -1

    # Acquire the camera for our use

    _check(cam.acquire(), 'acquire the camera')

    try:
        # Configure the camera

        cam_config = cam.generate_configuration([libcam.StreamRole.Viewfinder])

        stream_config = cam_config.at(0)

        if args.format:
            stream_config.pixel_format = libcam.PixelFormat(args.format)

        if size is not None:
            stream_config.size = libcam.Size(*size)

        _check(cam.configure(cam_config), 'configure the camera')

        print(f'Capturing {TOTAL_FRAMES} frames with {stream_config}')

        stream = stream_config.stream

        # Allocate the buffers for capture

        allocator = libcam.FrameBufferAllocator(cam)
        if allocator.allocate(stream) <= 0:
            raise RuntimeError('Failed to allocate buffers')

        # Create the requests and assign a buffer for each request

        requests = []
        for i, buffer in enumerate(allocator.buffers(stream)):
            # Use the buffer index as the cookie
            req = cam.create_request(i)
            _check(req.add_buffer(stream, buffer), 'add a buffer to a request')
            requests.append(req)

        _capture(cm, cam, requests)
    finally:
        # Release the camera, also on failure. The return value is checked
        # after the finally block so that it cannot mask an in-flight
        # exception.
        ret = cam.release()

    _check(ret, 'release the camera')

    return 0


if __name__ == '__main__':
    # Run the example and hand main()'s return value to the interpreter as
    # the process exit status.
    exit_status = main()
    sys.exit(exit_status)