dockge/backend/agent-manager.ts
2024-11-17 11:35:49 +00:00

350 lines
10 KiB
TypeScript

import { DockgeSocket } from "./util-server";
import { io, Socket as SocketClient } from "socket.io-client";
import { log } from "./log";
import { Agent } from "./models/agent";
import { isDev, LooseObject, sleep } from "../common/util-common";
import semver from "semver";
import { R } from "redbean-node";
import dayjs, { Dayjs } from "dayjs";
/**
* Dockge Instance Manager
* One AgentManager per Socket connection
*/
export class AgentManager {
protected socket: DockgeSocket;
protected agentSocketList: Record<string, SocketClient> = {};
protected agentLoggedInList: Record<string, boolean> = {};
protected _firstConnectTime: Dayjs = dayjs();
constructor(socket: DockgeSocket) {
this.socket = socket;
}
get firstConnectTime(): Dayjs {
return this._firstConnectTime;
}
test(url: string, username: string, password: string): Promise<void> {
return new Promise((resolve, reject) => {
let obj = new URL(url);
let endpoint = obj.host;
if (!endpoint) {
reject(new Error("Invalid Dockge URL"));
}
if (this.agentSocketList[endpoint]) {
reject(new Error("The Dockge URL already exists"));
}
let client = io(url, {
reconnection: false,
extraHeaders: {
endpoint,
},
});
client.on("connect", () => {
client.emit(
"login",
{
username: username,
password: password,
},
(res: LooseObject) => {
if (res.ok) {
resolve();
} else {
reject(new Error(res.msg));
}
client.disconnect();
}
);
});
client.on("connect_error", (err) => {
if (err.message === "xhr poll error") {
reject(
new Error("Unable to connect to the Dockge instance")
);
} else {
reject(err);
}
client.disconnect();
});
});
}
/**
*
* @param url
* @param username
* @param password
*/
async add(url: string, username: string, password: string): Promise<Agent> {
let bean = R.dispense("agent") as Agent;
bean.url = url;
bean.username = username;
bean.password = password;
await R.store(bean);
return bean;
}
/**
*
* @param url
*/
async remove(url: string) {
let bean = await R.findOne("agent", " url = ? ", [url]);
if (bean) {
await R.trash(bean);
let endpoint = bean.endpoint;
this.disconnect(endpoint);
this.sendAgentList();
delete this.agentSocketList[endpoint];
} else {
throw new Error("Agent not found");
}
}
connect(url: string, username: string, password: string) {
let obj = new URL(url);
let endpoint = obj.host;
this.socket.emit("agentStatus", {
endpoint: endpoint,
status: "connecting",
});
if (!endpoint) {
log.error(
"agent-manager",
"Invalid endpoint: " + endpoint + " URL: " + url
);
return;
}
if (this.agentSocketList[endpoint]) {
log.debug(
"agent-manager",
"Already connected to the socket server: " + endpoint
);
return;
}
log.info(
"agent-manager",
"Connecting to the socket server: " + endpoint
);
let client = io(url, {
extraHeaders: {
endpoint,
},
});
client.on("connect", () => {
log.info(
"agent-manager",
"Connected to the socket server: " + endpoint
);
client.emit(
"login",
{
username: username,
password: password,
},
(res: LooseObject) => {
if (res.ok) {
log.info(
"agent-manager",
"Logged in to the socket server: " + endpoint
);
this.agentLoggedInList[endpoint] = true;
this.socket.emit("agentStatus", {
endpoint: endpoint,
status: "online",
});
} else {
log.error(
"agent-manager",
"Failed to login to the socket server: " + endpoint
);
this.agentLoggedInList[endpoint] = false;
this.socket.emit("agentStatus", {
endpoint: endpoint,
status: "offline",
});
}
}
);
});
client.on("connect_error", (err) => {
log.error(
"agent-manager",
"Error from the socket server: " + endpoint
);
this.socket.emit("agentStatus", {
endpoint: endpoint,
status: "offline",
});
});
client.on("disconnect", () => {
log.info(
"agent-manager",
"Disconnected from the socket server: " + endpoint
);
this.socket.emit("agentStatus", {
endpoint: endpoint,
status: "offline",
});
});
client.on("agent", (...args: unknown[]) => {
this.socket.emit("agent", ...args);
});
client.on("info", (res) => {
log.debug("agent-manager", res);
// Disconnect if the version is lower than 1.4.0
if (!isDev && semver.satisfies(res.version, "< 1.4.0")) {
this.socket.emit("agentStatus", {
endpoint: endpoint,
status: "offline",
msg: `${endpoint}: Unsupported version: ` + res.version,
});
client.disconnect();
}
});
this.agentSocketList[endpoint] = client;
}
disconnect(endpoint: string) {
let client = this.agentSocketList[endpoint];
client?.disconnect();
}
async connectAll() {
this._firstConnectTime = dayjs();
if (this.socket.endpoint) {
log.info(
"agent-manager",
"This connection is connected as an agent, skip connectAll()"
);
return;
}
let list: Record<string, Agent> = await Agent.getAgentList();
if (Object.keys(list).length !== 0) {
log.info(
"agent-manager",
"Connecting to all instance socket server(s)..."
);
}
for (let endpoint in list) {
let agent = list[endpoint];
this.connect(agent.url, agent.username, agent.password);
}
}
disconnectAll() {
for (let endpoint in this.agentSocketList) {
this.disconnect(endpoint);
}
}
async emitToEndpoint(
endpoint: string,
eventName: string,
...args: unknown[]
) {
log.debug("agent-manager", "Emitting event to endpoint: " + endpoint);
let client = this.agentSocketList[endpoint];
if (!client) {
log.error(
"agent-manager",
"Socket client not found for endpoint: " + endpoint
);
throw new Error(
"Socket client not found for endpoint: " + endpoint
);
}
if (!client.connected || !this.agentLoggedInList[endpoint]) {
// Maybe the request is too quick, the socket is not connected yet, check firstConnectTime
// If it is within 10 seconds, we should apply retry logic here
let diff = dayjs().diff(this.firstConnectTime, "second");
log.debug("agent-manager", endpoint + ": diff: " + diff);
let ok = false;
while (diff < 10) {
if (client.connected && this.agentLoggedInList[endpoint]) {
log.debug(
"agent-manager",
`${endpoint}: Connected & Logged in`
);
ok = true;
break;
}
log.debug(
"agent-manager",
endpoint + ": not ready yet, retrying in 1 second..."
);
await sleep(1000);
diff = dayjs().diff(this.firstConnectTime, "second");
}
if (!ok) {
log.error(
"agent-manager",
`${endpoint}: Socket client not connected`
);
throw new Error(
"Socket client not connected for endpoint: " + endpoint
);
}
}
client.emit("agent", endpoint, eventName, ...args);
}
emitToAllEndpoints(eventName: string, ...args: unknown[]) {
log.debug("agent-manager", "Emitting event to all endpoints");
for (let endpoint in this.agentSocketList) {
this.emitToEndpoint(endpoint, eventName, ...args).catch((e) => {
log.warn("agent-manager", e.message);
});
}
}
async sendAgentList() {
let list = await Agent.getAgentList();
let result: Record<string, LooseObject> = {};
// Myself
result[""] = {
url: "",
username: "",
endpoint: "",
};
for (let endpoint in list) {
let agent = list[endpoint];
result[endpoint] = agent.toJSON();
}
this.socket.emit("agentList", {
ok: true,
agentList: result,
});
}
}