Skip to content

Introduce CHANGE_POINT ... BY ... syntax #128177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ private static Operator operator(String data, int topCount) {
topCount,
elementTypes,
encoders,
List.of(),
IntStream.range(0, count).mapToObj(c -> new TopNOperator.SortOrder(c, false, false)).toList(),
16 * 1024
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.IntStream;

/**
* A page is a column-oriented data abstraction that allows data to be passed between operators in
Expand Down Expand Up @@ -311,4 +312,9 @@ public Page filter(int... positions) {
}
return new Page(filteredBlocks);
}

public Page subPage(int fromIndex, int toIndex) {
// TODO: optimize!
return filter(IntStream.range(fromIndex, toIndex).toArray());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Find spikes, dips and change point in a list of values.
Expand All @@ -35,20 +38,23 @@ public class ChangePointOperator implements Operator {

public static final int INPUT_VALUE_COUNT_LIMIT = 1000;

public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
public record Factory(int metricChannel, List<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn)
implements
OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
return new ChangePointOperator(driverContext, metricChannel, partitionChannel, sourceText, sourceLine, sourceColumn);
}

@Override
public String describe() {
return "ChangePointOperator[channel=" + channel + "]";
return ChangePointOperator.describe(metricChannel, partitionChannel);
}
}

private final DriverContext driverContext;
private final int channel;
private final int metricChannel;
private final List<Integer> partitionChannel;
private final String sourceText;
private final int sourceLine;
private final int sourceColumn;
Expand All @@ -60,9 +66,17 @@ public String describe() {

// TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
// (by modularizing esql-core) and use that instead of the individual fields.
public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
public ChangePointOperator(
DriverContext driverContext,
int metricChannel,
List<Integer> partitionChannel,
String sourceText,
int sourceLine,
int sourceColumn
) {
this.driverContext = driverContext;
this.channel = channel;
this.metricChannel = metricChannel;
this.partitionChannel = partitionChannel;
this.sourceText = sourceText;
this.sourceLine = sourceLine;
this.sourceColumn = sourceColumn;
Expand Down Expand Up @@ -105,61 +119,141 @@ public Page getOutput() {
}

private void createOutputPages() {
int valuesCount = 0;
for (Page page : inputPages) {
valuesCount += page.getPositionCount();
}
boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT;
if (tooManyValues) {
valuesCount = INPUT_VALUE_COUNT_LIMIT;
int maxValuesCount = 0;
{
int valuesCount = 0;
String lastPartitionFieldValue = null;
for (Page inputPage : inputPages) {
String currentPartitionFieldValue = getPartitionKey(inputPage, 0);
if (lastPartitionFieldValue != null) {
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
valuesCount = 0;
}
}
lastPartitionFieldValue = currentPartitionFieldValue;
valuesCount += inputPage.getPositionCount();
maxValuesCount = Math.max(maxValuesCount, valuesCount);
}
}
boolean tooManyValues = maxValuesCount > INPUT_VALUE_COUNT_LIMIT;

List<Double> values = new ArrayList<>(valuesCount);
List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
int valuesIndex = 0;
List<MlAggsHelper.DoubleBucketValues> bucketValuesPerPartition = new ArrayList<>();
boolean hasNulls = false;
boolean hasMultivalued = false;
for (Page inputPage : inputPages) {
Block inputBlock = inputPage.getBlock(channel);
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
Object value = BlockUtils.toJavaObject(inputBlock, i);
if (value == null) {
hasNulls = true;
valuesIndex++;
} else if (value instanceof List<?>) {
hasMultivalued = true;
valuesIndex++;
} else {
values.add(((Number) value).doubleValue());
bucketIndexes.add(valuesIndex++);
{
List<Double> values = new ArrayList<>(maxValuesCount);
List<Integer> bucketIndexes = new ArrayList<>(maxValuesCount);
int valuesIndex = 0;
String lastPartitionFieldValue = null;
for (Page inputPage : inputPages) {
String currentPartitionFieldValue = getPartitionKey(inputPage, 0);
if (lastPartitionFieldValue != null) {
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
null,
values.stream().mapToDouble(Double::doubleValue).toArray(),
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
);
bucketValuesPerPartition.add(bucketValues);

values = new ArrayList<>(maxValuesCount);
bucketIndexes = new ArrayList<>(maxValuesCount);
valuesIndex = 0;
}
}
lastPartitionFieldValue = currentPartitionFieldValue;
Block inputBlock = inputPage.getBlock(metricChannel);
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < maxValuesCount; i++, valuesIndex++) {
Object value = BlockUtils.toJavaObject(inputBlock, i);
if (value == null) {
hasNulls = true;
} else if (value instanceof List<?>) {
hasMultivalued = true;
} else {
values.add(((Number) value).doubleValue());
bucketIndexes.add(valuesIndex);
}
}
}
// Handle last partition separately
// if (lastPartitionFieldValue != null) {
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
null,
values.stream().mapToDouble(Double::doubleValue).toArray(),
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
);
bucketValuesPerPartition.add(bucketValues);
// }
}

List<ChangeType> changeTypes = new ArrayList<>();
{
for (MlAggsHelper.DoubleBucketValues bucketValues : bucketValuesPerPartition) {
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
}
changeTypes.add(changeType);
}
}

MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
null,
values.stream().mapToDouble(Double::doubleValue).toArray(),
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
);
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
int changePointIndex = changeType.changePoint();
insertChangePoints(changeTypes);

if (tooManyValues) {
warnings(true).registerException(
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
);
}
if (hasNulls) {
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
}
if (hasMultivalued) {
warnings(true).registerException(
new IllegalArgumentException(
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
)
);
}
}

private void insertChangePoints(Iterable<ChangeType> changeTypes) {
Iterator<ChangeType> changeTypesIterator = changeTypes.iterator();
ChangeType changeType = null;
if (changeTypesIterator.hasNext()) {
changeType = changeTypesIterator.next();
}
BlockFactory blockFactory = driverContext.blockFactory();
int pageStartIndex = 0;
String lastPartitionFieldValue = null;
while (inputPages.isEmpty() == false) {
Page inputPage = inputPages.peek();
Page outputPage;
Block changeTypeBlock = null;
Block changePvalueBlock = null;
boolean success = false;

String currentPartitionFieldValue = getPartitionKey(inputPage, 0);
if (lastPartitionFieldValue != null) {
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
pageStartIndex = 0;
if (changeTypesIterator.hasNext()) {
changeType = changeTypesIterator.next();
}
}
}
lastPartitionFieldValue = currentPartitionFieldValue;

try {
if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
// TODO: How to handle case when there are no change points
if (changeType != null
&& pageStartIndex <= changeType.changePoint()
&& changeType.changePoint() < pageStartIndex + inputPage.getPositionCount()) {
try (
BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount())
) {
for (int i = 0; i < inputPage.getPositionCount(); i++) {
if (pageStartIndex + i == changePointIndex) {
if (pageStartIndex + i == changeType.changePoint()) {
changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
pvalueBlockBuilder.appendDouble(changeType.pValue());
} else {
Expand All @@ -174,8 +268,10 @@ private void createOutputPages() {
changeTypeBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
}

outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
if (pageStartIndex + inputPage.getPositionCount() > INPUT_VALUE_COUNT_LIMIT) {
outputPage = outputPage.subPage(0, INPUT_VALUE_COUNT_LIMIT - pageStartIndex);
}
success = true;
} finally {
if (success == false) {
Expand All @@ -187,25 +283,28 @@ private void createOutputPages() {
outputPages.add(outputPage);
pageStartIndex += inputPage.getPositionCount();
}
}

if (changeType instanceof ChangeType.Indeterminable indeterminable) {
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
/**
* Calculates the partition key of the i-th row of the given page.
*
* @param page page for which the partition key should be calculated
* @param i row index
* @return partition key of the i-th row of the given page
*/
private String getPartitionKey(Page page, int i) {
if (partitionChannel.isEmpty()) {
return "";
}
if (tooManyValues) {
warnings(true).registerException(
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
);
}
if (hasNulls) {
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
}
if (hasMultivalued) {
warnings(true).registerException(
new IllegalArgumentException(
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
)
);
assert page.getPositionCount() > 0;
StringBuilder builder = new StringBuilder();
for (Integer partitionChannel : partitionChannel) {
try (var block = page.getBlock(partitionChannel).filter(i)) {
BytesRef partitionFieldValue = ((BytesRefBlock) block).getBytesRef(i, new BytesRef());
builder.append(partitionFieldValue.utf8ToString());
}
}
return builder.toString();
}

@Override
Expand All @@ -220,7 +319,15 @@ public void close() {

@Override
public String toString() {
return "ChangePointOperator[channel=" + channel + "]";
return describe(metricChannel, partitionChannel);
}

private static String describe(int metricChannel, List<Integer> partitionChannel) {
return "ChangePointOperator[metricChannel="
+ metricChannel
+ ", partitionChannels="
+ partitionChannel.stream().map(c -> c.toString()).collect(Collectors.joining(",", "[", "]"))
+ "]";
}

private Warnings warnings(boolean onlyWarnings) {
Expand Down
Loading