Подписка на уведомления о значениях тега по протоколу WebSocket

При использовании Kaspersky Industrial CyberSecurity for Networks API стороннее приложение может создавать подписку на уведомления об изменении значений определенного тега. Для создания подписки и получения уведомлений используется протокол WebSocket.

Сценарий подписки для стороннего приложения состоит из следующих этапов:

  1. Стороннее приложение устанавливает соединение с Сервером Kaspersky Industrial CyberSecurity for Networks через коннектор для этого приложения с использованием сервера REST API.

    После успешного соединения с Сервером коннектору отправляется токен аутентификации. Коннектор использует токен аутентификации для всех последующих взаимодействий с Сервером в этом сеансе (в частности для запроса своей конфигурации с Сервера).

  2. Стороннее приложение подключается с использованием WebSocket и отправляет запрос для создания подписки на уведомления о получении значений нужного тега.

    Сервер Kaspersky Industrial CyberSecurity for Networks принимает запрос и создает подписку. Для отправки запроса используются соответствующие функции, предусмотренные протоколом WebSocket.

  3. Kaspersky Industrial CyberSecurity for Networks обнаруживает в трафике новое значение при чтении или записи тега.
  4. Kaspersky Industrial CyberSecurity for Networks отправляет полученное значение тега стороннему приложению, для которого действует подписка на уведомления о получении значений этого тега.

Основные особенности реализации подписки:

Подключение с использованием WebSocket

Для получения тегов по подписке могут использоваться как стандартные функции WebSocket, так и библиотека SignalR Core. Пакеты для работы с библиотекой SignalR Core доступны для наиболее распространенных языков программирования: С++, С#, Java, Python, Go, JavaScript/TypeScript.

Для подключения с использованием WebSocket вам нужно указывать следующий адрес:
<адрес publicApi из файла свертки>/kics4net/api/v4/tag-values

При этом указываемый протокол в строке адреса зависит от используемой функциональности для подключения.

Если используется библиотека SignalR Core, строка адреса начинается с https://. Например:
https://kics-server:8080/kics4net/api/v4/tag-values

Если используются стандартные функции WebSocket, в строке адреса нужно заменить https на wss. Например:
wss://kics-server:8080/kics4net/api/v4/tag-values

Если при подключении не предоставлен токен аутентификации (или предоставленный токен не прошел проверку), в ответ на открытие подключения сервер возвращает код 401.

Создание подписки на значения тегов

Для создания подписки требуется выполнить запрос с именем метода GetTagValuesStream.

Пример аргумента запроса:

