diff options
Diffstat (limited to 'test/py')
-rw-r--r-- | test/py/meson.build | 42 | ||||
-rwxr-xr-x | test/py/unittests.py | 365 |
2 files changed, 407 insertions, 0 deletions
diff --git a/test/py/meson.build b/test/py/meson.build new file mode 100644 index 00000000..b922e857 --- /dev/null +++ b/test/py/meson.build @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: CC0-1.0 + +if not pycamera_enabled + subdir_done() +endif + +# If ASan is enabled, the link order runtime check will fail as Python is not +# linked to ASan. LD_PRELOAD the ASan runtime if available, or skip the test +# otherwise. + +if asan_runtime_missing + warning('Unable to get path to ASan runtime, Python test disabled') + subdir_done() +endif + +py_env = environment() + +pymod = import('python') +py3 = pymod.find_installation('python3') + +pypathdir = meson.project_build_root() / 'src' / 'py' +py_env.append('PYTHONPATH', pypathdir) + +if asan_enabled + py_env.append('LD_PRELOAD', asan_runtime) + + # Preload the C++ standard library to work around a bug in ASan when + # dynamically loading C++ .so modules. + stdlib = run_command(cxx, '-print-file-name=' + cxx_stdlib + '.so', + check : true).stdout().strip() + py_env.append('LD_PRELOAD', stdlib) + + # Disable leak detection as the Python interpreter is full of leaks. + py_env.append('ASAN_OPTIONS', 'detect_leaks=0') +endif + +test('pyunittests', + py3, + args : files('unittests.py'), + env : py_env, + suite : 'pybindings', + is_parallel : false) diff --git a/test/py/unittests.py b/test/py/unittests.py new file mode 100755 index 00000000..8cb850d4 --- /dev/null +++ b/test/py/unittests.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: GPL-2.0-or-later +# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com> + +from collections import defaultdict +import gc +import libcamera as libcam +import selectors +import typing +import unittest +import weakref + + +class BaseTestCase(unittest.TestCase): + def assertZero(self, a, msg=None): + self.assertEqual(a, 0, msg) + + def assertIsAlive(self, wr, msg='object not alive'): + self.assertIsNotNone(wr(), msg) + + def assertIsDead(self, wr, msg='object not dead'): + self.assertIsNone(wr(), msg) + + def assertIsAllAlive(self, wr_list, msg='object not alive'): + self.assertTrue(all([wr() for wr in wr_list]), msg) + + def assertIsAllDead(self, wr_list, msg='object not dead'): + self.assertTrue(all([not wr() for wr in wr_list]), msg) + + +class SimpleTestMethods(BaseTestCase): + def test_get_ref(self): + cm = libcam.CameraManager.singleton() + wr_cm = weakref.ref(cm) + + cam = cm.get('platform/vimc.0 Sensor B') + self.assertIsNotNone(cam) + wr_cam = weakref.ref(cam) + + del cm + gc.collect() + self.assertIsAlive(wr_cm) + + del cam + gc.collect() + self.assertIsDead(wr_cm) + self.assertIsDead(wr_cam) + + def test_acquire_release(self): + cm = libcam.CameraManager.singleton() + cam = cm.get('platform/vimc.0 Sensor B') + self.assertIsNotNone(cam) + + cam.acquire() + + cam.release() + + def test_double_acquire(self): + cm = libcam.CameraManager.singleton() + cam = cm.get('platform/vimc.0 Sensor B') + self.assertIsNotNone(cam) + + cam.acquire() + + libcam.log_set_level('Camera', 'FATAL') + with self.assertRaises(RuntimeError): + cam.acquire() + libcam.log_set_level('Camera', 'INFO') + + cam.release() + + # I expected exception here, but looks like double release works fine + cam.release() + + def test_version(self): + cm = libcam.CameraManager.singleton() + self.assertIsInstance(cm.version, str) + + +class CameraTesterBase(BaseTestCase): + cm: typing.Any + cam: typing.Any + + def setUp(self): + self.cm = libcam.CameraManager.singleton() + self.cam = next((cam for cam in self.cm.cameras if 'platform/vimc' in cam.id), None) + if self.cam is None: + self.cm = None + self.skipTest('No vimc found') + + self.cam.acquire() + + self.wr_cam = weakref.ref(self.cam) + self.wr_cm = weakref.ref(self.cm) + + def tearDown(self): + # If a test fails, the camera may be in running state. So always stop. + self.cam.stop() + + self.cam.release() + + self.cam = None + self.cm = None + + self.assertIsDead(self.wr_cm) + self.assertIsDead(self.wr_cam) + + +class AllocatorTestMethods(CameraTesterBase): + def test_allocator(self): + cam = self.cam + + camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + wr_camconfig = weakref.ref(camconfig) + + streamconfig = camconfig.at(0) + wr_streamconfig = weakref.ref(streamconfig) + + cam.configure(camconfig) + + stream = streamconfig.stream + wr_stream = weakref.ref(stream) + + # stream should keep streamconfig and camconfig alive + del streamconfig + del camconfig + gc.collect() + self.assertIsAlive(wr_camconfig) + self.assertIsAlive(wr_streamconfig) + + allocator = libcam.FrameBufferAllocator(cam) + num_bufs = allocator.allocate(stream) + self.assertTrue(num_bufs > 0) + wr_allocator = weakref.ref(allocator) + + buffers = allocator.buffers(stream) + self.assertIsNotNone(buffers) + del buffers + + buffer = allocator.buffers(stream)[0] + self.assertIsNotNone(buffer) + wr_buffer = weakref.ref(buffer) + + del allocator + gc.collect() + self.assertIsAlive(wr_buffer) + self.assertIsAlive(wr_allocator) + self.assertIsAlive(wr_stream) + + del buffer + gc.collect() + self.assertIsDead(wr_buffer) + self.assertIsDead(wr_allocator) + self.assertIsAlive(wr_stream) + + del stream + gc.collect() + self.assertIsDead(wr_stream) + self.assertIsDead(wr_camconfig) + self.assertIsDead(wr_streamconfig) + + +class SimpleCaptureMethods(CameraTesterBase): + def test_blocking(self): + cm = self.cm + cam = self.cam + + camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + + streamconfig = camconfig.at(0) + fmts = streamconfig.formats + self.assertIsNotNone(fmts) + fmts = None + + cam.configure(camconfig) + + stream = streamconfig.stream + + allocator = libcam.FrameBufferAllocator(cam) + num_bufs = allocator.allocate(stream) + self.assertTrue(num_bufs > 0) + + num_bufs = len(allocator.buffers(stream)) + + reqs = [] + for i in range(num_bufs): + req = cam.create_request(i) + self.assertIsNotNone(req) + + buffer = allocator.buffers(stream)[i] + req.add_buffer(stream, buffer) + + reqs.append(req) + + buffer = None + + cam.start() + + for req in reqs: + cam.queue_request(req) + + reqs = None + gc.collect() + + sel = selectors.DefaultSelector() + sel.register(cm.event_fd, selectors.EVENT_READ) + + reqs = [] + + while True: + events = sel.select() + if not events: + continue + + ready_reqs = cm.get_ready_requests() + + reqs += ready_reqs + + if len(reqs) == num_bufs: + break + + for i, req in enumerate(reqs): + self.assertTrue(i == req.cookie) + + reqs = None + gc.collect() + + cam.stop() + + def test_select(self): + cm = self.cm + cam = self.cam + + camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + + streamconfig = camconfig.at(0) + fmts = streamconfig.formats + self.assertIsNotNone(fmts) + fmts = None + + cam.configure(camconfig) + + stream = streamconfig.stream + + allocator = libcam.FrameBufferAllocator(cam) + num_bufs = allocator.allocate(stream) + self.assertTrue(num_bufs > 0) + + num_bufs = len(allocator.buffers(stream)) + + reqs = [] + for i in range(num_bufs): + req = cam.create_request(i) + self.assertIsNotNone(req) + + buffer = allocator.buffers(stream)[i] + req.add_buffer(stream, buffer) + + reqs.append(req) + + buffer = None + + cam.start() + + for req in reqs: + cam.queue_request(req) + + reqs = None + gc.collect() + + sel = selectors.DefaultSelector() + sel.register(cm.event_fd, selectors.EVENT_READ) + + reqs = [] + + running = True + while running: + events = sel.select() + for _ in events: + ready_reqs = cm.get_ready_requests() + + reqs += ready_reqs + + if len(reqs) == num_bufs: + running = False + + self.assertTrue(len(reqs) == num_bufs) + + for i, req in enumerate(reqs): + self.assertTrue(i == req.cookie) + + reqs = None + gc.collect() + + cam.stop() + + +# Recursively expand slist's objects into olist, using seen to track already +# processed objects. +def _getr(slist, olist, seen): + for e in slist: + if id(e) in seen: + continue + seen.add(id(e)) + olist.append(e) + tl = gc.get_referents(e) + if tl: + _getr(tl, olist, seen) + + +def get_all_objects(ignored=[]): + gcl = gc.get_objects() + olist = [] + seen = set() + + seen.add(id(gcl)) + seen.add(id(olist)) + seen.add(id(seen)) + seen.update(set([id(o) for o in ignored])) + + _getr(gcl, olist, seen) + + return olist + + +def create_type_count_map(olist): + map = defaultdict(int) + for o in olist: + map[type(o)] += 1 + return map + + +def diff_type_count_maps(before, after): + return [(k, after[k] - before[k]) for k in after if after[k] != before[k]] + + +if __name__ == '__main__': + # \todo This is an attempt to see the Python objects that are not collected, + # but this doesn't work very well, as things always leak a bit. + test_leaks = False + + if test_leaks: + gc.unfreeze() + gc.collect() + + obs_before = get_all_objects() + + unittest.main(exit=False) + + if test_leaks: + gc.unfreeze() + gc.collect() + + obs_after = get_all_objects([obs_before]) # type: ignore + + before = create_type_count_map(obs_before) # type: ignore + after = create_type_count_map(obs_after) + + leaks = diff_type_count_maps(before, after) + if len(leaks) > 0: + print(leaks) |