Best Practices
This guide provides recommendations for optimizing your use of the CyberSecFeed API, covering performance optimization, security considerations, and efficient vulnerability management workflows.
Performance Optimization
1. Connection Reuse
Reusing HTTP connections significantly improves performance by avoiding the overhead of establishing new connections for each request.
❌ Bad Practice
import requests
# Creates new connection for each request
for cve_id in cve_list:
response = requests.get(
f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
headers={"X-API-Key": "your-key"}
)
✅ Good Practice
import requests
# Reuses connection across requests
session = requests.Session()
session.headers.update({"X-API-Key": "your-key"})
for cve_id in cve_list:
response = session.get(
f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
)
Performance Impact: Connection reuse can improve throughput by 50-70% for multiple requests.
2. Batch Operations
Always prefer batch operations over individual requests when working with multiple CVEs.
❌ Inefficient Approach
# 50 API calls, slower, counts against monthly quota
cves = []
for cve_id in cve_list[:50]:
response = session.get(f"{base_url}/cve/{cve_id}")
cves.append(response.json()["data"]["cve"])
✅ Efficient Approach
# 1 API call instead of 50: far faster, and only one request against your quota
response = session.get(
f"{base_url}/cves",
params={"ids": ",".join(cve_list[:50])}
)
cves = response.json()["data"]["cves"]
3. Field Projection
Request only the fields you need to reduce bandwidth and processing time.
# Full response: ~5KB per CVE
response = session.get(f"{base_url}/cve/CVE-2024-0001")
# Optimized response: ~500 bytes (90% reduction)
response = session.get(
f"{base_url}/cve/CVE-2024-0001",
params={"fields": "id,cvss.baseScore,kev"}
)
4. Use Enrichment Parameters Strategically
Request enrichment data only when needed to optimize performance and reduce bandwidth.
# ❌ Always requesting all enrichment data
response = session.get(
f"{base_url}/cve/CVE-2024-0001?include=acsc,enrichment"
)
# ✅ Request enrichment data based on use case
def get_cve_for_analysis(cve_id, include_regional=False, include_enrichment=False):
"""Get CVE with conditional enrichment"""
params = {}
# Add enrichment parameters based on requirements
include_params = []
if include_regional:
include_params.append("acsc")
if include_enrichment:
include_params.append("enrichment")
if include_params:
params["include"] = ",".join(include_params)
return session.get(f"{base_url}/cve/{cve_id}", params=params)
# Examples for different use cases:
# Basic vulnerability assessment
basic_cve = get_cve_for_analysis("CVE-2024-0001")
# Regional threat analysis (Australia/NZ)
regional_cve = get_cve_for_analysis("CVE-2024-0001", include_regional=True)
# Comprehensive threat intelligence
full_cve = get_cve_for_analysis("CVE-2024-0001",
include_regional=True,
include_enrichment=True)
5. Implement Caching
Use ETags to avoid downloading unchanged data and leverage edge caching.
class CachedClient:
def __init__(self, api_key):
self.session = requests.Session()
self.session.headers.update({"X-API-Key": api_key})
self.etag_cache = {}
def get_with_cache(self, url):
        # Send any cached ETag as a per-request header so it does not
        # persist on the session and leak onto requests for other URLs
        headers = {}
        if url in self.etag_cache:
            headers["If-None-Match"] = self.etag_cache[url]
        response = self.session.get(url, headers=headers)
# Handle 304 Not Modified (cached at edge)
if response.status_code == 304:
return {"cached": True, "changed": False}
# Store new ETag
if "ETag" in response.headers:
self.etag_cache[url] = response.headers["ETag"]
return {"cached": False, "changed": True, "data": response.json()}
Performance Impact: ETag caching enables high cache hit rates and near-zero latency for cached responses through Cloudflare edge optimization.
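For example, reusing the client above (the URL and CVE ID are illustrative):
import os

client = CachedClient(os.environ["CYBERFEED_API_KEY"])
url = "https://api.cybersecfeed.com/api/v1/cve/CVE-2024-0001"
first = client.get_with_cache(url)   # full fetch, stores the ETag
second = client.get_with_cache(url)  # returns {"cached": True, ...} on a 304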
6. Parallel Processing
For large-scale operations, use concurrent requests with proper throttling.
import asyncio
import aiohttp
from asyncio import Semaphore
async def fetch_cves_parallel(cve_ids, api_key, max_concurrent=10):
# Limit concurrent requests
semaphore = Semaphore(max_concurrent)
async def fetch_one(session, cve_id):
async with semaphore:
url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
async with session.get(url) as response:
return await response.json()
async with aiohttp.ClientSession(
headers={"X-API-Key": api_key}
) as session:
tasks = [fetch_one(session, cve_id) for cve_id in cve_ids]
return await asyncio.gather(*tasks)
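To drive this from synchronous code, a minimal sketch (the CVE IDs are illustrative; assumes Python 3.7+ for asyncio.run):
import os

cve_ids = ["CVE-2024-0001", "CVE-2024-0002", "CVE-2024-0003"]
results = asyncio.run(
    fetch_cves_parallel(cve_ids, os.environ["CYBERFEED_API_KEY"])
)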
Security Best Practices
1. API Key Management
Store Keys Securely
# ❌ Never hardcode
api_key = "sk-1234567890abcdef"
# ✅ Use environment variables
import os
api_key = os.environ.get("CYBERFEED_API_KEY")
if not api_key:
raise ValueError("API key not configured")
Rotate Keys Regularly
- Rotate production keys every 90 days
- Use different keys for different environments
- Implement zero-downtime rotation (see the sketch below)
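One way to support zero-downtime rotation is to accept a second environment variable during the overlap window, preferring the new key; a minimal sketch (the variable names are illustrative, not part of the API):
import os

def load_api_key():
    """Return the newest configured key during a rotation window."""
    # Check the replacement key first, then fall back to the current one
    for var in ("CYBERFEED_API_KEY_NEXT", "CYBERFEED_API_KEY"):
        key = os.environ.get(var)
        if key:
            return key
    raise ValueError("API key not configured")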
2. Input Validation
Always validate CVE IDs before making API calls.
import re
def validate_cve_id(cve_id):
"""Validate CVE ID format"""
pattern = r"^CVE-\d{4}-\d{4,}$"
if not re.match(pattern, cve_id):
raise ValueError(f"Invalid CVE ID format: {cve_id}")
return True
# Use validation
try:
validate_cve_id(user_input)
response = session.get(f"{base_url}/cve/{user_input}")
except ValueError as e:
print(f"Invalid input: {e}")
3. Error Handling
Implement comprehensive error handling with retries.
import requests
from time import sleep
from random import uniform
def api_request_with_retry(session, url, max_retries=3):
"""Make API request with exponential backoff"""
for attempt in range(max_retries):
try:
response = session.get(url)
if response.status_code == 200:
return response.json()
            elif response.status_code >= 500:
                # Server error (including 503 Service Unavailable):
                # retry with exponential backoff plus jitter
                if attempt < max_retries - 1:
                    wait_time = (2 ** attempt) + uniform(0, 1)
                    sleep(wait_time)
                else:
                    raise Exception(f"Server error: {response.status_code}")
else:
# Client error, don't retry
response.raise_for_status()
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
sleep(2 ** attempt)
else:
raise
raise Exception("Max retries exceeded")
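If the service rate-limits with HTTP 429, the loop above can be extended to honor the server's hint before retrying; a sketch assuming a standard Retry-After header expressed in seconds (the helper name is illustrative):
from random import uniform
from time import sleep

def wait_for_rate_limit(response, attempt):
    """Sleep for the server-suggested Retry-After interval, falling back
    to exponential backoff with jitter when the header is absent."""
    retry_after = response.headers.get("Retry-After")
    if retry_after is not None:
        sleep(float(retry_after))
    else:
        sleep((2 ** attempt) + uniform(0, 1))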
4. Audit Logging
Log API usage for security and debugging.
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('cyberfeed')
def log_api_call(method, endpoint, status_code, response_time):
"""Log API calls for audit trail"""
logger.info(
f"API_CALL method={method} endpoint={endpoint} "
f"status={status_code} response_time={response_time}ms"
)
# Log errors with more detail
if status_code >= 400:
logger.error(
f"API_ERROR endpoint={endpoint} status={status_code}"
)
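For example, timing a request and feeding the result to log_api_call (the endpoint is illustrative):
import time

start = time.monotonic()
response = session.get(f"{base_url}/cve/CVE-2024-0001")
elapsed_ms = int((time.monotonic() - start) * 1000)
log_api_call("GET", "/cve/CVE-2024-0001", response.status_code, elapsed_ms)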
Vulnerability Management Workflows
1. Daily Security Briefing
Create an automated daily report of new threats.
from datetime import datetime, timedelta
def generate_daily_briefing(api_client):
"""Generate daily security briefing"""
yesterday = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
briefing = {
"date": datetime.utcnow().strftime("%Y-%m-%d"),
"new_critical_cves": [],
"new_kev_entries": [],
"high_epss_alerts": []
}
# Get new critical CVEs
critical = api_client.search_cves(
severity_min=9.0,
published_after=yesterday,
limit=100
)
briefing["new_critical_cves"] = critical["data"]["cves"]
# Get new KEV entries
kev = api_client.get_kev(limit=20)
for entry in kev["data"]["vulnerabilities"]:
if entry["dateAdded"] >= yesterday:
briefing["new_kev_entries"].append(entry)
# Get high EPSS scores
high_epss = api_client.search_cves(
epss_min=0.9,
published_after=yesterday,
limit=50
)
briefing["high_epss_alerts"] = high_epss["data"]["cves"]
return briefing
2. Asset-Specific Monitoring
Monitor CVEs affecting your specific technology stack.
def monitor_technology_stack(api_client, technologies, include_regional=False):
"""Monitor CVEs for specific technologies with optional regional context"""
results = {}
for tech in technologies:
# Search for technology-specific CVEs with enrichment data
params = {
"q": tech,
"published_after": "2024-01-01",
"limit": 50
}
# Include regional threat intelligence if requested
if include_regional:
params["include"] = "acsc"
response = api_client.search_cves(**params)
# Filter by risk factors
high_risk = []
for cve in response["data"]["cves"]:
risk_score = calculate_risk_score(cve)
if risk_score > 0.7:
high_risk.append({
"cve": cve,
"risk_score": risk_score,
"has_acsc": bool(cve.get("acsc"))
})
results[tech] = sorted(
high_risk,
key=lambda x: x["risk_score"],
reverse=True
)
return results
def monitor_regional_threats(api_client, severity_threshold=7.0):
"""Monitor for regional threats with ACSC data"""
# Get CVEs with ACSC notices
response = api_client.search_cves(
severity_min=severity_threshold,
include="acsc",
limit=100
)
regional_threats = []
for cve in response["data"]["cves"]:
acsc_data = cve.get("acsc", [])
if acsc_data:
# Categorize by notice type
alerts = [n for n in acsc_data if n.get("type") == "alert"]
advisories = [n for n in acsc_data if n.get("type") == "advisory"]
regional_threats.append({
"cve_id": cve["id"],
"cvss_score": cve.get("cvss", {}).get("baseScore", 0),
"acsc_alerts": len(alerts),
"acsc_advisories": len(advisories),
"priority": determine_priority(cve),
"risk_score": calculate_risk_score(cve)
})
return sorted(regional_threats, key=lambda x: x["risk_score"], reverse=True)
def calculate_risk_score(cve):
    """Calculate composite risk score with enrichment data"""
    score = 0.0

    # CVSS contribution (35% - technical severity)
    cvss_score = cve.get("cvss", {}).get("baseScore", 0)
    score += (cvss_score / 10) * 0.35

    # EPSS contribution (25% - exploitation probability)
    epss_score = cve.get("epss", {}).get("score", 0)
    score += epss_score * 0.25

    # KEV contribution (30% - active exploitation)
    if cve.get("kev"):
        score += 0.30

    # ACSC contribution (10% - regional threat context)
    acsc_data = cve.get("acsc", [])
    if acsc_data:
        # Higher weight for alerts vs advisories
        for notice in acsc_data:
            if notice.get("type") == "alert":
                score += 0.08  # Critical regional threats
            elif notice.get("type") == "advisory":
                score += 0.05  # Important regional context

    # Cap score at 1.0
    return min(score, 1.0)
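As a worked example, a KEV-listed CVE with CVSS 9.8, EPSS 0.95, and one ACSC alert scores (9.8 / 10) * 0.35 + 0.95 * 0.25 + 0.30 + 0.08 = 0.96, placing it at the top of any risk-sorted list.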
3. Patch Priority Matrix
Generate actionable patch priorities based on multiple factors.
def generate_patch_priorities(api_client, cve_ids):
"""Generate patch priority matrix"""
# Batch fetch CVE data
cve_data = {}
for i in range(0, len(cve_ids), 50):
batch = cve_ids[i:i+50]
response = api_client.search_cves(ids=",".join(batch))
for cve in response["data"]["cves"]:
cve_data[cve["id"]] = cve
# Categorize by priority
priorities = {
"critical": [], # Patch within 24 hours
"high": [], # Patch within 1 week
"medium": [], # Patch within 30 days
"low": [] # Standard patch cycle
}
for cve_id, cve in cve_data.items():
priority = determine_priority(cve)
priorities[priority].append({
"id": cve_id,
"score": cve.get("cvss", {}).get("baseScore", 0),
"epss": cve.get("epss", {}).get("score", 0),
"kev": bool(cve.get("kev")),
"acsc": bool(cve.get("acsc"))
})
return priorities
def determine_priority(cve):
"""Determine patch priority based on multiple factors including enrichment data"""
# Critical: Active exploitation (KEV)
if cve.get("kev"):
return "critical"
# Critical: ACSC alert (regional critical threat)
acsc_data = cve.get("acsc", [])
for notice in acsc_data:
if notice.get("type") == "alert":
return "critical"
# Critical: High CVSS + High EPSS
cvss = cve.get("cvss", {}).get("baseScore", 0)
epss = cve.get("epss", {}).get("score", 0)
if cvss >= 9.0 and epss > 0.5:
return "critical"
    # High: High severity or high exploitation probability
if cvss >= 7.0 or epss > 0.7:
return "high"
# High: ACSC advisory (regional importance)
if any(notice.get("type") == "advisory" for notice in acsc_data):
return "high"
# Medium: Moderate risk
if cvss >= 4.0 or epss > 0.3:
return "medium"
return "low"
Integration Patterns
1. SIEM Integration
Send high-priority CVEs to your SIEM for correlation.
def send_to_siem(cve_data, siem_endpoint):
"""Send CVE data to SIEM in CEF format"""
cef_event = (
f"CEF:0|CyberSecFeed|CVE-API|1.0|{cve_data['id']}|"
f"New CVE Detected|{int(cve_data['cvss']['baseScore'])}|"
f"cs1Label=EPSS cs1={cve_data.get('epss', {}).get('score', 0)} "
f"cs2Label=KEV cs2={bool(cve_data.get('kev'))} "
f"msg={cve_data['description'][:100]}"
)
    # Send to SIEM (timeout guards against a hung endpoint)
    requests.post(siem_endpoint, data=cef_event, timeout=10)
2. Ticketing System Integration
Automatically create tickets for high-priority vulnerabilities.
def create_remediation_ticket(cve_data, ticket_api):
"""Create ticket in ticketing system"""
priority = determine_priority(cve_data)
ticket = {
"title": f"Patch Required: {cve_data['id']}",
"priority": priority,
"description": f"""
CVE: {cve_data['id']}
Severity: {cve_data['cvss']['baseSeverity']} ({cve_data['cvss']['baseScore']})
EPSS Score: {cve_data.get('epss', {}).get('score', 'N/A')}
Active Exploitation: {'Yes' if cve_data.get('kev') else 'No'}
Description: {cve_data['description']}
Required Action: Patch affected systems according to vendor guidance.
""",
"due_date": calculate_due_date(priority)
}
return ticket_api.create_ticket(ticket)
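calculate_due_date is referenced above but never defined; a minimal sketch that maps each tier to the SLA windows from the patch priority matrix (the 90-day window for "low" is an assumed stand-in for the standard patch cycle):
from datetime import datetime, timedelta

# SLA windows matching the patch priority matrix above
PRIORITY_SLAS = {
    "critical": timedelta(hours=24),
    "high": timedelta(weeks=1),
    "medium": timedelta(days=30),
    "low": timedelta(days=90),  # assumed standard patch cycle
}

def calculate_due_date(priority):
    """Return an ISO-formatted due date for the given priority tier."""
    return (datetime.utcnow() + PRIORITY_SLAS[priority]).strftime("%Y-%m-%d")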
Monitoring and Alerting
1. Set Up Monitoring
Monitor your API usage and system health.
class APIMonitor:
def __init__(self, api_client, alert_config):
self.api_client = api_client
self.alert_config = alert_config
self.metrics = {
"total_requests": 0,
"failed_requests": 0,
"quota_used_today": 0
}
def check_health(self):
"""Regular health check"""
try:
response = self.api_client.ping()
if response["data"]["status"] != "ok":
self.send_alert("API health check failed")
except Exception as e:
self.send_alert(f"Health check error: {e}")
def monitor_new_threats(self):
"""Monitor for new high-risk threats"""
threats = self.api_client.search_cves(
severity_min=9.0,
kev=True,
published_after=datetime.utcnow().strftime("%Y-%m-%d"),
limit=10
)
if threats["data"]["cves"]:
self.send_alert(
f"New critical threats detected: {len(threats['data']['cves'])}"
)
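The monitor delegates to send_alert, which is left undefined above; one possible implementation posts to a webhook (the alert_config key is an assumed convention, not part of the API):
import requests

class WebhookAPIMonitor(APIMonitor):
    def send_alert(self, message):
        """Post the alert to a configured webhook."""
        requests.post(
            self.alert_config["webhook_url"],  # assumed config key
            json={"text": message},
            timeout=10,
        )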
2. Performance Metrics
Track and optimize your API usage.
import time
from collections import deque
from functools import wraps
from statistics import mean, stdev
class PerformanceTracker:
def __init__(self, window_size=100):
self.response_times = deque(maxlen=window_size)
self.error_count = 0
self.request_count = 0
def track_request(self, func):
"""Decorator to track API performance"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Count every request so error_rate reflects total traffic
            self.request_count += 1
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                response_time = (time.time() - start_time) * 1000
                self.response_times.append(response_time)
                return result
            except Exception:
                # Count the failure, then re-raise without masking the traceback
                self.error_count += 1
                raise
        return wrapper
def get_metrics(self):
"""Get performance metrics"""
if not self.response_times:
return None
return {
"mean_response_time": mean(self.response_times),
"std_dev": stdev(self.response_times) if len(self.response_times) > 1 else 0,
"min_response_time": min(self.response_times),
"max_response_time": max(self.response_times),
"error_rate": self.error_count / max(self.request_count, 1),
"total_requests": self.request_count
}
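Wiring the tracker into the earlier session-based calls (a sketch; get_cve is illustrative):
tracker = PerformanceTracker()

@tracker.track_request
def get_cve(cve_id):
    return session.get(f"{base_url}/cve/{cve_id}").json()

get_cve("CVE-2024-0001")
print(tracker.get_metrics())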
Summary
Following these best practices will help you:
- Maximize Performance: connection reuse, batching, field projection, and caching can yield 50-90% improvements in throughput and bandwidth
- Ensure Security: Protect API keys and validate inputs
- Improve Reliability: Handle errors gracefully with retries
- Scale Efficiently: Use batch operations and caching
- Automate Workflows: Build efficient vulnerability management processes
Remember to:
- Always use HTTPS
- Implement proper error handling
- Monitor your usage
- Keep your integration code updated
- Follow the principle of least privilege for API access