#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt.py - camera tuning tool
import os
import sys
from ctt_image_load import *
from ctt_ccm import *
from ctt_awb import *
from ctt_alsc import *
from ctt_lux import *
from ctt_noise import *
from ctt_geq import *
from ctt_pretty_print_json import *
import random
import json
import re
"""
This file houses the camera object, which is used to perform the calibrations.
The camera object houses all the calibration images as attributes in two lists:
- imgs (macbeth charts)
- imgs_alsc (alsc correction images)
Various calibrations are methods of the camera object, and the output is stored
in a dictionary called self.json.
Once all the calibration has been completed, the Camera.json is written into a
json file.
The camera object initialises its json dictionary by reading from a pre-written
blank json file. This has been done to avoid reproducing the entire json file
in the code here, thereby avoiding unnecessary clutter.
"""
"""
Get the colour and lux values from the strings of each individual image
"""
def get_col_lux(string):
    """
    Extract the colour temperature and lux values encoded in a filename.

    The colour temperature is a run of digits followed by 'k' or 'K', the lux
    level a run of digits followed by 'l' or 'L'. Each tag has to sit either
    directly in front of the file extension or in front of an underscore
    separated suffix, and the extension has to be one of jpg, jpeg, brcm or
    dng (lower case only).

    Returns a (colour, lux) tuple:
    - (None, None) if no colour temperature tag is found,
    - (colour, None) if only the colour temperature tag is found,
    - (colour, lux) if both are found.
    Every value that is found is returned as an int.
    """
    # shared tail of both patterns: '.ext' or '_<anything>.ext' at the end
    suffix = r'(\.(jpg|jpeg|brcm|dng)|_.*\.(jpg|jpeg|brcm|dng))$'
    col = re.search(r'([0-9]+)[kK]' + suffix, string)
    if col is None:
        # image labelled incorrectly: nothing usable in the name
        return None, None
    col = int(col.group(1))
    lux = re.search(r'([0-9]+)[lL]' + suffix, string)
    if lux is None:
        # Still hand back the colour if that has been found. It is converted
        # to int so that this path agrees with the (colour, lux) path below
        # instead of leaking the raw matched string.
        return col, None
    return col, int(lux.group(1))
