Aller au contenu principal
Version: 1.0.0

Exemples de code

Exemples de code prêts pour la production pour l'intégration avec l'API Scan Cert-IX. Chaque exemple inclut une logique de retry, la gestion des erreurs et la vérification de signature des webhooks.

Exemples cURL

Soumettre une analyse

curl -X POST https://api.cert-ix.com/scan-api/api/v1/scans \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"target": "example.com",
"scanType": "nmap",
"name": "Audit réseau",
"priority": "normal",
"options": {
"ports": "1-1024",
"timing": "T3",
"serviceDetection": true
}
}'

Vérifier le statut

curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"

Récupérer les résultats

curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID/results?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"

Lister les analyses avec filtres

curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans?scanType=nmap&status=completed&page=1&limit=10" \
-H "X-API-Key: $CERTIX_API_KEY"

Annuler une analyse

curl -X POST "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID/cancel?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"

Créer un webhook

curl -X POST https://api.cert-ix.com/scan-api/api/v1/webhooks \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "Notifications d analyse",
"url": "https://votre-serveur.example.com/webhooks/certix",
"events": ["scan.completed", "scan.failed"]
}'

Créer un modèle d'analyse

curl -X POST https://api.cert-ix.com/scan-api/api/v1/scan-templates \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "Scan CI Conteneurs",
"scanType": "trivy",
"priority": "high",
"options": {
"scanners": ["vuln", "misconfig", "secret"],
"severity": ["CRITICAL", "HIGH"]
},
"tags": ["ci", "conteneur"]
}'

Lancer une analyse depuis un modèle

curl -X POST "https://api.cert-ix.com/scan-api/api/v1/scan-templates/$TEMPLATE_ID/launch" \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"target": "registry.example.com/monapp:latest"
}'

Client Python

Installation

pip install requests

Classe client complète

"""
Client Python de l'API Scan Cert-IX
Prêt pour la production avec logique de retry, gestion d'erreurs et vérification de webhooks.
"""

import hashlib
import hmac
import os
import time
from typing import Any, Dict, List, Optional

import requests


class CertIXScanClient:
"""Client de l'API Scan Cert-IX avec retry et gestion d'erreurs intégrés."""

def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None):
# Lire la clé API depuis le paramètre ou la variable d'environnement
self.api_key = api_key or os.environ.get("CERTIX_API_KEY")
if not self.api_key:
raise ValueError("Clé API requise. Définissez CERTIX_API_KEY ou passez api_key.")

self.base_url = (
base_url
or os.environ.get("CERTIX_BASE_URL")
or "https://api.cert-ix.com/scan-api/api/v1"
)
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": self.api_key,
"Content-Type": "application/json",
})
# Nombre maximum de tentatives pour les erreurs transitoires
self.max_retries = 3

def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
"""Exécuter une requête HTTP avec retry automatique sur les erreurs transitoires."""
url = f"{self.base_url}{path}"

for attempt in range(self.max_retries):
response = self.session.request(method, url, **kwargs)

# Succès — retourner la réponse parsée
if response.status_code < 400:
return response.json()

# Limité — attendre et réessayer
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
time.sleep(retry_after)
continue

# Erreur serveur — réessayer avec backoff
if response.status_code >= 500:
time.sleep(2 ** attempt)
continue

# Erreur client — ne pas réessayer, lever immédiatement
data = response.json()
raise CertIXAPIError(
status_code=response.status_code,
code=data.get("code", "UNKNOWN"),
message=data.get("error", "Erreur inconnue"),
)

raise CertIXAPIError(429, "RATE_LIMIT_EXCEEDED", "Nombre max de tentatives dépassé")

# ── Opérations d'analyse ────────────────────────────────────────────

def create_scan(self, target: str, scan_type: str, name: Optional[str] = None,
priority: str = "normal", options: Optional[Dict] = None,
tags: Optional[List[str]] = None, timeout: Optional[int] = None) -> Dict[str, Any]:
"""Soumettre une nouvelle requête d'analyse."""
payload = {"target": target, "scanType": scan_type, "priority": priority}
if name: payload["name"] = name
if options: payload["options"] = options
if tags: payload["tags"] = tags
if timeout: payload["timeout"] = timeout
result = self._request("POST", "/scans", json=payload)
return result["data"]

