Files
SecMPS/api_sqlsugar/Warehouse/Services/RealtimePollJob.cs

95 lines
4.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Quartz;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Linq;
using System.Threading.Tasks;
using VolPro.Entity.DomainModels;
using Warehouse.IRepositories;
using Microsoft.Extensions.Configuration;
using System.Net.Http;
using Warehouse.IServices;
namespace VolPro.Warehouse.Services;
/// <summary>
/// Real-time data polling job.
/// On each run it looks up the online gateways whose AdapterTypes mention "MC4", polls the
/// real-time value of every online IoT device matched to such a gateway, and adds one
/// <c>iot_devicedata</c> entry per device through <c>Iiot_devicedataRepository</c>.
/// Suggested cron: every 10 seconds ("0/10 * * * * ?").
///
/// Devices are associated with a gateway by AdapterCode prefix matching (e.g. a device with
/// AdapterCode="MC4:31ku" matches a gateway whose AdapterTypes contains "MC4:31ku").
/// </summary>
public class RealtimePollJob : IJob
{
    private readonly IServiceProvider _sp;

    /// <summary>Creates the job with the service provider used to resolve its collaborators.</summary>
    /// <param name="sp">Provider from which services, repositories and the HTTP client factory are resolved.</param>
    public RealtimePollJob(IServiceProvider sp) { _sp = sp; }

    /// <summary>
    /// Runs one polling pass. The pass is best-effort: a failure for one device or one gateway
    /// is reported as a trace warning and does not stop the remaining devices/gateways.
    /// Returns silently when any required service cannot be resolved.
    /// </summary>
    /// <param name="context">Quartz execution context; not used by this job.</param>
    public async Task Execute(IJobExecutionContext? context)
    {
        var sp = _sp;
        if (sp == null)
        {
            return;
        }

        var gwSvc = sp.GetService<Igateway_nodesService>();
        var devRepo = sp.GetService<Ibase_deviceRepository>();
        var dataRepo = sp.GetService<Iiot_devicedataRepository>();
        var httpFactory = sp.GetService<IHttpClientFactory>();
        var config = sp.GetService<IConfiguration>();
        var gatewayClient = httpFactory != null ? new GatewayClient(httpFactory, config!) : null;
        if (gwSvc == null || devRepo == null || dataRepo == null || gatewayClient == null)
        {
            return;
        }

        // 1. Load the online gateways that advertise an MC4 adapter.
        var onlineNodes = await gwSvc.FindAsIQueryable(x =>
            x.IsOnline == "在线" && x.AdapterTypes != null && x.AdapterTypes.Contains("MC4")).ToArrayAsync();
        if (onlineNodes.Length == 0)
        {
            return;
        }

        try
        {
            // 2. Load the online IoT devices once per run. The query does not depend on the
            //    gateway, so it is hoisted out of the per-gateway loop.
            var devices = await devRepo.FindAsIQueryable(d =>
                d.DeviceGroup == "IoT设备" && d.IsOnline == "在线").ToListAsync();
            if (devices.Count == 0)
            {
                return;
            }

            foreach (var node in onlineNodes)
            {
                try
                {
                    var baseUrl = node.BaseUrl;
                    if (string.IsNullOrEmpty(baseUrl))
                    {
                        continue;
                    }

                    // 3. Parse the adapter prefixes managed by this gateway. TrimEntries drops
                    //    whitespace-only entries, which would otherwise become "" and match
                    //    every device. Materialised once so it is not re-split per device.
                    var adapterPrefixes = (node.AdapterTypes ?? "")
                        .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
                    if (adapterPrefixes.Length == 0)
                    {
                        continue;
                    }

                    // 4. Select the devices of this gateway (AdapterCode prefix match).
                    //    Adapter codes are identifiers, so compare ordinally.
                    var matchedDevices = devices.Where(d =>
                        adapterPrefixes.Any(p => (d.AdapterCode ?? "").StartsWith(p, StringComparison.Ordinal))).ToList();

                    // 5. Ask the gateway for the real-time value of each device.
                    foreach (var dev in matchedDevices)
                    {
                        try
                        {
                            // NOTE(review): the result exposes RootElement, so it is presumably a
                            // JsonDocument and may need disposing — confirm against GatewayClient.
                            var result = await gatewayClient.GetRealtimeAsync(baseUrl, dev.AdapterCode, dev.SourceId);
                            if (result == null)
                            {
                                continue;
                            }

                            var points = ExtractPoints(result.RootElement);

                            // The result may be a PointValue[] array; only the first point is recorded.
                            if (points.ValueKind != System.Text.Json.JsonValueKind.Array || points.GetArrayLength() == 0)
                            {
                                continue;
                            }

                            var first = points[0];
                            if (first.ValueKind != System.Text.Json.JsonValueKind.Object)
                            {
                                // TryGetProperty below would throw on a non-object element.
                                continue;
                            }

                            var entry = new iot_devicedata
                            {
                                DeviceId = dev.DeviceId,
                                PointValue = first.TryGetProperty("value", out var v) ? v.GetDecimal() : (decimal?)null,
                                UpdateTime = DateTime.Now,
                                Interval = first.TryGetProperty("interval", out var iv) ? iv.GetInt32() : 10
                            };

                            // NOTE(review): no explicit save/commit is visible here — confirm that
                            // Add persists the entry on its own.
                            dataRepo.Add(entry);
                        }
                        catch (Exception ex)
                        {
                            // A single device failure must not block the other devices.
                            System.Diagnostics.Trace.TraceWarning(
                                $"RealtimePollJob: polling device {dev.DeviceId} via {baseUrl} failed: {ex.Message}");
                        }
                    }
                }
                catch (Exception ex)
                {
                    // A single gateway failure must not block the other gateways.
                    System.Diagnostics.Trace.TraceWarning(
                        $"RealtimePollJob: processing gateway {node.BaseUrl} failed: {ex.Message}");
                }
            }
        }
        catch (Exception ex)
        {
            // Loading the device list failed; keep the job best-effort and skip this run.
            System.Diagnostics.Trace.TraceWarning(
                $"RealtimePollJob: loading online IoT devices failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Locates the point list in a gateway response: the "pointValues" property, else the
    /// "rows" property, else the root itself. Properties are only probed when the root is a
    /// JSON object, because TryGetProperty throws on any other kind; this lets a bare-array
    /// response reach the root fallback.
    /// </summary>
    /// <param name="root">Root element of the gateway response.</param>
    /// <returns>The element expected to hold the point array.</returns>
    private static System.Text.Json.JsonElement ExtractPoints(System.Text.Json.JsonElement root)
    {
        if (root.ValueKind != System.Text.Json.JsonValueKind.Object)
        {
            return root;
        }

        if (root.TryGetProperty("pointValues", out var pointValues))
        {
            return pointValues;
        }

        return root.TryGetProperty("rows", out var rows) ? rows : root;
    }
}