/*
 * Copyright 2017 Google
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#import "Firebase/Messaging/FIRMessagingRmqManager.h"

// The SQLite C API (sqlite3_open_v2, sqlite3_prepare_v2, ...) is used throughout this file.
#import <sqlite3.h>

#import "Firebase/Messaging/FIRMessagingConstants.h"
#import "Firebase/Messaging/FIRMessagingDefines.h"
#import "Firebase/Messaging/FIRMessagingLogger.h"
#import "Firebase/Messaging/FIRMessagingPersistentSyncMessage.h"
#import "Firebase/Messaging/FIRMessagingUtilities.h"
#import "Firebase/Messaging/NSError+FIRMessaging.h"
#import "Firebase/Messaging/Protos/GtalkCore.pbobjc.h"

// Logs the last SQLite error, finalizes `stmt` and returns `return_value` from the enclosing
// method. Must be expanded inside an instance method (it messages `self`).
#ifndef _FIRMessagingRmqLogAndExit
#define _FIRMessagingRmqLogAndExit(stmt, return_value) \
  do { \
    [self logErrorAndFinalizeStatement:stmt]; \
    return return_value; \
  } while (0)
#endif

// Same as above for `void` methods and `void` blocks: logs, finalizes `stmt` and returns.
#ifndef FIRMessagingRmqLogAndReturn
#define FIRMessagingRmqLogAndReturn(stmt) \
  do { \
    [self logErrorAndFinalizeStatement:stmt]; \
    return; \
  } while (0)
#endif

// Asserts that the caller is not running on the main thread. The macro deliberately has no
// trailing semicolon so that `FIRMessaging_MUST_NOT_BE_MAIN_THREAD();` expands to exactly one
// statement and stays safe inside unbraced if/else bodies.
#ifndef FIRMessaging_MUST_NOT_BE_MAIN_THREAD
#define FIRMessaging_MUST_NOT_BE_MAIN_THREAD() \
  do { \
    NSAssert(![NSThread isMainThread], @"Must not be executing on the main thread."); \
  } while (0)
#endif

// table names
NSString *const kTableOutgoingRmqMessages = @"outgoingRmqMessages";
NSString *const kTableLastRmqId = @"lastrmqid";
NSString *const kOldTableS2DRmqIds = @"s2dRmqIds";
NSString *const kTableS2DRmqIds = @"s2dRmqIds_1";

// Used to prevent de-duping of sync messages received both via APNS and MCS.
NSString *const kTableSyncMessages = @"incomingSyncMessages";

static NSString *const kTablePrefix = @"";

// create tables
// Every "create"/"drop" command below takes two %@ arguments: the table prefix and the table name.
static NSString *const kCreateTableOutgoingRmqMessages = @"create TABLE IF NOT EXISTS %@%@ "
                                                         @"(_id INTEGER PRIMARY KEY, "
                                                         @"rmq_id INTEGER, "
                                                         @"type INTEGER, "
                                                         @"ts INTEGER, "
                                                         @"data BLOB)";

static NSString *const kCreateTableLastRmqId = @"create TABLE IF NOT EXISTS %@%@ "
                                               @"(_id INTEGER PRIMARY KEY, "
                                               @"rmq_id INTEGER)";

static NSString *const kCreateTableS2DRmqIds = @"create TABLE IF NOT EXISTS %@%@ "
                                               @"(_id INTEGER PRIMARY KEY, "
                                               @"rmq_id TEXT)";

static NSString *const kCreateTableSyncMessages = @"create TABLE IF NOT EXISTS %@%@ "
                                                  @"(_id INTEGER PRIMARY KEY, "
                                                  @"rmq_id TEXT, "
                                                  @"expiration_ts INTEGER, "
                                                  @"apns_recv INTEGER, "
                                                  @"mcs_recv INTEGER)";

static NSString *const kDropTableCommand = @"drop TABLE if exists %@%@";

// table infos
static NSString *const kRmqIdColumn = @"rmq_id";
static NSString *const kDataColumn = @"data";
static NSString *const kProtobufTagColumn = @"type";
static NSString *const kIdColumn = @"_id";

static NSString *const kOutgoingRmqMessagesColumns = @"rmq_id, type, data";

// Sync message columns
static NSString *const kSyncMessagesColumns = @"rmq_id, expiration_ts, apns_recv, mcs_recv";
// Message time expiration in seconds since 1970
static NSString *const kSyncMessageExpirationTimestampColumn = @"expiration_ts";
static NSString *const kSyncMessageAPNSReceivedColumn = @"apns_recv";
static NSString *const kSyncMessageMCSReceivedColumn = @"mcs_recv";

// Utility to create an NSString from a sqlite3 result code.
// The result has the form "<code> - <sqlite3_errstr text>".
NSString *_Nonnull FIRMessagingStringFromSQLiteResult(int result) {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunguarded-availability"
  const char *errorStr = sqlite3_errstr(result);
#pragma clang diagnostic pop
  NSString *errorString = [NSString stringWithFormat:@"%d - %s", result, errorStr];
  return errorString;
}

@interface FIRMessagingRmqManager () {
  sqlite3 *_database;
  /// Serial queue for database read/write operations.
  dispatch_queue_t _databaseOperationQueue;
}

@property(nonatomic, readwrite, copy) NSString *databaseName;
// map the category of an outgoing message with the number of messages for that category
// should always have two keys -- the app, gcm
@property(nonatomic, readwrite, strong) NSMutableDictionary *outstandingMessages;

// Outgoing RMQ persistent id
@property(nonatomic, readwrite, assign) int64_t rmqId;

@end

@implementation FIRMessagingRmqManager

/**
 * Creates the manager and schedules opening of the backing SQLite database on the serial
 * database queue. The database itself is opened asynchronously; every other database access in
 * this class goes through the same queue and therefore runs after the open has been attempted.
 *
 * @param databaseName Name of the database file, without the ".sqlite" extension.
 */
