|
|
/* * Copyright 2017 Google * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
#import "FIRMessagingRmqManager.h"
#import <sqlite3.h>
#import "FIRMessagingDefines.h" #import "FIRMessagingLogger.h" #import "FIRMessagingRmq2PersistentStore.h" #import "FIRMessagingUtilities.h" #import "Protos/GtalkCore.pbobjc.h"
// Sends -logErrorAndFinalizeStatement: to `self` with `stmt`, then returns
// `return_value` from the enclosing method. Because the expansion messages
// `self` and contains a `return`, it can only be used inside an instance method
// of a class that implements -logErrorAndFinalizeStatement:.
// The do { ... } while(0) wrapper makes the expansion behave as one statement.
// NOTE(review): no use of this macro is visible in this file -- confirm whether
// it is still needed here.
#ifndef _FIRMessagingRmqLogAndExit
#define _FIRMessagingRmqLogAndExit(stmt, return_value) \
  do { \
    [self logErrorAndFinalizeStatement:stmt]; \
    return return_value; \
  } while(0)
#endif
static NSString *const kFCMRmqTag = @"FIRMessagingRmq:";
// Private state of the RMQ manager.
@interface FIRMessagingRmqManager ()

// Persistent store that the save/query/delete methods of this class forward to.
// Created in -initWithDatabaseName:.
@property(nonatomic, readwrite, strong) FIRMessagingRmq2PersistentStore *rmq2Store;
// Maps the category of an outgoing message to the number of messages for that
// category. Should always have two keys -- the app, gcm.
// Only its count is read in this file (for a debug log in -loadRmqId).
@property(nonatomic, readwrite, strong) NSMutableDictionary *outstandingMessages;

// Outgoing RMQ persistent id. Starts at -1 (meaning "not loaded yet"), is seeded
// by -loadInitialOutgoingPersistentId and pre-incremented by -nextRmqId.
@property(nonatomic, readwrite, assign) int64_t rmqId;

@end
@implementation FIRMessagingRmqManager
/**
 * Creates a manager backed by a persistent store opened with `databaseName`.
 *
 * The outgoing RMQ id starts at -1, which -loadRmqId treats as "not loaded yet".
 *
 * @param databaseName Name handed to the persistent store; asserted (dev builds)
 *                     to be non-empty.
 * @return The initialized manager, or nil if the superclass initializer failed.
 */
- (instancetype)initWithDatabaseName:(NSString *)databaseName {
  self = [super init];
  if (!self) {
    return nil;
  }

  _FIRMessagingDevAssert(databaseName.length > 0, @"RMQ: Invalid rmq db name");
  _rmqId = -1;
  _outstandingMessages = [[NSMutableDictionary alloc] initWithCapacity:2];
  _rmq2Store = [[FIRMessagingRmq2PersistentStore alloc] initWithDatabaseName:databaseName];
  return self;
}
/**
 * Loads the initial outgoing RMQ id once. Subsequent calls are no-ops because a
 * non-negative `rmqId` means the value has already been loaded.
 */
- (void)loadRmqId {
  BOOL alreadyLoaded = (self.rmqId >= 0);
  if (alreadyLoaded) {
    return;
  }

  [self loadInitialOutgoingPersistentId];

  NSUInteger categoryCount = self.outstandingMessages.count;
  if (categoryCount != 0) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmqManager000,
                            @"%@: outstanding categories %ld", kFCMRmqTag,
                            _FIRMessaging_UL(categoryCount));
  }
}
/**
 * Seeds `rmqId` with the 'initial RMQ' id:
 *  - the highest id of any message currently in the queue, or
 *  - when that query yields 0, the value stored separately as the last RMQ id.
 * The property is set to that value plus one.
 *
 * Stream acks remove messages from the RMQ; when the highest message is removed
 * its id is recorded separately (see -saveLastOutgoingRmqId:).
 */
- (void)loadInitialOutgoingPersistentId {
  // The value in the LastRmqId table is only written occasionally (after the
  // server reports its last rmq id on reconnect, and after an rmq ack), so it
  // can lag behind. The highest id among the queued messages is authoritative;
  // the stored value is only the fallback.
  int64_t highestQueuedId = [self queryHighestRmqId];
  int64_t baseId = (highestQueuedId != 0) ? highestQueuedId : [self querylastRmqId];
  self.rmqId = baseId + 1;
}
#pragma mark - Save
/**
 * Saves a message to RMQ2, assigning it an rmq2 persistent id if it has none.
 *
 * The wire format of the rmq2 id is a string, while the database keeps it as a
 * 64-bit integer, so the id is only turned into a string when it is attached to
 * the message and parsed back to an integer for storage.
 *
 * @param message The proto to persist. If it carries no rmq2 id, the next id
 *                from -nextRmqId is written into it.
 * @param error   Passed through to the store; see -saveMessage:withRmqId:tag:error:.
 * @return The result of -saveMessage:withRmqId:tag:error:.
 */
