Skip to main content
Version: 1.0.0

Code Examples

Production-ready code examples for integrating with the Cert-IX Scan API. Each example includes retry logic, error handling, and webhook signature verification.

cURL Examples​

Submit a Scan​

curl -X POST https://api.cert-ix.com/scan-api/api/v1/scans \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"target": "example.com",
"scanType": "nmap",
"name": "Network audit",
"priority": "normal",
"options": {
"ports": "1-1024",
"timing": "T3",
"serviceDetection": true
}
}'

Check Scan Status​

curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"

Retrieve Results​

curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID/results?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"

List Scans with Filters​

curl -X GET "https://api.cert-ix.com/scan-api/api/v1/scans?scanType=nmap&status=completed&page=1&limit=10" \
-H "X-API-Key: $CERTIX_API_KEY"

Cancel a Scan​

curl -X POST "https://api.cert-ix.com/scan-api/api/v1/scans/$SCAN_ID/cancel?scanType=nmap" \
-H "X-API-Key: $CERTIX_API_KEY"

Create a Webhook​

curl -X POST https://api.cert-ix.com/scan-api/api/v1/webhooks \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "Scan notifications",
"url": "https://your-server.example.com/webhooks/certix",
"events": ["scan.completed", "scan.failed"]
}'

Create a Scan Template​

curl -X POST https://api.cert-ix.com/scan-api/api/v1/scan-templates \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "Container CI Scan",
"scanType": "trivy",
"priority": "high",
"options": {
"scanners": ["vuln", "misconfig", "secret"],
"severity": ["CRITICAL", "HIGH"]
},
"tags": ["ci", "container"]
}'

Launch a Scan from a Template​

curl -X POST "https://api.cert-ix.com/scan-api/api/v1/scan-templates/$TEMPLATE_ID/launch" \
-H "X-API-Key: $CERTIX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"target": "registry.example.com/myapp:latest"
}'

Python Client​

Installation​

pip install requests

Full Client Class​

"""
Cert-IX Scan API Python Client
Production-ready with retry logic, error handling, and webhook verification.
"""

import hashlib
import hmac
import os
import time
from typing import Any, Dict, List, Optional

import requests


class CertIXScanClient:
"""Cert-IX Scan API client with built-in retry and error handling."""

def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None):
# Read API key from parameter or environment variable
self.api_key = api_key or os.environ.get("CERTIX_API_KEY")
if not self.api_key:
raise ValueError("API key required. Set CERTIX_API_KEY or pass api_key.")

self.base_url = (
base_url
or os.environ.get("CERTIX_BASE_URL")
or "https://api.cert-ix.com/scan-api/api/v1"
)
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": self.api_key,
"Content-Type": "application/json",
})
# Maximum retry attempts for transient errors
self.max_retries = 3

def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
"""Execute an HTTP request with automatic retry on transient errors."""
url = f"{self.base_url}{path}"

for attempt in range(self.max_retries):
response = self.session.request(method, url, **kwargs)

# Success β€” return parsed response
if response.status_code < 400:
return response.json()

# Rate limited β€” wait and retry
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
time.sleep(retry_after)
continue

# Server error β€” retry with backoff
if response.status_code >= 500:
time.sleep(2 ** attempt)
continue

# Client error β€” do not retry, raise immediately
data = response.json()
raise CertIXAPIError(
status_code=response.status_code,
code=data.get("code", "UNKNOWN"),
message=data.get("error", "Unknown error"),
)

raise CertIXAPIError(429, "RATE_LIMIT_EXCEEDED", "Max retries exceeded")

# ── Scan Operations ─────────────────────────────────────────────────

def create_scan(
self,
target: str,
scan_type: str,
name: Optional[str] = None,
priority: str = "normal",
options: Optional[Dict] = None,
tags: Optional[List[str]] = None,
timeout: Optional[int] = None,
) -> Dict[str, Any]:
"""Submit a new scan request."""
payload = {"target": target, "scanType": scan_type, "priority": priority}
if name:
payload["name"] = name
if options:
payload["options"] = options
if tags:
payload["tags"] = tags
if timeout:
payload["timeout"] = timeout
result = self._request("POST", "/scans", json=payload)
return result["data"]

def get_scan(self, scan_id: str, scan_type: str = "nmap") -> Dict[str, Any]:
"""Get scan status and details."""
result = self._request("GET", f"/scans/{scan_id}", params={"scanType": scan_type})
return result["data"]

