using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using VolPro.Entity.DomainModels;
using VolPro.WebApi.Controllers.Hubs;
using Warehouse.IRepositories;
using Warehouse.IServices;

namespace Warehouse.Services;

/// <summary>
/// Core service of the rule engine.
/// According to the original header it is invoked by RuleEngineJob roughly every 10 s
/// through <see cref="EvaluateAllAsync"/>.
///
/// Flow:
///   1. Load all enabled rules (with their conditions and actions).
///   2. Fetch the current realtime values from the gateways.
///   3. Evaluate each rule's conditions, run its actions, and stamp the rule.
/// </summary>
/// <remarks>
/// NOTE(review): the text this file was reviewed from had every generic type argument
/// stripped. The arguments below were reconstructed from usage; the ones that could not
/// be derived from the code itself are flagged individually with NOTE(review).
/// </remarks>
public class RuleEngineService
{
    /// <summary>Cooldown applied to rules whose own cooldown is missing or not positive.</summary>
    private const int DefaultCooldownSec = 60;

    /// <summary>Absolute tolerance used by the "等于" / "不等于" comparisons.</summary>
    private const double EqualityTolerance = 0.001;

    /// <summary>Lifetime of the cancellation token created for each action.</summary>
    private static readonly TimeSpan ActionTimeout = TimeSpan.FromSeconds(5);

    private readonly Iwarehouse_ruleRepository _ruleRepo;
    private readonly Ibase_deviceRepository _devRepo;
    private readonly Iiot_devicedataRepository _dataRepo;
    private readonly Iiot_alarmRepository _alarmRepo;
    private readonly GatewayClient _gatewayClient;

    // NOTE(review): the hub type argument was not visible; HomePageMessageHub is assumed
    // because it is the hub usually found in VolPro.WebApi.Controllers.Hubs — confirm.
    private readonly IHubContext<HomePageMessageHub> _hub;

    /// <summary>
    /// Routing information for one device: which adapter/source to ask for its values and
    /// which gateway (base URL) serves it. An empty <c>BaseUrl</c> means no online gateway
    /// was found for the device's adapter.
    /// </summary>
    private readonly record struct DeviceEndpoint(string AdapterCode, string SourceId, string BaseUrl);

    public RuleEngineService(
        Iwarehouse_ruleRepository ruleRepo,
        Ibase_deviceRepository devRepo,
        Iiot_devicedataRepository dataRepo,
        Iiot_alarmRepository alarmRepo,
        GatewayClient gatewayClient,
        IHubContext<HomePageMessageHub> hub)
    {
        _ruleRepo = ruleRepo;
        _devRepo = devRepo;
        _dataRepo = dataRepo;
        _alarmRepo = alarmRepo;
        _gatewayClient = gatewayClient;
        _hub = hub;
    }

    /// <summary>
    /// Runs one full evaluation pass over every enabled rule.
    /// A failure while processing one rule is reported through <see cref="Trace"/> and does
    /// not prevent the remaining rules from being processed.
    /// </summary>
    public async Task EvaluateAllAsync()
    {
        // 1. Load the enabled rules.
        var rules = await LoadEnabledRulesAsync();
        if (!rules.Any())
        {
            return;
        }

        // 2. Build the DeviceId -> (AdapterCode, SourceId, BaseUrl) mapping.
        var deviceMap = await BuildDeviceMappingAsync(rules);

        // 3. Fetch the realtime values needed by the conditions.
        var realtimeData = await BatchFetchRealtimeAsync(rules, deviceMap);

        // 4. Evaluate rule by rule.
        foreach (var rule in rules)
        {
            try
            {
                bool met = EvaluateRule(rule, realtimeData, deviceMap);
                var now = DateTime.Now;

                // LastTriggered is only advanced when the actions are actually dispatched.
                // Stamping it while the rule is still cooling down would push the cooldown
                // window forward on every pass, so a rule whose condition stays true would
                // never fire a second time.
                if (met && !IsInCooldown(rule, now))
                {
                    await ExecuteActionsAsync(rule, deviceMap);
                    rule.LastTriggered = now;
                }

                rule.LastEvaluated = now;
                await _ruleRepo.DbContext.Updateable(rule)
                    .UpdateColumns(r => new { r.LastEvaluated, r.LastTriggered })
                    .ExecuteCommandAsync();
            }
            catch (Exception ex)
            {
                // A single failing rule must not block the others.
                Trace.TraceWarning($"RuleEngine: rule evaluation failed: {ex}");
            }
        }
    }

