Pagination

CyberSecFeed uses cursor-based pagination to efficiently handle large result sets. This approach provides consistent results even as new data is added to the database.

How Pagination Works

  1. Initial Request: Specify a limit parameter (max 100)
  2. Check Response: Look for pagination.hasMore field
  3. Get Next Page: Use pagination.nextCursor with after parameter
  4. Repeat: Continue until hasMore is false

Basic Example

# First page - get 20 results
curl -H "X-API-Key: your-api-key-here" \
  "https://api.cybersecfeed.com/api/v1/cves?severity_min=7.0&limit=20"

# Response includes:
{
  "data": {
    "cves": [...],
    "pagination": {
      "limit": 20,
      "hasMore": true,
      "nextCursor": "eyJpZCI6IkNWRS0yMDI0LTAwMjEifQ=="
    }
  }
}

# Next page - use the cursor
curl -H "X-API-Key: your-api-key-here" \
  "https://api.cybersecfeed.com/api/v1/cves?severity_min=7.0&limit=20&after=eyJpZCI6IkNWRS0yMDI0LTAwMjEifQ=="

Pagination Response Format

{
  "data": {
    "cves": [...],
    "pagination": {
      "limit": 20,           // Items per page
      "hasMore": true,       // More pages available?
      "nextCursor": "...",   // Cursor for next page
      "totalEstimate": 1500  // Estimated total (optional)
    }
  }
}
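
If you prefer typed access to these fields in Python, a small wrapper like the sketch below can parse the pagination object. The Pagination class and its from_response helper are illustrative, not part of any official client; only the JSON keys shown above come from the API.

from dataclasses import dataclass
from typing import Optional


@dataclass
class Pagination:
    """Typed view of the pagination object returned by the CVE endpoints."""
    limit: int
    has_more: bool
    next_cursor: Optional[str] = None
    total_estimate: Optional[int] = None

    @classmethod
    def from_response(cls, body: dict) -> "Pagination":
        # body is the full decoded JSON response shown above
        p = body["data"]["pagination"]
        return cls(
            limit=p["limit"],
            has_more=p.get("hasMore", False),
            next_cursor=p.get("nextCursor"),
            total_estimate=p.get("totalEstimate"),
        )


# Usage: page = Pagination.from_response(response.json())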

Implementation Examples

Python - Simple Pagination

import requests


def get_all_critical_cves(api_key):
    """Retrieve all critical CVEs using pagination"""
    base_url = "https://api.cybersecfeed.com/api/v1/cves"
    headers = {"X-API-Key": api_key}
    params = {
        "severity_min": 9.0,
        "limit": 100  # Maximum per page
    }

    all_cves = []
    page_count = 0

    while True:
        page_count += 1
        print(f"Fetching page {page_count}...")

        response = requests.get(base_url, headers=headers, params=params)
        response.raise_for_status()

        data = response.json()
        cves = data["data"]["cves"]
        all_cves.extend(cves)

        # Check if more pages exist
        pagination = data["data"]["pagination"]
        if not pagination.get("hasMore", False):
            break

        # Set cursor for next page
        params["after"] = pagination["nextCursor"]

    print(f"Retrieved {len(all_cves)} CVEs across {page_count} pages")
    return all_cves


# Usage
critical_cves = get_all_critical_cves("your-api-key-here")

Python - Generator Pattern

def paginate_cves(api_key, **filters):
    """
    Generator that yields CVEs one page at a time

    Args:
        api_key: Your API key
        **filters: Any filter parameters (severity_min, kev, etc.)

    Yields:
        List of CVEs for each page
    """
    base_url = "https://api.cybersecfeed.com/api/v1/cves"
    headers = {"X-API-Key": api_key}

    # Set up parameters
    params = filters.copy()
    params.setdefault("limit", 100)

    while True:
        response = requests.get(base_url, headers=headers, params=params)
        response.raise_for_status()

        data = response.json()
        cves = data["data"]["cves"]

        # Yield current page
        yield cves

        # Check for more pages
        pagination = data["data"]["pagination"]
        if not pagination.get("hasMore", False):
            break

        # Set up next page
        params["after"] = pagination["nextCursor"]


# Usage - process pages as they arrive
for page in paginate_cves("your-api-key-here", severity_min=8.0, kev=True):
    print(f"Processing {len(page)} CVEs...")
    for cve in page:
        # Process each CVE
        process_cve(cve)

JavaScript/Node.js

async function getAllCVEs(apiKey, filters = {}) {
  const baseUrl = 'https://api.cybersecfeed.com/api/v1/cves';
  const headers = { 'X-API-Key': apiKey };

  const allCVEs = [];
  let hasMore = true;
  let cursor = null;
  let pageCount = 0;

  while (hasMore) {
    pageCount++;
    console.log(`Fetching page ${pageCount}...`);

    // Build query parameters
    const params = new URLSearchParams({
      ...filters,
      limit: 100,
      ...(cursor && { after: cursor }),
    });

    const response = await fetch(`${baseUrl}?${params}`, { headers });
    const data = await response.json();

    // Add CVEs to results
    allCVEs.push(...data.data.cves);

    // Check pagination
    const pagination = data.data.pagination;
    hasMore = pagination.hasMore || false;
    cursor = pagination.nextCursor;
  }

  console.log(`Retrieved ${allCVEs.length} CVEs across ${pageCount} pages`);
  return allCVEs;
}

// Usage with async/await
(async () => {
  const cves = await getAllCVEs('your-api-key-here', {
    severity_min: 7.0,
    published_after: '2024-01-01',
  });
})();

Go Implementation

package main

import (
    "encoding/json"
    "net/http"
    "net/url"
)

// PaginatedResponse mirrors the response envelope shown above.
// CVE is the CVE object model returned by the API; its fields are omitted here.
type PaginatedResponse struct {
    Data struct {
        CVEs       []CVE `json:"cves"`
        Pagination struct {
            Limit      int    `json:"limit"`
            HasMore    bool   `json:"hasMore"`
            NextCursor string `json:"nextCursor"`
        } `json:"pagination"`
    } `json:"data"`
}

func getAllCVEs(apiKey string, filters map[string]string) ([]CVE, error) {
    baseURL := "https://api.cybersecfeed.com/api/v1/cves"
    var allCVEs []CVE
    cursor := ""

    for {
        // Build URL with parameters
        params := url.Values{}
        for k, v := range filters {
            params.Set(k, v)
        }
        params.Set("limit", "100")
        if cursor != "" {
            params.Set("after", cursor)
        }

        // Create request
        req, err := http.NewRequest("GET", baseURL+"?"+params.Encode(), nil)
        if err != nil {
            return nil, err
        }
        req.Header.Set("X-API-Key", apiKey)

        // Make request
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return nil, err
        }

        // Parse response, then close the body before the next iteration
        // (a defer inside the loop would keep every body open until the function returns)
        var data PaginatedResponse
        err = json.NewDecoder(resp.Body).Decode(&data)
        resp.Body.Close()
        if err != nil {
            return nil, err
        }

        // Add CVEs to results
        allCVEs = append(allCVEs, data.Data.CVEs...)

        // Check for more pages
        if !data.Data.Pagination.HasMore {
            break
        }
        cursor = data.Data.Pagination.NextCursor
    }

    return allCVEs, nil
}

Advanced Pagination Patterns

Parallel Page Processing

import asyncio
import aiohttp


async def fetch_page(session, base_url, headers, params):
    """Fetch a single page of results"""
    async with session.get(base_url, headers=headers, params=params) as response:
        return await response.json()


async def get_all_pages_parallel(api_key, **filters):
    """
    Fetch all pages in parallel (after getting first page)
    Note: Use with caution to avoid overwhelming the API
    """
    base_url = "https://api.cybersecfeed.com/api/v1/cves"
    headers = {"X-API-Key": api_key}

    # Get first page to determine total pages
    first_params = {**filters, "limit": 100}

    async with aiohttp.ClientSession() as session:
        first_page = await fetch_page(session, base_url, headers, first_params)

        all_cves = first_page["data"]["cves"]

        # If more pages, fetch them in parallel
        if first_page["data"]["pagination"]["hasMore"]:
            # This is a simplified example - in practice, you'd need
            # to fetch pages sequentially to get all cursors first
            pass

        return all_cves
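
Because each nextCursor is only revealed by the page before it, the pages themselves cannot truly be fetched in parallel. What can overlap is fetching and processing: the sketch below keeps a single sequential fetch loop (reusing fetch_page from above) and hands completed pages to a consumer through a bounded queue. The queue size, the fetch_and_process name, and the handle_cve callback are illustrative assumptions, not part of the API.

async def fetch_and_process(api_key, handle_cve, **filters):
    """Fetch pages sequentially while a consumer processes them concurrently."""
    base_url = "https://api.cybersecfeed.com/api/v1/cves"
    headers = {"X-API-Key": api_key}
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)  # bounds memory use

    async def producer(session):
        params = {**filters, "limit": 100}
        while True:
            page = await fetch_page(session, base_url, headers, params)
            await queue.put(page["data"]["cves"])
            pagination = page["data"]["pagination"]
            if not pagination.get("hasMore", False):
                break
            params["after"] = pagination["nextCursor"]
        await queue.put(None)  # sentinel: no more pages

    async def consumer():
        while True:
            cves = await queue.get()
            if cves is None:
                break
            for cve in cves:
                handle_cve(cve)

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(producer(session), consumer())


# Usage (handle_cve is whatever per-CVE handler you already have):
# asyncio.run(fetch_and_process("your-api-key-here", handle_cve, severity_min=9.0))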

Streaming Results

class CVEStreamer:
    """Stream CVEs as they're fetched"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.cybersecfeed.com/api/v1/cves"

    def stream_cves(self, callback, **filters):
        """
        Stream CVEs to a callback function

        Args:
            callback: Function to call for each CVE
            **filters: Query filters
        """
        headers = {"X-API-Key": self.api_key}
        params = {**filters, "limit": 100}

        total_processed = 0

        while True:
            response = requests.get(
                self.base_url,
                headers=headers,
                params=params
            )
            data = response.json()

            # Process each CVE
            for cve in data["data"]["cves"]:
                callback(cve)
                total_processed += 1

            # Check for more
            pagination = data["data"]["pagination"]
            if not pagination.get("hasMore", False):
                break

            params["after"] = pagination["nextCursor"]

        return total_processed


# Usage
def process_cve(cve):
    if cve["cvss"]["baseScore"] >= 9.0:
        print(f"Critical: {cve['id']}")


streamer = CVEStreamer("your-api-key-here")
total = streamer.stream_cves(process_cve, kev=True)
print(f"Processed {total} KEV entries")

Best Practices

1. Set Appropriate Limits

# Good - use maximum limit for efficiency
params = {"limit": 100} # Maximum allowed

# Less efficient - too small
params = {"limit": 10} # Results in more API calls

2. Handle Errors Gracefully

import time


def paginate_with_retry(api_key, max_retries=3, **filters):
    """Pagination with retry logic"""
    headers = {"X-API-Key": api_key}
    params = {**filters, "limit": 100}

    all_results = []
    retry_count = 0

    while True:
        try:
            response = requests.get(
                "https://api.cybersecfeed.com/api/v1/cves",
                headers=headers,
                params=params,
                timeout=30
            )
            response.raise_for_status()

            data = response.json()
            all_results.extend(data["data"]["cves"])

            if not data["data"]["pagination"].get("hasMore", False):
                break

            params["after"] = data["data"]["pagination"]["nextCursor"]
            retry_count = 0  # Reset on success

        except Exception:
            retry_count += 1
            if retry_count >= max_retries:
                raise
            print(f"Error on page, retrying ({retry_count}/{max_retries})...")
            time.sleep(2 ** retry_count)

    return all_results

3. Progress Tracking

def paginate_with_progress(api_key, expected_total=None, **filters):
    """Show progress while paginating"""
    from tqdm import tqdm

    headers = {"X-API-Key": api_key}
    params = {**filters, "limit": 100}

    # Set up progress bar
    pbar = tqdm(total=expected_total, desc="Fetching CVEs")

    all_cves = []

    while True:
        response = requests.get(
            "https://api.cybersecfeed.com/api/v1/cves",
            headers=headers,
            params=params
        )
        data = response.json()

        cves = data["data"]["cves"]
        all_cves.extend(cves)

        # Update progress
        pbar.update(len(cves))

        pagination = data["data"]["pagination"]
        if not pagination.get("hasMore", False):
            break

        params["after"] = pagination["nextCursor"]

        # Update total if provided
        if "totalEstimate" in pagination:
            pbar.total = pagination["totalEstimate"]

    pbar.close()
    return all_cves

4. Memory-Efficient Processing

def process_large_dataset(api_key, processor_func, **filters):
    """Process large datasets without loading all into memory"""
    headers = {"X-API-Key": api_key}
    params = {**filters, "limit": 100}

    processed_count = 0

    while True:
        response = requests.get(
            "https://api.cybersecfeed.com/api/v1/cves",
            headers=headers,
            params=params
        )
        data = response.json()

        # Process and discard
        for cve in data["data"]["cves"]:
            processor_func(cve)
            processed_count += 1

        pagination = data["data"]["pagination"]
        if not pagination.get("hasMore", False):
            break

        params["after"] = pagination["nextCursor"]

    return processed_count

Performance Tips

  1. Use Maximum Limit: Always use limit=100 to minimize API calls
  2. Process While Fetching: Don't wait for all pages before processing
  3. Implement Caching: Cache pages if you need to re-process data (see the sketch after this list)
  4. Monitor Rate Limits: Respect quota limitations
  5. Handle Failures: Implement retry logic for network issues
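
As one way to implement tip 3, the sketch below caches each page response on disk, keyed by the exact query parameters (cursor included), so re-processing a dataset does not spend additional API calls. The cache directory, key scheme, and fetch_page_cached name are illustrative assumptions, and cached pages will not reflect CVEs published after they were stored.

import hashlib
import json
import os


def fetch_page_cached(api_key, params, cache_dir=".cve_page_cache"):
    """Fetch one page, reusing a cached copy of the response when available."""
    os.makedirs(cache_dir, exist_ok=True)

    # Key the cache on the exact query parameters (including the cursor)
    key = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()
    cache_file = os.path.join(cache_dir, f"{key}.json")

    if os.path.exists(cache_file):
        with open(cache_file) as f:
            return json.load(f)

    response = requests.get(
        "https://api.cybersecfeed.com/api/v1/cves",
        headers={"X-API-Key": api_key},
        params=params,
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()

    with open(cache_file, "w") as f:
        json.dump(data, f)
    return data

Dropping this in place of the direct requests.get call in any of the loops above turns a repeated run into a cache replay.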

Common Issues

Issue: Cursor Expired

Cursors may expire after extended periods. Solution:

def handle_cursor_expiry(api_key, expired_cursor, **filters):
    """Restart pagination from the beginning when a cursor expires"""
    # Expired cursors cannot be refreshed - discard the cursor and start fresh
    # with the same filters (paginate_with_retry is defined above)
    params = {**filters, "limit": 100}
    return paginate_with_retry(api_key, **params)

Issue: Inconsistent Results

If data changes during pagination:

from datetime import datetime


def get_consistent_snapshot(api_key, **filters):
    """Get all results quickly for consistency"""
    # Pin the result set with a time-based filter so CVEs published while
    # you paginate don't shift the pages, then fetch with the maximum limit
    filters["published_before"] = datetime.now().isoformat()
    return paginate_with_retry(api_key, **filters)

Summary

Cursor-based pagination provides efficient, consistent access to large datasets. Key points:

  • Use maximum limit (100) for efficiency
  • Process data as you fetch it
  • Handle errors and retries gracefully
  • Don't store cursors long-term
  • Consider memory usage for large datasets

Proper pagination implementation ensures your application can handle any volume of CVE data efficiently.