- (instancetype)initWithDatabaseName:(NSString *)databaseName {
  self = [super init];
  if (self) {
    _databaseOperationQueue =
        dispatch_queue_create("com.google.firebase.messaging.database.rmq", DISPATCH_QUEUE_SERIAL);
    _databaseName = [databaseName copy];
    [self openDatabase];
    _outstandingMessages = [NSMutableDictionary dictionaryWithCapacity:2];
    _rmqId = -1;
  }
  return self;
}

- (void)dealloc {
  // sqlite3_close() accepts a NULL handle, so this is safe even if the open never succeeded.
  sqlite3_close(_database);
}

#pragma mark - RMQ ID

/**
 * Loads the initial outgoing RMQ id from the database unless it has already been loaded
 * (i.e. `rmqId` is non-negative).
 */
- (void)loadRmqId {
  if (self.rmqId >= 0) {
    return;  // already done
  }

  [self loadInitialOutgoingPersistentId];
  if (self.outstandingMessages.count) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmqManager000, @"Outstanding categories %ld",
                            _FIRMessaging_UL(self.outstandingMessages.count));
  }
}

/**
 * Initialize the 'initial RMQ':
 * - max ID of any message in the queue
 * - if the queue is empty, stored value in separate DB.
 *
 * Stream acks will remove from RMQ, when we remove the highest message we keep track
 * of its ID.
 */
- (void)loadInitialOutgoingPersistentId {
  // we shouldn't always trust the lastRmqId stored in the LastRmqId table, because
  // we only save to the LastRmqId table once in a while (after getting the lastRmqId sent
  // by the server after reconnect, and after getting a rmq ack from the server). The
  // rmq message with the highest rmq id tells the real story, so check against that first.
  __block int64_t rmqId = 0;
  dispatch_sync(_databaseOperationQueue, ^{
    rmqId = [self queryHighestRmqId];
    if (rmqId == 0) {
      // The outgoing queue is empty (or unreadable): fall back to the persisted last id.
      rmqId = [self queryLastRmqId];
    }
  });
  self.rmqId = rmqId + 1;
}

#pragma mark - Save

/**
 * Save a message to RMQ2. Will populate the rmq2 persistent ID.
 *
 * @param message The proto to persist. If it has no RMQ2 id yet, the next outgoing id is
 *                assigned to it before it is serialized.
 * @param handler Optional. Invoked on the main queue with YES if the row was written.
 */
- (void)saveRmqMessage:(GPBMessage *)message withCompletionHandler:(void (^)(BOOL success))handler {
  // send using rmq2manager
  // the wire format of rmq2 id is a string. However, we keep it as a long internally
  // in the database. So only convert the id to string when preparing for sending over
  // the wire.
  NSString *rmq2Id = FIRMessagingGetRmq2Id(message);
  if (![rmq2Id length]) {
    int64_t rmqId = [self nextRmqId];
    rmq2Id = [NSString stringWithFormat:@"%lld", rmqId];
    FIRMessagingSetRmq2Id(message, rmq2Id);
  }
  FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(message);
  NSData *data = [message data];
  // Ids are 64-bit; -integerValue would truncate them on 32-bit platforms.
  int64_t rmqIdValue = [rmq2Id longLongValue];
  dispatch_async(_databaseOperationQueue, ^{
    BOOL success = [self saveMessageWithRmqId:rmqIdValue tag:tag data:data];
    if (handler) {
      dispatch_async(dispatch_get_main_queue(), ^{
        handler(success);
      });
    }
  });
}

/**
 * This is called when we delete the largest outgoing message from queue.
 * Asynchronously stores `rmqID` in the single row (with _id 1) of the last-rmq-id table.
 */
- (void)saveLastOutgoingRmqId:(int64_t)rmqID {
  dispatch_async(_databaseOperationQueue, ^{
    NSString *queryFormat = @"INSERT OR REPLACE INTO %@ (%@, %@) VALUES (?, ?)";
    NSString *query = [NSString stringWithFormat:queryFormat,
                                                 kTableLastRmqId,          // table
                                                 kIdColumn, kRmqIdColumn]; // columns
    sqlite3_stmt *statement = NULL;
    if (sqlite3_prepare_v2(self->_database, [query UTF8String], -1, &statement, NULL) !=
        SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(statement);
    }
    // Always write to row 1 so the table never holds more than one entry.
    if (sqlite3_bind_int(statement, 1, 1) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(statement);
    }
    if (sqlite3_bind_int64(statement, 2, rmqID) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(statement);
    }
    if (sqlite3_step(statement) != SQLITE_DONE) {
      FIRMessagingRmqLogAndReturn(statement);
    }
    sqlite3_finalize(statement);
  });
}

/**
 * Asynchronously records a server-to-device RMQ id in the S2D table.
 */
