Kaspersky Endpoint Detection and Response Optimum

Kaspersky Endpoint Detection and Response Optimum im Zusammenspiel mit Kaspersky Security Center OpenAPI ausführen

2. Mai 2024

ID 231480

Wenn Kaspersky Endpoint Detection and Response Optimum mittels Kaspersky Security Center Web Console lokal bereitgestellt wird, können Sie die Automatisierung konfigurieren und einige Betriebsszenarien und Aufgaben mithilfe der Kaspersky Security Center OpenAPI anpassen.

Weitere Informationen zur Verwendung von OpenAPI finden Sie in der Hilfe von Kaspersky Security Center.

Das folgende Beispiel zeigt ein Skript, das einen Computer mittels der OpenAPI von Kaspersky Security Center vom Netzwerk trennt.


# #!/usr/bin/python -tt

# -*- coding: utf-8 -*-


import sys

import os

import argparse

import socket

import time

import getpass

import urllib3

from sys import platform

from urllib.parse import urlparse

from KlAkOAPI.Params import KlAkParams, KlAkArray, paramParams, strToBin

from KlAkOAPI.AdmServer import KlAkAdmServer

from KlAkOAPI.Error import KlAkError, KlAkResponseError

from KlAkOAPI.CgwHelper import KlAkCgwHelper

from KlAkOAPI.GatewayConnection import KlAkGatewayConnection   

from KlAkOAPI.HostGroup import KlAkHostGroup

from KlAkOAPI.ChunkAccessor import KlAkChunkAccessor

from KlAkOAPI.Tasks import KlAkTasks

from KlAkOAPI.HostTasks import KlAkHostTasks

from KlAkOAPI.NagHstCtl import KlAkNagHstCtl


# For basic authentication, you should either state '-user' and '-password' arguments or the following credentials must be applied: KSCServerUserAccountDefault / KSCServerUserPasswordDefault

# You should create an internal user with these credentials in advance in Kaspersky Security Center by using Kaspersky Security Center Web Console or MMC-based Administration Console and grant this user required privileges, such as 'Main Administrator' role.

# For Windows platform, NTLM authentication is applied when '-user' and '-password' arguments are omitted.

KSCServerUserAccountDefault = 'klakoapi_test'

KSCServerUserPasswordDefault = 'testpassword'



def GetHostNameByHostFQDN(server, strHostFQDN):

""" Find (internal) host name by host display name; the returned wsHostName is required for gateway connection to Network Agent """

    hostGroup = KlAkHostGroup(server)

    hostInfo = hostGroup.FindHosts('(KLHST_WKS_FQDN="' + strHostFQDN + '")', ['KLHST_WKS_HOSTNAME', 'KLHST_WKS_DN', 'name'], [], {'KLGRP_FIND_FROM_CUR_VS_ONLY': True}, 100)

    strAccessor = hostInfo.OutPar('strAccessor')


# get search result (in case of ambiguity first found host is taken)

    chunkAccessor = KlAkChunkAccessor (server)

   items_count = chunkAccessor.GetItemsCount(strAccessor).RetVal()

    if items_count < 1:

        raise KlAkError('no gateway host found by name ' + strHostFQDN)

    res_chunk = chunkAccessor.GetItemsChunk(strAccessor, 0, 1)

    res_array = KlAkParams(res_chunk.OutPar('pChunk'))['KLCSP_ITERATOR_ARRAY']

    res_host = res_array[0]

    wsHostName = res_host['KLHST_WKS_HOSTNAME']


    print('Host for Network Agent gateway connection is:', strHostFQDN, ' that corresponds to device', res_host['KLHST_WKS_DN'], 'in group', res_host['name'])


    return wsHostName


def GetServer(server_url):

"""Connection to Kaspersky Security Center Administration Server"""

# Connection to Administration Server installed on the current device, use the default port

if platform == "win32":

username = None # for Windows, use NTLM by default

        password = None


        username = 'klakoapi_test' # for other platform, use basic authentication, user should be created on Kaspersky Security Center Administration Server in advance

        password = 'testpassword'


    SSLVerifyCert = 'C:\\ProgramData\\KasperskyLab\\adminkit\\1093\\cert\\klserver.cer'


    # create Administration Server object

    server = KlAkAdmServer.Create(server_url, username, password, verify = SSLVerifyCert)


    return server  


def PrepareNagentGatewayConnection(server_main, server_parent_on_hierarchy = None, wsHostName = '', arrLocation = []):

""" Prepares token for gateway connection to Network Agent; see 'Creating gateway connections' section in Kaspersky Security Center OpenAPI documentation

arrLocation is updated with Network Agent location, can be used in creating chain of locations down by hierarchy """

if wsHostName == '':

