# Example script:
# #!/usr/bin/python -tt
# -*- coding: utf-8 -*-
import sys
import os
import argparse
import socket
import time
import getpass
import urllib3
from sys import platform
from urllib.parse import urlparse
from KlAkOAPI.Params import KlAkParams, KlAkArray, paramParams, strToBin
from KlAkOAPI.AdmServer import KlAkAdmServer
from KlAkOAPI.Error import KlAkError, KlAkResponseError
from KlAkOAPI.CgwHelper import KlAkCgwHelper
from KlAkOAPI.GatewayConnection import KlAkGatewayConnection
from KlAkOAPI.HostGroup import KlAkHostGroup
from KlAkOAPI.ChunkAccessor import KlAkChunkAccessor
from KlAkOAPI.Tasks import KlAkTasks
from KlAkOAPI.HostTasks import KlAkHostTasks
from KlAkOAPI.NagHstCtl import KlAkNagHstCtl
# For basic authentication, you should either state '-user' and '-password' arguments or the following credentials must be applied: KSCServerUserAccountDefault / KSCServerUserPasswordDefault
# You should create an internal user with these credentials in advance in Kaspersky Security Center by using Kaspersky Security Center Web Console or MMC-based Administration Console and grant this user required privileges, such as 'Main Administrator' role.
# For Windows platform, NTLM authentication is applied when '-user' and '-password' arguments are omitted.
# NOTE(review): no '-user' / '-password' argument parsing is visible in this script, and these two
# constants are not referenced by the visible functions (GetServer repeats the same literals) - confirm.
# These are sample placeholder values; replace them with a real account before use.
KSCServerUserAccountDefault = 'klakoapi_test'
KSCServerUserPasswordDefault = 'testpassword'
def GetHostNameByHostFQDN(server, strHostFQDN):
    """Resolve a device FQDN to its internal Administration Server host name.

    The returned value (the 'KLHST_WKS_HOSTNAME' field) is what the gateway
    connection to Network Agent needs to identify the device.

    Args:
        server: connected Administration Server object passed to the KlAkOAPI wrappers.
        strHostFQDN: FQDN to look up; it is inserted verbatim into the search filter.

    Returns:
        The 'KLHST_WKS_HOSTNAME' value of the first host found.

    Raises:
        KlAkError: if the search yields no hosts.
    """
    host_group = KlAkHostGroup(server)
    search_filter = '(KLHST_WKS_FQDN="' + strHostFQDN + '")'
    wanted_fields = ['KLHST_WKS_HOSTNAME', 'KLHST_WKS_DN', 'name']
    search_options = {'KLGRP_FIND_FROM_CUR_VS_ONLY': True}
    # NOTE(review): the trailing 100 is presumably the result-set lifetime in seconds - confirm against the FindHosts docs.
    accessor_id = host_group.FindHosts(search_filter, wanted_fields, [], search_options, 100).OutPar('strAccessor')

    # Only the first match is read, so an ambiguous FQDN resolves to whichever host comes first.
    accessor = KlAkChunkAccessor(server)
    if accessor.GetItemsCount(accessor_id).RetVal() < 1:
        raise KlAkError('no gateway host found by name ' + strHostFQDN)

    first_chunk = accessor.GetItemsChunk(accessor_id, 0, 1).OutPar('pChunk')
    host = KlAkParams(first_chunk)['KLCSP_ITERATOR_ARRAY'][0]
    internal_name = host['KLHST_WKS_HOSTNAME']
    print('Host for Network Agent gateway connection is:', strHostFQDN, ' that corresponds to device', host['KLHST_WKS_DN'], 'in group', host['name'])
    return internal_name
def GetServer(server_url):
    """Open a connection to Kaspersky Security Center Administration Server.

    Args:
        server_url: URL of the Administration Server, e.g. 'https://host:13299'.

    Returns:
        The object returned by KlAkAdmServer.Create.
    """
    # On Windows no credentials are passed, so NTLM is used by default.
    # Elsewhere basic authentication is used; the account must already exist
    # on the Administration Server.
    on_windows = platform == "win32"
    username = None if on_windows else 'klakoapi_test'
    password = None if on_windows else 'testpassword'

    # NOTE(review): this certificate path is Windows-specific but is passed on
    # every platform - confirm the intended location for non-Windows hosts.
    SSLVerifyCert = 'C:\\ProgramData\\KasperskyLab\\adminkit\\1093\\cert\\klserver.cer'

    return KlAkAdmServer.Create(server_url, username, password, verify = SSLVerifyCert)
def PrepareNagentGatewayConnection(server_main, server_parent_on_hierarchy = None, wsHostName = '', arrLocation = None):
    """Prepare an authentication token for a gateway connection to Network Agent.

    See the 'Creating gateway connections' section in the Kaspersky Security
    Center OpenAPI documentation.

    Args:
        server_main: Administration Server object on which the gateway
            connection is prepared.
        server_parent_on_hierarchy: server used to look up the Network Agent
            location; defaults to server_main when omitted.
        wsHostName: internal host name of the target device.
        arrLocation: optional list of locations. When supplied, it is updated
            in place with the Network Agent location, so a caller can build a
            chain of locations down the hierarchy. When omitted, a fresh list
            is created on every call.

    Returns:
        The 'wstrAuthKey' value to use for the gateway connection.

    Raises:
        Exception: if wsHostName is empty.
    """
    if wsHostName == '':
        raise Exception('no hosts found on Administration Server, nothing to demonstrate as Network Agent gateway connection')
    if server_parent_on_hierarchy is None:
        server_parent_on_hierarchy = server_main
    # A list literal as the default would be shared between calls and keep
    # accumulating locations; create a new one per call instead.
    if arrLocation is None:
        arrLocation = []

    # step 1: get Network Agent location
    cgwHelper = KlAkCgwHelper(server_parent_on_hierarchy)
    nagentLocation = cgwHelper.GetNagentLocation(wsHostName).RetVal()

    # step 2: build locations list
    arrLocation.append(paramParams(nagentLocation))

    # step 3: prepare gateway connection to main server with locations array built on previous step
    gatewayConnection = KlAkGatewayConnection(server_main)
    response = gatewayConnection.PrepareGatewayConnection(arrLocation)

    # the token is used for the subsequent gateway connection to Network Agent
    return response.OutPar('wstrAuthKey')
def ConnectNagentGatewayAuth(server_url, gw_token):
    """Connect to Network Agent through a gateway connection.

    Args:
        server_url: address of the main Administration Server.
        gw_token: token produced by PrepareNagentGatewayConnection(...).

    Returns:
        The object returned by KlAkAdmServer.CreateGateway, whether or not
        the connection succeeded; the outcome is only printed.
    """
    print ('Main Kaspersky Security Center Administration Server address:', server_url)
    # NOTE(review): the third positional argument is False - presumably it turns
    # certificate verification off for the gateway connection; confirm.
    nagent = KlAkAdmServer.CreateGateway(server_url, gw_token, False)
    outcome = 'Network Agent is connected successfully.' if nagent.connected else 'Failed to connect Network Agent.'
    print (outcome)
    return nagent
def CreteGatewayToHost(server, server_url, hostId):
    """Build a gateway connection to Network Agent on the given host.

    Args:
        server: connected Administration Server object.
        server_url: address of the main Administration Server.
        hostId: internal host name of the target device.

    Returns:
        The gateway server object returned by ConnectNagentGatewayAuth.
    """
    print ('-- Prepare Network Agent gateway connection --')
    # Obtain the gateway authentication token for the target device first,
    # then use it to open the connection.
    gateway_token = PrepareNagentGatewayConnection(server, wsHostName = hostId)
    return ConnectNagentGatewayAuth(server_url, gateway_token)
def FindHostTask(server, hostId, strDisplayName):
    """Look up a host task by its display name.

    Args:
        server: connected Administration Server object.
        hostId: internal host name of the device whose tasks are scanned.
        strDisplayName: display name of the task to find.

    Returns:
        The 'pTaskData' of the first task whose display name matches. If the
        iterator runs out first, the last (None or empty) 'pTaskData' value
        is returned instead.
    """
    print('Searching for a task for the host', strDisplayName)
    tasks_storage_id = KlAkHostGroup(server).GetHostTasks(hostId).RetVal()
    host_tasks = KlAkHostTasks(server)
    host_tasks.ResetTasksIterator(tasks_storage_id, '', '', '', '', '')

    while True:
        task_data = host_tasks.GetNextTask(tasks_storage_id).OutPar('pTaskData')
        # None or empty task data marks the end of the iteration.
        if task_data == None or len(task_data) == 0:
            return task_data
        display_name = task_data['TASK_INFO_PARAMS']['DisplayName']
        if display_name == strDisplayName:
            print('Task ' + display_name + ' found')
            return task_data
def StartHostTask(nagHstCtl, taskTsId, strProductName, strProductVersion):
    """Send task action code 5 for the given task through Network Agent.

    Args:
        nagHstCtl: object exposing SendTaskAction (a KlAkNagHstCtl in this script).
        taskTsId: storage identifier of the task.
        strProductName: name of the product that owns the task.
        strProductVersion: version of the product that owns the task.
    """
    # NOTE(review): 5 is presumably the "start" action code, judging by this
    # function's name - confirm against the SendTaskAction documentation.
    action_code = 5
    nagHstCtl.SendTaskAction(strProductName, strProductVersion, taskTsId, action_code)
def WaitTaskCompletion(nagHstCtl, taskTsId, strProductName, strProductVersion):
    """Poll Network Agent until the task is no longer running.

    The host runtime info is queried at most 20 times, one second apart.
    Polling stops as soon as the task reports a state other than 1 (running)
    or is no longer listed in the runtime info. If the task is still running
    after the last poll, the function returns silently.

    Args:
        nagHstCtl: object exposing GetHostRuntimeInfo (a KlAkNagHstCtl in this script).
        taskTsId: storage identifier of the task to watch.
        strProductName: name of the product that owns the task.
        strProductVersion: not used by this function; kept for a signature
            parallel to StartHostTask.
    """
    max_polls = 20
    poll_interval_sec = 1

    pFilter = KlAkParams({})
    pFilter.AddParams('klhst-rt-TskInfo', {'klhst-ProductName':strProductName})

    # The previous loop condition `n < 20 or bDone` kept polling forever once
    # the task was done; here completion ends the wait immediately.
    for _ in range(max_polls):
        info = nagHstCtl.GetHostRuntimeInfo(pFilter).RetVal()['klhst-rt-TskInfo'][strProductName]
        bFoundInfo = False
        for taskinfo in KlAkArray(info['klhst-rt-TskArray']):
            if taskinfo['taskStorageId'] != taskTsId:
                continue
            print('Current task state:', taskinfo['taskState'])
            bFoundInfo = True
            if taskinfo['taskState'] != 1: # Not running
                print('Task is complete')
                return
        if not bFoundInfo:
            # The task is absent from the runtime info; treated as finished.
            print('Task is complete')
            return
        time.sleep(poll_interval_sec)
def main():
    """Run the network isolation task on a device and wait for it to finish.

    Connects to the Administration Server on the current machine, resolves the
    target device, finds the isolation task by display name, opens a gateway
    connection to the device's Network Agent, starts the task and waits for
    its completion.

    Raises:
        RuntimeError: if no task with the requested display name is found.
    """
    server_address = socket.getfqdn()
    server_port = 13299
    server_url = 'https://' + server_address + ':' + str(server_port)

    # Names of tasks related to isolation. Can be found in device properties
    # in Kaspersky Security Center Web Console.
    strIsolate = 'Network isolation'
    strUnblock = 'Termination of network isolation'

    # Set to None to target the current machine instead.
    hostaddress = 'some-host.lab.local'

    # Connecting to Kaspersky Security Center
    server = GetServer(server_url)

    if hostaddress is None:
        hostaddress = socket.getfqdn()
    hostId = GetHostNameByHostFQDN(server, hostaddress)

    # To remove isolation, assign strUnblock here instead of strIsolate.
    strTaskName = strIsolate
    pTaskData = FindHostTask(server, hostId, strTaskName)
    # FindHostTask returns None or empty data when nothing matched; fail with a
    # clear message rather than an opaque error on the lookups below.
    if pTaskData is None or len(pTaskData) == 0:
        raise RuntimeError('task "' + strTaskName + '" not found for host ' + hostaddress)

    taskTsId = pTaskData['TASK_UNIQUE_ID']
    strProductName = pTaskData['TASKID_PRODUCT_NAME']
    strProductVersion = pTaskData['TASKID_VERSION']

    nagent = CreteGatewayToHost(server, server_url, hostId)
    nagHstCtl = KlAkNagHstCtl(nagent)
    StartHostTask(nagHstCtl, taskTsId, strProductName, strProductVersion)
    WaitTaskCompletion(nagHstCtl, taskTsId, strProductName, strProductVersion)


if __name__ == '__main__':
    main()
|