- (void)saveS2dMessageWithRmqId:(NSString *)rmqId {
  dispatch_async(_databaseOperationQueue, ^{
    NSString *insertFormat = @"INSERT INTO %@ (%@) VALUES (?)";
    NSString *insertSQL = [NSString stringWithFormat:insertFormat, kTableS2DRmqIds, kRmqIdColumn];
    sqlite3_stmt *insert_statement = NULL;
    if (sqlite3_prepare_v2(self->_database, [insertSQL UTF8String], -1, &insert_statement, NULL) !=
        SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(insert_statement);
    }
    // Length -1 lets SQLite measure the NUL-terminated UTF-8 bytes itself; -[NSString length]
    // counts UTF-16 units and is wrong for non-ASCII ids. SQLITE_TRANSIENT makes SQLite copy.
    if (sqlite3_bind_text(insert_statement, 1, [rmqId UTF8String], -1, SQLITE_TRANSIENT) !=
        SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(insert_statement);
    }
    if (sqlite3_step(insert_statement) != SQLITE_DONE) {
      FIRMessagingRmqLogAndReturn(insert_statement);
    }
    sqlite3_finalize(insert_statement);
  });
}

#pragma mark - Query

/**
 * Returns the largest rmq_id stored in the outgoing messages table, or 0 if the table is empty
 * or the query fails. Runs on the calling thread; callers dispatch it onto the database queue.
 */
- (int64_t)queryHighestRmqId {
  NSString *queryFormat = @"SELECT %@ FROM %@ ORDER BY %@ DESC LIMIT %d";
  NSString *query = [NSString stringWithFormat:queryFormat,
                                               kRmqIdColumn,               // column
                                               kTableOutgoingRmqMessages,  // table
                                               kRmqIdColumn,               // order by column
                                               1];                         // limit

  sqlite3_stmt *statement = NULL;
  int64_t highestRmqId = 0;
  if (sqlite3_prepare_v2(_database, [query UTF8String], -1, &statement, NULL) != SQLITE_OK) {
    _FIRMessagingRmqLogAndExit(statement, highestRmqId);
  }
  if (sqlite3_step(statement) == SQLITE_ROW) {
    highestRmqId = sqlite3_column_int64(statement, 0);
  }
  sqlite3_finalize(statement);
  return highestRmqId;
}

/**
 * Returns the rmq_id stored in the last-rmq-id table, or 0 if there is none or the query fails.
 * Runs on the calling thread; callers dispatch it onto the database queue.
 */
- (int64_t)queryLastRmqId {
  NSString *queryFormat = @"SELECT %@ FROM %@ ORDER BY %@ DESC LIMIT %d";
  NSString *query = [NSString stringWithFormat:queryFormat,
                                               kRmqIdColumn,     // column
                                               kTableLastRmqId,  // table
                                               kRmqIdColumn,     // order by column
                                               1];               // limit

  sqlite3_stmt *statement = NULL;
  int64_t lastRmqId = 0;
  if (sqlite3_prepare_v2(_database, [query UTF8String], -1, &statement, NULL) != SQLITE_OK) {
    _FIRMessagingRmqLogAndExit(statement, lastRmqId);
  }
  if (sqlite3_step(statement) == SQLITE_ROW) {
    lastRmqId = sqlite3_column_int64(statement, 0);
  }
  sqlite3_finalize(statement);
  return lastRmqId;
}

/**
 * Synchronously reads all ids from the S2D table, ordered ascending by rmq_id.
 *
 * @return The stored ids as strings; empty if there are none or the query could not be prepared.
 */
- (NSArray *)unackedS2dRmqIds {
  NSMutableArray *rmqIDArray = [NSMutableArray array];
  dispatch_sync(_databaseOperationQueue, ^{
    NSString *queryFormat = @"SELECT %@ FROM %@ ORDER BY %@ ASC";
    NSString *query =
        [NSString stringWithFormat:queryFormat, kRmqIdColumn, kTableS2DRmqIds, kRmqIdColumn];
    sqlite3_stmt *statement = NULL;
    if (sqlite3_prepare_v2(self->_database, [query UTF8String], -1, &statement, NULL) !=
        SQLITE_OK) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmq2PersistentStore005,
                              @"Could not find s2d ids");
      FIRMessagingRmqLogAndReturn(statement);
    }
    while (sqlite3_step(statement) == SQLITE_ROW) {
      // sqlite3_column_text returns NULL for SQL NULL values, and stringWithUTF8String: returns
      // nil for malformed UTF-8; either would crash the array insertion, so skip such rows.
      const char *rmqID = (const char *)sqlite3_column_text(statement, 0);
      NSString *rmqIDString = rmqID ? [NSString stringWithUTF8String:rmqID] : nil;
      if (rmqIDString) {
        [rmqIDArray addObject:rmqIDString];
      }
    }
    sqlite3_finalize(statement);
  });
  return rmqIDArray;
}

#pragma mark - FIRMessagingRMQScanner protocol

#pragma mark - Remove

/**
 * Removes the given outgoing messages. If the largest removed id (plus one) is not below the
 * current in-memory `rmqId`, that value is persisted as the last outgoing RMQ id first.
 *
 * @param rmqIds Ids of the outgoing messages to delete; each element must respond to
 *               -longLongValue. An empty or nil array is a no-op.
 */
