Speed up numpy looking for best indices

I have a numpy array that maps x-y-coordinates to the appropriate z-coordinates. For this I use a 2D array that represents x and y as its axes and contains the corresponding z values:

import numpy as np
x_size = 2000
y_size = 2500
z_size = 400
rng = np.random.default_rng(123)
z_coordinates = np.linspace(0, z_size, y_size) + rng.laplace(0, 1, (x_size, y_size))

So each of the 2000*2500 x-y-points is assigned a z-value (float between 0 and 400). Now I want to look up for each integer z and integer x which is the closest y-value, essentially creating a map that is of shape (x_size, z_size) and holds the best y-values.

The simplest approach is creating an empty array of target shape and iterating over each z value:

y_coordinates = np.empty((x_size, z_size), dtype=np.uint16)
for i in range(z_size):
    y_coordinates[:, i] = np.argmin(
        np.abs(z_coordinates - i),
        axis=1,
    )

however this takes about 11 s on my machine, which unfortunately is way to slow.

Surely using a more vectorised approach would be faster, such as:

y_coordinates = np.argmin(
    np.abs(
        z_coordinates[..., np.newaxis] - np.arange(z_size)
    ),
    axis=1,
)

Surprisingly this runs about 60% slower than the version above (tested at 1/10th size, since at full size this uses excessive memory).

Also wrapping the code blocks in functions and decorating them with numba’s @jit(nopython=True) doesn’t help.

How can I speed up the calculation?

6

IMPROVEMENT 1

The following approach completes the task in approximately 0.4 seconds on my machine. The key idea is to avoid using np.argmin and instead leverage binary search (np.searchsorted) on a pre-sorted array of z-coordinates.

This approach avoids the inefficiency of repeatedly using np.argmin by pre-sorting the z-coordinates for each x and then employing np.searchsorted to efficiently find the closest z-values. By sorting the z-coordinates with a complexity of O(N log N) and using binary search for each integer z, which has a complexity of O(log N), the method significantly reduces computational load compared to the original approach. This strategy involves finding the insertion point for each integer z in the sorted z-coordinates and comparing it with its neighbors to determine the closest z-value. This eliminates the need for redundant calculations and ensures that only necessary comparisons are made, thereby improving performance.

x_size = 2000
y_size = 2500
z_size = 400
rng = np.random.default_rng(123)

z_coordinates = np.linspace(0, z_size, y_size) + rng.laplace(0, 1, (x_size, y_size))

# Sort the z_coordinates for each x and get sorted indices
idx_sorted = np.argsort(z_coordinates, axis=1)
z_sorted = np.take_along_axis(z_coordinates, idx_sorted, axis=1)

zs = np.arange(z_size, dtype=np.float64)

y_coordinates = np.empty((x_size, z_size), dtype=np.uint16)

for x in range(x_size):
    positions = np.searchsorted(z_sorted[x, :], zs)
    positions = np.clip(positions, 1, y_size - 1)
    z_prev = z_sorted[x, positions - 1]
    z_curr = z_sorted[x, positions]
    diffs_prev = np.abs(z_prev - zs)
    diffs_curr = np.abs(z_curr - zs)
    closest = np.where(diffs_prev < diffs_curr, positions - 1, positions)
    y_coordinates[x, :] = idx_sorted[x, closest]

IMPROVEMENT 2

One way of improving performance is to add the following line of code:

z_coordinates = z_coordinates.astype(np.float32).copy(order='C')

Ensuring a C-contiguous memory layout and using float32 instead of float64 significantly speeds up computation by optimizing memory access patterns and reducing data size.

  • C-contiguous arrays store elements in a row-major order, improving cache efficiency and enabling vectorized operations by minimizing cache misses and leveraging CPU optimizations.

  • Switching to float32 halves memory usage, reducing memory bandwidth demands and allowing more data to fit into the CPU cache, which is especially beneficial for large arrays.

These changes align the data with the CPU’s expectations, enabling faster and more efficient processing, which in your case approximately halves the execution time. The primary improvement comes from using float32 instead of float64, significantly reducing memory usage and enhancing performance.

7

for some reason on my machine using a for loop is faster than vectorization:

for j in range(x_size):
    y_coordinates[j, i] = np.argmin(np.abs(z_coordinates[j, :] - i))

Not sure what is the reason behind this.

However, if this is how you actually generate the z_coordinates, I believe you can speed it up by eliminating unlikely choices of y as for each y, z is likely to be distributed within a small range:

y_coordinates_new = np.empty((x_size, z_size), dtype=np.uint16)
m_min = np.min(z_coordinates, axis = 0)[:, np.newaxis] - np.arange(z_size)
m_max = np.max(z_coordinates, axis = 0)[:, np.newaxis] - np.arange(z_size)
m_range_min = (m_min*m_max > 0) * np.minimum(np.abs(m_min), np.abs(m_max))
m_range_max = np.maximum(np.abs(m_min), np.abs(m_max))