- (BOOL)saveRmqMessage:(GPBMessage *)message error:(NSError **)error {
  NSString *rmq2Id = FIRMessagingGetRmq2Id(message);
  if (rmq2Id.length == 0) {
    int64_t freshId = [self nextRmqId];
    rmq2Id = [NSString stringWithFormat:@"%lld", freshId];
    FIRMessagingSetRmq2Id(message, rmq2Id);
  }

  FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(message);
  // Parse with -longLongValue rather than -integerValue: NSInteger is only 32 bits
  // wide on 32-bit targets, which would clamp ids that were formatted from an
  // int64_t above. This also matches the parsing in -removeRmqMessagesWithRmqIds:.
  int64_t numericId = [rmq2Id longLongValue];
  return [self saveMessage:message withRmqId:numericId tag:tag error:error];
}
/**
 * Serializes `message` and hands the bytes to the persistent store.
 *
 * @param message The proto whose -data is stored.
 * @param rmqId   Id under which the message is stored.
 * @param tag     Proto tag stored alongside the data.
 * @param error   Passed through to the store.
 * @return The store's result for -saveMessageWithRmqId:tag:data:error:.
 */
- (BOOL)saveMessage:(GPBMessage *)message
          withRmqId:(int64_t)rmqId
                tag:(int8_t)tag
              error:(NSError **)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  return [store saveMessageWithRmqId:rmqId tag:tag data:message.data error:error];
}
/**
 * Records `rmqID` as the last outgoing RMQ id in the persistent store.
 * This is called when the largest outgoing message is deleted from the queue
 * (see -removeRmqMessagesWithRmqIds:).
 */
- (void)saveLastOutgoingRmqId:(int64_t)rmqID {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  [store updateLastOutgoingRmqId:rmqID];
}
/**
 * Forwards `rmqID` to the store's -saveUnackedS2dMessageWithRmqId: and returns
 * the store's result.
 */
- (BOOL)saveS2dMessageWithRmqId:(NSString *)rmqID {
  BOOL saved = [[self rmq2Store] saveUnackedS2dMessageWithRmqId:rmqID];
  return saved;
}
#pragma mark - Query
/**
 * Returns the value of the store's -queryHighestRmqId.
 */
- (int64_t)queryHighestRmqId {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  return [store queryHighestRmqId];
}
/**
 * Returns the value of the store's -queryLastRmqId.
 */
- (int64_t)querylastRmqId {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  return [store queryLastRmqId];
}
/**
 * Returns the array produced by the store's -unackedS2dRmqIds.
 */
- (NSArray *)unackedS2dRmqIds {
  NSArray *pendingIds = [[self rmq2Store] unackedS2dRmqIds];
  return pendingIds;
}
#pragma mark - FIRMessagingRMQScanner protocol
/**
 * Iterates over the stored outgoing RMQ messages and reports each one to the
 * supplied handlers.
 *
 * There is deliberately no 'getMessages' method: it would require loading the
 * entire content body of all messages into memory. Instead the store is scanned
 * and the handlers are invoked per message.
 *
 * This is called:
 *  - on connect MCS, to resend any outstanding messages
 *  - init
 *
 * @param rmqMessageHandler  Optional; invoked for every message with its id, tag
 *                           and raw data.
 * @param dataMessageHandler Optional; invoked only for messages tagged
 *                           kFIRMessagingProtoTagDataMessageStanza, with the
 *                           data parsed into a stanza. Parse errors are not
 *                           reported (error:NULL), so the handler receives
 *                           whatever the parse call returned.
 */
- (void)scanWithRmqMessageHandler:(FIRMessagingRmqMessageHandler)rmqMessageHandler
               dataMessageHandler:(FIRMessagingDataMessageHandler)dataMessageHandler {
  // With no callbacks there is nothing to report, so skip the database scan.
  if (!rmqMessageHandler && !dataMessageHandler) {
    return;
  }

  [self.rmq2Store scanOutgoingRmqMessagesWithHandler:^(int64_t rmqId, int8_t tag, NSData *data) {
    if (rmqMessageHandler) {
      rmqMessageHandler(rmqId, tag, data);
    }

    BOOL isDataStanza = (tag == kFIRMessagingProtoTagDataMessageStanza);
    if (!dataMessageHandler || !isDataStanza) {
      return;
    }

    GtalkDataMessageStanza *stanza = (GtalkDataMessageStanza *)
        [FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag) parseFromData:data error:NULL];
    dataMessageHandler(rmqId, stanza);
  }];
}
#pragma mark - Remove
/**
 * Hook called once per id by -removeRmqMessagesWithRmqIds: before the messages
 * are deleted. Currently does nothing.
 *
 * @param rmqId The id being removed; unused at present.
 */
- (void)ackReceivedForRmqId:(NSString *)rmqId {
  // TODO: Optional book-keeping
}
/**
 * Removes the outgoing message stored under a single id.
 *
 * @param rmqId The id to remove. A nil id removes nothing.
 * @return 0 when `rmqId` is nil; otherwise the result of
 *         -removeRmqMessagesWithRmqIds: for a one-element array.
 */
