The world’s leading publication for data science, AI, and ML professionals.

Developing Scientific Software

Part 2: Practical Aspects with Python

In this article we will follow the tenets of TDD for developing Scientific Software as laid out in the first installment of this series to develop an edge detection filter known as the Sobel filter.

In the first article, we talked about how important – and tricky – it can be to develop reliable testing methods for software which the complex problems often found in scientific software. We also saw how to overcome those issues by following a development cycle inspired by TDD, but adapted for scientific computing. I reproduce a shortened version of these instructions below.

Implementation cycle

  1. Gather requirements
  2. Sketch the design
  3. Implement initial tests
  4. Implement your alpha version
  5. Build an oracle library
  6. Revisit tests
  7. Implement profiling

Optimization cycle

  1. Optimize
  2. Reimplement

New method cycle

  1. Implement new method
  2. Validate against previous curated oracles

Getting Started: The Sobel Filter

In this article, we will follow the above instructions to develop a function which applies the Sobel filter. The Sobel filter is a commonly used computer vision tool to detect or enhance edges in images. Keep reading to see some examples!

Starting with the first step, we gather some requirements. We will follow the standard formulation of the Sobel filter described in this article. Simply put, the Sobel operator consists of convolving image A with the following two 3 × 3 kernels, squaring each pixel in the resulting images, summing them and taking the point-by-point square root. If Ax and Ay are the images resulting from the convolutions, the result of the Sobel filter S is √(Ax² + Ay²).

Requirements

We want this function to take any 2D array and generate another 2D array. We might want it to operate on any two axes of an ndarray. We might even want it to work on more (or less) than two axes. We might have specifications for how to handle the edges of the array.

For now let’s remember to keep it simple, and start with a 2D implementation. But before we do that, let’s sketch the design.

Sketch the Design

We start with a simple design, leveraging Python‘s annotations. I highly recommend annotating as much as possible, and using mypy as a linter.

from typing import Tuple

from numpy.core.multiarray import normalize_axis_index
from numpy.typing import NDArray

def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
    """Compute the Sobel filter of an image

    Parameters
    ----------
    arr : NDArray
        Input image
    axes : Tuple[int, int], optional
        Axes over which to compute the filter, by default (-2, -1)

    Returns
    -------
    NDArray
        Output
    """
    # Only accepts 2D arrays
    if arr.ndim != 2:
        raise NotImplementedError

    # Ensure that the axis[0] is the first axis, and axis[1] is the second
    # axis. The obscure `normalize_axis_index` converts negative indices to
    # indices between 0 and arr.ndim - 1.
    if any(
        normalize_axis_index(ax, arr.ndim) != i
        for i, ax in zip(range(2), axes)
    ):
        raise NotImplementedError
    pass

Implement Tests

This function doesn’t do much. But it is documented, annotated, and its current limitations are baked into it. Now that we have a design, we immediately pivot to tests.

First, we notice that empty images (all zeroes) have no edges. So they must output zeroes as well. In fact, any constant image should also return zeros. Let’s bake that into out first tests. We will also see how we can use monkey testing to test many arrays.

# test_zero_constant.py

import numpy as np
import pytest

# Test multiple dtypes at once
@pytest.mark.parametrize(
    "dtype",
    ["float16", "float32", "float64", "float128"],
)
def test_zero(dtype):
    # Set random seed
    seed = int(np.random.rand() * (2**32 - 1))
    np.random.seed(seed)

    # Create a 2D array of random shape and fill with zeros
    nx, ny = np.random.randint(3, 100, size=(2,))
    arr = np.zeros((nx, ny), dtype=dtype)

    # Apply sobel function
    arr_sob = sobel(arr)

    # `assert_array_equal` should fail most of the times.
    # It will only work when `arr_sob` is identically zero,
    # which is usually not the case.
    # DO NOT USE!
    # np.testing.assert_array_equal(
    #     arr_sob, 0.0, err_msg=f"{seed=} {nx=}, {ny=}"
    # )

    # `assert_almost_equal` can fail when used with high decimals.
    # It also relies on float64 checking, which might fail for
    # float128 types.
    # DO NOT USE!
    # np.testing.assert_almost_equal(
    #     arr_sob,
    #     np.zeros_like(arr),
    #     err_msg=f"{seed=} {nx=}, {ny=}",
    #     decimal=4,
    # )

    # `assert_allclose` with custom tolerance is my preferred method
    # The 10 is arbitrary and depends on the problem. If a method
    # which you know to be correct does not pass, increase to 100, etc.
    # If the tolerance needed to make the tests pass is too high, make
    # sure the method is actually correct.
    tol = 10 * np.finfo(arr.dtype).eps
    err_msg = f"{seed=} {nx=}, {ny=} {tol=}"  # Log seeds and other info
    np.testing.assert_allclose(
        arr_sob,
        np.zeros_like(arr),
        err_msg=err_msg,
        atol=tol,  # rtol is useless for desired=zeros
    )

