Production-ready patterns and best practices for ThemisDB. Learn from real-world experience to build robust, performant, and secure database applications.
- ✅ Query optimization patterns
- ✅ Security best practices
- ✅ Performance tuning strategies
- ✅ Error handling patterns
- ✅ Transaction management
- ✅ Monitoring and observability
- ✅ Code organization
Prerequisites: All previous tutorials
Time Required: 40 minutes
Difficulty: Intermediate to Advanced
- Query Optimization
- Security Best Practices
- Performance Best Practices
- Error Handling
- Transaction Patterns
- Monitoring
- Code Organization
❌ Bad: Full table scan
# No index on 'city' - scans all entities!
curl -X POST http://localhost:8080/query \
-d '{"table": "users", "predicates": [{"column": "city", "value": "Berlin"}]}'
✅ Good: Use indexed column
# First create index
curl -X POST http://localhost:8080/index/create \
-d '{"table": "users", "column": "city", "type": "btree"}'
# Now query is fast
curl -X POST http://localhost:8080/query \
-d '{"table": "users", "predicates": [{"column": "city", "value": "Berlin"}], "use_index": true}'
Performance impact: 100-1000x faster with index!
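To confirm the speedup on your own data, a quick latency probe helps. This is a rough sketch: `run_query` is a placeholder callable you would point at the request above (e.g. via a `requests` session), not a ThemisDB API.

```python
import time

def best_latency(run_query, repeats=5):
    """Best-of-N wall-clock latency for a query callable.

    Taking the best of several runs filters out scheduler and
    network noise; compare the result before and after creating
    the index to measure the real speedup.
    """
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        run_query()
        best = min(best, time.perf_counter() - start)
    return best
```

For example, `best_latency(lambda: session.post(url, json=query))` run once before and once after `POST /index/create` makes the "100-1000x" claim checkable on your own workload.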
❌ Bad: Multiple single-column indexes
POST /index/create {"table": "orders", "column": "user_id"}
POST /index/create {"table": "orders", "column": "status"}
# Query can only use one index efficiently
✅ Good: Composite index
POST /index/create {
  "table": "orders",
  "columns": ["user_id", "status", "created_at"],
  "type": "btree"
}
# Optimizes: WHERE user_id=? AND status=? ORDER BY created_at
❌ Bad: Fetch everything
curl http://localhost:8080/entities/products:laptop-001
# Returns all attributes including large description
✅ Good: Fetch only what you need
curl -X POST http://localhost:8080/entities/products:laptop-001/attributes \
-d '{"fields": ["name", "price", "stock"]}'
# 10x faster, 90% less bandwidth
❌ Bad: Unlimited results
curl -X POST http://localhost:8080/query \
-d '{"table": "products", "predicates": [...]}'
# Could return millions of rows!
✅ Good: Always paginate
curl -X POST http://localhost:8080/query \
-d '{
  "table": "products",
  "predicates": [...],
  "limit": 50,
  "offset": 0
}'
curl -X POST http://localhost:8080/query/explain \
-d '{
  "table": "products",
  "predicates": [
    {"column": "category", "value": "electronics"},
    {"column": "price", "operator": "<", "value": 1000}
  ]
}'
Example output:
{
  "plan": "index_scan",
  "index_used": "products_category_price_idx",
  "estimated_rows": 150,
  "estimated_cost": 42
}
❌ Bad: Plain HTTP
curl http://themisdb.example.com/entities/users:alice
# Credentials and data sent in clear text!
✅ Good: HTTPS with TLS 1.3
curl https://themisdb.example.com/entities/users:alice \
--cacert ca.crt \
--cert client.crt \
--key client.key
Server configuration:
# config.yaml
tls:
  enabled: true
  cert_file: /etc/themis/server.crt
  key_file: /etc/themis/server.key
  ca_file: /etc/themis/ca.crt
  min_version: "TLS1.3"
❌ Bad: No auth
curl http://localhost:8080/entities/sensitive:data
# Anyone can access!
✅ Good: API key authentication
curl https://localhost:8080/entities/sensitive:data \
-H "Authorization: Bearer sk_prod_abc123xyz"
Better: mTLS (Mutual TLS)
# config.yaml
auth:
  mode: mtls
  require_client_cert: true
  authorized_cns:
    - "client1.example.com"
    - "client2.example.com"
# Create role with limited permissions
curl -X POST https://localhost:8080/admin/roles \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-d '{
  "role": "read_only_user",
  "permissions": ["read:users", "read:products"]
}'
# Assign role to API key
curl -X POST https://localhost:8080/admin/api-keys \
-d '{
  "name": "App Backend",
  "role": "read_only_user"
}'
Field-level encryption:
from cryptography.fernet import Fernet
import json
import requests

# Generate key (store securely!)
key = Fernet.generate_key()
cipher = Fernet(key)

# Encrypt sensitive field
ssn = "123-45-6789"
encrypted_ssn = cipher.encrypt(ssn.encode()).decode()

# Store encrypted
requests.put("http://localhost:8080/entities/users:alice", json={
    "blob": json.dumps({
        "name": "Alice",
        "ssn_encrypted": encrypted_ssn  # Encrypted!
    })
})
Enable audit logs:
# config.yaml
audit:
  enabled: true
  log_file: /var/log/themis/audit.log
  events:
    - create
    - update
    - delete
    - query
  include_data: false  # Don't log sensitive data
Monitor audit logs:
tail -f /var/log/themis/audit.log | jq '.event, .entity, .user, .timestamp'
Prevent abuse:
# config.yaml
rate_limit:
  enabled: true
  requests_per_second: 100
  burst: 200
  by: ip_address
❌ Bad: New connection per request
def get_user(user_id):
    response = requests.get(f"http://localhost:8080/entities/{user_id}")
    return response.json()
# Creates new TCP connection every time!
✅ Good: Reuse connections
import requests
# Create session (connection pool)
session = requests.Session()
session.mount('http://', requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20,
    max_retries=3
))

def get_user(user_id):
    response = session.get(f"http://localhost:8080/entities/{user_id}")
    return response.json()
# Reuses connections - 10x faster!
❌ Bad: Individual operations in loop
for product in products:
    create_product(product)  # 1000 network calls
✅ Good: Batch operation
batch_create_products(products)  # 1 network call
Application-level caching:
from functools import lru_cache
import requests
@lru_cache(maxsize=1000)
def get_product(product_id):
    response = requests.get(f"http://localhost:8080/entities/{product_id}")
    return response.json()

# First call: fetches from DB
product = get_product("products:laptop-001")
# Second call: returns from cache
product = get_product("products:laptop-001")  # Instant!
Redis caching layer:
import redis
import requests
import json
redis_client = redis.Redis(host='localhost', port=6379)

def get_product_cached(product_id):
    # Try cache first
    cached = redis_client.get(product_id)
    if cached:
        return json.loads(cached)
    # Cache miss - fetch from DB
    response = requests.get(f"http://localhost:8080/entities/{product_id}")
    data = response.json()
    # Store in cache (TTL: 5 minutes)
    redis_client.setex(product_id, 300, json.dumps(data))
    return data
// orders:order-001 - Denormalized for fast display
{
  "entity_id": "orders:order-001",
  "attributes": {
    "user_id": "users:alice",
    "user_name": "Alice Johnson",      // Denormalized!
    "user_email": "alice@example.com", // Denormalized!
    "total": 99.99
  }
}
Trade-off: Faster reads, but must update multiple places on user name change.
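The update side of that trade-off can be made explicit. Below is a sketch of propagating a renamed user into the denormalized copies; the `orders` list stands in for the result of a `/query` on `user_id`, and in production each changed order would be written back (ideally in one transaction together with the `users:<id>` update).

```python
def propagate_user_rename(orders, user_id, new_name):
    """Apply a user's new name to every denormalized copy.

    `orders` is a list of entity dicts as returned by a query on
    the orders table; returns the IDs of the orders that changed
    so the caller knows which entities to write back.
    """
    changed = []
    for order in orders:
        if order["attributes"]["user_id"] == user_id:
            order["attributes"]["user_name"] = new_name
            changed.append(order["entity_id"])
    return changed
```

Keeping the write-back set explicit also makes it easy to batch the updates instead of issuing one PUT per order.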
# Check database metrics
curl http://localhost:4318/metrics | grep -E "query_duration|cache_hit_rate|index_usage"
Key metrics to watch:
- Query duration (p50, p95, p99)
- Cache hit rate (target: >80%)
- Index usage rate
- Transaction duration
- Connection pool utilization
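A lightweight way to act on these numbers is to scrape the metrics endpoint and check them in code. A sketch, assuming the endpoint emits standard Prometheus text exposition format; `parse_metric` and `check_cache_health` are hypothetical helpers, and the 0.8 target mirrors the >80% goal above.

```python
def parse_metric(metrics_text, name):
    """Pull a single un-labeled gauge value out of Prometheus
    text exposition format; returns None if the metric is absent.
    Matching on "name " (trailing space) avoids prefix collisions."""
    for line in metrics_text.splitlines():
        if line.startswith(name + " "):
            return float(line.rsplit(" ", 1)[-1])
    return None

def check_cache_health(metrics_text, target=0.8):
    """True if the cache hit rate meets the target."""
    rate = parse_metric(metrics_text, "themis_cache_hit_rate")
    return rate is not None and rate >= target
```

Feed it the body of the scrape, e.g. `check_cache_health(requests.get("http://localhost:4318/metrics").text)`, and wire the result into your health check or alerting.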
❌ Bad: Assume success
response = requests.get(f"http://localhost:8080/entities/{entity_id}")
data = response.json()  # Could fail!
✅ Good: Check status
response = requests.get(f"http://localhost:8080/entities/{entity_id}")
if response.status_code == 200:
    data = response.json()
elif response.status_code == 404:
    print(f"Entity {entity_id} not found")
else:
    print(f"Error: {response.status_code} - {response.text}")
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session_with_retries():
    session = requests.Session()
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,  # 1s, 2s, 4s delays
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "PUT", "POST", "DELETE"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

session = create_session_with_retries()

def update_product_stock(product_id, quantity_delta):
    max_retries = 3
    for attempt in range(max_retries):
        # Read current state
        response = requests.get(f"http://localhost:8080/entities/{product_id}")
        entity = response.json()
        current_version = entity['version']
        current_stock = entity['attributes']['stock']
        # Calculate new stock
        new_stock = current_stock + quantity_delta
        if new_stock < 0:
            raise ValueError("Insufficient stock")
        # Update with version check
        update_response = requests.put(
            f"http://localhost:8080/entities/{product_id}",
            headers={"If-Match": f'"version-{current_version}"'},
            json={"attributes": {"stock": new_stock}}
        )
        if update_response.status_code == 200:
            return True  # Success!
        elif update_response.status_code == 409:  # Conflict
            print(f"Version conflict, retrying... (attempt {attempt + 1})")
            time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
        else:
            raise Exception(f"Update failed: {update_response.text}")
    raise Exception("Max retries exceeded")
from pybreaker import CircuitBreaker, CircuitBreakerError
db_breaker = CircuitBreaker(
    fail_max=5,       # Open after 5 failures
    reset_timeout=60, # Keep open for 60 seconds
    name="themisdb"
)

@db_breaker
def query_database(query):
    response = requests.post("http://localhost:8080/query", json=query)
    response.raise_for_status()
    return response.json()

# Usage
try:
    result = query_database({"table": "users", ...})
except CircuitBreakerError:
    print("Database circuit breaker open - using fallback")
    result = get_cached_result()
❌ Bad: No transaction
# Create order
create_entity(order)
# Decrement stock
update_entity(product, {"stock": new_stock})
# If second operation fails, order exists but stock unchanged!
✅ Good: Transaction
# Start transaction
tx_response = requests.post("http://localhost:8080/tx/begin")
tx_id = tx_response.json()['tx_id']
try:
    # Create order within transaction
    requests.put(
        f"http://localhost:8080/tx/{tx_id}/entities/{order_id}",
        json=order_data
    )
    # Update stock within same transaction
    requests.patch(
        f"http://localhost:8080/tx/{tx_id}/entities/{product_id}",
        json={"attributes": {"stock": new_stock}}
    )
    # Commit - all or nothing!
    requests.post(f"http://localhost:8080/tx/{tx_id}/commit")
except Exception:
    # Rollback on error
    requests.post(f"http://localhost:8080/tx/{tx_id}/rollback")
    raise
❌ Bad: Long-running transaction
tx_id = begin_transaction()
# ... 30 seconds of processing ...
# ... network calls ...
# ... file I/O ...
commit_transaction(tx_id)
# Locks held for 30 seconds!
✅ Good: Minimal transaction scope
# Do expensive work outside transaction
order_data = prepare_order_data() # 30 seconds
stock_updates = calculate_stock_updates()
# Transaction only for database operations
tx_id = begin_transaction()
create_order(tx_id, order_data) # 10ms
update_stocks(tx_id, stock_updates) # 10ms
commit_transaction(tx_id)
# Locks held for 20ms!
import random
import time

def transfer_with_deadlock_retry(from_account, to_account, amount, max_retries=3):
    for attempt in range(max_retries):
        try:
            tx_id = begin_transaction()
            # Deduct from source
            deduct_balance(tx_id, from_account, amount)
            # Add to destination
            add_balance(tx_id, to_account, amount)
            commit_transaction(tx_id)
            return True
        except DeadlockException:
            rollback_transaction(tx_id)
            if attempt < max_retries - 1:
                time.sleep(random.uniform(0.1, 0.5))  # Random backoff
            else:
                raise
Prometheus configuration:
# prometheus.yml
scrape_configs:
  - job_name: 'themisdb'
    static_configs:
      - targets: ['localhost:4318']
    scrape_interval: 15s
Key metrics to monitor:
# Query performance
themis_query_duration_seconds{quantile="0.99"}
themis_query_total
# Cache performance
themis_cache_hit_rate
themis_cache_size_bytes
# Database health
themis_active_connections
themis_transaction_duration_seconds
themis_index_usage_total
Alert rules:
# alerts.yml
groups:
  - name: themisdb
    rules:
      - alert: HighQueryLatency
        expr: themis_query_duration_seconds{quantile="0.99"} > 1.0
        for: 5m
        annotations:
          summary: "High query latency detected"
      - alert: LowCacheHitRate
        expr: themis_cache_hit_rate < 0.7
        for: 10m
        annotations:
          summary: "Cache hit rate below 70%"
import requests
import time
def health_check():
    """Comprehensive health check"""
    try:
        # Check basic connectivity
        response = requests.get("http://localhost:8080/health", timeout=5)
        if response.status_code != 200:
            return False, "Health endpoint failed"
        # Check write capability
        test_entity = f"health_check:{int(time.time())}"
        write_response = requests.put(
            f"http://localhost:8080/entities/{test_entity}",
            json={"blob": '{"test": true}'},
            timeout=5
        )
        if write_response.status_code != 200:
            return False, "Write test failed"
        # Clean up
        requests.delete(f"http://localhost:8080/entities/{test_entity}")
        return True, "All checks passed"
    except Exception as e:
        return False, f"Health check error: {str(e)}"
Server configuration:
# config.yaml
logging:
  slow_query_threshold_ms: 1000  # Log queries slower than 1s
  slow_query_log: /var/log/themis/slow_queries.log
Analyze slow queries:
cat /var/log/themis/slow_queries.log | \
jq -r '[.duration_ms, .query.table, .query.predicates] | @csv' | \
sort -t, -k1 -rn | \
head -20
import json
import requests
from typing import Dict, List, Optional
class ThemisDBClient:
    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        if api_key:
            self.session.headers['Authorization'] = f'Bearer {api_key}'

    def create_entity(self, entity_id: str, data: Dict) -> Dict:
        """Create an entity"""
        response = self.session.put(
            f"{self.base_url}/entities/{entity_id}",
            json={"blob": json.dumps(data)}
        )
        response.raise_for_status()
        return response.json()

    def get_entity(self, entity_id: str) -> Optional[Dict]:
        """Get an entity"""
        response = self.session.get(f"{self.base_url}/entities/{entity_id}")
        if response.status_code == 404:
            return None
        response.raise_for_status()
        return response.json()

    def query(self, table: str, predicates: List[Dict], limit: int = 100) -> List[Dict]:
        """Query entities"""
        response = self.session.post(
            f"{self.base_url}/query",
            json={
                "table": table,
                "predicates": predicates,
                "limit": limit,
                "return": "entities"
            }
        )
        response.raise_for_status()
        return response.json()['entities']

    def batch_create(self, entities: List[Dict]) -> Dict:
        """Batch create entities"""
        response = self.session.post(
            f"{self.base_url}/batch/create",
            json={"entities": entities}
        )
        response.raise_for_status()
        return response.json()

# Usage
client = ThemisDBClient("http://localhost:8080", api_key="sk_...")
user = client.get_entity("users:alice")

class UserRepository:
    def __init__(self, client: ThemisDBClient):
        self.client = client

    def create_user(self, user_id: str, name: str, email: str) -> Dict:
        """Create a user"""
        data = {"name": name, "email": email}
        return self.client.create_entity(f"users:{user_id}", data)

    def get_user(self, user_id: str) -> Optional[Dict]:
        """Get user by ID"""
        entity = self.client.get_entity(f"users:{user_id}")
        if entity:
            return json.loads(entity['blob'])
        return None

    def find_users_by_city(self, city: str) -> List[Dict]:
        """Find all users in a city"""
        entities = self.client.query(
            table="users",
            predicates=[{"column": "city", "value": city}]
        )
        return [json.loads(e['blob']) for e in entities]

# Usage
repo = UserRepository(client)
user = repo.get_user("alice")
berlin_users = repo.find_users_by_city("Berlin")
# config.py
import os
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    url: str
    api_key: str
    timeout: int
    max_retries: int

    @classmethod
    def from_env(cls):
        return cls(
            url=os.getenv("THEMIS_URL", "http://localhost:8080"),
            api_key=os.getenv("THEMIS_API_KEY", ""),
            timeout=int(os.getenv("THEMIS_TIMEOUT", "30")),
            max_retries=int(os.getenv("THEMIS_MAX_RETRIES", "3"))
        )

# Usage
config = DatabaseConfig.from_env()
client = ThemisDBClient(config.url, config.api_key)
Before deploying to production:
Security:
- TLS enabled with valid certificates
- Authentication configured
- RBAC roles defined
- Sensitive data encrypted
- Audit logging enabled
- Rate limiting configured
Performance:
- Connection pooling implemented
- Caching strategy in place
- All query paths have indexes
- Batch operations used for bulk data
- Query result limits enforced
Reliability:
- Error handling implemented
- Retry logic with backoff
- Circuit breakers configured
- Health checks implemented
- Backup strategy defined
Monitoring:
- Metrics collection configured
- Alerts set up
- Slow query logging enabled
- Dashboard created
- On-call rotation defined
Operations:
- Deployment playbook documented
- Rollback procedure tested
- Disaster recovery plan created
- Capacity planning done
- Team trained
- ✅ Query optimization techniques
- ✅ Security hardening strategies
- ✅ Performance tuning patterns
- ✅ Robust error handling
- ✅ Transaction best practices
- ✅ Monitoring and observability
- ✅ Clean code organization
- Review: Security Documentation
- Deep Dive: Performance Tuning Guide
- Deploy: Production Deployment Guide
Ready for production? See Operations Guide →