- (int)removeRmqMessagesWithRmqId:(NSString *)rmqId {
  // An array literal raises NSInvalidArgumentException when an element is nil.
  // Treat nil like the empty-array case of -removeRmqMessagesWithRmqIds:, which
  // returns 0 without touching the store.
  if (rmqId == nil) {
    return 0;
  }
  return [self removeRmqMessagesWithRmqIds:@[ rmqId ]];
}
/**
 * Removes the outgoing messages stored under the given ids.
 *
 * Each id is first passed to -ackReceivedForRmqId:. The largest id (parsed with
 * -longLongValue) plus one is then compared with the current `rmqId`; when it is
 * greater or equal, that value is saved as the last outgoing RMQ id. Finally the
 * rows are deleted from the outgoing-messages table.
 *
 * @param rmqIds Ids (as strings) of the messages to delete.
 * @return 0 when `rmqIds` is nil or empty; otherwise the store's result for
 *         -deleteMessagesFromTable:withRmqIds:.
 */
- (int)removeRmqMessagesWithRmqIds:(NSArray *)rmqIds {
  if (rmqIds.count == 0) {
    return 0;
  }

  int64_t largestId = -1;
  for (NSString *idString in rmqIds) {
    [self ackReceivedForRmqId:idString];
    largestId = MAX(largestId, [idString longLongValue]);
  }

  int64_t idAfterLargest = largestId + 1;
  if (idAfterLargest >= self.rmqId) {
    [self saveLastOutgoingRmqId:idAfterLargest];
  }

  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  return [store deleteMessagesFromTable:kTableOutgoingRmqMessages withRmqIds:rmqIds];
}
/**
 * Deletes the given ids from the S2D ids table of the persistent store.
 * The store's return value is ignored.
 */
- (void)removeS2dIds:(NSArray *)s2dIds {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  [store deleteMessagesFromTable:kTableS2DRmqIds withRmqIds:s2dIds];
}
#pragma mark - Sync Messages
// TODO: RMQManager should also have a cache for all the sync messages
// so we don't hit the DB each time.
/**
 * Returns the store's result for -querySyncMessageWithRmqID: for `rmqID`.
 * Every call goes to the persistent store.
 */
- (FIRMessagingPersistentSyncMessage *)querySyncMessageWithRmqID:(NSString *)rmqID {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  FIRMessagingPersistentSyncMessage *syncMessage = [store querySyncMessageWithRmqID:rmqID];
  return syncMessage;
}
/**
 * Forwards to the store's -deleteSyncMessageWithRmqID: and returns its result.
 */
- (BOOL)deleteSyncMessageWithRmqID:(NSString *)rmqID {
  BOOL result = [[self rmq2Store] deleteSyncMessageWithRmqID:rmqID];
  return result;
}
/**
 * Forwards to the store's -deleteExpiredOrFinishedSyncMessages: and returns its
 * result.
 *
 * @param error Passed through to the store unchanged.
 */
- (int)deleteExpiredOrFinishedSyncMessages:(NSError **)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  int result = [store deleteExpiredOrFinishedSyncMessages:error];
  return result;
}
/**
 * Forwards all arguments unchanged to the store's
 * -saveSyncMessageWithRmqID:expirationTime:apnsReceived:mcsReceived:error:
 * and returns its result.
 */
- (BOOL)saveSyncMessageWithRmqID:(NSString *)rmqID
                  expirationTime:(int64_t)expirationTime
                    apnsReceived:(BOOL)apnsReceived
                     mcsReceived:(BOOL)mcsReceived
                           error:(NSError *__autoreleasing *)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  BOOL saved = [store saveSyncMessageWithRmqID:rmqID
                                expirationTime:expirationTime
                                  apnsReceived:apnsReceived
                                   mcsReceived:mcsReceived
                                         error:error];
  return saved;
}
/**
 * Forwards to the store's -updateSyncMessageViaAPNSWithRmqID:error: and returns
 * its result.
 */
- (BOOL)updateSyncMessageViaAPNSWithRmqID:(NSString *)rmqID error:(NSError **)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  BOOL updated = [store updateSyncMessageViaAPNSWithRmqID:rmqID error:error];
  return updated;
}
/**
 * Forwards to the store's -updateSyncMessageViaMCSWithRmqID:error: and returns
 * its result.
 */
- (BOOL)updateSyncMessageViaMCSWithRmqID:(NSString *)rmqID error:(NSError **)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  BOOL updated = [store updateSyncMessageViaMCSWithRmqID:rmqID error:error];
  return updated;
}
#pragma mark - Testing
/**
 * Forwards `dbName` to +[FIRMessagingRmq2PersistentStore removeDatabase:].
 * Grouped under "Testing" in this file.
 */
+ (void)removeDatabaseWithName:(NSString *)dbName {
  [FIRMessagingRmq2PersistentStore removeDatabase:dbName];
}
#pragma mark - Private
/**
 * Advances `rmqId` by one and returns the new value.
 */
- (int64_t)nextRmqId {
  int64_t advanced = self.rmqId + 1;
  self.rmqId = advanced;
  return advanced;
}
@end
|