Getting tags and subscribing to tag changes
This chapter explains how to get tags, subscribe to tag change events, and implement a service that receives tag change events.
Getting tags
This section explains how to get tags by using Kaspersky Industrial CyberSecurity for Networks API.
Overview
Tags are values that describe parameters of an industrial process. For example, a manufacturing process involving a thermal oxidizer may have temperature, residence time, and turbulence among many other tags.
You can get information about tags by using the TagProvider service methods. This service is defined in the tag_provider_service.proto file. Messages and enumerations are defined in the tag_provider.proto, tag_provider_service.proto, and common.proto files.
The TagProvider service has the following method for getting tags:
GetTags
Returns tags. You can specify a filter for tags.
This method is synchronous. The response time for this method depends on the number of requested tags.
Getting tags (example)
To get all tags, use the GetTags method and specify an empty filter. This method returns a stream of tags.
In the following example, a stub requests all tags.
tagStub = tag_provider_service_pb2_grpc.TagProviderStub(channel)
request = tag_provider_service_pb2.TagsRequest(filter = "")
response = tagStub.GetTags(request)
for tag in response:
    if tag.HasField("boolVal"):
        value = tag.boolVal
    elif tag.HasField("int64Val"):
        value = tag.int64Val
    elif tag.HasField("doubleVal"):
        value = tag.doubleVal
    elif tag.HasField("stringVal"):
        value = tag.stringVal
    elif tag.HasField("binaryVal"):
        value = tag.binaryVal
    else:
        value = "No value"
    print("\n\n")
    print("Tag:", tag.id,
          "| Name:", tag.briefInfo.name,
          "| Description:", tag.briefInfo.description,
          "| Changed at:", datetime.datetime.utcfromtimestamp(tag.timestamp.seconds).strftime('%Y-%m-%d %H:%M:%S'),
          "| Value type: ", tag.WhichOneof("value"),
          "| Value: ", value)
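The example above formats tag.timestamp.seconds with datetime.datetime.utcfromtimestamp, which is deprecated as of Python 3.12. On Python 3, a timezone-aware conversion could look like the following minimal sketch. The helper name format_tag_timestamp is hypothetical and not part of the API. It assumes only that the value passed in is a whole number of seconds since the Unix epoch, as the seconds field used in the example suggests.

```python
from datetime import datetime, timezone


def format_tag_timestamp(seconds):
    """Format whole seconds since the Unix epoch as a UTC date-time string.

    Hypothetical helper for illustration; not part of the API.
    """
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.strftime('%Y-%m-%d %H:%M:%S')


if __name__ == '__main__':
    print(format_tag_timestamp(0))  # prints 1970-01-01 00:00:00
```

Inside the loop, the call would be format_tag_timestamp(tag.timestamp.seconds).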
Using filters for tags
This section describes filters and filter syntax, and provides filter examples for tags.
About filters for tags
When getting tags, you can filter them by tag identifier and tag name. Filters are strings that can be specified in RPC calls. A filter is a group of conditions and logical operators that places constraints on the returned data.
For example, when you get tags by using the GetTags method of the TagProvider service and you want information only about tags with certain identifiers, you can use the following filter:
filter: { tag_id = 1001, 1002, 1003 }
For examples of using filters for tags, see subsection "Making tag requests with filters" below.
Tag filter fields
Tag filters support basic conditions for the following fields:
Field names (tag requests)
Field name | Value type | Description | Condition example |
---|---|---|---|
tag_id | Integral | Tag identifiers | tag_id = 1001, 1002, 1003 |
tag_name | String | Tag names | tag_name = Valve.State, Valve.Pressure |
Using lists and ranges
You can specify several tag identifiers, tag names, or tag descriptions using a list syntax:
value_1, value_2, ..., value_N, ...
The following are examples of list syntax:
tag_id = 1001, 1002, 1003
tag_name = Valve.State, Valve.Pressure
For tag identifiers, you can also specify ranges using a range syntax:
value_1-value_N
The following is an example of range syntax:
tag_id = 1001-1100
You can also combine lists and ranges for tag identifiers, for example:
tag_id = 1001-1100, 2000-2050
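To make the list and range syntax concrete, the following minimal sketch parses the value part of a tag_id condition into numeric intervals on the client side, for example to validate a filter before sending it. The function names parse_tag_id_values and covers are hypothetical and not part of the API. The sketch also assumes that a range includes both of its endpoints, which this section does not state.

```python
def parse_tag_id_values(values):
    """Parse a string such as '1001-1100, 2000-2050, 3000' into (first, last) pairs.

    Hypothetical client-side helper; a single identifier becomes a
    one-element interval.
    """
    intervals = []
    for item in values.split(','):
        item = item.strip()
        if not item:
            raise ValueError('empty item in tag_id list')
        if '-' in item:
            first_text, last_text = item.split('-', 1)
            first, last = int(first_text), int(last_text)
        else:
            first = last = int(item)
        if first > last:
            raise ValueError('range start exceeds range end: ' + item)
        intervals.append((first, last))
    return intervals


def covers(intervals, tag_id):
    """Return True if tag_id falls into any interval.

    Assumption: both endpoints of a range are included.
    """
    return any(first <= tag_id <= last for first, last in intervals)


if __name__ == '__main__':
    parsed = parse_tag_id_values('1001-1100, 2000-2050')
    print(parsed)
    print(covers(parsed, 1500))
```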
Combining conditions
You can combine conditions using logical operators and parentheses:
filter: { (tag_id = 1001, 1002, 1003) || (tag_name = Valve.State, Valve.Pressure) }
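Filter strings like the one above can also be assembled programmatically. The following minimal sketch builds a filter from identifiers, identifier ranges, and names. The function build_tag_filter and its parameters are hypothetical and not part of the API. The output only mirrors the syntax shown in this section, and the sketch always joins the identifier and name conditions with the || operator used in the example above.

```python
def build_tag_filter(tag_ids=(), tag_id_ranges=(), tag_names=()):
    """Build a filter string from identifiers, (first, last) ranges, and names.

    Hypothetical helper for illustration; it does not validate its input.
    """
    id_items = [str(tag_id) for tag_id in tag_ids]
    id_items += ['{}-{}'.format(first, last) for first, last in tag_id_ranges]

    conditions = []
    if id_items:
        conditions.append('tag_id = ' + ', '.join(id_items))
    if tag_names:
        conditions.append('tag_name = ' + ', '.join(tag_names))

    if not conditions:
        # An empty filter requests all tags.
        return ''
    if len(conditions) > 1:
        # Parenthesize each condition before combining them.
        conditions = ['(' + condition + ')' for condition in conditions]
    return 'filter: { ' + ' || '.join(conditions) + ' }'


if __name__ == '__main__':
    print(build_tag_filter(tag_ids=[1001, 1002, 1003],
                           tag_names=['Valve.State', 'Valve.Pressure']))
```

The resulting string can be passed as the filter argument of a TagsRequest message.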
Making tag requests with filters
The following TagProvider service method uses filters:
GetTags
You can specify a filter in the TagsRequest message.
In the following example, a stub requests tags with specified identifiers.
tagStub = tag_provider_service_pb2_grpc.TagProviderStub(channel)
# rpc GetTags(TagsRequest) returns(stream Tag);
request = tag_provider_service_pb2.TagsRequest(filter = "filter:{tag_id = 1000, 1001, 1002 }")
response = tagStub.GetTags(request)
for tag in response:
    print("\n\n")
    print("Tag:", tag.id,
          "| Name:", tag.briefInfo.name,
          "| Changed at:", datetime.datetime.utcfromtimestamp(tag.timestamp.seconds).strftime('%Y-%m-%d %H:%M:%S'))
Using sorting for tags
This section describes sorting and sorting syntax for tags, and provides sorting examples.
About sorting
Sorting is a special string that can be specified in RPC calls. Sorting can be combined with filters or used separately from them. A sorting for tags is a group of conditions that specify the order of returned tags.
For example, if you want to get tags by using the GetTags method of the TagProvider service, and you also want those tags to be sorted by tag identifier in descending order, you can use the following sorting:
sort: { tag_id: desc }
For examples of using sorting, see subsection "Making tag requests with sorting" below.
Combining sorting with filters
Sorting can be combined with filters, for example:
sort: { tag_id: desc } filter: { tag_id = 1000-1100 }
For an example of using filters with sorting, see subsection "Making tag requests with sorting" below.
Sorting condition syntax
Sorting conditions use the following syntax:
field : order
where field is the name of the field affected by the condition, and order is one of two values:
asc
Field values are sorted in ascending order.
desc
Field values are sorted in descending order.
The following are examples of sorting conditions:
tag_id : desc
tag_name : asc
tag_description : asc
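As a minimal sketch, a sorting string could be assembled on the client side from (field, order) pairs. The function build_tag_sort is hypothetical and not part of the API. It only reproduces the syntax shown in this section and rejects any order other than asc and desc. It does not check field names.

```python
def build_tag_sort(conditions):
    """Build a sorting string from (field, order) pairs.

    Hypothetical helper for illustration; order must be 'asc' or 'desc'.
    """
    parts = []
    for field, order in conditions:
        if order not in ('asc', 'desc'):
            raise ValueError('order must be asc or desc, got: ' + repr(order))
        parts.append('{}: {}'.format(field, order))
    if not parts:
        # No conditions: return an empty string so that no sorting is requested.
        return ''
    return 'sort: { ' + ', '.join(parts) + ' }'


if __name__ == '__main__':
    print(build_tag_sort([('tag_id', 'asc'), ('tag_name', 'desc')]))
```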
Field names (tag requests)
The following table summarizes field names that can be used in tag requests.
Field names (tag requests)
Field name | Description |
---|---|
tag_id | Tag identifier |
tag_name | Tag name |
tag_description | Tag description |
Making tag requests with sorting
The following TagProvider service method uses sorting:
GetTags
You can specify a sorting in the TagsRequest message.
The following is an example of using sorting to get tags with the GetTags method:
tagStub = tag_provider_service_pb2_grpc.TagProviderStub(channel)
request = tag_provider_service_pb2.TagsRequest(filter = "sort: { tag_id: asc, tag_name: desc, tag_description: asc}")
response = tagStub.GetTags(request)
for tag in response:
    print("\n\n")
    print("Tag:", tag.id,
          "| Name:", tag.briefInfo.name,
          "| Changed at:", datetime.datetime.utcfromtimestamp(tag.timestamp.seconds).strftime('%Y-%m-%d %H:%M:%S'))
The following is an example of combining sorting and filters to get a subset of tags with the GetTags method:
tagStub = tag_provider_service_pb2_grpc.TagProviderStub(channel)
request = tag_provider_service_pb2.TagsRequest(filter = "filter:{tag_id=1000-2000} sort:{tag_id:desc}")
response = tagStub.GetTags(request)
for tag in response:
    print("\n\n")
    print("Tag:", tag.id,
          "| Name:", tag.briefInfo.name,
          "| Changed at:", datetime.datetime.utcfromtimestamp(tag.timestamp.seconds).strftime('%Y-%m-%d %H:%M:%S'))
Subscribing to tag change events
This section explains how to subscribe to tag change events.
Overview
A tag change event is a message that contains a time stamp and a list of identifiers of the tags whose values have changed. For each tag identifier, the new value of the tag is also returned.
You can subscribe to tag change events by using the TagProvider service methods. This service is defined in the tag_provider_service.proto file. Messages and enumerations are defined in the tag_provider.proto, tag_provider_service.proto, and common.proto files.
The TagProvider service has the following methods for subscribing to tag change events:
SubscribeTagNotifier
Subscribes to tag change events.
You can specify a list of tag identifiers. Only events for the specified identifiers will be included in the subscription.
UnsubscribeNotifier
Removes an existing subscription.
Subscribing to tag change events (example)
To subscribe to tag change events, use the SubscribeTagNotifier method. In the following example, a stub makes a subscription request for tags with identifiers 1000, 1001, and 1002. Kaspersky Industrial CyberSecurity for Networks API will send events to the client.example.com:50051 address and port, where they must be handled by the TagNotifier service. After that, the stub waits for 10 seconds and unsubscribes from tag change events.
tagStub = tag_provider_service_pb2_grpc.TagProviderStub(channel)
# rpc SubscribeTagNotifier(SubscriptionRequest) returns(Cookie);
# rpc UnsubscribeNotifier(Cookie) returns(google.protobuf.Empty);
listener = "client.example.com:50051"
tag_filter = tag_provider_service_pb2.Filter(tagIds=[1000, 1001, 1002])
request = tag_provider_service_pb2.SubscriptionRequest(address=listener, filter=tag_filter)
response = tagStub.SubscribeTagNotifier(request)
cookie = response.cookie
# wait for 10 seconds, then unsubscribe
print("Sleeping for 10 seconds")
time.sleep(10)
# unsubscribe
print("Unsubscribing")
request = common_pb2.Cookie(cookie=cookie)
response = tagStub.UnsubscribeNotifier(request)
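In the example above, an exception raised between subscribing and unsubscribing would skip the unsubscribe call. One way to guard against that is a context manager that always unsubscribes on exit. The following is a minimal sketch. The name tag_subscription is hypothetical, and the two callables stand in for the SubscribeTagNotifier and UnsubscribeNotifier calls of a real stub, so the demonstration runs without a server.

```python
from contextlib import contextmanager


@contextmanager
def tag_subscription(subscribe, unsubscribe):
    """Call subscribe(), yield its result, and always call unsubscribe(result).

    Hypothetical helper for illustration; both arguments are plain callables.
    """
    cookie = subscribe()
    try:
        yield cookie
    finally:
        # Runs on normal exit and when the body raises an exception.
        unsubscribe(cookie)


if __name__ == '__main__':
    released = []
    # Stand-ins for the real stub calls.
    with tag_subscription(lambda: 'cookie-1', released.append) as cookie:
        print('Subscribed with', cookie)
    print('Unsubscribed:', released)
```

With the names from the example above, subscribe could be a function that returns tagStub.SubscribeTagNotifier(request).cookie, and unsubscribe a function that calls tagStub.UnsubscribeNotifier(common_pb2.Cookie(cookie=cookie)).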
Handling tag change events
This section explains how to implement a service that handles the tag change events sent by Kaspersky Industrial CyberSecurity for Networks.
Overview
When you subscribe to tag change events, you must specify an address where Kaspersky Industrial CyberSecurity for Networks has to send tag change events. To handle these messages, you must implement the OnNewTags method of the TagNotifier service and run the service. The service must listen for incoming messages on the specified address and port.
The TagNotifier service and its messages and enumerations are defined in the tag_notifier_service.proto file.
The TagNotifier service has the following method for handling messages with tag change events:
OnNewTags
This method is called when a message with a tag change event is received.
Handling tag change events (example)
The following is an example of a service that handles tag change events. It has an implementation of the OnNewTags method of the TagNotifier service, which prints the received tag change events.
For more information about implementing gRPC services, see the gRPC documentation.
from __future__ import print_function
import datetime
import time
from concurrent import futures

import grpc
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2

import tag_notifier_service_pb2
import tag_notifier_service_pb2_grpc


class TagNotifierServicer(tag_notifier_service_pb2_grpc.TagNotifierServicer):

    def OnNewTags(self, request_iterator, context):
        # rpc OnNewTags(stream TagEvent) returns(google.protobuf.Empty);
        for tag_event in request_iterator:
            print("\n\n")
            print("Tag event at:", datetime.datetime.utcfromtimestamp(tag_event.timestamp.seconds).strftime('%Y-%m-%d %H:%M:%S'),
                  "| Operation:", tag_event.operation,
                  "| Monitoring point:", tag_event.monitoringPoint,
                  "| From:", context.peer())
            for tag in tag_event.tags:
                print(tag)
        return google_dot_protobuf_dot_empty__pb2.Empty()


def serve(address):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    tag_notifier_service_pb2_grpc.add_TagNotifierServicer_to_server(TagNotifierServicer(), server)
    # Read the PEM files as bytes, which grpc.ssl_server_credentials expects.
    with open('./certs/product_facade_grpc_server.crt', 'rb') as f:
        root_crt = f.read()
    with open('./certs/client.key', 'rb') as f:
        key = f.read()
    with open('./certs/client.crt', 'rb') as f:
        chain = f.read()

    server_credentials = grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(key, chain,)],
        root_certificates=root_crt,
        require_client_auth=True
    )
    # Bind the listening address with the TLS credentials before starting;
    # without this call the server does not listen on any port.
    server.add_secure_port(address, server_credentials)
    server.start()

    SECONDS_IN_24H = 60 * 60 * 24
    try:
        while True:
            time.sleep(SECONDS_IN_24H)
    except KeyboardInterrupt:
        print("Stopping...")
        server.stop(0)


if __name__ == '__main__':
    address = '[::]:50051'
    print("Serving on", address)
    serve(address)