Skip to content
This repository was archived by the owner on Feb 2, 2024. It is now read-only.

Optimize series.rolling.sum() #608

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 80 additions & 10 deletions sdc/datatypes/hpat_pandas_series_rolling_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from sdc.datatypes.common_functions import _sdc_pandas_series_align
from sdc.datatypes.hpat_pandas_series_rolling_types import SeriesRollingType
from sdc.hiframes.pd_series_type import SeriesType
from sdc.utilities.prange_utils import parallel_chunks
from sdc.utilities.sdc_typing_utils import TypeChecker
from sdc.utilities.utils import sdc_overload_method, sdc_register_jitable

Expand Down Expand Up @@ -214,12 +215,6 @@ def arr_std(arr, ddof):
return arr_var(arr, ddof) ** 0.5


@sdc_register_jitable
def arr_sum(arr):
    """Return the sum of the values in ``arr`` (delegates to ``arr.sum()``)."""
    total = arr.sum()
    return total


@sdc_register_jitable
def arr_var(arr, ddof):
"""Calculate unbiased variance of values"""
Expand Down Expand Up @@ -308,12 +303,87 @@ def apply_minp(arr, ddof, minp):
gen_hpat_pandas_series_rolling_impl(arr_skew))
hpat_pandas_rolling_series_std_impl = register_jitable(
gen_hpat_pandas_series_rolling_ddof_impl(arr_std))
hpat_pandas_rolling_series_sum_impl = register_jitable(
gen_hpat_pandas_series_rolling_impl(arr_sum))
hpat_pandas_rolling_series_var_impl = register_jitable(
gen_hpat_pandas_series_rolling_ddof_impl(arr_var))


@sdc_register_jitable
def pop_sum(value, nfinite, result):
    """Remove an outgoing value from the running window sum.

    A finite ``value`` decrements the finite-value counter and is subtracted
    from ``result``; a non-finite one (NaN or infinity) leaves both unchanged.
    Returns the updated ``(nfinite, result)`` pair.
    """
    if numpy.isfinite(value):
        nfinite, result = nfinite - 1, result - value

    return nfinite, result


@sdc_register_jitable
def put_sum(value, nfinite, result):
    """Add an incoming value to the running window sum.

    A finite ``value`` increments the finite-value counter and is added to
    ``result``; a non-finite one (NaN or infinity) leaves both unchanged.
    Returns the updated ``(nfinite, result)`` pair.
    """
    if numpy.isfinite(value):
        nfinite, result = nfinite + 1, result + value

    return nfinite, result


@sdc_register_jitable
def result_or_nan(nfinite, minp, result):
    """Apply the min-periods rule to a window result.

    Yields ``numpy.nan`` when fewer than ``minp`` finite values were counted
    (``nfinite < minp``); otherwise yields ``result`` unchanged.
    """
    return numpy.nan if nfinite < minp else result


def gen_sdc_pandas_series_rolling_impl(pop, put, init_result=numpy.nan):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider the following option:

@sdc_register_jitable
def result_or_nan(nfinite, minp, result):
    """Get result taking into account min periods.

    Returns ``numpy.nan`` when ``nfinite`` is below ``minp``; otherwise
    returns ``result`` unchanged.
    """
    if nfinite < minp:
        return numpy.nan
    
    return result


def gen_sdc_pandas_series_rolling_impl(pop, put, init_result=numpy.nan):
    """Generate series rolling methods implementations based on pop/put funcs

    ``put(value, nfinite, result)`` adds a value to the running window state
    and ``pop(value, nfinite, result)`` removes one; both must return the
    updated ``(nfinite, result)`` pair. ``init_result`` is the value
    ``result`` starts from at the beginning of every chunk.

    Returns ``impl(self)``, which reads the window size, min periods and the
    underlying series from ``self`` and returns a ``pandas.Series`` holding
    one output value per input element.
    """
    def impl(self):
        win = self._window
        minp = self._min_periods

        input_series = self._data
        input_arr = input_series._data
        length = len(input_arr)
        # The output is float64 regardless of the input dtype, so NaN can be
        # stored for positions rejected by result_or_nan.
        output_arr = numpy.empty(length, dtype=float64)

        # NOTE(review): parallel_chunks is defined elsewhere; presumably it
        # splits range(length) into contiguous chunks exposing .start/.stop
        # -- confirm against sdc.utilities.prange_utils.
        chunks = parallel_chunks(length)
        for i in prange(len(chunks)):
            chunk = chunks[i]
            # Each chunk rebuilds its own window state from scratch, so no
            # state is carried over from one chunk to the next.
            nfinite = 0
            result = init_result

            # Prelude: the elements located before the chunk that belong to
            # the window of the chunk's first element. They are accumulated
            # but produce no output here.
            prelude_start = max(0, chunk.start - win + 1)
            prelude_stop = min(chunk.start, prelude_start + win)

            # Interlude: positions inside the chunk where the window is still
            # growing (nothing has to leave it yet), so only put is needed.
            # NOTE(review): with win == 0 this range still contains one index
            # (chunk.start), so one value is put before any pop -- confirm a
            # zero window is rejected or handled before reaching this code.
            interlude_start = prelude_stop
            interlude_stop = min(prelude_start + win, chunk.stop)

            for idx in range(prelude_start, prelude_stop):
                value = input_arr[idx]
                nfinite, result = put(value, nfinite, result)

            for idx in range(interlude_start, interlude_stop):
                value = input_arr[idx]
                nfinite, result = put(value, nfinite, result)
                output_arr[idx] = result_or_nan(nfinite, minp, result)

            # Main part: the window slides by one element per step -- the
            # new element enters and the one `win` positions behind leaves.
            for idx in range(interlude_stop, chunk.stop):
                put_value = input_arr[idx]
                pop_value = input_arr[idx - win]
                # put is applied before pop on every step.
                nfinite, result = put(put_value, nfinite, result)
                nfinite, result = pop(pop_value, nfinite, result)
                output_arr[idx] = result_or_nan(nfinite, minp, result)

        # The result keeps the index and the name of the input series.
        return pandas.Series(output_arr, input_series._index,
                             name=input_series._name)
    return impl

It's not the most elegant option, but it could improve performance by eliminating the condition inside the loop and the extra counter. If it doesn't, your solution is preferable.

Also, I've changed the order of put and pop (put first, then pop). It shouldn't affect sum, but it could be useful for min and max: if the newly added value becomes the new min/max, we don't need to recalculate the result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a visible performance difference, but I like the code, so let me apply the patch.

"""Generate series rolling methods implementations based on pop/put funcs"""
def impl(self):
win = self._window
minp = self._min_periods

input_series = self._data
input_arr = input_series._data
length = len(input_arr)
output_arr = numpy.empty(length, dtype=float64)

chunks = parallel_chunks(length)
for i in prange(len(chunks)):
chunk = chunks[i]
nfinite = 0
result = init_result

prelude_start = max(0, chunk.start - win + 1)
prelude_stop = min(chunk.start, prelude_start + win)

interlude_start = prelude_stop
interlude_stop = min(prelude_start + win, chunk.stop)

for idx in range(prelude_start, prelude_stop):
value = input_arr[idx]
nfinite, result = put(value, nfinite, result)

for idx in range(interlude_start, interlude_stop):
value = input_arr[idx]
nfinite, result = put(value, nfinite, result)
output_arr[idx] = result_or_nan(nfinite, minp, result)

for idx in range(interlude_stop, chunk.stop):
put_value = input_arr[idx]
pop_value = input_arr[idx - win]
nfinite, result = put(put_value, nfinite, result)
nfinite, result = pop(pop_value, nfinite, result)
output_arr[idx] = result_or_nan(nfinite, minp, result)

return pandas.Series(output_arr, input_series._index,
name=input_series._name)
return impl


# Rolling sum implementation generated from the pop_sum/put_sum pair; the
# running sum of every chunk starts from 0. instead of the default NaN.
sdc_pandas_series_rolling_sum_impl = register_jitable(
gen_sdc_pandas_series_rolling_impl(pop_sum, put_sum, init_result=0.))


@sdc_rolling_overload(SeriesRollingType, 'apply')
def hpat_pandas_series_rolling_apply(self, func, raw=None):

Expand Down Expand Up @@ -619,13 +689,13 @@ def hpat_pandas_series_rolling_std(self, ddof=1):
return hpat_pandas_rolling_series_std_impl


@sdc_rolling_overload(SeriesRollingType, 'sum')
@sdc_overload_method(SeriesRollingType, 'sum')
def hpat_pandas_series_rolling_sum(self):

ty_checker = TypeChecker('Method rolling.sum().')
ty_checker.check(self, SeriesRollingType)

return hpat_pandas_rolling_series_sum_impl
return sdc_pandas_series_rolling_sum_impl


@sdc_rolling_overload(SeriesRollingType, 'var')
Expand Down
4 changes: 2 additions & 2 deletions sdc/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,8 @@ def test_impl(obj, window, min_periods):
hpat_func = self.jit(test_impl)
assert_equal = self._get_assert_equal(obj)

for window in range(0, len(obj) + 3, 2):
for min_periods in range(0, window + 1, 2):
for window in range(len(obj) + 2):
for min_periods in range(window):
with self.subTest(obj=obj, window=window,
min_periods=min_periods):
jit_result = hpat_func(obj, window, min_periods)
Expand Down
60 changes: 52 additions & 8 deletions sdc/tests/tests_perf/test_perf_series_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,29 @@
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# *****************************************************************************
import string

import time

import numba
import pandas
import numpy as np

