AI Security & Privacy Policies 2025: Comprehensive Protection Framework
Protect AI systems from adversarial attacks and data breaches using automated security policies. Covers privacy-preserving ML, threat detection, data protection, and secure AI deployment patterns.
📋 Prerequisites
- Understanding of AI/ML security threats and attack vectors.
- Experience with data privacy regulations (GDPR, CCPA, PIPEDA).
- Familiarity with cryptography and privacy-preserving techniques.
- Knowledge of secure software development and infrastructure security.
🎯 What You'll Learn
- How to implement automated security policies for AI/ML systems protection.
- Techniques for adversarial attack detection and defense in production environments.
- How to build privacy-preserving ML pipelines with differential privacy and secure computation.
- Patterns for data protection, anonymization, and secure AI model deployment.
- Strategies for compliance automation across privacy regulations and security frameworks.
💡 From Reactive Security to Proactive AI Protection
Traditional AI security relies on perimeter defenses and post-incident response. Modern AI security uses proactive threat detection, privacy-by-design principles, and automated security policies that protect against adversarial attacks, data breaches, and privacy violations throughout the ML lifecycle.
AI Security & Privacy Architecture: Defense-in-Depth Framework
Modern AI security requires layered protection covering data, model, and infrastructure security with automated threat detection and response.
1️⃣ Adversarial Attack Detection
Real-time detection and mitigation of adversarial inputs designed to fool or manipulate AI models.
2️⃣ Privacy-Preserving ML
Implement differential privacy, federated learning, and secure computation to protect sensitive data.
3️⃣ Data Protection Automation
Automated data classification, anonymization, and access control enforcement across ML pipelines.
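Each of these layers can be driven from a declarative policy document that automation tooling enforces. The sketch below is one possible shape for such a policy; the field names and thresholds are illustrative assumptions rather than a standard schema.

# Illustrative policy document -- adapt field names and thresholds to your own stack
AI_SECURITY_POLICY = {
    "adversarial_defense": {
        "enabled": True,
        "detectors": ["statistical", "ensemble", "behavior_monitoring"],
        "block_threshold": 0.9,        # block requests scoring above this
        "transform_threshold": 0.6,    # apply defensive input transformations above this
    },
    "privacy_preserving_ml": {
        "differential_privacy": {"epsilon": 1.0, "delta": 1e-5},
        "federated_learning": {"secure_aggregation": True},
    },
    "data_protection": {
        "classification_required": True,
        "anonymization_methods": ["tokenization", "masking", "generalization"],
        "access_control": "least_privilege",
    },
}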
How to Implement Adversarial Attack Detection and Defense
Build comprehensive adversarial defense systems that detect and mitigate attacks in real-time production environments.
Example: Multi-Layer Adversarial Defense System
This implementation provides real-time adversarial attack detection with automated defense mechanisms.
🛡️ Python: Comprehensive Adversarial Defense Framework
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum
import logging
from datetime import datetime
import cv2
from sklearn.ensemble import IsolationForest
from scipy import stats
import hashlib
class AttackType(Enum):
FGSM = "fgsm"
PGD = "pgd"
C_W = "carlini_wagner"
DEEPFOOL = "deepfool"
BACKDOOR = "backdoor"
DATA_POISONING = "data_poisoning"
MODEL_EXTRACTION = "model_extraction"
MEMBERSHIP_INFERENCE = "membership_inference"
@dataclass
class ThreatDetectionResult:
is_adversarial: bool
confidence: float
attack_type: Optional[AttackType]
threat_level: str # low, medium, high, critical
detection_methods: List[str]
recommended_action: str
metadata: Dict[str, Any]
class AdversarialDefenseSystem:
def __init__(self, model: nn.Module, config: Dict[str, Any]):
self.model = model
self.config = config
self.logger = logging.getLogger(__name__)
# Initialize defense components
self.statistical_detector = StatisticalAnomalyDetector()
self.input_transformer = InputTransformation()
self.ensemble_detector = EnsembleAdversarialDetector()
self.model_monitor = ModelBehaviorMonitor(model)
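        # NOTE: ModelBehaviorMonitor and several helpers used below (_load_defense_policies,
        # _aggregate_threat_assessment, _apply_defense_measures, _log_detection_event) are
        # assumed to be implemented elsewhere; they are not shown in this excerpt.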
# Attack patterns database
self.known_attack_patterns = {}
self.detection_history = []
# Defense policies
self.defense_policies = self._load_defense_policies()
def detect_and_defend(self, input_data: torch.Tensor,
metadata: Optional[Dict] = None) -> ThreatDetectionResult:
"""
Comprehensive adversarial detection and defense pipeline
"""
detection_start = datetime.now()
# Stage 1: Pre-processing detection
preprocess_result = self._preprocess_detection(input_data, metadata)
# Stage 2: Statistical anomaly detection
statistical_result = self.statistical_detector.detect_anomaly(input_data)
# Stage 3: Ensemble adversarial detection
ensemble_result = self.ensemble_detector.detect_adversarial(input_data)
# Stage 4: Model behavior monitoring
behavior_result = self.model_monitor.analyze_prediction_behavior(input_data)
# Stage 5: Aggregate results and determine threat level
threat_result = self._aggregate_threat_assessment(
preprocess_result, statistical_result, ensemble_result, behavior_result
)
# Stage 6: Apply defense measures if threat detected
if threat_result.is_adversarial:
defended_input = self._apply_defense_measures(input_data, threat_result)
threat_result.metadata['defended_input'] = defended_input
# Log detection event
self._log_detection_event(threat_result, detection_start)
return threat_result
def _preprocess_detection(self, input_data: torch.Tensor,
metadata: Optional[Dict]) -> Dict[str, Any]:
"""First line of defense - preprocessing-based detection"""
# Check for suspicious metadata patterns
metadata_anomalies = []
if metadata:
# Check for unusual request patterns
if metadata.get('request_frequency', 0) > self.config['max_request_frequency']:
metadata_anomalies.append('high_frequency_requests')
# Check for suspicious user agents or sources
if metadata.get('user_agent') in self.config.get('blocked_user_agents', []):
metadata_anomalies.append('blocked_user_agent')
# Input format validation
format_anomalies = []
if len(input_data.shape) != len(self.config['expected_input_shape']):
format_anomalies.append('unexpected_input_dimensions')
# Pixel value distribution analysis (for image inputs)
if len(input_data.shape) >= 3: # Likely image data
pixel_stats = self._analyze_pixel_distribution(input_data)
if pixel_stats['unusual_distribution']:
format_anomalies.append('unusual_pixel_distribution')
return {
'metadata_anomalies': metadata_anomalies,
'format_anomalies': format_anomalies,
'suspicious_score': len(metadata_anomalies + format_anomalies) / 10.0
}
def _analyze_pixel_distribution(self, image_tensor: torch.Tensor) -> Dict[str, Any]:
"""Analyze pixel value distribution for anomalies"""
# Convert to numpy for analysis
image_np = image_tensor.detach().cpu().numpy()
# Flatten image
pixels = image_np.flatten()
# Statistical tests
mean_val = np.mean(pixels)
std_val = np.std(pixels)
skewness = stats.skew(pixels)
kurtosis = stats.kurtosis(pixels)
# Check for unusual distributions
unusual_distribution = False
# Extremely high or low variance might indicate adversarial perturbations
if std_val > 0.8 or std_val < 0.01:
unusual_distribution = True
# Check for unusual skewness (natural images typically have some skewness)
if abs(skewness) > 5.0:
unusual_distribution = True
# Check for extreme kurtosis
if abs(kurtosis) > 10.0:
unusual_distribution = True
return {
'mean': mean_val,
'std': std_val,
'skewness': skewness,
'kurtosis': kurtosis,
'unusual_distribution': unusual_distribution
}
class StatisticalAnomalyDetector:
def __init__(self):
self.isolation_forest = IsolationForest(
contamination=0.1,
random_state=42,
n_estimators=100
)
self.baseline_stats = None
self.is_trained = False
def train_baseline(self, clean_data: torch.Tensor):
"""Train detector on clean data to establish baseline"""
# Extract features for statistical analysis
features = self._extract_statistical_features(clean_data)
# Train isolation forest
self.isolation_forest.fit(features)
# Compute baseline statistics
self.baseline_stats = {
'mean': torch.mean(clean_data, dim=0),
'std': torch.std(clean_data, dim=0),
'quantiles': torch.quantile(clean_data, torch.tensor([0.01, 0.05, 0.95, 0.99]), dim=0)
}
self.is_trained = True
def detect_anomaly(self, input_data: torch.Tensor) -> Dict[str, Any]:
"""Detect statistical anomalies in input data"""
if not self.is_trained:
return {'anomaly_score': 0.0, 'is_anomaly': False}
# Extract features
features = self._extract_statistical_features(input_data.unsqueeze(0))
# Isolation forest prediction
anomaly_score = self.isolation_forest.decision_function(features)[0]
is_anomaly = self.isolation_forest.predict(features)[0] == -1
# Statistical deviation analysis
deviation_score = self._compute_statistical_deviation(input_data)
# Combine scores
combined_score = (abs(anomaly_score) + deviation_score) / 2.0
return {
'anomaly_score': combined_score,
'is_anomaly': is_anomaly or combined_score > 0.7,
'isolation_score': anomaly_score,
'deviation_score': deviation_score
}
def _extract_statistical_features(self, data: torch.Tensor) -> np.ndarray:
"""Extract statistical features for anomaly detection"""
features = []
# Basic statistics
features.extend([
torch.mean(data).item(),
torch.std(data).item(),
torch.min(data).item(),
torch.max(data).item()
])
# Distribution properties
data_flat = data.flatten()
features.extend([
torch.median(data_flat).item(),
torch.quantile(data_flat, 0.25).item(),
torch.quantile(data_flat, 0.75).item()
])
# Higher-order moments
mean_val = torch.mean(data_flat)
features.extend([
torch.mean((data_flat - mean_val) ** 3).item(), # Skewness proxy
torch.mean((data_flat - mean_val) ** 4).item() # Kurtosis proxy
])
return np.array(features).reshape(1, -1)
def _compute_statistical_deviation(self, input_data: torch.Tensor) -> float:
"""Compute deviation from baseline statistics"""
if self.baseline_stats is None:
return 0.0
# Compute deviations
mean_dev = torch.mean(torch.abs(input_data - self.baseline_stats['mean']))
std_dev = torch.abs(torch.std(input_data) - torch.mean(self.baseline_stats['std']))
# Quantile violations
q01, q05, q95, q99 = self.baseline_stats['quantiles']
quantile_violations = (
torch.sum(input_data < q01) + torch.sum(input_data > q99)
).float() / input_data.numel()
# Combine deviations
total_deviation = (mean_dev + std_dev + quantile_violations) / 3.0
return total_deviation.item()
class EnsembleAdversarialDetector:
def __init__(self):
self.detectors = {
'gradient_based': GradientBasedDetector(),
'feature_squeezing': FeatureSqueezingDetector(),
'local_intrinsic_dimensionality': LIDDetector(),
'mahalanobis': MahalanobisDetector()
}
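        # NOTE: the detector classes above (GradientBasedDetector, FeatureSqueezingDetector,
        # LIDDetector, MahalanobisDetector) are assumed to wrap standard detection methods from
        # the adversarial-ML literature; their implementations are not shown in this excerpt.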
self.weights = {
'gradient_based': 0.3,
'feature_squeezing': 0.2,
'local_intrinsic_dimensionality': 0.25,
'mahalanobis': 0.25
}
def detect_adversarial(self, input_data: torch.Tensor) -> Dict[str, Any]:
"""Ensemble adversarial detection using multiple methods"""
detector_results = {}
scores = []
for name, detector in self.detectors.items():
try:
result = detector.detect(input_data)
detector_results[name] = result
scores.append(result['score'] * self.weights[name])
except Exception as e:
logging.warning(f"Detector {name} failed: {e}")
detector_results[name] = {'score': 0.0, 'confidence': 0.0}
# Ensemble decision
ensemble_score = sum(scores)
ensemble_confidence = np.mean([r.get('confidence', 0.0) for r in detector_results.values()])
is_adversarial = ensemble_score > 0.6
return {
'ensemble_score': ensemble_score,
'ensemble_confidence': ensemble_confidence,
'is_adversarial': is_adversarial,
'detector_results': detector_results,
'primary_detection_method': max(detector_results.keys(),
key=lambda k: detector_results[k]['score'])
}
class InputTransformation:
"""Apply defensive transformations to potentially adversarial inputs"""
def __init__(self):
self.transformations = {
'gaussian_noise': self._add_gaussian_noise,
'median_filter': self._apply_median_filter,
'jpeg_compression': self._apply_jpeg_compression,
'bit_depth_reduction': self._reduce_bit_depth,
'random_rotation': self._apply_random_rotation
}
def apply_defensive_transformation(self, input_data: torch.Tensor,
transformation_type: str = 'adaptive') -> torch.Tensor:
"""Apply defensive transformations to mitigate adversarial effects"""
if transformation_type == 'adaptive':
# Apply multiple transformations and select best based on output stability
return self._adaptive_transformation(input_data)
elif transformation_type in self.transformations:
return self.transformations[transformation_type](input_data)
else:
return input_data
def _adaptive_transformation(self, input_data: torch.Tensor) -> torch.Tensor:
"""Adaptively select and apply transformations"""
transformed_versions = []
# Apply each transformation
for name, transform_func in self.transformations.items():
try:
transformed = transform_func(input_data.clone())
transformed_versions.append((name, transformed))
except Exception as e:
logging.warning(f"Transformation {name} failed: {e}")
# Simple strategy: return median-filtered version (often effective against adversarial examples)
for name, transformed in transformed_versions:
if name == 'median_filter':
return transformed
# Fallback to first successful transformation
return transformed_versions[0][1] if transformed_versions else input_data
def _add_gaussian_noise(self, input_data: torch.Tensor, std: float = 0.01) -> torch.Tensor:
"""Add Gaussian noise to input"""
noise = torch.randn_like(input_data) * std
return torch.clamp(input_data + noise, 0, 1)
def _apply_median_filter(self, input_data: torch.Tensor, kernel_size: int = 3) -> torch.Tensor:
"""Apply median filter (effective against many adversarial attacks)"""
if len(input_data.shape) == 4: # Batch of images
filtered = torch.zeros_like(input_data)
for i in range(input_data.shape[0]):
for c in range(input_data.shape[1]):
# Convert to numpy for OpenCV
img = input_data[i, c].detach().cpu().numpy()
img_uint8 = (img * 255).astype(np.uint8)
filtered_img = cv2.medianBlur(img_uint8, kernel_size)
filtered[i, c] = torch.from_numpy(filtered_img.astype(np.float32) / 255.0)
return filtered
return input_data
def _apply_jpeg_compression(self, input_data: torch.Tensor, quality: int = 75) -> torch.Tensor:
"""Apply JPEG compression (destroys high-frequency adversarial perturbations)"""
# Simplified JPEG compression simulation
# In practice, you'd use actual JPEG encoding/decoding
# Apply DCT-like low-pass filtering
if len(input_data.shape) >= 3:
compressed = F.avg_pool2d(input_data, kernel_size=2, stride=1, padding=1)
compressed = F.interpolate(compressed, size=input_data.shape[-2:], mode='bilinear')
return compressed
return input_data
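    # Minimal sketches for the two remaining transformations registered in __init__, added so
    # the class is self-contained; parameters are illustrative and should be tuned per domain.
    def _reduce_bit_depth(self, input_data: torch.Tensor, bits: int = 5) -> torch.Tensor:
        """Quantize pixel values to a lower bit depth (removes fine-grained perturbations)"""
        levels = 2 ** bits - 1
        return torch.round(torch.clamp(input_data, 0, 1) * levels) / levels
    def _apply_random_rotation(self, input_data: torch.Tensor, max_angle_deg: float = 5.0) -> torch.Tensor:
        """Apply a small random rotation via an affine grid (expects a 4D batch of images)"""
        if len(input_data.shape) != 4:
            return input_data
        angle = np.radians(np.random.uniform(-max_angle_deg, max_angle_deg))
        cos_a, sin_a = float(np.cos(angle)), float(np.sin(angle))
        theta = torch.tensor([[cos_a, -sin_a, 0.0], [sin_a, cos_a, 0.0]], dtype=input_data.dtype)
        theta = theta.unsqueeze(0).repeat(input_data.shape[0], 1, 1)
        grid = F.affine_grid(theta, list(input_data.shape), align_corners=False)
        return F.grid_sample(input_data, grid, align_corners=False)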
# Example usage and integration
class SecureAIModelWrapper:
"""Wrapper that adds security protections to any PyTorch model"""
def __init__(self, model: nn.Module, security_config: Dict[str, Any]):
self.model = model
self.defense_system = AdversarialDefenseSystem(model, security_config)
self.input_transformer = InputTransformation()
# Security policies
self.security_policies = security_config.get('security_policies', {})
self.threat_threshold = security_config.get('threat_threshold', 0.7)
# Audit logging
self.audit_logger = logging.getLogger('ai_security_audit')
def secure_predict(self, input_data: torch.Tensor,
metadata: Optional[Dict] = None) -> Dict[str, Any]:
"""Make predictions with comprehensive security protection"""
prediction_start = datetime.now()
# Step 1: Threat detection and assessment
threat_result = self.defense_system.detect_and_defend(input_data, metadata)
# Step 2: Apply security policies based on threat level
if threat_result.is_adversarial:
if threat_result.threat_level == 'critical':
# Block request entirely
self._log_security_event('request_blocked', threat_result, metadata)
return {
'status': 'blocked',
'reason': 'Critical security threat detected',
'threat_details': threat_result
}
elif threat_result.threat_level in ['high', 'medium']:
# Apply defensive transformations
input_data = self.input_transformer.apply_defensive_transformation(
input_data, 'adaptive'
)
self._log_security_event('input_transformed', threat_result, metadata)
# Step 3: Make prediction with protected input
with torch.no_grad():
prediction = self.model(input_data)
# Step 4: Post-prediction security checks
prediction_confidence = F.softmax(prediction, dim=-1).max().item()
# Check for model extraction attempts (repeated low-confidence predictions)
if prediction_confidence < 0.3:
self._check_model_extraction_attempt(metadata)  # helper assumed to be implemented elsewhere (not shown)
# Step 5: Prepare secure response
response = {
'status': 'success',
'prediction': prediction.detach().cpu(),
'confidence': prediction_confidence,
'security_metadata': {
'threat_detected': threat_result.is_adversarial,
'threat_level': threat_result.threat_level,
'defenses_applied': threat_result.recommended_action if threat_result.is_adversarial else 'none',
'processing_time': (datetime.now() - prediction_start).total_seconds()
}
}
# Log successful prediction
self._log_security_event('prediction_completed', threat_result, metadata, response)
return response
def _log_security_event(self, event_type: str, threat_result: ThreatDetectionResult,
metadata: Optional[Dict], response: Optional[Dict] = None):
"""Log security events for audit and monitoring"""
security_event = {
'timestamp': datetime.now().isoformat(),
'event_type': event_type,
'threat_detected': threat_result.is_adversarial,
'threat_level': threat_result.threat_level,
'attack_type': threat_result.attack_type.value if threat_result.attack_type else None,
'detection_confidence': threat_result.confidence,
'detection_methods': threat_result.detection_methods,
'recommended_action': threat_result.recommended_action,
'client_metadata': metadata,
'response_status': response.get('status') if response else None
}
self.audit_logger.info(f"AI_SECURITY_EVENT: {security_event}")
# Example deployment configuration
def create_secure_model_deployment():
"""Example of deploying a model with comprehensive security"""
# Load your model
model = torch.load('your_model.pth')
# Security configuration
security_config = {
'expected_input_shape': (3, 224, 224),
'max_request_frequency': 100, # requests per minute
'blocked_user_agents': ['suspicious_agent'],
'threat_threshold': 0.7,
'security_policies': {
'enable_input_transformation': True,
'enable_rate_limiting': True,
'enable_audit_logging': True,
'block_critical_threats': True
}
}
# Create secure wrapper
secure_model = SecureAIModelWrapper(model, security_config)
return secure_model
if __name__ == "__main__":
# Example usage
secure_model = create_secure_model_deployment()
# Test with sample input
sample_input = torch.randn(1, 3, 224, 224)
metadata = {'user_id': 'test_user', 'request_id': 'req_123'}
result = secure_model.secure_predict(sample_input, metadata)
print(f"Prediction result: {result['status']}")
print(f"Security status: {result['security_metadata']}")Example: Real-time Security Monitoring Dashboard
Implement comprehensive security monitoring with automated threat response and compliance tracking.
📊 Python: AI Security Monitoring Platform
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
import asyncio
import redis
from datetime import datetime, timedelta
import json
from collections import defaultdict, deque
import numpy as np
app = FastAPI(title="AI Security Monitoring Platform", version="1.0.0")
class SecurityAlert(BaseModel):
alert_id: str
model_id: str
alert_type: str # adversarial_attack, data_breach, privacy_violation, etc.
severity: str # low, medium, high, critical
description: str
metadata: Dict[str, Any]
timestamp: datetime = Field(default_factory=datetime.now)
status: str = "active" # active, investigating, resolved
class AISecurityMonitor:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
self.active_alerts = {}
self.security_metrics = defaultdict(lambda: defaultdict(int))
self.threat_patterns = {}
# Real-time monitoring windows
self.monitoring_windows = {
'1min': deque(maxlen=60),
'5min': deque(maxlen=300),
'1hour': deque(maxlen=3600),
'24hour': deque(maxlen=86400)
}
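        # NOTE: the automated-response and notification helpers used below (_isolate_model,
        # _apply_rate_limiting, _enable_anti_extraction_measures, _get_response_actions,
        # _notify_security_team) are assumed to be implemented against your own infrastructure
        # and alerting stack; they are not shown in this excerpt.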
async def process_security_event(self, event: Dict[str, Any]) -> Optional[SecurityAlert]:
"""Process incoming security events and generate alerts if needed"""
event_type = event.get('event_type')
model_id = event.get('model_id', 'unknown')
timestamp = datetime.fromisoformat(event.get('timestamp', datetime.now().isoformat()))
# Add event to monitoring windows
self._add_to_monitoring_windows(event)
# Update security metrics
self._update_security_metrics(event)
# Analyze for threat patterns
alert = await self._analyze_threat_patterns(event)
if alert:
# Store alert
self.active_alerts[alert.alert_id] = alert
# Trigger automated response
await self._trigger_automated_response(alert)
# Notify security team
await self._notify_security_team(alert)
return alert
def _add_to_monitoring_windows(self, event: Dict[str, Any]):
"""Add event to time-based monitoring windows"""
for window in self.monitoring_windows.values():
window.append(event)
def _update_security_metrics(self, event: Dict[str, Any]):
"""Update real-time security metrics"""
model_id = event.get('model_id', 'unknown')
event_type = event.get('event_type')
# Update counters
self.security_metrics[model_id]['total_events'] += 1
self.security_metrics[model_id][event_type] += 1
# Update threat-specific metrics
if event.get('threat_detected'):
self.security_metrics[model_id]['threats_detected'] += 1
threat_level = event.get('threat_level', 'unknown')
self.security_metrics[model_id][f'threats_{threat_level}'] += 1
async def _analyze_threat_patterns(self, event: Dict[str, Any]) -> Optional[SecurityAlert]:
"""Analyze event patterns to detect sophisticated attacks"""
model_id = event.get('model_id')
# Pattern 1: High frequency adversarial attempts
recent_threats = [
e for e in self.monitoring_windows['5min']
if e.get('model_id') == model_id and e.get('threat_detected')
]
if len(recent_threats) > 10: # More than 10 threats in 5 minutes
return SecurityAlert(
alert_id=f"alert_{datetime.now().timestamp()}",
model_id=model_id,
alert_type="high_frequency_attack",
severity="high",
description=f"High frequency adversarial attacks detected: {len(recent_threats)} threats in 5 minutes",
metadata={"threat_count": len(recent_threats), "window": "5min"}
)
# Pattern 2: Model extraction attempt detection
recent_low_confidence = [
e for e in self.monitoring_windows['1hour']
if (e.get('model_id') == model_id and
e.get('prediction_confidence', 1.0) < 0.3)
]
if len(recent_low_confidence) > 100: # Many low-confidence predictions
return SecurityAlert(
alert_id=f"alert_{datetime.now().timestamp()}",
model_id=model_id,
alert_type="model_extraction_attempt",
severity="medium",
description="Potential model extraction attack detected",
metadata={"low_confidence_count": len(recent_low_confidence)}
)
# Pattern 3: Data poisoning indicators
if event.get('event_type') == 'training_data_anomaly':
anomaly_score = event.get('anomaly_score', 0)
if anomaly_score > 0.8:
return SecurityAlert(
alert_id=f"alert_{datetime.now().timestamp()}",
model_id=model_id,
alert_type="data_poisoning_attempt",
severity="critical",
description="Potential data poisoning detected in training pipeline",
metadata={"anomaly_score": anomaly_score}
)
# Pattern 4: Privacy violation detection
if event.get('event_type') == 'privacy_check':
pii_detected = event.get('pii_detected', False)
consent_violation = event.get('consent_violation', False)
if pii_detected or consent_violation:
return SecurityAlert(
alert_id=f"alert_{datetime.now().timestamp()}",
model_id=model_id,
alert_type="privacy_violation",
severity="high",
description="Privacy violation detected",
metadata={
"pii_detected": pii_detected,
"consent_violation": consent_violation
}
)
return None
async def _trigger_automated_response(self, alert: SecurityAlert):
"""Trigger automated security responses based on alert type and severity"""
if alert.severity == "critical":
# Immediate model isolation
await self._isolate_model(alert.model_id)
elif alert.alert_type == "high_frequency_attack":
# Rate limiting and IP blocking
await self._apply_rate_limiting(alert.model_id)
elif alert.alert_type == "model_extraction_attempt":
# Add noise to responses and implement query limiting
await self._enable_anti_extraction_measures(alert.model_id)
# Log automated response
response_log = {
"alert_id": alert.alert_id,
"automated_response": True,
"actions_taken": await self._get_response_actions(alert),
"timestamp": datetime.now().isoformat()
}
self.redis_client.lpush("security_responses", json.dumps(response_log))
async def get_security_dashboard_data(self, time_window: str = "1hour") -> Dict[str, any]:
"""Generate security dashboard data"""
if time_window not in self.monitoring_windows:
time_window = "1hour"
events = list(self.monitoring_windows[time_window])
# Calculate metrics
total_events = len(events)
threat_events = [e for e in events if e.get('threat_detected')]
blocked_requests = [e for e in events if e.get('status') == 'blocked']
# Threat breakdown by type
threat_types = defaultdict(int)
for event in threat_events:
attack_type = event.get('attack_type', 'unknown')
threat_types[attack_type] += 1
# Model-wise security status
model_security_status = {}
for model_id in set(e.get('model_id') for e in events if e.get('model_id')):
model_events = [e for e in events if e.get('model_id') == model_id]
model_threats = [e for e in model_events if e.get('threat_detected')]
model_security_status[model_id] = {
"total_requests": len(model_events),
"threats_detected": len(model_threats),
"threat_rate": len(model_threats) / max(len(model_events), 1),
"last_threat": max([e.get('timestamp') for e in model_threats], default=None),
"security_score": self._calculate_security_score(model_events)
}
# Active alerts summary
active_alerts_summary = {
"total": len(self.active_alerts),
"critical": len([a for a in self.active_alerts.values() if a.severity == "critical"]),
"high": len([a for a in self.active_alerts.values() if a.severity == "high"]),
"medium": len([a for a in self.active_alerts.values() if a.severity == "medium"])
}
return {
"time_window": time_window,
"summary": {
"total_events": total_events,
"threats_detected": len(threat_events),
"requests_blocked": len(blocked_requests),
"threat_rate": len(threat_events) / max(total_events, 1),
"avg_response_time": np.mean([e.get('processing_time', 0) for e in events])
},
"threat_breakdown": dict(threat_types),
"model_security_status": model_security_status,
"active_alerts": active_alerts_summary,
"top_threats": sorted(
threat_types.items(),
key=lambda x: x[1],
reverse=True
)[:5]
}
def _calculate_security_score(self, events: List[Dict]) -> float:
"""Calculate security score for a model (0-100)"""
if not events:
return 100.0
threats = [e for e in events if e.get('threat_detected')]
blocked = [e for e in events if e.get('status') == 'blocked']
# Base score
threat_rate = len(threats) / len(events)
block_rate = len(blocked) / len(events)
# Security score calculation
score = 100 * (1 - threat_rate - block_rate * 0.5)
# Adjust for severity
critical_threats = [e for e in threats if e.get('threat_level') == 'critical']
if critical_threats:
score -= len(critical_threats) * 10
return max(0.0, min(100.0, score))
# FastAPI endpoints
security_monitor = AISecurityMonitor()
@app.post("/api/v1/security/events")
async def report_security_event(event: Dict[str, Any], background_tasks: BackgroundTasks):
"""Report a security event for processing"""
alert = await security_monitor.process_security_event(event)
if alert:
background_tasks.add_task(handle_security_alert, alert)
return {"status": "alert_generated", "alert_id": alert.alert_id}
return {"status": "event_processed"}
@app.get("/api/v1/security/dashboard")
async def get_security_dashboard(time_window: str = "1hour"):
"""Get security dashboard data"""
dashboard_data = await security_monitor.get_security_dashboard_data(time_window)
return dashboard_data
@app.get("/api/v1/security/alerts")
async def get_active_alerts():
"""Get active security alerts"""
return {
"alerts": list(security_monitor.active_alerts.values()),
"count": len(security_monitor.active_alerts)
}
@app.post("/api/v1/security/alerts/{alert_id}/resolve")
async def resolve_alert(alert_id: str, resolution_notes: str):
"""Resolve a security alert"""
if alert_id in security_monitor.active_alerts:
alert = security_monitor.active_alerts[alert_id]
alert.status = "resolved"
alert.metadata["resolution_notes"] = resolution_notes
alert.metadata["resolved_at"] = datetime.now().isoformat()
return {"status": "resolved"}
raise HTTPException(status_code=404, detail="Alert not found")
async def handle_security_alert(alert: SecurityAlert):
"""Background task to handle security alerts"""
# This would integrate with your incident response system
print(f"Handling security alert: {alert.alert_type} - {alert.severity}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8002)
Privacy-Preserving Machine Learning: Differential Privacy and Secure Computation
Implement privacy-preserving techniques that enable AI training and inference while protecting sensitive data.
Differential Privacy
Add calibrated noise to training data and model outputs to provide mathematically rigorous privacy guarantees while maintaining model utility.
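As a minimal worked example of the calibration, the Laplace mechanism draws noise with scale b = sensitivity / epsilon, so a smaller epsilon (stronger privacy) means more noise. The sketch below uses an assumed counting query and illustrative parameter values; a fuller framework follows later in this section.

import numpy as np

def laplace_count(true_count: int, sensitivity: float = 1.0, epsilon: float = 0.5) -> float:
    """Release a count with epsilon-differential privacy via the Laplace mechanism."""
    scale = sensitivity / epsilon  # noise scale b = sensitivity / epsilon
    return true_count + np.random.laplace(0.0, scale)

# A count over 10,000 records released with epsilon = 0.5
print(f"DP count: {laplace_count(10_000):.1f}")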
Federated Learning Security
Implement secure aggregation protocols and privacy-preserving techniques for distributed model training across multiple data holders.
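A toy sketch of the pairwise-masking idea behind secure aggregation is shown below: each pair of clients derives a shared mask that cancels when the server sums all updates, so individual updates stay hidden. The client IDs and seed-derivation scheme are illustrative assumptions; production systems use vetted protocols with proper key agreement and dropout handling.

import numpy as np

def masked_update(update, client_id, peers, dim):
    """Add pairwise masks that cancel out when the server sums all clients' updates."""
    masked = update.copy()
    for peer in peers:
        # Both members of a pair derive the same mask from a shared seed
        # (assumed to be pre-agreed, e.g. via a key-exchange step -- not shown here)
        seed = hash((min(client_id, peer), max(client_id, peer))) % (2 ** 32)
        mask = np.random.default_rng(seed).normal(size=dim)
        masked += mask if client_id < peer else -mask
    return masked

# Three clients: the server only ever sees masked updates, yet their sum equals the true sum
dim, clients = 4, [0, 1, 2]
updates = {c: np.random.rand(dim) for c in clients}
masked = [masked_update(updates[c], c, [p for p in clients if p != c], dim) for c in clients]
assert np.allclose(sum(masked), sum(updates.values()))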
Homomorphic Encryption
Enable computation on encrypted data for scenarios requiring maximum privacy protection, such as healthcare and financial services.
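The sketch below illustrates additive homomorphism with a toy Paillier cryptosystem: multiplying two ciphertexts adds the underlying plaintexts without ever decrypting them. The parameters are deliberately tiny and insecure; real deployments should use an audited library and production-grade key sizes.

import math
import random

# Toy Paillier cryptosystem -- tiny, insecure parameters chosen purely for illustration
p, q = 293, 433
n, n2 = p * q, (p * q) ** 2
g = n + 1
lam = math.lcm(p - 1, q - 1)                     # requires Python 3.9+
L = lambda x: (x - 1) // n
mu = pow(L(pow(g, lam, n2)), -1, n)              # modular inverse (Python 3.8+)

def encrypt(m: int) -> int:
    r = random.randrange(1, n)
    while math.gcd(r, n) != 1:
        r = random.randrange(1, n)
    return (pow(g, m, n2) * pow(r, n, n2)) % n2

def decrypt(c: int) -> int:
    return (L(pow(c, lam, n2)) * mu) % n

# Additive homomorphism: multiplying ciphertexts adds the underlying plaintexts
a, b = 42, 17
assert decrypt((encrypt(a) * encrypt(b)) % n2) == a + b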
Example: Differential Privacy Implementation
🔐 Python: Production Differential Privacy Framework
import numpy as np
import torch
import torch.nn as nn
from typing import Dict, List, Tuple, Optional, Callable
from dataclasses import dataclass
import math
from abc import ABC, abstractmethod
import logging
@dataclass
class PrivacyBudget:
epsilon: float # Privacy parameter (lower = more private)
delta: float # Failure probability (typically very small)
spent_epsilon: float = 0.0
max_queries: Optional[int] = None
query_count: int = 0
class DifferentialPrivacyMechanism(ABC):
"""Abstract base class for DP mechanisms"""
@abstractmethod
def add_noise(self, value: float, sensitivity: float, privacy_budget: PrivacyBudget) -> float:
pass
@abstractmethod
def get_privacy_cost(self, sensitivity: float) -> float:
pass
class LaplaceMechanism(DifferentialPrivacyMechanism):
"""Laplace mechanism for differential privacy"""
def add_noise(self, value: float, sensitivity: float, privacy_budget: PrivacyBudget) -> float:
"""Add Laplace noise for epsilon-differential privacy"""
scale = sensitivity / privacy_budget.epsilon
noise = np.random.laplace(0, scale)
return value + noise
def get_privacy_cost(self, sensitivity: float) -> float:
"""Calculate privacy cost for this query"""
return sensitivity # Simplified - actual cost depends on epsilon allocation
class GaussianMechanism(DifferentialPrivacyMechanism):
"""Gaussian mechanism for (epsilon, delta)-differential privacy"""
def add_noise(self, value: float, sensitivity: float, privacy_budget: PrivacyBudget) -> float:
"""Add Gaussian noise for (epsilon, delta)-differential privacy"""
if privacy_budget.delta <= 0:
raise ValueError("Gaussian mechanism requires delta > 0")
# Calculate sigma based on epsilon and delta
sigma = self._calculate_sigma(sensitivity, privacy_budget.epsilon, privacy_budget.delta)
noise = np.random.normal(0, sigma)
return value + noise
def _calculate_sigma(self, sensitivity: float, epsilon: float, delta: float) -> float:
"""Calculate appropriate sigma for Gaussian noise"""
# Simplified calculation - in practice, use more sophisticated methods
return sensitivity * math.sqrt(2 * math.log(1.25 / delta)) / epsilon
def get_privacy_cost(self, sensitivity: float) -> float:
return sensitivity
class PrivacyAccountant:
"""Track and manage privacy budget consumption"""
def __init__(self, initial_budget: PrivacyBudget):
self.budget = initial_budget
self.query_log = []
self.logger = logging.getLogger(__name__)
def can_answer_query(self, required_epsilon: float) -> bool:
"""Check if there's enough privacy budget for a query"""
return (self.budget.spent_epsilon + required_epsilon <= self.budget.epsilon and
(self.budget.max_queries is None or
self.budget.query_count < self.budget.max_queries))
def consume_budget(self, epsilon_cost: float, query_info: Dict) -> bool:
"""Consume privacy budget for a query"""
if not self.can_answer_query(epsilon_cost):
self.logger.warning(f"Insufficient privacy budget. Required: {epsilon_cost}, "
f"Available: {self.budget.epsilon - self.budget.spent_epsilon}")
return False
self.budget.spent_epsilon += epsilon_cost
self.budget.query_count += 1
# Log query for audit
query_record = {
'timestamp': np.datetime64('now'),
'epsilon_cost': epsilon_cost,
'total_spent': self.budget.spent_epsilon,
'query_info': query_info
}
self.query_log.append(query_record)
return True
def get_remaining_budget(self) -> float:
"""Get remaining privacy budget"""
return self.budget.epsilon - self.budget.spent_epsilon
class PrivateDataAnalyzer:
"""Differentially private data analysis operations"""
def __init__(self, privacy_budget: PrivacyBudget,
mechanism: DifferentialPrivacyMechanism = None):
self.accountant = PrivacyAccountant(privacy_budget)
self.mechanism = mechanism or LaplaceMechanism()
self.logger = logging.getLogger(__name__)
def private_count(self, data: np.ndarray, condition: Callable = None) -> Optional[float]:
"""Compute differentially private count"""
sensitivity = 1.0 # Adding/removing one record changes count by at most 1
if not self.accountant.can_answer_query(sensitivity):
return None
# Compute true count
if condition is None:
true_count = len(data)
else:
true_count = np.sum([condition(x) for x in data])
# Add noise
noisy_count = self.mechanism.add_noise(
true_count, sensitivity, self.accountant.budget
)
# Consume budget
self.accountant.consume_budget(sensitivity, {'operation': 'count'})
return max(0, noisy_count) # Counts can't be negative
def private_mean(self, data: np.ndarray, value_range: Tuple[float, float]) -> Optional[float]:
"""Compute differentially private mean"""
min_val, max_val = value_range
sensitivity = (max_val - min_val) / len(data) # Sensitivity for mean
if not self.accountant.can_answer_query(sensitivity):
return None
# Compute true mean
true_mean = np.mean(data)
# Add noise
noisy_mean = self.mechanism.add_noise(
true_mean, sensitivity, self.accountant.budget
)
# Consume budget
self.accountant.consume_budget(sensitivity, {'operation': 'mean'})
return np.clip(noisy_mean, min_val, max_val)
def private_histogram(self, data: np.ndarray, bins: int) -> Optional[np.ndarray]:
"""Compute differentially private histogram"""
sensitivity = 1.0 # Adding/removing one record changes one bin by 1
if not self.accountant.can_answer_query(sensitivity * bins):
return None
# Compute true histogram
hist, _ = np.histogram(data, bins=bins)
# Add noise to each bin
noisy_hist = np.array([
self.mechanism.add_noise(count, sensitivity, self.accountant.budget)
for count in hist
])
# Consume budget (sequential composition over all bins; because bins are disjoint,
# parallel composition would charge epsilon only once -- this accounting is deliberately conservative)
self.accountant.consume_budget(
sensitivity * bins,
{'operation': 'histogram', 'bins': bins}
)
return np.maximum(0, noisy_hist) # Counts can't be negative
class DifferentiallyPrivateMLTraining:
"""Differentially private machine learning training"""
def __init__(self, privacy_budget: PrivacyBudget,
noise_multiplier: float = 1.0,
max_grad_norm: float = 1.0):
self.privacy_budget = privacy_budget
self.noise_multiplier = noise_multiplier
self.max_grad_norm = max_grad_norm
self.accountant = PrivacyAccountant(privacy_budget)
def private_training_step(self, model: nn.Module,
batch_data: torch.Tensor,
batch_labels: torch.Tensor,
loss_fn: nn.Module) -> bool:
"""Perform one step of differentially private training"""
batch_size = batch_data.shape[0]
# Check privacy budget
privacy_cost = self._calculate_step_privacy_cost(batch_size)
if not self.accountant.can_answer_query(privacy_cost):
return False
# Compute per-example gradients
per_example_grads = self._compute_per_example_gradients(
model, batch_data, batch_labels, loss_fn
)
# Clip gradients
clipped_grads = self._clip_gradients(per_example_grads)
# Add noise to gradients
noisy_grads = self._add_gradient_noise(clipped_grads, batch_size)
# Apply noisy gradients
self._apply_gradients(model, noisy_grads)
# Consume privacy budget
self.accountant.consume_budget(
privacy_cost,
{'operation': 'training_step', 'batch_size': batch_size}
)
return True
def _compute_per_example_gradients(self, model: nn.Module,
batch_data: torch.Tensor,
batch_labels: torch.Tensor,
loss_fn: nn.Module) -> List[Dict[str, torch.Tensor]]:
"""Compute gradients for each example in the batch"""
per_example_grads = []
for i in range(batch_data.shape[0]):
# Forward pass for single example
model.zero_grad()
output = model(batch_data[i:i+1])
loss = loss_fn(output, batch_labels[i:i+1])
# Backward pass
loss.backward()
# Collect gradients
example_grads = {}
for name, param in model.named_parameters():
if param.grad is not None:
example_grads[name] = param.grad.clone()
per_example_grads.append(example_grads)
return per_example_grads
def _clip_gradients(self, per_example_grads: List[Dict[str, torch.Tensor]]) -> List[Dict[str, torch.Tensor]]:
"""Clip per-example gradients to bound sensitivity"""
clipped_grads = []
for example_grads in per_example_grads:
# Compute L2 norm of gradients
grad_norm = 0.0
for grad in example_grads.values():
grad_norm += torch.sum(grad ** 2)
grad_norm = torch.sqrt(grad_norm)
# Clip if necessary
clip_factor = min(1.0, self.max_grad_norm / (grad_norm + 1e-8))
clipped_example_grads = {}
for name, grad in example_grads.items():
clipped_example_grads[name] = grad * clip_factor
clipped_grads.append(clipped_example_grads)
return clipped_grads
def _add_gradient_noise(self, clipped_grads: List[Dict[str, torch.Tensor]],
batch_size: int) -> Dict[str, torch.Tensor]:
"""Add noise to aggregated gradients"""
# Aggregate clipped gradients
aggregated_grads = {}
for name in clipped_grads[0].keys():
aggregated_grads[name] = torch.stack([
grads[name] for grads in clipped_grads
]).sum(dim=0)
# Add noise
noise_scale = self.noise_multiplier * self.max_grad_norm / batch_size
noisy_grads = {}
for name, grad in aggregated_grads.items():
noise = torch.randn_like(grad) * noise_scale
noisy_grads[name] = grad + noise
return noisy_grads
def _apply_gradients(self, model: nn.Module,
noisy_grads: Dict[str, torch.Tensor]):
"""Apply noisy gradients to model parameters"""
with torch.no_grad():
for name, param in model.named_parameters():
if name in noisy_grads:
param.grad = noisy_grads[name]
def _calculate_step_privacy_cost(self, batch_size: int) -> float:
"""Calculate privacy cost for one training step"""
# This is a simplified calculation - use more sophisticated
# privacy accounting in practice (e.g., Renyi DP, moments accountant)
return self.noise_multiplier * math.sqrt(batch_size) / (batch_size * self.privacy_budget.epsilon)
# Example usage
def create_private_ml_pipeline():
"""Example of creating a differentially private ML pipeline"""
# Set up privacy budget
privacy_budget = PrivacyBudget(
epsilon=1.0, # Privacy parameter
delta=1e-5 # Failure probability
)
# Create private data analyzer
analyzer = PrivateDataAnalyzer(privacy_budget)
# Example: Private data analysis
data = np.random.normal(50, 15, 1000) # Simulated data
# Private statistics
private_count = analyzer.private_count(data)
private_mean = analyzer.private_mean(data, value_range=(0, 100))
print(f"Private count: {private_count}")
print(f"Private mean: {private_mean}")
print(f"Remaining budget: {analyzer.accountant.get_remaining_budget()}")
# For ML training
training_budget = PrivacyBudget(epsilon=10.0, delta=1e-5)
private_trainer = DifferentiallyPrivateMLTraining(training_budget)
return analyzer, private_trainer
if __name__ == "__main__":
analyzer, trainer = create_private_ml_pipeline()
Automated Data Protection and Compliance Framework
Implement comprehensive data protection automation that ensures compliance with privacy regulations throughout the AI lifecycle.
Example: Automated Privacy Compliance System
🔒 YAML: Kubernetes Privacy Protection Pipeline
# privacy-protection-pipeline.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: ai-privacy-protection-pipeline
namespace: ai-governance
spec:
entrypoint: privacy-protection-workflow
templates:
- name: privacy-protection-workflow
steps:
- - name: data-classification
template: classify-data-sensitivity
- - name: pii-detection
template: detect-and-classify-pii
arguments:
artifacts:
- name: dataset
from: "{{steps.data-classification.outputs.artifacts.classified-data}}"
- - name: anonymization
template: apply-anonymization
arguments:
artifacts:
- name: pii-report
from: "{{steps.pii-detection.outputs.artifacts.pii-report}}"
- name: dataset
from: "{{steps.data-classification.outputs.artifacts.classified-data}}"
- - name: consent-validation
template: validate-consent
arguments:
artifacts:
- name: anonymized-data
from: "{{steps.anonymization.outputs.artifacts.anonymized-data}}"
- - name: compliance-check
template: multi-regulation-compliance-check
arguments:
artifacts:
- name: protected-data
from: "{{steps.consent-validation.outputs.artifacts.consent-validated-data}}"
- name: classify-data-sensitivity
container:
image: privacy-tools:latest
command: [python]
args:
- -c
- |
import pandas as pd
import numpy as np
from typing import Dict, List, Set
import re
import json
class DataSensitivityClassifier:
def __init__(self):
self.sensitivity_patterns = {
'highly_sensitive': [
r'\b(?:ssn|social.security)\b',
r'\b(?:passport|driver.license)\b',
r'\b(?:credit.card|ccn)\b',
r'\b(?:medical|health|diagnosis)\b',
r'\b(?:genetic|biometric)\b'
],
'sensitive': [
r'\b(?:email|phone|address)\b',
r'\b(?:birth.date|dob)\b',
r'\b(?:income|salary|financial)\b',
r'\b(?:race|ethnicity|religion)\b'
],
'internal': [
r'\b(?:employee|internal|confidential)\b',
r'\b(?:proprietary|trade.secret)\b'
]
}
self.compliance_requirements = {
'highly_sensitive': ['gdpr', 'hipaa', 'ccpa', 'pipeda'],
'sensitive': ['gdpr', 'ccpa', 'pipeda'],
'internal': ['company_policy'],
'public': []
}
def classify_dataset(self, dataset_path: str) -> Dict:
# Load dataset
df = pd.read_csv(dataset_path)
classification_results = {
'columns': {},
'overall_sensitivity': 'public',
'compliance_requirements': set(),
'special_categories': [],
'geographic_scope': self._detect_geographic_scope(df)
}
for column in df.columns:
column_sensitivity = self._classify_column(column, df[column])
classification_results['columns'][column] = column_sensitivity
# Update overall sensitivity
if column_sensitivity['level'] == 'highly_sensitive':
classification_results['overall_sensitivity'] = 'highly_sensitive'
elif (column_sensitivity['level'] == 'sensitive' and
classification_results['overall_sensitivity'] != 'highly_sensitive'):
classification_results['overall_sensitivity'] = 'sensitive'
# Accumulate compliance requirements
classification_results['compliance_requirements'].update(
column_sensitivity['compliance_requirements']
)
# Convert set to list for JSON serialization
classification_results['compliance_requirements'] = list(
classification_results['compliance_requirements']
)
return classification_results
def _classify_column(self, column_name: str, column_data: pd.Series) -> Dict:
column_text = f"{column_name} {' '.join(column_data.astype(str).head(100))}"
for level, patterns in self.sensitivity_patterns.items():
for pattern in patterns:
if re.search(pattern, column_text, re.IGNORECASE):
return {
'level': level,
'confidence': 0.8,
'detected_patterns': [pattern],
'compliance_requirements': self.compliance_requirements[level]
}
return {
'level': 'public',
'confidence': 0.9,
'detected_patterns': [],
'compliance_requirements': []
}
def _detect_geographic_scope(self, df: pd.DataFrame) -> List[str]:
# Simplified geographic scope detection
geographic_indicators = {
'eu': ['eu', 'europe', 'gdpr', 'germany', 'france', 'spain'],
'us': ['us', 'usa', 'united.states', 'california', 'ccpa'],
'canada': ['canada', 'canadian', 'pipeda'],
'global': ['global', 'international', 'worldwide']
}
text_content = ' '.join([
str(df.columns.tolist()),
str(df.head().to_string())
]).lower()
detected_regions = []
for region, indicators in geographic_indicators.items():
if any(indicator in text_content for indicator in indicators):
detected_regions.append(region)
return detected_regions or ['unknown']
# Execute classification
classifier = DataSensitivityClassifier()
results = classifier.classify_dataset('/input/dataset.csv')
# Save results
with open('/output/classification-results.json', 'w') as f:
json.dump(results, f, indent=2)
print(f"Data classification complete. Sensitivity: {results['overall_sensitivity']}")
print(f"Compliance requirements: {results['compliance_requirements']}")
volumeMounts:
- name: input-data
mountPath: /input
- name: output-data
mountPath: /output
outputs:
artifacts:
- name: classified-data
path: /input/dataset.csv
- name: classification-report
path: /output/classification-results.json
- name: detect-and-classify-pii
inputs:
artifacts:
- name: dataset
path: /input/dataset.csv
container:
image: pii-detection:latest
command: [python]
args:
- -c
- |
import pandas as pd
import re
import json
from typing import Dict, List, Tuple
import hashlib
class PIIDetector:
def __init__(self):
self.pii_patterns = {
'ssn': r'\b\d{3}-?\d{2}-?\d{4}\b',
'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
'phone': r'\b\d{3}[- ]?\d{3}[- ]?\d{4}\b',
'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
'passport': r'\b[A-Z]{1,2}\d{6,9}\b',
'date_of_birth': r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b'
}
self.sensitive_keywords = [
'password', 'secret', 'private', 'confidential',
'medical', 'health', 'diagnosis', 'treatment',
'financial', 'income', 'salary', 'bank',
'race', 'ethnicity', 'religion', 'political'
]
def scan_dataset(self, dataset_path: str) -> Dict:
df = pd.read_csv(dataset_path)
pii_report = {
'total_records': len(df),
'columns_with_pii': {},
'pii_summary': {},
'risk_assessment': {},
'anonymization_recommendations': {}
}
for column in df.columns:
column_pii = self._scan_column(column, df[column])
if column_pii['pii_detected']:
pii_report['columns_with_pii'][column] = column_pii
# Generate summary
pii_report['pii_summary'] = self._generate_pii_summary(pii_report['columns_with_pii'])
# Risk assessment
pii_report['risk_assessment'] = self._assess_privacy_risk(pii_report)
# Anonymization recommendations
pii_report['anonymization_recommendations'] = self._recommend_anonymization(
pii_report['columns_with_pii']
)
return pii_report
def _scan_column(self, column_name: str, column_data: pd.Series) -> Dict:
result = {
'pii_detected': False,
'pii_types': [],
'confidence_scores': {},
'sample_matches': {},
'sensitive_keyword_matches': []
}
# Convert to string and sample data
sample_data = column_data.astype(str).head(1000)
column_text = ' '.join(sample_data)
# Check PII patterns
for pii_type, pattern in self.pii_patterns.items():
matches = re.findall(pattern, column_text, re.IGNORECASE)
if matches:
result['pii_detected'] = True
result['pii_types'].append(pii_type)
result['confidence_scores'][pii_type] = min(len(matches) / len(sample_data), 1.0)
# Hash sample matches for privacy
result['sample_matches'][pii_type] = [
hashlib.sha256(match.encode()).hexdigest()[:8]
for match in matches[:3]
]
# Check sensitive keywords
column_name_lower = column_name.lower()
for keyword in self.sensitive_keywords:
if keyword in column_name_lower or keyword in column_text.lower():
result['sensitive_keyword_matches'].append(keyword)
result['pii_detected'] = True
return result
def _generate_pii_summary(self, columns_with_pii: Dict) -> Dict:
pii_types_found = set()
high_risk_columns = []
for column, pii_info in columns_with_pii.items():
pii_types_found.update(pii_info['pii_types'])
# High risk if multiple PII types or high confidence
if (len(pii_info['pii_types']) > 1 or
any(score > 0.5 for score in pii_info['confidence_scores'].values())):
high_risk_columns.append(column)
return {
'pii_types_found': list(pii_types_found),
'columns_affected': len(columns_with_pii),
'high_risk_columns': high_risk_columns,
'overall_risk_level': 'high' if high_risk_columns else ('medium' if columns_with_pii else 'low')
}
def _assess_privacy_risk(self, pii_report: Dict) -> Dict:
risk_factors = []
risk_score = 0
# High-value PII types
high_value_pii = ['ssn', 'credit_card', 'passport', 'medical']
if any(pii_type in pii_report['pii_summary']['pii_types_found']
for pii_type in high_value_pii):
risk_factors.append("High-value PII detected")
risk_score += 3
# Multiple PII types
if len(pii_report['pii_summary']['pii_types_found']) > 3:
risk_factors.append("Multiple PII types present")
risk_score += 2
# Large dataset
if pii_report['total_records'] > 10000:
risk_factors.append("Large dataset size")
risk_score += 1
risk_level = "low"
if risk_score >= 5:
risk_level = "critical"
elif risk_score >= 3:
risk_level = "high"
elif risk_score >= 1:
risk_level = "medium"
return {
'risk_score': risk_score,
'risk_level': risk_level,
'risk_factors': risk_factors
}
def _recommend_anonymization(self, columns_with_pii: Dict) -> Dict:
recommendations = {}
anonymization_methods = {
'ssn': 'tokenization',
'credit_card': 'tokenization',
'email': 'pseudonymization',
'phone': 'masking',
'ip_address': 'truncation',
'passport': 'tokenization',
'date_of_birth': 'generalization'
}
for column, pii_info in columns_with_pii.items():
column_recommendations = []
for pii_type in pii_info['pii_types']:
if pii_type in anonymization_methods:
column_recommendations.append(anonymization_methods[pii_type])
# Remove duplicates
recommendations[column] = list(set(column_recommendations))
return recommendations
# Execute PII detection
detector = PIIDetector()
pii_report = detector.scan_dataset('/input/dataset.csv')
# Save report
with open('/output/pii-report.json', 'w') as f:
json.dump(pii_report, f, indent=2)
print(f"PII detection complete. Found {len(pii_report['columns_with_pii'])} columns with PII")
print(f"Risk level: {pii_report['risk_assessment']['risk_level']}")
volumeMounts:
- name: output-data
mountPath: /output
outputs:
artifacts:
- name: pii-report
path: /output/pii-report.json
- name: apply-anonymization
inputs:
artifacts:
- name: pii-report
path: /input/pii-report.json
- name: dataset
path: /input/dataset.csv
container:
image: anonymization-tools:latest
command: [python]
args:
- -c
- |
import pandas as pd
import numpy as np
import json
import hashlib
import secrets
from typing import Dict, Any
class DataAnonymizer:
def __init__(self):
self.anonymization_registry = {}
def anonymize_dataset(self, dataset_path: str, pii_report_path: str) -> Dict:
# Load data and PII report
df = pd.read_csv(dataset_path)
with open(pii_report_path, 'r') as f:
pii_report = json.load(f)
anonymization_log = {
'anonymized_columns': {},
'anonymization_methods_used': {},
'k_anonymity_level': None,
'utility_preservation_score': None
}
# Apply anonymization based on recommendations
for column, recommendation in pii_report['anonymization_recommendations'].items():
if column in df.columns:
df[column], method_log = self._anonymize_column(
df[column], recommendation[0] if recommendation else 'masking'
)
anonymization_log['anonymized_columns'][column] = method_log
anonymization_log['anonymization_methods_used'][column] = recommendation[0] if recommendation else 'masking'
# Apply k-anonymity if requested
k_anonymity_level = self._apply_k_anonymity(df, k=5)
anonymization_log['k_anonymity_level'] = k_anonymity_level
# Calculate utility preservation
utility_score = self._calculate_utility_preservation(df, anonymization_log)
anonymization_log['utility_preservation_score'] = utility_score
# Save anonymized dataset
df.to_csv('/output/anonymized-dataset.csv', index=False)
return anonymization_log
def _anonymize_column(self, column: pd.Series, method: str) -> tuple:
original_length = len(column)
method_log = {'method': method, 'records_affected': 0}
if method == 'tokenization':
# Replace with consistent tokens
unique_values = column.unique()
token_map = {val: f"TOKEN_{hashlib.sha256(str(val).encode()).hexdigest()[:8]}"
for val in unique_values}
anonymized_column = column.map(token_map)
method_log['records_affected'] = len(unique_values)
elif method == 'masking':
# Replace with masked values
anonymized_column = column.apply(lambda x: self._mask_value(str(x)))
method_log['records_affected'] = original_length
elif method == 'pseudonymization':
# Replace with pseudonyms
unique_values = column.unique()
pseudo_map = {val: f"PSEUDO_{secrets.token_hex(4)}" for val in unique_values}
anonymized_column = column.map(pseudo_map)
method_log['records_affected'] = len(unique_values)
elif method == 'generalization':
# Generalize values (e.g., birth dates to birth years)
anonymized_column = column.apply(self._generalize_value)
method_log['records_affected'] = original_length
elif method == 'truncation':
# Truncate values (e.g., IP addresses)
anonymized_column = column.apply(lambda x: str(x)[:8] + "***")
method_log['records_affected'] = original_length
else:
# Default: simple masking
anonymized_column = column.apply(lambda x: "***MASKED***")
method_log['records_affected'] = original_length
return anonymized_column, method_log
def _mask_value(self, value: str) -> str:
if len(value) <= 4:
return "*" * len(value)
else:
return value[:2] + "*" * (len(value) - 4) + value[-2:]
def _generalize_value(self, value: str) -> str:
# Simple generalization - would be more sophisticated in practice
if "/" in value or "-" in value: # Likely a date
parts = value.replace("/", "-").split("-")
if len(parts) >= 3:
return parts[-1] # Return just the year
return value
def _apply_k_anonymity(self, df: pd.DataFrame, k: int = 5) -> int:
# Simplified k-anonymity implementation
# In practice, use more sophisticated algorithms
# Identify quasi-identifiers (non-sensitive identifying columns)
quasi_identifiers = []
for col in df.columns:
if df[col].dtype == 'object' and df[col].nunique() < len(df) * 0.8:
quasi_identifiers.append(col)
if not quasi_identifiers:
return k
# Group by quasi-identifiers and suppress small groups
grouped = df.groupby(quasi_identifiers)
small_groups = grouped.filter(lambda x: len(x) < k)
# Suppress records in small groups
if len(small_groups) > 0:
for col in quasi_identifiers:
df.loc[small_groups.index, col] = "*SUPPRESSED*"
return k
def _calculate_utility_preservation(self, df: pd.DataFrame, log: Dict) -> float:
# Simple utility calculation based on data preservation
total_cells = df.shape[0] * df.shape[1]
preserved_cells = total_cells
for column, method_log in log['anonymized_columns'].items():
if method_log['method'] in ['masking', 'truncation']:
# These methods reduce utility more
preserved_cells -= method_log['records_affected'] * 0.5
elif method_log['method'] in ['tokenization', 'pseudonymization']:
# These preserve some utility
preserved_cells -= method_log['records_affected'] * 0.2
return min(1.0, preserved_cells / total_cells)
# Execute anonymization
anonymizer = DataAnonymizer()
anonymization_log = anonymizer.anonymize_dataset('/input/dataset.csv', '/input/pii-report.json')
# Save log
with open('/output/anonymization-log.json', 'w') as f:
json.dump(anonymization_log, f, indent=2)
print(f"Anonymization complete. Utility preserved: {anonymization_log['utility_preservation_score']:.2%}")
print(f"K-anonymity level: {anonymization_log['k_anonymity_level']}")
volumeMounts:
- name: output-data
mountPath: /output
outputs:
artifacts:
- name: anonymized-data
path: /output/anonymized-dataset.csv
- name: anonymization-log
path: /output/anonymization-log.json
- name: validate-consent
inputs:
artifacts:
- name: anonymized-data
path: /input/anonymized-dataset.csv
container:
image: consent-validator:latest
command: [python]
args:
- -c
- |
import pandas as pd
import json
from datetime import datetime, timedelta
from typing import Dict
class ConsentValidator:
def __init__(self):
self.consent_requirements = {
'gdpr': {
'explicit_consent': True,
'withdraw_mechanism': True,
'purpose_limitation': True,
'data_minimization': True
},
'ccpa': {
'opt_out_mechanism': True,
'purpose_disclosure': True,
'data_sale_disclosure': True
}
}
def validate_dataset_consent(self, dataset_path: str) -> Dict:
df = pd.read_csv(dataset_path)
validation_result = {
'total_records': len(df),
'consent_validation': {
'gdpr_compliant': True,
'ccpa_compliant': True,
'consent_coverage': 1.0,
'withdrawal_requests_honored': True
},
'recommendations': [],
'compliance_status': 'compliant'
}
# In a real implementation, this would check against
# actual consent records, withdrawal requests, etc.
# Simulate consent validation
validation_result['consent_validation']['consent_coverage'] = 0.95
if validation_result['consent_validation']['consent_coverage'] < 1.0:
validation_result['recommendations'].append(
"Obtain explicit consent for remaining 5% of records"
)
return validation_result
# Execute consent validation
validator = ConsentValidator()
validation_result = validator.validate_dataset_consent('/input/anonymized-dataset.csv')
# Copy dataset (in practice, might filter non-consented records)
df = pd.read_csv('/input/anonymized-dataset.csv')
df.to_csv('/output/consent-validated-dataset.csv', index=False)
# Save validation report
with open('/output/consent-validation-report.json', 'w') as f:
json.dump(validation_result, f, indent=2)
print(f"Consent validation complete. Status: {validation_result['compliance_status']}")
print(f"Consent coverage: {validation_result['consent_validation']['consent_coverage']:.1%}")
volumeMounts:
- name: output-data
mountPath: /output
outputs:
artifacts:
- name: consent-validated-data
path: /output/consent-validated-dataset.csv
- name: consent-report
path: /output/consent-validation-report.json
- name: multi-regulation-compliance-check
inputs:
artifacts:
- name: protected-data
path: /input/protected-dataset.csv
container:
image: compliance-checker:latest
command: [python]
args:
- -c
- |
import json
from datetime import datetime, timedelta
from typing import Dict
class MultiRegulationComplianceChecker:
def __init__(self):
self.regulations = {
'gdpr': {
'name': 'General Data Protection Regulation',
'geographic_scope': ['eu'],
'requirements': [
'lawful_basis', 'consent', 'data_minimization',
'purpose_limitation', 'storage_limitation',
'accuracy', 'security', 'accountability'
]
},
'ccpa': {
'name': 'California Consumer Privacy Act',
'geographic_scope': ['us', 'california'],
'requirements': [
'disclosure', 'opt_out', 'non_discrimination',
'data_deletion', 'access_rights'
]
},
'pipeda': {
'name': 'Personal Information Protection and Electronic Documents Act',
'geographic_scope': ['canada'],
'requirements': [
'consent', 'purpose_limitation', 'retention_limits',
'security_safeguards', 'access_rights'
]
}
}
def check_compliance(self, dataset_path: str) -> Dict:
compliance_report = {
'timestamp': datetime.now().isoformat(),
'dataset_path': dataset_path,
'overall_compliance': True,
'regulation_compliance': {},
'compliance_score': 0.0,
'recommendations': [],
'next_review_date': (datetime.now() + timedelta(days=90)).isoformat()
}
total_score = 0
regulations_checked = 0
for reg_id, regulation in self.regulations.items():
reg_compliance = self._check_regulation_compliance(reg_id, regulation)
compliance_report['regulation_compliance'][reg_id] = reg_compliance
total_score += reg_compliance['compliance_score']
regulations_checked += 1
if not reg_compliance['compliant']:
compliance_report['overall_compliance'] = False
compliance_report['recommendations'].extend(
reg_compliance['recommendations']
)
compliance_report['compliance_score'] = total_score / regulations_checked
return compliance_report
def _check_regulation_compliance(self, reg_id: str, regulation: Dict) -> Dict:
# Simplified compliance check - in practice, this would be much more comprehensive
reg_compliance = {
'regulation_name': regulation['name'],
'compliant': True,
'compliance_score': 0.95, # Simulated high compliance
'requirements_met': [],
'requirements_failed': [],
'recommendations': []
}
# Simulate some requirements being met
met_requirements = regulation['requirements'][:int(len(regulation['requirements']) * 0.9)]
failed_requirements = regulation['requirements'][int(len(regulation['requirements']) * 0.9):]
reg_compliance['requirements_met'] = met_requirements
reg_compliance['requirements_failed'] = failed_requirements
if failed_requirements:
reg_compliance['compliant'] = False
reg_compliance['compliance_score'] = 0.8
reg_compliance['recommendations'] = [
f"Address {req} requirement for {regulation['name']}"
for req in failed_requirements
]
return reg_compliance
# Execute compliance check
checker = MultiRegulationComplianceChecker()
compliance_report = checker.check_compliance('/input/protected-dataset.csv')
# Save compliance report
with open('/output/compliance-report.json', 'w') as f:
json.dump(compliance_report, f, indent=2)
print(f"Compliance check complete. Overall compliant: {compliance_report['overall_compliance']}")
print(f"Compliance score: {compliance_report['compliance_score']:.1%}")
if not compliance_report['overall_compliance']:
print("Recommendations:")
for rec in compliance_report['recommendations']:
print(f" - {rec}")
volumeMounts:
- name: output-data
mountPath: /output
outputs:
artifacts:
- name: final-compliance-report
path: /output/compliance-report.json
volumes:
- name: input-data
persistentVolumeClaim:
claimName: ai-data-pvc
- name: output-data
persistentVolumeClaim:
claimName: ai-output-pvc
💡 AI Security & Privacy Implementation Best Practices
Defense in Depth
Implement multiple layers of security from data ingestion through model deployment and monitoring.
Privacy by Design
Build privacy protection into every stage of the ML lifecycle rather than adding it as an afterthought.
Automated Threat Detection
Use real-time monitoring and automated response systems to detect and mitigate attacks before they cause damage.
Continuous Compliance
Implement automated compliance checking that adapts to changing regulations and organizational requirements.
Incident Response Planning
Develop and regularly test incident response procedures specific to AI security threats and privacy breaches.