From 06cb7130c4fadac3bde6e695b0a6842656c9a5d4 Mon Sep 17 00:00:00 2001 From: Tomi Valkeinen Date: Mon, 9 May 2022 13:10:22 +0300 Subject: py: Add unittests.py Add a simple unittests.py as a base for python unittests. Signed-off-by: Tomi Valkeinen Reviewed-by: Laurent Pinchart Signed-off-by: Laurent Pinchart Signed-off-by: Kieran Bingham --- test/meson.build | 1 + test/py/meson.build | 17 +++ test/py/unittests.py | 352 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 370 insertions(+) create mode 100644 test/py/meson.build create mode 100755 test/py/unittests.py (limited to 'test') diff --git a/test/meson.build b/test/meson.build index 9c68fae1..d050bfa1 100644 --- a/test/meson.build +++ b/test/meson.build @@ -18,6 +18,7 @@ subdir('log') subdir('media_device') subdir('pipeline') subdir('process') +subdir('py') subdir('serialization') subdir('stream') subdir('v4l2_compat') diff --git a/test/py/meson.build b/test/py/meson.build new file mode 100644 index 00000000..2affdbd4 --- /dev/null +++ b/test/py/meson.build @@ -0,0 +1,17 @@ +# SPDX-License-Identifier: CC0-1.0 + +if not pycamera_enabled + subdir_done() +endif + +pymod = import('python') +py3 = pymod.find_installation('python3') + +pypathdir = meson.project_build_root() / 'src' / 'py' + +test('pyunittests', + py3, + args : files('unittests.py'), + env : ['PYTHONPATH=' + pypathdir], + suite : 'pybindings', + is_parallel : false) diff --git a/test/py/unittests.py b/test/py/unittests.py new file mode 100755 index 00000000..cbc00ff3 --- /dev/null +++ b/test/py/unittests.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: GPL-2.0-or-later +# Copyright (C) 2022, Tomi Valkeinen + +from collections import defaultdict +import errno +import gc +import libcamera as libcam +import os +import selectors +import time +import unittest +import weakref + + +class BaseTestCase(unittest.TestCase): + def assertZero(self, a, msg=None): + self.assertEqual(a, 0, msg) + + +class SimpleTestMethods(BaseTestCase): + def test_get_ref(self): + cm = libcam.CameraManager.singleton() + wr_cm = weakref.ref(cm) + + cam = cm.get('platform/vimc.0 Sensor B') + self.assertIsNotNone(cam) + wr_cam = weakref.ref(cam) + + cm = None + gc.collect() + self.assertIsNotNone(wr_cm()) + + cam = None + gc.collect() + self.assertIsNone(wr_cm()) + self.assertIsNone(wr_cam()) + + def test_acquire_release(self): + cm = libcam.CameraManager.singleton() + cam = cm.get('platform/vimc.0 Sensor B') + self.assertIsNotNone(cam) + + ret = cam.acquire() + self.assertZero(ret) + + ret = cam.release() + self.assertZero(ret) + + def test_double_acquire(self): + cm = libcam.CameraManager.singleton() + cam = cm.get('platform/vimc.0 Sensor B') + self.assertIsNotNone(cam) + + ret = cam.acquire() + self.assertZero(ret) + + libcam.log_set_level('Camera', 'FATAL') + ret = cam.acquire() + self.assertEqual(ret, -errno.EBUSY) + libcam.log_set_level('Camera', 'ERROR') + + ret = cam.release() + self.assertZero(ret) + + ret = cam.release() + # I expected EBUSY, but looks like double release works fine + self.assertZero(ret) + + +class CameraTesterBase(BaseTestCase): + def setUp(self): + self.cm = libcam.CameraManager.singleton() + self.cam = next((cam for cam in self.cm.cameras if 'platform/vimc' in cam.id), None) + if self.cam is None: + self.cm = None + self.skipTest('No vimc found') + + ret = self.cam.acquire() + if ret != 0: + self.cam = None + self.cm = None + raise Exception('Failed to acquire camera') + + def tearDown(self): + # If a test fails, the camera may be in running state. So always stop. + self.cam.stop() + + ret = self.cam.release() + if ret != 0: + raise Exception('Failed to release camera') + + self.cam = None + self.cm = None + + +class AllocatorTestMethods(CameraTesterBase): + def test_allocator(self): + cam = self.cam + + camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + wr_camconfig = weakref.ref(camconfig) + + streamconfig = camconfig.at(0) + wr_streamconfig = weakref.ref(streamconfig) + + ret = cam.configure(camconfig) + self.assertZero(ret) + + stream = streamconfig.stream + wr_stream = weakref.ref(stream) + + # stream should keep streamconfig and camconfig alive + streamconfig = None + camconfig = None + gc.collect() + self.assertIsNotNone(wr_camconfig()) + self.assertIsNotNone(wr_streamconfig()) + + allocator = libcam.FrameBufferAllocator(cam) + ret = allocator.allocate(stream) + self.assertTrue(ret > 0) + wr_allocator = weakref.ref(allocator) + + buffers = allocator.buffers(stream) + buffers = None + + buffer = allocator.buffers(stream)[0] + self.assertIsNotNone(buffer) + wr_buffer = weakref.ref(buffer) + + allocator = None + gc.collect() + self.assertIsNotNone(wr_buffer()) + self.assertIsNotNone(wr_allocator()) + self.assertIsNotNone(wr_stream()) + + buffer = None + gc.collect() + self.assertIsNone(wr_buffer()) + self.assertIsNone(wr_allocator()) + self.assertIsNotNone(wr_stream()) + + stream = None + gc.collect() + self.assertIsNone(wr_stream()) + self.assertIsNone(wr_camconfig()) + self.assertIsNone(wr_streamconfig()) + + +class SimpleCaptureMethods(CameraTesterBase): + def test_sleep(self): + cm = self.cm + cam = self.cam + + camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + + streamconfig = camconfig.at(0) + fmts = streamconfig.formats + + ret = cam.configure(camconfig) + self.assertZero(ret) + + stream = streamconfig.stream + + allocator = libcam.FrameBufferAllocator(cam) + ret = allocator.allocate(stream) + self.assertTrue(ret > 0) + + num_bufs = len(allocator.buffers(stream)) + + reqs = [] + for i in range(num_bufs): + req = cam.create_request(i) + self.assertIsNotNone(req) + + buffer = allocator.buffers(stream)[i] + ret = req.add_buffer(stream, buffer) + self.assertZero(ret) + + reqs.append(req) + + buffer = None + + ret = cam.start() + self.assertZero(ret) + + for req in reqs: + ret = cam.queue_request(req) + self.assertZero(ret) + + reqs = None + gc.collect() + + time.sleep(0.5) + + reqs = cm.get_ready_requests() + + self.assertTrue(len(reqs) == num_bufs) + + for i, req in enumerate(reqs): + self.assertTrue(i == req.cookie) + + reqs = None + gc.collect() + + ret = cam.stop() + self.assertZero(ret) + + def test_select(self): + cm = self.cm + cam = self.cam + + camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + + streamconfig = camconfig.at(0) + fmts = streamconfig.formats + + ret = cam.configure(camconfig) + self.assertZero(ret) + + stream = streamconfig.stream + + allocator = libcam.FrameBufferAllocator(cam) + ret = allocator.allocate(stream) + self.assertTrue(ret > 0) + + num_bufs = len(allocator.buffers(stream)) + + reqs = [] + for i in range(num_bufs): + req = cam.create_request(i) + self.assertIsNotNone(req) + + buffer = allocator.buffers(stream)[i] + ret = req.add_buffer(stream, buffer) + self.assertZero(ret) + + reqs.append(req) + + buffer = None + + ret = cam.start() + self.assertZero(ret) + + for req in reqs: + ret = cam.queue_request(req) + self.assertZero(ret) + + reqs = None + gc.collect() + + sel = selectors.DefaultSelector() + sel.register(cm.efd, selectors.EVENT_READ) + + reqs = [] + + running = True + while running: + events = sel.select() + for key, mask in events: + os.read(key.fileobj, 8) + + ready_reqs = cm.get_ready_requests() + + self.assertTrue(len(ready_reqs) > 0) + + reqs += ready_reqs + + if len(reqs) == num_bufs: + running = False + + self.assertTrue(len(reqs) == num_bufs) + + for i, req in enumerate(reqs): + self.assertTrue(i == req.cookie) + + reqs = None + gc.collect() + + ret = cam.stop() + self.assertZero(ret) + + +# Recursively expand slist's objects into olist, using seen to track already +# processed objects. +def _getr(slist, olist, seen): + for e in slist: + if id(e) in seen: + continue + seen.add(id(e)) + olist.append(e) + tl = gc.get_referents(e) + if tl: + _getr(tl, olist, seen) + + +def get_all_objects(ignored=[]): + gcl = gc.get_objects() + olist = [] + seen = set() + + seen.add(id(gcl)) + seen.add(id(olist)) + seen.add(id(seen)) + seen.update(set([id(o) for o in ignored])) + + _getr(gcl, olist, seen) + + return olist + + +def create_type_count_map(olist): + map = defaultdict(int) + for o in olist: + map[type(o)] += 1 + return map + + +def diff_type_count_maps(before, after): + return [(k, after[k] - before[k]) for k in after if after[k] != before[k]] + + +if __name__ == '__main__': + # \todo This is an attempt to see the Python objects that are not collected, + # but this doesn't work very well, as things always leak a bit. + test_leaks = False + + if test_leaks: + gc.unfreeze() + gc.collect() + + obs_before = get_all_objects() + + unittest.main(exit=False) + + if test_leaks: + gc.unfreeze() + gc.collect() + + obs_after = get_all_objects([obs_before]) + + before = create_type_count_map(obs_before) + after = create_type_count_map(obs_after) + + leaks = diff_type_count_maps(before, after) + if len(leaks) > 0: + print(leaks) -- cgit v1.2.1