Working with Kaspersky Endpoint Detection and Response Optimum using Kaspersky Security Center OpenAPI

March 5, 2024

ID 231480

When Kaspersky Endpoint Detection and Response Optimum is deployed locally using Kaspersky Security Center Web Console, you can configure automation and customize some operating scenarios and tasks using Kaspersky Security Center OpenAPI.

For more information about working with OpenAPI, refer to the Kaspersky Security Center Help.

The example below shows a script that isolates a computer from the network using Kaspersky Security Center OpenAPI.

Script example:

#!/usr/bin/python -tt

# -*- coding: utf-8 -*-

 

import sys

import os

import argparse

import socket

import time

import getpass

import urllib3

from sys import platform

from urllib.parse import urlparse

from KlAkOAPI.Params import KlAkParams, KlAkArray, paramParams, strToBin

from KlAkOAPI.AdmServer import KlAkAdmServer

from KlAkOAPI.Error import KlAkError, KlAkResponseError

from KlAkOAPI.CgwHelper import KlAkCgwHelper

from KlAkOAPI.GatewayConnection import KlAkGatewayConnection   

from KlAkOAPI.HostGroup import KlAkHostGroup

from KlAkOAPI.ChunkAccessor import KlAkChunkAccessor

from KlAkOAPI.Tasks import KlAkTasks

from KlAkOAPI.HostTasks import KlAkHostTasks

from KlAkOAPI.NagHstCtl import KlAkNagHstCtl

 

# For basic authentication, either specify the '-user' and '-password' arguments or use the following default credentials: KSCServerUserAccountDefault / KSCServerUserPasswordDefault

# Create an internal user with these credentials in advance in Kaspersky Security Center by using Kaspersky Security Center Web Console or the MMC-based Administration Console, and grant this user the required privileges, such as the 'Main Administrator' role.

# For Windows platform, NTLM authentication is applied when '-user' and '-password' arguments are omitted.

# Default account name and password for basic authentication.
# NOTE(review): these are sample values stored in plain text; replace them
# with your own account and keep real passwords out of source files.
KSCServerUserAccountDefault = 'klakoapi_test'

KSCServerUserPasswordDefault = 'testpassword'

 

 

def GetHostNameByHostFQDN(server, strHostFQDN):
    """Find the internal host name of a device by its FQDN.

    The returned value (the KLHST_WKS_HOSTNAME field of the found device) is
    required for a gateway connection to Network Agent.

    Args:
        server: Administration Server connection object used for the search.
        strHostFQDN: FQDN of the device to look up.

    Returns:
        The KLHST_WKS_HOSTNAME value of the first device found.

    Raises:
        KlAkError: if the search returns no devices.
    """
    hostGroup = KlAkHostGroup(server)
    hostInfo = hostGroup.FindHosts(
        '(KLHST_WKS_FQDN="' + strHostFQDN + '")',
        ['KLHST_WKS_HOSTNAME', 'KLHST_WKS_DN', 'name'],
        [],
        {'KLGRP_FIND_FROM_CUR_VS_ONLY': True},
        100,
    )
    strAccessor = hostInfo.OutPar('strAccessor')

    # get search result (in case of ambiguity first found host is taken)
    # NOTE(review): the server-side result set behind strAccessor is never
    # released here - check whether ChunkAccessor offers a release call.
    chunkAccessor = KlAkChunkAccessor(server)
    items_count = chunkAccessor.GetItemsCount(strAccessor).RetVal()
    if items_count < 1:
        raise KlAkError('no gateway host found by name ' + strHostFQDN)

    # Only the first item of the result set is requested.
    res_chunk = chunkAccessor.GetItemsChunk(strAccessor, 0, 1)
    res_array = KlAkParams(res_chunk.OutPar('pChunk'))['KLCSP_ITERATOR_ARRAY']
    res_host = res_array[0]
    wsHostName = res_host['KLHST_WKS_HOSTNAME']

    print('Host for Network Agent gateway connection is:', strHostFQDN, ' that corresponds to device', res_host['KLHST_WKS_DN'], 'in group', res_host['name'])

    return wsHostName

   

def GetServer(server_url):
    """Connect to Kaspersky Security Center Administration Server.

    On Windows ('win32') no user name or password is passed, so NTLM
    authentication is used by default. On any other platform basic
    authentication is used with the module-level default credentials
    KSCServerUserAccountDefault / KSCServerUserPasswordDefault; that user
    must be created on the Administration Server in advance.

    Args:
        server_url: URL of the Administration Server, for example
            'https://ksc.example.com:13299'.

    Returns:
        The object returned by KlAkAdmServer.Create().
    """
    # Connection to Administration Server installed on the current device, use the default port
    if platform == "win32":
        username = None  # for Windows, use NTLM by default
        password = None
    else:
        # for other platforms, use basic authentication with the default
        # credentials declared at the top of the script
        username = KSCServerUserAccountDefault
        password = KSCServerUserPasswordDefault

    # Path to the Administration Server certificate used to verify the
    # connection.
    # NOTE(review): this is a Windows path; adjust it when the script runs
    # on another platform.
    SSLVerifyCert = 'C:\\ProgramData\\KasperskyLab\\adminkit\\1093\\cert\\klserver.cer'

    # create Administration Server object
    server = KlAkAdmServer.Create(server_url, username, password, verify=SSLVerifyCert)

    return server

    