for i in range(z_size):
    candidates = np.where(m_range_min[:, i] <= np.min(m_range_max[:, i]))[0]
    y_coordinates_new[:, i] = candidates[np.argmin(
       np.abs(z_coordinates[:, candidates] - i),
       axis=1,
    )]

On my machine this is 16x faster for your example. Obviously it would be slower if you increase the variance of your random part.


Explained:

The idea is that your samples (z) are sparse by y, meaning that z values are distributed within 2500 relatively small “buckets”. So to find the point closest to i, we don’t need to check all the buckets as the point we are looking for would likely only exist in a few candidate buckets.

First we calculate what is the minimum distance and maximum distance to each i, within each bucket:

# min and max (z-i) in each bucket
m_min = np.min(z_coordinates, axis = 0)[:, np.newaxis] - np.arange(z_size)
m_max = np.max(z_coordinates, axis = 0)[:, np.newaxis] - np.arange(z_size)

# min and max possible distance to i in each bucket
m_range_min = (m_min*m_max > 0) * np.minimum(np.abs(m_min), np.abs(m_max))
m_range_max = np.maximum(np.abs(m_min), np.abs(m_max))

The minimum distance is a rough estimate for speed, if i is between the min z value and max z value in each bucket, I just assume minimum possible distance to i is 0. You could probably do better here by finding the closest value to 0.

Now, for each of these buckets, we know what is the minimum/maximum possible distance to i. We can say that given two buckets a, b, and respectively min and max distance [d1_a, d2_a], [d1_b, d2_b], we can rule out b in favor of a if d1_b > d2_a. This is the case where all points in bucket b would be further away from i compared to any points in a.

With this idea in mind, our candidates, defined as buckets where it is not strictly “worse” than any of our buckets, can be find via:

np.where(m_range_min[:, i] <= np.min(m_range_max[:, i]))[0]

Think of it this way, if d1 (the min distance) is larger than any d2 (the max distance) of any other bucket we have, there is no reason to keep this bucket.

This method is somewhat a math hack, what is nice about it is that you can combine with what other people mentioned here, like numba, presort, etc. Presort is a good idea, as there is really no point to search multiple times through the same array. We could do it further by maintaining some search index to only increase it when doing the loop, but I am afraid that might be slower than numpy’s vectorization without some careful implementation.

3

This answer provide an algorithm with an optimal complexity: O(x_size * (y_size + z_size)). This algorithm is the fastest one proposed so far (by a large margin). It is implemented in Numba using multiple threads.


Explanation of the approach

The idea is that there is no need to iterate over all Z values : we can iterate over z_coordinates line by line, and for each line of z_coordinates, we fill an array used to find the nearest value for each possible z. The best candidate for the value z is stored in arr[z].

In practice, there are tricky corner cases making things a more complicated. For example, due to rounding, I decided to fill the neighbours of arr[z] (i.e. arr[z-1] and arr[z+1]) so to make the algorithm simpler. Moreover, when there are not enough values so arr cannot be fully filled by all the values in a line of z_coordinates, we need to fill the holes in the arr. In some more complicated cases (combining rounding issue while kind of holes in arr), we need to correct the values in arr (or operate on more distant neighbours which is not efficient). The number of step in the correction function should always be a small constant, certainly <= 3 (it nerver reached 3 in practice in my tests). Note that, in practice, no corner case happens on the specific input dataset provided.

Each line is computed in parallel using multiple threads. I assume the array is not too small (to avoid to deal with more corner cases in the code and make it simpler) which should not be an issue. I also assume there are no special values like NaN in z_coordinates.


Resulting code

Here is the final code:

import numba as nb
import numpy as np

@nb.njit('(float64[:,::1], int64)', parallel=True)
def compute_loops(z_coordinates, z_size):
    x_size, y_size = z_coordinates.shape
    assert x_size > 0 and y_size > 0
    y_coordinates = np.empty((x_size, z_size), dtype=np.uint16)
    for x in nb.prange(x_size):
        for i in range(z_size):
            pos = 0
            minVal = np.abs(z_coordinates[x, 0] - i)
            for y in range(1, y_size):
                val = np.abs(z_coordinates[x, y] - i)
                if minVal > val:
                    pos = y
                    minVal = val
            y_coordinates[x, i] = pos # np.argmin(np.abs(z_coordinates[x, :] - i))
    return y_coordinates

