Optimize your VBAPI integration with rate limiting strategies, performance monitoring, and scaling best practices for production deployments.
Performance Disclaimer
All code examples on this page are illustrative pseudo-code and are not intended for direct production use.
VBAPI enforces throttling using a token bucket–style algorithm to ensure fair usage and system stability.
- 25 requests per second sustained rate
- 50 requests per second burst rate
- Short additional bursts may be absorbed by internal smoothing windows before throttling is applied
- Per-client limits based on API key
When limits are exceeded, VBAPI returns an HTTP 429 Too Many Requests response.
# Instead of multiple individual creates:
for county in counties:
    create_county(county)  # 10 requests

# Use batch create:
create_counties_batch(counties)  # 1 request


import time

def get_all_members_efficiently(page_size=500):
    """Retrieve all members by paging through the list endpoint.

    Requests pages of ``page_size`` members until an empty page signals the
    end of the collection, pausing briefly between pages to stay under the
    API rate limit.

    Args:
        page_size: Number of records requested per page (larger pages mean
            fewer requests).

    Returns:
        A list of all member records accumulated across pages.
    """
    members = []
    page = 1
    while True:
        response = api_client.list_members(
            page=page,
            pageSize=page_size,
        )
        batch = response['data']
        # An empty page means we've read past the last member.
        if not batch:
            break
        members.extend(batch)
        page += 1
        # Respect rate limits
        time.sleep(0.1)  # 10 requests per second
    return members
