tis-wazuh-plugin-import-from-wazuh
10-1
Plugin for importing Wazuh breaches into WAPT audit data
906 downloads
See build result See VirusTotal scan
Description
- package : tis-wazuh-plugin-import-from-wazuh
- name : Plugin Import Wazuh
- version : 10-1
- categories :
- maintainer : Amélie LE JEUNE,Tranquil IT
- installed_size :
- editor :
- licence :
- signature_date : 2023-10-17T11:04:56.496599
- size : 24.27 Ko
- locale :
- target_os : all
- impacted_process :
- architecture : all
control
package : tis-wazuh-plugin-import-from-wazuh
version : 10-1
architecture : all
section : base
priority : optional
name : Plugin Import Wazuh
categories :
maintainer : Amélie LE JEUNE,Tranquil IT
description : Plugin for importing wazuh breaches in wapt data audit
depends :
conflicts :
maturity : PROD
locale :
target_os : all
min_wapt_version :
sources :
installed_size :
impacted_process :
description_fr :
description_pl :
description_de :
description_es :
description_pt :
description_it :
description_nl :
description_ru :
audit_schedule : 1h
editor :
keywords :
licence :
homepage :
package_uuid : 2f93d7cc-af97-4da6-8e76-86f90944c90a
valid_from :
valid_until :
forced_install_on :
changelog :
min_os_version :
max_os_version :
icon_sha256sum : ab03226aa75b0f33fcb7fe327fa164535e5410d0b5cbe02fdd5e124a017f9cb9
signer : Tranquil IT
signer_fingerprint: 8c5127a75392be9cc9afd0dbae1222a673072c308c14d88ab246e23832e8c6bb
signature : P3oyFFHePz7jIrZ9VivERVITqJqq2QDt3dC9D9Sh2DdE0IPQbC6Jg6FiiR7gW177MCKkDxC4N8PWpRWeGeey34KQDAqkhR/x58PLag+eOw1Wl0Y2VeZxPGjmRc8ofNKoSikjabXxj96SUPkTpwcYLxQ/WMgFUuNlBF5pzl31lCO0mXah0FuQ/AcE8eiVAcv+2pQMjzJ8SUedSyiwdhmq+33azHvsm5An7KYyCIgtfJAVDNGBGPGIyVCgCMY5sscX6wG/ETG0KWHz55usR8d3h6rzhnzpuW5lhEQzfgBCNPNaOLXiKxn+d8QTugJiMck6++yuZioM0Na7TtCgQg8fiA==
signature_date : 2023-10-17T11:04:56.496599
signed_attributes : package,version,architecture,section,priority,name,categories,maintainer,description,depends,conflicts,maturity,locale,target_os,min_wapt_version,sources,installed_size,impacted_process,description_fr,description_pl,description_de,description_es,description_pt,description_it,description_nl,description_ru,audit_schedule,editor,keywords,licence,homepage,package_uuid,valid_from,valid_until,forced_install_on,changelog,min_os_version,max_os_version,icon_sha256sum,signer,signer_fingerprint,signature_date,signed_attributes
Setup.py
# -*- coding: utf-8 -*-
from setuphelpers import *
from configparser import ConfigParser
import json
import requests
import urllib3
from base64 import b64encode
import time
import datetime
# Name of the ini file holding the Wazuh API settings (section "wazuh");
# it is copied to and read from WAPT.private_dir.
plugin_inifile = "wazuh_api.ini"
# File names of the two HTML templates copied into the WAPT "templates"
# directory at install time and removed at uninstall time.
wazuh_template_severities_html = "host_audit_wazuh_wazuh_severities.html"
wazuh_template_sca_html = "host_audit_wazuh_wazuh_sca.html"
def list_to_dict(newkey=None, listconvert=None):
    """Index a list of mappings by the value of one of their fields.

    Args:
        newkey: Key looked up in every item; the value found becomes the
            key of that item in the returned dict.
        listconvert: Iterable of mappings. ``None`` (the default) is treated
            as an empty iterable.

    Returns:
        A dict mapping ``item[newkey]`` to ``item``. When several items share
        the same value, the last one wins.

    Raises:
        KeyError: If an item has no ``newkey`` entry.
    """
    if listconvert is None:
        return {}
    return {item[newkey]: item for item in listconvert}
