Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions firestore-bigquery-export/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Version 0.2.3

fix: pass full document resource name to BigQuery

## Version 0.2.2

fix: remove default value on DATABASE_REGION
Expand Down
2 changes: 1 addition & 1 deletion firestore-bigquery-export/extension.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

name: firestore-bigquery-export
version: 0.2.2
version: 0.2.3
specVersion: v1beta

displayName: Stream Firestore to BigQuery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Object {
"maxDispatchesPerSecond": 10,
"maxEnqueueAttempts": 3,
"maxStaleness": undefined,
"projectId": undefined,
"refreshIntervalMinutes": undefined,
"tableId": "my_table",
"timePartitioning": null,
Expand Down
1 change: 1 addition & 0 deletions firestore-bigquery-export/functions/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export function clustering(clusters: string | undefined) {

export default {
bqProjectId: process.env.BIGQUERY_PROJECT_ID,
projectId: process.env.PROJECT_ID,
databaseId: process.env.DATABASE || "(default)",
databaseRegion: process.env.DATABASE_REGION,
collectionPath: process.env.COLLECTION_PATH,
Expand Down
128 changes: 54 additions & 74 deletions firestore-bigquery-export/functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ import {
import * as logs from "./logs";
import * as events from "./events";
import { getChangeType, getDocumentId } from "./util";
import { DocumentSnapshot } from "firebase-admin/firestore";

// Configuration for the Firestore Event History Tracker.
// Configuration for the Firestore Event History Tracker
const eventTrackerConfig = {
firestoreInstanceId: config.databaseId,
tableId: config.tableId,
Expand Down Expand Up @@ -67,27 +66,27 @@ const eventTrackerConfig = {
logLevel: config.logLevel,
};

// Initialize the Firestore Event History Tracker with the given configuration.
const eventTracker: FirestoreBigQueryEventHistoryTracker =
new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig);
const eventTracker = new FirestoreBigQueryEventHistoryTracker(
eventTrackerConfig
);

// Initialize logging.
logs.logger.setLogLevel(config.logLevel);
logs.init();

/** Initialize Firebase Admin SDK if not already initialized */
if (admin.apps.length === 0) {
admin.initializeApp();
}

// Setup the event channel for EventArc.
events.setupEventChannel();

// Define a type for task data to ensure consistency
/**
* Task data structure for BigQuery synchronization
*/
interface SyncBigQueryTaskData {
timestamp: string;
eventId: string;
documentPath: string;
relativePath: string;
fullResourceName: string;
changeType: ChangeType;
documentId: string;
params: Record<string, any> | null;
Expand All @@ -96,39 +95,38 @@ interface SyncBigQueryTaskData {
}

/**
* Cloud Function to handle enqueued tasks to synchronize Firestore changes to BigQuery.
* Handles enqueued tasks for syncing Firestore changes to BigQuery
*/
export const syncBigQuery = functions.tasks
.taskQueue()
.onDispatch(async (taskData: SyncBigQueryTaskData, ctx) => {
const documentName = taskData.documentPath;
const fullResourceName = taskData.fullResourceName;
const eventId = taskData.eventId;
const operation = taskData.changeType;

logs.logEventAction(
"Firestore event received by onDispatch trigger",
documentName,
fullResourceName,
eventId,
operation
);

try {
// Use the shared function to write the event to BigQuery
await recordEventToBigQuery(
taskData.changeType,
taskData.documentId,
taskData.fullResourceName,
taskData.data,
taskData.oldData,
taskData
);

// Record a success event in EventArc, if configured
await events.recordSuccessEvent({
subject: taskData.documentId,
data: {
timestamp: taskData.timestamp,
operation: taskData.changeType,
documentName: taskData.documentPath,
documentName: taskData.fullResourceName,
documentId: taskData.documentId,
pathParams: taskData.params,
eventId: taskData.eventId,
Expand All @@ -137,13 +135,11 @@ export const syncBigQuery = functions.tasks
},
});

// Log completion of the task.
logs.complete();
} catch (err) {
// Log error and throw it to handle in the calling function.
logs.logFailedEventAction(
"Failed to write event to BigQuery from onDispatch handler",
documentName,
fullResourceName,
eventId,
operation,
err as Error
Expand All @@ -153,35 +149,34 @@ export const syncBigQuery = functions.tasks
}
});

/**
* Main Cloud Function that triggers on Firestore document changes
* and sends the data to BigQuery
*/
export const fsexportbigquery = onDocumentWritten(
`${config.collectionPath}/{documentId}`,
async (event) => {
const { data, ...context } = event;

// Start logging the function execution.
logs.start();

// Determine the type of change (CREATE, UPDATE, DELETE) from the new event data.
const changeType = getChangeType(data);
const documentId = getDocumentId(data);

// Check if the document is newly created or deleted.
const isCreated = changeType === ChangeType.CREATE;
const isDeleted = changeType === ChangeType.DELETE;

// Get the new and old data from the snapshot.
const newData = isDeleted ? undefined : data.after.data();
const oldData =
isCreated || config.excludeOldData ? undefined : data.before.data();

// check this is the full doc name
const documentName = context.document;
const relativeName = context.document;
const projectId = config.projectId;
const fullResourceName = `projects/${projectId}/databases/${config.databaseId}/documents/${relativeName}`;
const eventId = context.id;
const operation = changeType;

logs.logEventAction(
"Firestore event received by onDocumentWritten trigger",
documentName,
fullResourceName,
eventId,
operation
);
Expand All @@ -190,13 +185,12 @@ export const fsexportbigquery = onDocumentWritten(
let serializedOldData: any;

try {
// Serialize the data before processing.
serializedData = eventTracker.serializeData(newData);
serializedOldData = eventTracker.serializeData(oldData);
} catch (err) {
logs.logFailedEventAction(
"Failed to serialize data",
documentName,
fullResourceName,
eventId,
operation,
err as Error
Expand All @@ -205,7 +199,6 @@ export const fsexportbigquery = onDocumentWritten(
}

try {
// Record the start event in EventArc, if configured.
await events.recordStartEvent({
documentId,
changeType,
Expand All @@ -219,16 +212,17 @@ export const fsexportbigquery = onDocumentWritten(
}

try {
// Write the change event to BigQuery.
await recordEventToBigQuery(
changeType,
documentId,
fullResourceName,
serializedData,
serializedOldData,
{
timestamp: context.time,
eventId: context.id,
documentPath: context.document,
relativePath: context.document,
fullResourceName,
changeType,
documentId,
params: config.wildcardIds ? context.params : null,
Expand All @@ -238,11 +232,12 @@ export const fsexportbigquery = onDocumentWritten(
);
} catch (err) {
logs.failedToWriteToBigQueryImmediately(err as Error);
// Handle enqueue errors with retries and backup to GCS.

await attemptToEnqueue(err, {
timestamp: context.time,
eventId: context.id,
documentPath: context.document,
relativePath: context.document,
fullResourceName: fullResourceName,
changeType,
documentId,
params: config.wildcardIds ? context.params : null,
Expand All @@ -251,49 +246,49 @@ export const fsexportbigquery = onDocumentWritten(
});
}

// Log the successful completion of the function.
logs.complete();
}
);

/**
* Record the event to the Firestore Event History Tracker and BigQuery.
* Records a Firestore document change event to BigQuery
*
* @param changeType - The type of change (CREATE, UPDATE, DELETE).
* @param documentId - The ID of the Firestore document.
* @param serializedData - The serialized new data of the document.
* @param serializedOldData - The serialized old data of the document.
* @param taskData - The task data containing event information.
* @param changeType - The type of change (CREATE, UPDATE, DELETE)
* @param documentId - The ID of the Firestore document
* @param fullResourceName - Fully-qualified Firestore document path
* @param serializedData - The serialized new data
* @param serializedOldData - The serialized old data
* @param taskData - Task metadata containing event information
*/
async function recordEventToBigQuery(
changeType: ChangeType,
documentId: string,
fullResourceName: string,
serializedData: any,
serializedOldData: any,
taskData: SyncBigQueryTaskData
) {
const event: FirestoreDocumentChangeEvent = {
timestamp: taskData.timestamp, // Cloud Firestore commit timestamp
operation: changeType, // The type of operation performed
documentName: taskData.documentPath, // The document name
documentId, // The document ID
timestamp: taskData.timestamp,
operation: changeType,
documentName: fullResourceName,
documentId,
pathParams: taskData.params as
| FirestoreDocumentChangeEvent["pathParams"]
| null, // Path parameters, if any
eventId: taskData.eventId, // The event ID from Firestore
data: serializedData, // Serialized new data
oldData: serializedOldData, // Serialized old data
| null,
eventId: taskData.eventId,
data: serializedData,
oldData: serializedOldData,
};

// Record the event in the Firestore Event History Tracker and BigQuery.
await eventTracker.record([event]);
}

/**
* Handle errors when enqueueing tasks to sync BigQuery.
* Handles task enqueueing with retry logic when BigQuery sync fails
*
* @param err - The error object.
* @param taskData - The task data to be enqueued.
* @param err - The error that occurred
* @param taskData - The task data to enqueue
*/
async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) {
try {
Expand All @@ -303,36 +298,31 @@ async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) {
);

let attempts = 0;
const jitter = Math.random() * 100; // Adding jitter to avoid collision

// Exponential backoff formula with a maximum of 5 + jitter seconds
const jitter = Math.random() * 100;
const backoff = (attempt: number) =>
Math.min(Math.pow(2, attempt) * 100, 5000) + jitter;

while (attempts < config.maxEnqueueAttempts) {
if (attempts > 0) {
// Wait before retrying to enqueue the task.
await new Promise((resolve) => setTimeout(resolve, backoff(attempts)));
}

attempts++;
try {
await queue.enqueue(taskData);
break; // Break the loop if enqueuing is successful.
break;
} catch (enqueueErr) {
// Throw the error if max attempts are reached.
if (attempts === config.maxEnqueueAttempts) {
throw enqueueErr;
}
}
}
} catch (enqueueErr) {
// Record the error event.
await events.recordErrorEvent(enqueueErr as Error);

logs.logFailedEventAction(
"Failed to enqueue event to Cloud Tasks from onWrite handler",
taskData.documentPath,
taskData.fullResourceName,
taskData.eventId,
taskData.changeType,
enqueueErr as Error
Expand All @@ -341,37 +331,27 @@ async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) {
}

/**
* Cloud Function to set up BigQuery sync by initializing the event tracker.
* Sets up BigQuery synchronization by initializing the event tracker
*/
export const setupBigQuerySync = functions.tasks
.taskQueue()
.onDispatch(async () => {
// Obtain the Extensions runtime handle; it is used below to report the processing state.
const runtime = getExtensions().runtime();

// Initialize the event tracker before reporting anything back to the runtime.
await eventTracker.initialize();

// Reached only after initialize() has resolved; if it rejects, the rejection
// propagates out of this handler and the state below is never set.
await runtime.setProcessingState(
"PROCESSING_COMPLETE",
"Sync setup completed"
);
});

/**
* Cloud Function to initialize BigQuery sync.
* Initializes BigQuery synchronization
*/
export const initBigQuerySync = functions.tasks
.taskQueue()
.onDispatch(async () => {
/** Setup runtime environment */
const runtime = getExtensions().runtime();

// Initialize the BigQuery sync.
await eventTracker.initialize();

// Update the processing state.
await runtime.setProcessingState(
"PROCESSING_COMPLETE",
"Sync setup completed"
Expand Down
Loading