def get_scan_results(self, scan_id: str, scan_type: str = "nmap") -> Any:
"""Retrieve scan results (scan must be completed)."""
result = self._request("GET", f"/scans/{scan_id}/results", params={"scanType": scan_type})
return result["data"]

def cancel_scan(self, scan_id: str, scan_type: str = "nmap") -> Dict[str, Any]:
"""Cancel a queued or running scan."""
result = self._request("POST", f"/scans/{scan_id}/cancel", params={"scanType": scan_type})
return result["data"]

def wait_for_scan(self, scan_id: str, scan_type: str = "nmap", timeout: int = 3600) -> Dict[str, Any]:
"""Poll until the scan reaches a terminal status."""
start = time.time()
terminal = {"completed", "failed", "cancelled"}

while time.time() - start < timeout:
scan = self.get_scan(scan_id, scan_type)
if scan["status"] in terminal:
return scan

# Adaptive polling interval
elapsed = time.time() - start
if elapsed < 120:
interval = 5
elif elapsed < 600:
interval = 15
else:
interval = 30
time.sleep(interval)

raise TimeoutError(f"Scan {scan_id} did not complete within {timeout}s")

# ── Template Operations ─────────────────────────────────────────────

def create_template(self, name: str, scan_type: str, **kwargs) -> Dict[str, Any]:
"""Create a reusable scan template."""
payload = {"name": name, "scanType": scan_type}
payload.update(kwargs)
result = self._request("POST", "/scan-templates", json=payload)
return result["data"]

def launch_template(self, template_id: str, target: str, **overrides) -> Dict[str, Any]:
"""Launch a scan from a saved template."""
payload = {"target": target}
payload.update(overrides)
result = self._request("POST", f"/scan-templates/{template_id}/launch", json=payload)
return result["data"]

# ── Webhook Operations ──────────────────────────────────────────────

def create_webhook(self, name: str, url: str, events: Optional[List[str]] = None) -> Dict[str, Any]:
"""Register a webhook for scan event notifications."""
payload = {"name": name, "url": url}
if events:
payload["events"] = events
result = self._request("POST", "/webhooks", json=payload)
return result["data"]

# ── Usage Analytics ─────────────────────────────────────────────────

def get_usage_summary(self, period: str = "30d") -> Dict[str, Any]:
"""Get aggregated usage summary."""
result = self._request("GET", "/usage/summary", params={"period": period})
return result["data"]

# ── Webhook Verification ────────────────────────────────────────────

@staticmethod
def verify_webhook_signature(body: bytes, signature_header: str, secret: str) -> bool:
"""Verify HMAC-SHA256 webhook signature."""
if not signature_header.startswith("sha256="):
return False
received = signature_header[7:]
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(received, expected)


class CertIXAPIError(Exception):
"""Exception raised for Cert-IX API errors."""
def __init__(self, status_code: int, code: str, message: str):
self.status_code = status_code
self.code = code
self.message = message
super().__init__(f"[{status_code}] {code}: {message}")

Usage Example​

from certix_client import CertIXScanClient

# Initialize client (reads CERTIX_API_KEY from environment)
client = CertIXScanClient()

# Submit a scan and wait for results
scan = client.create_scan(
target="example.com",
scan_type="nmap",
name="Python SDK test",
options={"ports": "1-1024", "timing": "T3"},
)
print(f"Scan created: {scan['id']}")

# Wait for completion
final = client.wait_for_scan(scan["id"], "nmap")
print(f"Status: {final['status']}")

# Retrieve results
if final["status"] == "completed":
results = client.get_scan_results(scan["id"], "nmap")
print(f"Results: {results}")

Go Client​

Full Client Package​

package certix

import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
)

// Client is the Cert-IX Scan API client.
type Client struct {
apiKey string
baseURL string
httpClient *http.Client
maxRetries int
}

// NewClient creates a new Cert-IX API client.
func NewClient(apiKey, baseURL string) (*Client, error) {
if apiKey == "" {
apiKey = os.Getenv("CERTIX_API_KEY")
}
if apiKey == "" {
return nil, fmt.Errorf("API key required: set CERTIX_API_KEY or pass apiKey")
}
if baseURL == "" {
baseURL = "https://api.cert-ix.com/scan-api/api/v1"
}
return &Client{
apiKey: apiKey,
baseURL: strings.TrimRight(baseURL, "/"),
httpClient: &http.Client{Timeout: 30 * time.Second},
maxRetries: 3,
}, nil
}

// APIResponse is the standard response envelope.
type APIResponse struct {
Success bool `json:"success"`
Data json.RawMessage `json:"data,omitempty"`
Error string `json:"error,omitempty"`
Code string `json:"code,omitempty"`
}

// ScanRequest is the request body for creating a scan.
type ScanRequest struct {
Target string `json:"target"`
ScanType string `json:"scanType"`
Name string `json:"name,omitempty"`
Priority string `json:"priority,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
Tags []string `json:"tags,omitempty"`
}

// ScanResponse is the scan status response.
type ScanResponse struct {
ID string `json:"id"`
ScanType string `json:"scanType"`
Name string `json:"name"`
Target string `json:"target"`
Status string `json:"status"`
Progress int `json:"progress"`
ResultCount int `json:"resultCount,omitempty"`
DurationMs int64 `json:"durationMs,omitempty"`
Error string `json:"error,omitempty"`
}

// APIError represents a Cert-IX API error.
type APIError struct {
StatusCode int
Code string
Message string
}

func (e *APIError) Error() string {
return fmt.Sprintf("[%d] %s: %s", e.StatusCode, e.Code, e.Message)
}

// doRequest executes an HTTP request with retry logic.
func (c *Client) doRequest(method, path string, body interface{}, params map[string]string) (*APIResponse, error) {
for attempt := 0; attempt < c.maxRetries; attempt++ {
reqURL := c.baseURL + path
if len(params) > 0 {
q := url.Values{}
for k, v := range params {
q.Set(k, v)
}
reqURL += "?" + q.Encode()
}

var bodyReader io.Reader
if body != nil {
data, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}
bodyReader = bytes.NewReader(data)
}

req, err := http.NewRequest(method, reqURL, bodyReader)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("X-API-Key", c.apiKey)
req.Header.Set("Content-Type", "application/json")

resp, err := c.httpClient.Do(req)
if err != nil {
time.Sleep(time.Duration(1<<attempt) * time.Second)
continue
}
defer resp.Body.Close()

respBody, _ := io.ReadAll(resp.Body)
var apiResp APIResponse
json.Unmarshal(respBody, &apiResp)

if resp.StatusCode < 400 {
return &apiResp, nil
}
if resp.StatusCode == 429 || resp.StatusCode >= 500 {
time.Sleep(time.Duration(1<<attempt) * time.Second)
continue
}
return nil, &APIError{StatusCode: resp.StatusCode, Code: apiResp.Code, Message: apiResp.Error}
}
return nil, &APIError{StatusCode: 429, Code: "RATE_LIMIT_EXCEEDED", Message: "max retries exceeded"}
}

// CreateScan submits a new scan request.
func (c *Client) CreateScan(req *ScanRequest) (*ScanResponse, error) {
resp, err := c.doRequest("POST", "/scans", req, nil)
if err != nil {
return nil, err
}
var scan ScanResponse
json.Unmarshal(resp.Data, &scan)
return &scan, nil
}

// GetScan retrieves scan status and details.
func (c *Client) GetScan(scanID, scanType string) (*ScanResponse, error) {
resp, err := c.doRequest("GET", "/scans/"+scanID, nil, map[string]string{"scanType": scanType})
if err != nil {
return nil, err
}
var scan ScanResponse
json.Unmarshal(resp.Data, &scan)
return &scan, nil
}

// GetScanResults retrieves results for a completed scan.
func (c *Client) GetScanResults(scanID, scanType string) (json.RawMessage, error) {
resp, err := c.doRequest("GET", "/scans/"+scanID+"/results", nil, map[string]string{"scanType": scanType})
if err != nil {
return nil, err
}
return resp.Data, nil
}

// WaitForScan polls until the scan reaches a terminal status.
func (c *Client) WaitForScan(scanID, scanType string, timeout time.Duration) (*ScanResponse, error) {
start := time.Now()
terminal := map[string]bool{"completed": true, "failed": true, "cancelled": true}

for time.Since(start) < timeout {
scan, err := c.GetScan(scanID, scanType)
if err != nil {
return nil, err
}
if terminal[scan.Status] {
return scan, nil
}
elapsed := time.Since(start)
var interval time.Duration
switch {
case elapsed < 2*time.Minute:
interval = 5 * time.Second
case elapsed < 10*time.Minute:
interval = 15 * time.Second
default:
interval = 30 * time.Second
}
time.Sleep(interval)
}
return nil, fmt.Errorf("scan %s did not complete within %v", scanID, timeout)
}

// VerifyWebhookSignature validates an HMAC-SHA256 webhook signature.
func VerifyWebhookSignature(body []byte, signatureHeader, secret string) bool {
if !strings.HasPrefix(signatureHeader, "sha256=") {
return false
}
received := signatureHeader[7:]
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(body)
expected := hex.EncodeToString(mac.Sum(nil))
return hmac.Equal([]byte(received), []byte(expected))
}

Usage Example​

package main

import (
"fmt"
"log"
"time"
certix "your-module/certix"
)

func main() {
client, err := certix.NewClient("", "")
if err != nil {
log.Fatal(err)
}

scan, err := client.CreateScan(&certix.ScanRequest{
Target: "example.com",
ScanType: "nmap",
Name: "Go SDK test",
Options: map[string]interface{}{"ports": "1-1024", "timing": "T3"},
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Scan created: %s\n", scan.ID)

final, err := client.WaitForScan(scan.ID, "nmap", 10*time.Minute)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Status: %s\n", final.Status)
}

Node.js Client​

Installation​

npm install node-fetch@3
# or use Node.js 18+ built-in fetch

Full Client Class​

/**
* Cert-IX Scan API Node.js Client
* Production-ready with retry logic, error handling, and webhook verification.
*/

const crypto = require("crypto");

class CertIXScanClient {
constructor(apiKey, baseUrl) {
this.apiKey = apiKey || process.env.CERTIX_API_KEY;
if (!this.apiKey) throw new Error("API key required. Set CERTIX_API_KEY or pass apiKey.");
this.baseUrl = baseUrl || process.env.CERTIX_BASE_URL || "https://api.cert-ix.com/scan-api/api/v1";
this.maxRetries = 3;
}

async _request(method, path, { body, params } = {}) {
let url = `${this.baseUrl}${path}`;
if (params) url += `?${new URLSearchParams(params).toString()}`;

for (let attempt = 0; attempt < this.maxRetries; attempt++) {
const options = {
method,
headers: { "X-API-Key": this.apiKey, "Content-Type": "application/json" },
};
if (body) options.body = JSON.stringify(body);

const response = await fetch(url, options);

if (response.status < 400) return response.json();

if (response.status === 429) {
const retryAfter = parseInt(response.headers.get("Retry-After") || 2 ** attempt, 10);
await new Promise((r) => setTimeout(r, retryAfter * 1000));
continue;
}
if (response.status >= 500) {
await new Promise((r) => setTimeout(r, 2 ** attempt * 1000));
continue;
}

const data = await response.json();
throw new CertIXAPIError(response.status, data.code || "UNKNOWN", data.error || "Unknown error");
}
throw new CertIXAPIError(429, "RATE_LIMIT_EXCEEDED", "Max retries exceeded");
}

// ── Scan Operations ──────────────────────────────────────────────────

async createScan({ target, scanType, name, priority = "normal", options, tags, timeout }) {
const payload = { target, scanType, priority };
if (name) payload.name = name;
if (options) payload.options = options;
if (tags) payload.tags = tags;
if (timeout) payload.timeout = timeout;
const result = await this._request("POST", "/scans", { body: payload });
return result.data;
}

async getScan(scanId, scanType = "nmap") {
const result = await this._request("GET", `/scans/${scanId}`, { params: { scanType } });
return result.data;
}

async getScanResults(scanId, scanType = "nmap") {
const result = await this._request("GET", `/scans/${scanId}/results`, { params: { scanType } });
return result.data;
}

async cancelScan(scanId, scanType = "nmap") {
const result = await this._request("POST", `/scans/${scanId}/cancel`, { params: { scanType } });
return result.data;
}

async waitForScan(scanId, scanType = "nmap", timeoutMs = 3600000) {
const start = Date.now();
const terminal = new Set(["completed", "failed", "cancelled"]);

while (Date.now() - start < timeoutMs) {
const scan = await this.getScan(scanId, scanType);
if (terminal.has(scan.status)) return scan;

const elapsed = Date.now() - start;
const interval = elapsed < 120_000 ? 5_000 : elapsed < 600_000 ? 15_000 : 30_000;
await new Promise((r) => setTimeout(r, interval));
}
throw new Error(`Scan ${scanId} did not complete within ${timeoutMs}ms`);
}

// ── Template Operations ──────────────────────────────────────────────

async createTemplate({ name, scanType, priority = "normal", options, tags, ...rest }) {
const payload = { name, scanType, priority, ...rest };
if (options) payload.options = options;
if (tags) payload.tags = tags;
const result = await this._request("POST", "/scan-templates", { body: payload });
return result.data;
}

async launchTemplate(templateId, target, overrides = {}) {
const payload = { target, ...overrides };
const result = await this._request("POST", `/scan-templates/${templateId}/launch`, { body: payload });
return result.data;
}

// ── Webhook Verification ─────────────────────────────────────────────

static verifyWebhookSignature(body, signatureHeader, secret) {
if (!signatureHeader.startsWith("sha256=")) return false;
const received = signatureHeader.slice(7);
const expected = crypto.createHmac("sha256", secret).update(body).digest("hex");
return crypto.timingSafeEqual(Buffer.from(received), Buffer.from(expected));
}
}

class CertIXAPIError extends Error {
constructor(statusCode, code, message) {
super(`[${statusCode}] ${code}: ${message}`);
this.statusCode = statusCode;
this.code = code;
}
}

module.exports = { CertIXScanClient, CertIXAPIError };

Usage Example​

const { CertIXScanClient } = require("./certix-client");

async function main() {
const client = new CertIXScanClient();

const scan = await client.createScan({
target: "example.com",
scanType: "nmap",
name: "Node.js SDK test",
options: { ports: "1-1024", timing: "T3" },
});
console.log(`Scan created: ${scan.id}`);

const final = await client.waitForScan(scan.id, "nmap", 600_000);
console.log(`Status: ${final.status}`);

if (final.status === "completed") {
const results = await client.getScanResults(scan.id, "nmap");
console.log("Results:", JSON.stringify(results, null, 2));
}
}

main().catch(console.error);

Express Webhook Handler​

const express = require("express");
const { CertIXScanClient } = require("./certix-client");

const app = express();
const WEBHOOK_SECRET = process.env.CERTIX_WEBHOOK_SECRET;

app.post("/webhooks/certix", express.raw({ type: "application/json" }), (req, res) => {
const signature = req.headers["x-webhook-signature"] || "";
if (!CertIXScanClient.verifyWebhookSignature(req.body, signature, WEBHOOK_SECRET)) {
return res.status(401).send("Invalid signature");
}

const payload = JSON.parse(req.body);
console.log(`Event: ${payload.event}`);

switch (payload.event) {
case "scan.completed":
console.log(`Scan ${payload.data.scanId} completed with ${payload.data.resultCount} findings`);
break;
case "scan.failed":
console.error(`Scan ${payload.data.scanId} failed: ${payload.data.error}`);
break;
}

res.status(200).send("OK");
});

app.listen(3000, () => console.log("Webhook server listening on port 3000"));

CI/CD Integration β€” GitHub Actions​

# .github/workflows/security-scan.yml
name: Security Scan
on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
vulnerability-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Build container image
run: docker build -t myapp:${{ github.sha }} .

- name: Run Cert-IX container scan
env:
CERTIX_API_KEY: ${{ secrets.CERTIX_API_KEY }}
run: |
python3 - <<'EOF'
import os, json, sys
sys.path.insert(0, ".")
from certix_client import CertIXScanClient

client = CertIXScanClient()

# Submit container image scan
scan = client.create_scan(
target=f"ghcr.io/${{ github.repository }}:${{ github.sha }}",
scan_type="trivy",
name=f"CI Scan - {os.environ.get('GITHUB_SHA', '')[:8]}",
priority="high",
options={
"scanners": ["vuln", "misconfig", "secret"],
"severity": ["CRITICAL", "HIGH"],
"ignoreUnfixed": True,
},
)
print(f"Scan submitted: {scan['id']}")

# Wait for completion (max 10 minutes)
result = client.wait_for_scan(scan["id"], "trivy", timeout=600)

if result["status"] == "failed":
print(f"Scan failed: {result.get('error')}")
exit(1)

# Check for critical findings
findings = client.get_scan_results(scan["id"], "trivy")
summary = findings.get("summary", {})
critical = summary.get("critical", 0)
high = summary.get("high", 0)

print(f"Findings: {critical} critical, {high} high")

if critical > 0:
print("CRITICAL vulnerabilities found β€” blocking deployment")
exit(1)

print("Security scan passed")
EOF

Next Steps: