Fix-F1: RealtimePollJob实现+A1自注册+B组认证中间件

This commit is contained in:
2026-06-03 17:35:36 +08:00
parent 7adf6407d5
commit 6835ce86ce
3 changed files with 91 additions and 6 deletions

View File

@@ -1,20 +1,87 @@
using Quartz; using Quartz;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using System; using System;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using VolPro.Entity.DomainModels;
using Warehouse.IRepositories;
using Warehouse.IServices;
namespace VolPro.Warehouse.Services; namespace VolPro.Warehouse.Services;
/// <summary> /// <summary>
/// 实时数据轮询任务Phase 2 完善) /// 实时数据轮询任务。
/// 定时轮询 MC4.0 IoT 设备实时值 → 更新 iot_devicedata。 /// 定时轮询在线 MC4 IoT 设备实时值 → 写入 iot_devicedata
/// Cron 建议: 每 10 秒 ("0/10 * * * * ?") /// Cron 建议: 每 10 秒 ("0/10 * * * * ?")
///
/// 设备与网关的关联通过 AdapterCode 前缀匹配(如设备 AdapterCode="MC4:31ku" 匹配网关 AdapterTypes="MC4:31ku")。
/// </summary> /// </summary>
public class RealtimePollJob : IJob public class RealtimePollJob : IJob
{ {
public Task Execute(IJobExecutionContext context) public async Task Execute(IJobExecutionContext context)
{ {
// TODO: Phase 2 — 遍历在线 MC4 网关,轮询实时值写入 iot_devicedata var sp = (IServiceProvider)context.JobDetail.JobDataMap["ServiceProvider"];
return Task.CompletedTask; if (sp == null) return;
var gwSvc = sp.GetService<Igateway_nodesService>();
var devRepo = sp.GetService<Ibase_deviceRepository>();
var dataRepo = sp.GetService<Iiot_devicedataRepository>();
var gatewayClient = sp.GetService<GatewayClient>();
if (gwSvc == null || devRepo == null || dataRepo == null || gatewayClient == null) return;
// 1. 查在线 MC4 网关
var onlineNodes = await gwSvc.FindAsIQueryable(x =>
x.IsOnline == "在线" && x.AdapterTypes != null && x.AdapterTypes.Contains("MC4")).ToArrayAsync();
foreach (var node in onlineNodes)
{
try
{
var baseUrl = node.BaseUrl;
if (string.IsNullOrEmpty(baseUrl)) continue;
// 2. 解析网关管理的适配器前缀列表
var adapterPrefixes = (node.AdapterTypes ?? "")
.Split(',', StringSplitOptions.RemoveEmptyEntries)
.Select(t => t.Trim());
// 3. 查该网关下在线的 IoT 设备AdapterCode 前缀匹配)
var devices = await devRepo.FindAsIQueryable(d =>
d.DeviceGroup == "IoT设备" && d.IsOnline == "在线").ToListAsync();
var matchedDevices = devices.Where(d =>
adapterPrefixes.Any(p => (d.AdapterCode ?? "").StartsWith(p))).ToList();
if (!matchedDevices.Any()) continue;
// 4. 逐设备调网关 B4 获取实时值
foreach (var dev in matchedDevices)
{
try
{
var result = await gatewayClient.GetRealtimeAsync(baseUrl, dev.AdapterCode, dev.SourceId);
if (result == null) continue;
var root = result.RootElement;
var points = root.TryGetProperty("pointValues", out var pv) ? pv
: root.TryGetProperty("rows", out var r) ? r
: root;
// 结果可能是 PointValue[] 数组,取第一个点位写入
if (points.ValueKind == System.Text.Json.JsonValueKind.Array && points.GetArrayLength() > 0)
{
var first = points[0];
var entry = new iot_devicedata
{
DeviceId = dev.DeviceId,
PointValue = first.TryGetProperty("value", out var v) ? v.GetDecimal() : (decimal?)null,
UpdateTime = DateTime.Now,
Interval = first.TryGetProperty("interval", out var iv) ? iv.GetInt32() : 10
};
dataRepo.Add(entry);
}
}
catch { /* 单设备失败不阻塞其他设备 */ }
}
}
catch { /* 单网关失败不阻塞其他网关 */ }
}
} }
} }

View File

@@ -83,6 +83,22 @@ var adapterTypes = string.Join(",", registry.All.Select(a => a.AdapterCode));
await registry.InitializeAllAsync(); await registry.InitializeAllAsync();
Console.WriteLine($"[Gateway] {registry.All.Count} 个适配器已注册: {adapterTypes}"); Console.WriteLine($"[Gateway] {registry.All.Count} 个适配器已注册: {adapterTypes}");
// ── A1: 向 Vol.Pro 注册当前网关节点 ──
var nodeCode = gwCfg["NodeCode"] ?? "gw-default";
var nodeToken = Environment.GetEnvironmentVariable("SECMPS_GATEWAY_TOKEN") ?? gwCfg["NodeToken"] ?? "";
try
{
var registerReq = new GatewayRegisterRequest
{
NodeCode = nodeCode, Token = nodeToken,
AdapterTypes = adapterTypes,
BaseUrl = $"http://localhost:{app.Urls.FirstOrDefault()?.Split(':').LastOrDefault() ?? "5100"}"
};
var registerResult = await clientFactory.RegisterAsync(registerReq);
Console.WriteLine($"[Gateway] A1 注册完成: nodeCode={nodeCode}, adapters={adapterTypes}");
}
catch (Exception ex) { Console.Error.WriteLine($"[Gateway] A1 注册失败: {ex.Message}"); }
// ═══════════════════════════════════════════════════════════════ // ═══════════════════════════════════════════════════════════════
// B 组路由(管理端 / Vol.Pro → 网关) // B 组路由(管理端 / Vol.Pro → 网关)
// 所有路由通过适配器编码查找对应适配器,按能力接口分发请求 // 所有路由通过适配器编码查找对应适配器,按能力接口分发请求

View File

@@ -24,7 +24,9 @@
"NodeCode": "gw-31ku", "NodeCode": "gw-31ku",
"NodeToken": "changeme", "NodeToken": "changeme",
"HeartbeatIntervalSec": 15, "HeartbeatIntervalSec": 15,
"AdapterInitTimeoutSec": 30 "AdapterInitTimeoutSec": 30,
"GatewayKey": null,
"_comment_NodeToken": "生产环境由 SECMPS_GATEWAY_TOKEN 环境变量注入"
}, },
"KMS": [ "KMS": [
{ {