/* SPDX-License-Identifier: GPL-2.0-or-later */ /* * Copyright (C) 2020, Umang Jain * * hotplug-cameras.cpp - Test cameraAdded/cameraRemoved signals in CameraManager */ #include #include #include #include #include #include #include #include #include #include #include #include "test.h" using namespace libcamera; using namespace std::chrono_literals; class HotplugTest : public Test { protected: void cameraAddedHandler([[maybe_unused]] std::shared_ptr cam) { cameraAdded_ = true; } void cameraRemovedHandler([[maybe_unused]] std::shared_ptr cam) { cameraRemoved_ = true; } int init() { if (!File::exists("/sys/module/uvcvideo")) { std::cout << "uvcvideo driver is not loaded, skipping" << std::endl; return TestSkip; } if (geteuid() != 0) { std::cout << "This test requires root permissions, skipping" << std::endl; return TestSkip; } cm_ = new CameraManager(); if (cm_->start()) { std::cout << "Failed to start camera manager" << std::endl; return TestFail; } cameraAdded_ = false; cameraRemoved_ = false; cm_->cameraAdded.connect(this, &HotplugTest::cameraAddedHandler); cm_->cameraRemoved.connect(this, &HotplugTest::cameraRemovedHandler); return 0; } int run() { DIR *dir; struct dirent *dirent; std::string uvcDeviceDir; dir = opendir(uvcDriverDir_.c_str()); /* Find a UVC device directory, which we can bind/unbind. */ while ((dirent = readdir(dir)) != nullptr) { if (!File::exists(uvcDriverDir_ + dirent->d_name + "/video4linux")) continue; uvcDeviceDir = dirent->d_name; break; } closedir(dir); /* If no UVC device found, skip the test. */ if (uvcDeviceDir.empty()) return TestSkip; /* Unbind a camera and process events. */ std::ofstream(uvcDriverDir_ + "unbind", std::ios::binary) << uvcDeviceDir; Timer timer; timer.start(1000ms); while (timer.isRunning() && !cameraRemoved_) Thread::current()->eventDispatcher()->processEvents(); if (!cameraRemoved_) { std::cout << "Camera unplug not detected" << std::endl; return TestFail; } /* Bind the camera again and process events. 
*/ std::ofstream(uvcDriverDir_ + "bind", std::ios::binary) << uvcDeviceDir; timer.start(1000ms); while (timer.isRunning() && !cameraAdded_) Thread::current()->eventDispatcher()->processEvents(); if (!cameraAdded_) { std::cout << "Camera plug not detected" << std::endl; return TestFail; } return TestPass; } void cleanup() { cm_->stop(); delete cm_; } private: CameraManager *cm_; static const std::string uvcDriverDir_; bool cameraRemoved_; bool cameraAdded_; }; const std::string HotplugTest::uvcDriverDir_ = "/sys/bus/usb/drivers/uvcvideo/"; TEST_REGISTER(HotplugTest) 'content'>blob: f4504d457b2cf6f39c724fca08adcf34052d8e69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Script to convert version 1.0 Raspberry Pi camera tuning files to version 2.0.
#
# Copyright 2022 Raspberry Pi Ltd

import argparse
import json
import sys

from ctt_pretty_print_json import pretty_print


def convert_v2(in_json: dict) -> str:
    """Convert a version 1.0 tuning dictionary to the version 2.0 layout.

    Each top-level entry of the input becomes a single-key dictionary in the
    'algorithms' list of the output, in the input's iteration order. The
    output 'version' is set to 2.0 and 'target' to 'bcm2835'.

    Args:
        in_json: Parsed version 1.0 tuning file. A 'version' key is optional;
            when present it must equal 1.0.

    Returns:
        The converted structure formatted by pretty_print().

    Exits:
        Prints a message and calls sys.exit(-1) when the input carries a
        'version' other than 1.0.
    """
    if 'version' in in_json and in_json['version'] != 1.0:
        print(f'The JSON config reports version {in_json["version"]} that is incompatible with this tool.')
        sys.exit(-1)

    converted = {
        'version': 2.0,
        'target': 'bcm2835',
        # An explicit 'version': 1.0 in the input is file metadata, not an
        # algorithm block, so it must not be copied into the algorithms list.
        'algorithms': [{algo: config} for algo, config in in_json.items() if algo != 'version']
    }

    return pretty_print(converted)


if __name__ == "__main__":
    # Command-line entry point: load the input tuning file, convert it, and
    # write the result to the output path, or back over the input file when no
    # output path is given.
    cli = argparse.ArgumentParser(
        description='Convert the format of the Raspberry Pi camera tuning file from v1.0 to v2.0.\n',
        formatter_class=argparse.RawTextHelpFormatter)
    cli.add_argument('input', type=str, help='Input tuning file.')
    cli.add_argument('output', type=str, nargs='?', default=None,
                     help='Output converted tuning file. If not provided, the input file will be updated in-place.')
    options = cli.parse_args()

    with open(options.input, 'r') as src:
        tuning_v1 = json.load(src)

    tuning_v2 = convert_v2(tuning_v1)

    # The input is fully read and converted before the destination is opened
    # for writing, so updating in place does not truncate unread data.
    destination = options.input if options.output is None else options.output
    with open(destination, 'w') as dst:
        dst.write(tuning_v2)