advanced 45 min read cloud-security Updated: 2025-07-03

AWS Security Monitoring and Alerting Mastery

Implement comprehensive AWS security monitoring with CloudTrail, CloudWatch, Config, Security Hub, GuardDuty, and automated incident response for enterprise-grade threat detection.

📋 Prerequisites

  • AWS account with administrative or security permissions
  • AWS CLI installed and configured
  • Understanding of AWS IAM concepts and JSON policies
  • Basic knowledge of CloudFormation or Terraform
  • Familiarity with AWS services: CloudTrail, CloudWatch, Config
  • Read: AWS IAM Policy Mastery

🎯 What You'll Learn

  • Comprehensive CloudTrail setup for API logging and analysis
  • CloudWatch security metrics, alarms, and dashboard creation
  • AWS Config compliance monitoring and custom rules
  • Security Hub centralized security findings management
  • GuardDuty threat detection and intelligent analysis
  • VPC Flow Logs for network security monitoring
  • Automated incident response with EventBridge and Lambda
  • Cost anomaly detection for security breach indicators

🏷️ Topics Covered

  • aws security monitoring setup
  • cloudtrail logging best practices
  • aws config compliance rules
  • security hub setup guide
  • guardduty threat detection
  • aws security automation
  • vpc flow logs analysis
  • aws security alerts configuration

AWS Security Monitoring Architecture Overview

Modern AWS security monitoring requires a multi-layered approach that combines real-time threat detection, compliance monitoring, and automated incident response. This guide walks through an enterprise-grade monitoring setup built on native AWS services and the practices recommended in the Security Pillar of the AWS Well-Architected Framework.

📊 CloudTrail + CloudWatch

Complete API logging with real-time analysis, custom metrics, and intelligent alerting for suspicious activities.

Best for: API auditing, user behavior analysis, and detecting unauthorized access patterns.

🛡️ Security Hub + Config

Centralized security posture management with automated compliance checks and configuration drift detection.

Best for: Compliance monitoring, security findings aggregation, and remediation workflows.

🔍 GuardDuty + Inspector

Threat detection driven by machine learning and AWS threat intelligence (GuardDuty), paired with automated vulnerability scanning (Inspector).

Best for: Advanced threat detection, malware identification, and container security scanning.

CloudTrail Comprehensive Logging Setup

CloudTrail is the foundation of AWS security monitoring because it records API activity across your AWS environment. A trail logs management events by default; data events and Insights (anomaly detection on API call rates) have to be enabled explicitly.

Organization-Wide CloudTrail Configuration

Create Organization CloudTrail
aws cloudtrail create-trail \
  --name OrganizationSecurityTrail \
  --s3-bucket-name security-audit-logs-bucket \
  --s3-key-prefix cloudtrail-logs/ \
  --include-global-service-events \
  --is-multi-region-trail \
  --enable-log-file-validation \
  --is-organization-trail

# A newly created trail records nothing until logging is started
aws cloudtrail start-logging \
  --name OrganizationSecurityTrail

# Enable data events for S3 and Lambda
# (the bare prefixes arn:aws:s3 and arn:aws:lambda select all buckets and all functions)
aws cloudtrail put-event-selectors \
  --trail-name OrganizationSecurityTrail \
  --event-selectors '[
    {
      "ReadWriteType": "All",
      "IncludeManagementEvents": true,
      "DataResources": [
        {
          "Type": "AWS::S3::Object",
          "Values": ["arn:aws:s3"]
        },
        {
          "Type": "AWS::Lambda::Function",
          "Values": ["arn:aws:lambda"]
        }
      ]
    }
  ]'

# Enable CloudTrail Insights
aws cloudtrail put-insight-selectors \
  --trail-name OrganizationSecurityTrail \
  --insight-selectors '[
    {
      "InsightType": "ApiCallRateInsight"
    }
  ]'

CloudTrail Log Analysis with CloudWatch

cloudtrail-log-group-setup.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "PermissionsPolicyForCloudTrailRoleCreateLogStream",
      "Effect": "Allow",
      "Action": "logs:CreateLogStream",
      "Resource": "arn:aws:logs:*:*:log-group:CloudTrail/SecurityAuditLogGroup:*"
    },
    {
      "Sid": "PermissionsPolicyForCloudTrailRolePutLogEvents",
      "Effect": "Allow",
      "Action": "logs:PutLogEvents",
      "Resource": "arn:aws:logs:*:*:log-group:CloudTrail/SecurityAuditLogGroup:*"
    }
  ]
}

🔐 Management Events

Track all control plane operations including IAM changes, EC2 instance launches, and security group modifications.

High-Risk API Filter
# CloudWatch Logs filter for critical security events
aws logs put-metric-filter \
  --log-group-name CloudTrail/SecurityAuditLogGroup \
  --filter-name CriticalSecurityEvents \
  --filter-pattern '{ ($.eventName = CreateUser) || ($.eventName = CreateRole) || ($.eventName = DeleteUser) || ($.eventName = DeleteRole) || ($.eventName = AttachUserPolicy) || ($.eventName = DetachUserPolicy) || ($.eventName = CreateAccessKey) || ($.eventName = DeleteAccessKey) }' \
  --metric-transformations \
    metricName=CriticalSecurityEvents,metricNamespace=Security/CloudTrail,metricValue=1
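
Before deploying the filter, it can help to check locally which CloudTrail records it would count. The sketch below mirrors the event names in the filter pattern above. The `is_critical_event` and `count_critical_events` helpers and the sample records are illustrative assumptions, not part of any AWS SDK.

```python
# Hypothetical local check that mirrors the CriticalSecurityEvents filter pattern.
CRITICAL_EVENT_NAMES = frozenset({
    "CreateUser", "CreateRole", "DeleteUser", "DeleteRole",
    "AttachUserPolicy", "DetachUserPolicy",
    "CreateAccessKey", "DeleteAccessKey",
})


def is_critical_event(record: dict) -> bool:
    """Return True if the record's eventName is one the filter matches."""
    return record.get("eventName") in CRITICAL_EVENT_NAMES


def count_critical_events(records: list) -> int:
    """Count matching records, i.e. what the metric would sum to."""
    return sum(1 for record in records if is_critical_event(record))


# Made-up sample records for illustration only
sample_records = [
    {"eventName": "CreateUser", "eventSource": "iam.amazonaws.com"},
    {"eventName": "DescribeInstances", "eventSource": "ec2.amazonaws.com"},
    {"eventName": "CreateAccessKey", "eventSource": "iam.amazonaws.com"},
]
critical_count = count_critical_events(sample_records)
```

Replaying a day of exported trail records through a check like this gives a rough idea of how noisy the metric will be before any alarm is attached to it.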

📁 Data Events

Monitor object-level operations on S3 buckets and function executions to detect data exfiltration attempts.

S3 Data Access Filter
# Monitor sensitive S3 bucket access
aws logs put-metric-filter \
  --log-group-name CloudTrail/SecurityAuditLogGroup \
  --filter-name SensitiveS3Access \
  --filter-pattern '{ ($.eventSource = "s3.amazonaws.com") && ($.eventName = "GetObject") && ($.requestParameters.bucketName = "sensitive-data-*") }' \
  --metric-transformations \
    metricName=SensitiveDataAccess,metricNamespace=Security/DataAccess,metricValue=1

🧠 CloudTrail Insights

Automated anomaly detection for unusual API call patterns that might indicate security incidents.

Insights Configuration
{
  "InsightSelectors": [
    {
      "InsightType": "ApiCallRateInsight"
    }
  ]
}

CloudWatch Security Metrics and Dashboards

CloudWatch provides real-time monitoring capabilities for security metrics. Create comprehensive dashboards and intelligent alarms that trigger on suspicious activities and security policy violations.

Custom Security Metrics

security-metrics-filters.sh
#!/bin/bash

# Failed console logins
aws logs put-metric-filter \
  --log-group-name CloudTrail/SecurityAuditLogGroup \
  --filter-name FailedConsoleLogins \
  --filter-pattern '{ ($.eventName = ConsoleLogin) && ($.errorMessage = "Failed authentication") }' \
  --metric-transformations \
    metricName=FailedConsoleLogins,metricNamespace=Security/Authentication,metricValue=1

# Root account usage
aws logs put-metric-filter \
  --log-group-name CloudTrail/SecurityAuditLogGroup \
  --filter-name RootAccountUsage \
  --filter-pattern '{ $.userIdentity.type = "Root" && $.userIdentity.invokedBy NOT EXISTS && $.eventType != "AwsServiceEvent" }' \
  --metric-transformations \
    metricName=RootAccountUsage,metricNamespace=Security/Authentication,metricValue=1

# Unauthorized API calls
aws logs put-metric-filter \
  --log-group-name CloudTrail/SecurityAuditLogGroup \
  --filter-name UnauthorizedAPICalls \
  --filter-pattern '{ ($.errorCode = "*UnauthorizedOperation") || ($.errorCode = "AccessDenied*") }' \
  --metric-transformations \
    metricName=UnauthorizedAPICalls,metricNamespace=Security/AccessControl,metricValue=1

# IAM policy changes
aws logs put-metric-filter \
  --log-group-name CloudTrail/SecurityAuditLogGroup \
  --filter-name IAMPolicyChanges \
  --filter-pattern '{ ($.eventName=DeleteGroupPolicy)||($.eventName=DeleteRolePolicy)||($.eventName=DeleteUserPolicy)||($.eventName=PutGroupPolicy)||($.eventName=PutRolePolicy)||($.eventName=PutUserPolicy)||($.eventName=CreatePolicy)||($.eventName=DeletePolicy)||($.eventName=CreatePolicyVersion)||($.eventName=DeletePolicyVersion)||($.eventName=AttachRolePolicy)||($.eventName=DetachRolePolicy)||($.eventName=AttachUserPolicy)||($.eventName=DetachUserPolicy)||($.eventName=AttachGroupPolicy)||($.eventName=DetachGroupPolicy) }' \
  --metric-transformations \
    metricName=IAMPolicyChanges,metricNamespace=Security/IAM,metricValue=1
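
The wildcard terms in the UnauthorizedAPICalls pattern are easy to misread, so here is a small local approximation. It uses Python's `fnmatchcase` as a stand-in for the CloudWatch Logs `*` wildcard, which is an assumption that holds only for simple patterns like these. The helper name is hypothetical.

```python
from fnmatch import fnmatchcase

# The two wildcard terms from the UnauthorizedAPICalls filter pattern
UNAUTHORIZED_PATTERNS = ("*UnauthorizedOperation", "AccessDenied*")


def is_unauthorized_call(record: dict) -> bool:
    """Approximate the filter: match errorCode against either wildcard term."""
    error_code = record.get("errorCode")
    if not error_code:
        # Successful calls carry no errorCode and never match
        return False
    return any(fnmatchcase(error_code, pattern) for pattern in UNAUTHORIZED_PATTERNS)
```

The leading `*` is what lets the filter catch EC2's `Client.UnauthorizedOperation`, while the trailing `*` covers both `AccessDenied` and `AccessDeniedException`.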

Security Alarms Configuration

🚨 Critical Security Alarms

Create High-Priority Alarms
# Root account usage alarm
aws cloudwatch put-metric-alarm \
  --alarm-name "Root-Account-Usage" \
  --alarm-description "Alarm when root account is used" \
  --metric-name RootAccountUsage \
  --namespace Security/Authentication \
  --statistic Sum \
  --period 300 \
  --threshold 1 \
  --comparison-operator GreaterThanOrEqualToThreshold \
  --evaluation-periods 1 \
  --alarm-actions arn:aws:sns:us-east-1:123456789012:security-alerts

# Excessive failed logins
aws cloudwatch put-metric-alarm \
  --alarm-name "Excessive-Failed-Logins" \
  --alarm-description "Alarm when failed logins exceed threshold" \
  --metric-name FailedConsoleLogins \
  --namespace Security/Authentication \
  --statistic Sum \
  --period 300 \
  --threshold 5 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 2 \
  --alarm-actions arn:aws:sns:us-east-1:123456789012:security-alerts
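
The two alarms behave differently because of their comparison operator and evaluation periods. The sketch below is a simplified model of that logic: it ignores missing-data handling and the datapoints-to-alarm setting, so treat it as an illustration and not a reimplementation of CloudWatch. The `alarm_state` function is hypothetical.

```python
import operator

# Only the two comparison operators used by the alarms above
COMPARISONS = {
    "GreaterThanThreshold": operator.gt,
    "GreaterThanOrEqualToThreshold": operator.ge,
}


def alarm_state(period_sums, threshold, comparison, evaluation_periods):
    """Return ALARM when every one of the most recent periods breaches."""
    if len(period_sums) < evaluation_periods:
        return "INSUFFICIENT_DATA"
    breaches = COMPARISONS[comparison]
    recent = period_sums[-evaluation_periods:]
    return "ALARM" if all(breaches(value, threshold) for value in recent) else "OK"


# Root usage: a single event in one 5-minute period is enough
root_state = alarm_state([0, 0, 1], 1, "GreaterThanOrEqualToThreshold", 1)

# Failed logins: one noisy period followed by a quiet one does not alarm
burst_state = alarm_state([0, 9, 2], 5, "GreaterThanThreshold", 2)

# Failed logins: two consecutive periods above 5 do
sustained_state = alarm_state([1, 6, 8], 5, "GreaterThanThreshold", 2)
```

Under this model the root alarm fires on the first event, while the login alarm needs more than 5 failures in each of two consecutive 5-minute periods, which filters out a single mistyped-password burst.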

📊 Security Dashboard

security-dashboard.json
{
  "widgets": [
    {
      "type": "metric",
      "properties": {
        "metrics": [
          [ "Security/Authentication", "FailedConsoleLogins" ],
          [ "Security/Authentication", "RootAccountUsage" ],
          [ "Security/AccessControl", "UnauthorizedAPICalls" ],
          [ "Security/IAM", "IAMPolicyChanges" ]
        ],
        "period": 300,
        "stat": "Sum",
        "region": "us-east-1",
        "title": "Security Events Summary"
      }
    },
    {
      "type": "log",
      "properties": {
        "query": "SOURCE 'CloudTrail/SecurityAuditLogGroup' | fields @timestamp, eventName, sourceIPAddress, userIdentity.type\n| filter eventName like /Create|Delete|Attach|Detach/\n| stats count(*) as eventCount by eventName\n| sort eventCount desc",
        "region": "us-east-1",
        "title": "Top Security-Related API Calls",
        "view": "table"
      }
    }
  ]
}

AWS Config Compliance Monitoring

AWS Config continuously monitors your AWS resource configurations against compliance rules and security best practices. Implement automated remediation for common security misconfigurations.

Config Rules for Security Compliance

Deploy Security Config Rules
# Enable Config service
aws configservice put-configuration-recorder \
  --configuration-recorder name=SecurityComplianceRecorder,roleARN=arn:aws:iam::123456789012:role/aws-config-role \
  --recording-group allSupported=true,includeGlobalResourceTypes=true

aws configservice put-delivery-channel \
  --delivery-channel name=SecurityComplianceChannel,s3BucketName=config-compliance-logs

# Recording only begins once the recorder is started
aws configservice start-configuration-recorder \
  --configuration-recorder-name SecurityComplianceRecorder

# Deploy essential security rules
aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "s3-bucket-public-access-prohibited",
    "Source": {
      "Owner": "AWS",
      "SourceIdentifier": "S3_BUCKET_LEVEL_PUBLIC_ACCESS_PROHIBITED"
    }
  }'

aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "root-access-key-check",
    "Source": {
      "Owner": "AWS",
      "SourceIdentifier": "IAM_ROOT_ACCESS_KEY_CHECK"
    }
  }'

aws configservice put-config-rule \
  --config-rule '{
    "ConfigRuleName": "iam-password-policy",
    "Source": {
      "Owner": "AWS", 
      "SourceIdentifier": "IAM_PASSWORD_POLICY"
    },
    "InputParameters": "{\"RequireUppercaseCharacters\": \"true\", \"RequireLowercaseCharacters\": \"true\", \"RequireNumbers\": \"true\", \"MinimumPasswordLength\": \"14\"}"
  }'

Custom Config Rules

custom-security-rule.py
import boto3
import json

# Required security groups (configure based on your needs)
REQUIRED_SECURITY_GROUPS = ['sg-12345678', 'sg-87654321']

def lambda_handler(event, context):
    """
    Custom Config rule to check if EC2 instances have required security groups
    """
    config = boto3.client('config')

    # Config passes the invoking event as a JSON string, so parse it first
    invoking_event = json.loads(event['invokingEvent'])
    configuration_item = invoking_event['configurationItem']
    configuration = configuration_item.get('configuration') or {}

    if configuration_item['resourceType'] != 'AWS::EC2::Instance' or not configuration:
        # Other resource types and deleted instances are out of scope
        compliance_type = 'NOT_APPLICABLE'
        annotation = 'Not an active EC2 instance'
    else:
        # Get instance security groups
        instance_security_groups = [
            sg['groupId'] for sg in configuration.get('securityGroups', [])
        ]

        # Compliant when at least one required group is attached;
        # switch any() to all() if every group must be present
        is_compliant = any(
            sg in instance_security_groups for sg in REQUIRED_SECURITY_GROUPS
        )
        compliance_type = 'COMPLIANT' if is_compliant else 'NON_COMPLIANT'
        # Config limits annotations to 256 characters
        annotation = f'Instance security groups: {instance_security_groups}'[:256]

    # Report evaluation result (also for NOT_APPLICABLE, so Config records it)
    evaluation = {
        'ComplianceResourceType': configuration_item['resourceType'],
        'ComplianceResourceId': configuration_item['resourceId'],
        'ComplianceType': compliance_type,
        'Annotation': annotation,
        'OrderingTimestamp': configuration_item['configurationItemCaptureTime']
    }

    config.put_evaluations(
        Evaluations=[evaluation],
        ResultToken=event['resultToken']
    )

    return {
        'statusCode': 200,
        'body': json.dumps(f'Evaluation complete: {compliance_type}')
    }
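
The compliance decision in the rule above can be pulled out into a pure function, which makes it testable without AWS credentials. This is a minimal sketch: the function name and the `require_all` flag are assumptions added for illustration, and the security group IDs are the placeholder values from the rule.

```python
# Placeholder IDs taken from the custom rule above
REQUIRED_GROUPS = ["sg-12345678", "sg-87654321"]


def evaluate_security_groups(attached, required=REQUIRED_GROUPS, require_all=False):
    """Return the Config compliance type for a list of attached group IDs."""
    attached_set = set(attached)
    matched = [group for group in required if group in attached_set]
    if require_all:
        compliant = len(matched) == len(required)
    else:
        compliant = bool(matched)
    return "COMPLIANT" if compliant else "NON_COMPLIANT"


one_required = evaluate_security_groups(["sg-12345678", "sg-aaaa1111"])
none_required = evaluate_security_groups(["sg-aaaa1111"])
strict_check = evaluate_security_groups(["sg-12345678"], require_all=True)
```

Keeping the decision separate from the `put_evaluations` call also makes the "any versus all" policy choice explicit instead of leaving it buried in the handler.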

Automated Remediation

config-remediation-cfn.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Automated remediation for security compliance violations'

Resources:
  RemediationRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: RemediationPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutBucketPublicAccessBlock
                  - ec2:ModifyInstanceAttribute
                  - iam:UpdateAccountPasswordPolicy
                Resource: '*'

  S3PublicAccessRemediationFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: S3PublicAccessRemediation
      Runtime: python3.9
      Handler: index.lambda_handler
      Role: !GetAtt RemediationRole.Arn
      Code:
        ZipFile: |
          import boto3
          import json
          
          def lambda_handler(event, context):
              s3 = boto3.client('s3')
              
              # Extract bucket name from Config event
              bucket_name = event['configurationItem']['resourceName']
              
              # Apply public access block
              s3.put_public_access_block(
                  Bucket=bucket_name,
                  PublicAccessBlockConfiguration={
                      'BlockPublicAcls': True,
                      'IgnorePublicAcls': True, 
                      'BlockPublicPolicy': True,
                      'RestrictPublicBuckets': True
                  }
              )
              
              return {
                  'statusCode': 200,
                  'body': json.dumps(f'Applied public access block to {bucket_name}')
              }

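  # Config remediation runs as an SSM Automation, so it needs a role that
  # Systems Manager (not Lambda) can assume
  RemediationAutomationRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: ssm.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: S3PublicAccessBlockRemediation
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetBucketPublicAccessBlock
                  - s3:PutBucketPublicAccessBlock
                Resource: '*'

  RemediationConfigurationS3:
    Type: AWS::Config::RemediationConfiguration
    Properties:
      ConfigRuleName: s3-bucket-public-access-prohibited
      TargetType: SSM_DOCUMENT
      # TargetId must name an SSM Automation document, not a Lambda function;
      # this AWS-owned runbook applies the bucket-level public access block
      TargetId: AWSConfigRemediation-ConfigureS3BucketPublicAccessBlock
      Parameters:
        AutomationAssumeRole:
          StaticValue:
            Values:
              - !GetAtt RemediationAutomationRole.Arn
        BucketName:
          ResourceValue:
            Value: RESOURCE_ID
      Automatic: true
      MaximumAutomaticAttempts: 3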

Security Hub Centralized Security Management

AWS Security Hub provides a central dashboard for security alerts and compliance status across your AWS environment. It aggregates findings from multiple AWS security services and third-party tools.

Security Hub Organization Setup

Enable Security Hub Organization-wide
# Enable Security Hub in management account
aws securityhub enable-security-hub \
  --enable-default-standards

# Add member accounts (run from the Security Hub administrator account)
aws securityhub create-members \
  --account-details AccountId=111122223333,Email=security@example.com AccountId=444455556666,Email=dev@example.com

# Auto-enable for new accounts
aws securityhub update-organization-configuration \
  --auto-enable

# Enable important security standards
# (FSBP and PCI DSS ARNs are regional; CIS v1.2.0 uses the older ruleset ARN)
aws securityhub batch-enable-standards \
  --standards-subscription-requests '[
    {
      "StandardsArn": "arn:aws:securityhub:us-east-1::standards/aws-foundational-security-best-practices/v/1.0.0"
    },
    {
      "StandardsArn": "arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0"
    },
    {
      "StandardsArn": "arn:aws:securityhub:us-east-1::standards/pci-dss/v/3.2.1"
    }
  ]'

Custom Security Findings

custom-security-findings.py
import boto3
import json
from datetime import datetime, timezone

def create_custom_finding(title, description, severity, resource_arn, resource_type,
                          finding_type='Unusual Behaviors/Network Flow'):
    """
    Create a custom security finding in Security Hub
    """
    securityhub = boto3.client('securityhub')
    account_id = boto3.client('sts').get_caller_identity()['Account']
    region = boto3.Session().region_name

    # Use an aware UTC datetime so the trailing "Z" is actually true
    now = datetime.now(timezone.utc)
    timestamp = now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

    finding = {
        'SchemaVersion': '2018-10-08',
        'Id': f"custom-finding-{resource_arn.split('/')[-1]}-{int(now.timestamp())}",
        'ProductArn': f"arn:aws:securityhub:{region}:{account_id}:product/{account_id}/default",
        'GeneratorId': 'custom-security-monitor',
        'AwsAccountId': account_id,
        # Finding type in ASFF namespace/category form
        'Types': [finding_type],
        'CreatedAt': timestamp,
        'UpdatedAt': timestamp,
        'Severity': {
            'Label': severity.upper()
        },
        'Title': title,
        'Description': description,
        'Resources': [
            {
                'Type': resource_type,
                'Id': resource_arn,
                'Region': region
            }
        ],
        # WorkflowState is deprecated in ASFF; Workflow.Status replaces it
        'Workflow': {'Status': 'NEW'},
        'RecordState': 'ACTIVE'
    }

    response = securityhub.batch_import_findings(Findings=[finding])
    return response

def lambda_handler(event, context):
    """
    Lambda function to create custom findings based on CloudWatch alarms
    """
    findings_created = 0

    # Example: High data transfer volumes (potential data exfiltration)
    if 'cloudwatch_alarm' in event:
        alarm = event['cloudwatch_alarm']

        if alarm['alarm_name'] == 'High-Data-Transfer':
            create_custom_finding(
                title='Unusual Data Transfer Detected',
                description=f'High volume of data transfer detected from instance {alarm["instance_id"]}. This could indicate data exfiltration.',
                severity='HIGH',
                resource_arn=f'arn:aws:ec2:{boto3.Session().region_name}:{boto3.client("sts").get_caller_identity()["Account"]}:instance/{alarm["instance_id"]}',
                resource_type='AwsEc2Instance'
            )
            findings_created += 1

    # Report what actually happened instead of always claiming success
    return {
        'statusCode': 200,
        'body': json.dumps(f'Custom findings created: {findings_created}')
    }

Security Hub Automation

🔍

Detection

Security services generate findings and send to Security Hub

📊

Aggregation

Security Hub normalizes and correlates findings from multiple sources

Automation

EventBridge rules trigger automated response workflows

🛠️

Remediation

Lambda functions execute automated remediation actions
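
The four stages above boil down to a routing decision once a finding reaches the automation step. The sketch below shows one way that decision could look for a finding in Security Hub's format. The action names (`auto-remediate`, `notify`, `record`, `skip`) and the severity policy are assumptions for illustration, not Security Hub behavior.

```python
def route_finding(finding: dict) -> str:
    """Pick a hypothetical response action from severity and workflow status."""
    label = finding.get("Severity", {}).get("Label", "INFORMATIONAL")
    status = finding.get("Workflow", {}).get("Status", "NEW")

    # Findings someone already triaged should not trigger automation again
    if status != "NEW":
        return "skip"
    if label in ("CRITICAL", "HIGH"):
        return "auto-remediate"
    if label == "MEDIUM":
        return "notify"
    return "record"


critical_action = route_finding(
    {"Severity": {"Label": "CRITICAL"}, "Workflow": {"Status": "NEW"}}
)
triaged_action = route_finding(
    {"Severity": {"Label": "HIGH"}, "Workflow": {"Status": "NOTIFIED"}}
)
```

Checking the workflow status first keeps a remediation Lambda from acting twice on a finding that Security Hub re-sends after an update.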

GuardDuty Intelligent Threat Detection

Amazon GuardDuty uses machine learning, anomaly detection, and integrated threat intelligence to identify threats such as malicious IPs, cryptocurrency mining, and data exfiltration attempts.

GuardDuty Organization Setup

Enable GuardDuty Organization-wide
# Enable GuardDuty in management account
aws guardduty create-detector \
  --enable \
  --finding-publishing-frequency FIFTEEN_MINUTES

# Get detector ID
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)

# Enable organization features
aws guardduty update-organization-configuration \
  --detector-id $DETECTOR_ID \
  --auto-enable

# Enable Malware Protection
aws guardduty update-malware-scan-settings \
  --detector-id "$DETECTOR_ID" \
  --scan-resource-criteria '{"Include": {"EC2_INSTANCE_TAG": {"MapEquals": [{"Key": "EnvironmentTag", "Value": "Production"}, {"Key": "EnvironmentTag", "Value": "Staging"}]}}}'

# Enable S3 Protection, EKS audit log monitoring, and EBS malware scanning
aws guardduty update-detector \
  --detector-id "$DETECTOR_ID" \
  --data-sources '{
    "S3Logs": {"Enable": true},
    "Kubernetes": {"AuditLogs": {"Enable": true}},
    "MalwareProtection": {"ScanEc2InstanceWithFindings": {"EbsVolumes": true}}
  }'

Custom Threat Intelligence

Upload Custom Threat Lists
# Create S3 bucket for threat intelligence
aws s3 mb s3://guardduty-threat-intel-bucket

# Upload IP threat list
aws s3 cp malicious-ips.txt s3://guardduty-threat-intel-bucket/

# Create threat intel set
aws guardduty create-threat-intel-set \
  --detector-id $DETECTOR_ID \
  --name "CustomMaliciousIPs" \
  --format TXT \
  --location s3://guardduty-threat-intel-bucket/malicious-ips.txt \
  --activate

# Upload domain threat list  
aws s3 cp malicious-domains.txt s3://guardduty-threat-intel-bucket/

aws guardduty create-threat-intel-set \
  --detector-id $DETECTOR_ID \
  --name "CustomMaliciousDomains" \
  --format TXT \
  --location s3://guardduty-threat-intel-bucket/malicious-domains.txt \
  --activate

GuardDuty Findings Analysis

🚨 High Severity Findings

Query High Severity Findings
# Get high severity findings from last 24 hours
# (updatedAt is compared as a Unix epoch timestamp in milliseconds)
SINCE_MS=$(( ($(date +%s) - 86400) * 1000 ))

aws guardduty list-findings \
  --detector-id "$DETECTOR_ID" \
  --finding-criteria '{
    "Criterion": {
      "severity": {
        "Gte": 7
      },
      "updatedAt": {
        "Gte": '"$SINCE_MS"'
      }
    }
  }' \
  --sort-criteria '{
    "AttributeName": "severity",
    "OrderBy": "DESC"
  }'
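
The same criteria can be built in Python for use with boto3's `list_findings`. This sketch only constructs the dictionary and makes no API call. It assumes, as far as I understand the GuardDuty API, that `updatedAt` is compared as epoch milliseconds. The function name and the `now` parameter (there to make the cutoff reproducible) are hypothetical.

```python
import time


def high_severity_criteria(min_severity=7, hours=24, now=None):
    """Build finding criteria for recent findings at or above a severity."""
    now_seconds = time.time() if now is None else now
    # Convert the cutoff to epoch milliseconds
    cutoff_ms = int((now_seconds - hours * 3600) * 1000)
    return {
        "Criterion": {
            "severity": {"GreaterThanOrEqual": min_severity},
            "updatedAt": {"GreaterThanOrEqual": cutoff_ms},
        }
    }


# Fixed clock value so the result is deterministic
criteria = high_severity_criteria(now=1_700_000_000)
```

The result would be passed as `FindingCriteria=criteria` alongside a detector ID. Injecting `now` keeps the helper easy to unit test.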

🔍 Findings by Type

Common Finding Types Analysis
# Cryptocurrency mining detection
aws guardduty list-findings \
  --detector-id $DETECTOR_ID \
  --finding-criteria '{
    "Criterion": {
      "type": {
        "Eq": ["CryptoCurrency:EC2/BitcoinTool.B!DNS"]
      }
    }
  }'

# Backdoor communication
aws guardduty list-findings \
  --detector-id $DETECTOR_ID \
  --finding-criteria '{
    "Criterion": {
      "type": {
        "Eq": ["Backdoor:EC2/C&CActivity.B!DNS"]
      }
    }
  }'

# Data exfiltration attempts
aws guardduty list-findings \
  --detector-id $DETECTOR_ID \
  --finding-criteria '{
    "Criterion": {
      "type": {
        "Eq": ["Exfiltration:S3/ObjectRead.Unusual"]
      }
    }
  }'

VPC Flow Logs Network Security Analysis

VPC Flow Logs capture information about IP traffic going to and from network interfaces in your VPC. This data is essential for network security monitoring, troubleshooting, and forensic analysis.

Comprehensive Flow Logs Setup

Enable VPC Flow Logs
# Enable VPC-level flow logs
aws ec2 create-flow-logs \
  --resource-type VPC \
  --resource-ids vpc-12345678 \
  --traffic-type ALL \
  --log-destination-type cloud-watch-logs \
  --log-group-name VPCFlowLogs \
  --deliver-logs-permission-arn arn:aws:iam::123456789012:role/flowlogsRole

# Enable subnet-level flow logs for sensitive subnets
# (AWSLogs is reserved: AWS adds it under whatever prefix you choose)
aws ec2 create-flow-logs \
  --resource-type Subnet \
  --resource-ids subnet-12345678 subnet-87654321 \
  --traffic-type ALL \
  --log-destination-type s3 \
  --log-destination arn:aws:s3:::vpc-flow-logs-bucket/flow-logs/

# Enable ENI-level flow logs for critical instances
aws ec2 create-flow-logs \
  --resource-type NetworkInterface \
  --resource-ids eni-12345678 \
  --traffic-type ALL \
  --log-destination-type cloud-watch-logs \
  --log-group-name CriticalInstanceFlowLogs \
  --deliver-logs-permission-arn arn:aws:iam::123456789012:role/flowlogsRole

Flow Logs Analysis Queries

CloudWatch Insights Security Queries
# Field names use the camelCase form that Logs Insights discovers for flow logs

# Top rejected connections (potential attacks)
fields @timestamp, srcAddr, dstAddr, srcPort, dstPort, protocol, action
| filter action = "REJECT"
| stats count(*) as rejections by srcAddr, dstAddr, dstPort
| sort rejections desc
| limit 50

# Unusual outbound connections to external IPs
fields @timestamp, srcAddr, dstAddr, dstPort, protocol, bytes
| filter action = "ACCEPT" and dstAddr not like /^10\./ and dstAddr not like /^172\.(1[6-9]|2[0-9]|3[0-1])\./ and dstAddr not like /^192\.168\./
| stats sum(bytes) as total_bytes by srcAddr, dstAddr, dstPort
| sort total_bytes desc
| limit 100

# SSH brute force detection
fields @timestamp, srcAddr, dstAddr, dstPort, action
| filter dstPort = 22 and action = "REJECT"
| stats count(*) as attempts by srcAddr, dstAddr
| sort attempts desc
| limit 20

# Data exfiltration detection (high volume outbound)
fields @timestamp, srcAddr, dstAddr, bytes, action
| filter action = "ACCEPT" and bytes > 1000000
| stats sum(bytes) as total_bytes by srcAddr, dstAddr
| sort total_bytes desc
| limit 50
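
The "external IP" test in the second query relies on three regular expressions for the RFC 1918 private ranges. The same check can be sketched offline with Python's `ipaddress` module, for example when post-processing flow logs exported to S3. The helper names and the sample flows are assumptions for illustration.

```python
import ipaddress
from collections import defaultdict

# The three RFC 1918 private ranges that the query's regexes exclude
PRIVATE_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
]


def is_external(address: str) -> bool:
    """Return True when the address is outside the RFC 1918 ranges."""
    ip = ipaddress.ip_address(address)
    return not any(ip in network for network in PRIVATE_NETWORKS)


def external_bytes_by_destination(flows):
    """Sum accepted bytes per external destination address."""
    totals = defaultdict(int)
    for flow in flows:
        if flow["action"] == "ACCEPT" and is_external(flow["dstaddr"]):
            totals[flow["dstaddr"]] += flow["bytes"]
    return dict(totals)


# Made-up flows using documentation address ranges
sample_flows = [
    {"dstaddr": "10.0.2.15", "bytes": 5000, "action": "ACCEPT"},
    {"dstaddr": "198.51.100.7", "bytes": 1200, "action": "ACCEPT"},
    {"dstaddr": "198.51.100.7", "bytes": 800, "action": "ACCEPT"},
    {"dstaddr": "203.0.113.9", "bytes": 300, "action": "REJECT"},
]
external_totals = external_bytes_by_destination(sample_flows)
```

Using network objects instead of regexes avoids boundary mistakes in the 172.16.0.0/12 range, where `172.31.x.x` is private but `172.32.x.x` is not.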

Automated Flow Logs Alerting

flow-logs-analysis-lambda.py
import boto3
import json
import gzip
import base64
from datetime import datetime, timedelta

def lambda_handler(event, context):
    """
    Analyze VPC Flow Logs for security threats
    """
    
    # Decode CloudWatch Logs data
    cw_data = event['awslogs']['data']
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    log_data = json.loads(uncompressed_payload)
    
    suspicious_activities = []
    
    for log_event in log_data['logEvents']:
        message = log_event['message']
        fields = message.split(' ')
        
        if len(fields) >= 14:
            srcaddr = fields[3]
            dstaddr = fields[4]
            srcport = fields[5]
            dstport = fields[6]
            protocol = fields[7]
            packets = int(fields[8]) if fields[8].isdigit() else 0
            bytes_transferred = int(fields[9]) if fields[9].isdigit() else 0
            action = fields[12]
            
            # Detect suspicious patterns
            # 1. High volume data transfer
            if bytes_transferred > 100000000:  # 100MB
                suspicious_activities.append({
                    'type': 'HIGH_DATA_TRANSFER',
                    'source': srcaddr,
                    'destination': dstaddr,
                    'bytes': bytes_transferred,
                    'timestamp': log_event['timestamp']
                })
            
            # 2. Multiple rejected connections (port scan)
            if action == 'REJECT' and packets > 10:
                suspicious_activities.append({
                    'type': 'PORT_SCAN_ATTEMPT',
                    'source': srcaddr,
                    'destination': dstaddr,
                    'port': dstport,
                    'packets': packets,
                    'timestamp': log_event['timestamp']
                })
            
            # 3. Connections to known bad ports
            bad_ports = ['1433', '3389', '5432', '27017']  # SQL Server, RDP, PostgreSQL, MongoDB
            if dstport in bad_ports and action == 'ACCEPT':
                suspicious_activities.append({
                    'type': 'DANGEROUS_PORT_ACCESS',
                    'source': srcaddr,
                    'destination': dstaddr,
                    'port': dstport,
                    'timestamp': log_event['timestamp']
                })
    
    # Send alerts for suspicious activities
    if suspicious_activities:
        sns = boto3.client('sns')
        message = json.dumps({
            'alert_type': 'VPC_FLOW_LOGS_SECURITY_ALERT',
            'suspicious_activities': suspicious_activities,
            'analysis_timestamp': datetime.now().isoformat()
        }, indent=2)
        
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789012:security-alerts',
            Message=message,
            Subject='VPC Flow Logs Security Alert'
        )
    
    return {
        'statusCode': 200,
        'body': json.dumps(f'Analyzed {len(log_data["logEvents"])} log events, found {len(suspicious_activities)} suspicious activities')
    }
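
To exercise a handler like the one above without waiting for real traffic, a test event can be built in the shape a CloudWatch Logs subscription delivers: gzip-compressed JSON, base64-encoded, under `awslogs.data`. This is a sketch with hypothetical helper names and a made-up flow log line in the default 14-field format.

```python
import base64
import gzip
import json


def make_awslogs_event(messages, log_group="VPCFlowLogs"):
    """Wrap raw flow log lines in a subscription-style event."""
    payload = {
        "messageType": "DATA_MESSAGE",
        "logGroup": log_group,
        "logStream": "eni-test-stream",
        "logEvents": [
            {"id": str(index), "timestamp": 1700000000000 + index, "message": message}
            for index, message in enumerate(messages)
        ],
    }
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {"awslogs": {"data": base64.b64encode(compressed).decode("ascii")}}


def decode_awslogs_event(event):
    """Reverse the encoding, mirroring the handler's first decoding steps."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


# Made-up record: accepted RDP connection from an external address
sample_line = (
    "2 123456789012 eni-0abc1234 203.0.113.5 10.0.1.20 "
    "44321 3389 6 12 840 1700000000 1700000060 ACCEPT OK"
)
test_event = make_awslogs_event([sample_line])
decoded = decode_awslogs_event(test_event)
fields = decoded["logEvents"][0]["message"].split(" ")
```

Passing `test_event` to the handler should produce a `DANGEROUS_PORT_ACCESS` entry, since field 6 is port 3389 and field 12 is `ACCEPT`. Stub the SNS client first if you do not want a real notification.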

Automated Security Incident Response

Implement automated incident response workflows using EventBridge, Lambda, and Systems Manager to reduce response time and ensure consistent security incident handling.

EventBridge Security Automation Rules

security-eventbridge-rules.json
{
  "Rules": [
    {
      "Name": "GuardDutyHighSeverityFindings",
      "EventPattern": {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {
          "severity": [{"numeric": [">=", 7.0]}]
        }
      },
      "State": "ENABLED",
      "Targets": [
        {
          "Id": "1",
          "Arn": "arn:aws:lambda:us-east-1:123456789012:function:SecurityIncidentResponse"
        },
        {
          "Id": "2", 
          "Arn": "arn:aws:sns:us-east-1:123456789012:security-alerts"
        }
      ]
    },
    {
      "Name": "SecurityHubCriticalFindings",
      "EventPattern": {
        "source": ["aws.securityhub"],
        "detail-type": ["Security Hub Findings - Imported"],
        "detail": {
          "findings": {
            "Severity": {
              "Label": ["CRITICAL", "HIGH"]
            },
            "Workflow": {
              "Status": ["NEW"]
            }
          }
        }
      },
      "State": "ENABLED", 
      "Targets": [
        {
          "Id": "1",
          "Arn": "arn:aws:lambda:us-east-1:123456789012:function:SecurityHubResponseHandler"
        }
      ]
    },
    {
      "Name": "UnauthorizedAPICallsPattern",
      "EventPattern": {
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
          "errorCode": ["UnauthorizedOperation", "AccessDenied", "InvalidUserID.NotFound"],
          "sourceIPAddress": [{"anything-but": {"prefix": "10."}}]
        }
      },
      "State": "ENABLED",
      "Targets": [
        {
          "Id": "1",
          "Arn": "arn:aws:lambda:us-east-1:123456789012:function:UnauthorizedAccessHandler"
        }
      ]
    }
  ]
}
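
Event patterns are easy to get subtly wrong, so a quick local check of the intended matching logic can be useful before creating the rules. The sketch below hand-codes the GuardDutyHighSeverityFindings conditions. It is not an implementation of the EventBridge pattern language, and the function name and sample events are assumptions.

```python
def matches_guardduty_high_severity(event: dict, minimum: float = 7.0) -> bool:
    """Hand-coded equivalent of the GuardDutyHighSeverityFindings pattern."""
    if event.get("source") != "aws.guardduty":
        return False
    if event.get("detail-type") != "GuardDuty Finding":
        return False
    severity = event.get("detail", {}).get("severity")
    # Numeric matching applies only to real numbers, not strings or booleans
    if isinstance(severity, bool) or not isinstance(severity, (int, float)):
        return False
    return severity >= minimum


# Made-up events for illustration only
high_event = {
    "source": "aws.guardduty",
    "detail-type": "GuardDuty Finding",
    "detail": {"severity": 8, "type": "Backdoor:EC2/C&CActivity.B!DNS"},
}
medium_event = {
    "source": "aws.guardduty",
    "detail-type": "GuardDuty Finding",
    "detail": {"severity": 5},
}
```

A finding whose severity arrives as a string such as `"8"` would not match a numeric condition, which is worth knowing when events are forwarded through custom tooling.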

Incident Response Lambda Functions

🚨 Malicious IP Isolation

isolate-malicious-ip.py
import boto3
import json

def lambda_handler(event, context):
    """
    Automatically isolate instances communicating with malicious IPs
    """
    ec2 = boto3.client('ec2')
    
    # Extract finding details
    detail = event['detail']
    finding_type = detail.get('type', '')
    service = detail.get('service', {})
    
    if 'Backdoor' in finding_type or 'Trojan' in finding_type:
        # Get affected instance
        instance_details = service.get('instanceDetails', {})
        instance_id = instance_details.get('instanceId')
        
        if instance_id:
            # Create isolation security group
            try:
                response = ec2.create_security_group(
                    GroupName=f'isolation-{instance_id}',
                    Description='Isolation security group for compromised instance',
                    VpcId=instance_details.get('vpcId')
                )
                isolation_sg_id = response['GroupId']
                
                # Attach isolation security group
                ec2.modify_instance_attribute(
                    InstanceId=instance_id,
                    Groups=[isolation_sg_id]
                )
                
                # Create Systems Manager document for forensics
                ssm = boto3.client('ssm')
                ssm.send_command(
                    InstanceIds=[instance_id],
                    DocumentName='AWS-RunShellScript',
                    Parameters={
                        'commands': [
                            'sudo netstat -tulpn > /tmp/network_connections.txt',
                            'sudo ps aux > /tmp/running_processes.txt',
                            'sudo find /tmp -name "*.log" -exec cp {} /tmp/forensics/ \;'
                        ]
                    }
                )
                
                return {
                    'statusCode': 200,
                    'body': json.dumps({
                        'action': 'INSTANCE_ISOLATED',
                        'instance_id': instance_id,
                        'isolation_sg': isolation_sg_id
                    })
                }
                
            except Exception as e:
                print(f'Error isolating instance: {str(e)}')
                return {
                    'statusCode': 500,
                    'body': json.dumps(f'Failed to isolate instance: {str(e)}')
                }
    
    return {
        'statusCode': 200,
        'body': json.dumps('No isolation required')
    }
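
The handler above only acts on Backdoor and Trojan findings, so the EventBridge rule that invokes it can apply the same filter before Lambda is called at all. The following is a minimal sketch of such an event pattern. The function name `build_finding_pattern` is hypothetical; `aws.guardduty`, `GuardDuty Finding`, and prefix matching are standard EventBridge pattern elements.

```python
import json


def build_finding_pattern(type_prefixes):
    """Build an EventBridge event pattern that matches GuardDuty findings
    whose type starts with one of the given prefixes."""
    return {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {
            # Each entry is an EventBridge prefix matcher on detail.type
            "type": [{"prefix": prefix} for prefix in type_prefixes],
        },
    }


if __name__ == "__main__":
    pattern = build_finding_pattern(["Backdoor", "Trojan"])
    # The JSON string is what would be passed as EventPattern to events.put_rule
    print(json.dumps(pattern, indent=2))
```

Filtering in the rule keeps unrelated findings from invoking the function, which also reduces Lambda cost.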

🔐 Compromised Account Response

compromise-account-response.py
import boto3
import json
from datetime import datetime

def lambda_handler(event, context):
    """
    Respond to compromised IAM credentials
    """
    iam = boto3.client('iam')
    
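    # Extract user information from GuardDuty finding.
    # IAM findings carry the key under detail.resource.accessKeyDetails.
    detail = event['detail']
    key_details = detail.get('resource', {}).get('accessKeyDetails', {})
    username = key_details.get('userName')
    access_key_id = key_details.get('accessKeyId')
    
    # Only long-lived IAM user keys can be disabled with update_access_key
    if username and access_key_id and key_details.get('userType') == 'IAMUser':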
        try:
            # Disable the compromised access key
            iam.update_access_key(
                UserName=username,
                AccessKeyId=access_key_id,
                Status='Inactive'
            )
            
            # Attach explicit deny policy. It has no Condition block: a
            # StringEquals test against "*" is a literal comparison and
            # would never match, leaving the Deny without effect.
            deny_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Deny",
                        "Action": "*",
                        "Resource": "*"
                    }
                ]
            }
            
            policy_name = f'EmergencyDeny-{username}-{datetime.now().strftime("%Y%m%d%H%M%S")}'
            
            iam.put_user_policy(
                UserName=username,
                PolicyName=policy_name,
                PolicyDocument=json.dumps(deny_policy)
            )
            
            # Force password reset if user has console access
            try:
                iam.update_login_profile(
                    UserName=username,
                    PasswordResetRequired=True
                )
            except iam.exceptions.NoSuchEntityException:
                pass  # User doesn't have console access
            
            # Send notification
            sns = boto3.client('sns')
            sns.publish(
                TopicArn='arn:aws:sns:us-east-1:123456789012:security-alerts',
                Subject='URGENT: IAM User Compromised - Actions Taken',
                Message=json.dumps({
                    'incident_type': 'IAM_COMPROMISE',
                    'username': username,
                    'access_key_disabled': access_key_id,
                    'deny_policy_applied': policy_name,
                    'timestamp': datetime.now().isoformat(),
                    'actions_taken': [
                        'Access key disabled',
                        'Explicit deny policy applied',
                        'Password reset required'
                    ]
                }, indent=2)
            )
            
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'action': 'USER_SECURED',
                    'username': username,
                    'access_key_disabled': access_key_id
                })
            }
            
        except Exception as e:
            print(f'Error securing compromised user: {str(e)}')
            return {
                'statusCode': 500,
                'body': json.dumps(f'Failed to secure user: {str(e)}')
            }
    
    return {
        'statusCode': 200,
        'body': json.dumps('No user action required')
    }
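
Disabling an access key does not affect temporary credentials that were already issued for an IAM role. A common complement is an inline deny policy conditioned on `aws:TokenIssueTime`, which blocks every session issued before a cutoff. The sketch below only builds the policy document. The function name is hypothetical, and attaching the policy (for example with `iam.put_role_policy`) is left out.

```python
import json
from datetime import datetime, timezone


def build_revoke_sessions_policy(cutoff):
    """Return a deny-all policy for sessions issued before `cutoff`.

    `cutoff` must be a timezone-aware datetime.
    """
    cutoff_utc = cutoff.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Deny",
                "Action": "*",
                "Resource": "*",
                # Applies only to credentials issued before the cutoff
                "Condition": {
                    "DateLessThan": {"aws:TokenIssueTime": cutoff_utc}
                },
            }
        ],
    }


if __name__ == "__main__":
    policy = build_revoke_sessions_policy(datetime.now(timezone.utc))
    print(json.dumps(policy, indent=2))
```

Sessions created after the cutoff are unaffected, so legitimate workloads can re-assume the role once the root cause is fixed.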

Incident Response Playbooks

incident-response-playbook.yaml
# Systems Manager Automation Document for Security Incidents
schemaVersion: '0.3'
description: 'Automated Security Incident Response Playbook'
assumeRole: '{{ AutomationAssumeRole }}'
parameters:
  InstanceId:
    type: String
    description: 'ID of the compromised EC2 instance'
  IncidentSeverity:
    type: String
    description: 'Severity level of the security incident'
    allowedValues:
      - CRITICAL
      - HIGH
      - MEDIUM
      - LOW
    default: HIGH
  AutomationAssumeRole:
    type: String
    description: 'IAM role for automation execution'

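mainSteps:
  # Look up the first attached volume and the VPC so later steps can use them
  - name: GetVolumeId
    action: 'aws:executeAwsApi'
    description: 'Look up the first attached EBS volume and the VPC of the instance'
    inputs:
      Service: ec2
      Api: DescribeInstances
      InstanceIds:
        - '{{ InstanceId }}'
    outputs:
      - Name: VolumeId
        Selector: $.Reservations[0].Instances[0].BlockDeviceMappings[0].Ebs.VolumeId
        Type: String
      - Name: VpcId
        Selector: $.Reservations[0].Instances[0].VpcId
        Type: String

  - name: CreateSnapshot
    action: 'aws:executeAwsApi'
    description: 'Create snapshot of affected instance for forensics'
    inputs:
      Service: ec2
      Api: CreateSnapshot
      VolumeId: '{{ GetVolumeId.VolumeId }}'
      Description: 'Forensic snapshot for incident {{ automation:EXECUTION_ID }}'
    outputs:
      - Name: SnapshotId
        Selector: $.SnapshotId
        Type: String

  # A new security group has no inbound rules but keeps the default
  # allow-all egress rule; revoke it after forensic collection if
  # outbound traffic must also be blocked
  - name: CreateIsolationSecurityGroup
    action: 'aws:executeAwsApi'
    description: 'Create a dedicated security group with no inbound rules'
    inputs:
      Service: ec2
      Api: CreateSecurityGroup
      GroupName: 'isolation-{{ automation:EXECUTION_ID }}'
      Description: 'Isolation security group for compromised instance'
      VpcId: '{{ GetVolumeId.VpcId }}'
    outputs:
      - Name: GroupId
        Selector: $.GroupId
        Type: String

  - name: IsolateInstance
    action: 'aws:executeAwsApi'
    description: 'Apply isolation security group'
    inputs:
      Service: ec2
      Api: ModifyInstanceAttribute
      InstanceId: '{{ InstanceId }}'
      Groups:
        - '{{ CreateIsolationSecurityGroup.GroupId }}'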

  - name: CollectForensicData
    action: 'aws:runCommand'
    description: 'Collect forensic data from instance'
    inputs:
      DocumentName: 'AWS-RunShellScript'
      InstanceIds:
        - '{{ InstanceId }}'
      Parameters:
        commands:
          - 'mkdir -p /tmp/forensics'
          - 'netstat -tulpn > /tmp/forensics/network_connections.txt'
          - 'ps aux > /tmp/forensics/processes.txt'
          - 'last -n 100 > /tmp/forensics/login_history.txt'
          - 'find /var/log -name "*.log" -mtime -1 -exec cp {} /tmp/forensics/ \;'
          - 'aws s3 cp /tmp/forensics/ s3://security-forensics-bucket/{{ automation:EXECUTION_ID }}/ --recursive'

  - name: NotifySecurityTeam
    action: 'aws:executeAwsApi'
    description: 'Send notification to security team'
    inputs:
      Service: sns
      Api: Publish
      TopicArn: 'arn:aws:sns:us-east-1:123456789012:security-incidents'
      Subject: 'Security Incident Response - {{ IncidentSeverity }}'
      Message: |
        Security incident response executed for instance {{ InstanceId }}
        
        Actions taken:
        - Forensic snapshot created: {{ CreateSnapshot.SnapshotId }}
        - Instance isolated with security group
        - Forensic data collected and uploaded to S3
        
        Incident severity: {{ IncidentSeverity }}
        Automation execution: {{ automation:EXECUTION_ID }}
        
        Please review findings and take additional manual actions as needed.

# Document outputs reference step outputs as StepName.OutputName; the
# execution ID is already returned when the automation is started
outputs:
  - CreateSnapshot.SnapshotId

Cost Anomaly Detection for Security Breaches

Unusual cost spikes can indicate security breaches such as cryptocurrency mining, unauthorized resource creation, or data exfiltration. AWS Cost Anomaly Detection helps identify these patterns.
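
To make the idea of a "cost spike" concrete, the sketch below flags days whose spend sits far above a trailing baseline. It is a simplified illustration only: AWS Cost Anomaly Detection uses its own machine learning models, and the function name, window, and threshold here are assumptions chosen for the example.

```python
from statistics import mean, stdev


def flag_cost_spikes(daily_costs, window=7, threshold=3.0):
    """Return indexes of days whose cost exceeds the trailing-window mean
    by more than `threshold` standard deviations."""
    spikes = []
    for i in range(window, len(daily_costs)):
        history = daily_costs[i - window:i]
        baseline = mean(history)
        spread = stdev(history)
        if spread == 0:
            # Perfectly flat history: any increase counts as a spike
            if daily_costs[i] > baseline:
                spikes.append(i)
            continue
        if (daily_costs[i] - baseline) / spread > threshold:
            spikes.append(i)
    return spikes


if __name__ == "__main__":
    costs = [10, 11, 9, 10, 12, 10, 11, 95, 10]
    print(flag_cost_spikes(costs))
```

A sudden jump like the eighth day in the sample data is the kind of pattern that cryptocurrency mining on stolen credentials tends to produce.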

Cost Anomaly Detectors for Security

Setup Security-Focused Cost Monitoring
# The Cost Explorer API calls these "anomaly monitors" (there is no
# create-anomaly-detector command). A DIMENSIONAL monitor on SERVICE
# evaluates every AWS service separately, so a single monitor covers
# EC2 compute and Lambda; data transfer charges are tracked under the
# service that incurs them.
aws ce create-anomaly-monitor \
  --anomaly-monitor '{
    "MonitorName": "Service-Security-Anomaly-Monitor",
    "MonitorType": "DIMENSIONAL",
    "MonitorDimension": "SERVICE"
  }'

# The command returns a MonitorArn; keep it for the alert subscription

Cost Anomaly Alerting

Create Cost Anomaly Subscriptions
# Create subscription for immediate alerting
# IMMEDIATE frequency accepts SNS subscribers only. Subscribe
# security-team@company.com to the SNS topic, or add a second
# subscription with DAILY frequency for direct email delivery.
# Replace the placeholder ARN with the MonitorArn of each monitor to watch.
aws ce create-anomaly-subscription \
  --anomaly-subscription '{
    "SubscriptionName": "Security-Cost-Anomalies",
    "MonitorArnList": [
      "arn:aws:ce::123456789012:anomalymonitor/service-security-monitor-id"
    ],
    "Subscribers": [
      {
        "Address": "arn:aws:sns:us-east-1:123456789012:cost-security-alerts",
        "Type": "SNS"
      }
    ],
    "Frequency": "IMMEDIATE",
    "ThresholdExpression": {
      "Dimensions": {
        "Key": "ANOMALY_TOTAL_IMPACT_ABSOLUTE",
        "MatchOptions": ["GREATER_THAN_OR_EQUAL"],
        "Values": ["100"]
      }
    }
  }'

Cost-Based Security Analysis

cost-security-analysis.py
import boto3
import json
from datetime import datetime, timedelta

def analyze_cost_anomalies(event, context):
    """
    Analyze cost anomalies for potential security implications
    """
    ce_client = boto3.client('ce')
    
    # Get cost anomalies from the last 7 days
    end_date = datetime.now().date()
    start_date = end_date - timedelta(days=7)
    
    response = ce_client.get_anomalies(
        DateInterval={
            'StartDate': start_date.strftime('%Y-%m-%d'),
            'EndDate': end_date.strftime('%Y-%m-%d')
        },
        TotalImpact={
            'NumericOperator': 'GREATER_THAN_OR_EQUAL',
            'StartValue': 50.0  # $50 or more impact
        }
    )
    
    security_alerts = []
    
    for anomaly in response['Anomalies']:
        impact = float(anomaly['Impact']['TotalImpact'])
        # The affected service is reported in RootCauses (or in
        # DimensionValue for service-level monitors)
        root_causes = anomaly.get('RootCauses') or [{}]
        service = root_causes[0].get('Service') or anomaly.get('DimensionValue', 'Unknown')
        
        # Analyze for security-related patterns
        security_risk = analyze_security_risk(anomaly, service, impact)
        
        if security_risk['risk_level'] in ['HIGH', 'CRITICAL']:
            security_alerts.append({
                'anomaly_id': anomaly['AnomalyId'],
                'service': service,
                'impact': impact,
                'risk_assessment': security_risk,
                'start_date': anomaly['AnomalyStartDate'],
                'end_date': anomaly.get('AnomalyEndDate')  # may be absent while ongoing
            })
    
    # Send security alerts if found
    if security_alerts:
        send_security_cost_alert(security_alerts)
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'total_anomalies': len(response['Anomalies']),
            'security_alerts': len(security_alerts),
            'alerts': security_alerts
        })
    }

def analyze_security_risk(anomaly, service, impact):
    """
    Analyze cost anomaly for security risk indicators
    """
    risk_level = 'LOW'
    risk_indicators = []
    
    # High impact thresholds by service
    if service == 'Amazon Elastic Compute Cloud - Compute' and impact > 500:
        risk_level = 'HIGH'
        risk_indicators.append('Potential cryptocurrency mining or unauthorized compute usage')
    
    elif service == 'Amazon Simple Storage Service' and impact > 200:
        risk_level = 'HIGH' 
        risk_indicators.append('Potential data exfiltration or unauthorized storage usage')
    
    elif service == 'AWS Lambda' and impact > 100:
        risk_level = 'CRITICAL'
        risk_indicators.append('Potential malicious Lambda execution or DDoS amplification')
    
    elif 'Data Transfer' in service and impact > 300:
        risk_level = 'CRITICAL'
        risk_indicators.append('Potential data exfiltration via high egress costs')
    
    # Check for unusual time patterns. Anomalies are reported at daily
    # granularity, so there is no hour to inspect (a parsed date always
    # has hour 0); flag anomalies that start on a weekend instead.
    start_day = datetime.strptime(anomaly['AnomalyStartDate'][:10], '%Y-%m-%d')
    if start_day.weekday() >= 5:  # Saturday or Sunday
        risk_indicators.append('Anomaly started outside the normal business week')
        if risk_level == 'LOW':
            risk_level = 'MEDIUM'
    
    return {
        'risk_level': risk_level,
        'risk_indicators': risk_indicators,
        'recommendation': get_security_recommendation(risk_level, service)
    }

def get_security_recommendation(risk_level, service):
    """
    Get security recommendations based on risk level and service
    """
    if risk_level in ['HIGH', 'CRITICAL']:
        return f"Immediate investigation required for {service} usage anomaly. Check for unauthorized access, review CloudTrail logs, and validate all recent resource changes."
    elif risk_level == 'MEDIUM':
        return f"Monitor {service} usage patterns and review access logs. Consider implementing additional monitoring."
    else:
        return f"Continue monitoring {service} usage patterns."

def send_security_cost_alert(alerts):
    """
    Send security alerts via SNS
    """
    sns = boto3.client('sns')
    
    message = {
        'alert_type': 'COST_ANOMALY_SECURITY_ALERT',
        'alert_count': len(alerts),
        'alerts': alerts,
        'timestamp': datetime.now().isoformat(),
        'recommended_actions': [
            'Review CloudTrail logs for suspicious API calls',
            'Check GuardDuty findings for corresponding threats',
            'Validate all recent IAM and resource changes',
            'Consider isolating affected resources if compromise suspected'
        ]
    }
    
    sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789012:cost-security-alerts',
        Subject=f'SECURITY ALERT: {len(alerts)} Cost Anomalies Detected',
        Message=json.dumps(message, indent=2)
    )

AWS Security Monitoring Best Practices

📊 Comprehensive Logging

Enable CloudTrail for all regions and accounts, capture data events for sensitive resources, and implement log file validation for integrity.

🔍 Real-time Analysis

Use CloudWatch Logs Insights for ad hoc investigation, create metric filters that turn security events into custom metrics, and set alarm thresholds from observed baselines.
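
As an illustration of a Logs Insights investigation, the sketch below builds a query that counts failed console logins per source IP in a CloudTrail log group. The function name is hypothetical; the field names follow the CloudTrail `ConsoleLogin` event format, and the resulting string would be passed as `queryString` to the CloudWatch Logs `start_query` call.

```python
def build_failed_login_query(limit=20):
    """Return a CloudWatch Logs Insights query that counts failed console
    logins per source IP address."""
    return "\n".join([
        'filter eventName = "ConsoleLogin" and responseElements.ConsoleLogin = "Failure"',
        "| stats count(*) as failures by sourceIPAddress",
        "| sort failures desc",
        f"| limit {int(limit)}",
    ])


if __name__ == "__main__":
    print(build_failed_login_query(limit=10))
```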

🛡️ Centralized Management

Use Security Hub for centralized findings management, implement organization-wide security standards, and automate compliance monitoring.

🚨 Automated Response

Implement EventBridge rules for automated incident response, create playbooks for common security scenarios, and maintain isolation procedures.

🎯 Threat Intelligence

Enable GuardDuty with custom threat intelligence feeds, monitor for emerging threats, and integrate with external security tools.

💰 Cost-Based Detection

Monitor cost anomalies as security indicators, implement usage-based alerting, and correlate cost spikes with security events.
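
Correlating cost spikes with security events can be as simple as matching dates. The sketch below pairs each cost anomaly with findings created inside its date range. The input shapes are assumptions for illustration: anomalies use the `anomaly_id`, `start_date`, and `end_date` keys from the alert dictionaries built earlier, and findings are plain dictionaries with hypothetical `id` and `created_at` keys.

```python
from datetime import date


def correlate_anomalies(anomalies, findings):
    """Map each anomaly ID to the IDs of findings created within the
    anomaly's start and end dates (inclusive)."""
    matches = {}
    for anomaly in anomalies:
        # Only the YYYY-MM-DD prefix is used, so full timestamps also work
        start = date.fromisoformat(anomaly["start_date"][:10])
        end = date.fromisoformat(anomaly["end_date"][:10])
        matches[anomaly["anomaly_id"]] = [
            finding["id"]
            for finding in findings
            if start <= date.fromisoformat(finding["created_at"][:10]) <= end
        ]
    return matches


if __name__ == "__main__":
    anomalies = [{"anomaly_id": "a1", "start_date": "2025-06-01", "end_date": "2025-06-03"}]
    findings = [
        {"id": "f1", "created_at": "2025-06-02T14:00:00Z"},
        {"id": "f2", "created_at": "2025-06-10T09:30:00Z"},
    ]
    print(correlate_anomalies(anomalies, findings))
```

An anomaly with at least one matching finding is a stronger candidate for immediate investigation than one with none.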

Security Monitoring Troubleshooting

🔍 Missing CloudTrail Events

Problem: Important API calls not appearing in CloudTrail logs

Solutions:

  • Verify CloudTrail is enabled in all regions
  • Check event selector configuration for data events
  • Ensure S3 bucket permissions allow CloudTrail delivery
  • Verify CloudWatch Logs integration is properly configured
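
Several of these checks can be run against the output of the CloudTrail `describe_trails` call. The sketch below inspects one entry of its `trailList`; the function name and the wording of the messages are assumptions, while the keys it reads are fields of the CloudTrail trail description.

```python
def audit_trail(trail):
    """Return a list of likely configuration gaps for one trail description."""
    issues = []
    if not trail.get("IsMultiRegionTrail"):
        issues.append("trail does not cover all regions")
    if not trail.get("HasCustomEventSelectors"):
        issues.append("no custom event selectors, so data events are likely not logged")
    if not trail.get("S3BucketName"):
        issues.append("no S3 bucket configured for delivery")
    if not trail.get("CloudWatchLogsLogGroupArn"):
        issues.append("no CloudWatch Logs integration")
    return issues


if __name__ == "__main__":
    sample = {
        "Name": "example-trail",
        "S3BucketName": "example-bucket",
        "IsMultiRegionTrail": False,
        "HasCustomEventSelectors": True,
    }
    for issue in audit_trail(sample):
        print(f"{sample['Name']}: {issue}")
```

Bucket policy problems do not show up in the trail description, so delivery errors still need to be checked separately.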

🚨 False Positive Alerts

Problem: Security alarms triggering on legitimate activities

Solutions:

  • Adjust alarm thresholds based on baseline metrics
  • Implement time-based filtering for business hours
  • Add IP address whitelisting for known safe sources
  • Use composite alarms for multiple condition validation
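
For the composite alarm approach, the alarm rule is a boolean expression over existing alarms. The sketch below builds such an expression; the function name and the alarm names in the usage example are hypothetical, and the resulting string would be passed as `AlarmRule` to the CloudWatch `put_composite_alarm` call.

```python
def build_alarm_rule(alarm_names, operator="AND"):
    """Combine alarm names into a composite alarm rule expression."""
    if operator not in ("AND", "OR"):
        raise ValueError("operator must be 'AND' or 'OR'")
    if not alarm_names:
        raise ValueError("at least one alarm name is required")
    return f" {operator} ".join(f'ALARM("{name}")' for name in alarm_names)


if __name__ == "__main__":
    # Fire only when both underlying alarms are in ALARM state
    print(build_alarm_rule(["FailedConsoleLogins", "UnauthorizedApiCalls"]))
```

Requiring two independent signals before paging tends to cut noise from a single spiky metric.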

⚡ High Lambda Costs from GuardDuty Automation

Problem: Response functions triggered by GuardDuty findings generate high Lambda execution costs

Solutions:

  • Optimize Lambda function timeout and memory allocation
  • Implement efficient filtering to reduce function invocations
  • Use Step Functions for complex workflow orchestration
  • Consider batching events before processing
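
One way to reduce invocations is to filter by severity in the EventBridge rule itself, so low-severity findings never reach Lambda. The sketch below builds such a pattern with EventBridge numeric matching. The function name is hypothetical, and the default of 7 is an assumption that corresponds to the start of GuardDuty's High severity range.

```python
import json


def build_severity_pattern(min_severity=7):
    """Build an EventBridge pattern matching GuardDuty findings at or above
    the given severity."""
    return {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {
            # Numeric matcher: detail.severity >= min_severity
            "severity": [{"numeric": [">=", min_severity]}],
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_severity_pattern(), indent=2))
```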

Next Steps

🎉 Congratulations!

You now have comprehensive AWS security monitoring capabilities, including:

  • ✅ CloudTrail comprehensive logging and analysis setup
  • ✅ CloudWatch security metrics, alarms, and dashboards
  • ✅ AWS Config compliance monitoring and automated remediation
  • ✅ Security Hub centralized security findings management
  • ✅ GuardDuty intelligent threat detection and analysis
  • ✅ VPC Flow Logs network security monitoring
  • ✅ Automated incident response with EventBridge and Lambda
  • ✅ Cost anomaly detection for security breach indicators