@pytest.mark.parametrize(
    "dtype", ["float16", "float32", "float64", "float128"]
)
def test_constant(dtype):
    seed = int(np.random.rand() * (2**32 - 1))
    np.random.seed(seed)

    nx, ny = np.random.randint(3, 100, size=(2,))
    constant = np.random.randn(1).item()
    arr = np.full((nx, ny), fill_value=constant, dtype=dtype)
    arr_sob = sobel(arr)

    tol = 10 * np.finfo(arr.dtype).eps
    err_msg = f"{seed=} {nx=}, {ny=} {tol=}"
    np.testing.assert_allclose(
        arr_sob,
        np.zeros_like(arr),
        err_msg=err_msg,
        atol=tol,  # rtol is useless for desired=zeros
    )

This code snippet can be run from the command-line with

$ pytest -qq -s -x -vv --durations=0 test_zero_constant.py

Alpha Version

Of course our tests will currently fail, but they are enough for now. Let’s implement a first version.

from typing import Tuple

import numpy as np
from numpy.core.multiarray import normalize_axis_index
from numpy.typing import NDArray

def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
    if arr.ndim != 2:
        raise NotImplementedError
    if any(
        normalize_axis_index(ax, arr.ndim) != i
        for i, ax in zip(range(2), axes)
    ):
        raise NotImplementedError

    # Define our filter kernels. Notice they inherit the input type, so
    # that a float32 input never has to be cast to float64 for computation.
    # But can you see where using another dtype for Gx and Gy might make
    # sense for some input dtypes?
    Gx = np.array(
        [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],
        dtype=arr.dtype,
    )
    Gy = np.array(
        [[-1, -2, -1], [0, 0, 0], [1, 2, 1]],
        dtype=arr.dtype,
    )

    # Create the output array and fill with zeroes
    s = np.zeros_like(arr)
    for ix in range(1, arr.shape[0] - 1):
        for iy in range(1, arr.shape[1] - 1):
            # Pointwise multiplication followed by sum, aka convolution
            s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
            s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
            s[ix, iy] = np.hypot(s1, s2)  # np.sqrt(s1**2 + s2**2)
    return s

With this new function, all our tests should pass, and we should get an output like this:

$ pytest -qq -s -x -vv --durations=0 test_zero_constant.py
........
======================================== slowest durations =========================================
0.09s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float16]
0.08s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float64]
0.06s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float128]
0.04s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float128]
0.04s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float64]
0.02s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float32]
0.01s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float16]

(17 durations < 0.005s hidden.  Use -vv to show these durations.)
8 passed in 0.35s

We have so far:

  1. Gathered requirements of our problem.
  2. Sketched an initial design.
  3. Implemented some tests.
  4. Implemented the alpha version, which passes these tests.

These tests provide verification for future versions, as well as a very rudimentary oracle library. But while unit tests are great at capturing minute deviations from expected results, they are not great at differentiating wrong results from quantitatively different – but still correct – results. Suppose tomorrow we want to implement another Sobel-type operator, which has a longer kernel. We don’t expect the results to match exactly to our current version, but we do expect the outputs of both functions to be qualitatively very similar.

In addition, it is excellent practice to try many different inputs to our functions to get a sense of how they behave for different inputs. This ensures that we scientifically validate the results.

Scikit-image has an excellent library of images, which we can use to create oracles.

# !pip installscikit-image pooch
from typing import Dict, Callable

import numpy as np
import skimage.data

bwimages: Dict[str, np.ndarray] = {}
for attrname in skimage.data.__all__:
    attr = getattr(skimage.data, attrname)
    # Data are obtained via function calls
    if isinstance(attr, Callable):
        try:
            # Download the data
            data = attr()
            # Ensure it is a 2D array
            if isinstance(data, np.ndarray) and data.ndim == 2:
                # Convert from various int types to float32 to better
                # assess precision
                bwimages[attrname] = data.astype(np.float32)
        except:
            continue

