Performance Optimization
This guide covers techniques to maximize the performance of your CyberSecFeed API integration, from connection management to parallel processing.
Connection Management
Connection Pooling
Reusing HTTP connections significantly reduces latency by avoiding repeated SSL/TLS handshakes.
Python with requests
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class OptimizedAPIClient:
    """API client that reuses a pooled, retrying HTTPS session."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.session = self._create_session()

    def _create_session(self):
        """Build a requests.Session with pooling, retries, and gzip enabled."""
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"],
        )
        adapter = HTTPAdapter(
            pool_connections=100,  # number of distinct connection pools
            pool_maxsize=100,      # connections kept alive per pool
            max_retries=retry_strategy,
        )
        session = requests.Session()
        # All API traffic is HTTPS, so mounting the adapter there is enough.
        session.mount("https://", adapter)
        session.headers.update({
            "X-API-Key": self.api_key,
            "Accept-Encoding": "gzip",   # request compressed payloads
            "Connection": "keep-alive",  # reuse the TCP/TLS connection
        })
        return session

    def get_cve(self, cve_id):
        """Fetch a single CVE over the pooled connection."""
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        return self.session.get(url).json()
# Usage - connection is reused
# NOTE(review): assumes `cve_list` is an iterable of CVE ID strings defined
# elsewhere; replace the placeholder API key before running.
client = OptimizedAPIClient("your-api-key-here")
for cve_id in cve_list:
    data = client.get_cve(cve_id)
Node.js with Keep-Alive
const https = require('https');
const fetch = require('node-fetch');

// Shared keep-alive agent so TCP/TLS connections are reused across requests.
const httpsAgent = new https.Agent({
  keepAlive: true,
  maxSockets: 100,
  maxFreeSockets: 10,
  timeout: 60000,
  keepAliveMsecs: 30000,
});

class OptimizedAPIClient {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseUrl = 'https://api.cybersecfeed.com/api/v1';
    this.agent = httpsAgent;
  }

  async getCVE(cveId) {
    // Route the request through the keep-alive agent with gzip enabled.
    const url = `${this.baseUrl}/cve/${cveId}`;
    const options = {
      headers: {
        'X-API-Key': this.apiKey,
        'Accept-Encoding': 'gzip',
      },
      agent: this.agent,
      compress: true,
    };
    const response = await fetch(url, options);
    return response.json();
  }
}
Parallel Processing
Concurrent Requests
Process multiple requests simultaneously to improve throughput.
Python with asyncio
import asyncio
import aiohttp
from typing import List, Dict
class AsyncAPIClient:
    """Async client that fetches CVEs concurrently under a bounded semaphore."""

    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = "https://api.cybersecfeed.com/api/v1"
        # Caps the number of in-flight requests to respect rate limits.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def get_cve(self, session: aiohttp.ClientSession, cve_id: str) -> Dict:
        """Get a single CVE, honoring the concurrency limit."""
        async with self.semaphore:
            url = f"{self.base_url}/cve/{cve_id}"
            async with session.get(url) as response:
                return await response.json()

    async def get_multiple_cves(self, cve_ids: List[str]) -> List[Dict]:
        """Get multiple CVEs concurrently, dropping failed requests."""
        headers = {"X-API-Key": self.api_key}
        async with aiohttp.ClientSession(headers=headers) as session:
            tasks = [self.get_cve(session, cve_id) for cve_id in cve_ids]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        # gather(..., return_exceptions=True) returns Exception objects in
        # place of failed results; filter them out.
        return [r for r in results if not isinstance(r, Exception)]

    async def get_batch(self, cve_ids: List[str]) -> List[Dict]:
        """Fetch up to 50 CVEs with one call to the batch endpoint.

        Bug fix: batch_process_cves called this method, but it did not
        exist in the original, raising AttributeError at runtime.
        NOTE(review): assumes the batch endpoint returns a JSON object with
        a "cves" list — confirm against the API reference.
        """
        headers = {"X-API-Key": self.api_key}
        params = {"ids": ",".join(cve_ids)}
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(f"{self.base_url}/cves", params=params) as response:
                data = await response.json()
        if isinstance(data, dict):
            return data.get("cves", [])
        return data

    async def batch_process_cves(self, all_cve_ids: List[str], batch_size: int = 50):
        """Process a large ID list in batches.

        Batches of at most 50 IDs use the batch endpoint for efficiency;
        larger batch_size values fall back to concurrent individual requests.
        """
        all_results = []
        for i in range(0, len(all_cve_ids), batch_size):
            batch = all_cve_ids[i:i + batch_size]
            print(f"Processing batch {i//batch_size + 1}...")
            if len(batch) <= 50:
                results = await self.get_batch(batch)
            else:
                results = await self.get_multiple_cves(batch)
            all_results.extend(results)
        return all_results
# Usage
async def main():
    """Fetch 1000 CVE records and report the elapsed wall time."""
    client = AsyncAPIClient("your-api-key-here", max_concurrent=20)
    # Process 1000 CVEs
    cve_ids = [f"CVE-2024-{i:04d}" for i in range(1, 1001)]

    loop = asyncio.get_event_loop()
    started = loop.time()
    results = await client.batch_process_cves(cve_ids)
    elapsed = loop.time() - started

    print(f"Processed {len(results)} CVEs in {elapsed:.2f} seconds")

# Run
asyncio.run(main())
Python with concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from typing import List, Dict
class ConcurrentAPIClient:
    """Thread-pool based client for fetching many CVEs in parallel."""

    def __init__(self, api_key: str, max_workers: int = 10):
        self.api_key = api_key
        self.max_workers = max_workers
        self.session = self._create_session()

    def _create_session(self):
        """Session with the API key pre-set; shared by all worker threads."""
        session = requests.Session()
        session.headers.update({"X-API-Key": self.api_key})
        return session

    def get_cve(self, cve_id: str) -> Dict:
        """Fetch one CVE record."""
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        return self.session.get(url).json()

    def get_multiple_cves_concurrent(self, cve_ids: List[str]) -> List[Dict]:
        """Fetch many CVEs via a thread pool; failures are logged and skipped."""
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Map each future back to its CVE ID so errors can be attributed.
            pending = {executor.submit(self.get_cve, cid): cid for cid in cve_ids}
            for future in as_completed(pending):
                cve_id = pending[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f"Error fetching {cve_id}: {e}")
        return results
# Usage
# NOTE(review): replace the placeholder key with a real API key before running.
client = ConcurrentAPIClient("your-api-key-here", max_workers=20)
cve_ids = ["CVE-2024-0001", "CVE-2024-0002", "CVE-2024-0003"]
results = client.get_multiple_cves_concurrent(cve_ids)
Caching Strategies
Edge Caching with ETag Support
CyberSecFeed leverages Cloudflare edge caching with ETag support for optimal performance:
import time
import requests
from functools import lru_cache
from typing import Optional, Dict
class EdgeOptimizedClient:
    """Client that uses ETag conditional requests to exploit edge caching."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})
        self.etag_cache = {}  # cache_key -> {"etag", "data", "timestamp"}

    def _conditional_get(self, cache_key: str, url: str, hit_message: str) -> Dict:
        """Shared conditional-GET logic for any cacheable endpoint.

        Sends If-None-Match when an ETag is known; on 304 returns the locally
        stored payload, on 200 stores the fresh payload and its ETag.
        Raises requests.HTTPError for error statuses.

        Refactor: the original duplicated this flow in both public methods
        and silently returned None for non-200/304 success statuses; the
        shared helper removes the duplication and raises/parses explicitly.
        """
        headers = {}
        if cache_key in self.etag_cache:
            headers["If-None-Match"] = self.etag_cache[cache_key]["etag"]

        response = self.session.get(url, headers=headers)

        # 304 Not Modified: the edge confirmed our local copy is current.
        if response.status_code == 304:
            print(hit_message)
            return self.etag_cache[cache_key]["data"]

        response.raise_for_status()

        data = response.json()
        # Remember the ETag so the next request can be conditional.
        if "ETag" in response.headers:
            self.etag_cache[cache_key] = {
                "etag": response.headers["ETag"],
                "data": data,
                "timestamp": time.time(),
            }
        return data

    def get_cve_with_cache(self, cve_id: str) -> Dict:
        """Get CVE data with ETag caching for edge optimization."""
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        return self._conditional_get(
            cache_key=cve_id,
            url=url,
            hit_message=f"Cache hit for {cve_id} (304 Not Modified)",
        )

    def get_stats_with_cache(self) -> Dict:
        """Get platform stats with edge caching."""
        return self._conditional_get(
            cache_key="stats",
            url="https://api.cybersecfeed.com/api/v1/stats",
            hit_message="Stats cache hit at edge",
        )
# Usage example
client = EdgeOptimizedClient("your-api-key-here")
# First request: Cache miss, fetches from origin
data1 = client.get_cve_with_cache("CVE-2024-0001")
# Second request: Cache hit, served from edge (near-zero latency)
# (the client sends If-None-Match; a 304 lets it reuse the stored payload)
data2 = client.get_cve_with_cache("CVE-2024-0001")
Multi-Level Cache with Edge Integration
Combine local caching with edge optimization:
class MultiLevelCache:
    """Two-level cache: local memory (L1) in front of the edge/ETag layer (L2).

    Bug fix: the original decorated _get_from_memory with functools.lru_cache,
    which (a) keeps instances alive for the cache's lifetime (ruff B019) and
    (b) permanently memoized the initial None miss per CVE ID, so the L1
    cache could never produce a hit. The size bound and TTL are now enforced
    directly on the dict.
    """

    MAX_ENTRIES = 1000  # L1 capacity, reported by get_cache_performance()

    def __init__(self, api_client):
        # api_client must expose get_cve_with_cache(cve_id) (L2/L3 layers).
        self.api_client = api_client
        self.memory_cache = {}  # L1: cve_id -> {"data", "expires"}
        self.memory_ttl = 300   # seconds (5 minutes)
        # L2 cache is handled by the edge with ETags inside api_client.

    def _get_from_memory(self, cve_id: str) -> Optional[Dict]:
        """Return fresh L1 data, evicting the entry if it has expired."""
        entry = self.memory_cache.get(cve_id)
        if entry is None:
            return None
        if time.time() < entry["expires"]:
            return entry["data"]
        del self.memory_cache[cve_id]  # expired — drop it
        return None

    def get_cve(self, cve_id: str) -> Dict:
        """Get CVE data: L1 memory first, then edge (L2) / origin (L3)."""
        cached = self._get_from_memory(cve_id)
        if cached is not None:
            return cached

        # L2 (edge via ETag) and L3 (origin) are handled by the API client.
        data = self.api_client.get_cve_with_cache(cve_id)

        # Bound L1: evict the oldest insertion once the cap is reached
        # (dicts preserve insertion order, so next(iter(...)) is the oldest).
        if len(self.memory_cache) >= self.MAX_ENTRIES:
            self.memory_cache.pop(next(iter(self.memory_cache)))
        self.memory_cache[cve_id] = {
            "data": data,
            "expires": time.time() + self.memory_ttl,
        }
        return data

    def get_cache_performance(self) -> Dict:
        """Report cache configuration and current L1 occupancy."""
        return {
            "l1_cache_size": len(self.memory_cache),
            "l1_cache_limit": self.MAX_ENTRIES,
            "edge_cache_enabled": True,
            "estimated_cache_hit_rate": "80%+",  # based on platform metrics
            "cache_levels": ["Memory (L1)", "Edge (L2)", "Origin (L3)"],
        }
