You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

683 lines
24 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firebase/Messaging/FIRMessagingConnection.h"
  17. #import <FirebaseMessaging/FIRMessaging.h>
  18. #import "Firebase/Messaging/Protos/GtalkCore.pbobjc.h"
  19. #import "Firebase/Messaging/Protos/GtalkExtensions.pbobjc.h"
  20. #import "Firebase/Messaging/FIRMessagingDataMessageManager.h"
  21. #import "Firebase/Messaging/FIRMessagingDefines.h"
  22. #import "Firebase/Messaging/FIRMessagingLogger.h"
  23. #import "Firebase/Messaging/FIRMessagingRmqManager.h"
  24. #import "Firebase/Messaging/FIRMessagingSecureSocket.h"
  25. #import "Firebase/Messaging/FIRMessagingUtilities.h"
  26. #import "Firebase/Messaging/FIRMessagingVersionUtilities.h"
  27. #import "Firebase/Messaging/FIRMessaging_Private.h"
  28. static NSInteger const kIqSelectiveAck = 12;
  29. static NSInteger const kIqStreamAck = 13;
  30. static int const kInvalidStreamId = -1;
  31. static NSTimeInterval const kHeartbeatInterval = 30.0;
  32. static NSTimeInterval const kConnectionTimeout = 20.0;
  33. static int32_t const kAckingInterval = 10;
  34. static NSString *const kUnackedS2dIdKey = @"FIRMessagingUnackedS2dIdKey";
  35. static NSString *const kAckedS2dIdMapKey = @"FIRMessagingAckedS2dIdMapKey";
  36. static NSString *const kRemoteFromAddress = @"from";
  37. @interface FIRMessagingD2SInfo : NSObject
  38. @property(nonatomic, readwrite, assign) int streamId;
  39. @property(nonatomic, readwrite, strong) NSString *d2sID;
  40. - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID;
  41. @end
  42. @implementation FIRMessagingD2SInfo
  43. - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID {
  44. self = [super init];
  45. if (self) {
  46. _streamId = streamId;
  47. _d2sID = [d2sID copy];
  48. }
  49. return self;
  50. }
  51. - (BOOL)isEqual:(id)object {
  52. if ([object isKindOfClass:[self class]]) {
  53. FIRMessagingD2SInfo *other = (FIRMessagingD2SInfo *)object;
  54. return self.streamId == other.streamId && [self.d2sID isEqualToString:other.d2sID];
  55. }
  56. return NO;
  57. }
  58. - (NSUInteger)hash {
  59. return [self.d2sID hash];
  60. }
  61. @end
  62. @interface FIRMessagingConnection ()<FIRMessagingSecureSocketDelegate>
  63. @property(nonatomic, readwrite, weak) FIRMessagingRmqManager *rmq2Manager;
  64. @property(nonatomic, readwrite, weak) FIRMessagingDataMessageManager *dataMessageManager;
  65. @property(nonatomic, readwrite, assign) FIRMessagingConnectionState state;
  66. @property(nonatomic, readwrite, copy) NSString *host;
  67. @property(nonatomic, readwrite, assign) NSUInteger port;
  68. @property(nonatomic, readwrite, strong) NSString *authId;
  69. @property(nonatomic, readwrite, strong) NSString *token;
  70. @property(nonatomic, readwrite, strong) FIRMessagingSecureSocket *socket;
  71. @property(nonatomic, readwrite, assign) int64_t lastLoginServerTimestamp;
  72. @property(nonatomic, readwrite, assign) int lastStreamIdAcked;
  73. @property(nonatomic, readwrite, assign) int inStreamId;
  74. @property(nonatomic, readwrite, assign) int outStreamId;
  75. @property(nonatomic, readwrite, strong) NSMutableArray *unackedS2dIds;
  76. @property(nonatomic, readwrite, strong) NSMutableDictionary *ackedS2dMap;
  77. @property(nonatomic, readwrite, strong) NSMutableArray *d2sInfos;
  78. // ttl=0 messages that need to be sent as soon as we establish a connection
  79. @property(nonatomic, readwrite, strong) NSMutableArray *sendOnConnectMessages;
  80. @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;
  81. @end
  82. @implementation FIRMessagingConnection;
  83. - (instancetype)initWithAuthID:(NSString *)authId
  84. token:(NSString *)token
  85. host:(NSString *)host
  86. port:(NSUInteger)port
  87. runLoop:(NSRunLoop *)runLoop
  88. rmq2Manager:(FIRMessagingRmqManager *)rmq2Manager
  89. fcmManager:(FIRMessagingDataMessageManager *)dataMessageManager {
  90. self = [super init];
  91. if (self) {
  92. _authId = [authId copy];
  93. _token = [token copy];
  94. _host = [host copy];
  95. _port = port;
  96. _runLoop = runLoop;
  97. _rmq2Manager = rmq2Manager;
  98. _dataMessageManager = dataMessageManager;
  99. _d2sInfos = [NSMutableArray array];
  100. _unackedS2dIds = [NSMutableArray arrayWithArray:[_rmq2Manager unackedS2dRmqIds]];
  101. _ackedS2dMap = [NSMutableDictionary dictionary];
  102. _sendOnConnectMessages = [NSMutableArray array];
  103. }
  104. return self;
  105. }
  106. - (NSString *)description {
  107. return [NSString stringWithFormat:@"host: %@, port: %lu, stream id in: %d, stream id out: %d",
  108. self.host,
  109. _FIRMessaging_UL(self.port),
  110. self.inStreamId,
  111. self.outStreamId];
  112. }
  113. - (void)signIn {
  114. if (self.state != kFIRMessagingConnectionNotConnected) {
  115. return;
  116. }
  117. // break it up for testing
  118. [self setupConnectionSocket];
  119. [self connectToSocket:self.socket];
  120. }
  121. - (void)setupConnectionSocket {
  122. self.socket = [[FIRMessagingSecureSocket alloc] init];
  123. self.socket.delegate = self;
  124. }
  125. - (void)connectToSocket:(FIRMessagingSecureSocket *)socket {
  126. self.state = kFIRMessagingConnectionConnecting;
  127. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection000,
  128. @"Start connecting to FIRMessaging service.");
  129. [socket connectToHost:self.host port:self.port onRunLoop:self.runLoop];
  130. }
  131. - (void)signOut {
  132. // Clear the list of messages to be sent on connect. This will only
  133. // have messages in it if an error happened before receiving the LoginResponse.
  134. [self.sendOnConnectMessages removeAllObjects];
  135. if (self.state == kFIRMessagingConnectionSignedIn) {
  136. [self sendClose];
  137. }
  138. if (self.state != kFIRMessagingConnectionNotConnected) {
  139. [self disconnect];
  140. }
  141. }
  142. - (void)teardown {
  143. if (self.state != kFIRMessagingConnectionNotConnected) {
  144. [self disconnect];
  145. }
  146. }
  147. #pragma mark - FIRMessagingSecureSocketDelegate
  148. - (void)secureSocketDidConnect:(FIRMessagingSecureSocket *)socket {
  149. self.state = kFIRMessagingConnectionConnected;
  150. self.lastStreamIdAcked = 0;
  151. self.inStreamId = 0;
  152. self.outStreamId = 0;
  153. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection001,
  154. @"Connected to FIRMessaging service.");
  155. [self resetUnconfirmedAcks];
  156. [self sendLoginRequest:self.authId token:self.token];
  157. }
  158. - (void)didDisconnectWithSecureSocket:(FIRMessagingSecureSocket *)socket {
  159. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection002,
  160. @"Secure socket disconnected from FIRMessaging service. %ld", (long)self.socket.state);
  161. [self disconnect];
  162. [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonSocketDisconnected];
  163. }
  164. - (void)secureSocket:(FIRMessagingSecureSocket *)socket
  165. didReceiveData:(NSData *)data
  166. withTag:(int8_t)tag {
  167. if (tag < 0) {
  168. // Invalid proto tag
  169. return;
  170. }
  171. Class klassForTag = FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag);
  172. if ([klassForTag isSubclassOfClass:[NSNull class]]) {
  173. FIRMessagingLoggerError(kFIRMessagingMessageCodeConnection003, @"Invalid tag %d for proto",
  174. tag);
  175. return;
  176. }
  177. GPBMessage *proto = [klassForTag parseFromData:data error:NULL];
  178. if (tag == kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionConnected) {
  179. FIRMessagingLoggerDebug(
  180. kFIRMessagingMessageCodeConnection004,
  181. @"Should not receive generated message when the connection is not connected.");
  182. return;
  183. } else if (tag != kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionSignedIn) {
  184. FIRMessagingLoggerDebug(
  185. kFIRMessagingMessageCodeConnection005,
  186. @"Should not receive generated message when the connection is not signed in.");
  187. return;
  188. }
  189. // If traffic is received after a heartbeat it is safe to assume the connection is healthy.
  190. [self cancelConnectionTimeoutTask];
  191. [self performSelector:@selector(sendHeartbeatPing)
  192. withObject:nil
  193. afterDelay:kHeartbeatInterval];
  194. [self willProcessProto:proto];
  195. switch (tag) {
  196. case kFIRMessagingProtoTagLoginResponse:
  197. [self didReceiveLoginResponse:(GtalkLoginResponse *)proto];
  198. break;
  199. case kFIRMessagingProtoTagDataMessageStanza:
  200. [self didReceiveDataMessageStanza:(GtalkDataMessageStanza *)proto];
  201. break;
  202. case kFIRMessagingProtoTagHeartbeatPing:
  203. [self didReceiveHeartbeatPing:(GtalkHeartbeatPing *)proto];
  204. break;
  205. case kFIRMessagingProtoTagHeartbeatAck:
  206. [self didReceiveHeartbeatAck:(GtalkHeartbeatAck *)proto];
  207. break;
  208. case kFIRMessagingProtoTagClose:
  209. [self didReceiveClose:(GtalkClose *)proto];
  210. break;
  211. case kFIRMessagingProtoTagIqStanza:
  212. [self handleIqStanza:(GtalkIqStanza *)proto];
  213. break;
  214. default:
  215. [self didReceiveUnhandledProto:proto];
  216. break;
  217. }
  218. }
  219. // Called from secure socket once we have send the proto with given rmqId over the wire
  220. // since we are mostly concerned with user facing messages which certainly have a rmqId
  221. // we can retrieve them from the Rmq if necessary to look at stuff but for now we just
  222. // log it.
  223. - (void)secureSocket:(FIRMessagingSecureSocket *)socket
  224. didSendProtoWithTag:(int8_t)tag
  225. rmqId:(NSString *)rmqId {
  226. // log the message
  227. [self logMessage:rmqId messageType:tag isOut:YES];
  228. }
  229. #pragma mark - FIRMessagingTestConnection
  230. - (void)sendProto:(GPBMessage *)proto {
  231. FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(proto);
  232. if (tag == kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionConnected) {
  233. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection006,
  234. @"Cannot send generated message when the connection is not connected.");
  235. return;
  236. } else if (tag != kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionSignedIn) {
  237. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection007,
  238. @"Cannot send generated message when the connection is not signed in.");
  239. return;
  240. }
  241. if (self.socket == nil) {
  242. return;
  243. }
  244. [self willSendProto:proto];
  245. [self.socket sendData:proto.data withTag:tag rmqId:FIRMessagingGetRmq2Id(proto)];
  246. }
  247. - (void)sendOnConnectOrDrop:(GPBMessage *)message {
  248. if (self.state == kFIRMessagingConnectionSignedIn) {
  249. // If a connection has already been established, send normally
  250. [self sendProto:message];
  251. } else {
  252. // Otherwise add them to the list of messages to send after login
  253. [self.sendOnConnectMessages addObject:message];
  254. }
  255. }
  256. + (GtalkLoginRequest *)loginRequestWithToken:(NSString *)token authID:(NSString *)authID {
  257. GtalkLoginRequest *login = [[GtalkLoginRequest alloc] init];
  258. login.accountId = 1000000;
  259. login.authService = GtalkLoginRequest_AuthService_AndroidId;
  260. login.authToken = token;
  261. login.id_p = [NSString stringWithFormat:@"%@-%@", @"ios", FIRMessagingCurrentLibraryVersion()];
  262. login.domain = @"mcs.android.com";
  263. login.deviceId = [NSString stringWithFormat:@"android-%llx", authID.longLongValue];
  264. login.networkType = [self currentNetworkType];
  265. login.resource = authID;
  266. login.user = authID;
  267. login.useRmq2 = YES;
  268. login.lastRmqId = 1; // Sending not enabled yet so this stays as 1.
  269. return login;
  270. }
  271. + (int32_t)currentNetworkType {
  272. // http://developer.android.com/reference/android/net/ConnectivityManager.html
  273. int32_t fcmNetworkType;
  274. FIRMessagingNetworkStatus type = [[FIRMessaging messaging] networkType];
  275. switch (type) {
  276. case kFIRMessagingReachabilityReachableViaWiFi:
  277. fcmNetworkType = 1;
  278. break;
  279. case kFIRMessagingReachabilityReachableViaWWAN:
  280. fcmNetworkType = 0;
  281. break;
  282. default:
  283. fcmNetworkType = -1;
  284. break;
  285. }
  286. return fcmNetworkType;
  287. }
  288. - (void)sendLoginRequest:(NSString *)authId
  289. token:(NSString *)token {
  290. GtalkLoginRequest *login = [[self class] loginRequestWithToken:token authID:authId];
  291. // clear the messages sent during last connection
  292. if ([self.d2sInfos count]) {
  293. [self.d2sInfos removeAllObjects];
  294. }
  295. if (self.unackedS2dIds.count > 0) {
  296. FIRMessagingLoggerDebug(
  297. kFIRMessagingMessageCodeConnection008,
  298. @"There are unacked persistent Ids in the login request: %@",
  299. [self.unackedS2dIds.description stringByReplacingOccurrencesOfString:@"%"
  300. withString:@"%%"]);
  301. }
  302. // Send out acks.
  303. for (NSString *unackedPersistentS2dId in self.unackedS2dIds) {
  304. [login.receivedPersistentIdArray addObject:unackedPersistentS2dId];
  305. }
  306. GtalkSetting *setting = [[GtalkSetting alloc] init];
  307. setting.name = @"new_vc";
  308. setting.value = @"1";
  309. [login.settingArray addObject:setting];
  310. [self sendProto:login];
  311. }
  312. - (void)sendHeartbeatAck {
  313. [self sendProto:[[GtalkHeartbeatAck alloc] init]];
  314. }
  315. - (void)sendHeartbeatPing {
  316. // cancel the previous heartbeat request.
  317. [NSObject cancelPreviousPerformRequestsWithTarget:self
  318. selector:@selector(sendHeartbeatPing)
  319. object:nil];
  320. [self scheduleConnectionTimeoutTask];
  321. [self sendProto:[[GtalkHeartbeatPing alloc] init]];
  322. }
  323. + (GtalkIqStanza *)createStreamAck {
  324. GtalkIqStanza *iq = [[GtalkIqStanza alloc] init];
  325. iq.type = GtalkIqStanza_IqType_Set;
  326. iq.id_p = @"";
  327. GtalkExtension *ext = [[GtalkExtension alloc] init];
  328. ext.id_p = kIqStreamAck;
  329. ext.data_p = @"";
  330. iq.extension = ext;
  331. return iq;
  332. }
  333. - (void)sendStreamAck {
  334. GtalkIqStanza *iq = [[self class] createStreamAck];
  335. [self sendProto:iq];
  336. }
  337. - (void)sendClose {
  338. [self sendProto:[[GtalkClose alloc] init]];
  339. }
  340. - (void)handleIqStanza:(GtalkIqStanza *)iq {
  341. if (iq.hasExtension) {
  342. if (iq.extension.id_p == kIqStreamAck) {
  343. [self didReceiveStreamAck:iq];
  344. return;
  345. }
  346. if (iq.extension.id_p == kIqSelectiveAck) {
  347. [self didReceiveSelectiveAck:iq];
  348. return;
  349. }
  350. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection009, @"Unknown ack extension id %d.",
  351. iq.extension.id_p);
  352. } else {
  353. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection010, @"Ip stanza without extension.");
  354. }
  355. [self didReceiveUnhandledProto:iq];
  356. }
  357. - (void)didReceiveLoginResponse:(GtalkLoginResponse *)loginResponse {
  358. if (loginResponse.hasError) {
  359. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection011,
  360. @"Login error with type: %@, message: %@.", loginResponse.error.type,
  361. loginResponse.error.message);
  362. return;
  363. }
  364. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection012, @"Logged onto MCS service.");
  365. self.state = kFIRMessagingConnectionSignedIn;
  366. self.lastLoginServerTimestamp = loginResponse.serverTimestamp;
  367. [self.delegate didLoginWithConnection:self];
  368. [self sendHeartbeatPing];
  369. // Add all the TTL=0 messages on connect
  370. for (GPBMessage *message in self.sendOnConnectMessages) {
  371. [self sendProto:message];
  372. }
  373. [self.sendOnConnectMessages removeAllObjects];
  374. }
  375. - (void)didReceiveHeartbeatPing:(GtalkHeartbeatPing *)heartbeatPing {
  376. [self sendHeartbeatAck];
  377. }
  378. - (void)didReceiveHeartbeatAck:(GtalkHeartbeatAck *)heartbeatAck {
  379. }
  380. - (void)didReceiveDataMessageStanza:(GtalkDataMessageStanza *)dataMessageStanza {
  381. // TODO: Maybe add support raw data later
  382. [self.delegate connectionDidRecieveMessage:dataMessageStanza];
  383. }
  384. - (void)didReceiveUnhandledProto:(GPBMessage *)proto {
  385. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection013, @"Received unhandled proto");
  386. }
  387. - (void)didReceiveStreamAck:(GtalkIqStanza *)iq {
  388. // Server received some stuff from us we don't really need to do anything special
  389. }
  390. - (void)didReceiveSelectiveAck:(GtalkIqStanza *)iq {
  391. GtalkExtension *extension = iq.extension;
  392. if (extension) {
  393. int extensionId = extension.id_p;
  394. if (extensionId == kIqSelectiveAck) {
  395. NSString *dataString = extension.data_p;
  396. GtalkSelectiveAck *selectiveAck = [[GtalkSelectiveAck alloc] init];
  397. [selectiveAck mergeFromData:[dataString dataUsingEncoding:NSUTF8StringEncoding]
  398. extensionRegistry:nil];
  399. NSArray <NSString *>*acks = [selectiveAck idArray];
  400. // we've received ACK's
  401. [self.delegate connectionDidReceiveAckForRmqIds:acks];
  402. // resend unacked messages
  403. [self.dataMessageManager resendMessagesWithConnection:self];
  404. }
  405. }
  406. }
  407. - (void)didReceiveClose:(GtalkClose *)close {
  408. [self disconnect];
  409. }
  410. - (void)willProcessProto:(GPBMessage *)proto {
  411. self.inStreamId++;
  412. if ([proto isKindOfClass:GtalkDataMessageStanza.class]) {
  413. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection014,
  414. @"RMQ: Receiving %@ with rmq_id: %@ incoming stream Id: %d",
  415. proto.class, FIRMessagingGetRmq2Id(proto), self.inStreamId);
  416. } else {
  417. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection015,
  418. @"RMQ: Receiving %@ with incoming stream Id: %d.", proto.class,
  419. self.inStreamId);
  420. }
  421. int streamId = FIRMessagingGetLastStreamId(proto);
  422. if (streamId != kInvalidStreamId) {
  423. // confirm the D2S messages that were sent by us
  424. [self confirmAckedD2sIdsWithStreamId:streamId];
  425. // We can now confirm that our ack was received by the server and start our unack'd list fresh
  426. // with the proto we just received.
  427. [self confirmAckedS2dIdsWithStreamId:streamId];
  428. }
  429. NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
  430. if (rmq2Id != nil) {
  431. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection016,
  432. @"RMQ: Add unacked persistent Id: %@.",
  433. [rmq2Id stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  434. [self.unackedS2dIds addObject:rmq2Id];
  435. [self.rmq2Manager saveS2dMessageWithRmqId:rmq2Id]; // RMQ save
  436. }
  437. BOOL explicitAck = ([proto isKindOfClass:[GtalkDataMessageStanza class]] &&
  438. [(GtalkDataMessageStanza *)proto immediateAck]);
  439. // If we have not sent anything and the ack threshold has been reached then explicitly send one
  440. // to notify the server that we have received messages.
  441. if (self.inStreamId - self.lastStreamIdAcked >= kAckingInterval || explicitAck) {
  442. [self sendStreamAck];
  443. }
  444. }
  445. - (void)willSendProto:(GPBMessage *)proto {
  446. self.outStreamId++;
  447. NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
  448. if ([rmq2Id length]) {
  449. FIRMessagingD2SInfo *d2sInfo = [[FIRMessagingD2SInfo alloc] initWithStreamId:self.outStreamId d2sId:rmq2Id];
  450. [self.d2sInfos addObject:d2sInfo];
  451. }
  452. // each time we send a d2s message, it acks previously received
  453. // s2d messages via the last (s2d) stream id received.
  454. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection017,
  455. @"RMQ: Sending %@ with outgoing stream Id: %d.", proto.class,
  456. self.outStreamId);
  457. // We have received messages since last time we sent something - send ack info to server.
  458. if (self.inStreamId > self.lastStreamIdAcked) {
  459. FIRMessagingSetLastStreamId(proto, self.inStreamId);
  460. self.lastStreamIdAcked = self.inStreamId;
  461. }
  462. if (self.unackedS2dIds.count > 0) {
  463. // Move all 'unack'd' messages to the ack'd map so they can be removed once the
  464. // ack is confirmed.
  465. NSArray *ackedS2dIds = [NSArray arrayWithArray:self.unackedS2dIds];
  466. FIRMessagingLoggerDebug(
  467. kFIRMessagingMessageCodeConnection018, @"RMQ: Mark persistent Ids as acked: %@.",
  468. [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  469. [self.unackedS2dIds removeAllObjects];
  470. self.ackedS2dMap[[@(self.outStreamId) stringValue]] = ackedS2dIds;
  471. }
  472. }
  473. #pragma mark - Private
  474. /**
  475. * This processes the s2d message received in reference to the d2s messages
  476. * that we have sent before.
  477. */
  478. - (void)confirmAckedD2sIdsWithStreamId:(int)lastReceivedStreamId {
  479. NSMutableArray *d2sIdsAcked = [NSMutableArray array];
  480. for (FIRMessagingD2SInfo *d2sInfo in self.d2sInfos) {
  481. if (lastReceivedStreamId < d2sInfo.streamId) {
  482. break;
  483. }
  484. [d2sIdsAcked addObject:d2sInfo];
  485. }
  486. NSMutableArray *rmqIds = [NSMutableArray arrayWithCapacity:[d2sIdsAcked count]];
  487. // remove ACK'ed messages
  488. for (FIRMessagingD2SInfo *d2sInfo in d2sIdsAcked) {
  489. if ([d2sInfo.d2sID length]) {
  490. [rmqIds addObject:d2sInfo.d2sID];
  491. }
  492. [self.d2sInfos removeObject:d2sInfo];
  493. }
  494. [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
  495. }
  496. - (void)confirmAckedS2dIdsWithStreamId:(int)lastReceivedStreamId {
  497. // If the server hasn't received the streamId yet.
  498. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection019,
  499. @"RMQ: Server last received stream Id: %d.", lastReceivedStreamId);
  500. if (lastReceivedStreamId < self.outStreamId) {
  501. // TODO: This could be a good indicator that we need to re-send something (acks)?
  502. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection020,
  503. @"RMQ: There are unsent messages that should be send...\n"
  504. "server received: %d\nlast stream id sent: %d",
  505. lastReceivedStreamId, self.outStreamId);
  506. }
  507. NSSet *ackedStreamIds =
  508. [self.ackedS2dMap keysOfEntriesPassingTest:^BOOL(id key, id obj, BOOL *stop) {
  509. NSString *streamId = key;
  510. return streamId.intValue <= lastReceivedStreamId;
  511. }];
  512. NSMutableArray *s2dIdsToDelete = [NSMutableArray array];
  513. for (NSString *streamId in ackedStreamIds) {
  514. NSArray *ackedS2dIds = self.ackedS2dMap[streamId];
  515. if (ackedS2dIds.count > 0) {
  516. FIRMessagingLoggerDebug(
  517. kFIRMessagingMessageCodeConnection021,
  518. @"RMQ: Mark persistent Ids as confirmed by stream id %@: %@.", streamId,
  519. [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  520. [self.ackedS2dMap removeObjectForKey:streamId];
  521. }
  522. [s2dIdsToDelete addObjectsFromArray:ackedS2dIds];
  523. }
  524. // clean up s2d ids that the server knows we've received.
  525. // we let the server know via a s2d last stream id received in a
  526. // d2s message. the server lets us know it has received our d2s
  527. // message via a d2s last stream id received in a s2d message.
  528. [self.rmq2Manager removeS2dIds:s2dIdsToDelete];
  529. }
  530. - (void)resetUnconfirmedAcks {
  531. [self.ackedS2dMap enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
  532. [self.unackedS2dIds addObjectsFromArray:obj];
  533. }];
  534. [self.ackedS2dMap removeAllObjects];
  535. }
  536. - (void)disconnect {
  537. // cancel pending timeout tasks.
  538. [self cancelConnectionTimeoutTask];
  539. // cancel pending heartbeat.
  540. [NSObject cancelPreviousPerformRequestsWithTarget:self
  541. selector:@selector(sendHeartbeatPing)
  542. object:nil];
  543. // Unset the delegate. FIRMessagingConnection will not receive further events from the socket from now on.
  544. self.socket.delegate = nil;
  545. [self.socket disconnect];
  546. self.state = kFIRMessagingConnectionNotConnected;
  547. }
  548. - (void)connectionTimedOut {
  549. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection022,
  550. @"Connection to FIRMessaging service timed out.");
  551. [self disconnect];
  552. [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonTimeout];
  553. }
  554. - (void)scheduleConnectionTimeoutTask {
  555. // cancel the previous heartbeat timeout event and schedule a new one.
  556. [self cancelConnectionTimeoutTask];
  557. [self performSelector:@selector(connectionTimedOut)
  558. withObject:nil
  559. afterDelay:[self connectionTimeoutInterval]];
  560. }
  561. - (void)cancelConnectionTimeoutTask {
  562. // cancel pending timeout tasks.
  563. [NSObject cancelPreviousPerformRequestsWithTarget:self
  564. selector:@selector(connectionTimedOut)
  565. object:nil];
  566. }
  567. - (void)logMessage:(NSString *)description messageType:(int)messageType isOut:(BOOL)isOut {
  568. messageType = isOut ? -messageType : messageType;
  569. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection023,
  570. @"Send msg: %@ type: %d inStreamId: %d outStreamId: %d", description,
  571. messageType, self.inStreamId, self.outStreamId);
  572. }
  573. - (NSTimeInterval)connectionTimeoutInterval {
  574. return kConnectionTimeout;
  575. }
  576. @end