#include "DataCenter.h" #include #include #include #include DataCenter* DataCenter::m_instance = nullptr; QMutex DataCenter::m_mutex; // 仅用于单例模式的线程安全 DataCenter::DataCenter(OpcUaManager *opcManager, QObject *parent) : QObject(parent) , m_opcManager(opcManager) , m_batchSize(10) , m_currentReadIndex(0) { Q_ASSERT(opcManager != nullptr); // 创建专用线程和工作对象 m_opcThread = new QThread(this); m_worker = new OpcWorker(opcManager); // 不设置父对象 // 将工作对象移动到线程 m_worker->moveToThread(m_opcThread); // 连接定时器信号到工作对象的槽函数 connect(&m_worker->m_cyclicTimer, &QTimer::timeout, m_worker, &OpcWorker::onTimerTimeout, Qt::DirectConnection); // 连接工作对象信号到DataCenter connect(m_worker, &OpcWorker::timeout, this, &DataCenter::onTimerTimeout, Qt::QueuedConnection); // 连接OPC管理器信号到工作对象 connect(m_opcManager, &OpcUaManager::reconnected, this, &DataCenter::refreshAllNodes, Qt::QueuedConnection); // 启动线程 m_opcThread->start(); } DataCenter::~DataCenter() { if (m_opcThread && m_opcThread->isRunning()) { m_opcThread->quit(); if (!m_opcThread->wait(500)) { // 等待500ms m_opcThread->terminate(); m_opcThread->wait(); } } delete m_worker; // 释放工作对象 m_worker = nullptr; } DataCenter* DataCenter::instance(QObject *parent) { if (!m_instance) { QMutexLocker locker(&m_mutex); // 单例创建时的线程安全锁 if (!m_instance) { OpcUaManager* opc = OpcUaManager::instance(parent); m_instance = new DataCenter(opc, parent); } } return m_instance; } void DataCenter::initData() { // 清空全局变量 gOPC_NodeList.clear(); gOPC_NodeName.clear(); gOPC_NodeValue.clear(); gOPC_VarName.clear(); gOPC_SqlTable.clear(); gOPC_SqlField.clear(); m_initNodeIds.clear(); // 读取初始化节点配置 QList fileDataInit = ConfigFiles().ReadFile_Csv("./ProgramConfig/OpcUA_初始化读取节点_配置.csv"); for (int row = 1; row < fileDataInit.size()-1; row++) { if (fileDataInit.at(row).size() >= 4) { addMonitoredNode(fileDataInit.at(row)[3], fileDataInit.at(row)[0], fileDataInit.at(row)[2], fileDataInit.at(row)[1], fileDataInit.at(row)[2]); m_initNodeIds.append(fileDataInit.at(row)[3]); } } // 读取循环节点配置 QList fileData = ConfigFiles().ReadFile_Csv("./ProgramConfig/OpcUA_循环读取节点_配置.csv"); for (int row = 1; row < fileData.size()-1; row++) { if (fileData.at(row).size() >= 4) { addMonitoredNode(fileData.at(row)[3], fileData.at(row)[0], fileData.at(row)[2], fileData.at(row)[1], fileData.at(row)[2]); } } qDebug() << "数据初始化完成,节点总数:" << gOPC_NodeList.size(); } void DataCenter::addMonitoredNode(const QString &nodeId, const QString &nodeName, const QString &varName, const QString &tableName, const QString &fieldName) { if (gOPC_NodeName.contains(nodeId)) { qWarning() << "节点已存在:" << nodeId; return; } // 使用全局变量存储 gOPC_NodeName[nodeId] = nodeName.isEmpty() ? nodeId : nodeName; gOPC_VarName[nodeId] = varName.isEmpty() ? nodeId : varName; gOPC_SqlTable[nodeId] = tableName; gOPC_SqlField[nodeId] = fieldName; gOPC_NodeList.append(nodeId); gOPC_NodeValue[nodeId] = QVariant(); } void DataCenter::browseRecursive(const QString &nodeId) { doBrowse(nodeId, 6, ""); } void DataCenter::doBrowse(const QString &nodeId, int depth, QString prefix) { if (!m_opcManager->getClient() || m_opcManager->getClient()->state() != QOpcUaClient::Connected) { qDebug() << "客户端未连接"; return; } // 命中目标节点就添加并返回 if (nodeId.startsWith("ns=6;s=::", Qt::CaseInsensitive) && !nodeId.contains("#")) { addMonitoredNode(nodeId, "", "", "", ""); return; } QOpcUaNode *node = m_opcManager->getCachedNode(nodeId); if (!node) { return; } // 浏览子节点(使用QueuedConnection确保跨线程安全) connect(node, &QOpcUaNode::browseFinished, this, [this, depth, prefix](QVector refs, QOpcUa::UaStatusCode status){ if (status != QOpcUa::UaStatusCode::Good) { qDebug() << "浏览失败,状态码:" << status; return; } if (refs.isEmpty()) { return; } for (const auto &ref : refs) { QString childId = ref.targetNodeId().nodeId(); doBrowse(childId, depth + 1, "└─ "); } }, Qt::QueuedConnection); node->browseChildren(QOpcUa::ReferenceTypeId::HierarchicalReferences, QOpcUa::NodeClass::Undefined); } void DataCenter::startCyclicRead(int intervalMs) { if (gOPC_NodeList.isEmpty()) { qWarning()<<"没有需要监控的节点,请先添加节点"; return; } int actualInterval = qMax(intervalMs, 20); // 最小间隔10ms QMetaObject::invokeMethod(m_worker, "startCyclicTimer", Qt::QueuedConnection, Q_ARG(int, actualInterval)); m_currentReadIndex = 0; } void DataCenter::stopCyclicRead() { QMetaObject::invokeMethod(m_worker, "stopCyclicTimer", Qt::QueuedConnection); } void DataCenter::refreshAllNodes() { if (gOPC_NodeList.isEmpty()) return; m_currentReadIndex = 0; // 检查定时器状态并启动 QMetaObject::invokeMethod(m_worker, [this]() { if (!m_worker->m_cyclicTimer.isActive()) { m_worker->startCyclicTimer(10); // 使用默认间隔 } }, Qt::QueuedConnection); onTimerTimeout(); } QVariant DataCenter::getNodeValue(const QString &nodeId) const { return gOPC_NodeValue.value(nodeId, QVariant()); } QVariant DataCenter::getNodeValueByName(const QString &nodeName) const { for (auto it = gOPC_NodeName.constBegin(); it != gOPC_NodeName.constEnd(); ++it) { if (it.value() == nodeName) { return gOPC_NodeValue.value(it.key(), QVariant()); } } return QVariant(); } void DataCenter::onTimerTimeout() { if (gOPC_NodeList.isEmpty()) return; // 准备批量读取的节点 QList batchNodes; int remaining = gOPC_NodeList.size() - m_currentReadIndex; int takeCount = qMin(m_batchSize, remaining); for (int i = 0; i < takeCount; ++i) { batchNodes.append(gOPC_NodeList[m_currentReadIndex + i]); } m_currentReadIndex += takeCount; // 循环索引处理 if (m_currentReadIndex >= gOPC_NodeList.size()) { m_currentReadIndex = 0; } if (!batchNodes.isEmpty()) { QMetaObject::invokeMethod(m_opcManager, "readNodesValue", Qt::QueuedConnection, Q_ARG(QList, batchNodes)); } }