# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import numpy as np
import abel
from . import basex
from . import dasch
from . import daun
from . import direct
from . import hansenlaw
from . import linbasex
from . import onion_bordas
from . import rbasex
from . import tools
from timeit import default_timer as timer
import itertools
import sys
def _ensure_list(x):
"""
Convert the argument to a list (a scalar becomes a single-element list).
"""
return [x] if np.ndim(x) == 0 else list(x)
def _roundsf(x, n):
"""
Round to n significant digits
"""
return float('{:.{p}g}'.format(x, p=n))
[docs]
class Timent(object):
"""
Helper class for measuring execution times.
The constructor only initializes the timing-procedure parameters.
Use the :py:meth:`.time` method to run it for particular functions.
Parameters
----------
skip : int
number of "warm-up" iterations to perform before the measurements.
Can be specified as a negative number, then ``abs(skip)``
"warm-up" iterations are performed, but if this took more than
**duration** seconds, they are accounted towards the measured
iterations.
repeat : int
minimal number of measured iterations to perform.
Must be positive.
duration : float
minimal duration (in seconds) of the measurements.
"""
def __init__(self, skip=0, repeat=1, duration=0.0):
self.skip = int(skip)
self.repeat = int(repeat)
self.duration = float(duration)
[docs]
def time(self, func, *args, **kwargs):
"""
Repeatedly executes a function at least **repeat** times and for at
least **duration** seconds (see above), then returns the average time
per iteration.
The actual number of measured iterations can be retrieved from
:py:attr:`Timent.count`.
Parameters
----------
func : callable
function to execute
*args, **kwargs : any, optional
parameters to pass to **func**
Returns
-------
float
average function execution time
Notes
-----
The measurements overhead can be estimated by executing ::
Timent(...).time(lambda: None)
with a sufficiently large number of iterations (to avoid rounding
errors due to the finite timer precision).
In 2018, this overhead was on the order of 100 ns per iteration.
"""
# Execute "skip" iterations unconditionally
t0 = timer()
for i in range(abs(self.skip)):
func(*args, **kwargs)
t = timer()
# if they took longer than "duration" and should be included
if self.skip < 0 and t - t0 > self.duration:
# account for them in the "repeat" loop
start = -self.skip
if start > self.repeat:
self.repeat = start
else:
# otherwise -- reset the timer and proceed normally
start = 0
t0 = timer()
# Execute "repeat" iterations (maybe accounting for the "skipped")
for i in range(start, self.repeat):
func(*args, **kwargs)
# Continue iterations until "duration" time passes
for i in itertools.count(self.repeat):
t = timer()
if t - t0 > self.duration:
self.count = i # save the total number of measured iterations
break
func(*args, **kwargs)
# Return the average time per iteration
return (t - t0) / self.count
[docs]
class AbelTiming(object):
"""
Benchmark performance of different Abel implementations
(basis generation, forward and inverse transforms, as applicable).
Parameters
----------
n : int or sequence of int
array size(s) for the benchmark (assuming 2D square arrays (*n*, *n*))
select : str or sequence of str
methods to benchmark. Use ``'all'`` (default) for all available or
choose any combination of individual methods::
select=['basex', 'direct_C', 'direct_Python', 'hansenlaw',
'linbasex', 'onion_bordas, 'onion_peeling', 'two_point',
'three_point']
repeat : int
repeat each benchmark at least this number of times to get the average
values
t_min : float
repeat each benchmark for at least this number of seconds to get the
average values
t_max : float
do not benchmark methods at array sizes when this is expected to take
longer than this number of seconds. Notice that benchmarks for the
smallest size from **n** are always run and that the estimations can be
off by a factor of 2 or so.
verbose : boolean
determines whether benchmark progress should be reported (to stderr)
Attributes
-------
n : list of int
array sizes from the parameter **n**, sorted in ascending order
bs, fabel, iabel : dict of list of float
benchmark results — dictionaries for
bs
basis-set generation
fabel
forward Abel transform
iabel
inverse Abel transform
with methods as keys and lists of timings in milliseconds as entries.
Timings correspond to array sizes in :py:attr:`AbelTiming.n`; for
skipped benchmarks (see **t_max**) they are ``np.nan``.
Notes
-----
The results can be output in a nice format by simply
``print(AbelTiming(...))``.
Keep in mind that most methods have :math:`O(n^2)` memory and
:math:`O(n^3)` time complexity, so going from *n* = 501 to *n* = 5001
would require about 100 times more memory and take about 1000 times longer.
"""
def __init__(self, n=[301, 501], select='all',
repeat=1, t_min=0.1, t_max=np.inf,
verbose=True):
self.n = sorted(_ensure_list(n))
select = _ensure_list(select)
self.repeat = repeat
self.t_max = t_max
self.verbose = verbose
# create the timing function
self._time = Timent(skip=-1, repeat=repeat, duration=t_min).time
# which methods need half and whole images
need_half = frozenset([
'basex',
'daun',
'direct_C',
'direct_Python',
'hansenlaw',
'onion_bordas',
'onion_peeling',
'two_point',
'three_point',
])
need_whole = frozenset([
'linbasex',
'rbasex',
])
# all available methods (= union of the above sets)
all_methods = need_half | need_whole
# remove direct_C, if not supported
if not direct.cython_ext:
all_methods = all_methods - frozenset(['direct_C'])
# Select methods
if 'all' in select:
methods = all_methods
else:
methods = set()
for method in select:
if method not in all_methods:
print('Warning: Unsupported method "{}" ignored!'.
format(method))
else:
methods.add(method)
if not methods:
raise ValueError('At least one valid method must be specified!')
# dictionary for the results
self.res = {'bs': {},
'forward': {},
'inverse': {}}
# same results as separate dictionaries (aliases to the above)
self.bs = self.res['bs']
self.fabel = self.res['forward']
self.iabel = self.res['inverse']
# inverse speed for time estimations
self._pace = {}
# Loop over all image sizes
for ni in self.n:
self.ni = int(ni)
# image height and half-width
self.h, self.w = self.ni, self.ni // 2 + 1
# We transform a rectangular image, since we are making the
# assumption that we are transforming just the "right side" of
# a square image.
# see: https://github.com/PyAbel/PyAbel/issues/207
self._vprint('n =', self.ni)
# The following code tries to catch the interruption signal
# (Ctrl+C) to abort as soon as possible but preserve the available
# results. Setting a negative time limit makes all remaining
# benchmarks to skip (calling them is still needed to fill the
# results with nans).
# create needed images (half and/or whole)
if (self.t_max >= 0): # (do not create while aborting)
try:
if methods & need_half:
self.half_image = np.random.randn(self.h, self.w)
if methods & need_whole:
self.whole_image = np.random.randn(self.h, self.h)
except (KeyboardInterrupt, MemoryError) as e:
self._vprint(repr(e) + ' during image creation!'
' Skipping the rest...')
self.t_max = -1.0
# (the images will not be used, so leaving them as is)
# call benchmark (see below) for each method at this image size
for method in methods:
self._vprint(' ', method)
try:
getattr(self, '_time_' + method)()
except (KeyboardInterrupt, MemoryError) as e:
self._vprint('\n' + repr(e) + '! Skipping the rest...')
self.t_max = -1.0
# rerun this interrupted benchmark to nan-fill its results
getattr(self, '_time_' + method)()
# discard images
self.half_image = None
self.whole_image = None
self._vprint('')
def _vprint(self, *args, **kwargs):
"""
Print to stderr, only if verbose=True.
"""
if self.verbose:
print(*args, file=sys.stderr, **kwargs)
sys.stderr.flush() # (Python 3 buffers stderr. Why?!)
def _append(self, kind, method, result):
"""
Store one result, ensuring that the results array exists.
"""
if method not in self.res[kind]:
self.res[kind][method] = []
self.res[kind][method].append(result)
def _benchmark(self,
kind, method,
func, *args, **kwargs):
"""
Run benchmark for the function with arguments and store the result.
"""
self._append(kind, method,
self._time(func, *args, **kwargs) * 1000) # [s] to [ms]
def _skip(*param):
"""
Decorator for benchmarking functions.
Adds a check whether the estimated execution time would exceed t_max.
If so, fills the results with np.nan, otherwise executes the
benchmarking code.
Parameters are tuples "(kind, method)". Either item can be a list, then
all combinations of kind(s) and method(s) are implied. Altogether the
set of these kind–method pairs must be the same as in the "normal"
execution results.
"""
# assemble all kind–method pairs
res_keys = []
for p in param:
res_keys += itertools.product(*map(_ensure_list, p))
def decorator(f):
method = f.__name__[6:] # (remove initial "_time_")
def decorated(self):
# get the estimated time (use 0 if cannot) and report it
t_est = self._pace.get(method, 0) * self.ni**3
self._vprint(' estimated ' +
('{:g} s'.format(_roundsf(t_est, 2)) if t_est
else '???'), end='')
# skip the benchmark if it would take too long
if t_est > self.t_max:
self._vprint(' -- skipped')
# fill the results with nan
for k, m in res_keys:
self._append(k, m, np.nan)
return
else: # otherwise run the benchmark
f(self)
# calculate the actual total time and report it
t = (sum(self.res[k][m][-1] for k, m in res_keys) *
self.repeat) / 1000 # [ms] -> [s]
self._vprint(', actually {:.3f} s'.format(t))
# save the pace for future estimations
self._pace[method] = t / self.ni**3
return decorated
return decorator
# Benchmarking functions for each method.
# Must be named "_time_method", where "method" is as in "select".
# Do not take or return anything, but use instance variables:
# parameters:
# self.ni, self.h, self.w -- image size, height, half-width,
# self.whole_image, self.half_image -- image (part) to transform
# results:
# self.res[kind][method] = [timings] -- appended for each image size,
# use np.nan for skipped points
# kind = 'bs' (basis), 'forward', 'inverse' -- as applicable
# method -- as above, but can also include variants (like in basex)
@_skip(('bs', 'basex'),
(['inverse', 'forward'], ['basex', 'basex(var.reg.)']))
def _time_basex(self):
# benchmark the basis generation (default parameters)
def gen_basis():
basex.cache_cleanup()
basex.get_bs_cached(self.w, basis_dir=None)
self._benchmark('bs', 'basex',
gen_basis)
# benchmark all transforms
for direction in ['inverse', 'forward']: # (default first)
# get the transform matrix (default parameters)
A = basex.get_bs_cached(self.w, basis_dir=None,
direction=direction)
# benchmark the transform itself
self._benchmark(direction, 'basex',
basex.basex_core_transform,
self.half_image, A)
# benchmark the transform with variable regularization
def basex_var():
A = basex.get_bs_cached(self.w, reg=1.0+np.random.random(),
basis_dir=None, direction=direction)
basex.basex_core_transform(self.half_image, A)
self._benchmark(direction, 'basex(var.reg.)',
basex_var)
# discard the unneeded transform matrix
basex.cache_cleanup(direction)
# discard all caches
basex.cache_cleanup()
@_skip(('bs', 'daun'),
(['inverse', 'forward'], 'daun'),
('inverse', 'daun(var.reg.)'))
def _time_daun(self):
# benchmark the basis generation (default parameters)
def gen_basis():
daun.cache_cleanup()
daun.get_bs_cached(self.w)
self._benchmark('bs', 'daun',
gen_basis)
# benchmark all transforms
for direction in ['inverse', 'forward']: # (default first)
# cache the transform matrix
daun.get_bs_cached(self.w, direction=direction)
# benchmark the transform itself
self._benchmark(direction, 'daun',
daun.daun_transform,
self.half_image, direction=direction,
verbose=False)
if direction == 'inverse':
# benchmark the transform with variable regularization
def daun_var():
daun.daun_transform(self.half_image,
reg=1.0 + np.random.random(),
verbose=False)
self._benchmark('inverse', 'daun(var.reg.)',
daun_var)
# discard the unneeded transform matrix
daun.cache_cleanup(direction)
# discard all caches
daun.cache_cleanup()
@_skip((['inverse', 'forward'], 'direct_C'))
def _time_direct_C(self):
for direction in ['inverse', 'forward']:
self._benchmark(direction, 'direct_C',
direct.direct_transform,
self.half_image, direction=direction, backend='C')
@_skip((['inverse', 'forward'], 'direct_Python'))
def _time_direct_Python(self):
for direction in ['inverse', 'forward']:
self._benchmark(direction, 'direct_Python',
direct.direct_transform,
self.half_image, direction=direction,
backend='python')
@_skip((['inverse', 'forward'], 'hansenlaw'))
def _time_hansenlaw(self):
for direction in ['inverse', 'forward']:
self._benchmark(direction, 'hansenlaw',
hansenlaw.hansenlaw_transform,
self.half_image, direction=direction)
@_skip((['bs', 'inverse'], 'linbasex'))
def _time_linbasex(self):
# benchmark the basis generation (default parameters)
def gen_basis():
linbasex.cache_cleanup()
linbasex.get_bs_cached(self.h, basis_dir=None)
self._benchmark('bs', 'linbasex',
gen_basis)
# benchmark the transform (basis is already cached)
self._benchmark('inverse', 'linbasex',
linbasex.linbasex_transform_full,
self.whole_image)
# discard all caches
linbasex.cache_cleanup()
@_skip(('inverse', 'onion_bordas'))
def _time_onion_bordas(self):
self._benchmark('inverse', 'onion_bordas',
onion_bordas.onion_bordas_transform,
self.half_image)
# (Generic function for all Dasch methods; not called directly.)
def _time_dasch(self, method):
# benchmark the basis generation (default parameters)
def gen_basis():
dasch.cache_cleanup()
dasch.get_bs_cached(method, self.w, basis_dir=None)
self._benchmark('bs', method,
gen_basis)
# get the transform matrix (is already cached)
D = dasch.get_bs_cached(method, self.w, basis_dir=None)
# benchmark the transform
self._benchmark('inverse', method,
dasch.dasch_transform,
self.half_image, D)
# discard all caches
dasch.cache_cleanup()
@_skip((['bs', 'inverse'], 'onion_peeling'))
def _time_onion_peeling(self):
self._time_dasch('onion_peeling')
@_skip(('bs', 'rbasex'),
(['inverse', 'forward'], ['rbasex', 'rbasex(None)']))
def _time_rbasex(self):
# benchmark the basis generation (default parameters)
def gen_basis():
rbasex.cache_cleanup()
rbasex.get_bs_cached(self.w - 1) # Rmax = half-width - 1
self._benchmark('bs', 'rbasex',
gen_basis)
# benchmark all transforms
for direction in ['inverse', 'forward']: # (default first)
# warm-up run to cache "distributions" (basis is already cached)
rbasex.rbasex_transform(self.whole_image)
# benchmark the transform
self._benchmark(direction, 'rbasex',
rbasex.rbasex_transform,
self.whole_image)
# same without output image
self._benchmark(direction, 'rbasex(None)',
rbasex.rbasex_transform,
self.whole_image, out=None)
# discard the unneeded transform matrix
basex.cache_cleanup(direction)
# discard all caches
rbasex.cache_cleanup()
@_skip((['bs', 'inverse'], 'three_point'))
def _time_three_point(self):
self._time_dasch('three_point')
@_skip((['bs', 'inverse'], 'two_point'))
def _time_two_point(self):
self._time_dasch('two_point')
# (End of benchmarking functions.)
def __repr__(self):
import platform
out = ['PyAbel benchmark run on {}\n'.format(platform.processor()),
'time in milliseconds']
# field widths are chosen to accommodate up to:
# method = 15 characters
# ni = 99999 (would require at least 75 GB RAM)
# time = 9999999.9 ms (almost 3 hours)
# data columns are 9 characters wide and separated by 3 spaces
TITLE_FORMAT = '=== {} ==='
HEADER_ROW = 'Method ' + \
''.join([' {:>9}'.
format('n = {}'.format(ni)) for ni in self.n])
SEP_ROW = '-' * len(HEADER_ROW)
ROW_FORMAT = '{:15}' + ' {:9.1f}' * len(self.n)
def print_benchmark(name, res):
title = '{:=<{w}}'.format(TITLE_FORMAT.format(name),
w=len(SEP_ROW))
out = ['\n' + title + '\n']
out += [HEADER_ROW]
out += [SEP_ROW]
for name, row in sorted(res.items()):
out += [ROW_FORMAT.format(name, *row)]
return out
if self.bs:
out += print_benchmark('Basis generation', self.bs)
out += ['']
if self.fabel:
out += print_benchmark('Forward Abel transform', self.fabel)
out += ['']
if self.iabel:
out += print_benchmark('Inverse Abel transform', self.iabel)
return '\n'.join(out)
[docs]
class DistributionsTiming(object):
"""
Benchmark performance of different VMI distributions implementations.
Parameters
----------
n : int or sequence of int
array size(s) for the benchmark (assuming full images to be 2D square
arrays (*n*, *n*))
shape : str
image shape:
``'Q'``:
one quadrant ((*n* + 1)/2, (*n* + 1)/2)
``'half'`` (default):
half image (*n*, (*n* + 1)/2), vertically centered
``'full'``:
full image (*n*, *n*), centered
rmax : str or sequence of str
``'MIN'`` (default) and/or ``'all'``, see **rmax** in
:class:`abel.tools.vmi.Distributions`
order : int
highest order in the angular distributions. Even number ≥ 0.
weight : str or sequence of str
weighting to test. Use ``'all'`` for all available or choose any
combination of individual types::
weight=['none', 'sin', 'array', 'sin+array']
method : str or sequence of str
methods to benchmark. Use ``'all'`` (default) for all available or
choose any combination of individual methods::
method=['nearest', 'linear', 'remap']
repeat : int
repeat each benchmark at least this number of times to get the average
values
t_min : float
repeat each benchmark for at least this number of seconds to get the
average values
Attributes
-------
n : list of int
array sizes from the parameter **n**
results : dict of dict of dict of list of tuple of float
benchmark results — multi-level dictionary, in which
``results[method][rmax][weight]`` is the list of timings in
milliseconds corresponding to array sizes in
:py:attr:`DistributionsTiming.n`. Each timing is a tuple (*t*:sub:`1`,
*t*:sub:`∞`) with *t*:sub:`1` corresponding to single-image
(non-cached) performance, and *t*:sub:`∞` corresponding to batch
(cached) performance.
Notes
-----
The results can be output in a nice format by simply
``print(DistributionsTiming(...))``.
"""
def __init__(self, n=[301, 501], shape='half', rmax='MIN', order=2,
weight=['none', 'sin', 'sin+array'], method='all',
repeat=1, t_min=0.1):
self.n = _ensure_list(n)
if shape == 'Q':
origin = 'll'
self.shape = 'One quadrant'
elif shape == 'half':
origin = 'cl'
self.shape = 'Half image'
elif shape == 'full':
origin = 'cc'
self.shape = 'Full image'
else:
raise ValueError('Incorrect shape "{}"'.format(shape))
self.rmaxs = rmaxs = _ensure_list(rmax)
self.order = order
weights = _ensure_list(weight)
if 'all' in weights:
weights = ['none', 'sin', 'array', 'sin+array']
self.weights = weights
methods = _ensure_list(method)
if 'all' in methods:
methods = ['nearest', 'linear', 'remap']
self.methods = methods
# create the timing function
time = Timent(skip=-1, repeat=repeat, duration=t_min).time
# dictionary for the results
self.results = {m: {r: {w: [] for w in weights}
for r in rmaxs}
for m in methods}
from abel.tools.vmi import Ibeta, Distributions
# make sure that everything is loaded
# (otherwise the 1st timing call is very slow)
Ibeta(np.array([[0]]))
# Loop over all image sizes
for ni in self.n:
ni = int(ni)
# create image and weight array
rows = (ni + 1) // 2 if shape == 'Q' else ni
cols = (ni + 1) // 2 if shape in ['Q', 'half'] else ni
IM = np.random.randn(rows, cols)
warray = np.random.randn(rows, cols)
# benchmark each combination of parameters
for method in methods:
for rmax in rmaxs:
for weight in weights:
if weight == 'none':
w = {'use_sin': False, 'weights': None}
elif weight == 'sin':
w = {'use_sin': True, 'weights': None}
elif weight == 'array':
w = {'use_sin': False, 'weights': warray}
elif weight == 'sin+array':
w = {'use_sin': True, 'weights': warray}
else:
raise ValueError('Incorrect weight "{}"'.
format(weight))
# single-image
t1 = time(Ibeta,
IM, origin, rmax, order, method=method, **w)
# cached
distr = Distributions(origin, rmax, order,
method=method, **w)
distr(IM) # trigger precalculations
def distrIMIbeta(IM):
return distr(IM).Ibeta()
tn = time(distrIMIbeta,
IM)
# save results
self.results[method][rmax][weight].append((t1 * 1000,
tn * 1000))
def __repr__(self):
import platform
out = ['PyAbel benchmark run on {}\n'.format(platform.processor()),
'order = {}, time in milliseconds'.format(self.order)]
# field widths are chosen to accommodate up to:
# rmax + weight = 3 leading spaces + 14 characters
# ni = 99999
# time = 9999999.9 ms (almost 3 hours)
# data columns are 9 characters wide and separated by 3 spaces
TITLE_FORMAT = '=== ' + self.shape + ', {} ==='
HEADER_ROW = 'Method ' + \
''.join([' {:>9}'.
format('n = {}'.format(ni)) for ni in self.n])
SEP_ROW = '-' * len(HEADER_ROW)
ROW_FORMAT = ' {}, {:9}' + ' {:9.1f}' * len(self.n)
def print_benchmark(mode):
title = '{:=<{w}}'.format(TITLE_FORMAT.format(mode),
w=len(SEP_ROW))
out = ['\n' + title + '\n']
out += [HEADER_ROW]
out += [SEP_ROW]
for method in self.methods:
out += [method]
resm = self.results[method]
for rmax in self.rmaxs:
resr = resm[rmax]
for weight in self.weights:
res = resr[weight]
t = list(zip(*res))[0 if mode == 'single' else 1]
out += [ROW_FORMAT.format(rmax, str(weight), *t)]
return out
out += print_benchmark('single')
out += ['']
out += print_benchmark('cached')
return '\n'.join(out)
[docs]
def is_symmetric(arr, i_sym=True, j_sym=True):
"""
Takes in an array of shape (n, m) and check if it is symmetric
Parameters
----------
arr : 1D or 2D array
i_sym : array
symmetric with respect to the 1st axis
j_sym : array
symmetric with respect to the 2nd axis
Returns
-------
a binary array with the symmetry condition for the corresponding quadrants
Notes
-----
If both **i_sym** = ``True`` and **j_sym** = ``True``, the input array is
checked for polar symmetry.
See `issue #34 comment
<https://github.com/PyAbel/PyAbel/issues/34#issuecomment-160344809>`__
for the defintion of a center of the image.
"""
Q0, Q1, Q2, Q3 = tools.symmetry.get_image_quadrants(
arr, reorient=False)
if i_sym and not j_sym:
valid_flag = [np.allclose(np.fliplr(Q1), Q0),
np.allclose(np.fliplr(Q2), Q3)]
elif not i_sym and j_sym:
valid_flag = [np.allclose(np.flipud(Q1), Q2),
np.allclose(np.flipud(Q0), Q3)]
elif i_sym and j_sym:
valid_flag = [np.allclose(np.flipud(np.fliplr(Q1)), Q3),
np.allclose(np.flipud(np.fliplr(Q0)), Q2)]
else:
raise ValueError('Checking for symmetry with both i_sym=False \
and j_sym=False does not make sense!')
return np.array(valid_flag)
[docs]
def absolute_ratio_benchmark(analytical, recon, kind='inverse'):
"""
Check the absolute ratio between an analytical function and the result
of a inverse Abel reconstruction.
Parameters
----------
analytical : one of the classes from analytical, initialized
recon : 1D ndarray
a reconstruction (i.e. inverse abel)
given by some PyAbel implementation
"""
mask = analytical.mask_valid
if kind == 'inverse':
func = analytical.func
elif kind == 'forward':
func = analytical.abel
err = func[mask]/recon[mask]
return err