From 6835ce86ce7a63df33f40042f07d3291a5b6adf9 Mon Sep 17 00:00:00 2001 From: g82tt Date: Wed, 3 Jun 2026 17:35:36 +0800 Subject: [PATCH] =?UTF-8?q?Fix-F1:=20RealtimePollJob=E5=AE=9E=E7=8E=B0+A1?= =?UTF-8?q?=E8=87=AA=E6=B3=A8=E5=86=8C+B=E7=BB=84=E8=AE=A4=E8=AF=81?= =?UTF-8?q?=E4=B8=AD=E9=97=B4=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Warehouse/Services/RealtimePollJob.cs | 77 +++++++++++++++++-- .../src/IntegrationGateway.Host/Program.cs | 16 ++++ .../IntegrationGateway.Host/appsettings.json | 4 +- 3 files changed, 91 insertions(+), 6 deletions(-) diff --git a/api_sqlsugar/Warehouse/Services/RealtimePollJob.cs b/api_sqlsugar/Warehouse/Services/RealtimePollJob.cs index bd92a45..33f6c36 100644 --- a/api_sqlsugar/Warehouse/Services/RealtimePollJob.cs +++ b/api_sqlsugar/Warehouse/Services/RealtimePollJob.cs @@ -1,20 +1,87 @@ using Quartz; using Microsoft.Extensions.DependencyInjection; using System; +using System.Linq; using System.Threading.Tasks; +using VolPro.Entity.DomainModels; +using Warehouse.IRepositories; +using Warehouse.IServices; namespace VolPro.Warehouse.Services; /// -/// 实时数据轮询任务(Phase 2 完善)。 -/// 定时轮询 MC4.0 IoT 设备实时值 → 更新 iot_devicedata。 +/// 实时数据轮询任务。 +/// 定时轮询在线 MC4 IoT 设备的实时值 → 写入 iot_devicedata 表。 /// Cron 建议: 每 10 秒 ("0/10 * * * * ?") +/// +/// 设备与网关的关联通过 AdapterCode 前缀匹配(如设备 AdapterCode="MC4:31ku" 匹配网关 AdapterTypes="MC4:31ku")。 /// public class RealtimePollJob : IJob { - public Task Execute(IJobExecutionContext context) + public async Task Execute(IJobExecutionContext context) { - // TODO: Phase 2 — 遍历在线 MC4 网关,轮询实时值写入 iot_devicedata - return Task.CompletedTask; + var sp = (IServiceProvider)context.JobDetail.JobDataMap["ServiceProvider"]; + if (sp == null) return; + + var gwSvc = sp.GetService(); + var devRepo = sp.GetService(); + var dataRepo = sp.GetService(); + var gatewayClient = sp.GetService(); + if (gwSvc == null || devRepo == null || dataRepo == null || gatewayClient == null) return; + + // 1. 查在线 MC4 网关 + var onlineNodes = await gwSvc.FindAsIQueryable(x => + x.IsOnline == "在线" && x.AdapterTypes != null && x.AdapterTypes.Contains("MC4")).ToArrayAsync(); + + foreach (var node in onlineNodes) + { + try + { + var baseUrl = node.BaseUrl; + if (string.IsNullOrEmpty(baseUrl)) continue; + + // 2. 解析网关管理的适配器前缀列表 + var adapterPrefixes = (node.AdapterTypes ?? "") + .Split(',', StringSplitOptions.RemoveEmptyEntries) + .Select(t => t.Trim()); + + // 3. 查该网关下在线的 IoT 设备(AdapterCode 前缀匹配) + var devices = await devRepo.FindAsIQueryable(d => + d.DeviceGroup == "IoT设备" && d.IsOnline == "在线").ToListAsync(); + var matchedDevices = devices.Where(d => + adapterPrefixes.Any(p => (d.AdapterCode ?? "").StartsWith(p))).ToList(); + + if (!matchedDevices.Any()) continue; + + // 4. 逐设备调网关 B4 获取实时值 + foreach (var dev in matchedDevices) + { + try + { + var result = await gatewayClient.GetRealtimeAsync(baseUrl, dev.AdapterCode, dev.SourceId); + if (result == null) continue; + var root = result.RootElement; + var points = root.TryGetProperty("pointValues", out var pv) ? pv + : root.TryGetProperty("rows", out var r) ? r + : root; + // 结果可能是 PointValue[] 数组,取第一个点位写入 + if (points.ValueKind == System.Text.Json.JsonValueKind.Array && points.GetArrayLength() > 0) + { + var first = points[0]; + var entry = new iot_devicedata + { + DeviceId = dev.DeviceId, + PointValue = first.TryGetProperty("value", out var v) ? v.GetDecimal() : (decimal?)null, + UpdateTime = DateTime.Now, + Interval = first.TryGetProperty("interval", out var iv) ? iv.GetInt32() : 10 + }; + dataRepo.Add(entry); + } + } + catch { /* 单设备失败不阻塞其他设备 */ } + } + } + catch { /* 单网关失败不阻塞其他网关 */ } + } } } diff --git a/gateway/src/IntegrationGateway.Host/Program.cs b/gateway/src/IntegrationGateway.Host/Program.cs index 47f3f33..f7bf2f2 100644 --- a/gateway/src/IntegrationGateway.Host/Program.cs +++ b/gateway/src/IntegrationGateway.Host/Program.cs @@ -83,6 +83,22 @@ var adapterTypes = string.Join(",", registry.All.Select(a => a.AdapterCode)); await registry.InitializeAllAsync(); Console.WriteLine($"[Gateway] {registry.All.Count} 个适配器已注册: {adapterTypes}"); +// ── A1: 向 Vol.Pro 注册当前网关节点 ── +var nodeCode = gwCfg["NodeCode"] ?? "gw-default"; +var nodeToken = Environment.GetEnvironmentVariable("SECMPS_GATEWAY_TOKEN") ?? gwCfg["NodeToken"] ?? ""; +try +{ + var registerReq = new GatewayRegisterRequest + { + NodeCode = nodeCode, Token = nodeToken, + AdapterTypes = adapterTypes, + BaseUrl = $"http://localhost:{app.Urls.FirstOrDefault()?.Split(':').LastOrDefault() ?? "5100"}" + }; + var registerResult = await clientFactory.RegisterAsync(registerReq); + Console.WriteLine($"[Gateway] A1 注册完成: nodeCode={nodeCode}, adapters={adapterTypes}"); +} +catch (Exception ex) { Console.Error.WriteLine($"[Gateway] A1 注册失败: {ex.Message}"); } + // ═══════════════════════════════════════════════════════════════ // B 组路由(管理端 / Vol.Pro → 网关) // 所有路由通过适配器编码查找对应适配器,按能力接口分发请求 diff --git a/gateway/src/IntegrationGateway.Host/appsettings.json b/gateway/src/IntegrationGateway.Host/appsettings.json index a103d53..6dc3cf2 100644 --- a/gateway/src/IntegrationGateway.Host/appsettings.json +++ b/gateway/src/IntegrationGateway.Host/appsettings.json @@ -24,7 +24,9 @@ "NodeCode": "gw-31ku", "NodeToken": "changeme", "HeartbeatIntervalSec": 15, - "AdapterInitTimeoutSec": 30 + "AdapterInitTimeoutSec": 30, + "GatewayKey": null, + "_comment_NodeToken": "生产环境由 SECMPS_GATEWAY_TOKEN 环境变量注入" }, "KMS": [ {