Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
652e5e5
Offline storage: guard empty-filter delete + propagate SQLite store f…
bmehta001 Jun 22, 2026
325c55b
Add MemoryStorage empty-filter delete regression test
bmehta001 Jun 22, 2026
d9640b7
Address review comment: propagate synchronous disk store failures
bmehta001 Jun 22, 2026
f1b3381
Prevent event loss when a disk write fails during Flush()
bmehta001 Jun 23, 2026
45e9d55
Address Copilot comment: NoopTaskDispatcher::Cancel returns found-state
bmehta001 Jun 23, 2026
bab7b42
Address Copilot comments: rename flush test for precision
bmehta001 Jun 23, 2026
e9c7ee3
Merge remote-tracking branch 'msft/main' into bhamehta/fix-storage-da…
bmehta001 Jun 23, 2026
84e49a6
Fold the SQLite batch-flush optimization into the data-safety change …
bmehta001 Jun 23, 2026
e1e7c4e
Address Copilot: make StoreRecords fully all-or-nothing on invalid re…
bmehta001 Jun 23, 2026
40fd118
Address Copilot: re-queue the flush batch only on a zero store result
bmehta001 Jun 23, 2026
97fee8e
Merge branch 'main' into bhamehta/fix-storage-data-safety
bmehta001 Jun 24, 2026
937d3ac
Fix PrivacyGuard JNI UAF, RoInitialize leak, and missing low_battery …
bmehta001 Jul 2, 2026
82cffa1
Fix GetAndReserveRecords data race (#1221) and SQLite shutdown leak (…
bmehta001 Jul 2, 2026
03cf210
Balance RoInitialize with an RAII guard (Copilot round-1)
bmehta001 Jul 2, 2026
2905ca7
Add test for low_battery transmit-profile powerState (#312)
bmehta001 Jul 2, 2026
e8db589
Guard checkpoint-on-flush against null disk storage (Copilot round-3)
bmehta001 Jul 2, 2026
62ffbbd
Merge branch 'main' into bhamehta/fix-storage-data-safety
bmehta001 Jul 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions lib/jni/PrivacyGuard_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,22 @@ Java_com_microsoft_applications_events_PrivacyGuard_nativeInitializePrivacyGuard
InitializationConfiguration config(
reinterpret_cast<ILogger*>(iLoggerNativePtr),
CommonDataContext{});
// InitializationConfiguration holds const char* pointers, so the backing
// std::string storage must outlive the PrivacyGuard construction below.
std::string notificationEventName, semanticContextEventName, summaryEventName;
if (NotificationEventName != nullptr) {
config.NotificationEventName = JStringToStdString(env, NotificationEventName).c_str();
notificationEventName = JStringToStdString(env, NotificationEventName);
config.NotificationEventName = notificationEventName.c_str();
}

if (SemanticContextEventName != nullptr) {
config.SemanticContextNotificationEventName = JStringToStdString(env, SemanticContextEventName).c_str();
semanticContextEventName = JStringToStdString(env, SemanticContextEventName);
config.SemanticContextNotificationEventName = semanticContextEventName.c_str();
}

if (SummaryEventName != nullptr) {
config.SummaryEventName = JStringToStdString(env, SummaryEventName).c_str();
summaryEventName = JStringToStdString(env, SummaryEventName);
config.SummaryEventName = summaryEventName.c_str();
}

config.UseEventFieldPrefix = static_cast<bool>(UseEventFieldPrefix);
Expand Down Expand Up @@ -119,16 +125,22 @@ Java_com_microsoft_applications_events_PrivacyGuard_nativeInitializePrivacyGuard
machineIds,
outOfScopeIdentifiers));

// InitializationConfiguration holds const char* pointers, so the backing
// std::string storage must outlive the PrivacyGuard construction below.
std::string notificationEventName, semanticContextEventName, summaryEventName;
if (NotificationEventName != NULL) {
config.NotificationEventName = JStringToStdString(env, NotificationEventName).c_str();
notificationEventName = JStringToStdString(env, NotificationEventName);
config.NotificationEventName = notificationEventName.c_str();
}

if (SemanticContextEventName != NULL) {
config.SemanticContextNotificationEventName = JStringToStdString(env, SemanticContextEventName).c_str();
semanticContextEventName = JStringToStdString(env, SemanticContextEventName);
config.SemanticContextNotificationEventName = semanticContextEventName.c_str();
}

if (SummaryEventName != NULL) {
config.SummaryEventName = JStringToStdString(env, SummaryEventName).c_str();
summaryEventName = JStringToStdString(env, SummaryEventName);
config.SummaryEventName = summaryEventName.c_str();
}

config.UseEventFieldPrefix = static_cast<bool>(UseEventFieldPrefix);
Expand Down
10 changes: 10 additions & 0 deletions lib/offline/MemoryStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ namespace MAT_NS_BEGIN {

void MemoryStorage::DeleteRecords(const std::map<std::string, std::string> & whereFilter)
{
// An empty filter matches every record. Never silently wipe the whole
// in-memory queue from a no-op predicate; callers must use
// DeleteAllRecords() for an intentional full clear. This mirrors the
// fail-closed behavior of OfflineStorage_SQLite::DeleteRecords.
if (whereFilter.empty())
{
LOG_WARN("DeleteRecords called with an empty filter; ignoring to avoid deleting all records.");
return;
}

auto matcher = [&](const StorageRecord &r, const std::map<std::string, std::string> & whereFilter)
{
bool matched = true;
Expand Down
49 changes: 30 additions & 19 deletions lib/offline/OfflineStorageHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,28 +174,37 @@ namespace MAT_NS_BEGIN {
// than the handle gets replaced by nullptr in this DeferredCallbackHandle obj.
m_flushHandle.Cancel();

size_t dbSizeBeforeFlush = m_offlineStorageMemory->GetSize();
size_t dbSizeBeforeFlush = (m_offlineStorageMemory != nullptr) ? m_offlineStorageMemory->GetSize() : 0;
if ((m_offlineStorageMemory) && (dbSizeBeforeFlush > 0) && (m_offlineStorageDisk))
{
// This will block on and then take a lock for the duration of this move, and
// StoreRecord() will then block until the move completes.
// Drain the in-memory queue into a local batch. Records are removed
// from memory here; any that fail to persist below are re-inserted, so
// a disk write failure does not silently lose events. Draining (rather
// than reserving) keeps only a single copy of each record in flight and
// avoids stamping a reservation lease that the Room backend would
// persist to disk.
auto records = m_offlineStorageMemory->GetRecords(false, EventLatency_Unspecified);
std::vector<StorageRecordId> ids;

// TODO: [MG] - consider running the batch in transaction
// if (sqlite)
// sqlite->Execute("BEGIN");

// Persist the whole batch to disk in a single transaction. The disk
// StoreRecords() is all-or-nothing on both backends: it returns the
// full count on success, or 0 if nothing was committed (SQLite rolls
// the transaction back; Room returns 0 on a failed JNI batch). So a
// zero result means nothing was persisted -- return every record to
// the in-memory queue for retry. No events are lost, and there are no
// duplicates because a failed batch leaves nothing on disk.
// (We key off == 0 rather than < size so that a non-zero-but-capped
// count -- only possible for batches larger than the RAM queue can
// ever hold -- is not mistaken for a failure.)
size_t totalSaved = m_offlineStorageDisk->StoreRecords(records);

// TODO: [MG] - consider running the batch in transaction
// if (sqlite)
// sqlite->Execute("END");

// Delete records from reserved on flush
HttpHeaders dummy;
bool fromMemory = true;
m_offlineStorageMemory->DeleteRecords(ids, dummy, fromMemory);
if (totalSaved == 0 && !records.empty())
{
LOG_WARN("Flush: disk store failed for the batch of %zu records; returned to the queue for retry",
records.size());
for (auto& record : records)
{
m_offlineStorageMemory->StoreRecord(record);
}
}

// Notify event listener about the records cached
OnStorageRecordsSaved(totalSaved);
Comment thread
bmehta001 marked this conversation as resolved.
Expand All @@ -210,7 +219,7 @@ namespace MAT_NS_BEGIN {
}

// Checkpoint DB
if (m_config.HasConfig(CFG_BOOL_CHECKPOINT_DB_ON_FLUSH) && m_config[CFG_BOOL_CHECKPOINT_DB_ON_FLUSH])
if (m_offlineStorageDisk && m_config.HasConfig(CFG_BOOL_CHECKPOINT_DB_ON_FLUSH) && m_config[CFG_BOOL_CHECKPOINT_DB_ON_FLUSH])
{
m_offlineStorageDisk->Flush();
}
Expand Down Expand Up @@ -269,7 +278,9 @@ namespace MAT_NS_BEGIN {
{
if (record.persistence != EventPersistence::EventPersistence_DoNotStoreOnDisk)
{
m_offlineStorageDisk->StoreRecord(record);
// Propagate a synchronous disk write failure to the caller so a
// failed store is not counted as successfully persisted.
return m_offlineStorageDisk->StoreRecord(record);
Comment thread
bmehta001 marked this conversation as resolved.
Comment thread
bmehta001 marked this conversation as resolved.
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/offline/OfflineStorageHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ namespace MAT_NS_BEGIN {
std::unique_ptr<IOfflineStorage> m_offlineStorageMemory;
std::shared_ptr<IOfflineStorage> m_offlineStorageDisk;

bool m_readFromMemory;
unsigned m_lastReadCount;
std::atomic<bool> m_readFromMemory;
std::atomic<unsigned> m_lastReadCount;

bool m_shutdownStarted;
unsigned m_memoryDbSize;
Expand Down
181 changes: 152 additions & 29 deletions lib/offline/OfflineStorage_SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace MAT_NS_BEGIN {

class DbTransaction {
SqliteDB* m_db;
bool m_rollback = false;
public:
bool locked;

Expand All @@ -34,11 +35,24 @@ namespace MAT_NS_BEGIN {
}
}

// Discard the transaction (ROLLBACK) instead of committing it on destruction.
void markForRollback()
{
m_rollback = true;
}

~DbTransaction()
{
if (locked)
{
m_db->unlock();
if (m_rollback)
{
m_db->rollback();
}
else
{
m_db->unlock();
}
}
}
};
Expand Down Expand Up @@ -147,40 +161,31 @@ namespace MAT_NS_BEGIN {
m_db->execute(command.c_str());
}

bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record)
bool OfflineStorage_SQLite::isValidRecord(StorageRecord const& record) const
{
// TODO: [MG] - this works, but may not play nicely with several LogManager instances
// static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data);

if (record.id.empty() || record.tenantToken.empty() || static_cast<int>(record.latency) < 0 || record.timestamp <= 0) {
LOG_ERROR("Failed to store event %s:%s: Invalid parameters",
tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageFailed("Invalid parameters");
return false;
}
return true;
}

if (!m_db) {
LOG_ERROR("Failed to store event %s:%s: Database is not open",
bool OfflineStorage_SQLite::insertRecordUnsafe(StorageRecord const& record)
{
if (!SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob))
{
LOG_ERROR("Failed to store event %s:%s: database write failed",
tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageOpenFailed("Database is not open");
return false;
}
m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
return true;
}

{
#ifdef ENABLE_LOCKING
LOCKGUARD(m_lock);
DbTransaction transaction(m_db.get());
if (!transaction.locked)
{
LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageFailed("Database error");
return false;
}
#endif
SqliteStatement(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data).execute(record.id, record.tenantToken, static_cast<int>(record.latency), static_cast<int>(record.persistence), record.timestamp, record.blob);
m_DbSizeEstimate += record.id.size() + record.tenantToken.size() + record.blob.size();
}

void OfflineStorage_SQLite::checkStorageSizeLimits()
{
if ((m_DbSizeNotificationLimit != 0) && (m_DbSizeEstimate>m_DbSizeNotificationLimit))
{
auto now = PAL::getMonotonicTimeMs();
Expand Down Expand Up @@ -210,20 +215,138 @@ namespace MAT_NS_BEGIN {
m_resizing = false;
}
}
}

return true;
bool OfflineStorage_SQLite::StoreRecord(StorageRecord const& record)
{
// TODO: [MG] - this works, but may not play nicely with several LogManager instances
// static SqliteStatement sql_insert(*m_db, m_stmtInsertEvent_id_tenant_prio_ts_data);

if (!isValidRecord(record)) {
return false;
}

if (!m_db) {
LOG_ERROR("Failed to store event %s:%s: Database is not open",
tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageOpenFailed("Database is not open");
return false;
}

bool stored = false;
{
#ifdef ENABLE_LOCKING
LOCKGUARD(m_lock);
DbTransaction transaction(m_db.get());
if (!transaction.locked)
{
LOG_ERROR("Failed to store event %s:%s: Database error", tenantTokenToId(record.tenantToken).c_str(), record.id.c_str());
m_observer->OnStorageFailed("Database error");
return false;
}
#endif
stored = insertRecordUnsafe(record);
}

if (!stored) {
// Report the write failure after the transaction has closed, so the
// observer callback never runs while BEGIN EXCLUSIVE is held.
m_observer->OnStorageFailed("Database write failed");
}

// Run the size-limit check after the transaction, matching the original
// per-record path (which ran it on every StoreRecord call).
checkStorageSizeLimits();

return stored;

}

size_t OfflineStorage_SQLite::StoreRecords(std::vector<StorageRecord> & records)
{
size_t stored = 0;
for (auto & i : records) {
if (StoreRecord(i)) {
++stored;
if (records.empty()) {
return 0;
}

// Validate (and report rejects) up front -- before the DB-open check and
// the transaction -- so no observer callback runs while BEGIN EXCLUSIVE is
// held. The batch is all-or-nothing: if ANY record is invalid we store
// nothing and return 0, so a caller that re-queues the whole batch on a
// short return (e.g. Flush) can never duplicate records that would
// otherwise have been partially committed.
size_t validCount = 0;
for (auto const& i : records) {
if (isValidRecord(i)) {
++validCount;
}
}
return stored;

if (validCount == 0) {
// Every record was invalid (already reported above). Match the single
// StoreRecord(), which returns after validation without checking
// DB-open.
return 0;
}

if (!m_db) {
LOG_ERROR("Failed to store %zu events: Database is not open", records.size());
m_observer->OnStorageOpenFailed("Database is not open");
return 0;
}

if (validCount != records.size()) {
// At least one record was invalid (already reported). Store nothing so
// the batch stays all-or-nothing for the caller.
return 0;
}

size_t addedSize = 0;
bool allStored = true;
{
// Batch all inserts into a single transaction: one BEGIN EXCLUSIVE /
// COMMIT (one fsync) for the whole flush instead of one per record.
// All-or-nothing: if any insert fails the transaction is rolled back,
// so callers (e.g. Flush) can re-queue the whole batch without risking
// duplicate rows (the events table has no unique record_id constraint).
#ifdef ENABLE_LOCKING
LOCKGUARD(m_lock);
DbTransaction transaction(m_db.get());
if (!transaction.locked)
{
LOG_ERROR("Failed to store %zu events: Database error", records.size());
m_observer->OnStorageFailed("Database error");
return 0;
}
#endif
for (auto const& r : records) {
if (insertRecordUnsafe(r)) {
addedSize += r.id.size() + r.tenantToken.size() + r.blob.size();
}
else {
allStored = false;
break;
}
}

if (!allStored) {
#ifdef ENABLE_LOCKING
transaction.markForRollback();
#endif
// Undo the size-estimate added by the rolled-back inserts.
m_DbSizeEstimate -= std::min(m_DbSizeEstimate.load(), addedSize);
}
}

if (!allStored) {
// The whole batch was rolled back after a write failure; report once.
m_observer->OnStorageFailed("Database write failed");
}

// Run the size-full notification / resize check once after the batch,
// matching the original per-record path (which ran it on every insert).
checkStorageSizeLimits();

return allStored ? records.size() : 0;
}

// Debug routine to print record count in the DB
Expand Down
Loading
Loading