From 7ec5344ad3760211570cd57edb052768e54cf289 Mon Sep 17 00:00:00 2001 From: Joe Pemberton Date: Wed, 1 Oct 2025 13:47:36 -0700 Subject: [PATCH 1/4] Fix ObservableCache GC issue `ObservableCache` retains strong references to every event issued by its upstream source via its `head` field. These references remain reachable for the entire duration of the subscription, even if application code does not retain its own reference to the `ObservableCache` instance. This occurs because `ObservableCache` is currently the object that `implements Observer` and is subscribed to the upstream `source`, which means that the source _must_ retain a reference to the `ObservableCache` until the subscription is either disposed of or reaches a terminal state. As a result, every cached `T` value remains reachable for the duration of the subscription to `source`, even if the application no longer retains the ability to issue new subscriptions to the `ObservableCache` instance. This change fixes this issue by: 1. Splitting the `Observable` and `Observer` surfaces of `ObservableCache` into two distinct objects (`ObservableCache` which extends `Observable`, and `ObservableCache.Multicaster` which implements `Observer`) 2. Having only `ObservableCache` retain a reference to the `head` 3. Having only `ObservableCache.Multicaster` retain a reference to the current `tail` 4. Setting `Multicaster.tail` to `null` upon receipt of the upstream's terminal event With this change, when `ObservableCache` goes out of scope, the only remaining references to the nodes of the linked list are `Multicaster.tail` and `CacheDisposable.node`. As subscribed `CacheDisposable` instances advance through the linked list, its nodes progressively become reclaimable, and eventually become fully reclaimable once the terminal event has been issued to all subscribers. --- .../operators/observable/ObservableCache.java | 436 +++++++++--------- .../observable/ObservableCacheTest.java | 59 +++ .../rxjava3/testsupport/Reclaimable.java | 206 +++++++++ 3 files changed, 494 insertions(+), 207 deletions(-) create mode 100644 src/test/java/io/reactivex/rxjava3/testsupport/Reclaimable.java diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java index daa9edd533..115be47d39 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java @@ -24,23 +24,7 @@ * * @param the source element type */ -public final class ObservableCache extends AbstractObservableWithUpstream -implements Observer { - - /** - * The subscription to the source should happen at most once. - */ - final AtomicBoolean once; - - /** - * The number of items per cached nodes. - */ - final int capacityHint; - - /** - * The current known array of observer state to notify. - */ - final AtomicReference[]> observers; +public final class ObservableCache extends AbstractObservableWithUpstream { /** * A shared instance of an empty array of observers to avoid creating @@ -56,61 +40,49 @@ public final class ObservableCache extends AbstractObservableWithUpstream head; - - /** - * The current tail of the linked structure holding the items. - */ - Node tail; - - /** - * How many items have been put into the tail node so far. + * The subscription to the source should happen at most once. */ - int tailOffset; + final AtomicBoolean once; /** - * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null. + * Responsible caching events from the source and multicasting them to each downstream. */ - Throwable error; + final Multicaster multicaster; /** - * True if the source has terminated. + * The first node in a singly linked list. Each node has the capacity to hold a specific number of events, and each + * points exclusively to the next node (if present). When a new downstream arrives, the subscription is + * initialized with a reference to the "head" node, and any events present in the linked list are replayed. As + * events are replayed to the new downstream, its 'node' reference advances through the linked list, discarding each + * node reference once all events in that node have been replayed. Consequently, once {@code this} instance goes out + * of scope, the prefix of nodes up to the first node that is still being replayed becomes unreachable and eligible + * for collection. */ - volatile boolean done; + final Node head; /** * Constructs an empty, non-connected cache. * @param source the source to subscribe to for the first incoming observer * @param capacityHint the number of items expected (reduce allocation frequency) */ - @SuppressWarnings("unchecked") public ObservableCache(Observable source, int capacityHint) { super(source); - this.capacityHint = capacityHint; this.once = new AtomicBoolean(); Node n = new Node<>(capacityHint); this.head = n; - this.tail = n; - this.observers = new AtomicReference<>(EMPTY); + this.multicaster = new Multicaster<>(capacityHint, n); } @Override protected void subscribeActual(Observer t) { - CacheDisposable consumer = new CacheDisposable<>(t, this); + CacheDisposable consumer = new CacheDisposable<>(t, multicaster, head); t.onSubscribe(consumer); - add(consumer); + multicaster.add(consumer); if (!once.get() && once.compareAndSet(false, true)) { - source.subscribe(this); + source.subscribe(multicaster); } else { - replay(consumer); + multicaster.replay(consumer); } } @@ -127,7 +99,7 @@ protected void subscribeActual(Observer t) { * @return true if the cache has observers */ /* public */ boolean hasObservers() { - return observers.get().length != 0; + return multicaster.observers.get().length != 0; } /** @@ -135,194 +107,243 @@ protected void subscribeActual(Observer t) { * @return the number of currently cached event count */ /* public */ long cachedEventCount() { - return size; + return multicaster.size; } - /** - * Atomically adds the consumer to the {@link #observers} copy-on-write array - * if the source has not yet terminated. - * @param consumer the consumer to add - */ - void add(CacheDisposable consumer) { - for (;;) { - CacheDisposable[] current = observers.get(); - if (current == TERMINATED) { - return; - } - int n = current.length; + static final class Multicaster implements Observer { - @SuppressWarnings("unchecked") - CacheDisposable[] next = new CacheDisposable[n + 1]; - System.arraycopy(current, 0, next, 0, n); - next[n] = consumer; + /** + * The number of items per cached nodes. + */ + final int capacityHint; - if (observers.compareAndSet(current, next)) { - return; - } - } - } + /** + * The current known array of observer state to notify. + */ + final AtomicReference[]> observers; - /** - * Atomically removes the consumer from the {@link #observers} copy-on-write array. - * @param consumer the consumer to remove - */ - @SuppressWarnings("unchecked") - void remove(CacheDisposable consumer) { - for (;;) { - CacheDisposable[] current = observers.get(); - int n = current.length; - if (n == 0) { - return; - } + /** + * The total number of elements in the list available for reads. + */ + volatile long size; - int j = -1; - for (int i = 0; i < n; i++) { - if (current[i] == consumer) { - j = i; - break; - } - } + /** + * The current tail of the linked structure holding the items. + */ + Node tail; - if (j < 0) { - return; - } - CacheDisposable[] next; + /** + * How many items have been put into the tail node so far. + */ + int tailOffset; - if (n == 1) { - next = EMPTY; - } else { - next = new CacheDisposable[n - 1]; - System.arraycopy(current, 0, next, 0, j); - System.arraycopy(current, j + 1, next, j, n - j - 1); - } + /** + * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null. + */ + Throwable error; - if (observers.compareAndSet(current, next)) { - return; - } - } - } + /** + * True if the source has terminated. + */ + volatile boolean done; - /** - * Replays the contents of this cache to the given consumer based on its - * current state and number of items requested by it. - * @param consumer the consumer to continue replaying items to - */ - void replay(CacheDisposable consumer) { - // make sure there is only one replay going on at a time - if (consumer.getAndIncrement() != 0) { - return; + @SuppressWarnings("unchecked") + Multicaster(int capacityHint, final Node head) { + this.capacityHint = capacityHint; + this.tail = head; + this.observers = new AtomicReference<>(EMPTY); } - // see if there were more replay request in the meantime - int missed = 1; - // read out state into locals upfront to avoid being re-read due to volatile reads - long index = consumer.index; - int offset = consumer.offset; - Node node = consumer.node; - Observer downstream = consumer.downstream; - int capacity = capacityHint; - - for (;;) { - // if the consumer got disposed, clear the node and quit - if (consumer.disposed) { - consumer.node = null; - return; + /** + * Atomically adds the consumer to the {@link #observers} copy-on-write array + * if the source has not yet terminated. + * @param consumer the consumer to add + */ + void add(CacheDisposable consumer) { + for (;;) { + CacheDisposable[] current = observers.get(); + if (current == TERMINATED) { + return; + } + int n = current.length; + + @SuppressWarnings("unchecked") + CacheDisposable[] next = new CacheDisposable[n + 1]; + System.arraycopy(current, 0, next, 0, n); + next[n] = consumer; + + if (observers.compareAndSet(current, next)) { + return; + } } + } - // first see if the source has terminated, read order matters! - boolean sourceDone = done; - // and if the number of items is the same as this consumer has received - boolean empty = size == index; - - // if the source is done and we have all items so far, terminate the consumer - if (sourceDone && empty) { - // release the node object to avoid leaks through retained consumers - consumer.node = null; - // if error is not null then the source failed - Throwable ex = error; - if (ex != null) { - downstream.onError(ex); + /** + * Atomically removes the consumer from the {@link #observers} copy-on-write array. + * @param consumer the consumer to remove + */ + @SuppressWarnings("unchecked") + void remove(CacheDisposable consumer) { + for (;;) { + CacheDisposable[] current = observers.get(); + int n = current.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (current[i] == consumer) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + CacheDisposable[] next; + + if (n == 1) { + next = EMPTY; } else { - downstream.onComplete(); + next = new CacheDisposable[n - 1]; + System.arraycopy(current, 0, next, 0, j); + System.arraycopy(current, j + 1, next, j, n - j - 1); + } + + if (observers.compareAndSet(current, next)) { + return; } + } + } + + /** + * Replays the contents of this cache to the given consumer based on its + * current state and number of items requested by it. + * @param consumer the consumer to continue replaying items to + */ + void replay(CacheDisposable consumer) { + // make sure there is only one replay going on at a time + if (consumer.getAndIncrement() != 0) { return; } - // there are still items not sent to the consumer - if (!empty) { - // if the offset in the current node has reached the node capacity - if (offset == capacity) { - // switch to the subsequent node - node = node.next; - // reset the in-node offset - offset = 0; + // see if there were more replay request in the meantime + int missed = 1; + // read out state into locals upfront to avoid being re-read due to volatile reads + long index = consumer.index; + int offset = consumer.offset; + Node node = consumer.node; + Observer downstream = consumer.downstream; + int capacity = capacityHint; + + for (;;) { + // if the consumer got disposed, clear the node and quit + if (consumer.disposed) { + consumer.node = null; + return; } - // emit the cached item - downstream.onNext(node.values[offset]); - - // move the node offset forward - offset++; - // move the total consumed item count forward - index++; + // first see if the source has terminated, read order matters! + boolean sourceDone = done; + // and if the number of items is the same as this consumer has received + boolean empty = size == index; + + // if the source is done and we have all items so far, terminate the consumer + if (sourceDone && empty) { + // release the node object to avoid leaks through retained consumers + consumer.node = null; + // if error is not null then the source failed + Throwable ex = error; + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } - // retry for the next item/terminal event if any - continue; - } + // there are still items not sent to the consumer + if (!empty) { + // if the offset in the current node has reached the node capacity + if (offset == capacity) { + // switch to the subsequent node + node = node.next; + // reset the in-node offset + offset = 0; + } + + // emit the cached item + downstream.onNext(node.values[offset]); + + // move the node offset forward + offset++; + // move the total consumed item count forward + index++; + + // retry for the next item/terminal event if any + continue; + } - // commit the changed references back - consumer.index = index; - consumer.offset = offset; - consumer.node = node; - // release the changes and see if there were more replay request in the meantime - missed = consumer.addAndGet(-missed); - if (missed == 0) { - break; + // commit the changed references back + consumer.index = index; + consumer.offset = offset; + consumer.node = node; + // release the changes and see if there were more replay request in the meantime + missed = consumer.addAndGet(-missed); + if (missed == 0) { + break; + } } } - } - @Override - public void onSubscribe(Disposable d) { - // we can't do much with the upstream disposable - } - - @Override - public void onNext(T t) { - int tailOffset = this.tailOffset; - // if the current tail node is full, create a fresh node - if (tailOffset == capacityHint) { - Node n = new Node<>(tailOffset); - n.values[0] = t; - this.tailOffset = 1; - tail.next = n; - tail = n; - } else { - tail.values[tailOffset] = t; - this.tailOffset = tailOffset + 1; + @Override + public void onSubscribe(Disposable d) { + // we can't do much with the upstream disposable } - size++; - for (CacheDisposable consumer : observers.get()) { - replay(consumer); + + @Override + public void onNext(T t) { + int tailOffset = this.tailOffset; + // if the current tail node is full, create a fresh node + if (tailOffset == capacityHint) { + Node n = new Node<>(tailOffset); + n.values[0] = t; + this.tailOffset = 1; + tail.next = n; + tail = n; + } else { + tail.values[tailOffset] = t; + this.tailOffset = tailOffset + 1; + } + size++; + for (CacheDisposable consumer : observers.get()) { + replay(consumer); + } } - } - @SuppressWarnings("unchecked") - @Override - public void onError(Throwable t) { - error = t; - done = true; - for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { - replay(consumer); + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable t) { + error = t; + done = true; + // No additional events will arrive, so now we can clear the 'tail' reference + tail = null; + for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { + replay(consumer); + } } - } - @SuppressWarnings("unchecked") - @Override - public void onComplete() { - done = true; - for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { - replay(consumer); + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + done = true; + // No additional events will arrive, so now we can clear the 'tail' reference + tail = null; + for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { + replay(consumer); + } } } @@ -338,7 +359,7 @@ static final class CacheDisposable extends AtomicInteger final Observer downstream; - final ObservableCache parent; + final Multicaster parent; Node node; @@ -353,11 +374,12 @@ static final class CacheDisposable extends AtomicInteger * the parent cache object. * @param downstream the actual consumer * @param parent the parent that holds onto the cached items + * @param head the first node in the linked list */ - CacheDisposable(Observer downstream, ObservableCache parent) { + CacheDisposable(Observer downstream, Multicaster parent, Node head) { this.downstream = downstream; this.parent = parent; - this.node = parent.head; + this.node = head; } @Override diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java index 7f47ec95d8..f3a4fb4edd 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java @@ -19,7 +19,10 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import io.reactivex.rxjava3.subjects.CompletableSubject; import org.junit.Test; import io.reactivex.rxjava3.core.*; @@ -355,4 +358,60 @@ public void addRemoveRace() { ); } } + + @Test + public void valuesAreReclaimable() { + for (int c = 1; c <= 32; c *= 2) { + for (int numValues : Arrays.asList(0, 1, c - 1, c, c + 1, c * 2 - 1, c * 2, c * 2 + 1)) { + + CompletableSubject termination = CompletableSubject.create(); + List integers = IntStream.range(0, numValues).boxed().collect(Collectors.toList()); + int lastNodeIndex = Math.max(0, ((numValues - 1) / c) * c); + + List> payloads = integers.stream() + .map(Payload::new) + .map(Reclaimable::of) + .collect(Collectors.toList()); + Reclaimable> cache = Reclaimable.of( + Observable.fromStream(payloads.stream().map(Reclaimable::remove)) + .concatWith(termination) + .cacheWithInitialCapacity(c)); + + TestObserver o = cache.remove().map(Payload::value).test(); + + Reclaimable.forceGC() + .assertReclaimed(cache) + .assertAllReclaimed(payloads.subList(0, lastNodeIndex)) + .assertAllUnreclaimed(payloads.subList(lastNodeIndex, numValues)); + + o.assertValueSequence(integers) + .assertNotComplete() + .assertNoErrors(); + + termination.onComplete(); + + o.assertValueSequence(integers) + .assertComplete(); + + Reclaimable.forceGC().assertAllReclaimed(payloads); + } + } + } + + static final class Payload { + private final int value; + + Payload(int value) { + this.value = value; + } + + int value() { + return value; + } + + @Override + public String toString() { + return "Payload(" + value + ")"; + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/Reclaimable.java b/src/test/java/io/reactivex/rxjava3/testsupport/Reclaimable.java new file mode 100644 index 0000000000..3e00de468d --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/testsupport/Reclaimable.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.testsupport; + +import java.lang.ref.SoftReference; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +import io.reactivex.rxjava3.annotations.NonNull; + +/** + * A test utility for verifying whether an object instance is reclaimable. + * + * @param the type of referent + */ +public final class Reclaimable { + + private static final ReclamationOps OPS = new ReclamationOps(); + + private final String name; + private R referent; + private SoftReference softReference; + + private Reclaimable(String name, R referent) { + this.name = name; + this.referent = referent; + } + + /** + * Constructs a {@code Reclaimable} instance with the specified {@code referent}. + * + * @param referent the object to be tested for reclaimability + * @param the type of referent + * @return the new {@code Reclaimable} instance + * @throws NullPointerException if {@code referent} is {@code null} + */ + @NonNull + public static Reclaimable of(@NonNull R referent) { + Objects.requireNonNull(referent, "referent is null"); + return of(referent.toString(), referent); + } + + /** + * Constructs a {@code Reclaimable} instance with the specified {@code name} and {@code referent}. + * + * @param name a human-readable name for the {@code referent}. Must not be {@code null}. + * @param referent the object to be tested for reclaimability + * @param the type of referent + * @return the new {@code Reclaimable} instance + * @throws NullPointerException if any of {@code name} or {@code referent} is {@code null} + */ + @NonNull + public static Reclaimable of(@NonNull String name, @NonNull R referent) { + Objects.requireNonNull(name, "name is null"); + Objects.requireNonNull(referent, "referent is null"); + return new Reclaimable<>(name, referent); + } + + /** + * Transitions the referent from a strong reference to a soft reference and returns the referent. This method must + * be invoked exactly once in order for the referent to become eligible for garbage collection. + * + * @return the non-{@code null} referent. + * @throws IllegalStateException if the referent was already removed + */ + @NonNull + public R remove() { + R r = referent; + if (r == null) { + throw new IllegalStateException("referent was already removed"); + } + this.softReference = new SoftReference<>(r); + this.referent = null; + return r; + } + + /** + * Tests whether the referent associated with this {@code Reclaimable} instance has been successfully reclaimed by + * the garbage collector. This method always returns {@code false} if {@link #remove()} has not yet been + * invoked. + * + * @return {@code true} if the referent has been reclaimed + */ + public boolean isReclaimed() { + return softReference != null && softReference.get() == null; + } + + boolean isRemoved() { + return softReference != null; + } + + /** + * Fills the heap, forcing the collection of any softly-reachable objects. + *

+ * Taken from the javadoc of {@link SoftReference}: + *

+ * All soft references to softly-reachable objects are guaranteed to have been cleared before the virtual machine + * throws an OutOfMemoryError. + *
+ * + * @return a set of fluent operators that can be used to verify the reclamation status of {@code Reclaimable} instances + */ + @NonNull + public static ReclamationOps forceGC() { + try { + List list = new LinkedList<>(); + for (;;) { + // fill the heap, 8 Gb at a time + list.add(new long[1024 * 1024 * 1024]); + } + } catch (OutOfMemoryError ex) { + // softly-reachable objects are now guaranteed to be collected + return OPS; + } + } + + @Override + public String toString() { + return "Reclaimable(name=" + name + ", referent=" + referent + ")"; + } + + /** + * A set of fluent operators that can be used to verify the reclamation status of {@code Reclaimable} instances. + */ + public static final class ReclamationOps { + /** + * Verifies that the referent held in {@code reclaimable} has been successfully reclaimed by the garbage + * collector. + * + * @param reclaimable the {@code Reclaimable} whose {@code referent} is expected to have been reclaimed + * @return this + */ + @NonNull + public ReclamationOps assertReclaimed(@NonNull Reclaimable reclaimable) { + Objects.requireNonNull(reclaimable, "reclaimable is null"); + if (!reclaimable.isRemoved()) { + throw new IllegalStateException("referent has not been removed: " + reclaimable); + } + if (!reclaimable.isReclaimed()) { + throw new AssertionError("expected referent to be reclaimed: " + reclaimable); + } + return this; + } + + /** + * Verifies that the referents held in the {@code reclaimables} have been successfully reclaimed by the garbage + * collector. + * + * @param reclaimables the {@code Reclaimable} instances whose referents are expected to have been reclaimed + * @return this + */ + @NonNull + public ReclamationOps assertAllReclaimed(@NonNull Iterable> reclaimables) { + Objects.requireNonNull(reclaimables, "reclaimables is null"); + for (Reclaimable reclaimable : reclaimables) { + assertReclaimed(reclaimable); + } + return this; + } + + /** + * Verifies that the referent held in {@code reclaimable} has not been reclaimed by the garbage + * collector. + * + * @param reclaimable the {@code Reclaimable} whose {@code referent} is expected to not have been reclaimed + * @return this + */ + @NonNull + public ReclamationOps assertUnreclaimed(@NonNull Reclaimable reclaimable) { + Objects.requireNonNull(reclaimable, "reclaimable is null"); + reclaimable.isRemoved(); + if (reclaimable.isReclaimed()) { + throw new AssertionError("expected referent to NOT be reclaimed: " + reclaimable); + } + return this; + } + + /** + * Verifies that the referents held in the {@code reclaimables} have not been reclaimed by the garbage + * collector. + * + * @param reclaimables the {@code Reclaimable} instances whose referents are expected to not have been reclaimed + * @return this + */ + @NonNull + public ReclamationOps assertAllUnreclaimed(@NonNull Iterable> reclaimables) { + Objects.requireNonNull(reclaimables, "reclaimables is null"); + for (Reclaimable reclaimable : reclaimables) { + assertUnreclaimed(reclaimable); + } + return this; + } + } +} From a8e638ac0a8b3c9f18fd89c2e956b2eb55bd8c13 Mon Sep 17 00:00:00 2001 From: Joe Pemberton Date: Thu, 2 Oct 2025 10:29:03 -0700 Subject: [PATCH 2/4] Rewrite unit test to simply inflate heap --- .../observable/ObservableCacheTest.java | 67 +++--- .../rxjava3/testsupport/Reclaimable.java | 206 ------------------ 2 files changed, 41 insertions(+), 232 deletions(-) delete mode 100644 src/test/java/io/reactivex/rxjava3/testsupport/Reclaimable.java diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java index f3a4fb4edd..6ca11284d9 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java @@ -16,13 +16,19 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; +import io.reactivex.rxjava3.observables.ConnectableObservable; import io.reactivex.rxjava3.subjects.CompletableSubject; +import org.junit.Assert; import org.junit.Test; import io.reactivex.rxjava3.core.*; @@ -360,41 +366,50 @@ public void addRemoveRace() { } @Test - public void valuesAreReclaimable() { - for (int c = 1; c <= 32; c *= 2) { - for (int numValues : Arrays.asList(0, 1, c - 1, c, c + 1, c * 2 - 1, c * 2, c * 2 + 1)) { + public void valuesAreReclaimable() throws Exception { + ConnectableObservable source = + Observable.range(0, 200) + .map($ -> new byte[1024 * 1024]) + .publish(); - CompletableSubject termination = CompletableSubject.create(); - List integers = IntStream.range(0, numValues).boxed().collect(Collectors.toList()); - int lastNodeIndex = Math.max(0, ((numValues - 1) / c) * c); + System.out.println("Bounded Replay Leak check: Wait before GC"); + Thread.sleep(1000); - List> payloads = integers.stream() - .map(Payload::new) - .map(Reclaimable::of) - .collect(Collectors.toList()); - Reclaimable> cache = Reclaimable.of( - Observable.fromStream(payloads.stream().map(Reclaimable::remove)) - .concatWith(termination) - .cacheWithInitialCapacity(c)); + System.out.println("Bounded Replay Leak check: GC"); + System.gc(); - TestObserver o = cache.remove().map(Payload::value).test(); + Thread.sleep(500); - Reclaimable.forceGC() - .assertReclaimed(cache) - .assertAllReclaimed(payloads.subList(0, lastNodeIndex)) - .assertAllUnreclaimed(payloads.subList(lastNodeIndex, numValues)); + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); - o.assertValueSequence(integers) - .assertNotComplete() - .assertNoErrors(); + System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0); - termination.onComplete(); + final AtomicLong after = new AtomicLong(); - o.assertValueSequence(integers) - .assertComplete(); + source.cache().lastElement().subscribe(new Consumer() { + @Override + public void accept(byte[] v) throws Exception { + System.out.println("Bounded Replay Leak check: Wait before GC 2"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC 2"); + System.gc(); - Reclaimable.forceGC().assertAllReclaimed(payloads); + Thread.sleep(500); + + after.set(memoryMXBean.getHeapMemoryUsage().getUsed()); } + }); + + source.connect(); + + System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); + + if (initial + 100 * 1024 * 1024 < after.get()) { + Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + + " -> " + after.get() / 1024.0 / 1024.0); } } diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/Reclaimable.java b/src/test/java/io/reactivex/rxjava3/testsupport/Reclaimable.java deleted file mode 100644 index 3e00de468d..0000000000 --- a/src/test/java/io/reactivex/rxjava3/testsupport/Reclaimable.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.rxjava3.testsupport; - -import java.lang.ref.SoftReference; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; - -import io.reactivex.rxjava3.annotations.NonNull; - -/** - * A test utility for verifying whether an object instance is reclaimable. - * - * @param the type of referent - */ -public final class Reclaimable { - - private static final ReclamationOps OPS = new ReclamationOps(); - - private final String name; - private R referent; - private SoftReference softReference; - - private Reclaimable(String name, R referent) { - this.name = name; - this.referent = referent; - } - - /** - * Constructs a {@code Reclaimable} instance with the specified {@code referent}. - * - * @param referent the object to be tested for reclaimability - * @param the type of referent - * @return the new {@code Reclaimable} instance - * @throws NullPointerException if {@code referent} is {@code null} - */ - @NonNull - public static Reclaimable of(@NonNull R referent) { - Objects.requireNonNull(referent, "referent is null"); - return of(referent.toString(), referent); - } - - /** - * Constructs a {@code Reclaimable} instance with the specified {@code name} and {@code referent}. - * - * @param name a human-readable name for the {@code referent}. Must not be {@code null}. - * @param referent the object to be tested for reclaimability - * @param the type of referent - * @return the new {@code Reclaimable} instance - * @throws NullPointerException if any of {@code name} or {@code referent} is {@code null} - */ - @NonNull - public static Reclaimable of(@NonNull String name, @NonNull R referent) { - Objects.requireNonNull(name, "name is null"); - Objects.requireNonNull(referent, "referent is null"); - return new Reclaimable<>(name, referent); - } - - /** - * Transitions the referent from a strong reference to a soft reference and returns the referent. This method must - * be invoked exactly once in order for the referent to become eligible for garbage collection. - * - * @return the non-{@code null} referent. - * @throws IllegalStateException if the referent was already removed - */ - @NonNull - public R remove() { - R r = referent; - if (r == null) { - throw new IllegalStateException("referent was already removed"); - } - this.softReference = new SoftReference<>(r); - this.referent = null; - return r; - } - - /** - * Tests whether the referent associated with this {@code Reclaimable} instance has been successfully reclaimed by - * the garbage collector. This method always returns {@code false} if {@link #remove()} has not yet been - * invoked. - * - * @return {@code true} if the referent has been reclaimed - */ - public boolean isReclaimed() { - return softReference != null && softReference.get() == null; - } - - boolean isRemoved() { - return softReference != null; - } - - /** - * Fills the heap, forcing the collection of any softly-reachable objects. - *

- * Taken from the javadoc of {@link SoftReference}: - *

- * All soft references to softly-reachable objects are guaranteed to have been cleared before the virtual machine - * throws an OutOfMemoryError. - *
- * - * @return a set of fluent operators that can be used to verify the reclamation status of {@code Reclaimable} instances - */ - @NonNull - public static ReclamationOps forceGC() { - try { - List list = new LinkedList<>(); - for (;;) { - // fill the heap, 8 Gb at a time - list.add(new long[1024 * 1024 * 1024]); - } - } catch (OutOfMemoryError ex) { - // softly-reachable objects are now guaranteed to be collected - return OPS; - } - } - - @Override - public String toString() { - return "Reclaimable(name=" + name + ", referent=" + referent + ")"; - } - - /** - * A set of fluent operators that can be used to verify the reclamation status of {@code Reclaimable} instances. - */ - public static final class ReclamationOps { - /** - * Verifies that the referent held in {@code reclaimable} has been successfully reclaimed by the garbage - * collector. - * - * @param reclaimable the {@code Reclaimable} whose {@code referent} is expected to have been reclaimed - * @return this - */ - @NonNull - public ReclamationOps assertReclaimed(@NonNull Reclaimable reclaimable) { - Objects.requireNonNull(reclaimable, "reclaimable is null"); - if (!reclaimable.isRemoved()) { - throw new IllegalStateException("referent has not been removed: " + reclaimable); - } - if (!reclaimable.isReclaimed()) { - throw new AssertionError("expected referent to be reclaimed: " + reclaimable); - } - return this; - } - - /** - * Verifies that the referents held in the {@code reclaimables} have been successfully reclaimed by the garbage - * collector. - * - * @param reclaimables the {@code Reclaimable} instances whose referents are expected to have been reclaimed - * @return this - */ - @NonNull - public ReclamationOps assertAllReclaimed(@NonNull Iterable> reclaimables) { - Objects.requireNonNull(reclaimables, "reclaimables is null"); - for (Reclaimable reclaimable : reclaimables) { - assertReclaimed(reclaimable); - } - return this; - } - - /** - * Verifies that the referent held in {@code reclaimable} has not been reclaimed by the garbage - * collector. - * - * @param reclaimable the {@code Reclaimable} whose {@code referent} is expected to not have been reclaimed - * @return this - */ - @NonNull - public ReclamationOps assertUnreclaimed(@NonNull Reclaimable reclaimable) { - Objects.requireNonNull(reclaimable, "reclaimable is null"); - reclaimable.isRemoved(); - if (reclaimable.isReclaimed()) { - throw new AssertionError("expected referent to NOT be reclaimed: " + reclaimable); - } - return this; - } - - /** - * Verifies that the referents held in the {@code reclaimables} have not been reclaimed by the garbage - * collector. - * - * @param reclaimables the {@code Reclaimable} instances whose referents are expected to not have been reclaimed - * @return this - */ - @NonNull - public ReclamationOps assertAllUnreclaimed(@NonNull Iterable> reclaimables) { - Objects.requireNonNull(reclaimables, "reclaimables is null"); - for (Reclaimable reclaimable : reclaimables) { - assertUnreclaimed(reclaimable); - } - return this; - } - } -} From 95db46c511ebfa6e95555b418e26b125fb9471d7 Mon Sep 17 00:00:00 2001 From: Joe Pemberton Date: Thu, 2 Oct 2025 10:30:25 -0700 Subject: [PATCH 3/4] Clean up imports --- .../internal/operators/observable/ObservableCacheTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java index 6ca11284d9..c45bd48d35 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java @@ -23,12 +23,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import io.reactivex.rxjava3.observables.ConnectableObservable; -import io.reactivex.rxjava3.subjects.CompletableSubject; -import org.junit.Assert; import org.junit.Test; import io.reactivex.rxjava3.core.*; @@ -408,7 +404,7 @@ public void accept(byte[] v) throws Exception { System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); if (initial + 100 * 1024 * 1024 < after.get()) { - Assert.fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + " -> " + after.get() / 1024.0 / 1024.0); } } From b9b2316d6a6adf7b1a8276c1651c6d0b98c8caf5 Mon Sep 17 00:00:00 2001 From: Joe Pemberton Date: Thu, 2 Oct 2025 10:43:08 -0700 Subject: [PATCH 4/4] Inline AtomicReference, delete unused class --- .../operators/observable/ObservableCache.java | 33 ++++++++----------- .../observable/ObservableCacheTest.java | 17 ---------- 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java index 115be47d39..d2e74aefb8 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java @@ -99,7 +99,7 @@ protected void subscribeActual(Observer t) { * @return true if the cache has observers */ /* public */ boolean hasObservers() { - return multicaster.observers.get().length != 0; + return multicaster.get().length != 0; } /** @@ -110,18 +110,13 @@ protected void subscribeActual(Observer t) { return multicaster.size; } - static final class Multicaster implements Observer { + static final class Multicaster extends AtomicReference[]> implements Observer { /** * The number of items per cached nodes. */ final int capacityHint; - /** - * The current known array of observer state to notify. - */ - final AtomicReference[]> observers; - /** * The total number of elements in the list available for reads. */ @@ -138,7 +133,7 @@ static final class Multicaster implements Observer { int tailOffset; /** - * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null. + * If the observers are {@link #TERMINATED}, this holds the terminal error if not null. */ Throwable error; @@ -149,19 +144,19 @@ static final class Multicaster implements Observer { @SuppressWarnings("unchecked") Multicaster(int capacityHint, final Node head) { - this.capacityHint = capacityHint; + super(EMPTY); this.tail = head; - this.observers = new AtomicReference<>(EMPTY); + this.capacityHint = capacityHint; } /** - * Atomically adds the consumer to the {@link #observers} copy-on-write array + * Atomically adds the consumer to the observers copy-on-write array * if the source has not yet terminated. * @param consumer the consumer to add */ void add(CacheDisposable consumer) { for (;;) { - CacheDisposable[] current = observers.get(); + CacheDisposable[] current = get(); if (current == TERMINATED) { return; } @@ -172,20 +167,20 @@ void add(CacheDisposable consumer) { System.arraycopy(current, 0, next, 0, n); next[n] = consumer; - if (observers.compareAndSet(current, next)) { + if (compareAndSet(current, next)) { return; } } } /** - * Atomically removes the consumer from the {@link #observers} copy-on-write array. + * Atomically removes the consumer from the observers copy-on-write array. * @param consumer the consumer to remove */ @SuppressWarnings("unchecked") void remove(CacheDisposable consumer) { for (;;) { - CacheDisposable[] current = observers.get(); + CacheDisposable[] current = get(); int n = current.length; if (n == 0) { return; @@ -212,7 +207,7 @@ void remove(CacheDisposable consumer) { System.arraycopy(current, j + 1, next, j, n - j - 1); } - if (observers.compareAndSet(current, next)) { + if (compareAndSet(current, next)) { return; } } @@ -318,7 +313,7 @@ public void onNext(T t) { this.tailOffset = tailOffset + 1; } size++; - for (CacheDisposable consumer : observers.get()) { + for (CacheDisposable consumer : get()) { replay(consumer); } } @@ -330,7 +325,7 @@ public void onError(Throwable t) { done = true; // No additional events will arrive, so now we can clear the 'tail' reference tail = null; - for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { + for (CacheDisposable consumer : getAndSet(TERMINATED)) { replay(consumer); } } @@ -341,7 +336,7 @@ public void onComplete() { done = true; // No additional events will arrive, so now we can clear the 'tail' reference tail = null; - for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { + for (CacheDisposable consumer : getAndSet(TERMINATED)) { replay(consumer); } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java index c45bd48d35..74d17c062b 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java @@ -408,21 +408,4 @@ public void accept(byte[] v) throws Exception { + " -> " + after.get() / 1024.0 / 1024.0); } } - - static final class Payload { - private final int value; - - Payload(int value) { - this.value = value; - } - - int value() { - return value; - } - - @Override - public String toString() { - return "Payload(" + value + ")"; - } - } }