
Part 5: Testing, Validation, Monitoring & Sprint-1 Checklist

Payment & Chargeback Fraud Platform - Principal-Level Design Document

Phase 1: Real-Time Streaming & Decisioning


1. Offline Validation & Replay Testing

1.1 Historical Replay Framework

class HistoricalReplayEngine:
    """Replay historical transactions through the fraud pipeline."""

    def __init__(self):
        self.feature_store = PointInTimeFeatureStore()
        self.scoring_service = RiskScoringService()
        self.policy_engine = PolicyEngine()

    async def replay(
        self,
        start_date: datetime,
        end_date: datetime,
        policy_config: Optional[dict] = None,
        model_version: Optional[str] = None,
        sample_rate: float = 1.0
    ) -> ReplayResults:
        """
        Replay historical transactions with optional policy/model changes.

        This enables:
        - Threshold simulation (what-if analysis)
        - Model validation before deployment
        - Policy change impact estimation
        """

        # Load historical transactions
        transactions = await self._load_transactions(start_date, end_date, sample_rate)

        results = []
        for txn in transactions:
            # Get features as they existed at transaction time
            features = await self.feature_store.get_features_at_time(
                txn["auth_id"],
                txn["transaction_date"]
            )

            # Score with specified (or current) model
            if model_version:
                scores = await self.scoring_service.score_with_model(
                    txn, features, model_version
                )
            else:
                scores = await self.scoring_service.score(txn, features)

            # Apply policy (specified or current)
            if policy_config:
                decision = await self.policy_engine.evaluate_with_config(
                    txn, features, scores, policy_config
                )
            else:
                decision = await self.policy_engine.evaluate(txn, features, scores)

            # Record result
            results.append({
                "auth_id": txn["auth_id"],
                "transaction_date": txn["transaction_date"],
                "amount_usd": txn["amount_usd"],

                # Original decision
                "original_action": txn["original_decision"],
                "original_score": txn.get("original_score"),

                # Replay decision
                "replay_action": decision.action,
                "replay_score": scores.criminal_fraud_score,

                # Actual outcome (for validation)
                "actual_is_fraud": txn["is_fraud"],
                "actual_chargeback_amount": txn.get("chargeback_amount"),

                # Decision changed?
                "decision_changed": txn["original_decision"] != decision.action,
            })

        return self._analyze_results(results, start_date, end_date)

    def _analyze_results(
        self,
        results: list,
        start_date: datetime,
        end_date: datetime
    ) -> ReplayResults:
        """Analyze replay results and compute impact metrics."""

        df = pd.DataFrame(results)

        total = len(df)
        fraud_count = df["actual_is_fraud"].sum()

        # Original metrics
        original_approved = (df["original_action"].isin(["ALLOW", "FRICTION"])).sum()
        original_blocked = (df["original_action"] == "BLOCK").sum()
        original_fraud_passed = df[
            (df["original_action"].isin(["ALLOW", "FRICTION"])) &
            (df["actual_is_fraud"] == True)
        ]["amount_usd"].sum()

        # Replay metrics
        replay_approved = (df["replay_action"].isin(["ALLOW", "FRICTION"])).sum()
        replay_blocked = (df["replay_action"] == "BLOCK").sum()
        replay_fraud_passed = df[
            (df["replay_action"].isin(["ALLOW", "FRICTION"])) &
            (df["actual_is_fraud"] == True)
        ]["amount_usd"].sum()

        # Confusion matrix for replay vs actual
        replay_tp = len(df[(df["replay_action"] == "BLOCK") & (df["actual_is_fraud"] == True)])
        replay_fp = len(df[(df["replay_action"] == "BLOCK") & (df["actual_is_fraud"] == False)])
        replay_fn = len(df[(df["replay_action"] != "BLOCK") & (df["actual_is_fraud"] == True)])
        replay_tn = len(df[(df["replay_action"] != "BLOCK") & (df["actual_is_fraud"] == False)])

        # Original false positives (blocked but not fraud), used for the impact delta
        original_fp = original_blocked - len(
            df[(df["original_action"] == "BLOCK") & (df["actual_is_fraud"] == True)]
        )

        return ReplayResults(
            date_range=f"{start_date.date()} to {end_date.date()}",
            transaction_count=total,
            fraud_count=fraud_count,

            original_metrics={
                "approval_rate": original_approved / total,
                "block_rate": original_blocked / total,
                "fraud_passed_usd": original_fraud_passed,
            },

            replay_metrics={
                "approval_rate": replay_approved / total,
                "block_rate": replay_blocked / total,
                "fraud_passed_usd": replay_fraud_passed,
                "precision": replay_tp / (replay_tp + replay_fp) if (replay_tp + replay_fp) > 0 else 0,
                "recall": replay_tp / (replay_tp + replay_fn) if (replay_tp + replay_fn) > 0 else 0,
                "false_positive_rate": replay_fp / (replay_fp + replay_tn) if (replay_fp + replay_tn) > 0 else 0,
            },

            impact={
                "approval_rate_change": (replay_approved - original_approved) / total,
                "fraud_loss_change_usd": replay_fraud_passed - original_fraud_passed,
                "false_positives_change": replay_fp - original_fp,
            },

            decisions_changed=df["decision_changed"].sum(),
            decisions_changed_pct=df["decision_changed"].mean() * 100,
        )

    async def simulate_threshold_change(
        self,
        new_thresholds: dict,
        start_date: datetime,
        end_date: datetime
    ) -> ThresholdSimulationResult:
        """Simulate specific threshold changes."""

        # Build policy config with new thresholds
        policy_config = {
            "score_thresholds": {
                "criminal_fraud": new_thresholds.get("criminal_fraud", {}),
                "friendly_fraud": new_thresholds.get("friendly_fraud", {}),
            }
        }

        # Run replay
        results = await self.replay(
            start_date=start_date,
            end_date=end_date,
            policy_config=policy_config
        )

        # Generate recommendation
        if results.impact["fraud_loss_change_usd"] < 0 and results.impact["approval_rate_change"] >= -0.01:
            recommendation = "RECOMMEND_DEPLOY"
            reason = f"Reduces fraud loss by ${abs(results.impact['fraud_loss_change_usd']):,.2f} with minimal approval impact"
        elif results.impact["fraud_loss_change_usd"] < -10000 and results.impact["approval_rate_change"] >= -0.03:
            recommendation = "CONSIDER_DEPLOY"
            reason = "Significant fraud reduction, moderate approval impact"
        elif results.impact["approval_rate_change"] < -0.05:
            recommendation = "DO_NOT_DEPLOY"
            reason = f"Approval rate drops {abs(results.impact['approval_rate_change']) * 100:.1f}%"
        else:
            recommendation = "NEUTRAL"
            reason = "No significant improvement"

        return ThresholdSimulationResult(
            new_thresholds=new_thresholds,
            replay_results=results,
            recommendation=recommendation,
            reason=reason
        )
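
For orientation, a minimal driver showing how the replay engine might be invoked for a what-if threshold change. The date range, the `block`/`friction` keys inside `new_thresholds`, and the printed fields are illustrative only; the actual policy-config schema is defined in Part 3.

import asyncio
from datetime import datetime

async def main():
    engine = HistoricalReplayEngine()

    # Hypothetical what-if run: tighten the criminal-fraud thresholds over
    # one month of history (threshold keys and values are illustrative).
    simulation = await engine.simulate_threshold_change(
        new_thresholds={"criminal_fraud": {"block": 0.92, "friction": 0.70}},
        start_date=datetime(2025, 1, 1),
        end_date=datetime(2025, 1, 31),
    )

    print(simulation.recommendation, "-", simulation.reason)
    print("Decisions changed:", simulation.replay_results.decisions_changed_pct, "%")

asyncio.run(main())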

1.2 Model Validation Pipeline

class ModelValidationPipeline:
    """Validate new models before deployment."""

    VALIDATION_CRITERIA = {
        "min_auc_roc": 0.85,
        "max_auc_drop_from_champion": 0.02,
        "min_precision_at_90_recall": 0.20,
        "max_false_positive_rate": 0.10,
        "max_latency_p99_ms": 25,
    }

    async def validate(
        self,
        challenger_model_uri: str,
        validation_dataset_id: str
    ) -> ValidationResult:
        """Run full validation suite on challenger model."""

        # Load validation data
        data = await self.load_dataset(validation_dataset_id)

        # Load models
        challenger = await self.load_model(challenger_model_uri)
        champion = await self.load_model("models:/fraud_model/Champion")

        results = {
            "dataset_size": len(data),
            "fraud_rate": data["is_fraud"].mean(),
        }

        # Test 1: AUC-ROC (use the positive-class probability column)
        challenger_predictions = challenger.predict_proba(data["features"])[:, 1]
        champion_predictions = champion.predict_proba(data["features"])[:, 1]

        challenger_auc = roc_auc_score(data["is_fraud"], challenger_predictions)
        champion_auc = roc_auc_score(data["is_fraud"], champion_predictions)

        results["challenger_auc"] = challenger_auc
        results["champion_auc"] = champion_auc
        results["auc_difference"] = challenger_auc - champion_auc

        # Test 2: Precision at 90% recall
        results["precision_at_90_recall"] = self._precision_at_recall(
            data["is_fraud"], challenger_predictions, recall_target=0.90
        )

        # Test 3: False positive rate at optimal threshold
        optimal_threshold = self._find_optimal_threshold(
            data["is_fraud"], challenger_predictions
        )
        predictions_binary = challenger_predictions >= optimal_threshold
        tn, fp, fn, tp = confusion_matrix(data["is_fraud"], predictions_binary).ravel()
        results["false_positive_rate"] = fp / (fp + tn)
        results["optimal_threshold"] = optimal_threshold

        # Test 4: Latency benchmark
        latency_results = await self._benchmark_latency(challenger)
        results["latency_p50_ms"] = latency_results["p50"]
        results["latency_p99_ms"] = latency_results["p99"]

        # Test 5: Feature importance stability
        results["feature_importance"] = self._get_feature_importance(challenger)
        results["importance_correlation"] = self._compare_feature_importance(
            challenger, champion
        )

        # Evaluate against criteria
        validations = []

        if challenger_auc >= self.VALIDATION_CRITERIA["min_auc_roc"]:
            validations.append({"check": "min_auc_roc", "passed": True})
        else:
            validations.append({
                "check": "min_auc_roc",
                "passed": False,
                "reason": f"AUC {challenger_auc:.4f} < {self.VALIDATION_CRITERIA['min_auc_roc']}"
            })

        if champion_auc - challenger_auc <= self.VALIDATION_CRITERIA["max_auc_drop_from_champion"]:
            validations.append({"check": "max_auc_drop", "passed": True})
        else:
            validations.append({
                "check": "max_auc_drop",
                "passed": False,
                "reason": f"AUC dropped {champion_auc - challenger_auc:.4f} from champion"
            })

        if results["precision_at_90_recall"] >= self.VALIDATION_CRITERIA["min_precision_at_90_recall"]:
            validations.append({"check": "precision_at_recall", "passed": True})
        else:
            validations.append({
                "check": "precision_at_recall",
                "passed": False,
                "reason": f"Precision {results['precision_at_90_recall']:.4f} < required"
            })

        if results["latency_p99_ms"] <= self.VALIDATION_CRITERIA["max_latency_p99_ms"]:
            validations.append({"check": "latency", "passed": True})
        else:
            validations.append({
                "check": "latency",
                "passed": False,
                "reason": f"P99 latency {results['latency_p99_ms']}ms > {self.VALIDATION_CRITERIA['max_latency_p99_ms']}ms"
            })

        # Overall pass/fail
        all_passed = all(v["passed"] for v in validations)

        return ValidationResult(
            model_uri=challenger_model_uri,
            validation_dataset=validation_dataset_id,
            timestamp=datetime.utcnow(),
            results=results,
            validations=validations,
            passed=all_passed,
            recommendation="PROCEED_TO_SHADOW" if all_passed else "REJECT"
        )

    def _precision_at_recall(
        self,
        y_true: np.ndarray,
        y_scores: np.ndarray,
        recall_target: float
    ) -> float:
        """Precision at the highest threshold that still achieves the target recall."""
        precision, recall, thresholds = precision_recall_curve(y_true, y_scores)

        # recall decreases along the curve, so take the last point that still
        # meets the target (i.e. the highest usable threshold)
        qualifying = np.where(recall >= recall_target)[0]
        if len(qualifying) == 0:
            return 0.0
        return precision[qualifying[-1]]
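
`_find_optimal_threshold` is referenced above but not spelled out. A minimal sketch follows, assuming the "optimal" point is the threshold that maximizes F1 on the validation set; a different objective (for example a fixed false-positive budget) would slot in the same way.

    def _find_optimal_threshold(
        self,
        y_true: np.ndarray,
        y_scores: np.ndarray
    ) -> float:
        """Pick the score threshold that maximizes F1 on the validation set (assumed objective)."""
        precision, recall, thresholds = precision_recall_curve(y_true, y_scores)

        # precision/recall have one more element than thresholds; drop the final point
        f1 = 2 * precision[:-1] * recall[:-1] / np.clip(precision[:-1] + recall[:-1], 1e-9, None)
        return float(thresholds[int(np.argmax(f1))])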

2. Pre-Production Acceptance Criteria

2.1 Sprint-1 Go/No-Go Checklist

class Sprint1AcceptanceCriteria:
    """Pre-production acceptance criteria for Sprint-1."""

    CRITERIA = {
        # Latency requirements
        "latency": {
            "end_to_end_p50_ms": {"target": 100, "max": 150},
            "end_to_end_p99_ms": {"target": 180, "max": 200},
            "feature_assembly_p99_ms": {"target": 45, "max": 50},
            "model_inference_p99_ms": {"target": 25, "max": 30},
            "policy_evaluation_p99_ms": {"target": 12, "max": 15},
        },

        # Correctness requirements
        "correctness": {
            "idempotency_duplicate_rate": {"target": 0, "max": 0},
            "event_ordering_violations": {"target": 0, "max": 0},
            "feature_computation_accuracy": {"target": 0.999, "min": 0.995},
            "decision_consistency_rate": {"target": 1.0, "min": 0.999},
        },

        # Performance requirements
        "performance": {
            "throughput_tps": {"target": 1000, "min": 500},
            "error_rate": {"target": 0.001, "max": 0.01},
            "availability": {"target": 0.9999, "min": 0.999},
        },

        # Business metrics (compared to baseline)
        "business": {
            "approval_rate_delta": {"target": 0, "min": -0.02},
            "fraud_detection_rate_delta": {"target": 0, "min": -0.05},
            "false_positive_rate_delta": {"target": 0, "max": 0.01},
        },

        # Operational readiness
        "operational": {
            "monitoring_coverage": {"target": 1.0, "min": 0.9},
            "alert_response_test": {"target": True},
            "runbook_completeness": {"target": 1.0, "min": 0.9},
            "rollback_test": {"target": True},
        },
    }

    async def run_acceptance_tests(self) -> AcceptanceReport:
        """Run full acceptance test suite."""

        results = {}

        # Latency tests
        results["latency"] = await self._test_latency()

        # Correctness tests
        results["correctness"] = await self._test_correctness()

        # Performance tests
        results["performance"] = await self._test_performance()

        # Business metrics comparison
        results["business"] = await self._test_business_metrics()

        # Operational readiness
        results["operational"] = await self._test_operational()

        # Evaluate results
        all_passed = True
        failures = []

        for category, category_results in results.items():
            for metric, result in category_results.items():
                criteria = self.CRITERIA[category][metric]

                passed = self._evaluate_metric(result["value"], criteria)
                result["passed"] = passed
                result["criteria"] = criteria

                if not passed:
                    all_passed = False
                    failures.append({
                        "category": category,
                        "metric": metric,
                        "value": result["value"],
                        "criteria": criteria
                    })

        return AcceptanceReport(
            timestamp=datetime.utcnow(),
            results=results,
            passed=all_passed,
            failures=failures,
            recommendation="GO" if all_passed else "NO_GO"
        )

    async def _test_latency(self) -> dict:
        """Test latency under load."""

        # Run load test
        load_test = await self._run_load_test(
            duration_seconds=300,
            target_tps=500,
            ramp_up_seconds=60
        )

        return {
            "end_to_end_p50_ms": {"value": load_test.p50_latency},
            "end_to_end_p99_ms": {"value": load_test.p99_latency},
            "feature_assembly_p99_ms": {"value": load_test.feature_p99},
            "model_inference_p99_ms": {"value": load_test.model_p99},
            "policy_evaluation_p99_ms": {"value": load_test.policy_p99},
        }

    async def _test_correctness(self) -> dict:
        """Test correctness guarantees."""

        # Idempotency test
        duplicate_results = await self._test_idempotency(
            num_requests=1000,
            duplicate_rate=0.1  # Send 10% duplicates
        )

        # Event ordering test
        ordering_results = await self._test_event_ordering(
            num_sequences=100
        )

        # Feature accuracy test
        feature_accuracy = await self._test_feature_accuracy(
            sample_size=1000
        )

        # Decision consistency (same input = same output)
        consistency = await self._test_decision_consistency(
            num_requests=500,
            repeat_each=3
        )

        return {
            "idempotency_duplicate_rate": {"value": duplicate_results.duplicate_effects},
            "event_ordering_violations": {"value": ordering_results.violations},
            "feature_computation_accuracy": {"value": feature_accuracy.accuracy},
            "decision_consistency_rate": {"value": consistency.consistency_rate},
        }

    async def _test_idempotency(
        self,
        num_requests: int,
        duplicate_rate: float
    ) -> IdempotencyTestResult:
        """Verify idempotency guarantees."""

        events = self._generate_test_events(num_requests)

        # Add duplicates
        duplicates_to_send = int(num_requests * duplicate_rate)
        duplicate_events = random.sample(events, duplicates_to_send)
        all_events = events + duplicate_events
        random.shuffle(all_events)

        # Send all events
        responses = []
        for event in all_events:
            response = await self.api_client.process_event(event)
            responses.append({
                "event": event,
                "response": response
            })

        # Check for duplicate effects
        effects_by_idem_key = defaultdict(list)
        for r in responses:
            key = r["event"]["idempotency_key"]
            effects_by_idem_key[key].append(r["response"])

        duplicate_effects = 0
        for key, effects in effects_by_idem_key.items():
            if len(effects) > 1:
                # Check if effects are identical (idempotent)
                if not all(e == effects[0] for e in effects):
                    duplicate_effects += 1

        return IdempotencyTestResult(
            total_events=len(all_events),
            unique_events=num_requests,
            duplicates_sent=duplicates_to_send,
            duplicate_effects=duplicate_effects
        )
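
The checklist evaluation above calls `_evaluate_metric`, which is not shown. One possible interpretation of the target/min/max criteria entries is sketched below; boolean criteria such as rollback_test are assumed to compare directly against the target.

    def _evaluate_metric(self, value, criteria: dict) -> bool:
        """Check a measured value against a {target, min, max} criteria entry (assumed semantics)."""
        # Boolean checks (e.g. rollback_test) must match the target exactly
        if isinstance(criteria.get("target"), bool):
            return value == criteria["target"]

        # Numeric checks: the hard bounds are min/max; target is aspirational
        if "min" in criteria and value < criteria["min"]:
            return False
        if "max" in criteria and value > criteria["max"]:
            return False
        return True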

2.2 Load Testing Configuration

class LoadTestRunner:
    """Run load tests for capacity validation."""

    async def run_load_test(
        self,
        duration_seconds: int,
        target_tps: int,
        ramp_up_seconds: int = 60
    ) -> LoadTestResults:
        """Run load test with specified parameters."""

        # Generate test events
        events_needed = duration_seconds * target_tps
        test_events = await self._generate_realistic_events(events_needed)

        # Configure load pattern: one (second_index, tps) pair per second
        load_pattern = self._create_ramp_pattern(
            target_tps=target_tps,
            duration=duration_seconds,
            ramp_up=ramp_up_seconds
        )

        # Run test
        results = []
        event_cursor = 0
        start_time = time.time()

        async with aiohttp.ClientSession() as session:
            for second_index, batch_tps in load_pattern:
                batch_events = test_events[event_cursor:event_cursor + batch_tps]
                event_cursor += batch_tps

                # Send batch concurrently
                tasks = [
                    self._send_event(session, event)
                    for event in batch_events
                ]

                batch_results = await asyncio.gather(*tasks, return_exceptions=True)
                results.extend(batch_results)

                # Pace to one batch per second
                elapsed = time.time() - start_time
                if elapsed < second_index + 1:
                    await asyncio.sleep(second_index + 1 - elapsed)

        # Analyze results
        return self._analyze_load_results(results)

    def _analyze_load_results(self, results: list) -> LoadTestResults:
        """Analyze load test results."""

        successful = [r for r in results if not isinstance(r, Exception) and r["status"] == "success"]
        failed = [r for r in results if isinstance(r, Exception) or r.get("status") != "success"]

        latencies = [r["latency_ms"] for r in successful]

        return LoadTestResults(
            total_requests=len(results),
            successful_requests=len(successful),
            failed_requests=len(failed),
            success_rate=len(successful) / len(results) if results else 0,
            error_rate=len(failed) / len(results) if results else 0,

            # Latency percentiles
            p50_latency=np.percentile(latencies, 50) if latencies else 0,
            p90_latency=np.percentile(latencies, 90) if latencies else 0,
            p95_latency=np.percentile(latencies, 95) if latencies else 0,
            p99_latency=np.percentile(latencies, 99) if latencies else 0,
            max_latency=max(latencies) if latencies else 0,

            # Throughput (based on successful responses, which carry timestamps)
            actual_tps=(
                len(successful) / (successful[-1]["timestamp"] - successful[0]["timestamp"]).total_seconds()
                if len(successful) > 1 else 0
            ),

            # Component breakdown
            feature_p99=np.percentile([r["feature_latency"] for r in successful], 99) if successful else 0,
            model_p99=np.percentile([r["model_latency"] for r in successful], 99) if successful else 0,
            policy_p99=np.percentile([r["policy_latency"] for r in successful], 99) if successful else 0,
        )
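
`_create_ramp_pattern` is assumed above to yield one `(second_index, tps)` pair per second, ramping linearly up to the target rate during the ramp-up window. A sketch under that assumption:

    def _create_ramp_pattern(
        self,
        target_tps: int,
        duration: int,
        ramp_up: int
    ) -> list:
        """One (second_index, tps) entry per second, ramping linearly to target_tps."""
        pattern = []
        for second in range(duration):
            if second < ramp_up:
                # Linear ramp: send a fraction of target proportional to elapsed ramp time
                tps = max(1, int(target_tps * (second + 1) / ramp_up))
            else:
                tps = target_tps
            pattern.append((second, tps))
        return pattern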

3. Production Monitoring & Alerting

3.1 Monitoring Dashboard Configuration

# grafana_dashboards/fraud_platform.yaml

dashboard:
  title: "Fraud Platform - Production Monitoring"
  refresh: "30s"
  time:
    from: "now-1h"
    to: "now"

  rows:
    - title: "System Health"
      panels:
        - title: "Decision Latency (P99)"
          type: "graph"
          targets:
            - expr: "histogram_quantile(0.99, rate(fraud_decision_latency_seconds_bucket[5m])) * 1000"
          thresholds:
            - value: 150
              color: "yellow"
            - value: 200
              color: "red"

        - title: "Request Rate"
          type: "graph"
          targets:
            - expr: "sum(rate(fraud_decisions_total[1m]))"
          alert:
            name: "Low Traffic Alert"
            condition: "< 10"  # Less than 10 TPS

        - title: "Error Rate"
          type: "graph"
          targets:
            - expr: "sum(rate(fraud_decision_errors_total[5m])) / sum(rate(fraud_decisions_total[5m]))"
          thresholds:
            - value: 0.01
              color: "yellow"
            - value: 0.05
              color: "red"

        - title: "Safe Mode Status"
          type: "stat"
          targets:
            - expr: "fraud_safe_mode_active"
          valueMappings:
            - value: 0
              text: "Normal"
              color: "green"
            - value: 1
              text: "SAFE MODE"
              color: "red"

    - title: "Decision Distribution"
      panels:
        - title: "Decisions by Type"
          type: "piechart"
          targets:
            - expr: "sum(rate(fraud_decisions_total[5m])) by (decision)"

        - title: "Approval Rate (Rolling 1h)"
          type: "gauge"
          targets:
            - expr: "sum(rate(fraud_decisions_total{decision='ALLOW'}[1h])) / sum(rate(fraud_decisions_total[1h]))"
          thresholds:
            - value: 0.90
              color: "green"
            - value: 0.85
              color: "yellow"
            - value: 0.80
              color: "red"

        - title: "Block Rate (Rolling 1h)"
          type: "gauge"
          targets:
            - expr: "sum(rate(fraud_decisions_total{decision='BLOCK'}[1h])) / sum(rate(fraud_decisions_total[1h]))"

    - title: "Model & Feature Health"
      panels:
        - title: "Criminal Fraud Score Distribution"
          type: "heatmap"
          targets:
            - expr: "sum(rate(fraud_criminal_score_bucket[5m])) by (le)"

        - title: "Feature Drift (PSI)"
          type: "graph"
          targets:
            - expr: "fraud_feature_psi{feature=~'card_attempts.*|device_distinct.*'}"
          thresholds:
            - value: 0.1
              color: "yellow"
            - value: 0.2
              color: "red"

        - title: "Model Prediction Latency"
          type: "graph"
          targets:
            - expr: "histogram_quantile(0.99, rate(fraud_model_latency_seconds_bucket[5m])) * 1000"

        - title: "Champion vs Challenger"
          type: "graph"
          targets:
            - expr: "sum(rate(fraud_decisions_total{experiment='champion'}[5m]))"
              legend: "Champion"
            - expr: "sum(rate(fraud_decisions_total{experiment!='champion'}[5m]))"
              legend: "Challenger"

    - title: "Velocity & Attacks"
      panels:
        - title: "Velocity Rules Triggered"
          type: "graph"
          targets:
            - expr: "sum(rate(fraud_velocity_triggered_total[5m])) by (rule)"

        - title: "Card Testing Detections"
          type: "graph"
          targets:
            - expr: "rate(fraud_card_testing_detected_total[5m])"
          alert:
            name: "Card Testing Spike"
            condition: "> 100"

        - title: "Bot Detections"
          type: "graph"
          targets:
            - expr: "rate(fraud_bot_detected_total[5m])"

        - title: "Blocklist Hits"
          type: "graph"
          targets:
            - expr: "sum(rate(fraud_blocklist_hits_total[5m])) by (list_type)"

    - title: "Business Metrics (Lagged)"
      panels:
        - title: "Fraud Rate (30-day lag)"
          type: "stat"
          targets:
            - expr: "fraud_rate_30d_lag"
          thresholds:
            - value: 0.005
              color: "green"
            - value: 0.01
              color: "yellow"
            - value: 0.02
              color: "red"

        - title: "Chargeback Volume (USD)"
          type: "graph"
          targets:
            - expr: "sum(fraud_chargeback_amount_total)"

        - title: "Dispute Win Rate"
          type: "gauge"
          targets:
            - expr: "fraud_dispute_win_rate"

3.2 Alert Configuration

# alertmanager/fraud_alerts.yaml

groups:
  - name: fraud_platform_critical
    rules:
      # Latency alerts
      - alert: FraudDecisionLatencyHigh
        expr: histogram_quantile(0.99, rate(fraud_decision_latency_seconds_bucket[5m])) * 1000 > 200
        for: 2m
        labels:
          severity: critical
          team: fraud-platform
        annotations:
          summary: "Fraud decision P99 latency exceeds 200ms"
          description: "P99 latency is {{ $value }}ms. Check model service and Redis."
          runbook: "https://runbooks.internal/fraud/high-latency"

      - alert: FraudDecisionLatencyWarning
        expr: histogram_quantile(0.99, rate(fraud_decision_latency_seconds_bucket[5m])) * 1000 > 150
        for: 5m
        labels:
          severity: warning
          team: fraud-platform
        annotations:
          summary: "Fraud decision P99 latency elevated"

      # Error rate alerts
      - alert: FraudErrorRateCritical
        expr: sum(rate(fraud_decision_errors_total[5m])) / sum(rate(fraud_decisions_total[5m])) > 0.05
        for: 1m
        labels:
          severity: critical
          team: fraud-platform
        annotations:
          summary: "Fraud error rate exceeds 5%"
          description: "Error rate is {{ $value | humanizePercentage }}. Entering safe mode may be required."
          runbook: "https://runbooks.internal/fraud/high-error-rate"

      # Safe mode alert
      - alert: FraudSafeModeActive
        expr: fraud_safe_mode_active == 1
        for: 0m
        labels:
          severity: critical
          team: fraud-platform
        annotations:
          summary: "Fraud platform in SAFE MODE"
          description: "Fallback rules are in effect. ML models may be unavailable."
          runbook: "https://runbooks.internal/fraud/safe-mode"

      # Traffic anomaly
      - alert: FraudTrafficDrop
        expr: sum(rate(fraud_decisions_total[5m])) < 10
        for: 5m
        labels:
          severity: warning
          team: fraud-platform
        annotations:
          summary: "Fraud platform traffic unusually low"
          description: "Only {{ $value }} TPS. Check upstream integrations."

      - alert: FraudTrafficSpike
        expr: sum(rate(fraud_decisions_total[5m])) > sum(rate(fraud_decisions_total[1h] offset 1d)) * 2
        for: 5m
        labels:
          severity: warning
          team: fraud-platform
        annotations:
          summary: "Fraud platform traffic spike detected"
          description: "Traffic is 2x higher than same time yesterday."

  - name: fraud_platform_model_health
    rules:
      # Feature drift
      - alert: FeatureDriftDetected
        expr: fraud_feature_psi > 0.2
        for: 15m
        labels:
          severity: warning
          team: fraud-platform
        annotations:
          summary: "Feature drift detected: {{ $labels.feature }}"
          description: "PSI is {{ $value }}. Consider retraining model."

      # Score distribution shift
      - alert: ScoreDistributionShift
        expr: |
          abs(
            sum(rate(fraud_criminal_score_sum[1h])) / sum(rate(fraud_criminal_score_count[1h]))
            - sum(rate(fraud_criminal_score_sum[1h] offset 1d)) / sum(rate(fraud_criminal_score_count[1h] offset 1d))
          ) > 0.1
        for: 30m
        labels:
          severity: warning
          team: fraud-platform
        annotations:
          summary: "Criminal fraud score distribution shifted"
          description: "Average score changed by {{ $value }} vs. same time yesterday. Investigate model or data issues."

      # Model version mismatch
      - alert: ModelVersionMismatch
        expr: count(count by (model_version) (fraud_decisions_total)) > 2
        for: 5m
        labels:
          severity: warning
          team: fraud-platform
        annotations:
          summary: "Multiple model versions serving traffic"

  - name: fraud_platform_business
    rules:
      # Approval rate drop
      - alert: ApprovalRateDrop
        expr: sum(rate(fraud_decisions_total{decision='ALLOW'}[1h])) / sum(rate(fraud_decisions_total[1h])) < 0.85
        for: 30m
        labels:
          severity: warning
          team: fraud-ops
        annotations:
          summary: "Approval rate below 85%"
          description: "Approval rate is {{ $value | humanizePercentage }}. Review threshold settings."

      # Block rate spike
      - alert: BlockRateSpike
        expr: sum(rate(fraud_decisions_total{decision='BLOCK'}[1h])) / sum(rate(fraud_decisions_total[1h])) > 0.10
        for: 15m
        labels:
          severity: warning
          team: fraud-ops
        annotations:
          summary: "Block rate exceeds 10%"
          description: "Unusual block rate. Possible attack or threshold issue."

      # Card testing attack
      - alert: CardTestingAttack
        expr: sum(rate(fraud_card_testing_detected_total[5m])) * 60 > 50
        for: 5m
        labels:
          severity: critical
          team: fraud-ops
        annotations:
          summary: "Card testing attack in progress"
          description: "{{ $value }} card testing events per minute detected."
          runbook: "https://runbooks.internal/fraud/card-testing-response"

3.3 Metrics Collection Code

from prometheus_client import Counter, Histogram, Gauge

# Decision metrics
DECISIONS_TOTAL = Counter(
    "fraud_decisions_total",
    "Total fraud decisions",
    ["decision", "experiment", "model_version"]
)

DECISION_LATENCY = Histogram(
    "fraud_decision_latency_seconds",
    "Fraud decision latency",
    buckets=[0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.25, 0.3, 0.5, 1.0]
)

DECISION_ERRORS = Counter(
    "fraud_decision_errors_total",
    "Fraud decision errors",
    ["error_type"]
)

# Score metrics
CRIMINAL_SCORE = Histogram(
    "fraud_criminal_score",
    "Criminal fraud score distribution",
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]
)

FRIENDLY_SCORE = Histogram(
    "fraud_friendly_score",
    "Friendly fraud score distribution",
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]
)

# Feature metrics
FEATURE_PSI = Gauge(
    "fraud_feature_psi",
    "Feature Population Stability Index",
    ["feature"]
)

# Velocity metrics
VELOCITY_TRIGGERED = Counter(
    "fraud_velocity_triggered_total",
    "Velocity rules triggered",
    ["rule"]
)

# Detection metrics
CARD_TESTING_DETECTED = Counter(
    "fraud_card_testing_detected_total",
    "Card testing patterns detected"
)

BOT_DETECTED = Counter(
    "fraud_bot_detected_total",
    "Bot transactions detected"
)

BLOCKLIST_HITS = Counter(
    "fraud_blocklist_hits_total",
    "Blocklist hits",
    ["list_type"]
)

# Safe mode
SAFE_MODE_ACTIVE = Gauge(
    "fraud_safe_mode_active",
    "Safe mode status (1=active, 0=normal)"
)

# Component latency
MODEL_LATENCY = Histogram(
    "fraud_model_latency_seconds",
    "Model inference latency",
    buckets=[0.005, 0.01, 0.015, 0.02, 0.025, 0.03, 0.05, 0.1]
)

FEATURE_LATENCY = Histogram(
    "fraud_feature_latency_seconds",
    "Feature assembly latency",
    buckets=[0.01, 0.02, 0.03, 0.04, 0.05, 0.075, 0.1]
)

POLICY_LATENCY = Histogram(
    "fraud_policy_latency_seconds",
    "Policy evaluation latency",
    buckets=[0.002, 0.005, 0.01, 0.015, 0.02, 0.025]
)


class MetricsCollector:
    """Collect and expose metrics."""

    def record_decision(
        self,
        decision: PolicyDecision,
        scores: RiskAssessment,
        latency_ms: float,
        experiment: str,
        model_version: str
    ):
        """Record a fraud decision."""

        # Decision counter
        DECISIONS_TOTAL.labels(
            decision=decision.action,
            experiment=experiment,
            model_version=model_version
        ).inc()

        # Latency histogram
        DECISION_LATENCY.observe(latency_ms / 1000)

        # Score histograms
        CRIMINAL_SCORE.observe(scores.criminal_fraud_score)
        FRIENDLY_SCORE.observe(scores.friendly_fraud_score)

        # Velocity rules
        for rule in scores.signals.get("velocity", []):
            VELOCITY_TRIGGERED.labels(rule=rule["rule_name"]).inc()

        # Detection signals
        if scores.signals.get("card_testing"):
            CARD_TESTING_DETECTED.inc()

        if scores.signals.get("bot", {}).get("is_bot"):
            BOT_DETECTED.inc()

    def record_component_latency(
        self,
        feature_ms: float,
        model_ms: float,
        policy_ms: float
    ):
        """Record component-level latencies."""
        FEATURE_LATENCY.observe(feature_ms / 1000)
        MODEL_LATENCY.observe(model_ms / 1000)
        POLICY_LATENCY.observe(policy_ms / 1000)

    def record_error(self, error_type: str):
        """Record an error."""
        DECISION_ERRORS.labels(error_type=error_type).inc()

    def set_safe_mode(self, active: bool):
        """Set safe mode status."""
        SAFE_MODE_ACTIVE.set(1 if active else 0)

    async def update_feature_drift(self):
        """Periodic job to compute feature drift metrics."""
        drift_calculator = FeatureDriftCalculator()

        for feature in TOP_FEATURES:
            psi = await drift_calculator.compute_psi(
                feature,
                reference_window_days=30,
                current_window_days=1
            )
            FEATURE_PSI.labels(feature=feature).set(psi)
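
`FeatureDriftCalculator.compute_psi` is defined elsewhere; for reference, a minimal PSI computation over two samples of a single feature, assuming the reference and current values have already been fetched. The bucket count and the epsilon used to avoid log(0) are illustrative choices; the 0.1/0.2 alert thresholds above follow common PSI guidance.

import numpy as np

def population_stability_index(reference: np.ndarray, current: np.ndarray, buckets: int = 10) -> float:
    """PSI between a reference sample and a current sample of one feature (sketch)."""
    # Bucket edges from the reference distribution (quantile bins), open-ended at the tails
    edges = np.quantile(reference, np.linspace(0, 1, buckets + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    edges = np.unique(edges)  # collapse duplicate edges for discrete features

    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)

    # Guard against empty buckets before taking logs
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)

    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))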

4. Sprint-1 Implementation Checklist

4.1 Infrastructure Setup

| Task | Status | Owner | Notes |
| --- | --- | --- | --- |
| Kafka cluster provisioned | [ ] | Platform | 3 brokers, 7-day retention |
| Flink cluster deployed | [ ] | Platform | Checkpointing enabled |
| Redis Cluster deployed | [ ] | Platform | 6 nodes, persistence |
| PostgreSQL (evidence vault) | [ ] | Platform | RDS Multi-AZ |
| S3 bucket (evidence blobs) | [ ] | Platform | Versioning enabled |
| Kubernetes namespace | [ ] | Platform | Resource quotas set |
| Secrets management | [ ] | Security | HashiCorp Vault |
| Network policies | [ ] | Security | Pod-to-pod isolation |

4.2 Core Services

| Service | Status | Endpoint | Health Check |
| --- | --- | --- | --- |
| Ingestion API | [ ] | /api/v1/events | /health |
| Feature Service | [ ] | Internal | /health |
| Risk Scoring Service | [ ] | Internal | /health |
| Policy Engine | [ ] | Internal | /health |
| Evidence Service | [ ] | Internal | /health |
| Model Service (Seldon) | [ ] | /v1/models/fraud | /health |

4.3 Data Pipelines

| Pipeline | Status | Kafka Topics | Notes |
| --- | --- | --- | --- |
| Event Normalization | [ ] | raw-events -> normalized-events | Flink job |
| Velocity Counter Updates | [ ] | normalized-events -> Redis | Flink job |
| Entity Profile Updates | [ ] | normalized-events -> Redis/Feast | Flink job |
| Evidence Writer | [ ] | decisions -> PostgreSQL/S3 | Flink job |
| Chargeback Ingestion | [ ] | chargebacks -> PostgreSQL | Flink job |
| Training Data Generator | [ ] | Batch (weekly) | Kubeflow |

4.4 Model & Policy

| Item | Status | Version | Notes |
| --- | --- | --- | --- |
| Criminal fraud model trained | [ ] | v1.0 | XGBoost/LightGBM |
| Friendly fraud model trained | [ ] | v1.0 | XGBoost/LightGBM |
| Model registered in MLflow | [ ] | | Champion stage |
| Policy config finalized | [ ] | 2025.01.01 | YAML in Git |
| Thresholds calibrated | [ ] | | Based on historical data |
| Velocity rules configured | [ ] | | Redis-backed |
| Blocklists seeded | [ ] | | Initial known bad actors |

4.5 Monitoring & Observability

| Item | Status | Tool | Notes |
| --- | --- | --- | --- |
| Prometheus metrics exposed | [ ] | Prometheus | All services |
| Grafana dashboards created | [ ] | Grafana | 5 dashboards |
| Alertmanager rules deployed | [ ] | Alertmanager | 15+ rules |
| PagerDuty integration | [ ] | PagerDuty | Escalation policies |
| Distributed tracing | [ ] | Jaeger | OpenTelemetry |
| Log aggregation | [ ] | Elasticsearch | 30-day retention |
| Runbooks documented | [ ] | Confluence | 10+ runbooks |

4.6 Testing & Validation

| Test Type | Status | Coverage | Notes |
| --- | --- | --- | --- |
| Unit tests | [ ] | 80%+ | All services |
| Integration tests | [ ] | Key flows | End-to-end |
| Load testing | [ ] | 500+ TPS | 5-minute sustained |
| Chaos testing | [ ] | Redis failure, model failure | |
| Idempotency testing | [ ] | 10% duplicates | |
| Replay testing | [ ] | 30-day historical | |
| Security testing | [ ] | OWASP, PCI scan | |

4.7 Documentation

| Document | Status | Location | Notes |
| --- | --- | --- | --- |
| Architecture diagram | [ ] | Confluence | Updated |
| API documentation | [ ] | Swagger/OpenAPI | All endpoints |
| Runbooks | [ ] | Confluence | 10+ scenarios |
| Incident response playbook | [ ] | Confluence | Severity matrix |
| Threshold tuning guide | [ ] | Confluence | For fraud ops |
| Model documentation | [ ] | MLflow | Training, features, metrics |

4.8 Go-Live Checklist

| Step | Status | Owner | Sign-off |
| --- | --- | --- | --- |
| All acceptance tests passing | [ ] | QA | |
| Load test at 2x expected traffic | [ ] | Platform | |
| Rollback procedure tested | [ ] | Platform | |
| On-call rotation scheduled | [ ] | Ops | |
| Stakeholder sign-off | [ ] | Product | |
| PCI compliance verified | [ ] | Security | |
| Traffic cutover plan reviewed | [ ] | Platform | |
| Monitoring dashboards accessible | [ ] | Ops | |
| Alert routing confirmed | [ ] | Ops | |
| Post-launch monitoring plan | [ ] | All | 24/7 for 72 hours |

5. Summary: Sprint-1 Deliverables

5.1 What Ships in Sprint-1

  1. Real-time decision API - <200ms latency, exactly-once semantics
  2. Velocity features - Card, device, IP, user counters with sliding windows
  3. Criminal fraud detection - Card testing, velocity attacks, geo anomalies, bot detection
  4. Friendly fraud scoring - Historical abuse patterns, behavioral consistency
  5. Policy engine - Configurable thresholds, velocity rules, blocklists/allowlists
  6. Evidence vault - Immutable transaction evidence for disputes
  7. Basic chargeback ingestion - Link chargebacks to transactions, label taxonomy
  8. Monitoring & alerting - Grafana dashboards, Prometheus metrics, PagerDuty alerts

5.2 Deferred to Later Sprints

| Item | Sprint | Rationale |
| --- | --- | --- |
| Automated representment | Sprint 2 | Requires evidence quality baseline |
| Economic optimization UI | Sprint 2 | Manual thresholds sufficient initially |
| Champion/challenger A/B | Sprint 2 | Need baseline metrics first |
| Model retraining pipeline | Sprint 3 | Need labeled data from Sprint 1-2 |
| IRSF detection | Phase 2 | Different signal types |
| ATO detection | Phase 2 | Requires session/login data |
| Subscription fraud | Phase 2 | Different lifecycle |

5.3 Success Metrics for Sprint-1

| Metric | Target | Measurement |
| --- | --- | --- |
| P99 Latency | <200ms | Prometheus histogram |
| Availability | 99.9% | Uptime monitoring |
| Error Rate | <0.1% | Error counter / total |
| Approval Rate | >92% | Decision counter |
| Block Rate | <5% | Decision counter |
| False Positive Rate | <10% of blocks | Post-hoc analysis |
| Fraud Detection Rate | >70% of known fraud | Historical comparison |

6. Appendix: Quick Reference

6.1 Key Redis Keys

# Entity profiles
entity:user:{user_id}
entity:device:{device_fingerprint}
entity:card:{card_token}
entity:ip:{ip_hash}
entity:service:{service_id}

# Velocity counters (sorted sets with timestamps)
velocity:card:{card_token}:attempts
velocity:device:{device_fp}:cards
velocity:ip:{ip_hash}:users

# HyperLogLog for distinct counts
hll:device:{device_fp}:cards:1h
hll:device:{device_fp}:cards:24h

# Blocklists
blocklist:card_tokens
blocklist:devices
blocklist:ips
blocklist:users

# Allowlists
allowlist:users
allowlist:services

# Idempotency
idempotency:{key}
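
To make the key layout concrete, a sketch of how a velocity counter and a distinct-count key might be updated and read with redis-py, assuming the sorted-set member is the auth_id and the score is the event timestamp. The client construction, window sizes, and helper name are illustrative only.

import time
import redis.asyncio as redis

r = redis.Redis()  # illustrative client; connection settings omitted

async def record_card_attempt(card_token: str, device_fp: str, auth_id: str) -> int:
    """Record one auth attempt and return the card's attempt count over the last hour."""
    now = time.time()
    key = f"velocity:card:{card_token}:attempts"

    pipe = r.pipeline()
    pipe.zadd(key, {auth_id: now})                              # add this attempt
    pipe.zremrangebyscore(key, 0, now - 3600)                   # drop entries older than 1h
    pipe.zcard(key)                                             # count remaining attempts
    pipe.expire(key, 3600)                                      # let idle keys age out
    pipe.pfadd(f"hll:device:{device_fp}:cards:1h", card_token)  # distinct cards per device
    results = await pipe.execute()

    return results[2]  # the zcard result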

6.2 Key Kafka Topics

raw-payment-events          # From PSP webhooks
normalized-events           # After normalization
decisions                   # Decision events
evidence-captured           # Evidence capture confirmations
chargebacks                 # Chargeback events
issuer-alerts               # TC40/SAFE alerts
dispute-outcomes            # Dispute resolution events

6.3 Key API Endpoints

POST /api/v1/events              # Process payment event
GET  /api/v1/decisions/{id}      # Get decision by ID
GET  /api/v1/evidence/{auth_id}  # Get evidence for transaction
GET  /api/v1/health              # Health check
GET  /api/v1/metrics             # Prometheus metrics

# Internal
POST /internal/score             # Risk scoring
POST /internal/policy/evaluate   # Policy evaluation
GET  /internal/features/{entity} # Get entity features

End of Sprint-1 Design Document


Document Index

  1. Part 1: Technology Stack & Architecture
  2. Part 2: External Entities, Data Schemas & Features
  3. Part 3: Detection Logic & Policy Engine
  4. Part 4: Evidence Pipeline, Disputes & Economics
  5. Part 5: Testing, Validation, Monitoring & Checklist (this document)