Skip to content

Provides more ByteBuf leaks fixes #803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 106 additions & 56 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import io.rsocket.util.MonoLifecycleHandler;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -260,6 +260,7 @@ public void doOnTerminal(
removeStreamReceiver(streamId);
}
});

receivers.put(streamId, receiver);

return receiver.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
Expand All @@ -281,7 +282,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {

final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);
final AtomicInteger wip = new AtomicInteger(0);

receivers.put(streamId, receiver);

Expand All @@ -293,30 +294,49 @@ private Flux<Payload> handleRequestStream(final Payload payload) {

@Override
public void accept(long n) {
if (firstRequest && !receiver.isDisposed()) {
if (firstRequest) {
firstRequest = false;
if (!payloadReleasedFlag.getAndSet(true)) {
sendProcessor.onNext(
RequestStreamFrameFlyweight.encodeReleasingPayload(
allocator, streamId, n, payload));
if (wip.getAndIncrement() != 0) {
// no need to do anything.
// stream was canceled and fist payload has already been discarded
return;
}
} else if (contains(streamId) && !receiver.isDisposed()) {
int missed = 1;
boolean firstHasBeenSent = false;
for (; ; ) {
if (!firstHasBeenSent) {
sendProcessor.onNext(
RequestStreamFrameFlyweight.encodeReleasingPayload(
allocator, streamId, n, payload));
firstHasBeenSent = true;
} else {
// if first frame was sent but we cycling again, it means that wip was
// incremented at doOnCancel
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
return;
}

missed = wip.addAndGet(-missed);
if (missed == 0) {
return;
}
}
} else if (!receiver.isDisposed()) {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}
}
})
.doOnError(
t -> {
if (contains(streamId) && !receiver.isDisposed()) {
sendProcessor.onNext(ErrorFrameFlyweight.encode(allocator, streamId, t));
}
})
.doOnCancel(
() -> {
if (!payloadReleasedFlag.getAndSet(true)) {
payload.release();
if (wip.getAndIncrement() != 0) {
return;
}
if (contains(streamId) && !receiver.isDisposed()) {

// check if we need to release payload
// only applicable if the cancel appears earlier than actual request
if (payload.refCnt() > 0) {
payload.release();
} else {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
})
Expand All @@ -330,30 +350,32 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
return Flux.error(err);
}

return request.switchOnFirst(
(s, flux) -> {
Payload payload = s.get();
if (payload != null) {
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
return Mono.error(t);
}
return handleChannel(payload, flux);
} else {
return flux;
}
},
false);
return request
.switchOnFirst(
(s, flux) -> {
Payload payload = s.get();
if (payload != null) {
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
return Mono.error(t);
}
return handleChannel(payload, flux);
} else {
return flux;
}
},
false)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
}

private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Payload> inboundFlux) {
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);
final int streamId = streamIdSupplier.nextStreamId(receivers);

final AtomicInteger wip = new AtomicInteger(0);
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {
Expand Down Expand Up @@ -421,19 +443,47 @@ protected void hookFinally(SignalType type) {
public void accept(long n) {
if (firstRequest) {
firstRequest = false;
senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);
if (!payloadReleasedFlag.getAndSet(true)) {
ByteBuf frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

sendProcessor.onNext(frame);
if (wip.getAndIncrement() != 0) {
// no need to do anything.
// stream was canceled and fist payload has already been discarded
return;
}
int missed = 1;
boolean firstHasBeenSent = false;
for (; ; ) {
if (!firstHasBeenSent) {
ByteBuf frame;
try {
frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);
} catch (IllegalReferenceCountException e) {
return;
}

senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);

sendProcessor.onNext(frame);
firstHasBeenSent = true;
} else {
// if first frame was sent but we cycling again, it means that wip was
// incremented at doOnCancel
senders.remove(streamId, upstreamSubscriber);
receivers.remove(streamId, receiver);
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
return;
}

missed = wip.addAndGet(-missed);
if (missed == 0) {
return;
}
}
} else {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
Expand All @@ -442,22 +492,22 @@ public void accept(long n) {
})
.doOnError(
t -> {
if (receivers.remove(streamId, receiver)) {
upstreamSubscriber.cancel();
}
upstreamSubscriber.cancel();
receivers.remove(streamId, receiver);
})
.doOnComplete(() -> receivers.remove(streamId, receiver))
.doOnCancel(
() -> {
if (!payloadReleasedFlag.getAndSet(true)) {
initialPayload.release();
upstreamSubscriber.cancel();
if (wip.getAndIncrement() != 0) {
return;
}

// need to send frame only if RequestChannelFrame was sent
if (receivers.remove(streamId, receiver)) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
upstreamSubscriber.cancel();
}
})
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
});
}

