I have a very simple dummy Node.js + GraphQL + Apollo
project that you can download from here, or test on StackBlitz, or just inspect the code at the end of this post.
My problem is: When testing a subscription multiple times (2 different processes involved at a time, one for the server and another one for the tester) and that subscription doesn’t yield a value, then the event listener for that subscription remains active even if the process that performed the test finishes. Then I get the error:
(node:20064) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 CHANNEL_MESSAGE listeners added to [EventEmitter]. Use emitter.setMaxListeners() to increase limit
(Use `node --trace-warnings ...` to show where the warning was created)
You can test it with:
Terminal 1: $ yarn; yarn dev
Terminal 2: $ clear; for i in {1..11}; do echo -e "n33[33m### Iteration: $i33[0m"; node src/test.js; done
Then you will get the following:
However, if the subscription always yield a value (in the example above making: const IGNORE_MESSAGE_EVENTS = false
), then we don’t get that MaxListenersExceededWarning
.
What I need: If the test process finishes gracefully/abruptly/unexpectedly the event listener should be removed automatically from the server so there are no unused resources around.
On the tester I even have the instructions below in the finally (try-catch-finally) but didn’t work:
if (subscription) {
subscription.unsubscribe();
}
if (client1) {
client1.stop();
client1 = null;
}
if (client2) {
client2.stop();
client2 = null;
}
however I think the server should not necessary expect that the tester-client finishes the connection properly but just remove listeners that were originated by a connection that doesn’t exist anymore, on this case the one for the tester process.
file: /.env
PORT=4000
file: /package.json
{
"name": "mirror",
"private": true,
"version": "0.0.0",
"type": "module",
"scripts": {
"dev": "node src/index.js"
},
"devDependencies": {},
"dependencies": {
"@apollo/client": "^3.9.11",
"@apollo/server": "^4.10.2",
"@graphql-tools/schema": "^10.0.3",
"ody-parser": "^1.20.2",
"cross-fetch": "^4.0.0",
"dotenv": "^16.4.5",
"express": "^4.19.2",
"graphql": "^16.8.1",
"graphql-subscriptions": "^2.0.0",
"graphql-tag": "^2.12.6",
"graphql-ws": "^5.16.0",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"ws": "^8.16.0"
},
"peerDependencies": {
"@types/zen-observable": "^0.8.7"
}
}
file: /src/index.js
import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { makeExecutableSchema } from '@graphql-tools/schema';
import bodyParser from 'body-parser';
import dotenv from 'dotenv';
import express from 'express';
import { PubSub } from 'graphql-subscriptions';
import { gql } from 'graphql-tag';
import { useServer } from 'graphql-ws/lib/use/ws';
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
dotenv.config();
const app = express();
const httpServer = createServer(app);
const pubsub = new PubSub();
const NOTIFICATION_CHANNEL_MESSAGE = 'CHANNEL_MESSAGE';
const typeDefs = gql`
type MessageEvent {
channel: String!
message: String!
}
type Query {
health: String
}
type Mutation {
sendMessage(channel: String!, message: String!): Boolean
}
type Subscription {
messageEvent(channel: String!): MessageEvent
}
`;
const IGNORE_MESSAGE_EVENTS = true;
const shouldIgnoreMessageEvents = new Promise((resolve) => resolve(IGNORE_MESSAGE_EVENTS));
const resolvers = {
Query: {
health: () => 'OK',
},
Mutation: {
sendMessage: async (_, { channel, message }) => {
await pubsub.publish(NOTIFICATION_CHANNEL_MESSAGE, {
messageEvent: { channel, message },
});
return true;
},
},
Subscription: {
messageEvent: {
subscribe: async function* (_, { channel }, context) {
for await (const { messageEvent } of context.pubsub.asyncIterator([
NOTIFICATION_CHANNEL_MESSAGE,
])) {
if (messageEvent.channel === channel && !(await shouldIgnoreMessageEvents)) {
yield messageEvent;
}
}
},
resolve: (messageEvent) => messageEvent,
},
},
};
const schema = makeExecutableSchema({ typeDefs, resolvers });
const wsServer = new WebSocketServer({
server: httpServer,
path: '/graphql',
});
useServer({ schema, context: { pubsub } }, wsServer);
const server = new ApolloServer({
schema,
introspection: true,
plugins: [ApolloServerPluginDrainHttpServer({ httpServer })],
});
await server.start();
app.use('/graphql', bodyParser.json(), expressMiddleware(server));
const PORT = process.env.PORT;
httpServer.listen(PORT, () => {
console.log(`Server running on port: ${PORT}`);
});
file: /src/test.js
import pkg from '@apollo/client';
import fetch from 'cross-fetch';
import dotenv from 'dotenv';
import gql from 'graphql-tag';
import { createClient } from 'graphql-ws';
import ws from 'ws';
const { ApolloClient, ApolloLink, HttpLink, InMemoryCache, Observable } = pkg;
const expect = (value) => {
return {
toBe: (expected) => {
if (value !== expected) {
throw new Error(`Received: ${value} | Expected: ${expected}`);
}
},
};
};
const createPromiseSignal = () => {
let resolveFunction;
const promise = new Promise((resolve) => {
resolveFunction = resolve;
});
return [resolveFunction, promise];
};
// prettier-ignore
class GraphQLWsLink extends ApolloLink {
constructor(url) {
super();
this.client = createClient({ url, webSocketImpl: ws });
}
request(operation) {
return new Observable((observer) => {
const { query, variables } = operation;
const dispose = this.client.subscribe(
{
query: query.loc?.source.body || '',
variables,
},
{
next: (data) => observer.next(data),
error: (err) => observer.error(err.reason ? new Error(err.reason) : err),
complete: () => observer.complete(),
}
);
return () => {
dispose();
};
});
}
onConnected(callback) {
this.client.on('connected', callback);
}
async close() {
await this.client.dispose();
}
}
dotenv.config();
const httpUri = `http://localhost:${process.env.PORT}/graphql`;
let client1 = null;
let client2 = null;
let subscription = null;
const channelToListen = 'sports';
const channelToSend = 'sports'; // { sports -> ok, tech -> error }
const message = 'Hello World';
try {
const wsLink = new GraphQLWsLink(httpUri.replace(/^http:///, 'ws://'));
client1 = new ApolloClient({
link: wsLink,
cache: new InMemoryCache(),
});
// prettier-ignore
const [wsLinkConnectionResolve, wsLinkConnectionPromise] = createPromiseSignal();
wsLink.onConnected(wsLinkConnectionResolve);
const observer = client1.subscribe({
query: gql`
subscription messageSubscription($channel: String!) {
messageEvent(channel: $channel) {
channel
message
}
}
`,
variables: {
channel: channelToListen,
},
});
subscription = observer.subscribe({
next(response) {
console.log(response.data.messageEvent);
},
});
await wsLinkConnectionPromise;
client2 = new ApolloClient({
link: new HttpLink({
uri: httpUri,
fetch,
}),
cache: new InMemoryCache(),
});
// prettier-ignore
const { data: { sendMessage } } = await client2.mutate({
mutation: gql`
mutation sendMessage($channel: String!, $message: String!) {
sendMessage(channel: $channel, message: $message)
}
`,
variables: {
channel: channelToSend,
message,
},
});
expect(sendMessage).toBe(true);
console.log('All tests passed!');
} finally {
if (subscription) {
subscription.unsubscribe();
}
if (client1) {
client1.stop();
client1 = null;
}
if (client2) {
client2.stop();
client2 = null;
}
}