Kaspersky Industrial CyberSecurity for Networks

Subscribing to notifications about tag values over the WebSocket protocol

March 22, 2024

ID 212101

When using the Kaspersky Industrial CyberSecurity for Networks API, a recipient app can create a subscription to notifications regarding modified values of a specific tag. The WebSocket protocol is used for creating a subscription and receiving notifications.

A subscription for a recipient app consists of the following steps:

  1. The recipient app establishes a connection with the Kaspersky Industrial CyberSecurity for Networks Server through the connector for this application using the REST API server.

    After successfully connecting to the Server, the connector receives an authentication token. The connector uses the authentication token for all subsequent interactions with the Server in this session (specifically, for requesting its configuration from the Server).

  2. The recipient app uses WebSocket to connect and sends a request to create a subscription to notifications regarding the received values of a relevant tag.

    The request is sent using the functions provided by the WebSocket protocol. The Kaspersky Industrial CyberSecurity for Networks Server receives the request and creates the subscription.

  3. Kaspersky Industrial CyberSecurity for Networks detects a new tag value in traffic when reading or writing a tag.
  4. Kaspersky Industrial CyberSecurity for Networks sends the obtained tag value to the recipient app that has an active subscription to notifications regarding the values of this tag.

Main features of a subscription:

  • After the recipient app indicates the relevant tags of Kaspersky Industrial CyberSecurity for Networks, the Server sends confirmation regarding the capability to obtain the values of those tags. The recipient app then waits to receive the values of those tags over the established connection.
  • Creation and maintenance of a subscription relies on the WebSocket protocol and a connection at the same address that is used by the REST API server.
  • A Kaspersky Industrial CyberSecurity for Networks Server supports no more than one active subscription for tag values. If an active subscription was already created and is being used, and you attempt to create another subscription, you will see an error regarding an excessive number of connections.
  • The recipient app can close an established subscription connection at any time to stop receiving tag values.
  • A subscription is stopped if it is intentionally closed by the recipient app or if the connection is disrupted. If the Kaspersky Industrial CyberSecurity for Networks Server was temporarily unavailable (disconnected) and tag values were not forwarded for the subscription, the recipient app must re-subscribe to the tag values after the connection is restored.
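The re-subscription requirement in the last item can be sketched as a simple retry loop. This is only an illustration: open_subscription is a hypothetical callable (not part of the product API) that is assumed to connect, create the subscription, and block until the connection ends, returning normally on an intentional close and raising ConnectionError when the connection is disrupted. The attempt limit and delay are arbitrary values.

```python
import time


def keep_subscribed(open_subscription, max_attempts=5, delay_seconds=5.0,
                    sleep=time.sleep):
    """Re-create the subscription whenever the connection is disrupted.

    open_subscription is a hypothetical callable: it returns normally when
    the recipient app closes the connection on purpose and raises
    ConnectionError when the connection is lost.
    Returns the number of attempts that were made.
    """
    attempts = 0
    while attempts < max_attempts:
        attempts += 1
        try:
            open_subscription()
            return attempts  # closed intentionally, do not subscribe again
        except ConnectionError:
            if attempts < max_attempts:
                sleep(delay_seconds)  # wait before subscribing again
    raise ConnectionError(f"subscription lost after {attempts} attempts")
```

Because the Server supports only one active subscription, a loop like this should open a new connection only after the previous one has ended.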

Connecting with WebSocket

To receive tags by subscription, you can use the standard functions of WebSocket as well as the SignalR Core library. Packages for working with the SignalR Core library are available for the most common programming languages: C++, C#, Java, Python, Go, and JavaScript/TypeScript.

To connect using WebSocket, you need to specify the following address:
<publicApi address from the communication data package>/kics4net/api/v4/tag-values

The scheme at the beginning of the address string depends on how the connection is made.

If the SignalR Core library is being used, the address string begins with https://. For example:
https://kics-server:8080/kics4net/api/v4/tag-values

If the standard functions of WebSocket are being used, you need to replace https with wss in the address string. For example:
wss://kics-server:8080/kics4net/api/v4/tag-values

If an authentication token is not provided when connecting (or the provided token has not passed verification), the server returns code 401 when responding to a request to open the connection.
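The address rules above can be captured in a small helper. This is a minimal sketch: the function name tag_values_url and the use_signalr flag are illustrative and are not part of the product API, and the publicApi address is assumed to begin with https:// as in the examples above.

```python
def tag_values_url(public_api: str, use_signalr: bool) -> str:
    """Build the tag-values address from the publicApi address.

    Assumes public_api starts with https://, as in the examples above.
    """
    base = public_api.rstrip("/") + "/kics4net/api/v4/tag-values"
    if use_signalr:
        return base  # the SignalR Core library takes the https:// form
    if not base.startswith("https://"):
        raise ValueError("publicApi address is expected to start with https://")
    # standard WebSocket functions need the wss:// scheme instead
    return "wss://" + base[len("https://"):]


print(tag_values_url("https://kics-server:8080", use_signalr=True))
print(tag_values_url("https://kics-server:8080", use_signalr=False))
```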

Creating a subscription for tag values

To create a subscription, you must make a request with the GetTagValuesStream method name.

Example request argument:

{
  "tagIdentifiers": [
    { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },
    { "tagName": "Asdu_1_object_1003", "assetName": "Asset 079" }
  ],
  "streamConfig": {
    "samplingRateHz": 1
  }
}

A request argument consists of the following fields:

  • tagIdentifiers – array of IDs of tags whose values need to be received for the subscription.
  • assetName, tagName – values representing the device name and tag name (used to identify the tag whose values are needed for the subscription).
  • samplingRateHz – tag value sampling rate (used to reduce the volume of transmitted data). If a null value is defined for the field, sampling is not performed.

If a subscription creation argument does not satisfy the requirements of the fields, an error is returned with a description of the problem.
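To avoid a round trip that ends in a validation error, the request argument can be assembled and checked on the client side first. The sketch below is an assumption about a convenient helper, not part of the API: the name build_subscription_request is hypothetical, and the local checks only mirror the fields described above (the exact server-side validation rules are not documented here).

```python
def build_subscription_request(tags, sampling_rate_hz=None):
    """Build the argument for the GetTagValuesStream method.

    tags: iterable of (asset_name, tag_name) pairs.
    sampling_rate_hz: sampling rate, or None (JSON null) to disable sampling.
    """
    identifiers = []
    for asset_name, tag_name in tags:
        # local check only; the Server performs its own validation
        if not asset_name or not tag_name:
            raise ValueError("both assetName and tagName must be specified")
        identifiers.append({"tagName": tag_name, "assetName": asset_name})
    return {
        "tagIdentifiers": identifiers,
        # streamConfig is always included because the Server requires it
        "streamConfig": {"samplingRateHz": sampling_rate_hz},
    }


request = build_subscription_request(
    [("Asset 079", "Asdu_1_object_1001"), ("Asset 079", "Asdu_1_object_1003")],
    sampling_rate_hz=1,
)
print(request)
```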

Example error for a subscription creation argument:

HubException: GetTagValuesStreamRequest has validation errors:

TagIdentifiers:

The TagName field is required.

The StreamConfig field is required.

Confirming a subscription

When confirming a subscription, the server returns a confirmation result for each tag that matches a tagIdentifiers value in the request.

Example subscription confirmation:

{
  "confirmation": {
    "result": "ok",
    "tagIdentifier": { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" },
    "tagId": 102
  }
}

A response containing a subscription confirmation consists of the following fields:

  • result – status of the tag value subscription. Possible values:
    • ok – subscription was successfully created.
    • notFound – a tag with the specified assetName or tagName was not found.
  • tagIdentifier – tag identifier that matches one element of the tagIdentifiers array from the subscription creation request.
  • tagId – unique ID of a tag in the application. This can be used to receive information about a tag through the Kaspersky Industrial CyberSecurity for Networks API, or to identify a tag in a response containing its values.
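Because a separate confirmation arrives for each requested tag, a recipient app typically needs to remember which tagId belongs to which tag and which tags were not found. The following is a minimal sketch under the assumption that confirmations have already been parsed into Python dictionaries of the form shown above; the function name split_confirmations is illustrative.

```python
def split_confirmations(messages):
    """Separate confirmed tags from tags that were not found."""
    subscribed = {}  # tagId -> (assetName, tagName)
    not_found = []   # [(assetName, tagName), ...]
    for message in messages:
        confirmation = message.get("confirmation")
        if confirmation is None:
            continue  # not a confirmation (for example, a tag value)
        ident = confirmation["tagIdentifier"]
        key = (ident["assetName"], ident["tagName"])
        if confirmation["result"] == "ok":
            subscribed[confirmation["tagId"]] = key
        else:
            not_found.append(key)
    return subscribed, not_found
```

The resulting tagId mapping can then be used to match incoming tag values to the tags named in the request.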

Tag values by subscription

The application sends each tag value received by subscription as a structure with the following top-level fields:

{
  "value": {
    "tagId": <unique ID of the tag in the application>,
    "tagValue": "<JSON object with tag data>"
  }
}

Information about a new value of a tag is sent to the recipient app in JSON format. The sent data object contains the following fields:

  • n – tag data type represented by the name from TagStructure.
  • ts – time when the last update of tag values was registered, in microseconds since 01/01/1970.
  • dn – transfer direction. Possible values: r, w, rw.
  • mp – monitoring point ID.
  • d – contents of tag fields.
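The fields above can be decoded as follows. This is a sketch with two assumptions: the message has already been parsed into a Python dictionary, and tagValue may arrive either as a JSON-encoded string or as an already parsed object (the code accepts both). The function names are illustrative.

```python
import json
from datetime import datetime, timezone


def decode_value_message(message):
    """Return (tag_id, tag_data) from a {"value": {...}} message."""
    value = message["value"]
    tag_value = value["tagValue"]
    # assumption: tagValue is either a JSON string or a parsed object
    tag_data = json.loads(tag_value) if isinstance(tag_value, str) else tag_value
    return value["tagId"], tag_data


def ts_to_datetime(ts_microseconds):
    """Convert microseconds since 01/01/1970 to an aware UTC datetime."""
    seconds, micros = divmod(ts_microseconds, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=micros)
```

Splitting the timestamp with divmod keeps the microsecond part exact, which a floating-point division would not guarantee.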

The d attribute is a dictionary in which each key is the name of a tag field at the top (zero) level of the hierarchy. Each field value has the following attributes:

  • t – mandatory attribute indicating one of the following data types:
    • u – UINT64.
    • i – INT64.
    • b – BOOL.
    • d – DOUBLE.
    • s – UTF8 string.
    • t – time in microseconds since 01/01/1970.
    • e – ENUM. The field additionally contains the following attributes:
      • n – name of the ENUM type.
      • v – original value of ENUM.
      • s – string value of ENUM.
    • st – structure.
    • un – UNION.
  • v – mandatory attribute indicating the tag field value.
  • n – name of the ENUM type from TagStructure (only for the e type – ENUM).
  • s – string value of ENUM (only for the e type – ENUM).

    Example:

    - enum:
        name: OpType # Name of ENUM type ('n' attribute)
        data:
          0: NUL # 0 is written to the 'v' attribute, NUL is written to the 's' attribute
          1: PULSE_ON
          2: PULSE_OFF

  • x – identifies the main value of the tag.

    Format: "x": 1

    The x attribute is absent from all other fields of the tag.

  • m – special marker of the tag parameter. This corresponds to the marker attribute with the following fields:
    • q – value of the quality attribute.
    • ts – timestamp status displaying its accuracy, temporary state, or reason for an error during verification.
    • ds – data status.
    • o – origin of the value or command.
    • t – time when the tag values were last updated (taken from traffic).
    • ct – cause of transmission.

    Format: "m": "q"

    Example of a forwarded tag value in JSON format:

    {
      "n": "TagStructure1",
      "ts": 18446744073709551616,
      "dn": "r",
      "mp": 1,
      "d": {
        "value": { "t": "d", "v": 3.1415, "x": 1 },
        "quality": { "t": "s", "v": "good", "m": "q" },
        "mask": { "t": "u", "v": 18446744073709551616 },
        "enumfield": { "t": "e", "n": "SwitchState", "v": 0, "s": "Off" },
        "strucfield": {
          "t": "st",
          "v": {
            "v1": { "t": "d", "v": 3.1415 },
            "q2": { "t": "s", "v": "good", "m": "q" }
          }
        },
        "unionfield": {
          "t": "un",
          "v": {
            "_": { "t": "u", "v": 42 },
            "low4bits": { "t": "u", "v": 10 },
            "high4bits": { "t": "u", "v": 2 }
          }
        }
      }
    }
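A recipient app usually needs only the main value (the field marked with "x": 1) and the quality fields (marked with "m": "q"). The sketch below walks the d dictionary and descends into structure and union fields. The function names are illustrative, and the sample is a shortened variant of the example above with an arbitrary timestamp.

```python
def find_fields(fields, predicate, path=()):
    """Yield (path, field) for every field that matches predicate.

    Descends into structure ("st") and union ("un") fields, whose "v"
    attribute holds a nested dictionary of fields.
    """
    for name, field in fields.items():
        current = path + (name,)
        if predicate(field):
            yield current, field
        if field.get("t") in ("st", "un") and isinstance(field.get("v"), dict):
            yield from find_fields(field["v"], predicate, current)


def main_value(tag_data):
    """Return the value of the field marked with "x": 1, or None."""
    for _, field in find_fields(tag_data["d"], lambda f: f.get("x") == 1):
        return field["v"]
    return None


def quality_values(tag_data):
    """Return {dotted.path: value} for fields marked with "m": "q"."""
    return {".".join(path): field["v"]
            for path, field in find_fields(tag_data["d"], lambda f: f.get("m") == "q")}


sample = {
    "n": "TagStructure1", "ts": 1711100000000000, "dn": "r", "mp": 1,
    "d": {
        "value": {"t": "d", "v": 3.1415, "x": 1},
        "quality": {"t": "s", "v": "good", "m": "q"},
        "strucfield": {"t": "st", "v": {
            "v1": {"t": "d", "v": 3.1415},
            "q2": {"t": "s", "v": "good", "m": "q"},
        }},
    },
}
print(main_value(sample))
print(quality_values(sample))
```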

Examples of receiving tag values by subscription

Below is an example of receiving tag values by subscription using standard WebSocket functions in Python.

You must first run the following command:
pip install websocket_client

Example subscription using standard WebSocket functions:

import json
import ssl

import websocket


def on_message(ws, message):
    print(message)


def on_error(ws, error):
    print(f'error: {error}')


def on_close(ws, close_status_code=None, close_msg=None):
    # newer websocket-client versions also pass the close code and message
    print("### closed ###")


def on_open(ws):
    print("connection opened, ready to send messages")

    # all sent messages must end with this character
    message_separator = chr(30)

    # setting up json as messages format
    protocol_selection_args = {
        'protocol': 'json',
        'version': 1
    }
    ws.send(json.dumps(protocol_selection_args) + message_separator)

    # creating subscription
    args = {
        'arguments': [
            {
                'tagIdentifiers': [
                    {
                        'tagName': 'tag_01',
                        'assetName': 'asset_02'
                    }
                ],
                'streamConfig': {
                    'samplingRateHz': 5
                }
            }
        ],
        'invocationId': '0',  # will be included in response message
        'target': 'getTagValuesStream',
        'type': 4  # must be equal to 4 (streaming invocation in the SignalR protocol)
    }
    ws.send(json.dumps(args) + message_separator)


def login():
    token = "you should get access token for API here"
    return token


if __name__ == "__main__":
    server_url = "wss://localhost:8091/kics4net/api/v4/tag-values"
    auth = "Authorization: Bearer " + login()

    # for troubleshooting uncomment next line
    # websocket.enableTrace(True)

    ws = websocket.WebSocketApp(server_url,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close,
                                header=[auth])
    print(f'opening connection to {server_url}')
    ws.on_open = on_open
    ws.run_forever(
        # use it only if Server has self-signed certificate
        sslopt={"cert_reqs": ssl.CERT_NONE}
    )
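With standard WebSocket functions, the recipient app has to parse SignalR messages itself. In the SignalR JSON protocol, every message ends with the same separator character (code 30), so one WebSocket payload can carry several messages. Stream items have type 2 and carry the data in the item field, and type 3 marks the completion of the stream. The sketch below shows one way to split and dispatch a payload. The function names are illustrative, and the assumption is that each item is a confirmation or value structure as described in the sections above.

```python
import json

RECORD_SEPARATOR = chr(30)  # every SignalR JSON message ends with this character


def split_frames(raw):
    """Split one WebSocket text payload into parsed SignalR messages."""
    return [json.loads(part) for part in raw.split(RECORD_SEPARATOR) if part]


def handle_payload(raw, on_item, on_complete):
    """Dispatch stream items (type 2) and completions (type 3)."""
    for frame in split_frames(raw):
        kind = frame.get("type")
        if kind == 2:
            on_item(frame["invocationId"], frame["item"])
        elif kind == 3:
            # "error" is present only if the stream ended with an error
            on_complete(frame["invocationId"], frame.get("error"))
        # other messages (handshake response, ping, close) are ignored here
```

A function like handle_payload could be called from on_message in the example above instead of printing the raw message.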

Below is an example of receiving tag values by subscription using the SignalR Core library in Python.

You must first run the following command:
pip install signalrcore

Example subscription using the SignalR Core library:

import logging

from signalrcore.hub_connection_builder import HubConnectionBuilder

TOKEN = 'you should get access token for API here'
IP = '192.168.0.7'
PORT = '8080'
HUB = 'kics4net/api/v4/tag-values'

# received messages are collected here
result = []


# This example assumes a signalrcore version in which the builder object
# itself provides on_open(), start(), stop() and stream()
class WebsocketConnection(HubConnectionBuilder):

    def __init__(self, url: str = None, options: dict = None, verify_ssl: bool = False):
        super().__init__()
        self.with_url(url, options=options)
        self.configure_logging(logging.WARNING)
        self.with_automatic_reconnect({
            "type": "raw",
            "keep_alive_interval": 10,
            "reconnect_interval": 5,
            "max_attempts": 5
        })
        self.verify_ssl = verify_ssl

    def on_tag_stream_value(self, m):
        result.append(m)
        print(f'on_new_tag_value, {m}')

    def on_tag_stream_error(self, e):
        print(f'onError, {e}')

    def on_tag_stream_complete(self, q):
        print(f'onComplete, {q}')

    def subscribe_tags(self):
        print("connection opened, ready to send messages")
        args = {
            'tagIdentifiers': [
                {
                    'tagName': 'tag_01',
                    'assetName': 'asset_02'
                }
            ],
            'streamConfig': {
                'samplingRateHz': 5
            }
        }
        self.stream("GetTagValuesStream", [args]) \
            .subscribe({
                "next": self.on_tag_stream_value,
                "complete": self.on_tag_stream_complete,
                "error": self.on_tag_stream_error
            })


def main():
    server_url = "https://{}:{}/{}".format(IP, PORT, HUB)
    login = 'bearer {}'.format(TOKEN)
    conn = WebsocketConnection(url=server_url, options={"headers": {"authorization": login}})
    conn.build()
    logging.info(f'opening connection to {server_url}')
    conn.on_open(conn.subscribe_tags)
    conn.start()
    # keep the connection open until Enter is pressed; stopping immediately
    # would close the subscription before any tag values arrive
    input('press Enter to stop\n')
    logging.info('closing connection')
    conn.stop()


if __name__ == '__main__':
    main()