# def sort_dict_by_cvss3_score(vul_dict):
# sorted_dict = {}
# for k, v in sorted(vul_dict.items(), key=lambda x: x[1]["cvss3_score"])[::-1]:
# # print("%s: %s" % (k, v))
# sorted_dict[k] = v
# return sorted_dict
def install():
    """Deploy the plugin's ini files and HTML templates on the host.

    The two ini files are copied into ``WAPT.private_dir`` only when no file
    of that name is present there yet, which leaves an existing file
    untouched. The two HTML templates are always copied into the
    ``templates`` directory under ``WAPT.wapt_base_dir``.
    """
    private_dir = WAPT.private_dir
    for ini_name in (plugin_inifile, "wapt_api.ini"):
        ini_target = makepath(private_dir, ini_name)
        if not isfile(ini_target):
            filecopyto(ini_name, ini_target)

    templates_dir = makepath(WAPT.wapt_base_dir, "templates")
    for template_name in (wazuh_template_severities_html, wazuh_template_sca_html):
        filecopyto(template_name, makepath(templates_dir, template_name))
def _wazuh_severities_report(hostname, vulnerabilities, wazuh_url):
    """Build the "wazuh_severities" audit value for one host from its Wazuh vulnerability items."""
    all_cves = list(vulnerabilities)
    valid_cves = [cve for cve in all_cves if cve["status"] == "VALID"]

    # A dict is used as an insertion-ordered set of affected package names.
    affected = {}
    for cve in valid_cves:
        title = cve["title"]
        _, separator, package_name = title.partition("affects ")
        # Fall back to the whole title when it has no "affects " marker, so a
        # single unexpected title does not discard the host's entire report.
        affected[package_name if separator else title] = None

    high_or_critical = [cve for cve in valid_cves if cve["severity"] in ("Critical", "High")]
    return {
        "hostname": hostname,
        "cve_announcements_count": len(all_cves),
        "list_patch": list(affected),
        "status": "server_vulnerable" if all_cves else "server_compliant",
        "wazuh_url": wazuh_url,
        "cve_announcements": list_to_dict(newkey="cve", listconvert=valid_cves),
        "security_issues_count": len(list_to_dict(newkey="cve", listconvert=high_or_critical)),
    }


def _wazuh_sca_report(hostname, checks, wazuh_url):
    """Build the "wazuh_sca" audit value for one host from its Wazuh SCA check items."""
    all_checks = list(checks)
    failed_checks = [check for check in all_checks if check["result"] == "failed"]
    return {
        "hostname": hostname,
        # NOTE(review): the count and the status below are based on every
        # returned check, not only the failed ones - confirm this is intended.
        "improvement_advice_count": len(all_checks),
        "status": "server_vulnerable" if all_checks else "server_compliant",
        "list_advice": [check["title"] for check in failed_checks],
        "wazuh_url": wazuh_url,
        "detail_advices": list_to_dict(newkey="id", listconvert=failed_checks),
    }


def _wazuh_audit_record(host_id, value_key, value):
    """Wrap a per-host report in the record structure posted to api/v3/update_hosts_audit_data."""
    now = datetime.datetime.utcnow()
    return {
        "host_id": host_id,
        "value_id": int(time.monotonic() * 1000),
        "value_date": datetime2isodate(now),
        "value_section": "wazuh",
        "value_key": value_key,
        "value": value,
        # The record expires 24 hours (1440 minutes) after its value date.
        "expiration_date": datetime2isodate(now + datetime.timedelta(minutes=1440)),
    }


