Kaspersky Industrial CyberSecurity for Networks API Developer's Guide

Handling tag change events

This section explains how to implement a service that handles the tag change events sent by Kaspersky Industrial CyberSecurity for Networks.

Overview

When you subscribe to tag change events, you must specify an address where Kaspersky Industrial CyberSecurity for Networks has to send tag change events. To handle these messages, you must implement the OnNewTags method of the TagNotifier service and run the service. The service must listen for incoming messages on the specified address and port.

The TagNotifier service is defined in the tag_notifier_service.proto file. Messages and enumerations are defined in the tag_notifier_service.proto file.

The TagNotifier service has the following methods for handling messages with tag change events:

  • OnNewTags

    This method is called when a message with tag change event is received.

Handling tag change events (example)

The following is an example of a service that handles tag change events. It has an implementation of the OnNewTags method of the TagNotifier service, which prints the received tag change events.

For more information about implementing gRPC services, see the gRPC documentation.

from __future__ import print_function

import datetime

import time

from concurrent import futures

import grpc

from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2

import tag_notifier_service_pb2

import tag_notifier_service_pb2_grpc

 

class TagNotifierServicer(tag_notifier_service_pb2_grpc.TagNotifierServicer):

 

def OnNewTags(self, request_iterator, context):

# rpc OnNewTags(stream TagEvent) returns(google.protobuf.Empty);

for tag_event in request_iterator:

print("\n\n")

print("Tag event at:", datetime.datetime.utcfromtimestamp(tag_event.timestamp.seconds).strftime('%Y-%m-%d %H:%M:%S'),

"| Operation:", tag_event.operation,

"| Monitoring point:", tag_event.monitoringPoint,

"| From:", context.peer())

for tag in tag_event.tags:

print(tag)

return google_dot_protobuf_dot_empty__pb2.Empty()

 

def serve(address):

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

tag_notifier_service_pb2_grpc.add_TagNotifierServicer_to_server(TagNotifierServicer(), server)

with open('./certs/product_facade_grpc_server.crt', 'rt') as f:

root_crt = f.read()

with open('./certs/client.key', 'rt') as f:

key = f.read()

with open('./certs/client.crt', 'rt') as f:

chain = f.read()

 

server_credentials = grpc.ssl_server_credentials(

private_key_certificate_chain_pairs=[(key, chain,)],

root_certificates=root_crt,

require_client_auth=True

)

server.start()

 

SECONDS_IN_24H = 60 * 60 * 24

try:

while True:

time.sleep(SECONDS_IN_24H)

except KeyboardInterrupt:

print("Stopping...")

server.stop(0)

 

if __name__ == '__main__':

address = '[::]:50051'

print("Serving on", address)

serve(address)