I have 2 cloud functions, one cloud function publishes the message, and the other cloud function(transform.js) gets triggered from the published message. Now in the transform cloud function, I am trying to insert it into a big query table, but there are some cases where it will throw an error like the table does not exist or schema mismatch, In such cases the publisher should resends the data after some time.
the problem is that the publisher is NOT resending the data, I have even changed the retry policy to retry after exponential backoff delay
Transform cloud function
<code>const admin = require("firebase-admin");
//const serviceAccount = require("./path-firebase-adminsdk-uszgp-35748eb111.json");
const { PubSub, Schema } = require("@google-cloud/pubsub");
const { BigQuery } = require("@google-cloud/bigquery");
const functions = require('firebase-functions');
const pubsub = new PubSub({
projectId: "abc"
})
let transformedObject = {}
let initialFirestoreDatatype={}
let fieldsToTransform = []
var rootCollection=''
var docData;
var documentId;
const db = admin.firestore()
module.exports.transform = functions.pubsub.topic('posts-fs-to-bigquery-1').onPublish( async (message, context) => {
// Handle the incoming message
//console.log('Raw message data:', message.data.toString());
try{
const decodedData = Buffer.from(message.data, 'base64').toString('utf-8');
//console.log('decoded data:', decodedData);
const data = JSON.parse(decodedData);
console.log('Received message:', data);
rootCollection = data['rootCollection'];
docData = reconstructData(data['docData'])
console.log("data after processing",docData);
documentId=data['documentId'];
if(data['postId'].includes('/')){
data['postId']=`${rootCollection}/`+data['postId'];
let parts = data['postId'].split("/");
let collection = parts.slice(0, -1).join("/");
let document = parts.slice(-1)[0];
const subcollection = parts.slice(-2, -1)[0];
await fstoBigQuery(collection,document, subcollection, true)
}else{
await fstoBigQuery(data['rootCollection'],data['postId'], '', false)
}
}catch(e){
throw new Error(e)
}
});
async function fstoBigQuery(collection,postId, subcoll ,isSubCollection) {
//this is where error will occur and I want pub to republish the message
try{
const [table] = await bigquery.dataset("firestore_collections").table(`${rootCollection}`).get();
}catch(e){
throw new Error(e)
}
}
</code>
<code>const admin = require("firebase-admin");
//const serviceAccount = require("./path-firebase-adminsdk-uszgp-35748eb111.json");
const { PubSub, Schema } = require("@google-cloud/pubsub");
const { BigQuery } = require("@google-cloud/bigquery");
const functions = require('firebase-functions');
const pubsub = new PubSub({
projectId: "abc"
})
let transformedObject = {}
let initialFirestoreDatatype={}
let fieldsToTransform = []
var rootCollection=''
var docData;
var documentId;
const db = admin.firestore()
module.exports.transform = functions.pubsub.topic('posts-fs-to-bigquery-1').onPublish( async (message, context) => {
// Handle the incoming message
//console.log('Raw message data:', message.data.toString());
try{
const decodedData = Buffer.from(message.data, 'base64').toString('utf-8');
//console.log('decoded data:', decodedData);
const data = JSON.parse(decodedData);
console.log('Received message:', data);
rootCollection = data['rootCollection'];
docData = reconstructData(data['docData'])
console.log("data after processing",docData);
documentId=data['documentId'];
if(data['postId'].includes('/')){
data['postId']=`${rootCollection}/`+data['postId'];
let parts = data['postId'].split("/");
let collection = parts.slice(0, -1).join("/");
let document = parts.slice(-1)[0];
const subcollection = parts.slice(-2, -1)[0];
await fstoBigQuery(collection,document, subcollection, true)
}else{
await fstoBigQuery(data['rootCollection'],data['postId'], '', false)
}
}catch(e){
throw new Error(e)
}
});
async function fstoBigQuery(collection,postId, subcoll ,isSubCollection) {
//this is where error will occur and I want pub to republish the message
try{
const [table] = await bigquery.dataset("firestore_collections").table(`${rootCollection}`).get();
}catch(e){
throw new Error(e)
}
}
</code>
const admin = require("firebase-admin");
//const serviceAccount = require("./path-firebase-adminsdk-uszgp-35748eb111.json");
const { PubSub, Schema } = require("@google-cloud/pubsub");
const { BigQuery } = require("@google-cloud/bigquery");
const functions = require('firebase-functions');
const pubsub = new PubSub({
projectId: "abc"
})
let transformedObject = {}
let initialFirestoreDatatype={}
let fieldsToTransform = []
var rootCollection=''
var docData;
var documentId;
const db = admin.firestore()
module.exports.transform = functions.pubsub.topic('posts-fs-to-bigquery-1').onPublish( async (message, context) => {
// Handle the incoming message
//console.log('Raw message data:', message.data.toString());
try{
const decodedData = Buffer.from(message.data, 'base64').toString('utf-8');
//console.log('decoded data:', decodedData);
const data = JSON.parse(decodedData);
console.log('Received message:', data);
rootCollection = data['rootCollection'];
docData = reconstructData(data['docData'])
console.log("data after processing",docData);
documentId=data['documentId'];
if(data['postId'].includes('/')){
data['postId']=`${rootCollection}/`+data['postId'];
let parts = data['postId'].split("/");
let collection = parts.slice(0, -1).join("/");
let document = parts.slice(-1)[0];
const subcollection = parts.slice(-2, -1)[0];
await fstoBigQuery(collection,document, subcollection, true)
}else{
await fstoBigQuery(data['rootCollection'],data['postId'], '', false)
}
}catch(e){
throw new Error(e)
}
});
async function fstoBigQuery(collection,postId, subcoll ,isSubCollection) {
//this is where error will occur and I want pub to republish the message
try{
const [table] = await bigquery.dataset("firestore_collections").table(`${rootCollection}`).get();
}catch(e){
throw new Error(e)
}
}