# Apply sobel to images
bwimages_sobel = {k: sobel(v) for k, v in bwimages.items()}

Once we have the data, we can plot it.

import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable

def create_colorbar(im, ax):
    divider = make_axes_locatable(ax)
    cax = divider.append_axes("right", size="5%", pad=0.1)
    cb = ax.get_figure().colorbar(im, cax=cax, orientation="vertical")
    return cax, cb

for name, data in bwimages.items():
    fig, axs = plt.subplots(
        1, 2, figsize=(10, 4), sharex=True, sharey=True
    )
    im = axs[0].imshow(data, aspect="equal", cmap="gray")
    create_colorbar(im, axs[0])
    axs[0].set(title=name)

    im = axs[1].imshow(bwimages_sobel[name], aspect="equal", cmap="gray")
    create_colorbar(im, axs[1])
    axs[1].set(title=f"{name} sobel")

These look really good! I would recommend storing these, both as arrays and as figures which I can quickly check against for a new version. I highly recommend HD5F for array storage. You can also set up a Sphinx Gallery to directly generate the figures them when updating documentation, that way you have a reproducible figure build that you can use to check against previous versions.

After the results have been validated, you can store them on disk and use them as part of your unit testing. Something like this:

oracle_library = [(k, v, bwimages_sobel[k]) for k, v in bwimages.items()]
# save_to_disk(oracle_library, ...)
# test_oracle.py
import numpy as np
import pytest
from numpy.typing import NDArray

# oracle_library = read_from_disk(...)

@pytest.mark.parametrize("name,input,output", oracle_library)
def test_oracles(name: str, input: NDArray, output: NDArray):
    output_new = sobel(input)
    tol = 10 * np.finfo(input.dtype).eps
    mean_avg_error = np.abs(output_new - output).mean()
    np.testing.assert_allclose(
        output_new,
        output,
        err_msg=f"{name=} {tol=} {mean_avg_error=}",
        atol=tol,
        rtol=tol,
    )

Profiling

Computing the Sobel filter for these datasets took a while! So the next step is to profile the code. I recommend creating a benchmark_xyz.py file for each test, and re-run them regularly to probe for performance regressions. This can even be part of your unit testing, but we won’t go so far in this example. Another idea is to use the oracles above for benchmarking.

There are many ways of timing code execution. To get the system-wide, "real" elapsed time, use perf_counter_ns from the built-in time module to measure time in nanoseconds. In a Jupyter notebook, the built-in %%timeit cell magic times a certain cell execution. The decorator below is inspired by this cell magic and can be used to time any function.

import time
from functools import wraps
from typing import Callable, Optional

def sizeof_fmt(num, suffix="s"):
    for unit in ["n", "μ", "m"]:
        if abs(num) < 1000:
            return f"{num:3.1f} {unit}{suffix}"
        num /= 1000
    return f"{num:.1f}{suffix}"

def timeit(
    func_or_number: Optional[Callable] = None,
    number: int = 10,
) -> Callable:
    """Apply to a function to time its executions.

    Parameters
    ----------
    func_or_number : Optional[Callable], optional
        Function to be decorated or `number` argument (see below), by
        default None
    number : int, optional
        Number of times the function will run to obtain statistics, by
        default 10

    Returns
    -------
    Callable
        When fed a function, returns the decorated function. Otherwise
        returns a decorator.

    Examples
    --------

    .. code-block:: python

        @timeit
        def my_fun():
            pass

        @timeit(100)
        def my_fun():
            pass

        @timeit(number=3)
        def my_fun():
            pass

    """
    if isinstance(func_or_number, Callable):
        func = func_or_number
        number = number
    elif isinstance(func_or_number, int):
        func = None
        number = func_or_number
    else:
        func = None
        number = number

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            runs_ns = np.empty((number,))

            # Run first and measure store the result
            start_time = time.perf_counter_ns()
            result = f(*args, **kwargs)
            runs_ns[0] = time.perf_counter_ns() - start_time
            for i in range(1, number):
                start_time = time.perf_counter_ns()
                f(*args, **kwargs)  # Without storage, faster
                runs_ns[i] = time.perf_counter_ns() - start_time
            time_msg = f"{sizeof_fmt(runs_ns.mean())} ± "
            time_msg += f"{sizeof_fmt(runs_ns.std())}"
            print(
                f"{time_msg} per run (mean ± std. dev. of {number} runs)"
            )
            return result

        return wrapper

    if func is not None:
        return decorator(func)
    return decorator

