Make Mono.cache() forget its source when it terminates #3598
Comments
@yamass how about |
@OlegDokuka: Doesn't help.
@Test
void testCache() {
    Mono<Integer> mono = Mono.empty();
    for (int i = 0; i < 1_000_000; i++) {
        mono = mono.then(Mono.just(i))
                .onTerminateDetach()
                .cache();
        mono.block();
    }
    System.out.println("Done");
}
Still runs into an OutOfMemoryError (with -Xmx100m). |
I fail to see how this is a reactor-core library problem. You are essentially building a singly linked list-like data structure while only being interested in its head, which you access directly via a pointer. In my view, this is a conscious memory leak on the programmer's part. Why don't you do:
@Test
public void name() {
    Mono<Integer> mono = Mono.empty();
    for (int i = 0; i < 5_000_000; i++) {
        final Sinks.One<Integer> sink = Sinks.one();
        final Mono<Integer> hotMono = (i % 2 == 0) ? Mono.empty() : Mono.just(i);
        mono.then(hotMono).subscribe(sink::tryEmitValue, sink::tryEmitError, sink::tryEmitEmpty);
        mono = sink.asMono();
        System.out.println(mono.block());
    }
    System.out.println("Done");
}
Or:
@Test
public void name() {
    Mono<Integer> mono = Mono.empty();
    for (int i = 0; i < 5_000_000; i++) {
        final Sinks.One<Integer> sink = Sinks.one();
        final Mono<Integer> hotMono = (i % 2 == 0) ? Mono.empty() : Mono.just(i);
        mono.then(hotMono).subscribe(new HandcraftedSubscriber<>(sink));
        mono = sink.asMono();
        System.out.println(mono.block());
    }
    System.out.println("Done");
}

private static class HandcraftedSubscriber<T> extends BaseSubscriber<T> {
    final Sinks.One<T> sink;

    HandcraftedSubscriber(Sinks.One<T> sink) {
        this.sink = sink;
    }

    @Override
    protected void hookOnNext(T value) {
        sink.tryEmitValue(value);
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        sink.tryEmitError(throwable);
    }

    @Override
    protected void hookOnComplete() {
        sink.tryEmitEmpty();
    }
}
|
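To make the linked-list point from this comment concrete, here is a minimal plain-Java sketch. It is not Reactor code: `Stage`, `buildChain` and `reachableStages` are hypothetical names made up for illustration, and `Stage` merely stands in for an operator that keeps a reference to its upstream. The only thing it shows is that holding the head of such a chain keeps every earlier stage reachable, so nothing can be garbage collected.

```java
// Hypothetical model of an operator chain; NOT Reactor's implementation.
class RetainingChainDemo {

    // Stands in for an operator that never clears its upstream reference.
    static final class Stage {
        final Stage source; // upstream stage, kept for the lifetime of this stage
        final int value;

        Stage(Stage source, int value) {
            this.source = source;
            this.value = value;
        }
    }

    // Mirrors the loop from the thread: the variable is replaced on every
    // iteration, but each new stage still points at the previous one.
    static Stage buildChain(int iterations) {
        Stage head = null;
        for (int i = 0; i < iterations; i++) {
            head = new Stage(head, i);
        }
        return head;
    }

    // Counts how many stages are still reachable from the given head.
    static int reachableStages(Stage head) {
        int count = 0;
        for (Stage s = head; s != null; s = s.source) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Stage head = buildChain(1_000);
        System.out.println(reachableStages(head)); // prints 1000
    }
}
```

The reachable count grows linearly with the number of iterations, which is the behavior both sides of the discussion agree on. The disagreement is only about whether a terminated stage should keep that `source` reference.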
Yes, I am essentially building up a linked list. Didn't I essentially say that myself? The reason I regard this as problematic behavior of the library is that it is unnecessary. The library could (and in my opinion should) cap references to previous (terminated) monos, especially in the case of cache. I clearly described why we need this. I also clearly described how the cache() operator could cap its reference to its upstream source, which is what I would have expected in the first place. And finally, I also made clear that the It's all about being able to express solutions to problems in a way that is idiomatic to reactor. Your workarounds look anything but idiomatic, imho. By the way, we have obviously also come up with a workaround (since we needed to), but it is not very idiomatic either. |
You are correct. It's probably because it would never occur to me that modifying reactive chain of |
@kimec The answer is no. @OlegDokuka Please, could you look into why it does not work with |
In our application, we use a chain of Monos that represent the fact that previous work has been completed. So every time some new work is done, the "previous work completed" Mono is replaced by a new one using then().cache().
However, doing this often enough will result in an OutOfMemoryError.
Example (please run with -Xmx100m):
This is due to the fact that MonoCacheTime does not unset the reference to its source after it emits a terminal signal. Therefore, the whole chain of operators is kept in memory.
Note that chaining using map() instead of then() might also make sense for similar cases, when the new value depends on the old one.
Expected Behavior
MonoCacheTime should unset its reference to its source as soon as its source emits a terminal signal.
Actual Behavior
See example above.
Possible Solution
Make MonoCacheTime unset its reference to its source as soon as its source emits a terminal signal.
Your Environment
reactor-core-3.4.24
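The idea behind the possible solution can be sketched outside Reactor with a plain-Java memoizing wrapper. This is only an illustration under stated assumptions, not how MonoCacheTime is or should be implemented: `ForgettingCache` and `holdsSource` are hypothetical names, and a `Supplier` stands in for the upstream source. Once the result is known, the wrapper nulls out its source, so earlier links in the chain are no longer reachable from the newest one.

```java
import java.util.function.Supplier;

// Hypothetical illustration of "forget the source on termination";
// NOT Reactor's MonoCacheTime.
class ForgettingCacheDemo {

    static final class ForgettingCache<T> implements Supplier<T> {
        private Supplier<T> source; // cleared once the result is known
        private T value;            // cached result (may be null, like an empty Mono)
        private boolean done;

        ForgettingCache(Supplier<T> source) {
            this.source = source;
        }

        @Override
        public synchronized T get() {
            if (!done) {
                value = source.get();
                done = true;
                source = null; // drop the upstream so it can be garbage collected
            }
            return value;
        }

        // Exposed only so the demo can observe whether the upstream was released.
        synchronized boolean holdsSource() {
            return source != null;
        }
    }

    public static void main(String[] args) {
        ForgettingCache<Integer> cache = new ForgettingCache<>(() -> 0);
        for (int i = 1; i <= 1_000; i++) {
            final ForgettingCache<Integer> previous = cache;
            final int next = i;
            // Analogous to then(...).cache(): wait for the previous result, then produce the next.
            cache = new ForgettingCache<>(() -> {
                previous.get();
                return next;
            });
            cache.get(); // analogous to block(): resolves and releases `previous`
        }
        System.out.println(cache.get() + " " + cache.holdsSource()); // prints "1000 false"
    }
}
```

Because each wrapper releases the lambda that captured `previous`, only the newest wrapper and its cached value stay reachable after every iteration, regardless of how many iterations have run.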