Files
EJM_Display/DataCenter/DataCenter.cpp
2025-10-20 22:28:37 +08:00

235 lines
7.5 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "DataCenter.h"
#include <QDebug>
#include <FileOperation/ConfigFiles.h>
#include <PublicFunctions/Basic.h>
#include <QCoreApplication>
#include <memory>
// Singleton instance pointer; null until instance() creates the object.
DataCenter* DataCenter::m_instance = nullptr;
QMutex DataCenter::m_mutex; // Only used for the thread safety of the singleton pattern.
/**
 * Constructs the data center on top of an existing OPC UA manager.
 *
 * Creates a dedicated QThread plus an OpcWorker, moves the worker to that
 * thread, wires the timer/worker/manager signals and finally starts the thread.
 *
 * @param opcManager OPC UA manager used for all reads; must not be null (asserted).
 * @param parent     QObject parent forwarded to the QObject base class.
 */
DataCenter::DataCenter(OpcUaManager *opcManager, QObject *parent)
: QObject(parent)
, m_opcManager(opcManager)
, m_batchSize(10)
, m_currentReadIndex(0)
{
Q_ASSERT(opcManager != nullptr);
// Create the dedicated thread and the worker object.
m_opcThread = new QThread(this);
m_worker = new OpcWorker(opcManager); // Intentionally created without a QObject parent (per the original note).
// Move the worker object to the dedicated thread.
m_worker->moveToThread(m_opcThread);
// Forward the worker's cyclic timer timeout to the worker's own slot (direct call).
connect(&m_worker->m_cyclicTimer, &QTimer::timeout,
m_worker, &OpcWorker::onTimerTimeout, Qt::DirectConnection);
// Forward the worker's timeout signal to DataCenter::onTimerTimeout (queued).
connect(m_worker, &OpcWorker::timeout, this, &DataCenter::onTimerTimeout, Qt::QueuedConnection);
// When the OPC manager reports a reconnect, refresh all nodes on this DataCenter (queued).
connect(m_opcManager, &OpcUaManager::reconnected, this, &DataCenter::refreshAllNodes, Qt::QueuedConnection);
// Start the thread.
m_opcThread->start();
}
/**
 * Stops the worker thread, releases the worker object and unregisters the
 * singleton pointer.
 *
 * The thread is asked to quit and given 500 ms to finish; if it does not, it is
 * forcibly terminated and then waited for.
 */
DataCenter::~DataCenter()
{
    if (m_opcThread && m_opcThread->isRunning()) {
        m_opcThread->quit();
        if (!m_opcThread->wait(500)) { // Give the event loop 500 ms to exit.
            // Last resort: the thread did not leave its event loop in time.
            m_opcThread->terminate();
            m_opcThread->wait();
        }
    }

    // The worker has no QObject parent, so it must be released manually.
    delete m_worker;
    m_worker = nullptr;

    // Clear the singleton pointer so instance() never hands out a dangling
    // pointer after this object has been destroyed (e.g. by its parent).
    QMutexLocker locker(&m_mutex);
    if (m_instance == this) {
        m_instance = nullptr;
    }
}
/**
 * Returns the process-wide DataCenter, creating it on first use.
 *
 * @param parent QObject parent used only when the instance is created; it is
 *               also forwarded to OpcUaManager::instance(). Ignored on later calls.
 * @return Pointer to the singleton instance.
 */
DataCenter* DataCenter::instance(QObject *parent)
{
    // Take the lock before looking at m_instance: an unlocked read of the plain
    // pointer would race with the write performed by a concurrent first call.
    QMutexLocker locker(&m_mutex);
    if (!m_instance) {
        OpcUaManager* opc = OpcUaManager::instance(parent);
        m_instance = new DataCenter(opc, parent);
    }
    return m_instance;
}
void DataCenter::initData()
{
// 清空全局变量
gOPC_NodeList.clear();
gOPC_NodeName.clear();
gOPC_NodeValue.clear();
gOPC_VarName.clear();
gOPC_SqlTable.clear();
gOPC_SqlField.clear();
m_initNodeIds.clear();
// 读取初始化节点配置
QList<QStringList> fileDataInit = ConfigFiles().ReadFile_Csv(ConfigurationPath + "OpcUa配置/OpcUA_初始化读取节点_配置.csv");
for (int row = 1; row < fileDataInit.size()-1; row++) {
if (fileDataInit.at(row).size() >= 4) {
addMonitoredNode(fileDataInit.at(row)[3], fileDataInit.at(row)[0],
fileDataInit.at(row)[2], fileDataInit.at(row)[1],
fileDataInit.at(row)[2]);
m_initNodeIds.append(fileDataInit.at(row)[3]);
}
}
// 读取循环节点配置
QList<QStringList> fileData = ConfigFiles().ReadFile_Csv(ConfigurationPath + "OpcUa配置/OpcUA_循环读取节点_配置.csv");
for (int row = 1; row < fileData.size()-1; row++) {
if (fileData.at(row).size() >= 4) {
addMonitoredNode(fileData.at(row)[3], fileData.at(row)[0],
fileData.at(row)[2], fileData.at(row)[1],
fileData.at(row)[2]);
}
}
qDebug() << "数据初始化完成,总条目数:"<<fileData.size()<<" 导入节点总数:" << gOPC_NodeList.size();
}
void DataCenter::addMonitoredNode(const QString &nodeId, const QString &nodeName,
const QString &varName, const QString &tableName,
const QString &fieldName)
{
QString mNodeID = nodeId;
if(!mNodeID.contains("ns=6;s=::AsGlobalPV:"))
mNodeID = "ns=6;s=::AsGlobalPV:"+mNodeID;
if (gOPC_NodeName.contains(mNodeID)) {
qWarning() << "节点已存在:" << mNodeID;
return;
}
// 使用全局变量存储
gOPC_NodeName[mNodeID] = nodeName.isEmpty() ? mNodeID : nodeName;
gOPC_VarName[mNodeID] = varName.isEmpty() ? mNodeID : varName;
gOPC_SqlTable[mNodeID] = tableName;
gOPC_SqlField[mNodeID] = fieldName;
gOPC_NodeList.append(mNodeID);
gOPC_NodeValue[mNodeID] = QVariant();
}
/**
 * Entry point for browsing the address space below @p nodeId.
 *
 * Delegates to doBrowse() with a starting depth value of 6 and an empty prefix.
 */
