
Part 4: Evidence Pipeline, Disputes & Economic Optimization

Payment & Chargeback Fraud Platform - Principal-Level Design Document



1. Evidence Vault Architecture

1.1 Evidence Schema

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import List, Optional


@dataclass
class TransactionEvidence:
    """Immutable evidence record captured at transaction time."""

    # Primary identifiers (immutable links)
    evidence_id: str                    # UUID - primary key
    auth_id: str                        # Link to transaction
    event_id: str                       # Link to payment event
    service_id: str                     # Telco service identifier
    user_id: Optional[str]

    # Capture metadata
    captured_at: datetime               # When evidence was captured
    evidence_version: str               # Schema version
    capture_source: str                 # "real_time", "batch_enrichment"

    # ===== TRANSACTION CONTEXT =====
    transaction: TransactionContext

    # ===== DEVICE & NETWORK EVIDENCE =====
    device: DeviceEvidence
    network: NetworkEvidence

    # ===== VERIFICATION RESULTS =====
    verification: VerificationEvidence

    # ===== FRAUD SIGNALS AT DECISION TIME =====
    risk_assessment: RiskAssessmentSnapshot

    # ===== POLICY DECISION =====
    decision: DecisionSnapshot

    # ===== CUSTOMER PROFILE (at transaction time) =====
    customer_profile: CustomerProfileSnapshot

    # ===== INTEGRITY =====
    content_hash: str                   # SHA256 of all evidence fields
    signature: str                      # HMAC signature for tamper detection


@dataclass
class TransactionContext:
    amount: Decimal
    currency: str
    amount_usd: Decimal
    transaction_type: str               # sim_activation, topup, device_upgrade, sim_swap, international_enable
    is_recurring: bool
    recurring_sequence: Optional[int]
    service_name: str                   # Telco service name
    event_subtype: str                  # Service-specific event type
    service_country: str
    descriptor: str                     # Statement descriptor
    order_id: Optional[str]
    product_description: Optional[str]
    service_type: Optional[str]         # mobile, broadband


@dataclass
class DeviceEvidence:
    device_fingerprint: str
    device_fingerprint_raw: dict        # Full fingerprint payload
    device_id: Optional[str]            # Native app device ID
    device_type: str                    # mobile, desktop, tablet
    device_model: Optional[str]
    os_type: str
    os_version: str
    browser_type: Optional[str]
    browser_version: Optional[str]
    screen_resolution: Optional[str]
    timezone: Optional[str]
    language: Optional[str]
    is_emulator: bool
    is_rooted: bool
    app_version: Optional[str]          # For mobile apps


@dataclass
class NetworkEvidence:
    ip_address: str                     # Raw IP (stored encrypted)
    ip_hash: str                        # Hashed for comparison
    ip_geo_country: str
    ip_geo_region: str
    ip_geo_city: str
    ip_geo_lat: float
    ip_geo_lon: float
    ip_geo_accuracy_km: float
    isp_name: str
    asn: str
    connection_type: str                # cable, mobile, corporate
    is_proxy: bool
    is_vpn: bool
    is_tor: bool
    is_datacenter: bool
    proxy_type: Optional[str]


@dataclass
class VerificationEvidence:
    # AVS (Address Verification)
    avs_request_sent: bool
    avs_result_code: str                # Y, N, A, Z, etc.
    avs_result_description: str

    # CVV
    cvv_request_sent: bool
    cvv_result_code: str                # M, N, P, U
    cvv_result_description: str

    # 3D Secure
    three_ds_attempted: bool
    three_ds_version: Optional[str]
    three_ds_flow: Optional[str]        # frictionless, challenge
    three_ds_result: Optional[str]      # Y, N, A, U, R
    three_ds_eci: Optional[str]
    three_ds_cavv: Optional[str]
    three_ds_liability_shift: bool
    three_ds_challenge_completed: Optional[bool]

    # Billing address
    billing_address_line1: str          # Stored encrypted
    billing_address_city: str
    billing_address_state: str
    billing_address_postal: str
    billing_address_country: str

    # Shipping address (if different)
    shipping_address_provided: bool
    shipping_matches_billing: bool
    shipping_address_country: Optional[str]


@dataclass
class RiskAssessmentSnapshot:
    """Fraud scores and signals at decision time."""
    criminal_fraud_score: float
    friendly_fraud_score: float
    ml_model_version: str
    ml_model_prediction_raw: dict

    # Key velocity features
    card_attempts_10m: int
    card_attempts_1h: int
    card_attempts_24h: int
    device_distinct_cards_1h: int
    device_distinct_cards_24h: int
    ip_distinct_cards_1h: int

    # Risk signals triggered
    signals_triggered: List[str]
    velocity_rules_triggered: List[str]

    # Behavioral
    behavior_consistency_score: float
    is_amount_anomaly: bool
    amount_zscore: Optional[float]


@dataclass
class DecisionSnapshot:
    decision_id: str
    action: str                         # ALLOW, FRICTION, REVIEW, BLOCK
    reason: str
    friction_type: Optional[str]        # 3DS, MFA, none
    policy_version: str
    experiment_name: Optional[str]
    thresholds_used: dict
    decision_latency_ms: int
    decision_trace: List[dict]          # Full audit trail


@dataclass
class CustomerProfileSnapshot:
    """Customer state at transaction time."""
    account_age_days: int
    total_transactions_lifetime: int
    total_amount_lifetime_usd: Decimal
    chargeback_count_lifetime: int
    chargeback_rate_90d: float
    refund_count_90d: int
    refund_rate_90d: float
    distinct_cards_90d: int
    distinct_devices_90d: int
    risk_tier: str                      # low, medium, high
    is_verified_email: bool
    is_verified_phone: bool
    last_chargeback_date: Optional[datetime]

1.2 Evidence Capture Service

import hashlib
import hmac
import json
import uuid
from dataclasses import asdict
from datetime import datetime


class EvidenceCaptureService:
    """Capture and store transaction evidence."""

    def __init__(self):
        self.db = PostgresClient()
        self.s3 = S3Client()
        self.encryption = EncryptionService()

    async def capture(
        self,
        event: PaymentEvent,
        features: dict,
        risk_assessment: RiskAssessment,
        decision: PolicyDecision
    ) -> str:
        """Capture evidence immediately after decision."""

        evidence = TransactionEvidence(
            evidence_id=str(uuid.uuid4()),
            auth_id=event.auth_id,
            event_id=event.event_id,
            service_id=event.service_id,
            user_id=event.user_id,
            captured_at=datetime.utcnow(),
            evidence_version="1.0",
            capture_source="real_time",

            transaction=self._build_transaction_context(event),
            device=self._build_device_evidence(event, features),
            network=self._build_network_evidence(event, features),
            verification=self._build_verification_evidence(event),
            risk_assessment=self._build_risk_snapshot(features, risk_assessment),
            decision=self._build_decision_snapshot(decision),
            customer_profile=self._build_customer_snapshot(features),

            content_hash="",  # Computed below
            signature=""      # Computed below
        )

        # Compute content hash for integrity
        evidence.content_hash = self._compute_hash(evidence)
        evidence.signature = self._sign_evidence(evidence)

        # Store in PostgreSQL (structured data)
        await self._store_structured(evidence)

        # Store raw payloads in S3 (device fingerprint, 3DS data, etc.)
        await self._store_blobs(evidence, event)

        return evidence.evidence_id

    def _compute_hash(self, evidence: TransactionEvidence) -> str:
        """Compute SHA256 hash of evidence content."""
        # Serialize all fields except the hash and signature themselves,
        # so the stored values can be recomputed and verified later
        payload = asdict(evidence)
        payload.pop("content_hash", None)
        payload.pop("signature", None)
        content = json.dumps(payload, default=str, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def _sign_evidence(self, evidence: TransactionEvidence) -> str:
        """HMAC signature for tamper detection."""
        key = self.encryption.get_signing_key()
        message = f"{evidence.evidence_id}:{evidence.content_hash}"
        return hmac.new(key, message.encode(), hashlib.sha256).hexdigest()

    async def _store_structured(self, evidence: TransactionEvidence):
        """Store in PostgreSQL with immutable constraints."""

        # Main evidence table (append-only)
        await self.db.execute("""
            INSERT INTO evidence_vault (
                evidence_id,
                auth_id,
                event_id,
                service_id,
                user_id,
                captured_at,
                evidence_version,

                -- Transaction context
                amount_usd,
                currency,
                service_name,
                event_subtype,

                -- Verification results (non-PII)
                avs_result_code,
                cvv_result_code,
                three_ds_result,
                three_ds_liability_shift,

                -- Risk assessment
                criminal_fraud_score,
                friendly_fraud_score,
                ml_model_version,
                signals_triggered,

                -- Decision
                decision_action,
                decision_reason,
                policy_version,

                -- Customer profile
                account_age_days,
                chargeback_count_lifetime,
                chargeback_rate_90d,

                -- Integrity
                content_hash,
                signature
            ) VALUES ($1, $2, $3, ...)
        """, evidence.to_db_tuple())

        # PII stored separately with encryption
        await self.db.execute("""
            INSERT INTO evidence_pii (
                evidence_id,
                ip_address_encrypted,
                billing_address_encrypted,
                device_fingerprint_raw_encrypted
            ) VALUES ($1, $2, $3, $4)
        """,
            evidence.evidence_id,
            self.encryption.encrypt(evidence.network.ip_address),
            self.encryption.encrypt(json.dumps({
                "line1": evidence.verification.billing_address_line1,
                "city": evidence.verification.billing_address_city,
                "state": evidence.verification.billing_address_state,
                "postal": evidence.verification.billing_address_postal,
                "country": evidence.verification.billing_address_country,
            })),
            self.encryption.encrypt(json.dumps(evidence.device.device_fingerprint_raw))
        )

    async def _store_blobs(self, evidence: TransactionEvidence, event: PaymentEvent):
        """Store large payloads in S3."""

        # Device fingerprint full payload
        await self.s3.put_object(
            bucket="evidence-vault",
            key=f"device/{evidence.evidence_id}/fingerprint.json",
            body=json.dumps(evidence.device.device_fingerprint_raw),
            metadata={
                "auth_id": evidence.auth_id,
                "captured_at": evidence.captured_at.isoformat()
            }
        )

        # 3DS payload if available
        if event.three_ds_result:
            await self.s3.put_object(
                bucket="evidence-vault",
                key=f"3ds/{evidence.evidence_id}/authentication.json",
                body=json.dumps(asdict(event.three_ds_result))
            )

1.3 Evidence Immutability Enforcement

-- PostgreSQL table with immutability constraints

CREATE TABLE evidence_vault (
    evidence_id UUID NOT NULL,
    auth_id VARCHAR(64) NOT NULL,
    event_id VARCHAR(64) NOT NULL,
    service_id VARCHAR(64) NOT NULL,
    user_id VARCHAR(64),
    captured_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    evidence_version VARCHAR(10) NOT NULL,

    -- Transaction context
    amount_usd DECIMAL(12, 2) NOT NULL,
    currency VARCHAR(3) NOT NULL,
    service_name VARCHAR(255),
    event_subtype VARCHAR(64),

    -- ... other fields ...

    -- Integrity
    content_hash VARCHAR(64) NOT NULL,
    signature VARCHAR(64) NOT NULL,

    -- Audit
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Partitioned tables must include the partition key in the primary key
    PRIMARY KEY (evidence_id, captured_at)
) PARTITION BY RANGE (captured_at);

-- Prevent updates and deletes
CREATE OR REPLACE FUNCTION prevent_evidence_modification()
RETURNS TRIGGER AS $$
BEGIN
    RAISE EXCEPTION 'Evidence records are immutable and cannot be modified or deleted';
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER evidence_immutability_update
    BEFORE UPDATE ON evidence_vault
    FOR EACH ROW
    EXECUTE FUNCTION prevent_evidence_modification();

CREATE TRIGGER evidence_immutability_delete
    BEFORE DELETE ON evidence_vault
    FOR EACH ROW
    EXECUTE FUNCTION prevent_evidence_modification();

-- Indexes for dispute linking
CREATE INDEX idx_evidence_auth_id ON evidence_vault(auth_id);
CREATE INDEX idx_evidence_service_id ON evidence_vault(service_id);
CREATE INDEX idx_evidence_captured_at ON evidence_vault(captured_at);

-- Retention policy (7 years)
-- Implemented via pg_partman or manual partition management
CREATE TABLE evidence_vault_y2025m01 PARTITION OF evidence_vault
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
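
Because the vault is append-only, tampering shows up as a hash or signature mismatch at read time. A minimal verification sketch, mirroring _compute_hash and _sign_evidence above (verify_evidence itself is illustrative, not part of the service; it assumes the signing key is the one returned by EncryptionService.get_signing_key):

import hashlib
import hmac
import json
from dataclasses import asdict


def verify_evidence(evidence: TransactionEvidence, signing_key: bytes) -> bool:
    """Recompute the content hash and HMAC; any mismatch indicates tampering."""
    payload = asdict(evidence)
    payload.pop("content_hash", None)
    payload.pop("signature", None)
    content = json.dumps(payload, default=str, sort_keys=True)
    expected_hash = hashlib.sha256(content.encode()).hexdigest()
    if not hmac.compare_digest(expected_hash, evidence.content_hash):
        return False
    message = f"{evidence.evidence_id}:{evidence.content_hash}"
    expected_sig = hmac.new(signing_key, message.encode(), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected_sig, evidence.signature)

A scheduled audit job (or a dispute analyst) can run this over any evidence row before it is attached to a representment case.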

2. Dispute Pipeline

2.1 Chargeback Ingestion & Linking

class ChargebackIngestionService:
    """Ingest and process chargebacks from PSPs and networks."""

    async def process_chargeback(self, raw_chargeback: dict, source: str) -> Chargeback:
        """Process incoming chargeback event."""

        # 1. Normalize from source format
        chargeback = self._normalize(raw_chargeback, source)

        # 2. Link to original transaction
        auth_id = await self._link_to_transaction(chargeback)
        if not auth_id:
            await self._store_unlinked(chargeback)
            raise ChargebackLinkingError(f"Could not link chargeback {chargeback.chargeback_id}")

        chargeback.auth_id = auth_id

        # 3. Retrieve evidence
        evidence = await self.evidence_service.get_by_auth_id(auth_id)
        chargeback.evidence_id = evidence.evidence_id if evidence else None

        # 4. Classify chargeback type
        chargeback.label_category = await self._classify_chargeback(chargeback, evidence)

        # 5. Store chargeback
        await self._store_chargeback(chargeback)

        # 6. Update entity risk profiles
        await self._update_risk_profiles(chargeback)

        # 7. Trigger representment workflow if applicable
        if self._should_represent(chargeback, evidence):
            await self.representment_service.create_case(chargeback, evidence)

        # 8. Publish for downstream processing
        await self.kafka.produce("chargebacks", chargeback.to_json())

        return chargeback

    async def _link_to_transaction(self, chargeback: Chargeback) -> Optional[str]:
        """Link chargeback to original authorization."""

        # Method 1: Direct reference (PSP provides auth_id)
        if chargeback.original_reference:
            auth = await self.db.query_one(
                "SELECT auth_id FROM transactions WHERE psp_reference = $1",
                chargeback.original_reference
            )
            if auth:
                return auth["auth_id"]

        # Method 2: ARN lookup
        if chargeback.arn:
            auth = await self.db.query_one(
                "SELECT auth_id FROM transactions WHERE arn = $1",
                chargeback.arn
            )
            if auth:
                return auth["auth_id"]

        # Method 3: Fuzzy match
        candidates = await self.db.query("""
            SELECT auth_id, amount_usd, transaction_date, card_token
            FROM transactions
            WHERE card_token = $1
              AND amount_usd BETWEEN $2 * 0.99 AND $2 * 1.01
              AND transaction_date BETWEEN $3 - INTERVAL '7 days' AND $3 + INTERVAL '1 day'
            ORDER BY ABS(EXTRACT(EPOCH FROM (transaction_date - $3)))
            LIMIT 5
        """,
            chargeback.card_token,
            chargeback.amount,
            chargeback.original_transaction_date
        )

        if len(candidates) == 1:
            return candidates[0]["auth_id"]

        if len(candidates) > 1:
            # Multiple matches - needs manual review
            await self._create_manual_linking_task(chargeback, candidates)

        return None

    async def _classify_chargeback(
        self,
        chargeback: Chargeback,
        evidence: Optional[TransactionEvidence]
    ) -> str:
        """Classify chargeback into fraud taxonomy.

        Async because it queries issuer alerts and delivery status.
        """

        # Check for issuer-confirmed fraud (TC40/SAFE)
        issuer_alert = await self.db.query_one(
            "SELECT * FROM issuer_alerts WHERE auth_id = $1",
            chargeback.auth_id
        )

        if issuer_alert:
            return "CRIMINAL_FRAUD"

        # Map reason code
        base_label = REASON_CODE_MAP.get(chargeback.reason_code, "UNKNOWN")

        # Apply overrides based on evidence
        if evidence and base_label == "FRIENDLY_FRAUD":
            # Check if delivery was confirmed
            delivery = await self.delivery_service.get_status(chargeback.auth_id)
            if delivery and delivery.status == "DELIVERED" and delivery.signature:
                return "FRIENDLY_FRAUD"  # Confirmed abuse

            if not delivery or delivery.status == "NOT_DELIVERED":
                return "SERVICE_ERROR"  # Legitimate complaint

        return base_label

    async def _update_risk_profiles(self, chargeback: Chargeback):
        """Update entity risk scores based on chargeback."""

        # Update user profile
        if chargeback.user_id:
            await self.redis.hincrby(
                f"entity:user:{chargeback.user_id}",
                "chargeback_count_lifetime",
                1
            )
            await self.redis.hset(
                f"entity:user:{chargeback.user_id}",
                "last_chargeback_at",
                chargeback.initiated_date.isoformat()
            )

        # Update card profile
        await self.redis.hincrby(
            f"entity:card:{chargeback.card_token}",
            "chargeback_count",
            1
        )

        # If criminal fraud, add to blocklists
        if chargeback.label_category == "CRIMINAL_FRAUD":
            await self.redis.sadd("blocklist:card_tokens", chargeback.card_token)

            # Also block associated device if known
            if chargeback.evidence_id:
                evidence = await self.evidence_service.get(chargeback.evidence_id)
                if evidence:
                    await self.redis.sadd(
                        "blocklist:devices",
                        evidence.device.device_fingerprint
                    )
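
_classify_chargeback reads from a REASON_CODE_MAP defined elsewhere in the platform. A plausible sketch of its shape (the specific code-to-label assignments are assumptions, not the canonical mapping):

# Illustrative mapping from network reason codes to the fraud taxonomy.
# A production table would cover every network and code family.
REASON_CODE_MAP = {
    "10.4": "CRIMINAL_FRAUD",   # Visa: fraud, card-absent environment
    "13.1": "FRIENDLY_FRAUD",   # Visa: merchandise/services not received
    "13.3": "FRIENDLY_FRAUD",   # Visa: not as described or defective
    "13.6": "SERVICE_ERROR",    # Visa: credit not processed
    "12.6": "SERVICE_ERROR",    # Visa: duplicate processing
    "4837": "CRIMINAL_FRAUD",   # Mastercard: no cardholder authorization
    "4853": "FRIENDLY_FRAUD",   # Mastercard: cardholder dispute
}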

2.2 Representment Automation

class RepresentmentService:
    """Automate chargeback representment."""

    # Evidence types and their win rate impact
    EVIDENCE_EFFECTIVENESS = {
        "3ds_authentication": {"weight": 0.35, "win_rate_lift": 0.45},
        "delivery_proof": {"weight": 0.30, "win_rate_lift": 0.40},
        "avs_match": {"weight": 0.15, "win_rate_lift": 0.15},
        "cvv_match": {"weight": 0.10, "win_rate_lift": 0.10},
        "device_fingerprint": {"weight": 0.05, "win_rate_lift": 0.08},
        "customer_communication": {"weight": 0.05, "win_rate_lift": 0.12},
    }

    async def create_case(
        self,
        chargeback: Chargeback,
        evidence: TransactionEvidence
    ) -> RepresentmentCase:
        """Create representment case with evidence package."""

        # Calculate win probability
        win_probability = await self._calculate_win_probability(chargeback, evidence)

        # Determine if worth fighting
        expected_recovery = win_probability * chargeback.amount
        representment_cost = 25.00  # Estimated cost to represent

        should_represent = expected_recovery > representment_cost * 1.5

        case = RepresentmentCase(
            case_id=str(uuid.uuid4()),
            chargeback_id=chargeback.chargeback_id,
            auth_id=chargeback.auth_id,
            evidence_id=evidence.evidence_id,
            created_at=datetime.utcnow(),
            deadline=chargeback.response_deadline,
            status="PENDING" if should_represent else "SKIPPED",
            win_probability=win_probability,
            expected_recovery=expected_recovery,
            chargeback_amount=chargeback.amount,
            reason_code=chargeback.reason_code,
            label_category=chargeback.label_category,
            recommendation="REPRESENT" if should_represent else "ACCEPT"
        )

        if should_represent:
            # Build evidence package
            case.evidence_package = await self._build_evidence_package(
                chargeback, evidence
            )

            # Auto-submit if confidence is high enough
            if win_probability > 0.75:
                await self._submit_representment(case)
                case.status = "SUBMITTED"
            else:
                # Queue for manual review
                case.status = "PENDING_REVIEW"

        await self._store_case(case)
        return case

    async def _calculate_win_probability(
        self,
        chargeback: Chargeback,
        evidence: TransactionEvidence
    ) -> float:
        """Calculate probability of winning dispute."""

        base_probability = 0.20  # Base win rate without evidence

        # Add evidence contributions
        for evidence_type, config in self.EVIDENCE_EFFECTIVENESS.items():
            if self._has_evidence(evidence, evidence_type):
                base_probability += config["win_rate_lift"] * config["weight"]

        # Adjust based on reason code
        reason_code_modifiers = {
            "10.4": -0.20,  # Card-not-present fraud - hard to win
            "13.1": +0.10,  # Item not received - winnable with proof
            "13.3": -0.05,  # Not as described - subjective
        }
        base_probability += reason_code_modifiers.get(chargeback.reason_code, 0)

        # Adjust based on historical win rate for this service
        service_win_rate = await self._get_service_win_rate(chargeback.service_id)
        if service_win_rate:
            base_probability = (base_probability + service_win_rate) / 2

        return min(max(base_probability, 0.05), 0.95)

    def _has_evidence(self, evidence: TransactionEvidence, evidence_type: str) -> bool:
        """Check if specific evidence type is present."""

        mapping = {
            "3ds_authentication": lambda e: (
                e.verification.three_ds_result == "Y" and
                e.verification.three_ds_liability_shift
            ),
            "delivery_proof": lambda e: False,          # Checked via delivery service
            "avs_match": lambda e: e.verification.avs_result_code in ["Y", "A"],
            "cvv_match": lambda e: e.verification.cvv_result_code == "M",
            "device_fingerprint": lambda e: (
                e.device.device_fingerprint and
                len(e.device.device_fingerprint) > 20
            ),
            "customer_communication": lambda e: False,  # Checked separately
        }

        return mapping.get(evidence_type, lambda e: False)(evidence)

    async def _build_evidence_package(
        self,
        chargeback: Chargeback,
        evidence: TransactionEvidence
    ) -> EvidencePackage:
        """Build formatted evidence package for submission."""

        package = EvidencePackage(
            case_id=chargeback.chargeback_id,
            created_at=datetime.utcnow()
        )

        # Transaction details (captured_at lives on the evidence record itself)
        package.add_section("transaction_details", {
            "transaction_date": evidence.captured_at.isoformat(),
            "amount": f"{evidence.transaction.currency} {evidence.transaction.amount}",
            "service_name": evidence.transaction.service_name,
            "order_id": evidence.transaction.order_id,
        })

        # 3DS Authentication
        if evidence.verification.three_ds_liability_shift:
            package.add_section("3ds_authentication", {
                "authentication_result": evidence.verification.three_ds_result,
                "eci": evidence.verification.three_ds_eci,
                "cavv": evidence.verification.three_ds_cavv,
                "liability_shift": True,
            })
            package.compelling_evidence.append("3ds_authentication")

        # AVS/CVV
        if evidence.verification.avs_result_code in ["Y", "A"]:
            package.add_section("avs_verification", {
                "result": evidence.verification.avs_result_code,
                "description": evidence.verification.avs_result_description,
            })
            package.compelling_evidence.append("avs_match")

        if evidence.verification.cvv_result_code == "M":
            package.add_section("cvv_verification", {
                "result": "Match",
            })
            package.compelling_evidence.append("cvv_match")

        # Device/IP information
        package.add_section("device_information", {
            "device_type": evidence.device.device_type,
            "ip_country": evidence.network.ip_geo_country,
            "ip_city": evidence.network.ip_geo_city,
        })

        # Delivery proof (if applicable and available)
        delivery = await self.delivery_service.get_proof(chargeback.auth_id)
        if delivery and delivery.proof_url:
            package.add_section("delivery_proof", {
                "carrier": delivery.carrier,
                "tracking_number": delivery.tracking_number,
                "delivery_date": delivery.delivered_at.isoformat(),
                "signature_url": delivery.signature_url,
                "proof_url": delivery.proof_url,
            })
            package.add_attachment("delivery_signature", delivery.signature_url)
            package.compelling_evidence.append("delivery_proof")

        # Customer communication
        communications = await self.support_service.get_communications(chargeback.auth_id)
        if communications:
            package.add_section("customer_communication", {
                "tickets": [c.to_dict() for c in communications],
            })

        return package
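
The EvidencePackage container is used above but not defined in this part; a minimal sketch whose fields are inferred from usage (sections, attachments, and the compelling_evidence list):

from dataclasses import dataclass, field
from datetime import datetime
from typing import List


@dataclass
class EvidencePackage:
    """Sketch of the evidence container; fields inferred from usage above."""
    case_id: str
    created_at: datetime
    sections: dict = field(default_factory=dict)
    attachments: dict = field(default_factory=dict)
    compelling_evidence: List[str] = field(default_factory=list)

    def add_section(self, name: str, content: dict) -> None:
        self.sections[name] = content

    def add_attachment(self, name: str, url: str) -> None:
        self.attachments[name] = url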

2.3 Dispute Outcome Processing

class DisputeOutcomeProcessor:
    """Process dispute outcomes and feed back into system."""

    async def process_outcome(self, outcome: DisputeOutcome) -> None:
        """Process dispute resolution outcome."""

        # 1. Update chargeback record
        await self.db.execute("""
            UPDATE chargebacks
            SET
                outcome = $1,
                outcome_date = $2,
                recovered_amount = $3,
                final_status = $4
            WHERE chargeback_id = $5
        """,
            outcome.result,  # WON, LOST, PARTIAL
            outcome.resolved_date,
            outcome.recovered_amount,
            "RESOLVED",
            outcome.chargeback_id
        )

        # 2. Update representment case
        await self.db.execute("""
            UPDATE representment_cases
            SET
                status = 'RESOLVED',
                outcome = $1,
                resolved_at = $2
            WHERE chargeback_id = $3
        """,
            outcome.result,
            outcome.resolved_date,
            outcome.chargeback_id
        )

        # 3. Get full context for learning
        context = await self._get_outcome_context(outcome)

        # 4. Write to training dataset
        await self._write_to_training_data(context, outcome)

        # 5. Update evidence effectiveness metrics
        await self._update_evidence_effectiveness(context, outcome)

        # 6. Update label for ML training
        await self._finalize_label(context, outcome)

        # 7. Publish for analytics
        await self.kafka.produce("dispute-outcomes", {
            "chargeback_id": outcome.chargeback_id,
            "auth_id": context.auth_id,
            "outcome": outcome.result,
            "amount": float(context.amount),
            "recovered": float(outcome.recovered_amount or 0),
            "label_category": context.label_category,
            "evidence_types_used": context.evidence_types,
            "win_probability_predicted": context.predicted_win_probability,
        })

    async def _write_to_training_data(
        self,
        context: DisputeContext,
        outcome: DisputeOutcome
    ):
        """Write resolved dispute to training dataset."""

        # Get original transaction features
        evidence = await self.evidence_service.get(context.evidence_id)

        training_row = {
            # Identifiers
            "auth_id": context.auth_id,
            "transaction_date": context.transaction_date.isoformat(),

            # Features (from evidence snapshot)
            "features": evidence.risk_assessment.__dict__,

            # Labels
            "is_fraud": context.label_category in ["CRIMINAL_FRAUD", "FRIENDLY_FRAUD"],
            "fraud_type": context.label_category,
            "chargeback_occurred": True,
            "dispute_won": outcome.result == "WON",

            # Outcome details
            "chargeback_amount": float(context.amount),
            "recovered_amount": float(outcome.recovered_amount or 0),
            "days_to_chargeback": (context.chargeback_date - context.transaction_date).days,
            "days_to_resolution": (outcome.resolved_date - context.chargeback_date).days,

            # Evidence effectiveness
            "evidence_types_available": context.evidence_types,
            "evidence_package_quality": context.evidence_quality_score,

            # Meta
            "label_maturity_date": datetime.utcnow().isoformat(),
        }

        # Write to Delta Lake
        await self.delta_lake.append("training_data_disputes", training_row)

    async def _update_evidence_effectiveness(
        self,
        context: DisputeContext,
        outcome: DisputeOutcome
    ):
        """Update evidence type effectiveness metrics."""

        for evidence_type in context.evidence_types:
            # Update win/loss counts
            metric_key = f"evidence_effectiveness:{evidence_type}"

            await self.redis.hincrby(metric_key, "total_cases", 1)

            if outcome.result == "WON":
                await self.redis.hincrby(metric_key, "wins", 1)
            elif outcome.result == "LOST":
                await self.redis.hincrby(metric_key, "losses", 1)
            else:  # PARTIAL
                await self.redis.hincrbyfloat(metric_key, "partial_wins", 0.5)

        # Recompute win rates (batch job runs daily)
        await self.metrics_service.schedule_evidence_analysis()
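
The daily batch job referenced above folds the Redis counters into per-evidence-type win rates. A sketch of that rollup, assuming the key layout written by _update_evidence_effectiveness (partial wins already accumulate as 0.5 credits):

async def recompute_evidence_win_rates(redis, evidence_types: List[str]) -> dict:
    """Sketch: win rate per evidence type from the Redis counters above."""
    win_rates = {}
    for evidence_type in evidence_types:
        counts = await redis.hgetall(f"evidence_effectiveness:{evidence_type}")
        wins = float(counts.get("wins", 0)) + float(counts.get("partial_wins", 0))
        total = int(counts.get("total_cases", 0))
        win_rates[evidence_type] = wins / total if total > 0 else None
    return win_rates

These observed rates can gradually replace the static win_rate_lift priors in EVIDENCE_EFFECTIVENESS as case volume accumulates.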

3. Training Data Pipeline

3.1 Labeled Dataset Generation

import logging
from datetime import datetime, timedelta

import pandas as pd

logger = logging.getLogger(__name__)


class TrainingDataGenerator:
    """Generate training datasets with proper label handling."""

    async def generate_training_set(
        self,
        start_date: datetime,
        end_date: datetime,
        label_maturity_days: int = 120
    ) -> str:
        """Generate training dataset with mature labels."""

        # Only include transactions old enough for labels to mature
        maturity_cutoff = datetime.utcnow() - timedelta(days=label_maturity_days)

        if end_date > maturity_cutoff:
            end_date = maturity_cutoff
            logger.warning(f"Adjusted end_date to {end_date} for label maturity")

        # Query transactions with outcomes
        query = """
            WITH transaction_outcomes AS (
                SELECT
                    t.auth_id,
                    t.transaction_date,
                    t.amount_usd,
                    t.user_id,
                    t.card_token,
                    t.service_id,
                    t.event_subtype,

                    -- Chargeback info
                    c.chargeback_id,
                    c.reason_code,
                    c.label_category,
                    c.outcome AS dispute_outcome,
                    c.initiated_date AS chargeback_date,

                    -- Evidence snapshot
                    e.risk_assessment,
                    e.customer_profile,

                    -- Derived labels
                    CASE
                        WHEN c.chargeback_id IS NULL THEN 'LEGITIMATE'
                        WHEN c.label_category = 'CRIMINAL_FRAUD' THEN 'CRIMINAL_FRAUD'
                        WHEN c.label_category = 'FRIENDLY_FRAUD' THEN 'FRIENDLY_FRAUD'
                        WHEN c.label_category = 'SERVICE_ERROR' THEN 'SERVICE_ERROR'
                        ELSE 'UNKNOWN'
                    END AS final_label,

                    -- Time-based features
                    EXTRACT(EPOCH FROM (c.initiated_date - t.transaction_date)) / 86400
                        AS days_to_chargeback

                FROM transactions t
                LEFT JOIN chargebacks c ON t.auth_id = c.auth_id
                LEFT JOIN evidence_vault e ON t.auth_id = e.auth_id

                WHERE t.transaction_date BETWEEN :start_date AND :end_date
                  AND (c.chargeback_id IS NULL OR c.outcome IS NOT NULL) -- Resolved or no CB
            )
            SELECT * FROM transaction_outcomes
            WHERE final_label != 'UNKNOWN'
        """

        results = await self.db.query(query, start_date=start_date, end_date=end_date)

        # Convert to training format
        training_data = []
        for row in results:
            features = self._extract_features(row)
            labels = self._extract_labels(row)

            training_data.append({
                **features,
                **labels,
                "auth_id": row["auth_id"],
                "transaction_date": row["transaction_date"].isoformat(),
            })

        # Write to Delta Lake
        dataset_id = f"training_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}"
        await self.delta_lake.write(
            f"training_sets/{dataset_id}",
            training_data,
            partition_by=["final_label"]
        )

        # Log dataset stats
        stats = self._compute_stats(training_data)
        logger.info(f"Generated training set {dataset_id}: {stats}")

        return dataset_id

    def _extract_features(self, row: dict) -> dict:
        """Extract feature vector from row."""

        risk = row.get("risk_assessment", {})
        profile = row.get("customer_profile", {})

        return {
            # Velocity features
            "card_attempts_10m": risk.get("card_attempts_10m", 0),
            "card_attempts_1h": risk.get("card_attempts_1h", 0),
            "card_attempts_24h": risk.get("card_attempts_24h", 0),
            "device_distinct_cards_1h": risk.get("device_distinct_cards_1h", 0),
            "device_distinct_cards_24h": risk.get("device_distinct_cards_24h", 0),
            "ip_distinct_cards_1h": risk.get("ip_distinct_cards_1h", 0),

            # Amount features
            "amount_usd": float(row["amount_usd"]),
            "amount_zscore": risk.get("amount_zscore"),
            "is_amount_anomaly": risk.get("is_amount_anomaly", False),

            # Customer features
            "account_age_days": profile.get("account_age_days", 0),
            "chargeback_count_lifetime": profile.get("chargeback_count_lifetime", 0),
            "chargeback_rate_90d": profile.get("chargeback_rate_90d", 0.0),
            "refund_rate_90d": profile.get("refund_rate_90d", 0.0),
            "distinct_cards_90d": profile.get("distinct_cards_90d", 1),

            # Behavioral
            "behavior_consistency_score": risk.get("behavior_consistency_score", 1.0),

            # Signals
            "velocity_rules_triggered_count": len(risk.get("velocity_rules_triggered", [])),
            "signals_triggered_count": len(risk.get("signals_triggered", [])),

            # ML scores (from previous model - for model comparison)
            "prev_criminal_fraud_score": risk.get("criminal_fraud_score", 0),
            "prev_friendly_fraud_score": risk.get("friendly_fraud_score", 0),
        }

    def _extract_labels(self, row: dict) -> dict:
        """Extract label information."""

        return {
            # Binary fraud label
            "is_fraud": row["final_label"] in ["CRIMINAL_FRAUD", "FRIENDLY_FRAUD"],

            # Multi-class label
            "fraud_type": row["final_label"],

            # Criminal vs friendly (for separate models)
            "is_criminal_fraud": row["final_label"] == "CRIMINAL_FRAUD",
            "is_friendly_fraud": row["final_label"] == "FRIENDLY_FRAUD",

            # Chargeback occurred (regardless of type)
            "chargeback_occurred": row["chargeback_id"] is not None,

            # Dispute outcome (if applicable)
            "dispute_won": row.get("dispute_outcome") == "WON",

            # Days to chargeback (for survival analysis)
            "days_to_chargeback": row.get("days_to_chargeback"),
        }

    def _compute_stats(self, data: list) -> dict:
        """Compute dataset statistics."""

        df = pd.DataFrame(data)

        return {
            "total_records": len(df),
            "legitimate_count": (df["fraud_type"] == "LEGITIMATE").sum(),
            "criminal_fraud_count": (df["fraud_type"] == "CRIMINAL_FRAUD").sum(),
            "friendly_fraud_count": (df["fraud_type"] == "FRIENDLY_FRAUD").sum(),
            "service_error_count": (df["fraud_type"] == "SERVICE_ERROR").sum(),
            "fraud_rate": df["is_fraud"].mean(),
            "date_range": f"{df['transaction_date'].min()} to {df['transaction_date'].max()}",
        }

3.2 Point-in-Time Feature Retrieval

from datetime import datetime

import pandas as pd


class PointInTimeFeatureStore:
    """Retrieve features as they existed at transaction time."""

    async def get_features_at_time(
        self,
        auth_id: str,
        transaction_time: datetime
    ) -> dict:
        """Get features exactly as they were at transaction time."""

        # First, try evidence vault (gold standard)
        evidence = await self.db.query_one("""
            SELECT risk_assessment, customer_profile
            FROM evidence_vault
            WHERE auth_id = $1
        """, auth_id)

        if evidence:
            return {
                **evidence["risk_assessment"],
                **evidence["customer_profile"]
            }

        # Fallback: reconstruct from historical feature store
        # (Less accurate but better than nothing)
        return await self._reconstruct_features(auth_id, transaction_time)

    async def _reconstruct_features(
        self,
        auth_id: str,
        transaction_time: datetime
    ) -> dict:
        """Reconstruct features from historical snapshots."""

        # Get transaction details
        txn = await self.db.query_one("""
            SELECT user_id, card_token, device_fingerprint, ip_hash
            FROM transactions
            WHERE auth_id = $1
        """, auth_id)

        if not txn:
            return {}

        # Query feature store history
        # (Feast maintains historical feature values)
        features = await self.feast.get_historical_features(
            entity_df=pd.DataFrame([{
                "user_id": txn["user_id"],
                "card_token": txn["card_token"],
                "device_fingerprint": txn["device_fingerprint"],
                "event_timestamp": transaction_time
            }]),
            features=[
                "user_features:chargeback_count_lifetime",
                "user_features:chargeback_rate_90d",
                "user_features:account_age_days",
                "card_features:attempts_24h",
                "device_features:distinct_cards_24h",
                # ... other features
            ]
        )

        return features.to_dict(orient="records")[0]
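
Typical usage during a training backfill, as a sketch (the auth_id shown is hypothetical):

# Retrieve decision-time features for a disputed transaction.
store = PointInTimeFeatureStore()
features = await store.get_features_at_time(
    auth_id="auth_...",  # hypothetical identifier
    transaction_time=datetime(2025, 1, 15, 10, 30),
)
# Evidence-vault hits return the exact decision-time snapshot;
# reconstructed rows are approximations and should be flagged as such.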

4. Economic Optimization Service

4.1 Approval-Loss Trade-off Analysis

from datetime import datetime

import numpy as np
import pandas as pd


class ApprovalLossAnalyzer:
    """Analyze trade-offs between approval rate and fraud loss."""

    async def compute_tradeoff_curve(
        self,
        start_date: datetime,
        end_date: datetime,
        score_column: str = "criminal_fraud_score"
    ) -> TradeoffCurve:
        """Compute approval rate vs fraud loss curve."""

        # Load historical decisions with outcomes
        data = await self.db.query("""
            SELECT
                d.auth_id,
                d.criminal_fraud_score,
                d.friendly_fraud_score,
                d.decision_action,
                t.amount_usd,
                CASE WHEN c.chargeback_id IS NOT NULL THEN true ELSE false END AS is_fraud,
                c.label_category
            FROM decisions d
            JOIN transactions t ON d.auth_id = t.auth_id
            LEFT JOIN chargebacks c ON t.auth_id = c.auth_id
            WHERE d.timestamp BETWEEN :start AND :end
              AND (c.chargeback_id IS NULL OR c.outcome IS NOT NULL)
        """, start=start_date, end=end_date)

        df = pd.DataFrame(data)

        # Compute metrics at different thresholds
        thresholds = np.arange(0.05, 0.95, 0.02)
        results = []

        total_transactions = len(df)
        total_fraud = df["is_fraud"].sum()
        total_legitimate = total_transactions - total_fraud

        for threshold in thresholds:
            # Simulate blocking at this threshold
            blocked = df[df[score_column] >= threshold]
            allowed = df[df[score_column] < threshold]

            # Confusion-matrix counts
            true_positives = blocked["is_fraud"].sum()
            false_positives = len(blocked) - true_positives
            false_negatives = allowed["is_fraud"].sum()
            true_negatives = len(allowed) - false_negatives

            # Rates
            approval_rate = len(allowed) / total_transactions
            fraud_caught_rate = true_positives / total_fraud if total_fraud > 0 else 0
            false_positive_rate = false_positives / total_legitimate if total_legitimate > 0 else 0

            # Precision / recall / F1 (guarding against empty denominators)
            predicted_positives = true_positives + false_positives
            precision = true_positives / predicted_positives if predicted_positives > 0 else 0
            recall = fraud_caught_rate
            f1_score = (
                2 * precision * recall / (precision + recall)
                if (precision + recall) > 0 else 0
            )

            # Dollar amounts
            fraud_blocked_amount = blocked[blocked["is_fraud"]]["amount_usd"].sum()
            fraud_passed_amount = allowed[allowed["is_fraud"]]["amount_usd"].sum()
            legitimate_blocked_amount = blocked[~blocked["is_fraud"]]["amount_usd"].sum()

            # Economic impact: fraud that slipped through (plus fees)
            # and legitimate revenue lost to false positives
            fraud_loss = fraud_passed_amount * 1.25  # Amount + fees
            lost_revenue = legitimate_blocked_amount
            net_impact = fraud_loss + lost_revenue

            results.append({
                "threshold": threshold,
                "approval_rate": approval_rate,
                "fraud_caught_rate": fraud_caught_rate,
                "false_positive_rate": false_positive_rate,
                "precision": precision,
                "recall": recall,
                "f1_score": f1_score,
                "fraud_blocked_usd": fraud_blocked_amount,
                "fraud_passed_usd": fraud_passed_amount,
                "legitimate_blocked_usd": legitimate_blocked_amount,
                "net_loss_usd": net_impact,
            })

        return TradeoffCurve(
            data=results,
            optimal_threshold=self._find_optimal(results),
            date_range=f"{start_date.date()} to {end_date.date()}",
            transaction_count=total_transactions,
            fraud_count=total_fraud
        )

    def _find_optimal(self, results: list) -> float:
        """Find threshold that minimizes net loss."""
        min_loss = float("inf")
        optimal = 0.5

        for row in results:
            if row["net_loss_usd"] < min_loss:
                min_loss = row["net_loss_usd"]
                optimal = row["threshold"]

        return optimal
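
Reading the curve in practice amounts to diffing the net-loss column between the production threshold and the optimizer's pick; a usage sketch (the 0.75 production threshold is hypothetical):

analyzer = ApprovalLossAnalyzer()
curve = await analyzer.compute_tradeoff_curve(
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 3, 31),
)
rows = {round(r["threshold"], 2): r for r in curve.data}
current = rows.get(0.75)  # hypothetical production threshold
optimal = rows.get(round(curve.optimal_threshold, 2))
if current and optimal:
    savings = current["net_loss_usd"] - optimal["net_loss_usd"]
    print(f"Moving 0.75 -> {curve.optimal_threshold:.2f} "
          f"saves ${savings:,.0f} over the window")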

4.2 Risk Budget Management

class RiskBudgetManager:
    """Manage fraud loss budgets and thresholds."""

    async def get_current_budget(self, service_id: Optional[str] = None) -> RiskBudget:
        """Get current risk budget configuration."""

        if service_id:
            budget = await self.db.query_one(
                "SELECT * FROM risk_budgets WHERE service_id = $1",
                service_id
            )
        else:
            budget = await self.db.query_one(
                "SELECT * FROM risk_budgets WHERE service_id IS NULL"
            )

        return RiskBudget(
            budget_id=budget["budget_id"],
            service_id=budget.get("service_id"),
            monthly_fraud_loss_limit_usd=budget["monthly_fraud_loss_limit_usd"],
            target_approval_rate=budget["target_approval_rate"],
            max_fraud_rate=budget["max_fraud_rate"],
            current_month_loss_usd=await self._get_current_month_loss(service_id),
            current_month_fraud_rate=await self._get_current_month_fraud_rate(service_id),
            thresholds=budget["thresholds"],
            updated_at=budget["updated_at"],
            updated_by=budget["updated_by"]
        )

    async def check_budget_status(
        self,
        service_id: Optional[str] = None
    ) -> BudgetStatus:
        """Check if current fraud loss is within budget."""

        budget = await self.get_current_budget(service_id)

        utilization = budget.current_month_loss_usd / budget.monthly_fraud_loss_limit_usd

        if utilization >= 1.0:
            status = "EXCEEDED"
            recommended_action = "TIGHTEN_THRESHOLDS"
        elif utilization >= 0.8:
            status = "WARNING"
            recommended_action = "MONITOR_CLOSELY"
        elif utilization >= 0.5:
            status = "ON_TRACK"
            recommended_action = None
        else:
            status = "UNDER_UTILIZED"
            recommended_action = "CONSIDER_RELAXING"

        return BudgetStatus(
            budget=budget,
            utilization_pct=utilization * 100,
            status=status,
            days_remaining=self._days_remaining_in_month(),
            projected_month_end_loss=self._project_month_end(budget),
            recommended_action=recommended_action
        )

    async def adjust_thresholds_for_budget(
        self,
        service_id: Optional[str] = None
    ) -> ThresholdAdjustment:
        """Automatically adjust thresholds based on budget status."""

        status = await self.check_budget_status(service_id)
        current_thresholds = status.budget.thresholds

        if status.status == "EXCEEDED":
            # Tighten by 10 points (a lower block threshold blocks more)
            adjustment = -0.10
        elif status.status == "WARNING":
            # Tighten slightly
            adjustment = -0.05
        elif status.status == "UNDER_UTILIZED":
            # Relax slightly to improve approval rate
            adjustment = +0.03
        else:
            return ThresholdAdjustment(
                adjusted=False,
                reason="Budget on track, no adjustment needed"
            )

        new_thresholds = {
            key: max(0.3, min(0.95, value + adjustment))
            for key, value in current_thresholds.items()
        }

        # Validate new thresholds won't cause issues
        validation = await self._validate_thresholds(new_thresholds)
        if not validation.valid:
            return ThresholdAdjustment(
                adjusted=False,
                reason=validation.reason
            )

        # Apply new thresholds
        await self._apply_thresholds(
            service_id, new_thresholds, f"auto_budget_adjustment_{status.status}"
        )

        return ThresholdAdjustment(
            adjusted=True,
            old_thresholds=current_thresholds,
            new_thresholds=new_thresholds,
            reason=f"Adjusted for budget status: {status.status}",
            adjustment_factor=adjustment
        )
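
The private helpers _days_remaining_in_month and _project_month_end are not shown above; a minimal sketch of the logic they might implement (written as plain functions here, with a naive linear run-rate assumption):

import calendar
from datetime import date
from typing import Optional


def days_remaining_in_month(today: Optional[date] = None) -> int:
    """Days left in the current calendar month."""
    today = today or date.today()
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    return days_in_month - today.day


def project_month_end_loss(current_loss_usd: float, today: Optional[date] = None) -> float:
    """Naive run-rate projection: assumes losses accrue linearly through the month."""
    today = today or date.today()
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    daily_rate = current_loss_usd / max(today.day, 1)
    return daily_rate * days_in_month

A seasonality-aware projection (weekday effects, end-of-month billing spikes) would be a natural upgrade once enough history exists.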

4.3 Business User Interface (API)

# FastAPI endpoints for business users

from datetime import date, datetime
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException

router = APIRouter()


@router.get("/risk-budget")
async def get_risk_budget(
    service_id: Optional[str] = None,
    current_user: User = Depends(get_current_user)
):
    """Get current risk budget and utilization."""
    require_permission(current_user, "risk_budget:read")

    manager = RiskBudgetManager()
    status = await manager.check_budget_status(service_id)

    return {
        "budget": status.budget.to_dict(),
        "status": status.status,
        "utilization_pct": status.utilization_pct,
        "days_remaining": status.days_remaining,
        "projected_month_end_loss": status.projected_month_end_loss,
        "recommended_action": status.recommended_action
    }


@router.post("/risk-budget/thresholds")
async def update_thresholds(
    request: ThresholdUpdateRequest,
    current_user: User = Depends(get_current_user)
):
    """Update fraud detection thresholds."""
    require_permission(current_user, "risk_budget:write")

    # Validate thresholds
    if not 0.3 <= request.criminal_fraud_block <= 0.95:
        raise HTTPException(400, "Block threshold must be between 0.3 and 0.95")

    # Log change for audit
    await audit_log.record({
        "action": "threshold_update",
        "user": current_user.id,
        "old_thresholds": await get_current_thresholds(request.service_id),
        "new_thresholds": request.thresholds,
        "reason": request.reason
    })

    # Apply thresholds
    await apply_thresholds(request.service_id, request.thresholds)

    return {"status": "updated", "effective_at": datetime.utcnow().isoformat()}


@router.get("/analytics/tradeoff-curve")
async def get_tradeoff_curve(
    start_date: date,
    end_date: date,
    service_id: Optional[str] = None,
    current_user: User = Depends(get_current_user)
):
    """Get approval rate vs fraud loss trade-off curve."""
    require_permission(current_user, "analytics:read")

    analyzer = ApprovalLossAnalyzer()
    curve = await analyzer.compute_tradeoff_curve(
        datetime.combine(start_date, datetime.min.time()),
        datetime.combine(end_date, datetime.max.time())
    )

    return {
        "curve": curve.data,
        "optimal_threshold": curve.optimal_threshold,
        "current_threshold": await get_current_threshold("criminal_fraud_block"),
        "date_range": curve.date_range,
        "transaction_count": curve.transaction_count,
        "fraud_count": curve.fraud_count
    }


@router.get("/analytics/threshold-simulation")
async def simulate_threshold_change(
    new_threshold: float,
    start_date: date,
    end_date: date,
    current_user: User = Depends(get_current_user)
):
    """Simulate impact of threshold change on historical data."""
    require_permission(current_user, "analytics:read")

    simulator = ThresholdSimulator()
    result = await simulator.simulate(
        new_threshold=new_threshold,
        start_date=datetime.combine(start_date, datetime.min.time()),
        end_date=datetime.combine(end_date, datetime.max.time())
    )

    return {
        "current_threshold": result.current_threshold,
        "proposed_threshold": new_threshold,
        "impact": {
            "approval_rate_change": result.approval_rate_delta,
            "fraud_caught_change": result.fraud_caught_delta,
            "net_revenue_change_usd": result.net_revenue_delta,
            "false_positives_change": result.false_positives_delta
        },
        "recommendation": result.recommendation
    }
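
ThresholdSimulator is invoked above but not defined in this part. One way to sketch it is as a thin diff over the trade-off curve; SimulationResult is a hypothetical container matching the response fields:

class ThresholdSimulator:
    """Sketch: diff curve metrics at the current vs. a proposed threshold."""

    async def simulate(self, new_threshold: float, start_date, end_date):
        analyzer = ApprovalLossAnalyzer()
        curve = await analyzer.compute_tradeoff_curve(start_date, end_date)
        current_threshold = await get_current_threshold("criminal_fraud_block")

        def nearest(t: float) -> dict:
            # Pick the curve row closest to the requested threshold
            return min(curve.data, key=lambda r: abs(r["threshold"] - t))

        cur, new = nearest(current_threshold), nearest(new_threshold)
        return SimulationResult(
            current_threshold=current_threshold,
            approval_rate_delta=new["approval_rate"] - cur["approval_rate"],
            fraud_caught_delta=new["fraud_caught_rate"] - cur["fraud_caught_rate"],
            net_revenue_delta=cur["net_loss_usd"] - new["net_loss_usd"],
            false_positives_delta=new["false_positive_rate"] - cur["false_positive_rate"],
            recommendation="APPLY" if new["net_loss_usd"] < cur["net_loss_usd"] else "KEEP_CURRENT",
        )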

5. Key Performance Metrics

5.1 Metrics Definitions

import datetime

import numpy as np


class FraudMetricsCalculator:
    """Calculate and track fraud platform KPIs."""

    async def compute_daily_metrics(self, date: datetime.date) -> DailyMetrics:
        """Compute all KPIs for a given day."""

        # Get day's data
        decisions = await self._get_decisions(date)
        chargebacks = await self._get_chargebacks_for_transactions(date)

        total_transactions = len(decisions)
        total_amount = sum(d["amount_usd"] for d in decisions)

        # Approval rate
        approved = [d for d in decisions if d["action"] in ["ALLOW", "FRICTION"]]
        approval_rate = len(approved) / total_transactions if total_transactions > 0 else 0

        # Block rate
        blocked = [d for d in decisions if d["action"] == "BLOCK"]
        block_rate = len(blocked) / total_transactions if total_transactions > 0 else 0

        # Friction / review rates
        friction_count = sum(1 for d in decisions if d["action"] == "FRICTION")
        review_count = sum(1 for d in decisions if d["action"] == "REVIEW")
        friction_rate = friction_count / total_transactions if total_transactions > 0 else 0
        review_rate = review_count / total_transactions if total_transactions > 0 else 0

        # Fraud rate (lagged - uses historical chargebacks)
        fraud_count = len(chargebacks)
        fraud_rate = fraud_count / total_transactions if total_transactions > 0 else 0

        # Fraud loss
        fraud_loss = sum(c["amount"] for c in chargebacks)
        fraud_loss_with_fees = fraud_loss * 1.25  # Including fees and penalties

        # Net fraud loss rate
        net_fraud_loss_rate = fraud_loss_with_fees / total_amount if total_amount > 0 else 0

        # False positive estimate (blocked transactions that were likely legitimate)
        # Based on historical false positive rate calibration
        estimated_false_positives = int(len(blocked) * 0.15)  # Assume 15% of blocks are FP

        # Dispute win rate (from resolved disputes)
        disputes = await self._get_resolved_disputes(date)
        if disputes:
            dispute_win_rate = sum(1 for d in disputes if d["outcome"] == "WON") / len(disputes)
        else:
            dispute_win_rate = None

        latencies = [d["latency_ms"] for d in decisions]

        return DailyMetrics(
            date=date,
            transaction_count=total_transactions,
            total_amount_usd=total_amount,
            approval_rate=approval_rate,
            block_rate=block_rate,
            friction_rate=friction_rate,
            review_rate=review_rate,
            fraud_rate=fraud_rate,
            fraud_loss_usd=fraud_loss,
            net_fraud_loss_rate=net_fraud_loss_rate,
            false_positive_estimate=estimated_false_positives,
            dispute_win_rate=dispute_win_rate,
            avg_decision_latency_ms=np.mean(latencies) if latencies else 0,
            p99_decision_latency_ms=np.percentile(latencies, 99) if latencies else 0,
        )

    async def compute_ltv_impact(self, date_range: tuple) -> LTVImpact:
        """Estimate customer LTV impact from fraud decisions."""

        start, end = date_range

        # Get blocked legitimate customers (false positives)
        false_positives = await self.db.query("""
            SELECT DISTINCT d.user_id
            FROM decisions d
            LEFT JOIN chargebacks c ON d.auth_id = c.auth_id
            WHERE d.timestamp BETWEEN :start AND :end
              AND d.action = 'BLOCK'
              AND c.chargeback_id IS NULL                   -- No chargeback = likely legitimate
              AND d.timestamp < NOW() - INTERVAL '120 days' -- Mature
        """, start=start, end=end)

        # Check if these users churned
        churned_users = await self._check_churn(false_positives)

        # Estimate LTV loss
        avg_ltv = 500  # Configurable
        estimated_ltv_loss = len(churned_users) * avg_ltv

        return LTVImpact(
            false_positive_count=len(false_positives),
            estimated_churn_count=len(churned_users),
            churn_rate=len(churned_users) / len(false_positives) if false_positives else 0,
            estimated_ltv_loss_usd=estimated_ltv_loss,
            date_range=f"{start.date()} to {end.date()}"
        )
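
_check_churn is referenced but not defined above; a sketch using a simple inactivity window (the 60-day cutoff is an assumption):

async def _check_churn(self, blocked_users: list) -> list:
    """Sketch: a user counts as churned if they have no transaction in the
    trailing 60 days. Since the blocked attempt is at least 120 days old,
    sustained silence is a reasonable churn signal."""
    churned = []
    for row in blocked_users:
        recent = await self.db.query_one("""
            SELECT 1
            FROM transactions
            WHERE user_id = $1
              AND transaction_date > NOW() - INTERVAL '60 days'
            LIMIT 1
        """, row["user_id"])
        if not recent:
            churned.append(row["user_id"])
    return churned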

5.2 Metrics Storage & Serving

-- Metrics tables

CREATE TABLE daily_fraud_metrics (
    metric_date DATE PRIMARY KEY,
    transaction_count INTEGER NOT NULL,
    total_amount_usd DECIMAL(15, 2) NOT NULL,
    approval_rate DECIMAL(5, 4) NOT NULL,
    block_rate DECIMAL(5, 4) NOT NULL,
    friction_rate DECIMAL(5, 4) NOT NULL,
    review_rate DECIMAL(5, 4) NOT NULL,
    fraud_rate DECIMAL(6, 5),            -- Nullable until mature
    fraud_loss_usd DECIMAL(12, 2),
    net_fraud_loss_rate DECIMAL(6, 5),
    false_positive_estimate INTEGER,
    dispute_win_rate DECIMAL(5, 4),
    avg_decision_latency_ms DECIMAL(8, 2),
    p99_decision_latency_ms DECIMAL(8, 2),
    computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    is_mature BOOLEAN NOT NULL DEFAULT FALSE
);

-- Materialized view for dashboards
CREATE MATERIALIZED VIEW fraud_metrics_weekly AS
SELECT
    DATE_TRUNC('week', metric_date) AS week_start,
    SUM(transaction_count) AS transaction_count,
    SUM(total_amount_usd) AS total_amount_usd,
    AVG(approval_rate) AS avg_approval_rate,
    AVG(fraud_rate) FILTER (WHERE is_mature) AS avg_fraud_rate,
    SUM(fraud_loss_usd) FILTER (WHERE is_mature) AS total_fraud_loss,
    AVG(dispute_win_rate) FILTER (WHERE dispute_win_rate IS NOT NULL) AS avg_dispute_win_rate,
    AVG(p99_decision_latency_ms) AS avg_p99_latency
FROM daily_fraud_metrics
GROUP BY DATE_TRUNC('week', metric_date)
ORDER BY week_start DESC;

-- Refresh weekly
CREATE OR REPLACE FUNCTION refresh_weekly_metrics()
RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW fraud_metrics_weekly;
END;
$$ LANGUAGE plpgsql;
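
The is_mature flag is what separates settled fraud numbers from still-maturing ones on dashboards. A sketch of the daily job that flips it once the 120-day label window from Section 3.1 has passed (mark_mature_metrics is illustrative):

async def mark_mature_metrics(db, maturity_days: int = 120) -> None:
    """Sketch: flag daily metric rows whose chargeback window has closed."""
    await db.execute("""
        UPDATE daily_fraud_metrics
        SET is_mature = TRUE
        WHERE is_mature = FALSE
          AND metric_date < NOW() - make_interval(days => $1)
    """, maturity_days)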

Next: Part 5: Testing, Validation, Monitoring & Sprint-1 Checklist