package com.qianwen.mdc.service.opcua; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.nio.file.Files; import java.nio.file.Path; import org.eclipse.milo.opcua.sdk.client.OpcUaClient; import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider; import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem.ValueConsumer; import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem; import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem.DataValueListener; import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription; import org.eclipse.milo.opcua.stack.core.AttributeId; import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort; import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode; import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest; import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters; import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyOpcuaClilent { /** * namespace */ private int ns = 3; private static AtomicInteger atomic = new AtomicInteger(1); private OpcUaClient client; // 批量订阅namespaceIndex默认为2 private int batchNamespaceIndex = 2; // 批量订阅时的identifiers private List batchIdentifiers; private ValueConsumer valueConsumer; DataValueListener valueListener; public ValueConsumer getValueConsumer() { return valueConsumer; } public void setValueConsumer(ValueConsumer valueConsumer) { this.valueConsumer = valueConsumer; } private static final Logger logger = LoggerFactory.getLogger(MyOpcuaClilent.class); /** * 连接opcua服务器 * @param ip * @param port * @param suffix * @return * @throws Exception */ public OpcUaClient connect(String ip, int port, String suffix) throws Exception { String endPointUrl = "opc.tcp://" + ip + ":" + port + suffix; Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security"); Files.createDirectories(securityTempDir); if (!Files.exists(securityTempDir)) { throw new RuntimeException("创建 security 失败: " + securityTempDir); } OpcUaClient opcUaClient = OpcUaClient.create(endPointUrl, endpoints -> endpoints.stream() .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri())) .findFirst(), configBuilder -> configBuilder .setApplicationName(LocalizedText.english("eclipse milo opc-ua client")) .setApplicationUri("urn:eclipse:milo:examples:client") //访问方式:匿名 .setIdentityProvider(new AnonymousProvider()) .setRequestTimeout(UInteger.valueOf(5000)) .build() ); opcUaClient.connect().get(); Thread.sleep(2000); // 线程休眠一下再返回对象,给创建过程一个时间。 this.client = opcUaClient; return opcUaClient; } /** * 读取节点数据 * @param client * @param namespaceIndex * @param identifier * @throws Exception */ public Object readValue(int ns, int identifier) throws Exception { //节点 NodeId nodeId = new NodeId(ns, identifier); try { //读取节点数据 DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get(); Object val = value.getValue().getValue(); // 状态 System.out.println("Status: " + value.getStatusCode()); if (val instanceof UShort) { val = ((UShort) val).intValue(); } else if (val instanceof Boolean) { if (val.equals(true)) { val = 1; } else { val = 0; } } return val; //标识符 //String id = String.valueOf(nodeId.getIdentifier()); //System.out.println(id + ": " + value.getValue().getValue()); }catch(Exception e) { logger.info("读取节点数据异常 identifier: {}", identifier,e); return null; } } public void readNodeValue(OpcUaClient client, NodeId nodeId) throws Exception { //读取节点数据 DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get(); // 状态 System.out.println("Status: " + value.getStatusCode()); //标识符 String id = String.valueOf(nodeId.getIdentifier()); System.out.println(id + ": " + value.getValue().getValue()); } /** * 订阅 * @param identifier */ public void subscribe(String identifier) { try { client .getSubscriptionManager() .createSubscription(1000.0) .thenAccept(t -> { NodeId nodeId = new NodeId(this.ns, identifier); ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null); //创建监控的参数 MonitoringParameters parameters = new MonitoringParameters(UInteger.valueOf(atomic.getAndIncrement()), 1000.0, null, UInteger.valueOf(10), true); //创建监控项请求 //该请求最后用于创建订阅。 MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters); List requests = new ArrayList<>(); requests.add(request); //创建监控项,并且注册变量值改变时候的回调函数。 t.createMonitoredItems( TimestampsToReturn.Both, requests, (item, id) -> item.setValueConsumer(valueConsumer) ); }).get(); //持续订阅 Thread.sleep(Long.MAX_VALUE); }catch(Exception e) { } } /** * 批量订阅 * * @param client * @throws Exception */ public void subscribeBatch(OpcUaClient client) throws Exception { final CountDownLatch eventLatch = new CountDownLatch(1); //处理订阅业务 handlerMultipleNode(); //持续监听 eventLatch.await(); } /** * 处理订阅业务 * * @param client OPC UA客户端 */ private void handlerMultipleNode() { try { //创建订阅 ManagedSubscription subscription = ManagedSubscription.create(client); List nodeIdList = new ArrayList<>(); for (String id : batchIdentifiers) { nodeIdList.add(new NodeId(batchNamespaceIndex, id)); } //监听 List dataItemList = subscription.createDataItems(nodeIdList); for (ManagedDataItem managedDataItem : dataItemList) { DataValueListener l; managedDataItem.addDataValueListener(valueListener); /* managedDataItem.addDataValueListener((t) -> { System.out.println(managedDataItem.getNodeId().getIdentifier().toString() + ":" + t.getValue().getValue().toString()); }); */ } } catch (Exception e) { e.printStackTrace(); } } public int getBatchNamespaceIndex() { return batchNamespaceIndex; } public void setBatchNamespaceIndex(int batchNamespaceIndex) { this.batchNamespaceIndex = batchNamespaceIndex; } public List getBatchIdentifiers() { return batchIdentifiers; } public void setBatchIdentifiers(List batchIdentifiers) { this.batchIdentifiers = batchIdentifiers; } public void setValueListener(DataValueListener valueListener) { this.valueListener = valueListener; } }