Cache Performance Optimization
Leverage platform-specific cache behaviors:
class CacheOptimizedClient:
    """Client pre-configured for the platform's cache behaviors."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
            "Accept-Encoding": "gzip, br",   # enable gzip/brotli compression
            "Cache-Control": "max-age=300",  # accept responses up to 5 min old
        })

    def get_cacheable_endpoints(self) -> Dict:
        """Reference table of endpoints optimized for caching."""
        return {
            "cve_detail": {
                "url_pattern": "/api/v1/cve/{id}",
                "cache_ttl": "30 minutes",
                "edge_cache": True,
                "notes": "CVE data changes infrequently"
            },
            "kev_catalog": {
                "url_pattern": "/api/v1/kev",
                "cache_ttl": "1 hour",
                "edge_cache": True,
                "notes": "KEV catalog updated daily"
            },
            "platform_stats": {
                "url_pattern": "/api/v1/stats",
                "cache_ttl": "5 minutes",
                "edge_cache": True,
                "notes": "Stats updated frequently but cacheable"
            },
            "batch_queries": {
                "url_pattern": "/api/v1/cves?ids=...",
                "cache_ttl": "15 minutes",
                "edge_cache": True,
                "notes": "Batch requests highly cacheable"
            }
        }

    def benchmark_cache_performance(self, cve_id: str, iterations: int = 10):
        """Benchmark repeated fetches of one CVE to observe edge caching.

        Bug fix: the original labeled a request HIT when the status was 304,
        but this client never sends If-None-Match, so 304 can never occur.
        The edge result is now read from Cloudflare's CF-Cache-Status
        response header (UNKNOWN if the header is absent).
        NOTE(review): confirm the platform exposes CF-Cache-Status to clients.
        """
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        times = []
        for i in range(iterations):
            start = time.time()
            response = self.session.get(url)
            elapsed = time.time() - start
            times.append(elapsed)
            cache_status = response.headers.get("CF-Cache-Status", "UNKNOWN")
            print(f"Request {i+1}: {elapsed:.3f}s ({cache_status})")
        return {
            "average_response_time": sum(times) / len(times),
            "min_response_time": min(times),
            "max_response_time": max(times),
            "cache_optimization": "Enabled"
        }
Request Optimization
Field Projection
Reduce response size by requesting only needed fields:
def get_cves_minimal(api_key, cve_ids):
    """Fetch CVE records requesting only the fields the caller needs."""
    params = {
        "ids": ",".join(cve_ids),
        # Field projection keeps the response payload small.
        "fields": "id,cvss.baseScore,kev,epss.score",
    }
    response = requests.get(
        "https://api.cybersecfeed.com/api/v1/cves",
        headers={"X-API-Key": api_key},
        params=params,
    )
    return response.json()

# Full response: ~250KB for 50 CVEs
# Projected response: ~25KB for 50 CVEs (90% reduction)
Batch vs Individual Requests
import time

def benchmark_approaches(api_key, cve_ids):
    """Compare batch vs individual request performance on up to 10 CVEs."""
    sample = cve_ids[:10]  # Test with 10
    headers = {"X-API-Key": api_key}

    # Individual requests: one round trip per CVE.
    start = time.time()
    individual_results = []
    for cve_id in sample:
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        individual_results.append(requests.get(url, headers=headers).json())
    individual_time = time.time() - start

    # Batch request: a single round trip for all sampled CVEs.
    start = time.time()
    response = requests.get(
        "https://api.cybersecfeed.com/api/v1/cves",
        headers=headers,
        params={"ids": ",".join(sample)},
    )
    batch_results = response.json()
    batch_time = time.time() - start

    print(f"Individual: {individual_time:.2f}s")
    print(f"Batch: {batch_time:.2f}s")
    print(f"Speedup: {individual_time/batch_time:.1f}x")
Monitoring and Metrics
Performance Tracking
import time
from dataclasses import dataclass
from typing import List
import statistics
@dataclass
class PerformanceMetrics:
    """Accumulates per-request latency and cache/error counters."""

    request_count: int = 0
    total_time: float = 0
    # default_factory is the idiomatic dataclass way to get a fresh list per
    # instance (the original used None plus a __post_init__ fix-up).
    response_times: List[float] = field(default_factory=list)
    cache_hits: int = 0
    errors: int = 0

    def __post_init__(self):
        # Tolerate callers that explicitly pass response_times=None.
        if self.response_times is None:
            self.response_times = []

    def add_request(self, response_time: float, cache_hit: bool = False):
        """Record one completed request."""
        self.request_count += 1
        self.total_time += response_time
        self.response_times.append(response_time)
        if cache_hit:
            self.cache_hits += 1

    def get_stats(self):
        """Summarize collected metrics; returns {} when nothing was recorded.

        Bug fix: statistics.quantiles() raises StatisticsError with fewer
        than two data points, so the original crashed after a single
        request; percentiles now fall back to the max observation.
        """
        if not self.response_times:
            return {}
        if len(self.response_times) >= 2:
            p95 = statistics.quantiles(self.response_times, n=20)[18]
            p99 = statistics.quantiles(self.response_times, n=100)[98]
        else:
            p95 = p99 = max(self.response_times)
        return {
            "total_requests": self.request_count,
            "average_time": self.total_time / self.request_count,
            "median_time": statistics.median(self.response_times),
            "p95_time": p95,
            "p99_time": p99,
            "cache_hit_rate": self.cache_hits / self.request_count * 100,
            "error_rate": self.errors / self.request_count * 100
        }
class MonitoredAPIClient:
    """API client that records latency/error metrics for every call."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.metrics = PerformanceMetrics()

    def get_cve(self, cve_id):
        """Fetch one CVE, timing the request and counting failures."""
        start_time = time.time()
        try:
            response = requests.get(
                f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}",
                headers={"X-API-Key": self.api_key}
            )
            response.raise_for_status()
            self.metrics.add_request(time.time() - start_time)
            return response.json()
        except Exception:
            # NOTE(review): errored calls are counted but their latency is
            # not recorded, so error_rate is relative to successful requests.
            self.metrics.errors += 1
            raise

    def print_performance_report(self):
        """Print a latency/cache summary.

        Bug fix: the original raised KeyError when no requests had been
        recorded (get_stats() returns {}); that case is now reported
        explicitly instead of crashing.
        """
        stats = self.metrics.get_stats()
        print("\nPerformance Report:")
        if not stats:
            print("No requests recorded yet.")
            return
        print(f"Total Requests: {stats['total_requests']}")
        print(f"Average Response Time: {stats['average_time']:.3f}s")
        print(f"P95 Response Time: {stats['p95_time']:.3f}s")
        print(f"Cache Hit Rate: {stats['cache_hit_rate']:.1f}%")
