Zum Hauptinhalt springen
Version: 1.0.0

Code-Beispiele

Produktionsbereite Code-Beispiele für die Integration mit der Cert-IX Scan-API. Jedes Beispiel enthält Wiederholungslogik, Fehlerbehandlung und Webhook-Signaturverifizierung.

cURL-Beispiele

Scan einreichen

curl -X POST https://api.cert-ix.com/scan-api/api/v1/scans \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"target": "example.com",
"scanType": "nmap",
"name": "Netzwerk-Audit",
"priority": "normal",
"options": {
"ports": "1-1024",
"timing": "T3",
"serviceDetection": true
}
}'

Status prüfen

curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"

Ergebnisse abrufen

curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID/results?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"

Webhook erstellen

curl -X POST https://api.cert-ix.com/scan-api/api/v1/webhooks \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "Scan-Benachrichtigungen",
"url": "https://ihr-server.example.com/webhooks/certix",
"events": ["scan.completed", "scan.failed"]
}'

Scan-Vorlage erstellen

curl -X POST https://api.cert-ix.com/scan-api/api/v1/scan-templates \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "CI Container-Scan",
"scanType": "trivy",
"priority": "high",
"options": {
"scanners": ["vuln", "misconfig", "secret"],
"severity": ["CRITICAL", "HIGH"]
}
}'

Scan aus Vorlage starten

curl -X POST "https://api.cert-ix.com/scan-api/api/v1/scan-templates/$TEMPLATE_ID/launch" \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{"target": "registry.example.com/meineapp:latest"}'

Python-Client

Installation

pip install requests

Vollständige Client-Klasse

"""
Cert-IX Scan-API Python-Client
Produktionsbereit mit Wiederholungslogik, Fehlerbehandlung und Webhook-Verifizierung.
"""

import hashlib
import hmac
import os
import time
from typing import Any, Dict, List, Optional

import requests


class CertIXScanClient:
"""Cert-IX Scan-API Client mit integrierter Wiederholungslogik und Fehlerbehandlung."""

def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None):
self.api_key = api_key or os.environ.get("CERTIX_API_KEY")
if not self.api_key:
raise ValueError("API-Schlüssel erforderlich. Setzen Sie CERTIX_API_KEY oder übergeben Sie api_key.")
self.base_url = (base_url or os.environ.get("CERTIX_BASE_URL")
or "https://api.cert-ix.com/scan-api/api/v1")
self.session = requests.Session()
self.session.headers.update({"X-API-Key": self.api_key, "Content-Type": "application/json"})
self.max_retries = 3

def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
"""HTTP-Anfrage mit automatischer Wiederholung bei transienten Fehlern ausführen."""
url = f"{self.base_url}{path}"
for attempt in range(self.max_retries):
response = self.session.request(method, url, **kwargs)
if response.status_code < 400:
return response.json()
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
time.sleep(retry_after)
continue
if response.status_code >= 500:
time.sleep(2 ** attempt)
continue
data = response.json()
raise CertIXAPIError(response.status_code, data.get("code", "UNKNOWN"), data.get("error", "Unbekannter Fehler"))
raise CertIXAPIError(429, "RATE_LIMIT_EXCEEDED", "Maximale Anzahl an Versuchen überschritten")

def create_scan(self, target: str, scan_type: str, name: Optional[str] = None,
priority: str = "normal", options: Optional[Dict] = None,
tags: Optional[List[str]] = None) -> Dict[str, Any]:
"""Neue Scan-Anfrage einreichen."""
payload = {"target": target, "scanType": scan_type, "priority": priority}
if name: payload["name"] = name
if options: payload["options"] = options
if tags: payload["tags"] = tags
return self._request("POST", "/scans", json=payload)["data"]

def get_scan(self, scan_id: str, scan_type: str = "nmap") -> Dict[str, Any]:
"""Scan-Status und -Details abrufen."""
return self._request("GET", f"/scans/{scan_id}", params={"scanType": scan_type})["data"]

def get_scan_results(self, scan_id: str, scan_type: str = "nmap") -> Any:
"""Ergebnisse abrufen (Scan muss abgeschlossen sein)."""
return self._request("GET", f"/scans/{scan_id}/results", params={"scanType": scan_type})["data"]

