
Part 3: Detection Logic & Policy Engine

Payment & Chargeback Fraud Platform - Principal-Level Design Document

Phase 1: Real-Time Streaming & Decisioning


1. Criminal Fraud Detection

1.1 Card Testing / BIN Attack Detection

Card testing attacks involve fraudsters validating stolen card numbers through small transactions before making larger purchases.

Attack Patterns:

  • Rapid small-value authorizations (< $5)
  • Sequential card numbers from same BIN
  • High decline rates followed by successful transactions
  • Single device/IP testing multiple cards
class CardTestingDetector:
    """Detect card testing and BIN attack patterns.

    Card testing attacks are fraudsters validating stolen card numbers with
    small authorizations before attempting larger purchases.  Each triggered
    signal adds a fixed weight to the risk score; the final score is capped
    at 1.0 and mapped to an action by ``_get_action``.
    """

    # Thresholds (configurable, subject to rotation)
    THRESHOLDS = {
        "device_cards_1h": 5,            # Max distinct cards per device per hour
        "ip_cards_1h": 10,               # Max distinct cards per IP per hour
        "ip_bins_1h": 3,                 # Max distinct BINs per IP per hour
        "decline_rate_threshold": 0.5,   # 50% decline rate triggers alert
        "small_txn_threshold_usd": 5.0,  # Transactions under this = "small"
        "small_txn_count_1h": 10,        # Max small transactions per entity
    }

    async def detect(self, event: PaymentEvent, features: dict) -> CardTestingResult:
        """Evaluate all card-testing signals for one payment event.

        Args:
            event: The payment event under evaluation.
            features: Pre-computed velocity/aggregate features.
                FIX: missing features are now read with ``.get(..., 0)`` so a
                feature-store gap degrades to "no signal" instead of raising
                ``KeyError`` (matches the other detectors' behavior).

        Returns:
            CardTestingResult with triggered signals, a capped risk score,
            and a recommended action.
        """
        signals = []
        risk_score = 0.0

        # Signal 1: Device testing multiple cards
        if features.get("device_distinct_cards_1h", 0) > self.THRESHOLDS["device_cards_1h"]:
            signals.append({
                "signal": "device_multi_card",
                "value": features.get("device_distinct_cards_1h", 0),
                "threshold": self.THRESHOLDS["device_cards_1h"],
                "severity": "HIGH"
            })
            risk_score += 0.4

        # Signal 2: IP testing multiple cards
        if features.get("ip_distinct_cards_1h", 0) > self.THRESHOLDS["ip_cards_1h"]:
            signals.append({
                "signal": "ip_multi_card",
                "value": features.get("ip_distinct_cards_1h", 0),
                "threshold": self.THRESHOLDS["ip_cards_1h"],
                "severity": "HIGH"
            })
            risk_score += 0.3

        # Signal 3: BIN enumeration (testing cards from same issuer range)
        if features.get("ip_distinct_bins_1h", 0) > self.THRESHOLDS["ip_bins_1h"]:
            signals.append({
                "signal": "bin_enumeration",
                "value": features.get("ip_distinct_bins_1h", 0),
                "threshold": self.THRESHOLDS["ip_bins_1h"],
                "severity": "CRITICAL"
            })
            risk_score += 0.5

        # Signal 4: High decline rate on device
        if features.get("device_decline_rate_1h", 0) > self.THRESHOLDS["decline_rate_threshold"]:
            signals.append({
                "signal": "high_decline_rate",
                "value": features.get("device_decline_rate_1h", 0),
                "threshold": self.THRESHOLDS["decline_rate_threshold"],
                "severity": "MEDIUM"
            })
            risk_score += 0.2

        # Signal 5: Small transaction velocity (only when this txn is itself small)
        if (event.amount_usd < self.THRESHOLDS["small_txn_threshold_usd"] and
                features.get("device_small_txn_count_1h", 0) > self.THRESHOLDS["small_txn_count_1h"]):
            signals.append({
                "signal": "small_txn_velocity",
                "value": features.get("device_small_txn_count_1h", 0),
                "threshold": self.THRESHOLDS["small_txn_count_1h"],
                "severity": "HIGH"
            })
            risk_score += 0.35

        # Signal 6: Sequential card pattern (BIN + incrementing)
        if await self._detect_sequential_cards(event, features):
            signals.append({
                "signal": "sequential_card_pattern",
                "value": "detected",
                "severity": "CRITICAL"
            })
            risk_score += 0.6

        return CardTestingResult(
            detected=len(signals) > 0,
            risk_score=min(risk_score, 1.0),
            signals=signals,
            recommended_action=self._get_action(risk_score)
        )

    async def _detect_sequential_cards(self, event: PaymentEvent, features: dict) -> bool:
        """Heuristic for sequential-card (BIN enumeration) testing.

        NOTE(review): despite the name, the current heuristic only checks
        that 3+ recent cards on this device share a single BIN.  We only
        store BIN + last4 (never full PANs), so true "incrementing PAN"
        detection is not possible from here; same-BIN clustering is used
        as a proxy.
        """
        # Recent card identifiers seen on this device.  Member format is
        # assumed to be "<bin>:<last4>" — see the split below; TODO confirm
        # against the writer of this sorted set.
        recent_cards = await self.redis.zrange(
            f"device:{event.device_fingerprint}:recent_bins",
            0, -1,
            withscores=True
        )

        # Too few observations to call it a pattern.
        if len(recent_cards) < 3:
            return False

        # Extract the BIN prefix from each "<bin>:<last4>" member.
        bins = [card[0].split(":")[0] for card in recent_cards]

        # All recent cards from one BIN => high suspicion of enumeration.
        if len(set(bins)) == 1:
            return True

        return False

    def _get_action(self, risk_score: float) -> str:
        """Map the aggregate risk score to a policy action."""
        if risk_score >= 0.8:
            return "BLOCK"
        elif risk_score >= 0.5:
            return "FRICTION"
        elif risk_score >= 0.3:
            return "REVIEW"
        return "ALLOW"

1.2 Velocity Attack Detection

class VelocityAttackDetector:
    """Detect velocity-based fraud attacks.

    Each entry in ``VELOCITY_RULES`` names a pre-computed feature, a hard
    threshold, and the action to take when the threshold is met.  The most
    severe action among the triggered rules wins.
    """

    VELOCITY_RULES = [
        # Card velocity
        {
            "name": "card_rapid_fire",
            "feature": "card_attempts_10m",
            "threshold": 3,
            "action": "FRICTION",
            "description": "Card used 3+ times in 10 minutes"
        },
        {
            "name": "card_hourly_limit",
            "feature": "card_attempts_1h",
            "threshold": 5,
            "action": "BLOCK",
            "description": "Card used 5+ times in 1 hour"
        },

        # Device velocity
        {
            "name": "device_burst",
            "feature": "device_transaction_count_10m",
            "threshold": 5,
            "action": "BLOCK",
            "description": "Device 5+ transactions in 10 minutes"
        },
        {
            "name": "device_hourly",
            "feature": "device_transaction_count_1h",
            "threshold": 15,
            "action": "FRICTION",
            "description": "Device 15+ transactions in 1 hour"
        },

        # IP velocity
        {
            "name": "ip_burst",
            "feature": "ip_transaction_count_10m",
            "threshold": 10,
            "action": "FRICTION",
            "description": "IP 10+ transactions in 10 minutes"
        },
        {
            "name": "ip_hourly",
            "feature": "ip_transaction_count_1h",
            "threshold": 50,
            "action": "REVIEW",
            "description": "IP 50+ transactions in 1 hour"
        },

        # Amount velocity
        {
            "name": "card_amount_daily",
            "feature": "card_total_amount_24h_usd",
            "threshold": 5000,
            "action": "FRICTION",
            "description": "Card $5000+ in 24 hours"
        },
        {
            "name": "user_amount_daily",
            "feature": "user_total_amount_24h_usd",
            "threshold": 10000,
            "action": "REVIEW",
            "description": "User $10000+ in 24 hours"
        },
    ]

    async def evaluate(self, event: PaymentEvent, features: dict) -> VelocityResult:
        """Evaluate every velocity rule and return the strictest action.

        Missing features default to 0, i.e. an absent counter never
        triggers a rule.
        """
        severity_rank = {"ALLOW": 0, "REVIEW": 1, "FRICTION": 2, "BLOCK": 3}

        # Collect every rule whose counter meets or exceeds its threshold.
        hits = [
            {
                "rule_name": rule["name"],
                "feature": rule["feature"],
                "value": features.get(rule["feature"], 0),
                "threshold": rule["threshold"],
                "action": rule["action"],
                "description": rule["description"],
            }
            for rule in self.VELOCITY_RULES
            if features.get(rule["feature"], 0) >= rule["threshold"]
        ]

        # The recommended action is the most severe among the hits.
        strictest = "ALLOW"
        for hit in hits:
            if severity_rank[hit["action"]] > severity_rank[strictest]:
                strictest = hit["action"]

        return VelocityResult(
            rules_triggered=hits,
            recommended_action=strictest,
            rule_count=len(hits)
        )

