@@ -27,11 +27,12 @@ import io.reactivex.Observable
27
27
import javax.inject.Inject
28
28
import kotlin.properties.ReadWriteProperty
29
29
import kotlin.reflect.KProperty
30
+ import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
30
31
import kotlinx.coroutines.channels.BufferOverflow
31
- import kotlinx.coroutines.flow.Flow
32
+ import kotlinx.coroutines.flow.FlowCollector
32
33
import kotlinx.coroutines.flow.MutableSharedFlow
33
34
import kotlinx.coroutines.flow.MutableStateFlow
34
- import kotlinx.coroutines.flow.filterNotNull
35
+ import kotlinx.coroutines.flow.SharedFlow
35
36
import kotlinx.coroutines.rx2.asObservable
36
37
37
38
/* *
@@ -48,8 +49,7 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
48
49
private val useStateFlow
49
50
get() = RibEvents .useStateFlowInteractorEvent
50
51
51
- @VisibleForTesting
52
- internal val _lifecycleFlow : MutableSharedFlow <InteractorEvent ?> =
52
+ private val _lifecycleFlow : MutableSharedFlow <InteractorEvent ?> =
53
53
if (useStateFlow) {
54
54
MutableStateFlow (null )
55
55
} else {
@@ -59,8 +59,7 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
59
59
BufferOverflow .DROP_OLDEST ,
60
60
)
61
61
}
62
- public open val lifecycleFlow: Flow <InteractorEvent >
63
- get() = _lifecycleFlow .filterNotNull()
62
+ public open val lifecycleFlow: SharedFlow <InteractorEvent > = NonNullSharedFlow (_lifecycleFlow )
64
63
65
64
@Volatile private var _lifecycleObservable : Observable <InteractorEvent >? = null
66
65
@@ -86,16 +85,16 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
86
85
final override fun correspondingEvents (): CorrespondingEventsFunction <InteractorEvent > =
87
86
LIFECYCLE_MAP_FUNCTION
88
87
89
- final override fun peekLifecycle (): InteractorEvent ? = _lifecycleFlow .replayCache.lastOrNull()
88
+ final override fun peekLifecycle (): InteractorEvent ? = lifecycleFlow .replayCache.lastOrNull()
90
89
91
90
@OptIn(CoreFriendModuleApi ::class )
92
91
final override fun requestScope (): CompletableSource =
93
- _lifecycleFlow .asScopeCompletable(lifecycleRange)
92
+ lifecycleFlow .asScopeCompletable(lifecycleRange)
94
93
95
94
// ---- InteractorType overrides ---- //
96
95
97
96
override fun isAttached (): Boolean =
98
- _lifecycleFlow .replayCache.lastOrNull() == InteractorEvent .ACTIVE
97
+ lifecycleFlow .replayCache.lastOrNull() == InteractorEvent .ACTIVE
99
98
100
99
override fun handleBackPress (): Boolean = false
101
100
@@ -233,3 +232,20 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
233
232
}
234
233
}
235
234
}
235
+
236
+ // See https://github.com/Kotlin/kotlinx.coroutines/issues/2514
237
+ @OptIn(ExperimentalForInheritanceCoroutinesApi ::class )
238
+ private class NonNullSharedFlow <R : Any >(
239
+ private val upstream : SharedFlow <R ?>,
240
+ ) : SharedFlow<R> {
241
+ override val replayCache: List <R >
242
+ get() = upstream.replayCache.filterNotNull()
243
+
244
+ override suspend fun collect (collector : FlowCollector <R >): Nothing {
245
+ upstream.collect { value ->
246
+ if (value != null ) {
247
+ collector.emit(value)
248
+ }
249
+ }
250
+ }
251
+ }
0 commit comments