Example (Python):
###################################################
# General part
###################################################
import time
import datetime
import requests
import jwt
# ---- Connection settings ----
# Base URL of the MDR REST API; every endpoint below is built from it.
API_URL = "https://mdr.kaspersky.com/api/v1"
# Path to the certificate file handed to `requests` as the `verify` argument.
# The certificate is required for authentication of the external resource;
# download it from https://mdr.kaspersky.com, save it to disk and put the
# path to the saved file here.
VERIFY_CERT_PATH = "C:\\tools\\DigiCert Global Root G2.crt"

# ---- Credentials ----
# How to obtain the client ID and both tokens is described in the help:
# https://support.kaspersky.com/MDR/en-US/258285.htm
# All three values below are placeholders - replace them with your own.
CLIENT_ID = "9ed43ed54sAmpleIdf349323951f"
ACCESS_TOKEN = "AcCeSsToKeN"
REFRESH_TOKEN = "ReFrEsHToKeN"
###################################################
# Get access token and a refresh token for the next access token update
###################################################
# Step 1: the refresh token must be present and not expired, otherwise a new
# access token cannot be requested and the script stops.
# Failures end with `raise SystemExit(1)` instead of `exit()`: the process then
# reports a non-zero exit status and does not depend on the interactive helper
# that the `site` module injects.
if not REFRESH_TOKEN:
    print(
        " You should fill REFRESH_TOKEN value. Please take it from MDR Console (https://support.kaspersky.com/MDR/en-US/258285.htm). "
    )
    raise SystemExit(1)

# The signature is deliberately not verified: the token is decoded only to
# read its "exp" claim, which is compared with the current epoch time.
refresh_token_exp = jwt.decode(REFRESH_TOKEN, options={"verify_signature": False}).get("exp")
print(f"REFRESH_TOKEN expiration date and time: {datetime.datetime.fromtimestamp(refresh_token_exp)}")
if refresh_token_exp > time.time():
    print("REFRESH_TOKEN is actual ")
else:
    print(
        " You should update REFRESH_TOKEN . Please take it from MDR Console (https://support.kaspersky.com/MDR/en-US/258285.htm). "
    )
    raise SystemExit(1)

# Step 2: check the presence and validity of the access token.
# A missing or expired access token has to be replaced.
need_update_access_token = True
if ACCESS_TOKEN:
    access_token_exp = jwt.decode(ACCESS_TOKEN, options={"verify_signature": False}).get("exp")
    print(f"ACCESS_TOKEN expiration date and time: {datetime.datetime.fromtimestamp(access_token_exp)}")
    if access_token_exp > time.time():
        print("ACCESS_TOKEN is actual ")
        need_update_access_token = False

# Step 3: if necessary, exchange the refresh token for a new access token and
# a new refresh token (the latter is needed for the next update).
access_token = ACCESS_TOKEN
if need_update_access_token:
    request_body = {"refresh_token": REFRESH_TOKEN}
    result = requests.post(
        url=f"{API_URL}/{CLIENT_ID}/session/confirm",
        json=request_body,
        verify=VERIFY_CERT_PATH,
        timeout=30,  # Seconds; without a timeout the call may block indefinitely
    )
    result_json = result.json()
    if "error" in result_json:
        print(result_json)
        raise SystemExit(1)
    # It is necessary to save the refresh token in order to obtain the next
    # access token after the expiration of the current access token
    refresh_token = result_json["refresh_token"]
    print(
        f'!!! Your new REFRESH_TOKEN for the next time for request ACCESS_TOKEN (please replace value of REFRESH_TOKEN with this value) : "{refresh_token}"'
    )
    # A new access token is required to retrieve the data
    access_token = result_json["access_token"]
    print(f'!!! Your new ACCESS_TOKEN (please replace value of ACCESS_TOKEN with this value): "{access_token}"')

# The access token is added to the header of every following request
headers = {"Authorization": f"Bearer {access_token}"}
###################################################
# Get the number of assets
###################################################
# Request the number of assets whose last appearance lies in a time window.
# The date and time are in milliseconds since 1970-01-01T00:00:00Z
request_body = {
    # Upper bound of the last appearance of the asset: the current time
    "max_last_seen": int(time.time()) * 1000,
    # Lower bound of the last appearance of the asset: the constant
    # Sunday, December 12, 2021 12:12:12 PM (GMT)
    "min_last_seen": 1639311132000,
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/assets/count",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
###################################################
# Get the list of assets
###################################################
# Request one page of assets that match the search parameters below.
request_body = {
    # --- Search parameters ---
    # Upper bound of the last appearance of the asset: the current time
    # (milliseconds since 1970-01-01T00:00:00Z)
    "max_last_seen": int(time.time()) * 1000,
    # Lower bound of the last appearance of the asset: the constant
    # Sunday, December 12, 2021 12:12:12 PM (GMT)
    "min_last_seen": 1639311132000,
    "domain": "",
    # (Paste your value) Host names list
    "host_names": ["MA-MDR-KES-S", "SIN-MDR-KSC"],
    "is_isolated": False,
    "network_interface": "10.70.104.1",
    # The asset must contain the specified line in the name of the OS
    "os_version": "Windows",
    "product": "",
    # Phrase to search by field contents: "host_name", "domain",
    # "installed_product_info", "network_interfaces", "os_version"
    "search_phrase": "mdr",
    # Search for assets with the current statuses listed here
    "statuses": ["OK", "ABSENT"],
    # --- Options for displaying search results ---
    # Sort results by time of first occurrence. For page-by-page retrieval,
    # sort by a field that does not change from query to query, for example
    # "first_seen". Do not sort by fields whose values change constantly,
    # for example "last_seen"; this may lead to incorrect results.
    "sort": "first_seen:asc",
    # Assets per page - 100
    "page_size": 100,
    # Get the first page of search results
    "page": 1,
    # Solution version
    "version": 2,
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/assets/list",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
###################################################
# Get asset details
###################################################
# Request the details of a single asset identified by its ID.
request_body = {
    # (Paste your value) Asset ID
    "asset_id": "0xFA6A68CC9A9415963DE841048A3BE929",
    # Solution version
    "version": 2,
}
# Unlike the other requests in this script, `result` holds the parsed JSON
# body here rather than the response object.
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/assets/details",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
).json()
print(result)
###################################################
# Get the number of incidents
###################################################
# Request the number of incidents updated in a time window for given hosts.
request_body = {
    # Upper bound of the last incident update: the current time
    # (milliseconds since 1970-01-01T00:00:00Z)
    "max_update_time": int(time.time()) * 1000,
    # Lower bound of the last incident update: the constant
    # Sunday, December 12, 2021 12:12:12 PM (GMT)
    "min_update_time": 1639311132000,
    # (Paste your value) Host list in "host_name:asset_id" format
    "affected_hosts": ["MA-MDR-KES-S:0xFA6A68CC9A94145456E841048A3BE929"],
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/incidents/count",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
###################################################
# Get the list of incidents
###################################################
# Request one page of incidents that match the search parameters below.
request_body = {
    # --- Search parameters ---
    # Upper bound of the incident creation time: the current time
    # (milliseconds since 1970-01-01T00:00:00Z)
    "max_creation_time": int(time.time()) * 1000,
    # Lower bound of the incident creation time: the constant
    # Sunday, December 12, 2021 12:12:12 PM (GMT)
    "min_creation_time": 1639311132000,
    # (Paste your value) List of assets for which we get the incidents
    "asset_ids": ["0xFA6A68CC9A9415963DE841048A3BE929"],
    "priorities": ["HIGH"],
    "resolutions": ["True positive"],
    "response_statuses": ["Confirmed"],
    "response_types": ["hash"],
    "statuses": ["Closed"],
    # --- Parameters for providing results ---
    # Results in HTML format. If the value is "False", the results are in
    # Markdown format.
    "markdown_to_html": True,
    # Sort results by date and time of incident creation. For page-by-page
    # retrieval, sort by a field that does not change from query to query,
    # for example "creation_time". Do not sort by fields whose values change
    # constantly, for example "update_time"; this may lead to incorrect
    # results.
    "sort": "creation_time:asc",
    # Incidents per page - 100
    "page_size": 100,
    # Get the first page of search results
    "page": 1,
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/incidents/list",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
###################################################
# Get incident details
###################################################
# Request the details of a single incident identified by its ID.
request_body = {
    # (Paste your value) Incident ID
    "incident_id": "60gWG4UBMUGN-LWUuv1m",
    # Results in HTML format. If the value is "False", the results are in
    # Markdown format.
    "markdown_to_html": True,
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/incidents/details",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
###################################################
# Get a list of responses for the incident
###################################################
# Request one page of the responses that belong to an incident.
request_body = {
    # (Paste your value) Incident ID
    "incident_id": "60gWG4UBMUGN-LWUuv1m",
    # Responses per page - 10
    "page_size": 10,
    # Get the first page of search results
    "page": 1,
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/responses/list",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
###################################################
# Confirm the response
###################################################
# Set the status of a single response to "Confirmed" and attach a comment.
request_body = {
    # (Paste your value) Response ID
    "response_id": "CEgYG4UBMUGN-LWULP7W",
    # Comment to be added to the response
    "comment": "comment_text",
    # New response status - "Confirmed"
    "status": "Confirmed",
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/response/update",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
###################################################
# Decline the response
###################################################
# Set the status of a single response to "Declined" and attach a comment.
request_body = {
    # (Paste your value) Response ID
    "response_id": "CEgYG4UBMUGN-LWULP7W",
    # Comment to be added to the response
    "comment": "comment_text",
    # New response status - "Declined"
    "status": "Declined",
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/response/update",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
###################################################
# Update response list
###################################################
# Set the status of several responses at once and attach a comment to them.
request_body = {
    # (Paste your values) Response IDs
    "responses_ids": [
        "CEgYG4UBMUGN-LWULP7W",
        "2ESl6IgB4cAOUyXBb5IB",
    ],
    # Comment to be added to the responses
    "comment": "comment_text",
    # New responses status - "Confirmed"
    "status": "Confirmed",
}
result = requests.post(
    url=f"{API_URL}/{CLIENT_ID}/responses/update",
    json=request_body,
    headers=headers,
    verify=VERIFY_CERT_PATH,
    timeout=30,  # Seconds; without a timeout the call may block indefinitely
)
print(result.json())
|