def PrepareNagentGatewayConnection(server_main, server_parent_on_hierarchy=None, wsHostName='', arrLocation=None):
    """Prepare a token for a gateway connection to Network Agent.

    See the 'Creating gateway connections' section in the Kaspersky Security
    Center OpenAPI documentation.

    Args:
        server_main: Administration Server connection on which the gateway
            connection is prepared.
        server_parent_on_hierarchy: Administration Server used to obtain the
            Network Agent location; defaults to server_main.
        wsHostName: internal host name of the device running Network Agent.
        arrLocation: optional list of locations. When a list is passed, it is
            updated in place with the Network Agent location, so it can be
            used to build a chain of locations down the hierarchy. When
            omitted, a new list is created for this call.

    Returns:
        The 'wstrAuthKey' token to use for the gateway connection.

    Raises:
        Exception: if wsHostName is empty.
    """
    if wsHostName == '':
        raise Exception('no hosts found on Administration Server, nothing to demonstrate as Network Agent gateway connection')

    if server_parent_on_hierarchy is None:
        server_parent_on_hierarchy = server_main

    # A fresh list per call: a list literal as the default value would be
    # shared between calls and would accumulate locations from earlier calls.
    if arrLocation is None:
        arrLocation = []

    # step 1: get Network Agent location
    cgwHelper = KlAkCgwHelper(server_parent_on_hierarchy)
    nagentLocation = cgwHelper.GetNagentLocation(wsHostName).RetVal()

    # step 2: build locations list
    arrLocation.append(paramParams(nagentLocation))

    # step 3: prepare gateway connection to main server with locations array built on previous step
    gatewayConnection = KlAkGatewayConnection(server_main)
    response = gatewayConnection.PrepareGatewayConnection(arrLocation)
    token_on_nagent = response.OutPar('wstrAuthKey')

    # use token for further gateway connection to Network Agent
    return token_on_nagent

 

def ConnectNagentGatewayAuth(server_url, gw_token):
    """Connect to Network Agent by using a gateway connection.

    The token for the gateway connection is prepared with
    PrepareNagentGatewayConnection(...).

    Args:
        server_url: URL of the main Administration Server.
        gw_token: gateway authentication token.

    Returns:
        The object returned by KlAkAdmServer.CreateGateway(); the connection
        result is only reported on stdout, so check its 'connected' attribute
        before use.
    """
    print ('Main Kaspersky Security Center Administration Server address:', server_url)

    # NOTE(review): the third argument presumably disables SSL certificate
    # verification for the gateway connection - confirm against the
    # KlAkAdmServer.CreateGateway documentation.
    server = KlAkAdmServer.CreateGateway(server_url, gw_token, False)

    if server.connected:
        print ('Network Agent is connected successfully.')
    else:
        print ('Failed to connect Network Agent.')

    return server

 

def CreteGatewayToHost(server, server_url, hostId):
    """Open a gateway connection to Network Agent on the given host.

    Args:
        server: Administration Server connection used to prepare the token.
        server_url: URL of the main Administration Server.
        hostId: internal host name of the target device.

    Returns:
        The connection object returned by ConnectNagentGatewayAuth().
    """
    print ('-- Prepare Network Agent gateway connection --')

    # The token authenticates the gateway connection to Network Agent on the
    # device identified by hostId.
    gw_token = PrepareNagentGatewayConnection(server, wsHostName=hostId)
    return ConnectNagentGatewayAuth(server_url, gw_token)

   

def FindHostTask(server, hostId, strDisplayName):
    """Find a task of a host by the task display name.

    Iterates over the tasks of the host and stops at the first task whose
    'DisplayName' equals strDisplayName.

    Args:
        server: Administration Server connection object.
        hostId: internal host name of the device whose tasks are searched.
        strDisplayName: display name of the task to find.

    Returns:
        The task data of the matching task. If no task matches, the last
        value returned by the iterator (None or an empty container) is
        returned, so callers must check the result before using it.
    """
    # strDisplayName is the task name, not the host name.
    print('Searching for task', strDisplayName, 'on host', hostId)

    oHostGroup = KlAkHostGroup(server)
    strSrvObjId = oHostGroup.GetHostTasks(hostId).RetVal()

    oHostTasks = KlAkHostTasks(server)
    oHostTasks.ResetTasksIterator(strSrvObjId, '', '', '', '', '')

    while True:
        pTaskData = oHostTasks.GetNextTask(strSrvObjId).OutPar('pTaskData')

        # No (more) task data: the iteration is over, nothing was found.
        if pTaskData is None or len(pTaskData) == 0:
            return pTaskData

        strDN = pTaskData['TASK_INFO_PARAMS']['DisplayName']
        if strDN == strDisplayName:
            print('Task ' + strDN + ' found')
            return pTaskData

   