def wait_for_scan(self, scan_id: str, scan_type: str = "nmap", timeout: int = 3600) -> Dict[str, Any]:
"""Abfragen bis der Scan einen Terminalzustand erreicht."""
start = time.time()
terminal = {"completed", "failed", "cancelled"}
while time.time() - start < timeout:
scan = self.get_scan(scan_id, scan_type)
if scan["status"] in terminal:
return scan
elapsed = time.time() - start
interval = 5 if elapsed < 120 else 15 if elapsed < 600 else 30
time.sleep(interval)
raise TimeoutError(f"Scan {scan_id} wurde nicht innerhalb von {timeout}s abgeschlossen")

@staticmethod
def verify_webhook_signature(body: bytes, signature_header: str, secret: str) -> bool:
"""HMAC-SHA256-Webhook-Signatur verifizieren."""
if not signature_header.startswith("sha256="):
return False
received = signature_header[7:]
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(received, expected)


class CertIXAPIError(Exception):
def __init__(self, status_code: int, code: str, message: str):
self.status_code = status_code
self.code = code
self.message = message
super().__init__(f"[{status_code}] {code}: {message}")

Verwendungsbeispiel

client = CertIXScanClient()

# Scan einreichen und auf Ergebnisse warten
scan = client.create_scan(target="example.com", scan_type="nmap",
name="Python SDK Test", options={"ports": "1-1024", "timing": "T3"})
print(f"Scan erstellt: {scan['id']}")

final = client.wait_for_scan(scan["id"], "nmap")
if final["status"] == "completed":
results = client.get_scan_results(scan["id"], "nmap")
print(f"Ergebnisse: {results}")

Go-Client

package certix

import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
)

// Client ist der Cert-IX Scan-API Client.
type Client struct {
apiKey string
baseURL string
httpClient *http.Client
maxRetries int
}

// NewClient erstellt einen neuen Cert-IX API Client.
func NewClient(apiKey, baseURL string) (*Client, error) {
if apiKey == "" {
apiKey = os.Getenv("CERTIX_API_KEY")
}
if apiKey == "" {
return nil, fmt.Errorf("API-Schlüssel erforderlich: setzen Sie CERTIX_API_KEY")
}
if baseURL == "" {
baseURL = "https://api.cert-ix.com/scan-api/api/v1"
}
return &Client{
apiKey: apiKey, baseURL: strings.TrimRight(baseURL, "/"),
httpClient: &http.Client{Timeout: 30 * time.Second}, maxRetries: 3,
}, nil
}