    /// <summary>
    /// Returns true when the rule fired less than its cooldown (in seconds) before <paramref name="now"/>.
    /// </summary>
    private static bool IsInCooldown(warehouse_rule rule, DateTime now) =>
        rule.LastTriggered.HasValue
        && rule.CooldownSec > 0
        && (now - rule.LastTriggered.Value).TotalSeconds < rule.CooldownSec;

    // ═══════════════════════════════════════════
    //  Rule loading
    // ═══════════════════════════════════════════

    /// <summary>
    /// Loads the rules whose <c>Enable</c> is "启用" or null, fills in the default cooldown
    /// and priority, and returns them ordered by priority, highest first.
    /// </summary>
    private async Task<List<warehouse_rule>> LoadEnabledRulesAsync()
    {
        // NOTE(review): nothing here explicitly loads the warehouse_rulecondition /
        // warehouse_ruleaction navigation properties. If the repository does not populate
        // them on its own, every rule will look like it has no conditions — confirm.
        var rules = await _ruleRepo
            .FindAsIQueryable(r => r.Enable == "启用" || r.Enable == null)
            .ToListAsync();

        foreach (var r in rules)
        {
            r.CooldownSec = r.CooldownSec > 0 ? r.CooldownSec : DefaultCooldownSec;
            r.Priority = r.Priority ?? 0;
        }

        return rules.OrderByDescending(r => r.Priority).ToList();
    }

    // ═══════════════════════════════════════════
    //  Device mapping: DeviceId -> (AdapterCode, SourceId, gateway BaseUrl)
    // ═══════════════════════════════════════════

    /// <summary>
    /// Resolves every device referenced by a condition or an action of <paramref name="rules"/>
    /// to its adapter code, source id and gateway base URL.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the dictionary key type was not visible. It is declared as long so the
    /// code compiles whether the entity's DeviceId is int or long — narrow it if appropriate.
    /// </remarks>
    private async Task<Dictionary<long, DeviceEndpoint>> BuildDeviceMappingAsync(List<warehouse_rule> rules)
    {
        var conditionDeviceIds = rules.SelectMany(r => r.warehouse_rulecondition ?? new())
            .Select(c => c.DeviceId ?? 0).Where(id => id > 0).Distinct().ToList();
        var actionDeviceIds = rules.SelectMany(r => r.warehouse_ruleaction ?? new())
            .Select(a => a.DeviceId ?? 0).Where(id => id > 0).Distinct().ToList();
        var allIds = conditionDeviceIds.Union(actionDeviceIds).ToList();

        var map = new Dictionary<long, DeviceEndpoint>();
        if (!allIds.Any())
        {
            return map;
        }

        var devices = await _devRepo.FindAsIQueryable(d => allIds.Contains(d.DeviceId)).ToListAsync();

        // Many devices share an adapter prefix; resolve each prefix once instead of issuing
        // one gateway query per device.
        var baseUrlByPrefix = new Dictionary<string, string>();

        foreach (var d in devices)
        {
            string baseUrl = "";
            if (!string.IsNullOrEmpty(d.AdapterCode))
            {
                var prefix = d.AdapterCode.Split(':')[0];
                if (!baseUrlByPrefix.TryGetValue(prefix, out var cached))
                {
                    cached = await ResolveGatewayBaseUrlAsync(prefix);
                    baseUrlByPrefix[prefix] = cached;
                }

                baseUrl = cached;
            }

            map[d.DeviceId] = new DeviceEndpoint(d.AdapterCode ?? "", d.SourceId ?? "", baseUrl);
        }

        return map;
    }