Putting our function to the test:

arr_test = np.random.randn(500, 500)
sobel_timed = timeit(sobel)
sobel_timed(arr_test);
# 3.9s ± 848.9 ms per run (mean ± std. dev. of 10 runs)

Not too fast 🙁

When investigating slowness or performance regressions, it is also possible to track the runtime of each line. The line_profiler library is an excellent resource for this. It can be used via Jupyter cell magic, or using the API. Here is an API example:

from line_profiler import LineProfiler

lp = LineProfiler()
lp_wrapper = lp(sobel)
lp_wrapper(arr_test)
lp.print_stats(output_unit=1)  # 1 for seconds, 1e-3 for milliseconds, etc.

Here is an example output:

Timer unit: 1 s

Total time: 4.27197 s
File: /tmp/ipykernel_521529/1313985340.py
Function: sobel at line 8

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     8                                           def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
     9                                               # Only accepts 2D arrays
    10         1          0.0      0.0      0.0      if arr.ndim != 2:
    11                                                   raise NotImplementedError
    12                                           
    13                                               # Ensure that the axis[0] is the first axis, and axis[1] is the second
    14                                               # axis. The obscure `normalize_axis_index` converts negative indices to
    15                                               # indices between 0 and arr.ndim - 1.
    16         1          0.0      0.0      0.0      if any(
    17                                                   normalize_axis_index(ax, arr.ndim) != i
    18         1          0.0      0.0      0.0          for i, ax in zip(range(2), axes)
    19                                               ):
    20                                                   raise NotImplementedError
    21                                           
    22         1          0.0      0.0      0.0      Gx = np.array(
    23         1          0.0      0.0      0.0          [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],
    24         1          0.0      0.0      0.0          dtype=arr.dtype,
    25                                               )
    26         1          0.0      0.0      0.0      Gy = np.array(
    27         1          0.0      0.0      0.0          [[-1, -2, -1], [0, 0, 0], [1, 2, 1]],
    28         1          0.0      0.0      0.0          dtype=arr.dtype,
    29                                               )
    30         1          0.0      0.0      0.0      s = np.zeros_like(arr)
    31       498          0.0      0.0      0.0      for ix in range(1, arr.shape[0] - 1):
    32    248004          0.1      0.0      2.2          for iy in range(1, arr.shape[1] - 1):
    33    248004          1.8      0.0     41.5              s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
    34    248004          1.7      0.0     39.5              s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
    35    248004          0.7      0.0     16.8              s[ix, iy] = np.hypot(s1, s2)
    36         1          0.0      0.0      0.0      return s

Lot’s of time is spend inside the innermost loop. NumPy prefers vectorized math, as it can then rely on compiled code. Since we are using explicit for loops, it makes sense that this function is very slow.

In addition, it is important to be smart about memory allocations inside of loops. NumPy is somewhat smart about allocating small amounts of memory inside loops, so the lines defining s1 or s2 might not be allocating new memory. But they also could be, or there could be some other memory allocation that is happening under the hood that we are not aware of. I therefore recommend also profiling memory. I like using Memray for that, but there are others like Fil and Sciagraph. I would also avoid memory_profiler which (very unfortunately!) is no longer maintained. Also Memray is much more powerful. Here is an example of Memray in action:

$ # Create sobel.bin which holds the profiling information
$ memray run -fo sobel.bin --trace-python-allocators sobel_run.py
Writing profile results into sobel.bin
Memray WARNING: Correcting symbol for aligned_alloc from 0x7fc5c984d8f0 to 0x7fc5ca4a5ce0
[memray] Successfully generated profile results.

You can now generate reports from the stored allocation records.
Some example commands to generate reports:

python3 -m memray flamegraph sobel.bin
$ # Generate flame graph
$ memray flamegraph -fo sobel_flamegraph.html --temporary-allocations sobel.bin
⠙ Calculating high watermark... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸  99% 0:00:0100:01
⠏ Processing allocation records... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸  98% 0:00:0100:01
Wrote sobel_flamegraph.html
$ # Show memory tree
$ memray tree --temporary-allocations sobel.bin

