Publish/Subscribe Pattern in Node.js

The author selected No Kid Hungry to receive a donation as part of the Write for DOnations program.

Introduction

The Pub/Sub pattern is a versatile one-way messaging pattern where a publisher generates data/messages, and a subscriber registers to receive specific types of messages. It can be implemented using a peer-to-peer architecture or a message broker to mediate communication.

Fig-1

The above image illustrates the Peer-to-Peer Pub/Sub model, where a Publisher sends messages directly to Subscribers without a mediator. Subscribers need to know the address or the endpoint of the Publisher to get messages.

Note: A node, in this instance, typically refers to an active participant in the messaging network, which could be either a service that publishes information or a service that receives information (a subscriber).
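As a rough illustration of the peer-to-peer model, the sketch below uses Node.js's built-in EventEmitter. The Publisher class and the topic names are hypothetical and only stand in for real network services; the point is that each subscriber needs a direct reference to the publisher.

```javascript
const { EventEmitter } = require("events");

// Hypothetical publisher: subscribers must hold a direct reference to it,
// which mirrors "knowing the publisher's address" in the peer-to-peer model.
class Publisher extends EventEmitter {
  publish(topic, message) {
    // emit() synchronously calls every listener registered for this topic.
    this.emit(topic, message);
  }
}

const publisher = new Publisher();
const received = [];

// Two subscribers register directly with the publisher for the "news" topic.
publisher.on("news", (message) => received.push(`subscriber A: ${message}`));
publisher.on("news", (message) => received.push(`subscriber B: ${message}`));

publisher.publish("news", "hello");
publisher.publish("sports", "ignored"); // nobody subscribed to this topic

console.log(received);
```

Only the subscribers registered for a topic receive its messages, and there is no intermediary between the two sides.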

Fig-2

In the above image, the Pub/Sub model uses a message broker as a central hub to deliver messages between publishers and subscribers. The broker mediates the message exchange, distributing messages from publishers to subscribers. The subscriber nodes subscribe to the broker rather than the publisher directly.

The presence of a broker improves the decoupling between the system’s nodes since both the publisher and subscribers interact only with the broker.
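To make the decoupling concrete, here is a minimal in-memory sketch of a broker. The Broker class and its subscribe and publish methods are assumptions made for illustration; a real broker such as Redis runs as a separate process and is reached over the network.

```javascript
// Hypothetical in-memory broker: maps each topic name to a set of callbacks.
class Broker {
  constructor() {
    this.topics = new Map();
  }

  // Registers a callback for a topic and returns a function that removes it.
  subscribe(topic, callback) {
    if (!this.topics.has(topic)) {
      this.topics.set(topic, new Set());
    }
    this.topics.get(topic).add(callback);
    return () => this.topics.get(topic).delete(callback);
  }

  // Delivers the message to every subscriber of the topic and returns
  // the number of subscribers that received it.
  publish(topic, message) {
    const subscribers = this.topics.get(topic);
    if (!subscribers) return 0;
    for (const callback of subscribers) {
      callback(message);
    }
    return subscribers.size;
  }
}

const broker = new Broker();
const inbox = [];

// The subscriber only talks to the broker and never sees the publisher.
const unsubscribe = broker.subscribe("chat", (message) => inbox.push(message));

const firstDelivery = broker.publish("chat", "first");
unsubscribe();
const secondDelivery = broker.publish("chat", "second");

console.log(inbox, firstDelivery, secondDelivery);
```

Neither side holds a reference to the other here: publishers and subscribers can be added or removed without changing each other's code.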

In this tutorial, you will build a real-time chat application to further demonstrate this pattern.

Prerequisites

Step 1 - Server Side Implementation

To start the server-side implementation, we will initialize a basic Node.js app using the command:

npm init -y

The above command creates a default package.json file.

A package.json file is a key component in Node.js projects. It serves as a manifest for the project, containing various metadata such as project name, version, dependencies, scripts, and more. When you add dependencies to your project using npm install or yarn add, the package.json file is automatically updated to reflect the newly added dependencies.

Next, we will install the WebSocket (ws) dependency package that will be needed during the entire course of this build:

npm install ws

The server-side implementation will be a basic server-side chat app. We will follow the below workflow:

  1. Set up a server.
  2. Read the HTML file to be rendered in the browser.
  3. Set up a WebSocket connection.

Setting up a Server

Create a file named app.js in your directory and put the code below inside:

const http = require("http");
const server = http.createServer((req, res) => {
  res.end("Hello Chat App");
});

const PORT = 3459;
server.listen(PORT, () => {
  console.log(`Server up and running on port ${PORT}`);
});

Here, the createServer method of Node.js’s built-in http module sets up the server, which responds to every request with the text Hello Chat App. PORT holds the port number the server listens on, and calling the listen method on the server instance starts accepting incoming requests on that port.

Run the command: node app.js in your terminal, and you should have a response like this:

Output
Server up and running on port 3459

If you make a request to this port on your browser, you should have something like this as your response:

Fig-3

Read the HTML file to be rendered on the browser

Create a file called index.html in the root directory and copy the below code:

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
  </head>
  <body>
    <p>Serving HTML file</p>
  </body>
</html>

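This is a basic HTML file that renders the text Serving HTML file. Now, we have to read this file and serve it as the response whenever an HTTP request is made to our server. Update the top of app.js as follows, keeping the server.listen call from before: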

const http = require("http");
const fs = require("fs");
const path = require("path");

const server = http.createServer((req, res) => {
  const htmlFilePath = path.join(__dirname, "index.html");
  fs.readFile(htmlFilePath, (err, data) => {
    if (err) {
      res.writeHead(500);
      res.end("Error occurred while reading file");
      return; // stop here so the success response below is not sent as well
    }
    res.writeHead(200, { "Content-Type": "text/html" });
    res.end(data);
  });
});

Here, we use the join function of the built-in path module to concatenate path segments. Then, the readFile function reads the index.html file asynchronously. It takes the path of the file to be read and a callback; an optional encoding or options argument can be passed between them. If an error occurs while reading the file, a 500 status code is written to the response header, and the error message is sent back to the client.

If the file is read successfully, we write a 200 status code and a text/html content type to the response header and send the content of the file back to the client. Because no encoding such as UTF-8 is passed to readFile, data is a raw Buffer rather than a string. The res.end method accepts either, so the browser still receives the HTML.
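The effect of the encoding argument can be checked with a short sketch. It uses the synchronous readFileSync for brevity (the same encoding rule applies to readFile) and writes a throwaway file to the system's temporary directory; the file name is an assumption made for this example.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Hypothetical temporary file used only for this demonstration.
const tmpFile = path.join(os.tmpdir(), `pubsub-demo-${process.pid}.html`);
fs.writeFileSync(tmpFile, "<p>Serving HTML file</p>");

// Without an encoding, the file content comes back as a raw Buffer.
const raw = fs.readFileSync(tmpFile);

// With an encoding, the content is decoded into a string.
const text = fs.readFileSync(tmpFile, "utf8");

fs.unlinkSync(tmpFile); // clean up the temporary file

console.log(Buffer.isBuffer(raw), typeof text);
```

Both values carry the same bytes, which is why the server can pass the Buffer straight to res.end.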

Make a request to the server on your browser, and you should have this:

Fig-4

Set up a WebSocket connection

Add the following code to app.js, after the HTTP server is created:

const WebSocket = require("ws");
const webSocketServer = new WebSocket.Server({ server });

webSocketServer.on("connection", (client) => {
  console.log("successfully connected to the client");

  client.on("message", (streamMessage) => {
    console.log("message", streamMessage);
    distributeClientMessages(streamMessage);
  });
});

const distributeClientMessages = (message) => {
  for (const client of webSocketServer.clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  }
};

In the preceding code, we create a new WebSocket server, webSocketServer, and attach it to our existing HTTP server. This allows us to handle both standard HTTP requests and WebSocket connections on the same port, 3459.

The connection event is triggered when a WebSocket connection is successfully established. The client argument of the callback is a WebSocket object that represents the connection to that client. It is used to send messages to the client and to listen for events from it, such as message.

The distributeClientMessages function sends each received message to all connected clients. It takes a message argument and iterates over webSocketServer.clients, the set of clients connected to our server. For each client, it checks the connection state (readyState === WebSocket.OPEN) so that the server sends messages only to clients whose connections are open, using the client.send(message) method.
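The broadcast logic can be exercised without a real WebSocket server. The sketch below is an assumption-laden stand-in: broadcast and makeFakeClient are hypothetical helpers, and the fake clients only mimic the readyState property and send method of real connections.

```javascript
// readyState value of an open connection (WebSocket.OPEN is 1).
const OPEN = 1;

// Hypothetical helper: sends `message` to every open client and returns
// the number of clients it was sent to.
function broadcast(clients, message) {
  let delivered = 0;
  for (const client of clients) {
    if (client.readyState === OPEN) {
      client.send(message);
      delivered += 1;
    }
  }
  return delivered;
}

// Stand-in clients that simply record what they were sent.
const makeFakeClient = (readyState) => {
  const sent = [];
  return { readyState, sent, send: (message) => sent.push(message) };
};

const openClient = makeFakeClient(OPEN);
const closedClient = makeFakeClient(3); // 3 is WebSocket.CLOSED

const deliveredCount = broadcast(new Set([openClient, closedClient]), "hi");
console.log(deliveredCount, openClient.sent, closedClient.sent);
```

Skipping clients that are not open matters because calling send on a connection that is closing or closed cannot deliver the message.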

Step 2 - Client Side Implementation

For the client-side implementation, we will modify our index.html file a little bit.

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
  </head>
  <body>
    <p>Pub/Sub Pattern with Chat Messaging</p>

    <div id="messageContainer"></div>
    <form id="messageForm">
      <input
        type="text"
        id="messageText"
        placeholder="Send a message"
        style="
          padding: 10px;
          margin: 5px;
          border-radius: 5px;
          border: 1px solid #ccc;
          outline: none;
        "
        onfocus="this.style.borderColor='#007bff';"
        onblur="this.style.borderColor='#ccc';"
      />
      <input
        type="submit"
        value="Send Message"
        style="
          padding: 10px;
          margin: 5px;
          border-radius: 5px;
          background-color: #007bff;
          color: white;
          border: none;
          cursor: pointer;
        "
        onmouseover="this.style.backgroundColor='#0056b3';"
        onmouseout="this.style.backgroundColor='#007bff';"
      />
    </form>

    <script>
      const url = window.location.host;
      const socket = new WebSocket(`ws://${url}`);
      console.log("url", url); // localhost:3459
      console.log("socket", socket); // { url: "ws://localhost:3459/", readyState: 0, bufferedAmount: 0, onopen: null, onerror: null, onclose: null, extensions: "", protocol: "", onmessage: null, binaryType: "blob" }
    </script>
  </body>
</html>

In this piece of code, we added a form element with a text input and a button for sending messages. WebSocket connections are initiated by clients, so to communicate with the WebSocket-enabled server we set up earlier, we create an instance of the WebSocket object and pass it the ws:// URL that identifies the server. When logged, the url variable holds the host and port our server is listening on (localhost:3459), and the socket variable holds the WebSocket object.
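The ws:// scheme works for a page served over plain HTTP, as in this tutorial. If the page were ever served over HTTPS, browsers would generally expect the secure wss:// scheme instead. A minimal sketch of choosing the scheme from the page's location follows; buildSocketUrl is a hypothetical helper, and chat.example.com is a placeholder host.

```javascript
// Hypothetical helper: picks wss:// for pages served over HTTPS and
// ws:// otherwise, then appends the host (which includes the port).
function buildSocketUrl(location) {
  const scheme = location.protocol === "https:" ? "wss" : "ws";
  return `${scheme}://${location.host}`;
}

// In the browser you would pass window.location. Plain objects are used
// here so the sketch also runs under Node.js.
const localUrl = buildSocketUrl({ protocol: "http:", host: "localhost:3459" });
const secureUrl = buildSocketUrl({ protocol: "https:", host: "chat.example.com" });

console.log(localUrl);
console.log(secureUrl);
```

In the page script, this would be used as new WebSocket(buildSocketUrl(window.location)).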

So, when you make a request to the server in your browser, you should see this:

Fig-5

Let’s upgrade our script so that we can send messages from the client to the server and receive messages from the server.

<script>
  const url = window.location.host;
  const socket = new WebSocket(`ws://${url}`);
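  const messageContainer = document.getElementById("messageContainer");

  socket.onmessage = async function (eventMessage) {
    // Binary frames arrive as a Blob, text frames as a plain string.
    const data = eventMessage.data;
    const text = typeof data === "string" ? data : await data.text();
    const messageContent = document.createElement("p");
    // textContent keeps user-supplied text from being parsed as HTML.
    messageContent.textContent = text;
    messageContainer.appendChild(messageContent);
  };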

  const form = document.getElementById("messageForm");
  form.addEventListener("submit", (event) => {
    event.preventDefault();
    const message = document.getElementById("messageText").value;
    socket.send(message);
    document.getElementById("messageText").value = "";
  });
</script>

As before, we read the host from the page’s location in the browser and create a new WebSocket instance with it. Then, we register a listener for the form’s submit event, which fires when the user sends the message. The listener prevents the default page reload, reads the text entered in the input element, calls the send method on the socket instance to send the message to the server, and clears the input.

Note: To send a message to the server over the WebSocket connection, call the send() method of the WebSocket object. It expects a single message argument, which can be an ArrayBuffer, Blob, string, or typed array. The method queues the message for transmission and returns immediately, before the message has actually been sent to the server.

The onmessage handler set on the socket object is called whenever a message is received from the server. We use it to add the incoming message to the user interface. The eventMessage parameter of the callback carries the message sent from the server in its data property. Because the server forwards the message as binary data, it arrives in the browser as a Blob. The Blob’s text() method returns a promise that resolves to the actual message text.
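One possible alternative is to convert text messages to strings on the server before broadcasting them, so the browser receives a plain string in the data property. The sketch below assumes version 8 or later of the ws package, where the message handler receives the payload together with an isBinary flag; normalizeMessage is a hypothetical helper and not part of ws.

```javascript
// Hypothetical helper: turns a received payload into a string when the
// sender transmitted text, and leaves real binary payloads untouched.
function normalizeMessage(data, isBinary) {
  if (isBinary) return data;
  return Buffer.isBuffer(data) ? data.toString("utf8") : String(data);
}

// A text frame is delivered to the handler as a Buffer with isBinary false.
const fromTextFrame = normalizeMessage(Buffer.from("hello"), false);

// A binary frame keeps its Buffer payload.
const fromBinaryFrame = normalizeMessage(Buffer.from([1, 2, 3]), true);

console.log(typeof fromTextFrame, Buffer.isBuffer(fromBinaryFrame));
```

Under that assumption, the server-side handler could be written as client.on("message", (data, isBinary) => distributeClientMessages(normalizeMessage(data, isBinary))), and the client would then read the string from the data property directly instead of calling text().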

Let’s test what we have. Start up the server by running:

node app.js

Then, open http://localhost:3459/ in two different browser tabs and try sending messages between them:

Fig-6

Step 3 - Scaling the Application

Let’s say our application starts growing, and we try to scale it by running multiple instances of our chat server. What we want to achieve is that two users connected to two different servers can send text messages to each other.

Currently, when a chat message is sent to a server instance, it is distributed locally, only to the clients connected to that particular instance, as we saw when we opened http://localhost:3459/ in two browser tabs. If we started a second instance on another port, say http://localhost:3460/, its users would not receive the messages sent to the server on port 3459; users connected to 3460 could only chat among themselves.

Now, let’s see how we can integrate two different servers so they can talk to each other.

Step 4 - Redis as a Message Broker

Redis is a fast and flexible in-memory data structure store. It is often used as a database or a cache server to cache data. Additionally, it can be used to implement a centralized Pub/Sub message exchange pattern. Redis’s speed and flexibility have made it a very popular choice for sharing data in a distributed system.

The aim here is to integrate our chat servers using Redis as a message broker. Each server instance publishes every message it receives from a client (browser) to the broker. Each instance also subscribes to the broker, so it receives the messages published by all instances, including its own.

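Install the ioredis client with npm install ioredis, and make sure a Redis server is running locally; new Redis() without arguments connects to 127.0.0.1 on port 6379. Then, let’s modify our app.js file: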

//app.js
const http = require("http");
const fs = require("fs");
const path = require("path");
const WebSocket = require("ws");
const Redis = require("ioredis");

const redisPublisher = new Redis();
const redisSubscriber = new Redis();

const server = http.createServer((req, res) => {
  const htmlFilePath = path.join(__dirname, "index.html");
  fs.readFile(htmlFilePath, (err, data) => {
    if (err) {
      res.writeHead(500);
      res.end("Error occurred while reading file");
      return; // stop here so the success response below is not sent as well
    }
    res.writeHead(200, { "Content-Type": "text/html" });
    res.end(data);
  });
});

const webSocketServer = new WebSocket.Server({ server });

webSocketServer.on("connection", (client) => {
  console.log("successfully connected to the client");
  client.on("message", (streamMessage) => {
    redisPublisher.publish("chat_messages", streamMessage);
  });
});

// Subscribe this server to the channel that every instance publishes to.
redisSubscriber.subscribe("chat_messages");

redisSubscriber.on("message", (channel, message) => {
  console.log("redis", channel, message);
  for (const client of webSocketServer.clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  }
});
const PORT = process.argv[2] || 3459;
server.listen(PORT, () => {
  console.log(`Server up and running on port ${PORT}`);
});

Here, we are taking advantage of Redis’s publish/subscribe capabilities. Two connection instances are created, one for publishing messages and the other for subscribing to a channel. Two are needed because a connection that has subscribed to a channel is in subscriber mode and cannot issue regular commands such as PUBLISH. When a message is sent from the client, we publish it to a Redis channel named chat_messages using the publish method on the redisPublisher instance. The subscribe method is called on the redisSubscriber instance to subscribe to the same chat_messages channel. Whenever a message is published to this channel, the message event listener registered with redisSubscriber.on is triggered. This listener iterates over all currently connected WebSocket clients and sends the received message to each of them. This ensures that when one user sends a message, all users connected to any server instance receive it in real time.

Start two server instances on different ports, each in its own terminal:

node app.js 3459
node app.js 3460

With both instances running, a chat message sent on one instance is now broadcast across all connected servers rather than only to the clients of one particular server. You can test this by opening http://localhost:3459/ and http://localhost:3460/ in your browser, then sending chats between them and seeing that the messages appear on both in real time.

You can monitor the messages published to a channel from the redis-cli, and also subscribe to the channel to receive the published messages.

Run the command redis-cli, then enter MONITOR. Go back to your browser and start a chat. In your terminal, you should see something like this, assuming you send a chat text of Wow:

Fig-8

To see the messages published to a channel, run redis-cli again and enter SUBSCRIBE channelName, where channelName is chat_messages in our case. You should see something like this in your terminal if you send the text Great from the browser:

Fig-9

Now, we can have multiple instances of our server running on different ports or even different machines, and as long as they subscribe to the same Redis channel, they can receive and broadcast messages to all connected clients, ensuring users can chat seamlessly across instances.

Remember that we discussed the Pub/Sub pattern implementation using a message broker in the introduction. This example is a direct application of it.

Fig-10

In the figure above, there are two different clients connected to chat servers. The chat servers are interconnected, not directly, but through a Redis instance. This means that while they handle client connections independently, they share information (chat messages) through a common medium (Redis). Each chat server connects to Redis and uses that connection both to publish messages and to subscribe to Redis channels to receive messages. When a user sends a message, the chat server publishes it to the specified channel on Redis.

When Redis receives a published message, it broadcasts this message to all subscribed chat servers. Each chat server then relays the message to all connected clients, ensuring that every user receives the messages sent by any user, regardless of which server they’re connected to.
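The message flow described above can be simulated in a single process. In the sketch below, an EventEmitter stands in for the Redis channel, and ChatServer, connect, and receive are hypothetical names; a real deployment uses separate processes that talk to Redis over the network.

```javascript
const { EventEmitter } = require("events");

// Stand-in for the Redis channel: every chat server shares this emitter.
// This is an in-process assumption for illustration only.
const channel = new EventEmitter();

class ChatServer {
  constructor(name) {
    this.name = name;
    this.clients = [];
    // Subscribe: relay every published message to this server's own clients.
    channel.on("chat_messages", (message) => {
      for (const client of this.clients) {
        client.inbox.push(message);
      }
    });
  }

  // Registers a client with this server and returns it.
  connect(clientName) {
    const client = { name: clientName, inbox: [] };
    this.clients.push(client);
    return client;
  }

  // Called when a connected client sends a message: publish it to the
  // shared channel instead of broadcasting it locally.
  receive(message) {
    channel.emit("chat_messages", message);
  }
}

const serverA = new ChatServer("3459");
const serverB = new ChatServer("3460");
const alice = serverA.connect("alice");
const bob = serverB.connect("bob");

serverA.receive("Wow");
console.log(alice.inbox, bob.inbox);
```

Both clients end up with the message even though it was sent through only one server, because each server relays whatever arrives on the shared channel.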

This architecture allows us to horizontally scale our chat application by adding more server instances as needed. Each instance handles its own set of connected clients, while Redis’s publish/subscribe capabilities distribute every message to all instances. This setup helps the application handle large numbers of simultaneous users. Keep in mind that all instances now depend on Redis, so the availability of the chat application depends on the availability of the Redis instance.

Conclusion

In this tutorial, we learned about the Publish/Subscribe pattern by building a simple chat application that uses Redis as a message broker. A good next step is to learn how to implement a peer-to-peer messaging system for cases where a message broker might not be the best solution, for example, in complex distributed systems where a single point of failure (the broker) is not an option.

You will find the complete source code of this tutorial here on GitHub.
