Part 5: Testing, Validation, Monitoring & Sprint-1 Checklist
Payment & Chargeback Fraud Platform - Principal-Level Design Document
Phase 1: Real-Time Streaming & Decisioning
1. Offline Validation & Replay Testing
1.1 Historical Replay Framework
from datetime import datetime
from typing import Optional

import pandas as pd


class HistoricalReplayEngine:
"""Replay historical transactions through the fraud pipeline."""
def __init__(self):
self.feature_store = PointInTimeFeatureStore()
self.scoring_service = RiskScoringService()
self.policy_engine = PolicyEngine()
async def replay(
self,
start_date: datetime,
end_date: datetime,
policy_config: Optional[dict] = None,
model_version: Optional[str] = None,
sample_rate: float = 1.0
) -> ReplayResults:
"""
Replay historical transactions with optional policy/model changes.
This enables:
- Threshold simulation (what-if analysis)
- Model validation before deployment
- Policy change impact estimation
"""
# Load historical transactions
transactions = await self._load_transactions(start_date, end_date, sample_rate)
results = []
for txn in transactions:
# Get features as they existed at transaction time
features = await self.feature_store.get_features_at_time(
txn["auth_id"],
txn["transaction_date"]
)
# Score with specified (or current) model
if model_version:
scores = await self.scoring_service.score_with_model(
txn, features, model_version
)
else:
scores = await self.scoring_service.score(txn, features)
# Apply policy (specified or current)
if policy_config:
decision = await self.policy_engine.evaluate_with_config(
txn, features, scores, policy_config
)
else:
decision = await self.policy_engine.evaluate(txn, features, scores)
# Record result
results.append({
"auth_id": txn["auth_id"],
"transaction_date": txn["transaction_date"],
"amount_usd": txn["amount_usd"],
# Original decision
"original_action": txn["original_decision"],
"original_score": txn.get("original_score"),
# Replay decision
"replay_action": decision.action,
"replay_score": scores.criminal_fraud_score,
# Actual outcome (for validation)
"actual_is_fraud": txn["is_fraud"],
"actual_chargeback_amount": txn.get("chargeback_amount"),
# Decision changed?
"decision_changed": txn["original_decision"] != decision.action,
})
return self._analyze_results(results, start_date, end_date)
def _analyze_results(
self,
results: list,
start_date: datetime,
end_date: datetime
) -> ReplayResults:
"""Analyze replay results and compute impact metrics."""
df = pd.DataFrame(results)
total = len(df)
fraud_count = df["actual_is_fraud"].sum()
# Original metrics
original_approved = (df["original_action"].isin(["ALLOW", "FRICTION"])).sum()
original_blocked = (df["original_action"] == "BLOCK").sum()
original_fraud_passed = df[
(df["original_action"].isin(["ALLOW", "FRICTION"])) &
(df["actual_is_fraud"] == True)
]["amount_usd"].sum()
# Replay metrics
replay_approved = (df["replay_action"].isin(["ALLOW", "FRICTION"])).sum()
replay_blocked = (df["replay_action"] == "BLOCK").sum()
replay_fraud_passed = df[
(df["replay_action"].isin(["ALLOW", "FRICTION"])) &
(df["actual_is_fraud"] == True)
]["amount_usd"].sum()
# Confusion matrix for replay vs actual
replay_tp = len(df[(df["replay_action"] == "BLOCK") & (df["actual_is_fraud"] == True)])
replay_fp = len(df[(df["replay_action"] == "BLOCK") & (df["actual_is_fraud"] == False)])
replay_fn = len(df[(df["replay_action"] != "BLOCK") & (df["actual_is_fraud"] == True)])
replay_tn = len(df[(df["replay_action"] != "BLOCK") & (df["actual_is_fraud"] == False)])
return ReplayResults(
date_range=f"{start_date.date()} to {end_date.date()}",
transaction_count=total,
fraud_count=fraud_count,
original_metrics={
"approval_rate": original_approved / total,
"block_rate": original_blocked / total,
"fraud_passed_usd": original_fraud_passed,
},
replay_metrics={
"approval_rate": replay_approved / total,
"block_rate": replay_blocked / total,
"fraud_passed_usd": replay_fraud_passed,
"precision": replay_tp / (replay_tp + replay_fp) if (replay_tp + replay_fp) > 0 else 0,
"recall": replay_tp / (replay_tp + replay_fn) if (replay_tp + replay_fn) > 0 else 0,
"false_positive_rate": replay_fp / (replay_fp + replay_tn) if (replay_fp + replay_tn) > 0 else 0,
},
impact={
"approval_rate_change": (replay_approved - original_approved) / total,
"fraud_loss_change_usd": replay_fraud_passed - original_fraud_passed,
"false_positives_change": replay_fp - (original_blocked - len(df[(df["original_action"] == "BLOCK") & (df["actual_is_fraud"] == True)])),
},
decisions_changed=df["decision_changed"].sum(),
decisions_changed_pct=df["decision_changed"].mean() * 100,
)
async def simulate_threshold_change(
self,
new_thresholds: dict,
start_date: datetime,
end_date: datetime
) -> ThresholdSimulationResult:
"""Simulate specific threshold changes."""
# Build policy config with new thresholds
policy_config = {
"score_thresholds": {
"criminal_fraud": new_thresholds.get("criminal_fraud", {}),
"friendly_fraud": new_thresholds.get("friendly_fraud", {}),
}
}
# Run replay
results = await self.replay(
start_date=start_date,
end_date=end_date,
policy_config=policy_config
)
# Generate recommendation
if results.impact["fraud_loss_change_usd"] < 0 and results.impact["approval_rate_change"] >= -0.01:
recommendation = "RECOMMEND_DEPLOY"
reason = f"Reduces fraud loss by ${abs(results.impact['fraud_loss_change_usd']):,.2f} with minimal approval impact"
elif results.impact["fraud_loss_change_usd"] < -10000 and results.impact["approval_rate_change"] >= -0.03:
recommendation = "CONSIDER_DEPLOY"
reason = f"Significant fraud reduction, moderate approval impact"
elif results.impact["approval_rate_change"] < -0.05:
recommendation = "DO_NOT_DEPLOY"
reason = f"Approval rate drops {abs(results.impact['approval_rate_change'])*100:.1f}%"
else:
recommendation = "NEUTRAL"
reason = "No significant improvement"
return ThresholdSimulationResult(
new_thresholds=new_thresholds,
replay_results=results,
recommendation=recommendation,
reason=reason
)
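The replay is only trustworthy if get_features_at_time returns features exactly as they stood before the transaction was scored. A minimal sketch of such a point-in-time lookup follows; the offline table and column names (offline_feature_snapshots, snapshot_ts, a JSONB features column) and the use of asyncpg are illustrative assumptions, not the actual offline-store schema.
import json
from datetime import datetime

import asyncpg  # assumed driver for the offline store


class PointInTimeFeatureStore:
    """Sketch of a point-in-time feature lookup used during replay."""

    def __init__(self, dsn: str):
        self.dsn = dsn

    async def get_features_at_time(self, entity_id: str, as_of: datetime) -> dict:
        """Return the latest feature snapshot written strictly before `as_of`.

        The strict `<` comparison matters: it prevents features derived from
        the transaction being replayed from leaking into its own score.
        """
        conn = await asyncpg.connect(self.dsn)
        try:
            row = await conn.fetchrow(
                """
                SELECT features            -- JSONB column, returned as text
                FROM offline_feature_snapshots
                WHERE entity_id = $1
                  AND snapshot_ts < $2
                ORDER BY snapshot_ts DESC
                LIMIT 1
                """,
                entity_id,
                as_of,
            )
            return json.loads(row["features"]) if row else {}
        finally:
            await conn.close()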
1.2 Model Validation Pipeline
from datetime import datetime

import numpy as np
from sklearn.metrics import confusion_matrix, precision_recall_curve, roc_auc_score


class ModelValidationPipeline:
"""Validate new models before deployment."""
VALIDATION_CRITERIA = {
"min_auc_roc": 0.85,
"max_auc_drop_from_champion": 0.02,
"min_precision_at_90_recall": 0.20,
"max_false_positive_rate": 0.10,
"max_latency_p99_ms": 25,
}
async def validate(
self,
challenger_model_uri: str,
validation_dataset_id: str
) -> ValidationResult:
"""Run full validation suite on challenger model."""
# Load validation data
data = await self.load_dataset(validation_dataset_id)
# Load models
challenger = await self.load_model(challenger_model_uri)
champion = await self.load_model("models:/fraud_model/Champion")
results = {
"dataset_size": len(data),
"fraud_rate": data["is_fraud"].mean(),
}
# Test 1: AUC-ROC
        # predict_proba returns [P(legit), P(fraud)]; keep the positive-class column
        challenger_predictions = challenger.predict_proba(data["features"])[:, 1]
        champion_predictions = champion.predict_proba(data["features"])[:, 1]
challenger_auc = roc_auc_score(data["is_fraud"], challenger_predictions)
champion_auc = roc_auc_score(data["is_fraud"], champion_predictions)
results["challenger_auc"] = challenger_auc
results["champion_auc"] = champion_auc
results["auc_difference"] = challenger_auc - champion_auc
# Test 2: Precision at 90% recall
results["precision_at_90_recall"] = self._precision_at_recall(
data["is_fraud"], challenger_predictions, recall_target=0.90
)
# Test 3: False positive rate at optimal threshold
optimal_threshold = self._find_optimal_threshold(
data["is_fraud"], challenger_predictions
)
predictions_binary = challenger_predictions >= optimal_threshold
tn, fp, fn, tp = confusion_matrix(data["is_fraud"], predictions_binary).ravel()
results["false_positive_rate"] = fp / (fp + tn)
results["optimal_threshold"] = optimal_threshold
# Test 4: Latency benchmark
latency_results = await self._benchmark_latency(challenger)
results["latency_p50_ms"] = latency_results["p50"]
results["latency_p99_ms"] = latency_results["p99"]
# Test 5: Feature importance stability
results["feature_importance"] = self._get_feature_importance(challenger)
results["importance_correlation"] = self._compare_feature_importance(
challenger, champion
)
# Evaluate against criteria
validations = []
if challenger_auc >= self.VALIDATION_CRITERIA["min_auc_roc"]:
validations.append({"check": "min_auc_roc", "passed": True})
else:
validations.append({
"check": "min_auc_roc",
"passed": False,
"reason": f"AUC {challenger_auc:.4f} < {self.VALIDATION_CRITERIA['min_auc_roc']}"
})
if champion_auc - challenger_auc <= self.VALIDATION_CRITERIA["max_auc_drop_from_champion"]:
validations.append({"check": "max_auc_drop", "passed": True})
else:
validations.append({
"check": "max_auc_drop",
"passed": False,
"reason": f"AUC dropped {champion_auc - challenger_auc:.4f} from champion"
})
if results["precision_at_90_recall"] >= self.VALIDATION_CRITERIA["min_precision_at_90_recall"]:
validations.append({"check": "precision_at_recall", "passed": True})
else:
validations.append({
"check": "precision_at_recall",
"passed": False,
"reason": f"Precision {results['precision_at_90_recall']:.4f} < required"
})
if results["latency_p99_ms"] <= self.VALIDATION_CRITERIA["max_latency_p99_ms"]:
validations.append({"check": "latency", "passed": True})
else:
validations.append({
"check": "latency",
"passed": False,
"reason": f"P99 latency {results['latency_p99_ms']}ms > {self.VALIDATION_CRITERIA['max_latency_p99_ms']}ms"
})
# Overall pass/fail
all_passed = all(v["passed"] for v in validations)
return ValidationResult(
model_uri=challenger_model_uri,
validation_dataset=validation_dataset_id,
timestamp=datetime.utcnow(),
results=results,
validations=validations,
passed=all_passed,
recommendation="PROCEED_TO_SHADOW" if all_passed else "REJECT"
)
    def _precision_at_recall(
        self,
        y_true: np.ndarray,
        y_scores: np.ndarray,
        recall_target: float
    ) -> float:
        """Find the best precision achievable at or above the target recall."""
        precision, recall, _ = precision_recall_curve(y_true, y_scores)
        # precision_recall_curve returns recall in decreasing order, so take the
        # maximum precision among operating points that still meet the target.
        eligible = precision[recall >= recall_target]
        return float(eligible.max()) if len(eligible) > 0 else float(precision[-1])
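_find_optimal_threshold is referenced above but not shown. One plausible shape is a cost-weighted sweep over candidate thresholds, sketched below; the standalone name, the quantile candidate grid, and the 1:5 FP:FN cost ratio are illustrative assumptions rather than the committed implementation (real unit costs belong to the economics model).
import numpy as np


def find_optimal_threshold(
    y_true: np.ndarray,
    y_scores: np.ndarray,
    fp_cost: float = 1.0,
    fn_cost: float = 5.0,
) -> float:
    """Pick the threshold that minimizes a cost-weighted error (sketch only)."""
    candidates = np.unique(np.quantile(y_scores, np.linspace(0.0, 1.0, 201)))
    best_threshold, best_cost = 0.5, float("inf")
    for t in candidates:
        preds = y_scores >= t
        fp = int(np.sum(preds & (y_true == 0)))    # blocked but legitimate
        fn = int(np.sum(~preds & (y_true == 1)))   # passed but fraudulent
        cost = fp * fp_cost + fn * fn_cost
        if cost < best_cost:
            best_cost, best_threshold = cost, float(t)
    return best_threshold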
2. Pre-Production Acceptance Criteria
2.1 Sprint-1 Go/No-Go Checklist
import random
from collections import defaultdict
from datetime import datetime


class Sprint1AcceptanceCriteria:
"""Pre-production acceptance criteria for Sprint-1."""
CRITERIA = {
# Latency requirements
"latency": {
"end_to_end_p50_ms": {"target": 100, "max": 150},
"end_to_end_p99_ms": {"target": 180, "max": 200},
"feature_assembly_p99_ms": {"target": 45, "max": 50},
"model_inference_p99_ms": {"target": 25, "max": 30},
"policy_evaluation_p99_ms": {"target": 12, "max": 15},
},
# Correctness requirements
"correctness": {
"idempotency_duplicate_rate": {"target": 0, "max": 0},
"event_ordering_violations": {"target": 0, "max": 0},
"feature_computation_accuracy": {"target": 0.999, "min": 0.995},
"decision_consistency_rate": {"target": 1.0, "min": 0.999},
},
# Performance requirements
"performance": {
"throughput_tps": {"target": 1000, "min": 500},
"error_rate": {"target": 0.001, "max": 0.01},
"availability": {"target": 0.9999, "min": 0.999},
},
# Business metrics (compared to baseline)
"business": {
"approval_rate_delta": {"target": 0, "min": -0.02},
"fraud_detection_rate_delta": {"target": 0, "min": -0.05},
"false_positive_rate_delta": {"target": 0, "max": 0.01},
},
# Operational readiness
"operational": {
"monitoring_coverage": {"target": 1.0, "min": 0.9},
"alert_response_test": {"target": True},
"runbook_completeness": {"target": 1.0, "min": 0.9},
"rollback_test": {"target": True},
},
}
async def run_acceptance_tests(self) -> AcceptanceReport:
"""Run full acceptance test suite."""
results = {}
# Latency tests
results["latency"] = await self._test_latency()
# Correctness tests
results["correctness"] = await self._test_correctness()
# Performance tests
results["performance"] = await self._test_performance()
# Business metrics comparison
results["business"] = await self._test_business_metrics()
# Operational readiness
results["operational"] = await self._test_operational()
# Evaluate results
all_passed = True
failures = []
for category, category_results in results.items():
for metric, result in category_results.items():
criteria = self.CRITERIA[category][metric]
passed = self._evaluate_metric(result["value"], criteria)
result["passed"] = passed
result["criteria"] = criteria
if not passed:
all_passed = False
failures.append({
"category": category,
"metric": metric,
"value": result["value"],
"criteria": criteria
})
return AcceptanceReport(
timestamp=datetime.utcnow(),
results=results,
passed=all_passed,
failures=failures,
recommendation="GO" if all_passed else "NO_GO"
)
async def _test_latency(self) -> dict:
"""Test latency under load."""
# Run load test
load_test = await self._run_load_test(
duration_seconds=300,
target_tps=500,
ramp_up_seconds=60
)
return {
"end_to_end_p50_ms": {"value": load_test.p50_latency},
"end_to_end_p99_ms": {"value": load_test.p99_latency},
"feature_assembly_p99_ms": {"value": load_test.feature_p99},
"model_inference_p99_ms": {"value": load_test.model_p99},
"policy_evaluation_p99_ms": {"value": load_test.policy_p99},
}
async def _test_correctness(self) -> dict:
"""Test correctness guarantees."""
# Idempotency test
duplicate_results = await self._test_idempotency(
num_requests=1000,
duplicate_rate=0.1 # Send 10% duplicates
)
# Event ordering test
ordering_results = await self._test_event_ordering(
num_sequences=100
)
# Feature accuracy test
feature_accuracy = await self._test_feature_accuracy(
sample_size=1000
)
# Decision consistency (same input = same output)
consistency = await self._test_decision_consistency(
num_requests=500,
repeat_each=3
)
return {
"idempotency_duplicate_rate": {"value": duplicate_results.duplicate_effects},
"event_ordering_violations": {"value": ordering_results.violations},
"feature_computation_accuracy": {"value": feature_accuracy.accuracy},
"decision_consistency_rate": {"value": consistency.consistency_rate},
}
async def _test_idempotency(
self,
num_requests: int,
duplicate_rate: float
) -> IdempotencyTestResult:
"""Verify idempotency guarantees."""
events = self._generate_test_events(num_requests)
# Add duplicates
duplicates_to_send = int(num_requests * duplicate_rate)
duplicate_events = random.sample(events, duplicates_to_send)
all_events = events + duplicate_events
random.shuffle(all_events)
# Send all events
responses = []
for event in all_events:
response = await self.api_client.process_event(event)
responses.append({
"event": event,
"response": response
})
# Check for duplicate effects
effects_by_idem_key = defaultdict(list)
for r in responses:
key = r["event"]["idempotency_key"]
effects_by_idem_key[key].append(r["response"])
duplicate_effects = 0
for key, effects in effects_by_idem_key.items():
if len(effects) > 1:
# Check if effects are identical (idempotent)
if not all(e == effects[0] for e in effects):
duplicate_effects += 1
return IdempotencyTestResult(
total_events=len(all_events),
unique_events=num_requests,
duplicates_sent=duplicates_to_send,
duplicate_effects=duplicate_effects
)
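_evaluate_metric is referenced in run_acceptance_tests but not shown. The sketch below states the assumed semantics of the CRITERIA entries: "max" is an upper bound, "min" is a lower bound, and a boolean "target" must match exactly.
def evaluate_metric(value, criteria: dict) -> bool:
    """Assumed semantics for the CRITERIA dict above (sketch only)."""
    if isinstance(criteria.get("target"), bool):
        return value == criteria["target"]             # e.g. rollback_test must be True
    if "max" in criteria and value > criteria["max"]:
        return False                                   # e.g. latency and error-rate caps
    if "min" in criteria and value < criteria["min"]:
        return False                                   # e.g. throughput and availability floors
    return True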
2.2 Load Testing Configuration
import asyncio
import time

import aiohttp
import numpy as np


class LoadTestRunner:
"""Run load tests for capacity validation."""
async def run_load_test(
self,
duration_seconds: int,
target_tps: int,
ramp_up_seconds: int = 60
) -> LoadTestResults:
"""Run load test with specified parameters."""
# Generate test events
events_needed = duration_seconds * target_tps
test_events = await self._generate_realistic_events(events_needed)
# Configure load pattern
load_pattern = self._create_ramp_pattern(
target_tps=target_tps,
duration=duration_seconds,
ramp_up=ramp_up_seconds
)
        # Run test: one batch of events per second of the load pattern
        results = []
        event_offset = 0
        start_time = time.time()
        async with aiohttp.ClientSession() as session:
            for second_index, batch_tps in load_pattern:
                batch_events = test_events[event_offset:event_offset + batch_tps]
                event_offset += batch_tps
                # Send batch concurrently
                tasks = [
                    self._send_event(session, event)
                    for event in batch_events
                ]
                batch_results = await asyncio.gather(*tasks, return_exceptions=True)
                results.extend(batch_results)
                # Pace the loop: wait until the next one-second tick
                elapsed = time.time() - start_time
                if elapsed < second_index + 1:
                    await asyncio.sleep(second_index + 1 - elapsed)
# Analyze results
return self._analyze_load_results(results)
def _analyze_load_results(self, results: list) -> LoadTestResults:
"""Analyze load test results."""
successful = [r for r in results if not isinstance(r, Exception) and r["status"] == "success"]
failed = [r for r in results if isinstance(r, Exception) or r.get("status") != "success"]
latencies = [r["latency_ms"] for r in successful]
return LoadTestResults(
total_requests=len(results),
successful_requests=len(successful),
failed_requests=len(failed),
success_rate=len(successful) / len(results) if results else 0,
error_rate=len(failed) / len(results) if results else 0,
# Latency percentiles
p50_latency=np.percentile(latencies, 50) if latencies else 0,
p90_latency=np.percentile(latencies, 90) if latencies else 0,
p95_latency=np.percentile(latencies, 95) if latencies else 0,
p99_latency=np.percentile(latencies, 99) if latencies else 0,
max_latency=max(latencies) if latencies else 0,
            # Throughput (based on successful responses only)
            actual_tps=len(successful) / max(
                (successful[-1]["timestamp"] - successful[0]["timestamp"]).total_seconds(), 1
            ) if len(successful) > 1 else 0,
# Component breakdown
feature_p99=np.percentile([r["feature_latency"] for r in successful], 99) if successful else 0,
model_p99=np.percentile([r["model_latency"] for r in successful], 99) if successful else 0,
policy_p99=np.percentile([r["policy_latency"] for r in successful], 99) if successful else 0,
)
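_create_ramp_pattern is assumed to yield one (second_index, tps_for_that_second) entry per second of the test, which is the shape the batching loop above expects. A minimal sketch, with the linear ramp shape as an assumption:
def create_ramp_pattern(target_tps: int, duration: int, ramp_up: int) -> list:
    """One (second_index, tps) tuple per second: ramp linearly to target_tps
    over `ramp_up` seconds, then hold steady for the remainder of the test."""
    pattern = []
    for second in range(duration):
        if second < ramp_up:
            tps = max(1, int(target_tps * (second + 1) / ramp_up))
        else:
            tps = target_tps
        pattern.append((second, tps))
    return pattern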
3. Production Monitoring & Alerting
3.1 Monitoring Dashboard Configuration
# grafana_dashboards/fraud_platform.yaml
dashboard:
title: "Fraud Platform - Production Monitoring"
refresh: "30s"
time:
from: "now-1h"
to: "now"
rows:
- title: "System Health"
panels:
- title: "Decision Latency (P99)"
type: "graph"
targets:
- expr: "histogram_quantile(0.99, rate(fraud_decision_latency_bucket[5m]))"
thresholds:
- value: 150
color: "yellow"
- value: 200
color: "red"
- title: "Request Rate"
type: "graph"
targets:
- expr: "rate(fraud_decisions_total[1m])"
alert:
name: "Low Traffic Alert"
condition: "< 10" # Less than 10 TPS
- title: "Error Rate"
type: "graph"
targets:
- expr: "rate(fraud_decision_errors_total[5m]) / rate(fraud_decisions_total[5m])"
thresholds:
- value: 0.01
color: "yellow"
- value: 0.05
color: "red"
- title: "Safe Mode Status"
type: "stat"
targets:
- expr: "fraud_safe_mode_active"
valueMappings:
- value: 0
text: "Normal"
color: "green"
- value: 1
text: "SAFE MODE"
color: "red"
- title: "Decision Distribution"
panels:
- title: "Decisions by Type"
type: "piechart"
targets:
- expr: "sum(rate(fraud_decisions_total[5m])) by (decision)"
- title: "Approval Rate (Rolling 1h)"
type: "gauge"
targets:
- expr: "sum(rate(fraud_decisions_total{decision='ALLOW'}[1h])) / sum(rate(fraud_decisions_total[1h]))"
thresholds:
- value: 0.90
color: "green"
- value: 0.85
color: "yellow"
- value: 0.80
color: "red"
- title: "Block Rate (Rolling 1h)"
type: "gauge"
targets:
- expr: "sum(rate(fraud_decisions_total{decision='BLOCK'}[1h])) / sum(rate(fraud_decisions_total[1h]))"
- title: "Model & Feature Health"
panels:
- title: "Criminal Fraud Score Distribution"
type: "heatmap"
targets:
- expr: "histogram_quantile(0.5, fraud_criminal_score_bucket)"
- title: "Feature Drift (PSI)"
type: "graph"
targets:
- expr: "fraud_feature_psi{feature=~'card_attempts.*|device_distinct.*'}"
thresholds:
- value: 0.1
color: "yellow"
- value: 0.2
color: "red"
- title: "Model Prediction Latency"
type: "graph"
targets:
- expr: "histogram_quantile(0.99, rate(fraud_model_latency_bucket[5m]))"
- title: "Champion vs Challenger"
type: "graph"
targets:
- expr: "rate(fraud_decisions_total{experiment='champion'}[5m])"
legend: "Champion"
- expr: "rate(fraud_decisions_total{experiment!='champion'}[5m])"
legend: "Challenger"
- title: "Velocity & Attacks"
panels:
- title: "Velocity Rules Triggered"
type: "graph"
targets:
- expr: "sum(rate(fraud_velocity_triggered_total[5m])) by (rule)"
- title: "Card Testing Detections"
type: "graph"
targets:
- expr: "rate(fraud_card_testing_detected_total[5m])"
alert:
name: "Card Testing Spike"
condition: "> 100"
- title: "Bot Detections"
type: "graph"
targets:
- expr: "rate(fraud_bot_detected_total[5m])"
- title: "Blocklist Hits"
type: "graph"
targets:
- expr: "sum(rate(fraud_blocklist_hits_total[5m])) by (list_type)"
- title: "Business Metrics (Lagged)"
panels:
- title: "Fraud Rate (30-day lag)"
type: "stat"
targets:
- expr: "fraud_rate_30d_lag"
thresholds:
- value: 0.005
color: "green"
- value: 0.01
color: "yellow"
- value: 0.02
color: "red"
- title: "Chargeback Volume (USD)"
type: "graph"
targets:
- expr: "sum(fraud_chargeback_amount_total)"
- title: "Dispute Win Rate"
type: "gauge"
targets:
- expr: "fraud_dispute_win_rate"
3.2 Alert Configuration
# alertmanager/fraud_alerts.yaml
groups:
- name: fraud_platform_critical
rules:
# Latency alerts
- alert: FraudDecisionLatencyHigh
        expr: histogram_quantile(0.99, rate(fraud_decision_latency_seconds_bucket[5m])) * 1000 > 200
for: 2m
labels:
severity: critical
team: fraud-platform
annotations:
summary: "Fraud decision P99 latency exceeds 200ms"
description: "P99 latency is {{ $value }}ms. Check model service and Redis."
runbook: "https://runbooks.internal/fraud/high-latency"
- alert: FraudDecisionLatencyWarning
        expr: histogram_quantile(0.99, rate(fraud_decision_latency_seconds_bucket[5m])) * 1000 > 150
for: 5m
labels:
severity: warning
team: fraud-platform
annotations:
summary: "Fraud decision P99 latency elevated"
# Error rate alerts
- alert: FraudErrorRateCritical
        expr: sum(rate(fraud_decision_errors_total[5m])) / sum(rate(fraud_decisions_total[5m])) > 0.05
for: 1m
labels:
severity: critical
team: fraud-platform
annotations:
summary: "Fraud error rate exceeds 5%"
description: "Error rate is {{ $value | humanizePercentage }}. Entering safe mode may be required."
runbook: "https://runbooks.internal/fraud/high-error-rate"
# Safe mode alert
- alert: FraudSafeModeActive
expr: fraud_safe_mode_active == 1
for: 0m
labels:
severity: critical
team: fraud-platform
annotations:
summary: "Fraud platform in SAFE MODE"
description: "Fallback rules are in effect. ML models may be unavailable."
runbook: "https://runbooks.internal/fraud/safe-mode"
# Traffic anomaly
- alert: FraudTrafficDrop
        expr: sum(rate(fraud_decisions_total[5m])) < 10
for: 5m
labels:
severity: warning
team: fraud-platform
annotations:
summary: "Fraud platform traffic unusually low"
description: "Only {{ $value }} TPS. Check upstream integrations."
- alert: FraudTrafficSpike
        expr: sum(rate(fraud_decisions_total[5m])) > sum(rate(fraud_decisions_total[1h] offset 1d)) * 2
for: 5m
labels:
severity: warning
team: fraud-platform
annotations:
summary: "Fraud platform traffic spike detected"
description: "Traffic is 2x higher than same time yesterday."
- name: fraud_platform_model_health
rules:
# Feature drift
- alert: FeatureDriftDetected
expr: fraud_feature_psi > 0.2
for: 15m
labels:
severity: warning
team: fraud-platform
annotations:
summary: "Feature drift detected: {{ $labels.feature }}"
description: "PSI is {{ $value }}. Consider retraining model."
# Score distribution shift
- alert: ScoreDistributionShift
        expr: abs(sum(rate(fraud_criminal_score_sum[1h])) / sum(rate(fraud_criminal_score_count[1h])) - sum(rate(fraud_criminal_score_sum[1h] offset 1d)) / sum(rate(fraud_criminal_score_count[1h] offset 1d))) > 0.1
for: 30m
labels:
severity: warning
team: fraud-platform
annotations:
summary: "Criminal fraud score distribution shifted"
description: "Average score changed by {{ $value }}. Investigate model or data issues."
# Model version mismatch
- alert: ModelVersionMismatch
expr: count(count by (model_version) (fraud_decisions_total)) > 2
for: 5m
labels:
severity: warning
team: fraud-platform
annotations:
summary: "Multiple model versions serving traffic"
- name: fraud_platform_business
rules:
# Approval rate drop
- alert: ApprovalRateDrop
expr: sum(rate(fraud_decisions_total{decision='ALLOW'}[1h])) / sum(rate(fraud_decisions_total[1h])) < 0.85
for: 30m
labels:
severity: warning
team: fraud-ops
annotations:
summary: "Approval rate below 85%"
description: "Approval rate is {{ $value | humanizePercentage }}. Review threshold settings."
# Block rate spike
- alert: BlockRateSpike
expr: sum(rate(fraud_decisions_total{decision='BLOCK'}[1h])) / sum(rate(fraud_decisions_total[1h])) > 0.10
for: 15m
labels:
severity: warning
team: fraud-ops
annotations:
summary: "Block rate exceeds 10%"
description: "Unusual block rate. Possible attack or threshold issue."
# Card testing attack
- alert: CardTestingAttack
expr: sum(rate(fraud_card_testing_detected_total[5m])) > 50
for: 5m
labels:
severity: critical
team: fraud-ops
annotations:
summary: "Card testing attack in progress"
description: "{{ $value }} card testing events per minute detected."
runbook: "https://runbooks.internal/fraud/card-testing-response"
3.3 Metrics Collection Code
from prometheus_client import Counter, Histogram, Gauge
# Decision metrics
DECISIONS_TOTAL = Counter(
"fraud_decisions_total",
"Total fraud decisions",
["decision", "experiment", "model_version"]
)
DECISION_LATENCY = Histogram(
"fraud_decision_latency_seconds",
"Fraud decision latency",
buckets=[0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.5, 1.0]
)
DECISION_ERRORS = Counter(
"fraud_decision_errors_total",
"Fraud decision errors",
["error_type"]
)
# Score metrics
CRIMINAL_SCORE = Histogram(
"fraud_criminal_score",
"Criminal fraud score distribution",
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]
)
FRIENDLY_SCORE = Histogram(
"fraud_friendly_score",
"Friendly fraud score distribution",
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]
)
# Feature metrics
FEATURE_PSI = Gauge(
"fraud_feature_psi",
"Feature Population Stability Index",
["feature"]
)
# Velocity metrics
VELOCITY_TRIGGERED = Counter(
"fraud_velocity_triggered_total",
"Velocity rules triggered",
["rule"]
)
# Detection metrics
CARD_TESTING_DETECTED = Counter(
"fraud_card_testing_detected_total",
"Card testing patterns detected"
)
BOT_DETECTED = Counter(
"fraud_bot_detected_total",
"Bot transactions detected"
)
BLOCKLIST_HITS = Counter(
"fraud_blocklist_hits_total",
"Blocklist hits",
["list_type"]
)
# Safe mode
SAFE_MODE_ACTIVE = Gauge(
"fraud_safe_mode_active",
"Safe mode status (1=active, 0=normal)"
)
# Component latency
MODEL_LATENCY = Histogram(
"fraud_model_latency_seconds",
"Model inference latency",
buckets=[0.005, 0.01, 0.015, 0.02, 0.025, 0.03, 0.05, 0.1]
)
FEATURE_LATENCY = Histogram(
"fraud_feature_latency_seconds",
"Feature assembly latency",
buckets=[0.01, 0.02, 0.03, 0.04, 0.05, 0.075, 0.1]
)
POLICY_LATENCY = Histogram(
"fraud_policy_latency_seconds",
"Policy evaluation latency",
buckets=[0.002, 0.005, 0.01, 0.015, 0.02, 0.025]
)
class MetricsCollector:
"""Collect and expose metrics."""
def record_decision(
self,
decision: PolicyDecision,
scores: RiskAssessment,
latency_ms: float,
experiment: str,
model_version: str
):
"""Record a fraud decision."""
# Decision counter
DECISIONS_TOTAL.labels(
decision=decision.action,
experiment=experiment,
model_version=model_version
).inc()
# Latency histogram
DECISION_LATENCY.observe(latency_ms / 1000)
# Score histograms
CRIMINAL_SCORE.observe(scores.criminal_fraud_score)
FRIENDLY_SCORE.observe(scores.friendly_fraud_score)
# Velocity rules
for rule in scores.signals.get("velocity", []):
VELOCITY_TRIGGERED.labels(rule=rule["rule_name"]).inc()
# Detection signals
if scores.signals.get("card_testing"):
CARD_TESTING_DETECTED.inc()
if scores.signals.get("bot", {}).get("is_bot"):
BOT_DETECTED.inc()
def record_component_latency(
self,
feature_ms: float,
model_ms: float,
policy_ms: float
):
"""Record component-level latencies."""
FEATURE_LATENCY.observe(feature_ms / 1000)
MODEL_LATENCY.observe(model_ms / 1000)
POLICY_LATENCY.observe(policy_ms / 1000)
def record_error(self, error_type: str):
"""Record an error."""
DECISION_ERRORS.labels(error_type=error_type).inc()
def set_safe_mode(self, active: bool):
"""Set safe mode status."""
SAFE_MODE_ACTIVE.set(1 if active else 0)
async def update_feature_drift(self):
"""Periodic job to compute feature drift metrics."""
drift_calculator = FeatureDriftCalculator()
for feature in TOP_FEATURES:
psi = await drift_calculator.compute_psi(
feature,
reference_window_days=30,
current_window_days=1
)
FEATURE_PSI.labels(feature=feature).set(psi)
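FeatureDriftCalculator.compute_psi is referenced above but defined elsewhere. For reference, the PSI computation that the 0.1 (warning) and 0.2 (critical) dashboard thresholds presume is sketched below; the bin count, quantile binning on the reference window, and epsilon smoothing are assumptions.
import numpy as np


def population_stability_index(
    reference: np.ndarray,
    current: np.ndarray,
    bins: int = 10,
    epsilon: float = 1e-6,
) -> float:
    """PSI between a reference window and the current window (sketch only).
    Bin edges come from the reference distribution; epsilon guards empty bins."""
    edges = np.unique(np.quantile(reference, np.linspace(0.0, 1.0, bins + 1)))
    if len(edges) < 2:
        return 0.0  # degenerate (near-constant) feature
    # Clip so outliers fall into the edge bins rather than outside the histogram
    ref_counts, _ = np.histogram(np.clip(reference, edges[0], edges[-1]), bins=edges)
    cur_counts, _ = np.histogram(np.clip(current, edges[0], edges[-1]), bins=edges)
    ref_pct = ref_counts / max(ref_counts.sum(), 1) + epsilon
    cur_pct = cur_counts / max(cur_counts.sum(), 1) + epsilon
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))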
4. Sprint-1 Implementation Checklist
4.1 Infrastructure Setup
| Task | Status | Owner | Notes |
|---|---|---|---|
| Kafka cluster provisioned | [ ] | Platform | 3 brokers, 7-day retention |
| Flink cluster deployed | [ ] | Platform | Checkpointing enabled |
| Redis Cluster deployed | [ ] | Platform | 6 nodes, persistence |
| PostgreSQL (evidence vault) | [ ] | Platform | RDS Multi-AZ |
| S3 bucket (evidence blobs) | [ ] | Platform | Versioning enabled |
| Kubernetes namespace | [ ] | Platform | Resource quotas set |
| Secrets management | [ ] | Security | HashiCorp Vault |
| Network policies | [ ] | Security | Pod-to-pod isolation |
4.2 Core Services
| Service | Status | Endpoint | Health Check |
|---|---|---|---|
| Ingestion API | [ ] | /api/v1/events | /health |
| Feature Service | [ ] | Internal | /health |
| Risk Scoring Service | [ ] | Internal | /health |
| Policy Engine | [ ] | Internal | /health |
| Evidence Service | [ ] | Internal | /health |
| Model Service (Seldon) | [ ] | /v1/models/fraud | /health |
4.3 Data Pipelines
| Pipeline | Status | Kafka Topics | Notes |
|---|---|---|---|
| Event Normalization | [ ] | raw-events -> normalized-events | Flink job |
| Velocity Counter Updates | [ ] | normalized-events -> Redis | Flink job |
| Entity Profile Updates | [ ] | normalized-events -> Redis/Feast | Flink job |
| Evidence Writer | [ ] | decisions -> PostgreSQL/S3 | Flink job |
| Chargeback Ingestion | [ ] | chargebacks -> PostgreSQL | Flink job |
| Training Data Generator | [ ] | Batch (weekly) | Kubeflow |
4.4 Model & Policy
| Item | Status | Version | Notes |
|---|---|---|---|
| Criminal fraud model trained | [ ] | v1.0 | XGBoost/LightGBM |
| Friendly fraud model trained | [ ] | v1.0 | XGBoost/LightGBM |
| Model registered in MLflow | [ ] | Champion stage | |
| Policy config finalized | [ ] | 2025.01.01 | YAML in Git |
| Thresholds calibrated | [ ] | | Based on historical data |
| Velocity rules configured | [ ] | | Redis-backed |
| Blocklists seeded | [ ] | | Initial known bad actors |
4.5 Monitoring & Observability
| Item | Status | Tool | Notes |
|---|---|---|---|
| Prometheus metrics exposed | [ ] | Prometheus | All services |
| Grafana dashboards created | [ ] | Grafana | 5 dashboards |
| Alertmanager rules deployed | [ ] | Alertmanager | 15+ rules |
| PagerDuty integration | [ ] | PagerDuty | Escalation policies |
| Distributed tracing | [ ] | Jaeger | OpenTelemetry |
| Log aggregation | [ ] | Elasticsearch | 30-day retention |
| Runbooks documented | [ ] | Confluence | 10+ runbooks |
4.6 Testing & Validation
| Test Type | Status | Coverage | Notes |
|---|---|---|---|
| Unit tests | [ ] | 80%+ | All services |
| Integration tests | [ ] | Key flows | End-to-end |
| Load testing | [ ] | 500+ TPS | 5-minute sustained |
| Chaos testing | [ ] | Redis failure, model failure | |
| Idempotency testing | [ ] | 10% duplicates | |
| Replay testing | [ ] | 30-day historical | |
| Security testing | [ ] | OWASP, PCI scan | |
4.7 Documentation
| Document | Status | Location | Notes |
|---|---|---|---|
| Architecture diagram | [ ] | Confluence | Updated |
| API documentation | [ ] | Swagger/OpenAPI | All endpoints |
| Runbooks | [ ] | Confluence | 10+ scenarios |
| Incident response playbook | [ ] | Confluence | Severity matrix |
| Threshold tuning guide | [ ] | Confluence | For fraud ops |
| Model documentation | [ ] | MLflow | Training, features, metrics |
4.8 Go-Live Checklist
| Step | Status | Owner | Sign-off |
|---|---|---|---|
| All acceptance tests passing | [ ] | QA | |
| Load test at 2x expected traffic | [ ] | Platform | |
| Rollback procedure tested | [ ] | Platform | |
| On-call rotation scheduled | [ ] | Ops | |
| Stakeholder sign-off | [ ] | Product | |
| PCI compliance verified | [ ] | Security | |
| Traffic cutover plan reviewed | [ ] | Platform | |
| Monitoring dashboards accessible | [ ] | Ops | |
| Alert routing confirmed | [ ] | Ops | |
| Post-launch monitoring plan | [ ] | All | 24/7 for 72 hours |
5. Summary: Sprint-1 Deliverables
5.1 What Ships in Sprint-1
- Real-time decision API - <200ms latency, exactly-once semantics
- Velocity features - Card, device, IP, user counters with sliding windows
- Criminal fraud detection - Card testing, velocity attacks, geo anomalies, bot detection
- Friendly fraud scoring - Historical abuse patterns, behavioral consistency
- Policy engine - Configurable thresholds, velocity rules, blocklists/allowlists
- Evidence vault - Immutable transaction evidence for disputes
- Basic chargeback ingestion - Link chargebacks to transactions, label taxonomy
- Monitoring & alerting - Grafana dashboards, Prometheus metrics, PagerDuty alerts
5.2 Deferred to Later Sprints
| Item | Sprint | Rationale |
|---|---|---|
| Automated representment | Sprint 2 | Requires evidence quality baseline |
| Economic optimization UI | Sprint 2 | Manual thresholds sufficient initially |
| Champion/challenger A/B | Sprint 2 | Need baseline metrics first |
| Model retraining pipeline | Sprint 3 | Need labeled data from Sprint 1-2 |
| IRSF detection | Phase 2 | Different signal types |
| ATO detection | Phase 2 | Requires session/login data |
| Subscription fraud | Phase 2 | Different lifecycle |
5.3 Success Metrics for Sprint-1
| Metric | Target | Measurement |
|---|---|---|
| P99 Latency | <200ms | Prometheus histogram |
| Availability | 99.9% | Uptime monitoring |
| Error Rate | <0.1% | Error counter / total |
| Approval Rate | >92% | Decision counter |
| Block Rate | <5% | Decision counter |
| False Positive Rate | <10% of blocks | Post-hoc analysis (see query sketch below) |
| Fraud Detection Rate | >70% of known fraud | Historical comparison |
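The false positive rate and fraud detection rate can only be measured once chargeback and dispute labels mature (typically 30-120 days after the decision). A sketch of the post-hoc query is below; the table and column names (decisions, transaction_labels, is_fraud, decision_ts) are assumptions standing in for the actual evidence-vault schema.
POST_HOC_EFFECTIVENESS_SQL = """
SELECT
    COUNT(*) FILTER (WHERE d.action = 'BLOCK')                              AS blocked,
    COUNT(*) FILTER (WHERE d.action = 'BLOCK' AND NOT l.is_fraud)::float
        / NULLIF(COUNT(*) FILTER (WHERE d.action = 'BLOCK'), 0)             AS fp_rate_of_blocks,
    COUNT(*) FILTER (WHERE d.action = 'BLOCK' AND l.is_fraud)::float
        / NULLIF(COUNT(*) FILTER (WHERE l.is_fraud), 0)                     AS fraud_detection_rate
FROM decisions d
JOIN transaction_labels l ON l.auth_id = d.auth_id
WHERE d.decision_ts >= NOW() - INTERVAL '30 days';
"""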
6. Appendix: Quick Reference
6.1 Key Redis Keys
# Entity profiles
entity:user:{user_id}
entity:device:{device_fingerprint}
entity:card:{card_token}
entity:ip:{ip_hash}
entity:service:{service_id}
# Velocity counters (sorted sets with timestamps)
velocity:card:{card_token}:attempts
velocity:device:{device_fp}:cards
velocity:ip:{ip_hash}:users
# HyperLogLog for distinct counts
hll:device:{device_fp}:cards:1h
hll:device:{device_fp}:cards:24h
# Blocklists
blocklist:card_tokens
blocklist:devices
blocklist:ips
blocklist:users
# Allowlists
allowlist:users
allowlist:services
# Idempotency
idempotency:{key}
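For reference, a sketch of how the velocity and HyperLogLog keys above are typically written and read on the hot path. The production writes live in the Flink jobs; the TTLs, member format, and use of the redis-py asyncio client here are assumptions.
import time
import uuid

import redis.asyncio as redis


async def record_card_attempt(r: redis.Redis, card_token: str, device_fp: str) -> int:
    """Append an attempt to the card's sliding 1h window and return the count."""
    now = time.time()
    key = f"velocity:card:{card_token}:attempts"
    async with r.pipeline(transaction=True) as pipe:
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})   # one member per attempt, scored by ts
        pipe.zremrangebyscore(key, 0, now - 3600)            # evict entries older than 1 hour
        pipe.zcard(key)                                      # attempts remaining in the window
        pipe.expire(key, 3600)
        pipe.pfadd(f"hll:device:{device_fp}:cards:1h", card_token)  # distinct cards per device
        results = await pipe.execute()
    return results[2]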
6.2 Key Kafka Topics
raw-payment-events # From PSP webhooks
normalized-events # After normalization
decisions # Decision events
evidence-captured # Evidence capture confirmations
chargebacks # Chargeback events
issuer-alerts # TC40/SAFE alerts
dispute-outcomes # Dispute resolution events
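An illustrative consumer against the decisions topic, useful for local debugging or smoke tests; the real evidence writer is a Flink job, and the broker address, group id, and offset handling below are assumptions.
import json

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",      # assumed broker address
    "group.id": "evidence-writer-dev",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
consumer.subscribe(["decisions"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            continue  # real code would log and route to a dead-letter topic
        decision = json.loads(msg.value())
        # ... persist evidence, then commit the offset explicitly
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()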
6.3 Key API Endpoints
POST /api/v1/events # Process payment event
GET /api/v1/decisions/{id} # Get decision by ID
GET /api/v1/evidence/{auth_id} # Get evidence for transaction
GET /api/v1/health # Health check
GET /api/v1/metrics # Prometheus metrics
# Internal
POST /internal/score # Risk scoring
POST /internal/policy/evaluate # Policy evaluation
GET /internal/features/{entity} # Get entity features
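An illustrative call to the ingestion endpoint using httpx as the client; the host, the field names beyond idempotency_key and auth_id, and all values are placeholders, and the canonical event schema ships with the Ingestion API.
import httpx

event = {
    "idempotency_key": "evt-2f6c1b",       # illustrative values only
    "auth_id": "auth-000123",
    "event_type": "authorization",
    "amount_usd": 49.99,
    "card_token": "tok-abc",
    "device_fingerprint": "dev-xyz",
}

resp = httpx.post("https://fraud.internal/api/v1/events", json=event, timeout=0.5)
resp.raise_for_status()
print(resp.json())  # expected to carry the decision, decision id, and scores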
End of Sprint-1 Design Document