1.3 Geographic Anomaly Detection

class GeoAnomalyDetector:
    """Detect geographic impossibilities and anomalies."""

    # Maximum plausible travel speeds (km/h)
    MAX_TRAVEL_SPEEDS = {
        "ground": 150,  # Car/train
        "air_domestic": 800,
        "air_international": 1000,
    }

    # Weight each severity level contributes to the aggregate geo risk.
    # FIX: _calculate_geo_risk was called by detect() but never defined,
    # which raised AttributeError on every invocation.  Weights chosen to
    # keep the score on the same 0..1 scale as the other detectors —
    # TODO(review): tune against labeled data.
    SEVERITY_WEIGHTS = {
        "LOW": 0.1,
        "MEDIUM": 0.25,
        "HIGH": 0.5,
        "CRITICAL": 0.7,
    }

    async def detect(self, event: PaymentEvent, features: dict) -> GeoAnomalyResult:
        """Collect geographic risk signals for one payment event."""
        signals = []

        # Signal 1: Geo-velocity (impossible travel)
        geo_velocity = await self._check_geo_velocity(event, features)
        if geo_velocity["impossible"]:
            signals.append({
                "signal": "impossible_travel",
                "details": geo_velocity,
                "severity": "HIGH"
            })

        # Signal 2: IP-Billing mismatch
        ip_billing_distance = features.get("ip_geo_distance_from_billing_km", 0)
        if ip_billing_distance > 500:  # More than 500km
            signals.append({
                "signal": "ip_billing_mismatch",
                "distance_km": ip_billing_distance,
                "ip_country": features.get("ip_geo_country"),
                "billing_country": event.billing_country,
                "severity": "MEDIUM"
            })

        # Signal 3: Card country vs IP country mismatch
        if (features.get("card_issuer_country") and
                features.get("ip_geo_country") and
                features["card_issuer_country"] != features["ip_geo_country"]):
            signals.append({
                "signal": "cross_border_mismatch",
                "card_country": features["card_issuer_country"],
                "ip_country": features["ip_geo_country"],
                "severity": "LOW"  # Common for legitimate travel
            })

        # Signal 4: High-risk country
        if features.get("ip_geo_country") in HIGH_RISK_COUNTRIES:
            signals.append({
                "signal": "high_risk_country",
                "country": features["ip_geo_country"],
                "severity": "MEDIUM"
            })

        # Signal 5: Proxy/VPN/Tor detection
        if features.get("ip_is_proxy") or features.get("ip_is_vpn"):
            signals.append({
                "signal": "anonymization_detected",
                "is_proxy": features.get("ip_is_proxy"),
                "is_vpn": features.get("ip_is_vpn"),
                "is_tor": features.get("ip_is_tor"),
                "severity": "MEDIUM"
            })

        return GeoAnomalyResult(
            signals=signals,
            risk_score=self._calculate_geo_risk(signals)
        )

    def _calculate_geo_risk(self, signals: list) -> float:
        """Aggregate signal severities into a 0..1 risk score.

        Unknown severities fall back to the LOW weight; the sum is capped
        at 1.0 to match the other detectors' score range.
        """
        total = sum(
            self.SEVERITY_WEIGHTS.get(signal.get("severity"), self.SEVERITY_WEIGHTS["LOW"])
            for signal in signals
        )
        return min(total, 1.0)

    async def _check_geo_velocity(self, event: PaymentEvent, features: dict) -> dict:
        """Check if user could have physically traveled between transactions."""

        # Last observed transaction location for this user.
        # NOTE(review): assumes the Redis client decodes hash fields to str
        # (decode_responses=True) — confirm, otherwise keys are bytes and
        # the .get("lat") lookups below silently return the 0 default.
        last_txn = await self.redis.hgetall(f"user:{event.user_id}:last_geo")

        # No prior location => nothing to compare against.
        if not last_txn:
            return {"impossible": False}

        last_lat = float(last_txn.get("lat", 0))
        last_lon = float(last_txn.get("lon", 0))
        last_time = datetime.fromisoformat(last_txn.get("timestamp"))

        current_lat = features.get("ip_geo_lat", 0)
        current_lon = features.get("ip_geo_lon", 0)
        current_time = event.event_timestamp

        # Great-circle distance between the two observations.
        distance_km = self._haversine(last_lat, last_lon, current_lat, current_lon)

        # Elapsed wall-clock time between observations.
        time_diff_hours = (current_time - last_time).total_seconds() / 3600

        # Out-of-order or duplicate timestamps: cannot infer a speed.
        if time_diff_hours <= 0:
            return {"impossible": False}

        # Speed the user would have needed to sustain.
        required_speed = distance_km / time_diff_hours

        # Compare against the fastest plausible mode of travel.
        max_speed = self.MAX_TRAVEL_SPEEDS["air_international"]
        impossible = required_speed > max_speed

        return {
            "impossible": impossible,
            "distance_km": distance_km,
            "time_hours": time_diff_hours,
            "required_speed_kmh": required_speed,
            "max_plausible_speed": max_speed,
            "last_location": {"lat": last_lat, "lon": last_lon},
            "current_location": {"lat": current_lat, "lon": current_lon}
        }

    def _haversine(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Calculate great-circle distance between two points in km."""
        R = 6371  # Earth's radius in km

        lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
        dlat = lat2 - lat1
        dlon = lon2 - lon1

        a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
        c = 2 * math.asin(math.sqrt(a))

        return R * c

1.4 Bot / Automation Detection

class BotDetector:
    """Detect automated/bot transactions.

    Each automation indicator contributes a fixed weight to a bot score;
    a score of 0.6 or more classifies the event as a bot.
    """

    # Substrings (regex fragments) whose presence in a user agent marks it
    # as automated tooling.
    _BOT_UA_MARKERS = (
        r"bot", r"crawler", r"spider", r"scraper",
        r"headless", r"phantom", r"selenium", r"puppeteer",
    )

    async def detect(self, event: PaymentEvent, features: dict) -> BotDetectionResult:
        """Score one payment event for automation indicators."""
        findings = []
        score = 0.0

        def record(weight: float, payload: dict) -> None:
            # Accumulate a triggered indicator and its weight.
            nonlocal score
            findings.append(payload)
            score += weight

        # Indicator 1: device fingerprint already catalogued as a bot.
        if features.get("device_is_known_bot"):
            record(0.8, {"signal": "known_bot_fingerprint", "severity": "CRITICAL"})

        # Indicator 2: emulator/simulator environment.
        if features.get("device_is_emulator"):
            record(0.6, {"signal": "emulator_detected", "severity": "HIGH"})

        # Indicator 3: request originates from a datacenter IP range.
        if features.get("ip_is_datacenter"):
            record(0.3, {"signal": "datacenter_ip", "severity": "MEDIUM"})

        # Indicator 4: user-agent string looks automated or malformed.
        ua_verdict = self._analyze_user_agent(event.user_agent)
        if ua_verdict["suspicious"]:
            record(0.25, {
                "signal": "suspicious_user_agent",
                "details": ua_verdict,
                "severity": "MEDIUM"
            })

        # Indicator 5: inter-transaction timing too fast or too regular.
        timing_verdict = await self._analyze_timing(event, features)
        if timing_verdict["suspicious"]:
            record(0.3, {
                "signal": "suspicious_timing",
                "details": timing_verdict,
                "severity": "MEDIUM"
            })

        # Indicator 6: browser fingerprint missing too many components.
        if features.get("device_fingerprint_completeness", 1.0) < 0.5:
            record(0.15, {
                "signal": "incomplete_fingerprint",
                "completeness": features.get("device_fingerprint_completeness"),
                "severity": "LOW"
            })

        return BotDetectionResult(
            is_bot=score >= 0.6,
            bot_score=min(score, 1.0),
            signals=findings
        )

    def _analyze_user_agent(self, user_agent: str) -> dict:
        """Classify a user-agent string as suspicious or not.

        Checks run in order: missing UA, known automation markers, length,
        then presence of a mainstream browser token.
        """
        if not user_agent:
            return {"suspicious": True, "reason": "missing_ua"}

        for marker in self._BOT_UA_MARKERS:
            if re.search(marker, user_agent, re.IGNORECASE):
                return {"suspicious": True, "reason": f"contains_{marker}"}

        # A real browser UA is never this short.
        if len(user_agent) < 20:
            return {"suspicious": True, "reason": "too_short"}

        # Every mainstream browser advertises one of these tokens.
        if re.search(r"Mozilla|Chrome|Safari|Firefox|Edge", user_agent) is None:
            return {"suspicious": True, "reason": "missing_browser"}

        return {"suspicious": False}

    async def _analyze_timing(self, event: PaymentEvent, features: dict) -> dict:
        """Analyze recent transaction timing on this device for bot patterns."""

        # Timestamps (sorted-set scores) of the device's last 10 transactions.
        samples = await self.redis.zrange(
            f"device:{event.device_fingerprint}:txn_times",
            -10, -1,  # Last 10 transactions
            withscores=True
        )

        # Not enough history to judge.
        if len(samples) < 5:
            return {"suspicious": False}

        # Gaps between consecutive transactions (scores are timestamps).
        stamps = [member[1] for member in samples]
        gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]

        mean_gap = sum(gaps) / len(gaps)
        gap_spread = statistics.stdev(gaps) if len(gaps) > 1 else 0

        # Near-constant cadence at sub-minute intervals => likely automated.
        if gap_spread < 0.5 and mean_gap < 60:  # <0.5s variance, <60s average
            return {
                "suspicious": True,
                "reason": "too_regular",
                "avg_interval_seconds": mean_gap,
                "std_interval": gap_spread
            }

        # Faster than any human checkout flow.
        if mean_gap < 2:
            return {
                "suspicious": True,
                "reason": "too_fast",
                "avg_interval_seconds": mean_gap
            }

        return {"suspicious": False}

2. Friendly Fraud Detection

2.1 Historical Abuse Scoring

class FriendlyFraudScorer:
    """Score users for first-party fraud/abuse risk.

    Each risk factor adds its weight to an abuse score; factor 7 can
    subtract.  The final score is clamped to [0, 1].
    """

    async def score(self, event: PaymentEvent, features: dict) -> FriendlyFraudScore:
        """Score one user's propensity for friendly (first-party) fraud.

        Args:
            event: The payment event (only used for user context here).
            features: Historical aggregates; missing values default to 0.

        Returns:
            FriendlyFraudScore with the clamped score, tier, contributing
            factors, and recommended action.
        """
        risk_factors = []
        abuse_score = 0.0

        # Factor 1: Historical chargeback rate
        chargeback_rate = features.get("user_chargeback_rate_90d", 0)
        if chargeback_rate > 0:
            if chargeback_rate > 0.05:  # >5% chargeback rate
                risk_factors.append({
                    "factor": "high_chargeback_rate",
                    "value": chargeback_rate,
                    "weight": 0.4
                })
                abuse_score += 0.4
            elif chargeback_rate > 0.02:  # >2%
                risk_factors.append({
                    "factor": "elevated_chargeback_rate",
                    "value": chargeback_rate,
                    "weight": 0.2
                })
                abuse_score += 0.2

        # Factor 2: Dispute pattern (disputes after confirmed delivery)
        post_delivery_disputes = features.get("user_post_delivery_dispute_count", 0)
        if post_delivery_disputes > 1:
            risk_factors.append({
                "factor": "post_delivery_disputes",
                "value": post_delivery_disputes,
                "weight": 0.3
            })
            abuse_score += 0.3

        # Factor 3: Refund abuse pattern
        refund_rate = features.get("user_refund_rate_30d", 0)
        if refund_rate > 0.15:  # >15% refund rate
            risk_factors.append({
                "factor": "high_refund_rate",
                "value": refund_rate,
                "weight": 0.2
            })
            abuse_score += 0.2

        # Factor 4: "Item not received" claims vs delivery confirmation
        inr_claims = features.get("user_inr_claims_90d", 0)
        confirmed_deliveries = features.get("user_confirmed_deliveries_90d", 0)
        if inr_claims > 2 and confirmed_deliveries > inr_claims * 0.8:
            risk_factors.append({
                "factor": "suspicious_inr_pattern",
                "inr_claims": inr_claims,
                "confirmed_deliveries": confirmed_deliveries,
                "weight": 0.35
            })
            abuse_score += 0.35

        # Factor 5: Multiple cards, same disputes
        if (features.get("user_distinct_cards_with_disputes", 0) > 2 and
                features.get("user_chargeback_count_lifetime", 0) > 3):
            risk_factors.append({
                "factor": "multi_card_dispute_pattern",
                "cards_with_disputes": features.get("user_distinct_cards_with_disputes"),
                "weight": 0.25
            })
            abuse_score += 0.25

        # Factor 6: Account age vs dispute velocity
        account_age_days = features.get("user_days_since_first_txn", 365)
        disputes_per_month = (features.get("user_chargeback_count_lifetime", 0) /
                              max(account_age_days / 30, 1))
        if disputes_per_month > 0.5:  # More than 1 dispute per 2 months
            risk_factors.append({
                "factor": "rapid_dispute_velocity",
                "disputes_per_month": disputes_per_month,
                "weight": 0.2
            })
            abuse_score += 0.2

        # Factor 7: Previous dispute outcomes
        dispute_loss_rate = features.get("user_dispute_loss_rate", 0)
        if dispute_loss_rate > 0.7:  # User loses >70% of disputes
            # This actually REDUCES abuse score - they're not gaming successfully
            abuse_score -= 0.1
        elif dispute_loss_rate < 0.3 and features.get("user_chargeback_count_lifetime", 0) > 2:
            # User wins most disputes - possible abuse
            risk_factors.append({
                "factor": "high_dispute_win_rate",
                "win_rate": 1 - dispute_loss_rate,
                "weight": 0.15
            })
            abuse_score += 0.15

        # FIX: factor 7 could drive abuse_score below zero when a high
        # dispute-loss rate was the user's only signal, leaking a negative
        # score downstream.  Clamp to [0, 1].  (Tier/action mappings are
        # unchanged for in-range scores.)
        final_score = min(max(abuse_score, 0.0), 1.0)

        return FriendlyFraudScore(
            score=final_score,
            risk_tier=self._get_tier(final_score),
            risk_factors=risk_factors,
            recommended_action=self._get_action(final_score)
        )

    def _get_tier(self, score: float) -> str:
        """Map an abuse score to a coarse risk tier."""
        if score >= 0.7:
            return "HIGH"
        elif score >= 0.4:
            return "MEDIUM"
        elif score >= 0.2:
            return "LOW"
        return "MINIMAL"

    def _get_action(self, score: float) -> str:
        """Map an abuse score to a recommended action."""
        if score >= 0.7:
            return "FRICTION"  # Require 3DS, additional verification
        elif score >= 0.5:
            return "ENHANCED_EVIDENCE"  # Capture extra evidence for representment
        elif score >= 0.3:
            return "MONITOR"  # Flag for post-transaction review
        return "ALLOW"

2.2 Behavioral Consistency Analysis

class BehavioralConsistencyAnalyzer:
    """Analyze if transaction is consistent with user's historical behavior.

    Starts from a fully-consistent score of 1.0 and subtracts a fixed
    penalty per detected anomaly, flooring at 0.
    """

    async def analyze(self, event: PaymentEvent, features: dict) -> ConsistencyResult:
        """Compare one event against the user's historical profile."""
        anomalies = []
        consistency_score = 1.0  # Start at fully consistent

        # Anomaly 1: Transaction amount vs historical average
        avg_amount = features.get("user_avg_ticket_90d", event.amount_usd)
        std_amount = features.get("user_stddev_ticket_90d", avg_amount * 0.5)

        if std_amount > 0:
            amount_zscore = (float(event.amount_usd) - avg_amount) / std_amount
            if abs(amount_zscore) > 3:
                anomalies.append({
                    "type": "amount_anomaly",
                    "zscore": amount_zscore,
                    "expected_range": f"${avg_amount - 2*std_amount:.2f} - ${avg_amount + 2*std_amount:.2f}",
                    "actual": f"${event.amount_usd:.2f}"
                })
                consistency_score -= 0.2

        # Anomaly 2: New service type for user (only once they have history)
        user_service_history = await self.redis.smembers(f"user:{event.user_id}:service_history")
        if event.event_subtype not in user_service_history and len(user_service_history) > 5:
            anomalies.append({
                "type": "new_service_type",
                "event_subtype": event.event_subtype,
                "historical_services": list(user_service_history)[:10]
            })
            consistency_score -= 0.1

        # Anomaly 3: New device for established user
        if (features.get("user_days_since_first_txn", 0) > 90 and
                features.get("device_age_hours", 0) < 24):
            anomalies.append({
                "type": "new_device_established_user",
                "user_age_days": features.get("user_days_since_first_txn"),
                "device_age_hours": features.get("device_age_hours")
            })
            consistency_score -= 0.15

        # Anomaly 4: Unusual time of day (<2% of historical txns at this hour)
        user_hour_histogram = await self._get_user_hour_distribution(event.user_id)
        txn_hour = event.event_timestamp.hour
        if user_hour_histogram and user_hour_histogram.get(txn_hour, 0) < 0.02:
            anomalies.append({
                "type": "unusual_time",
                "hour": txn_hour,
                "typical_hours": self._get_peak_hours(user_hour_histogram)
            })
            consistency_score -= 0.1

        # Anomaly 5: Geographic shift (billing country outside top-3 historical)
        user_common_countries = await self.redis.zrange(
            f"user:{event.user_id}:countries",
            0, 2,
            withscores=True
        )
        if user_common_countries:
            common_country_codes = [c[0] for c in user_common_countries]
            if event.billing_country not in common_country_codes:
                anomalies.append({
                    "type": "unusual_country",
                    "country": event.billing_country,
                    "typical_countries": common_country_codes
                })
                consistency_score -= 0.15

        return ConsistencyResult(
            consistency_score=max(consistency_score, 0),
            anomalies=anomalies,
            is_consistent=len(anomalies) == 0
        )

    async def _get_user_hour_distribution(self, user_id: str) -> dict:
        """Return {hour: fraction of the user's transactions in that hour}.

        FIX: this helper and _get_peak_hours were referenced by analyze()
        but never defined, raising AttributeError on every call.  Assumes
        the feature pipeline maintains a Redis hash
        "user:{id}:hour_histogram" mapping hour -> transaction count —
        TODO(review): confirm the key layout against the ingestion job.
        """
        raw = await self.redis.hgetall(f"user:{user_id}:hour_histogram")
        if not raw:
            return {}

        counts = {int(hour): float(count) for hour, count in raw.items()}
        total = sum(counts.values())
        if total <= 0:
            return {}

        return {hour: count / total for hour, count in counts.items()}

    def _get_peak_hours(self, hour_histogram: dict, top_n: int = 3) -> list:
        """Return the top_n most frequent transaction hours, most common first."""
        return sorted(hour_histogram, key=hour_histogram.get, reverse=True)[:top_n]

3. Combined Risk Scoring

3.1 ML Model Integration

class RiskScoringService:
    """Orchestrate all scoring components and ML models.

    Fans out every detector plus the ML model concurrently, then combines
    the results into a single RiskAssessment for the policy engine.
    """

    def __init__(self):
        # One instance of each detector; all are stateless between calls.
        self.criminal_detector = CardTestingDetector()
        self.velocity_detector = VelocityAttackDetector()
        self.geo_detector = GeoAnomalyDetector()
        self.bot_detector = BotDetector()
        self.friendly_scorer = FriendlyFraudScorer()
        self.behavior_analyzer = BehavioralConsistencyAnalyzer()
        self.model_client = ModelServiceClient()

    async def score(self, event: PaymentEvent, features: dict) -> RiskAssessment:
        """Run every detector concurrently and assemble the assessment."""
        (card_testing,
         velocity,
         geo,
         bot,
         friendly_fraud,
         behavior,
         ml_prediction) = await asyncio.gather(
            self.criminal_detector.detect(event, features),
            self.velocity_detector.evaluate(event, features),
            self.geo_detector.detect(event, features),
            self.bot_detector.detect(event, features),
            self.friendly_scorer.score(event, features),
            self.behavior_analyzer.analyze(event, features),
            self.model_client.predict(features),  # ML model inference
        )

        # Fold the criminal-fraud detectors and the ML score together.
        criminal_score = self._compute_criminal_score(
            card_testing, velocity, geo, bot, ml_prediction
        )
        friendly_score = friendly_fraud.score

        # The dominant fraud type is whichever score is higher
        # (friendly fraud wins ties).
        if criminal_score > friendly_score:
            primary_risk, primary_score = "CRIMINAL_FRAUD", criminal_score
        else:
            primary_risk, primary_score = "FRIENDLY_FRAUD", friendly_score

        return RiskAssessment(
            criminal_fraud_score=criminal_score,
            friendly_fraud_score=friendly_score,
            ml_score=ml_prediction.score,
            ml_model_version=ml_prediction.model_version,
            primary_risk_type=primary_risk,
            primary_score=primary_score,
            # Every raw signal, keyed by detector, for the policy engine.
            signals={
                "card_testing": card_testing.signals,
                "velocity": velocity.rules_triggered,
                "geo": geo.signals,
                "bot": bot.signals,
                "friendly_fraud": friendly_fraud.risk_factors,
                "behavior": behavior.anomalies,
            },
            feature_snapshot=features,
            behavior_consistency=behavior.consistency_score,
            recommended_action=self._get_recommended_action(
                criminal_score, friendly_score, velocity.recommended_action
            )
        )

    def _compute_criminal_score(
        self,
        card_testing: CardTestingResult,
        velocity: VelocityResult,
        geo: GeoAnomalyResult,
        bot: BotDetectionResult,
        ml: MLPrediction
    ) -> float:
        """Combine criminal fraud signals into single score.

        Weighted sum (weights total 1.0), then multiplicative boosters for
        high-confidence card-testing and bot verdicts, capped at 1.0.
        """
        # Any triggered velocity rule contributes a flat 0.5 component.
        velocity_component = 0.5 if velocity.rules_triggered else 0

        blended = (
            0.25 * card_testing.risk_score
            + 0.15 * velocity_component
            + 0.15 * geo.risk_score
            + 0.15 * bot.bot_score
            + 0.30 * ml.score
        )

        # Boosters for high-confidence signals.
        if card_testing.risk_score > 0.8:
            blended = min(blended * 1.3, 1.0)
        if bot.is_bot:
            blended = min(blended * 1.2, 1.0)

        return blended

    def _get_recommended_action(
        self,
        criminal_score: float,
        friendly_score: float,
        velocity_action: str
    ) -> str:
        """Determine recommended action based on all scores."""

        # A velocity BLOCK is a hard rule — no score can override it.
        if velocity_action == "BLOCK":
            return "BLOCK"

        # Otherwise the worst of the two fraud scores picks the action.
        worst = max(criminal_score, friendly_score)
        for floor, action in (
            (0.85, "BLOCK"),
            (0.65, "FRICTION"),
            (0.45, "REVIEW"),
            (0.25, "MONITOR"),
        ):
            if worst >= floor:
                return action
        return "ALLOW"

4. Policy Engine Architecture

4.1 Policy Configuration Model

# policy_config.yaml
# Version-controlled, hot-reloadable policy configuration
# FIX: indentation restored — the previous copy had all nesting stripped,
# making the file invalid YAML. Content is unchanged.

version: "2025.01.15.001"
description: "Production fraud policy - January 2025"

# Global settings
global:
  default_decision: "ALLOW"
  safe_mode_decision: "ALLOW"
  enable_experiments: true

# Blocklists (checked first, before any scoring)
blocklists:
  card_tokens:
    source: "redis://blocklist:card_tokens"
    action: "BLOCK"
    reason: "card_blocklisted"

  device_fingerprints:
    source: "redis://blocklist:devices"
    action: "BLOCK"
    reason: "device_blocklisted"

  ip_addresses:
    source: "redis://blocklist:ips"
    action: "BLOCK"
    reason: "ip_blocklisted"

  user_ids:
    source: "redis://blocklist:users"
    action: "BLOCK"
    reason: "user_blocklisted"

# Allowlists (skip scoring for trusted entities)
allowlists:
  user_ids:
    source: "redis://allowlist:users"
    action: "ALLOW"
    bypass_scoring: true

  service_ids:
    source: "redis://allowlist:services"
    action: "ALLOW"
    bypass_scoring: false  # Still score, but don't block

# Velocity rules (hard limits)
velocity_rules:
  - name: "card_attempts_10m"
    condition: "features.card_attempts_10m > 3"
    action: "FRICTION"
    reason: "card_velocity_10m"

  - name: "card_attempts_1h_block"
    condition: "features.card_attempts_1h > 5"
    action: "BLOCK"
    reason: "card_velocity_1h"

  - name: "device_distinct_cards"
    condition: "features.device_distinct_cards_1h > 3"
    action: "BLOCK"
    reason: "device_card_testing"

  - name: "ip_distinct_cards"
    condition: "features.ip_distinct_cards_1h > 10"
    action: "REVIEW"
    reason: "ip_suspicious_activity"

# Score-based thresholds (profit-optimized)
score_thresholds:
  criminal_fraud:
    block: 0.85
    friction: 0.60
    review: 0.40

  friendly_fraud:
    friction: 0.70
    review: 0.50
    enhanced_evidence: 0.30

# Economic thresholds (override score thresholds for high-value transactions)
economic_rules:
  - name: "high_value_extra_scrutiny"
    condition: "event.amount_usd > 1000"
    threshold_adjustment:
      criminal_fraud_friction: -0.10  # Lower threshold by 10%
      criminal_fraud_block: -0.05

  - name: "low_value_relaxed"
    condition: "event.amount_usd < 20"
    threshold_adjustment:
      criminal_fraud_friction: +0.15  # Higher threshold (more lenient)

# Service-specific overrides
service_rules:
  - service_id: "service_high_risk_123"
    overrides:
      criminal_fraud_friction: 0.50  # More aggressive for this service
      require_3ds: true

  - service_type: "mobile"
    overrides:
      friendly_fraud_friction: 0.50  # Mobile services = higher friendly fraud risk

# Experiments (A/B testing)
experiments:
  - name: "stricter_velocity_test"
    enabled: true
    traffic_percentage: 10
    conditions:
      card_attempts_10m_threshold: 2  # Stricter than default 3

  - name: "relaxed_geo_test"
    enabled: false
    traffic_percentage: 5
    conditions:
      ignore_geo_mismatch: true

# 3DS / Friction rules
friction_rules:
  - name: "3ds_for_new_cards"
    condition: "features.card_days_since_first_seen < 7"
    friction_type: "3DS"

  - name: "3ds_for_high_value"
    condition: "event.amount_usd > 500 AND scores.criminal_fraud > 0.40"
    friction_type: "3DS"

  - name: "mfa_for_new_device"
    condition: "features.device_age_hours < 24 AND features.user_days_since_first_txn > 30"
    friction_type: "MFA"

4.2 Policy Engine Implementation

class PolicyEngine:
    """Evaluate policy rules and make decisions.

    Evaluation order (see ``evaluate``): blocklists -> allowlists ->
    hard velocity rules -> experiment bucketing -> effective thresholds ->
    score thresholds -> friction selection -> audit-logged decision.

    NOTE(review): ``_load_config``, ``_check_allowlists``,
    ``_apply_velocity_rules``, ``_determine_friction_type`` and
    ``_evaluate_condition`` are referenced below but not defined in this
    excerpt — confirm they exist in the full module.
    """

    def __init__(self, config_path: str, redis_client=None):
        """Load policy config and wire collaborators.

        Args:
            config_path: Path to the versioned policy configuration.
            redis_client: Async Redis client used for blocklist lookups.
                FIX: the original never assigned ``self.redis`` even though
                ``_check_blocklists`` reads it, raising AttributeError on
                the first evaluation.  Optional keyword keeps existing
                call sites working.
        """
        self.config = self._load_config(config_path)
        self.redis = redis_client
        self.opa_client = OPAClient()
        self.experiment_router = ExperimentRouter()
        self.audit_logger = AuditLogger()

    async def evaluate(
        self,
        event: PaymentEvent,
        features: dict,
        scores: RiskAssessment
    ) -> PolicyDecision:
        """Run the full policy pipeline for one event and return a decision."""
        decision_trace = []

        # Step 1: Blocklists — a hit short-circuits everything else.
        blocklist_result = await self._check_blocklists(event, features)
        if blocklist_result.blocked:
            return self._finalize_decision(
                action="BLOCK",
                reason=blocklist_result.reason,
                trace=decision_trace,
                event=event
            )

        # Step 2: Allowlists — trusted entities may bypass scoring entirely.
        allowlist_result = await self._check_allowlists(event, features)
        if allowlist_result.allowed and allowlist_result.bypass_scoring:
            return self._finalize_decision(
                action="ALLOW",
                reason="allowlisted",
                trace=decision_trace,
                event=event
            )

        # Step 3: Hard velocity limits — a BLOCK here is not score-overridable.
        velocity_decision = await self._apply_velocity_rules(event, features)
        decision_trace.append({"step": "velocity", "result": velocity_decision})

        if velocity_decision.action == "BLOCK":
            return self._finalize_decision(
                action="BLOCK",
                reason=velocity_decision.reason,
                trace=decision_trace,
                event=event
            )

        # Step 4: Determine experiment bucket (A/B threshold variations).
        experiment = self.experiment_router.get_experiment(event)
        if experiment:
            decision_trace.append({"step": "experiment", "name": experiment.name})

        # Step 5: Effective thresholds (economic + service + experiment adjustments).
        thresholds = self._get_effective_thresholds(event, features, experiment)
        decision_trace.append({"step": "thresholds", "values": thresholds})

        # Step 6: Score-based decision against the effective thresholds.
        score_decision = self._apply_score_thresholds(scores, thresholds)
        decision_trace.append({"step": "score_decision", "result": score_decision})

        # Step 7: Pick a friction mechanism (3DS/MFA/...) where applicable.
        if score_decision.action in ["FRICTION", "REVIEW"]:
            friction = await self._determine_friction_type(event, features, scores)
            score_decision.friction_type = friction

        # Step 8: Finalize and log.
        return self._finalize_decision(
            action=score_decision.action,
            reason=score_decision.reason,
            friction_type=score_decision.friction_type,
            trace=decision_trace,
            event=event,
            scores=scores,
            thresholds=thresholds,
            experiment=experiment
        )

    async def _check_blocklists(self, event: PaymentEvent, features: dict) -> BlocklistResult:
        """Check every entity on the event against its Redis blocklist.

        Returns on the first match; check order is card, device, IP, user.
        """
        checks = [
            ("card_tokens", event.card_token),
            ("device_fingerprints", event.device_fingerprint),
            ("ip_addresses", features.get("ip_hash")),
            ("user_ids", event.user_id),
        ]

        for list_name, value in checks:
            if value and await self.redis.sismember(f"blocklist:{list_name}", value):
                return BlocklistResult(
                    blocked=True,
                    reason=f"{list_name}_blocklisted",
                    list_name=list_name,
                    # FIX: only truncate values that actually exceed 20 chars;
                    # the original appended "..." unconditionally.
                    matched_value=value[:20] + "..." if len(value) > 20 else value
                )

        return BlocklistResult(blocked=False)

    def _get_effective_thresholds(
        self,
        event: PaymentEvent,
        features: dict,
        experiment: Optional[Experiment]
    ) -> dict:
        """Calculate effective thresholds with all adjustments.

        Order of application: economic rules (additive deltas), then
        service overrides (absolute values), then experiment adjustments.
        """
        # Deep copy so per-request adjustments never mutate the base config.
        thresholds = copy.deepcopy(self.config["score_thresholds"])

        # Economic rules: additive deltas keyed "<category>_<level>".
        for rule in self.config["economic_rules"]:
            if self._evaluate_condition(rule["condition"], event, features):
                for key, adjustment in rule["threshold_adjustment"].items():
                    category, level = key.rsplit("_", 1)
                    if category in thresholds and level in thresholds[category]:
                        thresholds[category][level] += adjustment

        # Service-specific overrides: absolute replacement values.
        # Keys that don't parse into a known category (e.g. "require_3ds")
        # are intentionally skipped here.
        for rule in self.config["service_rules"]:
            if (rule.get("service_id") == event.service_id or
                    rule.get("service_type") == event.service_type):
                for key, value in rule["overrides"].items():
                    category, level = key.rsplit("_", 1)
                    if category in thresholds:
                        thresholds[category][level] = value

        # Experiment adjustments applied last so they see the final base.
        if experiment:
            thresholds = experiment.apply_adjustments(thresholds)

        return thresholds

    def _apply_score_thresholds(
        self,
        scores: RiskAssessment,
        thresholds: dict
    ) -> ScoreDecision:
        """Apply thresholds to scores.

        Criminal-fraud thresholds are checked first and take precedence
        over friendly-fraud thresholds.
        """
        criminal = scores.criminal_fraud_score
        friendly = scores.friendly_fraud_score

        criminal_thresholds = thresholds["criminal_fraud"]
        friendly_thresholds = thresholds["friendly_fraud"]

        # Criminal fraud takes precedence
        if criminal >= criminal_thresholds["block"]:
            return ScoreDecision(action="BLOCK", reason="criminal_fraud_score")

        if criminal >= criminal_thresholds["friction"]:
            return ScoreDecision(action="FRICTION", reason="criminal_fraud_score")

        if criminal >= criminal_thresholds["review"]:
            return ScoreDecision(action="REVIEW", reason="criminal_fraud_score")

        # Check friendly fraud
        if friendly >= friendly_thresholds["friction"]:
            return ScoreDecision(action="FRICTION", reason="friendly_fraud_score")

        if friendly >= friendly_thresholds["review"]:
            return ScoreDecision(action="REVIEW", reason="friendly_fraud_score")

        return ScoreDecision(action="ALLOW", reason="below_thresholds")

    def _finalize_decision(
        self,
        action: str,
        reason: str,
        trace: list,
        event: PaymentEvent,
        **kwargs
    ) -> PolicyDecision:
        """Create final decision and log it for audit."""

        decision = PolicyDecision(
            action=action,
            reason=reason,
            decision_id=str(uuid.uuid4()),
            # NOTE(review): naive UTC timestamp (datetime.utcnow());
            # consider timezone-aware datetime.now(timezone.utc), but that
            # changes the stored value's shape for downstream consumers.
            timestamp=datetime.utcnow(),
            event_id=event.event_id,
            auth_id=event.auth_id,
            trace=trace,
            scores=kwargs.get("scores"),
            thresholds=kwargs.get("thresholds"),
            experiment=kwargs.get("experiment"),
            friction_type=kwargs.get("friction_type"),
            policy_version=self.config["version"]
        )

        # Fire-and-forget audit logging so it never adds decision latency.
        asyncio.create_task(self.audit_logger.log_decision(decision))

        return decision

4.3 OPA Rego Policy (for Complex Rules)

# fraud_policy.rego
# Complex rules evaluated via OPA.
#
# Partial rules below OR together: any matching body sets the verdict.
# The `decision` rules at the bottom collapse the four verdicts into a
# single string with precedence BLOCK > FRICTION > REVIEW > ALLOW.

package fraud

default allow = false
default friction = false
default review = false
default block = false

# Import input
import input.event
import input.features
import input.scores

# Blocklist check
block {
    input.blocklist_match == true
}

# Velocity rules
block {
    features.card_attempts_1h > 5
}

friction {
    features.card_attempts_10m > 3
}

friction {
    features.device_distinct_cards_1h > 3
}

# Score-based rules
block {
    scores.criminal_fraud_score >= input.thresholds.criminal_fraud.block
}

friction {
    scores.criminal_fraud_score >= input.thresholds.criminal_fraud.friction
    scores.criminal_fraud_score < input.thresholds.criminal_fraud.block
}

review {
    scores.criminal_fraud_score >= input.thresholds.criminal_fraud.review
    scores.criminal_fraud_score < input.thresholds.criminal_fraud.friction
}

# Friendly fraud rules
friction {
    scores.friendly_fraud_score >= input.thresholds.friendly_fraud.friction
}

# Economic rules - high value transactions
friction {
    event.amount_usd > 1000
    scores.criminal_fraud_score >= 0.40
}

# Geo rules
review {
    features.ip_is_vpn == true
    features.ip_billing_distance_km > 500
}

# New device for established user
friction {
    features.device_age_hours < 24
    features.user_days_since_first_txn > 90
    event.amount_usd > 200
}

# Final decision: mutually exclusive by construction (the `not` guards),
# so at most one `decision` value can ever be derived.
# NOTE(review): `default allow = false` is declared but no `allow` rule is
# defined and `decision` never reads it — confirm whether it can be removed.
decision = "BLOCK" {
    block
}

decision = "FRICTION" {
    not block
    friction
}

decision = "REVIEW" {
    not block
    not friction
    review
}

decision = "ALLOW" {
    not block
    not friction
    not review
}

5. Profit-Based Threshold Optimization

5.1 Cost Function Implementation

class EconomicOptimizer:
    """Optimize fraud-decision thresholds based on expected profit/loss.

    Treats the model score as a fraud probability and simulates the
    BLOCK / FRICTION / ALLOW policy over historical, labeled data to
    find the block threshold that maximizes net profit.
    """

    # Cost components (configurable per service/business)
    COST_PARAMS = {
        "chargeback_fee_usd": 25.00,
        "chargeback_penalty_usd": 15.00,  # Network penalty
        "operational_cost_per_review_usd": 5.00,
        "friction_abandonment_rate": 0.15,  # 15% abandon when friction applied
        "customer_lifetime_value_usd": 500.00,
        "false_positive_churn_rate": 0.05,  # 5% churn when falsely blocked
    }

    def compute_expected_loss(
        self,
        transaction_amount: float,
        fraud_probability: float,
        decision: str
    ) -> float:
        """
        Compute the expected financial loss (USD) of a decision.

        Expected Loss = P(fraud) x (amount + fees + penalties + ops_cost)
                      + P(not fraud) x friction_cost

        Args:
            transaction_amount: Transaction value in USD.
            fraud_probability: Model score interpreted as P(fraud).
            decision: One of "BLOCK", "FRICTION", "REVIEW"; anything else
                is treated as "ALLOW" (full fraud exposure).

        Returns:
            Expected loss in USD (>= 0).
        """
        if decision == "BLOCK":
            if fraud_probability > 0.5:
                # Correctly blocked fraud - no loss incurred.
                # (0.0, not 0, so the return type is consistently float.)
                return 0.0
            # Falsely blocked legitimate transaction: lost sale plus the
            # expected churn cost, weighted by P(not fraud).
            false_positive_cost = (
                transaction_amount +  # Lost sale
                self.COST_PARAMS["customer_lifetime_value_usd"] *
                self.COST_PARAMS["false_positive_churn_rate"]
            )
            return false_positive_cost * (1 - fraud_probability)

        if decision == "FRICTION":
            # Cost of friction = abandonment by legitimate customers only.
            friction_cost = (
                transaction_amount *
                self.COST_PARAMS["friction_abandonment_rate"] *
                (1 - fraud_probability)
            )
            # Some fraud still gets through the challenge.
            fraud_cost = (
                fraud_probability * 0.3 *  # Assume 70% of fraud stopped by friction
                (transaction_amount +
                 self.COST_PARAMS["chargeback_fee_usd"] +
                 self.COST_PARAMS["chargeback_penalty_usd"])
            )
            return friction_cost + fraud_cost

        if decision == "REVIEW":
            review_cost = self.COST_PARAMS["operational_cost_per_review_usd"]
            # Delayed/missed fraud still has some cost.
            fraud_cost = (
                fraud_probability * 0.5 *  # Assume 50% caught in review
                (transaction_amount +
                 self.COST_PARAMS["chargeback_fee_usd"] +
                 self.COST_PARAMS["chargeback_penalty_usd"])
            )
            return review_cost + fraud_cost

        # ALLOW: full fraud exposure (amount plus chargeback fees/penalties).
        return (
            fraud_probability *
            (transaction_amount +
             self.COST_PARAMS["chargeback_fee_usd"] +
             self.COST_PARAMS["chargeback_penalty_usd"])
        )

    def find_optimal_threshold(
        self,
        historical_data: pd.DataFrame,
        score_column: str = "criminal_fraud_score"
    ) -> "tuple[float, pd.DataFrame]":
        """
        Find the block threshold that maximizes net profit on historical data.

        Sweeps block thresholds from 0.10 to 0.90 (step 0.05); at each one,
        simulates a 3-tier policy (BLOCK at/above threshold, FRICTION in a
        0.20-wide band below it, else ALLOW) and totals expected loss and
        revenue.

        Args:
            historical_data: Labeled transactions with columns
                `score_column`, "amount_usd", and boolean "is_fraud".
            score_column: Score column to sweep over.

        Returns:
            (optimal_threshold, results_df) — the profit-maximizing threshold
            and the full sweep as a DataFrame.
            NOTE: the original annotation claimed `-> float` but the function
            has always returned this 2-tuple; the annotation is corrected.
        """
        thresholds = np.arange(0.1, 0.95, 0.05)
        # Hoist label aggregates out of the sweep loop.
        fraud_mask = historical_data["is_fraud"] == True  # noqa: E712
        fraud_count = int(fraud_mask.sum())
        results = []

        for threshold in thresholds:
            total_loss = 0.0
            total_revenue = 0.0

            for _, row in historical_data.iterrows():
                score = row[score_column]
                amount = row["amount_usd"]

                # Simulate decision at this threshold.
                if score >= threshold:
                    decision = "BLOCK"
                elif score >= threshold - 0.20:
                    decision = "FRICTION"
                else:
                    decision = "ALLOW"

                total_loss += self.compute_expected_loss(amount, score, decision)

                # Revenue only accrues when the transaction can complete;
                # friction revenue is reduced by the abandonment rate.
                if decision == "FRICTION":
                    total_revenue += amount * (1 - self.COST_PARAMS["friction_abandonment_rate"])
                elif decision == "ALLOW":
                    total_revenue += amount

            caught = int((fraud_mask & (historical_data[score_column] >= threshold)).sum())
            results.append({
                "threshold": threshold,
                "total_loss": total_loss,
                "total_revenue": total_revenue,
                "net_profit": total_revenue - total_loss,
                # "approval" here means "not blocked" — FRICTION counts as approved.
                "approval_rate": (historical_data[score_column] < threshold).mean(),
                # BUG FIX: guard divide-by-zero when the data window contains
                # no fraud labels (previously raised ZeroDivisionError).
                "fraud_caught_rate": caught / fraud_count if fraud_count else 0.0,
            })

        results_df = pd.DataFrame(results)

        # Threshold with maximum net profit wins.
        optimal_idx = results_df["net_profit"].idxmax()
        optimal_threshold = results_df.loc[optimal_idx, "threshold"]
        return optimal_threshold, results_df

5.2 Threshold Recommendation Dashboard

class ThresholdRecommendationService:
    """Generate threshold recommendations for business users."""

    async def generate_recommendations(self) -> "ThresholdRecommendations":
        """Generate weekly threshold recommendations.

        Loads 90→30 days of labeled history (30-day label maturity),
        runs the economic optimizer for criminal and friendly fraud, and
        packages recommended thresholds plus trade-off curves for the
        dashboard.
        """
        # Compute the query window once so the report's data_window string
        # cannot drift from the dates actually queried.
        now = datetime.utcnow()
        start_date = now - timedelta(days=90)
        end_date = now - timedelta(days=30)  # 30-day label maturity

        historical = await self.load_historical_transactions(
            start_date=start_date,
            end_date=end_date,
            require_outcome=True
        )

        # Run optimization for both fraud types.
        optimizer = EconomicOptimizer()
        optimal_criminal, criminal_curve = optimizer.find_optimal_threshold(
            historical, "criminal_fraud_score"
        )
        optimal_friendly, friendly_curve = optimizer.find_optimal_threshold(
            historical, "friendly_fraud_score"
        )

        # Generate trade-off curves for the dashboard.
        approval_vs_loss = self._compute_tradeoff_curve(historical, criminal_curve)

        # Current vs recommended comparison.
        current_thresholds = await self.get_current_thresholds()

        recommendations = ThresholdRecommendations(
            generated_at=now,
            # BUG FIX: was the hard-coded literal "2025-10-01 to 2025-11-30",
            # which silently went stale; derive it from the actual window.
            data_window=f"{start_date.date().isoformat()} to {end_date.date().isoformat()}",
            transaction_count=len(historical),
            fraud_rate=historical["is_fraud"].mean(),

            current_thresholds=current_thresholds,

            # Friction/review cutoffs are fixed offsets below the optimal
            # block threshold (mirrors the optimizer's simulated bands).
            recommended_thresholds={
                "criminal_fraud_block": optimal_criminal,
                "criminal_fraud_friction": optimal_criminal - 0.15,
                "criminal_fraud_review": optimal_criminal - 0.30,
                "friendly_fraud_friction": optimal_friendly,
            },

            # NOTE(review): these impact figures are hard-coded placeholders,
            # not computed from the optimization — replace with real deltas.
            expected_impact={
                "approval_rate_change": "+1.2%",
                "fraud_loss_change": "-$12,500/month",
                "false_positive_reduction": "-340 customers/month",
                "net_revenue_impact": "+$45,000/month"
            },

            tradeoff_curve=approval_vs_loss,

            # NOTE(review): +/-0.03 is a fixed band, not a computed CI.
            confidence_interval={
                "criminal_fraud_block": (optimal_criminal - 0.03, optimal_criminal + 0.03)
            }
        )

        return recommendations

    def _compute_tradeoff_curve(
        self,
        data: pd.DataFrame,
        optimization_results: pd.DataFrame
    ) -> list:
        """Convert optimizer sweep rows into percentage-scaled points for
        the approval-rate vs fraud-loss visualization.

        Args:
            data: Historical transactions (currently unused; kept for
                interface stability).
            optimization_results: One row per swept threshold with columns
                threshold / approval_rate / fraud_caught_rate / total_loss /
                net_profit.

        Returns:
            List of dicts, one point per threshold.
        """
        curve_points = []
        for _, row in optimization_results.iterrows():
            curve_points.append({
                "threshold": row["threshold"],
                "approval_rate_pct": row["approval_rate"] * 100,
                "fraud_caught_pct": row["fraud_caught_rate"] * 100,
                "expected_loss_usd": row["total_loss"],
                "net_profit_usd": row["net_profit"]
            })

        return curve_points

6. Champion/Challenger Framework

6.1 Experiment Configuration

class ExperimentRouter:
    """Route transactions to experiments (champion/challenger)."""

    def __init__(self):
        self.experiments = self._load_experiments()

    def _load_experiments(self) -> "List[Experiment]":
        """Load the enabled experiments from config."""
        config = load_config("experiments.yaml")
        return [
            Experiment(
                name=entry["name"],
                traffic_percentage=entry["traffic_percentage"],
                conditions=entry.get("conditions", {}),
                model_version=entry.get("model_version"),
                threshold_overrides=entry.get("threshold_overrides", {}),
            )
            for entry in config["experiments"]
            if entry["enabled"]
        ]

    def get_experiment(self, event: "PaymentEvent") -> "Optional[Experiment]":
        """Deterministically assign this transaction to an experiment.

        The auth_id is hashed into one of 100 buckets; experiments claim
        contiguous bucket ranges in declaration order according to their
        traffic percentage. A transaction outside every range falls
        through to the champion (None).
        """
        # Deterministic bucket in [0, 100) derived from the auth_id hash.
        bucket = int(hashlib.md5(event.auth_id.encode()).hexdigest(), 16) % 100

        upper_bound = 0
        for candidate in self.experiments:
            upper_bound += candidate.traffic_percentage
            if bucket < upper_bound:
                return candidate

        # No experiment claimed this bucket — champion (default) applies.
        return None


@dataclass
class Experiment:
    # Experiment identity and routing configuration.
    name: str
    traffic_percentage: int
    conditions: dict
    model_version: Optional[str] = None
    threshold_overrides: dict = field(default_factory=dict)

    def apply_adjustments(self, thresholds: dict) -> dict:
        """Return a deep copy of *thresholds* with this experiment's
        overrides applied.

        Override keys are dotted paths into the nested threshold dict,
        e.g. "criminal_fraud.block". The input dict is never mutated.
        """
        adjusted = copy.deepcopy(thresholds)

        for dotted_key, override in self.threshold_overrides.items():
            *path, leaf = dotted_key.split(".")
            target = adjusted
            for segment in path:
                target = target[segment]
            target[leaf] = override

        return adjusted

6.2 Experiment Metrics Collection

class ExperimentMetricsCollector:
    """Collect and compare metrics across experiments."""

    # Metric names tracked per experiment. Some are intentionally delayed
    # because fraud labels mature over time (chargebacks arrive up to ~90
    # days after the transaction).
    METRICS = [
        "approval_rate",
        "friction_rate",
        "block_rate",
        "review_rate",
        "fraud_rate_30d",  # Delayed metric
        "chargeback_rate_90d",  # Very delayed
        "false_positive_rate",
        "avg_decision_latency_ms",
        "revenue_per_transaction",
    ]

    async def collect_daily_metrics(self, date: datetime.date) -> dict:
        """Collect daily decision metrics for all experiments.

        Returns a dict keyed by experiment name; each value carries volume,
        action-rate, score, amount, and latency aggregates for *date*.

        NOTE(review): the result of self.db.query is used with .groupby —
        presumably a DataFrame-like object; confirm the db client contract.
        Groups are non-empty by construction, so the `/ total` divisions
        below cannot divide by zero.
        """

        results = {}

        # Get all decisions from this date
        decisions = await self.db.query("""
        SELECT
        experiment_name,
        action,
        criminal_fraud_score,
        friendly_fraud_score,
        amount_usd,
        decision_latency_ms
        FROM decisions
        WHERE DATE(timestamp) = :date
        """, date=date)

        # Group by experiment
        by_experiment = decisions.groupby("experiment_name")

        for exp_name, group in by_experiment:
            total = len(group)

            results[exp_name] = {
                "date": date.isoformat(),
                "transaction_count": total,
                # Action rates are mutually exclusive fractions of `total`.
                "approval_rate": (group["action"] == "ALLOW").sum() / total,
                "friction_rate": (group["action"] == "FRICTION").sum() / total,
                "block_rate": (group["action"] == "BLOCK").sum() / total,
                "review_rate": (group["action"] == "REVIEW").sum() / total,
                "avg_score_criminal": group["criminal_fraud_score"].mean(),
                "avg_score_friendly": group["friendly_fraud_score"].mean(),
                "avg_amount_usd": group["amount_usd"].mean(),
                "avg_latency_ms": group["decision_latency_ms"].mean(),
                "p99_latency_ms": group["decision_latency_ms"].quantile(0.99),
            }

        return results

    async def compute_experiment_comparison(
        self,
        experiment_name: str,
        start_date: datetime.date,
        end_date: datetime.date
    ) -> ExperimentComparison:
        """Compare an experiment to the champion over a date range.

        Differences are computed only for metrics present on both sides,
        so delayed metrics missing from one arm are skipped rather than
        raising KeyError.
        """

        champion_metrics = await self._aggregate_metrics("champion", start_date, end_date)
        experiment_metrics = await self._aggregate_metrics(experiment_name, start_date, end_date)

        # Statistical significance testing
        significance = await self._compute_significance(
            champion_metrics, experiment_metrics
        )

        return ExperimentComparison(
            experiment_name=experiment_name,
            champion_metrics=champion_metrics,
            experiment_metrics=experiment_metrics,
            differences={
                metric: experiment_metrics[metric] - champion_metrics[metric]
                for metric in self.METRICS
                if metric in champion_metrics and metric in experiment_metrics
            },
            statistical_significance=significance,
            recommendation=self._get_recommendation(significance, experiment_metrics)
        )

    def _get_recommendation(
        self,
        significance: dict,
        experiment_metrics: dict
    ) -> str:
        """Generate a promotion/rollback recommendation string.

        Guardrails first (fraud up >10% or approvals down >3% → rollback),
        then the promotion condition, else continue collecting data.

        NOTE(review): champion baselines (champion_fraud_rate,
        champion_approval_rate) are read from the *significance* dict with
        a 0 default — if those keys are absent, every .get() falls back to
        0 and the comparisons degenerate. Verify _compute_significance
        actually populates these keys.
        """

        # Check for regressions
        if experiment_metrics.get("fraud_rate_30d", 0) > significance.get("champion_fraud_rate", 0) * 1.1:
            return "ROLLBACK - Fraud rate increased >10%"

        if experiment_metrics.get("approval_rate", 0) < significance.get("champion_approval_rate", 0) * 0.97:
            return "ROLLBACK - Approval rate decreased >3%"

        # Check for improvements
        if (experiment_metrics.get("fraud_rate_30d", 0) < significance.get("champion_fraud_rate", 0) * 0.9 and
                experiment_metrics.get("approval_rate", 0) >= significance.get("champion_approval_rate", 0)):
            return "PROMOTE - Fraud reduced with no approval rate impact"

        return "CONTINUE - Insufficient data or inconclusive"

Next: Part 4: Evidence Pipeline, Disputes & Economic Optimization