当使用 Kaspersky Industrial CyberSecurity for Networks API 时,接收应用程序可以创建有关特定标签修改值的通知订阅。WebSocket 协议用于创建订阅和接收通知。
接收应用程序的订阅包括以下步骤:
成功连接到服务器后,连接器会收到一个身份验证令牌。连接器将使用身份验证令牌在该会话中与服务器进行所有后续交互(具体来说,就是从服务器请求其配置)。
Kaspersky Industrial CyberSecurity for Networks 服务器接收请求并创建订阅。使用 WebSocket 协议提供的适当功能发送请求。
订阅的主要功能:
使用 WebSocket 连接
要通过订阅接收标签,您可以使用 WebSocket 的标准功能以及 SignalR Core 库。使用 SignalR Core 库的软件包适用于最常见的编程语言:C++、C#、Java、Python、Go 和 JavaScript/TypeScript。
要使用 WebSocket 连接,您需要指定以下地址:<通信数据包的 publicApi 地址>/kics4net/api/v4/tag-values
但是,地址字符串中指示的协议取决于用于连接的功能。
如果使用 SignalR Core 库,地址字符串以 https://
开头。例如:https://kics-server:8080/kics4net/api/v4/tag-values
如果使用 WebSocket 标准功能,则需要将地址字符串中的 https
替换为 wss
。例如:wss://kics-server:8080/kics4net/api/v4/tag-values
如果连接时未提供身份验证令牌(或者提供的令牌未通过验证),则服务器在响应打开连接的请求时返回代码 401。
创建标签值订阅
要创建订阅,您必须使用 GetTagValuesStream
方法名称发出请求。
请求参数示例: { "tagIdentifiers": [ { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" }, { "tagName": "Asdu_1_object_1003", "assetName": "Asset 079" } ], "streamConfig": { "samplingRateHz": 1 } } |
请求参数由以下字段组成:
tagIdentifiers
– 包含订阅所需值的标签 ID 数组。assetName
、tagName
– 代表设备名称和标签名称的值(用于识别包含订阅所需值的标签)。samplingRateHz
– 标签值采样率(用于减少传输数据量)。如果为该字段定义了空值,则不会执行采样。如果订阅创建参数不满足字段的要求,则会返回错误并附上问题描述。
订阅创建参数的错误示例:
|
确认订阅
在确认订阅时,服务器会针对请求中与 tagIdentifiers
值匹配的每个标签返回确认结果。
订阅确认示例: { "confirmation": { "result": "ok", "tagIdentifier": { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" }, "tagId": 102 } } |
包含订阅确认的响应包含以下字段:
result
– 标签值订阅的状态。可能值:ok
– 订阅已成功创建。notFound
– 未找到具有指定 assetName
或 tagName
的标签。tagIdentifier
– 标签 ID,相当于订阅创建请求的参数 tagIdentifiers
数组中的一个值。tagId
– 应用程序中标签的唯一ID。这可用于通过 Kaspersky Industrial CyberSecurity for Networks API 接收有关标签的信息,或在包含其值的响应中识别标签。按订阅的标签值
应用程序按字段结构内的订阅发送标签值。以下字段显示在结构的顶层:
{
"value": {
"tagId": <
应用程序中标签的唯一ID
>,
"tagValue": "<
带有标签数据的 JSON 对象
>"
}
}
有关标签新值的信息以 JSON 格式发送到接收应用程序。发送的数据对象包含以下字段:
n
– 由 TagStructure
中的名称表示的标签数据类型。ts
– 标签值最后更新注册的时间。以从 1970 年 1 月 1 日开始的微秒表示。dn
– 传输方向。可能的值:r
、w
、rw
。mp
– 监测点 ID。d
– 标签字段的内容。d
属性代表一个字典,其中每个键都是空层次标签字段的名称。每个字段值均具有以下属性:
t
– 强制属性,指示以下数据类型之一:u
– UINT64i
– INT64b
– BOOLd
– DOUBLEs
– UTF8 字符串t
– 从 1970 年 1 月 1 日开始的时间(以微秒为单位)e
– ENUM。该字段还包含以下属性:n
– ENUM 类型的名称st
– 结构un
– UNIONv
– 强制属性,指示标签字段值n
– TagStructure
中的 ENUM 类型的名称(仅适用于 e
类型 – ENUM)s
– ENUM 的字符串值(仅适用于 e
类型 – ENUM)。示例:
- enum:
name: OpType #
ENUM 类型的名称('n' 属性)
data:
0: NUL #
0 写入 'v' 属性,NUL 写入 's' 属性
1: PULSE_ON
2: PULSE_OFF
x
– 标识标签的主值。格式:"x": 1
标签的所有其他字段均不存在 x
属性。
q
– 质量属性的值。ts
– 时间戳状态显示其准确性、临时状态或验证期间出现错误的原因。ds
– 数据状态。o
– 值或命令的来源。t
– 标签值最后更新的时间(从流量中获取)。ct
– 传输原因。格式:"m": "q"
以 JSON 格式转发的标签值示例: { "n": "TagStructure1", "ts": 18446744073709551616, "dn": "r", "mp": 1, "d": { "value": { "t": "d", "v": 3.1415, "x": 1 }, "quality": { "t": "s", "v": "good", "m": "q" }, "mask": { "t": "u", "v": 18446744073709551616 }, "enumfield": { "t": "e", "n": "SwitchState", "v": 0, "s": "Off" }, "strucfield": { "t": "st", "v": { "v1": { "t": "d", "v": 3.1415 }, "q2": { "t": "s", "v": "good", "m": "q" } } }, "unionfield": { "t": "un", "v": { "_": { "t": "u", "v": 42 }, "low4bits": { "t": "u", "v": 10 }, "high4bits": { "t": "u", "v": 2 } } } } } |
通过订阅接收标签值的示例
下面是使用 Python 中的标准 WebSocket 功能通过订阅接收标签值的示例。
您必须首先运行以下命令:pip install websocket_client
使用标准 WebSocket 功能的订阅示例: import json, ssl, websocket
def on_message(ws, message): print(message)
def on_error(ws, error): print(f' 错误: {error}')
def on_close(ws): print("### 已关闭 ###")
def on_open(ws): print("连接已打开,握手已接收,准备发送消息")
# 所有发送的消息必须以此字符结尾 message_separator = chr(30)
# 设置 json 作为消息格式 protocol_selection_args = { 'protocol': 'json', 'version': 1 } ws.send(json.dumps(protocol_selection_args) + message_separator)
# 创建订阅 args = { 'arguments': [ { 'tagIdentifiers': [ { 'tagName': 'tag_01', 'assetName': 'asset_02' } ], 'streamConfig': { 'samplingRateHz': 5 } } ], 'invocationId': '0', # 将包含在响应消息中 'target': 'getTagValuesStream', 'type': 4 # 对于传出消息,必须等于 4 }
ws.send(json.dumps(args) + message_separator)
def login(): token = "您应该在这里获取 API 的访问令牌" return token
if __name__ == "__main__": server_url = "wss://localhost:8091/kics4net/api/tag-values" auth = "Authorization: Bearer " + login()
# 用于故障排除,取消注释下一行 # websocket.enableTrace(True) ws = websocket.WebSocketApp(server_url, on_message=on_message, on_error=on_error, on_close=on_close, header=[auth])
print(f'打开到 {server_url} 的连接') ws.on_open = on_open ws.run_forever( # 仅当服务器具有自签名证书时才使用它 sslopt={"cert_reqs": ssl.CERT_NONE} ) |
下面是使用 Python 中的 SignalR Core 库通过订阅接收标签值的示例。
您必须首先运行以下命令:pip install signalrcore
使用 SignalR Core 库的订阅示例: import logging from signalrcore.hub_connection_builder import HubConnectionBuilder
TOKEN = '您应该在这里获取 API 的访问令牌' IP = '192.168.0.7' PORT = '8080' HUB = 'kics4net/api/v4/tag-values'
class WebsocketConnection(HubConnectionBuilder): def __init__(self, url: str = None, options: dict = None, verify_ssl: bool = False): super().__init__() self.with_url(url, options=options) self.configure_logging(logging.WARNING) self.with_automatic_reconnect({ "type": "raw", "keep_alive_interval": 10, "reconnect_interval": 5, "max_attempts": 5 }) self.verify_ssl = verify_ssl
def on_tag_stream_value(self, m): result.append(m) print(f'on_new_tag_value, {m}')
def on_tag_strean_error(self, e): print(f'onError, {e}')
def on_tag_stream_complete(self, q): print(f'onComplete, {q}')
def subscribe_tags(self): print("连接已打开,握手已接收,准备发送消息")
args = { 'tagIdentifiers': [ { 'tagName': 'tag_01', 'assetName': 'asset_02'} ], 'streamConfig': { 'samplingRateHz': 5 } }
self.stream("GetTagValuesStream", [args]) \ .subscribe({ "next": self.on_tag_stream_value, "complete": self.on_tag_stream_complete, "error": self.on_tag_strean_error })
def main(): server_url = "https://{}:{}/{}".format(IP, PORT, HUB) login = 'bearer {}'.format(TOKEN)
conn = WebsocketConnection(url=server_url, options={"headers": {"authorization": login}}) conn.build()
logging.info(f'打开到 {server_url} 的连接') conn.on_open(conn.subscribe_tags) conn.start()
logging.info('关闭连接') conn.stop()
if __name__ == '__main__': main() |