    /// <summary>
    /// Looks up the base URL of an online gateway whose AdapterTypes contains
    /// <paramref name="prefix"/>. Returns an empty string when none is found or the lookup fails.
    /// </summary>
    private async Task<string> ResolveGatewayBaseUrlAsync(string prefix)
    {
        try
        {
            // NOTE(review): the entity type argument was not visible; "gateway_nodes" is
            // taken from the original comment naming the table — confirm the class name.
            var gw = await _devRepo.DbContext.Queryable<gateway_nodes>()
                .FirstAsync(x => x.AdapterTypes != null
                                 && x.AdapterTypes.Contains(prefix)
                                 && x.IsOnline == "在线");
            return gw?.BaseUrl ?? "";
        }
        catch (Exception ex)
        {
            // Best effort: a device without a resolvable gateway is simply skipped later.
            Trace.TraceWarning($"RuleEngine: gateway lookup failed for adapter prefix '{prefix}': {ex}");
            return "";
        }
    }

    // ═══════════════════════════════════════════
    //  Realtime value retrieval
    // ═══════════════════════════════════════════

    /// <summary>
    /// Fetches the current point values for every device referenced by a rule condition.
    /// The result is keyed by (adapter code, source id); each entry lists (point index, value).
    /// Sources that fail to load are reported through <see cref="Trace"/> and left out.
    /// </summary>
    private async Task<Dictionary<(string adapter, string sourceId), List<(int pointIndex, double value)>>> BatchFetchRealtimeAsync(
        List<warehouse_rule> rules,
        Dictionary<long, DeviceEndpoint> deviceMap)
    {
        var result = new Dictionary<(string adapter, string sourceId), List<(int pointIndex, double value)>>();

        // Only condition devices are ever looked up in the result, so action-only devices
        // do not need a realtime call.
        var conditionDeviceIds = rules
            .SelectMany(r => r.warehouse_rulecondition ?? new())
            .Select(c => (long)(c.DeviceId ?? 0))
            .ToHashSet();

        // Several devices may share one source; request each (gateway, adapter, source) once.
        var requests = new HashSet<(string baseUrl, string adapter, string sourceId)>();
        foreach (var (deviceId, endpoint) in deviceMap)
        {
            if (!conditionDeviceIds.Contains(deviceId))
            {
                continue;
            }

            if (string.IsNullOrEmpty(endpoint.BaseUrl) || string.IsNullOrEmpty(endpoint.SourceId))
            {
                continue;
            }

            requests.Add((endpoint.BaseUrl, endpoint.AdapterCode, endpoint.SourceId));
        }

        foreach (var (baseUrl, adapter, sourceId) in requests)
        {
            try
            {
                var data = await _gatewayClient.GetRealtimeAsync(baseUrl, adapter, sourceId);
                if (data == null)
                {
                    continue;
                }

                // The payload exposes RootElement, so it is presumably a JsonDocument, which
                // should be disposed. The cast keeps this valid if it turns out not to be
                // disposable. All values are copied out before the scope ends.
                using var payloadLease = data as IDisposable;

                var root = data.RootElement;
                if (root.ValueKind != JsonValueKind.Object
                    || !root.TryGetProperty("rows", out var rows)
                    || rows.ValueKind != JsonValueKind.Array)
                {
                    continue;
                }

                var list = new List<(int pointIndex, double value)>();
                foreach (var row in rows.EnumerateArray())
                {
                    // A row without a numeric "value" carries no reading. Recording it as 0
                    // could satisfy a "less than" condition, so it is skipped instead.
                    if (row.ValueKind != JsonValueKind.Object
                        || !row.TryGetProperty("value", out var v)
                        || v.ValueKind != JsonValueKind.Number)
                    {
                        continue;
                    }

                    // A missing or non-integer "index" falls back to 0.
                    int idx = row.TryGetProperty("index", out var i)
                              && i.ValueKind == JsonValueKind.Number
                              && i.TryGetInt32(out var parsedIndex)
                        ? parsedIndex
                        : 0;

                    list.Add((idx, v.GetDouble()));
                }

                result[(adapter, sourceId)] = list;
            }
            catch (Exception ex)
            {
                // Best effort: conditions on this source will see no data and evaluate to false.
                Trace.TraceWarning($"RuleEngine: realtime fetch failed for {adapter}/{sourceId} at {baseUrl}: {ex}");
            }
        }

        return result;
    }

