Hello everyone,
I’m developping an API to parse Greenbone reports, extracts metrics and expose them on an endpoint. Prometheus then scrapes the metrics and generate dashboards.
The authentication works, i’m getting all the reports, and i can generate an XMLTree and then with xpath extratc the metrics. But I’m stucked with this error message :
openvas-metrics | WARNING:openvas_gvmd_exporter:Session invalide ou erreur : Invalid State
openvas-metrics | ERROR:openvas_gvmd_exporter:Error fetching/parsing reports
Here’s the code :
#!/usr/bin/env python3
import os
import time
import logging
from typing import Optional
from lxml import etree
from lxml.etree import \_Element as Element
from gvm.connections import UnixSocketConnection
from gvm.protocols.gmp import GMPv227 as GMP
from prometheus_client import start_http_server, REGISTRY
from prometheus_client.core import GaugeMetricFamily
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(“openvas_gvmd_exporter”)
USER = os.environ.get(“OPENVAS_USER”, “xxxxxxxxxxxxxxx”)
PASS = os.environ.get(“OPENVAS_PASS”, “xxxxxxxxxxxxxxxx”)
GVMD_SOCKET = os.environ.get(“GVMD_SOCKET”, “/run/gvmd/gvmd.sock”)
PROM_PORT = int(os.environ.get(“OPENVAS_EXPORTER_PORT”, “8000”))
RECONNECT_DELAY = int(os.environ.get(“OPENVAS_RECONNECT_DELAY”, “10”))
MAX_RECONNECT_ATTEMPTS = int(os.environ.get(“OPENVAS_MAX_RECONNECT_ATTEMPTS”, “3”))
class GvmdCollector:
def __ini__(self, socket_path: str, user: str, password: str):
self.socket_path = socket_path
self.user = user
self.password = password
self._connect()
def _connect(self):
conn = UnixSocketConnection(path=self.socket_path)
self.gvmd = GMP(conn)
self.gvmd.authenticate(self.user, self.password)
logger.info("Authenticated to gvmd via %s", self.socket_path)
def _ensure_connected(self):
if self.gvmd is None:
self._connect()
try:
_ = self.gvmd.get_version()
except Exception:
logger.warning("gvmd session invalid, trying to reconnect...")
try:
self._connect()
except Exception:
logger.exception("Reconnection failed during ensure_connected")
self.gvmd = None
def collect(self):
up_metric = GaugeMetricFamily("openvas_up", "Is gvmd reachable (1 = yes, 0 = no)")
reports_metric = GaugeMetricFamily("openvas_reports_total", "Total number of unique reports available from gvmd")
active_hosts_metric = GaugeMetricFamily("openvas_active_hosts_total", "Number of unique active hosts found in reports")
high_vulns_metric = GaugeMetricFamily("openvas_high_vulns_total", "Total number of high or critical vulnerabilities (CVSS >= 7.0)")
hosts_with_high_metric = GaugeMetricFamily("openvas_hosts_with_high_vulns_total", "Number of unique hosts that have at least one high or critical vulnerability")
is_up = 0.0
total_reports = 0
total_hosts = 0
unique_hostnames = []
high_vulns = 0
hosts_with_high = set()
try:
self._ensure_connected()
if self.gvmd is not None:
try:
self._ensure_connected()
reports_xml = self.gvmd.get_reports(details=True, ignore_pagination=True)
if isinstance(reports_xml, (bytes, str)):
root = etree.fromstring(reports_xml)
else:
root = reports_xml
except Exception as e:
logger.warning("Session invalide ou erreur : %s", e)
self._connect()
try:
reports_xml = self.gvmd.get_reports(details=True, ignore_pagination=True)
except Exception as e:
logger.exception("Impossible de recupere les rapports %s", e)
return
#-- Trouve tous les attributs id de tous les éléments <report>
#-- converti en str, supprime les doublons et vides
ids = root.xpath(".//report/@id")
unique_ids = {str(x).strip() for x in ids if x and str(x).strip() != ""}
total_reports = len(unique_ids)
#-- récupère tous les <hostname> du XML, extrait les valeurs texte
#-- elimine les vides, supprime les doublons, les tris, compte les hotes uniques
hostnames = [hn.text.strip() for hn in root.findall('.//hostname') if hn.text and hn.text.strip()]
unique_hostnames = sorted(set(hostnames))
total_hosts = len(unique_hostnames)
# -- trouve tous les noeuds type report, result, nvt
items = root.xpath(".//report_item | .//result | .//nvt")
#-- extrait score criticite vulns
#-- recherche dans les attributs du xml et dans les sous-elements
def get_num(it):
#attributs
for attr in ("severity", "cvss_base", "cvss", "risk"):
v = it.get(attr)
if v:
try:
val = float(v)
logger.debug("get_num trouvé %s=%s => %f", attr, v, val)
#return float(v)
return val
except Exception:
pass
#elements
for p in (".//cvss_base/text()", ".//cvss/text()", ".//severity/text()"):
found = it.xpath(p)
if found:
try:
logger.debug("get_text trouvé %s=%s", attr, v)
return float(str(found[0]))
except Exception:
pass
return None
#-- extrait le niveau textuel de severite
#-- recherche dans les attributs du xml et dans les sous-elements
def get_text(it):
#attributs
for attr in ("severity_text", "risk_factor", "threat"):
v = it.get(attr)
if v:
return str(v).strip().lower()
#elements
for p in (".//risk_factor/text()", ".//threat/text()", ".//severity/text()"):
found = it.xpath(p)
if found:
return str(found[0]).strip().lower()
return None
for it in items:
anc = it.xpath("ancestor::report_host[1]") or []
host = anc[0].get("name") if anc else (it.get("host") or it.get("hostname") or "")
host = (host or "").strip()
num = get_num(it)
txt = get_text(it)
logger.debug("Item pour host %s : num=%s, text=%s", host, num, txt)
is_high = (num is not None and num >= 7.0) or (txt in ("high", "critical"))
logger.debug("is_high = %s", is_high)
if is_high:
high_vulns += 1
if host:
hosts_with_high.add(host)
is_up = 1.0
except Exception:
logger.exception("Error fetching/parsing reports")
logger.info("Found %d unique hosts: %s", total_hosts, unique_hostnames)
up_metric.add_metric([], is_up)
reports_metric.add_metric([], float(total_reports))
active_hosts_metric.add_metric([], float(total_hosts))
high_vulns_metric.add_metric([], float(high_vulns))
hosts_with_high_metric.add_metric([], float(len(hosts_with_high)))
# Yield les métriques à Prometheus
yield up_metric
yield reports_metric
yield active_hosts_metric
yield high_vulns_metric
yield hosts_with_high_metric
def main():
collector = GvmdCollector(socket_path=GVMD_SOCKET, user=USER, password=PASS)
start_http_server(PROM_PORT)
REGISTRY.register(collector)
try:
while True:
time.sleep(600)
except KeyboardInterrupt:
logger.info(“Terminated by user”)
if __name__ == “__main__”:
main()