def StartHostTask(nagHstCtl, taskTsId, strProductName, strProductVersion):
    """Send task action 5 for the given task through the host controller.

    Args:
        nagHstCtl: object providing SendTaskAction().
        taskTsId: identifier of the task in the task storage.
        strProductName: name of the product that owns the task.
        strProductVersion: version of the product that owns the task.
    """
    # NOTE(review): 5 is presumably the 'start task' action code - confirm
    # against the NagHstCtl.SendTaskAction documentation.
    action_code = 5
    nagHstCtl.SendTaskAction(strProductName, strProductVersion, taskTsId, action_code)

   

def WaitTaskCompletion(nagHstCtl, taskTsId, strProductName, strProductVersion, max_polls=20, poll_interval=1):
    """Poll the host runtime information until the task is no longer running.

    The task is considered complete when its 'taskState' differs from 1 or
    when it is no longer listed in the runtime information. Polling stops
    silently after max_polls attempts even if the task is still running.

    Args:
        nagHstCtl: object providing GetHostRuntimeInfo().
        taskTsId: task storage identifier of the task to wait for.
        strProductName: name of the product that owns the task.
        strProductVersion: not used by this function; kept so that the
            signature matches StartHostTask().
        max_polls: maximum number of polling attempts.
        poll_interval: delay between attempts, in seconds.
    """
    pFilter = KlAkParams({})
    pFilter.AddParams('klhst-rt-TskInfo', {'klhst-ProductName': strProductName})

    for _ in range(max_polls):
        info = nagHstCtl.GetHostRuntimeInfo(pFilter).RetVal()['klhst-rt-TskInfo'][strProductName]

        bFoundInfo = False
        for taskinfo in KlAkArray(info['klhst-rt-TskArray']):
            if taskinfo['taskStorageId'] == taskTsId:
                print('Current task state:', taskinfo['taskState'])
                bFoundInfo = True

                if taskinfo['taskState'] != 1:  # Not running
                    print('Task is complete')
                    # Stop polling right away. The former loop condition
                    # ('n < 20 or bDone') kept looping forever at this point.
                    return

        if not bFoundInfo:
            print('Task is complete')
            return

        time.sleep(poll_interval)

 

def main():
    """Isolate a device from the network by running its isolation task.

    Connects to the Administration Server on the current device, finds the
    target host by FQDN, looks up the 'Network isolation' task of that host,
    opens a gateway connection to Network Agent on the host, starts the task
    and waits for it to complete.

    Raises:
        KlAkError: if the host or the isolation task is not found.
    """
    server_address = socket.getfqdn()
    server_port = 13299
    server_url = 'https://' + server_address + ':' + str(server_port)

    # names of tasks related to isolation. Can be found in device properties in Kaspersky Security Center Web Console.
    strIsolate = 'Network isolation'
    strUnblock = 'Termination of network isolation'
    hostaddress = 'some-host.lab.local'

    # Connecting to Kaspersky Security Center
    server = GetServer(server_url)

    # Set hostaddress to None above to target the current device.
    if hostaddress is None:
        hostaddress = socket.getfqdn()

    hostId = GetHostNameByHostFQDN(server, hostaddress)

    pTaskData = FindHostTask(server, hostId, strIsolate)
    # to remove isolation run
    # pTaskData = FindHostTask(server, hostId, strUnblock)

    # FindHostTask returns None or an empty container when nothing matches;
    # fail with a clear message instead of an error on the lookups below.
    if pTaskData is None or len(pTaskData) == 0:
        raise KlAkError('task "' + strIsolate + '" not found for host ' + hostaddress)

    taskTsId = pTaskData['TASK_UNIQUE_ID']
    strProductName = pTaskData['TASKID_PRODUCT_NAME']
    strProductVersion = pTaskData['TASKID_VERSION']

    nagent = CreteGatewayToHost(server, server_url, hostId)
    nagHstCtl = KlAkNagHstCtl(nagent)

    StartHostTask(nagHstCtl, taskTsId, strProductName, strProductVersion)
    WaitTaskCompletion(nagHstCtl, taskTsId, strProductName, strProductVersion)

 

# Run the example only when the file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()

Did you find this article helpful?
What can we do better?
Thank you for your feedback! You're helping us improve.
Thank you for your feedback! You're helping us improve.