Publish/Subscribe Pattern in Node.js

This tutorial is out of date and no longer maintained.

The author selected Open Source Initiative to receive a donation as part of the Write for DOnations program.

Introduction

The Publish/Subscribe pattern (commonly referred to as Pub/Sub) is one of the most versatile one-way messaging patterns. You can think of one-way messaging as dropping a letter (the message) in a mailbox: you send the letter and that is the end of it. The message travels one way, from you (the sender) to the receiver, and you do not expect a reply.

A publisher is a part of the system that generates data or messages, while a subscriber is a part of the system that registers an interest in receiving specific types of messages or data. The Pub/Sub pattern can be implemented in two ways: with a peer-to-peer architecture, or with a message broker that mediates the communication.

The above image illustrates the peer-to-peer Pub/Sub model, in which a publisher (a node that sends out messages) communicates directly with each of the subscribers (nodes that register interest in receiving messages from the publisher). Messages go straight from the publisher to the subscribers without a mediator, so each subscriber has to know the address or endpoint of the publisher in order to receive messages.

Note: A node, in this instance, typically refers to an active participant in the messaging network, which could be either a service that publishes information or a service that receives information (a subscriber).
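As a rough in-process sketch of the peer-to-peer model, the publisher below keeps its own list of subscribers and delivers each message to them itself. The Publisher class and its method names are hypothetical and only serve to illustrate the direct coupling between the two sides.

```javascript
// Hypothetical publisher that keeps its own list of subscribers.
class Publisher {
  constructor() {
    this.subscribers = new Set();
  }

  // A subscriber has to reach the publisher directly to register.
  subscribe(handler) {
    this.subscribers.add(handler);
    return () => this.subscribers.delete(handler); // unsubscribe function
  }

  // The publisher delivers each message to every subscriber itself.
  publish(message) {
    for (const handler of this.subscribers) handler(message);
  }
}

const publisher = new Publisher();
const inboxA = [];
const inboxB = [];
const unsubscribeA = publisher.subscribe((message) => inboxA.push(message));
publisher.subscribe((message) => inboxB.push(message));

publisher.publish("first");
unsubscribeA(); // subscriber A is no longer interested
publisher.publish("second");

console.log(inboxA, inboxB);
```

Note that every subscriber holds a reference to the publisher, which is the in-process equivalent of knowing its address.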

The image above shows the Pub/Sub model using a message broker. You can think of a message broker as a central hub to which messages are delivered (published) and from which they are sent out (to subscribers), without the publisher and the subscribers having to know about each other. In this model, the publisher node or service sends a message, and the broker serves as the mediator that takes the messages from the publisher and distributes them to the subscribers. The subscriber nodes subscribe to the broker rather than to the publisher directly.

The presence of a broker improves the decoupling between the system’s nodes since both the publisher and subscribers interact only with the broker.
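The decoupling can be sketched with a small in-memory broker. The Broker class, the topic names, and the return value of publish are assumptions made for illustration; a real broker such as Redis runs as a separate process.

```javascript
// Hypothetical in-memory broker: maps a topic name to its subscriber callbacks.
class Broker {
  constructor() {
    this.topics = new Map();
  }

  // Subscribers register with the broker, not with a publisher.
  subscribe(topic, handler) {
    if (!this.topics.has(topic)) this.topics.set(topic, new Set());
    this.topics.get(topic).add(handler);
  }

  // Publishers hand the message to the broker, which fans it out.
  // Returns the number of subscribers that were notified.
  publish(topic, message) {
    const handlers = this.topics.get(topic);
    if (!handlers) return 0;
    for (const handler of handlers) handler(message);
    return handlers.size;
  }
}

const broker = new Broker();
const chatLog = [];
broker.subscribe("chat", (message) => chatLog.push(message));

const delivered = broker.publish("chat", "hi");
const ignored = broker.publish("alerts", "disk full"); // nobody subscribed

console.log(delivered, ignored, chatLog);
```

Neither side holds a reference to the other here; both only know the broker and a topic name.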

Let’s build a real-time chat application to further demonstrate this pattern.

Prerequisite

Step 1 - Chat App - Server Side Implementation

To start our server-side implementation, we will initialize a basic Node.js app using the command:

npm init -y

which creates a default package.json.

Next, we will install the WebSocket (ws) dependency package that will be needed during the entire course of this build:

npm install ws

The server-side implementation will be a basic chat server. We need to follow the steps below:

  1. Set up a server
  2. Read the HTML file to be rendered in the browser
  3. Set up a WebSocket connection

Setting up a Server

Create a file named app.js in your directory and put the code below inside:

const http = require("http");
const server = http.createServer((req, res) => {
  res.end("Hello Chat App");
});

const PORT = 3459;
server.listen(PORT, () => {
  console.log(`Server up and running on port ${PORT}`);
});

The createServer method of the built-in Node.js http module is used to set up a server. We set the PORT on which the server should listen for requests, and call the listen method on the server instance to start listening for incoming requests on that port.

Run the command: node app.js in your terminal and you should have a response like this:

output
Server up and running on port 3459

If you make a request to this port in your browser, you should see something like this as the response:

Read HTML file to be rendered on the browser

Create a file called index.html in the root directory and add the code below:

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
  </head>
  <body>
    <p>Serving HTML file</p>
  </body>
</html>

This is a basic HTML file that renders a single paragraph: Serving HTML file. Now we have to read this file and serve it as the response whenever a request is made to our server.

// app.js
const http = require("http");
const fs = require("fs");
const path = require("path");

const server = http.createServer((req, res) => {
  const htmlFilePath = path.join(__dirname, "index.html");
  fs.readFile(htmlFilePath, (err, data) => {
    if (err) {
      res.writeHead(500);
      res.end("Error occurred while reading file");
      return; // stop here so the response is not written twice
    }
    res.writeHead(200, { "Content-Type": "text/html" });
    res.end(data);
  });
});

Here we use the join function of the built-in path module to concatenate path segments. The readFile function of the built-in fs module then reads the index.html file asynchronously. It takes two arguments here: the path of the file to be read and a callback. If an error occurs, a 500 status code is written to the response header and an error message is sent back to the client. If the file is read successfully, we send a 200 status code in the response header and the content of the file as the response body. If no encoding (such as UTF-8) is specified, the callback receives the raw buffer; otherwise it receives the file content as a string.
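One way to make the two outcomes explicit is to separate the decision from the I/O. The sketch below is only an illustration: describeResponse is a hypothetical helper, not part of Node.js, that maps the (err, data) pair passed to the readFile callback onto exactly one response.

```javascript
// Hypothetical helper: turns the (err, data) pair that fs.readFile passes
// to its callback into a plain description of the HTTP response.
function describeResponse(err, data) {
  if (err) {
    return { status: 500, headers: {}, body: "Error occurred while reading file" };
  }
  return { status: 200, headers: { "Content-Type": "text/html" }, body: data };
}

// How it could be wired into the readFile callback:
// fs.readFile(htmlFilePath, (err, data) => {
//   const { status, headers, body } = describeResponse(err, data);
//   res.writeHead(status, headers);
//   res.end(body);
// });

const failure = describeResponse(new Error("ENOENT"), undefined);
const success = describeResponse(null, Buffer.from("<p>Serving HTML file</p>"));

console.log(failure.status, success.status);
```

Because the helper returns a single description, the handler can only write one status line and one body per request.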

Make a request to the server on your browser and you should have this:

This indicates that our HTML file has been read and served successfully.

Set up a WebSocket connection

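// app.js
const WebSocket = require("ws");

// Attach the WebSocket server to the existing HTTP server.
const webSocketServer = new WebSocket.Server({ server });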

webSocketServer.on("connection", (client) => {
  console.log("successfully connected to the client");

  client.on("message", (streamMessage) => {
    console.log("message", streamMessage);
    distributeClientMessages(streamMessage);
  });
});

const distributeClientMessages = (message) => {
  for (const client of webSocketServer.clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  }
};

In the preceding code, we create a new WebSocket server (webSocketServer) and attach it to our existing HTTP server (server). This allows us to handle both standard HTTP requests and WebSocket connections on the same port (3459).

The connection event is triggered when a WebSocket connection is successfully established. The client in the callback function is a WebSocket connection object that represents the connection to the client. It is used to send and receive messages and to listen to events from the client, such as message.

The distributeClientMessages function sends a received message to all connected clients. It takes a message argument and iterates over the clients connected to our server. It then checks the connection state of each client (readyState === WebSocket.OPEN) to ensure that the server sends messages only to clients that have an open connection. If the client’s connection is open, the server sends the message to that client using the client.send(message) method.
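The readyState filter can be tried without a real server. In the sketch below, the broadcast function and the fake client objects are assumptions made for illustration; only the numeric readyState values (1 for OPEN, 2 for CLOSING, 3 for CLOSED) come from the WebSocket API.

```javascript
const OPEN = 1; // numeric value of WebSocket.OPEN

// Same loop as distributeClientMessages, but the client collection is a
// parameter so it can be exercised without a real server.
function broadcast(clients, message) {
  let sent = 0;
  for (const client of clients) {
    if (client.readyState === OPEN) {
      client.send(message);
      sent += 1;
    }
  }
  return sent; // number of clients the message was sent to
}

// Fake clients (illustration only): just a readyState and a send() method.
const makeClient = (readyState) => ({
  readyState,
  inbox: [],
  send(message) {
    this.inbox.push(message);
  },
});

const open = makeClient(1);
const closing = makeClient(2); // WebSocket.CLOSING
const closed = makeClient(3); // WebSocket.CLOSED

const sentCount = broadcast(new Set([open, closing, closed]), "hi");
console.log(sentCount, open.inbox, closing.inbox, closed.inbox);
```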

Step 2 - Client Side Implementation

For the client side implementation, we will modify our index.html file a little bit.

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
  </head>
  <body>
    <p>Pub/Sub Pattern with Chat Messaging</p>

    <div id="messageContainer"></div>
    <form id="messageForm">
      <input
        type="text"
        id="messageText"
        placeholder="Send a message"
        style="
          padding: 10px;
          margin: 5px;
          border-radius: 5px;
          border: 1px solid #ccc;
          outline: none;
        "
        onfocus="this.style.borderColor='#007bff';"
        onblur="this.style.borderColor='#ccc';"
      />
      <!-- type="submit" so that clicking the button fires the form's submit event -->
      <input
        type="submit"
        value="Send Message"
        style="
          padding: 10px;
          margin: 5px;
          border-radius: 5px;
          background-color: #007bff;
          color: white;
          border: none;
          cursor: pointer;
        "
        onmouseover="this.style.backgroundColor='#0056b3';"
        onmouseout="this.style.backgroundColor='#007bff';"
      />
    </form>

    <script>
      const url = window.location.host;
      const socket = new WebSocket(`ws://${url}`);
    </script>
  </body>
</html>

In this piece of code, we added a form element that has an input and a button for sending messages. WebSocket connections are initiated by clients, so to communicate with the WebSocket-enabled server we set up earlier, we have to create an instance of the WebSocket object with the ws:// URL that identifies the server we want to use. When logged, the url variable holds the host and port where our server is listening (localhost:3459).

console.log("url", url); // localhost:3459
console.log("socket", socket); // { url: "ws://localhost:3459/", readyState: 0, bufferedAmount: 0, onopen: null, onerror: null, onclose: null, extensions: "", protocol: "", onmessage: null, binaryType: "blob" }
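The hard-coded ws:// scheme works for a page served over plain HTTP. As a hedged sketch of one way to also cover pages served over HTTPS, where browsers generally expect wss://, the scheme can be derived from the page's protocol. The buildSocketUrl helper and the chat.example.com host are hypothetical.

```javascript
// Hypothetical helper: derive the WebSocket URL from a location-like object.
// Pages served over https use wss://, pages served over http use ws://.
function buildSocketUrl(location) {
  const scheme = location.protocol === "https:" ? "wss" : "ws";
  return `${scheme}://${location.host}`;
}

// In the browser this would be called as buildSocketUrl(window.location).
const localUrl = buildSocketUrl({ protocol: "http:", host: "localhost:3459" });
const secureUrl = buildSocketUrl({ protocol: "https:", host: "chat.example.com" });

console.log(localUrl, secureUrl);
```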

So, when you make a request to the server in your browser, you should see this:

Let us upgrade our script so that we can send messages from the client to the server and receive messages from the server.

// index.html
    <script>
      const url = window.location.host;
      const socket = new WebSocket(`ws://${url}`);

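      const messageContainer = document.getElementById("messageContainer");
      socket.onmessage = function (eventMessage) {
        const data = eventMessage.data;
        // Binary frames arrive as a Blob, text frames as a plain string.
        const textPromise =
          typeof data === "string" ? Promise.resolve(data) : data.text();
        textPromise.then((text) => {
          const messageContent = document.createElement("p");
          // textContent keeps chat text from being parsed as HTML.
          messageContent.textContent = text;
          messageContainer.appendChild(messageContent);
        });
      };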

      const form = document.getElementById("messageForm");
      form.addEventListener("submit", (event) => {
        event.preventDefault();

        const message = document.getElementById("messageText").value;
        socket.send(message);
        document.getElementById("messageText").value = "";
      });
    </script>

As previously mentioned, we take the host of the page from the client side (browser) and create a new WebSocket instance with it. Then, we listen for the submit event on the form element, which fires when the Send Message button is clicked. The text entered by the user is extracted from the input element, and the send method is called on the socket instance to send the message to the server.

Note: To send a message to the server over the WebSocket connection, call the send() method of the WebSocket object. It expects a single message argument, which can be a string, an ArrayBuffer, a Blob, or a typed array. The method queues the message for transmission and returns immediately, without waiting for the message to reach the server.

The onmessage handler on the socket object is triggered when a message is received from the server, and is used to update the user interface with the incoming message. The eventMessage parameter of the callback carries the data (the message) sent from the server, which arrives here as a Blob. Calling .text() on the Blob returns a promise that resolves to the actual text sent by the server.

Let’s test what we have. Start up the server by running

node app.js

Then, open http://localhost:3459/ in two different browsers and try sending messages between them:

Step 3 - Scaling the Application

Let’s say our application starts growing and we try to scale it by running multiple instances of our chat server. What we want to achieve is that two users connected to two different servers can send text messages to each other. Currently we have only one server, and if we started a second one at, say, http://localhost:3460/, it would not receive the messages sent to the server on port 3459; users connected to 3460 could only chat among themselves. The current implementation works in such a way that when a chat message arrives at a server instance, it is distributed locally, only to the clients connected to that particular server, as we saw when we opened http://localhost:3459/ in two different browsers. Now, let’s see how we can run two different servers and integrate them so they can talk to each other.
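The limitation can be reproduced with a small in-process model. The LocalChatServer class and the client names below are hypothetical stand-ins for two independent server instances; no network is involved.

```javascript
// Hypothetical stand-in for one chat server instance:
// it only knows about its own clients.
class LocalChatServer {
  constructor() {
    this.clients = [];
  }

  connect(name) {
    const client = { name, inbox: [] };
    this.clients.push(client);
    return client;
  }

  // Mirrors distributeClientMessages: delivery is limited to local clients.
  receive(message) {
    for (const client of this.clients) client.inbox.push(message);
  }
}

const server3459 = new LocalChatServer();
const server3460 = new LocalChatServer();
const alice = server3459.connect("alice");
const bob = server3460.connect("bob");

server3459.receive("hello from alice");

// bob is connected to the other instance, so his inbox stays empty.
console.log(alice.inbox, bob.inbox);
```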

Step 4 - Redis as a Message Broker

Redis is a fast and flexible in-memory data structure store. It is often used as a database or as a cache server. Additionally, it can be used to implement a centralized Pub/Sub message exchange pattern. Redis’s speed and flexibility have made it a very popular choice for sharing data in a distributed system.

The aim here is to integrate our chat servers using Redis as a message broker. Each server instance publishes any message received from a client (browser) to the message broker and, at the same time, subscribes to the messages published by all server instances.

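Let’s modify our app.js file. The code below uses the ioredis client, which can be installed with npm install ioredis, and assumes a Redis server is reachable at the default address that new Redis() connects to (localhost, port 6379):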

//app.js

const http = require("http");
const fs = require("fs");
const path = require("path");
const WebSocket = require("ws");
const Redis = require("ioredis");

const redisPublisher = new Redis();
const redisSubscriber = new Redis();

const server = http.createServer((req, res) => {
  const htmlFilePath = path.join(__dirname, "index.html");
  fs.readFile(htmlFilePath, (err, data) => {
    if (err) {
      res.writeHead(500);
      res.end("Error occurred while reading file");
      return; // stop here so the response is not written twice
    }
    res.writeHead(200, { "Content-Type": "text/html" });
    res.end(data);
  });
});

const webSocketServer = new WebSocket.Server({ server });

webSocketServer.on("connection", (client) => {
  console.log("successfully connected to the client");
  client.on("message", (streamMessage) => {
    redisPublisher.publish("chat_messages", streamMessage);
  });
});

// Subscribe to the same channel the publisher writes to.
redisSubscriber.subscribe("chat_messages");

redisSubscriber.on("message", (channel, message) => {
  console.log("redis", channel, message);
  for (const client of webSocketServer.clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  }
});
const PORT = process.argv[2] || 3459;
server.listen(PORT, () => {
  console.log(`Server up and running on port ${PORT}`);
});

Here we are taking advantage of Redis’s publish/subscribe capabilities. Two different connection instances are created, one for publishing messages and the other for subscribing to a channel. Two connections are needed because a Redis connection that has subscribed to a channel enters subscriber mode and can no longer issue regular commands such as PUBLISH. When a message is sent from the client, we publish it to a Redis channel named chat_messages using the publish method on the redisPublisher instance. The subscribe method is called on the redisSubscriber instance to subscribe to the same chat_messages channel. Whenever a message is published to this channel, the redisSubscriber.on("message") event listener is triggered. This event listener iterates over all currently connected WebSocket clients and sends the received message to each client. This ensures that when one user sends a message, all other users connected to any server instance receive that message in real time.
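The listing above reads the port from process.argv[2] and falls back to 3459. As a hedged sketch of a slightly stricter variant, the hypothetical parsePort helper below converts the argument to a number and rejects values outside the valid port range. Note that parseInt is lenient and would accept a value such as "3460abc" as 3460.

```javascript
// Hypothetical helper: read the port from a command-line value,
// falling back to a default when the value is missing or invalid.
function parsePort(value, fallback = 3459) {
  const port = Number.parseInt(value, 10);
  const valid = Number.isInteger(port) && port >= 0 && port <= 65535;
  return valid ? port : fallback;
}

// Usage with the real command line: node app.js 3460
const PORT = parsePort(process.argv[2]);

const fromArgument = parsePort("3460");
const fromMissing = parsePort(undefined);
const fromGarbage = parsePort("abc");
const fromTooLarge = parsePort("70000");

console.log(fromArgument, fromMissing, fromGarbage, fromTooLarge);
```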

Start two different server instances, for example:

node app.js 3459
node app.js 3460

When a chat message is sent on one instance, it is now broadcast across all connected servers and not only to one particular server. You can test this by opening http://localhost:3459/ and http://localhost:3460/, sending chats between them, and seeing that the messages are broadcast across the two servers in real time. You can also monitor the messages published to a channel from redis-cli, and subscribe to the channel to see the messages it receives:

Run the command redis-cli, then enter MONITOR. Go back to your browser and send a chat message. Assuming you sent the text Wow, you should see something like this in your terminal:

To see the messages published to a channel, run redis-cli again and enter SUBSCRIBE channelName, where channelName is chat_messages in our case. If you send the text Great from the browser, you should see something like this in your terminal:

Now, we can have multiple instances of our server running on different ports or even different machines, and as long as they subscribe to the same Redis channel, they can receive and broadcast messages to all connected clients, ensuring users can chat seamlessly across instances.

Remember that we discussed the Pub/Sub pattern implementation using a message broker in the introduction. This example sums it up.

In the figure above, there are two different clients (browsers) connected to chat servers. The chat servers are interconnected, not directly, but through a Redis instance. This means that while they handle client connections independently, they share information (chat messages) through a common medium (Redis). Each chat server connects to Redis, and this connection is used to publish messages to Redis and to subscribe to Redis channels to receive messages. When a user sends a message, the chat server publishes it to the specified channel on Redis. When Redis receives a published message, it broadcasts this message to all subscribed chat servers. Each chat server then relays the message to all of its connected clients, ensuring that every user receives the messages sent by any user, regardless of which server they’re connected to.
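The flow described above can be traced in a single process. In this sketch an EventEmitter stands in for Redis, which is an assumption made purely for illustration; the ChatServer class and client names are hypothetical, and a real deployment would use the ioredis publish and subscribe calls from the app.js listing.

```javascript
const { EventEmitter } = require("node:events");

// In-memory stand-in for Redis (illustration only).
const fakeRedis = new EventEmitter();

class ChatServer {
  constructor(broker) {
    this.broker = broker;
    this.clients = [];
    // Every instance subscribes to the shared channel and relays
    // whatever arrives to its own clients.
    broker.on("chat_messages", (message) => {
      for (const client of this.clients) client.inbox.push(message);
    });
  }

  connect(name) {
    const client = { name, inbox: [] };
    this.clients.push(client);
    return client;
  }

  // A message from a client is published to the channel,
  // not delivered to local clients directly.
  receive(message) {
    this.broker.emit("chat_messages", message);
  }
}

const serverA = new ChatServer(fakeRedis);
const serverB = new ChatServer(fakeRedis);
const alice = serverA.connect("alice");
const bob = serverB.connect("bob");

serverA.receive("Wow");
serverB.receive("Great");

console.log(alice.inbox, bob.inbox);
```

Both users end up with the same two messages even though they are connected to different instances, because each instance receives every message from the shared channel.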

This architecture allows us to horizontally scale our chat application by adding more server instances as needed. Each instance handles its own set of connected clients, while Redis’s Pub/Sub system ensures consistent message distribution across all instances. This setup helps the application handle large numbers of simultaneous users and improves its availability.

Conclusion

In this tutorial, we learned about the Publish/Subscribe pattern by creating a simple chat application that demonstrates it, using Redis as a message broker. A next step is to learn how to implement a peer-to-peer messaging system for cases where a message broker might not be the best solution, for example in complex distributed systems where a single point of failure (the broker) is not an option. You will find the complete source code of this tutorial here on GitHub.