"""
Camera object that is the backbone of the tuning tool.
Input is the desired path of the output json.
"""
class Camera:
def __init__(self, jfile):
    """
    Create a camera with no images loaded and an uncalibrated tuning dict.

    jfile is the desired path of the output json; it is only stored here
    (as self.jf).
    """
    # directory of this script with a trailing '/', or '' when the script
    # path has no directory component
    script_dir = os.path.dirname(os.path.expanduser(__file__))
    self.path = script_dir + '/' if script_dir else ''
    self.jf = jfile
    self.imgs = []
    self.imgs_alsc = []
    timestamp = time.asctime(time.localtime(time.time()))
    self.log = 'Log created : ' + timestamp
    self.log_separator = '\n' + '-' * 70 + '\n'

    # The pieces below are assembled into self.json at the end. The key
    # order of the final dict is the order of the written json file.
    lux_defaults = {
        "reference_shutter_speed": 10000,
        "reference_gain": 1,
        "reference_aperture": 1.0,
    }

    awb_defaults = {
        "priors": [
            {"lux": 0, "prior": [2000, 1.0, 3000, 0.0, 13000, 0.0]},
            {"lux": 800, "prior": [2000, 0.0, 6000, 2.0, 13000, 2.0]},
            {"lux": 1500, "prior": [2000, 0.0, 4000, 1.0, 6000, 6.0, 6500, 7.0, 7000, 1.0, 13000, 1.0]},
        ],
        "modes": {
            "auto": {"lo": 2500, "hi": 8000},
            "incandescent": {"lo": 2500, "hi": 3000},
            "tungsten": {"lo": 3000, "hi": 3500},
            "fluorescent": {"lo": 4000, "hi": 4700},
            "indoor": {"lo": 3000, "hi": 5000},
            "daylight": {"lo": 5500, "hi": 6500},
            "cloudy": {"lo": 7000, "hi": 8600},
        },
        "bayes": 1,
    }

    metering_modes = {
        "centre-weighted": {"weights": [3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 0, 0, 0, 0]},
        "spot": {"weights": [2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]},
        "matrix": {"weights": [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]},
    }
    exposure_modes = {
        "normal": {
            "shutter": [100, 10000, 30000, 60000, 120000],
            "gain": [1.0, 2.0, 4.0, 6.0, 6.0],
        },
        "short": {
            "shutter": [100, 5000, 10000, 20000, 120000],
            "gain": [1.0, 2.0, 4.0, 6.0, 6.0],
        },
    }
    constraint_modes = {
        "normal": [
            {"bound": "LOWER", "q_lo": 0.98, "q_hi": 1.0, "y_target": [0, 0.5, 1000, 0.5]},
        ],
        "highlight": [
            {"bound": "LOWER", "q_lo": 0.98, "q_hi": 1.0, "y_target": [0, 0.5, 1000, 0.5]},
            {"bound": "UPPER", "q_lo": 0.98, "q_hi": 1.0, "y_target": [0, 0.8, 1000, 0.8]},
        ],
    }
    agc_defaults = {
        "metering_modes": metering_modes,
        "exposure_modes": exposure_modes,
        "constraint_modes": constraint_modes,
        "y_target": [0, 0.16, 1000, 0.165, 10000, 0.17],
    }

    alsc_defaults = {
        'omega': 1.3,
        'n_iter': 100,
        'luminance_strength': 0.7,
    }

    # flat list of (input, output) pairs
    gamma_curve = [
        0, 0,
        1024, 5040,
        2048, 9338,
        3072, 12356,
        4096, 15312,
        5120, 18051,
        6144, 20790,
        7168, 23193,
        8192, 25744,
        9216, 27942,
        10240, 30035,
        11264, 32005,
        12288, 33975,
        13312, 35815,
        14336, 37600,
        15360, 39168,
        16384, 40642,
        18432, 43379,
        20480, 45749,
        22528, 47753,
        24576, 49621,
        26624, 51253,
        28672, 52698,
        30720, 53796,
        32768, 54876,
        36864, 57012,
        40960, 58656,
        45056, 59954,
        49152, 61183,
        53248, 62355,
        57344, 63419,
        61440, 64476,
        65535, 65535,
    ]

    # initial json dict populated by uncalibrated values; the empty entries
    # are separate dicts so that each can be filled in independently
    self.json = {
        "rpi.black_level": {"black_level": 4096},
        "rpi.dpc": {},
        "rpi.lux": lux_defaults,
        "rpi.noise": {},
        "rpi.geq": {},
        "rpi.sdn": {},
        "rpi.awb": awb_defaults,
        "rpi.agc": agc_defaults,
        "rpi.alsc": alsc_defaults,
        "rpi.contrast": {"ce_enable": 1, "gamma_curve": gamma_curve},
        "rpi.ccm": {},
        "rpi.sharpen": {},
    }
"""
Perform colour correction calibrations by comparing macbeth patch colours
to standard macbeth chart colours.
"""
def ccm_cal(self, do_alsc_colour):
    """
    Run the colour correction matrix (CCM) calibration and store the result
    in self.json['rpi.ccm']['ccms'].

    do_alsc_colour: when true, and 'rpi.alsc' is not disabled, the ALSC
    colour tables already stored in self.json are passed to the calibration.

    Returns 1 if 'rpi.ccm' is disabled or if the calibration raises
    ArithmeticError, 0 if the camera is greyscale (the 'rpi.ccm' entry is
    then deleted from the json), and None once the result has been written.
    """
    if 'rpi.ccm' in self.disable:
        return 1
    print('\nStarting CCM calibration')
    self.log_new_sec('CCM')
    # if image is greyscale then CCM makes no sense
    if self.grey:
        print('\nERROR: Can\'t do CCM on greyscale image!')
        self.log += '\nERROR: Cannot perform CCM calibration '
        self.log += 'on greyscale image!\nCCM aborted!'
        del self.json['rpi.ccm']
        return 0
    no_alsc_msg = ('\nWARNING: No ALSC tables found.\nCCM calibration '
                   'performed without ALSC correction...')
    # Check if alsc tables have been generated, if not then do ccm without
    # alsc
    if ("rpi.alsc" not in self.disable) and do_alsc_colour:
        # ALSC colour should have been done, so the tables are expected to
        # be present; a missing table falls back to no correction
        try:
            cal_cr_list = self.json['rpi.alsc']['calibrations_Cr']
            cal_cb_list = self.json['rpi.alsc']['calibrations_Cb']
            self.log += '\nALSC tables found successfully'
        except KeyError:
            cal_cr_list, cal_cb_list = None, None
            print('WARNING! No ALSC tables found for CCM!')
            print('Performing CCM calibrations without ALSC correction...')
            self.log += no_alsc_msg
    else:
        # config options result in CCM done without ALSC colour tables
        cal_cr_list, cal_cb_list = None, None
        self.log += no_alsc_msg
    # do the CCM calibration
    try:
        ccms = ccm(self, cal_cr_list, cal_cb_list)
    except ArithmeticError:
        print('ERROR: Matrix is singular!\nTake new pictures and try again...')
        self.log += '\nERROR: Singular matrix encountered during fit!'
        self.log += '\nCCM aborted!'
        return 1
    # write output to json
    self.json['rpi.ccm']['ccms'] = ccms
    self.log += '\nCCM calibration written to json file'
    print('Finished CCM calibration')
"""
Auto white balance calibration produces a colour curve for
various colour temperatures, as well as providing a maximum 'wiggle room'
distance from this curve (transverse_neg/pos).
"""
def awb_cal(self, greyworld, do_alsc_colour):
    """
    Run the auto white balance calibration and store the colour temperature
    curve and the transverse limits in self.json['rpi.awb'].

    greyworld: when true, 'bayes' is set to 0 in the awb json entry.
    do_alsc_colour: when true, and 'rpi.alsc' is not disabled, the ALSC
    colour tables stored in self.json are passed to the calibration.

    Returns 1 if 'rpi.awb' is disabled, 0 if the camera is greyscale (the
    'rpi.awb' entry is then deleted from the json), None otherwise.
    """
    if 'rpi.awb' in self.disable:
        return 1
    print('\nStarting AWB calibration')
    self.log_new_sec('AWB')

    # AWB makes no sense on a greyscale image
    if self.grey:
        print('\nERROR: Can\'t do AWB on greyscale image!')
        self.log += ('\nERROR: Cannot perform AWB calibration '
                     'on greyscale image!\nAWB aborted!')
        del self.json['rpi.awb']
        return 0

    # optional set greyworld (e.g. for noir cameras)
    if greyworld:
        self.json['rpi.awb']['bayes'] = 0
        self.log += '\nGreyworld set'

    # Look up the ALSC tables if they are meant to be used; anything that
    # goes wrong leaves `tables` as None and AWB runs without correction.
    tables = None
    if do_alsc_colour and "rpi.alsc" not in self.disable:
        try:
            tables = (self.json['rpi.alsc']['calibrations_Cr'],
                      self.json['rpi.alsc']['calibrations_Cb'])
        except KeyError:
            print('ERROR, no ALSC calibrations found for AWB')
            print('Performing AWB without ALSC tables')
    if tables is None:
        cal_cr_list, cal_cb_list = None, None
        self.log += ('\nWARNING: No ALSC tables found.\nAWB calibration '
                     'performed without ALSC correction...')
    else:
        cal_cr_list, cal_cb_list = tables
        self.log += '\nALSC tables found successfully'

    # call calibration function
    ct_curve, transverse_neg, transverse_pos = awb(
        self, cal_cr_list, cal_cb_list, "rpi.awb" in self.plot)

    # write output to json (insertion order is the order in the file)
    self.json['rpi.awb'].update({
        'ct_curve': ct_curve,
        'sensitivity_r': 1.0,
        'sensitivity_b': 1.0,
        'transverse_pos': transverse_pos,
        'transverse_neg': transverse_neg,
    })
    self.log += '\nAWB calibration written to json file'
    print('Finished AWB calibration')
"""
Auto lens shading correction completely mitigates the effects of lens shading for each
colour channel separately, and then partially corrects for vignetting.
The extent of the correction depends on the 'luminance_strength' parameter.
"""
def alsc_cal(self, luminance_strength, do_alsc_colour):
    """
    Run the lens shading (ALSC) calibration and store its tables in
    self.json['rpi.alsc'].

    luminance_strength: value written to the 'luminance_strength' entry.
    do_alsc_colour: whether the colour (Cr/Cb) tables are wanted; forced off
    for a greyscale camera.

    Returns 1 if 'rpi.alsc' is disabled, if there are no ALSC images, if no
    colour calibration was done, or if only one ALSC image is available
    (default sigmas are then used). Returns None once calibrated sigmas
    have been written.
    """
    if 'rpi.alsc' in self.disable:
        return 1
    print('\nStarting ALSC calibration')
    self.log_new_sec('ALSC')

    # check if alsc images have been taken
    if not len(self.imgs_alsc):
        print('\nError:\nNo alsc calibration images found')
        self.log += ('\nERROR: No ALSC calibration images found!'
                     '\nALSC calibration aborted!')
        return 1

    alsc_json = self.json['rpi.alsc']
    alsc_json['luminance_strength'] = luminance_strength
    if self.grey and do_alsc_colour:
        print('Greyscale camera so only luminance_lut calculated')
        do_alsc_colour = False
        self.log += ('\nWARNING: ALSC colour correction cannot be done on '
                     'greyscale image!\nALSC colour corrections forced off!')

    # call calibration function; the fourth result is not used here
    cal_cr_list, cal_cb_list, luminance_lut, av_corn = alsc_all(
        self, do_alsc_colour, "rpi.alsc" in self.plot)

    # write output to json and finish if not do_alsc_colour
    if not do_alsc_colour:
        alsc_json['luminance_lut'] = luminance_lut
        alsc_json['n_iter'] = 0
        self.log += ('\nALSC calibrations written to json file'
                     '\nNo colour calibrations performed')
        print('Finished ALSC calibrations')
        return 1

    alsc_json['calibrations_Cr'] = cal_cr_list
    alsc_json['calibrations_Cb'] = cal_cb_list
    alsc_json['luminance_lut'] = luminance_lut
    self.log += '\nALSC colour and luminance tables written to json file'

    # The sigmas determine the strength of the adaptive algorithm that
    # cleans up any lens shading that has slipped through the alsc. They are
    # determined by measuring a 'worst-case' difference between two alsc
    # tables that are adjacent in colour space, which cannot be computed when
    # only one table is available. In that case default exaggerated sigmas
    # are used, which can result in too much alsc and is therefore not
    # advised. In general, just take another alsc picture at another colour
    # temperature!
    if len(self.imgs_alsc) == 1:
        alsc_json['sigma'] = 0.005
        alsc_json['sigma_Cb'] = 0.005
        print('\nWarning:\nOnly one alsc calibration found'
              '\nStandard sigmas used for adaptive algorithm.')
        print('Finished ALSC calibrations')
        self.log += ('\nWARNING: Only one colour temperature found in '
                     'calibration images.\nStandard sigmas used for adaptive '
                     'algorithm!')
        return 1

    # obtain worst-case scenario residual sigmas and write them to json
    sigma_r, sigma_b = get_sigma(self, cal_cr_list, cal_cb_list)
    alsc_json['sigma'] = np.round(sigma_r, 5)
    alsc_json['sigma_Cb'] = np.round(sigma_b, 5)
    self.log += '\nCalibrated sigmas written to json file'
    print('Finished ALSC calibrations')
