summaryrefslogtreecommitdiff
path: root/test/py
diff options
context:
space:
mode:
Diffstat (limited to 'test/py')
-rw-r--r--test/py/meson.build42
-rwxr-xr-xtest/py/unittests.py365
2 files changed, 407 insertions, 0 deletions
diff --git a/test/py/meson.build b/test/py/meson.build
new file mode 100644
index 00000000..b922e857
--- /dev/null
+++ b/test/py/meson.build
@@ -0,0 +1,42 @@
+# SPDX-License-Identifier: CC0-1.0
+
+if not pycamera_enabled
+ subdir_done()
+endif
+
+# If ASan is enabled, the link order runtime check will fail as Python is not
+# linked to ASan. LD_PRELOAD the ASan runtime if available, or skip the test
+# otherwise.
+
+if asan_runtime_missing
+ warning('Unable to get path to ASan runtime, Python test disabled')
+ subdir_done()
+endif
+
+py_env = environment()
+
+pymod = import('python')
+py3 = pymod.find_installation('python3')
+
+pypathdir = meson.project_build_root() / 'src' / 'py'
+py_env.append('PYTHONPATH', pypathdir)
+
+if asan_enabled
+ py_env.append('LD_PRELOAD', asan_runtime)
+
+ # Preload the C++ standard library to work around a bug in ASan when
+ # dynamically loading C++ .so modules.
+ stdlib = run_command(cxx, '-print-file-name=' + cxx_stdlib + '.so',
+ check : true).stdout().strip()
+ py_env.append('LD_PRELOAD', stdlib)
+
+ # Disable leak detection as the Python interpreter is full of leaks.
+ py_env.append('ASAN_OPTIONS', 'detect_leaks=0')
+endif
+
+test('pyunittests',
+ py3,
+ args : files('unittests.py'),
+ env : py_env,
+ suite : 'pybindings',
+ is_parallel : false)
diff --git a/test/py/unittests.py b/test/py/unittests.py
new file mode 100755
index 00000000..8cb850d4
--- /dev/null
+++ b/test/py/unittests.py
@@ -0,0 +1,365 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from collections import defaultdict
+import gc
+import libcamera as libcam
+import selectors
+import typing
+import unittest
+import weakref
+
+
+class BaseTestCase(unittest.TestCase):
+ def assertZero(self, a, msg=None):
+ self.assertEqual(a, 0, msg)
+
+ def assertIsAlive(self, wr, msg='object not alive'):
+ self.assertIsNotNone(wr(), msg)
+
+ def assertIsDead(self, wr, msg='object not dead'):
+ self.assertIsNone(wr(), msg)
+
+ def assertIsAllAlive(self, wr_list, msg='object not alive'):
+ self.assertTrue(all([wr() for wr in wr_list]), msg)
+
+ def assertIsAllDead(self, wr_list, msg='object not dead'):
+ self.assertTrue(all([not wr() for wr in wr_list]), msg)
+
+
+class SimpleTestMethods(BaseTestCase):
+ def test_get_ref(self):
+ cm = libcam.CameraManager.singleton()
+ wr_cm = weakref.ref(cm)
+
+ cam = cm.get('platform/vimc.0 Sensor B')
+ self.assertIsNotNone(cam)
+ wr_cam = weakref.ref(cam)
+
+ del cm
+ gc.collect()
+ self.assertIsAlive(wr_cm)
+
+ del cam
+ gc.collect()
+ self.assertIsDead(wr_cm)
+ self.assertIsDead(wr_cam)
+
+ def test_acquire_release(self):
+ cm = libcam.CameraManager.singleton()
+ cam = cm.get('platform/vimc.0 Sensor B')
+ self.assertIsNotNone(cam)
+
+ cam.acquire()
+
+ cam.release()
+
+ def test_double_acquire(self):
+ cm = libcam.CameraManager.singleton()
+ cam = cm.get('platform/vimc.0 Sensor B')
+ self.assertIsNotNone(cam)
+
+ cam.acquire()
+
+ libcam.log_set_level('Camera', 'FATAL')
+ with self.assertRaises(RuntimeError):
+ cam.acquire()
+ libcam.log_set_level('Camera', 'INFO')
+
+ cam.release()
+
+ # I expected exception here, but looks like double release works fine
+ cam.release()
+
+ def test_version(self):
+ cm = libcam.CameraManager.singleton()
+ self.assertIsInstance(cm.version, str)
+
+
+class CameraTesterBase(BaseTestCase):
+ cm: typing.Any
+ cam: typing.Any
+
+ def setUp(self):
+ self.cm = libcam.CameraManager.singleton()
+ self.cam = next((cam for cam in self.cm.cameras if 'platform/vimc' in cam.id), None)
+ if self.cam is None:
+ self.cm = None
+ self.skipTest('No vimc found')
+
+ self.cam.acquire()
+
+ self.wr_cam = weakref.ref(self.cam)
+ self.wr_cm = weakref.ref(self.cm)
+
+ def tearDown(self):
+ # If a test fails, the camera may be in running state. So always stop.
+ self.cam.stop()
+
+ self.cam.release()
+
+ self.cam = None
+ self.cm = None
+
+ self.assertIsDead(self.wr_cm)
+ self.assertIsDead(self.wr_cam)
+
+
+class AllocatorTestMethods(CameraTesterBase):
+ def test_allocator(self):
+ cam = self.cam
+
+ camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
+ self.assertTrue(camconfig.size == 1)
+ wr_camconfig = weakref.ref(camconfig)
+
+ streamconfig = camconfig.at(0)
+ wr_streamconfig = weakref.ref(streamconfig)
+
+ cam.configure(camconfig)
+
+ stream = streamconfig.stream
+ wr_stream = weakref.ref(stream)
+
+ # stream should keep streamconfig and camconfig alive
+ del streamconfig
+ del camconfig
+ gc.collect()
+ self.assertIsAlive(wr_camconfig)
+ self.assertIsAlive(wr_streamconfig)
+
+ allocator = libcam.FrameBufferAllocator(cam)
+ num_bufs = allocator.allocate(stream)
+ self.assertTrue(num_bufs > 0)
+ wr_allocator = weakref.ref(allocator)
+
+ buffers = allocator.buffers(stream)
+ self.assertIsNotNone(buffers)
+ del buffers
+
+ buffer = allocator.buffers(stream)[0]
+ self.assertIsNotNone(buffer)
+ wr_buffer = weakref.ref(buffer)
+
+ del allocator
+ gc.collect()
+ self.assertIsAlive(wr_buffer)
+ self.assertIsAlive(wr_allocator)
+ self.assertIsAlive(wr_stream)
+
+ del buffer
+ gc.collect()
+ self.assertIsDead(wr_buffer)
+ self.assertIsDead(wr_allocator)
+ self.assertIsAlive(wr_stream)
+
+ del stream
+ gc.collect()
+ self.assertIsDead(wr_stream)
+ self.assertIsDead(wr_camconfig)
+ self.assertIsDead(wr_streamconfig)
+
+
+class SimpleCaptureMethods(CameraTesterBase):
+ def test_blocking(self):
+ cm = self.cm
+ cam = self.cam
+
+ camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
+ self.assertTrue(camconfig.size == 1)
+
+ streamconfig = camconfig.at(0)
+ fmts = streamconfig.formats
+ self.assertIsNotNone(fmts)
+ fmts = None
+
+ cam.configure(camconfig)
+
+ stream = streamconfig.stream
+
+ allocator = libcam.FrameBufferAllocator(cam)
+ num_bufs = allocator.allocate(stream)
+ self.assertTrue(num_bufs > 0)
+
+ num_bufs = len(allocator.buffers(stream))
+
+ reqs = []
+ for i in range(num_bufs):
+ req = cam.create_request(i)
+ self.assertIsNotNone(req)
+
+ buffer = allocator.buffers(stream)[i]
+ req.add_buffer(stream, buffer)
+
+ reqs.append(req)
+
+ buffer = None
+
+ cam.start()
+
+ for req in reqs:
+ cam.queue_request(req)
+
+ reqs = None
+ gc.collect()
+
+ sel = selectors.DefaultSelector()
+ sel.register(cm.event_fd, selectors.EVENT_READ)
+
+ reqs = []
+
+ while True:
+ events = sel.select()
+ if not events:
+ continue
+
+ ready_reqs = cm.get_ready_requests()
+
+ reqs += ready_reqs
+
+ if len(reqs) == num_bufs:
+ break
+
+ for i, req in enumerate(reqs):
+ self.assertTrue(i == req.cookie)
+
+ reqs = None
+ gc.collect()
+
+ cam.stop()
+
+ def test_select(self):
+ cm = self.cm
+ cam = self.cam
+
+ camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
+ self.assertTrue(camconfig.size == 1)
+
+ streamconfig = camconfig.at(0)
+ fmts = streamconfig.formats
+ self.assertIsNotNone(fmts)
+ fmts = None
+
+ cam.configure(camconfig)
+
+ stream = streamconfig.stream
+
+ allocator = libcam.FrameBufferAllocator(cam)
+ num_bufs = allocator.allocate(stream)
+ self.assertTrue(num_bufs > 0)
+
+ num_bufs = len(allocator.buffers(stream))
+
+ reqs = []
+ for i in range(num_bufs):
+ req = cam.create_request(i)
+ self.assertIsNotNone(req)
+
+ buffer = allocator.buffers(stream)[i]
+ req.add_buffer(stream, buffer)
+
+ reqs.append(req)
+
+ buffer = None
+
+ cam.start()
+
+ for req in reqs:
+ cam.queue_request(req)
+
+ reqs = None
+ gc.collect()
+
+ sel = selectors.DefaultSelector()
+ sel.register(cm.event_fd, selectors.EVENT_READ)
+
+ reqs = []
+
+ running = True
+ while running:
+ events = sel.select()
+ for _ in events:
+ ready_reqs = cm.get_ready_requests()
+
+ reqs += ready_reqs
+
+ if len(reqs) == num_bufs:
+ running = False
+
+ self.assertTrue(len(reqs) == num_bufs)
+
+ for i, req in enumerate(reqs):
+ self.assertTrue(i == req.cookie)
+
+ reqs = None
+ gc.collect()
+
+ cam.stop()
+
+
+# Recursively expand slist's objects into olist, using seen to track already
+# processed objects.
+def _getr(slist, olist, seen):
+ for e in slist:
+ if id(e) in seen:
+ continue
+ seen.add(id(e))
+ olist.append(e)
+ tl = gc.get_referents(e)
+ if tl:
+ _getr(tl, olist, seen)
+
+
+def get_all_objects(ignored=[]):
+ gcl = gc.get_objects()
+ olist = []
+ seen = set()
+
+ seen.add(id(gcl))
+ seen.add(id(olist))
+ seen.add(id(seen))
+ seen.update(set([id(o) for o in ignored]))
+
+ _getr(gcl, olist, seen)
+
+ return olist
+
+
+def create_type_count_map(olist):
+ map = defaultdict(int)
+ for o in olist:
+ map[type(o)] += 1
+ return map
+
+
+def diff_type_count_maps(before, after):
+ return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
+
+
+if __name__ == '__main__':
+ # \todo This is an attempt to see the Python objects that are not collected,
+ # but this doesn't work very well, as things always leak a bit.
+ test_leaks = False
+
+ if test_leaks:
+ gc.unfreeze()
+ gc.collect()
+
+ obs_before = get_all_objects()
+
+ unittest.main(exit=False)
+
+ if test_leaks:
+ gc.unfreeze()
+ gc.collect()
+
+ obs_after = get_all_objects([obs_before]) # type: ignore
+
+ before = create_type_count_map(obs_before) # type: ignore
+ after = create_type_count_map(obs_after)
+
+ leaks = diff_type_count_maps(before, after)
+ if len(leaks) > 0:
+ print(leaks)