Merge pull request #875 from tarun7singh/master

Added MQTT Monitor type
This commit is contained in:
Louis Lam 2022-04-17 20:05:41 +08:00 committed by GitHub
commit ceba096f3e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 955 additions and 43 deletions

View file

@ -0,0 +1,16 @@
-- You should not modify if this have pushed to Github, unless it does serious wrong with the db.
BEGIN TRANSACTION;
ALTER TABLE monitor
ADD mqtt_topic TEXT;
ALTER TABLE monitor
ADD mqtt_success_message VARCHAR(255);
ALTER TABLE monitor
ADD mqtt_username VARCHAR(255);
ALTER TABLE monitor
ADD mqtt_password VARCHAR(255);
COMMIT;

View file

@ -0,0 +1,50 @@
const { log } = require("../src/util");
const mqttUsername = "louis1";
const mqttPassword = "!@#$LLam";
class SimpleMqttServer {
aedes = require("aedes")();
server = require("net").createServer(this.aedes.handle);
constructor(port) {
this.port = port;
}
start() {
this.server.listen(this.port, () => {
console.log("server started and listening on port ", this.port);
});
}
}
let server1 = new SimpleMqttServer(10000);
server1.aedes.authenticate = function (client, username, password, callback) {
if (username && password) {
console.log(password.toString("utf-8"));
callback(null, username === mqttUsername && password.toString("utf-8") === mqttPassword);
} else {
callback(null, false);
}
};
server1.aedes.on("subscribe", (subscriptions, client) => {
console.log(subscriptions);
for (let s of subscriptions) {
if (s.topic === "test") {
server1.aedes.publish({
topic: "test",
payload: Buffer.from("ok"),
}, (error) => {
if (error) {
log.error("mqtt_server", error);
}
});
}
}
});
server1.start();

708
package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -49,6 +49,7 @@
"test-install-script-ubuntu1604": "npm run compile-install-script && docker build --progress plain -f test/test_install_script/ubuntu1604.dockerfile .",
"test-nodejs16": "docker build --progress plain -f test/ubuntu-nodejs16.dockerfile .",
"simple-dns-server": "node extra/simple-dns-server.js",
"simple-mqtt-server": "node extra/simple-mqtt-server.js",
"update-language-files-with-base-lang": "cd extra/update-language-files && node index.js %npm_config_base_lang% && eslint ../../src/languages/**.js --fix",
"update-language-files": "cd extra/update-language-files && node index.js && eslint ../../src/languages/**.js --fix",
"ncu-patch": "npm-check-updates -u -t patch",
@ -86,6 +87,7 @@
"jsonwebtoken": "~8.5.1",
"jwt-decode": "^3.1.2",
"limiter": "^2.1.0",
"mqtt": "^4.2.8",
"node-cloudflared-tunnel": "~1.0.9",
"nodemailer": "~6.6.5",
"notp": "~2.0.3",
@ -126,6 +128,7 @@
"@vitejs/plugin-legacy": "~1.6.4",
"@vitejs/plugin-vue": "~1.9.4",
"@vue/compiler-sfc": "~3.2.31",
"aedes": "^0.46.3",
"babel-plugin-rewire": "~1.2.0",
"core-js": "~3.18.3",
"cross-env": "~7.0.3",

View file

