You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

107 lines
4.3 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. //
  2. // cascade.swift
  3. // RxSwiftExt
  4. //
  5. // Created by Florent Pillet on 17/04/16.
  6. // Copyright © 2016 RxSwift Community. All rights reserved.
  7. //
  8. import Foundation
  9. import RxSwift
  extension Observable where Element: ObservableType {
      /**
       Cascade through a sequence of observables: every observable that sends a `next` value becomes the "current"
       observable (like in `switchLatest`), and the subscriptions to all previous observables in the sequence are disposed.

       This allows subscribing to multiple observable sequences while irrevocably switching to the next when it starts
       emitting. If any of the currently subscribed-to sequences errors, the error is propagated to the observer and
       the sequence terminates.

       The resulting sequence completes once the current observable and every observable after it have completed,
       including when some or all of them complete synchronously while they are being subscribed to. An empty
       `observables` sequence yields an empty observable.

       - parameter observables: a sequence of observables which will all be immediately subscribed to
       - returns: An observable sequence that contains elements from the latest observable sequence that emitted elements
       */
      public static func cascade<S: Sequence>(_ observables: S) -> Observable<Element.Element> where S.Element == Element {
          let flow = Array(observables)
          if flow.isEmpty {
              return Observable<Element.Element>.empty()
          }

          return Observable<Element.Element>.create { observer in
              // `current` is the index of the observable whose elements are forwarded.
              // `initialized` turns true once every observable has been subscribed to.
              var current = 0, initialized = false
              // One slot per observable; a slot is set to nil once that observable has been
              // superseded by a later one or has completed.
              var subscriptions: [SerialDisposable?] = flow.map { _ in SerialDisposable() }

              // Recursive, because sources may deliver events synchronously from within
              // `subscribe` while this closure already holds the lock.
              let lock = NSRecursiveLock()
              lock.lock()
              defer { lock.unlock() }

              for index in flow.indices {
                  let disposable = flow[index].subscribe { event in
                      lock.lock()
                      defer { lock.unlock() }

                      switch event {
                      case .next(let element):
                          // Irrevocably switch to this observable: drop everything before it.
                          while current < index {
                              subscriptions[current]?.dispose()
                              subscriptions[current] = nil
                              current += 1
                          }
                          if index == current {
                              assert(subscriptions[index] != nil)
                              observer.onNext(element)
                          }

                      case .completed:
                          guard index >= current else { return }
                          // Always release the slot, even while still subscribing. Otherwise an
                          // observable that completes synchronously keeps a stale slot that
                          // prevents the cascade from ever completing.
                          subscriptions[index]?.dispose()
                          subscriptions[index] = nil
                          // During the subscription loop the completion decision is deferred to
                          // the check that follows the loop.
                          if initialized, !subscriptions[current...].contains(where: { $0 != nil }) {
                              observer.onCompleted()
                          }

                      case .error(let error):
                          observer.onError(error)
                      }
                  }
                  if let serialDisposable = subscriptions[index] {
                      serialDisposable.disposable = disposable
                  } else {
                      // The slot was already released during `subscribe` (superseded or completed).
                      disposable.dispose()
                  }
              }

              initialized = true

              // At least one observable is still live: hand back a disposable that tears them all down.
              if subscriptions.contains(where: { $0 != nil }) {
                  return Disposables.create {
                      subscriptions.forEach { $0?.dispose() }
                  }
              }

              // Every relevant observable completed while subscribing.
              observer.onCompleted()
              return Disposables.create()
          }
      }
  }
  extension ObservableType {
      /**
       Cascade from this observable through the observables in `next`: every observable that sends a `next` value
       becomes the "current" observable (like in `switchLatest`), and the subscriptions to all previous observables
       are disposed.

       The receiver is placed first in the cascade, followed by the elements of `next` in order; the combined list is
       forwarded to the static `Observable.cascade(_:)`.

       - parameter next: a sequence of observables to cascade to after the receiver
       - returns: An observable sequence that contains elements from the latest observable sequence that emitted elements
       */
      public func cascade<S: Sequence>(_ next: S) -> Observable<Element> where S.Element == Self {
          // The receiver leads the chain; the followers keep the order of `next`.
          var sources: [Observable<Element>] = [asObservable()]
          for follower in next {
              sources.append(follower.asObservable())
          }
          return Observable.cascade(sources)
      }
  }