You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
uptime-kuma/server/monitor-types/mqtt.js

122 lines
4.2 KiB

const { MonitorType } = require("./monitor-type");
const { log, UP } = require("../../src/util");
const mqtt = require("mqtt");
const jsonata = require("jsonata");
/**
 * Monitor type that connects to an MQTT broker, waits for one message on the
 * configured topic and validates it either by keyword or by a JSONata query.
 */
class MqttMonitorType extends MonitorType {
    name = "mqtt";

    /**
     * Run the monitoring check on the MQTT monitor
     *
     * On success `heartbeat.msg` and `heartbeat.status` (UP) are set; every
     * failure is reported by throwing.
     * @param {Monitor} monitor Monitor to check
     * @param {Heartbeat} heartbeat Monitor heartbeat to update
     * @param {UptimeKumaServer} server Uptime Kuma server
     * @returns {Promise<void>}
     * @throws {Error} If no message arrives in time, the message does not
     * match the expectation, or the check type is unknown. A payload that is
     * not valid JSON makes the "json-query" check throw the JSON.parse error.
     */
    async check(monitor, heartbeat, server) {
        const receivedMessage = await this.mqttAsync(monitor.hostname, monitor.mqttTopic, {
            port: monitor.port,
            username: monitor.mqttUsername,
            password: monitor.mqttPassword,
            interval: monitor.interval,
        });

        // A missing or empty check type falls back to the old default. The
        // fallback is kept in a local so the caller's monitor is not modified.
        const hasCheckType = monitor.mqttCheckType != null && monitor.mqttCheckType !== "";
        const checkType = hasCheckType ? monitor.mqttCheckType : "keyword";

        if (checkType === "keyword") {
            if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) {
                heartbeat.msg = `Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`;
                heartbeat.status = UP;
            } else {
                throw new Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`);
            }
        } else if (checkType === "json-query") {
            const parsedMessage = JSON.parse(receivedMessage);
            const expression = jsonata(monitor.jsonPath);
            const result = await expression.evaluate(parsedMessage);

            // The query result is compared in its string form.
            if (result?.toString() === monitor.expectedValue) {
                heartbeat.msg = "Message received, expected value is found";
                heartbeat.status = UP;
            } else {
                throw new Error(`Message received but value is not equal to expected value, value was: [${result}]`);
            }
        } else {
            throw new Error("Unknown MQTT Check Type");
        }
    }

    /**
     * Connect to MQTT Broker, subscribe to topic and receive message as String
     *
     * The promise settles exactly once: with the first message whose topic
     * equals `topic`, or with an error on connection error, subscription
     * failure or timeout (80% of `interval` seconds). The client is closed
     * and the timer cleared on every outcome.
     * @param {string} hostname Hostname / address of machine to test
     * @param {string} topic MQTT topic
     * @param {object} options MQTT options. Contains port, username,
     * password and interval (interval defaults to 20)
     * @returns {Promise<string>} Received MQTT message
     */
    mqttAsync(hostname, topic, options = {}) {
        return new Promise((resolve, reject) => {
            const { port, username, password, interval = 20 } = options;

            // Adds MQTT protocol to the hostname if not already present
            const hasProtocol = /^(?:http|mqtt|ws)s?:\/\//.test(hostname);
            const brokerAddress = hasProtocol ? hostname : "mqtt://" + hostname;

            const mqttUrl = `${brokerAddress}:${port}`;
            log.debug("mqtt", `MQTT connecting to ${mqttUrl}`);

            // The client is created before the timer is armed: if connect()
            // throws synchronously, the promise rejects and no timer is left
            // behind to touch an uninitialised client later.
            const client = mqtt.connect(mqttUrl, {
                username,
                password,
            });

            let timeoutID = null;
            let settled = false;

            // Single exit point: release the timer and the connection, then
            // settle the promise. Later calls are ignored.
            const finish = (settle, value) => {
                if (settled) {
                    return;
                }
                settled = true;
                clearTimeout(timeoutID);
                client.end();
                settle(value);
            };

            timeoutID = setTimeout(() => {
                log.debug("mqtt", "MQTT timeout triggered");
                finish(reject, new Error("Timeout, Message not received"));
            }, interval * 1000 * 0.8);

            client.on("connect", () => {
                log.debug("mqtt", "MQTT connected");
                try {
                    client.subscribe(topic, (subscribeError) => {
                        // Subscription failures arrive through the callback,
                        // not as an exception, so they are checked here.
                        if (subscribeError) {
                            finish(reject, new Error("Cannot subscribe topic", { cause: subscribeError }));
                            return;
                        }
                        log.debug("mqtt", "MQTT subscribed to topic");
                    });
                } catch (e) {
                    finish(reject, new Error("Cannot subscribe topic", { cause: e }));
                }
            });

            client.on("error", (error) => {
                finish(reject, error);
            });

            client.on("message", (messageTopic, message) => {
                // Exact string comparison: only a message published on a topic
                // equal to `topic` resolves the promise.
                if (messageTopic === topic) {
                    finish(resolve, message.toString("utf8"));
                }
            });
        });
    }
}
// Public surface of this module: the MQTT monitor type class.
module.exports = { MqttMonitorType };