API GMP not working

Hello everyone,

I’m developping an API to parse Greenbone reports, extracts metrics and expose them on an endpoint. Prometheus then scrapes the metrics and generate dashboards.

The authentication works, i’m getting all the reports, and i can generate an XMLTree and then with xpath extratc the metrics. But I’m stucked with this error message :

openvas-metrics | WARNING:openvas_gvmd_exporter:Session invalide ou erreur : Invalid State
openvas-metrics | ERROR:openvas_gvmd_exporter:Error fetching/parsing reports

Here’s the code :

#!/usr/bin/env python3
import os
import time
import logging
from typing import Optional
from lxml import etree
from lxml.etree import \_Element as Element
from gvm.connections import UnixSocketConnection
from gvm.protocols.gmp import GMPv227 as GMP
from prometheus_client import start_http_server, REGISTRY
from prometheus_client.core import GaugeMetricFamily

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(“openvas_gvmd_exporter”)

USER = os.environ.get(“OPENVAS_USER”, “xxxxxxxxxxxxxxx”)
PASS = os.environ.get(“OPENVAS_PASS”, “xxxxxxxxxxxxxxxx”)
GVMD_SOCKET = os.environ.get(“GVMD_SOCKET”, “/run/gvmd/gvmd.sock”)
PROM_PORT = int(os.environ.get(“OPENVAS_EXPORTER_PORT”, “8000”))
RECONNECT_DELAY = int(os.environ.get(“OPENVAS_RECONNECT_DELAY”, “10”))
MAX_RECONNECT_ATTEMPTS = int(os.environ.get(“OPENVAS_MAX_RECONNECT_ATTEMPTS”, “3”))

