I am trying to get the my MongoDB Atlas changes updated in real time in the front user facing. I am trying to implement the Mongo Change Stream functions. I am able to get the changes in my file but not sure how I will pass those changes to the front-end page.
Inside my Mongodb Stream file I am already getting the changes from MongoDB, all changes are being showing on that “change” inside the “for while”. I just need to know how I can pass that to the front facing part. If someone could help me on that, I tried some stuff but nothing worked there.
I am working with 3 files –
The Main front facing file code –
import { useEffect, useState, useCallback } from 'react';
import axios from 'axios';
import { columns } from './columns';
import { DataTable } from './DataTable';
export default function JobsTable() {
const [sseConnection, setSSEConnection] = useState(null);
const [jobs, setJobs] = useState([]);
const fetchData = async () => {
const response = await axios.get('/api/jobs');
const listenToSSEUpdates = useCallback(() => {
console.log('listenToSSEUpdates func');
const eventSource = new EventSource('/api/sse');
}, [listenToSSEUpdates]);
<DataTable columns={columns} data={jobs} />{' '}
<code>'use client';
import { useEffect, useState, useCallback } from 'react';
import axios from 'axios';
import { columns } from './columns';
import { DataTable } from './DataTable';
export default function JobsTable() {
const [sseConnection, setSSEConnection] = useState(null);
const [jobs, setJobs] = useState([]);
useEffect(() => {
const fetchData = async () => {
const response = await axios.get('/api/jobs');
setJobs(response.data);
};
fetchData();
}, []);
const listenToSSEUpdates = useCallback(() => {
console.log('listenToSSEUpdates func');
const eventSource = new EventSource('/api/sse');
}, []);
useEffect(() => {
listenToSSEUpdates();
return () => {
if (sseConnection) {
sseConnection.close();
}
};
}, [listenToSSEUpdates]);
return (
<div>
{' '}
<DataTable columns={columns} data={jobs} />{' '}
</div>
);
}
</code>
'use client';
import { useEffect, useState, useCallback } from 'react';
import axios from 'axios';
import { columns } from './columns';
import { DataTable } from './DataTable';
export default function JobsTable() {
const [sseConnection, setSSEConnection] = useState(null);
const [jobs, setJobs] = useState([]);
useEffect(() => {
const fetchData = async () => {
const response = await axios.get('/api/jobs');
setJobs(response.data);
};
fetchData();
}, []);
const listenToSSEUpdates = useCallback(() => {
console.log('listenToSSEUpdates func');
const eventSource = new EventSource('/api/sse');
}, []);
useEffect(() => {
listenToSSEUpdates();
return () => {
if (sseConnection) {
sseConnection.close();
}
};
}, [listenToSSEUpdates]);
return (
<div>
{' '}
<DataTable columns={columns} data={jobs} />{' '}
</div>
);
}
My API SSE File:
<code>import { changeStream } from '@/lib/mongoChangeStream';
import { NextResponse } from 'next/server';
import { headers } from 'next/headers';
const HEARTBEAT_INTERVAL = 5000; // 5 seconds (adjust this as needed)
export async function GET(req, res) {
// Check if the client accepts SSE
const headersList = headers();
const accept = headersList.get('accept');
return new NextResponse('ok');
<code>import { changeStream } from '@/lib/mongoChangeStream';
import { NextResponse } from 'next/server';
import { headers } from 'next/headers';
const HEARTBEAT_INTERVAL = 5000; // 5 seconds (adjust this as needed)
export async function GET(req, res) {
// Check if the client accepts SSE
const headersList = headers();
const accept = headersList.get('accept');
return new NextResponse('ok');
}
</code>
import { changeStream } from '@/lib/mongoChangeStream';
import { NextResponse } from 'next/server';
import { headers } from 'next/headers';
const HEARTBEAT_INTERVAL = 5000; // 5 seconds (adjust this as needed)
export async function GET(req, res) {
// Check if the client accepts SSE
const headersList = headers();
const accept = headersList.get('accept');
return new NextResponse('ok');
}
My MongoDB Stream Connection:
<code>import { MongoClient } from 'mongodb';
const uri = process.env.DATABASE_URL || '';
const client = new MongoClient(uri);
console.log('Setting up change stream');
const database = client.db('pcb');
const collection = database.collection('Jobs');
const options = { fullDocument: 'updateLookup' };
// This could be any pipeline.
// Open a Change Stream on the "jobs" collection
changeStream = collection.watch(pipeline, options);
// Print change events as they occur
for await (const change of changeStream) {
console.log('Received change:n', change);
// Close the change stream when done
await changeStream.close();
// Close the MongoDB client connection
run().catch(console.dir);
<code>import { MongoClient } from 'mongodb';
const uri = process.env.DATABASE_URL || '';
const client = new MongoClient(uri);
export let changeStream;
async function run() {
try {
console.log('Setting up change stream');
const database = client.db('pcb');
const collection = database.collection('Jobs');
const options = { fullDocument: 'updateLookup' };
// This could be any pipeline.
const pipeline = [];
// Open a Change Stream on the "jobs" collection
changeStream = collection.watch(pipeline, options);
// Print change events as they occur
for await (const change of changeStream) {
console.log('Received change:n', change);
}
// Close the change stream when done
await changeStream.close();
} finally {
// Close the MongoDB client connection
await client.close();
}
}
run().catch(console.dir);
</code>
import { MongoClient } from 'mongodb';
const uri = process.env.DATABASE_URL || '';
const client = new MongoClient(uri);
export let changeStream;
async function run() {
try {
console.log('Setting up change stream');
const database = client.db('pcb');
const collection = database.collection('Jobs');
const options = { fullDocument: 'updateLookup' };
// This could be any pipeline.
const pipeline = [];
// Open a Change Stream on the "jobs" collection
changeStream = collection.watch(pipeline, options);
// Print change events as they occur
for await (const change of changeStream) {
console.log('Received change:n', change);
}
// Close the change stream when done
await changeStream.close();
} finally {
// Close the MongoDB client connection
await client.close();
}
}
run().catch(console.dir);