Code Examples
Production-ready code examples for integrating with the Cert-IX Scan API. Each example includes retry logic, error handling, and webhook signature verification.
cURL Examples​
Submit a Scan​
curl -X POST https://api.cert-ix.com/scan-api/api/v1/scans \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"target": "example.com",
"scanType": "nmap",
"name": "Network audit",
"priority": "normal",
"options": {
"ports": "1-1024",
"timing": "T3",
"serviceDetection": true
}
}'
Check Scan Status​
curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"
Retrieve Results​
curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID/results?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"
List Scans with Filters​
curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans?scanType=nmap&status=completed&page=1&limit=10" \
-H "X-API-Key: $CERTIX_API_KEY"
Cancel a Scan​
curl -X POST "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID/cancel?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"
Create a Webhook​
curl -X POST https://api.cert-ix.com/scan-api/api/v1/webhooks \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "Scan notifications",
"url": "https://your-server.example.com/webhooks/certix",
"events": ["scan.completed", "scan.failed"]
}'
Create a Scan Template​
curl -X POST https://api.cert-ix.com/scan-api/api/v1/scan-templates \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "Container CI Scan",
"scanType": "trivy",
"priority": "high",
"options": {
"scanners": ["vuln", "misconfig", "secret"],
"severity": ["CRITICAL", "HIGH"]
},
"tags": ["ci", "container"]
}'
Launch a Scan from a Template​
curl -X POST "https://api.cert-ix.com/scan-api/api/v1/scan-templates/$TEMPLATE_ID/launch" \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"target": "registry.example.com/myapp:latest"
}'
Python Client​
Installation​
pip install requests
Full Client Class​
"""
Cert-IX Scan API Python Client
Production-ready with retry logic, error handling, and webhook verification.
"""
import hashlib
import hmac
import os
import time
from typing import Any, Dict, List, Optional
import requests
class CertIXScanClient:
"""Cert-IX Scan API client with built-in retry and error handling."""
def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None):
# Read API key from parameter or environment variable
self.api_key = api_key or os.environ.get("CERTIX_API_KEY")
if not self.api_key:
raise ValueError("API key required. Set CERTIX_API_KEY or pass api_key.")
self.base_url = (
base_url
or os.environ.get("CERTIX_BASE_URL")
or "https://api.cert-ix.com/scan-api/api/v1"
)
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": self.api_key,
"Content-Type": "application/json",
})
# Maximum retry attempts for transient errors
self.max_retries = 3
def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
"""Execute an HTTP request with automatic retry on transient errors."""
url = f"{self.base_url}{path}"
for attempt in range(self.max_retries):
response = self.session.request(method, url, **kwargs)
# Success — return parsed response
if response.status_code < 400:
return response.json()
# Rate limited — wait and retry
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
time.sleep(retry_after)
continue
# Server error — retry with backoff
if response.status_code >= 500:
time.sleep(2 ** attempt)
continue
# Client error — do not retry, raise immediately
data = response.json()
raise CertIXAPIError(
status_code=response.status_code,
code=data.get("code", "UNKNOWN"),
message=data.get("error", "Unknown error"),
)
raise CertIXAPIError(429, "RATE_LIMIT_EXCEEDED", "Max retries exceeded")
# ── Scan Operations ─────────────────────────────────────────────────
def create_scan(
self,
target: str,
scan_type: str,
name: Optional[str] = None,
priority: str = "normal",
options: Optional[Dict] = None,
tags: Optional[List[str]] = None,
timeout: Optional[int] = None,
) -> Dict[str, Any]:
"""Submit a new scan request."""
payload = {"target": target, "scanType": scan_type, "priority": priority}
if name:
payload["name"] = name
if options:
payload["options"] = options
if tags:
payload["tags"] = tags
if timeout:
payload["timeout"] = timeout
result = self._request("POST", "/scans", json=payload)
return result["data"]
def get_scan(self, scan_id: str, scan_type: str = "nmap") -> Dict[str, Any]:
"""Get scan status and details."""
result = self._request("GET", f"/scans/{scan_id}", params={"scanType": scan_type})
return result["data"]
def get_scan_results(self, scan_id: str, scan_type: str = "nmap") -> Any:
"""Retrieve scan results (scan must be completed)."""
result = self._request("GET", f"/scans/{scan_id}/results", params={"scanType": scan_type})
return result["data"]
def cancel_scan(self, scan_id: str, scan_type: str = "nmap") -> Dict[str, Any]:
"""Cancel a queued or running scan."""
result = self._request("POST", f"/scans/{scan_id}/cancel", params={"scanType": scan_type})
return result["data"]
def wait_for_scan(self, scan_id: str, scan_type: str = "nmap", timeout: int = 3600) -> Dict[str, Any]:
"""Poll until the scan reaches a terminal status."""
start = time.time()
terminal = {"completed", "failed", "cancelled"}
while time.time() - start < timeout:
scan = self.get_scan(scan_id, scan_type)
if scan["status"] in terminal:
return scan
# Adaptive polling interval
elapsed = time.time() - start
if elapsed < 120:
interval = 5
elif elapsed < 600:
interval = 15
else:
interval = 30
time.sleep(interval)
raise TimeoutError(f"Scan {scan_id} did not complete within {timeout}s")
# ── Template Operations ─────────────────────────────────────────────
def create_template(self, name: str, scan_type: str, **kwargs) -> Dict[str, Any]:
"""Create a reusable scan template."""
payload = {"name": name, "scanType": scan_type}
payload.update(kwargs)
result = self._request("POST", "/scan-templates", json=payload)
return result["data"]
def launch_template(self, template_id: str, target: str, **overrides) -> Dict[str, Any]:
"""Launch a scan from a saved template."""
payload = {"target": target}
payload.update(overrides)
result = self._request("POST", f"/scan-templates/{template_id}/launch", json=payload)
return result["data"]
# ── Webhook Operations ──────────────────────────────────────────────
def create_webhook(self, name: str, url: str, events: Optional[List[str]] = None) -> Dict[str, Any]:
"""Register a webhook for scan event notifications."""
payload = {"name": name, "url": url}
if events:
payload["events"] = events
result = self._request("POST", "/webhooks", json=payload)
return result["data"]
# ─ ─ Usage Analytics ─────────────────────────────────────────────────
def get_usage_summary(self, period: str = "30d") -> Dict[str, Any]:
"""Get aggregated usage summary."""
result = self._request("GET", "/usage/summary", params={"period": period})
return result["data"]
# ── Webhook Verification ────────────────────────────────────────────
@staticmethod
def verify_webhook_signature(body: bytes, signature_header: str, secret: str) -> bool:
"""Verify HMAC-SHA256 webhook signature."""
if not signature_header.startswith("sha256="):
return False
received = signature_header[7:]
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(received, expected)
class CertIXAPIError(Exception):
"""Exception raised for Cert-IX API errors."""
def __init__(self, status_code: int, code: str, message: str):
self.status_code = status_code
self.code = code
self.message = message
super().__init__(f"[{status_code}] {code}: {message}")
Usage Example​
from certix_client import CertIXScanClient
# Initialize client (reads CERTIX_API_KEY from environment)
client = CertIXScanClient()
# Submit a scan and wait for results
scan = client.create_scan(
target="example.com",
scan_type="nmap",
name="Python SDK test",
options={"ports": "1-1024", "timing": "T3"},
)
print(f"Scan created: {scan['id']}")
# Wait for completion
final = client.wait_for_scan(scan["id"], "nmap")
print(f"Status: {final['status']}")
# Retrieve results
if final["status"] == "completed":
results = client.get_scan_results(scan["id"], "nmap")
print(f"Results: {results}")
Go Client​
Full Client Package​
package certix
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
)
// Client is the Cert-IX Scan API client.
type Client struct {
apiKey string
baseURL string
httpClient *http.Client
maxRetries int
}
// NewClient creates a new Cert-IX API client.
func NewClient(apiKey, baseURL string) (*Client, error) {
if apiKey == "" {
apiKey = os.Getenv("CERTIX_API_KEY")
}
if apiKey == "" {
return nil, fmt.Errorf("API key required: set CERTIX_API_KEY or pass apiKey")
}
if baseURL == "" {
baseURL = "https://api.cert-ix.com/scan-api/api/v1"
}
return &Client{
apiKey: apiKey,
baseURL: strings.TrimRight(baseURL, "/"),
httpClient: &http.Client{Timeout: 30 * time.Second},
maxRetries: 3,
}, nil
}
// APIResponse is the standard response envelope.
type APIResponse struct {
Success bool `json:"success"`
Data json.RawMessage `json:"data,omitempty"`
Error string `json:"error,omitempty"`
Code string `json:"code,omitempty"`
}
// ScanRequest is the request body for creating a scan.
type ScanRequest struct {
Target string `json:"target"`
ScanType string `json:"scanType"`
Name string `json:"name,omitempty"`
Priority string `json:"priority,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
Tags []string `json:"tags,omitempty"`
}
// ScanResponse is the scan status response.
type ScanResponse struct {
ID string `json:"id"`
ScanType string `json:"scanType"`
Name string `json:"name"`
Target string `json:"target"`
Status string `json:"status"`
Progress int `json:"progress"`
ResultCount int `json:"resultCount,omitempty"`
DurationMs int64 `json:"durationMs,omitempty"`
Error string `json:"error,omitempty"`
}
// APIError represents a Cert-IX API error.
type APIError struct {
StatusCode int
Code string
Message string
}
func (e *APIError) Error() string {
return fmt.Sprintf("[%d] %s: %s", e.StatusCode, e.Code, e.Message)
}
// doRequest executes an HTTP request with retry logic.
func (c *Client) doRequest(method, path string, body interface{}, params map[string]string) (*APIResponse, error) {
for attempt := 0; attempt < c.maxRetries; attempt++ {
reqURL := c.baseURL + path
if len(params) > 0 {
q := url.Values{}
for k, v := range params {
q.Set(k, v)
}
reqURL += "?" + q.Encode()
}
var bodyReader io.Reader
if body != nil {
data, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}
bodyReader = bytes.NewReader(data)
}
req, err := http.NewRequest(method, reqURL, bodyReader)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("X-API-Key", c.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
time.Sleep(time.Duration(1<<attempt) * time.Second)
continue
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
var apiResp APIResponse
json.Unmarshal(respBody, &apiResp)
if resp.StatusCode < 400 {
return &apiResp, nil
}
if resp.StatusCode == 429 || resp.StatusCode >= 500 {
time.Sleep(time.Duration(1<<attempt) * time.Second)
continue
}
return nil, &APIError{StatusCode: resp.StatusCode, Code: apiResp.Code, Message: apiResp.Error}
}
return nil, &APIError{StatusCode: 429, Code: "RATE_LIMIT_EXCEEDED", Message: "max retries exceeded"}
}
// CreateScan submits a new scan request.
func (c *Client) CreateScan(req *ScanRequest) (*ScanResponse, error) {
resp, err := c.doRequest("POST", "/scans", req, nil)
if err != nil {
return nil, err
}
var scan ScanResponse
json.Unmarshal(resp.Data, &scan)
return &scan, nil
}
// GetScan retrieves scan status and details.
func (c *Client) GetScan(scanID, scanType string) (*ScanResponse, error) {
resp, err := c.doRequest("GET", "/scans/"+scanID, nil, map[string]string{"scanType": scanType})
if err != nil {
return nil, err
}
var scan ScanResponse
json.Unmarshal(resp.Data, &scan)
return &scan, nil
}
// GetScanResults retrieves results for a completed scan.
func (c *Client) GetScanResults(scanID, scanType string) (json.RawMessage, error) {
resp, err := c.doRequest("GET", "/scans/"+scanID+"/results", nil, map[string]string{"scanType": scanType})
if err != nil {
return nil, err
}
return resp.Data, nil
}
// WaitForScan polls until the scan reaches a terminal status.
func (c *Client) WaitForScan(scanID, scanType string, timeout time.Duration) (*ScanResponse, error) {
start := time.Now()
terminal := map[string]bool{"completed": true, "failed": true, "cancelled": true}
for time.Since(start) < timeout {
scan, err := c.GetScan(scanID, scanType)
if err != nil {
return nil, err
}
if terminal[scan.Status] {
return scan, nil
}
elapsed := time.Since(start)
var interval time.Duration
switch {
case elapsed < 2*time.Minute:
interval = 5 * time.Second
case elapsed < 10*time.Minute:
interval = 15 * time.Second
default:
interval = 30 * time.Second
}
time.Sleep(interval)
}
return nil, fmt.Errorf("scan %s did not complete within %v", scanID, timeout)
}
// VerifyWebhookSignature validates an HMAC-SHA256 webhook signature.
func VerifyWebhookSignature(body []byte, signatureHeader, secret string) bool {
if !strings.HasPrefix(signatureHeader, "sha256=") {
return false
}
received := signatureHeader[7:]
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(body)
expected := hex.EncodeToString(mac.Sum(nil))
return hmac.Equal([]byte(received), []byte(expected))
}
Usage Example​
package main
import (
"fmt"
"log"
"time"
certix "your-module/certix"
)
func main() {
client, err := certix.NewClient("", "")
if err != nil {
log.Fatal(err)
}
scan, err := client.CreateScan(&certix.ScanRequest{
Target: "example.com",
ScanType: "nmap",
Name: "Go SDK test",
Options: map[string]interface{}{"ports": "1-1024", "timing": "T3"},
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Scan created: %s\n", scan.ID)
final, err := client.WaitForScan(scan.ID, "nmap", 10*time.Minute)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Status: %s\n", final.Status)
}
Node.js Client​
Installation​
npm install node-fetch@3
# or use Node.js 18+ built-in fetch
Full Client Class​
/**
* Cert-IX Scan API Node.js Client
* Production-ready with retry logic, error handling, and webhook verification.
*/
const crypto = require("crypto");
class CertIXScanClient {
constructor(apiKey, baseUrl) {
this.apiKey = apiKey || process.env.CERTIX_API_KEY;
if (!this.apiKey) throw new Error("API key required. Set CERTIX_API_KEY or pass apiKey.");
this.baseUrl = baseUrl || process.env.CERTIX_BASE_URL || "https://api.cert-ix.com/scan-api/api/v1";
this.maxRetries = 3;
}
async _request(method, path, { body, params } = {}) {
let url = `${this.baseUrl}${path}`;
if (params) url += `?${new URLSearchParams(params).toString()}`;
for (let attempt = 0; attempt < this.maxRetries; attempt++) {
const options = {
method,
headers: { "X-API-Key": this.apiKey, "Content-Type": "application/json" },
};
if (body) options.body = JSON.stringify(body);
const response = await fetch(url, options);
if (response.status < 400) return response.json();
if (response.status === 429) {
const retryAfter = parseInt(response.headers.get("Retry-After") || 2 ** attempt, 10);
await new Promise((r) => setTimeout(r, retryAfter * 1000));
continue;
}
if (response.status >= 500) {
await new Promise((r) => setTimeout(r, 2 ** attempt * 1000));
continue;
}
const data = await response.json();
throw new CertIXAPIError(response.status, data.code || "UNKNOWN", data.error || "Unknown error");
}
throw new CertIXAPIError(429, "RATE_LIMIT_EXCEEDED", "Max retries exceeded");
}
// ── Scan Operations ──────────────────────────────────────────────────
async createScan({ target, scanType, name, priority = "normal", options, tags, timeout }) {
const payload = { target, scanType, priority };
if (name) payload.name = name;
if (options) payload.options = options;
if (tags) payload.tags = tags;
if (timeout) payload.timeout = timeout;
const result = await this._request("POST", "/scans", { body: payload });
return result.data;
}
async getScan(scanId, scanType = "nmap") {
const result = await this._request("GET", `/scans/${scanId}`, { params: { scanType } });
return result.data;
}
async getScanResults(scanId, scanType = "nmap") {
const result = await this._request("GET", `/scans/${scanId}/results`, { params: { scanType } });
return result.data;
}
async cancelScan(scanId, scanType = "nmap") {
const result = await this._request("POST", `/scans/${scanId}/cancel`, { params: { scanType } });
return result.data;
}
async waitForScan(scanId, scanType = "nmap", timeoutMs = 3600000) {
const start = Date.now();
const terminal = new Set(["completed", "failed", "cancelled"]);
while (Date.now() - start < timeoutMs) {
const scan = await this.getScan(scanId, scanType);
if (terminal.has(scan.status)) return scan;
const elapsed = Date.now() - start;
const interval = elapsed < 120_000 ? 5_000 : elapsed < 600_000 ? 15_000 : 30_000;
await new Promise((r) => setTimeout(r, interval));
}
throw new Error(`Scan ${scanId} did not complete within ${timeoutMs}ms`);
}
// ── Template Operations ──────────────────────────────────────────────
async createTemplate({ name, scanType, priority = "normal", options, tags, ...rest }) {
const payload = { name, scanType, priority, ...rest };
if (options) payload.options = options;
if (tags) payload.tags = tags;
const result = await this._request("POST", "/scan-templates", { body: payload });
return result.data;
}
async launchTemplate(templateId, target, overrides = {}) {
const payload = { target, ...overrides };
const result = await this._request("POST", `/scan-templates/${templateId}/launch`, { body: payload });
return result.data;
}
// ── Webhook Verification ─────────────────────────────────────────────
static verifyWebhookSignature(body, signatureHeader, secret) {
if (!signatureHeader.startsWith("sha256=")) return false;
const received = signatureHeader.slice(7);
const expected = crypto.createHmac("sha256", secret).update(body).digest("hex");
return crypto.timingSafeEqual(Buffer.from(received), Buffer.from(expected));
}
}
class CertIXAPIError extends Error {
constructor(statusCode, code, message) {
super(`[${statusCode}] ${code}: ${message}`);
this.statusCode = statusCode;
this.code = code;
}
}
module.exports = { CertIXScanClient, CertIXAPIError };
Usage Example​
const { CertIXScanClient } = require("./certix-client");
async function main() {
const client = new CertIXScanClient();
const scan = await client.createScan({
target: "example.com",
scanType: "nmap",
name: "Node.js SDK test",
options: { ports: "1-1024", timing: "T3" },
});
console.log(`Scan created: ${scan.id}`);
const final = await client.waitForScan(scan.id, "nmap", 600_000);
console.log(`Status: ${final.status}`);
if (final.status === "completed") {
const results = await client.getScanResults(scan.id, "nmap");
console.log("Results:", JSON.stringify(results, null, 2));
}
}
main().catch(console.error);
Express Webhook Handler​
const express = require("express");
const { CertIXScanClient } = require("./certix-client");
const app = express();
const WEBHOOK_SECRET = process.env.CERTIX_WEBHOOK_SECRET;
app.post("/webhooks/certix", express.raw({ type: "application/json" }), (req, res) => {
const signature = req.headers["x-webhook-signature"] || "";
if (!CertIXScanClient.verifyWebhookSignature(req.body, signature, WEBHOOK_SECRET)) {
return res.status(401).send("Invalid signature");
}
const payload = JSON.parse(req.body);
console.log(`Event: ${payload.event}`);
switch (payload.event) {
case "scan.completed":
console.log(`Scan ${payload.data.scanId} completed with ${payload.data.resultCount} findings`);
break;
case "scan.failed":
console.error(`Scan ${payload.data.scanId} failed: ${payload.data.error}`);
break;
}
res.status(200).send("OK");
});
app.listen(3000, () => console.log("Webhook server listening on port 3000"));
CI/CD Integration — GitHub Actions​
# .github/workflows/security-scan.yml
name: Security Scan
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
vulnerability-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build container image
run: docker build -t myapp:${{ github.sha }} .
- name: Run Cert-IX container scan
env:
CERTIX_API_KEY: ${{ secrets.CERTIX_API_KEY }}
run: |
python3 - <<'EOF'
import os, json, sys
sys.path.insert(0, ".")
from certix_client import CertIXScanClient
client = CertIXScanClient()
# Submit container image scan
scan = client.create_scan(
target=f"ghcr.io/${{ github.repository }}:${{ github.sha }}",
scan_type="trivy",
name=f"CI Scan - {os.environ.get('GITHUB_SHA', '')[:8]}",
priority="high",
options={
"scanners": ["vuln", "misconfig", "secret"],
"severity": ["CRITICAL", "HIGH"],
"ignoreUnfixed": True,
},
)
print(f"Scan submitted: {scan['id']}")
# Wait for completion (max 10 minutes)
result = client.wait_for_scan(scan["id"], "trivy", timeout=600)
if result["status"] == "failed":
print(f"Scan failed: {result.get('error')}")
exit(1)
# Check for critical findings
findings = client.get_scan_results(scan["id"], "trivy")
summary = findings.get("summary", {})
critical = summary.get("critical", 0)
high = summary.get("high", 0)
print(f"Findings: {critical} critical, {high} high")
if critical > 0:
print("CRITICAL vulnerabilities found — blocking deployment")
exit(1)
print("Security scan passed")
EOF
Next Steps: