You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

90 lines
3.2 KiB

5 years ago
  1. //
  2. // pausableBuffered.swift
  3. // RxSwiftExt
  4. //
  5. // Created by Tanguy Helesbeux on 24/05/2017.
  6. // Copyright © 2017 RxSwift Community. All rights reserved.
  7. //
  8. import Foundation
  9. import RxSwift
  extension ObservableType {

      /**
       Pauses the elements of the source observable sequence based on the latest element from the second observable sequence.

       The returned sequence starts paused. While paused, elements from the source are buffered, limited to a maximum
       number of elements; once the limit is exceeded the oldest buffered element is dropped.
       When resumed, all buffered elements are flushed as single events in a contiguous stream, in arrival order.

       A completion or an error coming from `pauser` is forwarded to the observer as-is, without flushing the buffer.

       - seealso: [pausable operator on reactivex.io](http://reactivex.io/documentation/operators/backpressure.html)

       - parameter pauser: The observable sequence used to pause the source observable sequence. `true` resumes, `false` pauses.
       - parameter limit: The maximum number of elements buffered. Pass `nil` to buffer all elements without limit. Default 1.
       - parameter flushOnCompleted: If `true` buffered elements will be flushed when the source completes. Default `true`.
       - parameter flushOnError: If `true` buffered elements will be flushed when the source errors. Default `true`.
       - returns: The observable sequence which is paused and resumed based upon the pauser observable sequence.
       */
      public func pausableBuffered<P: ObservableType> (_ pauser: P, limit: Int? = 1, flushOnCompleted: Bool = true, flushOnError: Bool = true) -> Observable<E> where P.E == Bool {

          return Observable<E>.create { observer in
              var buffer: [E] = []
              if let limit = limit {
                  buffer.reserveCapacity(limit)
              }

              var paused = true
              // `true` while `flush` is delivering elements. The lock below is recursive, so a
              // downstream `onNext` may synchronously re-enter either subscription on the same
              // thread; this flag lets the re-entrant code paths cooperate with the running flush.
              var isFlushing = false
              let lock = NSRecursiveLock()

              let flush: () -> Void = {
                  // A nested flush would deliver elements ahead of the ones the outer flush still
                  // has to emit, so only the outermost call drains the buffer.
                  guard !isFlushing, !buffer.isEmpty else { return }
                  isFlushing = true
                  defer { isFlushing = false }

                  repeat {
                      // Take a snapshot and clear the live buffer *before* emitting: elements that
                      // arrive re-entrantly are then appended behind the snapshot instead of being
                      // wiped by a trailing `removeAll`.
                      let pending = buffer
                      buffer.removeAll(keepingCapacity: limit != nil)
                      for value in pending {
                          observer.onNext(value)
                      }
                      // Keep draining late arrivals only while still resumed; if the pauser paused
                      // us again mid-flush they stay buffered for the next resume.
                  } while !paused && !buffer.isEmpty
              }

              let boundaryDisposable = pauser.subscribe { event in
                  lock.lock(); defer { lock.unlock() }
                  switch event {
                  case .next(let resume):
                      paused = !resume
                      if resume {
                          flush()
                      }

                  case .completed:
                      observer.onCompleted()

                  case .error(let error):
                      observer.onError(error)
                  }
              }

              let disposable = self.subscribe { event in
                  lock.lock(); defer { lock.unlock() }
                  switch event {
                  case .next(let element):
                      // While a flush is running, queue the element so it cannot overtake the
                      // older elements that are still being delivered.
                      if paused || isFlushing {
                          buffer.append(element)
                          // Enforce the limit by discarding the oldest buffered element.
                          if let limit = limit, buffer.count > limit {
                              buffer.remove(at: 0)
                          }
                      } else {
                          observer.onNext(element)
                      }

                  case .completed:
                      if flushOnCompleted { flush() }
                      observer.onCompleted()

                  case .error(let error):
                      if flushOnError { flush() }
                      observer.onError(error)
                  }
              }

              return Disposables.create([disposable, boundaryDisposable])
          }
      }

  }