AWS Security Monitoring and Alerting Mastery
Implement comprehensive AWS security monitoring with CloudTrail, CloudWatch, Config, Security Hub, GuardDuty, and automated incident response for enterprise-grade threat detection.
📋 Prerequisites
- AWS account with administrative or security permissions
- AWS CLI installed and configured
- Understanding of AWS IAM concepts and JSON policies
- Basic knowledge of CloudFormation or Terraform
- Familiarity with AWS services: CloudTrail, CloudWatch, Config
- Read: AWS IAM Policy Mastery
🎯 What You'll Learn
- Comprehensive CloudTrail setup for API logging and analysis
- CloudWatch security metrics, alarms, and dashboard creation
- AWS Config compliance monitoring and custom rules
- Security Hub centralized security findings management
- GuardDuty threat detection and intelligent analysis
- VPC Flow Logs for network security monitoring
- Automated incident response with EventBridge and Lambda
- Cost anomaly detection for security breach indicators
AWS Security Monitoring Architecture Overview
Modern AWS security monitoring requires a multi-layered approach combining real-time threat detection, compliance monitoring, and automated incident response. This guide covers the complete implementation of enterprise-grade security monitoring using native AWS services and best practices from AWS Well-Architected Security Pillar.
📊 CloudTrail + CloudWatch
Complete API logging with real-time analysis, custom metrics, and intelligent alerting for suspicious activities.
🛡️ Security Hub + Config
Centralized security posture management with automated compliance checks and configuration drift detection.
🔍 GuardDuty + Inspector
AI-powered threat detection with machine learning models trained on AWS threat intelligence and vulnerability scanning.
CloudTrail Comprehensive Logging Setup
CloudTrail provides the foundation for AWS security monitoring by capturing every API call across your AWS environment. Proper setup includes management events, data events, and Insights for anomaly detection.
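Before wiring up log delivery, it helps to be clear about what you will match on later: CloudTrail records are JSON objects whose `eventName` identifies the API call. A minimal offline sketch (the event names mirror the critical-IAM-change filter used later in this guide; the `sample` record is illustrative):

```python
# Hypothetical offline classifier mirroring the metric-filter logic used later
# in this guide: flag CloudTrail records whose eventName is a critical IAM change.
CRITICAL_EVENTS = {
    "CreateUser", "DeleteUser", "CreateRole", "DeleteRole",
    "AttachUserPolicy", "DetachUserPolicy", "CreateAccessKey", "DeleteAccessKey",
}

def is_critical(record: dict) -> bool:
    """Return True if a CloudTrail record represents a critical IAM change."""
    return record.get("eventName") in CRITICAL_EVENTS

sample = {"eventName": "CreateAccessKey", "eventSource": "iam.amazonaws.com"}
print(is_critical(sample))  # True
```

Prototyping match logic like this against exported log samples is a cheap way to validate a filter pattern before deploying it.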
Organization-Wide CloudTrail Configuration
aws cloudtrail create-trail \
--name OrganizationSecurityTrail \
--s3-bucket-name security-audit-logs-bucket \
--s3-key-prefix cloudtrail-logs/ \
--include-global-service-events \
--is-multi-region-trail \
--enable-log-file-validation \
--is-organization-trail
# Enable data events for S3 and Lambda
aws cloudtrail put-event-selectors \
  --trail-name OrganizationSecurityTrail \
  --event-selectors '[
    {
      "ReadWriteType": "All",
      "IncludeManagementEvents": true,
      "DataResources": [
        {
          "Type": "AWS::S3::Object",
          "Values": ["arn:aws:s3:::*/*"]
        },
        {
          "Type": "AWS::Lambda::Function",
          "Values": ["arn:aws:lambda"]
        }
      ]
    }
  ]'
# Enable CloudTrail Insights
aws cloudtrail put-insight-selectors \
  --trail-name OrganizationSecurityTrail \
  --insight-selectors '[
    {
      "InsightType": "ApiCallRateInsight"
    }
  ]'
CloudTrail Log Analysis with CloudWatch
To stream CloudTrail events into CloudWatch Logs for real-time analysis, CloudTrail needs an IAM role whose permissions policy allows it to write to the target log group:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:log-group:CloudTrail/SecurityAuditLogGroup:*"
    }
  ]
}
🔐 Management Events
Track all control plane operations including IAM changes, EC2 instance launches, and security group modifications.
# CloudWatch Logs filter for critical security events
aws logs put-metric-filter \
--log-group-name CloudTrail/SecurityAuditLogGroup \
--filter-name CriticalSecurityEvents \
--filter-pattern '{ ($.eventName = CreateUser) || ($.eventName = CreateRole) || ($.eventName = DeleteUser) || ($.eventName = DeleteRole) || ($.eventName = AttachUserPolicy) || ($.eventName = DetachUserPolicy) || ($.eventName = CreateAccessKey) || ($.eventName = DeleteAccessKey) }' \
--metric-transformations \
metricName=CriticalSecurityEvents,metricNamespace=Security/CloudTrail,metricValue=1
📁 Data Events
Monitor object-level operations on S3 buckets and Lambda function invocations to detect data exfiltration attempts.
# Monitor sensitive S3 bucket access
aws logs put-metric-filter \
--log-group-name CloudTrail/SecurityAuditLogGroup \
--filter-name SensitiveS3Access \
--filter-pattern '{ ($.eventSource = s3.amazonaws.com) && ($.eventName = GetObject) && ($.resources[0].ARN = arn:aws:s3:::sensitive-data-*) }' \
--metric-transformations \
metricName=SensitiveDataAccess,metricNamespace=Security/DataAccess,metricValue=1
🧠 CloudTrail Insights
Automated anomaly detection for unusual API call patterns that might indicate security incidents.
{
  "InsightSelectors": [
    {
      "InsightType": "ApiCallRateInsight"
    }
  ]
}
CloudWatch Security Metrics and Dashboards
CloudWatch provides real-time monitoring capabilities for security metrics. Create comprehensive dashboards and intelligent alarms that trigger on suspicious activities and security policy violations.
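A detail that trips people up when tuning alarms: CloudWatch only transitions to ALARM when the configured number of consecutive evaluation periods breach the threshold. A minimal sketch of that evaluation logic (simplified — real alarms also handle missing data and M-out-of-N settings):

```python
def alarm_state(datapoints, threshold, evaluation_periods, comparison="gte"):
    """Evaluate per-period Sum datapoints the way a CloudWatch alarm would:
    ALARM only if the most recent `evaluation_periods` periods all breach."""
    if len(datapoints) < evaluation_periods:
        return "INSUFFICIENT_DATA"
    recent = datapoints[-evaluation_periods:]
    breach = all(
        dp >= threshold if comparison == "gte" else dp > threshold
        for dp in recent
    )
    return "ALARM" if breach else "OK"

# Five-minute sums of FailedConsoleLogins; threshold 5, 2 evaluation periods
print(alarm_state([1, 2, 6, 7], threshold=5, evaluation_periods=2, comparison="gt"))  # ALARM
```

This is why the failed-logins alarm below uses two evaluation periods: a single noisy five-minute window will not page anyone.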
Custom Security Metrics
#!/bin/bash
# Failed console logins
aws logs put-metric-filter \
--log-group-name CloudTrail/SecurityAuditLogGroup \
--filter-name FailedConsoleLogins \
--filter-pattern '{ ($.eventName = ConsoleLogin) && ($.errorMessage = "Failed authentication") }' \
--metric-transformations \
metricName=FailedConsoleLogins,metricNamespace=Security/Authentication,metricValue=1
# Root account usage
aws logs put-metric-filter \
--log-group-name CloudTrail/SecurityAuditLogGroup \
--filter-name RootAccountUsage \
--filter-pattern '{ $.userIdentity.type = "Root" && $.userIdentity.invokedBy NOT EXISTS && $.eventType != "AwsServiceEvent" }' \
--metric-transformations \
metricName=RootAccountUsage,metricNamespace=Security/Authentication,metricValue=1
# Unauthorized API calls
aws logs put-metric-filter \
--log-group-name CloudTrail/SecurityAuditLogGroup \
--filter-name UnauthorizedAPICalls \
--filter-pattern '{ ($.errorCode = "*UnauthorizedOperation") || ($.errorCode = "AccessDenied*") }' \
--metric-transformations \
metricName=UnauthorizedAPICalls,metricNamespace=Security/AccessControl,metricValue=1
# IAM policy changes
aws logs put-metric-filter \
--log-group-name CloudTrail/SecurityAuditLogGroup \
--filter-name IAMPolicyChanges \
--filter-pattern '{ ($.eventName=DeleteGroupPolicy)||($.eventName=DeleteRolePolicy)||($.eventName=DeleteUserPolicy)||($.eventName=PutGroupPolicy)||($.eventName=PutRolePolicy)||($.eventName=PutUserPolicy)||($.eventName=CreatePolicy)||($.eventName=DeletePolicy)||($.eventName=CreatePolicyVersion)||($.eventName=DeletePolicyVersion)||($.eventName=AttachRolePolicy)||($.eventName=DetachRolePolicy)||($.eventName=AttachUserPolicy)||($.eventName=DetachUserPolicy)||($.eventName=AttachGroupPolicy)||($.eventName=DetachGroupPolicy) }' \
--metric-transformations \
metricName=IAMPolicyChanges,metricNamespace=Security/IAM,metricValue=1
Security Alarms Configuration
🚨 Critical Security Alarms
# Root account usage alarm
aws cloudwatch put-metric-alarm \
--alarm-name "Root-Account-Usage" \
--alarm-description "Alarm when root account is used" \
--metric-name RootAccountUsage \
--namespace Security/Authentication \
--statistic Sum \
--period 300 \
--threshold 1 \
--comparison-operator GreaterThanOrEqualToThreshold \
--evaluation-periods 1 \
--alarm-actions arn:aws:sns:us-east-1:123456789012:security-alerts
# Excessive failed logins
aws cloudwatch put-metric-alarm \
--alarm-name "Excessive-Failed-Logins" \
--alarm-description "Alarm when failed logins exceed threshold" \
--metric-name FailedConsoleLogins \
--namespace Security/Authentication \
--statistic Sum \
--period 300 \
--threshold 5 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 2 \
--alarm-actions arn:aws:sns:us-east-1:123456789012:security-alerts
📊 Security Dashboard
{
  "widgets": [
    {
      "type": "metric",
      "properties": {
        "metrics": [
          [ "Security/Authentication", "FailedConsoleLogins" ],
          [ "Security/Authentication", "RootAccountUsage" ],
          [ "Security/AccessControl", "UnauthorizedAPICalls" ],
          [ "Security/IAM", "IAMPolicyChanges" ]
        ],
        "period": 300,
        "stat": "Sum",
        "region": "us-east-1",
        "title": "Security Events Summary"
      }
    },
    {
      "type": "log",
      "properties": {
        "query": "SOURCE 'CloudTrail/SecurityAuditLogGroup' | fields @timestamp, eventName, sourceIPAddress, userIdentity.type\n| filter eventName like /Create|Delete|Attach|Detach/\n| stats count(*) as calls by eventName\n| sort calls desc",
        "region": "us-east-1",
        "title": "Top Security-Related API Calls",
        "view": "table"
      }
    }
  ]
}
AWS Config Compliance Monitoring
AWS Config continuously monitors your AWS resource configurations against compliance rules and security best practices. Implement automated remediation for common security misconfigurations.
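Each managed rule reduces to a predicate over a resource's recorded configuration. A small illustrative evaluator for the password-policy minimums passed to the `iam-password-policy` rule below (pure logic, no AWS calls — the thresholds match the rule's `InputParameters`):

```python
# Illustrative only: evaluate an account password policy dict against the same
# minimums passed to the iam-password-policy managed rule in this section.
MIN_PASSWORD_LENGTH = 14

def evaluate_password_policy(policy: dict) -> str:
    """Return the Config-style compliance verdict for a password policy."""
    ok = (policy.get("RequireUppercaseCharacters", False)
          and policy.get("RequireLowercaseCharacters", False)
          and policy.get("RequireNumbers", False)
          and policy.get("MinimumPasswordLength", 0) >= MIN_PASSWORD_LENGTH)
    return "COMPLIANT" if ok else "NON_COMPLIANT"

print(evaluate_password_policy({
    "RequireUppercaseCharacters": True,
    "RequireLowercaseCharacters": True,
    "RequireNumbers": True,
    "MinimumPasswordLength": 16,
}))  # COMPLIANT
```

Writing the predicate out like this first makes it easier to decide whether a managed rule covers your requirement or a custom Lambda rule is needed.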
Config Rules for Security Compliance
# Enable Config service
aws configservice put-configuration-recorder \
  --configuration-recorder '{
    "name": "SecurityComplianceRecorder",
    "roleARN": "arn:aws:iam::123456789012:role/aws-config-role",
    "recordingGroup": {
      "allSupported": true,
      "includeGlobalResourceTypes": true
    }
  }'
aws configservice put-delivery-channel \
  --delivery-channel name=SecurityComplianceChannel,s3BucketName=config-compliance-logs
# Start recording
aws configservice start-configuration-recorder \
  --configuration-recorder-name SecurityComplianceRecorder
# Deploy essential security rules
aws configservice put-config-rule \
--config-rule '{
"ConfigRuleName": "s3-bucket-public-access-prohibited",
"Source": {
"Owner": "AWS",
"SourceIdentifier": "S3_BUCKET_PUBLIC_ACCESS_PROHIBITED"
}
}'
aws configservice put-config-rule \
--config-rule '{
"ConfigRuleName": "root-access-key-check",
"Source": {
"Owner": "AWS",
"SourceIdentifier": "ROOT_ACCESS_KEY_CHECK"
}
}'
aws configservice put-config-rule \
--config-rule '{
"ConfigRuleName": "iam-password-policy",
"Source": {
"Owner": "AWS",
"SourceIdentifier": "IAM_PASSWORD_POLICY"
},
"InputParameters": "{\"RequireUppercaseCharacters\": \"true\", \"RequireLowercaseCharacters\": \"true\", \"RequireNumbers\": \"true\", \"MinimumPasswordLength\": \"14\"}"
}'
Custom Config Rules
import boto3
import json

def lambda_handler(event, context):
    """
    Custom Config rule to check if EC2 instances have required security groups
    """
    config = boto3.client('config')

    # Get the configuration item
    configuration_item = event['configurationItem']

    # Check if this is an EC2 instance
    if configuration_item['resourceType'] != 'AWS::EC2::Instance':
        return {
            'statusCode': 200,
            'body': json.dumps('NOT_APPLICABLE')
        }

    # Required security groups (configure based on your needs)
    required_security_groups = ['sg-12345678', 'sg-87654321']

    # Get instance security groups
    instance_security_groups = [
        sg['groupId'] for sg in configuration_item['configuration']['securityGroups']
    ]

    # Check compliance
    is_compliant = any(sg in instance_security_groups for sg in required_security_groups)

    # Report evaluation result
    evaluation = {
        'ComplianceResourceType': configuration_item['resourceType'],
        'ComplianceResourceId': configuration_item['resourceId'],
        'ComplianceType': 'COMPLIANT' if is_compliant else 'NON_COMPLIANT',
        'Annotation': f'Instance security groups: {instance_security_groups}',
        'OrderingTimestamp': configuration_item['configurationItemCaptureTime']
    }
    config.put_evaluations(
        Evaluations=[evaluation],
        ResultToken=event['resultToken']
    )
    return {
        'statusCode': 200,
        'body': json.dumps('Evaluation complete')
    }
Automated Remediation
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Automated remediation for security compliance violations'
Resources:
  RemediationRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: RemediationPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutBucketPublicAccessBlock
                  - ec2:ModifyInstanceAttribute
                  - iam:UpdateAccountPasswordPolicy
                Resource: '*'

  S3PublicAccessRemediationFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: S3PublicAccessRemediation
      Runtime: python3.9
      Handler: index.lambda_handler
      Role: !GetAtt RemediationRole.Arn
      Code:
        ZipFile: |
          import boto3
          import json

          def lambda_handler(event, context):
              s3 = boto3.client('s3')
              # Extract bucket name from Config event
              bucket_name = event['configurationItem']['resourceName']
              # Apply public access block
              s3.put_public_access_block(
                  Bucket=bucket_name,
                  PublicAccessBlockConfiguration={
                      'BlockPublicAcls': True,
                      'IgnorePublicAcls': True,
                      'BlockPublicPolicy': True,
                      'RestrictPublicBuckets': True
                  }
              )
              return {
                  'statusCode': 200,
                  'body': json.dumps(f'Applied public access block to {bucket_name}')
              }

  # Config remediation can only target SSM Automation documents, so TargetId
  # must name an Automation document that invokes the Lambda above
  # (document definition omitted here for brevity)
  RemediationConfigurationS3:
    Type: AWS::Config::RemediationConfiguration
    Properties:
      ConfigRuleName: s3-bucket-public-access-prohibited
      TargetType: SSM_DOCUMENT
      TargetId: S3PublicAccessRemediationDocument
      Parameters:
        AutomationAssumeRole:
          StaticValue:
            Values:
              - !GetAtt RemediationRole.Arn
      Automatic: true
      MaximumAutomaticAttempts: 3
Security Hub Centralized Security Management
AWS Security Hub provides a central dashboard for security alerts and compliance status across your AWS environment. It aggregates findings from multiple AWS security services and third-party tools.
Security Hub Organization Setup
# Enable Security Hub in management account
aws securityhub enable-security-hub \
--enable-default-standards
# Enable organization management
aws securityhub create-members \
--account-details AccountId=111122223333,Email=security@example.com AccountId=444455556666,Email=dev@example.com
# Auto-enable for new accounts
aws securityhub update-organization-configuration \
--auto-enable
# Enable important security standards
aws securityhub batch-enable-standards \
  --standards-subscription-requests '[
    {
      "StandardsArn": "arn:aws:securityhub:us-east-1::standards/aws-foundational-security-best-practices/v/1.0.0"
    },
    {
      "StandardsArn": "arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0"
    },
    {
      "StandardsArn": "arn:aws:securityhub:us-east-1::standards/pci-dss/v/3.2.1"
    }
  ]'
Custom Security Findings
import boto3
import json
from datetime import datetime, timezone

def create_custom_finding(title, description, severity, resource_arn, resource_type):
    """
    Create a custom security finding in Security Hub
    """
    securityhub = boto3.client('securityhub')
    account_id = boto3.client('sts').get_caller_identity()['Account']
    region = boto3.Session().region_name
    now = datetime.now(timezone.utc).isoformat()
    finding = {
        'SchemaVersion': '2018-10-08',
        'Id': f"custom-finding-{resource_arn.split('/')[-1]}-{int(datetime.now(timezone.utc).timestamp())}",
        'ProductArn': f"arn:aws:securityhub:{region}:{account_id}:product/{account_id}/default",
        'GeneratorId': 'custom-security-monitor',
        'AwsAccountId': account_id,
        'CreatedAt': now,
        'UpdatedAt': now,
        'Severity': {
            'Label': severity.upper()
        },
        'Title': title,
        'Description': description,
        'Resources': [
            {
                'Type': resource_type,
                'Id': resource_arn,
                'Region': region
            }
        ],
        'WorkflowState': 'NEW',
        'RecordState': 'ACTIVE'
    }
    response = securityhub.batch_import_findings(Findings=[finding])
    return response

def lambda_handler(event, context):
    """
    Lambda function to create custom findings based on CloudWatch alarms
    """
    # Example: High data transfer volumes (potential data exfiltration)
    if 'cloudwatch_alarm' in event:
        alarm = event['cloudwatch_alarm']
        if alarm['alarm_name'] == 'High-Data-Transfer':
            region = boto3.Session().region_name
            account_id = boto3.client('sts').get_caller_identity()['Account']
            create_custom_finding(
                title='Unusual Data Transfer Detected',
                description=f'High volume of data transfer detected from instance {alarm["instance_id"]}. This could indicate data exfiltration.',
                severity='HIGH',
                resource_arn=f'arn:aws:ec2:{region}:{account_id}:instance/{alarm["instance_id"]}',
                resource_type='AwsEc2Instance'
            )
    return {
        'statusCode': 200,
        'body': json.dumps('Custom finding created successfully')
    }
Security Hub Automation
Detection
Security services generate findings and send to Security Hub
Aggregation
Security Hub normalizes and correlates findings from multiple sources
Automation
EventBridge rules trigger automated response workflows
Remediation
Lambda functions execute automated remediation actions
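The triage step between aggregation and automation can be sketched as a pure routing function over a normalized finding. All names and actions here are illustrative, not a Security Hub API:

```python
# Hypothetical triage logic: map a normalized Security Hub finding
# (severity label + first resource type) to an automated response action.
def choose_action(finding: dict) -> str:
    severity = finding.get("Severity", {}).get("Label", "INFORMATIONAL")
    rtype = finding.get("Resources", [{}])[0].get("Type", "")
    if severity == "CRITICAL" and rtype == "AwsEc2Instance":
        return "ISOLATE_INSTANCE"
    if severity in ("CRITICAL", "HIGH") and rtype == "AwsIamUser":
        return "DISABLE_CREDENTIALS"
    if severity in ("CRITICAL", "HIGH"):
        return "PAGE_ONCALL"
    return "TICKET_ONLY"

print(choose_action({"Severity": {"Label": "HIGH"},
                     "Resources": [{"Type": "AwsIamUser"}]}))  # DISABLE_CREDENTIALS
```

Keeping this mapping in one place (rather than scattered across EventBridge rules) makes the response policy reviewable and testable.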
GuardDuty Intelligent Threat Detection
Amazon GuardDuty uses machine learning, anomaly detection, and integrated threat intelligence to identify threats such as malicious IPs, cryptocurrency mining, and data exfiltration attempts.
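GuardDuty reports a numeric severity score with each finding; the queries in this section filter on it directly. A quick reference implementation of the documented severity bands (low 1.0–3.9, medium 4.0–6.9, high 7.0–8.9):

```python
# Map GuardDuty's numeric severity score to its documented label bands.
def severity_label(score: float) -> str:
    if score >= 7.0:
        return "HIGH"
    if score >= 4.0:
        return "MEDIUM"
    if score >= 1.0:
        return "LOW"
    return "INFORMATIONAL"

print(severity_label(8.0))  # HIGH
```

This is why the automation rules later in the guide use `severity >= 7.0` as the paging threshold: it corresponds to the HIGH band.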
GuardDuty Organization Setup
# Enable GuardDuty in management account
aws guardduty create-detector \
--enable \
--finding-publishing-frequency FIFTEEN_MINUTES
# Get detector ID
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)
# Enable organization features
aws guardduty update-organization-configuration \
--detector-id $DETECTOR_ID \
--auto-enable
# Enable Malware Protection scanning for tagged instances
aws guardduty update-malware-scan-settings \
  --detector-id $DETECTOR_ID \
  --scan-resource-criteria '{"Include": {"EC2_INSTANCE_TAG": {"MapEquals": [{"Key": "Environment", "Value": "Production"}, {"Key": "Environment", "Value": "Staging"}]}}}'
# Enable S3 Protection, EKS audit log monitoring, and malware scanning on findings
aws guardduty update-detector \
  --detector-id $DETECTOR_ID \
  --data-sources '{
    "S3Logs": {"Enable": true},
    "Kubernetes": {"AuditLogs": {"Enable": true}},
    "MalwareProtection": {"ScanEc2InstanceWithFindings": {"EbsVolumes": true}}
  }'
Custom Threat Intelligence
# Create S3 bucket for threat intelligence
aws s3 mb s3://guardduty-threat-intel-bucket
# Upload IP threat list
aws s3 cp malicious-ips.txt s3://guardduty-threat-intel-bucket/
# Create threat intel set
aws guardduty create-threat-intel-set \
--detector-id $DETECTOR_ID \
--name "CustomMaliciousIPs" \
--format TXT \
--location s3://guardduty-threat-intel-bucket/malicious-ips.txt \
--activate
# Upload domain threat list
aws s3 cp malicious-domains.txt s3://guardduty-threat-intel-bucket/
aws guardduty create-threat-intel-set \
--detector-id $DETECTOR_ID \
--name "CustomMaliciousDomains" \
--format TXT \
--location s3://guardduty-threat-intel-bucket/malicious-domains.txt \
--activate
GuardDuty Findings Analysis
🚨 High Severity Findings
# Get high severity findings from last 24 hours
# (updatedAt criteria take epoch milliseconds, hence the +%s000 format)
aws guardduty list-findings \
  --detector-id $DETECTOR_ID \
  --finding-criteria '{
    "Criterion": {
      "severity": {
        "Gte": 7
      },
      "updatedAt": {
        "Gte": '$(date -d "24 hours ago" +%s000)'
      }
    }
  }' \
  --sort-criteria '{
    "AttributeName": "severity",
    "OrderBy": "DESC"
  }'
🔍 Findings by Type
# Cryptocurrency mining detection
aws guardduty list-findings \
  --detector-id $DETECTOR_ID \
  --finding-criteria '{
    "Criterion": {
      "type": {
        "Eq": ["CryptoCurrency:EC2/BitcoinTool.B!DNS"]
      }
    }
  }'

# Backdoor communication
aws guardduty list-findings \
  --detector-id $DETECTOR_ID \
  --finding-criteria '{
    "Criterion": {
      "type": {
        "Eq": ["Backdoor:EC2/C&CActivity.B!DNS"]
      }
    }
  }'

# Data exfiltration attempts
aws guardduty list-findings \
  --detector-id $DETECTOR_ID \
  --finding-criteria '{
    "Criterion": {
      "type": {
        "Eq": ["Exfiltration:S3/ObjectRead.Unusual"]
      }
    }
  }'
VPC Flow Logs Network Security Analysis
VPC Flow Logs capture information about IP traffic going to and from network interfaces in your VPC. This data is essential for network security monitoring, troubleshooting, and forensic analysis.
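The default flow log format is a fixed sequence of 14 space-separated fields; both the Logs Insights queries and the analysis Lambda later in this section depend on that layout. A small parser makes the field positions explicit (the sample record is fabricated):

```python
# Parse a default-format VPC Flow Log record (14 space-separated fields) into
# a dict. The field order matches the index-based access used by the analysis
# Lambda later in this section.
FIELDS = ["version", "account_id", "interface_id", "srcaddr", "dstaddr",
          "srcport", "dstport", "protocol", "packets", "bytes",
          "start", "end", "action", "log_status"]

def parse_flow_record(line: str) -> dict:
    parts = line.split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return dict(zip(FIELDS, parts))

rec = parse_flow_record(
    "2 123456789012 eni-0abc 10.0.1.5 203.0.113.7 49152 443 6 10 8400 "
    "1680000000 1680000060 ACCEPT OK")
print(rec["dstaddr"], rec["action"])  # 203.0.113.7 ACCEPT
```

If you define a custom flow log format, this field list (and the Lambda's indices) must be updated to match.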
Comprehensive Flow Logs Setup
# Enable VPC-level flow logs
aws ec2 create-flow-logs \
--resource-type VPC \
--resource-ids vpc-12345678 \
--traffic-type ALL \
--log-destination-type cloud-watch-logs \
--log-group-name VPCFlowLogs \
--deliver-logs-permission-arn arn:aws:iam::123456789012:role/flowlogsRole
# Enable subnet-level flow logs for sensitive subnets
aws ec2 create-flow-logs \
--resource-type Subnet \
--resource-ids subnet-12345678 subnet-87654321 \
--traffic-type ALL \
--log-destination-type s3 \
--log-destination arn:aws:s3:::vpc-flow-logs-bucket/AWSLogs/
# Enable ENI-level flow logs for critical instances
aws ec2 create-flow-logs \
--resource-type ENI \
--resource-ids eni-12345678 \
--traffic-type ALL \
--log-destination-type cloud-watch-logs \
--log-group-name CriticalInstanceFlowLogs
Flow Logs Analysis Queries
# Top rejected connections (potential attacks)
fields @timestamp, srcaddr, dstaddr, srcport, dstport, protocol, action
| filter action = "REJECT"
| stats count(*) as hits by srcaddr, dstaddr, dstport
| sort hits desc
| limit 50

# Unusual outbound connections to external IPs
fields @timestamp, srcaddr, dstaddr, dstport, protocol, bytes
| filter action = "ACCEPT" and dstaddr not like /^10\./ and dstaddr not like /^172\.(1[6-9]|2[0-9]|3[0-1])\./ and dstaddr not like /^192\.168\./
| stats sum(bytes) as total_bytes by srcaddr, dstaddr, dstport
| sort total_bytes desc
| limit 100

# SSH brute force detection
fields @timestamp, srcaddr, dstaddr, dstport, action
| filter dstport = 22 and action = "REJECT"
| stats count(*) as attempts by srcaddr, dstaddr
| sort attempts desc
| limit 20

# Data exfiltration detection (high volume outbound)
fields @timestamp, srcaddr, dstaddr, bytes, action
| filter action = "ACCEPT" and bytes > 1000000
| stats sum(bytes) as total_bytes by srcaddr, dstaddr
| sort total_bytes desc
| limit 50
Automated Flow Logs Alerting
import boto3
import json
import gzip
import base64
from datetime import datetime

def lambda_handler(event, context):
    """
    Analyze VPC Flow Logs for security threats
    """
    # Decode CloudWatch Logs data
    cw_data = event['awslogs']['data']
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    log_data = json.loads(uncompressed_payload)

    suspicious_activities = []
    for log_event in log_data['logEvents']:
        message = log_event['message']
        fields = message.split(' ')
        if len(fields) >= 14:
            srcaddr = fields[3]
            dstaddr = fields[4]
            srcport = fields[5]
            dstport = fields[6]
            protocol = fields[7]
            packets = int(fields[8]) if fields[8].isdigit() else 0
            bytes_transferred = int(fields[9]) if fields[9].isdigit() else 0
            action = fields[12]

            # Detect suspicious patterns
            # 1. High volume data transfer
            if bytes_transferred > 100000000:  # 100MB
                suspicious_activities.append({
                    'type': 'HIGH_DATA_TRANSFER',
                    'source': srcaddr,
                    'destination': dstaddr,
                    'bytes': bytes_transferred,
                    'timestamp': log_event['timestamp']
                })

            # 2. Multiple rejected connections (port scan)
            if action == 'REJECT' and packets > 10:
                suspicious_activities.append({
                    'type': 'PORT_SCAN_ATTEMPT',
                    'source': srcaddr,
                    'destination': dstaddr,
                    'port': dstport,
                    'packets': packets,
                    'timestamp': log_event['timestamp']
                })

            # 3. Connections to sensitive service ports
            bad_ports = ['1433', '3389', '5432', '27017']  # SQL Server, RDP, PostgreSQL, MongoDB
            if dstport in bad_ports and action == 'ACCEPT':
                suspicious_activities.append({
                    'type': 'DANGEROUS_PORT_ACCESS',
                    'source': srcaddr,
                    'destination': dstaddr,
                    'port': dstport,
                    'timestamp': log_event['timestamp']
                })

    # Send alerts for suspicious activities
    if suspicious_activities:
        sns = boto3.client('sns')
        message = json.dumps({
            'alert_type': 'VPC_FLOW_LOGS_SECURITY_ALERT',
            'suspicious_activities': suspicious_activities,
            'analysis_timestamp': datetime.now().isoformat()
        }, indent=2)
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789012:security-alerts',
            Message=message,
            Subject='VPC Flow Logs Security Alert'
        )

    return {
        'statusCode': 200,
        'body': json.dumps(f'Analyzed {len(log_data["logEvents"])} log events, found {len(suspicious_activities)} suspicious activities')
    }
Automated Security Incident Response
Implement automated incident response workflows using EventBridge, Lambda, and Systems Manager to reduce response time and ensure consistent security incident handling.
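EventBridge rules match an event when every field in the pattern is present with a satisfying value. The first rule below can be prototyped as a predicate (a simplification — real pattern matching supports arrays, prefixes, and `anything-but`):

```python
# Simplified sketch of how the GuardDuty high-severity rule below matches:
# an event matches when each patterned field satisfies its constraint.
def matches_severity_rule(event: dict, min_severity: float = 7.0) -> bool:
    return (event.get("source") == "aws.guardduty"
            and event.get("detail-type") == "GuardDuty Finding"
            and event.get("detail", {}).get("severity", 0) >= min_severity)

evt = {"source": "aws.guardduty", "detail-type": "GuardDuty Finding",
       "detail": {"severity": 8.0}}
print(matches_severity_rule(evt))  # True
```

Testing patterns this way (or with the EventBridge sandbox) before deployment avoids silent non-matches, the most common failure mode of event-driven response pipelines.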
EventBridge Security Automation Rules
{
  "Rules": [
    {
      "Name": "GuardDutyHighSeverityFindings",
      "EventPattern": {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {
          "severity": [{"numeric": [">=", 7.0]}]
        }
      },
      "State": "ENABLED",
      "Targets": [
        {
          "Id": "1",
          "Arn": "arn:aws:lambda:us-east-1:123456789012:function:SecurityIncidentResponse"
        },
        {
          "Id": "2",
          "Arn": "arn:aws:sns:us-east-1:123456789012:security-alerts"
        }
      ]
    },
    {
      "Name": "SecurityHubCriticalFindings",
      "EventPattern": {
        "source": ["aws.securityhub"],
        "detail-type": ["Security Hub Findings - Imported"],
        "detail": {
          "findings": {
            "Severity": {
              "Label": ["CRITICAL", "HIGH"]
            },
            "WorkflowState": ["NEW"]
          }
        }
      },
      "State": "ENABLED",
      "Targets": [
        {
          "Id": "1",
          "Arn": "arn:aws:lambda:us-east-1:123456789012:function:SecurityHubResponseHandler"
        }
      ]
    },
    {
      "Name": "UnauthorizedAPICallsPattern",
      "EventPattern": {
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
          "errorCode": ["UnauthorizedOperation", "AccessDenied", "InvalidUserID.NotFound"],
          "sourceIPAddress": [{"anything-but": {"prefix": "10."}}]
        }
      },
      "State": "ENABLED",
      "Targets": [
        {
          "Id": "1",
          "Arn": "arn:aws:lambda:us-east-1:123456789012:function:UnauthorizedAccessHandler"
        }
      ]
    }
  ]
}
Incident Response Lambda Functions
🚨 Malicious IP Isolation
import boto3
import json

def lambda_handler(event, context):
    """
    Automatically isolate instances communicating with malicious IPs
    """
    ec2 = boto3.client('ec2')

    # Extract finding details
    detail = event['detail']
    finding_type = detail.get('type', '')
    service = detail.get('service', {})

    if 'Backdoor' in finding_type or 'Trojan' in finding_type:
        # Get affected instance
        instance_details = service.get('instanceDetails', {})
        instance_id = instance_details.get('instanceId')
        if instance_id:
            try:
                # Create isolation security group (no ingress rules; for full
                # isolation also revoke the default allow-all egress rule)
                response = ec2.create_security_group(
                    GroupName=f'isolation-{instance_id}',
                    Description='Isolation security group for compromised instance',
                    VpcId=instance_details.get('vpcId')
                )
                isolation_sg_id = response['GroupId']

                # Attach isolation security group
                ec2.modify_instance_attribute(
                    InstanceId=instance_id,
                    Groups=[isolation_sg_id]
                )

                # Collect forensic data via Systems Manager
                ssm = boto3.client('ssm')
                ssm.send_command(
                    InstanceIds=[instance_id],
                    DocumentName='AWS-RunShellScript',
                    Parameters={
                        'commands': [
                            'sudo mkdir -p /tmp/forensics',
                            'sudo netstat -tulpn > /tmp/forensics/network_connections.txt',
                            'sudo ps aux > /tmp/forensics/running_processes.txt',
                            'sudo find /var/log -name "*.log" -exec cp {} /tmp/forensics/ \\;'
                        ]
                    }
                )
                return {
                    'statusCode': 200,
                    'body': json.dumps({
                        'action': 'INSTANCE_ISOLATED',
                        'instance_id': instance_id,
                        'isolation_sg': isolation_sg_id
                    })
                }
            except Exception as e:
                print(f'Error isolating instance: {str(e)}')
                return {
                    'statusCode': 500,
                    'body': json.dumps(f'Failed to isolate instance: {str(e)}')
                }
    return {
        'statusCode': 200,
        'body': json.dumps('No isolation required')
    }
🔐 Compromised Account Response
import boto3
import json
from datetime import datetime

def lambda_handler(event, context):
    """
    Respond to compromised IAM credentials
    """
    iam = boto3.client('iam')

    # Extract user information from GuardDuty finding
    detail = event['detail']
    user_identity = detail.get('service', {}).get('userDetails', {})
    username = user_identity.get('userName')
    access_key_id = user_identity.get('accessKeyId')

    if username and access_key_id:
        try:
            # Disable the compromised access key
            iam.update_access_key(
                UserName=username,
                AccessKeyId=access_key_id,
                Status='Inactive'
            )

            # Attach explicit deny policy blocking all actions
            deny_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Deny",
                        "Action": "*",
                        "Resource": "*"
                    }
                ]
            }
            policy_name = f'EmergencyDeny-{username}-{datetime.now().strftime("%Y%m%d%H%M%S")}'
            iam.put_user_policy(
                UserName=username,
                PolicyName=policy_name,
                PolicyDocument=json.dumps(deny_policy)
            )

            # Force password reset if user has console access
            try:
                iam.update_login_profile(
                    UserName=username,
                    PasswordResetRequired=True
                )
            except iam.exceptions.NoSuchEntityException:
                pass  # User doesn't have console access

            # Send notification
            sns = boto3.client('sns')
            sns.publish(
                TopicArn='arn:aws:sns:us-east-1:123456789012:security-alerts',
                Subject='URGENT: IAM User Compromised - Actions Taken',
                Message=json.dumps({
                    'incident_type': 'IAM_COMPROMISE',
                    'username': username,
                    'access_key_disabled': access_key_id,
                    'deny_policy_applied': policy_name,
                    'timestamp': datetime.now().isoformat(),
                    'actions_taken': [
                        'Access key disabled',
                        'Explicit deny policy applied',
                        'Password reset required'
                    ]
                }, indent=2)
            )
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'action': 'USER_SECURED',
                    'username': username,
                    'access_key_disabled': access_key_id
                })
            }
        except Exception as e:
            print(f'Error securing compromised user: {str(e)}')
            return {
                'statusCode': 500,
                'body': json.dumps(f'Failed to secure user: {str(e)}')
            }
    return {
        'statusCode': 200,
        'body': json.dumps('No user action required')
    }
Incident Response Playbooks
# Systems Manager Automation Document for Security Incidents
schemaVersion: '0.3'
description: 'Automated Security Incident Response Playbook'
assumeRole: '{{ AutomationAssumeRole }}'
parameters:
  InstanceId:
    type: String
    description: 'ID of the compromised EC2 instance'
  IncidentSeverity:
    type: String
    description: 'Severity level of the security incident'
    allowedValues:
      - CRITICAL
      - HIGH
      - MEDIUM
      - LOW
    default: HIGH
  AutomationAssumeRole:
    type: String
    description: 'IAM role for automation execution'
# Note: the GetVolumeId and CreateIsolationSecurityGroup helper steps
# referenced below are omitted here for brevity
mainSteps:
  - name: CreateSnapshot
    action: 'aws:executeAwsApi'
    description: 'Create snapshot of affected instance for forensics'
    inputs:
      Service: ec2
      Api: CreateSnapshot
      VolumeId: '{{ GetVolumeId.VolumeId }}'
      Description: 'Forensic snapshot for incident {{ automation:EXECUTION_ID }}'
    outputs:
      - Name: SnapshotId
        Selector: $.SnapshotId
  - name: IsolateInstance
    action: 'aws:executeAwsApi'
    description: 'Apply isolation security group'
    inputs:
      Service: ec2
      Api: ModifyInstanceAttribute
      InstanceId: '{{ InstanceId }}'
      Groups:
        - '{{ CreateIsolationSecurityGroup.GroupId }}'
  - name: CollectForensicData
    action: 'aws:runCommand'
    description: 'Collect forensic data from instance'
    inputs:
      DocumentName: 'AWS-RunShellScript'
      InstanceIds:
        - '{{ InstanceId }}'
      Parameters:
        commands:
          - 'mkdir -p /tmp/forensics'
          - 'netstat -tulpn > /tmp/forensics/network_connections.txt'
          - 'ps aux > /tmp/forensics/processes.txt'
          - 'last -n 100 > /tmp/forensics/login_history.txt'
          - 'find /var/log -name "*.log" -mtime -1 -exec cp {} /tmp/forensics/ \;'
          - 'aws s3 cp /tmp/forensics/ s3://security-forensics-bucket/{{ automation:EXECUTION_ID }}/ --recursive'
  - name: NotifySecurityTeam
    action: 'aws:executeAwsApi'
    description: 'Send notification to security team'
    inputs:
      Service: sns
      Api: Publish
      TopicArn: 'arn:aws:sns:us-east-1:123456789012:security-incidents'
      Subject: 'Security Incident Response - {{ IncidentSeverity }}'
      Message: |
        Security incident response executed for instance {{ InstanceId }}
        Actions taken:
        - Forensic snapshot created: {{ CreateSnapshot.SnapshotId }}
        - Instance isolated with security group
        - Forensic data collected and uploaded to S3
        Incident severity: {{ IncidentSeverity }}
        Automation execution: {{ automation:EXECUTION_ID }}
        Please review findings and take additional manual actions as needed.
outputs:
  - CreateSnapshot.SnapshotId
Cost Anomaly Detection for Security Breaches
Unusual cost spikes can indicate security breaches such as cryptocurrency mining, unauthorized resource creation, or data exfiltration. AWS Cost Anomaly Detection helps identify these patterns.
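The intuition can be illustrated with a simple statistical check on a daily cost series: flag any day whose spend sits far above the trailing mean. This is a deliberate simplification (the managed service uses ML models, not a fixed z-score), and the sample data is fabricated:

```python
# Illustrative anomaly check: flag days whose cost exceeds the trailing-window
# mean by more than k standard deviations. Simplified stand-in for the ML-based
# detection that AWS Cost Anomaly Detection actually performs.
from statistics import mean, stdev

def anomalous_days(costs, k=3.0, window=7):
    """Return indices of days whose cost is a > k-sigma spike vs. the prior window."""
    flagged = []
    for i in range(window, len(costs)):
        hist = costs[i - window:i]
        mu, sigma = mean(hist), stdev(hist)
        if sigma and (costs[i] - mu) / sigma > k:
            flagged.append(i)
    return flagged

daily = [40, 42, 39, 41, 40, 43, 41, 40, 42, 310]  # sudden mining-style spike
print(anomalous_days(daily))  # [9]
```

A day-ten jump from ~$40 to $310 is exactly the signature a cryptomining compromise tends to leave in the EC2 bill.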
Cost Anomaly Detectors for Security
# Create a cost anomaly monitor segmented by AWS service
aws ce create-anomaly-monitor \
  --anomaly-monitor '{
    "MonitorName": "EC2-Security-Anomaly-Monitor",
    "MonitorType": "DIMENSIONAL",
    "MonitorDimension": "SERVICE"
  }'

# Create a custom monitor scoped to data transfer usage
aws ce create-anomaly-monitor \
  --anomaly-monitor '{
    "MonitorName": "DataTransfer-Security-Anomaly-Monitor",
    "MonitorType": "CUSTOM",
    "MonitorSpecification": {
      "Dimensions": {
        "Key": "USAGE_TYPE_GROUP",
        "Values": ["EC2: Data Transfer - Internet (Out)"]
      }
    }
  }'

# Create a custom monitor scoped to AWS Lambda spend
aws ce create-anomaly-monitor \
  --anomaly-monitor '{
    "MonitorName": "Lambda-Security-Anomaly-Monitor",
    "MonitorType": "CUSTOM",
    "MonitorSpecification": {
      "Dimensions": {
        "Key": "SERVICE",
        "Values": ["AWS Lambda"]
      }
    }
  }'
Cost Anomaly Alerting
# Create a subscription for individual (IMMEDIATE) alerts;
# immediate alerts can only be delivered to SNS topics
aws ce create-anomaly-subscription \
  --anomaly-subscription '{
    "SubscriptionName": "Security-Cost-Anomalies",
    "MonitorArnList": [
      "arn:aws:ce::123456789012:anomalymonitor/ec2-security-monitor-id",
      "arn:aws:ce::123456789012:anomalymonitor/datatransfer-security-monitor-id",
      "arn:aws:ce::123456789012:anomalymonitor/lambda-security-monitor-id"
    ],
    "Subscribers": [
      {
        "Address": "arn:aws:sns:us-east-1:123456789012:cost-security-alerts",
        "Type": "SNS",
        "Status": "CONFIRMED"
      }
    ],
    "Frequency": "IMMEDIATE",
    "ThresholdExpression": {
      "Dimensions": {
        "Key": "ANOMALY_TOTAL_IMPACT_ABSOLUTE",
        "MatchOptions": ["GREATER_THAN_OR_EQUAL"],
        "Values": ["100"]
      }
    }
  }'

# Email subscribers receive summaries rather than immediate alerts;
# use a separate DAILY subscription for the security team mailbox
aws ce create-anomaly-subscription \
  --anomaly-subscription '{
    "SubscriptionName": "Security-Cost-Anomalies-Daily",
    "MonitorArnList": [
      "arn:aws:ce::123456789012:anomalymonitor/ec2-security-monitor-id",
      "arn:aws:ce::123456789012:anomalymonitor/datatransfer-security-monitor-id",
      "arn:aws:ce::123456789012:anomalymonitor/lambda-security-monitor-id"
    ],
    "Subscribers": [
      {
        "Address": "security-team@company.com",
        "Type": "EMAIL",
        "Status": "CONFIRMED"
      }
    ],
    "Frequency": "DAILY",
    "ThresholdExpression": {
      "Dimensions": {
        "Key": "ANOMALY_TOTAL_IMPACT_ABSOLUTE",
        "MatchOptions": ["GREATER_THAN_OR_EQUAL"],
        "Values": ["100"]
      }
    }
  }'
Cost-Based Security Analysis
import boto3
import json
from datetime import datetime, timedelta

def analyze_cost_anomalies(event, context):
    """
    Analyze cost anomalies for potential security implications
    """
    ce_client = boto3.client('ce')

    # Get cost anomalies from the last 7 days
    end_date = datetime.now().date()
    start_date = end_date - timedelta(days=7)

    response = ce_client.get_anomalies(
        DateInterval={
            'StartDate': start_date.strftime('%Y-%m-%d'),
            'EndDate': end_date.strftime('%Y-%m-%d')
        },
        TotalImpact={
            'NumericOperator': 'GREATER_THAN_OR_EQUAL',
            'StartValue': 50.0  # $50 or more impact
        }
    )

    security_alerts = []

    for anomaly in response['Anomalies']:
        impact = float(anomaly['Impact']['TotalImpact'])
        # The affected service is reported in the anomaly's root causes
        root_causes = anomaly.get('RootCauses', [])
        service = root_causes[0].get('Service', 'Unknown') if root_causes else 'Unknown'

        # Analyze for security-related patterns
        security_risk = analyze_security_risk(anomaly, service, impact)

        if security_risk['risk_level'] in ['HIGH', 'CRITICAL']:
            security_alerts.append({
                'anomaly_id': anomaly['AnomalyId'],
                'service': service,
                'impact': impact,
                'risk_assessment': security_risk,
                'start_date': anomaly['AnomalyStartDate'],
                'end_date': anomaly.get('AnomalyEndDate')
            })

    # Send security alerts if found
    if security_alerts:
        send_security_cost_alert(security_alerts)

    return {
        'statusCode': 200,
        'body': json.dumps({
            'total_anomalies': len(response['Anomalies']),
            'security_alerts': len(security_alerts),
            'alerts': security_alerts
        })
    }
def analyze_security_risk(anomaly, service, impact):
    """
    Analyze cost anomaly for security risk indicators
    """
    risk_level = 'LOW'
    risk_indicators = []

    # High impact thresholds by service
    if service == 'Amazon Elastic Compute Cloud - Compute' and impact > 500:
        risk_level = 'HIGH'
        risk_indicators.append('Potential cryptocurrency mining or unauthorized compute usage')
    elif service == 'Amazon Simple Storage Service' and impact > 200:
        risk_level = 'HIGH'
        risk_indicators.append('Potential data exfiltration or unauthorized storage usage')
    elif service == 'AWS Lambda' and impact > 100:
        risk_level = 'CRITICAL'
        risk_indicators.append('Potential malicious Lambda execution or DDoS amplification')
    elif 'Data Transfer' in service and impact > 300:
        risk_level = 'CRITICAL'
        risk_indicators.append('Potential data exfiltration via high egress costs')

    # Check for unusual time patterns. Anomaly start dates are ISO 8601
    # timestamps; daily-granularity anomalies report midnight, so only
    # flag off-hours when a non-midnight time is present
    start = datetime.fromisoformat(anomaly['AnomalyStartDate'].replace('Z', '+00:00'))
    if start.hour and (start.hour < 6 or start.hour > 22):  # Outside business hours
        risk_indicators.append('Anomaly detected outside normal business hours')
        if risk_level == 'LOW':
            risk_level = 'MEDIUM'

    return {
        'risk_level': risk_level,
        'risk_indicators': risk_indicators,
        'recommendation': get_security_recommendation(risk_level, service)
    }
def get_security_recommendation(risk_level, service):
    """
    Get security recommendations based on risk level and service
    """
    if risk_level in ['HIGH', 'CRITICAL']:
        return f"Immediate investigation required for {service} usage anomaly. Check for unauthorized access, review CloudTrail logs, and validate all recent resource changes."
    elif risk_level == 'MEDIUM':
        return f"Monitor {service} usage patterns and review access logs. Consider implementing additional monitoring."
    else:
        return f"Continue monitoring {service} usage patterns."

def send_security_cost_alert(alerts):
    """
    Send security alerts via SNS
    """
    sns = boto3.client('sns')

    message = {
        'alert_type': 'COST_ANOMALY_SECURITY_ALERT',
        'alert_count': len(alerts),
        'alerts': alerts,
        'timestamp': datetime.now().isoformat(),
        'recommended_actions': [
            'Review CloudTrail logs for suspicious API calls',
            'Check GuardDuty findings for corresponding threats',
            'Validate all recent IAM and resource changes',
            'Consider isolating affected resources if compromise suspected'
        ]
    }

    sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789012:cost-security-alerts',
        Subject=f'SECURITY ALERT: {len(alerts)} Cost Anomalies Detected',
        Message=json.dumps(message, indent=2)
    )
AWS Security Monitoring Best Practices
📊 Comprehensive Logging
Enable CloudTrail for all regions and accounts, capture data events for sensitive resources, and implement log file validation for integrity.
🔍 Real-time Analysis
Use CloudWatch Logs Insights for real-time analysis, create custom metrics for security events, and implement intelligent alerting thresholds.
🛡️ Centralized Management
Use Security Hub for centralized findings management, implement organization-wide security standards, and automate compliance monitoring.
🚨 Automated Response
Implement EventBridge rules for automated incident response, create playbooks for common security scenarios, and maintain isolation procedures.
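The EventBridge piece of this pattern can be sketched with boto3. The rule name and Lambda ARN below are hypothetical placeholders, and this is a minimal sketch rather than a full deployment: the target function still needs a resource-based permission allowing events.amazonaws.com to invoke it.

```python
import json

# Event pattern matching high-severity GuardDuty findings.
# GuardDuty severity is numeric (7.0-8.9 is High), so EventBridge
# numeric matching is used rather than exact values.
HIGH_SEVERITY_PATTERN = {
    "source": ["aws.guardduty"],
    "detail-type": ["GuardDuty Finding"],
    "detail": {"severity": [{"numeric": [">=", 7]}]},
}

def create_response_rule(rule_name, target_lambda_arn):
    """Create an EventBridge rule that routes matching findings to an
    incident-response Lambda function (the ARN is a placeholder)."""
    import boto3  # imported here so the pattern above is reusable without AWS access
    events = boto3.client("events")
    events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(HIGH_SEVERITY_PATTERN),
        State="ENABLED",
        Description="Route high-severity GuardDuty findings to automated response",
    )
    events.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "incident-response", "Arn": target_lambda_arn}],
    )
```

Keeping the pattern as a plain dictionary makes it easy to unit-test the matching criteria before any rule is deployed.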
🎯 Threat Intelligence
Enable GuardDuty with custom threat intelligence feeds, monitor for emerging threats, and integrate with external security tools.
💰 Cost-Based Detection
Monitor cost anomalies as security indicators, implement usage-based alerting, and correlate cost spikes with security events.
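One way to correlate a cost spike with security events is to query GuardDuty for findings updated since the anomaly window opened. This is a sketch under assumptions: the detector ID is a placeholder, and GuardDuty's `updatedAt` criterion is given epoch milliseconds.

```python
from datetime import datetime, timezone

def anomaly_start_to_epoch_ms(start_date):
    """Convert a cost anomaly start date (YYYY-MM-DD, UTC) to the epoch
    milliseconds that GuardDuty finding criteria expect."""
    dt = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

def findings_since(detector_id, start_date):
    """List GuardDuty findings updated since the anomaly window opened
    (detector_id is a placeholder)."""
    import boto3  # imported here so the time helper stays testable offline
    gd = boto3.client("guardduty")
    response = gd.list_findings(
        DetectorId=detector_id,
        FindingCriteria={
            "Criterion": {
                "updatedAt": {
                    "GreaterThanOrEqual": anomaly_start_to_epoch_ms(start_date)
                }
            }
        },
    )
    return response["FindingIds"]
```

If the returned list is non-empty for the same window as a cost anomaly, treat the anomaly as a stronger breach indicator.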
Security Monitoring Troubleshooting
🔍 Missing CloudTrail Events
Problem: Important API calls not appearing in CloudTrail logs
Solutions:
- Verify CloudTrail is enabled in all regions
- Check event selector configuration for data events
- Ensure S3 bucket permissions allow CloudTrail delivery
- Verify CloudWatch Logs integration is properly configured
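The first three checks can be automated with boto3. This is a minimal sketch; the trail name is a placeholder, and the pure diagnosis helper is separated from the API calls so it can be tested without AWS access.

```python
def diagnose_trail(status, selectors):
    """Return likely causes for missing CloudTrail events, given the
    responses of get_trail_status and get_event_selectors."""
    problems = []
    if not status.get("IsLogging"):
        problems.append("trail logging is stopped")
    if status.get("LatestDeliveryError"):
        # A delivery error usually means the S3 bucket policy is wrong
        problems.append("S3 delivery error: " + status["LatestDeliveryError"])
    if not any(s.get("DataResources") for s in selectors.get("EventSelectors", [])):
        problems.append("no data events configured")
    return problems

def check_trail(trail_name):
    """Run the diagnosis against a live trail (name is a placeholder)."""
    import boto3  # imported here so diagnose_trail stays testable offline
    ct = boto3.client("cloudtrail")
    return diagnose_trail(
        ct.get_trail_status(Name=trail_name),
        ct.get_event_selectors(TrailName=trail_name),
    )
```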
🚨 False Positive Alerts
Problem: Security alarms triggering on legitimate activities
Solutions:
- Adjust alarm thresholds based on baseline metrics
- Implement time-based filtering for business hours
- Add IP address whitelisting for known safe sources
- Use composite alarms for multiple condition validation
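The composite alarm approach can be sketched with boto3's `put_composite_alarm`. The alarm names and SNS topic ARN below are hypothetical; the point is that the composite fires only when all child alarms are in ALARM at once, which suppresses single-signal false positives.

```python
def composite_rule(alarm_names):
    """Build an AlarmRule expression that is true only when every
    child alarm is in ALARM state at the same time."""
    return " AND ".join('ALARM("{}")'.format(name) for name in alarm_names)

def create_composite(alarm_name, child_alarms, sns_topic_arn):
    """Create the composite alarm (names and topic ARN are placeholders)."""
    import boto3  # imported here so composite_rule stays testable offline
    cw = boto3.client("cloudwatch")
    cw.put_composite_alarm(
        AlarmName=alarm_name,
        AlarmRule=composite_rule(child_alarms),
        AlarmActions=[sns_topic_arn],
        AlarmDescription="Fires only when multiple security conditions coincide",
    )
```

For example, pairing a failed-console-login alarm with an unusual-source-IP alarm alerts only when both conditions hold.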
⚡ High Lambda Costs from GuardDuty Processing
Problem: Lambda functions that process GuardDuty findings generating high execution costs
Solutions:
- Optimize Lambda function timeout and memory allocation
- Implement efficient filtering to reduce function invocations
- Use Step Functions for complex workflow orchestration
- Consider batching events before processing
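The batching idea can be sketched as an SQS-fed handler: instead of one Lambda invocation per finding, EventBridge delivers findings to an SQS queue and Lambda polls it with a batch size (the routing setup and the `process_batch` body are placeholders here).

```python
import json

def chunk(items, size):
    """Split a list of findings into fixed-size batches."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def process_batch(findings):
    """Placeholder for the actual enrichment/triage logic."""
    print(f"processing {len(findings)} findings")

def handler(event, context):
    """Process GuardDuty findings delivered in bulk through SQS
    (EventBridge -> SQS -> Lambda with BatchSize=10), so one invocation
    handles many findings instead of one invocation per finding."""
    findings = [json.loads(record["body"])["detail"] for record in event.get("Records", [])]
    for batch in chunk(findings, 10):
        process_batch(batch)
    return {"processed": len(findings)}
```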
Next Steps
🎉 Congratulations!
You now have comprehensive AWS security monitoring capabilities, including:
- ✅ CloudTrail comprehensive logging and analysis setup
- ✅ CloudWatch security metrics, alarms, and dashboards
- ✅ AWS Config compliance monitoring and automated remediation
- ✅ Security Hub centralized security findings management
- ✅ GuardDuty intelligent threat detection and analysis
- ✅ VPC Flow Logs network security monitoring
- ✅ Automated incident response with EventBridge and Lambda
- ✅ Cost anomaly detection for security breach indicators