// ScanRequest ist der Anfragekörper zum Erstellen eines Scans.
type ScanRequest struct {
Target string `json:"target"`
ScanType string `json:"scanType"`
Name string `json:"name,omitempty"`
Priority string `json:"priority,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
}

// ScanResponse ist die Scan-Status-Antwort.
type ScanResponse struct {
ID string `json:"id"`
ScanType string `json:"scanType"`
Status string `json:"status"`
Progress int `json:"progress"`
Error string `json:"error,omitempty"`
}

// WaitForScan fragt ab bis der Scan einen Terminalzustand erreicht.
func (c *Client) WaitForScan(scanID, scanType string, timeout time.Duration) (*ScanResponse, error) {
start := time.Now()
terminal := map[string]bool{"completed": true, "failed": true, "cancelled": true}
for time.Since(start) < timeout {
scan, err := c.GetScan(scanID, scanType)
if err != nil { return nil, err }
if terminal[scan.Status] { return scan, nil }
elapsed := time.Since(start)
var interval time.Duration
switch {
case elapsed < 2*time.Minute: interval = 5 * time.Second
case elapsed < 10*time.Minute: interval = 15 * time.Second
default: interval = 30 * time.Second
}
time.Sleep(interval)
}
return nil, fmt.Errorf("scan %s wurde nicht innerhalb von %v abgeschlossen", scanID, timeout)
}

// VerifyWebhookSignature validiert die HMAC-SHA256-Signatur eines Webhooks.
func VerifyWebhookSignature(body []byte, signatureHeader, secret string) bool {
if !strings.HasPrefix(signatureHeader, "sha256=") { return false }
received := signatureHeader[7:]
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(body)
expected := hex.EncodeToString(mac.Sum(nil))
return hmac.Equal([]byte(received), []byte(expected))
}

Node.js-Client

const crypto = require("crypto");

class CertIXScanClient {
constructor(apiKey, baseUrl) {
this.apiKey = apiKey || process.env.CERTIX_API_KEY;
if (!this.apiKey) throw new Error("API-Schlüssel erforderlich. Setzen Sie CERTIX_API_KEY.");
this.baseUrl = baseUrl || "https://api.cert-ix.com/scan-api/api/v1";
this.maxRetries = 3;
}

async _request(method, path, { body, params } = {}) {
let url = `${this.baseUrl}${path}`;
if (params) url += `?${new URLSearchParams(params).toString()}`;
for (let attempt = 0; attempt < this.maxRetries; attempt++) {
const options = {
method,
headers: { "X-API-Key": this.apiKey, "Content-Type": "application/json" },
};
if (body) options.body = JSON.stringify(body);
const response = await fetch(url, options);
if (response.status < 400) return response.json();
if (response.status === 429) {
const retryAfter = parseInt(response.headers.get("Retry-After") || 2 ** attempt, 10);
await new Promise((r) => setTimeout(r, retryAfter * 1000));
continue;
}
if (response.status >= 500) {
await new Promise((r) => setTimeout(r, 2 ** attempt * 1000));
continue;
}
const data = await response.json();
throw new Error(`[${response.status}] ${data.code}: ${data.error}`);
}
throw new Error("[429] RATE_LIMIT_EXCEEDED: Maximale Anzahl an Versuchen überschritten");
}

async createScan({ target, scanType, name, priority = "normal", options, tags }) {
const payload = { target, scanType, priority };
if (name) payload.name = name;
if (options) payload.options = options;
if (tags) payload.tags = tags;
const result = await this._request("POST", "/scans", { body: payload });
return result.data;
}

async getScan(scanId, scanType = "nmap") {
const result = await this._request("GET", `/scans/${scanId}`, { params: { scanType } });
return result.data;
}

async waitForScan(scanId, scanType = "nmap", timeoutMs = 3600000) {
const start = Date.now();
const terminal = new Set(["completed", "failed", "cancelled"]);
while (Date.now() - start < timeoutMs) {
const scan = await this.getScan(scanId, scanType);
if (terminal.has(scan.status)) return scan;
const elapsed = Date.now() - start;
const interval = elapsed < 120_000 ? 5_000 : elapsed < 600_000 ? 15_000 : 30_000;
await new Promise((r) => setTimeout(r, interval));
}
throw new Error(`Scan ${scanId} wurde nicht innerhalb von ${timeoutMs}ms abgeschlossen`);
}

static verifyWebhookSignature(body, signatureHeader, secret) {
if (!signatureHeader.startsWith("sha256=")) return false;
const received = signatureHeader.slice(7);
const expected = crypto.createHmac("sha256", secret).update(body).digest("hex");
return crypto.timingSafeEqual(Buffer.from(received), Buffer.from(expected));
}
}

module.exports = { CertIXScanClient };

CI/CD-Integration — GitHub Actions

name: Sicherheitsscan
on:
push:
branches: [main]

jobs:
vulnerability-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Container-Image bauen
run: docker build -t meineapp:${{ github.sha }} .

- name: Cert-IX Scan starten
env:
CERTIX_API_KEY: ${{ secrets.CERTIX_API_KEY }}
run: |
SCAN_RESPONSE=$(curl -s -X POST "https://api.cert-ix.com/scan-api/api/v1/scans" \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d "{
\"target\": \"ghcr.io/${{ github.repository }}:${{ github.sha }}\",
\"scanType\": \"trivy\",
\"name\": \"CI-Scan - ${{ github.sha }}\",
\"priority\": \"high\",
\"options\": {
\"scanners\": [\"vuln\", \"misconfig\", \"secret\"],
\"severity\": [\"CRITICAL\", \"HIGH\"]
}
}")
echo "Antwort: $SCAN_RESPONSE"
SCAN_ID=$(echo "$SCAN_RESPONSE" | jq -r '.data.id')
echo "SCAN_ID=$SCAN_ID" >> $GITHUB_ENV

- name: Auf Ergebnisse warten
env:
CERTIX_API_KEY: ${{ secrets.CERTIX_API_KEY }}
run: |
for i in $(seq 1 60); do
STATUS=$(curl -s "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID?scanType=trivy" \
-H "X-API-Key: $CERTIX_API_KEY" | jq -r '.data.status')
echo "Status: $STATUS"
[ "$STATUS" = "completed" ] && break
[ "$STATUS" = "failed" ] && exit 1
sleep 10
done

Nächste Schritte: