Part 2: External Entities, Data Schemas & Feature Engineering

Payment & Chargeback Fraud Platform - Principal-Level Design Document

Phase 1: Real-Time Streaming & Decisioning


1. Entity Profiling Architecture

1.1 Entity Types & Storage Design

| Entity | Redis Key Pattern | TTL | Update Frequency | Primary Use |
|---|---|---|---|---|
| User | entity:user:{user_id} | 90 days | Every transaction | Account-level risk, dispute history |
| Device | entity:device:{device_fingerprint} | 30 days | Every transaction | Device sharing, bot detection |
| Card | entity:card:{card_token} | 60 days | Every transaction | Card testing, velocity limits |
| IP Address | entity:ip:{ip_hash} | 7 days | Every transaction | Proxy detection, geo-velocity |
| Service | entity:service:{service_id} | 180 days | Every transaction | Service-level fraud rates |
| BIN | entity:bin:{bin_6} | 24 hours (cache) | Daily batch | Issuer intelligence |
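
All non-BIN entity profiles share one write pattern: a partial HSET plus a TTL refresh, so a profile expires only once the entity goes quiet. A minimal sketch of that write path, assuming the redis-py asyncio client (the helper name `touch_entity` is illustrative):

```python
import redis.asyncio as redis

ENTITY_TTL_SECONDS = {
    "user": 90 * 86400,
    "device": 30 * 86400,
    "card": 60 * 86400,
    "ip": 7 * 86400,
    "service": 180 * 86400,
}

async def touch_entity(r: redis.Redis, entity_type: str, entity_id: str, fields: dict) -> None:
    """Upsert profile fields and refresh the TTL so active entities never expire."""
    key = f"entity:{entity_type}:{entity_id}"
    pipe = r.pipeline(transaction=False)
    pipe.hset(key, mapping=fields)                      # partial update; untouched fields persist
    pipe.expire(key, ENTITY_TTL_SECONDS[entity_type])   # sliding TTL from last activity
    await pipe.execute()
```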

1.2 Redis Data Structures

User Entity

```
Key:  entity:user:{user_id}
Type: HASH
TTL:  90 days

Fields:
  created_at: "2024-01-15T10:30:00Z"
  total_transactions_lifetime: "156"
  total_amount_lifetime_usd: "12450.00"
  total_chargebacks_lifetime: "2"
  total_disputes_won: "1"
  total_disputes_lost: "1"
  last_transaction_at: "2025-01-14T15:22:00Z"
  last_chargeback_at: "2024-08-10T00:00:00Z"
  distinct_cards_90d: "3"
  distinct_devices_90d: "2"
  distinct_ips_30d: "5"
  distinct_services_90d: "2"
  refund_count_90d: "4"
  refund_amount_90d_usd: "245.00"
  avg_transaction_amount_90d: "85.50"
  risk_tier: "medium"  # low/medium/high/blocked
  manual_review_count_90d: "1"
  friction_bypass_count_30d: "0"
```

Device Entity

```
Key:  entity:device:{device_fingerprint}
Type: HASH
TTL:  30 days

Fields:
  first_seen_at: "2024-11-20T08:15:00Z"
  last_seen_at: "2025-01-14T15:22:00Z"
  distinct_users_24h: "1"
  distinct_users_7d: "2"
  distinct_cards_24h: "1"
  distinct_cards_7d: "3"
  total_transactions_24h: "2"
  total_transactions_7d: "8"
  total_amount_24h_usd: "150.00"
  total_amount_7d_usd: "680.00"
  decline_count_24h: "0"
  decline_count_7d: "1"
  fraud_confirmed_count: "0"
  is_known_bot: "false"
  is_emulator: "false"
  browser_fingerprint_hash: "abc123..."
  os_type: "iOS"
  device_model: "iPhone 14 Pro"
```

Card Entity

```
Key:  entity:card:{card_token}
Type: HASH
TTL:  60 days

Fields:
  first_seen_at: "2024-06-10T12:00:00Z"
  last_seen_at: "2025-01-14T15:22:00Z"
  bin_6: "424242"
  issuer_country: "US"
  card_type: "credit"
  card_brand: "visa"
  total_transactions_lifetime: "45"
  total_amount_lifetime_usd: "3250.00"
  total_chargebacks: "0"
  distinct_users_lifetime: "1"
  distinct_services_30d: "8"
  distinct_devices_7d: "2"
  distinct_ips_24h: "1"
  attempts_10m: "1"
  attempts_1h: "2"
  attempts_24h: "3"
  decline_count_24h: "0"
  decline_count_7d: "0"
  avg_transaction_amount: "72.22"
  max_transaction_amount: "450.00"
  last_decline_reason: null
  velocity_alert_triggered: "false"
```

IP Entity

```
Key:  entity:ip:{ip_hash}
Type: HASH
TTL:  7 days

Fields:
  first_seen_at: "2025-01-10T09:00:00Z"
  last_seen_at: "2025-01-14T15:22:00Z"
  geo_country: "US"
  geo_region: "California"
  geo_city: "San Francisco"
  geo_lat: "37.7749"
  geo_lon: "-122.4194"
  is_proxy: "false"
  is_vpn: "false"
  is_tor: "false"
  is_datacenter: "false"
  isp_name: "Comcast"
  asn: "AS7922"
  distinct_users_24h: "1"
  distinct_users_7d: "2"
  distinct_cards_24h: "1"
  distinct_cards_7d: "3"
  distinct_devices_24h: "1"
  total_transactions_1h: "1"
  total_transactions_24h: "3"
  total_amount_24h_usd: "250.00"
  decline_count_24h: "0"
  fraud_confirmed_count: "0"
```

Service Entity

```
Key:  entity:service:{service_id}
Type: HASH
TTL:  180 days

Fields:
  service_name: "Mobile Postpaid"
  service_type: "mobile"
  event_subtypes: "sim_activation,topup,device_upgrade,sim_swap"
  country: "US"
  onboarded_at: "2023-01-15T00:00:00Z"
  total_transactions_lifetime: "125000"
  total_volume_lifetime_usd: "8500000.00"
  chargeback_count_lifetime: "342"
  chargeback_rate_90d: "0.0028"  # 0.28%
  fraud_rate_90d: "0.0015"
  friendly_fraud_rate_90d: "0.0012"
  avg_ticket_size: "68.00"
  refund_rate_30d: "0.045"
  dispute_win_rate: "0.62"
  risk_tier: "low"
  velocity_limit_override: null
  custom_rules_enabled: "true"
```

2. Canonical PaymentEvent Schema

2.1 Full Schema Definition

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Optional


@dataclass
class PaymentEvent:
    # Identifiers
    event_id: str                   # Internal unique ID (UUID v7 for time-ordering)
    idempotency_key: str            # SHA256 hash for deduplication
    auth_id: str                    # Transaction authorization ID
    source_system: str              # "stripe", "adyen", "braintree"
    source_event_id: str            # PSP's original event ID

    # Event metadata
    event_type: EventType           # AUTHORIZATION, CAPTURE, REFUND, etc.
    event_timestamp: datetime       # When the event occurred (from the PSP)
    received_at: datetime           # When we received it
    api_version: str                # PSP API version

    # Transaction details
    amount: Decimal                 # Transaction amount
    currency: str                   # ISO 4217 (USD, EUR, etc.)
    amount_usd: Decimal             # Normalized to USD for comparison
    transaction_type: str           # "purchase", "recurring", "refund"
    is_recurring: bool
    recurring_sequence: int         # 1st, 2nd, nth recurring charge

    # Card information (tokenized - NO raw PAN)
    card_token: str                 # Tokenized card reference
    bin_6: str                      # First 6 digits (for BIN lookup)
    bin_8: str                      # First 8 digits (if available)
    last_4: str                     # Last 4 digits (for display)
    card_brand: str                 # visa, mastercard, amex
    card_type: str                  # credit, debit, prepaid
    card_country: str               # Issuing country
    exp_month: int
    exp_year: int

    # Cardholder / User
    user_id: Optional[str]          # Internal user ID (if authenticated)
    email_hash: str                 # SHA256 of email (for linking)
    phone_hash: Optional[str]       # SHA256 of phone

    # Billing address
    billing_address: Address
    billing_country: str
    billing_zip: str

    # Shipping address (if different)
    shipping_address: Optional[Address]
    shipping_country: Optional[str]
    shipping_zip: Optional[str]

    # Device & network
    device_fingerprint: str         # Client-side fingerprint
    ip_address: str                 # Hashed or raw (based on PCI scope)
    user_agent: str
    session_id: str
    device_id: Optional[str]        # Native app device ID

    # Verification results
    avs_result: str                 # Address Verification (Y, N, A, Z, etc.)
    cvv_result: str                 # CVV check (M, N, P, U, etc.)
    three_ds_result: Optional[ThreeDSResult]
    three_ds_version: Optional[str]
    three_ds_eci: Optional[str]

    # Service context
    service_id: str
    service_name: str
    event_subtype: str              # sim_activation, topup, device_upgrade, etc.
    service_country: str
    descriptor: str                 # Statement descriptor

    # External signals (populated if available)
    issuer_risk_score: Optional[float]
    network_risk_score: Optional[float]
    consortium_score: Optional[float]

    # Telco/MSP signals (Phase 2 hooks - nullable for now)
    account_tenure_days: Optional[int]
    subscription_type: Optional[str]
    usage_anomaly_score: Optional[float]
    sim_swap_detected: Optional[bool]


@dataclass
class Address:
    line1: str
    line2: Optional[str]
    city: str
    state: str
    postal_code: str
    country: str


@dataclass
class ThreeDSResult:
    authentication_result: str      # Y, N, A, U, R
    transaction_status: str
    cavv: str
    directory_response: str
    liability_shift: bool


class EventType(Enum):
    AUTHORIZATION = "authorization"
    CAPTURE = "capture"
    VOID = "void"
    REFUND = "refund"
    CHARGEBACK_INITIATED = "chargeback_initiated"
    CHARGEBACK_OUTCOME = "chargeback_outcome"
    ISSUER_ALERT = "issuer_alert"
```
2.2 Event Normalization from PSP Webhooks

```python
import hashlib
from datetime import datetime


class EventNormalizer:
    """Normalize raw PSP webhooks to the canonical PaymentEvent."""

    NORMALIZERS = {
        "stripe": StripeNormalizer(),
        "adyen": AdyenNormalizer(),
        "braintree": BraintreeNormalizer(),
    }

    async def normalize(self, raw_event: dict, source: str) -> PaymentEvent:
        normalizer = self.NORMALIZERS.get(source)
        if not normalizer:
            raise UnsupportedSourceError(f"Unknown source: {source}")

        # 1. Extract and validate required fields
        event = normalizer.extract(raw_event)

        # 2. Compute the idempotency key
        event.idempotency_key = self._compute_idempotency_key(
            source, event.event_type.value, event.source_event_id, event.event_timestamp
        )

        # 3. Generate the internal event ID (UUID v7 for time-ordering; see shim below)
        event.event_id = uuid7()

        # 4. Normalize currency to USD
        event.amount_usd = await self._convert_to_usd(event.amount, event.currency)

        # 5. Enrich with BIN data
        event = await self._enrich_bin_data(event)

        # 6. Validate the schema
        self._validate(event)

        return event

    def _compute_idempotency_key(
        self, source: str, event_type: str, source_id: str, timestamp: datetime
    ) -> str:
        raw = f"{source}:{event_type}:{source_id}:{timestamp.isoformat()}"
        return hashlib.sha256(raw.encode()).hexdigest()
```
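
Note that `uuid7()` is not in CPython's standard `uuid` module as of Python 3.12; it must come from a third-party package such as `uuid6`, or a small local shim. A sketch of such a shim, following the RFC 9562 layout (48-bit millisecond timestamp, then version, variant, and random bits):

```python
import os
import time
import uuid


def uuid7() -> uuid.UUID:
    """UUIDv7: 48-bit Unix-ms timestamp + random bits; sorts by creation time."""
    ts_ms = time.time_ns() // 1_000_000
    rand = int.from_bytes(os.urandom(10), "big")  # 80 random bits
    value = (ts_ms << 80) | rand                  # timestamp in the top 48 bits
    value &= ~(0xF << 76)                         # clear the version nibble
    value |= 0x7 << 76                            # version = 7
    value &= ~(0x3 << 62)                         # clear the variant bits
    value |= 0x2 << 62                            # variant = RFC 4122
    return uuid.UUID(int=value)
```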


```python
from datetime import datetime, timezone
from decimal import Decimal


class StripeNormalizer:
    """Stripe-specific field mappings."""

    def extract(self, raw: dict) -> PaymentEvent:
        charge = raw.get("data", {}).get("object", {})
        card = charge.get("payment_method_details", {}).get("card", {})

        return PaymentEvent(
            source_system="stripe",
            source_event_id=raw["id"],
            event_type=self._map_event_type(raw["type"]),
            event_timestamp=datetime.fromtimestamp(raw["created"], tz=timezone.utc),
            received_at=datetime.utcnow(),
            api_version=raw.get("api_version"),

            auth_id=charge["id"],
            amount=Decimal(charge["amount"]) / 100,  # Stripe amounts are in cents
            currency=charge["currency"].upper(),

            card_token=charge.get("payment_method"),
            # NOTE: Stripe's `fingerprint` is an opaque per-card identifier, NOT the
            # PAN prefix; bin_6 is left empty here and populated by BIN enrichment.
            bin_6="",
            last_4=card.get("last4"),
            card_brand=card.get("brand"),
            card_type=card.get("funding"),  # credit, debit, prepaid
            card_country=card.get("country"),

            # ... continue mapping all fields
        )

    def _map_event_type(self, stripe_type: str) -> EventType:
        mapping = {
            "charge.succeeded": EventType.AUTHORIZATION,
            "charge.captured": EventType.CAPTURE,
            "charge.refunded": EventType.REFUND,
            "charge.dispute.created": EventType.CHARGEBACK_INITIATED,
            "charge.dispute.closed": EventType.CHARGEBACK_OUTCOME,
        }
        return mapping.get(stripe_type, EventType.AUTHORIZATION)
```
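
With the idempotency key computed, webhook replays can be rejected before any downstream work. A minimal dedup sketch, assuming the redis-py asyncio client; `SET NX EX` atomically claims the key and expires the claim after the retry window (the 72h window is illustrative):

```python
import redis.asyncio as redis

DEDUP_WINDOW_SECONDS = 72 * 3600  # PSP webhook retries typically stop within 72h

async def is_duplicate(r: redis.Redis, idempotency_key: str) -> bool:
    """Atomically claim the key; True means another worker already processed it."""
    claimed = await r.set(
        f"dedup:{idempotency_key}", "1",
        nx=True,                   # only set if the key does not exist
        ex=DEDUP_WINDOW_SECONDS,   # auto-expire the claim
    )
    return claimed is None         # SET NX returns None when the key already existed
```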

3. Entity-Centric Feature Definitions

3.1 Complete Feature Catalog

User Features

| Feature Name | Formula / Logic | Window | Storage | Use Case |
|---|---|---|---|---|
| user_transaction_count_24h | COUNT(txns) WHERE user_id = X AND timestamp > now - 24h | 24h sliding | Redis ZSET | Velocity |
| user_transaction_count_7d | COUNT(txns) WHERE user_id = X AND timestamp > now - 7d | 7d sliding | Redis ZSET | Baseline |
| user_total_amount_24h_usd | SUM(amount_usd) WHERE user_id = X AND timestamp > now - 24h | 24h sliding | Redis | Spend velocity |
| user_distinct_cards_30d | COUNT(DISTINCT card_token) WHERE user_id = X | 30d sliding | Redis HyperLogLog | Multi-card abuse |
| user_distinct_services_7d | COUNT(DISTINCT service_id) WHERE user_id = X | 7d sliding | Redis HyperLogLog | Usage pattern |
| user_chargeback_count_lifetime | COUNT(chargebacks) WHERE user_id = X | All time | Redis HASH | Risk scoring |
| user_chargeback_rate_90d | chargebacks_90d / transactions_90d | 90d | Computed | Friendly fraud |
| user_refund_rate_30d | refunds_30d / transactions_30d | 30d | Computed | Abuse pattern |
| user_decline_rate_7d | declines_7d / attempts_7d | 7d | Computed | Card testing |
| user_avg_ticket_90d | AVG(amount_usd) over 90d | 90d | Redis | Anomaly detection |
| user_max_ticket_90d | MAX(amount_usd) over 90d | 90d | Redis | Anomaly detection |
| user_days_since_first_txn | now - first_transaction_date | N/A | Computed | Account age |
| user_days_since_last_chargeback | now - last_chargeback_date | N/A | Computed | Recency |

Device Features

| Feature Name | Formula / Logic | Window | Use Case |
|---|---|---|---|
| device_distinct_cards_1h | COUNT(DISTINCT card_token) | 1h sliding | Card testing |
| device_distinct_cards_24h | COUNT(DISTINCT card_token) | 24h sliding | Multi-card fraud |
| device_distinct_users_24h | COUNT(DISTINCT user_id) | 24h sliding | Account sharing/takeover |
| device_transaction_count_1h | COUNT(txns) | 1h sliding | Velocity attack |
| device_transaction_count_24h | COUNT(txns) | 24h sliding | Sustained attack |
| device_decline_count_1h | COUNT(declined txns) | 1h sliding | Card testing |
| device_decline_rate_24h | declines / attempts | 24h | Testing pattern |
| device_total_amount_24h_usd | SUM(amount_usd) | 24h sliding | Fraud exposure |
| device_is_emulator | Boolean from fingerprint analysis | N/A | Bot detection |
| device_is_known_fraudulent | Lookup in blocklist | N/A | Blocklist |
| device_age_hours | now - first_seen_at | N/A | New device risk |
| device_fraud_history_count | COUNT(confirmed fraud txns) | All time | Historical risk |

Card Features

| Feature Name | Formula / Logic | Window | Use Case |
|---|---|---|---|
| card_attempts_10m | COUNT(auth attempts) | 10m sliding | Rapid testing |
| card_attempts_1h | COUNT(auth attempts) | 1h sliding | Velocity |
| card_attempts_24h | COUNT(auth attempts) | 24h sliding | Sustained attack |
| card_distinct_devices_1h | COUNT(DISTINCT device_fp) | 1h sliding | Distributed attack |
| card_distinct_devices_24h | COUNT(DISTINCT device_fp) | 24h sliding | Multi-device use |
| card_distinct_ips_1h | COUNT(DISTINCT ip_hash) | 1h sliding | IP hopping |
| card_distinct_services_24h | COUNT(DISTINCT service_id) | 24h sliding | Cross-service testing |
| card_decline_count_1h | COUNT(declined) | 1h sliding | Testing signal |
| card_decline_count_24h | COUNT(declined) | 24h sliding | Card status |
| card_total_amount_24h_usd | SUM(amount_usd) | 24h sliding | Spend velocity |
| card_avg_amount_30d | AVG(amount_usd) | 30d | Baseline |
| card_amount_zscore | (current_amount - avg_30d) / stddev_30d | N/A | Anomaly |
| card_chargeback_count | COUNT(chargebacks) | All time | Card risk |
| card_days_since_first_seen | now - first_seen_at | N/A | Card age |
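
card_amount_zscore requires a running mean and standard deviation rather than a replay of history on every event. A sketch using Welford's online algorithm, with the running moments kept in the card's entity hash (field names `amount_n`, `amount_mean`, `amount_m2` are illustrative; assumes a client with `decode_responses=True`). This variant tracks lifetime moments; a strict 30-day version needs decayed moments or the batch job in 3.3:

```python
async def update_amount_stats(r, card_token: str, amount_usd: float) -> float:
    """Welford update of running mean/variance; returns this amount's z-score."""
    key = f"entity:card:{card_token}"
    stats = await r.hgetall(key)
    n = int(stats.get("amount_n", 0))
    mean = float(stats.get("amount_mean", 0.0))
    m2 = float(stats.get("amount_m2", 0.0))  # sum of squared deviations from the mean

    n += 1
    delta = amount_usd - mean
    mean += delta / n
    m2 += delta * (amount_usd - mean)

    stddev = (m2 / (n - 1)) ** 0.5 if n > 1 else 0.0
    zscore = (amount_usd - mean) / stddev if stddev > 0 else 0.0

    await r.hset(key, mapping={"amount_n": n, "amount_mean": mean, "amount_m2": m2})
    return zscore
```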

IP Features

| Feature Name | Formula / Logic | Window | Use Case |
|---|---|---|---|
| ip_distinct_cards_1h | COUNT(DISTINCT card_token) | 1h sliding | BIN attack |
| ip_distinct_cards_24h | COUNT(DISTINCT card_token) | 24h sliding | Multi-card fraud |
| ip_distinct_users_1h | COUNT(DISTINCT user_id) | 1h sliding | ATO cluster |
| ip_distinct_users_24h | COUNT(DISTINCT user_id) | 24h sliding | Shared IP |
| ip_transaction_count_1h | COUNT(txns) | 1h sliding | Velocity |
| ip_decline_rate_1h | declines / attempts | 1h | Testing pattern |
| ip_is_proxy | Boolean from IP intelligence | N/A | Anonymization |
| ip_is_vpn | Boolean from IP intelligence | N/A | Anonymization |
| ip_is_tor | Boolean from Tor exit node list | N/A | High risk |
| ip_is_datacenter | Boolean from ASN classification | N/A | Bot/scraper |
| ip_fraud_history_count | COUNT(confirmed fraud from IP) | 90d | Historical risk |
| ip_geo_distance_from_billing_km | Haversine(ip_geo, billing_geo) | N/A | Geo mismatch |
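
ip_geo_distance_from_billing_km is a direct Haversine computation between the IP geolocation and the geocoded billing address; a self-contained sketch:

```python
import math

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two (lat, lon) points, in kilometers."""
    r = 6371.0  # mean Earth radius, km
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp = math.radians(lat2 - lat1)
    dl = math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))

# e.g. San Francisco IP vs. New York billing address -> roughly 4,130 km
print(haversine_km(37.7749, -122.4194, 40.7128, -74.0060))
```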

Service Features

| Feature Name | Formula / Logic | Window | Use Case |
|---|---|---|---|
| service_chargeback_rate_90d | chargebacks / transactions | 90d | Service risk |
| service_fraud_rate_90d | criminal_fraud / transactions | 90d | Attack target |
| service_friendly_fraud_rate_90d | friendly_fraud / transactions | 90d | Abuse magnet |
| service_avg_ticket | AVG(amount_usd) | 90d | Baseline |
| service_refund_rate_30d | refunds / transactions | 30d | Ops issues |
| service_dispute_win_rate | won / total disputes | All time | Evidence quality |
| service_risk_tier | Categorical (low/medium/high) | N/A | Policy routing |
3.2 Streaming Velocity Feature Computation

```python
from datetime import timedelta


class VelocityFeatureJob:
    """Flink job for maintaining sliding-window velocity features."""

    async def process(self, event: PaymentEvent):
        current_time = event.event_timestamp

        # ===== CARD VELOCITY FEATURES =====
        card_key = f"velocity:card:{event.card_token}"

        # Use a Redis sorted set with the event timestamp as score:
        # ZADD records the attempt, ZREMRANGEBYSCORE evicts expired entries
        await self.redis.zadd(
            f"{card_key}:attempts",
            {event.event_id: current_time.timestamp()}
        )

        # Clean up entries older than the widest window (24h)
        cutoff_24h = (current_time - timedelta(hours=24)).timestamp()
        await self.redis.zremrangebyscore(f"{card_key}:attempts", "-inf", cutoff_24h)

        # Compute counts for the narrower windows
        cutoff_10m = (current_time - timedelta(minutes=10)).timestamp()
        cutoff_1h = (current_time - timedelta(hours=1)).timestamp()

        card_attempts_10m = await self.redis.zcount(
            f"{card_key}:attempts", cutoff_10m, "+inf"
        )
        card_attempts_1h = await self.redis.zcount(
            f"{card_key}:attempts", cutoff_1h, "+inf"
        )
        card_attempts_24h = await self.redis.zcount(
            f"{card_key}:attempts", cutoff_24h, "+inf"
        )

        # Update the entity hash with the computed values
        await self.redis.hset(f"entity:card:{event.card_token}", mapping={
            "attempts_10m": card_attempts_10m,
            "attempts_1h": card_attempts_1h,
            "attempts_24h": card_attempts_24h,
            "last_seen_at": current_time.isoformat(),
        })

        # ===== DISTINCT COUNTS (HyperLogLog) =====
        # Device distinct cards
        device_key = f"hll:device:{event.device_fingerprint}:cards"
        await self.redis.pfadd(f"{device_key}:1h", event.card_token)
        await self.redis.pfadd(f"{device_key}:24h", event.card_token)

        # Set expiry on the HLL keys (an approximation of a sliding window;
        # see the time-bucketed variant below for an exact one)
        await self.redis.expire(f"{device_key}:1h", 3600)
        await self.redis.expire(f"{device_key}:24h", 86400)

        # Get distinct counts
        device_distinct_cards_1h = await self.redis.pfcount(f"{device_key}:1h")
        device_distinct_cards_24h = await self.redis.pfcount(f"{device_key}:24h")

        await self.redis.hset(f"entity:device:{event.device_fingerprint}", mapping={
            "distinct_cards_1h": device_distinct_cards_1h,
            "distinct_cards_24h": device_distinct_cards_24h,
        })

        # ===== AMOUNT AGGREGATIONS =====
        # Track a running sum in Redis
        await self.redis.hincrbyfloat(
            f"entity:card:{event.card_token}",
            "total_amount_24h_usd",
            float(event.amount_usd)
        )

        # ===== DECLINE TRACKING =====
        # `decision` is attached once the policy engine has run (decision
        # events are replayed through this job)
        if event.decision == "DECLINE":
            await self.redis.hincrby(
                f"entity:card:{event.card_token}",
                "decline_count_24h",
                1
            )
            await self.redis.hincrby(
                f"entity:device:{event.device_fingerprint}",
                "decline_count_24h",
                1
            )

    async def compute_rates(self, entity_type: str, entity_id: str) -> dict:
        """Compute rate-based features from counts."""
        key = f"entity:{entity_type}:{entity_id}"
        data = await self.redis.hgetall(key)

        attempts_24h = int(data.get("attempts_24h", 0))
        declines_24h = int(data.get("decline_count_24h", 0))

        decline_rate_24h = declines_24h / attempts_24h if attempts_24h > 0 else 0.0

        return {
            "decline_rate_24h": decline_rate_24h,
            "attempts_24h": attempts_24h,
            "declines_24h": declines_24h,
        }
```
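
One caveat in the job above: PFADD plus a refreshed EXPIRE is not a true sliding window. As long as a device stays active, the "1h" HLL never resets, so it over-counts. The usual fix is time-bucketed HLLs merged at read time (PFCOUNT over several keys returns the cardinality of their union). A sketch with 5-minute buckets (key layout illustrative):

```python
from datetime import datetime

BUCKET_SECONDS = 300  # 5-minute buckets; 12 buckets approximate a 1h sliding window

async def add_and_count_distinct(r, base_key: str, member: str, now: datetime) -> int:
    """Record `member` in the current bucket, then merge the last 12 buckets."""
    bucket = int(now.timestamp()) // BUCKET_SECONDS
    current = f"{base_key}:{bucket}"

    await r.pfadd(current, member)
    await r.expire(current, BUCKET_SECONDS * 13)  # keep one spare bucket of history

    # PFCOUNT over multiple keys returns the cardinality of their union
    window_keys = [f"{base_key}:{bucket - i}" for i in range(12)]
    return await r.pfcount(*window_keys)
```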

3.3 Batch Feature Backfill (for Historical Analysis)

```sql
-- Daily batch job to compute 90-day features (PostgreSQL dialect).
-- Runs after midnight; updates the offline feature store.

CREATE TABLE daily_user_features AS
SELECT
    t.user_id,
    CURRENT_DATE AS feature_date,

    -- Transaction counts
    COUNT(*) AS txn_count_90d,
    COUNT(CASE WHEN t.txn_date > now() - INTERVAL '30 days' THEN 1 END) AS txn_count_30d,
    COUNT(CASE WHEN t.txn_date > now() - INTERVAL '7 days' THEN 1 END) AS txn_count_7d,

    -- Amount aggregations
    SUM(t.amount_usd) AS total_amount_90d,
    AVG(t.amount_usd) AS avg_amount_90d,
    STDDEV(t.amount_usd) AS stddev_amount_90d,
    MAX(t.amount_usd) AS max_amount_90d,

    -- Distinct counts
    COUNT(DISTINCT t.card_token) AS distinct_cards_90d,
    COUNT(DISTINCT t.device_fingerprint) AS distinct_devices_90d,
    COUNT(DISTINCT t.service_id) AS distinct_services_90d,

    -- Chargeback metrics (joined from the chargebacks table)
    COALESCE(cb.chargeback_count_90d, 0) AS chargeback_count_90d,
    COALESCE(cb.chargeback_amount_90d, 0) AS chargeback_amount_90d,
    COALESCE(cb.chargeback_count_90d, 0)::numeric
        / NULLIF(COUNT(*), 0) AS chargeback_rate_90d,

    -- Refund metrics
    COUNT(CASE WHEN t.event_type = 'REFUND' THEN 1 END) AS refund_count_90d,
    SUM(CASE WHEN t.event_type = 'REFUND' THEN t.amount_usd ELSE 0 END) AS refund_amount_90d,

    -- Account age
    CURRENT_DATE - MIN(t.txn_date)::date AS account_age_days

FROM transactions t
LEFT JOIN (
    SELECT
        user_id,
        COUNT(*) AS chargeback_count_90d,
        SUM(amount) AS chargeback_amount_90d
    FROM chargebacks
    WHERE chargeback_date > now() - INTERVAL '90 days'
    GROUP BY user_id
) cb ON t.user_id = cb.user_id

WHERE t.txn_date > now() - INTERVAL '90 days'
GROUP BY t.user_id, cb.chargeback_count_90d, cb.chargeback_amount_90d;
```
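
The offline table only affects real-time decisions once it is pushed to the online store. A minimal loader sketch, assuming an asyncpg connection and the entity key pattern from 1.1 (the field subset shown is illustrative):

```python
async def publish_user_features(pg, r) -> None:
    """Copy today's daily_user_features rows into the online entity hashes."""
    rows = await pg.fetch(
        "SELECT * FROM daily_user_features WHERE feature_date = CURRENT_DATE"
    )
    pipe = r.pipeline(transaction=False)
    for row in rows:
        key = f"entity:user:{row['user_id']}"
        pipe.hset(key, mapping={
            "avg_transaction_amount_90d": str(row["avg_amount_90d"]),
            "distinct_cards_90d": str(row["distinct_cards_90d"]),
            "chargeback_rate_90d": str(row["chargeback_rate_90d"]),
        })
        pipe.expire(key, 90 * 86400)  # refresh the user-entity TTL
    await pipe.execute()
```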

4. BIN / Issuer Intelligence Integration

4.1 BIN Data Schema

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class BINData:
    bin_6: str                          # First 6 digits
    bin_8: Optional[str]                # First 8 digits (extended BIN)

    # Card attributes
    card_brand: str                     # visa, mastercard, amex, discover
    card_type: str                      # credit, debit, prepaid
    card_category: str                  # classic, gold, platinum, business, corporate
    card_product: str                   # Specific product name

    # Issuer information
    issuer_name: str                    # "Chase", "Bank of America"
    issuer_country: str                 # ISO 3166-1 alpha-2
    issuer_country_numeric: str         # ISO 3166-1 numeric
    issuer_phone: Optional[str]
    issuer_url: Optional[str]

    # Risk indicators
    is_regulated: bool                  # Durbin-regulated (affects interchange)
    is_reloadable: bool                 # Prepaid reloadable
    is_commercial: bool                 # Business/corporate card
    is_level2: bool                     # Supports Level 2 data
    is_level3: bool                     # Supports Level 3 data

    # Fraud intelligence
    bin_fraud_rate_global: float        # Global fraud rate for this BIN
    bin_chargeback_rate: float          # Chargeback rate
    issuer_risk_tier: str               # low, medium, high
    issuer_dispute_friendliness: float  # 0-1: how often the issuer sides with the service provider

    # Metadata
    last_updated: datetime
    data_source: str                    # "binlist", "mastercard", "visa"
```

4.2 BIN Data Integration

```python
from cachetools import TTLCache


class BINEnrichmentService:
    """Enrich transactions with BIN intelligence."""

    def __init__(self, redis_client):
        self.redis = redis_client

        # In-memory cache for hot BINs
        self.local_cache = TTLCache(maxsize=100_000, ttl=3600)

        # Key prefix for the distributed Redis cache
        self.redis_prefix = "bin:data:"

        # Refreshed daily from data providers
        self.data_sources = [
            BINListProvider(),    # Open source
            MastercardBINDB(),    # Licensed
            VisaBINTable(),       # Licensed
        ]

    async def get_bin_data(self, bin_6: str) -> BINData:
        # 1. Check the local cache
        if bin_6 in self.local_cache:
            return self.local_cache[bin_6]

        # 2. Check Redis
        cached = await self.redis.get(f"{self.redis_prefix}{bin_6}")
        if cached:
            data = BINData.from_json(cached)
            self.local_cache[bin_6] = data
            return data

        # 3. Look up from the primary source (should be pre-loaded)
        data = await self.lookup_bin(bin_6)

        # 4. Cache the result
        await self.redis.setex(
            f"{self.redis_prefix}{bin_6}",
            86400,  # 24-hour TTL
            data.to_json()
        )
        self.local_cache[bin_6] = data

        return data

    def enrich_event(self, event: PaymentEvent, bin_data: BINData) -> PaymentEvent:
        """Add BIN-derived features to the event (set as dynamic enrichment attributes)."""
        event.bin_card_type = bin_data.card_type
        event.bin_card_category = bin_data.card_category
        event.bin_issuer_country = bin_data.issuer_country
        event.bin_issuer_name = bin_data.issuer_name
        event.bin_is_prepaid = bin_data.card_type == "prepaid"
        event.bin_is_commercial = bin_data.is_commercial
        event.bin_fraud_rate = bin_data.bin_fraud_rate_global
        event.bin_issuer_risk_tier = bin_data.issuer_risk_tier

        # Derived features
        event.is_cross_border = (
            bin_data.issuer_country != event.service_country
        )
        event.issuer_billing_country_match = (
            bin_data.issuer_country == event.billing_country
        )

        return event
```
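
This service is what the normalizer's `_enrich_bin_data` step (Section 2.2) delegates to; a sketch of the glue, assuming the normalizer holds a BINEnrichmentService instance as `self.bin_service`:

```python
async def _enrich_bin_data(self, event: PaymentEvent) -> PaymentEvent:
    """Step 5 of EventNormalizer.normalize: attach BIN intelligence."""
    if not event.bin_6:
        return event  # PSPs that expose no PAN prefix skip BIN enrichment
    bin_data = await self.bin_service.get_bin_data(event.bin_6)
    return self.bin_service.enrich_event(event, bin_data)
```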

4.3 Daily BIN Data Refresh

```python
async def refresh_bin_database():
    """Daily batch job to update BIN intelligence."""
    total_updated = 0

    # 1. Download the latest BIN files from each provider
    for provider in data_sources:
        bin_updates = await provider.download_latest()

        for bin_record in bin_updates:
            # 2. Merge with existing data
            existing = await db.get_bin(bin_record.bin_6)
            merged = merge_bin_data(existing, bin_record) if existing else bin_record

            # 3. Update the database
            await db.upsert_bin(merged)

            # 4. Invalidate the cache
            await redis.delete(f"bin:data:{bin_record.bin_6}")

        total_updated += len(bin_updates)

    # 5. Compute aggregate statistics
    await compute_bin_fraud_rates()
    await compute_issuer_risk_tiers()

    logger.info(f"BIN database refreshed: {total_updated} records updated")
```

5. PII & Compliance Boundaries

5.1 Data Classification

| Data Element | Classification | Storage Location | Encryption | Retention |
|---|---|---|---|---|
| Raw PAN | PCI-DSS Scope | NEVER in fraud platform | N/A | N/A |
| Card Token | PCI Token | Redis, PostgreSQL | At-rest | 7 years |
| BIN (6-8 digits) | Non-PCI | All systems | At-rest | Indefinite |
| Last 4 | Non-PCI | All systems | At-rest | 7 years |
| Email | PII | Hashed only | SHA256 | 7 years |
| Phone | PII | Hashed only | SHA256 | 7 years |
| IP Address | PII | Raw in secure zone, hashed elsewhere | Varies | 90 days raw, 7 years hashed |
| Device Fingerprint | Pseudonymous | All systems | At-rest | 90 days |
| Billing Address | PII | Evidence vault only | At-rest + field-level | 7 years |
| User ID | Internal ID | All systems | None | Account lifetime |

5.2 Service-Level Access Matrix

| Service | Card Token | BIN | IP (Raw) | IP (Hashed) | Email Hash | Billing Address |
|---|---|---|---|---|---|---|
| Ingestion API | Read | Read | Read | Write | Read | Read |
| Flink Streaming | Read | Read | No | Read | Read | No |
| Feature Store | Read | Read | No | Read | Read | No |
| Model Service | Read | Read | No | Read | Read | No |
| Policy Engine | Read | Read | No | Read | Read | No |
| Evidence Vault | Read | Read | Read | Read | Read | Read |
| Analytics/BI | No | Read | No | Read | Read | No |
| Training Pipeline | Read | Read | No | Read | Read | No |

5.3 Tokenization Flow

```
┌─────────────────────────────────────────────────────────────────────┐
│                      PCI TOKENIZATION BOUNDARY                      │
└─────────────────────────────────────────────────────────────────────┘

            PCI SCOPE                  │         NON-PCI SCOPE
                                       │
┌─────────────┐    ┌─────────────┐     │     ┌─────────────────┐
│  Customer   │───▶│  Payment    │     │     │ Fraud Platform  │
│  Browser    │    │  Gateway    │     │     │                 │
└─────────────┘    │  (Stripe)   │     │     │ - Sees token    │
                   │             │     │     │ - Sees BIN      │
                   │  Raw PAN    │     │     │ - Never raw PAN │
                   │  lives here │     │     │                 │
                   └──────┬──────┘     │     └────────▲────────┘
                          │            │              │
                          │ Webhook with              │
                          │ token + BIN               │
                          ▼            │              │
                   ┌─────────────┐     │              │
                   │ PSP Token   │─────┼──────────────┘
                   │ Service     │     │
                   │             │     │
                   │ pm_xxx123   │     │
                   │ BIN: 424242 │     │
                   └─────────────┘     │
```

5.4 Log Masking Rules

```python
import copy
import hashlib

import pandas as pd


class PIIMasker:
    """Mask PII in logs and analytics exports."""

    def __init__(self):
        # Built per-instance so rules can reference bound methods like _mask_ip
        self.masking_rules = {
            "card_token": lambda x: f"{x[:8]}...{x[-4:]}" if len(x) > 12 else "***",
            "email": lambda x: hashlib.sha256(x.lower().encode()).hexdigest()[:16],
            "phone": lambda x: hashlib.sha256(x.encode()).hexdigest()[:16],
            "ip_address": self._mask_ip,
            "billing_address.line1": lambda x: "***REDACTED***",
            "billing_address.line2": lambda x: "***REDACTED***",
            "device_fingerprint": lambda x: x[:16] + "..." if len(x) > 16 else x,
        }

    @staticmethod
    def _mask_ip(ip: str) -> str:
        """Mask the last octet of IPv4, the last 80 bits of IPv6."""
        if ":" in ip:  # IPv6
            parts = ip.split(":")
            return ":".join(parts[:3]) + "::XXXX"
        parts = ip.split(".")  # IPv4
        return ".".join(parts[:3]) + ".XXX"

    def mask_for_logging(self, event: dict) -> dict:
        """Apply masking rules for application logs."""
        masked = copy.deepcopy(event)

        for field, mask_fn in self.masking_rules.items():
            if "." in field:
                # Nested field: walk to the parent dict, then mask the leaf
                parts = field.split(".")
                obj = masked
                for part in parts[:-1]:
                    obj = obj.get(part, {})
                if isinstance(obj, dict) and parts[-1] in obj:
                    obj[parts[-1]] = mask_fn(obj[parts[-1]])
            elif field in masked:
                masked[field] = mask_fn(masked[field])

        return masked

    def mask_for_analytics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply masking for analytics/BI exports."""
        masked_df = df.copy()

        # Remove raw PII columns entirely
        drop_columns = ["ip_address", "billing_address", "shipping_address"]
        masked_df = masked_df.drop(columns=drop_columns, errors="ignore")

        # Hash email if present
        if "email" in masked_df.columns:
            masked_df["email_hash"] = masked_df["email"].apply(
                lambda x: hashlib.sha256(x.lower().encode()).hexdigest()
            )
            masked_df = masked_df.drop(columns=["email"])

        return masked_df
```
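
Usage at the logging boundary is then a one-liner; a quick illustration with a hypothetical event dict:

```python
masker = PIIMasker()

event = {
    "card_token": "pm_1Abc2Def3Ghi4Jkl",
    "ip_address": "203.0.113.42",
    "billing_address": {"line1": "123 Main St", "city": "San Francisco"},
}
print(masker.mask_for_logging(event))
# {'card_token': 'pm_1Abc2...4Jkl', 'ip_address': '203.0.113.XXX',
#  'billing_address': {'line1': '***REDACTED***', 'city': 'San Francisco'}}
```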

6. Dispute Network Integration

6.1 Alert Sources

| Source | Format | Frequency | Latency | Content |
|---|---|---|---|---|
| TC40 (Visa) | ISO 8583 / CSV | Daily batch | 1-7 days | Confirmed fraud reports |
| SAFE (Mastercard) | CSV / API | Daily batch | 1-7 days | Confirmed fraud reports |
| Ethoca / Verifi | Real-time API | Real-time | <1 hour | Consumer dispute alerts |
| PSP Webhooks | JSON | Real-time | <1 minute | Dispute lifecycle events |
| RDR (Visa) | API | Real-time | <1 hour | Rapid Dispute Resolution |
| Mastercard Collaboration | API | Real-time | <1 hour | Dispute pre-notification |

6.2 Alert Ingestion Pipeline

```python
class DisputeAlertIngester:
    """Ingest and normalize dispute alerts from multiple sources."""

    async def process_tc40_file(self, file_path: str):
        """Process the daily TC40 fraud report from Visa."""

        records = parse_tc40_file(file_path)

        for record in records:
            # 1. Map to an internal transaction
            auth_id = await self.lookup_transaction(
                card_token=self.tokenize_pan(record["pan"]),
                amount=record["transaction_amount"],
                date=record["transaction_date"],
                service_id=record["acquirer_id"]
            )

            if not auth_id:
                logger.warning(f"TC40 alert for unknown transaction: {record['case_id']}")
                await self.store_unmatched_alert(record)
                continue

            # 2. Create an issuer alert event
            alert = IssuerAlert(
                alert_id=f"tc40_{record['case_id']}",
                alert_type="TC40",
                source="visa",
                auth_id=auth_id,
                card_token=self.tokenize_pan(record["pan"]),
                fraud_amount=record["fraud_amount"],
                reason_code=record["fraud_type"],
                alert_date=record["report_date"],
                received_at=datetime.utcnow()
            )

            # 3. Publish to Kafka for downstream processing
            await self.kafka.produce("issuer-alerts", alert.to_json())

            # 4. Update entity risk profiles (device is resolved from the auth record)
            await self.update_card_fraud_flag(alert.card_token)
            await self.update_device_fraud_flag(auth_id)

    async def process_ethoca_alert(self, webhook: dict):
        """Process a real-time Ethoca consumer dispute alert.

        Ethoca alerts arrive before the formal chargeback, giving the service
        provider an opportunity to refund proactively.
        """

        auth_id = await self.lookup_transaction(
            card_token=webhook["card_token"],
            amount=webhook["amount"],
            date=webhook["transaction_date"]
        )

        alert = DisputePreNotification(
            alert_id=webhook["alert_id"],
            source="ethoca",
            auth_id=auth_id,
            dispute_type=webhook["dispute_type"],            # fraud, service, etc.
            alert_date=datetime.utcnow(),
            response_deadline=webhook["response_deadline"],
            suggested_action=webhook["suggested_action"]     # refund, accept, fight
        )

        # Publish for automated response or manual review
        await self.kafka.produce("dispute-prenotifications", alert.to_json())

        # If the auto-refund policy applies, trigger the refund
        if await self.should_auto_refund(auth_id, alert):
            await self.trigger_refund(auth_id, reason="ethoca_deflection")
```
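
`should_auto_refund` belongs to the policy surface covered in Part 3. As a placeholder, a hedged sketch with illustrative thresholds, assuming the ingester can fetch the original transaction via `self.db`:

```python
from datetime import datetime

AUTO_REFUND_MAX_USD = 100  # illustrative: below the typical cost of fighting a dispute


async def should_auto_refund(self, auth_id, alert) -> bool:
    """Method of DisputeAlertIngester: refund proactively when fighting is uneconomical."""
    if auth_id is None:
        return False  # unmatched alerts route to manual review instead
    txn = await self.db.get_transaction(auth_id)
    return (
        txn.amount_usd <= AUTO_REFUND_MAX_USD
        and alert.dispute_type == "fraud"              # fraud-coded alerts rarely win
        and datetime.utcnow() < alert.response_deadline
    )
```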

6.3 Chargeback-to-Transaction Linking

```python
from typing import Optional


async def link_chargeback_to_transaction(chargeback: ChargebackEvent) -> Optional[str]:
    """Link an incoming chargeback to its original authorization."""

    # Method 1: Direct ID match (if the PSP provides it)
    if chargeback.original_auth_id:
        return chargeback.original_auth_id

    # Method 2: ARN (Acquirer Reference Number) lookup
    if chargeback.arn:
        auth_id = await db.query("""
            SELECT auth_id FROM transactions
            WHERE arn = :arn
        """, arn=chargeback.arn)
        if auth_id:
            return auth_id

    # Method 3: Fuzzy match on card + amount + date
    candidates = await db.query("""
        SELECT auth_id, amount, transaction_date, service_id
        FROM transactions
        WHERE card_token = :card_token
          AND amount BETWEEN :amount * 0.99 AND :amount * 1.01
          AND transaction_date BETWEEN :date - INTERVAL '7 days' AND :date + INTERVAL '1 day'
        ORDER BY ABS(EXTRACT(EPOCH FROM (transaction_date - :date)))
        LIMIT 5
    """,
        card_token=chargeback.card_token,
        amount=chargeback.amount,
        date=chargeback.original_transaction_date
    )

    if len(candidates) == 1:
        return candidates[0]["auth_id"]

    if len(candidates) > 1:
        # Multiple candidates - route to manual review
        await create_manual_linking_task(chargeback, candidates)
        return None

    # No match found
    logger.error(f"Could not link chargeback {chargeback.chargeback_id}")
    await store_unlinked_chargeback(chargeback)
    return None
```

Next: Part 3: Detection Logic & Policy Engine