void DataCenter::browseRecursive(const QString &nodeId)
{
    const int initialDepth = 6;
    doBrowse(nodeId, initialDepth, "");
}
void DataCenter::doBrowse(const QString &nodeId, int depth, QString prefix)
{
if (!m_opcManager->getClient() || m_opcManager->getClient()->state() != QOpcUaClient::Connected) {
qDebug() << "客户端未连接";
return;
}
// 命中目标节点就添加并返回
if (nodeId.startsWith("ns=6;s=::", Qt::CaseInsensitive) && !nodeId.contains("#")) {
addMonitoredNode(nodeId, "", "", "", "");
return;
}
QOpcUaNode *node = m_opcManager->getCachedNode(nodeId);
if (!node) {
return;
}
// 浏览子节点使用QueuedConnection确保跨线程安全
connect(node, &QOpcUaNode::browseFinished,
this, [this, depth, prefix](QVector<QOpcUaReferenceDescription> refs, QOpcUa::UaStatusCode status){
if (status != QOpcUa::UaStatusCode::Good) {
qDebug() << "浏览失败,状态码:" << status;
return;
}
if (refs.isEmpty()) {
return;
}
for (const auto &ref : refs) {
QString childId = ref.targetNodeId().nodeId();
doBrowse(childId, depth + 1, "└─ ");
}
}, Qt::QueuedConnection);
node->browseChildren(QOpcUa::ReferenceTypeId::HierarchicalReferences,
QOpcUa::NodeClass::Undefined);
}
/**
 * Starts the cyclic read timer on the worker object.
 *
 * Logs a warning and does nothing when no nodes are registered. The requested
 * interval is raised to at least 20 ms, the start request is posted to the
 * worker as a queued call, and the batch cursor is reset to the first node.
 *
 * @param intervalMs Requested timer interval in milliseconds.
 */
void DataCenter::startCyclicRead(int intervalMs)
{
    if (!gOPC_NodeList.isEmpty()) {
        // Never poll faster than every 20 ms.
        const int period = (intervalMs < 20) ? 20 : intervalMs;
        QMetaObject::invokeMethod(m_worker, "startCyclicTimer",
                                  Qt::QueuedConnection,
                                  Q_ARG(int, period));
        m_currentReadIndex = 0;
        return;
    }

    qWarning() << "没有需要监控的节点,请先添加节点";
}
void DataCenter::stopCyclicRead()
{
QMetaObject::invokeMethod(m_worker, "stopCyclicTimer", Qt::QueuedConnection);
}
void DataCenter::refreshAllNodes()
{
if (gOPC_NodeList.isEmpty())
return;
m_currentReadIndex = 0;
// 检查定时器状态并启动
QMetaObject::invokeMethod(m_worker, [this]() {
if (!m_worker->m_cyclicTimer.isActive()) {
m_worker->startCyclicTimer(10); // 使用默认间隔
}
}, Qt::QueuedConnection);
onTimerTimeout();
}
/**
 * Looks up the stored value of a node.
 *
 * @param nodeId Key exactly as stored in the global value table.
 * @return The stored value, or an invalid QVariant when the key is unknown.
 */
QVariant DataCenter::getNodeValue(const QString &nodeId) const
{
    const auto hit = gOPC_NodeValue.constFind(nodeId);
    if (hit == gOPC_NodeValue.constEnd()) {
        return QVariant();
    }
    return hit.value();
}
/**
 * Looks up the stored value of the first node whose display name matches.
 *
 * @param nodeName Display name to search for in the global name table.
 * @return The value stored for the matching node ID, or an invalid QVariant
 *         when no node carries that name.
 */
QVariant DataCenter::getNodeValueByName(const QString &nodeName) const
{
    auto entry = gOPC_NodeName.constBegin();
    const auto stop = gOPC_NodeName.constEnd();

    // Advance to the first entry carrying the requested name.
    while (entry != stop && !(entry.value() == nodeName)) {
        ++entry;
    }

    if (entry == stop) {
        return QVariant();
    }
    return gOPC_NodeValue.value(entry.key(), QVariant());
}
void DataCenter::onTimerTimeout()
{
if (gOPC_NodeList.isEmpty()) return;
// 准备批量读取的节点
QList<QString> batchNodes;
int remaining = gOPC_NodeList.size() - m_currentReadIndex;
int takeCount = qMin(m_batchSize, remaining);
for (int i = 0; i < takeCount; ++i) {
batchNodes.append(gOPC_NodeList[m_currentReadIndex + i]);
}
m_currentReadIndex += takeCount;
// 循环索引处理
if (m_currentReadIndex >= gOPC_NodeList.size()) {
m_currentReadIndex = 0;
}
if (!batchNodes.isEmpty()) {
QMetaObject::invokeMethod(m_opcManager, "readNodesValue",
Qt::QueuedConnection,
Q_ARG(QList<QString>, batchNodes));
}
}