class GvmdCollector:
  def __ini__(self, socket_path: str, user: str, password: str):
    self.socket_path = socket_path
    self.user = user
    self.password = password
    self._connect()

  def _connect(self): 
    conn = UnixSocketConnection(path=self.socket_path) 
    self.gvmd = GMP(conn) 
    self.gvmd.authenticate(self.user, self.password) 
    logger.info("Authenticated to gvmd via %s", self.socket_path)

  def _ensure_connected(self): 
    if self.gvmd is None: 
        self._connect() 
        try: 
            _ = self.gvmd.get_version() 
        except Exception: 
            logger.warning("gvmd session invalid, trying to reconnect...") 
        try: 
            self._connect() 
        except Exception: 
            logger.exception("Reconnection failed during ensure_connected") 
            self.gvmd = None 

  def collect(self): 
    up_metric = GaugeMetricFamily("openvas_up", "Is gvmd reachable (1 = yes, 0 = no)") 
    reports_metric = GaugeMetricFamily("openvas_reports_total", "Total number of unique reports available from gvmd") 
    active_hosts_metric = GaugeMetricFamily("openvas_active_hosts_total", "Number of unique active hosts found in reports") 
    high_vulns_metric = GaugeMetricFamily("openvas_high_vulns_total", "Total number of high or critical vulnerabilities (CVSS >= 7.0)") 
    hosts_with_high_metric = GaugeMetricFamily("openvas_hosts_with_high_vulns_total", "Number of unique hosts that have at least one high or critical vulnerability") 

    is_up = 0.0 
    total_reports = 0 
    total_hosts = 0 
    unique_hostnames = [] 
    high_vulns = 0 
    hosts_with_high = set() 

    try: 
        self._ensure_connected() 
        if self.gvmd is not None:
            try:
                self._ensure_connected()
                reports_xml = self.gvmd.get_reports(details=True, ignore_pagination=True)
                if isinstance(reports_xml, (bytes, str)): 
                    root = etree.fromstring(reports_xml) 
                else: 
                    root = reports_xml
            except Exception as e:
                logger.warning("Session invalide ou erreur : %s", e)
                self._connect()
                try:
                    reports_xml = self.gvmd.get_reports(details=True, ignore_pagination=True)
                except Exception as e:
                    logger.exception("Impossible de recupere les rapports %s", e)
                    return 
            #-- Trouve tous les attributs id de tous les éléments <report>
            #-- converti en str, supprime les doublons et vides
            ids = root.xpath(".//report/@id") 
            unique_ids = {str(x).strip() for x in ids if x and str(x).strip() != ""} 
            total_reports = len(unique_ids) 
            
            #-- récupère tous les <hostname> du XML, extrait les valeurs texte 
            #-- elimine les vides, supprime les doublons, les tris, compte les hotes uniques
            hostnames = [hn.text.strip() for hn in root.findall('.//hostname') if hn.text and hn.text.strip()] 
            unique_hostnames = sorted(set(hostnames)) 
            total_hosts = len(unique_hostnames) 
            
            # -- trouve tous les noeuds type report, result, nvt 
            items = root.xpath(".//report_item | .//result | .//nvt") 

        #-- extrait score criticite vulns
        #-- recherche dans les attributs du xml et dans les sous-elements
        def get_num(it): 
            #attributs
            for attr in ("severity", "cvss_base", "cvss", "risk"): 
                v = it.get(attr) 
                if v: 
                    try:
                        val = float(v)
                        logger.debug("get_num trouvé %s=%s => %f", attr, v, val)
                        #return float(v) 
                        return val
                    except Exception: 
                        pass 
            #elements
            for p in (".//cvss_base/text()", ".//cvss/text()", ".//severity/text()"): 
                found = it.xpath(p) 
                if found: 
                    try: 
                        logger.debug("get_text trouvé %s=%s", attr, v)
                        return float(str(found[0])) 
                    except Exception: 
                        pass 
            return None 

        #-- extrait le niveau textuel de severite
        #-- recherche dans les attributs du xml et dans les sous-elements
        def get_text(it):
            #attributs
            for attr in ("severity_text", "risk_factor", "threat"): 
                v = it.get(attr) 
                if v: 
                    return str(v).strip().lower() 
            #elements
            for p in (".//risk_factor/text()", ".//threat/text()", ".//severity/text()"): 
                    found = it.xpath(p) 
                    if found: 
                        return str(found[0]).strip().lower() 
            return None 
        
        for it in items: 
            anc = it.xpath("ancestor::report_host[1]") or [] 
            host = anc[0].get("name") if anc else (it.get("host") or it.get("hostname") or "") 
            host = (host or "").strip() 
                
            num = get_num(it) 
            txt = get_text(it) 
            logger.debug("Item pour host %s : num=%s, text=%s", host, num, txt)
            is_high = (num is not None and num >= 7.0) or (txt in ("high", "critical")) 
            logger.debug("is_high = %s", is_high)
                
            if is_high: 
                high_vulns += 1 
                if host: 
                    hosts_with_high.add(host) 
                        
        is_up = 1.0 
            
    except Exception: 
        logger.exception("Error fetching/parsing reports") 

    logger.info("Found %d unique hosts: %s", total_hosts, unique_hostnames) 
    up_metric.add_metric([], is_up) 
    reports_metric.add_metric([], float(total_reports)) 
    active_hosts_metric.add_metric([], float(total_hosts)) 
    high_vulns_metric.add_metric([], float(high_vulns)) 
    hosts_with_high_metric.add_metric([], float(len(hosts_with_high))) 

    # Yield les métriques à Prometheus 
    yield up_metric 
    yield reports_metric 
    yield active_hosts_metric 
    yield high_vulns_metric 
    yield hosts_with_high_metric 

def main():
  collector = GvmdCollector(socket_path=GVMD_SOCKET, user=USER, password=PASS)
  start_http_server(PROM_PORT)
  REGISTRY.register(collector)
  try:
    while True:
      time.sleep(600)
  except KeyboardInterrupt:
    logger.info(“Terminated by user”)

if __name__ == “__main__”:
  main()