When using the Kaspersky Industrial CyberSecurity for Networks API, a recipient app can create a subscription to notifications regarding modified values of a specific tag. The WebSocket protocol is used for creating a subscription and receiving notifications.
A subscription for a recipient app consists of the following steps:
After successfully connecting to the Server, the connector receives an authentication token. The connector uses the authentication token for all subsequent interactions with the Server in this session (specifically, for requesting its configuration from the Server).
The Kaspersky Industrial CyberSecurity for Networks Server receives the request and creates the subscription. A request is sent by using the appropriate functions provided by the WebSocket protocol.
Main features of a subscription:
Connecting with WebSocket
To receive tags by subscription, you can use the standard functions of WebSocket as well as the SignalR Core library. Packages for working with the SignalR Core library are available for the most common programming languages: C++, C#, Java, Python, Go, and JavaScript/TypeScript.
To connect using WebSocket, you need to specify the following address:<publicApi address from the communication data package>/kics4net/api/v4/tag-values
However, the protocol indicated in the address string depends on the functionality utilized for the connection.
If the SignalR Core library is being used, the address string begins with https://
. For example: https://kics-server:8080/kics4net/api/v4/tag-values
If the standard functions of WebSocket are being used, you need to replace https
with wss
in the address string. For example: wss://kics-server:8080/kics4net/api/v4/tag-values
If an authentication token is not provided when connecting (or the provided token has not passed verification), the server returns code 401 when responding to a request to open the connection.
Creating a subscription for tag values
To create a subscription, you must make a request with the GetTagValuesStream
method name.
Example request argument: { "tagIdentifiers": [ { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" }, { "tagName": "Asdu_1_object_1003", "assetName": "Asset 079" } ], "streamConfig": { "samplingRateHz": 1 } } |
A request argument consists of the following fields:
tagIdentifiers
– array of IDs of tags whose values need to be received for the subscription.assetName
, tagName
– values representing the device name and tag name (used to identify the tag whose values are needed for the subscription).samplingRateHz
– tag value sampling rate (used to reduce the volume of transmitted data). If a null value is defined for the field, sampling is not performed.If a subscription creation argument does not satisfy the requirements of the fields, an error is returned with a description of the problem.
Example error for a subscription creation argument:
|
Confirming a subscription
When confirming a subscription, the server returns a confirmation result for each tag that matches a tagIdentifiers
value in the request.
Example subscription confirmation: { "confirmation": { "result": "ok", "tagIdentifier": { "tagName": "Asdu_1_object_1001", "assetName": "Asset 079" }, "tagId": 102 } } |
A response containing a subscription confirmation consists of the following fields:
result
– status of the tag value subscription. Possible values:ok
– subscription was successfully created.notFound
– a tag with the specified assetName
or tagName
was not found.tagIdentifier
– tag ID equivalent to one value from the tagIdentifiers
array of arguments of a subscription creation request.tagId
– unique ID of a tag in the application. This can be used to receive information about a tag through the Kaspersky Industrial CyberSecurity for Networks API, or to identify a tag in a response containing its values.Tag values by subscription
The application sends tag values by subscription within a fields structure. The following fields are presented at the top level of the structure:
{
"value": {
"tagId": <
unique ID of the tag in the application
>,
"tagValue": "<
JSON object with tag data
>"
}
}
Information about a new value of a tag is sent to the recipient app in JSON format. The sent data object contains the following fields:
n
– tag data type represented by the name from TagStructure
.ts
– time when the last update of tag values was registered. Indicated in microseconds starting on 01/01/1970.dn
– transfer direction. Possible values: r
, w
, rw
.mp
– monitoring point ID.d
– contents of tag fields.The d
attribute represents a dictionary in which each key is the name of a null-hierarchy tag field. Each field value has the following attributes:
t
– mandatory attribute indicating one of the following data types:u
– UINT64.i
– INT64.b
– BOOL.d
– DOUBLE.s
– UTF8 string.t
– time in microseconds starting on 01/01/1970.e
– ENUM. The field additionally contains the following attributes:n
– name of the ENUM type.st
– structure.un
– UNION.v
– mandatory attribute indicating the tag field value.n
– name of the ENUM type from TagStructure
(only for the e
type – ENUM).s
– string value of ENUM (only for the e
type – ENUM).Example:
- enum:
name: OpType #
Name of ENUM type ('n' attribute)
data:
0: NUL #
0 is written to the 'v' attribute, NUL is written to the 's' attribute
1: PULSE_ON
2: PULSE_OFF
x
– identifies the main value of the tag.Format: "x": 1
The x
attribute is absent from all other fields of the tag.
q
– value of the quality attribute.ts
– timestamp status displaying its accuracy, temporary state, or reason for an error during verification.ds
– data status.o
– origin of the value or command.t
– time when the tag values were last updated (taken from traffic).ct
– cause of transmission.Format: "m": "q"
Example of a forwarded tag value in JSON format: { "n": "TagStructure1", "ts": 18446744073709551616, "dn": "r", "mp": 1, "d": { "value": { "t": "d", "v": 3.1415, "x": 1 }, "quality": { "t": "s", "v": "good", "m": "q" }, "mask": { "t": "u", "v": 18446744073709551616 }, "enumfield": { "t": "e", "n": "SwitchState", "v": 0, "s": "Off" }, "strucfield": { "t": "st", "v": { "v1": { "t": "d", "v": 3.1415 }, "q2": { "t": "s", "v": "good", "m": "q" } } }, "unionfield": { "t": "un", "v": { "_": { "t": "u", "v": 42 }, "low4bits": { "t": "u", "v": 10 }, "high4bits": { "t": "u", "v": 2 } } } } } |
Examples of receiving tag values by subscription
Below is an example of receiving tag values by subscription using standard WebSocket functions in Python.
You must first run the following command:pip install websocket_client
Example subscription using standard WebSocket functions: import json, ssl, websocket
def on_message(ws, message): print(message)
def on_error(ws, error): print(f' error: {error}')
def on_close(ws): print("### closed ###")
def on_open(ws): print("connection opened and handshake received ready to send messages")
# all sent messages must end with this character message_separator = chr(30)
# setting up json as messages format protocol_selection_args = { 'protocol': 'json', 'version': 1 } ws.send(json.dumps(protocol_selection_args) + message_separator)
# creating subscription args = { 'arguments': [ { 'tagIdentifiers': [ { 'tagName': 'tag_01', 'assetName': 'asset_02' } ], 'streamConfig': { 'samplingRateHz': 5 } } ], 'invocationId': '0', # will be included in response message 'target': 'getTagValuesStream', 'type': 4 # must be equal to 4 for outgoing messages }
ws.send(json.dumps(args) + message_separator)
def login(): token = "you should get access token for API here" return token
if __name__ == "__main__": server_url = "wss://localhost:8091/kics4net/api/tag-values" auth = "Authorization: Bearer " + login()
# for troubleshooting uncomment next line # websocket.enableTrace(True) ws = websocket.WebSocketApp(server_url, on_message=on_message, on_error=on_error, on_close=on_close, header=[auth])
print(f'opening connection to {server_url}') ws.on_open = on_open ws.run_forever( # use it only if Server has self-signed certificate sslopt={"cert_reqs": ssl.CERT_NONE} ) |
Below is an example of receiving tag values by subscription using the SignalR Core library in Python.
You must first run the following command:pip install signalrcore
Example subscription using the SignalR Core library: import logging from signalrcore.hub_connection_builder import HubConnectionBuilder
TOKEN = 'you should get access token for API here' IP = '192.168.0.7' PORT = '8080' HUB = 'kics4net/api/v4/tag-values'
class WebsocketConnection(HubConnectionBuilder): def __init__(self, url: str = None, options: dict = None, verify_ssl: bool = False): super().__init__() self.with_url(url, options=options) self.configure_logging(logging.WARNING) self.with_automatic_reconnect({ "type": "raw", "keep_alive_interval": 10, "reconnect_interval": 5, "max_attempts": 5 }) self.verify_ssl = verify_ssl
def on_tag_stream_value(self, m): result.append(m) print(f'on_new_tag_value, {m}')
def on_tag_strean_error(self, e): print(f'onError, {e}')
def on_tag_stream_complete(self, q): print(f'onComplete, {q}')
def subscribe_tags(self): print("connection opened and handshake received ready to send messages")
args = { 'tagIdentifiers': [ { 'tagName': 'tag_01', 'assetName': 'asset_02'} ], 'streamConfig': { 'samplingRateHz': 5 } }
self.stream("GetTagValuesStream", [args]) \ .subscribe({ "next": self.on_tag_stream_value, "complete": self.on_tag_stream_complete, "error": self.on_tag_strean_error })
def main(): server_url = "https://{}:{}/{}".format(IP, PORT, HUB) login = 'bearer {}'.format(TOKEN)
conn = WebsocketConnection(url=server_url, options={"headers": {"authorization": login}}) conn.build()
logging.info(f'opening connection to {server_url}') conn.on_open(conn.subscribe_tags) conn.start()
logging.info('closing connection') conn.stop()
if __name__ == '__main__': main() |