You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
459 lines
19 KiB
459 lines
19 KiB
// The MIT License (MIT)
|
|
//
|
|
// Copyright (c) 2015-2021 Alexander Grebenyuk (github.com/kean).
|
|
|
|
import Foundation
|
|
|
|
/// `ImagePipeline` loads and decodes image data, processes loaded images and
|
|
/// stores them in caches.
|
|
///
|
|
/// See [Nuke's README](https://github.com/kean/Nuke) for a detailed overview of
|
|
/// the image pipeline and all of the related classes.
|
|
///
|
|
/// If you want to build a system that fits your specific needs, see `ImagePipeline.Configuration`
|
|
/// for a list of the available options. You can set custom data loaders and caches, configure
|
|
/// image encoders and decoders, change the number of concurrent operations for each
|
|
/// individual stage, disable and enable features like deduplication and rate limiting, and more.
|
|
///
|
|
/// `ImagePipeline` is fully thread-safe.
|
|
public /* final */ class ImagePipeline {
|
|
public let configuration: Configuration
|
|
public var observer: ImagePipelineObserving?
|
|
private(set) var dataLoader: DataLoader?
|
|
|
|
private var tasks = [ImageTask: TaskSubscription]()
|
|
|
|
private let decompressedImageTasks: TaskPool<ImageRequest.LoadKeyForProcessedImage, ImageResponse, Error>
|
|
private let processedImageTasks: TaskPool<ImageRequest.LoadKeyForProcessedImage, ImageResponse, Error>
|
|
private let originalImageTasks: TaskPool<ImageRequest.LoadKeyForOriginalImage, ImageResponse, Error>
|
|
private let originalImageDataTasks: TaskPool<ImageRequest.LoadKeyForOriginalImage, (Data, URLResponse?), Error>
|
|
|
|
// The queue on which the entire subsystem is synchronized.
|
|
let queue = DispatchQueue(label: "com.github.kean.Nuke.ImagePipeline", qos: .userInitiated)
|
|
private var isInvalidated = false
|
|
|
|
private var nextTaskId: Int64 { OSAtomicIncrement64(_nextTaskId) }
|
|
private let _nextTaskId: UnsafeMutablePointer<Int64>
|
|
|
|
let rateLimiter: RateLimiter?
|
|
let id = UUID()
|
|
|
|
/// Shared image pipeline.
|
|
public static var shared = ImagePipeline()
|
|
|
|
deinit {
|
|
_nextTaskId.deallocate()
|
|
|
|
ResumableDataStorage.shared.unregister(self)
|
|
#if TRACK_ALLOCATIONS
|
|
Allocations.decrement("ImagePipeline")
|
|
#endif
|
|
}
|
|
|
|
/// Initializes `ImagePipeline` instance with the given configuration.
|
|
///
|
|
/// - parameter configuration: `Configuration()` by default.
|
|
public init(configuration: Configuration = Configuration()) {
|
|
self.configuration = configuration
|
|
self.rateLimiter = configuration.isRateLimiterEnabled ? RateLimiter(queue: queue) : nil
|
|
|
|
let isDeduplicationEnabled = configuration.isDeduplicationEnabled
|
|
self.decompressedImageTasks = TaskPool(isDeduplicationEnabled)
|
|
self.processedImageTasks = TaskPool(isDeduplicationEnabled)
|
|
self.originalImageTasks = TaskPool(isDeduplicationEnabled)
|
|
self.originalImageDataTasks = TaskPool(isDeduplicationEnabled)
|
|
|
|
self._nextTaskId = UnsafeMutablePointer<Int64>.allocate(capacity: 1)
|
|
self._nextTaskId.initialize(to: 0)
|
|
|
|
// Performance optimization to reduce number of queue switches.
|
|
if let dataLoader = configuration.dataLoader as? DataLoader {
|
|
dataLoader.attach(pipeline: self)
|
|
self.dataLoader = dataLoader
|
|
}
|
|
|
|
ResumableDataStorage.shared.register(self)
|
|
|
|
#if TRACK_ALLOCATIONS
|
|
Allocations.increment("ImagePipeline")
|
|
#endif
|
|
}
|
|
|
|
public convenience init(_ configure: (inout ImagePipeline.Configuration) -> Void) {
|
|
var configuration = ImagePipeline.Configuration()
|
|
configure(&configuration)
|
|
self.init(configuration: configuration)
|
|
}
|
|
|
|
/// Invalidates the pipeline and cancels all outstanding tasks.
|
|
func invalidate() {
|
|
queue.async {
|
|
guard !self.isInvalidated else { return }
|
|
self.isInvalidated = true
|
|
self.tasks.keys.forEach(self.cancel)
|
|
}
|
|
}
|
|
|
|
// MARK: - Loading Images
|
|
|
|
@discardableResult
|
|
public func loadImage(with request: ImageRequestConvertible,
|
|
queue: DispatchQueue? = nil,
|
|
completion: @escaping (_ result: Result<ImageResponse, Error>) -> Void) -> ImageTask {
|
|
loadImage(with: request, queue: queue, progress: nil, completion: completion)
|
|
}
|
|
|
|
/// Loads an image for the given request using image loading pipeline.
|
|
///
|
|
/// The pipeline first checks if the image or image data exists in any of its caches.
|
|
/// It checks if the processed image exists in the memory cache, then if the processed
|
|
/// image data exists in the custom data cache (disabled by default), then if the data
|
|
/// cache contains the original image data. Only if there is no cached data, the pipeline
|
|
/// will start loading the data. When the data is loaded the pipeline decodes it, applies
|
|
/// the processors, and decompresses the image in the background.
|
|
///
|
|
/// To learn more about the pipeine, see the [README](https://github.com/kean/Nuke).
|
|
///
|
|
/// # Deduplication
|
|
///
|
|
/// The pipeline avoids doing any duplicated work when loading images. For example,
|
|
/// let's take these two requests:
|
|
///
|
|
/// ```swift
|
|
/// let url = URL(string: "http://example.com/image")
|
|
/// pipeline.loadImage(with: ImageRequest(url: url, processors: [
|
|
/// ImageProcessors.Resize(size: CGSize(width: 44, height: 44)),
|
|
/// ImageProcessors.GaussianBlur(radius: 8)
|
|
/// ]))
|
|
/// pipeline.loadImage(with: ImageRequest(url: url, processors: [
|
|
/// ImageProcessors.Resize(size: CGSize(width: 44, height: 44))
|
|
/// ]))
|
|
/// ```
|
|
///
|
|
/// Nuke will load the data only once, resize the image once and blur it also only once.
|
|
/// There is no duplicated work done. The work only gets canceled when all the registered
|
|
/// requests are, and the priority is based on the highest priority of the registered requests.
|
|
///
|
|
/// # Configuration
|
|
///
|
|
/// See `ImagePipeline.Configuration` to learn more about the pipeline features and
|
|
/// how to enable/disable them.
|
|
///
|
|
/// - parameter queue: A queue on which to execute `progress` and `completion`
|
|
/// callbacks. By default, the pipeline uses `.main` queue.
|
|
/// - parameter progress: A closure to be called periodically on the main thread
|
|
/// when the progress is updated. `nil` by default.
|
|
/// - parameter completion: A closure to be called on the main thread when the
|
|
/// request is finished. `nil` by default.
|
|
@discardableResult
|
|
public func loadImage(with request: ImageRequestConvertible,
|
|
queue: DispatchQueue? = nil,
|
|
progress: ((_ intermediateResponse: ImageResponse?, _ completedUnitCount: Int64, _ totalUnitCount: Int64) -> Void)? = nil,
|
|
completion: ((_ result: Result<ImageResponse, Error>) -> Void)? = nil) -> ImageTask {
|
|
loadImage(with: request.asImageRequest(), isConfined: false, queue: queue, progress: progress, completion: completion)
|
|
}
|
|
|
|
func loadImage(with request: ImageRequest,
|
|
isConfined: Bool,
|
|
queue callbackQueue: DispatchQueue?,
|
|
progress progressHandler: ((_ intermediateResponse: ImageResponse?, _ completedUnitCount: Int64, _ totalUnitCount: Int64) -> Void)?,
|
|
completion: ((_ result: Result<ImageResponse, Error>) -> Void)?) -> ImageTask {
|
|
let request = inheritOptions(request)
|
|
let task = ImageTask(taskId: nextTaskId, request: request, isDataTask: false)
|
|
task.pipeline = self
|
|
if isConfined {
|
|
self.startImageTask(task, callbackQueue: callbackQueue, progress: progressHandler, completion: completion)
|
|
} else {
|
|
self.queue.async {
|
|
self.startImageTask(task, callbackQueue: callbackQueue, progress: progressHandler, completion: completion)
|
|
}
|
|
}
|
|
return task
|
|
}
|
|
|
|
// MARK: - Loading Image Data
|
|
|
|
@discardableResult
|
|
public func loadData(with request: ImageRequestConvertible,
|
|
queue: DispatchQueue? = nil,
|
|
completion: @escaping (Result<(data: Data, response: URLResponse?), Error>) -> Void) -> ImageTask {
|
|
loadData(with: request, queue: queue, progress: nil, completion: completion)
|
|
}
|
|
|
|
/// Loads the image data for the given request. The data doesn't get decoded or processed in any
|
|
/// other way.
|
|
///
|
|
/// You can call `loadImage(:)` for the request at any point after calling `loadData(:)`, the
|
|
/// pipeline will use the same operation to load the data, no duplicated work will be performed.
|
|
///
|
|
/// - parameter queue: A queue on which to execute `progress` and `completion`
|
|
/// callbacks. By default, the pipeline uses `.main` queue.
|
|
/// - parameter progress: A closure to be called periodically on the main thread
|
|
/// when the progress is updated. `nil` by default.
|
|
/// - parameter completion: A closure to be called on the main thread when the
|
|
/// request is finished.
|
|
@discardableResult
|
|
public func loadData(with request: ImageRequestConvertible,
|
|
queue: DispatchQueue? = nil,
|
|
progress: ((_ completed: Int64, _ total: Int64) -> Void)?,
|
|
completion: @escaping (Result<(data: Data, response: URLResponse?), Error>) -> Void) -> ImageTask {
|
|
loadData(with: request.asImageRequest(), isConfined: false, queue: queue, progress: progress, completion: completion)
|
|
}
|
|
|
|
func loadData(with request: ImageRequest,
|
|
isConfined: Bool,
|
|
queue callbackQueue: DispatchQueue?,
|
|
progress: ((_ completed: Int64, _ total: Int64) -> Void)?,
|
|
completion: @escaping (Result<(data: Data, response: URLResponse?), Error>) -> Void) -> ImageTask {
|
|
let task = ImageTask(taskId: nextTaskId, request: request, isDataTask: true)
|
|
task.pipeline = self
|
|
if isConfined {
|
|
self.startDataTask(task, callbackQueue: callbackQueue, progress: progress, completion: completion)
|
|
} else {
|
|
self.queue.async {
|
|
self.startDataTask(task, callbackQueue: callbackQueue, progress: progress, completion: completion)
|
|
}
|
|
}
|
|
return task
|
|
}
|
|
|
|
// MARK: - Image Task Events
|
|
|
|
func imageTaskCancelCalled(_ task: ImageTask) {
|
|
queue.async {
|
|
self.cancel(task)
|
|
}
|
|
}
|
|
|
|
private func cancel(_ task: ImageTask) {
|
|
guard let subscription = self.tasks.removeValue(forKey: task) else { return }
|
|
if !task.isDataTask {
|
|
self.send(.cancelled, task)
|
|
}
|
|
subscription.unsubscribe()
|
|
}
|
|
|
|
func imageTaskUpdatePriorityCalled(_ task: ImageTask, priority: ImageRequest.Priority) {
|
|
queue.async {
|
|
task._priority = priority
|
|
guard let subscription = self.tasks[task] else { return }
|
|
if !task.isDataTask {
|
|
self.send(.priorityUpdated(priority: priority), task)
|
|
}
|
|
subscription.setPriority(priority.taskPriority)
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - Cache
|
|
|
|
public extension ImagePipeline {
|
|
/// Returns a cached response from the memory cache.
|
|
func cachedImage(for url: URL) -> ImageContainer? {
|
|
cachedImage(for: ImageRequest(url: url))
|
|
}
|
|
|
|
/// Returns a cached response from the memory cache. Returns `nil` if the request disables
|
|
/// memory cache reads.
|
|
func cachedImage(for request: ImageRequest) -> ImageContainer? {
|
|
guard request.options.memoryCacheOptions.isReadAllowed && request.cachePolicy != .reloadIgnoringCachedData else { return nil }
|
|
|
|
let request = inheritOptions(request)
|
|
return configuration.imageCache?[request]
|
|
}
|
|
|
|
internal func storeResponse(_ image: ImageContainer, for request: ImageRequest) {
|
|
guard request.options.memoryCacheOptions.isWriteAllowed,
|
|
!image.isPreview || configuration.isStoringPreviewsInMemoryCache else { return }
|
|
configuration.imageCache?[request] = image
|
|
}
|
|
|
|
/// Returns a key used for disk cache (see `DataCaching`).
|
|
func cacheKey(for request: ImageRequest, item: DataCacheItem) -> String {
|
|
switch item {
|
|
case .originalImageData: return request.makeCacheKeyForOriginalImageData()
|
|
case .finalImage: return request.makeCacheKeyForFinalImageData()
|
|
}
|
|
}
|
|
|
|
/// Removes cached image from all cache layers.
|
|
func removeCachedImage(for request: ImageRequest) {
|
|
let request = inheritOptions(request)
|
|
|
|
configuration.imageCache?[request] = nil
|
|
|
|
if let dataCache = configuration.dataCache {
|
|
dataCache.removeData(for: request.makeCacheKeyForOriginalImageData())
|
|
dataCache.removeData(for: request.makeCacheKeyForFinalImageData())
|
|
}
|
|
|
|
configuration.dataLoader.removeData(for: request.urlRequest)
|
|
}
|
|
}
|
|
|
|
// MARK: - Starting Image Tasks (Private)
|
|
|
|
private extension ImagePipeline {
|
|
func startImageTask(_ task: ImageTask,
|
|
callbackQueue: DispatchQueue?,
|
|
progress progressHandler: ((_ intermediateResponse: ImageResponse?, _ completedUnitCount: Int64, _ totalUnitCount: Int64) -> Void)?,
|
|
completion: ((_ result: Result<ImageResponse, Error>) -> Void)?) {
|
|
guard !isInvalidated else { return }
|
|
|
|
self.send(.started, task)
|
|
|
|
tasks[task] = makeTaskLoadImage(for: task.request)
|
|
.subscribe(priority: task._priority.taskPriority) { [weak self, weak task] event in
|
|
guard let self = self, let task = task else { return }
|
|
|
|
self.send(ImageTaskEvent(event), task)
|
|
|
|
if event.isCompleted {
|
|
self.tasks[task] = nil
|
|
}
|
|
|
|
self.dispatchCallback(to: callbackQueue) {
|
|
guard !task.isCancelled else { return }
|
|
|
|
switch event {
|
|
case let .value(response, isCompleted):
|
|
if isCompleted {
|
|
completion?(.success(response))
|
|
} else {
|
|
progressHandler?(response, task.completedUnitCount, task.totalUnitCount)
|
|
}
|
|
case let .progress(progress):
|
|
task.setProgress(progress)
|
|
progressHandler?(nil, progress.completed, progress.total)
|
|
case let .error(error):
|
|
completion?(.failure(error))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func startDataTask(_ task: ImageTask,
|
|
callbackQueue: DispatchQueue?,
|
|
progress progressHandler: ((_ completed: Int64, _ total: Int64) -> Void)?,
|
|
completion: @escaping (Result<(data: Data, response: URLResponse?), Error>) -> Void) {
|
|
guard !isInvalidated else { return }
|
|
|
|
tasks[task] = makeTaskLoadImageData(for: task.request)
|
|
.subscribe(priority: task._priority.taskPriority) { [weak self, weak task] event in
|
|
guard let self = self, let task = task else { return }
|
|
|
|
if event.isCompleted {
|
|
self.tasks[task] = nil
|
|
}
|
|
|
|
self.dispatchCallback(to: callbackQueue) {
|
|
guard !task.isCancelled else { return }
|
|
|
|
switch event {
|
|
case let .value(response, isCompleted):
|
|
if isCompleted {
|
|
completion(.success(response))
|
|
}
|
|
case let .progress(progress):
|
|
task.setProgress(progress)
|
|
progressHandler?(progress.completed, progress.total)
|
|
case let .error(error):
|
|
completion(.failure(error))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func dispatchCallback(to callbackQueue: DispatchQueue?, _ closure: @escaping () -> Void) {
|
|
if callbackQueue === self.queue {
|
|
closure()
|
|
} else {
|
|
(callbackQueue ?? self.configuration.callbackQueue).async(execute: closure)
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - Task Factory (Private)
|
|
|
|
// When you request an image, the pipeline creates the following dependency graph:
|
|
//
|
|
// TaskLoadImage -> TaskProcessImage* -> TaskDecodeImage -> TaskLoadImageData
|
|
//
|
|
// Each task represents a resource to be retrieved - processed image, original image, etc.
|
|
// Each task can be reuse of the same resource requested multiple times.
|
|
|
|
extension ImagePipeline {
|
|
func makeTaskLoadImage(for request: ImageRequest) -> Task<ImageResponse, Error>.Publisher {
|
|
decompressedImageTasks.publisherForKey(request.makeLoadKeyForFinalImage()) {
|
|
TaskLoadImage(self, request)
|
|
}
|
|
}
|
|
|
|
func makeTaskProcessImage(for request: ImageRequest) -> Task<ImageResponse, Error>.Publisher {
|
|
request.processors.isEmpty ?
|
|
makeTaskDecodeImage(for: request) : // No processing needed
|
|
processedImageTasks.publisherForKey(request.makeLoadKeyForFinalImage()) {
|
|
TaskProcessImage(self, request)
|
|
}
|
|
}
|
|
|
|
func makeTaskDecodeImage(for request: ImageRequest) -> Task<ImageResponse, Error>.Publisher {
|
|
originalImageTasks.publisherForKey(request.makeLoadKeyForOriginalImage()) {
|
|
TaskDecodeImage(self, request)
|
|
}
|
|
}
|
|
|
|
func makeTaskLoadImageData(for request: ImageRequest) -> Task<(Data, URLResponse?), Error>.Publisher {
|
|
originalImageDataTasks.publisherForKey(request.makeLoadKeyForOriginalImage()) {
|
|
TaskLoadImageData(self, request)
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - Misc (Private)
|
|
|
|
private extension ImagePipeline {
|
|
/// Inherits some of the pipeline configuration options like processors.
|
|
func inheritOptions(_ request: ImageRequest) -> ImageRequest {
|
|
// Do not manipulate is the request has some processors already.
|
|
guard request.processors.isEmpty, !configuration.processors.isEmpty else { return request }
|
|
|
|
var request = request
|
|
request.processors = configuration.processors
|
|
return request
|
|
}
|
|
|
|
func send(_ event: ImageTaskEvent, _ task: ImageTask) {
|
|
observer?.pipeline(self, imageTask: task, didReceiveEvent: event)
|
|
}
|
|
}
|
|
|
|
// MARK: - Errors
|
|
|
|
public extension ImagePipeline {
|
|
/// Represents all possible image pipeline errors.
|
|
enum Error: Swift.Error, CustomDebugStringConvertible {
|
|
/// Data loader failed to load image data with a wrapped error.
|
|
case dataLoadingFailed(Swift.Error)
|
|
/// Decoder failed to produce a final image.
|
|
case decodingFailed
|
|
/// Processor failed to produce a final image.
|
|
case processingFailed
|
|
|
|
public var debugDescription: String {
|
|
switch self {
|
|
case let .dataLoadingFailed(error): return "Failed to load image data: \(error)"
|
|
case .decodingFailed: return "Failed to create an image from the image data"
|
|
case .processingFailed: return "Failed to process the image"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - Testing
|
|
|
|
extension ImagePipeline {
|
|
var taskCount: Int {
|
|
tasks.count
|
|
}
|
|
}
|