    // ═══════════════════════════════════════════
    //  Condition evaluation
    // ═══════════════════════════════════════════

    /// <summary>
    /// Evaluates all conditions of <paramref name="rule"/> against the fetched realtime values.
    /// With JudgmentMode "AND" every condition must hold; with any other mode one is enough.
    /// A rule without conditions is never met.
    /// </summary>
    private bool EvaluateRule(
        warehouse_rule rule,
        Dictionary<(string adapter, string sourceId), List<(int pointIndex, double value)>> realtimeData,
        Dictionary<long, DeviceEndpoint> deviceMap)
    {
        var conditions = rule.warehouse_rulecondition ?? new();
        if (!conditions.Any())
        {
            return false;
        }

        var results = new List<bool>();
        foreach (var cond in conditions)
        {
            if (cond.DeviceId == null || cond.ValueId == null)
            {
                results.Add(false);
                continue;
            }

            // Per-condition cooldown.
            // NOTE(review): cond.LastTriggered is only updated in memory below; nothing in
            // this class persists it, so this check and the hysteresis further down depend
            // on whatever value the condition was loaded with — confirm that is intended.
            if (cond.LastTriggered.HasValue && rule.CooldownSec > 0
                && (DateTime.Now - cond.LastTriggered.Value).TotalSeconds < rule.CooldownSec)
            {
                results.Add(false);
                continue;
            }

            if (!deviceMap.TryGetValue(cond.DeviceId.Value, out var devInfo))
            {
                results.Add(false);
                continue;
            }

            double? actualValue = null;
            if (realtimeData.TryGetValue((devInfo.AdapterCode, devInfo.SourceId), out var points))
            {
                // Simplification kept from the original: ValueId is used directly as the
                // point index. When no point carries that index, the first point of the
                // source is used; an empty source yields no value.
                int matchAt = points.FindIndex(p => p.pointIndex == cond.ValueId);
                if (matchAt >= 0)
                {
                    actualValue = points[matchAt].value;
                }
                else if (points.Count > 0)
                {
                    actualValue = points[0].value;
                }
            }

            // Hysteresis: a condition that has triggered before is compared against its
            // recovery threshold (falling back to the target value) instead of the target.
            bool isTriggered = cond.LastTriggered.HasValue;
            double target = isTriggered
                ? (double)(cond.RecoveryThreshold_Numeric ?? cond.TargetValue_Number ?? 0)
                : (double)(cond.TargetValue_Number ?? 0);

            bool met = Compare(actualValue, cond.CompareOperator ?? "大于", target);
            results.Add(met);
            if (met)
            {
                cond.LastTriggered = DateTime.Now;
            }
        }

        return rule.JudgmentMode == "AND"
            ? results.All(r => r)
            : results.Any(r => r);
    }

