
Performance Optimization Guide

This guide provides comprehensive strategies to optimize your CyberSecFeed API integration for maximum performance, minimal bandwidth usage, and efficient quota consumption.

Performance Overview

The CyberSecFeed API is designed for high performance with several optimization layers:

  • Edge Caching: 80%+ cache hit rates with Cloudflare CDN
  • Parameter-Based Loading: Request only the data you need
  • Field Projection: Minimize payload size
  • ETag Support: Efficient client-side caching
  • Batch Operations: Reduce request overhead
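
For example, a single request can combine several of these layers at once: selective loading via include, field projection via fields, and conditional fetching via If-None-Match. A minimal sketch follows (assuming api_key is already defined; the ETag value is a placeholder from an earlier response):

# Minimal sketch combining several optimization layers in one request.
# The If-None-Match value is a placeholder ETag from a prior response.
import requests

response = requests.get(
    "https://api.cybersecfeed.com/api/v1/cve/CVE-2024-0001",
    params={
        'include': 'acsc',                  # load only the enrichment you need
        'fields': 'id,cvss.baseScore,kev'   # project only essential fields
    },
    headers={
        'X-API-Key': api_key,
        'If-None-Match': '"previously-stored-etag"'  # placeholder value
    }
)

if response.status_code == 304:
    print("Not modified - serve from local cache")
else:
    print(response.json())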

Performance Metrics

Operation                 P95 Response Time   Cache Hit Rate   Typical Payload
CVE Detail (default)      <200ms              85%              ~2KB
CVE Detail + ACSC         <250ms              80%              ~2.5KB
CVE Search                <500ms              70%              ~20KB (10 results)
Batch Lookup (50 CVEs)    <1s                 60%              ~100KB
KEV Catalog               <300ms              90%              ~50KB

1. Parameter-Based Optimization

Use Selective Data Loading

The most impactful optimization is requesting only the enrichment data you need:

# ❌ Slow - requests unnecessary data
def get_all_enrichment(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params={'include': 'acsc,enrichment'},
        headers=headers
    )
    return response.json()

# ✅ Fast - only core data + KEV + EPSS (default)
def get_basic_data(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        headers=headers
    )
    return response.json()

# ✅ Optimal - selective inclusion based on use case
def get_targeted_data(cve_id, include_regional=False):
    params = {}
    if include_regional:
        params['include'] = 'acsc'

    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params=params,
        headers=headers
    )
    return response.json()

Performance Impact by Parameter

Parameter Combination      Response Time   Payload Size   Use Case
Default (no params)        Fastest         Smallest       Basic vulnerability scanning
include=acsc               +10-20%         +25%           Regional threat intelligence
include=enrichment         +20-30%         +50%           Advanced analytics
include=acsc,enrichment    +30-40%         +75%           Complete intelligence picture

2. Field Projection Optimization

Request only the fields you need to minimize bandwidth and processing time:

# ❌ Full response (unnecessary data)
def get_full_cve(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        headers=headers
    )
    return response.json()

# ✅ Optimized for risk scoring
def get_risk_data(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params={'fields': 'id,cvss.baseScore,kev,epss.score'},
        headers=headers
    )
    return response.json()

# ✅ Optimized for compliance reporting
def get_compliance_data(cve_id):
    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params={
            'fields': 'id,published,cvss.baseScore,cvss.baseSeverity,kev.dateAdded',
            'include': 'acsc'
        },
        headers=headers
    )
    return response.json()

Common Field Combinations

FIELD_PRESETS = {
    'minimal': 'id,cvss.baseScore',
    'risk_scoring': 'id,cvss.baseScore,kev,epss.score',
    'dashboard': 'id,published,cvss.baseScore,cvss.baseSeverity,description',
    'compliance': 'id,published,cvss,kev.dateAdded,kev.requiredAction',
    'threat_intel': 'id,cvss.baseScore,kev,epss.score,acsc.priority'
}

def get_cve_optimized(cve_id, preset='minimal', include_acsc=False):
    params = {'fields': FIELD_PRESETS[preset]}
    if include_acsc:
        params['include'] = 'acsc'

    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        params=params,
        headers=headers
    )
    return response.json()

3. Caching Strategies

ETag-Based Caching

Implement proper ETag support for maximum cache efficiency:

import hashlib
import requests

class OptimizedCyberSecFeedClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.etag_cache = {}
        self.session = requests.Session()
        self.session.headers.update({'X-API-Key': api_key})

    def get_cve_cached(self, cve_id, params=None):
        """Get CVE with ETag caching support"""
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        cache_key = self._make_cache_key(url, params)

        headers = {}
        if cache_key in self.etag_cache:
            headers['If-None-Match'] = self.etag_cache[cache_key]['etag']

        response = self.session.get(url, params=params, headers=headers)

        if response.status_code == 304:
            # Cache hit - return cached data
            return self.etag_cache[cache_key]['data']
        elif response.status_code == 200:
            # Cache miss - store new data
            data = response.json()
            if 'etag' in response.headers:
                self.etag_cache[cache_key] = {
                    'etag': response.headers['etag'],
                    'data': data
                }
            return data
        else:
            response.raise_for_status()

    def _make_cache_key(self, url, params):
        """Create a cache key from the URL and parameters"""
        if params:
            param_str = '&'.join(f"{k}={v}" for k, v in sorted(params.items()))
            cache_input = f"{url}?{param_str}"
        else:
            cache_input = url
        return hashlib.md5(cache_input.encode()).hexdigest()

# Usage
client = OptimizedCyberSecFeedClient(api_key)

# First request - cache miss
data1 = client.get_cve_cached('CVE-2024-0001')  # ~200ms

# Second request - conditional request, served from the local cache
data2 = client.get_cve_cached('CVE-2024-0001')  # ~20ms

Application-Level Caching

import time
import requests

class CacheConfig:
    CVE_CACHE_TTL = 3600    # 1 hour
    STATS_CACHE_TTL = 300   # 5 minutes
    KEV_CACHE_TTL = 1800    # 30 minutes

class TimedCache:
    def __init__(self, ttl):
        self.cache = {}
        self.ttl = ttl

    def get(self, key):
        if key in self.cache:
            data, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return data
            else:
                del self.cache[key]
        return None

    def set(self, key, value):
        self.cache[key] = (value, time.time())

class CachedClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.cve_cache = TimedCache(CacheConfig.CVE_CACHE_TTL)
        self.stats_cache = TimedCache(CacheConfig.STATS_CACHE_TTL)

    def get_cve(self, cve_id, use_cache=True):
        if use_cache:
            cached = self.cve_cache.get(cve_id)
            if cached:
                return cached

        # Make API request
        response = requests.get(
            f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
            headers={'X-API-Key': self.api_key}
        )
        data = response.json()

        if use_cache:
            self.cve_cache.set(cve_id, data)

        return data

4. Batch Operation Optimization

Efficient Batch Processing

import time
import requests

def optimized_batch_processing(cve_ids, api_key):
    """Process CVEs in optimized batches"""
    session = requests.Session()
    session.headers.update({'X-API-Key': api_key})

    # Use the maximum batch size
    batch_size = 50
    all_results = []

    for i in range(0, len(cve_ids), batch_size):
        batch = cve_ids[i:i + batch_size]

        # Optimize the batch request
        params = {
            'ids': ','.join(batch),
            'fields': 'id,cvss.baseScore,kev,epss.score',  # Only essential data
            # Include ACSC only if needed for this batch
            # 'include': 'acsc'
        }

        response = session.get(
            'https://api.cybersecfeed.com/api/v1/cves',
            params=params
        )

        if response.status_code == 200:
            batch_data = response.json()
            all_results.extend(batch_data['data']['cves'])

        # Small delay to avoid overwhelming the API
        time.sleep(0.1)

    return all_results

# Process 500 CVEs efficiently
large_cve_list = [f"CVE-2024-{str(i).zfill(4)}" for i in range(1, 501)]
results = optimized_batch_processing(large_cve_list, api_key)
print(f"Processed {len(results)} CVEs")

Parallel Processing with Rate Limiting

import asyncio
import time
import aiohttp
from asyncio import Semaphore

class AsyncOptimizedClient:
    def __init__(self, api_key, max_concurrent=10):
        self.api_key = api_key
        self.semaphore = Semaphore(max_concurrent)
        self.base_url = "https://api.cybersecfeed.com/api/v1"

    async def fetch_cve_batch(self, session, cve_ids, include_acsc=False):
        """Fetch a batch of CVEs asynchronously"""
        async with self.semaphore:  # Rate limiting
            params = {
                'ids': ','.join(cve_ids[:50]),
                'fields': 'id,cvss.baseScore,kev,epss.score'
            }
            if include_acsc:
                params['include'] = 'acsc'

            async with session.get(
                f"{self.base_url}/cves",
                params=params,
                headers={'X-API-Key': self.api_key}
            ) as response:
                return await response.json()

    async def process_large_list(self, all_cve_ids, include_acsc=False):
        """Process a large CVE list with controlled concurrency"""
        # Split into batches of 50
        batches = [all_cve_ids[i:i+50] for i in range(0, len(all_cve_ids), 50)]

        async with aiohttp.ClientSession() as session:
            tasks = [
                self.fetch_cve_batch(session, batch, include_acsc)
                for batch in batches
            ]
            batch_results = await asyncio.gather(*tasks)

        # Combine all results
        all_cves = []
        for batch_result in batch_results:
            if batch_result.get('data', {}).get('cves'):
                all_cves.extend(batch_result['data']['cves'])

        return all_cves

# Usage
async def main():
    client = AsyncOptimizedClient(api_key, max_concurrent=5)
    large_list = [f"CVE-2024-{str(i).zfill(4)}" for i in range(1, 1001)]

    start_time = time.time()
    results = await client.process_large_list(large_list)
    end_time = time.time()

    print(f"Processed {len(results)} CVEs in {end_time - start_time:.2f} seconds")

# Run async processing
asyncio.run(main())

5. Connection Optimization

Connection Pooling

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OptimizedSession:
    def __init__(self, api_key):
        self.session = requests.Session()

        # Configure connection pooling
        adapter = HTTPAdapter(
            pool_connections=10,  # Number of connection pools
            pool_maxsize=20,      # Max connections per pool
            max_retries=Retry(
                total=3,
                status_forcelist=[429, 500, 502, 503, 504],
                backoff_factor=1,
                respect_retry_after_header=True
            )
        )

        self.session.mount('https://', adapter)
        self.session.headers.update({
            'X-API-Key': api_key,
            'User-Agent': 'MyApp/1.0 (Optimized)',
            'Accept-Encoding': 'gzip, deflate'
        })

    def get(self, url, **kwargs):
        return self.session.get(url, **kwargs)

# Use a persistent session
session = OptimizedSession(api_key)

# Multiple requests reuse connections
for cve_id in cve_list:
    response = session.get(f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}")

Timeout Optimization

# Configure timeouts for different operations
TIMEOUTS = {
    'ping': (2, 5),  # (connect, read) in seconds
    'cve_detail': (3, 10),
    'search': (5, 30),
    'batch': (5, 60),
    'stats': (3, 15)
}

def get_cve_with_timeout(cve_id, operation_type='cve_detail'):
    timeout = TIMEOUTS.get(operation_type, (5, 30))

    response = requests.get(
        f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
        headers={'X-API-Key': api_key},
        timeout=timeout
    )
    return response.json()

6. Use Case-Specific Optimizations

High-Frequency Monitoring

from datetime import datetime

class HighFrequencyMonitor:
    """Optimized for frequent checks with minimal data"""

    def __init__(self, api_key):
        self.client = OptimizedCyberSecFeedClient(api_key)
        self.last_check = None

    def check_new_critical_cves(self):
        """Fast check for new critical CVEs"""
        # Use minimal fields for speed
        params = {
            'severity_min': 9.0,
            'limit': 10,
            'fields': 'id,published,cvss.baseScore'
        }

        if self.last_check:
            params['published_after'] = self.last_check

        response = self.client.session.get(
            'https://api.cybersecfeed.com/api/v1/cves',
            params=params
        )

        self.last_check = datetime.utcnow().isoformat()
        return response.json()
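
A polling loop around this monitor might look like the following sketch (the 60-second interval is an arbitrary example value):

# Hypothetical polling loop around HighFrequencyMonitor.
# The 60-second interval is an arbitrary example.
import time

monitor = HighFrequencyMonitor(api_key)

while True:
    result = monitor.check_new_critical_cves()
    new_cves = result.get('data', {}).get('cves', [])
    for cve in new_cves:
        score = cve.get('cvss', {}).get('baseScore')
        print(f"New critical CVE: {cve['id']} (CVSS {score})")
    time.sleep(60)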

Bulk Data Analysis

import requests

class BulkAnalyzer:
    """Optimized for processing large datasets"""

    def __init__(self, api_key):
        self.api_key = api_key

    def analyze_cve_list(self, cve_ids):
        """Analyze a large CVE list with optimal batching"""
        # Process in chunks with minimal data
        results = []
        chunk_size = 50

        for i in range(0, len(cve_ids), chunk_size):
            chunk = cve_ids[i:i + chunk_size]

            response = requests.get(
                'https://api.cybersecfeed.com/api/v1/cves',
                params={
                    'ids': ','.join(chunk),
                    'fields': 'id,cvss.baseScore,kev,epss.score'
                },
                headers={'X-API-Key': self.api_key}
            )

            if response.status_code == 200:
                chunk_data = response.json()['data']['cves']
                results.extend(chunk_data)

        return self.calculate_risk_metrics(results)

    def calculate_risk_metrics(self, cves):
        """Fast risk calculation on minimal data"""
        metrics = {
            'total_cves': len(cves),
            'critical_count': 0,
            'kev_count': 0,
            'high_epss_count': 0,
            'avg_cvss': 0
        }

        total_cvss = 0
        for cve in cves:
            cvss_score = cve.get('cvss', {}).get('baseScore', 0)
            total_cvss += cvss_score

            if cvss_score >= 9.0:
                metrics['critical_count'] += 1

            if cve.get('kev'):
                metrics['kev_count'] += 1

            if cve.get('epss', {}).get('score', 0) > 0.7:
                metrics['high_epss_count'] += 1

        metrics['avg_cvss'] = total_cvss / len(cves) if cves else 0
        return metrics

7. Performance Monitoring

Response Time Tracking

import time
import requests
from collections import defaultdict

class PerformanceTracker:
    def __init__(self):
        self.metrics = defaultdict(list)

    def track_request(self, operation, func, *args, **kwargs):
        """Track request performance"""
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            success = True
        except Exception:
            result = None
            success = False

        duration = time.time() - start_time

        self.metrics[operation].append({
            'duration': duration,
            'success': success,
            'timestamp': time.time()
        })

        return result

    def get_stats(self, operation):
        """Get performance statistics"""
        if operation not in self.metrics:
            return None

        durations = [m['duration'] for m in self.metrics[operation] if m['success']]

        if not durations:
            return None

        return {
            'count': len(durations),
            'avg': sum(durations) / len(durations),
            'min': min(durations),
            'max': max(durations),
            'p95': sorted(durations)[int(len(durations) * 0.95)]
        }

# Usage
tracker = PerformanceTracker()

# Track CVE requests
def get_cve_tracked(cve_id):
    return tracker.track_request(
        'cve_detail',
        lambda: requests.get(
            f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
            headers={'X-API-Key': api_key}
        ).json()
    )

# Get performance stats
stats = tracker.get_stats('cve_detail')
print(f"Average response time: {stats['avg']:.3f}s")
print(f"95th percentile: {stats['p95']:.3f}s")

8. Optimization Checklist

Pre-Optimization Audit

  • Identify which enrichment data you actually use
  • Review field usage - are you reading all returned fields?
  • Check for duplicate or unnecessary API calls (see the sketch after this list)
  • Measure current response times and bandwidth usage
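
For the duplicate-call check above, a small sketch like this can count lookups per CVE during a run (audited_get_cve is a hypothetical wrapper; it reuses get_basic_data from section 1):

# Hypothetical audit helper: count how often each CVE is requested.
# audited_get_cve is illustrative; wrap whatever lookup function you use.
from collections import Counter

call_counter = Counter()

def audited_get_cve(cve_id):
    call_counter[cve_id] += 1
    return get_basic_data(cve_id)  # helper defined in section 1

# After a run, any count > 1 is a candidate for caching or deduplication
duplicates = {cve: n for cve, n in call_counter.items() if n > 1}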

Implementation

  • Implement parameter-based data loading (include parameters)
  • Add field projection for targeted data retrieval
  • Set up proper ETag-based caching
  • Use batch operations for multiple CVE lookups
  • Configure connection pooling and timeouts

Monitoring

  • Track response times and cache hit rates
  • Monitor quota usage via the /usage endpoint (see the sketch after this list)
  • Set up alerts for performance degradation
  • Run regular performance tests with realistic data sets
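
For quota monitoring, a minimal sketch along these lines can poll the /usage endpoint (the full URL below is assumed from the base URL used elsewhere in this guide, and the response schema is not shown here, so inspect the raw payload and adapt):

# Minimal quota-monitoring sketch. The full /usage URL is an assumption
# based on the API base URL; the response schema is not documented here.
import requests

def check_quota(api_key):
    response = requests.get(
        'https://api.cybersecfeed.com/api/v1/usage',  # assumed path
        headers={'X-API-Key': api_key},
        timeout=(3, 10)
    )
    response.raise_for_status()
    usage = response.json()
    # Print the raw payload; adapt field access to the actual schema
    print(usage)
    return usage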

Advanced Optimizations

  • Implement application-level caching with TTL
  • Use async processing for bulk operations
  • Consider CDN or proxy caching for frequently accessed data
  • Optimize data processing pipelines for minimal memory usage

Expected Performance Gains

Following this guide, you can expect:

Optimization        Performance Improvement      Bandwidth Reduction   Quota Efficiency
Parameter Control   20-40% faster                25-50% less           Same requests, less data
Field Projection    30-60% faster                40-70% less           Same requests, minimal data
ETag Caching        80-95% faster (cached)       90%+ less             Dramatically fewer requests
Batch Operations    50x faster (vs individual)   Overhead reduction    98% fewer requests
Combined            5-10x improvement            70-90% reduction      Massive efficiency gains

These optimizations not only improve your application's performance but also help you stay within API quotas while delivering a faster experience to your users.