{

"tagIdentifiers": [

{ "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },

{ "tagName": "Asdu_1_object_1003", "assetName": "Asset 079" }

],

"streamConfig": {

"samplingRateHz": 1

}

}

Аргумент запроса состоит из следующих полей:

Если аргумент создания подписки не удовлетворяет требованиям к полям, возвращается ошибка с описанием проблемы.

Пример ошибки для аргумента создания подписки:

HubException: GetTagValuesStreamRequest has validation errors:

TagIdentifiers:

The TagName field is required.

The StreamConfig field is required.

Подтверждение подписки

При подтверждении подписки сервер возвращает по одному результату подтверждения для каждого тега, подходящего под значения tagIdentifiers в запросе.

Пример подтверждения подписки:

{

"confirmation": {

"result": "ok",

"tagIdentifier": { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },

"tagId": 102

}

}

Ответ с подтверждением подписки состоит из следующих полей:

Значения тегов по подписке

Программа отправляет значения тегов по подписке в структуре полей. На верхнем уровне в структуре представлены следующие поля:

{

    "value": {

        "tagId": <уникальный идентификатор тега в программе>,

        "tagValue": "<JSON-объект с данными тега>"

    }

}

Информация о новом значении тега отправляется в стороннее приложение в формате JSON. Отправляемый объект с данными содержит следующие поля:

Атрибут d представляет словарь, в котором каждый ключ является именем поля тега нулевой иерархии. Значение каждого поля имеет следующие атрибуты:

Примеры получения значений тегов по подписке

Ниже представлен пример получения значений тегов по подписке с использованием стандартных функций WebSocket на языке Python.

Предварительно требуется выполнить команду:
pip install websocket_client

Пример подписки с использованием стандартных функций WebSocket:

import json, ssl, websocket

def on_message(ws, message):

print(message)

def on_error(ws, error):

print(f' error: {error}')

def on_close(ws):

print("### closed ###")

def on_open(ws):

print("connection opened and handshake received ready to send messages")

# all sent messages must end with this character

message_separator = chr(30)

# setting up json as messages format

protocol_selection_args = {

'protocol': 'json',

'version': 1

}

ws.send(json.dumps(protocol_selection_args) + message_separator)

# creating subscription

args = {

'arguments': [

{

'tagIdentifiers': [

{

'tagName': 'tag_01',

'assetName': 'asset_02'

}

],

'streamConfig': {

'samplingRateHz': 5

}

}

],

'invocationId': '0', # will be included in response message

'target': 'getTagValuesStream',

'type': 4 # must be equal to 4 for outgoing messages

}

ws.send(json.dumps(args) + message_separator)

def login():

token = "you should get access token for API here"

return token

if __name__ == "__main__":

server_url = "wss://localhost:8091/kics4net/api/tag-values"

auth = "Authorization: Bearer " + login()

# for troubleshooting uncomment next line

# websocket.enableTrace(True)

ws = websocket.WebSocketApp(server_url,

on_message=on_message,

on_error=on_error,

on_close=on_close,

header=[auth])

print(f'opening connection to {server_url}')

ws.on_open = on_open

ws.run_forever(

# use it only if Server has self-signed certificate

sslopt={"cert_reqs": ssl.CERT_NONE}

)

Ниже представлен пример получения значений тегов по подписке с использованием библиотеки SignalR Core на языке Python.

Предварительно требуется выполнить команду:
pip install signalrcore

Пример подписки с использованием библиотеки SignalR Core:

import logging

from signalrcore.hub_connection_builder import HubConnectionBuilder

TOKEN = 'you should get access token for API here’

IP = '192.168.0.7'

PORT = '8080'

HUB = 'kics4net/api/v4/tag-values'

class WebsocketConnection(HubConnectionBuilder):

def __init__(self, url: str = None, options: dict = None, verify_ssl: bool = False):

super().__init__()

self.with_url(url, options=options)

self.configure_logging(logging.WARNING)

self.with_automatic_reconnect({

"type": "raw",

"keep_alive_interval": 10,

"reconnect_interval": 5,

"max_attempts": 5

})

self.verify_ssl = verify_ssl

def on_tag_stream_value(self, m):

result.append(m)

print(f'on_new_tag_value, {m}')

def on_tag_strean_error(self, e):

print(f'onError, {e}')

def on_tag_stream_complete(self, q):

print(f'onComplete, {q}')

def subscribe_tags(self):

print("connection opened and handshake received ready to send messages")

args = {

'tagIdentifiers': [

{

'tagName': 'tag_01',

'assetName': 'asset_02'}

],

'streamConfig': {

'samplingRateHz': 5

}

}

self.stream("GetTagValuesStream", [args]) \

.subscribe({

"next": self.on_tag_stream_value,

"complete": self.on_tag_stream_complete,

"error": self.on_tag_strean_error

})

def main():

server_url = "https://{}:{}/{}".format(IP, PORT, HUB)

login = 'bearer {}'.format(TOKEN)

conn = WebsocketConnection(url=server_url, options={"headers": {"authorization": login}})

conn.build()

logging.info(f'opening connection to {server_url}')

conn.on_open(conn.subscribe_tags)

conn.start()

logging.info('closing connection')

conn.stop()

if __name__ == '__main__':

main()

В начало