# Fill the missing values in the value-array if there is not enough values (e.g. pretty large z_size)
# (untested)
@nb.njit('(float64[::1], uint16[::1], int64)')
def fill_missing_values(all_val, all_pos, z_size):
    i = 0
    while i < z_size:
        # If there is a missing value
        if all_pos[i] == 0xFFFF:
            j = i
            while j < z_size and all_pos[j] == 0xFFFF:
                j += 1
            if i == 0:
                # Fill the hole based on 1 value (lower bound)
                assert j+1 < z_size and all_pos[j] == 0xFFFF and all_pos[j+1] != 0xFFFF
                for i2 in range(i, j+1):
                    all_val[i2] = all_val[j+1]
                    all_pos[i2] = all_pos[j+1]
            elif j == z_size:
                # Fill the hole based on 1 value (upper bound)
                assert i-1 >= 0 and all_pos[i-1] != 0xFFFF and all_pos[i] == 0xFFFF
                for i2 in range(i, j+1):
                    all_val[i2] = all_val[i-1]
                    all_pos[i2] = all_pos[i-1]
            else:
                assert i-1 >= 0 and j < z_size and all_val[i-1] != 0xFFFF and all_pos[j] != 0xFFFF
                lower_val = all_val[i-1]
                lower_pos = all_pos[i-1]
                upper_val = all_val[j]
                upper_pos = all_pos[j]
                # Fill the hole based on 2 values
                for i2 in range(i, j+1):
                    if np.abs(lower_val - i2) < np.abs(upper_val - i2):
                        all_val[i2] = lower_val
                        all_pos[i2] = lower_pos
                    else:
                        all_val[i2] = upper_val
                        all_pos[i2] = upper_pos
            i = j
        i += 1

# Correct values in very pathological cases where z_size is big so there are not enough 
# values added to the value-array causing some values of the value-array to be incorrect.
# The number of `while` iteration should be always <= 3 in practice
@nb.njit('(float64[::1], uint16[::1], int64)')
def correct_values(all_val, all_pos, z_size):
    while True:
        stop = True
        for i in range(0, z_size-1):
            current = np.abs(all_val[i] - i)
            if np.abs(all_val[i+1] - i) < current:
                all_val[i] = all_val[i+1]
                all_pos[i] = all_pos[i+1]
                stop = False
        for i in range(1, z_size):
            current = np.abs(all_val[i] - i)
            if np.abs(all_val[i-1] - i) < current:
                all_val[i] = all_val[i-1]
                all_pos[i] = all_pos[i-1]
                stop = False
        if stop:
            break

@nb.njit('(float64[:,::1], int64)', parallel=True)
def compute_fastest(z_coordinates, z_size):
    x_size, y_size = z_coordinates.shape
    assert y_size >= 2 and z_size >= 2
    y_coordinates = np.empty((x_size, z_size), dtype=np.uint16)
    for x in nb.prange(x_size):
        all_pos = np.full(z_size, 0xFFFF, dtype=np.uint16)
        all_val = np.full(z_size, np.inf, dtype=np.float64)
        for y in range(0, y_size):
            val = z_coordinates[x, y]
            #assert not np.isnan(val)
            if val < 0: # Lower bound
                i = 0
                if np.abs(val - i) < np.abs(all_val[i] - i):
                    all_val[i] = val
                    all_pos[i] = y
            elif val >= z_size: # Upper bound
                i = z_size - 1
                if np.abs(val - i) < np.abs(all_val[i] - i):
                    all_val[i] = val
                    all_pos[i] = y
            else: # Inside the array of values
                offset = np.int32(val)
                for i in range(max(offset-1, 0), min(offset+2, z_size)):
                    if np.abs(val - i) < np.abs(all_val[i] - i):
                        all_val[i] = val
                        all_pos[i] = y
        fill_missing_values(all_val, all_pos, z_size)
        correct_values(all_val, all_pos, z_size)
        for i in range(0, z_size):
            y_coordinates[x, i] = all_pos[i]
    return y_coordinates

Performance results

Here are performance results on my machine with a i5-9600KF CPU (6 cores), Numpy 1.24.3, Numba 58.1, on Windows, for the provided input:

Naive fully vectorized code in the question:   113000 ms  (slow due to swapping)
Naive loop in the question:                      8460 ms
ZLi's implementation:                            1964 ms
Naive Numba parallel code with loops:             402 ms
PaulS' implementation:                            262 ms
This Numba code:                                   12 ms  <----------

Note the fully-vectorized code in the question use so much memory it cause memory swapping. It completely saturate my 32 GiB of RAM (about 24 GiB was available in practice) which is clearly not reasonable!

Note the PaulS’ implementation is about equally fast with 32-bit and 64-bit on my machine. This is probably because the operation is compute-bound on my machine (dependent of the speed of the RAM).

This Numba implementation is 705 times faster than the fastest implementation in the question. It is also 22 times faster than the best answer so far! It also use a tiny amount of additional RAM for the computation (<1 MiB).

The closest-y is ambiguous. For example, if 2 values are equal, then argmin will return the first index it sees. Which might not be desirable.

With contour lines you can disambiguate these cases. For that, there is contourpy.

import contourpy as cp
z_contour_gen = cp.contour_generator(z=z_coordinates, name="threaded", chunk_count=5)

creates a contour generator which uses all threads. Then call

[z_contour_gen.lines(level=i) for i in range(z_size)] # or multi_lines

and this gave a ~30% speed up for me, and more information on z.

New contributor

globLearner is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật