Handling tag change events
This section explains how to implement a service that handles the tag change events sent by Kaspersky Industrial CyberSecurity for Networks.
Overview
When you subscribe to tag change events, you must specify an address where Kaspersky Industrial CyberSecurity for Networks has to send tag change events. To handle these messages, you must implement the OnNewTags
method of the TagNotifier
service and run the service. The service must listen for incoming messages on the specified address and port.
The TagNotifier
service is defined in the tag_notifier_service.proto
file. Messages and enumerations are defined in the tag_notifier_service.proto
file.
The TagNotifier
service has the following methods for handling messages with tag change events:
OnNewTags
This method is called when a message with tag change event is received.
Handling tag change events (example)
The following is an example of a service that handles tag change events. It has an implementation of the OnNewTags
method of the TagNotifier
service, which prints the received tag change events.
For more information about implementing gRPC services, see the gRPC documentation.
from __future__ import print_function import datetime import time from concurrent import futures import grpc from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 import tag_notifier_service_pb2 import tag_notifier_service_pb2_grpc
class TagNotifierServicer(tag_notifier_service_pb2_grpc.TagNotifierServicer):
def OnNewTags(self, request_iterator, context): # rpc OnNewTags(stream TagEvent) returns(google.protobuf.Empty); for tag_event in request_iterator: print("\n\n") print("Tag event at:", datetime.datetime.utcfromtimestamp(tag_event.timestamp.seconds).strftime('%Y-%m-%d %H:%M:%S'), "| Operation:", tag_event.operation, "| Monitoring point:", tag_event.monitoringPoint, "| From:", context.peer()) for tag in tag_event.tags: print(tag) return google_dot_protobuf_dot_empty__pb2.Empty()
def serve(address): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) tag_notifier_service_pb2_grpc.add_TagNotifierServicer_to_server(TagNotifierServicer(), server) with open('./certs/product_facade_grpc_server.crt', 'rt') as f: root_crt = f.read() with open('./certs/client.key', 'rt') as f: key = f.read() with open('./certs/client.crt', 'rt') as f: chain = f.read()
server_credentials = grpc.ssl_server_credentials( private_key_certificate_chain_pairs=[(key, chain,)], root_certificates=root_crt, require_client_auth=True ) server.start()
SECONDS_IN_24H = 60 * 60 * 24 try: while True: time.sleep(SECONDS_IN_24H) except KeyboardInterrupt: print("Stopping...") server.stop(0)
if __name__ == '__main__': address = '[::]:50051' print("Serving on", address) serve(address)
|