# summary | refs | log | tree | commit | diff
# path: root/src/py/libcamera/__init__.py
# blob: 0d7da9e28e8d7961ae77a7621bc6432040d2d4d1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# SPDX-License-Identifier: LGPL-2.1-or-later
# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>

from ._libcamera import *


class MappedFrameBuffer:
    """Context manager that maps a frame buffer's planes into memory.

    Entering the context mmaps each distinct file descriptor used by the
    planes once and exposes one writable memoryview per plane via `planes`.
    Leaving the context releases the memoryviews and closes the mappings.
    """

    def __init__(self, fb):
        """Remember the frame buffer to map; nothing is mapped yet.

        `fb` must provide `num_planes` and the per-plane accessors `fd(i)`,
        `offset(i)` and `length(i)`.
        """
        self.__fb = fb
        # Start out unmapped so that `planes` and `__exit__` are well defined
        # even when `__enter__` was never called or raised.
        self.__maps = ()
        self.__planes = ()

    def __enter__(self):
        """Map the buffers and create the per-plane memoryviews.

        Returns:
            self, so that `with fb.mmap() as mfb:` gives access to `planes`.

        Raises:
            RuntimeError: if a plane extends past the end of its buffer.
            OSError, ValueError: propagated from os.lseek() / mmap.mmap();
                any mapping created before the failure is closed first.
        """
        import os
        import mmap

        fb = self.__fb

        # Collect information about the buffers. Several planes may share a
        # single fd, so track per fd the buffer size and how much to map.

        bufinfos = {}
        extents = []

        for i in range(fb.num_planes):
            fd = fb.fd(i)
            start = fb.offset(i)
            length = fb.length(i)
            end = start + length

            if fd not in bufinfos:
                # Seeking to the end yields the size of the underlying buffer
                buflen = os.lseek(fd, 0, os.SEEK_END)
                bufinfos[fd] = {'maplen': 0, 'buflen': buflen}
            else:
                buflen = bufinfos[fd]['buflen']

            if start > buflen or end > buflen:
                raise RuntimeError(f'plane is out of buffer: buffer length={buflen}, ' +
                                   f'plane offset={start}, plane length={length}')

            # Map far enough to cover the furthest plane end seen on this fd
            bufinfos[fd]['maplen'] = max(bufinfos[fd]['maplen'], end)
            extents.append((fd, start, end))

        # mmap the buffers and create memoryviews for the planes. __exit__ is
        # not invoked when __enter__ raises, so undo partial work here rather
        # than leaving it to the garbage collector.

        maps = []
        planes = []

        try:
            for fd, info in bufinfos.items():
                mm = mmap.mmap(fd, info['maplen'], mmap.MAP_SHARED,
                               mmap.PROT_READ | mmap.PROT_WRITE)
                info['map'] = mm
                maps.append(mm)

            for fd, start, end in extents:
                planes.append(memoryview(bufinfos[fd]['map'])[start:end])
        except BaseException:
            for p in planes:
                p.release()

            for mm in maps:
                mm.close()

            raise

        self.__maps = tuple(maps)
        self.__planes = tuple(planes)

        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Release the plane memoryviews, then close the mappings.

        The views are released first because a mapping cannot be closed
        while buffers exported from it are still alive.
        """
        for p in self.__planes:
            p.release()

        for mm in self.__maps:
            mm.close()

    @property
    def planes(self):
        """Tuple of memoryviews, one per plane, in plane order.

        Empty until the buffer has been mapped. After the context has been
        left, the tuple still holds the (released) memoryviews.
        """
        return self.__planes


def __FrameBuffer__mmap(self):
    """Wrap this frame buffer in a MappedFrameBuffer.

    The returned object is meant to be used as a context manager, e.g.
    `with fb.mmap() as mfb:`; the planes are available via `mfb.planes`.
    """
    mapped = MappedFrameBuffer(self)
    return mapped


# Attach the helper as a `mmap` attribute of FrameBuffer (presumably the
# class brought in by the `._libcamera` star import above), so that
# `fb.mmap()` hands back a MappedFrameBuffer wrapping `fb`.
FrameBuffer.mmap = __FrameBuffer__mmap