def get_scan(self, scan_id: str, scan_type: str = "nmap") -> Dict[str, Any]:
"""Obtenir le statut et les détails d'une analyse."""
result = self._request("GET", f"/scans/{scan_id}", params={"scanType": scan_type})
return result["data"]

def get_scan_results(self, scan_id: str, scan_type: str = "nmap") -> Any:
"""Récupérer les résultats (l'analyse doit être terminée)."""
result = self._request("GET", f"/scans/{scan_id}/results", params={"scanType": scan_type})
return result["data"]

def cancel_scan(self, scan_id: str, scan_type: str = "nmap") -> Dict[str, Any]:
"""Annuler une analyse en file d'attente ou en cours."""
result = self._request("POST", f"/scans/{scan_id}/cancel", params={"scanType": scan_type})
return result["data"]

def wait_for_scan(self, scan_id: str, scan_type: str = "nmap", timeout: int = 3600) -> Dict[str, Any]:
"""Interroger jusqu'à ce que l'analyse atteigne un statut terminal."""
start = time.time()
terminal = {"completed", "failed", "cancelled"}

while time.time() - start < timeout:
scan = self.get_scan(scan_id, scan_type)
if scan["status"] in terminal:
return scan

# Intervalle de polling adaptatif
elapsed = time.time() - start
if elapsed < 120: interval = 5
elif elapsed < 600: interval = 15
else: interval = 30
time.sleep(interval)

raise TimeoutError(f"L'analyse {scan_id} ne s'est pas terminée dans {timeout}s")

# ── Opérations de modèles ───────────────────────────────────────────

def create_template(self, name: str, scan_type: str, **kwargs) -> Dict[str, Any]:
"""Créer un modèle d'analyse réutilisable."""
payload = {"name": name, "scanType": scan_type}
payload.update(kwargs)
result = self._request("POST", "/scan-templates", json=payload)
return result["data"]

def launch_template(self, template_id: str, target: str, **overrides) -> Dict[str, Any]:
"""Lancer une analyse depuis un modèle sauvegardé."""
payload = {"target": target}
payload.update(overrides)
result = self._request("POST", f"/scan-templates/{template_id}/launch", json=payload)
return result["data"]

# ── Vérification de webhook ─────────────────────────────────────────

@staticmethod
def verify_webhook_signature(body: bytes, signature_header: str, secret: str) -> bool:
"""Vérifier la signature HMAC-SHA256 du webhook."""
if not signature_header.startswith("sha256="):
return False
received = signature_header[7:]
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(received, expected)


class CertIXAPIError(Exception):
"""Exception levée pour les erreurs de l'API Cert-IX."""
def __init__(self, status_code: int, code: str, message: str):
self.status_code = status_code
self.code = code
self.message = message
super().__init__(f"[{status_code}] {code}: {message}")

Exemple d'utilisation

from certix_client import CertIXScanClient

# Initialiser le client (lit CERTIX_API_KEY depuis l'environnement)
client = CertIXScanClient()

# Soumettre une analyse et attendre les résultats
scan = client.create_scan(
target="example.com",
scan_type="nmap",
name="Test SDK Python",
options={"ports": "1-1024", "timing": "T3"},
)
print(f"Analyse créée : {scan['id']}")

# Attendre la complétion
final = client.wait_for_scan(scan["id"], "nmap")
print(f"Statut : {final['status']}")

# Récupérer les résultats
if final["status"] == "completed":
results = client.get_scan_results(scan["id"], "nmap")
print(f"Résultats : {results}")

Client Go

Package client complet

package certix

import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
)

// Client est le client de l'API Scan Cert-IX.
type Client struct {
apiKey string
baseURL string
httpClient *http.Client
maxRetries int
}

// NewClient crée un nouveau client de l'API Cert-IX.
func NewClient(apiKey, baseURL string) (*Client, error) {
if apiKey == "" {
apiKey = os.Getenv("CERTIX_API_KEY")
}
if apiKey == "" {
return nil, fmt.Errorf("Clé API requise : définissez CERTIX_API_KEY ou passez apiKey")
}
if baseURL == "" {
baseURL = "https://api.cert-ix.com/scan-api/api/v1"
}
return &Client{
apiKey: apiKey,
baseURL: strings.TrimRight(baseURL, "/"),
httpClient: &http.Client{Timeout: 30 * time.Second},
maxRetries: 3,
}, nil
}