@ -57,6 +57,7 @@ class Database {
"patch-proxy.sql": true,
"patch-monitor-expiry-notification.sql": true,
"patch-status-page-footer-css.sql": true,
"patch-added-mqtt-monitor.sql": true,
}
/**

View file

View file

@ -7,7 +7,7 @@ dayjs.extend(timezone);
const axios = require("axios");
const { Prometheus } = require("../prometheus");
const { log, UP, DOWN, PENDING, flipStatus, TimeLogger } = require("../../src/util");
const { tcping, ping, dnsResolve, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, errorLog } = require("../util-server");
const { tcping, ping, dnsResolve, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, errorLog, mqttAsync } = require("../util-server");
const { R } = require("redbean-node");
const { BeanModel } = require("redbean-node/dist/bean-model");
const { Notification } = require("../notification");
@ -81,6 +81,10 @@ class Monitor extends BeanModel {
proxyId: this.proxy_id,
notificationIDList,
tags: tags,
mqttUsername: this.mqttUsername,
mqttPassword: this.mqttPassword,
mqttTopic: this.mqttTopic,
mqttSuccessMessage: this.mqttSuccessMessage
};
if (includeSensitiveData) {
@ -159,7 +163,7 @@ class Monitor extends BeanModel {
// undefined if not https
let tlsInfo = undefined;
if (! previousBeat) {
if (!previousBeat) {
previousBeat = await R.findOne("heartbeat", " monitor_id = ? ORDER BY time DESC", [
this.id,
]);
@ -177,7 +181,7 @@ class Monitor extends BeanModel {
}
// Duration
if (! isFirstBeat) {
if (!isFirstBeat) {
bean.duration = dayjs(bean.time).diff(dayjs(previousBeat.time), "second");
} else {
bean.duration = 0;
@ -382,7 +386,7 @@ class Monitor extends BeanModel {
},
httpsAgent: new https.Agent({
maxCachedSessions: 0, // Use Custom agent to disable session reuse (https://github.com/nodejs/node/issues/3940)
rejectUnauthorized: ! this.getIgnoreTls(),
rejectUnauthorized: !this.getIgnoreTls(),
}),
maxRedirects: this.maxredirects,
validateStatus: (status) => {
@ -404,7 +408,14 @@ class Monitor extends BeanModel {
} else {
throw new Error("Server not found on Steam");
}
} else if (this.type === "mqtt") {
bean.msg = await mqttAsync(this.hostname, this.mqttTopic, this.mqttSuccessMessage, {
port: this.port,
username: this.mqttUsername,
password: this.mqttPassword,
interval: this.interval,
});
bean.status = UP;
} else {
bean.msg = "Unknown Monitor Type";
bean.status = PENDING;
@ -683,7 +694,7 @@ class Monitor extends BeanModel {
} else {
// Handle new monitor with only one beat, because the beat's duration = 0
let status = parseInt(await R.getCell("SELECT `status` FROM heartbeat WHERE monitor_id = ?", [ monitorID ]));
let status = parseInt(await R.getCell("SELECT `status` FROM heartbeat WHERE monitor_id = ?", [monitorID]));
if (status === UP) {
uptime = 1;

View file

@ -720,6 +720,9 @@ try {
bean.dns_resolve_server = monitor.dns_resolve_server;
bean.pushToken = monitor.pushToken;
bean.proxyId = Number.isInteger(monitor.proxyId) ? monitor.proxyId : null;
bean.mqttUsername = monitor.mqttUsername;
bean.mqttTopic = monitor.mqttTopic;
bean.mqttSuccessMessage = monitor.mqttSuccessMessage;
await R.store(bean);

View file

@ -9,6 +9,7 @@ const iconv = require("iconv-lite");
const chardet = require("chardet");
const fs = require("fs");
const nodeJsUtil = require("util");
const mqtt = require("mqtt");
// From ping-lite
exports.WIN = /^win/.test(process.platform);
@ -26,7 +27,7 @@ exports.initJWTSecret = async () => {
"jwtSecret",
]);
if (! jwtSecretBean) {
if (!jwtSecretBean) {
jwtSecretBean = R.dispense("setting");
jwtSecretBean.key = "jwtSecret";
}
@ -88,6 +89,63 @@ exports.pingAsync = function (hostname, ipv6 = false) {
});
};
exports.mqttAsync = function (hostname, topic, okMessage, options = {}) {
return new Promise((resolve, reject) => {
const { port, username, password, interval = 20 } = options;
// Adds MQTT protocol to the hostname if not already present
if (!/^(?:http|mqtt)s?:\/\//.test(hostname)) {
hostname = "mqtt://" + hostname;
}
const timeoutID = setTimeout(() => {
log.debug("mqtt", "MQTT timeout triggered");
client.end();
reject(new Error("Timeout"));
}, interval * 1000 * 0.8);
log.debug("mqtt", "MQTT connecting");
let client = mqtt.connect(hostname, {
port,
username,
password
});
client.on("connect", () => {
log.debug("mqtt", "MQTT connected");
try {
log.debug("mqtt", "MQTT subscribe topic");
client.subscribe(topic);
} catch (e) {
client.end();
clearTimeout(timeoutID);
reject(new Error("Cannot subscribe topic"));
}
});
client.on("error", (error) => {
client.end();
clearTimeout(timeoutID);
reject(error);
});
client.on("message", (messageTopic, message) => {
if (messageTopic == topic) {
client.end();
clearTimeout(timeoutID);
if (message.toString() === okMessage) {
resolve(`Topic: ${messageTopic}; Message: ${message.toString()}`);
} else {
reject(new Error(`Error; Topic: ${messageTopic}; Message: ${message.toString()}`));
}
}
});
});
};
exports.dnsResolve = function (hostname, resolverServer, rrtype) {
const resolver = new Resolver();
resolver.setServers([ resolverServer ]);
@ -206,7 +264,7 @@ const parseCertificateInfo = function (info) {
const existingList = {};
while (link) {
log.debug("util", `[${i}] ${link.fingerprint}`);
log.debug("cert", `[${i}] ${link.fingerprint}`);
if (!link.valid_from || !link.valid_to) {
break;
@ -221,7 +279,7 @@ const parseCertificateInfo = function (info) {
if (link.issuerCertificate == null) {
break;
} else if (link.issuerCertificate.fingerprint in existingList) {
log.debug("util", `[Last] ${link.issuerCertificate.fingerprint}`);
log.debug("cert", `[Last] ${link.issuerCertificate.fingerprint}`);
link.issuerCertificate = null;
break;
} else {
@ -242,7 +300,7 @@ exports.checkCertificate = function (res) {
const info = res.request.res.socket.getPeerCertificate(true);
const valid = res.request.res.socket.authorized || false;
log.debug("util", "Parsing Certificate Info");
log.debug("cert", "Parsing Certificate Info");
const parsedInfo = parseCertificateInfo(info);
return {
@ -284,13 +342,13 @@ exports.getTotalClientInRoom = (io, roomName) => {
const sockets = io.sockets;
if (! sockets) {
if (!sockets) {
return 0;
}
const adapter = sockets.adapter;
if (! adapter) {
if (!adapter) {
return 0;
}
@ -315,7 +373,7 @@ exports.allowAllOrigin = (res) => {
};
exports.checkLogin = (socket) => {
if (! socket.userID) {
if (!socket.userID) {
throw new Error("You are not logged in.");
}
};

View file

@ -309,6 +309,10 @@ export default {
"One record": "One record",
steamApiKeyDescription: "For monitoring a Steam Game Server you need a Steam Web-API key. You can register your API key here: ",
"Current User": "Current User",
topic: "Topic",
topicExplanation: "MQTT topic to monitor",
successMessage: "Success Message",
successMessageExplanation: "MQTT message that will be considered as success",
recent: "Recent",
Done: "Done",
Info: "Info",

View file

@ -32,6 +32,9 @@
<option value="steam">
Steam Game Server
</option>
<option value="mqtt">
MQTT
</option>
</select>
</div>
@ -67,15 +70,15 @@
</div>
<!-- Hostname -->
<!-- TCP Port / Ping / DNS / Steam only -->
<div v-if="monitor.type === 'port' || monitor.type === 'ping' || monitor.type === 'dns' || monitor.type === 'steam'" class="my-3">
<!-- TCP Port / Ping / DNS / Steam / MQTT only -->
<div v-if="monitor.type === 'port' || monitor.type === 'ping' || monitor.type === 'dns' || monitor.type === 'steam' || monitor.type === 'mqtt'" class="my-3">
<label for="hostname" class="form-label">{{ $t("Hostname") }}</label>
<input id="hostname" v-model="monitor.hostname" type="text" class="form-control" :pattern="`${ipRegexPattern}|${hostnameRegexPattern}`" required>
</div>
<!-- Port -->
<!-- For TCP Port / Steam Type -->
<div v-if="monitor.type === 'port' || monitor.type === 'steam'" class="my-3">
<!-- For TCP Port / Steam / MQTT Type -->
<div v-if="monitor.type === 'port' || monitor.type === 'steam' || monitor.type === 'mqtt'" class="my-3">
<label for="port" class="form-label">{{ $t("Port") }}</label>
<input id="port" v-model="monitor.port" type="number" class="form-control" required min="0" max="65535" step="1">
</div>
@ -115,6 +118,36 @@
</div>
</template>
<!-- MQTT -->
<!-- For MQTT Type -->
<template v-if="monitor.type === 'mqtt'">
<div class="my-3">
<label for="mqttUsername" class="form-label">MQTT {{ $t("Username") }}</label>
<input id="mqttUsername" v-model="monitor.mqttUsername" type="text" class="form-control">
</div>
<div class="my-3">
<label for="mqttPassword" class="form-label">MQTT {{ $t("Password") }}</label>
<input id="mqttPassword" v-model="monitor.mqttPassword" type="password" class="form-control">
</div>
<div class="my-3">
<label for="mqttTopic" class="form-label">MQTT {{ $t("Topic") }}</label>
<input id="mqttTopic" v-model="monitor.mqttTopic" type="text" class="form-control" required>
<div class="form-text">
{{ $t("topicExplanation") }}
</div>
</div>
<div class="my-3">
<label for="mqttSuccessMessage" class="form-label">MQTT {{ $t("successMessage") }}</label>
<input id="mqttSuccessMessage" v-model="monitor.mqttSuccessMessage" type="text" class="form-control" required>
<div class="form-text">
{{ $t("successMessageExplanation") }}
</div>
</div>
</template>
<!-- Interval -->
<div class="my-3">
<label for="interval" class="form-label">{{ $t("Heartbeat Interval") }} ({{ $t("checkEverySecond", [ monitor.interval ]) }})</label>
@ -139,7 +172,7 @@
<h2 v-if="monitor.type !== 'push'" class="mt-5 mb-2">{{ $t("Advanced") }}</h2>
<div class="my-3 form-check">
<div v-if="monitor.type === 'http' || monitor.type === 'keyword' " class="my-3 form-check">
<input id="expiry-notification" v-model="monitor.expiryNotification" class="form-check-input" type="checkbox">
<label class="form-check-label" for="expiry-notification">
{{ $t("Domain Name Expiry Notification") }}
@ -492,6 +525,10 @@ export default {
dns_resolve_type: "A",
dns_resolve_server: "1.1.1.1",
proxyId: null,
mqttUsername: "",
mqttPassword: "",
mqttTopic: "",
mqttSuccessMessage: "",
};
if (this.$root.proxyList && !this.monitor.proxyId) {

View file

@ -54,7 +54,39 @@ function debug(msg) {
}
exports.debug = debug;
class Logger {
constructor() {
/**
* UPTIME_KUMA_HIDE_LOG=debug_monitor,info_monitor
*
* Example:
* [
* "debug_monitor", // Hide all logs that level is debug and the module is monitor
* "info_monitor",
* ]
*/
this.hideLog = {
info: [],
warn: [],
error: [],
debug: [],
};
if (typeof process !== "undefined" && process.env.UPTIME_KUMA_HIDE_LOG) {
let list = process.env.UPTIME_KUMA_HIDE_LOG.split(",").map(v => v.toLowerCase());
for (let pair of list) {
// split first "_" only
let values = pair.split(/_(.*)/s);
if (values.length >= 2) {
this.hideLog[values[0]].push(values[1]);
}
}
this.debug("server", "UPTIME_KUMA_HIDE_LOG is set");
this.debug("server", this.hideLog);
}
}
log(module, msg, level) {
if (this.hideLog[level] && this.hideLog[level].includes(module)) {
return;
}
module = module.toUpperCase();
level = level.toUpperCase();
const now = new Date().toISOString();

View file

@ -59,7 +59,46 @@ export function debug(msg: any) {
}
class Logger {
/**
* UPTIME_KUMA_HIDE_LOG=debug_monitor,info_monitor
*
* Example:
* [
* "debug_monitor", // Hide all logs that level is debug and the module is monitor
* "info_monitor",
* ]
*/
hideLog : any = {
info: [],
warn: [],
error: [],
debug: [],
};
constructor() {
if (typeof process !== "undefined" && process.env.UPTIME_KUMA_HIDE_LOG) {
let list = process.env.UPTIME_KUMA_HIDE_LOG.split(",").map(v => v.toLowerCase());
for (let pair of list) {
// split first "_" only
let values = pair.split(/_(.*)/s);
if (values.length >= 2) {
this.hideLog[values[0]].push(values[1]);
}
}
this.debug("server", "UPTIME_KUMA_HIDE_LOG is set");
this.debug("server", this.hideLog);
}
}
log(module: string, msg: any, level: string) {
if (this.hideLog[level] && this.hideLog[level].includes(module)) {
return;
}
module = module.toUpperCase();
level = level.toUpperCase();