In this article, we will follow the tenets of TDD for scientific software, as laid out in the first installment of this series, to develop an edge detection filter known as the Sobel filter.
In the first article, we talked about how important – and tricky – it can be to develop reliable testing methods for software that tackles the complex problems often found in scientific computing. We also saw how to overcome those issues by following a development cycle inspired by TDD, but adapted for scientific computing. I reproduce a shortened version of these instructions below.
Implementation cycle
- Gather requirements
- Sketch the design
- Implement initial tests
- Implement your alpha version
- Build an oracle library
- Revisit tests
- Implement profiling
Optimization cycle
- Optimize
- Reimplement
New method cycle
- Implement new method
- Validate against previous curated oracles
Getting Started: The Sobel Filter
In this article, we will follow the above instructions to develop a function which applies the Sobel filter. The Sobel filter is a commonly used computer vision tool to detect or enhance edges in images. Keep reading to see some examples!
Starting with the first step, we gather some requirements. We will follow the standard formulation of the Sobel filter described in this article. Simply put, the Sobel operator consists of convolving image A with the following two 3 × 3 kernels:

Gx = [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]]    Gy = [[-1, -2, -1], [0, 0, 0], [1, 2, 1]]

We then square each pixel of the resulting images, sum them, and take the point-by-point square root. If Ax and Ay are the images resulting from the convolutions, the result of the Sobel filter S is √(Ax² + Ay²).
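As a plain transcription of that definition, here is a minimal sketch that leans on scipy.ndimage.convolve. It is only meant to illustrate the formula; we will build our own implementation from scratch below, and the zero-padded boundary is my assumption at this point.

import numpy as np
from scipy.ndimage import convolve

Gx = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]])
Gy = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]])

def sobel_reference(A: np.ndarray) -> np.ndarray:
    # Convolve with each kernel; zero-pad at the boundary
    Ax = convolve(A, Gx, mode="constant", cval=0.0)
    Ay = convolve(A, Gy, mode="constant", cval=0.0)
    # Combine point-by-point: S = sqrt(Ax**2 + Ay**2)
    return np.sqrt(Ax**2 + Ay**2)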
Requirements
We want this function to take any 2D array and generate another 2D array. We might want it to operate on any two axes of an ndarray. We might even want it to work on more (or less) than two axes. We might have specifications for how to handle the edges of the array.
For now let’s remember to keep it simple, and start with a 2D implementation. But before we do that, let’s sketch the design.
Sketch the Design
We start with a simple design, leveraging Python's annotations. I highly recommend annotating as much as possible, and using mypy as a linter (an example invocation follows the sketch below).
from typing import Tuple
from numpy.core.multiarray import normalize_axis_index
from numpy.typing import NDArray
def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
"""Compute the Sobel filter of an image
Parameters
----------
arr : NDArray
Input image
axes : Tuple[int, int], optional
Axes over which to compute the filter, by default (-2, -1)
Returns
-------
NDArray
Output
"""
# Only accepts 2D arrays
if arr.ndim != 2:
raise NotImplementedError
# Ensure that the axis[0] is the first axis, and axis[1] is the second
# axis. The obscure `normalize_axis_index` converts negative indices to
# indices between 0 and arr.ndim - 1.
if any(
normalize_axis_index(ax, arr.ndim) != i
for i, ax in zip(range(2), axes)
):
raise NotImplementedError
pass
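Assuming the sketch above is saved as sobel.py (a hypothetical file name), the mypy check recommended earlier can be run from the command line:

$ mypy sobel.py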
Implement Tests
This function doesn’t do much. But it is documented, annotated, and its current limitations are baked into it. Now that we have a design, we immediately pivot to tests.
First, we notice that empty images (all zeroes) have no edges. So they must output zeros as well. In fact, any constant image should also return zeros. Let's bake that into our first tests. We will also see how we can use monkey testing (feeding in randomized inputs) to test many arrays.
# test_zero_constant.py
import numpy as np
import pytest
# Test multiple dtypes at once
@pytest.mark.parametrize(
"dtype",
["float16", "float32", "float64", "float128"],
)
def test_zero(dtype):
# Set random seed
seed = int(np.random.rand() * (2**32 - 1))
np.random.seed(seed)
# Create a 2D array of random shape and fill with zeros
nx, ny = np.random.randint(3, 100, size=(2,))
arr = np.zeros((nx, ny), dtype=dtype)
# Apply sobel function
arr_sob = sobel(arr)
# `assert_array_equal` should fail most of the times.
# It will only work when `arr_sob` is identically zero,
# which is usually not the case.
# DO NOT USE!
# np.testing.assert_array_equal(
# arr_sob, 0.0, err_msg=f"{seed=} {nx=}, {ny=}"
# )
# `assert_almost_equal` can fail when used with high decimals.
# It also relies on float64 checking, which might fail for
# float128 types.
# DO NOT USE!
# np.testing.assert_almost_equal(
# arr_sob,
# np.zeros_like(arr),
# err_msg=f"{seed=} {nx=}, {ny=}",
# decimal=4,
# )
# `assert_allclose` with custom tolerance is my preferred method
# The 10 is arbitrary and depends on the problem. If a method
# which you know to be correct does not pass, increase to 100, etc.
# If the tolerance needed to make the tests pass is too high, make
# sure the method is actually correct.
tol = 10 * np.finfo(arr.dtype).eps
err_msg = f"{seed=} {nx=}, {ny=} {tol=}" # Log seeds and other info
np.testing.assert_allclose(
arr_sob,
np.zeros_like(arr),
err_msg=err_msg,
atol=tol, # rtol is useless for desired=zeros
)
@pytest.mark.parametrize(
"dtype", ["float16", "float32", "float64", "float128"]
)
def test_constant(dtype):
seed = int(np.random.rand() * (2**32 - 1))
np.random.seed(seed)
nx, ny = np.random.randint(3, 100, size=(2,))
constant = np.random.randn(1).item()
arr = np.full((nx, ny), fill_value=constant, dtype=dtype)
arr_sob = sobel(arr)
tol = 10 * np.finfo(arr.dtype).eps
err_msg = f"{seed=} {nx=}, {ny=} {tol=}"
np.testing.assert_allclose(
arr_sob,
np.zeros_like(arr),
err_msg=err_msg,
atol=tol, # rtol is useless for desired=zeros
)
This code snippet can be run from the command line with
$ pytest -qq -s -x -vv --durations=0 test_zero_constant.py
Alpha Version
Of course our tests will currently fail, but they are enough for now. Let’s implement a first version.
from typing import Tuple
import numpy as np
from numpy.core.multiarray import normalize_axis_index
from numpy.typing import NDArray
def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
if arr.ndim != 2:
raise NotImplementedError
if any(
normalize_axis_index(ax, arr.ndim) != i
for i, ax in zip(range(2), axes)
):
raise NotImplementedError
# Define our filter kernels. Notice they inherit the input type, so
# that a float32 input never has to be cast to float64 for computation.
# But can you see where using another dtype for Gx and Gy might make
# sense for some input dtypes?
Gx = np.array(
[[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],
dtype=arr.dtype,
)
Gy = np.array(
[[-1, -2, -1], [0, 0, 0], [1, 2, 1]],
dtype=arr.dtype,
)
# Create the output array and fill with zeroes
s = np.zeros_like(arr)
for ix in range(1, arr.shape[0] - 1):
for iy in range(1, arr.shape[1] - 1):
# Pointwise multiplication followed by sum, aka convolution
s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
s[ix, iy] = np.hypot(s1, s2) # np.sqrt(s1**2 + s2**2)
return s
With this new function, all our tests should pass, and we should get an output like this:
$ pytest -qq -s -x -vv --durations=0 test_zero_constant.py
........
======================================== slowest durations =========================================
0.09s call t_049988eae7f94139a7067f142bf2852f.py::test_constant[float16]
0.08s call t_049988eae7f94139a7067f142bf2852f.py::test_zero[float64]
0.06s call t_049988eae7f94139a7067f142bf2852f.py::test_constant[float128]
0.04s call t_049988eae7f94139a7067f142bf2852f.py::test_zero[float128]
0.04s call t_049988eae7f94139a7067f142bf2852f.py::test_constant[float64]
0.02s call t_049988eae7f94139a7067f142bf2852f.py::test_constant[float32]
0.01s call t_049988eae7f94139a7067f142bf2852f.py::test_zero[float16]
(17 durations < 0.005s hidden. Use -vv to show these durations.)
8 passed in 0.35s
We have so far:
- Gathered requirements of our problem.
- Sketched an initial design.
- Implemented some tests.
- Implemented the alpha version, which passes these tests.
These tests provide verification for future versions, as well as a very rudimentary oracle library. But while unit tests are great at capturing minute deviations from expected results, they are not great at differentiating wrong results from quantitatively different – but still correct – results. Suppose tomorrow we want to implement another Sobel-type operator, which has a longer kernel. We don’t expect the results to match exactly to our current version, but we do expect the outputs of both functions to be qualitatively very similar.
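One way to make "qualitatively very similar" concrete is a normalized error metric. The helper below is my own sketch, not part of any library; it tolerates small pointwise deviations while flagging gross disagreement:

import numpy as np

def relative_l2_error(result: np.ndarray, reference: np.ndarray) -> float:
    """Relative L2 distance between a result and a trusted reference."""
    return float(np.linalg.norm(result - reference) / np.linalg.norm(reference))

For two Sobel-type operators with different kernels, a threshold of a few percent might be reasonable, whereas unit-test tolerances of a few machine epsilons would be far too strict.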
In addition, it is excellent practice to try many different inputs to our functions to get a sense of how they behave. This helps us scientifically validate the results.
Scikit-image has an excellent library of images, which we can use to create oracles.
# !pip install scikit-image pooch
from typing import Dict, Callable
import numpy as np
import skimage.data
bwimages: Dict[str, np.ndarray] = {}
for attrname in skimage.data.__all__:
attr = getattr(skimage.data, attrname)
# Data are obtained via function calls
if isinstance(attr, Callable):
try:
# Download the data
data = attr()
# Ensure it is a 2D array
if isinstance(data, np.ndarray) and data.ndim == 2:
# Convert from various int types to float32 to better
# assess precision
bwimages[attrname] = data.astype(np.float32)
        except Exception:
            # Some data functions need arguments or a download that can fail; skip those
            continue
# Apply sobel to images
bwimages_sobel = {k: sobel(v) for k, v in bwimages.items()}
Once we have the data, we can plot it.
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
def create_colorbar(im, ax):
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.1)
cb = ax.get_figure().colorbar(im, cax=cax, orientation="vertical")
return cax, cb
for name, data in bwimages.items():
fig, axs = plt.subplots(
1, 2, figsize=(10, 4), sharex=True, sharey=True
)
im = axs[0].imshow(data, aspect="equal", cmap="gray")
create_colorbar(im, axs[0])
axs[0].set(title=name)
im = axs[1].imshow(bwimages_sobel[name], aspect="equal", cmap="gray")
create_colorbar(im, axs[1])
axs[1].set(title=f"{name} sobel")
These look really good! I would recommend storing these results, both as arrays and as figures, so that a new version can be quickly checked against them. I highly recommend HDF5 for array storage. You can also set up a Sphinx Gallery to regenerate the figures directly when updating documentation; that way you have a reproducible figure build that you can use to check against previous versions.
After the results have been validated, you can store them on disk and use them as part of your unit testing. Something like this:
oracle_library = [(k, v, bwimages_sobel[k]) for k, v in bwimages.items()]
# save_to_disk(oracle_library, ...)
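As a minimal sketch, save_to_disk and read_from_disk could be implemented with h5py; the file layout (one group per image, holding the input and its curated output) is my own assumption:

import h5py

def save_to_disk(oracle_library, path="oracles.h5"):
    # One group per image, with the input and its curated Sobel output
    with h5py.File(path, "w") as f:
        for name, input_arr, output_arr in oracle_library:
            group = f.create_group(name)
            group.create_dataset("input", data=input_arr)
            group.create_dataset("output", data=output_arr)

def read_from_disk(path="oracles.h5"):
    # Rebuild the (name, input, output) tuples expected by the tests
    with h5py.File(path, "r") as f:
        return [
            (name, f[name]["input"][()], f[name]["output"][()])
            for name in f
        ]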
# test_oracle.py
import numpy as np
import pytest
from numpy.typing import NDArray
# oracle_library = read_from_disk(...)
@pytest.mark.parametrize("name,input,output", oracle_library)
def test_oracles(name: str, input: NDArray, output: NDArray):
output_new = sobel(input)
tol = 10 * np.finfo(input.dtype).eps
mean_avg_error = np.abs(output_new - output).mean()
np.testing.assert_allclose(
output_new,
output,
err_msg=f"{name=} {tol=} {mean_avg_error=}",
atol=tol,
rtol=tol,
)
Profiling
Computing the Sobel filter for these datasets took a while! So the next step is to profile the code. I recommend creating a benchmark_xyz.py file for each test, and re-running these benchmarks regularly to probe for performance regressions. They can even be part of your unit testing, but we won't go so far in this example. Another idea is to use the oracles above for benchmarking. A benchmark file could be as simple as the sketch below.
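Here is such a sketch; the file name benchmark_sobel.py and the module name sobel are assumptions:

# benchmark_sobel.py
import time

import numpy as np

from sobel import sobel  # Assumes our function lives in sobel.py

def benchmark_sobel(number: int = 10, shape=(500, 500)) -> None:
    arr = np.random.randn(*shape)
    runs_ns = np.empty((number,))
    for i in range(number):
        start = time.perf_counter_ns()
        sobel(arr)
        runs_ns[i] = time.perf_counter_ns() - start
    mean_ms = runs_ns.mean() / 1e6
    std_ms = runs_ns.std() / 1e6
    print(f"{mean_ms:.1f} ms ± {std_ms:.1f} ms (mean ± std. dev. of {number} runs)")

if __name__ == "__main__":
    benchmark_sobel()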
There are many ways of timing code execution. To get the system-wide, "real" elapsed time, use perf_counter_ns from the built-in time module to measure time in nanoseconds. In a Jupyter notebook, the built-in %%timeit cell magic times the execution of a cell. The decorator below is inspired by this cell magic and can be used to time any function.
import time
from functools import wraps
from typing import Callable, Optional

import numpy as np
def sizeof_fmt(num, suffix="s"):
for unit in ["n", "μ", "m"]:
if abs(num) < 1000:
return f"{num:3.1f} {unit}{suffix}"
num /= 1000
    return f"{num:.1f} {suffix}"
def timeit(
func_or_number: Optional[Callable] = None,
number: int = 10,
) -> Callable:
"""Apply to a function to time its executions.
Parameters
----------
func_or_number : Optional[Callable], optional
Function to be decorated or `number` argument (see below), by
default None
number : int, optional
Number of times the function will run to obtain statistics, by
default 10
Returns
-------
Callable
When fed a function, returns the decorated function. Otherwise
returns a decorator.
Examples
--------
.. code-block:: python
@timeit
def my_fun():
pass
@timeit(100)
def my_fun():
pass
@timeit(number=3)
def my_fun():
pass
"""
    if isinstance(func_or_number, Callable):
        func = func_or_number
    elif isinstance(func_or_number, int):
        func = None
        number = func_or_number
    else:
        func = None
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
runs_ns = np.empty((number,))
            # Run once first, measuring the time and storing the result
start_time = time.perf_counter_ns()
result = f(*args, **kwargs)
runs_ns[0] = time.perf_counter_ns() - start_time
for i in range(1, number):
start_time = time.perf_counter_ns()
f(*args, **kwargs) # Without storage, faster
runs_ns[i] = time.perf_counter_ns() - start_time
time_msg = f"{sizeof_fmt(runs_ns.mean())} ± "
time_msg += f"{sizeof_fmt(runs_ns.std())}"
print(
f"{time_msg} per run (mean ± std. dev. of {number} runs)"
)
return result
return wrapper
if func is not None:
return decorator(func)
return decorator
Putting our function to the test:
arr_test = np.random.randn(500, 500)
sobel_timed = timeit(sobel)
sobel_timed(arr_test);
# 3.9s ± 848.9 ms per run (mean ± std. dev. of 10 runs)
Not too fast 🙁
When investigating slowness or performance regressions, it is also possible to track the runtime of each line. The line_profiler library is an excellent resource for this. It can be used via a Jupyter cell magic, or via its API. Here is an API example:
from line_profiler import LineProfiler
lp = LineProfiler()
lp_wrapper = lp(sobel)
lp_wrapper(arr_test)
lp.print_stats(output_unit=1) # 1 for seconds, 1e-3 for milliseconds, etc.
Here is an example output:
Timer unit: 1 s
Total time: 4.27197 s
File: /tmp/ipykernel_521529/1313985340.py
Function: sobel at line 8
Line # Hits Time Per Hit % Time Line Contents
==============================================================
8 def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
9 # Only accepts 2D arrays
10 1 0.0 0.0 0.0 if arr.ndim != 2:
11 raise NotImplementedError
12
13 # Ensure that the axis[0] is the first axis, and axis[1] is the second
14 # axis. The obscure `normalize_axis_index` converts negative indices to
15 # indices between 0 and arr.ndim - 1.
16 1 0.0 0.0 0.0 if any(
17 normalize_axis_index(ax, arr.ndim) != i
18 1 0.0 0.0 0.0 for i, ax in zip(range(2), axes)
19 ):
20 raise NotImplementedError
21
22 1 0.0 0.0 0.0 Gx = np.array(
23 1 0.0 0.0 0.0 [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],
24 1 0.0 0.0 0.0 dtype=arr.dtype,
25 )
26 1 0.0 0.0 0.0 Gy = np.array(
27 1 0.0 0.0 0.0 [[-1, -2, -1], [0, 0, 0], [1, 2, 1]],
28 1 0.0 0.0 0.0 dtype=arr.dtype,
29 )
30 1 0.0 0.0 0.0 s = np.zeros_like(arr)
31 498 0.0 0.0 0.0 for ix in range(1, arr.shape[0] - 1):
32 248004 0.1 0.0 2.2 for iy in range(1, arr.shape[1] - 1):
33 248004 1.8 0.0 41.5 s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
34 248004 1.7 0.0 39.5 s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
35 248004 0.7 0.0 16.8 s[ix, iy] = np.hypot(s1, s2)
36 1 0.0 0.0 0.0 return s
Lots of time is spent inside the innermost loop. NumPy prefers vectorized math, as it can then rely on compiled code. Since we are using explicit for loops, it makes sense that this function is very slow.
In addition, it is important to be smart about memory allocations inside loops. NumPy is somewhat smart about allocating small amounts of memory inside loops, so the lines defining s1 or s2 might not be allocating new memory. But they also could be, or there could be some other memory allocation happening under the hood that we are not aware of. I therefore recommend also profiling memory. I like using Memray for that, but there are others, such as Fil and Sciagraph. I would avoid memory_profiler, which (very unfortunately!) is no longer maintained; Memray is also much more powerful. Here is an example of Memray in action:
$ # Create sobel.bin which holds the profiling information
$ memray run -fo sobel.bin --trace-python-allocators sobel_run.py
Writing profile results into sobel.bin
Memray WARNING: Correcting symbol for aligned_alloc from 0x7fc5c984d8f0 to 0x7fc5ca4a5ce0
[memray] Successfully generated profile results.
You can now generate reports from the stored allocation records.
Some example commands to generate reports:
python3 -m memray flamegraph sobel.bin
$ # Generate flame graph
$ memray flamegraph -fo sobel_flamegraph.html --temporary-allocations sobel.bin
⠙ Calculating high watermark... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 99% 0:00:0100:01
⠏ Processing allocation records... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 98% 0:00:0100:01
Wrote sobel_flamegraph.html
$ # Show memory tree
$ memray tree --temporary-allocations sobel.bin
⠧ Calculating high watermark... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 100% 0:00:0100:01
⠧ Processing allocation records... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 100% 0:00:0100:01
Allocation metadata
-------------------
Command line arguments:
'memray run -fo sobel.bin --trace-python-allocators sobel_run.py'
Peak memory size: 11.719MB
Number of allocations: 15332714
Biggest 10 allocations:
-----------------------
📂 123.755MB (100.00 %) <ROOT>
└── [[3 frames hidden in 2 file(s)]]
└── 📂 123.755MB (100.00 %) _run_code /usr/lib/python3.10/runpy.py:86
├── 📂 122.988MB (99.38 %) <module> sobel_run.py:40
│ ├── 📄 51.087MB (41.28 %) sobel sobel_run.py:35
│ ├── [[1 frames hidden in 1 file(s)]]
│ │ └── 📄 18.922MB (15.29 %) _sum
│ │ lib/python3.10/site-packages/numpy/core/_methods.py:49
│ └── [[1 frames hidden in 1 file(s)]]
│ └── 📄 18.921MB (15.29 %) _sum
│ lib/python3.10/site-packages/numpy/core/_methods.py:49
...
Beta Version and a Bug
Now that we have a working alpha and some profiling functions, we will leverage the SciPy library to obtain a much faster version.
from typing import Tuple
import numpy as np
from numpy.core.multiarray import normalize_axis_index
from numpy.typing import NDArray
from scipy.signal import convolve2d
def sobel_conv2d(
arr: NDArray, axes: Tuple[int, int] = (-2, -1)
) -> NDArray:
if arr.ndim != 2:
raise NotImplementedError
if any(
normalize_axis_index(ax, arr.ndim) != i
for i, ax in zip(range(2), axes)
):
raise NotImplementedError
# Create the kernels as a single, complex array. Allows us to use
# np.abs instead of np.hypot to calculate the magnitude.
G = np.array(
[[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],
dtype=arr.dtype,
)
G = G + 1j * np.array(
[[-1, -2, -1], [0, 0, 0], [1, 2, 1]],
dtype=arr.dtype,
)
s = convolve2d(arr, G, mode="same")
np.absolute(s, out=s) # In-place abs
return s.real
sobel_timed = timeit(sobel_conv2d)
sobel_timed(arr_test)
# 14.3 ms ± 1.71 ms per loop (mean ± std. dev. of 10 runs)
Much better! But is it right?
The images look very similar, but if you look at the color scale, they are not the same. Running the tests flags a small mean average error. Thankfully, we are now well equipped to detect both quantitative and qualitative differences.
After investigating this bug, we attribute it to differing boundary conditions. The convolve2d documentation tells us that the input array is padded with zeroes before applying the kernel. In the alpha, we padded the output!
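A tiny experiment (a sketch reusing the two functions above) makes the difference visible at the border:

arr = np.ones((5, 5))
print(sobel(arr)[0])         # Alpha: the border row was never written, so it is all zeros
print(sobel_conv2d(arr)[0])  # Beta: nonzero, because the zero padding creates an artificial edge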
Which one is right? Arguably, the SciPy implementation makes more sense. In this case, we should adopt the new version as the oracle and, if required, fix the alpha version to match it. This is common in scientific software development: new information about how to do things better changes the oracles and the tests.
In this case, the fix is easy: pad the array with zeros prior to processing it.
def sobel_v2(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
# ...
    arr = np.pad(arr, (1,))  # After padding, it is shaped (nx + 2, ny + 2)
    # The output must keep the original (nx, ny) shape
    s = np.zeros_like(arr[1:-1, 1:-1])
for ix in range(1, arr.shape[0] - 1):
for iy in range(1, arr.shape[1] - 1):
s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
s[ix - 1, iy - 1] = np.hypot(s1, s2) # Adjust indices
return s
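With the fix in place, a quick sanity check (my sketch, reusing the tolerance heuristic from our earlier tests) should confirm that the two implementations now agree:

arr = np.random.randn(50, 50)
# Looser factor than before: the two versions sum terms in different orders
tol = 100 * np.finfo(arr.dtype).eps
np.testing.assert_allclose(sobel_v2(arr), sobel_conv2d(arr), atol=tol, rtol=tol)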
Once we have corrected our function, we can update the oracles and the tests which rely on them.
Final Thoughts
We saw how to put into practice some of the software development ideas explored in the first installment of this series. We also saw some tools which you can use to develop high-quality, high-performance code.
I suggest you try some of these ideas on your own projects. In particular, practice profiling code and improving it. The Sobel filter function we coded here is very inefficient; I suggest trying to improve it.
For example, try CPU parallelization with a JIT compiler such as Numba, porting the inner loop into Cython, or implementing a CUDA GPU function with Numba or CuPy. Feel free to check out my series on coding CUDA kernels with Numba.
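As a parting example, here is an unvalidated sketch of a Numba version (assuming Numba is installed); before trusting anything like it, run it against the curated oracles:

import numba
import numpy as np

@numba.njit(parallel=True)
def sobel_numba(arr):
    # Zero-pad manually, matching the corrected version
    padded = np.zeros((arr.shape[0] + 2, arr.shape[1] + 2), dtype=arr.dtype)
    padded[1:-1, 1:-1] = arr
    s = np.zeros_like(arr)
    # Output rows are independent, so the outer loop can run in parallel
    for ix in numba.prange(1, padded.shape[0] - 1):
        for iy in range(1, padded.shape[1] - 1):
            # The two 3x3 correlations, written out elementwise
            s1 = (padded[ix - 1, iy + 1] - padded[ix - 1, iy - 1]
                  + 2 * (padded[ix, iy + 1] - padded[ix, iy - 1])
                  + padded[ix + 1, iy + 1] - padded[ix + 1, iy - 1])
            s2 = (padded[ix + 1, iy - 1] - padded[ix - 1, iy - 1]
                  + 2 * (padded[ix + 1, iy] - padded[ix - 1, iy])
                  + padded[ix + 1, iy + 1] - padded[ix - 1, iy + 1])
            s[ix - 1, iy - 1] = np.hypot(s1, s2)
    return s

Timing it with the timeit decorator above, and validating it with test_oracles, would be a good first exercise.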