- (void)removeRmqMessagesWithRmqIds:(NSArray *)rmqIds {
  if (![rmqIds count]) {
    return;
  }
  int64_t maxRmqId = -1;
  for (NSString *rmqId in rmqIds) {
    int64_t rmqIdValue = [rmqId longLongValue];
    if (rmqIdValue > maxRmqId) {
      maxRmqId = rmqIdValue;
    }
  }
  maxRmqId++;
  if (maxRmqId >= self.rmqId) {
    [self saveLastOutgoingRmqId:maxRmqId];
  }
  [self deleteMessagesFromTable:kTableOutgoingRmqMessages withRmqIds:rmqIds];
}

/**
 * Removes the given server-to-device ids from the S2D table.
 */
- (void)removeS2dIds:(NSArray *)s2dIds {
  [self deleteMessagesFromTable:kTableS2DRmqIds withRmqIds:s2dIds];
}

#pragma mark - Sync Messages

/**
 * Synchronously looks up the sync message stored under `rmqID`.
 *
 * @return The persisted sync message, or nil if no row matches or the query fails. If several
 *         rows match, the last one returned by SQLite wins.
 */
- (FIRMessagingPersistentSyncMessage *)querySyncMessageWithRmqID:(NSString *)rmqID {
  __block FIRMessagingPersistentSyncMessage *persistentMessage;
  dispatch_sync(_databaseOperationQueue, ^{
    // The id is bound as a parameter rather than formatted into the SQL text, so ids that
    // contain quotes can neither break nor alter the statement.
    NSString *queryFormat = @"SELECT %@ FROM %@ WHERE %@ = ?";
    NSString *query =
        [NSString stringWithFormat:queryFormat,
                                   kSyncMessagesColumns,  // SELECT (rmq_id, expiration_ts,
                                                          // apns_recv, mcs_recv)
                                   kTableSyncMessages,    // FROM sync_rmq
                                   kRmqIdColumn];         // WHERE rmq_id

    sqlite3_stmt *stmt = NULL;
    if (sqlite3_prepare_v2(self->_database, [query UTF8String], -1, &stmt, NULL) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(stmt);
    }
    if (sqlite3_bind_text(stmt, 1, [rmqID UTF8String], -1, SQLITE_TRANSIENT) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(stmt);
    }

    const int rmqIDColumn = 0;
    const int expirationTimestampColumn = 1;
    const int apnsReceivedColumn = 2;
    const int mcsReceivedColumn = 3;

    while (sqlite3_step(stmt) == SQLITE_ROW) {
      const char *storedIDText = (const char *)sqlite3_column_text(stmt, rmqIDColumn);
      if (storedIDText == NULL) {
        // stringWithUTF8String: raises on NULL; a row without an id is unusable anyway.
        continue;
      }
      NSString *storedRmqID = [NSString stringWithUTF8String:storedIDText];
      int64_t expirationTimestamp = sqlite3_column_int64(stmt, expirationTimestampColumn);
      BOOL apnsReceived = sqlite3_column_int(stmt, apnsReceivedColumn);
      BOOL mcsReceived = sqlite3_column_int(stmt, mcsReceivedColumn);

      // create a new persistent message
      persistentMessage =
          [[FIRMessagingPersistentSyncMessage alloc] initWithRMQID:storedRmqID
                                                    expirationTime:expirationTimestamp];
      persistentMessage.apnsReceived = apnsReceived;
      persistentMessage.mcsReceived = mcsReceived;
    }
    sqlite3_finalize(stmt);
  });
  return persistentMessage;
}

/**
 * Asynchronously deletes the sync message stored under `rmqID`.
 */
- (void)deleteSyncMessageWithRmqID:(NSString *)rmqID {
  [self deleteMessagesFromTable:kTableSyncMessages withRmqIds:@[ rmqID ]];
}

/**
 * Asynchronously deletes every sync message that has expired (expiration_ts earlier than the
 * current timestamp) or that has been received via both APNS and MCS.
 */
- (void)deleteExpiredOrFinishedSyncMessages {
  dispatch_async(_databaseOperationQueue, ^{
    int64_t now = FIRMessagingCurrentTimestampInSeconds();

    NSString *deleteSQL = @"DELETE FROM %@ "
                          @"WHERE %@ < %lld OR "   // expirationTime < now
                          @"(%@ = 1 AND %@ = 1)";  // apns_received = 1 AND mcs_received = 1
    NSString *query = [NSString
        stringWithFormat:deleteSQL, kTableSyncMessages, kSyncMessageExpirationTimestampColumn, now,
                         kSyncMessageAPNSReceivedColumn, kSyncMessageMCSReceivedColumn];

    sqlite3_stmt *stmt = NULL;
    if (sqlite3_prepare_v2(self->_database, [query UTF8String], -1, &stmt, NULL) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(stmt);
    }

    if (sqlite3_step(stmt) != SQLITE_DONE) {
      FIRMessagingRmqLogAndReturn(stmt);
    }

    sqlite3_finalize(stmt);
    int deleteCount = sqlite3_changes(self->_database);
    if (deleteCount > 0) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSyncMessageManager001,
                              @"Successfully deleted %d sync messages from store", deleteCount);
    }
  });
}

/**
 * Asynchronously inserts a new sync message row.
 *
 * @param rmqID          Id of the sync message.
 * @param expirationTime Value stored in the expiration_ts column.
 * @param apnsReceived   Stored as 1/0 in the apns_recv column.
 * @param mcsReceived    Stored as 1/0 in the mcs_recv column.
 */
