# lambda/aiops_agent.py
import json
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

import boto3
# AWS service clients, created once at module import so warm Lambda
# invocations reuse the connections.
# NOTE(review): region is hard-coded to us-east-1 — consider deriving it from
# the AWS_REGION environment variable that Lambda sets automatically.
bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1')  # LLM inference
logs_client = boto3.client('logs', region_name='us-east-1')  # CloudWatch Logs Insights
amp_client = boto3.client('amp', region_name='us-east-1')  # Amazon Managed Prometheus (currently unused below)
xray_client = boto3.client('xray', region_name='us-east-1')  # distributed trace summaries
sns_client = boto3.client('sns', region_name='us-east-1')  # outbound notifications
# Configuration read from Lambda environment variables.
AMP_WORKSPACE_ID = os.environ.get('AMP_WORKSPACE_ID')  # no default — may be None
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')  # topic receiving the analysis; None disables delivery
LOG_GROUP = os.environ.get('LOG_GROUP', '/aws/eks/obs-lab/application')  # application log group to query
# System prompt: instructs the model to act as an SRE and emit a fixed
# markdown report (Summary / Root Cause / Evidence / Recommendations /
# Severity). Sent as the top-level 'system' field of the Bedrock request.
SYSTEM_PROMPT = """You are an expert Site Reliability Engineer (SRE) analyzing production incidents.
Your role is to:
1. Analyze the provided alert, metrics, logs, and traces
2. Identify the root cause of the issue
3. Provide actionable recommendations
Format your response as follows:
## Summary
[Brief summary of the incident]
## Root Cause Analysis
[Detailed analysis of what caused the issue]
## Evidence
- Metrics: [relevant metric observations]
- Logs: [relevant log patterns]
- Traces: [trace observations if available]
## Recommendations
1. [Immediate action]
2. [Short-term fix]
3. [Long-term prevention]
## Severity Assessment
[Critical/High/Medium/Low] - [Justification]
"""
def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Entry point: parse an alert, collect telemetry, analyze, and notify.

    Args:
        event: Lambda event — an SNS record batch, a direct AlertManager
            webhook payload, or an API Gateway proxy event.
        context: Lambda context object (unused).

    Returns:
        API-Gateway-style response dict with 'statusCode' and a JSON 'body'.
    """
    print(f"Received event: {json.dumps(event)}")
    # Parse alert from SNS or AlertManager webhook
    alert = parse_alert(event)
    if not alert:
        return {'statusCode': 400, 'body': 'Invalid alert format'}
    print(f"Parsed alert: {json.dumps(alert)}")
    # Analysis window: the 30 minutes leading up to now. Timezone-aware UTC
    # is used because datetime.utcnow() is deprecated and returns naive
    # timestamps that are error-prone in comparisons.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(minutes=30)
    # Collect logs/metrics/traces relevant to the alerting service.
    telemetry = collect_telemetry(
        alert=alert,
        start_time=start_time,
        end_time=end_time
    )
    # Ask Bedrock Claude for a structured root-cause analysis.
    analysis = analyze_with_claude(alert, telemetry)
    # Deliver the analysis to subscribers (best effort).
    publish_analysis(alert, analysis)
    return {
        'statusCode': 200,
        'body': json.dumps({
            'alert': alert.get('alertname'),
            'analysis_sent': True
        })
    }
def parse_alert(event: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize an incoming event (SNS, webhook, API Gateway) into an alert dict.

    Returns:
        The normalized alert dict, or None when no recognizable alert payload
        is present or an embedded payload is not valid JSON. (The original
        let json.loads raise on malformed payloads, turning a bad request
        into a Lambda error instead of the handler's 400 response.)
    """
    # SNS fan-out: the AlertManager payload is JSON-encoded in the message body.
    if 'Records' in event:
        for record in event['Records']:
            if record.get('EventSource') == 'aws:sns':
                try:
                    message = json.loads(record['Sns']['Message'])
                except (json.JSONDecodeError, TypeError, KeyError):
                    continue  # skip malformed records rather than crash
                return parse_alertmanager_payload(message)
    # Direct AlertManager webhook invocation.
    if 'alerts' in event:
        return parse_alertmanager_payload(event)
    # API Gateway proxy integration: the payload is a JSON string in 'body'.
    if 'body' in event:
        try:
            body = json.loads(event['body'])
        except (json.JSONDecodeError, TypeError):
            return None  # malformed body -> handler responds 400
        return parse_alertmanager_payload(body)
    return None
def parse_alertmanager_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten the first alert of an AlertManager payload into a simple dict.

    Payloads without a non-empty 'alerts' list are returned unchanged.
    """
    alerts = payload.get('alerts', [])
    if len(alerts) == 0:
        return payload
    first = alerts[0]
    labels = first.get('labels', {})
    annotations = first.get('annotations', {})
    return {
        'alertname': labels.get('alertname'),
        'severity': labels.get('severity'),
        'service': labels.get('service'),
        'namespace': labels.get('namespace'),
        'summary': annotations.get('summary'),
        'description': annotations.get('description'),
        'status': first.get('status'),
        'startsAt': first.get('startsAt'),
        'labels': labels,
    }
def collect_telemetry(alert: Dict[str, Any], start_time: datetime, end_time: datetime) -> Dict[str, Any]:
    """Collect logs, metric queries, and trace summaries for the alerting service.

    Each telemetry source is queried best-effort: a failure in one source is
    recorded in its slot (as an 'error' entry) instead of aborting collection,
    so the model still sees whatever data was available.

    Args:
        alert: Normalized alert dict; 'service' and 'namespace' keys are used.
        start_time: Start of the analysis window (UTC).
        end_time: End of the analysis window (UTC).

    Returns:
        Dict with 'logs', 'metrics', and 'traces' keys.
    """
    service = alert.get('service', '')
    namespace = alert.get('namespace', 'msa')
    return {
        'logs': _query_error_logs(service, start_time, end_time),
        'metrics': _build_metric_queries(service, namespace),
        'traces': _query_slow_traces(service, start_time, end_time),
    }


def _query_error_logs(service: str, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
    """Run a CloudWatch Logs Insights query for recent error-like messages."""
    try:
        query = f"""
        fields @timestamp, @message, @logStream
        | filter @message like /error|Error|ERROR|exception|Exception/
        | filter @logStream like /{service}/
        | sort @timestamp desc
        | limit 20
        """
        query_id = logs_client.start_query(
            logGroupName=LOG_GROUP,
            startTime=int(start_time.timestamp()),
            endTime=int(end_time.timestamp()),
            queryString=query,
        )['queryId']
        # Poll for completion instead of the original fixed 5-second sleep:
        # get_query_results does not fail for an in-flight query, so a fixed
        # sleep could silently return partial results (or waste time when the
        # query finishes quickly). Bounded by a 15-second deadline.
        deadline = time.monotonic() + 15
        results = logs_client.get_query_results(queryId=query_id)
        while results.get('status') in ('Scheduled', 'Running'):
            if time.monotonic() >= deadline:
                break
            time.sleep(1)
            results = logs_client.get_query_results(queryId=query_id)
        return results.get('results', [])[:10]
    except Exception as e:
        print(f"Error querying logs: {e}")
        return [{'error': str(e)}]


def _build_metric_queries(service: str, namespace: str) -> Dict[str, Any]:
    """Build PromQL queries for the service's error rate and p99 latency.

    Metric values are not fetched here: querying AMP requires the SigV4-signed
    workspace query API, so only the query strings are handed to the model.
    """
    try:
        # 5xx request ratio over the last 5 minutes.
        error_rate_query = (
            f'sum(rate(http_requests_total{{namespace="{namespace}", service="{service}", status=~"5.."}}[5m]))'
            f' / sum(rate(http_requests_total{{namespace="{namespace}", service="{service}"}}[5m]))'
        )
        # p99 request latency from the duration histogram.
        latency_query = f'histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket{{namespace="{namespace}", service="{service}"}}[5m])) by (le))'
        return {
            'error_rate_query': error_rate_query,
            'latency_query': latency_query,
            'note': 'Actual values would be fetched from AMP workspace',
        }
    except Exception as e:
        print(f"Error querying metrics: {e}")
        return {'error': str(e)}


def _query_slow_traces(service: str, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
    """Fetch up to five X-Ray trace summaries for slow (>2s) requests."""
    try:
        trace_response = xray_client.get_trace_summaries(
            StartTime=start_time,
            EndTime=end_time,
            FilterExpression=f'service("{service}") AND responseTime > 2'
        )
        return [
            {
                'id': t.get('Id'),
                'duration': t.get('Duration'),
                'has_error': t.get('HasError'),
                'http_status': t.get('Http', {}).get('HttpStatus'),
            }
            for t in trace_response.get('TraceSummaries', [])[:5]
        ]
    except Exception as e:
        print(f"Error querying traces: {e}")
        return [{'error': str(e)}]
def analyze_with_claude(alert: Dict[str, Any], telemetry: Dict[str, Any]) -> str:
    """Invoke Bedrock Claude to produce a structured incident analysis.

    The model ID is configurable via the BEDROCK_MODEL_ID environment
    variable, defaulting to the previously hard-coded Claude 3.5 Sonnet v2,
    so model upgrades don't require a code change.

    Args:
        alert: Normalized alert dict (labels/annotations already flattened).
        telemetry: Dict with 'logs', 'metrics', 'traces' slots.

    Returns:
        The model's analysis text, or an "Error analyzing alert: ..." string
        if invocation failed (keeps the pipeline flowing to notification).
    """
    model_id = os.environ.get(
        'BEDROCK_MODEL_ID', 'anthropic.claude-3-5-sonnet-20241022-v2:0'
    )
    user_message = f"""