"""
Green equalisation fixes problems caused by discrepancies in green
channels. This is done by measuring the effect on macbeth chart patches,
which ideally would have the same green values throughout.
An upper bound linear model is fit, fixing a threshold for the green
differences that are corrected.
"""
def geq_cal(self):
    """
    Run the green equalisation calibration and store the fitted 'offset'
    and 'slope' in self.json['rpi.geq'].

    Returns 1 if 'rpi.geq' is disabled, None otherwise.
    """
    if 'rpi.geq' in self.disable:
        return 1
    print('\nStarting GEQ calibrations')
    self.log_new_sec('GEQ')
    # perform calibration, plotting only if requested for this section
    slope, offset = geq_fit(self, 'rpi.geq' in self.plot)
    # write output to json, offset first to keep the file's key order
    self.json['rpi.geq'].update(offset=offset, slope=slope)
    self.log += '\nGEQ calibrations written to json file'
    print('Finished GEQ calibrations')
"""
Lux calibrations allow the lux level of a scene to be estimated by a ratio
calculation. Lux values are used in the pipeline for algorithms such as AGC
and AWB
"""
def lux_cal(self):
    """
    Run the lux calibration on a single macbeth image and store the
    reference values in self.json['rpi.lux'].

    The image whose lux level is closest to 1000 is used (the first such
    image if several are equally close).

    Returns 1 if 'rpi.lux' is disabled, None otherwise.
    """
    if 'rpi.lux' in self.disable:
        return 1
    print('\nStarting LUX calibrations')
    self.log_new_sec('LUX')

    # pick the image with lux level closest to 1000
    reference = min(self.imgs, key=lambda img: abs(1000 - img.lux))
    self.log += '\nLux found closest to 1000: {} lx'.format(reference.lux)
    self.log += '\nImage used: ' + reference.name
    if reference.lux < 50:
        self.log += '\nWARNING: Low lux could cause inaccurate calibrations!'

    # do calibration
    lux_out, shutter_speed, gain = lux(self, reference)

    # write output to json
    self.json['rpi.lux'].update({
        'reference_shutter_speed': shutter_speed,
        'reference_gain': gain,
        'reference_lux': reference.lux,
        'reference_Y': lux_out,
    })
    self.log += '\nLUX calibrations written to json file'
    print('Finished LUX calibrations')
"""
Noise calibration attempts to describe the noise profile of the sensor. The
calibration is run on macbeth images and the final output is taken as the average
"""
def noise_cal(self):
    """
    Run the noise calibration on every macbeth image and store the averaged
    profile in self.json['rpi.noise'].

    The per-image results are sorted by their first element (the slope) and
    the mean is taken over the slice [n//4 : 1 + 3*n//4] of the sorted list.

    Returns 1 if 'rpi.noise' is disabled, None otherwise.
    """
    if 'rpi.noise' in self.disable:
        return 1
    print('\nStarting NOISE calibrations')
    self.log_new_sec('NOISE')

    # run calibration on all images and sort by slope
    plot = "rpi.noise" in self.plot
    profiles = [noise(self, Img, plot) for Img in self.imgs]
    profiles.sort(key=lambda profile: profile[0])
    self.log += '\nFinished processing images'

    # take the average of the interquartile
    count = len(profiles)
    first, last = count // 4, 1 + 3 * count // 4
    average = np.mean(profiles[first:last], axis=0)
    constant = int(average[1])
    self.log += '\nAverage noise profile: constant = {} '.format(constant)
    self.log += 'slope = {:.3f}'.format(average[0])

    # write to json
    self.json['rpi.noise']['reference_constant'] = constant
    self.json['rpi.noise']['reference_slope'] = round(average[0], 3)
    self.log += '\nNOISE calibrations written to json'
    print('Finished NOISE calibrations')