Best Practices Summary
1. Connection Management
- Use connection pooling
- Enable HTTP keep-alive
- Set appropriate timeouts
- Enable gzip compression
2. Request Strategy
- Use batch endpoints when possible
- Implement field projection
- Cache aggressively
- Process concurrently
3. Error Handling
def robust_api_call(func, *args, max_retries=3, **kwargs):
    """Call func(*args, **kwargs), retrying transient network failures.

    Timeouts and connection errors are retried with exponential backoff;
    HTTP 429 (rate limit) is retried after a 60-second pause. Any other
    HTTPError propagates immediately. Raises Exception after max_retries
    failed attempts.

    Bug fixes vs. the original:
    - a run of only-429 failures reported "last_error=None"; 429 is now
      recorded like the other transient errors;
    - the backoff sleep no longer runs after the final attempt, where it
      only delayed the eventual failure.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except requests.exceptions.Timeout:
            last_error = "Timeout"
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
        except requests.exceptions.ConnectionError:
            last_error = "Connection error"
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                # Rate limited — wait out the window before retrying.
                last_error = "Rate limited (429)"
                if attempt < max_retries - 1:
                    time.sleep(60)
            else:
                raise
    raise Exception(f"Failed after {max_retries} attempts: {last_error}")
4. Resource Management
class APIClientContext:
    """Context manager that owns a requests.Session and closes it on exit."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.session = None  # created lazily on __enter__

    def __enter__(self):
        session = requests.Session()
        session.headers.update({"X-API-Key": self.api_key})
        self.session = session
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release pooled connections even when the with-body raised.
        if self.session:
            self.session.close()

    def get_cve(self, cve_id):
        """Fetch one CVE; valid only inside the with-block."""
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        return self.session.get(url).json()
# Usage
# The with-block guarantees the underlying session is closed on exit.
with APIClientContext("your-api-key-here") as client:
    data = client.get_cve("CVE-2024-0001")
