V1 Entity+Service扩展: 网关同步方法+字段分治+告警去重
This commit is contained in:
@@ -17,6 +17,9 @@ using Microsoft.EntityFrameworkCore;
|
|||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.AspNetCore.Http;
|
using Microsoft.AspNetCore.Http;
|
||||||
using Warehouse.IRepositories;
|
using Warehouse.IRepositories;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace Warehouse.Services
|
namespace Warehouse.Services
|
||||||
{
|
{
|
||||||
@@ -37,5 +40,77 @@ namespace Warehouse.Services
|
|||||||
//多租户会用到这init代码,其他情况可以不用
|
//多租户会用到这init代码,其他情况可以不用
|
||||||
//base.Init(dbRepository);
|
//base.Init(dbRepository);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取指定网关节点下的顶层设备列表(用于网关注册时返回所管设备)。
|
||||||
|
/// 返回 ParentDeviceId 为 null 的设备。
|
||||||
|
/// </summary>
|
||||||
|
public async Task<List<base_device>> GetDevicesByGatewayNodeAsync(int gatewayNodeId)
|
||||||
|
{
|
||||||
|
return await _repository.DbContext.Queryable<base_device>()
|
||||||
|
.Where(x => x.GatewayNodeId == gatewayNodeId && x.ParentDeviceId == null)
|
||||||
|
.ToListAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 按字段分治原则 Upsert 单个设备。
|
||||||
|
/// 首次入库时写全量(管理员字段),已有记录时仅更新网关字段(IsOnline/ExtraData等)。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="d">同步设备条目</param>
|
||||||
|
/// <param name="gatewayNodeId">网关节点ID</param>
|
||||||
|
/// <param name="existingIds">已有设备映射表 (AdapterCode, SourceId) → DeviceId</param>
|
||||||
|
public async Task UpsertDeviceAsync(SyncDeviceItem d, int gatewayNodeId, Dictionary<(string, string), int> existingIds)
|
||||||
|
{
|
||||||
|
var db = _repository.DbContext;
|
||||||
|
var key = (d.AdapterCode, d.SourceId);
|
||||||
|
existingIds.TryGetValue(key, out var existingId);
|
||||||
|
bool isNew = existingId == 0;
|
||||||
|
|
||||||
|
// 解析父设备
|
||||||
|
int? parentDeviceId = null;
|
||||||
|
if (!string.IsNullOrEmpty(d.ParentSourceId))
|
||||||
|
{
|
||||||
|
existingIds.TryGetValue((d.AdapterCode, d.ParentSourceId), out var pid);
|
||||||
|
if (pid > 0) parentDeviceId = pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isNew)
|
||||||
|
{
|
||||||
|
var entity = new base_device
|
||||||
|
{
|
||||||
|
DeviceName = d.Name ?? $"DEV_{d.SourceId}",
|
||||||
|
AdapterCode = d.AdapterCode,
|
||||||
|
SourceId = d.SourceId,
|
||||||
|
DeviceCategory = d.Category,
|
||||||
|
DeviceGroup = d.Group,
|
||||||
|
GatewayNodeId = gatewayNodeId,
|
||||||
|
IsParent = d.IsParent ? "是" : "否",
|
||||||
|
ParentDeviceId = parentDeviceId,
|
||||||
|
IsOnline = d.IsOnline ? "在线" : "离线",
|
||||||
|
IpAddress = d.IpAddress,
|
||||||
|
Port = d.Port,
|
||||||
|
ExtraData = d.ExtraDataJson,
|
||||||
|
Enable = "启用",
|
||||||
|
LastSyncTime = DateTime.Now,
|
||||||
|
CreateDate = DateTime.Now
|
||||||
|
};
|
||||||
|
db.Insertable(entity).ExecuteCommand();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var entity = db.Queryable<base_device>().InSingle(existingId);
|
||||||
|
if (entity != null)
|
||||||
|
{
|
||||||
|
entity.IsOnline = d.IsOnline ? "在线" : "离线";
|
||||||
|
entity.IsParent = d.IsParent ? "是" : "否";
|
||||||
|
entity.ParentDeviceId = parentDeviceId ?? entity.ParentDeviceId;
|
||||||
|
entity.IpAddress = d.IpAddress;
|
||||||
|
entity.Port = d.Port;
|
||||||
|
entity.ExtraData = d.ExtraDataJson ?? entity.ExtraData;
|
||||||
|
entity.LastSyncTime = DateTime.Now;
|
||||||
|
db.Updateable(entity).ExecuteCommand();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,6 +17,10 @@ using Microsoft.EntityFrameworkCore;
|
|||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.AspNetCore.Http;
|
using Microsoft.AspNetCore.Http;
|
||||||
using Warehouse.IRepositories;
|
using Warehouse.IRepositories;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using System.Text.Json;
|
||||||
|
|
||||||
namespace Warehouse.Services
|
namespace Warehouse.Services
|
||||||
{
|
{
|
||||||
@@ -37,5 +41,156 @@ namespace Warehouse.Services
|
|||||||
//多租户会用到这init代码,其他情况可以不用
|
//多租户会用到这init代码,其他情况可以不用
|
||||||
//base.Init(dbRepository);
|
//base.Init(dbRepository);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/// <summary>
|
||||||
|
/// 网关注册(Upsert)。
|
||||||
|
/// NodeCode 匹配则更新适配器类型/地址/在线状态并返回已有 NodeId,
|
||||||
|
/// NodeCode 不匹配且 Token 验证通过则插入新记录。
|
||||||
|
/// </summary>
|
||||||
|
public async Task<gateway_nodes> RegisterNodeAsync(string nodeCode, string token, string adapterTypes, string baseUrl)
|
||||||
|
{
|
||||||
|
var existing = _repository.DbContext.Queryable<gateway_nodes>()
|
||||||
|
.First(x => x.NodeCode == nodeCode);
|
||||||
|
|
||||||
|
gateway_nodes entity;
|
||||||
|
if (existing != null)
|
||||||
|
{
|
||||||
|
// 已存在:验证Token,更新网关上报字段
|
||||||
|
if (existing.NodeToken != token)
|
||||||
|
throw new UnauthorizedAccessException("NodeToken 不匹配");
|
||||||
|
|
||||||
|
existing.AdapterTypes = adapterTypes;
|
||||||
|
existing.BaseUrl = baseUrl;
|
||||||
|
existing.IsOnline = "在线";
|
||||||
|
existing.LastHeartbeat = DateTime.Now;
|
||||||
|
_repository.DbContext.Updateable(existing).ExecuteCommand();
|
||||||
|
entity = existing;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// 新节点:直接插入
|
||||||
|
entity = new gateway_nodes
|
||||||
|
{
|
||||||
|
NodeCode = nodeCode,
|
||||||
|
NodeName = nodeCode,
|
||||||
|
NodeToken = token,
|
||||||
|
AdapterTypes = adapterTypes,
|
||||||
|
BaseUrl = baseUrl,
|
||||||
|
IsOnline = "在线",
|
||||||
|
Enable = "启用",
|
||||||
|
LastHeartbeat = DateTime.Now,
|
||||||
|
CreateDate = DateTime.Now
|
||||||
|
};
|
||||||
|
_repository.DbContext.Insertable(entity).ExecuteCommand();
|
||||||
|
}
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 心跳更新。更新 LastHeartbeat 并标记在线。
|
||||||
|
/// </summary>
|
||||||
|
public async Task UpdateHeartbeatAsync(string nodeCode, string token)
|
||||||
|
{
|
||||||
|
var entity = _repository.DbContext.Queryable<gateway_nodes>()
|
||||||
|
.First(x => x.NodeCode == nodeCode && x.NodeToken == token);
|
||||||
|
if (entity == null)
|
||||||
|
throw new UnauthorizedAccessException("认证失败:NodeCode 或 Token 无效");
|
||||||
|
|
||||||
|
entity.IsOnline = "在线";
|
||||||
|
entity.LastHeartbeat = DateTime.Now;
|
||||||
|
_repository.DbContext.Updateable(entity).ExecuteCommand();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 设备数据同步。按照字段分治原则写入 base_device:
|
||||||
|
/// 首次入库写全量,后续仅更新网关字段(IsOnline/ExtraData/ParentDeviceId等)。
|
||||||
|
/// parentSourceId 解析为 ParentDeviceId。
|
||||||
|
/// </summary>
|
||||||
|
public async Task<(int added, int updated)> SyncDevicesAsync(int gatewayNodeId, List<SyncDeviceItem> devices)
|
||||||
|
{
|
||||||
|
var db = _repository.DbContext;
|
||||||
|
|
||||||
|
// 批量查询已有设备映射表(用于 parentSourceId → ParentDeviceId 解析)
|
||||||
|
var adapterCodes = devices.Select(d => d.AdapterCode).Distinct().ToList();
|
||||||
|
var existingIds = db.Queryable<base_device>()
|
||||||
|
.Where(x => x.GatewayNodeId == gatewayNodeId && adapterCodes.Contains(x.AdapterCode))
|
||||||
|
.ToList()
|
||||||
|
.ToDictionary(x => (x.AdapterCode, x.SourceId), x => x.DeviceId);
|
||||||
|
|
||||||
|
int added = 0, updated = 0;
|
||||||
|
foreach (var d in devices)
|
||||||
|
{
|
||||||
|
var key = (d.AdapterCode, d.SourceId);
|
||||||
|
existingIds.TryGetValue(key, out var existingId);
|
||||||
|
bool isNew = existingId == 0;
|
||||||
|
|
||||||
|
// 解析 parentSourceId → ParentDeviceId
|
||||||
|
int? parentDeviceId = null;
|
||||||
|
if (!string.IsNullOrEmpty(d.ParentSourceId))
|
||||||
|
{
|
||||||
|
existingIds.TryGetValue((d.AdapterCode, d.ParentSourceId), out var pid);
|
||||||
|
if (pid > 0) parentDeviceId = pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isNew)
|
||||||
|
{
|
||||||
|
// 首次入库写全量
|
||||||
|
var entity = new base_device
|
||||||
|
{
|
||||||
|
DeviceName = d.Name ?? $"DEV_{d.SourceId}",
|
||||||
|
AdapterCode = d.AdapterCode,
|
||||||
|
SourceId = d.SourceId,
|
||||||
|
DeviceCategory = d.Category,
|
||||||
|
DeviceGroup = d.Group,
|
||||||
|
GatewayNodeId = gatewayNodeId,
|
||||||
|
IsParent = d.IsParent ? "是" : "否",
|
||||||
|
ParentDeviceId = parentDeviceId,
|
||||||
|
IsOnline = d.IsOnline ? "在线" : "离线",
|
||||||
|
IpAddress = d.IpAddress,
|
||||||
|
Port = d.Port,
|
||||||
|
ExtraData = d.ExtraDataJson,
|
||||||
|
Enable = "启用",
|
||||||
|
LastSyncTime = DateTime.Now,
|
||||||
|
CreateDate = DateTime.Now
|
||||||
|
};
|
||||||
|
db.Insertable(entity).ExecuteCommand();
|
||||||
|
added++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// 已有记录:仅更新网关字段
|
||||||
|
var entity = db.Queryable<base_device>().InSingle(existingId);
|
||||||
|
if (entity != null)
|
||||||
|
{
|
||||||
|
entity.IsOnline = d.IsOnline ? "在线" : "离线";
|
||||||
|
entity.IsParent = d.IsParent ? "是" : "否";
|
||||||
|
entity.ParentDeviceId = parentDeviceId ?? entity.ParentDeviceId;
|
||||||
|
entity.IpAddress = d.IpAddress;
|
||||||
|
entity.Port = d.Port;
|
||||||
|
entity.ExtraData = d.ExtraDataJson ?? entity.ExtraData;
|
||||||
|
entity.LastSyncTime = DateTime.Now;
|
||||||
|
db.Updateable(entity).ExecuteCommand();
|
||||||
|
updated++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (added, updated);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>网关同步设备条目(A3 接口接收的数据模型)</summary>
|
||||||
|
public class SyncDeviceItem
|
||||||
|
{
|
||||||
|
public string AdapterCode { get; set; } = "";
|
||||||
|
public string SourceId { get; set; } = "";
|
||||||
|
public string? Name { get; set; }
|
||||||
|
public string? Category { get; set; }
|
||||||
|
public string? Group { get; set; }
|
||||||
|
public bool IsParent { get; set; }
|
||||||
|
public string? ParentSourceId { get; set; }
|
||||||
|
public bool IsOnline { get; set; }
|
||||||
|
public string? IpAddress { get; set; }
|
||||||
|
public int? Port { get; set; }
|
||||||
|
public string? ExtraDataJson { get; set; }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,6 +17,8 @@ using Microsoft.EntityFrameworkCore;
|
|||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.AspNetCore.Http;
|
using Microsoft.AspNetCore.Http;
|
||||||
using Warehouse.IRepositories;
|
using Warehouse.IRepositories;
|
||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace Warehouse.Services
|
namespace Warehouse.Services
|
||||||
{
|
{
|
||||||
@@ -37,5 +39,44 @@ namespace Warehouse.Services
|
|||||||
//多租户会用到这init代码,其他情况可以不用
|
//多租户会用到这init代码,其他情况可以不用
|
||||||
//base.Init(dbRepository);
|
//base.Init(dbRepository);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/// <summary>
|
||||||
|
/// Upsert 单条告警。按 SourceAlarmId 去重,已存在则跳过。
|
||||||
|
/// </summary>
|
||||||
|
public async Task UpsertAlarmAsync(SyncAlarmItem a, int? deviceId)
|
||||||
|
{
|
||||||
|
var db = _repository.DbContext;
|
||||||
|
|
||||||
|
// SourceAlarmId 去重
|
||||||
|
var exists = db.Queryable<iot_alarm>()
|
||||||
|
.Any(x => x.SourceAlarmId == a.SourceAlarmId);
|
||||||
|
if (exists) return;
|
||||||
|
|
||||||
|
var alarm = new iot_alarm
|
||||||
|
{
|
||||||
|
SourceAlarmId = a.SourceAlarmId,
|
||||||
|
DeviceId = deviceId,
|
||||||
|
AdapterCode = a.AdapterCode,
|
||||||
|
AlarmLevel = a.Level,
|
||||||
|
AlarmDesc = a.Desc,
|
||||||
|
AlarmValue = a.Value,
|
||||||
|
StartTime = DateTime.TryParse(a.StartTime, out var st) ? st : DateTime.Now,
|
||||||
|
State = "未确认",
|
||||||
|
CreateDate = DateTime.Now
|
||||||
|
};
|
||||||
|
db.Insertable(alarm).ExecuteCommand();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>告警同步条目(A4 接口接收的数据模型)</summary>
|
||||||
|
public class SyncAlarmItem
|
||||||
|
{
|
||||||
|
public string SourceAlarmId { get; set; } = "";
|
||||||
|
public string DeviceSourceId { get; set; } = "";
|
||||||
|
public string AdapterCode { get; set; } = "";
|
||||||
|
public string Level { get; set; } = "";
|
||||||
|
public string Desc { get; set; } = "";
|
||||||
|
public double? Value { get; set; }
|
||||||
|
public string StartTime { get; set; } = "";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user