GraphQL MaxListenersExceededWarning when a subscription doesn’t yield a value for multiple finished client processes

I have a very simple dummy Node.js + GraphQL + Apollo project that you can download from here, or test on StackBlitz, or just inspect the code at the end of this post.

My problem is: When testing a subscription multiple times (2 different processes involved at a time, one for the server and another one for the tester) and that subscription doesn’t yield a value, then the event listener for that subscription remains active even if the process that performed the test finishes. Then I get the error:

(node:20064) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 CHANNEL_MESSAGE listeners added to [EventEmitter]. Use emitter.setMaxListeners() to increase limit
(Use `node --trace-warnings ...` to show where the warning was created)

You can test it with:

Terminal 1: $ yarn; yarn dev
Terminal 2: $ clear; for i in {1..11}; do echo -e "n33[33m### Iteration: $i33[0m"; node src/test.js; done

Then you will get the following:

enter image description here

However, if the subscription always yield a value (in the example above making: const IGNORE_MESSAGE_EVENTS = false), then we don’t get that MaxListenersExceededWarning.

What I need: If the test process finishes gracefully/abruptly/unexpectedly the event listener should be removed automatically from the server so there are no unused resources around.

On the tester I even have the instructions below in the finally (try-catch-finally) but didn’t work:

if (subscription) {
  subscription.unsubscribe();
}
if (client1) {
  client1.stop();
  client1 = null;
}
if (client2) {
  client2.stop();
  client2 = null;
}

however I think the server should not necessary expect that the tester-client finishes the connection properly but just remove listeners that were originated by a connection that doesn’t exist anymore, on this case the one for the tester process.

file: /.env

PORT=4000

file: /package.json

{
  "name": "mirror",
  "private": true,
  "version": "0.0.0",
  "type": "module",
  "scripts": {
    "dev": "node src/index.js"
  },
  "devDependencies": {},
  "dependencies": {
    "@apollo/client": "^3.9.11",
    "@apollo/server": "^4.10.2",
    "@graphql-tools/schema": "^10.0.3",
    "ody-parser": "^1.20.2",
    "cross-fetch": "^4.0.0",
    "dotenv": "^16.4.5",
    "express": "^4.19.2",
    "graphql": "^16.8.1",
    "graphql-subscriptions": "^2.0.0",
    "graphql-tag": "^2.12.6",
    "graphql-ws": "^5.16.0",
    "react": "^18.2.0",
    "react-dom": "^18.2.0",
    "ws": "^8.16.0"
  },
  "peerDependencies": {
    "@types/zen-observable": "^0.8.7"
  }
}

file: /src/index.js

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { makeExecutableSchema } from '@graphql-tools/schema';
import bodyParser from 'body-parser';
import dotenv from 'dotenv';
import express from 'express';
import { PubSub } from 'graphql-subscriptions';
import { gql } from 'graphql-tag';
import { useServer } from 'graphql-ws/lib/use/ws';
import { createServer } from 'http';
import { WebSocketServer } from 'ws';

dotenv.config();

const app = express();
const httpServer = createServer(app);

const pubsub = new PubSub();

const NOTIFICATION_CHANNEL_MESSAGE = 'CHANNEL_MESSAGE';

const typeDefs = gql`
  type MessageEvent {
    channel: String!
    message: String!
  }
  type Query {
    health: String
  }
  type Mutation {
    sendMessage(channel: String!, message: String!): Boolean
  }
  type Subscription {
    messageEvent(channel: String!): MessageEvent
  }
`;

const IGNORE_MESSAGE_EVENTS = true;
const shouldIgnoreMessageEvents = new Promise((resolve) => resolve(IGNORE_MESSAGE_EVENTS));

const resolvers = {
  Query: {
    health: () => 'OK',
  },
  Mutation: {
    sendMessage: async (_, { channel, message }) => {
      await pubsub.publish(NOTIFICATION_CHANNEL_MESSAGE, {
        messageEvent: { channel, message },
      });

      return true;
    },
  },
  Subscription: {
    messageEvent: {
      subscribe: async function* (_, { channel }, context) {
        for await (const { messageEvent } of context.pubsub.asyncIterator([
          NOTIFICATION_CHANNEL_MESSAGE,
        ])) {
          if (messageEvent.channel === channel && !(await shouldIgnoreMessageEvents)) {
            yield messageEvent;
          }
        }
      },
      resolve: (messageEvent) => messageEvent,
    },
  },
};

