# Performance Optimization Guide

This guide provides comprehensive strategies to optimize your CyberSecFeed API integration for maximum performance, minimal bandwidth usage, and efficient quota consumption.

## Performance Overview

The CyberSecFeed API is designed for high performance with several optimization layers:

- **Edge Caching**: 80%+ cache hit rates with Cloudflare CDN
- **Parameter-Based Loading**: Request only the data you need
- **Field Projection**: Minimize payload size
- **ETag Support**: Efficient client-side caching
- **Batch Operations**: Reduce request overhead
## Performance Metrics

| Operation | P95 Response Time | Cache Hit Rate | Typical Payload |
|---|---|---|---|
| CVE Detail (default) | <200ms | 85% | ~2KB |
| CVE Detail + ACSC | <250ms | 80% | ~2.5KB |
| CVE Search | <500ms | 70% | ~20KB (10 results) |
| Batch Lookup (50 CVEs) | <1s | 60% | ~100KB |
| KEV Catalog | <300ms | 90% | ~50KB |
## 1. Parameter-Based Optimization

### Use Selective Data Loading

The most impactful optimization is requesting only the enrichment data you need:

```python
import requests

headers = {'X-API-Key': api_key}  # api_key assumed to be defined

# ❌ Slow - requests unnecessary data
def get_all_enrichment(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params={'include': 'acsc,enrichment'},
        headers=headers
    )
    return response.json()

# ✅ Fast - only core data + KEV + EPSS (default)
def get_basic_data(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        headers=headers
    )
    return response.json()

# ✅ Optimal - selective inclusion based on use case
def get_targeted_data(cve_id, include_regional=False):
    params = {}
    if include_regional:
        params['include'] = 'acsc'
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params=params,
        headers=headers
    )
    return response.json()
```
### Performance Impact by Parameter

| Parameter Combination | Response Time | Payload Size | Use Case |
|---|---|---|---|
| Default (no params) | Fastest | Smallest | Basic vulnerability scanning |
| `include=acsc` | +10-20% | +25% | Regional threat intelligence |
| `include=enrichment` | +20-30% | +50% | Advanced analytics |
| `include=acsc,enrichment` | +30-40% | +75% | Complete intelligence picture |
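To verify these figures against your own integration, a minimal benchmarking sketch like the one below can help. It assumes an `api_key` variable is defined and uses an illustrative CVE ID; real timings will vary with network conditions and cache state.

```python
import time
import requests

def benchmark_includes(cve_id, api_key):
    """Compare latency and payload size across include combinations."""
    combos = [None, 'acsc', 'enrichment', 'acsc,enrichment']
    for include in combos:
        params = {'include': include} if include else {}
        start = time.perf_counter()
        response = requests.get(
            f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
            params=params,
            headers={'X-API-Key': api_key}
        )
        elapsed = time.perf_counter() - start
        label = include or 'default'
        print(f"{label:20s} {elapsed * 1000:7.1f}ms {len(response.content):7d} bytes")

benchmark_includes('CVE-2024-0001', api_key)  # CVE ID is illustrative
```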
## 2. Field Projection Optimization

Request only the fields you need to minimize bandwidth and processing time:

```python
# ❌ Full response (unnecessary data)
def get_full_cve(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        headers=headers
    )
    return response.json()

# ✅ Optimized for risk scoring
def get_risk_data(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params={'fields': 'id,cvss.baseScore,kev,epss.score'},
        headers=headers
    )
    return response.json()

# ✅ Optimized for compliance reporting
def get_compliance_data(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params={
            'fields': 'id,published,cvss.baseScore,cvss.baseSeverity,kev.dateAdded',
            'include': 'acsc'
        },
        headers=headers
    )
    return response.json()
```
### Common Field Combinations

```python
FIELD_PRESETS = {
    'minimal': 'id,cvss.baseScore',
    'risk_scoring': 'id,cvss.baseScore,kev,epss.score',
    'dashboard': 'id,published,cvss.baseScore,cvss.baseSeverity,description',
    'compliance': 'id,published,cvss,kev.dateAdded,kev.requiredAction',
    'threat_intel': 'id,cvss.baseScore,kev,epss.score,acsc.priority'
}

def get_cve_optimized(cve_id, preset='minimal', include_acsc=False):
    params = {'fields': FIELD_PRESETS[preset]}
    if include_acsc:
        params['include'] = 'acsc'
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params=params,
        headers=headers
    )
    return response.json()
```
## 3. Caching Strategies

### ETag-Based Caching

Implement proper ETag support for maximum cache efficiency:

```python
import hashlib
import requests

class OptimizedCyberSecFeedClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.etag_cache = {}
        self.session = requests.Session()
        self.session.headers.update({'X-API-Key': api_key})

    def get_cve_cached(self, cve_id, params=None):
        """Get CVE with ETag caching support"""
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        cache_key = self._make_cache_key(url, params)

        headers = {}
        if cache_key in self.etag_cache:
            headers['If-None-Match'] = self.etag_cache[cache_key]['etag']

        response = self.session.get(url, params=params, headers=headers)

        if response.status_code == 304:
            # Cache hit - return cached data
            return self.etag_cache[cache_key]['data']
        elif response.status_code == 200:
            # Cache miss - store new data
            data = response.json()
            if 'etag' in response.headers:
                self.etag_cache[cache_key] = {
                    'etag': response.headers['etag'],
                    'data': data
                }
            return data
        else:
            response.raise_for_status()

    def _make_cache_key(self, url, params):
        """Create cache key from URL and parameters"""
        if params:
            param_str = '&'.join(f"{k}={v}" for k, v in sorted(params.items()))
            cache_input = f"{url}?{param_str}"
        else:
            cache_input = url
        return hashlib.md5(cache_input.encode()).hexdigest()

# Usage
client = OptimizedCyberSecFeedClient(api_key)

# First request - cache miss
data1 = client.get_cve_cached('CVE-2024-0001')  # ~200ms

# Second request - cache hit (304 Not Modified)
data2 = client.get_cve_cached('CVE-2024-0001')  # ~20ms
```
### Application-Level Caching

```python
import time
import requests

class CacheConfig:
    CVE_CACHE_TTL = 3600    # 1 hour
    STATS_CACHE_TTL = 300   # 5 minutes
    KEV_CACHE_TTL = 1800    # 30 minutes

class TimedCache:
    def __init__(self, ttl):
        self.cache = {}
        self.ttl = ttl

    def get(self, key):
        if key in self.cache:
            data, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return data
            else:
                del self.cache[key]
        return None

    def set(self, key, value):
        self.cache[key] = (value, time.time())

class CachedClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.cve_cache = TimedCache(CacheConfig.CVE_CACHE_TTL)
        self.stats_cache = TimedCache(CacheConfig.STATS_CACHE_TTL)  # for stats lookups; not exercised below

    def get_cve(self, cve_id, use_cache=True):
        if use_cache:
            cached = self.cve_cache.get(cve_id)
            if cached:
                return cached

        # Make API request
        response = requests.get(
            f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
            headers={'X-API-Key': self.api_key}
        )
        data = response.json()

        if use_cache:
            self.cve_cache.set(cve_id, data)
        return data
```
## 4. Batch Operation Optimization

### Efficient Batch Processing

```python
import time
import requests

def optimized_batch_processing(cve_ids, api_key):
    """Process CVEs in optimized batches"""
    session = requests.Session()
    session.headers.update({'X-API-Key': api_key})

    # Use maximum batch size
    batch_size = 50
    all_results = []

    for i in range(0, len(cve_ids), batch_size):
        batch = cve_ids[i:i + batch_size]

        # Optimize batch request
        params = {
            'ids': ','.join(batch),
            'fields': 'id,cvss.baseScore,kev,epss.score',  # Only essential data
            # Include ACSC only if needed for this batch
            # 'include': 'acsc'
        }

        response = session.get(
            'https://api.cybersecfeed.com/api/v1/cves',
            params=params
        )

        if response.status_code == 200:
            batch_data = response.json()
            all_results.extend(batch_data['data']['cves'])

        # Small delay to avoid overwhelming the API
        time.sleep(0.1)

    return all_results

# Process 500 CVEs efficiently
large_cve_list = [f"CVE-2024-{str(i).zfill(4)}" for i in range(1, 501)]
results = optimized_batch_processing(large_cve_list, api_key)
print(f"Processed {len(results)} CVEs")
```
### Parallel Processing with Rate Limiting

```python
import asyncio
import time
import aiohttp
from asyncio import Semaphore

class AsyncOptimizedClient:
    def __init__(self, api_key, max_concurrent=10):
        self.api_key = api_key
        self.semaphore = Semaphore(max_concurrent)
        self.base_url = "https://api.cybersecfeed.com/api/v1"

    async def fetch_cve_batch(self, session, cve_ids, include_acsc=False):
        """Fetch a batch of CVEs asynchronously"""
        async with self.semaphore:  # Rate limiting
            params = {
                'ids': ','.join(cve_ids[:50]),
                'fields': 'id,cvss.baseScore,kev,epss.score'
            }
            if include_acsc:
                params['include'] = 'acsc'

            async with session.get(
                f"{self.base_url}/cves",
                params=params,
                headers={'X-API-Key': self.api_key}
            ) as response:
                return await response.json()

    async def process_large_list(self, all_cve_ids, include_acsc=False):
        """Process large CVE list with controlled concurrency"""
        # Split into batches of 50
        batches = [all_cve_ids[i:i+50] for i in range(0, len(all_cve_ids), 50)]

        async with aiohttp.ClientSession() as session:
            tasks = [
                self.fetch_cve_batch(session, batch, include_acsc)
                for batch in batches
            ]
            batch_results = await asyncio.gather(*tasks)

        # Combine all results
        all_cves = []
        for batch_result in batch_results:
            if batch_result.get('data', {}).get('cves'):
                all_cves.extend(batch_result['data']['cves'])
        return all_cves

# Usage
async def main():
    client = AsyncOptimizedClient(api_key, max_concurrent=5)
    large_list = [f"CVE-2024-{str(i).zfill(4)}" for i in range(1, 1001)]

    start_time = time.time()
    results = await client.process_large_list(large_list)
    end_time = time.time()

    print(f"Processed {len(results)} CVEs in {end_time - start_time:.2f} seconds")

# Run async processing
asyncio.run(main())
```
## 5. Connection Optimization

### Connection Pooling

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OptimizedSession:
    def __init__(self, api_key):
        self.session = requests.Session()

        # Configure connection pooling
        adapter = HTTPAdapter(
            pool_connections=10,  # Number of connection pools
            pool_maxsize=20,      # Max connections per pool
            max_retries=Retry(
                total=3,
                status_forcelist=[429, 500, 502, 503, 504],
                backoff_factor=1,
                respect_retry_after_header=True
            )
        )
        self.session.mount('https://', adapter)

        self.session.headers.update({
            'X-API-Key': api_key,
            'User-Agent': 'MyApp/1.0 (Optimized)',
            'Accept-Encoding': 'gzip, deflate'
        })

    def get(self, url, **kwargs):
        return self.session.get(url, **kwargs)

# Use persistent session
session = OptimizedSession(api_key)

# Multiple requests reuse connections
for cve_id in cve_list:  # cve_list assumed to be defined
    response = session.get(f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}")
```
### Timeout Optimization

```python
# Configure timeouts for different operations
TIMEOUTS = {
    'ping': (2, 5),        # (connect, read) in seconds
    'cve_detail': (3, 10),
    'search': (5, 30),
    'batch': (5, 60),
    'stats': (3, 15)
}

def get_cve_with_timeout(cve_id, operation_type='cve_detail'):
    timeout = TIMEOUTS.get(operation_type, (5, 30))
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        headers={'X-API-Key': api_key},
        timeout=timeout
    )
    return response.json()
```
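Timeouts only help if you handle them. A minimal fallback sketch, reusing the `TIMEOUTS` table above (the retry count and the widening read timeout are illustrative choices, not API requirements):

```python
import requests

def get_cve_safe(cve_id, operation_type='cve_detail', retries=2):
    """Retry on timeout, widening the read timeout on each attempt."""
    connect, read = TIMEOUTS.get(operation_type, (5, 30))
    for attempt in range(retries + 1):
        try:
            response = requests.get(
                f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
                headers={'X-API-Key': api_key},
                timeout=(connect, read * (attempt + 1))  # longer read timeout per retry
            )
            return response.json()
        except requests.exceptions.Timeout:
            if attempt == retries:
                raise  # give up after the final attempt
```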
## 6. Use Case-Specific Optimizations

### High-Frequency Monitoring

```python
from datetime import datetime

class HighFrequencyMonitor:
    """Optimized for frequent checks with minimal data"""

    def __init__(self, api_key):
        self.client = OptimizedCyberSecFeedClient(api_key)  # ETag client from section 3
        self.last_check = None

    def check_new_critical_cves(self):
        """Fast check for new critical CVEs"""
        # Use minimal fields for speed
        params = {
            'severity_min': 9.0,
            'limit': 10,
            'fields': 'id,published,cvss.baseScore'
        }

        if self.last_check:
            params['published_after'] = self.last_check

        response = self.client.session.get(
            'https://api.cybersecfeed.com/api/v1/cves',
            params=params
        )

        self.last_check = datetime.utcnow().isoformat()
        return response.json()
```
### Bulk Data Analysis

```python
import requests

class BulkAnalyzer:
    """Optimized for processing large datasets"""

    def __init__(self, api_key):
        self.api_key = api_key

    def analyze_cve_list(self, cve_ids):
        """Analyze large CVE list with optimal batching"""
        # Process in chunks with minimal data
        results = []
        chunk_size = 50

        for i in range(0, len(cve_ids), chunk_size):
            chunk = cve_ids[i:i + chunk_size]

            response = requests.get(
                'https://api.cybersecfeed.com/api/v1/cves',
                params={
                    'ids': ','.join(chunk),
                    'fields': 'id,cvss.baseScore,kev,epss.score'
                },
                headers={'X-API-Key': self.api_key}
            )

            if response.status_code == 200:
                chunk_data = response.json()['data']['cves']
                results.extend(chunk_data)

        return self.calculate_risk_metrics(results)

    def calculate_risk_metrics(self, cves):
        """Fast risk calculation on minimal data"""
        metrics = {
            'total_cves': len(cves),
            'critical_count': 0,
            'kev_count': 0,
            'high_epss_count': 0,
            'avg_cvss': 0
        }

        total_cvss = 0
        for cve in cves:
            cvss_score = cve.get('cvss', {}).get('baseScore', 0)
            total_cvss += cvss_score

            if cvss_score >= 9.0:
                metrics['critical_count'] += 1
            if cve.get('kev'):
                metrics['kev_count'] += 1
            if cve.get('epss', {}).get('score', 0) > 0.7:
                metrics['high_epss_count'] += 1

        metrics['avg_cvss'] = total_cvss / len(cves) if cves else 0
        return metrics
```
## 7. Performance Monitoring

### Response Time Tracking

```python
import time
from collections import defaultdict

class PerformanceTracker:
    def __init__(self):
        self.metrics = defaultdict(list)

    def track_request(self, operation, func, *args, **kwargs):
        """Track request performance"""
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            success = True
        except Exception:
            result = None
            success = False

        duration = time.time() - start_time
        self.metrics[operation].append({
            'duration': duration,
            'success': success,
            'timestamp': time.time()
        })
        return result

    def get_stats(self, operation):
        """Get performance statistics"""
        if operation not in self.metrics:
            return None

        durations = [m['duration'] for m in self.metrics[operation] if m['success']]
        if not durations:
            return None

        return {
            'count': len(durations),
            'avg': sum(durations) / len(durations),
            'min': min(durations),
            'max': max(durations),
            'p95': sorted(durations)[int(len(durations) * 0.95)]
        }

# Usage
tracker = PerformanceTracker()

# Track CVE requests
def get_cve_tracked(cve_id):
    return tracker.track_request(
        'cve_detail',
        lambda: requests.get(
            f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
            headers={'X-API-Key': api_key}
        ).json()
    )

# Get performance stats
stats = tracker.get_stats('cve_detail')
print(f"Average response time: {stats['avg']:.3f}s")
print(f"95th percentile: {stats['p95']:.3f}s")
```
## 8. Optimization Checklist

### Pre-Optimization Audit

- Identify which enrichment data you actually use
- Review field usage - are you reading all returned fields?
- Check for duplicate or unnecessary API calls
- Measure current response times and bandwidth usage

### Implementation

- Implement parameter-based data loading (`include` parameter)
- Add field projection for targeted data retrieval
- Set up proper ETag-based caching
- Use batch operations for multiple CVE lookups
- Configure connection pooling and timeouts

### Monitoring

- Track response times and cache hit rates
- Monitor quota usage via the `/usage` endpoint (see the sketch after this list)
- Set up alerts for performance degradation
- Run regular performance tests with realistic data sets
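A quota check can be as simple as the following sketch. It assumes the usage endpoint is served at `/api/v1/usage` and that the response carries request counts; the field names used here are illustrative, so adjust them to the actual response shape.

```python
import requests

def check_quota(api_key, warn_threshold=0.8):
    """Poll the usage endpoint and warn when quota consumption is high."""
    response = requests.get(
        'https://api.cybersecfeed.com/api/v1/usage',
        headers={'X-API-Key': api_key}
    )
    usage = response.json()

    # Illustrative field names - adjust to the actual response shape
    used = usage.get('requests_used', 0)
    limit = usage.get('requests_limit', 0)
    if limit and used / limit >= warn_threshold:
        print(f"Warning: {used}/{limit} requests used ({used / limit:.0%})")
    return usage
```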
### Advanced Optimizations

- Implement application-level caching with TTL
- Use async processing for bulk operations
- Consider CDN or proxy caching for frequently accessed data
- Optimize data processing pipelines for minimal memory usage
## Expected Performance Gains

Following this guide, you can expect:

| Optimization | Performance Improvement | Bandwidth Reduction | Quota Efficiency |
|---|---|---|---|
| Parameter Control | 20-40% faster | 25-50% less | Same requests, less data |
| Field Projection | 30-60% faster | 40-70% less | Same requests, minimal data |
| ETag Caching | 80-95% faster (cached) | 90%+ less | Dramatically fewer requests |
| Batch Operations | 50x faster (vs. individual) | Overhead reduction | 98% fewer requests |
| Combined | 5-10x improvement | 70-90% reduction | Massive efficiency gains |

These optimizations not only improve your application's performance but also help you stay within API quotas while delivering a better user experience through faster response times.