You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

264 lines
8.2 KiB

/*
* Copyright 2017 Google
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#import "FIRMessagingRmqManager.h"
#import <sqlite3.h>
#import "FIRMessagingDefines.h"
#import "FIRMessagingLogger.h"
#import "FIRMessagingRmq2PersistentStore.h"
#import "FIRMessagingUtilities.h"
#import "Protos/GtalkCore.pbobjc.h"
#ifndef _FIRMessagingRmqLogAndExit
#define _FIRMessagingRmqLogAndExit(stmt, return_value) \
do { \
[self logErrorAndFinalizeStatement:stmt]; \
return return_value; \
} while(0)
#endif
static NSString *const kFCMRmqTag = @"FIRMessagingRmq:";
@interface FIRMessagingRmqManager ()
@property(nonatomic, readwrite, strong) FIRMessagingRmq2PersistentStore *rmq2Store;
// map the category of an outgoing message with the number of messages for that category
// should always have two keys -- the app, gcm
@property(nonatomic, readwrite, strong) NSMutableDictionary *outstandingMessages;
// Outgoing RMQ persistent id
@property(nonatomic, readwrite, assign) int64_t rmqId;
@end
@implementation FIRMessagingRmqManager
- (instancetype)initWithDatabaseName:(NSString *)databaseName {
self = [super init];
if (self) {
_FIRMessagingDevAssert([databaseName length] > 0, @"RMQ: Invalid rmq db name");
_rmq2Store = [[FIRMessagingRmq2PersistentStore alloc] initWithDatabaseName:databaseName];
_outstandingMessages = [NSMutableDictionary dictionaryWithCapacity:2];
_rmqId = -1;
}
return self;
}
- (void)loadRmqId {
if (self.rmqId >= 0) {
return; // already done
}
[self loadInitialOutgoingPersistentId];
if (self.outstandingMessages.count) {
FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmqManager000,
@"%@: outstanding categories %ld", kFCMRmqTag,
_FIRMessaging_UL(self.outstandingMessages.count));
}
}
/**
* Initialize the 'initial RMQ':
* - max ID of any message in the queue
* - if the queue is empty, stored value in separate DB.
*
* Stream acks will remove from RMQ, when we remove the highest message we keep track
* of its ID.
*/
- (void)loadInitialOutgoingPersistentId {
// we shouldn't always trust the lastRmqId stored in the LastRmqId table, because
// we only save to the LastRmqId table once in a while (after getting the lastRmqId sent
// by the server after reconnect, and after getting a rmq ack from the server). The
// rmq message with the highest rmq id tells the real story, so check against that first.
int64_t rmqId = [self queryHighestRmqId];
if (rmqId == 0) {
rmqId = [self querylastRmqId];
}
self.rmqId = rmqId + 1;
}
#pragma mark - Save
/**
* Save a message to RMQ2. Will populate the rmq2 persistent ID.
*/
- (BOOL)saveRmqMessage:(GPBMessage *)message
error:(NSError **)error {
// send using rmq2manager
// the wire format of rmq2 id is a string. However, we keep it as a long internally
// in the database. So only convert the id to string when preparing for sending over
// the wire.
NSString *rmq2Id = FIRMessagingGetRmq2Id(message);
if (![rmq2Id length]) {
int64_t rmqId = [self nextRmqId];
rmq2Id = [NSString stringWithFormat:@"%lld", rmqId];
FIRMessagingSetRmq2Id(message, rmq2Id);
}
FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(message);
return [self saveMessage:message withRmqId:[rmq2Id integerValue] tag:tag error:error];
}
- (BOOL)saveMessage:(GPBMessage *)message
withRmqId:(int64_t)rmqId
tag:(int8_t)tag
error:(NSError **)error {
NSData *data = [message data];
return [self.rmq2Store saveMessageWithRmqId:rmqId tag:tag data:data error:error];
}
/**
* This is called when we delete the largest outgoing message from queue.
*/
- (void)saveLastOutgoingRmqId:(int64_t)rmqID {
[self.rmq2Store updateLastOutgoingRmqId:rmqID];
}
- (BOOL)saveS2dMessageWithRmqId:(NSString *)rmqID {
return [self.rmq2Store saveUnackedS2dMessageWithRmqId:rmqID];
}
#pragma mark - Query
- (int64_t)queryHighestRmqId {
return [self.rmq2Store queryHighestRmqId];
}
- (int64_t)querylastRmqId {
return [self.rmq2Store queryLastRmqId];
}
- (NSArray *)unackedS2dRmqIds {
return [self.rmq2Store unackedS2dRmqIds];
}
#pragma mark - FIRMessagingRMQScanner protocol
/**
* We don't have a 'getMessages' method - it would require loading in memory
* the entire content body of all messages.
*
* Instead we iterate and call 'resend' for each message.
*
* This is called:
* - on connect MCS, to resend any outstanding messages
* - init
*/
- (void)scanWithRmqMessageHandler:(FIRMessagingRmqMessageHandler)rmqMessageHandler
dataMessageHandler:(FIRMessagingDataMessageHandler)dataMessageHandler {
// no need to scan database with no callbacks
if (rmqMessageHandler || dataMessageHandler) {
[self.rmq2Store scanOutgoingRmqMessagesWithHandler:^(int64_t rmqId, int8_t tag, NSData *data) {
if (rmqMessageHandler != nil) {
rmqMessageHandler(rmqId, tag, data);
}
if (dataMessageHandler != nil && kFIRMessagingProtoTagDataMessageStanza == tag) {
GPBMessage *proto =
[FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag) parseFromData:data error:NULL];
GtalkDataMessageStanza *stanza = (GtalkDataMessageStanza *)proto;
dataMessageHandler(rmqId, stanza);
}
}];
}
}
#pragma mark - Remove
- (void)ackReceivedForRmqId:(NSString *)rmqId {
// TODO: Optional book-keeping
}
- (int)removeRmqMessagesWithRmqId:(NSString *)rmqId {
return [self removeRmqMessagesWithRmqIds:@[rmqId]];
}
- (int)removeRmqMessagesWithRmqIds:(NSArray *)rmqIds {
if (![rmqIds count]) {
return 0;
}
for (NSString *rmqId in rmqIds) {
[self ackReceivedForRmqId:rmqId];
}
int64_t maxRmqId = -1;
for (NSString *rmqId in rmqIds) {
int64_t rmqIdValue = [rmqId longLongValue];
if (rmqIdValue > maxRmqId) {
maxRmqId = rmqIdValue;
}
}
maxRmqId++;
if (maxRmqId >= self.rmqId) {
[self saveLastOutgoingRmqId:maxRmqId];
}
return [self.rmq2Store deleteMessagesFromTable:kTableOutgoingRmqMessages withRmqIds:rmqIds];
}
- (void)removeS2dIds:(NSArray *)s2dIds {
[self.rmq2Store deleteMessagesFromTable:kTableS2DRmqIds withRmqIds:s2dIds];
}
#pragma mark - Sync Messages
// TODO: RMQManager should also have a cache for all the sync messages
// so we don't hit the DB each time.
- (FIRMessagingPersistentSyncMessage *)querySyncMessageWithRmqID:(NSString *)rmqID {
return [self.rmq2Store querySyncMessageWithRmqID:rmqID];
}
- (BOOL)deleteSyncMessageWithRmqID:(NSString *)rmqID {
return [self.rmq2Store deleteSyncMessageWithRmqID:rmqID];
}
- (int)deleteExpiredOrFinishedSyncMessages:(NSError **)error {
return [self.rmq2Store deleteExpiredOrFinishedSyncMessages:error];
}
- (BOOL)saveSyncMessageWithRmqID:(NSString *)rmqID
expirationTime:(int64_t)expirationTime
apnsReceived:(BOOL)apnsReceived
mcsReceived:(BOOL)mcsReceived
error:(NSError *__autoreleasing *)error {
return [self.rmq2Store saveSyncMessageWithRmqID:rmqID
expirationTime:expirationTime
apnsReceived:apnsReceived
mcsReceived:mcsReceived
error:error];
}
- (BOOL)updateSyncMessageViaAPNSWithRmqID:(NSString *)rmqID error:(NSError **)error {
return [self.rmq2Store updateSyncMessageViaAPNSWithRmqID:rmqID error:error];
}
- (BOOL)updateSyncMessageViaMCSWithRmqID:(NSString *)rmqID error:(NSError **)error {
return [self.rmq2Store updateSyncMessageViaMCSWithRmqID:rmqID error:error];
}
#pragma mark - Testing
+ (void)removeDatabaseWithName:(NSString *)dbName {
[FIRMessagingRmq2PersistentStore removeDatabase:dbName];
}
#pragma mark - Private
- (int64_t)nextRmqId {
return ++self.rmqId;
}
@end