// ScanRequest est le corps de requête pour créer une analyse.
type ScanRequest struct {
Target string `json:"target"`
ScanType string `json:"scanType"`
Name string `json:"name,omitempty"`
Priority string `json:"priority,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
Tags []string `json:"tags,omitempty"`
}

// ScanResponse est la réponse de statut d'analyse.
type ScanResponse struct {
ID string `json:"id"`
ScanType string `json:"scanType"`
Name string `json:"name"`
Target string `json:"target"`
Status string `json:"status"`
Progress int `json:"progress"`
ResultCount int `json:"resultCount,omitempty"`
DurationMs int64 `json:"durationMs,omitempty"`
Error string `json:"error,omitempty"`
}

// CreateScan soumet une nouvelle requête d'analyse.
func (c *Client) CreateScan(req *ScanRequest) (*ScanResponse, error) {
resp, err := c.doRequest("POST", "/scans", req, nil)
if err != nil {
return nil, err
}
var scan ScanResponse
json.Unmarshal(resp.Data, &scan)
return &scan, nil
}

// GetScan récupère le statut et les détails d'une analyse.
func (c *Client) GetScan(scanID, scanType string) (*ScanResponse, error) {
resp, err := c.doRequest("GET", "/scans/"+scanID, nil, map[string]string{"scanType": scanType})
if err != nil {
return nil, err
}
var scan ScanResponse
json.Unmarshal(resp.Data, &scan)
return &scan, nil
}

// WaitForScan interroge jusqu'à ce que l'analyse atteigne un statut terminal.
func (c *Client) WaitForScan(scanID, scanType string, timeout time.Duration) (*ScanResponse, error) {
start := time.Now()
terminal := map[string]bool{"completed": true, "failed": true, "cancelled": true}

for time.Since(start) < timeout {
scan, err := c.GetScan(scanID, scanType)
if err != nil {
return nil, err
}
if terminal[scan.Status] {
return scan, nil
}
elapsed := time.Since(start)
var interval time.Duration
switch {
case elapsed < 2*time.Minute:
interval = 5 * time.Second
case elapsed < 10*time.Minute:
interval = 15 * time.Second
default:
interval = 30 * time.Second
}
time.Sleep(interval)
}
return nil, fmt.Errorf("l'analyse %s ne s'est pas terminée dans %v", scanID, timeout)
}

// VerifyWebhookSignature valide la signature HMAC-SHA256 d'un webhook.
func VerifyWebhookSignature(body []byte, signatureHeader, secret string) bool {
if !strings.HasPrefix(signatureHeader, "sha256=") {
return false
}
received := signatureHeader[7:]
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(body)
expected := hex.EncodeToString(mac.Sum(nil))
return hmac.Equal([]byte(received), []byte(expected))
}

Client Node.js

Classe client complète

/**
* Client Node.js de l'API Scan Cert-IX
* Prêt pour la production avec logique de retry, gestion d'erreurs et vérification de webhooks.
*/

const crypto = require("crypto");

class CertIXScanClient {
constructor(apiKey, baseUrl) {
this.apiKey = apiKey || process.env.CERTIX_API_KEY;
if (!this.apiKey) throw new Error("Clé API requise. Définissez CERTIX_API_KEY ou passez apiKey.");
this.baseUrl = baseUrl || process.env.CERTIX_BASE_URL || "https://api.cert-ix.com/scan-api/api/v1";
this.maxRetries = 3;
}

async _request(method, path, { body, params } = {}) {
let url = `${this.baseUrl}${path}`;
if (params) url += `?${new URLSearchParams(params).toString()}`;

for (let attempt = 0; attempt < this.maxRetries; attempt++) {
const options = {
method,
headers: { "X-API-Key": this.apiKey, "Content-Type": "application/json" },
};
if (body) options.body = JSON.stringify(body);

const response = await fetch(url, options);

if (response.status < 400) return response.json();

if (response.status === 429) {
const retryAfter = parseInt(response.headers.get("Retry-After") || 2 ** attempt, 10);
await new Promise((r) => setTimeout(r, retryAfter * 1000));
continue;
}
if (response.status >= 500) {
await new Promise((r) => setTimeout(r, 2 ** attempt * 1000));
continue;
}

const data = await response.json();
throw new CertIXAPIError(response.status, data.code || "UNKNOWN", data.error || "Erreur inconnue");
}
throw new CertIXAPIError(429, "RATE_LIMIT_EXCEEDED", "Nombre max de tentatives dépassé");
}

// ── Opérations d'analyse ─────────────────────────────────────────────

async createScan({ target, scanType, name, priority = "normal", options, tags, timeout }) {
const payload = { target, scanType, priority };
if (name) payload.name = name;
if (options) payload.options = options;
if (tags) payload.tags = tags;
if (timeout) payload.timeout = timeout;
const result = await this._request("POST", "/scans", { body: payload });
return result.data;
}

async getScan(scanId, scanType = "nmap") {
const result = await this._request("GET", `/scans/${scanId}`, { params: { scanType } });
return result.data;
}

async waitForScan(scanId, scanType = "nmap", timeoutMs = 3600000) {
const start = Date.now();
const terminal = new Set(["completed", "failed", "cancelled"]);

while (Date.now() - start < timeoutMs) {
const scan = await this.getScan(scanId, scanType);
if (terminal.has(scan.status)) return scan;

const elapsed = Date.now() - start;
const interval = elapsed < 120_000 ? 5_000 : elapsed < 600_000 ? 15_000 : 30_000;
await new Promise((r) => setTimeout(r, interval));
}
throw new Error(`L'analyse ${scanId} ne s'est pas terminée dans ${timeoutMs}ms`);
}

// ── Vérification de webhook ──────────────────────────────────────────

static verifyWebhookSignature(body, signatureHeader, secret) {
if (!signatureHeader.startsWith("sha256=")) return false;
const received = signatureHeader.slice(7);
const expected = crypto.createHmac("sha256", secret).update(body).digest("hex");
return crypto.timingSafeEqual(Buffer.from(received), Buffer.from(expected));
}
}

class CertIXAPIError extends Error {
constructor(statusCode, code, message) {
super(`[${statusCode}] ${code}: ${message}`);
this.statusCode = statusCode;
this.code = code;
}
}

module.exports = { CertIXScanClient, CertIXAPIError };

Intégration CI/CD — GitHub Actions

# .github/workflows/security-scan.yml
name: Analyse de sécurité
on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
vulnerability-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Construire l'image conteneur
run: docker build -t monapp:${{ github.sha }} .

- name: Lancer l'analyse Cert-IX
env:
CERTIX_API_KEY: ${{ secrets.CERTIX_API_KEY }}
run: |
python3 - <<'EOF'
import os, json, sys
sys.path.insert(0, ".")
from certix_client import CertIXScanClient

client = CertIXScanClient()

# Soumettre l'analyse de l'image conteneur
scan = client.create_scan(
target=f"ghcr.io/${{ github.repository }}:${{ github.sha }}",
scan_type="trivy",
name=f"Scan CI - {os.environ.get('GITHUB_SHA', '')[:8]}",
priority="high",
options={
"scanners": ["vuln", "misconfig", "secret"],
"severity": ["CRITICAL", "HIGH"],
"ignoreUnfixed": True,
},
)
print(f"Analyse soumise : {scan['id']}")

# Attendre la complétion (max 10 minutes)
result = client.wait_for_scan(scan["id"], "trivy", timeout=600)

if result["status"] == "failed":
print(f"Analyse échouée : {result.get('error')}")
exit(1)

# Vérifier les découvertes critiques
findings = client.get_scan_results(scan["id"], "trivy")
summary = findings.get("summary", {})
critical = summary.get("critical", 0)
high = summary.get("high", 0)

print(f"Découvertes : {critical} critiques, {high} élevées")

if critical > 0:
print("Vulnérabilités CRITIQUES trouvées — déploiement bloqué")
exit(1)

print("Analyse de sécurité réussie")
EOF

Prochaines étapes :