diff --git a/docs/changelog/129245.yaml b/docs/changelog/129245.yaml new file mode 100644 index 0000000000000..1a05e4340b4b3 --- /dev/null +++ b/docs/changelog/129245.yaml @@ -0,0 +1,5 @@ +pr: 129245 +summary: Throttle indexing when disk IO throttling is disabled +area: Engine +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 78a9695bea540..898fe078ef40c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -244,8 +244,8 @@ private void checkMergeTaskThrottling() { // both currently running and enqueued merge tasks are considered "active" for throttling purposes int activeMerges = (int) (submittedMergesCount - doneMergesCount); if (activeMerges > configuredMaxMergeCount - // only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load - && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() + // only throttle indexing if disk IO is un-throttled (if enabled), and we still can't keep up with the merge load + && (config.isAutoThrottle() == false || threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec()) && shouldThrottleIncomingMerges.get() == false) { // maybe enable merge task throttling synchronized (shouldThrottleIncomingMerges) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 156dcf581ec9c..92148a661ceab 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -196,7 +196,15 @@ public void testSimpleMergeTaskReEnqueueingBySize() { } } - public void testIndexingThrottlingWhenSubmittingMerges() { + public void testIndexingThrottlingWhenSubmittingMergesWithDiskIOThrottlingEnabled() { + testIndexingThrottlingWhenSubmittingMerges(true); + } + + public void testIndexingThrottlingWhenSubmittingMergesWithDiskIOThrottlingDisabled() { + testIndexingThrottlingWhenSubmittingMerges(false); + } + + private void testIndexingThrottlingWhenSubmittingMerges(boolean withDiskIOThrottlingEnabled) { final int maxThreadCount = randomIntBetween(1, 5); // settings validation requires maxMergeCount >= maxThreadCount final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5); @@ -209,6 +217,7 @@ public void testIndexingThrottlingWhenSubmittingMerges() { Settings mergeSchedulerSettings = Settings.builder() .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount) .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount) + .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), withDiskIOThrottlingEnabled) .build(); TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), @@ -224,12 +233,12 @@ public void testIndexingThrottlingWhenSubmittingMerges() { while (submittedMerges < mergesToSubmit - 1) { isUsingMaxTargetIORate.set(randomBoolean()); if (submittedMergeTasks.isEmpty() == false && randomBoolean()) { - // maybe schedule one submitted merge + // maybe schedule one of the submitted merges (but still it's not run) MergeTask mergeTask = randomFrom(submittedMergeTasks); submittedMergeTasks.remove(mergeTask); mergeTask.schedule(); } else { - // submit one merge + // submit one new merge MergeSource mergeSource = mock(MergeSource.class); OneMerge oneMerge = mock(OneMerge.class); when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); @@ -237,7 +246,7 @@ public void testIndexingThrottlingWhenSubmittingMerges() { when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); submittedMerges++; - if (isUsingMaxTargetIORate.get() && submittedMerges > maxMergeCount) { + if ((isUsingMaxTargetIORate.get() || withDiskIOThrottlingEnabled == false) && submittedMerges > maxMergeCount) { expectIndexThrottling = true; } else if (submittedMerges <= maxMergeCount) { expectIndexThrottling = false; @@ -246,15 +255,20 @@ public void testIndexingThrottlingWhenSubmittingMerges() { // assert IO throttle state assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling)); } - // submit one last merge when IO throttling is at max value - isUsingMaxTargetIORate.set(true); + if (withDiskIOThrottlingEnabled) { + // submit one last merge when IO throttling is at max value + isUsingMaxTargetIORate.set(true); + } else { + // but if disk IO throttling is not enabled, indexing throttling should still be triggered + isUsingMaxTargetIORate.set(randomBoolean()); + } MergeSource mergeSource = mock(MergeSource.class); OneMerge oneMerge = mock(OneMerge.class); when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); - // assert index throttling because IO throttling is at max value + // assert indexing throttling state because IO throttling is at max value OR disk IO throttling is disabled assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(true)); }