"""
Removes json entries that are turned off
"""
def json_remove(self, disable):
    """
    Delete the json entries named in `disable` and log the outcome.

    disable: iterable of top-level json keys (e.g. 'rpi.ccm') to remove.
    A key that is not present in self.json is logged as an error and
    skipped.

    Returns 1 if there is nothing to disable, None otherwise.
    """
    self.log_new_sec('Disabling Options', cal=False)
    # Test the argument that is actually iterated below, rather than
    # self.disable, so the emptiness check and the loop cannot disagree.
    if not disable:
        self.log += '\nNothing disabled!'
        return 1
    for key in disable:
        try:
            del self.json[key]
        except KeyError:
            self.log += '\nERROR: ' + key + ' not found!'
        else:
            self.log += '\nDisabled: ' + key
"""
writes the json dictionary to the raw json file then make pretty
"""
def write_json(self):
    """
    Serialise self.json (keeping its key order) and pass the string, together
    with the output path self.jf, to pretty_print_json.
    """
    # NOTE(review): pretty_print_json is defined elsewhere; presumably it
    # writes the nicely formatted json to self.jf - confirm.
    pretty_print_json(json.dumps(self.json, sort_keys=False), self.jf)
"""
add a new section to the log file
"""
def log_new_sec(self, section, cal=True):
    """
    Append a new section heading to the log.

    section: heading text.
    cal: when true, ' Calibration' is appended to the heading.

    The heading is framed by self.log_separator above and below, preceded by
    one extra newline.
    """
    heading = section + ' Calibration' if cal else section
    self.log += ''.join(('\n', self.log_separator, heading, self.log_separator))
"""
write script arguments to log file
"""
def log_user_input(self, json_output, directory, config, log_output):
    """
    Record the script arguments in the log under a 'User Arguments' section.

    json_output: output json path (str).
    directory: calibration image directory (str).
    config: None if no config file was given, False for an invalid path,
    True for invalid syntax, otherwise the config file path (str).
    log_output: log file path, or None when the default is used.
    """
    self.log_new_sec('User Arguments', cal=False)
    self.log += '\nJson file output: ' + json_output
    self.log += '\nCalibration images directory: ' + directory

    # identity checks: None / False / True are status flags, not paths
    if config is None:
        config_note = '\nNo configuration file input... using default options'
    elif config is False:
        config_note = ('\nWARNING: Invalid configuration file path...'
                       ' using default options')
    elif config is True:
        config_note = ('\nWARNING: Invalid syntax in configuration file...'
                       ' using default options')
    else:
        config_note = '\nConfiguration file: ' + config
    self.log += config_note

    self.log += (
        '\nNo log file path input... using default: ctt_log.txt'
        if log_output is None
        else '\nLog file output: ' + log_output
    )
"""
write log file
"""
def write_log(self, filename):
    """
    Terminate the log with a final separator and write it to a file.

    filename: path of the log file; None selects 'ctt_log.txt'.
    An existing file is overwritten.
    """
    target = 'ctt_log.txt' if filename is None else filename
    self.log += '\n' + self.log_separator
    with open(target, 'w') as logfile:
        logfile.write(self.log)
