Subscribing to notifications about tag values over the WebSocket protocol

When using the Kaspersky Industrial CyberSecurity for Networks API, a recipient app can create a subscription to notifications regarding modified values of a specific tag. The WebSocket protocol is used for creating a subscription and receiving notifications.

Subscribing a recipient app involves the following steps:

  1. The recipient app connects to the Kaspersky Industrial CyberSecurity for Networks Server through the connector created for this app, using the REST API server.

    After successfully connecting to the Server, the connector receives an authentication token. The connector uses the authentication token for all subsequent interactions with the Server in this session (specifically, for requesting its configuration from the Server).

  2. The recipient app uses WebSocket to connect and sends a request to create a subscription to notifications regarding the received values of a relevant tag.

    The Kaspersky Industrial CyberSecurity for Networks Server receives the request and creates the subscription. A request is sent by using the appropriate functions provided by the WebSocket protocol.

  3. Kaspersky Industrial CyberSecurity for Networks detects a new tag value in traffic when reading or writing a tag.
  4. Kaspersky Industrial CyberSecurity for Networks sends the obtained tag value to the recipient app that has an active subscription to notifications regarding the values of this tag.

Main features of a subscription:

Connecting with WebSocket

To receive tag values by subscription, you can use either the standard functions of WebSocket or the SignalR Core library. Packages for working with the SignalR Core library are available for the most common programming languages: C++, C#, Java, Python, Go, and JavaScript/TypeScript.

To connect using WebSocket, you need to specify the following address:
<publicApi address from the communication data package>/kics4net/api/v4/tag-values

The protocol at the start of the address string depends on which of these two methods is used for the connection.

If the SignalR Core library is being used, the address string begins with https://. For example:
https://kics-server:8080/kics4net/api/v4/tag-values

If the standard functions of WebSocket are being used, you need to replace https with wss in the address string. For example:
wss://kics-server:8080/kics4net/api/v4/tag-values
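
The scheme substitution described above can be sketched as a small helper. This is only an illustration: the function name tag_values_url and the use_signalr flag are hypothetical, and the publicApi value is assumed to have the form https://host:port as in the examples above.

```python
from urllib.parse import urlsplit, urlunsplit

# Path of the tag values endpoint, as given in this section
TAG_VALUES_PATH = "/kics4net/api/v4/tag-values"


def tag_values_url(public_api: str, use_signalr: bool) -> str:
    """Build the connection address from the publicApi address.

    use_signalr=True keeps https:// (SignalR Core library);
    use_signalr=False switches to wss:// (standard WebSocket functions).
    """
    parts = urlsplit(public_api)
    if not parts.netloc:
        raise ValueError(f"expected an address like https://host:port, got {public_api!r}")
    scheme = "https" if use_signalr else "wss"
    path = parts.path.rstrip("/") + TAG_VALUES_PATH
    return urlunsplit((scheme, parts.netloc, path, "", ""))
```

For example, tag_values_url("https://kics-server:8080", use_signalr=False) yields the wss:// address shown above.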

If an authentication token is not provided when connecting (or the provided token has not passed verification), the server returns code 401 when responding to a request to open the connection.
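
A recipient app may want to tell a rejected token apart from other connection errors. The sketch below assumes the websocket-client package, whose handshake exception (WebSocketBadStatusException) exposes the HTTP status in a status_code attribute. The check is duck-typed, so the helper name is_auth_failure and the printed messages are illustrative only.

```python
def is_auth_failure(error) -> bool:
    """Return True if a connection error carries HTTP status 401."""
    # Assumption: the error object exposes the handshake status as `status_code`
    return getattr(error, "status_code", None) == 401


def on_error(ws, error):
    # Same callback signature as in a websocket.WebSocketApp error handler
    if is_auth_failure(error):
        print("connection rejected with code 401: obtain a valid authentication token")
    else:
        print(f"error: {error}")
```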

Creating a subscription for tag values

To create a subscription, send a request that invokes the GetTagValuesStream method.

Example request argument:

{
    "tagIdentifiers": [
        { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },
        { "tagName": "Asdu_1_object_1003", "assetName": "Asset 079" }
    ],
    "streamConfig": {
        "samplingRateHz": 1
    }
}
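
An argument of this shape can be assembled programmatically. The following is a minimal sketch. The helper name build_subscription_argument is hypothetical, and only the fields visible in the example above are produced.

```python
import json


def build_subscription_argument(tags, sampling_rate_hz):
    """Build a GetTagValuesStream argument.

    tags: iterable of (tag_name, asset_name) pairs.
    """
    return {
        "tagIdentifiers": [
            {"tagName": tag_name, "assetName": asset_name}
            for tag_name, asset_name in tags
        ],
        "streamConfig": {"samplingRateHz": sampling_rate_hz},
    }


if __name__ == "__main__":
    argument = build_subscription_argument(
        [("Asdu_1_object_1001", "Asset 079"), ("Asdu_1_object_1003", "Asset 079")],
        sampling_rate_hz=1,
    )
    print(json.dumps(argument, indent=4))
```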

A request argument consists of the following fields:

If a subscription creation argument does not satisfy the requirements of the fields, an error is returned with a description of the problem.

Example error for a subscription creation argument:

HubException: GetTagValuesStreamRequest has validation errors:
TagIdentifiers:
    The TagName field is required.
The StreamConfig field is required.
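
A recipient app can catch these two problems before sending the request. The sketch below checks only the requirements visible in the example error (a tagName in each identifier and the presence of streamConfig). The server may enforce further rules, and the function name find_argument_problems is hypothetical.

```python
def find_argument_problems(argument: dict) -> list:
    """Return a list of problems found in a subscription creation argument."""
    problems = []
    for index, identifier in enumerate(argument.get("tagIdentifiers") or []):
        # Mirrors "The TagName field is required."
        if not identifier.get("tagName"):
            problems.append(f"tagIdentifiers[{index}]: the tagName field is required")
    # Mirrors "The StreamConfig field is required."
    if argument.get("streamConfig") is None:
        problems.append("the streamConfig field is required")
    return problems
```

An empty list means that neither of the two checked problems was found. It does not guarantee that the server accepts the argument.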

Confirming a subscription

When confirming a subscription, the server returns a confirmation result for each tag that matches a tagIdentifiers value in the request.

Example subscription confirmation:

{
    "confirmation": {
        "result": "ok",
        "tagIdentifier": { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },
        "tagId": 102
    }
}
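
Because later messages refer to a tag only by its tagId, a recipient app will typically remember which tag each confirmed ID belongs to. The sketch below assumes the confirmation arrives as an already decoded JSON object shaped like the example above. The function name record_confirmation is hypothetical, and any result other than "ok" is simply not recorded, since other result values are not shown here.

```python
def record_confirmation(message: dict, tag_names: dict) -> bool:
    """Store tagId -> (assetName, tagName) for a successful confirmation.

    Returns True if the message was an "ok" confirmation and was recorded.
    """
    confirmation = message.get("confirmation")
    if confirmation is None or confirmation.get("result") != "ok":
        return False
    identifier = confirmation["tagIdentifier"]
    tag_names[confirmation["tagId"]] = (identifier["assetName"], identifier["tagName"])
    return True
```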

A response containing a subscription confirmation consists of the following fields:

Tag values by subscription

The application sends tag values by subscription as a structure of fields. The top level of the structure contains the following fields:

{
    "value": {
        "tagId": <unique ID of the tag in the application>,
        "tagValue": "<JSON object with tag data>"
    }
}
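
Reading such a message can be sketched as follows. The placeholder above suggests that tagValue is delivered as a string containing JSON. This is an assumption, so the sketch also accepts an already decoded object. The function name parse_tag_value is hypothetical.

```python
import json


def parse_tag_value(message: dict):
    """Return (tagId, tag data) for a value message, or None for other messages."""
    value = message.get("value")
    if value is None:
        return None
    tag_value = value["tagValue"]
    # Assumption: tagValue may arrive as a JSON-encoded string
    if isinstance(tag_value, str):
        tag_value = json.loads(tag_value)
    return value["tagId"], tag_value
```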

Information about a new value of a tag is sent to the recipient app in JSON format. The sent data object contains the following fields:

The d attribute represents a dictionary in which each key is the name of a null-hierarchy tag field. Each field value has the following attributes:

Examples of receiving tag values by subscription

Below is an example of receiving tag values by subscription using standard WebSocket functions in Python.

You must first run the following command:
pip install websocket_client

Example subscription using standard WebSocket functions:

import json
import ssl

import websocket

def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(f' error: {error}')

# recent websocket-client releases also pass the close status code and message
def on_close(ws, close_status_code=None, close_msg=None):
    print("### closed ###")

def on_open(ws):
    print("connection opened and handshake received ready to send messages")
    # all sent messages must end with this character
    message_separator = chr(30)
    # setting up json as messages format
    protocol_selection_args = {
        'protocol': 'json',
        'version': 1
    }
    ws.send(json.dumps(protocol_selection_args) + message_separator)
    # creating subscription
    args = {
        'arguments': [
            {
                'tagIdentifiers': [
                    {
                        'tagName': 'tag_01',
                        'assetName': 'asset_02'
                    }
                ],
                'streamConfig': {
                    'samplingRateHz': 5
                }
            }
        ],
        'invocationId': '0',  # will be included in response message
        'target': 'getTagValuesStream',
        'type': 4  # must be equal to 4 (stream invocation) for outgoing messages
    }
    ws.send(json.dumps(args) + message_separator)

def login():
    token = "you should get access token for API here"
    return token

if __name__ == "__main__":
    server_url = "wss://localhost:8091/kics4net/api/v4/tag-values"
    auth = "Authorization: Bearer " + login()
    # for troubleshooting uncomment next line
    # websocket.enableTrace(True)
    ws = websocket.WebSocketApp(server_url,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close,
                                header=[auth])
    print(f'opening connection to {server_url}')
    ws.on_open = on_open
    ws.run_forever(
        # use it only if Server has self-signed certificate
        sslopt={"cert_reqs": ssl.CERT_NONE}
    )
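
The on_message callback in the example above prints raw text. Because every message ends with the chr(30) separator, one received text may hold several JSON messages. The sketch below splits them and picks out stream items. It relies on the SignalR JSON hub protocol, in which message type 2 is a stream item carrying its payload in the item field. That each item holds a confirmation or value structure as described earlier is an assumption, and the function names are hypothetical.

```python
import json

RECORD_SEPARATOR = chr(30)  # same separator that terminates sent messages


def split_frames(raw: str) -> list:
    """Split received text into decoded JSON messages."""
    return [json.loads(chunk) for chunk in raw.split(RECORD_SEPARATOR) if chunk]


def stream_items(raw: str, invocation_id: str):
    """Yield the payloads of stream items that belong to one invocation."""
    for frame in split_frames(raw):
        # type 2 = stream item; other types (ping, completion, close) are skipped
        if frame.get("type") == 2 and frame.get("invocationId") == invocation_id:
            yield frame["item"]
```

Inside on_message, iterating over stream_items(message, '0') would return the payloads for the subscription created with invocationId '0'.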

Below is an example of receiving tag values by subscription using the SignalR Core library in Python.

You must first run the following command:
pip install signalrcore

Example subscription using the SignalR Core library:

import logging
import time

from signalrcore.hub_connection_builder import HubConnectionBuilder

TOKEN = 'you should get access token for API here'
IP = '192.168.0.7'
PORT = '8080'
HUB = 'kics4net/api/v4/tag-values'
# how long to keep receiving values before closing (illustrative value)
LISTEN_SECONDS = 60

class WebsocketConnection(HubConnectionBuilder):
    def __init__(self, url: str = None, options: dict = None, verify_ssl: bool = False):
        super().__init__()
        self.with_url(url, options=options)
        self.configure_logging(logging.WARNING)
        self.with_automatic_reconnect({
            "type": "raw",
            "keep_alive_interval": 10,
            "reconnect_interval": 5,
            "max_attempts": 5
        })
        self.verify_ssl = verify_ssl
        # received stream items are collected here
        self.result = []

    def on_tag_stream_value(self, m):
        self.result.append(m)
        print(f'on_new_tag_value, {m}')

    def on_tag_stream_error(self, e):
        print(f'onError, {e}')

    def on_tag_stream_complete(self, q):
        print(f'onComplete, {q}')

    def subscribe_tags(self):
        print("connection opened and handshake received ready to send messages")
        args = {
            'tagIdentifiers': [
                {
                    'tagName': 'tag_01',
                    'assetName': 'asset_02'
                }
            ],
            'streamConfig': {
                'samplingRateHz': 5
            }
        }
        self.stream("GetTagValuesStream", [args]) \
            .subscribe({
                "next": self.on_tag_stream_value,
                "complete": self.on_tag_stream_complete,
                "error": self.on_tag_stream_error
            })

def main():
    server_url = "https://{}:{}/{}".format(IP, PORT, HUB)
    login = 'bearer {}'.format(TOKEN)
    conn = WebsocketConnection(url=server_url, options={"headers": {"authorization": login}})
    conn.build()
    logging.info(f'opening connection to {server_url}')
    conn.on_open(conn.subscribe_tags)
    conn.start()
    # start() does not block, so wait here while tag values arrive
    time.sleep(LISTEN_SECONDS)
    logging.info('closing connection')
    conn.stop()

if __name__ == '__main__':
    main()
