Streams of Cocoa: Why It's Still Worth Knowing NSStream
These days, RAM is cheap and plentiful for many use cases. Still, there’s a common scenario in which data easily won’t fit in RAM: moving a file around.
While most files we encounter daily are small and can easily fit in memory at once, this isn’t true for many media files — especially video. If all we want to do is move files as is, Foundation has us covered: We have APIs that let us specify what should end up where in less than 10 lines of code, and they do the heavy lifting for us.
However, if we need to massage the bytes in passing, we’ll quickly arrive at the point where the one-liners fail: If, for example, we want to compute some kind of checksum for a file, or encrypt it, we’ll soon exhaust what we can do with (NSMutable)Data. And even if we could fit everything in memory at once, doing so would block the current thread for quite a while.
This is where streams save the day.
Enter NSStream
NSStream is a class cluster that offers an abstraction over buffered reading/writing of arbitrarily sized data. It dates back to a time when most application programming happened on the main thread (after all, many Macs only had a single CPU core to begin with), so the preferred way of using NSStream is asynchronous: While NSStream does support direct reading/writing of bytes (a mode of operation called “polling”), one would typically implement a class that conforms to NSStreamDelegate, set an instance of that class as the stream’s delegate, and schedule the stream on a run loop. The stream then calls the delegate whenever something interesting happens, letting it provide or process bytes on the fly.
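To make that pattern concrete, here’s a minimal sketch in Swift. The class name, buffer size, and logging are made up for illustration:

```swift
// A minimal sketch of the delegate-based pattern. `SimpleReader` is a
// hypothetical class; a caller must keep a strong reference to it while reading.
final class SimpleReader: NSObject, StreamDelegate {
    private let stream: InputStream

    init(fileURL: URL) {
        // Force-unwrapping for brevity; real code should handle the failure.
        stream = InputStream(url: fileURL)!
        super.init()
        stream.delegate = self
        stream.schedule(in: .current, forMode: .default)
        stream.open()
    }

    func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        switch eventCode {
        case .hasBytesAvailable:
            // Read whatever is currently available, in chunks of up to 1,024 bytes.
            var buffer = [UInt8](repeating: 0, count: 1_024)
            let count = stream.read(&buffer, maxLength: buffer.count)
            print("Read \(count) bytes")
        case .endEncountered, .errorOccurred:
            stream.close()
            stream.remove(from: .current, forMode: .default)
        default:
            break
        }
    }
}
```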
When NSStream was introduced, API contracts between a class and its delegate were typically specified in terms of a so-called “informal protocol” — which is fancy speak for a category on NSObject. Using an informal protocol had some key advantages. For one, the author of the class could provide a default implementation, which would then remove the need for respondsToSelector: checks. More importantly, this used to be the only way to specify methods that were optional to implement. (Optional methods in protocols are a language addition from Objective-C 2.0, which debuted several years later.)
So why did we stop doing this? Informal protocols came at the price of making NSObject’s API surface grow. By a lot. They had all the downsides of categories, which tempted authors to keep the number of APIs they added in an informal protocol as small as possible, and this shows in NSStreamDelegate: Where a modern protocol would have an explicit method for each kind of event, NSStreamDelegate consists of a single optional method through which all events are funneled — regardless of whether the kind of stream can ever actually send a certain type of event.
As mentioned in passing, NSStream is a class cluster. So in practice, you’ll always work with some kind of subclass of it. The public subclasses of NSStream are NSInputStream, which is a stream you can read from but not write to; and NSOutputStream, which is a stream you can write to but not read from. So if you create an NSInputStream and set an object that implements -stream:handleEvent: as its delegate, that object shouldn’t ever receive NSStreamEventHasSpaceAvailable. (The same holds true for NSOutputStream and NSStreamEventHasBytesAvailable.)
With all that said, why would you still want to use NSStream?
Using Streams Today
Sometimes, a stream is the most natural way to do something, and Stream is the highest-level API available. When, for example, you need some functionality that URLSession doesn’t offer directly, you can often use the escape hatch of creating a task based on a URLRequest with an httpBodyStream.
So let’s look into what we’ll need to use this!
ℹ️ Note: If you want to skip the step-by-step development or prefer seeing the code in its entirety while following along, you can download the sample code as a ZIP.
Creating a request that streams from a single file is easy enough:
```swift
let source = InputStream(url: <# some file url #>)
var request = URLRequest(url: <# some HTTP url #>)
request.httpBodyStream = source
```
This is, of course, nonsense: NSURLSession has APIs that are much more convenient for this purpose.
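For a plain, untouched upload, one of those convenient APIs already streams the file for you. A quick sketch (the upload URL and file path are placeholders):

```swift
// Uploading a file as is: URLSession streams it from disk on our behalf.
var request = URLRequest(url: URL(string: "https://example.com/upload")!)
request.httpMethod = "POST"

let task = URLSession.shared.uploadTask(with: request, fromFile: URL(fileURLWithPath: "/tmp/movie.mov"))
task.resume()
```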
But what if we want to end-to-end encrypt the data we’re sending or perform any other kind of manipulation? We’ll need some more streams!
Uniformly Transforming Bytes
The URL request wants an input stream, so let’s write one that lets us massage the bytes from another stream however we see fit. We’ll start with a skeleton that merely forwards all properties, along with the conceptually simple API, to an underlying input stream object:
```swift
class TransducingStream: InputStream, StreamDelegate {
    private weak var delegateStorage: StreamDelegate?
    private let sourceStream: InputStream

    override var delegate: StreamDelegate? {
        set { delegateStorage = newValue }
        // This is an interesting peculiarity of `NSStream`. 🤷🏾‍♂️
        get { delegateStorage ?? self }
    }

    override var streamError: Error? { sourceStream.streamError }
    override var hasBytesAvailable: Bool { sourceStream.hasBytesAvailable }
    override var streamStatus: Stream.Status { sourceStream.streamStatus }

    override func open() { sourceStream.open() }
    override func close() { sourceStream.close() }

    override func property(forKey key: Stream.PropertyKey) -> Any? {
        sourceStream.property(forKey: key)
    }

    override func setProperty(_ property: Any?, forKey key: Stream.PropertyKey) -> Bool {
        sourceStream.setProperty(property, forKey: key)
    }

    // To be continued …
}
```
OK, that was boring. Because this article is not about end-to-end encryption or such things, we’ll leave out the actual byte massaging and use a closure that’s passed alongside the actual stream when we initialize an instance of our new class, and we’ll let that closure do the heavy lifting. Following the pattern of not actually working, we’ll also delegate the run loop scheduling to the underlying stream object and set ourselves as the delegate of this underlying stream. This way, we’ll honor all API contracts as soon as we forward the events to our delegate:
```swift
// Implementation continues:
typealias Transducer = (UnsafeMutablePointer<UInt8>, Int) -> Void

private let transducer: Transducer

required init(sourceStream: InputStream, transducer: @escaping Transducer) {
    self.sourceStream = sourceStream
    self.transducer = transducer
    super.init(data: .init())
    sourceStream.delegate = self
}

deinit {
    // Careful: not weak!
    sourceStream.delegate = nil
}

func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
    if let delegate = delegateStorage, delegate !== self {
        delegate.stream?(self, handle: eventCode)
    }
}

override func schedule(in aRunLoop: RunLoop, forMode mode: RunLoop.Mode) {
    sourceStream.schedule(in: aRunLoop, forMode: mode)
}

override func remove(from aRunLoop: RunLoop, forMode mode: RunLoop.Mode) {
    sourceStream.remove(from: aRunLoop, forMode: mode)
}

// To be completed …
```
With all of the bookkeeping out of the way, let’s perform the actual work. NSInputStream offers two methods for reading: one where the caller passes a buffer to be filled by the stream, and one where the stream lets the caller access its internal buffer.
Version one is safer, but version two is potentially more performant because you may save one copy of every byte. This can make a difference, but it’s far from a given. In both cases, we’ll pass the buffer to the transducer and let it do its thing:
```swift
// The rest of the implementation:
override func read(_ buffer: UnsafeMutablePointer<UInt8>, maxLength len: Int) -> Int {
    let byteCount = sourceStream.read(buffer, maxLength: len)
    // Only process the buffer if there’s something in there!
    if byteCount > 0 {
        transducer(buffer, byteCount)
    }
    return byteCount
}

override func getBuffer(_ buffer: UnsafeMutablePointer<UnsafeMutablePointer<UInt8>?>, length len: UnsafeMutablePointer<Int>) -> Bool {
    var maybeBuffer: UnsafeMutablePointer<UInt8>?
    var length = 0
    guard sourceStream.getBuffer(&maybeBuffer, length: &length), let myBuffer = maybeBuffer else {
        return false
    }
    transducer(myBuffer, length)
    buffer.initialize(to: myBuffer)
    len.initialize(to: length)
    return true
}
```
As you may have noticed, this example isn’t the most realistic one: It only works correctly with data transformations that never fail and that don’t modify the byte count/byte order.
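For illustration, here’s a hypothetical use of TransducingStream that stays within those constraints: a transducer that XORs every byte in place, a 1:1 transformation that can never fail. The file URL, upload URL, and XOR key are all placeholders:

```swift
// A hypothetical, constraint-respecting use of `TransducingStream`:
// XOR-obfuscating each byte (1:1, never fails). Paths and key are made up.
let fileStream = InputStream(url: URL(fileURLWithPath: "/tmp/movie.mov"))!
let obfuscatingStream = TransducingStream(sourceStream: fileStream) { buffer, count in
    for index in 0..<count {
        buffer[index] ^= 0x5A
    }
}

var request = URLRequest(url: URL(string: "https://example.com/upload")!)
request.httpBodyStream = obfuscatingStream
```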
Supporting Arbitrary Transformations
How could we solve this? NSStream has a class method that gives you a bound pair of streams: an input stream to read from and an output stream to write into. Whatever is written to the output stream becomes available for reading on the input stream, and as the input is read, the freed-up buffer space is reported as available space on the output.
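Here’s a minimal sketch of those mechanics, ignoring run loop scheduling and assuming the shared buffer is large enough for the payload (both the buffer size and the payload are made up):

```swift
// Bytes written to the output stream of a bound pair become readable on the
// input stream. Real code would check the return values of open/write/read.
var boundInput: InputStream?
var boundOutput: OutputStream?
Stream.getBoundStreams(withBufferSize: 16, inputStream: &boundInput, outputStream: &boundOutput)

guard let input = boundInput, let output = boundOutput else { fatalError("Could not create the pair") }
input.open()
output.open()

let payload = Array("hello".utf8)
_ = output.write(payload, maxLength: payload.count)

var readBack = [UInt8](repeating: 0, count: 16)
let count = input.read(&readBack, maxLength: readBack.count)
print(String(decoding: readBack.prefix(count), as: UTF8.self)) // Should print "hello".
```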
With that knowledge, we could set out to extend the stream class we just wrote. But that may not be a good idea: If the transform we apply to the source bytes doesn’t map one byte to one byte (like a base-whatever encoding does), what does it mean to read Stream.PropertyKey.fileCurrentOffsetKey or set it to a new value? Should it return/set the offset of the source stream or the transformed byte stream? Also, for a block-based cipher or many media codecs, not all offsets would necessarily be valid, and skipping may not be possible at all.
A better idea than a class that is a stream itself would be to have a class that merely vends the input stream of a bound stream pair. The class can then be the delegate of the output stream of the stream pair and write bytes as space becomes available, while something reads off the vended input stream.
Drafting the API
So let’s modify our example to this effect: Instead of a closure that never fails and modifies a buffer in place, we’ll allow failure and pass a type that only offers an interface for reading. For the sake of simplicity when it comes to memory management, we’ll bite the bullet and have the transform return Data instead of a buffer pointer or similar things that might lead to difficult-to-understand bugs. (This is a price we pay because this example is generic: If you were to write a streaming transformer built for only one specific use case, you could potentially avoid some copying and heap allocations. That, however, is beyond the scope of this article.)
To reflect the change from being a stream to vending one, let’s name our class StreamingTransformer. To instantiate this class, we’ll need a file we want to transform. And since all file-based operations can fail, we’ll create instances in a fashion that can fail without any other shenanigans.
Let’s draft the API quickly:
```swift
final class StreamingTransformer: NSObject {
    enum TransformCallType {
        case moreToCome
        case final
    }

    typealias Transform = (UnsafeBufferPointer<UInt8>, TransformCallType) -> Result<Data, Error>

    /// Known error conditions that can occur at run time.
    enum Errors: Error {
        /// For scaffolding purposes.
        case unimplemented
        // More to come...
    }

    class func transforming(file: URL, transform: @escaping Transform) throws -> StreamingTransformer {
        throw Errors.unimplemented
    }

    /// Begins streaming the underlying bytes on the given run loop while data is read from the returned stream.
    ///
    /// While streaming, the `transform` parameter passed on creation will be invoked until either:
    /// - the source of this object has been streamed in its entirety
    /// - an error is encountered while reading from the source
    /// - an error is returned from the `transform`
    /// - the returned `InputStream` is closed
    ///
    /// Whichever happens first.
    ///
    /// - Parameter runloop: The run loop on which the data shall be read from the source, and the `transform` is called.
    /// - Throws: Throws an error if the object has already begun streaming.
    /// - Returns: Returns an opened `InputStream` that the transformed bytes can be read from.
    public func beginStreaming(in runloop: RunLoop) throws -> InputStream {
        throw Errors.unimplemented
    }
}
```
So what goes in place of throw Errors.unimplemented? Well, first we’ll limit ourselves to working on files — that’s a precondition pointed out in the name, so we don’t need to waste an error case on that. Failing to open that file, though? That calls for an error! Lastly, we also need a “bound stream pair”:
```swift
final class StreamingTransformer: NSObject {
    enum Errors: Error {
        case unimplemented
        case cannotOpen(file: URL, underlyingError: Error?)
        // To be continued...
    }

    class func transforming(file: URL, transform: @escaping Transform) throws -> StreamingTransformer {
        precondition(file.isFileURL, "Can only work on file URLs")

        guard let actualSource = InputStream(url: file) else {
            throw Errors.cannotOpen(file: file, underlyingError: nil)
        }
        actualSource.open()
        if actualSource.streamStatus != .open {
            throw Errors.cannotOpen(file: file, underlyingError: actualSource.streamError)
        }

        let bufferSize = 512
        let streamPair = try makeOpenedStreamPair(bufferSize: bufferSize)

        // Initializer definition coming soon...
        return .init(source: actualSource, bufferSize: bufferSize, boundPair: streamPair, transform: transform)
    }

    private static func makeOpenedStreamPair(bufferSize: Int) throws -> (source: InputStream, drain: OutputStream) {
        throw Errors.unimplemented // Coming up shortly…
    }
}
```
The class method on Stream that returns a bound pair of streams is not exactly “swifty.” It’s not even Cocoa-y, but rather a layer of wet rice paper wrapped around a Core Foundation function that follows the Create Rule and returns two newly created instances by reference. This is why we tuck it away in a helper function and introduce another error case:
```swift
final class StreamingTransformer: NSObject {
    public enum Errors: Error {
        case cannotOpen(file: URL, underlyingError: Error?)
        case cannotAllocateBuffer(size: Int)
        case unimplemented
        // To be continued...
    }

    public class func transforming(file: URL, transform: @escaping Transform) throws -> StreamingTransformer {
        // Like before.
    }

    private static func makeOpenedStreamPair(bufferSize: Int) throws -> (source: InputStream, drain: OutputStream) {
        var maybeInput: InputStream?
        var maybeOutput: OutputStream?
        Stream.getBoundStreams(withBufferSize: bufferSize, inputStream: &maybeInput, outputStream: &maybeOutput)
        guard let input = maybeInput, let output = maybeOutput else {
            throw Errors.cannotAllocateBuffer(size: bufferSize)
        }

        input.open()
        output.open()
        assert(input.streamStatus == .open, "Could not open reading end of bound stream pair")
        assert(output.streamStatus == .open, "Could not open writing end of bound stream pair")

        return (source: input, drain: output)
    }
}
```
With all of this out of the way, we know what the initializer has to look like and what stored properties our class needs at a minimum:
```swift
final class StreamingTransformer: NSObject {
    private init(source: InputStream,
                 bufferSize: Int,
                 boundPair: (source: InputStream, drain: OutputStream),
                 transform: @escaping Transform) {
        precondition(bufferSize > 0, "Cannot read buffers with a size of 0")
        self.actualSource = source
        self.readBuffer = .allocate(capacity: bufferSize)
        self.drain = boundPair.drain
        self.sourceToVend = boundPair.source
        self.bufferTransform = transform
        super.init()
        self.drain.delegate = self
    }

    deinit {
        readBuffer.deallocate()
        drain.delegate = nil
        actualSource.close()
        drain.close()
        sourceToVend.close()
    }

    private let bufferTransform: Transform
    private let readBuffer: UnsafeMutableBufferPointer<UInt8>
    private let drain: OutputStream
    private let sourceToVend: InputStream
    private let actualSource: InputStream
}
```
Conforming to StreamDelegate
The last code snippet was the first step that wouldn’t compile, since our class doesn’t yet conform to StreamDelegate. So let’s fix that:
```swift
extension StreamingTransformer: StreamDelegate {
    public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        if eventCode.contains(.errorOccurred) {
            return handle(error: aStream.streamError)
        }
        if eventCode.contains(.endEncountered) {
            return handleEnd()
        }
        if eventCode.contains(.hasSpaceAvailable) {
            drainBytes()
        } else {
            print("Encountered event: \(eventCode)")
        }
    }

    private func handle(error: Error?) {}
    private func handleEnd() {}
    private func drainBytes() {}
}
```
As we saw in the initial example, reading from an InputStream is not very “swifty.” So it won’t surprise you that writing to an OutputStream isn’t either. Before we start implementing the method stubs above, let’s extend InputStream and OutputStream to address that!
Interlude: Swiftifying Streams
We’ll start by giving InputStream a reading method that takes a (mutable) buffer and returns a custom result enum that covers the three possible cases:
```swift
extension InputStream {
    public func read(buffer: UnsafeMutableBufferPointer<UInt8>) -> ReadResult {
        let spaceAvailable = buffer.count
        precondition(spaceAvailable > 0, "Cannot read into buffer of size \(spaceAvailable)")
        let address = buffer.baseAddress!
        let bytesRead = read(address, maxLength: spaceAvailable)
        if bytesRead < 0 {
            return .failedReading(error: streamError)
        } else if bytesRead == 0 && streamStatus == .atEnd {
            return .doneReading
        } else {
            return .readSome(slice: buffer.prefix(bytesRead))
        }
    }

    public enum ReadResult {
        case readSome(slice: Slice<UnsafeMutableBufferPointer<UInt8>>)
        case doneReading
        case failedReading(error: Error?)
    }
}
```
The nice thing about this implementation is that it avoids unnecessary copies. The bytes are copied into the mutable buffer once, but afterward, only memory addresses (plus a bit of metadata) are passed around.
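As a quick, hypothetical illustration of how this extension reads in a polling scenario (the stream contents and buffer size are made up):

```swift
// Polling-style use of the `read(buffer:)` extension above.
let scratch = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: 256)
defer { scratch.deallocate() }

let stream = InputStream(data: Data("some bytes".utf8))
stream.open()

switch stream.read(buffer: scratch) {
case .readSome(let slice):
    print("Got \(slice.count) bytes")
case .doneReading:
    print("Nothing left to read")
case .failedReading(let error):
    print("Reading failed: \(String(describing: error))")
}
```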
Because Transform returns a Data object, we extend OutputStream so that we can write a Data object to it and so that it returns an appropriate enum that models all the cases we need to handle:
```swift
extension OutputStream {
    public func write(buffer: Data) -> WriteResult {
        let bytesAvailable = buffer.count
        return buffer.withUnsafeBytes {
            guard let rawPointer = $0.baseAddress, $0.count > 0 else {
                // A no-data write completes trivially.
                return .completed
            }
            let bytesWritten = write(rawPointer.assumingMemoryBound(to: UInt8.self), maxLength: bytesAvailable)
            if bytesWritten < 0 {
                return .failure(error: streamError!)
            } else if bytesWritten == bytesAvailable {
                return .completed
            } else {
                return .partial(unwrittenRemainder: buffer.suffix(from: bytesWritten))
            }
        }
    }

    public enum WriteResult {
        case completed
        case partial(unwrittenRemainder: Data)
        case failure(error: Error)
    }
}
```
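And, for symmetry, a hypothetical use of the writing counterpart against an in-memory output stream:

```swift
// Exercising the `write(buffer:)` extension above with an in-memory stream.
let output = OutputStream(toMemory: ())
output.open()

switch output.write(buffer: Data("payload".utf8)) {
case .completed:
    print("Everything was written")
case .partial(let remainder):
    print("\(remainder.count) bytes still pending")
case .failure(let error):
    print("Writing failed: \(error)")
}
```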
Putting the Extensions to Use
From this it follows that we’ll need an additional stored property.
In case of a partial write, we need to hold on to the associated unwritten data until the next time the drain has space available. As far as I know, Data’s func suffix(from:) is a lazy implementation that only copies bytes when it actually has to. So under the hood, this should (just like a Slice<T>) still cling on to the buffer you allocated inside your transform, instead of copying its contents:
```swift
final class StreamingTransformer: NSObject {
    // Everything like before but now:
    private var leftover: Data? // Defaults to `nil`, so no change needed in `init(…)`.
}
```
With all of that out of the way, we can finally implement drainBytes. First, we’ll need to get rid of any remnants from the last time our drain had space to fill. Once that’s done, we’ll transform all bytes read from our actualSource and write that data to drain while it has space available. If the actualSource is done, reading fails, transforming the bytes fails, or writing the transformed data doesn’t complete, we break out of this loop:
```swift
extension StreamingTransformer: StreamDelegate {
    public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        // Like before.
    }

    private func drainBytes() {
        // First clear out anything we didn’t manage to write the last time around.
        if let pending = leftover {
            switch drain.write(buffer: pending) {
            case .failure(error: let error):
                // Write failed, no point in clearing the leftover.
                return handle(error: error)
            case .partial(unwrittenRemainder: let remainder):
                // Remember the remainder so we can write it the next time the drain has space available.
                leftover = remainder
                return
            case .completed:
                // Entirety of the leftover buffer has been written: clear it and proceed!
                leftover = nil
            }
        }

        assert(leftover == nil, "Flushing should have cleared leftovers or stated we need to bail")

        while drain.hasSpaceAvailable {
            // Fetch fresh bytes from the source and transform them.
            let readResult = actualSource.read(buffer: readBuffer)
            let transformResult: Result<Data, Error>
            switch readResult {
            case .failedReading(error: let error):
                return handle(error: error)
            case .doneReading:
                transformResult = bufferTransform(.init(start: nil, count: 0), .final)
            case .readSome(slice: let slice):
                transformResult = bufferTransform(.init(rebasing: slice), .moreToCome)
            }

            // If the transform failed, we bail with an error. If, however, it did succeed, we write the data.
            switch transformResult {
            case .failure(let error):
                return handle(error: error)
            case .success(let data):
                // Again, writing may fail or only write some of the bytes — in both cases we bail.
                switch drain.write(buffer: data) {
                case .failure(error: let error):
                    return handle(error: error)
                case .partial(unwrittenRemainder: let remainder):
                    // Remember the remainder so we can flush it the next time there's some space.
                    leftover = remainder
                    return
                case .completed:
                    // And lastly, if we’re done reading and managed to write everything that’s left, we’re done.
                    if case .doneReading = readResult {
                        return finishProcessing()
                    }
                }
            }
        }
    }

    private func handleEnd() {}
    private func handle(error: Error?) {}
    private func finishProcessing() {}
}
```
Wrapping Things Up
This leaves three method stubs to be implemented. handleEnd() will be the simplest one: If there’s still data in actualSource, we’ll bubble up an error that the consumer closed, and we may include how much data we’ve already consumed. If, however, actualSource happens to be depleted just at that point in time, we’ve actually finished processing. (This can, for example, happen if the transform is 1-to-1 and you’re sending a streamed HTTP request with a Content-Length header.) finishProcessing then does the actual cleanup work:
```swift
final class StreamingTransformer: NSObject {
    public enum Errors: Error {
        case cannotOpen(file: URL, underlyingError: Error?)
        case cannotAllocateBuffer(size: Int)
        case cannotRead(underlyingError: Error?)
        case consumerClosed(bytesRead: Int64?)
        case unimplemented // The end?
    }

    // Rest like before.
}

extension StreamingTransformer: StreamDelegate {
    public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        // Like before.
    }

    private func drainBytes() {
        // Like before.
    }

    private func handleEnd() {
        if actualSource.streamStatus == .atEnd {
            finishProcessing()
        } else {
            handle(error: Errors.consumerClosed(bytesRead: actualSource.property(forKey: .fileCurrentOffsetKey) as? Int64))
        }
    }

    private func handle(error: Error?) {
        // Do something appropriate here. At the very least, log.
        finishProcessing(notify: false)
    }

    private func finishProcessing(notify: Bool = true) {
        // Avoid getting additional callbacks.
        drain.delegate = nil
        // Close the streams and notify when appropriate.
        actualSource.close()
        drain.close()
        if notify {
            // Whatever makes sense in your case.
        }
    }
}
```
That’s it, right?
Well, not quite. We now have everything in place to transform the contents of a file on demand and can handle all the relevant cases that come up in doing so. But this is still not doing anything until we know when there’s space available. And we’ll only ever find that out after we’ve scheduled the drain on a run loop and somebody reads off sourceToVend.
Scheduling the Work
In the very beginning, when we drafted the API, there was also an instance method, beginStreaming(in:), which we never implemented. So in our final step, let’s get to that and add a state property to avoid beginning to stream more than once:
```swift
final class StreamingTransformer: NSObject {
    public enum Errors: Error {
        case cannotOpen(file: URL, underlyingError: Error?)
        case cannotAllocateBuffer(size: Int)
        case cannotRead(underlyingError: Error?)
        case sourceAlreadyExhausted, alreadyStreaming
        case consumerClosed(bytesRead: Int64?)
    }

    enum State {
        case idle, done
        case streaming(in: RunLoop, for: RunLoop.Mode)
    }

    private init(source: InputStream,
                 bufferSize: Int,
                 boundPair: (source: InputStream, drain: OutputStream),
                 transform: @escaping Transform) {
        precondition(bufferSize > 0, "Cannot read buffers with a size of 0")
        self.actualSource = source
        self.readBuffer = .allocate(capacity: bufferSize)
        self.drain = boundPair.drain
        self.sourceToVend = boundPair.source
        self.bufferTransform = transform
        self.state = .idle
        super.init()
        self.drain.delegate = self
    }

    deinit {
        readBuffer.deallocate()
        drain.delegate = nil
        if case .streaming(let runloop, let mode) = state {
            drain.remove(from: runloop, forMode: mode)
        }
        actualSource.close()
        drain.close()
        sourceToVend.close()
    }

    private let bufferTransform: Transform
    private let readBuffer: UnsafeMutableBufferPointer<UInt8>
    private var leftover: Data?
    private let drain: OutputStream
    private let sourceToVend: InputStream
    private let actualSource: InputStream
    private var state: State

    public func beginStreaming(in runloop: RunLoop) throws -> InputStream {
        switch state {
        case .done:
            throw Errors.sourceAlreadyExhausted
        case .streaming:
            throw Errors.alreadyStreaming
        case .idle:
            state = .streaming(in: runloop, for: .default)
            drain.schedule(in: runloop, forMode: .default)
        }
        return sourceToVend
    }
}
```
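To see how the pieces could fit together, here’s a hypothetical end-to-end use that streams a transformed file as the body of an upload request. The URLs, the file path, and the pass-through transform are placeholder assumptions standing in for real work such as encryption:

```swift
do {
    let transformer = try StreamingTransformer.transforming(file: URL(fileURLWithPath: "/tmp/movie.mov")) { bytes, _ in
        // Stand-in for real work: pass the bytes through unchanged.
        guard !bytes.isEmpty else { return .success(Data()) }
        return .success(Data(buffer: bytes))
    }

    var request = URLRequest(url: URL(string: "https://example.com/upload")!)
    request.httpMethod = "POST"
    request.httpBodyStream = try transformer.beginStreaming(in: .main)

    URLSession.shared.dataTask(with: request) { _, _, error in
        // Capturing `transformer` keeps it alive until the upload finishes.
        _ = transformer
        print("Upload finished, error: \(String(describing: error))")
    }.resume()
} catch {
    print("Could not start streaming: \(error)")
}
```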
With that, we have a functioning class. Some things are left as an exercise to the reader:
- The class doesn’t support cancellation — you’ll probably want to add that. (See the sketch after this list for one possible approach.)
- The private helper functions in the extension that adds conformance to StreamDelegate should check if state is still streaming.
- While all the callbacks are going to happen on the thread of the run loop drain is scheduled on, cancellation and deallocation can happen on a different one. So any access to state requires a lock.
- There’s no notification mechanism that would signal when an instance is finished — either successfully or with an error. Combine, NotificationCenter, or traditional delegation would all be valid approaches.
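As a starting point for the first three items, here’s a rough sketch of what cancellation plus a lock around state could look like. This is an assumption about one possible design, not part of the article’s sample code:

```swift
final class StreamingTransformer: NSObject {
    // Everything like before, plus:
    private let stateLock = NSLock() // Every other access to `state` needs the same lock.

    /// Stops streaming and tears everything down. Intended to be safe to call from any thread.
    public func cancel() {
        stateLock.lock()
        let previousState = state
        state = .done
        stateLock.unlock()

        if case .streaming(let runloop, let mode) = previousState {
            drain.remove(from: runloop, forMode: mode)
        }
        finishProcessing(notify: false)
    }
}
```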
Conclusion
In this article, we discussed some situations where InputStream and OutputStream are still useful. Especially when dealing with data of truly arbitrary size, or when transforming transient data on the fly, the Stream classes remain invaluable tools. We’ve shown how their APIs can be used in Swift to transform data on demand, keeping the memory pressure low and your app responsive. To achieve this, we extended both classes to make using them more “swifty.” Lastly, we demonstrated how streams are scheduled on and removed from a run loop, and how a StreamDelegate implementation handles the events an InputStream sends.
So we hope that by now, you not only have one more tool under your belt, but you can also identify situations in which you should consider reaching for it.