You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

95 lines
3.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. //
  2. // pausableBuffered.swift
  3. // RxSwiftExt
  4. //
  5. // Created by Tanguy Helesbeux on 24/05/2017.
  6. // Copyright © 2017 RxSwift Community. All rights reserved.
  7. //
  8. import Foundation
  9. import RxSwift
  10. extension ObservableType {
  11. /**
  12. Pauses the elements of the source observable sequence based on the latest element from the second observable sequence.
  13. While paused, elements from the source are buffered, limited to a maximum number of element.
  14. When resumed, all buffered elements are flushed as single events in a contiguous stream.
  15. - seealso: [pausable operator on reactivex.io](http://reactivex.io/documentation/operators/backpressure.html)
  16. - parameter pauser: The observable sequence used to pause the source observable sequence.
  17. - parameter limit: The maximum number of element buffered. Pass `nil` to buffer all elements without limit. Default 1.
  18. - parameter flushOnCompleted: If `true` buffered elements will be flushed when the source completes. Default `true`.
  19. - parameter flushOnError: If `true` buffered elements will be flushed when the source errors. Default `true`.
  20. - returns: The observable sequence which is paused and resumed based upon the pauser observable sequence.
  21. */
  22. public func pausableBuffered<Pauser: ObservableType> (_ pauser: Pauser, limit: Int? = 1, flushOnCompleted: Bool = true, flushOnError: Bool = true) -> Observable<Element> where Pauser.Element == Bool {
  23. return Observable<Element>.create { observer in
  24. var buffer: [Element] = []
  25. if let limit = limit {
  26. buffer.reserveCapacity(limit)
  27. }
  28. var paused = true
  29. var flushIndex = 0
  30. let lock = NSRecursiveLock()
  31. let flush = {
  32. while flushIndex < buffer.count {
  33. flushIndex += 1
  34. observer.onNext(buffer[flushIndex - 1])
  35. }
  36. if buffer.count > 0 {
  37. flushIndex = 0
  38. buffer.removeAll(keepingCapacity: limit != nil)
  39. }
  40. }
  41. let boundaryDisposable = pauser.distinctUntilChanged(==).subscribe { event in
  42. lock.lock(); defer { lock.unlock() }
  43. switch event {
  44. case .next(let resume):
  45. if resume && buffer.count > 0 {
  46. flush()
  47. }
  48. paused = !resume
  49. case .completed:
  50. observer.onCompleted()
  51. case .error(let error):
  52. observer.onError(error)
  53. }
  54. }
  55. let disposable = self.subscribe { event in
  56. lock.lock(); defer { lock.unlock() }
  57. switch event {
  58. case .next(let element):
  59. if paused {
  60. buffer.append(element)
  61. if let limit = limit, buffer.count > limit {
  62. buffer.remove(at: 0)
  63. }
  64. } else {
  65. observer.onNext(element)
  66. }
  67. case .completed:
  68. if flushOnCompleted { flush() }
  69. observer.onCompleted()
  70. case .error(let error):
  71. if flushOnError { flush() }
  72. observer.onError(error)
  73. }
  74. }
  75. return Disposables.create([disposable, boundaryDisposable])
  76. }
  77. }
  78. }