- (void)saveSyncMessageWithRmqID:(NSString *)rmqID
                  expirationTime:(int64_t)expirationTime
                    apnsReceived:(BOOL)apnsReceived
                     mcsReceived:(BOOL)mcsReceived {
  dispatch_async(_databaseOperationQueue, ^{
    NSString *insertFormat = @"INSERT INTO %@ (%@, %@, %@, %@) VALUES (?, ?, ?, ?)";
    NSString *insertSQL =
        [NSString stringWithFormat:insertFormat,
                                   kTableSyncMessages,                     // Table name
                                   kRmqIdColumn,                           // rmq_id
                                   kSyncMessageExpirationTimestampColumn,  // expiration_ts
                                   kSyncMessageAPNSReceivedColumn,         // apns_recv
                                   kSyncMessageMCSReceivedColumn /* mcs_recv */];

    sqlite3_stmt *stmt = NULL;
    if (sqlite3_prepare_v2(self->_database, [insertSQL UTF8String], -1, &stmt, NULL) !=
        SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(stmt);
    }

    // -1: let SQLite compute the UTF-8 byte length (see saveS2dMessageWithRmqId:).
    if (sqlite3_bind_text(stmt, 1, [rmqID UTF8String], -1, SQLITE_TRANSIENT) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(stmt);
    }

    if (sqlite3_bind_int64(stmt, 2, expirationTime) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(stmt);
    }

    if (sqlite3_bind_int(stmt, 3, apnsReceived ? 1 : 0) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(stmt);
    }

    if (sqlite3_bind_int(stmt, 4, mcsReceived ? 1 : 0) != SQLITE_OK) {
      FIRMessagingRmqLogAndReturn(stmt);
    }

    if (sqlite3_step(stmt) != SQLITE_DONE) {
      FIRMessagingRmqLogAndReturn(stmt);
    }
    sqlite3_finalize(stmt);
    FIRMessagingLoggerInfo(kFIRMessagingMessageCodeSyncMessageManager004,
                           @"Added sync message to cache: %@", rmqID);
  });
}

/**
 * Asynchronously marks the sync message `rmqID` as received via APNS.
 */
- (void)updateSyncMessageViaAPNSWithRmqID:(NSString *)rmqID {
  dispatch_async(_databaseOperationQueue, ^{
    if (![self updateSyncMessageWithRmqID:rmqID
                                   column:kSyncMessageAPNSReceivedColumn
                                    value:YES]) {
      FIRMessagingLoggerError(kFIRMessagingMessageCodeSyncMessageManager005,
                              @"Failed to update APNS state for sync message %@", rmqID);
    }
  });
}

/**
 * Asynchronously marks the sync message `rmqID` as received via MCS.
 */
- (void)updateSyncMessageViaMCSWithRmqID:(NSString *)rmqID {
  dispatch_async(_databaseOperationQueue, ^{
    if (![self updateSyncMessageWithRmqID:rmqID column:kSyncMessageMCSReceivedColumn value:YES]) {
      FIRMessagingLoggerError(kFIRMessagingMessageCodeSyncMessageManager006,
                              @"Failed to update MCS state for sync message %@", rmqID);
    }
  });
}

/**
 * Sets `column` to 1 or 0 for the sync message stored under `rmqID`.
 * Must not be called on the main thread; callers run it on the database queue.
 *
 * @return YES if the UPDATE statement executed, NO if preparing, binding or stepping failed.
 */
- (BOOL)updateSyncMessageWithRmqID:(NSString *)rmqID column:(NSString *)column value:(BOOL)value {
  FIRMessaging_MUST_NOT_BE_MAIN_THREAD();
  NSString *queryFormat = @"UPDATE %@ "      // Table name
                          @"SET %@ = %d "    // column=value
                          @"WHERE %@ = ?";   // condition
  NSString *query = [NSString
      stringWithFormat:queryFormat, kTableSyncMessages, column, value ? 1 : 0, kRmqIdColumn];
  sqlite3_stmt *stmt = NULL;

  if (sqlite3_prepare_v2(_database, [query UTF8String], -1, &stmt, NULL) != SQLITE_OK) {
    _FIRMessagingRmqLogAndExit(stmt, NO);
  }

  // -1: let SQLite compute the UTF-8 byte length (see saveS2dMessageWithRmqId:).
  if (sqlite3_bind_text(stmt, 1, [rmqID UTF8String], -1, SQLITE_TRANSIENT) != SQLITE_OK) {
    _FIRMessagingRmqLogAndExit(stmt, NO);
  }

  if (sqlite3_step(stmt) != SQLITE_DONE) {
    _FIRMessagingRmqLogAndExit(stmt, NO);
  }

  sqlite3_finalize(stmt);
  return YES;
}

#pragma mark - Database

/**
 * Builds the database path:
 * <supported directory>/<messaging sub-directory>/<databaseName>.sqlite
 */
- (NSString *)pathForDatabase {
  NSString *dbNameWithExtension = [NSString stringWithFormat:@"%@.sqlite", _databaseName];
  NSArray *paths =
      NSSearchPathForDirectoriesInDomains(FIRMessagingSupportedDirectory(), NSUserDomainMask, YES);
  NSArray *components = @[ paths.lastObject, kFIRMessagingSubDirectoryName, dbNameWithExtension ];
  return [NSString pathWithComponents:components];
}

