Skip to content

Commit c8bff00

Browse files
authored
Subscription: retain tsfile events in tsfile batch to avoid premature commit (#15598) (#15601)
Consider the historical data export snapshot scenario: 1. The events delivered upstream are, in order, the tsfile event and the termination event. 2. The tsfile event is parsed into multiple tablet events, and then the reference count of the tsfile event is set to 0 (should report as false). 3. Assuming that for some reason the tablet events were not sent to the peer in time, the reference count of the transfer termination event is set to 0 (should report as true). 4. At this point, because the tablet events were not enriched with a commit id (see Subscription: fully managed tsfile parsing process for tsfile format topic #15524), the termination event successfully marks the corresponding DR complete, which in turn leads to data loss.
1 parent c92b625 commit c8bff00

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,22 @@ public SubscriptionPipeTsFileEventBatch(
5757
@Override
5858
public synchronized void ack() {
5959
batch.decreaseEventsReferenceCount(this.getClass().getName(), true);
60+
enrichedEvents.stream()
61+
// only decrease reference count for tsfile event, since we already decrease reference count
62+
// for tablet event in batch
63+
.filter(event -> event instanceof PipeTsFileInsertionEvent)
64+
.forEach(event -> event.decreaseReferenceCount(this.getClass().getName(), true));
6065
}
6166

6267
@Override
6368
public synchronized void cleanUp(final boolean force) {
6469
// close batch, it includes clearing the reference count of events
6570
batch.close();
71+
72+
// clear the reference count of events
73+
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
74+
enrichedEvent.clearReferenceCount(this.getClass().getName());
75+
}
6676
enrichedEvents.clear();
6777
}
6878

@@ -102,7 +112,6 @@ protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
102112
} finally {
103113
try {
104114
event.close();
105-
((PipeTsFileInsertionEvent) event).decreaseReferenceCount(this.getClass().getName(), false);
106115
} catch (final Exception ignored) {
107116
// no exceptions will be thrown
108117
}

0 commit comments

Comments
 (0)