Skip to content

Commit 755e47c

Browse files
committed
Introduce CHANGE_POINT ... BY ... syntax
1 parent 696716c commit 755e47c

File tree

36 files changed

+3058
-1992
lines changed

36 files changed

+3058
-1992
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/TopNBenchmark.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ private static Operator operator(String data, int topCount) {
125125
topCount,
126126
elementTypes,
127127
encoders,
128+
List.of(),
128129
IntStream.range(0, count).mapToObj(c -> new TopNOperator.SortOrder(c, false, false)).toList(),
129130
16 * 1024
130131
);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.IOException;
1717
import java.util.Arrays;
1818
import java.util.Objects;
19+
import java.util.stream.IntStream;
1920

2021
/**
2122
* A page is a column-oriented data abstraction that allows data to be passed between operators in
@@ -311,4 +312,9 @@ public Page filter(int... positions) {
311312
}
312313
return new Page(filteredBlocks);
313314
}
315+
316+
public Page subPage(int fromIndex, int toIndex) {
317+
// TODO: optimize!
318+
return filter(IntStream.range(fromIndex, toIndex).toArray());
319+
}
314320
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ChangePointOperator.java

Lines changed: 147 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import java.util.ArrayDeque;
2323
import java.util.ArrayList;
2424
import java.util.Deque;
25+
import java.util.Iterator;
2526
import java.util.List;
27+
import java.util.Objects;
28+
import java.util.stream.Collectors;
2629

2730
/**
2831
* Find spikes, dips and change point in a list of values.
@@ -35,20 +38,21 @@ public class ChangePointOperator implements Operator {
3538

3639
public static final int INPUT_VALUE_COUNT_LIMIT = 1000;
3740

38-
public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
41+
public record Factory(int metricChannel, List<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
3942
@Override
4043
public Operator get(DriverContext driverContext) {
41-
return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
44+
return new ChangePointOperator(driverContext, metricChannel, partitionChannel, sourceText, sourceLine, sourceColumn);
4245
}
4346

4447
@Override
4548
public String describe() {
46-
return "ChangePointOperator[channel=" + channel + "]";
49+
return ChangePointOperator.describe(metricChannel, partitionChannel);
4750
}
4851
}
4952

5053
private final DriverContext driverContext;
51-
private final int channel;
54+
private final int metricChannel;
55+
private final List<Integer> partitionChannel;
5256
private final String sourceText;
5357
private final int sourceLine;
5458
private final int sourceColumn;
@@ -60,9 +64,10 @@ public String describe() {
6064

6165
// TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
6266
// (by modularizing esql-core) and use that instead of the individual fields.
63-
public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
67+
public ChangePointOperator(DriverContext driverContext, int metricChannel, List<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn) {
6468
this.driverContext = driverContext;
65-
this.channel = channel;
69+
this.metricChannel = metricChannel;
70+
this.partitionChannel = partitionChannel;
6671
this.sourceText = sourceText;
6772
this.sourceLine = sourceLine;
6873
this.sourceColumn = sourceColumn;
@@ -105,61 +110,140 @@ public Page getOutput() {
105110
}
106111

107112
private void createOutputPages() {
108-
int valuesCount = 0;
109-
for (Page page : inputPages) {
110-
valuesCount += page.getPositionCount();
111-
}
112-
boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT;
113-
if (tooManyValues) {
114-
valuesCount = INPUT_VALUE_COUNT_LIMIT;
113+
int maxValuesCount = 0;
114+
{
115+
int valuesCount = 0;
116+
String lastPartitionFieldValue = null;
117+
for (Page inputPage : inputPages) {
118+
String currentPartitionFieldValue = getCurrentPartitionKey(inputPage, 0);
119+
if (lastPartitionFieldValue != null) {
120+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
121+
valuesCount = 0;
122+
}
123+
}
124+
lastPartitionFieldValue = currentPartitionFieldValue;
125+
valuesCount += inputPage.getPositionCount();
126+
maxValuesCount = Math.max(maxValuesCount, valuesCount);
127+
}
115128
}
129+
boolean tooManyValues = maxValuesCount > INPUT_VALUE_COUNT_LIMIT;
116130

117-
List<Double> values = new ArrayList<>(valuesCount);
118-
List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
119-
int valuesIndex = 0;
131+
132+
List<MlAggsHelper.DoubleBucketValues> bucketValuesPerPartition = new ArrayList<>();
120133
boolean hasNulls = false;
121134
boolean hasMultivalued = false;
122-
for (Page inputPage : inputPages) {
123-
Block inputBlock = inputPage.getBlock(channel);
124-
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
125-
Object value = BlockUtils.toJavaObject(inputBlock, i);
126-
if (value == null) {
127-
hasNulls = true;
128-
valuesIndex++;
129-
} else if (value instanceof List<?>) {
130-
hasMultivalued = true;
131-
valuesIndex++;
132-
} else {
133-
values.add(((Number) value).doubleValue());
134-
bucketIndexes.add(valuesIndex++);
135+
{
136+
List<Double> values = new ArrayList<>(maxValuesCount);
137+
List<Integer> bucketIndexes = new ArrayList<>(maxValuesCount);
138+
int valuesIndex = 0;
139+
String lastPartitionFieldValue = null;
140+
for (Page inputPage : inputPages) {
141+
String currentPartitionFieldValue = getCurrentPartitionKey(inputPage, 0);
142+
if (lastPartitionFieldValue != null) {
143+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
144+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
145+
null,
146+
values.stream().mapToDouble(Double::doubleValue).toArray(),
147+
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
148+
);
149+
bucketValuesPerPartition.add(bucketValues);
150+
151+
values = new ArrayList<>(maxValuesCount);
152+
bucketIndexes = new ArrayList<>(maxValuesCount);
153+
valuesIndex = 0;
154+
}
155+
}
156+
lastPartitionFieldValue = currentPartitionFieldValue;
157+
Block inputBlock = inputPage.getBlock(metricChannel);
158+
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < maxValuesCount; i++, valuesIndex++) {
159+
Object value = BlockUtils.toJavaObject(inputBlock, i);
160+
if (value == null) {
161+
hasNulls = true;
162+
} else if (value instanceof List<?>) {
163+
hasMultivalued = true;
164+
} else {
165+
values.add(((Number) value).doubleValue());
166+
bucketIndexes.add(valuesIndex);
167+
}
168+
}
169+
}
170+
// Handle last partition separately
171+
// if (lastPartitionFieldValue != null) {
172+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
173+
null,
174+
values.stream().mapToDouble(Double::doubleValue).toArray(),
175+
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
176+
);
177+
bucketValuesPerPartition.add(bucketValues);
178+
// }
179+
}
180+
181+
List<ChangeType> changeTypes = new ArrayList<>();
182+
{
183+
for (MlAggsHelper.DoubleBucketValues bucketValues : bucketValuesPerPartition) {
184+
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
185+
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
186+
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
135187
}
188+
changeTypes.add(changeType);
136189
}
137190
}
138191

139-
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
140-
null,
141-
values.stream().mapToDouble(Double::doubleValue).toArray(),
142-
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
143-
);
144-
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
145-
int changePointIndex = changeType.changePoint();
192+
insertChangePoints(changeTypes);
193+
194+
if (tooManyValues) {
195+
warnings(true).registerException(
196+
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
197+
);
198+
}
199+
if (hasNulls) {
200+
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
201+
}
202+
if (hasMultivalued) {
203+
warnings(true).registerException(
204+
new IllegalArgumentException(
205+
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206+
)
207+
);
208+
}
209+
}
146210

211+
private void insertChangePoints(Iterable<ChangeType> changeTypes) {
212+
Iterator<ChangeType> changeTypesIterator = changeTypes.iterator();
213+
ChangeType changeType = null;
214+
if (changeTypesIterator.hasNext()) {
215+
changeType = changeTypesIterator.next();
216+
}
147217
BlockFactory blockFactory = driverContext.blockFactory();
148218
int pageStartIndex = 0;
219+
String lastPartitionFieldValue = null;
149220
while (inputPages.isEmpty() == false) {
150221
Page inputPage = inputPages.peek();
151222
Page outputPage;
152223
Block changeTypeBlock = null;
153224
Block changePvalueBlock = null;
154225
boolean success = false;
226+
227+
String currentPartitionFieldValue = getCurrentPartitionKey(inputPage, 0);
228+
if (lastPartitionFieldValue != null) {
229+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
230+
pageStartIndex = 0;
231+
if (changeTypesIterator.hasNext()) {
232+
changeType = changeTypesIterator.next();
233+
}
234+
}
235+
}
236+
lastPartitionFieldValue = currentPartitionFieldValue;
237+
155238
try {
156-
if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
239+
// TODO: How to handle case when there are no change points
240+
if (changeType != null && pageStartIndex <= changeType.changePoint() && changeType.changePoint() < pageStartIndex + inputPage.getPositionCount()) {
157241
try (
158242
BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
159243
DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount())
160244
) {
161245
for (int i = 0; i < inputPage.getPositionCount(); i++) {
162-
if (pageStartIndex + i == changePointIndex) {
246+
if (pageStartIndex + i == changeType.changePoint()) {
163247
changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
164248
pvalueBlockBuilder.appendDouble(changeType.pValue());
165249
} else {
@@ -174,8 +258,10 @@ private void createOutputPages() {
174258
changeTypeBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
175259
changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
176260
}
177-
178-
outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
261+
outputPage = inputPage.appendBlocks(new Block[]{changeTypeBlock, changePvalueBlock});
262+
if (pageStartIndex + inputPage.getPositionCount() > INPUT_VALUE_COUNT_LIMIT) {
263+
outputPage = outputPage.subPage(0, INPUT_VALUE_COUNT_LIMIT - pageStartIndex);
264+
}
179265
success = true;
180266
} finally {
181267
if (success == false) {
@@ -187,25 +273,21 @@ private void createOutputPages() {
187273
outputPages.add(outputPage);
188274
pageStartIndex += inputPage.getPositionCount();
189275
}
276+
}
190277

191-
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
192-
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
193-
}
194-
if (tooManyValues) {
195-
warnings(true).registerException(
196-
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
197-
);
198-
}
199-
if (hasNulls) {
200-
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
278+
private String getCurrentPartitionKey(Page page, int i) {
279+
if (partitionChannel.isEmpty()) {
280+
return "-default-partition-";
201281
}
202-
if (hasMultivalued) {
203-
warnings(true).registerException(
204-
new IllegalArgumentException(
205-
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206-
)
207-
);
282+
assert page.getPositionCount() > 0;
283+
StringBuilder builder = new StringBuilder();
284+
for (Integer partitionChannel : partitionChannel) {
285+
try (var block = page.getBlock(partitionChannel).filter(i)) {
286+
BytesRef partitionFieldValue = ((BytesRefBlock) block).getBytesRef(i, new BytesRef());
287+
builder.append(partitionFieldValue.utf8ToString());
288+
}
208289
}
290+
return builder.toString();
209291
}
210292

211293
@Override
@@ -220,7 +302,15 @@ public void close() {
220302

221303
@Override
222304
public String toString() {
223-
return "ChangePointOperator[channel=" + channel + "]";
305+
return describe(metricChannel, partitionChannel);
306+
}
307+
308+
private static String describe(int metricChannel, List<Integer> partitionChannel) {
309+
return "ChangePointOperator[metricChannel="
310+
+ metricChannel
311+
+ ", partitionChannels="
312+
+ partitionChannel.stream().map(c -> c.toString()).collect(Collectors.joining(",", "[", "]"))
313+
+ "]";
224314
}
225315

226316
private Warnings warnings(boolean onlyWarnings) {

0 commit comments

Comments
 (0)