You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

711 lines
26 KiB

6 years ago
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FIRMessagingConnection.h"
  17. #import "Protos/GtalkCore.pbobjc.h"
  18. #import "Protos/GtalkExtensions.pbobjc.h"
  19. #import "FIRMessaging.h"
  20. #import "FIRMessagingDataMessageManager.h"
  21. #import "FIRMessagingDefines.h"
  22. #import "FIRMessagingLogger.h"
  23. #import "FIRMessagingRmqManager.h"
  24. #import "FIRMessagingSecureSocket.h"
  25. #import "FIRMessagingUtilities.h"
  26. #import "FIRMessagingVersionUtilities.h"
  27. #import "FIRMessaging_Private.h"
  28. static NSInteger const kIqSelectiveAck = 12;
  29. static NSInteger const kIqStreamAck = 13;
  30. static int const kInvalidStreamId = -1;
  31. // Threshold for number of messages removed that we will ack, for short lived connections
  32. static int const kMessageRemoveAckThresholdCount = 5;
  33. static NSTimeInterval const kHeartbeatInterval = 30.0;
  34. static NSTimeInterval const kConnectionTimeout = 20.0;
  35. static int32_t const kAckingInterval = 10;
  36. static NSString *const kUnackedS2dIdKey = @"FIRMessagingUnackedS2dIdKey";
  37. static NSString *const kAckedS2dIdMapKey = @"FIRMessagingAckedS2dIdMapKey";
  38. static NSString *const kRemoteFromAddress = @"from";
  /**
   * Pairs the outgoing stream id under which a device-to-server (D2S) message was sent with
   * that message's persistent id.
   */
  @interface FIRMessagingD2SInfo : NSObject

  /// Outgoing stream id assigned to the message.
  @property(nonatomic, readwrite, assign) int streamId;

  /// Persistent id of the message. Declared `copy` so a mutable string handed to the setter
  /// cannot be changed behind this object's back (the initializer already copied).
  @property(nonatomic, readwrite, copy) NSString *d2sID;

  /**
   * @param streamId Outgoing stream id assigned to the message.
   * @param d2sID    Persistent id of the message; copied.
   */
  - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID;

  @end

  @implementation FIRMessagingD2SInfo

  - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID {
    self = [super init];
    if (self) {
      _streamId = streamId;
      _d2sID = [d2sID copy];
    }
    return self;
  }

  /**
   * Two infos are equal when both the stream id and the persistent id match.
   */
  - (BOOL)isEqual:(id)object {
    if (object == self) {
      return YES;
    }
    if (![object isKindOfClass:[self class]]) {
      return NO;
    }
    FIRMessagingD2SInfo *other = (FIRMessagingD2SInfo *)object;
    if (self.streamId != other.streamId) {
      return NO;
    }
    // -isEqualToString: sent to nil yields NO, so compare pointers first to treat two nil ids
    // as equal.
    NSString *mine = self.d2sID;
    NSString *theirs = other.d2sID;
    return mine == theirs || [mine isEqualToString:theirs];
  }

  /// Hash derived from the persistent id only; equal objects always share it.
  - (NSUInteger)hash {
    return [self.d2sID hash];
  }

  @end
  /**
   * Private state of the connection, plus conformance to the secure socket delegate protocol.
   */
  @interface FIRMessagingConnection () <FIRMessagingSecureSocketDelegate>

  // Collaborators (held weakly).
  @property(nonatomic, readwrite, weak) FIRMessagingRmqManager *rmq2Manager;
  @property(nonatomic, readwrite, weak) FIRMessagingDataMessageManager *dataMessageManager;

  // Connection state, endpoint and credentials.
  @property(nonatomic, readwrite, assign) FIRMessagingConnectionState state;
  @property(nonatomic, readwrite, copy) NSString *host;
  @property(nonatomic, readwrite, assign) NSUInteger port;
  @property(nonatomic, readwrite, strong) NSString *authId;
  @property(nonatomic, readwrite, strong) NSString *token;
  @property(nonatomic, readwrite, strong) FIRMessagingSecureSocket *socket;

  // Server timestamp taken from the most recent successful login response.
  @property(nonatomic, readwrite, assign) int64_t lastLoginServerTimestamp;

  // Stream bookkeeping. inStreamId counts protos received and outStreamId counts protos sent on
  // the current socket; lastStreamIdAcked is the last incoming stream id reported to the server.
  @property(nonatomic, readwrite, assign) int lastStreamIdAcked;
  @property(nonatomic, readwrite, assign) int inStreamId;
  @property(nonatomic, readwrite, assign) int outStreamId;

  // Persistent ids of received messages that have not yet been attached to an outgoing proto.
  @property(nonatomic, readwrite, strong) NSMutableArray *unackedS2dIds;
  // Outgoing stream id (as a string) -> persistent ids that were acked by that outgoing proto.
  @property(nonatomic, readwrite, strong) NSMutableDictionary *ackedS2dMap;
  // FIRMessagingD2SInfo entries for sent messages that the server has not confirmed yet.
  @property(nonatomic, readwrite, strong) NSMutableArray *d2sInfos;

  // ttl=0 messages that need to be sent as soon as we establish a connection
  @property(nonatomic, readwrite, strong) NSMutableArray *sendOnConnectMessages;

  @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;

  @end
  84. @implementation FIRMessagingConnection;
  /**
   * Creates a connection in its initial state. No socket is created until -signIn is called.
   *
   * @param authId             Auth id used for the login request; copied.
   * @param token              Auth token used for the login request; copied.
   * @param host               Host to connect to; copied.
   * @param port               Port to connect to.
   * @param runLoop            Run loop handed to the socket when connecting.
   * @param rmq2Manager        Store queried for persisted unacked ids; held weakly.
   * @param dataMessageManager Manager asked to resend messages after a selective ack; held weakly.
   */
  - (instancetype)initWithAuthID:(NSString *)authId
                           token:(NSString *)token
                            host:(NSString *)host
                            port:(NSUInteger)port
                         runLoop:(NSRunLoop *)runLoop
                     rmq2Manager:(FIRMessagingRmqManager *)rmq2Manager
                      fcmManager:(FIRMessagingDataMessageManager *)dataMessageManager {
    self = [super init];
    if (self == nil) {
      return nil;
    }

    // Credentials and endpoint.
    _authId = [authId copy];
    _token = [token copy];
    _host = [host copy];
    _port = port;
    _runLoop = runLoop;

    // Collaborators.
    _rmq2Manager = rmq2Manager;
    _dataMessageManager = dataMessageManager;

    // Bookkeeping containers; the unacked list is seeded from the persisted ids.
    _d2sInfos = [NSMutableArray array];
    _unackedS2dIds = [NSMutableArray arrayWithArray:[rmq2Manager unackedS2dRmqIds]];
    _ackedS2dMap = [NSMutableDictionary dictionary];
    _sendOnConnectMessages = [NSMutableArray array];

    return self;
  }
  /// Human-readable summary: host, port and the current incoming/outgoing stream ids.
  - (NSString *)description {
    int incomingId = self.inStreamId;
    int outgoingId = self.outStreamId;
    return [NSString stringWithFormat:@"host: %@, port: %lu, stream id in: %d, stream id out: %d",
                                      self.host, _FIRMessaging_UL(self.port), incomingId,
                                      outgoingId];
  }
  /**
   * Starts connecting. Does nothing unless the connection is currently not connected.
   */
  - (void)signIn {
    _FIRMessagingDevAssert(self.state == kFIRMessagingConnectionNotConnected, @"Invalid connection state.");
    BOOL alreadyActive = (self.state != kFIRMessagingConnectionNotConnected);
    if (alreadyActive) {
      return;
    }

    // Socket creation and connection are separate steps so each can be exercised in tests.
    [self setupConnectionSocket];
    [self connectToSocket:[self socket]];
  }
  /// Creates a fresh secure socket and registers this connection as its delegate.
  - (void)setupConnectionSocket {
    [self setSocket:[[FIRMessagingSecureSocket alloc] init]];
    [self socket].delegate = self;
  }
  /**
   * Marks the connection as connecting and asks the socket to connect to the configured
   * host and port on the configured run loop.
   *
   * @param socket The socket to connect with.
   */
  - (void)connectToSocket:(FIRMessagingSecureSocket *)socket {
    [self setState:kFIRMessagingConnectionConnecting];
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection000,
                            @"Start connecting to FIRMessaging service.");
    NSString *targetHost = self.host;
    NSUInteger targetPort = self.port;
    [socket connectToHost:targetHost port:targetPort onRunLoop:self.runLoop];
  }
  /**
   * Drops queued on-connect messages, sends a close proto if signed in, and disconnects
   * unless the connection is already not connected.
   */
  - (void)signOut {
    // The on-connect queue only holds messages if an error happened before the LoginResponse
    // arrived; they are discarded here.
    [[self sendOnConnectMessages] removeAllObjects];

    if (self.state == kFIRMessagingConnectionSignedIn) {
      [self sendClose];
    }

    // The state is read again on purpose rather than cached from the check above.
    if (self.state == kFIRMessagingConnectionNotConnected) {
      return;
    }
    [self disconnect];
  }
  /// Disconnects unless the connection is already not connected. No close proto is sent.
  - (void)teardown {
    if (self.state == kFIRMessagingConnectionNotConnected) {
      return;
    }
    [self disconnect];
  }
  150. #pragma mark - FIRMessagingSecureSocketDelegate
  /**
   * Socket delegate callback: the socket is connected. Resets the stream counters, returns
   * unconfirmed acks to the unacked list and sends the login request.
   */
  - (void)secureSocketDidConnect:(FIRMessagingSecureSocket *)socket {
    [self setState:kFIRMessagingConnectionConnected];

    // Stream counters restart from zero on every new socket connection.
    [self setInStreamId:0];
    [self setOutStreamId:0];
    [self setLastStreamIdAcked:0];

    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection001,
                            @"Connected to FIRMessaging service.");

    [self resetUnconfirmedAcks];
    [self sendLoginRequest:[self authId] token:[self token]];
  }
  /**
   * Socket delegate callback: the socket went away. Tears down local state and tells the
   * delegate the connection closed because the socket disconnected.
   */
  - (void)didDisconnectWithSecureSocket:(FIRMessagingSecureSocket *)socket {
    _FIRMessagingDevAssert(self.socket == socket, @"Invalid socket");
    _FIRMessagingDevAssert(self.socket.state == kFIRMessagingSecureSocketClosed, @"Socket already closed");

    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection002,
                            @"Secure socket disconnected from FIRMessaging service.");

    [self disconnect];

    FIRMessagingConnectionCloseReason reason = kFIRMessagingConnectionCloseReasonSocketDisconnected;
    [self.delegate connection:self didCloseForReason:reason];
  }
  /**
   * Socket delegate callback: a framed proto arrived. Parses it according to its tag, checks
   * it is acceptable in the current state, restarts the heartbeat timer, runs the stream
   * bookkeeping and dispatches to the handler for that tag.
   *
   * @param socket The socket that received the data.
   * @param data   Serialized proto bytes.
   * @param tag    Proto tag identifying the message class; negative values are ignored.
   */
  - (void)secureSocket:(FIRMessagingSecureSocket *)socket
        didReceiveData:(NSData *)data
               withTag:(int8_t)tag {
    // Invalid proto tag
    if (tag < 0) {
      return;
    }

    Class protoClass = FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag);
    if ([protoClass isSubclassOfClass:[NSNull class]]) {
      FIRMessagingLoggerError(kFIRMessagingMessageCodeConnection003, @"Invalid tag %d for proto",
                              tag);
      return;
    }

    GPBMessage *proto = [protoClass parseFromData:data error:NULL];

    // A login response is only valid while connected-but-not-signed-in; everything else is
    // only valid once signed in.
    BOOL isLoginResponse = (tag == kFIRMessagingProtoTagLoginResponse);
    if (isLoginResponse) {
      if (self.state != kFIRMessagingConnectionConnected) {
        FIRMessagingLoggerDebug(
            kFIRMessagingMessageCodeConnection004,
            @"Should not receive generated message when the connection is not connected.");
        return;
      }
    } else if (self.state != kFIRMessagingConnectionSignedIn) {
      FIRMessagingLoggerDebug(
          kFIRMessagingMessageCodeConnection005,
          @"Should not receive generated message when the connection is not signed in.");
      return;
    }

    // If traffic is received after a heartbeat it is safe to assume the connection is healthy.
    [self cancelConnectionTimeoutTask];
    [self performSelector:@selector(sendHeartbeatPing)
               withObject:nil
               afterDelay:kHeartbeatInterval];

    // Stream-id and ack bookkeeping always runs before the type-specific handler.
    [self willProcessProto:proto];

    switch (tag) {
      case kFIRMessagingProtoTagLoginResponse: {
        GtalkLoginResponse *response = (GtalkLoginResponse *)proto;
        [self didReceiveLoginResponse:response];
        break;
      }
      case kFIRMessagingProtoTagDataMessageStanza: {
        GtalkDataMessageStanza *stanza = (GtalkDataMessageStanza *)proto;
        [self didReceiveDataMessageStanza:stanza];
        break;
      }
      case kFIRMessagingProtoTagHeartbeatPing: {
        GtalkHeartbeatPing *ping = (GtalkHeartbeatPing *)proto;
        [self didReceiveHeartbeatPing:ping];
        break;
      }
      case kFIRMessagingProtoTagHeartbeatAck: {
        GtalkHeartbeatAck *ack = (GtalkHeartbeatAck *)proto;
        [self didReceiveHeartbeatAck:ack];
        break;
      }
      case kFIRMessagingProtoTagClose: {
        GtalkClose *close = (GtalkClose *)proto;
        [self didReceiveClose:close];
        break;
      }
      case kFIRMessagingProtoTagIqStanza: {
        GtalkIqStanza *iq = (GtalkIqStanza *)proto;
        [self handleIqStanza:iq];
        break;
      }
      default:
        [self didReceiveUnhandledProto:proto];
        break;
    }
  }
  /**
   * Socket delegate callback, invoked once the proto with the given rmqId has been sent over
   * the wire. For now the event is only logged; user-facing messages carry an rmqId and could
   * be looked up in the Rmq later if needed.
   *
   * @param socket The socket that sent the proto.
   * @param tag    Proto tag of the sent message.
   * @param rmqId  Persistent id of the sent message.
   */
  - (void)secureSocket:(FIRMessagingSecureSocket *)socket
      didSendProtoWithTag:(int8_t)tag
                    rmqId:(NSString *)rmqId {
    int messageType = tag;
    [self logMessage:rmqId messageType:messageType isOut:YES];
  }
  234. #pragma mark - FIRMessagingTestConnection
  /**
   * Sends a proto over the socket if the connection state allows it: a login request requires
   * the connected state, every other proto requires the signed-in state. Otherwise the proto
   * is dropped with a debug log.
   *
   * @param proto The message to send.
   */
  - (void)sendProto:(GPBMessage *)proto {
    FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(proto);

    BOOL isLoginRequest = (tag == kFIRMessagingProtoTagLoginRequest);
    if (isLoginRequest) {
      if (self.state != kFIRMessagingConnectionConnected) {
        FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection006,
                                @"Cannot send generated message when the connection is not connected.");
        return;
      }
    } else if (self.state != kFIRMessagingConnectionSignedIn) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection007,
                              @"Cannot send generated message when the connection is not signed in.");
      return;
    }

    _FIRMessagingDevAssert(self.socket != nil, @"Socket shouldn't be nil");
    if (!self.socket) {
      return;
    }

    // Bookkeeping must run before serialization: it may stamp the last stream id into the proto.
    [self willSendProto:proto];

    NSData *payload = proto.data;
    NSString *rmqId = FIRMessagingGetRmq2Id(proto);
    [self.socket sendData:payload withTag:tag rmqId:rmqId];
  }
  /**
   * Sends the message right away when signed in; otherwise queues it to be sent after login.
   *
   * @param message The message to send or queue.
   */
  - (void)sendOnConnectOrDrop:(GPBMessage *)message {
    BOOL signedIn = (self.state == kFIRMessagingConnectionSignedIn);
    if (!signedIn) {
      // Not logged in yet: park the message until the login response arrives.
      [[self sendOnConnectMessages] addObject:message];
      return;
    }
    [self sendProto:message];
  }
  /**
   * Builds the login request proto for the given credentials.
   *
   * @param token  Auth token placed in the request.
   * @param authID Auth id; used as user and resource, and (as hex) in the device id.
   * @return A populated login request.
   */
  + (GtalkLoginRequest *)loginRequestWithToken:(NSString *)token authID:(NSString *)authID {
    NSString *clientId =
        [NSString stringWithFormat:@"%@-%@", @"ios", FIRMessagingCurrentLibraryVersion()];
    NSString *deviceId = [NSString stringWithFormat:@"android-%llx", authID.longLongValue];

    GtalkLoginRequest *request = [[GtalkLoginRequest alloc] init];
    request.accountId = 1000000;
    request.authService = GtalkLoginRequest_AuthService_AndroidId;
    request.authToken = token;
    request.id_p = clientId;
    request.domain = @"mcs.android.com";
    request.deviceId = deviceId;
    request.networkType = [self currentNetworkType];
    request.resource = authID;
    request.user = authID;
    request.useRmq2 = YES;
    // Sending not enabled yet so this stays as 1.
    request.lastRmqId = 1;
    return request;
  }
  /**
   * Maps the current reachability status to the numeric network type sent in the login
   * request: 1 for WiFi, 0 for WWAN, -1 for anything else.
   * See http://developer.android.com/reference/android/net/ConnectivityManager.html
   */
  + (int32_t)currentNetworkType {
    FIRMessagingNetworkStatus status = [[FIRMessaging messaging] networkType];
    switch (status) {
      case kFIRMessagingReachabilityReachableViaWiFi:
        return 1;
      case kFIRMessagingReachabilityReachableViaWWAN:
        return 0;
      default:
        return -1;
    }
  }
  /**
   * Builds and sends the login request. The request carries every currently unacked
   * persistent id and the "new_vc" setting.
   *
   * @param authId Auth id for the login request.
   * @param token  Auth token for the login request.
   */
  - (void)sendLoginRequest:(NSString *)authId
                     token:(NSString *)token {
    GtalkLoginRequest *request = [[self class] loginRequestWithToken:token authID:authId];

    // clear the messages sent during last connection
    [self.d2sInfos removeAllObjects];

    NSArray *pendingIds = self.unackedS2dIds;
    if (pendingIds.count > 0) {
      FIRMessagingLoggerDebug(
          kFIRMessagingMessageCodeConnection008,
          @"There are unacked persistent Ids in the login request: %@",
          [pendingIds.description stringByReplacingOccurrencesOfString:@"%"
                                                           withString:@"%%"]);
    }

    // Send out acks.
    for (NSString *persistentId in pendingIds) {
      [request.receivedPersistentIdArray addObject:persistentId];
    }

    GtalkSetting *newVcSetting = [[GtalkSetting alloc] init];
    newVcSetting.name = @"new_vc";
    newVcSetting.value = @"1";
    [request.settingArray addObject:newVcSetting];

    [self sendProto:request];
  }
  /// Sends an empty heartbeat ack proto.
  - (void)sendHeartbeatAck {
    GtalkHeartbeatAck *ack = [[GtalkHeartbeatAck alloc] init];
    [self sendProto:ack];
  }
  /**
   * Sends a heartbeat ping and arms the connection timeout. Any heartbeat ping still pending
   * from an earlier -performSelector:withObject:afterDelay: is cancelled first.
   */
  - (void)sendHeartbeatPing {
    // cancel the previous heartbeat request.
    SEL pingSelector = @selector(sendHeartbeatPing);
    [NSObject cancelPreviousPerformRequestsWithTarget:self selector:pingSelector object:nil];

    [self scheduleConnectionTimeoutTask];

    GtalkHeartbeatPing *ping = [[GtalkHeartbeatPing alloc] init];
    [self sendProto:ping];
  }
  /**
   * Builds the IQ stanza used as a stream ack: type Set, empty id, and an extension whose id
   * is kIqStreamAck with empty data.
   */
  + (GtalkIqStanza *)createStreamAck {
    GtalkExtension *ackExtension = [[GtalkExtension alloc] init];
    ackExtension.id_p = kIqStreamAck;
    ackExtension.data_p = @"";

    GtalkIqStanza *stanza = [[GtalkIqStanza alloc] init];
    stanza.type = GtalkIqStanza_IqType_Set;
    stanza.id_p = @"";
    stanza.extension = ackExtension;
    return stanza;
  }
  /// Sends a stream ack IQ stanza.
  - (void)sendStreamAck {
    [self sendProto:[[self class] createStreamAck]];
  }
  /// Sends an empty close proto.
  - (void)sendClose {
    GtalkClose *closeMessage = [[GtalkClose alloc] init];
    [self sendProto:closeMessage];
  }
  /**
   * Routes an IQ stanza by its extension id: stream acks and selective acks go to their
   * handlers; anything else is logged and treated as an unhandled proto.
   *
   * @param iq The received IQ stanza.
   */
  - (void)handleIqStanza:(GtalkIqStanza *)iq {
    if (iq.hasExtension) {
      if (iq.extension.id_p == kIqStreamAck) {
        [self didReceiveStreamAck:iq];
        return;
      }
      if (iq.extension.id_p == kIqSelectiveAck) {
        [self didReceiveSelectiveAck:iq];
        return;
      }
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection009, @"Unknown ack extension id %d.",
                              iq.extension.id_p);
    } else {
      // The message previously read "Ip stanza", a typo for the IQ stanza being handled.
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection010, @"Iq stanza without extension.");
    }
    [self didReceiveUnhandledProto:iq];
  }
  /**
   * Handles the login response. On error it only logs. On success it moves to the signed-in
   * state, records the server timestamp, notifies the delegate, starts the heartbeat and
   * flushes the messages queued for sending on connect.
   *
   * @param loginResponse The received login response.
   */
  - (void)didReceiveLoginResponse:(GtalkLoginResponse *)loginResponse {
    if (loginResponse.hasError) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection011,
                              @"Login error with type: %@, message: %@.", loginResponse.error.type,
                              loginResponse.error.message);
      return;
    }

    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection012, @"Logged onto MCS service.");

    // We sent the persisted list of unack'd messages with login so we can assume they have been
    // ack'd by the server.
    _FIRMessagingDevAssert(self.unackedS2dIds.count == 0, @"No ids present");
    _FIRMessagingDevAssert(self.outStreamId == 1, @"Login should be the first stream id");

    [self setState:kFIRMessagingConnectionSignedIn];
    [self setLastLoginServerTimestamp:loginResponse.serverTimestamp];
    [self.delegate didLoginWithConnection:self];
    [self sendHeartbeatPing];

    // Add all the TTL=0 messages on connect
    NSMutableArray *queued = [self sendOnConnectMessages];
    for (GPBMessage *pending in queued) {
      [self sendProto:pending];
    }
    [queued removeAllObjects];
  }
  /// Answers a server heartbeat ping with a heartbeat ack.
  - (void)didReceiveHeartbeatPing:(GtalkHeartbeatPing *)heartbeatPing {
    // The ping's contents are not inspected; receiving it is enough to warrant the ack.
    [self sendHeartbeatAck];
  }
  /**
   * Handles a heartbeat ack. Only does work in FIRMessaging_PROBER builds, where it records
   * the current timestamp (in seconds) as the last heartbeat time.
   */
  - (void)didReceiveHeartbeatAck:(GtalkHeartbeatAck *)heartbeatAck {
  #if FIRMessaging_PROBER
    self.lastHeartbeatPingTimestamp = FIRMessagingCurrentTimestampInSeconds();
  #endif
  }
  /// Forwards a received data message stanza to the delegate.
  - (void)didReceiveDataMessageStanza:(GtalkDataMessageStanza *)dataMessageStanza {
    // TODO: Maybe add support raw data later
    id connectionDelegate = self.delegate;
    [connectionDelegate connectionDidRecieveMessage:dataMessageStanza];
  }
  /// Fallback for protos with no dedicated handler; only emits a debug log.
  - (void)didReceiveUnhandledProto:(GPBMessage *)proto {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection013,
                            @"Received unhandled proto");
  }
  /// Handles a stream ack from the server. Intentionally a no-op.
  - (void)didReceiveStreamAck:(GtalkIqStanza *)iq {
    // The server is only telling us it received what we sent; nothing special to do here.
  }
  /**
   * Handles a selective ack: decodes the acked ids from the extension data, reports them to
   * the delegate and asks the data message manager to resend what is still unacked.
   *
   * @param iq The IQ stanza carrying the selective ack extension.
   */
  - (void)didReceiveSelectiveAck:(GtalkIqStanza *)iq {
    GtalkExtension *extension = iq.extension;
    if (!extension) {
      return;
    }
    int extensionId = extension.id_p;
    if (extensionId != kIqSelectiveAck) {
      return;
    }

    // The extension payload is a serialized GtalkSelectiveAck carried as a string.
    NSData *payload = [extension.data_p dataUsingEncoding:NSUTF8StringEncoding];
    GtalkSelectiveAck *selectiveAck = [[GtalkSelectiveAck alloc] init];
    [selectiveAck mergeFromData:payload extensionRegistry:nil];

    // we've received ACK's
    NSArray<NSString *> *ackedIds = [selectiveAck idArray];
    [self.delegate connectionDidReceiveAckForRmqIds:ackedIds];

    // resend unacked messages
    [self.dataMessageManager resendMessagesWithConnection:self];
  }
  /// Handles a close proto from the server by disconnecting.
  - (void)didReceiveClose:(GtalkClose *)close {
    // The close proto's contents are not inspected.
    [self disconnect];
  }
  /**
   * Stream bookkeeping run for every accepted incoming proto, before its handler:
   * bumps the incoming stream id, confirms what the server reports having received, records
   * the proto's persistent id as unacked, and sends a stream ack when one is due.
   *
   * @param proto The proto that is about to be handled.
   */
  - (void)willProcessProto:(GPBMessage *)proto {
    self.inStreamId += 1;

    BOOL isDataMessage = [proto isKindOfClass:[GtalkDataMessageStanza class]];
    if (isDataMessage) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection014,
                              @"RMQ: Receiving %@ with rmq_id: %@ incoming stream Id: %d",
                              proto.class, FIRMessagingGetRmq2Id(proto), self.inStreamId);
    } else {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection015,
                              @"RMQ: Receiving %@ with incoming stream Id: %d.", proto.class,
                              self.inStreamId);
    }

    int serverLastStreamId = FIRMessagingGetLastStreamId(proto);
    if (serverLastStreamId != kInvalidStreamId) {
      // confirm the D2S messages that were sent by us
      [self confirmAckedD2sIdsWithStreamId:serverLastStreamId];

      // We can now confirm that our ack was received by the server and start our unack'd list
      // fresh with the proto we just received.
      [self confirmAckedS2dIdsWithStreamId:serverLastStreamId];
    }

    NSString *persistentId = FIRMessagingGetRmq2Id(proto);
    if (persistentId != nil) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection016,
                              @"RMQ: Add unacked persistent Id: %@.",
                              [persistentId stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
      [self.unackedS2dIds addObject:persistentId];
      [self.rmq2Manager saveS2dMessageWithRmqId:persistentId];  // RMQ save
    }

    BOOL explicitAck = isDataMessage && [(GtalkDataMessageStanza *)proto immediateAck];

    // If we have not sent anything and the ack threshold has been reached then explicitly send
    // one to notify the server that we have received messages.
    BOOL thresholdReached = (self.inStreamId - self.lastStreamIdAcked >= kAckingInterval);
    if (thresholdReached || explicitAck) {
      [self sendStreamAck];
    }
  }
  /**
   * Stream bookkeeping run for every outgoing proto, just before it is serialized:
   * bumps the outgoing stream id, tracks the proto if it has a persistent id, piggybacks the
   * last received stream id when new messages arrived, and moves the unacked ids into the
   * acked map under this outgoing stream id.
   *
   * @param proto The proto that is about to be sent; may be modified.
   */
  - (void)willSendProto:(GPBMessage *)proto {
    self.outStreamId += 1;

    NSString *persistentId = FIRMessagingGetRmq2Id(proto);
    if (persistentId.length > 0) {
      FIRMessagingD2SInfo *sentInfo =
          [[FIRMessagingD2SInfo alloc] initWithStreamId:self.outStreamId d2sId:persistentId];
      [self.d2sInfos addObject:sentInfo];
    }

    // each time we send a d2s message, it acks previously received
    // s2d messages via the last (s2d) stream id received.
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection017,
                            @"RMQ: Sending %@ with outgoing stream Id: %d.", proto.class,
                            self.outStreamId);

    // We have received messages since last time we sent something - send ack info to server.
    if (self.lastStreamIdAcked < self.inStreamId) {
      FIRMessagingSetLastStreamId(proto, self.inStreamId);
      self.lastStreamIdAcked = self.inStreamId;
    }

    if (self.unackedS2dIds.count == 0) {
      return;
    }

    // Move all 'unack'd' messages to the ack'd map so they can be removed once the
    // ack is confirmed.
    NSArray *nowAckedIds = [NSArray arrayWithArray:self.unackedS2dIds];
    FIRMessagingLoggerDebug(
        kFIRMessagingMessageCodeConnection018, @"RMQ: Mark persistent Ids as acked: %@.",
        [nowAckedIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
    [self.unackedS2dIds removeAllObjects];

    NSString *streamKey = [@(self.outStreamId) stringValue];
    self.ackedS2dMap[streamKey] = nowAckedIds;
  }
  486. #pragma mark - Private
  /**
   * This processes the s2d message received in reference to the d2s messages
   * that we have sent before: every tracked d2s message whose stream id is at or below
   * the stream id the server reports is treated as acked, dropped from the tracking list and
   * reported to the delegate.
   *
   * @param lastReceivedStreamId Last outgoing stream id the server reports having received.
   */
  - (void)confirmAckedD2sIdsWithStreamId:(int)lastReceivedStreamId {
    NSMutableArray *d2sIdsAcked = [NSMutableArray array];
    for (FIRMessagingD2SInfo *d2sInfo in self.d2sInfos) {
      // Entries are appended with increasing stream ids, so the first one beyond the
      // server's stream id ends the scan.
      if (lastReceivedStreamId < d2sInfo.streamId) {
        break;
      }
      [d2sIdsAcked addObject:d2sInfo];
    }

    NSMutableArray *rmqIds = [NSMutableArray arrayWithCapacity:[d2sIdsAcked count]];
    for (FIRMessagingD2SInfo *d2sInfo in d2sIdsAcked) {
      if ([d2sInfo.d2sID length]) {
        [rmqIds addObject:d2sInfo.d2sID];
      }
    }
    // remove ACK'ed messages
    [self.d2sInfos removeObjectsInArray:d2sIdsAcked];

    // Notify the delegate exactly once and use the count it returns. The delegate used to be
    // invoked twice in a row with the same ids, with only the second result inspected, so the
    // acks were reported twice and the threshold saw the count from the repeat call.
    int count = [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
    if (kMessageRemoveAckThresholdCount > 0 && count >= kMessageRemoveAckThresholdCount) {
      // For short lived connections, if a large number of messages are removed, send an
      // ack straight away so the server knows that this message was received.
      [self sendStreamAck];
    }
  }
  /**
   * Hook for when a stream ACK or a selective ACK is received, meaning the messages with the
   * given ids have been received by MCS. Currently a no-op.
   *
   * @param rmqIds Persistent ids of the acked messages.
   */
  - (void)didReceiveAckForRmqIds:(NSArray *)rmqIds {
    // TODO: let the user know that the following messages were received by the server
  }
  /**
   * Finalizes acks of server-to-device messages. Every entry of the acked map whose outgoing
   * stream id is at or below the stream id the server reports has been seen by the server, so
   * its persistent ids are removed from the persistent store.
   *
   * @param lastReceivedStreamId Last outgoing stream id the server reports having received.
   */
  - (void)confirmAckedS2dIdsWithStreamId:(int)lastReceivedStreamId {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection019,
                            @"RMQ: Server last received stream Id: %d.", lastReceivedStreamId);

    // If the server hasn't received the streamId yet.
    if (self.outStreamId > lastReceivedStreamId) {
      // TODO: This could be a good indicator that we need to re-send something (acks)?
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection020,
                              @"RMQ: There are unsent messages that should be send...\n"
                               "server received: %d\nlast stream id sent: %d",
                              lastReceivedStreamId, self.outStreamId);
    }

    // Keys are outgoing stream ids stored as strings.
    NSSet *confirmedStreamIds =
        [self.ackedS2dMap keysOfEntriesPassingTest:^BOOL(id key, id obj, BOOL *stop) {
          NSString *streamKey = key;
          return streamKey.intValue <= lastReceivedStreamId;
        }];

    NSMutableArray *idsToRemove = [NSMutableArray array];
    for (NSString *streamKey in confirmedStreamIds) {
      NSArray *confirmedIds = self.ackedS2dMap[streamKey];
      if (confirmedIds.count == 0) {
        // Nothing to log, drop or collect for an empty entry.
        continue;
      }
      FIRMessagingLoggerDebug(
          kFIRMessagingMessageCodeConnection021,
          @"RMQ: Mark persistent Ids as confirmed by stream id %@: %@.", streamKey,
          [confirmedIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
      [self.ackedS2dMap removeObjectForKey:streamKey];
      [idsToRemove addObjectsFromArray:confirmedIds];
    }

    // clean up s2d ids that the server knows we've received.
    // we let the server know via a s2d last stream id received in a
    // d2s message. the server lets us know it has received our d2s
    // message via a d2s last stream id received in a s2d message.
    [self.rmq2Manager removeS2dIds:idsToRemove];
  }
  /**
   * Returns every id in the acked map to the unacked list and empties the map, so those ids
   * are acked again on the next send.
   */
  - (void)resetUnconfirmedAcks {
    NSMutableArray *unacked = self.unackedS2dIds;
    NSMutableDictionary *acked = self.ackedS2dMap;
    [acked enumerateKeysAndObjectsUsingBlock:^(id streamKey, id ackedIds, BOOL *stop) {
      [unacked addObjectsFromArray:ackedIds];
    }];
    [acked removeAllObjects];
  }
  /**
   * Tears down the live connection: cancels the pending timeout and heartbeat, detaches from
   * the socket, disconnects it and moves to the not-connected state.
   */
  - (void)disconnect {
    _FIRMessagingDevAssert(self.state != kFIRMessagingConnectionNotConnected, @"Connection already not connected");

    // cancel pending timeout tasks.
    [self cancelConnectionTimeoutTask];

    // cancel pending heartbeat.
    SEL pingSelector = @selector(sendHeartbeatPing);
    [NSObject cancelPreviousPerformRequestsWithTarget:self selector:pingSelector object:nil];

    // Unset the delegate. FIRMessagingConnection will not receive further events from the
    // socket from now on.
    FIRMessagingSecureSocket *activeSocket = [self socket];
    activeSocket.delegate = nil;
    [activeSocket disconnect];

    [self setState:kFIRMessagingConnectionNotConnected];
  }
  /// Fired when the connection timeout elapses: disconnects and reports a timeout close.
  - (void)connectionTimedOut {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection022,
                            @"Connection to FIRMessaging service timed out.");
    [self disconnect];

    FIRMessagingConnectionCloseReason reason = kFIRMessagingConnectionCloseReasonTimeout;
    [self.delegate connection:self didCloseForReason:reason];
  }
  /// Re-arms the connection timeout: cancels any pending one and schedules a new one.
  - (void)scheduleConnectionTimeoutTask {
    // cancel the previous heartbeat timeout event and schedule a new one.
    [self cancelConnectionTimeoutTask];

    NSTimeInterval delay = [self connectionTimeoutInterval];
    [self performSelector:@selector(connectionTimedOut) withObject:nil afterDelay:delay];
  }
  /// Cancels any pending -connectionTimedOut invocation scheduled on this object.
  - (void)cancelConnectionTimeoutTask {
    // cancel pending timeout tasks.
    SEL timeoutSelector = @selector(connectionTimedOut);
    [NSObject cancelPreviousPerformRequestsWithTarget:self selector:timeoutSelector object:nil];
  }
  /**
   * Logs a message event together with the current stream ids.
   *
   * @param description Text identifying the message.
   * @param messageType Numeric message type; logged negated for outgoing messages.
   * @param isOut       YES for an outgoing message, NO for an incoming one.
   */
  - (void)logMessage:(NSString *)description messageType:(int)messageType isOut:(BOOL)isOut {
    // Outgoing messages are distinguished in the log by a negated type.
    if (isOut) {
      messageType = -messageType;
    }
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection023,
                            @"Send msg: %@ type: %d inStreamId: %d outStreamId: %d", description,
                            messageType, self.inStreamId, self.outStreamId);
  }
  /// Delay, in seconds, used when scheduling the connection timeout (kConnectionTimeout).
  - (NSTimeInterval)connectionTimeoutInterval {
    NSTimeInterval interval = kConnectionTimeout;
    return interval;
  }
  603. @end