HTTP Caching with ETags
CyberSecFeed supports HTTP caching through ETags (Entity Tags), allowing you to avoid downloading unchanged data and significantly reduce bandwidth usage.
How ETags Work
- Initial Request: Server sends the response with an `ETag` header
- Client Storage: Client stores the ETag value
- Subsequent Request: Client sends an `If-None-Match` header containing the stored ETag
- Server Response: `304 Not Modified` if the data is unchanged (no response body), or `200 OK` with new data and a new ETag if it changed
Benefits
- Bandwidth Savings: Up to 96% reduction for unchanged data
- Faster Response: 304 responses are much quicker
- Reduced Server Load: Less data processing and transfer
- Cost Efficiency: Lower data transfer costs
Basic Implementation
cURL Example
# First request - save the ETag returned by the server
curl -i -H "X-API-Key: your-api-key-here" \
  https://api.cybersecfeed.com/api/v1/cve/CVE-2024-0001

# Response includes:
# ETag: "33a64df551425fcc55e4d42a148795d9f25f89d4"

# Subsequent request with ETag (the value is sent quoted, as received)
curl -i -H "X-API-Key: your-api-key-here" \
  -H 'If-None-Match: "33a64df551425fcc55e4d42a148795d9f25f89d4"' \
  https://api.cybersecfeed.com/api/v1/cve/CVE-2024-0001

