通过 WebSocket 协议订阅有关标记值的通知

当使用 Kaspersky Industrial CyberSecurity for Networks API 时,接收应用程序可以创建有关特定标签修改值的通知订阅。WebSocket 协议用于创建订阅和接收通知。

接收应用程序的订阅包括以下步骤:

  1. 接收应用程序使用 REST API 服务器通过此应用程序的连接器与 Kaspersky Industrial CyberSecurity for Networks 服务器建立连接。

    成功连接到服务器后,连接器会收到一个身份验证令牌。连接器将使用身份验证令牌在该会话中与服务器进行所有后续交互(具体来说,就是从服务器请求其配置)。

  2. 接收应用程序使用 WebSocket 进行连接并发送请求以创建有关相关标签接收值的通知订阅。

    Kaspersky Industrial CyberSecurity for Networks 服务器接收请求并创建订阅。使用 WebSocket 协议提供的适当功能发送请求。

  3. Kaspersky Industrial CyberSecurity for Networks 在读取或写入标签时会检测到流量中的新标签值。
  4. Kaspersky Industrial CyberSecurity for Networks 将获取的标签值发送给已有效订阅该标签值相关通知的接收应用程序。

订阅的主要功能:

使用 WebSocket 连接

要通过订阅接收标签,您可以使用 WebSocket 的标准功能以及 SignalR Core 库。使用 SignalR Core 库的软件包适用于最常见的编程语言:C++、C#、Java、Python、Go 和 JavaScript/TypeScript。

要使用 WebSocket 连接,您需要指定以下地址:
<通信数据包的 publicApi 地址>/kics4net/api/v4/tag-values

但是,地址字符串中指示的协议取决于用于连接的功能。

如果使用 SignalR Core 库,地址字符串以 https:// 开头。例如:
https://kics-server:8080/kics4net/api/v4/tag-values

如果使用 WebSocket 标准功能,则需要将地址字符串中的 https 替换为 wss。例如:
wss://kics-server:8080/kics4net/api/v4/tag-values

如果连接时未提供身份验证令牌(或者提供的令牌未通过验证),则服务器在响应打开连接的请求时返回代码 401。

创建标签值订阅

要创建订阅,您必须使用 GetTagValuesStream 方法名称发出请求。

请求参数示例:

{

"tagIdentifiers": [

{ "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },

{ "tagName": "Asdu_1_object_1003", "assetName": "Asset 079" }

],

"streamConfig": {

"samplingRateHz": 1

}

}

请求参数由以下字段组成:

如果订阅创建参数不满足字段的要求,则会返回错误并附上问题描述。

订阅创建参数的错误示例:

HubException: GetTagValuesStreamRequest 有验证错误:

TagIdentifiers:

TagName 字段是必填的。

StreamConfig 字段是必填的。

确认订阅

在确认订阅时,服务器会针对请求中与 tagIdentifiers 值匹配的每个标签返回确认结果。

订阅确认示例:

{

"confirmation": {

"result": "ok",

"tagIdentifier": { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },

"tagId": 102

}

}

包含订阅确认的响应包含以下字段:

按订阅的标签值

应用程序按字段结构内的订阅发送标签值。以下字段显示在结构的顶层:

{

"value": {

"tagId": <应用程序中标签的唯一ID>,

"tagValue": "<带有标签数据的 JSON 对象>"

}

}

有关标签新值的信息以 JSON 格式发送到接收应用程序。发送的数据对象包含以下字段:

d 属性代表一个字典,其中每个键都是空层次标签字段的名称。每个字段值均具有以下属性:

通过订阅接收标签值的示例

下面是使用 Python 中的标准 WebSocket 功能通过订阅接收标签值的示例。

您必须首先运行以下命令:
pip install websocket_client

使用标准 WebSocket 功能的订阅示例:

import json, ssl, websocket

def on_message(ws, message):

print(message)

def on_error(ws, error):

print(f' 错误: {error}')

def on_close(ws):

print("### 已关闭 ###")

def on_open(ws):

print("连接已打开,握手已接收,准备发送消息")

# 所有发送的消息必须以此字符结尾

message_separator = chr(30)

# 设置 json 作为消息格式

protocol_selection_args = {

'protocol': 'json',

'version': 1

}

ws.send(json.dumps(protocol_selection_args) + message_separator)

# 创建订阅

args = {

'arguments': [

{

'tagIdentifiers': [

{

'tagName': 'tag_01',

'assetName': 'asset_02'

}

],

'streamConfig': {

'samplingRateHz': 5

}

}

],

'invocationId': '0', # 将包含在响应消息中

'target': 'getTagValuesStream',

'type': 4 # 对于传出消息,必须等于 4

}

ws.send(json.dumps(args) + message_separator)

def login():

token = "您应该在这里获取 API 的访问令牌"

return token

if __name__ == "__main__":

server_url = "wss://localhost:8091/kics4net/api/tag-values"

auth = "Authorization: Bearer " + login()

# 用于故障排除,取消注释下一行

# websocket.enableTrace(True)

ws = websocket.WebSocketApp(server_url,

on_message=on_message,

on_error=on_error,

on_close=on_close,

header=[auth])

print(f'打开到 {server_url} 的连接')

ws.on_open = on_open

ws.run_forever(

# 仅当服务器具有自签名证书时才使用它

sslopt={"cert_reqs": ssl.CERT_NONE}

)

下面是使用 Python 中的 SignalR Core 库通过订阅接收标签值的示例。

您必须首先运行以下命令:
pip install signalrcore

使用 SignalR Core 库的订阅示例:

import logging

from signalrcore.hub_connection_builder import HubConnectionBuilder

TOKEN = '您应该在这里获取 API 的访问令牌'

IP = '192.168.0.7'

PORT = '8080'

HUB = 'kics4net/api/v4/tag-values'

class WebsocketConnection(HubConnectionBuilder):

def __init__(self, url: str = None, options: dict = None, verify_ssl: bool = False):

super().__init__()

self.with_url(url, options=options)

self.configure_logging(logging.WARNING)

self.with_automatic_reconnect({

"type": "raw",

"keep_alive_interval": 10,

"reconnect_interval": 5,

"max_attempts": 5

})

self.verify_ssl = verify_ssl

def on_tag_stream_value(self, m):

result.append(m)

print(f'on_new_tag_value, {m}')

def on_tag_strean_error(self, e):

print(f'onError, {e}')

def on_tag_stream_complete(self, q):

print(f'onComplete, {q}')

def subscribe_tags(self):

print("连接已打开,握手已接收,准备发送消息")

args = {

'tagIdentifiers': [

{

'tagName': 'tag_01',

'assetName': 'asset_02'}

],

'streamConfig': {

'samplingRateHz': 5

}

}

self.stream("GetTagValuesStream", [args]) \

.subscribe({

"next": self.on_tag_stream_value,

"complete": self.on_tag_stream_complete,

"error": self.on_tag_strean_error

})

def main():

server_url = "https://{}:{}/{}".format(IP, PORT, HUB)

login = 'bearer {}'.format(TOKEN)

conn = WebsocketConnection(url=server_url, options={"headers": {"authorization": login}})

conn.build()

logging.info(f'打开到 {server_url} 的连接')

conn.on_open(conn.subscribe_tags)

conn.start()

logging.info('关闭连接')

conn.stop()

if __name__ == '__main__':

main()

页首