    /// <summary>
    /// Applies the comparison operator <paramref name="op"/> to <paramref name="actual"/> and
    /// <paramref name="target"/>. Returns false for an unknown operator and whenever there is
    /// no actual value: missing data must never satisfy a condition (previously it was
    /// treated as double.MinValue, which satisfied "小于", "小于等于" and "不等于").
    /// </summary>
    private static bool Compare(double? actual, string op, double target)
    {
        if (actual is not double v)
        {
            return false;
        }

        return op switch
        {
            "大于" => v > target,
            "小于" => v < target,
            "等于" => Math.Abs(v - target) < EqualityTolerance,
            "大于等于" => v >= target,
            "小于等于" => v <= target,
            "不等于" => Math.Abs(v - target) > EqualityTolerance,
            _ => false
        };
    }

    // ═══════════════════════════════════════════
    //  Action execution
    // ═══════════════════════════════════════════

    /// <summary>
    /// Starts every action of <paramref name="rule"/> and waits for all of them.
    /// The caller is responsible for the rule-level cooldown check.
    /// </summary>
    private async Task ExecuteActionsAsync(warehouse_rule rule, Dictionary<long, DeviceEndpoint> deviceMap)
    {
        var actions = rule.warehouse_ruleaction ?? new();
        if (!actions.Any())
        {
            return;
        }

        await Task.WhenAll(actions.Select(a => ExecuteSingleActionAsync(a, deviceMap)));
    }

    /// <summary>
    /// Executes one action. The action type is ActionType, then Type, then "控制" by default:
    /// "控制" sends a control value to the device's gateway, "告警" adds an alarm record,
    /// "通知" broadcasts a "RuleTriggered" SignalR message. Failures are reported through
    /// <see cref="Trace"/> and never propagate to the caller.
    /// </summary>
    private async Task ExecuteSingleActionAsync(warehouse_ruleaction action, Dictionary<long, DeviceEndpoint> deviceMap)
    {
        var actionType = action.ActionType ?? action.Type ?? "控制";
        try
        {
            // NOTE(review): this timeout token is only passed to the SignalR send; the
            // gateway control call is not bounded by it.
            using var cts = new CancellationTokenSource(ActionTimeout);

            switch (actionType)
            {
                case "控制":
                    if (action.DeviceId.HasValue
                        && deviceMap.TryGetValue(action.DeviceId.Value, out var dev)
                        && !string.IsNullOrEmpty(dev.BaseUrl))
                    {
                        var pointIndex = action.ValueId ?? 0;
                        // Switch value "开" maps to 1; anything else uses the numeric target (default 0).
                        var value = (double)(action.TargetValue_Switch == "开" ? 1 : action.TargetValue_Number ?? 0);
                        await _gatewayClient.ControlDeviceAsync(dev.BaseUrl, dev.AdapterCode, dev.SourceId, pointIndex, value);
                    }
                    break;

                case "告警":
                    if (action.Alert == "是" && action.DeviceId.HasValue)
                    {
                        var alarm = new iot_alarm
                        {
                            SourceAlarmId = $"rule-{action.RuleID}-{DateTime.Now.Ticks}",
                            DeviceId = action.DeviceId.Value,
                            AlarmLevel = "重要",
                            AlarmDesc = action.AlertMessage ?? $"规则触发",
                            StartTime = DateTime.Now,
                            State = "未确认",
                            AdapterCode = "RuleEngine",
                            CreateDate = DateTime.Now
                        };
                        // NOTE(review): confirm that Add() persists immediately; if the
                        // repository needs an explicit save, this alarm is never written.
                        _alarmRepo.Add(alarm);
                    }
                    break;

                case "通知":
                    await _hub.Clients.All.SendAsync(
                        "RuleTriggered",
                        new
                        {
                            title = action.AlertMessage ?? "规则触发",
                            alertMessage = action.AlertMessage,
                            deviceId = action.DeviceId
                        },
                        cts.Token);
                    break;
            }
        }
        catch (OperationCanceledException)
        {
            // Timed out.
            Trace.TraceWarning($"RuleEngine: action '{actionType}' timed out.");
        }
        catch (Exception ex)
        {
            // A single failing action must not block the others.
            Trace.TraceWarning($"RuleEngine: action '{actionType}' failed: {ex}");
        }
    }
}