/**
 * Executes a "create table" command for `tableName`. On failure the database file is removed,
 * the error is logged and an assertion fires.
 * Must not be called on the main thread; callers run it on the database queue.
 *
 * @param tableName Name of the table to create.
 * @param command   Format string taking two %@ arguments: table prefix and table name.
 */
- (void)createTableWithName:(NSString *)tableName command:(NSString *)command {
  FIRMessaging_MUST_NOT_BE_MAIN_THREAD();
  char *error = NULL;
  NSString *createDatabase = [NSString stringWithFormat:command, kTablePrefix, tableName];
  if (sqlite3_exec(self->_database, [createDatabase UTF8String], NULL, NULL, &error) !=
      SQLITE_OK) {
    // remove db before failing
    // This method already runs on the serial database queue, so the file is removed directly:
    // going through -removeDatabase would dispatch_sync onto the current queue and deadlock.
    [self removeDatabaseFile];
    NSString *sqliteError =
        error ? [NSString stringWithCString:error encoding:NSUTF8StringEncoding] : nil;
    // sqlite3_exec allocates the message; the caller owns it and must release it.
    sqlite3_free(error);
    NSString *errorMessage =
        [NSString stringWithFormat:@"Couldn't create table: %@ %@", createDatabase, sqliteError];
    FIRMessagingLoggerError(kFIRMessagingMessageCodeRmq2PersistentStoreErrorCreatingTable, @"%@",
                            errorMessage);
    NSAssert(NO, @"%@", errorMessage);
  }
}

/**
 * Drops `tableName` if it exists, logging an error on failure.
 * Must not be called on the main thread; callers run it on the database queue.
 */
- (void)dropTableWithName:(NSString *)tableName {
  FIRMessaging_MUST_NOT_BE_MAIN_THREAD();
  char *error = NULL;
  NSString *dropTableSQL = [NSString stringWithFormat:kDropTableCommand, kTablePrefix, tableName];
  if (sqlite3_exec(self->_database, [dropTableSQL UTF8String], NULL, NULL, &error) !=
      SQLITE_OK) {
    FIRMessagingLoggerError(kFIRMessagingMessageCodeRmq2PersistentStore002,
                            @"Failed to remove table %@", tableName);
    // sqlite3_exec allocates the message; the caller owns it and must release it.
    sqlite3_free(error);
  }
}

/**
 * Deletes the database file. Blocks the caller until all previously queued database work has
 * finished and the file is removed. Must not be called from the database queue itself.
 */
- (void)removeDatabase {
  // Ensure database is removed in a sync queue as this sometimes makes test have race conditions.
  dispatch_sync(_databaseOperationQueue, ^{
    [self removeDatabaseFile];
  });
}

/**
 * Deletes the database file on the calling thread, ignoring any file-manager error.
 * Intended to be called from code that is already running on the database queue.
 */
- (void)removeDatabaseFile {
  NSString *path = [self pathForDatabase];
  [[NSFileManager defaultManager] removeItemAtPath:path error:nil];
}

/**
 * Asynchronously opens the SQLite database, creating it when the file does not exist yet.
 * A freshly created database gets the outgoing, last-rmq-id and S2D tables; an existing one is
 * migrated via -updateDBWithStringRmqID. The sync-messages table is created in both cases.
 */
- (void)openDatabase {
  dispatch_async(_databaseOperationQueue, ^{
    NSString *path = [self pathForDatabase];
    BOOL databaseFileExists = [[NSFileManager defaultManager] fileExistsAtPath:path];

    // With SQLITE_OPEN_CREATE, sqlite3_open_v2 creates the database if the file is missing.
    int result = sqlite3_open_v2(
        [path UTF8String], &self->_database,
        SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FILEPROTECTION_NONE, NULL);
    if (result != SQLITE_OK) {
      NSString *errorString = FIRMessagingStringFromSQLiteResult(result);
      NSString *errorMessage;
      if (databaseFileExists) {
        errorMessage = [NSString
            stringWithFormat:@"Could not open existing RMQ database at path %@, error: %@", path,
                             errorString];
        FIRMessagingLoggerError(kFIRMessagingMessageCodeRmq2PersistentStoreErrorOpeningDatabase,
                                @"%@", errorMessage);
      } else {
        errorMessage =
            [NSString stringWithFormat:@"Could not create RMQ database at path %@, error: %@",
                                       path, errorString];
        FIRMessagingLoggerError(kFIRMessagingMessageCodeRmq2PersistentStoreErrorCreatingDatabase,
                                @"%@", errorMessage);
      }
      NSAssert(NO, @"%@", errorMessage);
      return;
    }

    if (databaseFileExists) {
      // We've to separate between different versions here because of backwards compatibility
      // issues: an existing database may still carry the old S2D table.
      [self updateDBWithStringRmqID];
    } else {
      [self createTableWithName:kTableOutgoingRmqMessages command:kCreateTableOutgoingRmqMessages];
      [self createTableWithName:kTableLastRmqId command:kCreateTableLastRmqId];
      [self createTableWithName:kTableS2DRmqIds command:kCreateTableS2DRmqIds];
    }

    [self createTableWithName:kTableSyncMessages command:kCreateTableSyncMessages];
  });
}