const schema = makeExecutableSchema({ typeDefs, resolvers });

const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

useServer({ schema, context: { pubsub } }, wsServer);

const server = new ApolloServer({
  schema,
  introspection: true,
  plugins: [ApolloServerPluginDrainHttpServer({ httpServer })],
});

await server.start();

app.use('/graphql', bodyParser.json(), expressMiddleware(server));

const PORT = process.env.PORT;
httpServer.listen(PORT, () => {
  console.log(`Server running on port: ${PORT}`);
});

file: /src/test.js

import pkg from '@apollo/client';
import fetch from 'cross-fetch';
import dotenv from 'dotenv';
import gql from 'graphql-tag';
import { createClient } from 'graphql-ws';
import ws from 'ws';

const { ApolloClient, ApolloLink, HttpLink, InMemoryCache, Observable } = pkg;

const expect = (value) => {
  return {
    toBe: (expected) => {
      if (value !== expected) {
        throw new Error(`Received: ${value} | Expected: ${expected}`);
      }
    },
  };
};

const createPromiseSignal = () => {
  let resolveFunction;
  const promise = new Promise((resolve) => {
    resolveFunction = resolve;
  });

  return [resolveFunction, promise];
};

// prettier-ignore
class GraphQLWsLink extends ApolloLink {
  constructor(url) {
    super();
    this.client = createClient({ url, webSocketImpl: ws });
  }
  request(operation) {
    return new Observable((observer) => {
      const { query, variables } = operation;
      const dispose = this.client.subscribe(
        {
          query: query.loc?.source.body || '',
          variables,
        },
        {
          next: (data) => observer.next(data),
          error: (err) => observer.error(err.reason ? new Error(err.reason) : err),
          complete: () => observer.complete(),
        }
      );

      return () => {
        dispose();
      };
    });
  }
  onConnected(callback) {
    this.client.on('connected', callback);
  }
  async close() {
    await this.client.dispose();
  }
}

dotenv.config();

const httpUri = `http://localhost:${process.env.PORT}/graphql`;

let client1 = null;
let client2 = null;
let subscription = null;

const channelToListen = 'sports';
const channelToSend = 'sports'; // { sports -> ok, tech -> error }
const message = 'Hello World';

try {
  const wsLink = new GraphQLWsLink(httpUri.replace(/^http:///, 'ws://'));

  client1 = new ApolloClient({
    link: wsLink,
    cache: new InMemoryCache(),
  });

  // prettier-ignore
  const [wsLinkConnectionResolve, wsLinkConnectionPromise] = createPromiseSignal();

  wsLink.onConnected(wsLinkConnectionResolve);

  const observer = client1.subscribe({
    query: gql`
      subscription messageSubscription($channel: String!) {
        messageEvent(channel: $channel) {
          channel
          message
        }
      }
    `,
    variables: {
      channel: channelToListen,
    },
  });

  subscription = observer.subscribe({
    next(response) {
      console.log(response.data.messageEvent);
    },
  });

  await wsLinkConnectionPromise;

  client2 = new ApolloClient({
    link: new HttpLink({
      uri: httpUri,
      fetch,
    }),
    cache: new InMemoryCache(),
  });

  // prettier-ignore
  const { data: { sendMessage } } = await client2.mutate({
    mutation: gql`
      mutation sendMessage($channel: String!, $message: String!) {
        sendMessage(channel: $channel, message: $message)
      }
    `,
    variables: {
      channel: channelToSend,
      message,
    },
  });

  expect(sendMessage).toBe(true);

  console.log('All tests passed!');

} finally {
  if (subscription) {
    subscription.unsubscribe();
  }
  if (client1) {
    client1.stop();
    client1 = null;
  }
  if (client2) {
    client2.stop();
    client2 = null;
  }
}

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật