Pagination
CyberSecFeed uses cursor-based pagination to efficiently handle large result sets. This approach provides consistent results even as new data is added to the database.
How Pagination Works
- Initial Request: Specify a limit parameter (max 100)
- Check Response: Look for the pagination.hasMore field
- Get Next Page: Use pagination.nextCursor as the after parameter
- Repeat: Continue until hasMore is false
Basic Example
# First page - get 20 results
curl -H "X-API-Key: your-api-key-here" \
"https://api.cybersecfeed.com/api/v1/cves?severity_min=7.0&limit=20"
# Response includes:
{
"data": {
"cves": [...],
"pagination": {
"limit": 20,
"hasMore": true,
"nextCursor": "eyJpZCI6IkNWRS0yMDI0LTAwMjEifQ=="
}
}
}
# Next page - use the cursor
curl -H "X-API-Key: your-api-key-here" \
"https://api.cybersecfeed.com/api/v1/cves?severity_min=7.0&limit=20&after=eyJpZCI6IkNWRS0yMDI0LTAwMjEifQ=="
Pagination Response Format
{
"data": {
"cves": [...],
"pagination": {
"limit": 20, // Items per page
"hasMore": true, // More pages available?
"nextCursor": "...", // Cursor for next page
"totalEstimate": 1500 // Estimated total (optional)
}
}
}
Implementation Examples
Python - Simple Pagination
import requests
def get_all_critical_cves(api_key):
"""Retrieve all critical CVEs using pagination"""
base_url = "https://api.cybersecfeed.com/api/v1/cves"
headers = {"X-API-Key": api_key}
params = {
"severity_min": 9.0,
"limit": 100 # Maximum per page
}
all_cves = []
page_count = 0
while True:
page_count += 1
print(f"Fetching page {page_count}...")
response = requests.get(base_url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
cves = data["data"]["cves"]
all_cves.extend(cves)
# Check if more pages exist
pagination = data["data"]["pagination"]
if not pagination.get("hasMore", False):
break
# Set cursor for next page
params["after"] = pagination["nextCursor"]
print(f"Retrieved {len(all_cves)} CVEs across {page_count} pages")
return all_cves
# Usage
critical_cves = get_all_critical_cves("your-api-key-here")
Python - Generator Pattern
def paginate_cves(api_key, **filters):
"""
Generator that yields CVEs one page at a time
Args:
api_key: Your API key
**filters: Any filter parameters (severity_min, kev, etc.)
Yields:
List of CVEs for each page
"""
base_url = "https://api.cybersecfeed.com/api/v1/cves"
headers = {"X-API-Key": api_key}
# Set up parameters
params = filters.copy()
params.setdefault("limit", 100)
while True:
response = requests.get(base_url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
cves = data["data"]["cves"]
# Yield current page
yield cves
# Check for more pages
pagination = data["data"]["pagination"]
if not pagination.get("hasMore", False):
break
# Set up next page
params["after"] = pagination["nextCursor"]
# Usage - process pages as they arrive
for page in paginate_cves("your-api-key-here", severity_min=8.0, kev=True):
print(f"Processing {len(page)} CVEs...")
for cve in page:
# Process each CVE
process_cve(cve)
JavaScript/Node.js
async function getAllCVEs(apiKey, filters = {}) {
const baseUrl = 'https://api.cybersecfeed.com/api/v1/cves';
const headers = { 'X-API-Key': apiKey };
const allCVEs = [];
let hasMore = true;
let cursor = null;
let pageCount = 0;
while (hasMore) {
pageCount++;
console.log(`Fetching page ${pageCount}...`);
// Build query parameters
const params = new URLSearchParams({
...filters,
limit: 100,
...(cursor && { after: cursor }),
});
const response = await fetch(`${baseUrl}?${params}`, { headers });
const data = await response.json();
// Add CVEs to results
allCVEs.push(...data.data.cves);
// Check pagination
const pagination = data.data.pagination;
hasMore = pagination.hasMore || false;
cursor = pagination.nextCursor;
}
console.log(`Retrieved ${allCVEs.length} CVEs across ${pageCount} pages`);
return allCVEs;
}
// Usage with async/await
(async () => {
const cves = await getAllCVEs('your-api-key-here', {
severity_min: 7.0,
published_after: '2024-01-01',
});
})();
Go Implementation
package main
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
)
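// The CVE struct itself is not shown in this guide; a minimal placeholder
// keeps the example compilable. The field below is an assumption based on
// the JSON fields used elsewhere in this guide (e.g. cve["id"]).
type CVE struct {
	ID string `json:"id"`
}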
type PaginatedResponse struct {
Data struct {
CVEs []CVE `json:"cves"`
Pagination struct {
Limit int `json:"limit"`
HasMore bool `json:"hasMore"`
NextCursor string `json:"nextCursor"`
} `json:"pagination"`
} `json:"data"`
}
func getAllCVEs(apiKey string, filters map[string]string) ([]CVE, error) {
baseURL := "https://api.cybersecfeed.com/api/v1/cves"
var allCVEs []CVE
cursor := ""
for {
// Build URL with parameters
params := url.Values{}
for k, v := range filters {
params.Set(k, v)
}
params.Set("limit", "100")
if cursor != "" {
params.Set("after", cursor)
}
// Create request
req, err := http.NewRequest("GET", baseURL+"?"+params.Encode(), nil)
if err != nil {
return nil, err
}
req.Header.Set("X-API-Key", apiKey)
// Make request
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
		// Parse response (close the body inside the loop; a defer here would
		// keep every response body open until the function returns)
		var data PaginatedResponse
		err = json.NewDecoder(resp.Body).Decode(&data)
		resp.Body.Close()
		if err != nil {
			return nil, err
		}
// Add CVEs to results
allCVEs = append(allCVEs, data.Data.CVEs...)
// Check for more pages
if !data.Data.Pagination.HasMore {
break
}
cursor = data.Data.Pagination.NextCursor
}
return allCVEs, nil
}
Advanced Pagination Patterns
Parallel Page Processing
import asyncio
import aiohttp
async def fetch_page(session, base_url, headers, params):
"""Fetch a single page of results"""
async with session.get(base_url, headers=headers, params=params) as response:
return await response.json()
async def get_all_pages_parallel(api_key, **filters):
"""
    Fetch the first page; cursor pagination prevents requesting the remaining
    pages truly in parallel (see the sketch below for a practical alternative)
"""
base_url = "https://api.cybersecfeed.com/api/v1/cves"
headers = {"X-API-Key": api_key}
# Get first page to determine total pages
first_params = {**filters, "limit": 100}
async with aiohttp.ClientSession() as session:
first_page = await fetch_page(session, base_url, headers, first_params)
all_cves = first_page["data"]["cves"]
        # Each nextCursor is only revealed after the previous page is fetched,
        # so later pages cannot be requested in parallel. Concurrency has to
        # come from processing, not fetching (see the sketch below).
        if first_page["data"]["pagination"]["hasMore"]:
            pass
    return all_cves
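Because each nextCursor is only available after the previous page has been fetched, the pages themselves must be requested sequentially; what can run concurrently is the processing of each page. The sketch below illustrates that split. It assumes Python 3.9+ for asyncio.to_thread, and process_page is a hypothetical callback that accepts a list of CVE dicts (it is not part of the API).
import asyncio
import aiohttp

async def fetch_then_process_concurrently(api_key, process_page, **filters):
    """Fetch pages sequentially, but hand each page to a worker thread immediately."""
    base_url = "https://api.cybersecfeed.com/api/v1/cves"
    headers = {"X-API-Key": api_key}
    # aiohttp expects string query values, so stringify the filters up front
    params = {k: str(v) for k, v in {**filters, "limit": 100}.items()}
    pending = []  # processing tasks still in flight

    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(base_url, headers=headers, params=params) as resp:
                data = await resp.json()
            page = data["data"]["cves"]
            # Start processing this page in a thread, then move straight on to the next fetch
            pending.append(asyncio.create_task(asyncio.to_thread(process_page, page)))
            pagination = data["data"]["pagination"]
            if not pagination.get("hasMore", False):
                break
            params["after"] = pagination["nextCursor"]

    await asyncio.gather(*pending)  # wait until every page has been processed
A caller might pass a function that writes each page to a database or filters it for critical CVEs; the fetch loop never waits for that work to finish.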
Streaming Results
class CVEStreamer:
"""Stream CVEs as they're fetched"""
def __init__(self, api_key):
self.api_key = api_key
self.base_url = "https://api.cybersecfeed.com/api/v1/cves"
def stream_cves(self, callback, **filters):
"""
Stream CVEs to a callback function
Args:
callback: Function to call for each CVE
**filters: Query filters
"""
headers = {"X-API-Key": self.api_key}
params = {**filters, "limit": 100}
total_processed = 0
while True:
response = requests.get(
self.base_url,
headers=headers,
params=params
)
data = response.json()
# Process each CVE
for cve in data["data"]["cves"]:
callback(cve)
total_processed += 1
# Check for more
pagination = data["data"]["pagination"]
if not pagination.get("hasMore", False):
break
params["after"] = pagination["nextCursor"]
return total_processed
# Usage
def process_cve(cve):
if cve["cvss"]["baseScore"] >= 9.0:
print(f"Critical: {cve['id']}")
streamer = CVEStreamer("your-api-key-here")
total = streamer.stream_cves(process_cve, kev=True)
print(f"Processed {total} KEV entries")
Best Practices
1. Set Appropriate Limits
# Good - use maximum limit for efficiency
params = {"limit": 100} # Maximum allowed
# Less efficient - too small
params = {"limit": 10} # Results in more API calls
2. Handle Errors Gracefully
import time  # needed for the backoff sleep below

def paginate_with_retry(api_key, max_retries=3, **filters):
"""Pagination with retry logic"""
headers = {"X-API-Key": api_key}
params = {**filters, "limit": 100}
all_results = []
retry_count = 0
while True:
try:
response = requests.get(
"https://api.cybersecfeed.com/api/v1/cves",
headers=headers,
params=params,
timeout=30
)
response.raise_for_status()
data = response.json()
all_results.extend(data["data"]["cves"])
if not data["data"]["pagination"].get("hasMore", False):
break
params["after"] = data["data"]["pagination"]["nextCursor"]
retry_count = 0 # Reset on success
except Exception as e:
retry_count += 1
if retry_count >= max_retries:
raise
print(f"Error on page, retrying ({retry_count}/{max_retries})...")
time.sleep(2 ** retry_count)
return all_results
3. Progress Tracking
def paginate_with_progress(api_key, expected_total=None, **filters):
"""Show progress while paginating"""
from tqdm import tqdm
headers = {"X-API-Key": api_key}
params = {**filters, "limit": 100}
# Set up progress bar
pbar = tqdm(total=expected_total, desc="Fetching CVEs")
all_cves = []
while True:
response = requests.get(
"https://api.cybersecfeed.com/api/v1/cves",
headers=headers,
params=params
)
data = response.json()
cves = data["data"]["cves"]
all_cves.extend(cves)
# Update progress
pbar.update(len(cves))
pagination = data["data"]["pagination"]
if not pagination.get("hasMore", False):
break
params["after"] = pagination["nextCursor"]
# Update total if provided
if "totalEstimate" in pagination:
pbar.total = pagination["totalEstimate"]
pbar.close()
return all_cves
4. Memory-Efficient Processing
def process_large_dataset(api_key, processor_func, **filters):
"""Process large datasets without loading all into memory"""
headers = {"X-API-Key": api_key}
params = {**filters, "limit": 100}
processed_count = 0
while True:
response = requests.get(
"https://api.cybersecfeed.com/api/v1/cves",
headers=headers,
params=params
)
data = response.json()
# Process and discard
for cve in data["data"]["cves"]:
processor_func(cve)
processed_count += 1
pagination = data["data"]["pagination"]
if not pagination.get("hasMore", False):
break
params["after"] = pagination["nextCursor"]
return processed_count
Performance Tips
- Use Maximum Limit: Always use limit=100 to minimize API calls
- Process While Fetching: Don't wait for all pages before processing
- Implement Caching: Cache pages if you need to re-process data (see the sketch after this list)
- Monitor Rate Limits: Respect quota limitations
- Handle Failures: Implement retry logic for network issues
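As a minimal sketch of the caching tip above: store each page on disk keyed by a hash of its query parameters, so re-running an analysis does not repeat API calls. The cache location and helper name (CACHE_DIR, cached_get_page) are illustrative, not part of the CyberSecFeed API.
import hashlib
import json
import os

import requests

CACHE_DIR = ".cve_page_cache"  # illustrative location, not an API convention

def cached_get_page(api_key, params):
    """Return a page from the local cache, or fetch it and cache the result."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    key = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()
    path = os.path.join(CACHE_DIR, f"{key}.json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    response = requests.get(
        "https://api.cybersecfeed.com/api/v1/cves",
        headers={"X-API-Key": api_key},
        params=params,
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    with open(path, "w") as f:
        json.dump(data, f)
    return data
Note that cached responses contain cursors, which can go stale; a cache like this suits re-processing a recent run rather than long-term storage.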
Common Issues
Issue: Cursor Expired
Cursors may expire after extended periods. Solution:
def handle_cursor_expiry(api_key, expired_cursor, **filters):
    """Restart pagination from the beginning if a cursor expires"""
    # Discard the expired cursor and start over without an "after" parameter,
    # reusing the paginate_cves generator defined earlier in this guide
    return [cve for page in paginate_cves(api_key, **filters) for cve in page]
Issue: Inconsistent Results
If data changes during pagination:
from datetime import datetime, timezone

def get_consistent_snapshot(api_key, **filters):
    """Pin an upper time bound so records published mid-pagination don't shift pages"""
    filters["published_before"] = datetime.now(timezone.utc).isoformat()
    return [cve for page in paginate_cves(api_key, **filters) for cve in page]
Summary
Cursor-based pagination provides efficient, consistent access to large datasets. Key points:
- Use maximum limit (100) for efficiency
- Process data as you fetch it
- Handle errors and retries gracefully
- Don't store cursors long-term
- Consider memory usage for large datasets
Proper pagination implementation ensures your application can handle any volume of CVE data efficiently.