from collections import deque
class AdaptiveRateLimiter:
    """Smart rate limiter that adapts to API responses.

    Keeps a sliding one-second window of request timestamps to throttle
    outgoing calls. On a 429 the allowed rate is halved (never below 1
    request/second); after a minute with no 429s and a sustained run of
    successes the rate creeps back up one request/second at a time,
    capped at 25.
    """

    def __init__(self, initial_rate=20):
        self.current_rate = initial_rate  # requests per second
        self.request_times = deque()      # timestamps of requests within the last second
        self.last_429 = None              # time of the most recent 429, or None
        self.consecutive_success = 0      # successes since the last 429 / rate bump

    def wait_if_needed(self):
        """Wait if necessary to respect the current rate limit."""
        now = time.time()
        # Remove request times that have aged out of the 1-second window.
        while self.request_times and self.request_times[0] <= now - 1:
            self.request_times.popleft()
        # At capacity: sleep until the oldest request leaves the window.
        if len(self.request_times) >= self.current_rate:
            sleep_time = 1 - (now - self.request_times[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
        self.request_times.append(time.time())

    def handle_429_response(self):
        """Halve the rate (never below 1 req/sec) when we get a 429."""
        self.last_429 = time.time()
        self.current_rate = max(1, int(self.current_rate * 0.5))
        self.consecutive_success = 0
        print(f"Rate limited! Reducing to {self.current_rate} req/sec")

    def handle_success_response(self):
        """Gradually increase the rate after sustained success.

        Only ramps up if we were previously throttled, at least a minute
        has passed since the last 429, and 20+ consecutive calls succeeded.
        """
        self.consecutive_success += 1
        if (self.last_429 and
                time.time() - self.last_429 > 60 and
                self.consecutive_success >= 20):
            # Gradually increase rate after 1 minute of no 429s.
            self.current_rate = min(25, self.current_rate + 1)
            self.last_429 = None
            self.consecutive_success = 0
# Usage
rate_limiter = AdaptiveRateLimiter()

def make_api_call(url, headers, data):
    """POST through the shared adaptive rate limiter.

    Blocks until the limiter allows the request, then feeds the outcome
    back so the limiter can adapt: a 429 halves the rate and raises
    RateLimitExceeded; any other status counts as a success.

    Returns:
        The requests.Response object for non-429 responses.

    Raises:
        RateLimitExceeded: if the API responded with HTTP 429.
    """
    rate_limiter.wait_if_needed()
    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 429:
        rate_limiter.handle_429_response()
        raise RateLimitExceeded()
    rate_limiter.handle_success_response()
    return response

import asyncio
import aiohttp
from asyncio import Semaphore
class AsyncVBAPIClient:
    """Async client combining a concurrency cap with a steady request rate.

    A semaphore bounds in-flight requests, while a background task drips
    one "token" onto a queue every 1/rps seconds; each request must take a
    token before being sent.
    """

    def __init__(self, max_concurrent=10, requests_per_second=20):
        self.semaphore = Semaphore(max_concurrent)
        self.requests_per_second = requests_per_second
        self.request_queue = asyncio.Queue()
        # Bug fix: asyncio.create_task() raises RuntimeError when called
        # outside a running event loop, so __init__ can no longer start the
        # limiter. It is started lazily on the first request instead.
        self.rate_limiter = None

    async def _rate_limiter(self, rps):
        """Release one request token every 1/rps seconds, forever."""
        while True:
            await self.request_queue.put(None)
            await asyncio.sleep(1 / rps)

    async def make_request(self, method, url, **kwargs):
        """Send one rate-limited request and return the decoded JSON body."""
        # Lazily start the limiter task now that a loop is guaranteed to be running.
        if self.rate_limiter is None:
            self.rate_limiter = asyncio.create_task(
                self._rate_limiter(self.requests_per_second)
            )
        async with self.semaphore:
            # Wait for the rate limiter to grant a token.
            await self.request_queue.get()
            # NOTE(review): a session per request is simple but wasteful;
            # a shared session would reuse connections.
            async with aiohttp.ClientSession() as session:
                async with session.request(method, url, **kwargs) as response:
                    return await response.json()
# Process many requests in parallel while respecting rate limits
async def process_claims_parallel(claim_ids):
    """Fetch every claim concurrently through a rate-limited client.

    Args:
        claim_ids: Iterable of claim identifiers to fetch.

    Returns:
        A list, in input order, of decoded responses — or exception
        objects for requests that failed (return_exceptions=True keeps
        one failure from cancelling the rest).
    """
    client = AsyncVBAPIClient(max_concurrent=5, requests_per_second=20)
    tasks = [
        client.make_request('GET', f'/claims/{claim_id}')
        for claim_id in claim_ids
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

import time
import threading
class TokenBucketRateLimiter:
    """Thread-safe token bucket rate limiter.

    Tokens accrue continuously at ``rate`` per second up to ``capacity``
    (defaults to twice the rate). Each request spends tokens; when the
    bucket is empty the caller is told how long to wait.
    """

    def __init__(self, rate, capacity=None):
        self.rate = rate                      # tokens added per second
        self.capacity = capacity or rate * 2  # maximum bucket size
        self.tokens = self.capacity           # bucket starts full
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self, tokens=1):
        """Try to take tokens without blocking.

        Returns:
            True if the tokens were taken, otherwise the number of
            seconds to wait until enough tokens will have accrued.
        """
        with self.lock:
            now = time.time()
            # Refill in proportion to elapsed time, clamped to capacity.
            elapsed = now - self.last_update
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            # Not enough tokens: report the required wait time.
            return (tokens - self.tokens) / self.rate

    def wait_and_acquire(self, tokens=1):
        """Take tokens, sleeping as long as necessary.

        Bug fix: the original retried only once after sleeping, so under
        contention the second attempt could fail and the float wait time
        leaked out as the return value. Loop until the tokens are granted.

        Returns:
            True, once the tokens have been acquired.
        """
        result = self.acquire(tokens)
        while result is not True:
            time.sleep(result)
            result = self.acquire(tokens)
        return True
# Usage
bucket = TokenBucketRateLimiter(rate=25, capacity=50)
def rate_limited_request(url, headers, data):
bucket.wait_and_acquire() # Wait if necessary
return requests.post(url, headers=headers, json=data)Clients may request a Service Level Review via VBA Support for increased rate limits.
- Expected production load - Documented traffic projections
- Integration design - Efficient use of batch operations and pagination
- Test results - Performance testing in lower environments
- Business justification - Critical business processes requiring higher throughput
- Document current usage patterns and bottlenecks
- Provide load testing results from development/test environments
- Detail optimization efforts (batching, caching, etc.)
- Submit request to VBA Support with business justification
- Load testing completed with realistic data volumes
- Rate limiting strategies implemented and tested
- Caching strategy implemented for frequently accessed data
- Error handling and retry logic tested
- Performance baselines established
- Resource limits appropriate for expected load
- Regular performance reviews scheduled
- Rate limit utilization tracked
- API usage patterns analyzed for improvements
This comprehensive performance guide ensures your VBAPI integration scales efficiently and maintains optimal performance under production loads.