Performance Benchmarks
Optimization | Improvement | Use Case | Platform Feature |
---|---|---|---|
Edge Caching | Near-zero latency | Repeated queries | Cloudflare CDN |
ETag Validation | High cache hit rate | Conditional requests | RFC 7232 compliant |
Connection Pooling | 40-60% faster | Multiple requests | HTTP keep-alive |
Batch Requests | 95% faster | Bulk lookups | Native batching |
Field Projection | 80% less data | Limited bandwidth | Query optimization |
Parallel Processing | 5-10x faster | Large datasets | Concurrent requests |
Gzip Compression | 70% smaller | All responses | Automatic compression |
Summary
Optimizing API performance requires a multi-faceted approach leveraging both client-side and platform optimizations:
Platform-Level Optimizations (Built-in)
- Edge Caching: Cloudflare CDN with intelligent caching
- ETag Support: RFC 7232 compliant conditional requests
- Automatic Compression: Gzip/Brotli encoding for all responses
- Global Distribution: Worldwide edge locations for low latency
Client-Side Optimizations (Implementation)
- Minimize Round Trips: Use batch endpoints and pagination efficiently
- Reduce Data Transfer: Project only needed fields with the `fields` parameter
- Reuse Connections: Implement connection pooling and HTTP keep-alive
- Process Concurrently: Use async/parallel processing within quotas
- Cache Strategically: Implement multi-level caching with ETag validation
- Monitor Performance: Track metrics to identify bottlenecks
Expected Performance Gains
- Edge caching: Near-zero latency for cached content
- Combined optimizations: 10x or better performance improvements
- Bandwidth efficiency: Up to 95% reduction with proper caching
- Scalability: Handle larger workloads within API quotas
By implementing these optimizations and leveraging platform features, you can achieve enterprise-grade performance while maintaining cost efficiency.