feat: added kafka producer (#3268)

*  feat: added kafka producer

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: eslint warn

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: typings and auth problems

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: better variable name to trrack disconnection

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: grouping Kafka Producer special settings into one template

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

*  feat: add kafka producer translations into `en.json`

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: disable close-on-select on kafka broker picker

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* 🐛 fix: `en.json` invalid json (conflict resolve)

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>

* Nostr dm notifications (#3051)

* Add nostr DM notification provider

* require crypto for node 18 compatibility

* remove whitespace

Co-authored-by: Frank Elsinga <frank@elsinga.de>

* move closer to where it is used

* simplify success or failure logic

* don't clobber the non-alert msg

* Update server/notification-providers/nostr.js

Co-authored-by: Frank Elsinga <frank@elsinga.de>

* polyfills required for node <= 18

* resolve linter warnings

* missing comma

---------

Co-authored-by: Frank Elsinga <frank@elsinga.de>

* Drop nostr

* Minor

* Fix a bug of clone

---------

Signed-off-by: Muhammed Hussein Karimi <info@karimi.dev>
Co-authored-by: Frank Elsinga <frank@elsinga.de>
Co-authored-by: Louis Lam <louislam@users.noreply.github.com>
pull/3438/head
Muhammed Hussein karimi 10 months ago committed by GitHub
parent 084cf01fcd
commit 278b88a9d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,22 @@
-- You should not modify if this have pushed to Github, unless it does serious wrong with the db.
BEGIN TRANSACTION;
ALTER TABLE monitor
ADD kafka_producer_topic VARCHAR(255);
ALTER TABLE monitor
ADD kafka_producer_brokers TEXT;
ALTER TABLE monitor
ADD kafka_producer_ssl INTEGER;
ALTER TABLE monitor
ADD kafka_producer_allow_auto_topic_creation VARCHAR(255);
ALTER TABLE monitor
ADD kafka_producer_sasl_options TEXT;
ALTER TABLE monitor
ADD kafka_producer_message TEXT;
COMMIT;

9
package-lock.json generated

@ -41,6 +41,7 @@
"jsonata": "^2.0.3",
"jsonwebtoken": "~9.0.0",
"jwt-decode": "~3.1.2",
"kafkajs": "^2.2.4",
"limiter": "~2.1.0",
"liquidjs": "^10.7.0",
"mongodb": "~4.14.0",
@ -13009,6 +13010,14 @@
"resolved": "https://registry.npmjs.org/jwt-decode/-/jwt-decode-3.1.2.tgz",
"integrity": "sha512-UfpWE/VZn0iP50d8cz9NrZLM9lSWhcJ+0Gt/nm4by88UL+J1SiKN8/5dkjMmbEzwL2CAe+67GsegCbIKtbp75A=="
},
"node_modules/kafkajs": {
"version": "2.2.4",
"resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
"integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
"engines": {
"node": ">=14.0.0"
}
},
"node_modules/keyv": {
"version": "4.5.2",
"resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.2.tgz",

@ -101,6 +101,7 @@
"jsonata": "^2.0.3",
"jsonwebtoken": "~9.0.0",
"jwt-decode": "~3.1.2",
"kafkajs": "^2.2.4",
"limiter": "~2.1.0",
"liquidjs": "^10.7.0",
"mongodb": "~4.14.0",

@ -73,6 +73,7 @@ class Database {
"patch-add-parent-monitor.sql": true,
"patch-add-invert-keyword.sql": true,
"patch-added-json-query.sql": true,
"patch-added-kafka-producer.sql": true,
};
/**

@ -6,7 +6,7 @@ const { log, UP, DOWN, PENDING, MAINTENANCE, flipStatus, TimeLogger, MAX_INTERVA
SQL_DATETIME_FORMAT
} = require("../../src/util");
const { tcping, ping, dnsResolve, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, mqttAsync, setSetting, httpNtlm, radius, grpcQuery,
redisPingAsync, mongodbPing,
redisPingAsync, mongodbPing, kafkaProducerAsync
} = require("../util-server");
const { R } = require("redbean-node");
const { BeanModel } = require("redbean-node/dist/bean-model");
@ -129,6 +129,11 @@ class Monitor extends BeanModel {
httpBodyEncoding: this.httpBodyEncoding,
jsonPath: this.jsonPath,
expectedValue: this.expectedValue,
kafkaProducerTopic: this.kafkaProducerTopic,
kafkaProducerBrokers: JSON.parse(this.kafkaProducerBrokers),
kafkaProducerSsl: this.kafkaProducerSsl === "1" && true || false,
kafkaProducerAllowAutoTopicCreation: this.kafkaProducerAllowAutoTopicCreation === "1" && true || false,
kafkaProducerMessage: this.kafkaProducerMessage,
screenshot,
};
@ -153,6 +158,7 @@ class Monitor extends BeanModel {
tlsCa: this.tlsCa,
tlsCert: this.tlsCert,
tlsKey: this.tlsKey,
kafkaProducerSaslOptions: JSON.parse(this.kafkaProducerSaslOptions),
};
}
@ -792,6 +798,24 @@ class Monitor extends BeanModel {
bean.ping = dayjs().valueOf() - startTime;
}
} else if (this.type === "kafka-producer") {
let startTime = dayjs().valueOf();
bean.msg = await kafkaProducerAsync(
JSON.parse(this.kafkaProducerBrokers),
this.kafkaProducerTopic,
this.kafkaProducerMessage,
{
allowAutoTopicCreation: this.kafkaProducerAllowAutoTopicCreation,
ssl: this.kafkaProducerSsl,
clientId: `Uptime-Kuma/${version}`,
interval: this.interval,
},
JSON.parse(this.kafkaProducerSaslOptions),
);
bean.status = UP;
bean.ping = dayjs().valueOf() - startTime;
} else {
throw new Error("Unknown Monitor Type");
}

@ -643,6 +643,9 @@ let needSetup = false;
monitor.accepted_statuscodes_json = JSON.stringify(monitor.accepted_statuscodes);
delete monitor.accepted_statuscodes;
monitor.kafkaProducerBrokers = JSON.stringify(monitor.kafkaProducerBrokers);
monitor.kafkaProducerSaslOptions = JSON.stringify(monitor.kafkaProducerSaslOptions);
bean.import(monitor);
bean.user_id = socket.userID;
@ -757,6 +760,11 @@ let needSetup = false;
bean.httpBodyEncoding = monitor.httpBodyEncoding;
bean.expectedValue = monitor.expectedValue;
bean.jsonPath = monitor.jsonPath;
bean.kafkaProducerTopic = monitor.kafkaProducerTopic;
bean.kafkaProducerBrokers = JSON.stringify(monitor.kafkaProducerBrokers);
bean.kafkaProducerAllowAutoTopicCreation = monitor.kafkaProducerAllowAutoTopicCreation;
bean.kafkaProducerSaslOptions = JSON.stringify(monitor.kafkaProducerSaslOptions);
bean.kafkaProducerMessage = monitor.kafkaProducerMessage;
bean.validate();

@ -28,8 +28,11 @@ const {
} = require("node-radius-utils");
const dayjs = require("dayjs");
const isWindows = process.platform === /^win/.test(process.platform);
// SASLOptions used in JSDoc
// eslint-disable-next-line no-unused-vars
const { Kafka, SASLOptions } = require("kafkajs");
const isWindows = process.platform === /^win/.test(process.platform);
/**
* Init or reset JWT secret
* @returns {Promise<Bean>}
@ -196,6 +199,94 @@ exports.mqttAsync = function (hostname, topic, okMessage, options = {}) {
});
};
/**
* Monitor Kafka using Producer
* @param {string} topic Topic name to produce into
* @param {string} message Message to produce
* @param {Object} [options={interval = 20, allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma"}]
* Kafka client options. Contains ssl, clientId, allowAutoTopicCreation and
* interval (interval defaults to 20, allowAutoTopicCreation defaults to false, clientId defaults to "Uptime-Kuma"
* and ssl defaults to false)
* @param {string[]} brokers List of kafka brokers to connect, host and port joined by ':'
* @param {SASLOptions} [saslOptions={}] Options for kafka client Authentication (SASL) (defaults to
* {})
* @returns {Promise<string>}
*/
exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, saslOptions = {}) {
return new Promise((resolve, reject) => {
const { interval = 20, allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma" } = options;
let connectedToKafka = false;
const timeoutID = setTimeout(() => {
log.debug("kafkaProducer", "KafkaProducer timeout triggered");
connectedToKafka = true;
reject(new Error("Timeout"));
}, interval * 1000 * 0.8);
if (saslOptions.mechanism === "None") {
saslOptions = undefined;
}
let client = new Kafka({
brokers: brokers,
clientId: clientId,
sasl: saslOptions,
retry: {
retries: 0,
},
ssl: ssl,
});
let producer = client.producer({
allowAutoTopicCreation: allowAutoTopicCreation,
retry: {
retries: 0,
}
});
producer.connect().then(
() => {
try {
producer.send({
topic: topic,
messages: [{
value: message,
}],
});
connectedToKafka = true;
clearTimeout(timeoutID);
resolve("Message sent successfully");
} catch (e) {
connectedToKafka = true;
producer.disconnect();
clearTimeout(timeoutID);
reject(new Error("Error sending message: " + e.message));
}
}
).catch(
(e) => {
connectedToKafka = true;
producer.disconnect();
clearTimeout(timeoutID);
reject(new Error("Error in producer connection: " + e.message));
}
);
producer.on("producer.network.request_timeout", (_) => {
clearTimeout(timeoutID);
reject(new Error("producer.network.request_timeout"));
});
producer.on("producer.disconnect", (_) => {
if (!connectedToKafka) {
clearTimeout(timeoutID);
reject(new Error("producer.disconnect"));
}
});
});
};
/**
* Use NTLM Auth for a http request.
* @param {Object} options The http request options

@ -768,6 +768,20 @@
"Badge URL": "Badge URL",
"Group": "Group",
"Monitor Group": "Monitor Group",
"Kafka Brokers": "Kafka Brokers",
"Enter the list of brokers": "Enter the list of brokers",
"Press Enter to add broker": "Press Enter to add broker",
"Kafka Topic Name": "Kafka Topic Name",
"Kafka Producer Message": "Kafka Producer Message",
"Enable Kafka SSL": "Enable Kafka SSL",
"Enable Kafka Producer Auto Topic Creation": "Enable Kafka Producer Auto Topic Creation",
"Kafka SASL Options": "Kafka SASL Options",
"Mechanism": "Mechanism",
"Pick a SASL Mechanism...": "Pick a SASL Mechanism...",
"Authorization Identity": "Authorization Identity",
"AccessKey Id": "AccessKey Id",
"Secret AccessKey": "Secret AccessKey",
"Session Token": "Session Token",
"noGroupMonitorMsg": "Not Available. Create a Group Monitor First.",
"Close": "Close",
"Request Body": "Request Body"

@ -61,6 +61,9 @@
<option value="mqtt">
MQTT
</option>
<option value="kafka-producer">
Kafka Producer
</option>
<option value="sqlserver">
Microsoft SQL Server
</option>
@ -166,6 +169,57 @@
</select>
</div>
<template v-if="monitor.type === 'kafka-producer'">
<!-- Kafka Brokers List -->
<div class="my-3">
<label for="kafkaProducerBrokers" class="form-label">{{ $t("Kafka Brokers") }}</label>
<VueMultiselect
id="kafkaProducerBrokers"
v-model="monitor.kafkaProducerBrokers"
:multiple="true"
:options="[]"
:placeholder="$t('Enter the list of brokers')"
:tag-placeholder="$t('Press Enter to add broker')"
:max-height="500"
:taggable="true"
:show-no-options="false"
:close-on-select="false"
:clear-on-select="false"
:preserve-search="false"
:preselect-first="false"
@tag="addKafkaProducerBroker"
></VueMultiselect>
</div>
<!-- Kafka Topic Name -->
<div class="my-3">
<label for="kafkaProducerTopic" class="form-label">{{ $t("Kafka Topic Name") }}</label>
<input id="kafkaProducerTopic" v-model="monitor.kafkaProducerTopic" type="text" class="form-control" required>
</div>
<!-- Kafka Producer Message -->
<div class="my-3">
<label for="kafkaProducerMessage" class="form-label">{{ $t("Kafka Producer Message") }}</label>
<input id="kafkaProducerMessage" v-model="monitor.kafkaProducerMessage" type="text" class="form-control" required>
</div>
<!-- Kafka SSL -->
<div class="my-3 form-check">
<input id="kafkaProducerSsl" v-model="monitor.kafkaProducerSsl" class="form-check-input" type="checkbox">
<label class="form-check-label" for="kafkaProducerSsl">
{{ $t("Enable Kafka SSL") }}
</label>
</div>
<!-- Kafka SSL -->
<div class="my-3 form-check">
<input id="kafkaProducerAllowAutoTopicCreation" v-model="monitor.kafkaProducerAllowAutoTopicCreation" class="form-check-input" type="checkbox">
<label class="form-check-label" for="kafkaProducerAllowAutoTopicCreation">
{{ $t("Enable Kafka Producer Auto Topic Creation") }}
</label>
</div>
</template>
<!-- Hostname -->
<!-- TCP Port / Ping / DNS / Steam / MQTT / Radius only -->
<div v-if="monitor.type === 'port' || monitor.type === 'ping' || monitor.type === 'dns' || monitor.type === 'steam' || monitor.type === 'gamedig' ||monitor.type === 'mqtt' || monitor.type === 'radius'" class="my-3">
@ -512,6 +566,56 @@
</button>
</div>
<!-- Kafka SASL Options -->
<!-- Kafka Producer only -->
<template v-if="monitor.type === 'kafka-producer'">
<h2 class="mt-5 mb-2">{{ $t("Kafka SASL Options") }}</h2>
<div class="my-3">
<label class="form-label" for="kafkaProducerSaslMechanism">
{{ $t("Mechanism") }}
</label>
<VueMultiselect
id="kafkaProducerSaslMechanism"
v-model="monitor.kafkaProducerSaslOptions.mechanism"
:options="kafkaSaslMechanismOptions"
:multiple="false"
:clear-on-select="false"
:preserve-search="false"
:placeholder="$t('Pick a SASL Mechanism...')"
:preselect-first="false"
:max-height="500"
:allow-empty="false"
:taggable="false"
></VueMultiselect>
</div>
<div v-if="monitor.kafkaProducerSaslOptions.mechanism !== 'None'">
<div v-if="monitor.kafkaProducerSaslOptions.mechanism !== 'aws'" class="my-3">
<label for="kafkaProducerSaslUsername" class="form-label">{{ $t("Username") }}</label>
<input id="kafkaProducerSaslUsername" v-model="monitor.kafkaProducerSaslOptions.username" type="text" autocomplete="kafkaProducerSaslUsername" class="form-control">
</div>
<div v-if="monitor.kafkaProducerSaslOptions.mechanism !== 'aws'" class="my-3">
<label for="kafkaProducerSaslPassword" class="form-label">{{ $t("Password") }}</label>
<input id="kafkaProducerSaslPassword" v-model="monitor.kafkaProducerSaslOptions.password" type="password" autocomplete="kafkaProducerSaslPassword" class="form-control">
</div>
<div v-if="monitor.kafkaProducerSaslOptions.mechanism === 'aws'" class="my-3">
<label for="kafkaProducerSaslAuthorizationIdentity" class="form-label">{{ $t("Authorization Identity") }}</label>
<input id="kafkaProducerSaslAuthorizationIdentity" v-model="monitor.kafkaProducerSaslOptions.authorizationIdentity" type="text" autocomplete="kafkaProducerSaslAuthorizationIdentity" class="form-control" required>
</div>
<div v-if="monitor.kafkaProducerSaslOptions.mechanism === 'aws'" class="my-3">
<label for="kafkaProducerSaslAccessKeyId" class="form-label">{{ $t("AccessKey Id") }}</label>
<input id="kafkaProducerSaslAccessKeyId" v-model="monitor.kafkaProducerSaslOptions.accessKeyId" type="text" autocomplete="kafkaProducerSaslAccessKeyId" class="form-control" required>
</div>
<div v-if="monitor.kafkaProducerSaslOptions.mechanism === 'aws'" class="my-3">
<label for="kafkaProducerSaslSecretAccessKey" class="form-label">{{ $t("Secret AccessKey") }}</label>
<input id="kafkaProducerSaslSecretAccessKey" v-model="monitor.kafkaProducerSaslOptions.secretAccessKey" type="password" autocomplete="kafkaProducerSaslSecretAccessKey" class="form-control" required>
</div>
<div v-if="monitor.kafkaProducerSaslOptions.mechanism === 'aws'" class="my-3">
<label for="kafkaProducerSaslSessionToken" class="form-label">{{ $t("Session Token") }}</label>
<input id="kafkaProducerSaslSessionToken" v-model="monitor.kafkaProducerSaslOptions.sessionToken" type="password" autocomplete="kafkaProducerSaslSessionToken" class="form-control">
</div>
</div>
</template>
<!-- HTTP Options -->
<template v-if="monitor.type === 'http' || monitor.type === 'keyword' || monitor.type === 'json-query' ">
<h2 class="mt-5 mb-2">{{ $t("HTTP Options") }}</h2>
@ -724,6 +828,7 @@ export default {
},
acceptedStatusCodeOptions: [],
dnsresolvetypeOptions: [],
kafkaSaslMechanismOptions: [],
ipOrHostnameRegexPattern: hostNameRegexPattern(),
mqttIpOrHostnameRegexPattern: hostNameRegexPattern(true),
gameList: null,
@ -987,12 +1092,21 @@ message HealthCheckResponse {
"TXT",
];
let kafkaSaslMechanismOptions = [
"None",
"plain",
"scram-sha-256",
"scram-sha-512",
"aws",
];
for (let i = 100; i <= 999; i++) {
acceptedStatusCodeOptions.push(i.toString());
}
this.acceptedStatusCodeOptions = acceptedStatusCodeOptions;
this.dnsresolvetypeOptions = dnsresolvetypeOptions;
this.kafkaSaslMechanismOptions = kafkaSaslMechanismOptions;
},
methods: {
/** Initialize the edit monitor form */
@ -1026,7 +1140,11 @@ message HealthCheckResponse {
mqttTopic: "",
mqttSuccessMessage: "",
authMethod: null,
httpBodyEncoding: "json"
httpBodyEncoding: "json",
kafkaProducerBrokers: [],
kafkaProducerSaslOptions: {
mechanism: "None",
},
};
if (this.$root.proxyList && !this.monitor.proxyId) {
@ -1067,6 +1185,7 @@ message HealthCheckResponse {
this.monitor.childrenIDs = undefined;
this.monitor.forceInactive = undefined;
this.monitor.pathName = undefined;
this.monitor.screenshot = undefined;
this.monitor.name = this.$t("cloneOf", [ this.monitor.name ]);
this.$refs.tagsManager.newTags = this.monitor.tags.map((monitorTag) => {
@ -1093,6 +1212,10 @@ message HealthCheckResponse {
},
addKafkaProducerBroker(newBroker) {
this.monitor.kafkaProducerBrokers.push(newBroker);
},
/**
* Validate form input
* @returns {boolean} Is the form input valid?

Loading…
Cancel
Save