"""
Add all images from directory, pass into relevant list of images and
extract lux and temperature values.
"""
def add_imgs(self, directory, mac_config, blacklevel=-1):
self.log_new_sec('Image Loading', cal=False)
img_suc_msg = 'Image loaded successfully!'
print('\n\nLoading images from '+directory)
self.log += '\nDirectory: ' + directory
"""
get list of files
"""
filename_list = get_photos(directory)
print("Files found: {}".format(len(filename_list)))
self.log += '\nFiles found: {}'.format(len(filename_list))
"""
iterate over files
"""
f
MutexLocker locker(mutex_);
if (!callbacks_) {
LOG(HAL, Error) << "Can't open camera before callbacks are set";
return { nullptr, -ENODEV };
}
CameraDevice *camera = cameraDeviceFromHalId(id);
if (!camera) {
LOG(HAL, Error) << "Invalid camera id '" << id << "'";
return { nullptr, -ENODEV };
}
int ret = camera->open(hardwareModule);
if (ret)
return { nullptr, ret };
LOG(HAL, Info) << "Open camera '" << id << "'";
return { camera, 0 };
}
void CameraHalManager::cameraAdded(std::shared_ptr<Camera> cam)
{
unsigned int id;
bool isCameraExternal = false;
bool isCameraNew = false;
MutexLocker locker(mutex_);
/*
* Each camera is assigned a unique integer ID when it is seen for the
* first time. If the camera has been seen before, the previous ID is
* re-used.
*
* IDs starts from '0' for internal cameras and '1000' for external
* cameras.
*/
auto iter = cameraIdsMap_.find(cam->id());
if (iter != cameraIdsMap_.end()) {
id = iter->second;
if (id >= firstExternalCameraId_)
isCameraExternal = true;
} else {
isCameraNew = true;
/*
* Now check if this is an external camera and assign
* its id accordingly.
*/
if (cameraLocation(cam.get()) == properties::CameraLocationExternal) {
isCameraExternal = true;
id = nextExternalCameraId_;
} else {
id = numInternalCameras_;
}
}
/*
* The configuration file must be valid, and contain a corresponding
* entry for internal cameras. External cameras can be initialized
* without configuration file.
*/
if (!isCameraExternal && !halConfig_.exists()) {
LOG(HAL, Error)
<< "HAL configuration file is mandatory for internal cameras";
return;
}
const CameraConfigData *cameraConfigData = halConfig_.cameraConfigData(cam->id());
/*
* Some cameras whose location is reported by libcamera as external may
* actually be internal to the device. This is common with UVC cameras
* that are integrated in a laptop. In that case the real location
* should be specified in the configuration file.
* ccm also technically does an awb but it measures this from the macbeth
chart in the image rather than using calibration data
"""
if Cam.check_imgs():
Cam.json['rpi.black_level']['black_level'] = Cam.blacklevel_16
Cam.json_remove(disable)
print('\nSTARTING CALIBRATIONS')
Cam.alsc_cal(luminance_strength, do_alsc_colour)
Cam.geq_cal()
Cam.lux_cal()
Cam.noise_cal()
Cam.awb_cal(greyworld, do_alsc_colour)
Cam.ccm_cal(do_alsc_colour)
print('\nFINISHED CALIBRATIONS')
Cam.write_json()
Cam.write_log(log_output)
print('\nCalibrations written to: '+json_output)
if log_output is None:
log_output = 'ctt_log.txt'
print('Log file written to: '+log_output)
pass
else:
Cam.write_log(log_output)
if __name__ == '__main__':
    # Script entry point: with no command line arguments print the usage
    # text and exit successfully, otherwise parse the arguments and run the
    # tuning tool.
    if len(sys.argv) == 1:
        print("""
Pisp Camera Tuning Tool version 1.0
Required Arguments:
'-i' : Calibration image directory.
'-o' : Name of output json file.
Optional Arguments:
'-c' : Config file for the CTT. If not passed, default parameters used.
'-l' : Name of output log file. If not passed, 'ctt_log.txt' used.
""")
        # sys.exit instead of quit(): quit is added by the site module for
        # interactive use and is not guaranteed to exist in every interpreter
        sys.exit(0)
    else:
        # parse input arguments and run the calibration
        json_output, directory, config, log_output = parse_input()
        run_ctt(json_output, directory, config, log_output)