Deploy Coreflux MQTT Broker with Managed Databases

Published on January 30, 2026
Introduction

MQTT brokers connect IoT devices and applications through a publish-subscribe messaging pattern, making them essential for modern IoT infrastructure. Coreflux is a low-code MQTT broker that adds real-time data processing and transformation capabilities, allowing you to integrate directly with DigitalOcean managed databases including MongoDB, PostgreSQL, MySQL, and OpenSearch without writing custom integration code.

What you’ll learn: This tutorial walks you through deploying a complete IoT data pipeline—from setting up a managed database cluster and Coreflux MQTT broker on DigitalOcean, to configuring secure VPC networking, building data transformation models using Coreflux’s Language of Things (LoT), and automatically storing processed IoT data in your chosen database. You’ll end up with a production-ready setup that handles real-time messaging and persistent storage for IoT applications.

Key Takeaways

Before diving into the step-by-step deployment process, here are the key points you’ll learn:

  • Deploy a managed database cluster (PostgreSQL, MongoDB, MySQL, or OpenSearch) on DigitalOcean for scalable IoT data storage.
  • Set up Coreflux MQTT broker on a DigitalOcean Droplet using the Marketplace image or Docker.
  • Create secure VPC networking to connect your MQTT broker and database without public exposure.
  • Build real-time data pipelines using Coreflux’s Language of Things (LoT) for low-code IoT automation.
  • Transform and store IoT data automatically from MQTT topics to database tables, collections, or indexes.
  • Verify end-to-end data flow from simulated sensors through transformation models to database storage.

This tutorial provides a production-ready foundation for IoT applications that need real-time messaging combined with persistent data storage and advanced capabilities like search, analytics, or relational queries.

What You Will Build

By the end of this automation guide, you will have deployed:

  • A managed database cluster (PostgreSQL, MongoDB, MySQL, or OpenSearch) for scalable storage
  • A DigitalOcean Droplet running the Coreflux MQTT broker
  • A Virtual Private Cloud (VPC) network for secure IoT communication
  • Real-time data simulation using the LoT Notebook extension
  • Low-code data transformation models and database integration routes
  • A complete data integration and transformation pipeline for IoT automation

Coreflux & DigitalOcean Partnership

Coreflux provides a lightweight MQTT broker and data pipeline tools, programmed through the Language of Things (LoT), for efficient IoT communication on DigitalOcean.

What is MQTT?

MQTT (Message Queuing Telemetry Transport) is a lightweight, publish-subscribe network protocol widely adopted in IoT ecosystems. Designed for constrained devices and low-bandwidth, high-latency, or unreliable networks, MQTT enables efficient, real-time messaging in bandwidth-constrained environments.
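
To make the publish-subscribe pattern concrete, here is a minimal in-memory sketch. It is not an MQTT implementation and involves no networking; the TinyBroker class and its methods are hypothetical names used only for illustration, and the topic names are borrowed from later in this tutorial.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str, str], None]


class TinyBroker:
    """In-memory stand-in for a broker: routes messages by exact topic name."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        """Register a handler to be called for every message on topic."""
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: str) -> int:
        """Deliver payload to every subscriber of topic; return the delivery count."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(topic, payload)
        return len(handlers)


received = []
broker = TinyBroker()
broker.subscribe("raw_data/machine1", lambda topic, payload: received.append((topic, payload)))
broker.publish("raw_data/machine1", "7")    # delivered to the one subscriber
broker.publish("raw_data/station2", "42")   # nobody subscribed, so nothing is delivered
print(received)
```

The publisher never references the subscriber directly; both only know the topic name, which is what lets devices and applications be added or removed independently.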

About Coreflux

Coreflux offers a lightweight MQTT broker for efficient, real-time communication between IoT devices and applications, with real-time data transformation that you can tailor to each use case. Built for scalability and reliability, Coreflux is designed for environments where low latency and high throughput are critical.

Coreflux handles message routing and data flow between devices, whether you’re building a small IoT project or deploying an industrial monitoring system.

With Coreflux on DigitalOcean, you get:

Data Processing: Process your data centrally, in real time, in the same place it is stored.

Data Integration: Easily integrate with other DigitalOcean services like Managed Databases (PostgreSQL, MongoDB, MySQL, or OpenSearch), ensuring a single and simple ecosystem for all your data needs.

Scalability: Easily handle growing amounts of data and devices without compromising performance.

Reliability: Ensure consistent and dependable messaging across all connected devices.

Coreflux MQTT and managed databases architecture overview

Prerequisites

Before you begin this MQTT broker deployment tutorial, you’ll need:

Estimated time: This tutorial takes approximately 30-45 minutes to complete, depending on database provisioning time (typically 1-5 minutes per database cluster).

Step 1 — Creating the Network Infrastructure for IoT Automation

Creating a VPC Network for Secure MQTT Communication

First, you’ll create a Virtual Private Cloud (VPC) to ensure secure communication between your IoT services and MQTT broker, without the need for public access.

  1. Log in to your DigitalOcean control panel
  2. Navigate to Networking → VPC from the left sidebar
  3. Click Create VPC Network

DigitalOcean VPC creation screen

  1. Configure your VPC for IoT automation:

    1. Name: coreflux-integrations-vpc (or your VPC name)
    2. Datacenter region: Choose Frankfurt (or your preferred region)
    3. IP Range: Use the default or configure as needed
    4. Description: Add a meaningful description for your MQTT broker and Databases network
  2. Click Create VPC Network

The VPC will provide isolated networking for all your IoT resources, ensuring secure communication between the Coreflux MQTT broker and managed databases. For more details on VPC configuration, see our guide on creating VPC networks.
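
Once the Droplet and database exist, a quick way to confirm that an address is a private VPC address is to test it against the VPC's IP range. The sketch below uses Python's standard ipaddress module. The range 10.114.0.0/20 and the sample addresses are assumptions for illustration; substitute the IP range shown on your own VPC's page.

```python
import ipaddress

# Hypothetical range: replace with the IP range shown for your VPC.
VPC_RANGE = ipaddress.ip_network("10.114.0.0/20")


def in_vpc(address: str, network=VPC_RANGE) -> bool:
    """Return True when address falls inside the VPC's private range."""
    return ipaddress.ip_address(address) in network


print(in_vpc("10.114.0.5"))     # an address inside the example range
print(in_vpc("203.0.113.10"))   # a public (documentation) address outside it
```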

Step 2 — Setting Up Managed Database for Scalable Storage

Choose one of the following database options based on your IoT application requirements:

  • PostgreSQL: Ideal for structured data requiring relational queries, ACID compliance, and complex relationships
  • MySQL: Great for structured workloads and transactional queries with strong consistency and widespread tooling support
  • MongoDB: Perfect for flexible document storage with varying schemas and rapid development
  • OpenSearch: Excellent for advanced search, analytics, log analysis, and time-series data visualization

Setting Up PostgreSQL Managed Database

Managed PostgreSQL on DigitalOcean is a good fit when your IoT workloads need relational schemas, strong consistency, and advanced SQL analytics, backed by automated backups, monitoring, and maintenance.

DigitalOcean managed PostgreSQL cluster setup

  1. From the DigitalOcean control panel, navigate to Databases

  2. Click Create Database Cluster

  3. Configure your PostgreSQL cluster for IoT automation:

    • Database engine: Select PostgreSQL
    • Version: Choose the latest stable version
    • Datacenter region: Select Frankfurt (same as your VPC)
    • VPC Network: Select the coreflux-integrations-vpc you created
    • Database cluster name: postgresql-coreflux-test
    • Project: Select your target project
  4. Choose your plan based on your IoT requirements:

    1. For development: Basic plan with 1 GB RAM
    2. For production: General Purpose or higher for scalable storage
  5. Click Create Database Cluster

The managed database creation process typically takes 1-5 minutes. Once complete, you’ll be redirected to the database overview page, where you can see the connection details and perform administrative actions.

Configuring PostgreSQL Database Access for MQTT Broker Integration

You’ll be prompted with Getting Started steps, where your connection details are shown and you can configure the inbound access rules (recommended to limit to your IP and VPC-only).

  1. Click Get Started to configure your PostgreSQL database
  2. [Optional] Restrict inbound connections:
    • Add your local computer’s IP for management access
    • The droplet will be automatically allowed through VPC networking

PostgreSQL inbound access and VPC rules

For connection details, you’ll be able to see two options - Public Network and VPC Network. The first is for external access for tools like DBeaver, while the second will be used by the Coreflux service to access the database.

PostgreSQL public and VPC connection details

  1. Note the connection details provided, both for public access and for VPC access (distinct details for each):
    • Host: Your database hostname
    • User: Default admin user
    • Password: Auto-generated secure password
    • Database: Authentication database name

Testing PostgreSQL Database Connection

You can test the PostgreSQL connection using DBeaver with the provided connection parameters, using public access credentials:

Testing PostgreSQL connection in DBeaver

Creating PostgreSQL Application Database and User (Optional)

For better security and organization, create a dedicated user and database for your IoT automation application. This can also be done through DBeaver or CLI, but DigitalOcean provides a user-friendly approach:

  1. Go to Users & Databases tab in your managed database cluster
  2. Create User:
    • Username: coreflux-broker-client
    • Password: Autogenerated
  3. Create Database:
    • Database name: coreflux-broker-data

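Note: You may need to adjust the user’s permissions before it can create tables and insert or select data. Because the database and user names contain hyphens, they must be quoted. In PostgreSQL, CREATE is granted on a database or schema, while INSERT and SELECT are table-level privileges, so they cannot be combined in one GRANT ... ON DATABASE statement. Use GRANT CONNECT, CREATE ON DATABASE "coreflux-broker-data" TO "coreflux-broker-client"; and then, while connected to that database, GRANT USAGE, CREATE ON SCHEMA public TO "coreflux-broker-client";. The user owns the tables it creates, so it can already insert into and select from them. For MySQL, use GRANT CREATE, INSERT, SELECT ON `coreflux-broker-data`.* TO 'coreflux-broker-client'@'%';. See our PostgreSQL quickstart and MySQL documentation for more details.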

Setting Up MySQL Managed Database

Managed MySQL on DigitalOcean is ideal for structured, transactional IoT data where you want familiar SQL, broad ecosystem support, and a fully managed service handling backups, updates, and monitoring.

DigitalOcean managed MySQL cluster setup

  1. From the DigitalOcean control panel, navigate to Databases

  2. Click Create Database Cluster

  3. Configure your MySQL cluster for IoT automation:

    • Database engine: Select MySQL
    • Version: Choose the latest stable version
    • Datacenter region: Select Frankfurt (same as your VPC)
    • VPC Network: Select the coreflux-integrations-vpc you created
    • Database cluster name: mysql-coreflux-test
    • Project: Select your target project
  4. Choose your plan based on your IoT requirements:

    • For development: Basic plan with 1 GB RAM
    • For production: General Purpose or higher for scalable storage
  5. Click Create Database Cluster

The managed database creation process typically takes 1-5 minutes. Once complete, you’ll be redirected to the database overview page, where you can see the connection details and perform administrative actions.

Configuring MySQL Database Access for MQTT Broker Integration

You’ll be prompted with Getting Started steps, where your connection details are shown and you can configure the inbound access rules (recommended to limit to your IP and VPC-only).

  1. Click Get Started to configure your MySQL database
  2. [Optional] Restrict inbound connections:
    • Add your local computer’s IP for management access
    • The droplet will be automatically allowed through VPC networking

MySQL inbound access and VPC rules

For connection details, you’ll be able to see two options - Public Network and VPC Network. The first is for external access for tools like DBeaver, while the second will be used by the Coreflux service to access the database.

MySQL public and VPC connection details

  1. Note the connection details provided, both for public access and for VPC access (distinct details for each):
    • Host: Your database hostname
    • User: Default admin user
    • Password: Auto-generated secure password
    • Database: Authentication database name

Testing MySQL Database Connection

You can test the MySQL connection using DBeaver with the provided connection parameters, using public access credentials.

Note: You may need to change DBeaver’s driver settings — set allowPublicKeyRetrieval = true.

Testing MySQL connection in DBeaver

Creating MySQL Application Database and User (Optional)

For better security and organization, create a dedicated user and database for your IoT automation application. This can also be done through DBeaver or CLI, but DigitalOcean provides a user-friendly approach:

  1. Go to Users & Databases tab in your managed database cluster
  2. Create User:
    • Username: coreflux-broker-client
    • Password: Autogenerated
  3. Create Database:
    • Database name: coreflux-broker-data

Setting Up MongoDB Managed Database

Managed MongoDB on DigitalOcean is well-suited to flexible or evolving IoT payloads, letting you store heterogeneous sensor documents without rigid schemas, while the platform handles replication, backups, and monitoring.

Creating a Managed MongoDB Cluster

  1. From the DigitalOcean control panel, navigate to Databases

  2. Click Create Database Cluster

  3. Configure your MongoDB cluster for IoT automation:

    • Database engine: Select MongoDB
    • Version: Choose the latest stable version
    • Datacenter region: Select Frankfurt (same as your VPC)
    • VPC Network: Select the coreflux-integrations-vpc you created
    • Database cluster name: mongodb-coreflux-test
    • Project: Select your target project
  4. Choose your plan based on your IoT requirements:

    • For development: Basic plan with 1 GB RAM
    • For production: General Purpose or higher for scalable storage
  5. Click Create Database Cluster

The managed database creation process typically takes 1-5 minutes. Once complete, you’ll be redirected to the database overview page, where you can see the connection details and perform administrative actions.

Configuring MongoDB Database Access for MQTT Broker Integration

You’ll be prompted with Getting Started steps, where your connection details are shown and you can configure the inbound access rules (recommended to limit to your IP and VPC-only).

  1. Click Get Started to configure your MongoDB database
  2. [Optional] Restrict inbound connections:
    • Add your local computer’s IP for management access
    • The droplet will be automatically allowed through VPC networking

Configuring Database Access for MQTT Broker Integration

For connection details, you’ll be able to see two options: Public Network and VPC Network. The first is for external access for tools like MongoDB Compass, while the second will be used by the Coreflux service to access the database.

Connection Details for MongoDB

  1. Note the connection details provided, both for public access and for VPC access (distinct details for each):
    • Host: Your database hostname
    • User: Default admin user
    • Password: Auto-generated secure password
    • Database: Authentication database name

Testing MongoDB Database Connection

You can test the MongoDB connection using MongoDB Compass or the provided connection string, using public access credentials:

mongodb://username:password@mongodb-host:27017/defaultauthdb?ssl=true
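
Auto-generated passwords can contain characters such as @, : or / that would otherwise be read as URI delimiters. The following sketch assembles a connection string in the same shape as the one above while percent-encoding the credentials. The function name build_mongo_uri and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus


def build_mongo_uri(username: str, password: str, host: str,
                    database: str, port: int = 27017) -> str:
    """Assemble a MongoDB connection string with percent-encoded credentials."""
    return (
        f"mongodb://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?ssl=true"
    )


# Sample values only; use the details from your cluster's connection page.
uri = build_mongo_uri("doadmin", "p@ss:w/rd", "mongodb-host", "defaultauthdb")
print(uri)
```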

Testing Database Connection

Creating MongoDB Application Database and User (Optional)

For better security and organization, create a dedicated user and database for your IoT automation application. This can also be done through MongoDB Compass or CLI, but DigitalOcean provides a user-friendly approach:

  1. Go to Users & Databases tab in your managed database cluster
  2. Create User:
    • Username: coreflux-broker-client
    • Password: Autogenerated
  3. Create Database:
    • Database name: coreflux-broker-data

Setting Up OpenSearch Managed Database

Managed OpenSearch on DigitalOcean is designed for search, log analytics, and time-series dashboards over high-volume IoT data, with the service managing cluster health, scaling, and index storage for you.

Create a Managed OpenSearch Cluster

  1. From the DigitalOcean control panel, navigate to Databases

  2. Click Create Database Cluster

  3. Configure your OpenSearch cluster for IoT automation:

    • Database engine: Select OpenSearch
    • Version: Choose the latest stable version
    • Datacenter region: Select Frankfurt (same as your VPC)
    • VPC Network: Select the coreflux-integrations-vpc you created
    • Database cluster name: opensearch-coreflux-test
    • Project: Select your target project
  4. Choose your plan based on your IoT requirements:

    1. For development: Basic plan with 1 GB RAM
    2. For production: General Purpose or higher for scalable storage
  5. Click Create Database Cluster

The managed database creation process typically takes 1-5 minutes. Once complete, you’ll be redirected to the database overview page, where you can see the connection details and perform administrative actions.

Configuring OpenSearch Database Access for MQTT Broker Integration

You’ll be prompted with Getting Started steps, where your connection details are shown and you can configure the inbound access rules (recommended to limit to your IP and VPC-only).

  1. Click Get Started to configure your OpenSearch database
  2. [Optional] Restrict inbound connections:
    • Add your local computer’s IP for management access
    • The droplet will be automatically allowed through VPC networking

Configure Database Access

For connection details, you’ll be able to see two options: Public Network and VPC Network. The first is for external access for tools, while the second will be used by the Coreflux service to access the database. You’ll also see the URL and parameters to access the OpenSearch Dashboards.

Connection Details

  1. Note the connection details provided, both for public access and for VPC access (distinct details for each):
    • Host: Your database hostname
    • User: Default admin user
    • Password: Auto-generated secure password
    • Database: Authentication database name

Testing OpenSearch Database Connection

You can test the OpenSearch connection using the OpenSearch Dashboards with the provided credentials:

Test Database Connection

Step 3 — Deploying the Coreflux MQTT Broker on DigitalOcean Droplet

Creating the DigitalOcean Droplet

  1. Navigate to Droplets in your DigitalOcean control panel
  2. Click Create Droplet

Creating a new DigitalOcean Droplet

  1. Configure your droplet for MQTT broker deployment:

    • Choose Region: Frankfurt (same as your managed database)
    • VPC Network: Select coreflux-integrations-vpc
    • Choose an image: Go to Marketplace tab
    • Search for “Coreflux” and select Coreflux from the Marketplace

Selecting Coreflux from the Marketplace

  1. Choose Size for your IoT workload:

    • For development: Basic plan with 2 GB memory
    • For production: Basic or General Purpose plan with 4+ GB memory for scalable performance
  2. Choose Authentication Method:

    • SSH Key: Recommended for improved security
      1. A key can be created locally using ssh-keygen
    • Password: Alternative option
  3. Finalize Details:

    • Hostname: coreflux-test-broker
    • Project: Select your project
    • Tags: Add relevant tags for DevOps organization
  4. Click Create Droplet

  5. See the Droplet Home Page and wait for it to finish deploying

Droplet deployment in progress

Alternative - Installing Coreflux MQTT Broker with Docker on Docker Image Droplet

Using the same approach as for Coreflux Droplet, select Docker as the Marketplace image.

Once your droplet is running, connect to it via SSH with the defined authentication method or the web console available in the Droplet home page:

ssh root@your-droplet-ip

SSH connection to the Coreflux droplet

Run the Coreflux MQTT broker using Docker:

docker run -d \
  --name coreflux \
  -p 1883:1883 \
  -p 1884:1884 \
  -p 5000:5000 \
  -p 443:443 \
  coreflux/coreflux-mqtt-broker-t:1.6.3

This Docker command:

  • Runs the container in detached mode (-d)
  • Names the container coreflux
  • Publishes the ports for MQTT (1883), MQTT with TLS (1884), WebSocket (5000), and WebSocket with TLS (443)
  • Uses the Coreflux image pinned to the 1.6.3 tag

Verify the MQTT broker is running:

docker ps

You should see a container running:

Running Coreflux container in Docker

Validating the MQTT Broker deployment by connecting to it with default values

Regardless of how you deployed the broker, you can validate access to it with an MQTT client such as MQTT Explorer.

MQTT Explorer connected to Coreflux broker
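
If you prefer a scripted check before opening a client, the sketch below tests whether the broker's ports accept TCP connections. It only confirms that a port is reachable, not that MQTT authentication works. The function names are hypothetical, and the port list mirrors the Docker command above.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


def check_broker(host: str, ports=(1883, 1884, 5000, 443)) -> dict:
    """Map each broker port to whether it currently accepts connections."""
    return {port: port_is_open(host, port) for port in ports}
```

Calling check_broker with your Droplet's IP address returns a dictionary such as one entry per port with True or False, which makes it easy to spot a port blocked by a firewall rule.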

Step 4 — Configuring Firewall Rules for Secure IoT Communication (Optional)

For production IoT automation deployments, configure firewall rules to restrict access:

  1. Navigate to Networking → Firewalls

  2. Click Create Firewall

  3. Configure inbound rules for MQTT broker security:

    • SSH: Port 22 from your IP
    • MQTT: Port 1883 from your IoT application sources
    • MQTT with TLS: Port 1884 for secure MQTT with TLS
    • WebSocket: Port 5000 for MQTT through WebSocket
    • WebSocket with TLS: Port 443 for MQTT through WebSocket with TLS
  4. Apply the firewall to your droplet

For detailed firewall configuration, refer to DigitalOcean’s firewall quickstart guide. Production tip: Restrict MQTT port 1883 to specific source IPs or VPC ranges only, and prefer port 1884 (MQTT with TLS) for external device connections. Consider using DigitalOcean App Platform with private networking if you need additional security layers.

Step 5 — Setting Up IoT Data Integration with Coreflux’s Language of Things

Installing the LoT Notebook Extension

The LoT (Language of Things) Notebook extension for Visual Studio Code provides an integrated low-code development environment for MQTT broker programming and IoT automation. Learn more about Coreflux’s Language of Things (LoT) for low-code IoT automation.

  1. Open Visual Studio Code
  2. Go to Extensions (Ctrl+Shift+X)
  3. Search for “LoT Notebooks”
  4. Install the LoT VSCode Notebooks Extension by Coreflux

LoT Notebook extension in Visual Studio Code

Connecting to Your MQTT Broker

When prompted in the top bar, or after clicking the MQTT button on the left of the bottom bar, configure the connection to your Coreflux MQTT broker using the default credentials:

  • User: root
  • Password: coreflux

Assuming no errors, you’ll see the status of the MQTT connectivity to the broker in the bottom bar, on the left.

Coreflux MQTT connection status in VS Code

Step 6 — Creating Data in MQTT Broker through Actions

For this use case, we will integrate raw data into a database through a transformation pipeline. Because the demo is not connected to any MQTT devices, we will use an LoT Action to simulate device data.

In LoT, an Action is an executable logic that is triggered by specific events such as timed intervals, topic updates, or explicit calls from other actions or system components. Actions allow dynamic interaction with MQTT topics, internal variables, and payloads, facilitating complex IoT automation workflows.

We can therefore use an Action that publishes data to specific topics at a fixed interval; the rest of the pipeline defined below then consumes that data.

You can download the GitHub repo with the sample project.

Generating Simulated IoT Data

Create an Action to generate simulated sensor data using the low code LoT (Language of Things) interface:

DEFINE ACTION RANDOMIZEMachineData
ON EVERY 10 SECONDS DO
    PUBLISH TOPIC "raw_data/machine1" WITH RANDOM BETWEEN 0 AND 10
    PUBLISH TOPIC "raw_data/station2" WITH RANDOM BETWEEN 0 AND 60

As an alternative, the provided Notebook also includes an Action that simulates data with an incrementing counter.

Running LoT action to generate simulated IoT data

When you run this Action, it will:

  • Deploy automatically to the MQTT broker
  • Generate simulated IoT sensor data every 10 seconds
  • Publish real-time data to specific MQTT topics
  • Show sync status in the LoT Notebook interface
    • This status shows if the code on the LoT Notebook differs from the one running in the broker, or if it is missing entirely
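
To preview the kind of values the Action publishes without a broker, the sketch below mimics one tick in plain Python. It assumes RANDOM BETWEEN yields integers with inclusive bounds, which this tutorial does not confirm; the names SIMULATED_TOPICS and simulate_tick are hypothetical.

```python
import random

# Topic -> (low, high), mirroring the ranges used in the Action above.
SIMULATED_TOPICS = {
    "raw_data/machine1": (0, 10),
    "raw_data/station2": (0, 60),
}


def simulate_tick(rng: random.Random) -> dict:
    """Return one value per topic, as a single 10-second tick might publish."""
    return {
        topic: rng.randint(low, high)  # assumption: integer values, inclusive bounds
        for topic, (low, high) in SIMULATED_TOPICS.items()
    }


rng = random.Random(42)  # fixed seed so repeated runs produce the same sequence
for _ in range(3):
    print(simulate_tick(rng))
```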

Step 7 — Creating Data Transformation Models for Real-Time Processing

Defining Data Models with Language of Things

Models in Coreflux transform, aggregate, and compute values from input MQTT topics and publish the results to new topics. They serve as the foundation of your system’s Unified Namespace (UNS), which spans all of your data sources.

A Model defines how raw IoT data is structured and transformed, either for a single device or for many devices at once (through the + wildcard). It also serves as the data schema used when storing data in the managed database.


DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"

This low code model:

  • Uses wildcard + to apply to all machines automatically
  • Converts energy to watt-hours (energy_wh) by multiplying by 1000
  • Determines production status based on energy thresholds
  • Tracks production counts and stoppage events
  • Adds timestamps to all real-time data points
  • Carries each machine’s ID from the source topic into the output topic
  • Publishes structured data to the Simulator/Machine/+/Data topics, replacing the + with the segment that matched the + in the trigger topic raw_data/+

Because the Action simulates two sensors/machines, the Model structure is applied automatically to both, generating both a JSON object and the individual topics.
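
The Model's rules can be mirrored in ordinary Python to check what output to expect for a given input. This is a sketch of the logic only, not of how Coreflux evaluates Models. It assumes the previous production_count is passed in by the caller and that the timestamp is an ISO 8601 string; the function names are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional


def match_single_level(pattern: str, topic: str) -> Optional[str]:
    """Match topic against a pattern containing one '+' wildcard.

    Returns the level that '+' matched, or None when the topic does not fit.
    """
    p_levels, t_levels = pattern.split("/"), topic.split("/")
    if len(p_levels) != len(t_levels):
        return None
    matched = None
    for p, t in zip(p_levels, t_levels):
        if p == "+":
            matched = t
        elif p != t:
            return None
    return matched


def transform(topic: str, energy: float, previous_count: int = 0) -> Optional[dict]:
    """Apply the MachineData rules to one raw reading."""
    device = match_single_level("raw_data/+", topic)
    if device is None:
        return None  # topic is not a trigger for this model
    status = "active" if energy > 5 else "inactive"
    return {
        "topic": f"Simulator/Machine/{device}/Data",
        "energy": energy,
        "energy_wh": energy * 1000,
        "production_status": status,
        "production_count": previous_count + 1 if status == "active" else 0,
        "stoppage": 1 if status == "inactive" else 0,
        "maintenance_alert": energy > 50,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # assumed format
    }


print(transform("raw_data/machine1", 7, previous_count=2))
```

Note that production_count resets to 0 as soon as a reading drops to 5 or below, so it counts consecutive active readings rather than a running total.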

Transformed MQTT data published by Coreflux model

Step 8 — Setting Up Database Integration for Scalable Storage

Choose the database integration section that matches your selected database from Step 2.

PostgreSQL Integration

In this section, you’ll store your processed IoT data in a PostgreSQL managed database on DigitalOcean by defining a Route in Coreflux. Routes specify how data is sent from your MQTT broker to your PostgreSQL cluster using a simple, low-code configuration:


DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "db-postgresql.db.onmyserver.com"
        WITH PORT 25060
        WITH DATABASE "defaultdb"
        WITH USERNAME "doadmin"
        WITH PASSWORD "AVNS_pass"
        WITH USE_SSL TRUE
        WITH TRUST_SERVER_CERTIFICATE FALSE

Replace the placeholder values with your PostgreSQL connection details from DigitalOcean and run the Route in your LoT Notebook. Important: Use the VPC connection details (not public) for better security and lower latency. The VPC hostname and port differ from the public connection string, so check your database cluster’s connection details page for both options.

Updating the Model for PostgreSQL Database Storage

Modify your LoT model to use the database route for scalable storage, by adding this to the end of the Model:


STORE IN "PostgreSQL_Log"
    WITH TABLE "MachineProductionData"

Additionally, add a device_name field derived from the topic, so that each entry in your managed database identifies the device it came from.


DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "PostgreSQL_Log"
        WITH TABLE "MachineProductionData"

After you deploy this updated Model, each new value it produces should be stored automatically in the database.
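
To get a feel for the rows you should find when you inspect the table, the sketch below builds a local stand-in with Python's built-in sqlite3 module, so it runs without a cluster. The column names come from the Model; the column types, the sample rows, and the latest_per_device helper are assumptions for illustration, and the table Coreflux actually creates in PostgreSQL may differ.

```python
import sqlite3

# Local in-memory stand-in for the managed database (assumed column types).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE MachineProductionData (
        device_name       TEXT,
        energy            REAL,
        energy_wh         REAL,
        production_status TEXT,
        production_count  INTEGER,
        stoppage          INTEGER,
        maintenance_alert INTEGER,
        timestamp         TEXT
    )
    """
)

# Invented sample rows shaped like the Model's output.
rows = [
    ("machine1", 7.0, 7000.0, "active", 1, 0, 0, "2026-01-30T12:00:00+00:00"),
    ("station2", 55.0, 55000.0, "active", 1, 0, 1, "2026-01-30T12:00:00+00:00"),
    ("machine1", 3.0, 3000.0, "inactive", 0, 1, 0, "2026-01-30T12:00:10+00:00"),
]
conn.executemany(
    "INSERT INTO MachineProductionData VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows
)


def latest_per_device(connection):
    """Return (device_name, production_status, energy_wh) for each device's newest row."""
    query = """
        SELECT device_name, production_status, energy_wh
        FROM MachineProductionData AS m
        WHERE timestamp = (
            SELECT MAX(timestamp) FROM MachineProductionData
            WHERE device_name = m.device_name
        )
        ORDER BY device_name
    """
    return connection.execute(query).fetchall()


print(latest_per_device(conn))
```

A similar SELECT, run from DBeaver against the real table, is a quick way to confirm that new rows arrive every 10 seconds for each simulated device.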

MySQL Integration

MySQL is a widely used relational database management system, making it an excellent choice for storing and analyzing IoT data at scale. In this section, you’ll learn how to connect your Coreflux MQTT broker to a managed MySQL database on DigitalOcean, so your real-time device data is securely and reliably persisted for analytics, reporting, or integration with other applications.

To enable this integration, you must define a Route in Coreflux’s LoT (Language of Things) that instructs where and how the processed data should be sent. Below is the required low-code format for routing data to a MySQL database. Be sure to substitute your own connection details as needed:

DEFINE ROUTE MySQL_Log WITH TYPE MYSQL
    ADD SQL_CONFIG
        WITH SERVER "db-mysql.db.onmyserver.com"
        WITH PORT 25060
        WITH DATABASE "defaultdb"
        WITH USERNAME "doadmin"
        WITH PASSWORD "AVNS_pass"
        WITH USE_SSL TRUE
        WITH TRUST_SERVER_CERTIFICATE FALSE

Replace the placeholder values with your MySQL connection details from DigitalOcean and run the Route in your LoT Notebook. Important: Use the VPC connection details (not public) for better security and lower latency. If you encounter connection issues, verify that TRUST_SERVER_CERTIFICATE is set correctly for your MySQL version: some versions require TRUE while others work with FALSE.

Updating the Model for MySQL Database Storage

Modify your LoT model to use the database route for scalable storage, by adding this to the end of the Model:

STORE IN "MySQL_Log"
    WITH TABLE "MachineProductionData"

Additionally, add a device_name field derived from the topic, so that each entry in your managed database identifies the device it came from.

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "MySQL_Log"
        WITH TABLE "MachineProductionData"

After you deploy this updated Model, each new value it produces should be stored automatically in the database.

MongoDB Integration

MongoDB is a NoSQL database that is well-suited for storing and querying IoT data with flexible schemas. In this section, you’ll learn how to connect your Coreflux MQTT broker to a managed MongoDB database on DigitalOcean, so your real-time device data is securely and reliably persisted for analytics, reporting, or integration with other applications.

To enable this integration, you must define a Route in Coreflux’s LoT (Language of Things) that instructs where and how the processed data should be sent. Below is the required low-code format for routing data to a MongoDB database. Be sure to substitute your own connection details as needed:

DEFINE ROUTE mongo_route WITH TYPE MONGODB
    ADD MONGODB_CONFIG
        WITH CONNECTION_STRING "mongodb+srv://<username>:<password>@<cluster-uri>/<database>?tls=true&authSource=admin&replicaSet=<replica-set>"
        WITH DATABASE "admin"

Replace the placeholder values with your MongoDB connection details from DigitalOcean and run the Route in your LoT Notebook. Important: Use the VPC connection string format when available. The connection string should include the tls=true and authSource=admin parameters. For troubleshooting MongoDB connections, see our guide on connecting to MongoDB.
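
Before pasting a connection string into the Route, you can check that it carries the two parameters mentioned above. The sketch below parses the query string with Python's standard library; the names REQUIRED_PARAMS and missing_params, and the sample host, are hypothetical. It assumes the credentials in the string are already percent-encoded.

```python
from urllib.parse import parse_qs, urlsplit

# Parameters this tutorial says the connection string should include.
REQUIRED_PARAMS = {"tls": "true", "authSource": "admin"}


def missing_params(connection_string: str) -> list:
    """Return the required query parameters that are absent or set to another value."""
    query = parse_qs(urlsplit(connection_string).query)
    return [
        name
        for name, expected in REQUIRED_PARAMS.items()
        if query.get(name, [None])[0] != expected
    ]


sample = (
    "mongodb+srv://user:pw@cluster.example.com/admin"
    "?tls=true&authSource=admin&replicaSet=rs0"
)
print(missing_params(sample))  # an empty list means both parameters are present
```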

Updating the Model for MongoDB Database Storage

Modify your LoT model to use the database route for scalable storage, by adding this to the end of the Model:

STORE IN "mongo_route"
    WITH TABLE "MachineProductionData"

Additionally, add a parameter derived from the topic (device_name in the model below) so that each entry in your managed database identifies the device it came from.


DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "mongo_route"
        WITH TABLE "MachineProductionData"

After you deploy this updated model, all data should be stored in the database automatically whenever it is updated.

OpenSearch Integration

OpenSearch is a distributed search and analytics engine designed for large-scale data processing and real-time analytics. In this section, you’ll learn how to connect your Coreflux MQTT broker to a managed OpenSearch database on DigitalOcean, so your real-time device data is securely and reliably persisted for analytics, reporting, or integration with other applications.

To enable this integration, you must define a Route in Coreflux’s LoT (Language of Things) that instructs where and how the processed data should be sent. Below is the required low-code format for routing data to an OpenSearch database. Be sure to substitute your own connection details as needed:

DEFINE ROUTE OpenSearch_Log WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://my-opensearch-cluster:9200"
        WITH USERNAME "myuser"
        WITH PASSWORD "mypassword"
        WITH USE_SSL TRUE
        WITH IGNORE_CERT_ERRORS FALSE

Replace the placeholder values with your OpenSearch connection details from DigitalOcean and run the Route in your LoT Notebook. Important: Use the VPC base URL (not public) when available. The base URL format is typically https://your-cluster-hostname:9200. For OpenSearch Dashboards access, use the separate Dashboards URL provided in your database cluster details. See our OpenSearch quickstart for more details.
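
Before running the route, it can help to confirm that the base URL and credentials are valid. The Python sketch below prepares a request to the standard `_cluster/health` endpoint using only the standard library. The helper name `build_health_request` is hypothetical, the URL and credentials are the placeholders from the route above, and the actual network call is left commented out because it only works from a machine that can reach the cluster.

```python
import base64
import urllib.request


def build_health_request(base_url, username, password):
    """Prepare (but do not send) a GET request to the cluster health endpoint."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(f"{base_url.rstrip('/')}/_cluster/health")
    request.add_header("Authorization", f"Basic {token}")
    return request


# Placeholder values from the route above; replace them with your own.
request = build_health_request("https://my-opensearch-cluster:9200", "myuser", "mypassword")
print(request.get_method(), request.full_url)

# To send it from a machine inside the VPC, uncomment:
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.read().decode("utf-8"))
```

A successful response means the same BASE_URL, USERNAME, and PASSWORD values should work in the route.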

Updating the Model for OpenSearch Database Storage

Modify your LoT model to use the database route for scalable storage, by adding this to the end of the Model:

STORE IN "OpenSearch_Log"
    WITH TABLE "MachineProductionData"

Additionally, add a parameter derived from the topic (device_name in the model below) so that each entry in your managed database identifies the device it came from.

DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
    ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
    ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
    ADD "energy_wh" WITH (energy * 1000)
    ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
    ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
    ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
    ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
    ADD "timestamp" WITH TIMESTAMP "UTC"
    STORE IN "OpenSearch_Log"
        WITH TABLE "MachineProductionData"

After you deploy this updated model, all data should be stored in the database automatically whenever it is updated.

Step 9 — Verifying the Complete IoT Automation Pipeline

Monitoring Real-Time Data Flow

  1. MQTT Client: Use an MQTT client such as MQTT Explorer to verify that data is being published in real time
  2. Database Client: Connect to your database to verify that the data is stored (DBeaver for PostgreSQL or MySQL, MongoDB Compass for MongoDB, or OpenSearch Dashboards for OpenSearch)
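
When you subscribe with an MQTT client, the topic filter decides which messages you see. The Python sketch below shows how the standard MQTT wildcards behave: `+` matches exactly one topic level and `#` matches all remaining levels. The function name `topic_matches` is hypothetical, and the sketch ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent and everything below it.
            return True
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without a trailing #, both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("Simulator/Machine/+/Data", "Simulator/Machine/station2/Data"))
print(topic_matches("Simulator/Machine/+/Data", "Simulator/Machine/station2/Data/extra"))
print(topic_matches("Simulator/#", "Simulator/Machine/station2/Data"))
```

Subscribing to `Simulator/#` is a convenient way to watch every topic under the simulator while you verify the pipeline.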

Verifying PostgreSQL Storage

Connect to your PostgreSQL managed database using DBeaver to verify scalable storage:

  1. Use the connection string from your DigitalOcean database
  2. Navigate to the coreflux-broker-data database (or the name you gave to the database)
  3. Check the MachineProductionData table for stored records

PostgreSQL table showing stored IoT records

As we’ve seen before, all of the data is available in the MQTT Broker for other uses and integrations.

MQTT topics with real time machine data

Verifying MongoDB Storage

Connect to your MongoDB managed database using MongoDB Compass to verify scalable storage:

  1. Use the connection string from your DigitalOcean database
  2. Navigate to the coreflux-broker-data database (or the name you gave to the database)
  3. Check the MachineProductionData collection for stored documents

You should see real-time data documents with structure similar to:

{
  "_id": {
    "$oid": "68626dc3e8385cbe9a1666c3"
  },
  "energy": 36,
  "energy_wh": 36000,
  "production_status": "active",
  "production_count": 31,
  "stoppage": 0,
  "maintenance_alert": false,
  "timestamp": "2025-06-30 10:58:11",
  "device_name": "station2"
}

As we’ve seen before, all of the data is available in the MQTT Broker for other uses and integrations.

Verifying MySQL Storage

Connect to your MySQL managed database using DBeaver to verify scalable storage:

  1. Use the connection string from your DigitalOcean database
  2. Navigate to the coreflux-broker-data database (or the name you gave to the database)
  3. Check the MachineProductionData table for stored records

Verifying stored IoT records in MySQL

As with the other integrations, all of the data is also available in the MQTT Broker for other uses and downstream integrations.

Verifying OpenSearch Storage

Open OpenSearch Dashboards with the provided URL and credentials:

  1. Open the menu and select Index Management.
    1. Select Indexes and check that your table name appears in the list.
  2. Return to the main page and select Discover from the menu.
    1. Create an index pattern by following the on-screen steps.
    2. Return to the Discover page, where you should see your data.

As we’ve seen before, all of the data is available in the MQTT Broker for other uses and integrations.

Step 10 - Expand Your Use Case and Integrations

Test LoT Capabilities

  • Publish Sample Data: Use MQTT Explorer to publish sample datasets to your Coreflux broker. Experiment with different payload structures and different Models/Actions to see how they are processed and stored in your selected database.
  • Data Validation: Verify that the data in your database matches the payloads you published. Using your database client (DBeaver for PostgreSQL or MySQL, MongoDB Compass for MongoDB, or OpenSearch Dashboards for OpenSearch), compare timestamps, field transformations, and data types to confirm that your real-time data pipeline is working as expected.
  • Real-Time Monitoring: Set up a continuous real-time data feed from another source of MQTT data, such as a simple sensor with MQTT connectivity. Watch how Coreflux and your database handle incoming IoT data streams, and measure response times for data retrieval and queries.
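
The data validation step can be partly automated. The following Python sketch checks one stored record against the rules in the MachineData model: the energy conversion, the two thresholds, the counter type, and the timestamp format. The function name `check_record` is hypothetical, and the sample record reuses the values from the MongoDB document shown earlier in this tutorial.

```python
from datetime import datetime


def check_record(record):
    """Return a list of inconsistencies found in one stored record."""
    problems = []
    energy = record["energy"]
    if record["energy_wh"] != energy * 1000:
        problems.append("energy_wh does not equal energy * 1000")
    expected_status = "active" if energy > 5 else "inactive"
    if record["production_status"] != expected_status:
        problems.append("production_status does not match the energy threshold")
    if record["maintenance_alert"] != (energy > 50):
        problems.append("maintenance_alert does not match the energy threshold")
    if not isinstance(record["production_count"], int):
        problems.append("production_count is not an integer")
    try:
        datetime.strptime(record["timestamp"], "%Y-%m-%d %H:%M:%S")
    except ValueError:
        problems.append("timestamp is not in YYYY-MM-DD HH:MM:SS format")
    return problems


sample = {
    "energy": 36,
    "energy_wh": 36000,
    "production_status": "active",
    "production_count": 31,
    "stoppage": 0,
    "maintenance_alert": False,
    "timestamp": "2025-06-30 10:58:11",
    "device_name": "station2",
}
print(check_record(sample))
```

An empty list means the record is consistent with the model. In practice you would feed this function rows exported from your database client.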

Build Analytics and Visualizations

  • Create Dashboards: Integrate with visualization tools like Grafana to create dashboards that display your IoT data, pulling from both live MQTT topics and historical database queries. Track metrics like device uptime, sensor readings, production counts, or maintenance alerts from your automation systems. Learn how to set up monitoring with our guide on monitoring for DigitalOcean managed databases with Prometheus and Grafana. For real-time dashboards, subscribe directly to MQTT topics; for historical trends and aggregates, query your database.
  • Trend Analysis: Leverage your database’s capabilities to analyze trends over time:
    • PostgreSQL: Use SQL queries for complex relational analysis
    • MongoDB: Use the aggregation framework for document-based analysis
    • OpenSearch: Use advanced analytics and search capabilities for full-text search and time-series analysis
  • Multi-Database Integration: Explore integrating additional managed databases like MongoDB for unstructured data, PostgreSQL for relational data, MySQL for structured queries, or OpenSearch for advanced analytics and search capabilities. Use Coreflux routes to send data to multiple destinations simultaneously.

Optimize and Scale Your IoT Infrastructure

  • Load Testing: Simulate high traffic by publishing many messages simultaneously using LoT Notebook or automated scripts. Monitor how your Coreflux MQTT broker and database cluster handle the load and identify any bottlenecks in your data pipeline.
  • Scaling: DigitalOcean offers vertical and horizontal scaling options. Increase Droplet resources (CPU, RAM, or storage) as your IoT data needs grow. Scale your managed database cluster to handle larger datasets, and configure alerts that notify you when you approach resource limits.
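
A rough estimate of message volume helps when planning load tests and sizing. The Python sketch below multiplies device count, message rate, and record size to get a daily total. All of the input numbers are assumptions chosen for illustration, and the result excludes indexes, replication, and database overhead.

```python
def daily_load(devices, messages_per_second, bytes_per_message):
    """Estimate daily message count and raw payload volume in megabytes."""
    messages_per_day = devices * messages_per_second * 86_400
    megabytes_per_day = messages_per_day * bytes_per_message / 1_000_000
    return messages_per_day, megabytes_per_day


# Assumed workload: 100 machines, one reading per second, 200-byte records.
messages, megabytes = daily_load(100, 1, 200)
print(f"{messages:,} messages/day, about {megabytes:,.0f} MB/day before indexes")
```

Replace the three inputs with measurements from your own devices to get a first approximation of storage growth.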

Frequently Asked Questions

1. How do I integrate Coreflux MQTT broker with a managed database?

You integrate Coreflux MQTT broker with a managed database by defining a Route in LoT that points to your target service (PostgreSQL, MySQL, MongoDB, or OpenSearch). Each route uses the appropriate connection parameters (server or connection string, port, database name, username, password, and SSL/TLS options) and automatically persists MQTT message payloads into tables, collections, or indexes. Once the route is defined, you attach it to a Model with the STORE IN directive so that every processed message is written to your chosen database.

2. Can I save MQTT data directly to databases without writing custom integration code?

Yes. Coreflux is designed as a low-code integration layer, so you do not need to write application code or external ETL jobs to persist data. For each database type, you configure a LoT route (for example, PostgreSQL_Log, MySQL_Log, mongo_route, or OpenSearch_Log) and then extend your model with STORE IN "<route_name>" WITH TABLE "MachineProductionData". Coreflux handles connection pooling, retries, and error handling, so you focus on modeling topics and transformations instead of boilerplate database code.

3. Which managed database should I choose for MQTT IoT data storage?

The best managed database for your MQTT IoT data depends on your data structure, query needs, and analytics goals. Use the comparison table below to help you decide:

| Database | Best For | Example Use Cases |
| --- | --- | --- |
| PostgreSQL | Strong consistency, relational schemas, complex SQL queries | Industrial sensor networks, transactional events, analytics |
| MySQL | Relational data, structured queries, wide compatibility | Inventory systems, production metrics, traditional business records |
| MongoDB | Flexible, evolving schemas; document storage | Connected devices with variable payloads, IoT telemetry with changing formats |
| OpenSearch | Full-text search, analytics, dashboards, log indexing | Time-series analytics, monitoring, event logs, IoT search and visualization |

Tip: You can use more than one managed database at the same time by configuring multiple Coreflux routes. This makes it possible to store structured IoT data in PostgreSQL or MySQL, aggregate logs and metrics in OpenSearch, and collect unstructured or schemaless data in MongoDB, all from the same MQTT stream.

4. How does this architecture handle both real-time and historical analytics?

Coreflux keeps all processed values available on MQTT topics for real-time consumption, dashboards, or additional pipelines, while Routes persist the same modeled data to your databases for historical queries. In practice, you can subscribe to topics for immediate reactions (alerts, control loops) and query PostgreSQL/MySQL/MongoDB/OpenSearch for aggregates, trends, and long-term analysis. This dual-path design mirrors common patterns in MQTT and IoT data integration guides, where a broker provides live messaging and databases provide durable storage and analytics.

5. How secure is the connection between Coreflux and managed databases?

When deploying on DigitalOcean, you can use VPC networking to keep all communication between your Coreflux MQTT broker and databases private. The VPC isolates your resources from public internet access, and DigitalOcean managed databases support TLS encryption for connections. Additionally, you can create dedicated database users with limited permissions for your Coreflux application, following the principle of least privilege.

6. Is this setup suitable for production IoT deployments?

Yes. This architecture mirrors patterns used in production MQTT and database integrations, where a broker front-ends device traffic and a managed database tier provides durability and analytics. DigitalOcean managed databases offer automated backups, high availability, and monitoring, while Coreflux MQTT broker can scale horizontally to handle high message throughput. For production, you should also configure firewall rules, use strong credentials, enable TLS for MQTT and database connections, and size your droplets and clusters based on expected message volume.

7. Can I run the MQTT broker without public internet access, or in hybrid environments?

Yes. MQTT brokers are often deployed in private networks or edge environments. MQTT only requires that clients can reach the broker over the network, so it works without public internet access. With DigitalOcean, you can keep Coreflux and your databases inside a VPC and only expose what is strictly necessary (for example, a VPN, bastion host, or limited firewall rules). You can also synchronize selected topics with other brokers or cloud regions if you need hybrid or multi-site architectures.

8. Are there any limitations or trade-offs when using MQTT with databases for IoT data?

MQTT is optimized for lightweight, event-driven messaging; databases are optimized for storage and querying. Storing every raw message can become expensive or noisy, so best practices recommend modeling data carefully (for example, aggregating metrics, filtering topics, or downsampling). Very low-power devices or ultra-constrained networks might struggle with persistent connections or TLS overhead, in which case you may need to tune QoS levels, batching, and retention policies. As long as you design your models and routes with these trade-offs in mind, MQTT plus managed databases works well for most IoT scenarios.
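
As an illustration of downsampling, the Python sketch below averages raw readings into fixed time windows so that one row is stored per window instead of one per message. The function name `downsample`, the 60-second window, and the sample readings are assumptions for this example and are not part of Coreflux.

```python
from collections import defaultdict


def downsample(readings, window_seconds=60):
    """Average (epoch_seconds, value) readings into fixed-size time windows."""
    buckets = defaultdict(list)
    for timestamp, value in readings:
        # Align each reading to the start of its window.
        window_start = timestamp - (timestamp % window_seconds)
        buckets[window_start].append(value)
    return [
        (start, sum(values) / len(values))
        for start, values in sorted(buckets.items())
    ]


# Five hypothetical readings spread across two one-minute windows.
readings = [(0, 10), (20, 20), (59, 30), (60, 40), (90, 60)]
print(downsample(readings))
```

Here five raw readings collapse into two stored values, which reduces both storage cost and query noise.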

9. How do I choose between PostgreSQL, MySQL, MongoDB, and OpenSearch for my IoT project?

You should choose a managed database based on your IoT data structure, scalability, and how you want to query your device data. The table below summarizes the strengths of each option:

| Database | Best When… | Typical Use Cases | Key Strengths |
| --- | --- | --- | --- |
| PostgreSQL | You need complex relational queries, strong consistency, and transactional integrity (ACID support). | Industrial sensor networks, correlating device data with production, needing analytics over joined datasets | Relational schemas, advanced SQL, consistency |
| MySQL | Your workloads are structured, with wide tooling and compatibility needs. | Inventory tracking, traditional business systems, production metrics | Simpler relational needs, broad support |
| MongoDB | Your device payloads and schemas evolve, or you want fast prototyping with flexible, document-based storage. | IoT telemetry with variable formats, rapid development, semi-structured data | Flexible schemas, easy scaling, fast prototyping |
| OpenSearch | You need to analyze, search, or dashboard large volumes of IoT data (logs, time series, events). | Searching sensor data, log analytics, visualization, keyword/time-based queries | Search, full-text, analytics, fast aggregation |

Conclusion

Integrating Coreflux MQTT broker with DigitalOcean’s managed database services (PostgreSQL, MongoDB, MySQL, or OpenSearch) gives you a complete setup for real-time IoT data processing and storage. Following this tutorial, you’ve built an automation pipeline that collects, processes, and stores IoT data using low-code development practices.

With Coreflux’s architecture and your chosen database’s storage features, you can handle large volumes of real-time data and query it for insights. Whether you’re monitoring industrial systems, tracking environmental sensors, or managing smart city infrastructure, this setup lets you make data-driven decisions based on both live MQTT topics and historical database queries.

You can learn more about DigitalOcean managed databases and explore advanced Droplet configurations in the DigitalOcean documentation.

You can try the provided use cases or implement your own using Coreflux and DigitalOcean. You can also get the free Coreflux MQTT Broker from the DigitalOcean Marketplace or through the Coreflux website.

Learn more about what you can do with Coreflux and LoT in the Coreflux Docs and Tutorials.

Additional Resources and Documentation

Note: While this tutorial uses the Coreflux Marketplace image or Docker for simplified deployment, you can also install Coreflux MQTT broker directly on Ubuntu. For manual installation instructions, visit the Coreflux Installation Guide.

About the author(s)

Hugo Vaz
Author

Anish Singh Walia
Editor, Sr Technical Writer

I help Businesses scale with AI x SEO x (authentic) Content that revives traffic and keeps leads flowing | 3,000,000+ Average monthly readers on Medium | Sr Technical Writer @ DigitalOcean | Ex-Cloud Consultant @ AMEX | Ex-Site Reliability Engineer(DevOps)@Nutanix

Vinayak Baranwal
Editor, Technical Writer II

Building future-ready infrastructure with Linux, Cloud, and DevOps. Full Stack Developer & System Administrator. Technical Writer @ DigitalOcean | GitHub Contributor | Passionate about Docker, PostgreSQL, and Open Source | Exploring NLP & AI-TensorFlow | Nailed over 50+ deployments across production environments.

This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.