Please analyze this production incident:
## Alert Details
- Alert Name: {alert.get('alertname')}
- Severity: {alert.get('severity')}
- Service: {alert.get('service')}
- Namespace: {alert.get('namespace')}
- Summary: {alert.get('summary')}
- Description: {alert.get('description')}
- Started At: {alert.get('startsAt')}
## Collected Telemetry
### Logs (last 30 minutes)
{json.dumps(telemetry.get('logs', []), indent=2)}
### Metrics Queries
{json.dumps(telemetry.get('metrics', {}), indent=2)}
### Trace Summaries
{json.dumps(telemetry.get('traces', []), indent=2)}
Please provide your analysis following the specified format.
"""
    try:
        # Bedrock's native Anthropic API: 'system' is a top-level field (not a
        # message role) and 'anthropic_version' is mandatory.
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            contentType='application/json',
            accept='application/json',
            body=json.dumps({
                'anthropic_version': 'bedrock-2023-05-31',
                'max_tokens': 2048,
                'system': SYSTEM_PROMPT,
                'messages': [
                    {
                        'role': 'user',
                        'content': user_message
                    }
                ]
            })
        )
        # Response body is a StreamingBody; the text lives at content[0].text.
        response_body = json.loads(response['body'].read())
        return response_body['content'][0]['text']
    except Exception as e:
        print(f"Error invoking Bedrock: {e}")
        return f"Error analyzing alert: {str(e)}"
def publish_analysis(alert: Dict[str, Any], analysis: str) -> None:
    """Publish the formatted analysis to the configured SNS topic.

    Best effort: publish failures are logged and swallowed so a notification
    problem does not fail the Lambda after a successful analysis.

    Args:
        alert: Normalized alert dict ('alertname', 'service', 'severity' used).
        analysis: Analysis text produced by the model.
    """
    message = f"""
=== AIOps Alert Analysis ===
Alert: {alert.get('alertname')}
Service: {alert.get('service')}
Severity: {alert.get('severity')}
Time: {datetime.now(timezone.utc).isoformat()}
{analysis}
---
Generated by Observability Lab AIOps Agent
"""
    # SNS rejects subjects longer than 100 characters, so truncate defensively;
    # long alert names would otherwise make the whole publish call fail.
    subject = f"[AIOps Analysis] {alert.get('alertname')} - {alert.get('severity')}"[:100]
    try:
        sns_client.publish(
            TopicArn=SNS_TOPIC_ARN,
            Subject=subject,
            Message=message
        )
        print("Analysis published to SNS")
    except Exception as e:
        print(f"Error publishing to SNS: {e}")