raise Exception('no hosts found on Administration Server, nothing to demonstrate as Network Agent gateway connection') 


    if server_parent_on_hierarchy == None:

        server_parent_on_hierarchy = server_main


# step 1: get Network Agent location   

    cgwHelper = KlAkCgwHelper(server_parent_on_hierarchy)

    nagentLocation = cgwHelper.GetNagentLocation(wsHostName).RetVal()   


    # step 2: build locations list



    # step 3: prepare gateway connection to main server with locations array built on previous step

    gatewayConnection = KlAkGatewayConnection(server_main)

    response = gatewayConnection.PrepareGatewayConnection(arrLocation)

    token_on_nagent = response.OutPar('wstrAuthKey')


    # use token for further gateway connection to Network Agent

    return token_on_nagent 


def ConnectNagentGatewayAuth(server_url, gw_token):

""" Connect Network Agent by using gateway connection.

Token for gateway connection is prepared with PrepareNagentGatewayConnection(...) """

    print ('Main Kaspersky Security Center Administration Server address:', server_url)


    server = KlAkAdmServer.CreateGateway(server_url, gw_token, False)


    if server.connected:

        print ('Network Agent is connected successfully.')


        print ('Failed to connect Network Agent.')


    return server


def CreteGatewayToHost(server, server_url, hostId):


    # prepare token for gateway authentication

    print ('-- Prepare Network Agent gateway connection --')

    token = PrepareNagentGatewayConnection(server, wsHostName = hostId) # prepare token to connect to Network Agent on the current device; or use the device FQDN here

    server = ConnectNagentGatewayAuth(server_url, token)


    return server


def FindHostTask(server, hostId, strDisplayName):

    print('Searching for a task for the host', strDisplayName)


    oHostGroup = KlAkHostGroup(server)

    strSrvObjId = oHostGroup.GetHostTasks(hostId).RetVal()


    oHostTasks = KlAkHostTasks(server)


    oHostTasks.ResetTasksIterator(strSrvObjId, '', '', '', '', '')

    pTaskData = None

    while True:

        pTaskData = oHostTasks.GetNextTask(strSrvObjId).OutPar('pTaskData')


        if pTaskData == None or len(pTaskData) == 0:



        strDN = pTaskData['TASK_INFO_PARAMS']['DisplayName']

        if strDN == strDisplayName:

            print('Task ' + strDN + ' found')



    return pTaskData


def StartHostTask(nagHstCtl, taskTsId, strProductName, strProductVersion):

    nagHstCtl.SendTaskAction(strProductName, strProductVersion, taskTsId, 5)


def WaitTaskCompletion(nagHstCtl, taskTsId, strProductName, strProductVersion):   

    pFilter = KlAkParams({})

    pFilter.AddParams('klhst-rt-TskInfo', {'klhst-ProductName':strProductName})


    n = 0

    bDone = False

    while n < 20 or bDone:

        info = nagHstCtl.GetHostRuntimeInfo(pFilter).RetVal()['klhst-rt-TskInfo'][strProductName]


        bFoundInfo = False

        for taskinfo in KlAkArray(info['klhst-rt-TskArray']):

            if taskinfo['taskStorageId'] == taskTsId:

                print('Current task state:', taskinfo['taskState'])


                bFoundInfo = True


                if taskinfo['taskState'] != 1: # Not running

                    print('Task is complete')

                    bDone = True



        if not bFoundInfo:

            print('Task is complete')




        n += 1


def main():


    server_address = socket.getfqdn()

    server_port = 13299

    server_url = 'https://' + server_address + ':' + str(server_port)


    # names of tasks related to isolation. Can be found in device properties in Kaspersky Security Center Web Console.

    strIsolate = 'Network isolation'

    strUnblock = 'Termination of network isolation'

    hostaddress = 'some-host.lab.local'


    #Connecting to Kaspersky Security Center

    server = GetServer(server_url)


    if hostaddress is None:

        hostaddress = socket.getfqdn()


    hostId = GetHostNameByHostFQDN(server, hostaddress)


    pTaskData = FindHostTask(server, hostId, strIsolate)

    # to remove isolation run

    # pTaskData = FindHostTask(server, hostId, strUnblock)


    taskTsId = pTaskData['TASK_UNIQUE_ID']

    strProductName = pTaskData['TASKID_PRODUCT_NAME']

    strProductVersion = pTaskData['TASKID_VERSION']


    nagent = CreteGatewayToHost(server, server_url, hostId)

    nagHstCtl = KlAkNagHstCtl(nagent)


    StartHostTask(nagHstCtl, taskTsId, strProductName, strProductVersion)

    WaitTaskCompletion(nagHstCtl, taskTsId, strProductName, strProductVersion)   


if __name__ == '__main__':