from sdc.tests.test_utils import test_global_input_data_float64
from sdc.tests.tests_perf.test_perf_base import TestBase
from sdc.tests.tests_perf.test_perf_utils import (calc_compilation, get_times,
perf_data_gen_fixed_len)
from sdc.tests.tests_perf.test_perf_utils import perf_data_gen_fixed_len
from .generator import generate_test_cases
from .generator import TestCase as TC


# Source-code template of a timed rolling use case, filled in via str.format().
# Placeholders: {method_name}, {extra_usecase_params}, {ncalls},
# {rolling_params} and {method_params}. The generated function calls the
# rolling method {ncalls} times between two time.time() readings and returns
# the elapsed time together with the result of the last call.
rolling_usecase_tmpl = """
def series_rolling_{method_name}_usecase(data, {extra_usecase_params}):
start_time = time.time()
for i in range({ncalls}):
res = data.rolling({rolling_params}).{method_name}({method_params})
end_time = time.time()
return end_time - start_time, res
"""


def get_rolling_params(window=100, min_periods=None):
"""Generate supported rolling parameters"""
rolling_params = [f'{window}']
Expand All @@ -48,14 +56,37 @@ def get_rolling_params(window=100, min_periods=None):
return ', '.join(rolling_params)


def gen_series_rolling_usecase(method_name, rolling_params=None,
                               extra_usecase_params='',
                               method_params='', ncalls=1):
    """Build the timed use-case function for one series rolling method.

    The source text is produced from ``rolling_usecase_tmpl`` and executed
    with ``np`` and ``time`` as its globals. When ``rolling_params`` is empty
    or ``None``, the value returned by ``get_rolling_params()`` is used.

    Returns the generated ``series_rolling_<method_name>_usecase`` function.
    """
    rolling_params = rolling_params or get_rolling_params()

    source = rolling_usecase_tmpl.format(
        method_name=method_name,
        extra_usecase_params=extra_usecase_params,
        rolling_params=rolling_params,
        method_params=method_params,
        ncalls=ncalls,
    )

    # The generated function is defined in a throwaway local namespace.
    namespace = {}
    exec(source, {'np': np, 'time': time}, namespace)

    return namespace[f'series_rolling_{method_name}_usecase']


# python -m sdc.runtests sdc.tests.tests_perf.test_perf_series_rolling.TestSeriesRollingMethods
class TestSeriesRollingMethods(TestBase):
# more than 19 columns raise SystemError: CPUDispatcher() returned a result with an error set
max_columns_num = 19

@classmethod
def setUpClass(cls):
    """Run the base class setup and register per-method benchmark settings.

    ``map_ncalls_dlength`` maps a rolling method name to a pair
    ``(ncalls, total_data_length)``.
    """
    super().setUpClass()
    cls.map_ncalls_dlength = dict(sum=(100, [8 * 10 ** 5]))

def _test_case(self, pyfunc, name, total_data_length, data_num=1,
input_data=test_global_input_data_float64):
Expand All @@ -82,6 +113,20 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1,
self._test_jit(pyfunc, base, *args)
self._test_py(pyfunc, base, *args)

def _test_series_rolling_method(self, name, rolling_params=None,
                                extra_usecase_params='', method_params=''):
    """Generate the use case for rolling method ``name`` and run it.

    The number of calls and the total data length are looked up in
    ``self.map_ncalls_dlength``. One data argument is always passed, plus one
    for each ``', '``-separated entry of ``extra_usecase_params``.
    """
    ncalls, total_data_length = self.map_ncalls_dlength[name]
    pyfunc = gen_series_rolling_usecase(
        name,
        rolling_params=rolling_params,
        extra_usecase_params=extra_usecase_params,
        method_params=method_params,
        ncalls=ncalls,
    )

    extra_count = len(extra_usecase_params.split(', ')) if extra_usecase_params else 0
    self._test_case(pyfunc, name, total_data_length, data_num=1 + extra_count)

def test_series_rolling_sum(self):
    """Run the rolling ``sum`` use case with the default parameters."""
    self._test_series_rolling_method(name='sum')


cases = [
TC(name='apply', size=[10 ** 7], params='func=lambda x: np.nan if len(x) == 0 else x.mean()'),
Expand All @@ -96,7 +141,6 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1,
TC(name='quantile', size=[10 ** 7], params='0.2'),
TC(name='skew', size=[10 ** 7]),
TC(name='std', size=[10 ** 7]),
TC(name='sum', size=[10 ** 7]),
TC(name='var', size=[10 ** 7]),
]

Expand Down
2 changes: 1 addition & 1 deletion sdc/utilities/prange_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import sdc

from typing import NamedTuple
from sdc.utilities.utils import sdc_overload, sdc_register_jitable
from sdc.utilities.utils import sdc_register_jitable


class Chunk(NamedTuple):
Expand Down