def audit():
    """Import Wazuh vulnerability and SCA results into the WAPT hosts audit data.

    Reads the WAPT credentials from ``wapt_api.ini`` and the Wazuh settings
    from the plugin ini file (both in ``WAPT.private_dir``), logs in to the
    WAPT server, fetches the per-agent vulnerabilities and SCA checks from
    Wazuh and, for every Wazuh agent whose name matches a WAPT host (full
    FQDN or short host name, case-insensitive), posts one audit record per
    data set to ``api/v3/update_hosts_audit_data``.

    Returns:
        "OK" when every matched host was processed without an exception,
        "ERROR" otherwise (the collected error lines are printed first).
    """
    conf_wapt = ConfigParser()
    conf_wapt.read(makepath(WAPT.private_dir, "wapt_api.ini"))
    username_wapt = conf_wapt.get("wapt", "wapt_username")
    password_wapt = conf_wapt.get("wapt", "wapt_password")
    WAPT.waptserver.post("api/v3/login", data=json.dumps({"user": username_wapt, "password": password_wapt}))

    conf_wazuh = ConfigParser()
    conf_wazuh.read(makepath(WAPT.private_dir, plugin_inifile))
    wazuh_url = conf_wazuh.get("wazuh", "wazuh_url")
    client = CWAZUHApi(
        wazuh_url,
        conf_wazuh.get("wazuh", "wazuh_user"),
        conf_wazuh.get("wazuh", "wazuh_password"),
        verify_ssl=False,
    )

    # Map both the FQDN and the short host name of every WAPT host to its uuid.
    dict_hostname_uuid = {}
    for pc in WAPT.waptserver.get("api/v3/hosts?&limit=1000000")["result"]:
        fqdn = pc["computer_fqdn"].lower()
        dict_hostname_uuid[fqdn] = pc["uuid"]
        dict_hostname_uuid[fqdn.split(".")[0]] = pc["uuid"]

    list_error = []
    # Both data sets are fetched up front, then pushed host by host.
    data_sets = (
        ("wazuh_severities", client.get_severities(), _wazuh_severities_report),
        ("wazuh_sca", client.get_sca(), _wazuh_sca_report),
    )
    for value_key, items_by_agent, build_report in data_sets:
        for entry, items in items_by_agent.items():
            hostname = entry.lower()
            if hostname not in dict_hostname_uuid:
                continue
            try:
                record = _wazuh_audit_record(
                    dict_hostname_uuid[hostname],
                    value_key,
                    build_report(entry, items, wazuh_url),
                )
                print("Get %s from wazuh" % hostname)
                # NOTE(review): tiny pause kept as-is; presumably meant to keep
                # successive monotonic-based value_id values distinct - confirm.
                time.sleep(0.0000001)
                # Only this host's record is sent: posting an ever-growing list
                # would re-send every earlier host on each iteration.
                print(WAPT.waptserver.post("api/v3/update_hosts_audit_data", data=json.dumps([record])))
            except Exception as e:
                # Per-host boundary: record the failure and carry on with the next host.
                list_error.append("Error %s %s" % (hostname, str(e)))

    if list_error:
        print(" \n".join(list_error))
        return "ERROR"
    return "OK"
def uninstall():
    """Remove the files deployed by the plugin.

    Deletes the two ini files from ``WAPT.private_dir`` and the two HTML
    templates from the ``templates`` directory under ``WAPT.wapt_base_dir``.
    """
    templates_dir = makepath(WAPT.wapt_base_dir, "templates")
    deployed_files = (
        makepath(WAPT.private_dir, plugin_inifile),
        makepath(WAPT.private_dir, "wapt_api.ini"),
        makepath(templates_dir, wazuh_template_severities_html),
        makepath(templates_dir, wazuh_template_sca_html),
    )
    for deployed_file in deployed_files:
        remove_file(deployed_file)