/**
 * Asynchronously creates the current S2D table (if missing) and drops the old S2D table.
 */
- (void)updateDBWithStringRmqID {
  dispatch_async(_databaseOperationQueue, ^{
    [self createTableWithName:kTableS2DRmqIds command:kCreateTableS2DRmqIds];
    [self dropTableWithName:kOldTableS2DRmqIds];
  });
}

#pragma mark - Scan

/**
 * We don't have a 'getMessages' method - it would require loading in memory
 * the entire content body of all messages.
 *
 * Instead we iterate and call 'resend' for each message.
 *
 * This is called:
 *  - on connect MCS, to resend any outstanding messages
 *  - init
 *
 * The handler, if any, is invoked exactly once on the main queue with a dictionary mapping
 * rmq id (NSNumber) to the parsed proto. The dictionary is empty when the query cannot be
 * prepared; rows whose payload fails to parse are skipped.
 */
- (void)scanWithRmqMessageHandler:(FIRMessagingRmqMessageHandler)rmqMessageHandler {
  dispatch_async(_databaseOperationQueue, ^{
    NSMutableDictionary *messages = [NSMutableDictionary dictionary];
    void (^deliverMessages)(void) = ^{
      if (rmqMessageHandler) {
        dispatch_async(dispatch_get_main_queue(), ^{
          rmqMessageHandler(messages);
        });
      }
    };

    NSString *queryFormat = @"SELECT %@ FROM %@ WHERE %@ != 0 ORDER BY %@ ASC";
    NSString *query =
        [NSString stringWithFormat:queryFormat,
                                   kOutgoingRmqMessagesColumns,  // select (rmq_id, type, data)
                                   kTableOutgoingRmqMessages,    // from table
                                   kRmqIdColumn,                 // where
                                   kRmqIdColumn];                // order by
    sqlite3_stmt *statement = NULL;
    if (sqlite3_prepare_v2(self->_database, [query UTF8String], -1, &statement, NULL) !=
        SQLITE_OK) {
      [self logErrorAndFinalizeStatement:statement];
      deliverMessages();
      // Stop here: stepping or finalizing the statement again would be invalid, and the handler
      // must not be called a second time.
      return;
    }
    // can query sqlite3 for this but this is fine
    const int rmqIdColumnNumber = 0;
    const int typeColumnNumber = 1;
    const int dataColumnNumber = 2;
    while (sqlite3_step(statement) == SQLITE_ROW) {
      int64_t rmqId = sqlite3_column_int64(statement, rmqIdColumnNumber);
      int8_t type = sqlite3_column_int(statement, typeColumnNumber);
      const void *bytes = sqlite3_column_blob(statement, dataColumnNumber);
      int length = sqlite3_column_bytes(statement, dataColumnNumber);

      NSData *data = [NSData dataWithBytes:bytes length:length];
      GPBMessage *proto =
          [FIRMessagingGetClassForTag((FIRMessagingProtoTag)type) parseFromData:data error:NULL];
      if (proto) {
        // Storing nil in a dictionary raises; a row that cannot be parsed is skipped instead.
        messages[@(rmqId)] = proto;
      }
    }
    sqlite3_finalize(statement);
    deliverMessages();
  });
}

#pragma mark - Private

/**
 * Inserts one outgoing message row (rmq_id, type, data).
 * Must not be called on the main thread; callers run it on the database queue.
 *
 * @return YES if the row was inserted, NO if preparing, binding or stepping failed.
 */
- (BOOL)saveMessageWithRmqId:(int64_t)rmqId tag:(int8_t)tag data:(NSData *)data {
  FIRMessaging_MUST_NOT_BE_MAIN_THREAD();
  NSString *insertFormat = @"INSERT INTO %@ (%@, %@, %@) VALUES (?, ?, ?)";
  NSString *insertSQL =
      [NSString stringWithFormat:insertFormat,
                                 kTableOutgoingRmqMessages,  // table
                                 kRmqIdColumn, kProtobufTagColumn, kDataColumn /* columns */];
  sqlite3_stmt *insert_statement = NULL;
  if (sqlite3_prepare_v2(self->_database, [insertSQL UTF8String], -1, &insert_statement, NULL) !=
      SQLITE_OK) {
    _FIRMessagingRmqLogAndExit(insert_statement, NO);
  }
  if (sqlite3_bind_int64(insert_statement, 1, rmqId) != SQLITE_OK) {
    _FIRMessagingRmqLogAndExit(insert_statement, NO);
  }
  if (sqlite3_bind_int(insert_statement, 2, tag) != SQLITE_OK) {
    _FIRMessagingRmqLogAndExit(insert_statement, NO);
  }
  // SQLITE_STATIC is sufficient: `data` outlives the statement, which is stepped and finalized
  // before this method returns.
  if (sqlite3_bind_blob(insert_statement, 3, [data bytes], (int)[data length], SQLITE_STATIC) !=
      SQLITE_OK) {
    _FIRMessagingRmqLogAndExit(insert_statement, NO);
  }
  if (sqlite3_step(insert_statement) != SQLITE_DONE) {
    _FIRMessagingRmqLogAndExit(insert_statement, NO);
  }

  sqlite3_finalize(insert_statement);
  return YES;
}

