// File: SecMPS/api_sqlsugar/Warehouse/Services/RuleEngineService.cs
// Note: the repository file viewer flagged this file as containing ambiguous Unicode
// characters (characters that might be confused with other characters).
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using VolPro.Entity.DomainModels;
using VolPro.WebApi.Controllers.Hubs;
using Warehouse.IRepositories;
using Warehouse.IServices;
namespace Warehouse.Services;
/// <summary>
/// 规则引擎核心服务。
/// 由 RuleEngineJob 每 10s 调用一次 EvaluateAllAsync。
///
/// 流程:
/// 1. 加载所有启用规则(含条件+动作)
/// 2. 从 gateway 批量获取实时值
/// 3. 逐规则评估条件 → 触发动作 → 写日志
/// </summary>
public class RuleEngineService
{
// Rule repository: used to query enabled rules and to persist evaluation timestamps.
private readonly Iwarehouse_ruleRepository _ruleRepo;
// Device repository: used to look up devices and, through its DbContext, gateway nodes.
private readonly Ibase_deviceRepository _devRepo;
// Device-data repository. NOTE(review): assigned in the constructor but not referenced
// anywhere in the visible part of this class.
private readonly Iiot_devicedataRepository _dataRepo;
// Alarm repository: used by the "告警" action to add alarm records.
private readonly Iiot_alarmRepository _alarmRepo;
// Gateway client: used to read realtime values and to send control commands.
private readonly GatewayClient _gatewayClient;
// SignalR hub context: used by the "通知" action to broadcast "RuleTriggered" messages.
private readonly IHubContext<HomePageMessageHub> _hub;
/// <summary>
/// Creates the rule engine service and stores the supplied collaborators.
/// </summary>
/// <param name="ruleRepo">Repository for rule entities.</param>
/// <param name="devRepo">Repository for device entities.</param>
/// <param name="dataRepo">Repository for device data.</param>
/// <param name="alarmRepo">Repository for alarm entities.</param>
/// <param name="gatewayClient">Client used to talk to gateways.</param>
/// <param name="hub">SignalR hub context used for notifications.</param>
public RuleEngineService(
    Iwarehouse_ruleRepository ruleRepo,
    Ibase_deviceRepository devRepo,
    Iiot_devicedataRepository dataRepo,
    Iiot_alarmRepository alarmRepo,
    GatewayClient gatewayClient,
    IHubContext<HomePageMessageHub> hub) =>
    (_ruleRepo, _devRepo, _dataRepo, _alarmRepo, _gatewayClient, _hub) =
        (ruleRepo, devRepo, dataRepo, alarmRepo, gatewayClient, hub);
/// <summary>
/// Runs one evaluation pass over all enabled rules: loads the rules, resolves the
/// devices they reference, fetches realtime values, then evaluates each rule and
/// executes its actions when the rule is satisfied and not cooling down.
/// </summary>
/// <remarks>
/// A failure while processing one rule is logged and does not stop the remaining rules.
/// </remarks>
public async Task EvaluateAllAsync()
{
    // 1. Load enabled rules; nothing to do when there are none.
    var rules = await LoadEnabledRulesAsync();
    if (rules.Count == 0) return;

    // 2. Build DeviceId -> (AdapterCode, SourceId, BaseUrl) mapping.
    var deviceMap = await BuildDeviceMappingAsync(rules);

    // 3. Fetch realtime values for every mapped device.
    var realtimeData = await BatchFetchRealtimeAsync(rules, deviceMap);

    // 4. Evaluate rule by rule.
    foreach (var rule in rules)
    {
        try
        {
            bool met = await EvaluateRuleAsync(rule, realtimeData, deviceMap);
            var now = DateTime.Now;

            // Only stamp LastTriggered when the actions are really allowed to run.
            // Refreshing it while the rule is still cooling down would restart the
            // cooldown window on every pass, so a continuously satisfied rule would
            // fire once and never again.
            bool coolingDown = rule.LastTriggered.HasValue
                && rule.CooldownSec > 0
                && (now - rule.LastTriggered.Value).TotalSeconds < rule.CooldownSec;

            if (met && !coolingDown)
            {
                await ExecuteActionsAsync(rule, deviceMap);
                rule.LastTriggered = now;
            }

            rule.LastEvaluated = now;
            await _ruleRepo.DbContext.Updateable(rule)
                .UpdateColumns(r => new { r.LastEvaluated, r.LastTriggered }).ExecuteCommandAsync();
        }
        catch (Exception ex)
        {
            // A single failing rule must not block the others, but the failure
            // should still be visible to operators.
            System.Diagnostics.Trace.TraceError($"RuleEngineService: rule evaluation failed: {ex}");
        }
    }
}
// ═══════════════════════════════════════════
// 规则加载
// ═══════════════════════════════════════════
/// <summary>
/// Loads the rules whose Enable flag is "启用" or unset, applies defaults
/// (cooldown of 60 when not positive, priority of 0 when unset) and returns
/// them ordered by descending priority.
/// </summary>
/// <returns>The enabled rules, highest priority first.</returns>
private async Task<List<warehouse_rule>> LoadEnabledRulesAsync()
{
    // NOTE(review): no explicit include of the condition/action navigation
    // collections is visible here — confirm they are populated for these entities.
    var enabledRules = await _ruleRepo
        .FindAsIQueryable(r => r.Enable == "启用" || r.Enable == null)
        .ToListAsync();

    foreach (var rule in enabledRules)
    {
        // Fall back to a 60 second cooldown when none (or a non-positive one) is configured.
        if (!(rule.CooldownSec > 0))
        {
            rule.CooldownSec = 60;
        }

        rule.Priority ??= 0;
    }

    var byPriority = enabledRules.OrderByDescending(rule => rule.Priority);
    return byPriority.ToList();
}
// ═══════════════════════════════════════════
// 设备映射: DeviceId → (AdapterCode, SourceId, 网关BaseUrl)
// ═══════════════════════════════════════════
/// <summary>
/// Builds a lookup from device id to (adapter code, source id, gateway base URL)
/// for every device referenced by a condition or an action of the given rules.
/// </summary>
/// <param name="rules">Rules whose conditions and actions are scanned for device ids.</param>
/// <returns>
/// The mapping; empty when no rule references a device. The base URL is an empty
/// string when no online gateway could be resolved for the device's adapter prefix.
/// </returns>
private async Task<Dictionary<int, (string adapterCode, string sourceId, string baseUrl)>> BuildDeviceMappingAsync(
    List<warehouse_rule> rules)
{
    var conditionDeviceIds = rules.SelectMany(r => r.warehouse_rulecondition ?? new())
        .Select(c => c.DeviceId ?? 0).Where(id => id > 0).Distinct().ToList();
    var actionDeviceIds = rules.SelectMany(r => r.warehouse_ruleaction ?? new())
        .Select(a => a.DeviceId ?? 0).Where(id => id > 0).Distinct().ToList();
    var allIds = conditionDeviceIds.Union(actionDeviceIds).ToList();
    if (!allIds.Any()) return new();

    var devices = await _devRepo.FindAsIQueryable(d => allIds.Contains(d.DeviceId)).ToListAsync();
    var map = new Dictionary<int, (string, string, string)>();

    // Devices sharing an adapter prefix resolve to the same gateway, so each prefix
    // is looked up once per pass instead of once per device.
    var baseUrlByPrefix = new Dictionary<string, string>();

    foreach (var d in devices)
    {
        string baseUrl = "";
        if (!string.IsNullOrEmpty(d.AdapterCode))
        {
            var prefix = d.AdapterCode.Split(':')[0];

            // An empty prefix would match every gateway through Contains(""), so skip it.
            if (!string.IsNullOrEmpty(prefix))
            {
                if (!baseUrlByPrefix.TryGetValue(prefix, out var cached))
                {
                    cached = "";
                    try
                    {
                        // Find an online gateway node that advertises this adapter prefix.
                        var gw = await _devRepo.DbContext.Queryable<gateway_nodes>()
                            .FirstAsync(x => x.AdapterTypes != null && x.AdapterTypes.Contains(prefix) && x.IsOnline == "在线");
                        cached = gw?.BaseUrl ?? "";
                    }
                    catch (Exception ex)
                    {
                        // Best effort: the device is still mapped, just without a gateway URL.
                        System.Diagnostics.Trace.TraceWarning(
                            $"RuleEngineService: gateway lookup failed for adapter prefix '{prefix}': {ex}");
                    }
                    baseUrlByPrefix[prefix] = cached;
                }
                baseUrl = cached;
            }
        }
        map[d.DeviceId] = (d.AdapterCode ?? "", d.SourceId ?? "", baseUrl);
    }
    return map;
}
// ═══════════════════════════════════════════
// 批量实时值获取
// ═══════════════════════════════════════════
/// <summary>
/// Fetches realtime values from the gateways for every mapped device that has both
/// a gateway base URL and a source id.
/// </summary>
/// <param name="rules">Not used by this method; kept for signature compatibility.</param>
/// <param name="deviceMap">Device id to (adapter code, source id, base URL) mapping.</param>
/// <returns>
/// Readings keyed by (adapter, sourceId). A target whose request fails, or whose
/// response has no "rows" array, has no entry in the result.
/// </returns>
private async Task<Dictionary<(string adapter, string sourceId), List<(int pointIndex, double value)>>> BatchFetchRealtimeAsync(
    List<warehouse_rule> rules,
    Dictionary<int, (string adapterCode, string sourceId, string baseUrl)> deviceMap)
{
    var result = new Dictionary<(string, string), List<(int, double)>>();

    // Group targets by gateway, requesting each (gateway, adapter, source) only once
    // even when several devices share it.
    var gwGroups = new Dictionary<string, List<(string adapter, string sourceId)>>();
    var seen = new HashSet<(string, string, string)>();
    foreach (var (adapter, sourceId, baseUrl) in deviceMap.Values)
    {
        if (string.IsNullOrEmpty(baseUrl) || string.IsNullOrEmpty(sourceId)) continue;
        if (!seen.Add((baseUrl, adapter, sourceId))) continue;

        if (!gwGroups.TryGetValue(baseUrl, out var group))
        {
            group = new();
            gwGroups[baseUrl] = group;
        }
        group.Add((adapter, sourceId));
    }

    foreach (var (baseUrl, pairs) in gwGroups)
    {
        foreach (var (adapter, sourceId) in pairs)
        {
            try
            {
                // The values are copied into plain tuples below, so the JSON document
                // can be released as soon as this iteration ends.
                using var data = await _gatewayClient.GetRealtimeAsync(baseUrl, adapter, sourceId);
                if (data == null) continue;

                var root = data.RootElement;
                if (root.ValueKind != JsonValueKind.Object) continue;
                if (!root.TryGetProperty("rows", out var rows) || rows.ValueKind != JsonValueKind.Array) continue;

                var list = new List<(int, double)>();
                foreach (var row in rows.EnumerateArray())
                {
                    // Skip a malformed row instead of throwing and losing every
                    // reading of this device.
                    if (row.ValueKind != JsonValueKind.Object) continue;

                    // A missing property keeps its historical default of 0; a property
                    // that is present but not numeric makes the row unusable.
                    int idx = 0;
                    if (row.TryGetProperty("index", out var idxElement))
                    {
                        if (idxElement.ValueKind != JsonValueKind.Number || !idxElement.TryGetInt32(out idx)) continue;
                    }

                    double val = 0;
                    if (row.TryGetProperty("value", out var valElement))
                    {
                        if (valElement.ValueKind != JsonValueKind.Number || !valElement.TryGetDouble(out val)) continue;
                    }

                    list.Add((idx, val));
                }
                result[(adapter, sourceId)] = list;
            }
            catch (Exception ex)
            {
                // Best effort: one unreachable target must not block the others.
                System.Diagnostics.Trace.TraceWarning(
                    $"RuleEngineService: realtime fetch failed for {adapter}/{sourceId} at {baseUrl}: {ex}");
            }
        }
    }
    return result;
}
// ═══════════════════════════════════════════
// 条件评估
// ═══════════════════════════════════════════
/// <summary>
/// Evaluates every condition of a rule against the fetched realtime values and
/// combines the outcomes: all must hold when JudgmentMode is "AND", otherwise any one suffices.
/// </summary>
/// <param name="rule">Rule whose conditions are evaluated.</param>
/// <param name="realtimeData">Readings keyed by (adapter, sourceId).</param>
/// <param name="deviceMap">Device id to (adapter code, source id, base URL) mapping.</param>
/// <returns>A completed task holding the combined outcome; false when the rule has no conditions.</returns>
private Task<bool> EvaluateRuleAsync(warehouse_rule rule,
    Dictionary<(string adapter, string sourceId), List<(int pointIndex, double value)>> realtimeData,
    Dictionary<int, (string adapterCode, string sourceId, string baseUrl)> deviceMap)
{
    var conditions = rule.warehouse_rulecondition ?? new();
    if (!conditions.Any()) return Task.FromResult(false);

    var outcomes = new List<bool>();
    foreach (var condition in conditions)
    {
        bool satisfied = false;

        // A condition without a device or value id can never be satisfied.
        bool hasTarget = condition.DeviceId != null && condition.ValueId != null;

        // A condition that fired within the rule's cooldown window counts as not met.
        bool coolingDown = hasTarget
            && condition.LastTriggered.HasValue
            && rule.CooldownSec > 0
            && (DateTime.Now - condition.LastTriggered.Value).TotalSeconds < rule.CooldownSec;

        if (hasTarget && !coolingDown
            && deviceMap.TryGetValue(condition.DeviceId.Value, out var device))
        {
            double? reading = null;
            if (realtimeData.TryGetValue((device.adapterCode, device.sourceId), out var samples)
                && samples.Count > 0)
            {
                // ValueId is used directly as the point index; when no point carries
                // that index the first available point is used instead.
                int hit = samples.FindIndex(p => p.pointIndex == condition.ValueId);
                reading = samples[hit >= 0 ? hit : 0].value;
            }

            // Hysteresis: once the condition has fired, compare against the recovery
            // threshold (falling back to the normal target when none is set).
            double threshold;
            if (condition.LastTriggered.HasValue)
            {
                threshold = (double)(condition.RecoveryThreshold_Numeric ?? condition.TargetValue_Number ?? 0);
            }
            else
            {
                threshold = (double)(condition.TargetValue_Number ?? 0);
            }

            satisfied = Compare(reading, condition.CompareOperator ?? "大于", threshold);
            if (satisfied)
            {
                condition.LastTriggered = DateTime.Now;
            }
        }

        outcomes.Add(satisfied);
    }

    // The list is non-empty here, so these are equivalent to All / Any.
    bool verdict;
    if (rule.JudgmentMode == "AND")
    {
        verdict = !outcomes.Contains(false);
    }
    else
    {
        verdict = outcomes.Contains(true);
    }
    return Task.FromResult(verdict);
}
/// <summary>
/// Compares a reading against a target using a named operator.
/// </summary>
/// <param name="actual">The reading; null when no value is available.</param>
/// <param name="op">One of "大于", "小于", "等于", "大于等于", "小于等于", "不等于".</param>
/// <param name="target">The threshold to compare against.</param>
/// <returns>
/// True when the comparison holds. Always false when <paramref name="actual"/> is
/// null or NaN, or when <paramref name="op"/> is not a known operator.
/// </returns>
private static bool Compare(double? actual, string op, double target)
{
    // A missing reading must never satisfy a condition. Substituting double.MinValue
    // made "小于", "小于等于" and "不等于" succeed whenever there was no data.
    if (actual is not double v || double.IsNaN(v)) return false;

    const double Tolerance = 0.001;
    // "等于" and "不等于" share one test so they are exact complements.
    bool equal = Math.Abs(v - target) < Tolerance;

    return op switch
    {
        "大于" => v > target,
        "小于" => v < target,
        "等于" => equal,
        "大于等于" => v >= target,
        "小于等于" => v <= target,
        "不等于" => !equal,
        _ => false
    };
}
// ═══════════════════════════════════════════
// 动作执行
// ═══════════════════════════════════════════
/// <summary>
/// Starts every action of a rule and waits for all of them to finish, unless the
/// rule has no actions or is still inside its cooldown window.
/// </summary>
/// <param name="rule">Rule whose actions are executed.</param>
/// <param name="deviceMap">Device id to (adapter code, source id, base URL) mapping.</param>
private async Task ExecuteActionsAsync(warehouse_rule rule,
    Dictionary<int, (string adapterCode, string sourceId, string baseUrl)> deviceMap)
{
    var actions = rule.warehouse_ruleaction ?? new();
    if (!actions.Any())
    {
        return;
    }

    // Skip when the rule fired less than CooldownSec seconds ago.
    bool coolingDown = rule.LastTriggered.HasValue
        && rule.CooldownSec > 0
        && (DateTime.Now - rule.LastTriggered.Value).TotalSeconds < rule.CooldownSec;
    if (coolingDown)
    {
        return;
    }

    // Start the actions in order, then await them together.
    var pending = new List<Task>();
    foreach (var action in actions)
    {
        pending.Add(ExecuteSingleActionAsync(action, deviceMap));
    }
    await Task.WhenAll(pending);
}
/// <summary>
/// Executes one rule action. The action type is taken from ActionType, then Type,
/// and defaults to "控制":
/// "控制" sends a control command to the device's gateway,
/// "告警" adds an alarm record (only when Alert is "是"),
/// "通知" broadcasts a "RuleTriggered" message through the hub.
/// </summary>
/// <param name="action">The action to execute.</param>
/// <param name="deviceMap">Device id to (adapter code, source id, base URL) mapping.</param>
/// <remarks>
/// Never throws: failures are logged so that one failing action does not block the others.
/// </remarks>
private async Task ExecuteSingleActionAsync(warehouse_ruleaction action,
    Dictionary<int, (string adapterCode, string sourceId, string baseUrl)> deviceMap)
{
    var actionType = action.ActionType ?? action.Type ?? "控制";
    try
    {
        switch (actionType)
        {
            case "控制":
                if (action.DeviceId.HasValue && deviceMap.TryGetValue(action.DeviceId.Value, out var dev))
                {
                    if (!string.IsNullOrEmpty(dev.baseUrl))
                    {
                        var pointIndex = action.ValueId ?? 0;
                        // A switch target of "开" is sent as 1; otherwise the numeric target (or 0) is sent.
                        var value = (double)(action.TargetValue_Switch == "开" ? 1 : action.TargetValue_Number ?? 0);
                        // NOTE(review): no timeout or cancellation token is passed here — confirm
                        // that the gateway client enforces its own timeout.
                        await _gatewayClient.ControlDeviceAsync(dev.baseUrl, dev.adapterCode, dev.sourceId, pointIndex, value);
                    }
                }
                break;

            case "告警":
                if (action.Alert == "是" && action.DeviceId.HasValue)
                {
                    var alarm = new iot_alarm
                    {
                        SourceAlarmId = $"rule-{action.RuleID}-{DateTime.Now.Ticks}",
                        DeviceId = action.DeviceId.Value,
                        AlarmLevel = "重要",
                        AlarmDesc = action.AlertMessage ?? "规则触发",
                        StartTime = DateTime.Now,
                        State = "未确认",
                        AdapterCode = "RuleEngine",
                        CreateDate = DateTime.Now
                    };
                    // NOTE(review): confirm that Add persists immediately; if the repository
                    // only stages the entity, a save call is still needed.
                    _alarmRepo.Add(alarm);
                }
                break;

            case "通知":
                // Bound the broadcast to 5 seconds. The type is fully qualified because
                // this file has no using directive for System.Threading.
                using (var cts = new System.Threading.CancellationTokenSource(TimeSpan.FromSeconds(5)))
                {
                    await _hub.Clients.All.SendAsync("RuleTriggered", new
                    {
                        title = action.AlertMessage ?? "规则触发",
                        alertMessage = action.AlertMessage,
                        deviceId = action.DeviceId
                    }, cts.Token);
                }
                break;

            default:
                System.Diagnostics.Trace.TraceWarning(
                    $"RuleEngineService: unknown action type '{actionType}' ignored");
                break;
        }
    }
    catch (OperationCanceledException ex)
    {
        // Timed out.
        System.Diagnostics.Trace.TraceWarning(
            $"RuleEngineService: action '{actionType}' timed out: {ex.Message}");
    }
    catch (Exception ex)
    {
        // A single failing action must not block the others.
        System.Diagnostics.Trace.TraceError(
            $"RuleEngineService: action '{actionType}' failed: {ex}");
    }
}
}