You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

409 lines
15 KiB

/*
* Copyright 2017 Google
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#import "Firebase/Messaging/FIRMessagingSecureSocket.h"
#import <Protobuf/GPBMessage.h>
#import <Protobuf/GPBCodedOutputStream.h>
#import <Protobuf/GPBUtilities.h>
#import "Firebase/Messaging/FIRMessagingCodedInputStream.h"
#import "Firebase/Messaging/FIRMessagingDefines.h"
#import "Firebase/Messaging/FIRMessagingLogger.h"
#import "Firebase/Messaging/FIRMessagingPacketQueue.h"
// Upper bound for the input buffer. A read that would require growing the buffer past this
// size is treated as a failure.
static const NSUInteger kMaxBufferLength = 1024 * 1024;  // 1M
// Initial size of the input buffer and the step by which it grows once it fills up.
static const NSUInteger kBufferLengthIncrement = 16 * 1024;  // 16k
// Version byte that is written first on the output stream and expected first on the input
// stream.
static const uint8_t kVersion = 40;
// Sentinel tag meaning "no proto is currently being sent". It is only ever stored in an int8_t
// tag, so it is declared as int8_t; declared as uint8_t the initializer -1 wrapped to 255.
static const int8_t kInvalidTag = -1;
// Outcome of trying to parse one packet (tag, length, payload) from the buffered input.
typedef NS_ENUM(NSUInteger, FIRMessagingSecureSocketReadResult) {
// No result. Not returned by the parsing code in this file.
kFIRMessagingSecureSocketReadResultNone,
// The tag, the length or the payload is not fully buffered yet; more bytes must be read.
kFIRMessagingSecureSocketReadResultIncomplete,
// The tag or the length is negative; the read path treats this as a failure.
kFIRMessagingSecureSocketReadResultCorrupt,
// A complete packet was parsed and handed to the delegate.
kFIRMessagingSecureSocketReadResultSuccess
};
// Shifts |value| right by |spaces| bits as if it were unsigned, so the vacated high bits are
// filled with zeros instead of copies of the sign bit.
static int32_t LogicalRightShift32(int32_t value, int32_t spaces) {
  uint32_t bits = (uint32_t)value;
  bits >>= spaces;
  return (int32_t)bits;
}
// Returns how many bytes |value| occupies when written as a varint: one byte per group of
// 7 bits, with |value| treated as unsigned while shifting.
static NSUInteger SerializedSize(int32_t value) {
  // Every value needs at least one byte.
  NSUInteger byteCount = sizeof(uint8_t);
  int32_t remaining = value;
  // Each pass consumes 7 bits; keep going while bits above the low 7 are still set.
  while ((remaining & ~0x7F) != 0) {
    remaining = LogicalRightShift32(remaining, 7);
    byteCount += sizeof(uint8_t);
  }
  return byteCount;
}
// Private state of the secure socket.
@interface FIRMessagingSecureSocket () <NSStreamDelegate>

// Current connection state of the socket.
@property(nonatomic, readwrite, assign) FIRMessagingSecureSocketState state;

// Stream pair created for the remote host when connecting.
@property(nonatomic, readwrite, strong) NSInputStream *inStream;
@property(nonatomic, readwrite, strong) NSOutputStream *outStream;

// Storage for bytes read from |inStream| that have not been parsed yet.
@property(nonatomic, readwrite, strong) NSMutableData *inputBuffer;
// Number of valid, unparsed bytes at the front of |inputBuffer|.
@property(nonatomic, readwrite, assign) NSUInteger inputBufferLength;

// Serialized packet that is currently being written; nil when nothing is pending.
@property(nonatomic, readwrite, strong) NSMutableData *outputBuffer;
// Number of bytes of |outputBuffer| that were already written to |outStream|.
@property(nonatomic, readwrite, assign) NSUInteger outputBufferLength;

// Packets waiting to be serialized and written.
@property(nonatomic, readwrite, strong) FIRMessagingPacketQueue *packetQueue;

// Whether the version byte was already written / read on the respective stream.
@property(nonatomic, readwrite, assign) BOOL isVersionSent;
@property(nonatomic, readwrite, assign) BOOL isVersionReceived;

// Set when the open-completed event arrives for the respective stream.
@property(nonatomic, readwrite, assign) BOOL isInStreamOpen;
@property(nonatomic, readwrite, assign) BOOL isOutStreamOpen;

// Run loop on which both streams are scheduled.
@property(nonatomic, readwrite, strong) NSRunLoop *runLoop;

// Identify the packet held in |outputBuffer|; reported to the delegate once it is fully written.
// The id is copied so a mutable string handed in by a caller cannot change underneath us.
@property(nonatomic, readwrite, copy) NSString *currentRmqIdBeingSent;
@property(nonatomic, readwrite, assign) int8_t currentProtoTypeBeingSent;

@end
@implementation FIRMessagingSecureSocket
// Creates a socket in the not-open state with an empty packet queue, an input buffer of
// kBufferLengthIncrement bytes and no proto marked as being sent.
- (instancetype)init {
  self = [super init];
  if (self == nil) {
    return nil;
  }
  _packetQueue = [[FIRMessagingPacketQueue alloc] init];
  _inputBuffer = [NSMutableData dataWithLength:kBufferLengthIncrement];
  _currentProtoTypeBeingSent = kInvalidTag;
  _state = kFIRMessagingSecureSocketNotOpen;
  return self;
}
// Closes any streams that are still around when the socket goes away.
// NOTE(review): -disconnect passes |self| to the delegate, so the delegate can be called with
// an object that is in the middle of deallocation -- confirm delegates never retain or store
// the socket from that callback.
- (void)dealloc {
[self disconnect];
}
// Starts connecting to |host|:|port| and schedules both streams on |runLoop|.
//
// Does nothing when |host| is nil or the socket is not in the not-open state. Connection
// progress is reported through the NSStreamDelegate events handled by this class. If the stream
// pair cannot be created the socket is returned to the not-open state so that a later attempt is
// not ignored.
- (void)connectToHost:(NSString *)host
                 port:(NSUInteger)port
            onRunLoop:(NSRunLoop *)runLoop {
  if (!host || self.state != kFIRMessagingSecureSocketNotOpen) {
    return;
  }
  FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket000,
                          @"Opening secure socket to FIRMessaging service");
  self.state = kFIRMessagingSecureSocketOpening;
  self.runLoop = runLoop;

  // Start from NULL so that a failed creation can never hand garbage to CFBridgingRelease.
  CFReadStreamRef inputStreamRef = NULL;
  CFWriteStreamRef outputStreamRef = NULL;
  CFStreamCreatePairWithSocketToHost(NULL,
                                     (__bridge CFStringRef)host,
                                     (UInt32)port,
                                     &inputStreamRef,
                                     &outputStreamRef);
  // The Create function returns owned references; transfer that ownership to ARC.
  self.inStream = CFBridgingRelease(inputStreamRef);
  self.outStream = CFBridgingRelease(outputStreamRef);
  if (!self.inStream || !self.outStream) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket001,
                            @"Failed to initialize socket.");
    // Nothing was opened. Drop whatever was created and leave the "opening" state, otherwise
    // every subsequent connect attempt would be rejected by the state check above.
    self.inStream = nil;
    self.outStream = nil;
    self.state = kFIRMessagingSecureSocketNotOpen;
    return;
  }

  self.isInStreamOpen = NO;
  self.isOutStreamOpen = NO;
  BOOL isVOIPSocket = NO;
  [self openStream:self.outStream isVOIPStream:isVOIPSocket];
  [self openStream:self.inStream isVOIPStream:isVOIPSocket];
}
// Closes both streams, moves the socket to the closed state and notifies the delegate.
// Returns without doing anything while a disconnect is already in progress or when there is no
// stream to close.
- (void)disconnect {
  if (self.state == kFIRMessagingSecureSocketClosing) {
    return;
  }
  BOOL hasAnyStream = (self.inStream != nil) || (self.outStream != nil);
  if (!hasAnyStream) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket002,
                            @"The socket is not open or already closed.");
    return;
  }

  self.state = kFIRMessagingSecureSocketClosing;

  NSInputStream *input = self.inStream;
  if (input != nil) {
    [self closeStream:input];
    self.inStream = nil;
  }

  NSOutputStream *output = self.outStream;
  if (output != nil) {
    [self closeStream:output];
    self.outStream = nil;
  }

  self.state = kFIRMessagingSecureSocketClosed;
  [self.delegate didDisconnectWithSecureSocket:self];
}
// Queues |data| as a packet with the given |tag| and |rmqId|, and starts writing right away
// when the output stream reports free space.
- (void)sendData:(NSData *)data withTag:(int8_t)tag rmqId:(NSString *)rmqId {
  FIRMessagingPacket *packet = [FIRMessagingPacket packetWithTag:tag rmqId:rmqId data:data];
  [self.packetQueue push:packet];
  if (![self.outStream hasSpaceAvailable]) {
    return;
  }
  [self performWrite];
}
#pragma mark - NSStreamDelegate
// Dispatches stream events: reads and writes while the socket is open, marks the socket open
// once both streams report completion, and disconnects on end-of-stream, on a stream error, or
// when reading fails. Any other event is ignored.
- (void)stream:(NSStream *)stream handleEvent:(NSStreamEvent)eventCode {
  if (eventCode == NSStreamEventHasBytesAvailable) {
    if (self.state != kFIRMessagingSecureSocketOpen) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket003,
                              @"Try to read from socket that is not opened");
      return;
    }
    BOOL didRead = [self performRead];
    if (!didRead) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket004,
                              @"Error occurred when reading incoming stream");
      [self disconnect];
    }
    return;
  }

  if (eventCode == NSStreamEventHasSpaceAvailable) {
    if (self.state != kFIRMessagingSecureSocketOpen) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket008,
                              @"Try to write to socket that is not opened");
      return;
    }
    [self performWrite];
    return;
  }

  if (eventCode == NSStreamEventOpenCompleted) {
    if (stream == self.inStream) {
      self.isInStreamOpen = YES;
    } else if (stream == self.outStream) {
      self.isOutStreamOpen = YES;
    }
    // The socket counts as open only after both directions have completed opening.
    if (!self.isInStreamOpen || !self.isOutStreamOpen) {
      return;
    }
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket006,
                            @"Secure socket to FIRMessaging service opened");
    self.state = kFIRMessagingSecureSocketOpen;
    [self.delegate secureSocketDidConnect:self];
    return;
  }

  BOOL didEnd = (eventCode == NSStreamEventEndEncountered);
  BOOL didFail = (eventCode == NSStreamEventErrorOccurred);
  if (!didEnd && !didFail) {
    return;
  }

  // Both remaining events are logged with the name of the affected stream and end the
  // connection.
  NSString *streamName = @"Unknown stream";
  if (stream == self.inStream) {
    streamName = @"Input stream";
  } else if (stream == self.outStream) {
    streamName = @"Output stream";
  }
  if (didEnd) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket005, @"%@ end encountered",
                            streamName);
  } else {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket007, @"%@ error occurred",
                            streamName);
  }
  [self disconnect];
}
#pragma mark - Private
// Configures |stream| for negotiated SSL (and the VoIP service type when |isVOIPStream| is
// set), makes this object its delegate, schedules it on the socket's run loop and opens it.
// A nil stream, or one that is not in the not-open status, is left untouched.
- (void)openStream:(NSStream *)stream isVOIPStream:(BOOL)isVOIPStream {
  if (!stream) {
    return;
  }
  if (stream.streamStatus != NSStreamStatusNotOpen) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket009,
                            @"stream should not be open.");
    return;
  }

  [stream setProperty:NSStreamSocketSecurityLevelNegotiatedSSL
               forKey:NSStreamSocketSecurityLevelKey];
  if (isVOIPStream) {
    [stream setProperty:NSStreamNetworkServiceTypeVoIP
                 forKey:NSStreamNetworkServiceType];
  }

  stream.delegate = self;
  [stream scheduleInRunLoop:self.runLoop forMode:NSDefaultRunLoopMode];
  [stream open];
}
// Closes |stream|, takes it off the socket's run loop and clears its delegate. A nil stream is
// ignored.
- (void)closeStream:(NSStream *)stream {
  if (!stream) {
    return;
  }
  [stream close];
  [stream removeFromRunLoop:self.runLoop forMode:NSDefaultRunLoopMode];
  stream.delegate = nil;
}
// Drains the input stream into the input buffer and parses every complete packet found in it.
//
// The first call consumes and checks the version byte. Returns NO when the version does not
// match, the input buffer has no storage, the buffer would have to grow past kMaxBufferLength,
// or corrupt data is found. Returns YES otherwise, including when the stream has nothing more
// to read or a read returns no bytes.
- (BOOL)performRead {
  if (!self.isVersionReceived) {
    self.isVersionReceived = YES;
    uint8_t versionByte = 0;
    NSInteger versionBytesRead = [self.inStream read:&versionByte maxLength:sizeof(uint8_t)];
    BOOL isExpectedVersion =
        (versionBytesRead == sizeof(uint8_t)) && (versionByte == kVersion);
    if (!isExpectedVersion) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket010,
                              @"Version do not match. Received %d, Expecting %d", versionByte,
                              kVersion);
      return NO;
    }
  }

  for (;;) {
    if ([self.inputBuffer length] == 0) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket011,
                              @"Input buffer is not valid.");
      return NO;
    }
    if (![self.inStream hasBytesAvailable]) {
      break;
    }

    // Append newly available bytes after the ones that are already buffered.
    NSUInteger usedLength = self.inputBufferLength;
    NSUInteger freeLength = [self.inputBuffer length] - usedLength;
    uint8_t *freeStart = (uint8_t *)self.inputBuffer.mutableBytes + usedLength;
    NSInteger bytesRead = [self.inStream read:freeStart maxLength:freeLength];
    if (bytesRead <= 0) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket012,
                              @"Failed to read input stream. Bytes read %ld, Used buffer size %lu, "
                              @"Unused buffer size %lu",
                              _FIRMessaging_UL(bytesRead), _FIRMessaging_UL(self.inputBufferLength),
                              _FIRMessaging_UL(freeLength));
      break;
    }
    self.inputBufferLength += (NSUInteger)bytesRead;

    // The buffer is full: grow it by one increment unless that would pass the 1MB cap.
    if (self.inputBufferLength >= [self.inputBuffer length]) {
      if ([self.inputBuffer length] + kBufferLengthIncrement > kMaxBufferLength) {
        FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket013,
                                @"Input buffer exceed 1M, disconnect socket");
        return NO;
      }
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket014,
                              @"Input buffer limit exceeded. Used input buffer size %lu, "
                              @"Total input buffer size %lu. No unused buffer left. "
                              @"Increase buffer size.",
                              _FIRMessaging_UL(self.inputBufferLength),
                              _FIRMessaging_UL([self.inputBuffer length]));
      [self.inputBuffer increaseLengthBy:kBufferLengthIncrement];
    }

    // Parse as many complete packets as the buffered bytes contain.
    while (self.inputBufferLength > 0 && [self.inputBuffer length] > 0) {
      NSData *buffered =
          [self.inputBuffer subdataWithRange:NSMakeRange(0, self.inputBufferLength)];
      size_t consumed = 0;
      FIRMessagingSecureSocketReadResult result =
          [self processCurrentInputBuffer:buffered outOffset:&consumed];
      if (result == kFIRMessagingSecureSocketReadResultCorrupt) {
        // Corrupt data encountered, stop processing.
        return NO;
      }
      if (result == kFIRMessagingSecureSocketReadResultIncomplete) {
        // The packet is not complete yet; go back to loading more from the stream.
        break;
      }

      // The first |consumed| bytes of the buffer have been handled.
      if (consumed == self.inputBufferLength) {
        // Everything buffered was consumed, so the whole buffer can be reused.
        self.inputBufferLength = 0;
        continue;
      }
      // Remove the consumed bytes from the front, then append the same number of bytes at the
      // end so that the total buffer size stays the same.
      [self.inputBuffer replaceBytesInRange:NSMakeRange(0, consumed) withBytes:NULL length:0];
      [self.inputBuffer increaseLengthBy:consumed];
      self.inputBufferLength -= consumed;
    }
  }
  return YES;
}
// Tries to parse a single packet (tag, length, payload) from the start of |readData|.
//
// On success the payload and tag are passed to the delegate, |outOffset| receives the input
// stream's offset after the packet, and Success is returned. |outOffset| is 0 for every other
// result: Incomplete when the tag, the length or the payload cannot be read yet, Corrupt when
// the tag or the length is negative.
- (FIRMessagingSecureSocketReadResult)processCurrentInputBuffer:(NSData *)readData
                                                      outOffset:(size_t *)outOffset {
  *outOffset = 0;

  FIRMessagingCodedInputStream *codedInput =
      [[FIRMessagingCodedInputStream alloc] initWithData:readData];

  // The header is the tag followed by the payload length; without both, wait for more bytes.
  int8_t rawTag;
  int32_t length;
  if (![codedInput readTag:&rawTag] || ![codedInput readLength:&length]) {
    return kFIRMessagingSecureSocketReadResultIncomplete;
  }

  // NOTE tag can be zero for |HeartbeatPing|, and length can be zero for |Close| proto
  BOOL isHeaderValid = (rawTag >= 0) && (length >= 0);
  if (!isHeaderValid) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket015, @"Buffer data corrupted.");
    return kFIRMessagingSecureSocketReadResultCorrupt;
  }

  NSData *payload = [codedInput readDataWithLength:(uint32_t)length];
  if (payload == nil) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket016,
                            @"Incomplete data, buffered data length %ld, expected length %d",
                            _FIRMessaging_UL(self.inputBufferLength), length);
    return kFIRMessagingSecureSocketReadResultIncomplete;
  }

  [self.delegate secureSocket:self didReceiveData:payload withTag:rawTag];
  *outOffset = codedInput.offset;
  return kFIRMessagingSecureSocketReadResultSuccess;
}
// Writes the version byte (once) and then as many queued packets as the output stream accepts.
//
// One packet at a time is serialized into |outputBuffer| as varint tag, varint payload length
// and payload. A packet that the stream accepts only partially stays in |outputBuffer| and is
// continued on the next call. The delegate is notified each time a packet has been written
// completely.
- (void)performWrite {
  if (!self.isVersionSent) {
    uint8_t versionByte = kVersion;
    NSInteger versionWritten = [self.outStream write:&versionByte
                                           maxLength:sizeof(versionByte)];
    if (versionWritten != (NSInteger)sizeof(versionByte)) {
      // The version byte has to precede every packet, so do not mark it as sent and do not
      // write anything else; the next call tries again.
      return;
    }
    self.isVersionSent = YES;
  }

  // Keep going while there is either a partially written packet or a queued one. Checking only
  // the queue would strand the unwritten tail of the last packet until another one is queued.
  while ((self.outputBuffer.length > 0 || !self.packetQueue.isEmpty) &&
         self.outStream.hasSpaceAvailable) {
    if (self.outputBuffer.length == 0) {
      // serialize new packets only when the output buffer is flushed.
      FIRMessagingPacket *packet = [self.packetQueue pop];
      self.currentRmqIdBeingSent = packet.rmqId;
      self.currentProtoTypeBeingSent = packet.tag;
      NSUInteger length = SerializedSize(packet.tag) +
                          SerializedSize((int)packet.data.length) + packet.data.length;
      self.outputBuffer = [NSMutableData dataWithLength:length];
      GPBCodedOutputStream *output = [GPBCodedOutputStream streamWithData:self.outputBuffer];
      [output writeRawVarint32:packet.tag];
      [output writeBytesNoTag:packet.data];
      self.outputBufferLength = 0;
    }

    // flush the output buffer, starting after the bytes that were already written.
    const uint8_t *pendingBytes =
        (const uint8_t *)self.outputBuffer.bytes + self.outputBufferLength;
    NSUInteger pendingLength = self.outputBuffer.length - self.outputBufferLength;
    NSInteger written = [self.outStream write:pendingBytes maxLength:pendingLength];
    if (written <= 0) {
      // Nothing was accepted (or the stream failed). Stop instead of retrying in a tight loop;
      // the pending bytes stay in |outputBuffer| for the next call.
      break;
    }

    self.outputBufferLength += (NSUInteger)written;
    if (self.outputBufferLength >= self.outputBuffer.length) {
      // The whole packet is on the wire: reset the buffer and report the packet.
      self.outputBufferLength = 0;
      self.outputBuffer = nil;
      [self.delegate secureSocket:self
              didSendProtoWithTag:self.currentProtoTypeBeingSent
                            rmqId:self.currentRmqIdBeingSent];
      self.currentRmqIdBeingSent = nil;
      self.currentProtoTypeBeingSent = kInvalidTag;
    }
  }
}
@end