# If unchanged: HTTP/1.1 304 Not Modified  (no body is transferred)
Python Implementation
import requests
import json
import os
class CachedAPIClient:
    """API client that caches response bodies on disk and revalidates them with ETags."""

    def __init__(self, api_key, cache_dir="./cache"):
        """
        Args:
            api_key: CyberSecFeed API key, sent in the X-API-Key header.
            cache_dir: Directory where cached responses are stored as JSON files.
        """
        self.api_key = api_key
        self.cache_dir = cache_dir
        self.headers = {"X-API-Key": api_key}
        # Create cache directory (no-op if it already exists)
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_path(self, cache_key):
        """Return the file path used to store the given cache key."""
        return os.path.join(self.cache_dir, f"{cache_key}.json")

    def _load_cache(self, cache_key):
        """Load the cached {"etag", "data"} dict for cache_key, or None.

        A missing, unreadable, or corrupted cache file is treated as a
        cache miss instead of crashing the client.
        """
        cache_path = self._get_cache_path(cache_key)
        try:
            with open(cache_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None
        except (OSError, json.JSONDecodeError):
            # Corrupt or half-written entry: fall back to a full fetch.
            return None

    def _save_cache(self, cache_key, data, etag):
        """Persist data together with its ETag for later revalidation."""
        cache_path = self._get_cache_path(cache_key)
        cache_data = {
            "etag": etag,
            "data": data,
        }
        with open(cache_path, "w", encoding="utf-8") as f:
            json.dump(cache_data, f)

    def get_cve(self, cve_id):
        """Get a CVE record, using If-None-Match to avoid re-downloading.

        Returns:
            The decoded JSON body (served from cache on 304, fresh on 200).
        Raises:
            requests.HTTPError: for 4xx/5xx responses (via raise_for_status).
        """
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        cache_key = f"cve_{cve_id}"

        # Check cache
        cached = self._load_cache(cache_key)

        # Send the stored ETag so the server can answer 304 if unchanged.
        headers = self.headers.copy()
        if cached and cached.get("etag"):
            headers["If-None-Match"] = cached["etag"]

        # Make request
        response = requests.get(url, headers=headers)

        if response.status_code == 304:
            # Data hasn't changed, return cached version
            print(f"Cache hit for {cve_id}")
            return cached["data"]
        elif response.status_code == 200:
            # New data received
            data = response.json()
            # Only cache responses that carry a validator.
            if "ETag" in response.headers:
                self._save_cache(cache_key, data, response.headers["ETag"])
                print(f"Cache updated for {cve_id}")
            return data
        else:
            response.raise_for_status()
# Usage example
client = CachedAPIClient("your-api-key-here")

# First call - fetches from API and stores the body plus its ETag on disk
cve_data = client.get_cve("CVE-2024-0001")

# Second call - sends If-None-Match; a 304 reply serves the cached copy
cve_data = client.get_cve("CVE-2024-0001")
Advanced Caching Strategies
In-Memory Cache with TTL
from datetime import datetime, timedelta
import hashlib
class InMemoryCachedClient:
    """API client keeping ETag-validated responses in a process-local dict with a TTL."""

    def __init__(self, api_key, ttl_minutes=60):
        self.api_key = api_key
        self.ttl = timedelta(minutes=ttl_minutes)
        # Maps url -> {"etag": str, "data": dict, "expires": datetime}
        self.cache = {}

    def _is_cache_valid(self, cached_item):
        """True while the entry's expiry timestamp still lies in the future."""
        return datetime.now() < cached_item["expires"]

    def get(self, path):
        """GET `path` from the API, revalidating any fresh cache entry via ETag."""
        url = f"https://api.cybersecfeed.com{path}"
        headers = {"X-API-Key": self.api_key}

        # Only revalidate entries that are still inside their TTL window.
        entry = self.cache.get(url)
        if entry is not None and self._is_cache_valid(entry):
            headers["If-None-Match"] = entry["etag"]

        response = requests.get(url, headers=headers)

        if response.status_code == 304 and url in self.cache:
            # Unchanged on the server: refresh the TTL and reuse the cached body.
            self.cache[url]["expires"] = datetime.now() + self.ttl
            return self.cache[url]["data"]

        if response.status_code == 200:
            data = response.json()
            if "ETag" in response.headers:
                # Store alongside a fresh expiry so later calls can revalidate.
                self.cache[url] = {
                    "etag": response.headers["ETag"],
                    "data": data,
                    "expires": datetime.now() + self.ttl,
                }
            return data

        response.raise_for_status()
Database-Backed Cache
import sqlite3
from datetime import datetime
class DatabaseCachedClient:
    """API client that persists ETags and response bodies in a SQLite table."""

    def __init__(self, api_key, db_path="cache.db"):
        """
        Args:
            api_key: CyberSecFeed API key, sent in the X-API-Key header.
            db_path: Path of the SQLite file backing the cache.
        """
        self.api_key = api_key
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the cache table if it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS cache (
                    url TEXT PRIMARY KEY,
                    etag TEXT,
                    data TEXT,
                    last_updated TIMESTAMP
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def get_cve_with_cache(self, cve_id):
        """Get a CVE, revalidating against the SQLite-stored ETag.

        Returns:
            The decoded JSON body (cached on 304, fresh on 200).
        Raises:
            requests.HTTPError: for 4xx/5xx responses (via raise_for_status).
        """
        url = f"https://api.cybersecfeed.com/api/v1/cve/{cve_id}"
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Check cache
            cursor.execute("SELECT etag, data FROM cache WHERE url = ?", (url,))
            cached = cursor.fetchone()

            headers = {"X-API-Key": self.api_key}
            if cached:
                headers["If-None-Match"] = cached[0]

            response = requests.get(url, headers=headers)

            if response.status_code == 304 and cached:
                # Server confirmed the cached row is still current.
                return json.loads(cached[1])
            if response.status_code == 200:
                data = response.json()
                etag = response.headers.get("ETag", "")
                # Store an ISO-8601 string: relying on the implicit datetime
                # adapter is deprecated since Python 3.12.
                cursor.execute('''
                    INSERT OR REPLACE INTO cache (url, etag, data, last_updated)
                    VALUES (?, ?, ?, ?)
                ''', (url, etag, json.dumps(data), datetime.now().isoformat()))
                conn.commit()
                return data

            response.raise_for_status()
        finally:
            # Close the connection on every path, including request errors.
            conn.close()
Monitoring Cache Performance
class CacheMetrics:
    """Tracks cache hit/miss counts and the savings they imply."""

    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.bandwidth_saved = 0   # bytes not re-downloaded thanks to 304s
        self.requests_saved = 0    # full API round-trips avoided

    def record_hit(self, estimated_size_bytes=5000):
        """Count one 304 response; credit the estimated body size as saved."""
        self.hits += 1
        self.requests_saved += 1
        self.bandwidth_saved += estimated_size_bytes

    def record_miss(self):
        """Count one full 200 download."""
        self.misses += 1

    def get_stats(self):
        """Return a summary dict of request totals, hit rate, and savings."""
        total = self.hits + self.misses
        # Avoid dividing by zero before any request has been recorded.
        hit_rate = self.hits / total * 100 if total else 0
        megabytes = round(self.bandwidth_saved / 1024 / 1024, 2)
        return {
            "total_requests": total,
            "cache_hits": self.hits,
            "cache_misses": self.misses,
            "hit_rate": f"{hit_rate:.1f}%",
            "bandwidth_saved_mb": megabytes,
            "api_calls_saved": self.requests_saved,
        }
# Usage with metrics
metrics = CacheMetrics()

class MetricsCachedClient(CachedAPIClient):
    # NOTE: illustrative sketch only — `response` comes from the elided
    # request logic of CachedAPIClient.get_cve shown earlier.
    def get_cve(self, cve_id):
        # ... existing code ...
        if response.status_code == 304:
            metrics.record_hit()
            # ... return cached data ...
        elif response.status_code == 200:
            metrics.record_miss()
            # ... process new data ...

# Check performance
print(metrics.get_stats())
# Output: {'hit_rate': '78.5%', 'bandwidth_saved_mb': 123.4, ...}
Best Practices
1. Handle Cache Invalidation
def invalidate_cache(self, cve_id=None):
    """Remove the cached entry for a specific CVE, or wipe the whole cache.

    Args:
        cve_id: CVE identifier whose entry should be dropped; when falsy,
            every file in the cache directory is deleted.
    """
    if not cve_id:
        # No CVE given: delete every file in the cache directory.
        for name in os.listdir(self.cache_dir):
            os.remove(os.path.join(self.cache_dir, name))
        return
    # Targeted invalidation of a single cached CVE.
    target = self._get_cache_path(f"cve_{cve_id}")
    if os.path.exists(target):
        os.remove(target)
2. Respect Cache Headers
def should_cache(response):
    """Decide whether a response is safe and useful to cache.

    Returns False when the server forbids storage via Cache-Control,
    and requires an ETag so the entry can later be revalidated.
    """
    # Server explicitly forbids storing this response.
    if "no-store" in response.headers.get("Cache-Control", ""):
        return False
    # Without a validator a cached copy could never be revalidated.
    return "ETag" in response.headers
3. Implement Cache Warmup
async def warmup_cache(cve_ids, api_client):
    """Pre-populate the cache by fetching the given CVEs concurrently."""
    import asyncio

    # One coroutine per CVE, all awaited together in parallel.
    await asyncio.gather(
        *(api_client.get_cve_async(cve_id) for cve_id in cve_ids)
    )
Performance Impact
| Scenario | Without Cache | With Cache | Improvement |
|---|---|---|---|
| Bandwidth per request | 5 KB | 200 B | 96% reduction |
| Response time | 200ms | 50ms | 75% faster |
| API quota usage | 1 call | 1 call | No change |
| Data freshness | Always fresh | When changed | Optimal |
Summary
ETag caching is a powerful feature that can dramatically reduce bandwidth usage and improve response times. By implementing proper caching, you can:
- Save up to 96% bandwidth on unchanged data
- Reduce response times by 75%
- Lower infrastructure costs
- Improve application performance
Always implement caching for production applications to ensure optimal performance and efficiency.