/**
 * Asynchronously deletes the rows of `tableName` whose rmq_id matches one of `rmqIds`.
 * Ids are deleted in batches of at most 100 placeholders per statement. The first statement
 * that fails to prepare or execute aborts the remaining batches.
 *
 * @param tableName Table to delete from.
 * @param rmqIds    Ids to delete. They are bound as text for the S2D and sync-message tables
 *                  and as 64-bit integers (via -longLongValue) for every other table.
 */
- (void)deleteMessagesFromTable:(NSString *)tableName withRmqIds:(NSArray *)rmqIds {
  // Snapshot the ids so that a caller mutating its array after this call cannot affect the
  // asynchronous delete.
  NSArray *idsToDelete = [rmqIds copy];
  dispatch_async(_databaseOperationQueue, ^{
    int toDelete = (int)[idsToDelete count];
    if (toDelete == 0) {
      return;
    }

    // The rmq_id column is TEXT in the S2D and sync-message tables and INTEGER in the outgoing
    // messages table.
    BOOL isSyncMessageTable = [tableName isEqualToString:kTableSyncMessages];
    BOOL isRmqIDString = isSyncMessageTable || [tableName isEqualToString:kTableS2DRmqIds];

    NSString *condition = [NSString stringWithFormat:@"%@ = ?", kRmqIdColumn];
    const int maxBatchSize = 100;
    int deleteCount = 0;

    for (int start = 0; start < toDelete; start += maxBatchSize) {
      int batchLength = MIN(maxBatchSize, toDelete - start);
      NSArray *batch = [idsToDelete subarrayWithRange:NSMakeRange(start, batchLength)];

      // construct the WHERE argument: "rmq_id = ? OR rmq_id = ? ..."
      NSMutableArray *conditions = [NSMutableArray arrayWithCapacity:batchLength];
      for (int i = 0; i < batchLength; i++) {
        [conditions addObject:condition];
      }
      NSString *deleteQuery =
          [NSString stringWithFormat:@"DELETE FROM %@ WHERE %@", tableName,
                                     [conditions componentsJoinedByString:@" OR "]];

      // sqlite update
      sqlite3_stmt *delete_statement = NULL;
      if (sqlite3_prepare_v2(self->_database, [deleteQuery UTF8String], -1, &delete_statement,
                             NULL) != SQLITE_OK) {
        FIRMessagingRmqLogAndReturn(delete_statement);
      }

      // bind values
      NSMutableArray *boundIds = [NSMutableArray arrayWithCapacity:batchLength];
      int placeholderIndex = 1;  // placeholders in sqlite3 start with 1
      for (NSString *rmqId in batch) {
        int bindResult;
        if (isRmqIDString) {
          bindResult = sqlite3_bind_text(delete_statement, placeholderIndex, [rmqId UTF8String],
                                         -1, SQLITE_TRANSIENT);
        } else {
          bindResult =
              sqlite3_bind_int64(delete_statement, placeholderIndex, [rmqId longLongValue]);
        }
        if (bindResult == SQLITE_OK) {
          [boundIds addObject:rmqId];
        } else {
          // The placeholder stays NULL, and "rmq_id = NULL" never matches, so only this id is
          // skipped while the rest of the batch is still deleted.
          FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmq2PersistentStore003,
                                  @"Failed to bind rmqID %@", rmqId);
          FIRMessagingLoggerError(kFIRMessagingMessageCodeSyncMessageManager007,
                                  @"Failed to delete sync message %@", rmqId);
        }
        // Each id owns exactly one placeholder, whether or not its bind succeeded.
        placeholderIndex++;
      }

      if (sqlite3_step(delete_statement) != SQLITE_DONE) {
        FIRMessagingRmqLogAndReturn(delete_statement);
      }
      sqlite3_finalize(delete_statement);
      deleteCount += sqlite3_changes(self->_database);

      // Report success only once the DELETE has actually run, and only for the table the
      // message talks about.
      if (isSyncMessageTable) {
        for (NSString *rmqId in boundIds) {
          FIRMessagingLoggerInfo(kFIRMessagingMessageCodeSyncMessageManager008,
                                 @"Successfully deleted sync message from cache %@", rmqId);
        }
      }
    }

    // if we are here all of our sqlite queries should have succeeded
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmq2PersistentStore004,
                            @"Trying to delete %d s2D ID's, successfully deleted %d", toDelete,
                            deleteCount);
  });
}

/**
 * Increments the in-memory outgoing RMQ id and returns the new value.
 */
- (int64_t)nextRmqId {
  return ++self.rmqId;
}

/**
 * Returns the most recent error message reported by SQLite for this database connection.
 */
- (NSString *)lastErrorMessage {
  return [NSString stringWithFormat:@"%s", sqlite3_errmsg(_database)];
}

/**
 * Returns the most recent error code reported by SQLite for this database connection.
 */
- (int)lastErrorCode {
  return sqlite3_errcode(_database);
}

/**
 * Logs the most recent SQLite error code and message.
 */
- (void)logError {
  FIRMessagingLoggerError(kFIRMessagingMessageCodeRmq2PersistentStore006,
                          @"Error: code (%d) message: %@", [self lastErrorCode],
                          [self lastErrorMessage]);
}

/**
 * Logs the most recent SQLite error and finalizes `stmt` (finalizing NULL is a no-op).
 */
- (void)logErrorAndFinalizeStatement:(sqlite3_stmt *)stmt {
  [self logError];
  sqlite3_finalize(stmt);
}

@end