private Mono<Void> handleMetadataPush(Payload payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,34 @@ static ByteBuf encode(
boolean hasMetadata,
ByteBuf data) {

final boolean addData = data != null && data.isReadable();
final boolean addMetadata = hasMetadata && metadata.isReadable();
final boolean addData;
if (data != null) {
if (data.isReadable()) {
addData = true;
} else {
// even though there is nothing to read, we still have to release here since nobody else
// going to do soo
data.release();
addData = false;
}
} else {
addData = false;
}

final boolean addMetadata;
if (hasMetadata) {
if (metadata.isReadable()) {
addMetadata = true;
} else {
// even though there is nothing to read, we still have to release here since nobody else
// going to do soo
metadata.release();
addMetadata = false;
}
} else {
// has no metadata means it is null, thus no need to release anything
addMetadata = false;
}

if (hasMetadata) {
int length = metadata.readableBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public static ByteBuf encode(
@Nullable final ByteBuf metadata) {

final boolean hasMetadata = metadata != null;
final boolean addMetadata = hasMetadata && metadata.isReadable();

int flags = 0;

Expand All @@ -27,6 +26,21 @@ public static ByteBuf encode(
.writeInt(ttl)
.writeInt(numRequests);

final boolean addMetadata;
if (hasMetadata) {
if (metadata.isReadable()) {
addMetadata = true;
} else {
// even though there is nothing to read, we still have to release here since nobody else
// going to do soo
metadata.release();
addMetadata = false;
}
} else {
// has no metadata means it is null, thus no need to release anything
addMetadata = false;
}

if (addMetadata) {
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.rsocket.Payload;

public class MetadataPushFrameFlyweight {

public static ByteBuf encodeReleasingPayload(ByteBufAllocator allocator, Payload payload) {
final ByteBuf metadata = payload.metadata().retain();
payload.release();
// releasing payload safely since it can be already released wheres we have to release retained
// data and metadata as well
try {
payload.release();
} catch (IllegalReferenceCountException e) {
metadata.release();
throw e;
}
return encode(allocator, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.rsocket.Payload;

public class PayloadFrameFlyweight {
Expand All @@ -23,11 +24,31 @@ public static ByteBuf encodeNextCompleteReleasingPayload(
static ByteBuf encodeReleasingPayload(
ByteBufAllocator allocator, int streamId, boolean complete, Payload payload) {

final boolean hasMetadata = payload.hasMetadata();
// if refCnt exceptions throws here it is safe to do no-op
boolean hasMetadata = payload.hasMetadata();
// if refCnt exceptions throws here it is safe to do no-op still
final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null;
final ByteBuf data = payload.data().retain();

payload.release();
final ByteBuf data;
// retaining data safely. May throw either NPE or RefCntE
try {
data = payload.data().retain();
} catch (IllegalReferenceCountException | NullPointerException e) {
if (hasMetadata) {
metadata.release();
}
throw e;
}
// releasing payload safely since it can be already released wheres we have to release retained
// data and metadata as well
try {
payload.release();
} catch (IllegalReferenceCountException e) {
data.release();
if (hasMetadata) {
metadata.release();
}
throw e;
}

return encode(allocator, streamId, false, complete, true, metadata, data);
}
Expand Down
Loading