Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions Documentation/SwiftConcurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,39 @@ stream.asObservable()
)
```

#### Creating an `Observable` with async/await

Use `Observable.create` with an async closure when you need to emit multiple values from asynchronous operations:

```swift
let observable = Observable<String>.create { observer in
// Fetch data from multiple async sources
let firstBatch = try await fetchDataFromAPI()
observer(firstBatch)

let secondBatch = try await fetchMoreDataFromAPI()
observer(secondBatch)

// Observable automatically completes when the async closure finishes
// If any await throws, an error event is emitted
}
```

#### Creating an `Infallible` with async/await

Use `Infallible.create` for async operations that cannot fail:

```swift
let infallible = Infallible<Int>.create { observer in
// Generate values asynchronously without throwing
for i in 1...5 {
await Task.sleep(nanoseconds: 1_000_000_000) // 1 second delay
observer(i * 10)
}
// Infallible automatically completes when the async closure finishes
}
```

### Wrapping an `async` result as a `Single`

If you already have an async piece of work that returns a single result you wish to await, you can bridge it back to the Rx world by using `Single.create`, a special overload which takes an `async throws` closure where you can simply await your async work:
Expand Down
46 changes: 46 additions & 0 deletions RxSwift/Observable+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,52 @@ import Foundation

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public extension ObservableConvertibleType {

typealias ElementObserver<Element> = (Element) -> Void

/**
Creates an `Observable` from the result of an asynchronous operation
that emits elements via a provided observer closure.

- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

- parameter work: An `async` closure that takes an `ElementObserver` (a closure used to emit elements),
and may call it multiple times to emit values.
When the closure finishes, a `.completed` event is automatically emitted.
If the closure throws, an `.error` event will be emitted instead.

- returns: An `Observable` sequence of the element type emitted by the `work` closure.
*/
@_disfavoredOverload
static func create(
detached: Bool = false,
priority: TaskPriority? = nil,
work: @Sendable @escaping (_ observer: ElementObserver<Element>) async throws -> Void
) -> Observable<Element> {
.create { rawObserver in
let operation: () async throws -> Void = {
do {
let observer: ElementObserver<Element> = { element in
guard !Task.isCancelled else { return }
rawObserver.onNext(element)
}
try await work(observer)
rawObserver.onCompleted()
} catch {
rawObserver.onError(error)
}
}

let task = if detached {
Task.detached(priority: priority, operation: operation)
} else {
Task(priority: priority, operation: operation)
}

return Disposables.create { task.cancel() }
}
}

/// Allows iterating over the values of an Observable
/// asynchronously via Swift's concurrency features (`async/await`)
///
Expand Down
39 changes: 39 additions & 0 deletions RxSwift/Traits/Infallible/Infallible+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,45 @@
// MARK: - Infallible
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public extension InfallibleType {

/**
Creates an `Infallible` from the result of an asynchronous operation
that emits elements via a provided observer closure.

- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

- parameter work: An `async` closure that takes an `ElementObserver` (a closure used to emit elements),
and may call it multiple times to emit values.
When the closure finishes, a `.completed` event is automatically emitted.

- returns: An `Infallible` sequence of the element type emitted by the `work` closure.
*/
@_disfavoredOverload
static func create(
detached: Bool = false,
priority: TaskPriority? = nil,
work: @Sendable @escaping (_ observer: ElementObserver<Element>) async -> Void
) -> Infallible<Element> {
.create { rawObserver in
let operation: () async -> Void = {
let observer: ElementObserver<Element> = { element in
guard !Task.isCancelled else { return }
rawObserver(.next(element))
}
await work(observer)
rawObserver(.completed)
}

let task = if detached {
Task.detached(priority: priority, operation: operation)
} else {
Task(priority: priority, operation: operation)
}

return Disposables.create { task.cancel() }
}
}

/// Allows iterating over the values of an Infallible
/// asynchronously via Swift's concurrency features (`async/await`)
///
Expand Down
18 changes: 18 additions & 0 deletions Tests/RxSwiftTests/Infallible+ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,23 @@ extension InfallibleConcurrencyTests {

XCTAssertEqual(values, Array(1...10))
}

func testCreateInfalliableFromAsync() async throws {
var expectedValues = [Int]()
let randomValue: () async -> Int = {
let value = Int.random(in: 100...100000)
expectedValues.append(value)
return value
}

let infallible = Infallible<Int>.create { observer in
for _ in 1...10 {
observer(await randomValue())
}
}

let values = try infallible.toBlocking().toArray()
XCTAssertEqual(values, expectedValues)
}
}
#endif
19 changes: 19 additions & 0 deletions Tests/RxSwiftTests/Observable+ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@ class ObservableConcurrencyTests: RxTest {

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension ObservableConcurrencyTests {

func testCreateObservableFromAsync() async throws {
var expectedValues = [Int]()
let randomValue: () async throws -> Int = {
let value = Int.random(in: 100...100000)
expectedValues.append(value)
return value
}

let infallible = Infallible<Int>.create { observer in
for _ in 1...10 {
observer(try await randomValue())
}
}

let values = try infallible.toBlocking().toArray()
XCTAssertEqual(values, expectedValues)
}

func testAwaitsValuesAndFinishes() async {
let observable = Observable
.from(1...10)
Expand Down