Skip to content

Commit 81055d3

Browse files
committed
Start fewer but larger workers with LocalCluster
When a large number of cores is available, the default number of threads per process should be higher than one. This implements a policy that aims for a number of processes close to the square root of the number of cores (specifically, the smallest factor of the core count that is at least its square root) once more than four cores are available; with four or fewer cores, each core still gets its own single-threaded process. Partially addresses dask#2450
1 parent 4e38022 commit 81055d3

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

distributed/deploy/local.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import weakref
99
import toolz
1010

11+
from dask.utils import factors
1112
from tornado import gen
1213

1314
from .cluster import Cluster
@@ -99,8 +100,7 @@ def __init__(self, n_workers=None, threads_per_worker=None, processes=True,
99100
self._old_logging_level = silence_logging(level=silence_logs)
100101
if n_workers is None and threads_per_worker is None:
101102
if processes:
102-
n_workers = _ncores
103-
threads_per_worker = 1
103+
n_workers, threads_per_worker = nprocesses_nthreads(_ncores)
104104
else:
105105
n_workers = 1
106106
threads_per_worker = _ncores
@@ -372,6 +372,34 @@ def scheduler_address(self):
372372
return '<unstarted>'
373373

374374

375+
def nprocesses_nthreads(n):
    """
    The default breakdown of processes and threads for a given number of cores

    With four or fewer cores every core gets its own single-threaded
    process.  Above that, the number of processes is the smallest factor of
    ``n`` that is at least ``sqrt(n)``, and each process gets
    ``n // nprocesses`` threads.

    Parameters
    ----------
    n: int
        Number of available cores; must be at least 1

    Returns
    -------
    nprocesses, nthreads

    Raises
    ------
    ValueError
        If ``n`` is less than 1

    Examples
    --------
    >>> nprocesses_nthreads(4)
    (4, 1)
    >>> nprocesses_nthreads(32)
    (8, 4)
    """
    if n < 1:
        # Without this guard n == 0 fails later with an opaque
        # ZeroDivisionError and a negative n silently produces a negative
        # process count.
        raise ValueError("Number of cores must be at least 1, got %r" % (n,))

    if n <= 4:
        processes = n
    else:
        # Compute the threshold once rather than for every candidate factor.
        root = math.sqrt(n)
        # NOTE(review): relies on factors(n) including n itself so that at
        # least one candidate satisfies f >= sqrt(n) -- confirm against
        # dask.utils.factors.
        processes = min(f for f in factors(n) if f >= root)
    threads = n // processes
    return (processes, threads)
401+
402+
375403
clusters_to_close = weakref.WeakSet()
376404

377405

distributed/deploy/tests/test_local.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import pytest
1515

1616
from distributed import Client, Worker, Nanny
17-
from distributed.deploy.local import LocalCluster
17+
from distributed.deploy.local import LocalCluster, nprocesses_nthreads
1818
from distributed.metrics import time
1919
from distributed.utils_test import (inc, gen_test, slowinc,
2020
assert_cannot_connect,
@@ -529,5 +529,18 @@ def test_local_tls_restart(loop):
529529
assert workers_before != workers_after
530530

531531

532+
def test_default_process_thread_breakdown():
    # Each entry pairs a core count with the (processes, threads) splits that
    # are considered acceptable for it; some core counts allow two splits.
    acceptable_splits = [
        (1, [(1, 1)]),
        (4, [(4, 1)]),
        (5, [(5, 1)]),
        (8, [(4, 2)]),
        (12, [(6, 2), (4, 3)]),
        (20, [(5, 4)]),
        (24, [(6, 4), (8, 3)]),
        (32, [(8, 4)]),
        (40, [(8, 5), (10, 4)]),
        (80, [(10, 8), (16, 5)]),
    ]
    for ncores, allowed in acceptable_splits:
        assert nprocesses_nthreads(ncores) in allowed
543+
544+
532545
if sys.version_info >= (3, 5):
533546
from distributed.deploy.tests.py3_test_deploy import * # noqa F401

0 commit comments

Comments
 (0)