AsyncStream in Swift


Greetings, traveler!

To learn more about AsyncStream in the Swift programming language, we can look at the protocol it conforms to, known as AsyncSequence. As described in Apple’s documentation:

A type that provides asynchronous, sequential, iterated access to its elements. An AsyncSequence resembles the Sequence type — offering a list of values you can step through one at a time — and adds asynchronicity. An AsyncSequence may have all, some, or none of its values available when you first use it. Instead, you use await to receive values as they become available.
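To make this description more concrete, here is a minimal sketch of a custom AsyncSequence. The names Counter and collectCounter are made up for this illustration and do not appear elsewhere in the article.

```swift
// Hypothetical example type: produces the numbers 1...limit.
struct Counter: AsyncSequence {
    typealias Element = Int
    let limit: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        let limit: Int
        var current = 0

        // Called once per loop iteration; returning nil ends the sequence.
        mutating func next() async -> Int? {
            guard current < limit else { return nil }
            current += 1
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(limit: limit)
    }
}

// Collects all values; every step of the loop is a potential suspension point.
func collectCounter(limit: Int) async -> [Int] {
    var values: [Int] = []
    for await value in Counter(limit: limit) {
        values.append(value)
    }
    return values
}
```

The loop looks like a regular for-in loop, but the ‘await’ keyword marks each step as a point where the caller may be suspended until the next value is ready.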

AsyncStream is part of the concurrency model introduced in Swift 5.5. This model provides language-level tools for writing asynchronous and concurrent code that is easier to read and reason about. If you are new to these concepts, you may want to read the articles about async/await, tasks, and actors, which are part of the Swift Concurrency series. AsyncStream allows us to adapt existing code that relies on closures or Combine publishers so that it can be consumed with async/await.

AsyncStream vs AsyncThrowingStream

AsyncStream is an asynchronous sequence of elements that we produce ourselves by yielding values to a continuation. AsyncThrowingStream works the same way, but it can also finish by throwing an error, so we iterate over it with ‘for try await’. In this article, we will look at how these tools work using a simple example: monitoring the progress of a download operation.
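The difference can be sketched in a few lines. The names DemoError, plainStream, failingStream, and consumeBoth are assumptions made for this illustration only.

```swift
// Hypothetical error type used only in this sketch.
struct DemoError: Error {}

// A non-throwing stream: it can only end normally.
func plainStream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
    }
}

// A throwing stream: it can also end by throwing an error.
func failingStream() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield(1)
        continuation.finish(throwing: DemoError())
    }
}

func consumeBoth() async -> (plain: [Int], received: [Int], failed: Bool) {
    var plain: [Int] = []
    for await value in plainStream() {            // no ‘try’ needed
        plain.append(value)
    }

    var received: [Int] = []
    var failed = false
    do {
        for try await value in failingStream() {  // ‘try’ is required
            received.append(value)
        }
    } catch {
        // Reached after the already yielded elements have been delivered.
        failed = true
    }
    return (plain, received, failed)
}
```

In this sketch, the element yielded before the error is still delivered to the loop; the error only surfaces once the consumer asks for the next element.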

Example

Let’s imagine a situation where we need to download an array of objects from the server. At the same time, we want to show the user the progress of the download so that they can track the completion percentage.

First, let’s create an enumeration to represent the download state. We need two states: one for when the download is still in progress and one for when all files are downloaded. A third state, a failed download, is also possible, but AsyncThrowingStream covers this case with its error handling, so we don’t need to model it ourselves. The completed state will carry the downloaded objects, so let’s start with a simple model for them.

import Foundation

// A minimal model for a downloaded item.
struct Object {
    var id = UUID()
}

Each case carries an associated value. The ‘inProgress’ case holds a Float with the completion percentage, and the ‘completed’ case holds an array of the downloaded objects.

enum State {
    case inProgress(Float)
    case completed([Object])
}

We have three participants in this story:

  1. The Service that provides the old way of loading data using closures.
  2. The Manager that creates the data stream.
  3. The ViewModel that uses this stream.

Let’s start with the Service.

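final class DownloadService {
    
    // A stand-in for a real network layer; it only simulates a download.
    // The closures are @escaping because a real implementation would
    // store them and call them later.
    func getData(
        from url: URL,
        onDownloading: @escaping (Float) -> Void,
        completion: @escaping (Result<[Object], Error>) -> Void
    ) throws {
        
        print("Downloading...")
        
        // Report some progress, then deliver the result.
        onDownloading(50)
        onDownloading(100)
        completion(.success([Object(), Object()]))
    }
    
}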

As you can see, it provides everything we need, but it reports progress and results through closures.

Now, let’s move to the Manager. It has a property that provides an AsyncThrowingStream. It uses the Service to get data from the URL and yields the state from the Service’s closures. Note that when data fetching is done, we must finish the stream; otherwise, the consumer will keep waiting for new elements. Since this is an AsyncThrowingStream, we can also finish it with an error.

final class DownloadManager {
    
    private let service = DownloadService()
    private let url = URL(string: "https://livsycode.com/")!
    
    var stream: AsyncThrowingStream<State, Error> {
        AsyncThrowingStream<State, Error> { continuation in
            do {
                try service.getData(from: url) { progressPercentage in
                    continuation.yield(.inProgress(progressPercentage))
                } completion: { result in
                    switch result {
                    case .success(let data):
                        continuation.yield(.completed(data))
                        continuation.finish()
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                }
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
    
}

We can also use the ‘onTermination’ callback, for example for debugging or cleanup. It is called when the stream finishes or when iteration is cancelled.

final class DownloadManager {
    
    private let service = DownloadService()
    private let url = URL(string: "https://livsycode.com/")!
    
    var stream: AsyncThrowingStream<State, Error> {
        AsyncThrowingStream<State, Error> { continuation in
            // Register the handler before starting any work, so it is
            // already in place if the service completes right away.
            continuation.onTermination = { @Sendable status in
                switch status {
                case .finished:
                    print("Finished")
                case .cancelled:
                    print("Task was cancelled")
                @unknown default:
                    break
                }
            }
            
            do {
                try service.getData(from: url) { progressPercentage in
                    continuation.yield(.inProgress(progressPercentage))
                } completion: { result in
                    switch result {
                    case .success(let data):
                        continuation.yield(.completed(data))
                        continuation.finish()
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                }
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
    
}

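The type of the ‘onTermination’ handler is already declared as @Sendable, since it may be called from any concurrency context; writing the attribute explicitly makes this visible where the closure is defined. Another article in the Swift Concurrency series explains @Sendable in more detail.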
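Beyond logging, ‘onTermination’ is a natural place for cleanup. The following is a minimal sketch, assuming a producer that runs in its own task; the names ticks and firstTicks are hypothetical. When the consumer stops iterating and the stream is discarded, the handler is expected to run and cancel the producer.

```swift
// Hypothetical producer: emits 0, 1, 2, ... until it is cancelled.
func ticks() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let producer = Task {
            var tick = 0
            while !Task.isCancelled {
                continuation.yield(tick)
                tick += 1
                // Wait 10 ms between ticks; a cancelled sleep throws,
                // and the loop condition then ends the task.
                try? await Task.sleep(nanoseconds: 10_000_000)
            }
        }
        // The producer never finishes the stream on its own, so it is
        // safe to register the handler right after creating the task.
        continuation.onTermination = { @Sendable _ in
            producer.cancel()
        }
    }
}

// Reads the first `count` ticks and then stops iterating.
func firstTicks(_ count: Int) async -> [Int] {
    var seen: [Int] = []
    for await tick in ticks() {
        seen.append(tick)
        if seen.count == count { break }
    }
    return seen
}
```

Without the handler, the producer task in this sketch would keep running after the consumer has lost interest in the values.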

Alright then. Now, we can use our Manager in the ViewModel.

final class ViewModel {
    
    private let manager = DownloadManager()
    
    init() {
        Task {
            do {
                for try await state in manager.stream {
                    switch state {
                    case .inProgress(let progressPercentage):
                        print(progressPercentage)
                    case .completed(let array):
                        array.forEach {
                            print($0.id)
                        }
                    }
                }
            } catch {
                // The stream finished by throwing an error.
                print("Download failed: \(error)")
            }
        }
    }
    
}

Since our stream conforms to the AsyncSequence protocol, we can iterate over the states it provides with a ‘for try await’ loop.
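Conforming to AsyncSequence also gives the stream the familiar sequence operators. Here is a small sketch that keeps only the progress values; DemoState, demoStream, and progressValues are simplified stand-ins invented for this example, not the types defined above.

```swift
// Simplified stand-in for the State enumeration (hypothetical).
enum DemoState {
    case inProgress(Float)
    case completed(itemCount: Int)
}

// A stream that yields a fixed series of states and then finishes.
func demoStream() -> AsyncThrowingStream<DemoState, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield(.inProgress(25))
        continuation.yield(.inProgress(75))
        continuation.yield(.completed(itemCount: 2))
        continuation.finish()
    }
}

// Uses compactMap to drop everything except the progress values.
func progressValues() async throws -> [Float] {
    let progress = demoStream().compactMap { state -> Float? in
        if case .inProgress(let value) = state { return value }
        return nil
    }

    var values: [Float] = []
    for try await value in progress {
        values.append(value)
    }
    return values
}
```

The transformed sequence is still asynchronous and still throwing, so the loop keeps its ‘try await’.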

Conclusion

This article is coming to an end. We have reviewed a new aspect of working within Swift Concurrency. I hope it was valuable for you, and I look forward to seeing you in the following publications.