⠧ Calculating high watermark... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 100% 0:00:0100:01
⠧ Processing allocation records... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 100% 0:00:0100:01

Allocation metadata
-------------------
Command line arguments: 
'memray run -fo sobel.bin --trace-python-allocators sobel_run.py'
Peak memory size: 11.719MB
Number of allocations: 15332714

Biggest 10 allocations:
-----------------------
📂    123.755MB (100.00 %) <ROOT>  
└── [[3 frames hidden in 2 file(s)]]
    └── 📂    123.755MB (100.00 %) _run_code  /usr/lib/python3.10/runpy.py:86
        ├── 📂    122.988MB (99.38 %) <module>  sobel_run.py:40
        │   ├── 📄    51.087MB (41.28 %) sobel  sobel_run.py:35
        │   ├── [[1 frames hidden in 1 file(s)]]
        │   │   └── 📄    18.922MB (15.29 %) _sum  
        │   │       lib/python3.10/site-packages/numpy/core/_methods.py:49
        │   └── [[1 frames hidden in 1 file(s)]]
        │       └── 📄    18.921MB (15.29 %) _sum  
        │           lib/python3.10/site-packages/numpy/core/_methods.py:49
...

Beta Version and a Bug

Now that we have a working alpha and some profiling functions, we will leverage the SciPy library to obtain a much faster version.

from typing import Tuple

import numpy as np
from numpy.core.multiarray import normalize_axis_index
from numpy.typing import NDArray
from scipy.signal import convolve2d

def sobel_conv2d(
    arr: NDArray, axes: Tuple[int, int] = (-2, -1)
) -> NDArray:
    if arr.ndim != 2:
        raise NotImplementedError
    if any(
        normalize_axis_index(ax, arr.ndim) != i
        for i, ax in zip(range(2), axes)
    ):
        raise NotImplementedError

    # Create the kernels as a single, complex array. Allows us to use
    # np.abs instead of np.hypot to calculate the magnitude.
    G = np.array(
        [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],
        dtype=arr.dtype,
    )
    G = G + 1j * np.array(
        [[-1, -2, -1], [0, 0, 0], [1, 2, 1]],
        dtype=arr.dtype,
    )
    s = convolve2d(arr, G, mode="same")
    np.absolute(s, out=s)  # In-place abs
    return s.real
sobel_timed = timeit(sobel_conv2d)
sobel_timed(arr_test)
# 14.3 ms ± 1.71 ms per loop (mean ± std. dev. of 10 runs)

Much better! But is it right?

The images look very similar, but if you notice the color scale, they are not the same. Running the tests flags a small mean average error. Thankfully, we are now well-equipped at detecting quantitative and qualitative differences.

After investigating this bug, we attribute it to the different boundary conditions. Looking into convolve2d documentation tells us that the input array is padded with zeroes before applying the kernel. In the alpha, we padded the output!

Which one is right? Arguably the SciPy implementation makes more sense. In this case we should adopt the new version as the oracle, and if required, fix the alpha version to match it. This is common in scientific Software Development: new information of how to do things better changes the oracles and the tests.

In this case, the fix is easy, pad the array with zeros prior to processing it.

def sobel_v2(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
    # ...
    arr = np.pad(arr, (1,))  # After padding, it is shaped (nx + 2, ny + 2)
    s = np.zeros_like(arr)
    for ix in range(1, arr.shape[0] - 1):
        for iy in range(1, arr.shape[1] - 1):
            s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
            s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
            s[ix - 1, iy - 1] = np.hypot(s1, s2)  # Adjust indices
    return s

Once we corrected out function, we can update the oracles and tests which rely on them.

Final Thoughts

We saw how to put in practice some of the software development ideas explored in this article. We also saw some tools which you can use to develop high-quality, high-performance code.

I suggest you try some of these ideas on your own projects. In particular, practice profiling code and improving it. The Sobel filter function we coded is very inefficient, I suggest trying to improve it.

For example, try CPU parallelization with a JIT compiler such as Numba, porting the inner loop into Cython, or implementing a CUDA GPU function with Numba or CuPy. Feel free to check out my series on coding CUDA kernels with Numba.


Related Articles