class CWAZUHApi:
    """Class used to communicate with the WAZUH API.

    The constructor authenticates with HTTP basic auth against
    ``security/user/authenticate`` and keeps the returned token, which is then
    sent as a bearer token by the query methods.
    """

    def __init__(self, api_url=None, api_user=None, api_password=None, verify_ssl=False, port=55000, protocol="https"):
        """Log in to the Wazuh API and store the session token.

        Args:
            api_url: Host name or address of the Wazuh API (no scheme, no port).
            api_user: Wazuh API user name.
            api_password: Wazuh API password.
            verify_ssl: Passed as ``verify`` to every ``requests`` call.
            port: TCP port of the Wazuh API.
            protocol: URL scheme used to reach the API.

        Raises:
            requests.HTTPError: If the login request returns an HTTP error status.
            KeyError: If the login response carries no ``data.token``.
        """
        self.verify_ssl = verify_ssl
        self.api_url = api_url
        self.api_user = api_user
        self.api_password = api_password
        self.protocol = protocol
        self.port = port

        if not verify_ssl:
            # Disable insecure https warnings (for self-signed SSL certificates)
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        basic_auth = f"{api_user}:{api_password}".encode()
        login_headers = {"Content-Type": "application/json", "Authorization": f"Basic {b64encode(basic_auth).decode()}"}
        response = requests.post(self._url("security/user/authenticate"), headers=login_headers, verify=self.verify_ssl)
        # Surface a failed login as an HTTP error rather than a KeyError or
        # JSON decoding error raised while parsing the response below.
        response.raise_for_status()
        self.token = json.loads(response.content.decode())["data"]["token"]

    def _url(self, endpoint):
        """Return the absolute URL of ``endpoint`` on the configured API."""
        return f"{self.protocol}://{self.api_url}:{self.port}/{endpoint}"

    def _bearer_headers(self):
        """Return the request headers carrying the token obtained at login."""
        return {"Content-Type": "application/json", "Authorization": f"Bearer {self.token}"}

    def _get_affected_items(self, endpoint):
        """GET ``endpoint`` and return the ``data.affected_items`` list of the JSON response."""
        # NOTE(review): only one response page is read here; confirm whether the
        # API's default page size is large enough for this deployment.
        response = requests.get(self._url(f"{endpoint}?pretty=true"), headers=self._bearer_headers(), verify=self.verify_ssl)
        return json.loads(response.text)["data"]["affected_items"]

    def _agent_names(self):
        """Return a dict mapping each agent id to its name, as listed by ``agents``."""
        return {agent["id"]: agent["name"] for agent in self._get_affected_items("agents")}

    def get_severities(self):
        """Return a dict mapping each agent name to the items of ``vulnerability/<agent id>``."""
        return {
            agent_name: self._get_affected_items(f"vulnerability/{agent_id}")
            for agent_id, agent_name in self._agent_names().items()
        }

    def get_sca(self):
        """Return a dict mapping each agent name to its SCA check items.

        For every agent, the checks of the first policy listed by
        ``sca/<agent id>`` are fetched; an agent without any policy maps to an
        empty list.
        """
        dict_agent_info = {}
        for agent_id, agent_name in self._agent_names().items():
            policies = self._get_affected_items(f"sca/{agent_id}")
            if policies:
                checks = self._get_affected_items(f"sca/{agent_id}/checks/{policies[0]['policy_id']}")
            else:
                checks = []
            dict_agent_info[agent_name] = checks
        return dict_agent_info
79cf3242228b9c171eb4670a9a415937bc065a74b663e2d61768e28494b4fe20 : setup.py
06b2f22ea52ae6175a63b6668f5cd9c311710d91952bf479ee52d8025f9c6eea : wapt_api.ini
e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 : update_package.py
a7e8856533f1301e7fa83dec82b70c0e595dadaef91601275505729fad2b5d86 : host_audit_wazuh_wazuh_severities.html
93760f97adba047cf5eaee726956e770518791435c44d4434f0c1e18ce4eb66a : wazuh_api.ini
ab03226aa75b0f33fcb7fe327fa164535e5410d0b5cbe02fdd5e124a017f9cb9 : WAPT/icon.png
a5a97261381e1d0ad46ee15916abec9c2631d0201f5cc50ceb0197a165a0bbbf : WAPT/certificate.crt
e214a9848569c5d18582a4a4d65aa433ce59733667b4be4ce6ad4d93bc8c3147 : luti.json
41c956f320ffd902ad317e755fc30dcf3bf3fa42522801bc9cc906150bee28a1 : host_audit_wazuh_wazuh_sca.html
50516201131f68b266f764dd834e90c6444c55e50cb54ebcc5d0f8e23c8a9cc9 : WAPT/control