- 发送时间: ${new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' })}
+ `;
+ })
+ .join('');
}
}
+ /**
+ * HTML 转义
+ */
+ escapeHTML(str) {
+ if (!str) return '';
+ const map = {
+ '&': '&',
+ '<': '<',
+ '>': '>',
+ '"': '"',
+ "'": ''',
+ };
+ return String(str).replace(/[&<>"']/g, m => map[m]);
+ }
+
/**
* 测试连接
*/
diff --git a/modules/notification-api/channels/telegram.js b/modules/notification-api/channels/telegram.js
index 36cae0b..e443cfc 100644
--- a/modules/notification-api/channels/telegram.js
+++ b/modules/notification-api/channels/telegram.js
@@ -24,7 +24,7 @@ class TelegramChannel {
try {
const url = `${this.apiBase}${config.bot_token}/sendMessage`;
- const text = this.formatMessage(title, message);
+ const text = this.formatMessage(title, message, options.notification, config);
const response = await axios.post(url, {
chat_id: config.chat_id,
@@ -55,12 +55,21 @@ class TelegramChannel {
/**
* 格式化消息
*/
- formatMessage(title, message) {
+ formatMessage(title, message, notification, config) {
let text = `${this.escapeHTML(title)}\n\n`;
- // 格式化消息内容
+ // 聚合通知特殊排版
+ if (notification?.is_batch) {
+ text += `当前有 ${notification.is_batch ? '多个' : ''} 汇总告警如下:\n\n`;
+ }
+
text += this.formatContent(message);
+ if (config.base_url) {
+ const dashboardUrl = config.base_url.replace(/\/$/, '') + '/#/';
+ text += `\n\n🔗 查看仪表盘`;
+ }
+
return text;
}
@@ -68,14 +77,25 @@ class TelegramChannel {
* 格式化内容
*/
formatContent(message) {
- // 如果是 JSON,格式化显示
+ // 如果是 JSON, 格式化显示
try {
const data = JSON.parse(message);
const jsonStr = JSON.stringify(data, null, 2);
return `
+
-
+
+
+
+
+ ${this.formatMessage(message)}
+
+
+ ${dashboardUrl ? `
+
+ 进入仪表盘看板
+
` : ''}
+
-
`;
@@ -142,16 +134,45 @@ class EmailChannel {
* 格式化消息内容
*/
formatMessage(message) {
- // 如果是 JSON,格式化显示
try {
const data = JSON.parse(message);
- return `本邮件由 API 监控系统自动发送,请勿回复
-${JSON.stringify(data, null, 2)}`;
+ return `${this.escapeHTML(JSON.stringify(data, null, 2))}`;
} catch (e) {
- // 普通文本,转换为段落
- return message.split('\n').map(line => `${line}
`).join(''); + return message.split('\n') + .filter(line => line.trim()) + .map(line => { + const colonIndex = line.indexOf(':'); + if (colonIndex > 0) { + const label = line.substring(0, colonIndex).replace(/(📋|📧|⏰|📊|🖥️|💳|🔗|🌐|❌|⏱️|💰|🎯)/gu, '').trim(); + const value = line.substring(colonIndex + 1).trim(); + return ` +
+
+ `;
+ }
+ return `${this.escapeHTML(label)}
+ ${this.escapeHTML(value)}
+ ${this.escapeHTML(line)}
${this.escapeHTML(jsonStr)}`;
} catch (e) {
- // 普通文本,转义并保留换行
- return this.escapeHTML(message).replace(/\n/g, '\n');
+ // 普通文本,识别 "Label: Value" 模式进行美化排版
+ return message.split('\n')
+ .filter(line => line.trim())
+ .map(line => {
+ const colonIndex = line.indexOf(':');
+ if (colonIndex > 0 && colonIndex < 30) {
+ const label = line.substring(0, colonIndex).replace(/(📋|📧|⏰|📊|🖥️|💳|🔗|🌐|❌|⏱️|💰|🎯)/gu, '').trim();
+ const value = line.substring(colonIndex + 1).trim();
+ return `${this.escapeHTML(label)}: ${this.escapeHTML(value)}`;
+ }
+ return this.escapeHTML(line);
+ })
+ .join('\n');
}
}
diff --git a/modules/notification-api/models.js b/modules/notification-api/models.js
index 54208ea..b24f725 100644
--- a/modules/notification-api/models.js
+++ b/modules/notification-api/models.js
@@ -87,6 +87,9 @@ class AlertRuleModel extends BaseModel {
suppression: JSON.stringify(ruleData.suppression || {}),
time_window: JSON.stringify(ruleData.time_window || { enabled: false }),
description: ruleData.description || '',
+ title_template: ruleData.title_template || '',
+ message_template: ruleData.message_template || '',
+ backup_channels: JSON.stringify(ruleData.backup_channels || []),
};
this.insert(data);
return data;
@@ -123,6 +126,7 @@ class AlertRuleModel extends BaseModel {
conditions: JSON.parse(rule.conditions || '{}'),
suppression: JSON.parse(rule.suppression || '{}'),
time_window: JSON.parse(rule.time_window || '{"enabled":false}'),
+ backup_channels: JSON.parse(rule.backup_channels || '[]'),
};
}
@@ -140,6 +144,10 @@ class AlertRuleModel extends BaseModel {
if (ruleData.suppression !== undefined) data.suppression = JSON.stringify(ruleData.suppression);
if (ruleData.time_window !== undefined) data.time_window = JSON.stringify(ruleData.time_window);
if (ruleData.description !== undefined) data.description = ruleData.description;
+ if (ruleData.quiet_until !== undefined) data.quiet_until = ruleData.quiet_until;
+ if (ruleData.title_template !== undefined) data.title_template = ruleData.title_template;
+ if (ruleData.message_template !== undefined) data.message_template = ruleData.message_template;
+ if (ruleData.backup_channels !== undefined) data.backup_channels = JSON.stringify(ruleData.backup_channels);
return this.update(id, data);
}
@@ -280,6 +288,8 @@ class AlertStateTrackingModel extends BaseModel {
fingerprint: fingerprint,
last_triggered_at: Date.now(),
consecutive_failures: 1,
+ state_history: JSON.stringify([]),
+ is_flapping: 0,
...updates,
};
const result = this.insert(data);
@@ -402,10 +412,35 @@ class NotificationGlobalConfigModel extends BaseModel {
enable_batch: config.enable_batch === 1,
batch_interval_seconds: config.batch_interval_seconds || 30,
default_channels: JSON.parse(config.default_channels || '[]'),
+ global_rate_limit_per_hour: config.global_rate_limit_per_hour || 100,
+ enable_auto_escalation: config.enable_auto_escalation === 1,
+ base_url: config.base_url || '',
};
}
}
+/**
+ * 维护计划模型
+ */
+class MaintenanceScheduleModel extends BaseModel {
+ constructor() {
+ super('maintenance_schedules');
+ }
+
+ /**
+ * 获取当前生效的维护计划
+ */
+ getActive() {
+ const db = this.getDb();
+ const now = new Date().toISOString();
+ const stmt = db.prepare(`
+ SELECT * FROM ${this.tableName}
+ WHERE start_at <= ? AND end_at >= ?
+ `);
+ return stmt.all(now, now);
+ }
+}
+
// 导出单例实例
module.exports = {
NotificationChannel: new NotificationChannelModel(),
@@ -413,5 +448,6 @@ module.exports = {
NotificationHistory: new NotificationHistoryModel(),
AlertStateTracking: new AlertStateTrackingModel(),
NotificationGlobalConfig: new NotificationGlobalConfigModel(),
+ MaintenanceSchedule: new MaintenanceScheduleModel(),
generateId,
};
diff --git a/modules/notification-api/router.js b/modules/notification-api/router.js
index 2802eb4..3b01077 100644
--- a/modules/notification-api/router.js
+++ b/modules/notification-api/router.js
@@ -168,14 +168,9 @@ router.post('/channels/:id/test', async (req, res) => {
// 解密配置
const config = JSON.parse(decrypt(channel.config));
- const testTitle = '🔔 [测试] API Monitor 通知测试';
- const testMessage = `这是一条来自 API Monitor 的测试通知。
-
-📋 渠道名称: ${channel.name}
-📧 渠道类型: ${channel.type === 'email' ? 'Email 邮箱' : 'Telegram'}
-⏰ 发送时间: ${new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' })}
-
-如果您收到此消息,说明通知渠道配置正确!`;
+ const testTitle = '🔔 通知连通性测试';
+ const testMessage = `发送时间: ${new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' })}
+验证状态: 配置正确, 通道畅通`;
let success = false;
if (channel.type === 'email') {
@@ -242,6 +237,10 @@ router.post('/rules', (req, res) => {
suppression = {},
time_window = { enabled: false },
description = '',
+ title_template = '',
+ message_template = '',
+ backup_channels = [],
+ quiet_until = null,
enabled = true,
} = req.body;
@@ -259,6 +258,10 @@ router.post('/rules', (req, res) => {
suppression,
time_window,
description,
+ title_template,
+ message_template,
+ backup_channels,
+ quiet_until,
enabled: enabled ? 1 : 0,
});
@@ -283,6 +286,10 @@ router.put('/rules/:id', (req, res) => {
suppression,
time_window,
description,
+ title_template,
+ message_template,
+ backup_channels,
+ quiet_until,
enabled,
} = req.body;
@@ -294,6 +301,10 @@ router.put('/rules/:id', (req, res) => {
if (suppression !== undefined) updateData.suppression = suppression;
if (time_window !== undefined) updateData.time_window = time_window;
if (description !== undefined) updateData.description = description;
+ if (title_template !== undefined) updateData.title_template = title_template;
+ if (message_template !== undefined) updateData.message_template = message_template;
+ if (backup_channels !== undefined) updateData.backup_channels = backup_channels;
+ if (quiet_until !== undefined) updateData.quiet_until = quiet_until;
if (enabled !== undefined) updateData.enabled = enabled ? 1 : 0;
storage.rule.update(req.params.id, updateData);
diff --git a/modules/notification-api/schema.sql b/modules/notification-api/schema.sql
index 518b38b..6bab9d3 100644
--- a/modules/notification-api/schema.sql
+++ b/modules/notification-api/schema.sql
@@ -24,10 +24,22 @@ CREATE TABLE IF NOT EXISTS alert_rules (
suppression TEXT,
time_window TEXT,
description TEXT,
+ quiet_until DATETIME, -- 手动静默截止时间
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
+-- 20.1 维护计划表
+CREATE TABLE IF NOT EXISTS maintenance_schedules (
+ id TEXT PRIMARY KEY,
+ target_type TEXT NOT NULL, -- monitor/server/global
+ target_id TEXT,
+ start_at DATETIME NOT NULL,
+ end_at DATETIME NOT NULL,
+ reason TEXT,
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+);
+
-- 21. 通知历史表
CREATE TABLE IF NOT EXISTS notification_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -69,6 +81,9 @@ CREATE TABLE IF NOT EXISTS notification_global_config (
enable_batch INTEGER DEFAULT 1,
batch_interval_seconds INTEGER DEFAULT 30,
default_channels TEXT,
+ global_rate_limit_per_hour INTEGER DEFAULT 100, -- 全局熔断限制
+ enable_auto_escalation INTEGER DEFAULT 0, -- 是否开启告警升级
+ base_url TEXT, -- 仪表盘基地址,用于生成链接
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
diff --git a/modules/notification-api/service.js b/modules/notification-api/service.js
index 8e669e0..b3947cf 100644
--- a/modules/notification-api/service.js
+++ b/modules/notification-api/service.js
@@ -19,6 +19,14 @@ class NotificationService extends EventEmitter {
this.queue = [];
this.processing = false;
this.retryTimer = null;
+
+ // --- 增强功能状态 ---
+ this.batchBuffer = new Map(); // channelId -> notification[]
+ this.batchTimers = new Map();
+ this.sentCountInLastHour = 0;
+ this.lastResetTime = Date.now();
+ this.circuitBroken = false;
+ this.startupTime = Date.now(); // 记录启动时间,用于启动保护
}
/**
@@ -44,6 +52,9 @@ class NotificationService extends EventEmitter {
// 启动定时清理任务
this.startCleanupTasks();
+ // 加载未完成的通知 (重启恢复)
+ this.loadIncompleteNotifications();
+
this.initialized = true;
logger.info('✅ 通知服务已初始化');
}
@@ -58,16 +69,24 @@ class NotificationService extends EventEmitter {
try {
logger.debug(`触发告警: ${sourceModule}/${eventType}`);
- // 自动处理恢复:如果是恢复事件,重置对应的故障状态追踪
- // 这样下次故障时 repeat_count 可以重新计数
+ // 自动处理恢复:如果是恢复事件,更新对应的故障状态追踪
+ // 不再直接 reset,而是让抖动检测逻辑决定是否放行
if (eventType === 'up' || eventType === 'online') {
const oppositeType = eventType === 'up' ? 'down' : 'offline';
const downRules = storage.rule.getBySourceAndEvent(sourceModule, oppositeType);
if (downRules.length > 0) {
- logger.debug(`检测到恢复事件,正在重置 ${downRules.length} 条故障规则的状态记录`);
for (const rule of downRules) {
const fingerprint = this.generateFingerprint(rule, data);
- storage.stateTracking.reset(rule.id, fingerprint);
+ const state = storage.stateTracking.get(rule.id, fingerprint);
+ if (state) {
+ // 记录这次恢复尝试到历史中,以便检测是否在频繁抖动
+ this.detectFlapping(state, eventType);
+
+ // 如果没有抖动,才真正重置连续失败计数
+ if (!state.is_flapping) {
+ storage.stateTracking.reset(rule.id, fingerprint);
+ }
+ }
}
}
}
@@ -127,7 +146,28 @@ class NotificationService extends EventEmitter {
}
}
- // 5. 发送通知
+ // 4.1 检查手动静默 (quiet_until)
+ if (rule.quiet_until && new Date(rule.quiet_until) > new Date()) {
+ logger.debug(`规则处于手动静默期,跳过: ${rule.name}`);
+ return;
+ }
+
+ // 4.2 检查全局/特定维护计划
+ const maintenance = this.checkMaintenance(rule, eventData);
+ if (maintenance) {
+ logger.debug(`匹配到维护计划 [${maintenance.reason}], 跳过: ${rule.name}`);
+ return;
+ }
+
+ // 5. 抖动检测 (Anti-Flapping)
+ const isFlapping = this.detectFlapping(state, rule.event_type);
+ if (isFlapping) {
+ // 如果正在抖动,仅发送特定频率或直接抑制
+ logger.debug(`检测到监控项处于抖动状态, 抑制重复变动通知: ${rule.name}`);
+ return;
+ }
+
+ // 6. 发送通知
for (const channelId of channelIds) {
const channel = storage.channel.getById(channelId);
if (!channel || !channel.enabled) {
@@ -135,114 +175,248 @@ class NotificationService extends EventEmitter {
continue;
}
+ const ctx = { rule, eventData, severity: rule.severity, state };
const notification = {
rule_id: rule.id,
channel_id: channelId,
- title: this.formatTitle(rule, eventData),
- message: this.formatMessage(rule, eventData),
+ title: this.formatTitle(rule, eventData, ctx),
+ message: this.formatMessage(rule, eventData, ctx),
data: eventData,
+ severity: rule.severity
};
this.enqueue(notification);
}
- // 6. 更新最后通知时间
+ // 7. 更新最后通知时间
storage.stateTracking.updateLastNotified(rule.id, fingerprint);
}
/**
- * 发送通知 (核心逻辑)
+ * 发送通知 (核心逻辑,支持备用渠道漂移)
*/
async send(notification) {
- const { channel_id, title, message } = notification;
- const channel = storage.channel.getById(channel_id);
+ const { channel_id, title, message, rule_id } = notification;
+ let success = await this.doSend(channel_id, title, message, notification);
+
+ // 如果首选渠道失败且有备用渠道,尝试漂移
+ if (!success && rule_id) {
+ const rule = storage.rule.getById(rule_id);
+ if (rule?.backup_channels?.length > 0) {
+ logger.warn(`首选渠道发送失败,尝试通过 ${rule.backup_channels.length} 个备用渠道漂移...`);
+ for (const backupId of rule.backup_channels) {
+ success = await this.doSend(backupId, `[漂移] ${title}`, message, notification);
+ if (success) {
+ logger.info(`备用渠道 ${backupId} 漂移成功`);
+ break;
+ }
+ }
+ }
+ }
- if (!channel) {
- logger.error(`渠道不存在: ${channel_id}`);
- return false;
+ // 更新历史记录
+ if (success) {
+ storage.history.updateStatus(notification.log_id, 'sent', new Date().toISOString());
+ } else {
+ storage.history.updateStatus(notification.log_id, 'failed', null, '所有尝试(含漂移)均失败');
}
+ return success;
+ }
+
+ /**
+ * 实际执行发送
+ */
+ async doSend(channel_id, title, message, notification) {
+ const channel = storage.channel.getById(channel_id);
+ if (!channel) return false;
+
try {
- // 解密配置
const config = JSON.parse(decrypt(channel.config));
+ const options = { notification }; // 传递上下文给渠道格式化
let success = false;
-
if (channel.type === 'email') {
- success = await emailChannel.send(config, title, message);
+ success = await emailChannel.send(config, title, message, options);
} else if (channel.type === 'telegram') {
- success = await telegramChannel.send(config, title, message);
- } else {
- logger.error(`未知渠道类型: ${channel.type}`);
- return false;
- }
-
- // 更新历史记录
- if (success) {
- storage.history.updateStatus(
- notification.log_id,
- 'sent',
- new Date().toISOString()
- );
- logger.info(`通知发送成功: ${title}`);
- } else {
- storage.history.updateStatus(
- notification.log_id,
- 'failed',
- null,
- '发送失败'
- );
+ success = await telegramChannel.send(config, title, message, options);
}
-
return success;
} catch (error) {
- logger.error(`发送通知失败: ${error.message}`);
-
- // 更新历史记录为失败
- storage.history.updateStatus(
- notification.log_id,
- 'failed',
- null,
- error.message
- );
-
+ logger.error(`执行渠道 ${channel_id} 发送失败: ${error.message}`);
return false;
}
}
/**
- * 队列管理
+ * 队列管理 (增加合并与熔断支持)
*/
enqueue(notification) {
- // 创建历史记录
- const log = storage.history.create(notification);
- notification.log_id = log.id;
+ // 如果熔断开启,丢弃非 Critical 告警
+ if (this.circuitBroken) {
+ const rule = storage.rule.getById(notification.rule_id);
+ if (rule?.severity !== 'critical') {
+ logger.warn(`[熔断控制] 已丢弃非紧急告警: ${notification.title}`);
+ return;
+ }
+ }
- // 加入队列
- this.queue.push(notification);
+ // 创建/获取历史记录
+ if (!notification.log_id) {
+ const log = storage.history.create(notification);
+ notification.log_id = log.id;
+ }
- logger.debug(`通知已加入队列: ${notification.title} (队列长度: ${this.queue.length})`);
+ const config = storage.globalConfig.getDefault();
+ const startupGracePeriod = 60000; // 启动 60 秒内为保护期
+ const isStartup = Date.now() - this.startupTime < startupGracePeriod;
+
+ // 检查是否需要合并 (Batching)
+ // 启动保护期内强制开启聚合,防止重启轰炸
+ if ((config.enable_batch || isStartup) && !notification.is_retry) {
+ let interval = config.batch_interval_seconds || 30;
+ if (isStartup && interval < 30) interval = 30; // 启动时至少等待 30 秒以收集首轮扫描的所有故障
+ this.addToBatch(notification, interval);
+ } else {
+ this.queue.push(notification);
+ }
- // 确保队列处理器运行
- if (!this.processing) {
+ // 启动队列处理器
+ if (!this.processing && this.queue.length > 0) {
this.startQueueProcessor();
}
}
/**
- * 启动队列处理器
+ * 加入合并缓冲区
+ */
+ addToBatch(notification, interval) {
+ const channelId = notification.channel_id;
+ if (!this.batchBuffer.has(channelId)) {
+ this.batchBuffer.set(channelId, []);
+ }
+
+ this.batchBuffer.get(channelId).push(notification);
+
+ // 如果没有定时器,启动一个
+ if (!this.batchTimers.has(channelId)) {
+ const timer = setTimeout(() => {
+ this.flushBatch(channelId);
+ }, interval * 1000);
+ this.batchTimers.set(channelId, timer);
+ }
+ }
+
+ /**
+ * 刷新并发送合并通知
+ */
+ async flushBatch(channelId) {
+ const notifications = this.batchBuffer.get(channelId) || [];
+ this.batchBuffer.delete(channelId);
+ this.batchTimers.delete(channelId);
+
+ if (notifications.length === 0) return;
+
+ if (notifications.length === 1) {
+ this.queue.push(notifications[0]);
+ } else {
+ // 创建聚合通知
+ const first = notifications[0];
+ const batchNotification = {
+ ...first,
+ title: `📦 [聚合通知] 包含 ${notifications.length} 条告警`,
+ message: notifications.map(n => `--- ${n.title} ---\n${n.message}`).join('\n\n'),
+ is_batch: true
+ };
+ this.queue.push(batchNotification);
+ }
+
+ if (!this.processing) this.startQueueProcessor();
+ }
+
+ /**
+ * 熔断检查
+ */
+ checkRateLimit() {
+ const config = storage.globalConfig.getDefault();
+ const limit = config.global_rate_limit_per_hour || 100;
+
+ // 每小时重置计数器
+ const now = Date.now();
+ if (now - this.lastResetTime > 3600000) {
+ this.sentCountInLastHour = 0;
+ this.lastResetTime = now;
+ if (this.circuitBroken) {
+ this.circuitBroken = false;
+ logger.info('熔断已自动解除,恢复正常发送');
+ }
+ }
+
+ this.sentCountInLastHour++;
+
+ if (this.sentCountInLastHour > limit) {
+ if (!this.circuitBroken) {
+ this.circuitBroken = true;
+ logger.error(`[🚨 熔断控制] 已达到每小时发送上限 (${limit}), 进入保护模式!仅发送紧急告警。`);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * 检查维护状态
+ */
+ checkMaintenance(rule, eventData) {
+ const activeSchedules = storage.maintenance.getActive();
+ if (activeSchedules.length === 0) return null;
+
+ return activeSchedules.find(s => {
+ if (s.target_type === 'global') return true;
+ if (s.target_type === 'monitor' && s.target_id == eventData.monitorId) return true;
+ if (s.target_type === 'server' && s.target_id == eventData.serverId) return true;
+ return false;
+ });
+ }
+
+ /**
+ * 启动队列处理器 (支持并发发送)
*/
async startQueueProcessor() {
if (this.processing) return;
this.processing = true;
- while (this.queue.length > 0) {
- const notification = this.queue.shift();
- await this.send(notification);
- }
+ try {
+ const concurrency = 5; // 最大并发数
+ const workers = [];
+
+ // 启动多个 worker 并行处理队列
+ for (let i = 0; i < concurrency; i++) {
+ workers.push((async () => {
+ while (this.queue.length > 0) {
+ const notification = this.queue.shift();
+ if (!notification) continue;
+
+ try {
+ await this.send(notification);
+ } catch (error) {
+ logger.error(`异步发送通知异常: ${error.message}`);
+ }
+ }
+ })());
+ }
- this.processing = false;
+ // 等待所有 worker 完成
+ await Promise.all(workers);
+ } finally {
+ this.processing = false;
+
+ // 如果在 worker 完成期间又有新任务加入, 再次触发处理器
+ if (this.queue.length > 0) {
+ this.startQueueProcessor();
+ }
+ }
}
/**
@@ -262,12 +436,24 @@ class NotificationService extends EventEmitter {
logger.info(`发现 ${failedLogs.length} 条失败记录,准备重试`);
for (const log of failedLogs) {
+ // 增加时效性检查:不重试 24 小时前的通知
+ const createdAt = new Date(log.created_at).getTime();
+ if (Date.now() - createdAt > 24 * 60 * 60 * 1000) {
+ storage.history.updateStatus(log.id, 'failed', null, '通知超过 24 小时,停止重试');
+ continue;
+ }
+
const retryCount = log.retry_count || 0;
if (retryCount >= maxRetry) {
- logger.warn(`达到最大重试次数,放弃: ${log.title}`);
+ logger.warn(`达到最大重试次数,放弃: ${log.title} (ID: ${log.id})`);
continue;
}
+ logger.info(`正在重试通知: ${log.title} (重试次数: ${retryCount + 1}/${maxRetry})`);
+
+ // 更新状态为重试中 (这会增加 retry_count)
+ storage.history.updateStatus(log.id, 'retrying');
+
// 重新加入队列
const notification = {
rule_id: log.rule_id,
@@ -282,7 +468,7 @@ class NotificationService extends EventEmitter {
}
// 启动队列处理
- if (!this.processing) {
+ if (!this.processing && this.queue.length > 0) {
this.startQueueProcessor();
}
} catch (error) {
@@ -338,6 +524,57 @@ class NotificationService extends EventEmitter {
}
}
+ /**
+ * 加载未完成的通知 (用于系统启动时恢复)
+ */
+ async loadIncompleteNotifications() {
+ try {
+ // 获取待处理和重试中的记录
+ const pending = storage.history.getByStatus('pending', 100);
+ const retrying = storage.history.getByStatus('retrying', 100);
+
+ const all = [...pending, ...retrying];
+
+ if (all.length === 0) return;
+
+ logger.info(`发现 ${all.length} 条未完成的通知, 正在重新加载...`);
+
+ for (const log of all) {
+ // 增加时效性检查:只加载 24 小时以内的未完成通知
+ const createdAt = new Date(log.created_at).getTime();
+ if (Date.now() - createdAt > 24 * 60 * 60 * 1000) {
+ storage.history.updateStatus(log.id, 'failed', null, '系统重启清理:忽略 24 小时前的陈旧通知');
+ continue;
+ }
+
+ // 检查是否已经在队列中 (防止重复)
+ if (this.queue.some(n => n.log_id === log.id)) continue;
+
+ // 补丁:更新历史遗留的图标 (如果是恢复类事件且包含旧图标)
+ let title = log.title;
+ if ((title.includes('恢复') || title.includes('online') || title.includes('up')) &&
+ (title.includes('ℹ️') || title.includes('⚠️'))) {
+ title = title.replace('ℹ️', '✅').replace('⚠️', '✅');
+ }
+
+ const notification = {
+ rule_id: log.rule_id,
+ channel_id: log.channel_id,
+ title: title,
+ message: log.message,
+ data: JSON.parse(log.data || '{}'),
+ log_id: log.id,
+ is_backlog: true // 标记为积压通知
+ };
+
+ // 使用 enqueue 进入逻辑流,这样可以触发启动期的聚合逻辑
+ this.enqueue(notification);
+ }
+ } catch (error) {
+ logger.error(`加载未完成通知失败: ${error.message}`);
+ }
+ }
+
/**
* 加载渠道
*/
@@ -395,22 +632,37 @@ class NotificationService extends EventEmitter {
/**
* 格式化标题
*/
- formatTitle(rule, eventData) {
+ formatTitle(rule, eventData, ctx) {
+ if (rule.title_template) {
+ return this.renderTemplate(rule.title_template, eventData);
+ }
+
const severityIcon = {
critical: '🚨',
warning: '⚠️',
- info: 'ℹ️',
+ info: '通知',
};
+ let icon = severityIcon[rule.severity] || '🔔';
+
+ // 特殊处理:恢复类事件使用绿色对勾
+ if (rule.event_type === 'up' || rule.event_type === 'online') {
+ icon = '✅';
+ }
+
+ // 核心优化:直接在标题显示“主体 - 事件”
+ const subject = eventData.monitorName || eventData.serverName || '';
+
+ if (subject) {
+ return `${icon} ${subject} - ${rule.name}`;
+ }
- // 严重程度中文映射
+ // 降级逻辑:如果没有具体主体,则显示 [严重程度] 规则名
const severityText = {
critical: '紧急',
warning: '警告',
- info: '通知',
+ info: '提示',
};
-
- const icon = severityIcon[rule.severity] || '🔔';
const text = severityText[rule.severity] || rule.severity.toUpperCase();
return `${icon} [${text}] ${rule.name}`;
}
@@ -418,26 +670,24 @@ class NotificationService extends EventEmitter {
/**
* 格式化消息
*/
- formatMessage(rule, eventData) {
+ formatMessage(rule, eventData, ctx) {
+ if (rule.message_template) {
+ return this.renderTemplate(rule.message_template, eventData);
+ }
+
// 根据事件类型格式化消息
const lines = [];
// 添加基本信息
- if (eventData.monitorName) lines.push(`📊 监控项: ${eventData.monitorName}`);
- if (eventData.serverName) lines.push(`🖥️ 主机: ${eventData.serverName}`);
- if (eventData.accountName) lines.push(`💳 账户: ${eventData.accountName}`);
-
- lines.push(''); // 空行
-
- // 添加详细信息
- if (eventData.url) lines.push(`🔗 URL: ${eventData.url}`);
- if (eventData.host) lines.push(`🌐 主机: ${eventData.host}`);
- if (eventData.error) lines.push(`❌ 错误: ${eventData.error}`);
- if (eventData.ping !== undefined) lines.push(`⏱️ 响应时间: ${eventData.ping}ms`);
- if (eventData.cpu_usage !== undefined) lines.push(`📊 CPU 使用率: ${eventData.cpu_usage}%`);
- if (eventData.mem_percent !== undefined) lines.push(`💾 内存使用率: ${eventData.mem_percent}%`);
- if (eventData.balance !== undefined) lines.push(`💰 余额: $${eventData.balance}`);
- if (eventData.threshold !== undefined) lines.push(`🎯 阈值: ${eventData.threshold}`);
+ if (eventData.monitorName) lines.push(`项目: ${eventData.monitorName}`);
+ if (eventData.serverName) lines.push(`主机: ${eventData.serverName}`);
+ if (eventData.error) lines.push(`错误: ${eventData.error}`);
+ if (eventData.url) lines.push(`地址: ${eventData.url}`);
+ if (eventData.ping !== undefined) lines.push(`响应: ${eventData.ping}ms`);
+ if (eventData.cpu_usage !== undefined) lines.push(`CPU: ${eventData.cpu_usage}%`);
+ if (eventData.mem_percent !== undefined) lines.push(`内存: ${eventData.mem_percent}%`);
+ if (eventData.balance !== undefined) lines.push(`余额: $${eventData.balance}`);
+ if (eventData.threshold !== undefined) lines.push(`阈值: ${eventData.threshold}`);
// 如果没有特定信息,显示完整数据
if (lines.length <= 1) {
@@ -450,6 +700,62 @@ class NotificationService extends EventEmitter {
return lines.join('\n');
}
+ /**
+ * 模板渲染引擎
+ */
+ renderTemplate(template, data) {
+ if (!template) return '';
+ return template.replace(/\{\{(.*?)\}\}/g, (match, key) => {
+ const val = data[key.trim()];
+ return val !== undefined ? val : match;
+ });
+ }
+
+ /**
+ * 抖动检测 (Anti-Flapping)
+ * 计算过去 10 次状态变化的频率
+ */
+ detectFlapping(stateRecord, currentEventType) {
+ try {
+ const history = JSON.parse(stateRecord.state_history || '[]');
+ const eventVal = (currentEventType === 'up' || currentEventType === 'online') ? 1 : 0;
+ const now = Date.now();
+
+ // 1. 检查是否处于已锁定的抖动冷静期 (5分钟)
+ if (stateRecord.is_flapping && stateRecord.updated_at) {
+ const lastUpdate = new Date(stateRecord.updated_at).getTime();
+ if (now - lastUpdate < 5 * 60 * 1000) {
+ return true;
+ }
+ }
+
+ // 2. 记录历史
+ history.push({ t: now, v: eventVal });
+ if (history.length > 10) history.shift();
+
+ // 计算跳变次数 (v 变化的次数)
+ let flips = 0;
+ for (let i = 1; i < history.length; i++) {
+ if (history[i].v !== history[i - 1].v) flips++;
+ }
+
+ // 如果 10 次内有 4 次以上跳变,且时间间隔短(如 10 分钟内),判定为抖动
+ const durationMin = (history[history.length - 1].t - history[0].t) / 60000;
+ const isFlapping = flips >= 4 && durationMin < 10;
+
+ // 更新到数据库
+ storage.stateTracking.upsert(stateRecord.rule_id, stateRecord.fingerprint, {
+ state_history: JSON.stringify(history),
+ is_flapping: isFlapping ? 1 : 0
+ });
+
+ return isFlapping;
+ } catch (e) {
+ logger.error(`抖动检测异常: ${e.message}`);
+ return false;
+ }
+ }
+
/**
* 停止服务
*/
diff --git a/modules/notification-api/storage.js b/modules/notification-api/storage.js
index be3d2e8..67954df 100644
--- a/modules/notification-api/storage.js
+++ b/modules/notification-api/storage.js
@@ -8,6 +8,7 @@ const {
NotificationHistory,
AlertStateTracking,
NotificationGlobalConfig,
+ MaintenanceSchedule,
} = require('./models');
/**
@@ -295,4 +296,9 @@ module.exports = {
history: historyStorage,
stateTracking: stateTrackingStorage,
globalConfig: globalConfigStorage,
+ maintenance: {
+ getActive() {
+ return MaintenanceSchedule.getActive();
+ }
+ }
};
diff --git a/modules/server-api/agent-service.js b/modules/server-api/agent-service.js
index c76a8e0..dd7c666 100644
--- a/modules/server-api/agent-service.js
+++ b/modules/server-api/agent-service.js
@@ -45,6 +45,19 @@ class AgentService extends EventEmitter {
this.legacyMetrics = new Map();
this.legacyStatus = new Map();
+ // 统一任务注册表: taskId -> taskRecord
+ this.taskRegistry = new Map();
+ // 等待中的任务 Promise 解析器: taskId -> { resolve, reject }
+ this.taskResolvers = new Map();
+ // 任务进度轮询器: taskId -> intervalId
+ this.taskPollers = new Map();
+ // 任务清理策略
+ this.taskRetentionMs = 30 * 60 * 1000; // 30 分钟
+ this.taskCleanupTimer = setInterval(() => this.cleanupTaskRegistry(), 60 * 1000);
+ if (typeof this.taskCleanupTimer.unref === 'function') {
+ this.taskCleanupTimer.unref();
+ }
+
// 初始化加载或生成全局密钥
this.loadOrGenerateGlobalKey();
@@ -540,7 +553,13 @@ class AgentService extends EventEmitter {
socket.on(Events.AGENT_TASK_RESULT, result => {
if (!authenticated) return;
this.log(`任务结果: ${serverId} -> ${result.id} (${result.successful ? '成功' : '失败'})`);
- // TODO: 处理任务结果 (日志记录、通知等)
+ this.finishTaskRecord(result.id, {
+ id: result.id,
+ type: result.type,
+ successful: !!result.successful,
+ data: result.data,
+ delay: result.delay,
+ });
});
// 6. 接收 PTY 输出数据流
@@ -569,6 +588,7 @@ class AgentService extends EventEmitter {
console.log(`[AgentService] ${msg}`);
this.connections.delete(serverId);
this.stopHeartbeat(serverId);
+ this.failActiveTasksByServer(serverId, `Agent 已离线: ${reason}`);
this.updateServerStatus(serverId, 'offline');
this.broadcastServerStatus(serverId, 'offline');
this.triggerOfflineAlert(serverId); // Ensure offline alert is triggered
@@ -668,6 +688,7 @@ class AgentService extends EventEmitter {
*/
handleAgentTimeout(serverId) {
this.connections.delete(serverId);
+ this.failActiveTasksByServer(serverId, 'Agent 心跳超时');
this.updateServerStatus(serverId, 'offline');
this.broadcastServerStatus(serverId, 'offline');
@@ -825,28 +846,362 @@ class AgentService extends EventEmitter {
// ==================== 任务下发 ====================
- /**
- * 向 Agent 下发任务
- * @param {string} serverId - 目标主机 ID
- * @param {Object} task - 任务对象
- * @returns {boolean} 是否成功发送
- */
- sendTask(serverId, task) {
+ isTaskFinalState(state) {
+ return ['success', 'failed', 'timeout', 'cancelled'].includes(state);
+ }
+
+ snapshotTask(task) {
+ if (!task) return null;
+ return {
+ taskId: task.id,
+ id: task.id,
+ serverId: task.serverId,
+ domain: task.domain,
+ action: task.action,
+ type: task.type,
+ state: task.state,
+ progress: task.progress,
+ step: task.step,
+ message: task.message,
+ detail: task.detail,
+ result: task.result,
+ error: task.error,
+ timeoutMs: task.timeoutMs,
+ createdAt: task.createdAt,
+ updatedAt: task.updatedAt,
+ startedAt: task.startedAt,
+ finishedAt: task.finishedAt,
+ };
+ }
+
+ emitTaskUpdate(task) {
+ const snapshot = this.snapshotTask(task);
+ if (!snapshot) return;
+ this.emit('task:update', snapshot);
+ }
+
+ createTaskRecord(serverId, task, options = {}) {
+ const now = Date.now();
+ const taskId = task.id || crypto.randomUUID();
+ const timeoutMs = options.timeoutMs || 60000;
+
+ const record = {
+ id: taskId,
+ serverId,
+ domain: options.domain || 'system',
+ action: options.action || '',
+ type: task.type,
+ state: 'running',
+ progress: 0,
+ step: 'queued',
+ message: '任务已下发',
+ detail: '',
+ result: null,
+ error: null,
+ timeoutMs,
+ createdAt: now,
+ updatedAt: now,
+ startedAt: now,
+ finishedAt: null,
+ _timeoutTimer: null,
+ };
+
+ this.taskRegistry.set(taskId, record);
+ return record;
+ }
+
+ getTask(taskId) {
+ return this.snapshotTask(this.taskRegistry.get(taskId));
+ }
+
+ getRecentTasks(serverId = '', limit = 100) {
+ let tasks = Array.from(this.taskRegistry.values());
+ if (serverId) {
+ tasks = tasks.filter(item => item.serverId === serverId);
+ }
+ tasks.sort((a, b) => b.createdAt - a.createdAt);
+ return tasks.slice(0, limit).map(item => this.snapshotTask(item));
+ }
+
+ cleanupTaskRegistry() {
+ const now = Date.now();
+ for (const [taskId, task] of this.taskRegistry.entries()) {
+ if (!this.isTaskFinalState(task.state)) continue;
+ const finishedAt = task.finishedAt || task.updatedAt || task.createdAt;
+ if (now - finishedAt > this.taskRetentionMs) {
+ this.stopTaskProgressPolling(taskId);
+ if (task._timeoutTimer) {
+ clearTimeout(task._timeoutTimer);
+ }
+ this.taskRegistry.delete(taskId);
+ this.taskResolvers.delete(taskId);
+ }
+ }
+ }
+
+ updateTaskProgress(taskId, payload) {
+ const task = this.taskRegistry.get(taskId);
+ if (!task || this.isTaskFinalState(task.state)) return;
+
+ let progressData = payload;
+ if (typeof progressData === 'string') {
+ try {
+ progressData = JSON.parse(progressData);
+ } catch (e) {
+ return;
+ }
+ }
+ if (!progressData || typeof progressData !== 'object') return;
+
+ if (typeof progressData.percentage === 'number') {
+ const bounded = Math.max(0, Math.min(100, Math.round(progressData.percentage)));
+ task.progress = bounded;
+ }
+ if (typeof progressData.name === 'string' && progressData.name.trim()) {
+ task.step = progressData.name.trim();
+ }
+ if (typeof progressData.message === 'string' && progressData.message.trim()) {
+ task.message = progressData.message.trim();
+ }
+ if (typeof progressData.detail_msg === 'string') {
+ task.detail = progressData.detail_msg;
+ }
+ if (progressData.is_done === true) {
+ task.progress = 100;
+ }
+
+ task.updatedAt = Date.now();
+ this.emitTaskUpdate(task);
+ }
+
+ stopTaskProgressPolling(taskId) {
+ const timer = this.taskPollers.get(taskId);
+ if (timer) {
+ clearInterval(timer);
+ this.taskPollers.delete(taskId);
+ }
+ }
+
+ startTaskProgressPolling(taskId, serverId, intervalMs = 1500) {
+ if (this.taskPollers.has(taskId)) return;
+
+ const timer = setInterval(async () => {
+ const task = this.taskRegistry.get(taskId);
+ if (!task || this.isTaskFinalState(task.state)) {
+ this.stopTaskProgressPolling(taskId);
+ return;
+ }
+ if (!this.isOnline(serverId)) {
+ return;
+ }
+
+ try {
+ const result = await this._sendTaskAndWaitLegacy(
+ serverId,
+ {
+ type: TaskTypes.DOCKER_TASK_PROGRESS,
+ data: JSON.stringify({ task_id: taskId }),
+ timeout: 10,
+ },
+ 15000
+ );
+
+ if (result.successful && result.data) {
+ this.updateTaskProgress(taskId, result.data);
+ }
+ } catch (error) {
+ // 进度查询失败不直接中断主任务
+ }
+ }, Math.max(1000, intervalMs));
+
+ if (typeof timer.unref === 'function') {
+ timer.unref();
+ }
+ this.taskPollers.set(taskId, timer);
+ }
+
+ finishTaskRecord(taskId, result) {
+ const task = this.taskRegistry.get(taskId);
+ if (!task) return;
+
+ if (task._timeoutTimer) {
+ clearTimeout(task._timeoutTimer);
+ task._timeoutTimer = null;
+ }
+ this.stopTaskProgressPolling(taskId);
+
+ task.updatedAt = Date.now();
+ task.finishedAt = task.updatedAt;
+
+ if (result && result.successful) {
+ task.state = 'success';
+ task.progress = 100;
+ task.message = '任务执行成功';
+ task.result = result.data || '';
+ task.error = null;
+ } else {
+ task.state = 'failed';
+ task.message = '任务执行失败';
+ task.error = result?.data || '未知错误';
+ task.result = null;
+ }
+
+ this.emitTaskUpdate(task);
+
+ const resolver = this.taskResolvers.get(taskId);
+ if (resolver) {
+ this.taskResolvers.delete(taskId);
+ resolver.resolve(result);
+ }
+ }
+
+ failActiveTasksByServer(serverId, reason = 'Agent 连接中断') {
+ for (const [taskId, task] of this.taskRegistry.entries()) {
+ if (task.serverId !== serverId || this.isTaskFinalState(task.state)) continue;
+
+ if (task._timeoutTimer) {
+ clearTimeout(task._timeoutTimer);
+ task._timeoutTimer = null;
+ }
+ this.stopTaskProgressPolling(taskId);
+
+ task.state = 'failed';
+ task.error = reason;
+ task.message = reason;
+ task.updatedAt = Date.now();
+ task.finishedAt = task.updatedAt;
+ this.emitTaskUpdate(task);
+
+ const resolver = this.taskResolvers.get(taskId);
+ if (resolver) {
+ this.taskResolvers.delete(taskId);
+ resolver.reject(new Error(reason));
+ }
+ }
+ }
+
+ // 兼容内部短查询的旧实现(例如任务进度轮询)
+ _sendTaskAndWaitLegacy(serverId, task, timeout = 60000) {
+ return new Promise((resolve, reject) => {
+ const taskId = task.id || crypto.randomUUID();
+ const socket = this.connections.get(serverId);
+
+ if (!socket) {
+ return reject(new Error('主机不在线'));
+ }
+
+ const timer = setTimeout(() => {
+ socket.off(Events.AGENT_TASK_RESULT, resultHandler);
+ reject(new Error('任务执行超时'));
+ }, timeout);
+
+ const resultHandler = result => {
+ if (result.id === taskId) {
+ clearTimeout(timer);
+ socket.off(Events.AGENT_TASK_RESULT, resultHandler);
+ resolve(result);
+ }
+ };
+
+ socket.on(Events.AGENT_TASK_RESULT, resultHandler);
+ socket.emit(Events.DASHBOARD_TASK, {
+ id: taskId,
+ type: task.type,
+ data: task.data,
+ timeout: task.timeout || 0,
+ });
+ });
+ }
+
+ submitTask(serverId, task, options = {}) {
const socket = this.connections.get(serverId);
if (!socket) {
- console.warn(`[AgentService] 无法下发任务: ${serverId} 不在线`);
- return false;
+ throw new Error('主机不在线');
+ }
+
+ const timeoutMs = options.timeoutMs || 60000;
+ const record = this.createTaskRecord(serverId, task, {
+ timeoutMs,
+ domain: options.domain,
+ action: options.action,
+ });
+
+ record._timeoutTimer = setTimeout(() => {
+ if (this.isTaskFinalState(record.state)) return;
+
+ record.state = 'timeout';
+ record.error = '任务执行超时';
+ record.message = '任务执行超时';
+ record.updatedAt = Date.now();
+ record.finishedAt = record.updatedAt;
+ this.stopTaskProgressPolling(record.id);
+ this.emitTaskUpdate(record);
+
+ const resolver = this.taskResolvers.get(record.id);
+ if (resolver) {
+ this.taskResolvers.delete(record.id);
+ resolver.reject(new Error('任务执行超时'));
+ }
+ }, timeoutMs);
+
+ if (typeof record._timeoutTimer.unref === 'function') {
+ record._timeoutTimer.unref();
}
socket.emit(Events.DASHBOARD_TASK, {
- id: task.id || crypto.randomUUID(),
+ id: record.id,
type: task.type,
data: task.data,
timeout: task.timeout || 0,
});
- this.log(`任务已下发: ${serverId} -> ${task.type}`);
- return true;
+ this.emitTaskUpdate(record);
+ this.log(`任务已下发: ${serverId} -> ${task.type} (id: ${record.id})`);
+
+ if (options.trackProgress) {
+ this.startTaskProgressPolling(record.id, serverId, options.progressIntervalMs || 1500);
+ }
+
+ if (options.waitForResult === false) {
+ return record.id;
+ }
+
+ return new Promise((resolve, reject) => {
+ this.taskResolvers.set(record.id, { resolve, reject });
+ });
+ }
+
+ /**
+ * 向 Agent 下发任务
+ * @param {string} serverId - 目标主机 ID
+ * @param {Object} task - 任务对象
+ * @returns {string|false} 任务 ID
+ */
+ sendTask(serverId, task) {
+ // PTY 交互与非标准任务类型不进入任务注册表,避免高频输入污染任务中心
+ if (task.type === TaskTypes.PTY_START || typeof task.type !== 'number') {
+ const socket = this.connections.get(serverId);
+ if (!socket) {
+ return false;
+ }
+ socket.emit(Events.DASHBOARD_TASK, {
+ id: task.id || crypto.randomUUID(),
+ type: task.type,
+ data: task.data,
+ timeout: task.timeout || 0,
+ });
+ return task.id || true;
+ }
+
+ try {
+ return this.submitTask(serverId, task, {
+ waitForResult: false,
+ timeoutMs: Math.max(30000, ((task.timeout || 60) + 5) * 1000),
+ });
+ } catch (error) {
+ console.warn(`[AgentService] 无法下发任务: ${serverId} ${error.message}`);
+ return false;
+ }
}
/**
@@ -885,41 +1240,9 @@ class AgentService extends EventEmitter {
* @returns {Promise