Data Flow & Integration Patterns
Comprehensive documentation of data flows, integration patterns, and message formats across the AI-SOC platform.
Overview
The AI-SOC platform processes security telemetry through multiple integrated data flows. This document describes the end-to-end data journeys from ingestion through detection, analysis, and response.
Key Data Flow Categories:
1. Log Ingestion & Correlation - Raw events to indexed alerts
2. ML-Powered Classification - Alert to prediction
3. AI-Augmented Triage - Alert to enriched case
4. SOAR Orchestration - Case to automated response
5. Observability Pipeline - Service metrics to dashboards
1. Log Ingestion & Correlation Flow
High-Level Flow
External Sources → Wazuh Manager → Rule Engine → Indexer → Storage
                                        ↓
                                Alert Generation
                                        ↓
                          Webhook → TheHive/AI Services
Detailed Flow Diagram
┌────────────────────────────────────────────────────────────┐
│ External Data Sources │
├────────────────────────────────────────────────────────────┤
│ │
│ • Suricata EVE JSON (network alerts) │
│ • Zeek connection logs (network metadata) │
│ • System logs (syslog, Windows Event Log) │
│ • Application logs (web servers, databases) │
│ • Cloud security logs (AWS CloudTrail, Azure Activity) │
│ │
└─────────────┬──────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│ Transport Layer (Protocol Selection) │
├─────────────────────────────────────────────────────────────┤
│ │
│ • TCP/1514: Wazuh Agent (encrypted) │
│ • UDP/514: Syslog │
│ • Filebeat → Wazuh Manager API │
│ • Direct API: HTTP POST to Wazuh Manager │
│ │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌──────────────────────────────────────────────────────────────┐
│ Wazuh Manager: Event Processing Pipeline │
├──────────────────────────────────────────────────────────────┤
│ │
│ 1. Input Reception │
│ └─► Buffer: 16KB per event │
│ │
│ 2. Decoding │
│ └─► Log format detection │
│ └─► Field extraction (src_ip, dst_ip, etc.) │
│ │
│ 3. Rule Matching │
│ └─► 3,000+ detection rules │
│ └─► Regex pattern matching │
│ └─► Composite rule chaining │
│ │
│ 4. Correlation │
│ └─► Time-based correlation (frequency, sequences) │
│ └─► Statistical anomaly detection │
│ │
│ 5. Enrichment │
│ └─► GeoIP lookup │
│ └─► MITRE ATT&CK technique mapping │
│ └─► Alert metadata injection │
│ │
└─────────────┬────────────────────────────────────────────────┘
              │
              ├─────────────────────────────────┐
              │                                 │
              ▼                                 ▼
┌──────────────────────┐          ┌────────────────────────┐
│ Wazuh Indexer        │          │ Alert Generation       │
│ (OpenSearch)         │          │ (Webhooks)             │
├──────────────────────┤          ├────────────────────────┤
│                      │          │                        │
│ • Bulk indexing      │          │ • Severity ≥ 7         │
│ • Daily indices      │          │ • TheHive webhook      │
│ • 30-day retention   │          │ • Custom webhooks      │
│ • Searchable         │          │                        │
│                      │          └─────────────┬──────────┘
└──────────────────────┘                        │
                                                ▼
                                    ┌───────────────────────┐
                                    │ Downstream Systems    │
                                    ├───────────────────────┤
                                    │ • TheHive (SOAR)      │
                                    │ • AI Services         │
                                    │ • Custom SIEM         │
                                    └───────────────────────┘
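The severity gate in the Alert Generation box can be sketched as a simple predicate. This is a minimal illustration rather than Wazuh's own integration code: `should_forward` and `MIN_WEBHOOK_LEVEL` are hypothetical names, the sample alerts are made up, and the alert is assumed to carry its level under `rule.level`.

```python
MIN_WEBHOOK_LEVEL = 7  # Threshold from the diagram (severity ≥ 7); the name is hypothetical


def should_forward(alert: dict, min_level: int = MIN_WEBHOOK_LEVEL) -> bool:
    """Return True when the alert's rule level meets the webhook threshold."""
    level = alert.get("rule", {}).get("level", 0)
    return level >= min_level


# Made-up sample alerts: one at the threshold, one below it
alerts = [
    {"rule": {"id": 31101, "level": 7}},
    {"rule": {"id": 90001, "level": 3}},
]
forwarded = [a["rule"]["id"] for a in alerts if should_forward(a)]
print(forwarded)  # [31101]
```

Alerts below the threshold are still indexed; only the webhook path is filtered.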
Data Format Evolution
Input (Raw Log):
Oct 24 10:15:32 webserver nginx: 192.168.1.100 - - [24/Oct/2025:10:15:32 +0000] "GET /admin HTTP/1.1" 404 156 "-" "Mozilla/5.0"
After Decoding:
{
"timestamp": "2025-10-24T10:15:32.000Z",
"hostname": "webserver",
"program": "nginx",
"src_ip": "192.168.1.100",
"request_method": "GET",
"request_uri": "/admin",
"response_code": 404,
"user_agent": "Mozilla/5.0"
}
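The decoding step above can be approximated with a regular expression. This is a minimal sketch, assuming the syslog-wrapped nginx format of the sample line; `LINE_RE` and `decode_line` are illustrative names, not Wazuh decoder internals (Wazuh decoders are defined in XML).

```python
import json
import re
from datetime import datetime, timezone

# Hypothetical pattern for the sample line: syslog prefix + nginx combined log
LINE_RE = re.compile(
    r'^\w{3}\s+\d+\s[\d:]+\s(?P<hostname>\S+)\s(?P<program>[^:\s]+):\s'
    r'(?P<src_ip>\S+)\s\S+\s\S+\s\[(?P<time>[^\]]+)\]\s'
    r'"(?P<method>\S+)\s(?P<uri>\S+)\s[^"]*"\s(?P<status>\d{3})\s\d+\s'
    r'"[^"]*"\s"(?P<user_agent>[^"]*)"$'
)


def decode_line(line: str) -> dict:
    """Extract the decoded fields from one raw log line."""
    m = LINE_RE.match(line)
    if m is None:
        raise ValueError("line does not match the expected format")
    # Use the bracketed nginx timestamp, which carries year and UTC offset
    ts = datetime.strptime(m["time"], "%d/%b/%Y:%H:%M:%S %z").astimezone(timezone.utc)
    return {
        "timestamp": ts.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
        "hostname": m["hostname"],
        "program": m["program"],
        "src_ip": m["src_ip"],
        "request_method": m["method"],
        "request_uri": m["uri"],
        "response_code": int(m["status"]),
        "user_agent": m["user_agent"],
    }


RAW_LINE = (
    'Oct 24 10:15:32 webserver nginx: 192.168.1.100 - - '
    '[24/Oct/2025:10:15:32 +0000] "GET /admin HTTP/1.1" 404 156 "-" "Mozilla/5.0"'
)
decoded = decode_line(RAW_LINE)
print(json.dumps(decoded, indent=2))
```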
After Rule Matching:
{
... (decoded fields) ...,
"rule": {
"id": 31101,
"level": 7,
"description": "Multiple web authentication failures",
"mitre": {
"technique": ["T1110"],
"tactic": ["Credential Access"]
}
}
}
Final Alert (Indexed):
{
"timestamp": "2025-10-24T10:15:32.000Z",
"agent": {
"id": "001",
"name": "webserver",
"ip": "10.0.1.50"
},
"rule": {
"id": 31101,
"level": 7,
"description": "Multiple web authentication failures",
"groups": ["web", "authentication_failed"],
"mitre": {
"technique": ["T1110"],
"tactic": ["Credential Access"]
}
},
"data": {
"src_ip": "192.168.1.100",
"dst_ip": "10.0.1.50",
"src_port": 54321,
"dst_port": 443,
"protocol": "TCP",
"url": "/admin"
},
"location": "/var/log/nginx/access.log",
"decoder": {
"name": "nginx-access"
},
"geoip": {
"country_name": "United States",
"city_name": "San Francisco",
"latitude": 37.7749,
"longitude": -122.4194
}
}
Performance Characteristics
| Stage | Latency | Throughput | Resource Usage |
|---|---|---|---|
| Input Reception | <1ms | 15,000 events/sec | Negligible |
| Decoding | 1-5ms | Limited by CPU | ~10% CPU |
| Rule Matching | 5-20ms | 10,000 events/sec | ~30% CPU |
| Correlation | Variable | 5,000 events/sec | ~20% CPU |
| Indexing | 10-50ms | 50,000 events/sec | ~40% CPU, ~2GB RAM |
Total End-to-End Latency: 20-100ms (event to indexed alert)
2. ML-Powered Classification Flow
Flow Diagram
Wazuh Alert → Feature Extraction → ML Inference → Prediction
                                                      ↓
                                           Risk Score + Confidence
                                                      ↓
                                       Alert Enrichment → Wazuh Indexer
Detailed Process
Step 1: Feature Extraction
The ML service subscribes to Wazuh alerts and extracts 79 CICIDS2017 features:
# Feature extraction from Wazuh alert
features = extract_features(alert_data)
# Returns: [Flow Duration, Fwd Packet Length Mean, Flow Bytes/s, ...]
CICIDS2017 Feature Categories:
1. Flow Characteristics (13 features): Duration, bytes, packets
2. Forward Direction Stats (14 features): Packet sizes, IAT
3. Backward Direction Stats (14 features): Packet sizes, IAT
4. Bidirectional Stats (10 features): Ratios, flags
5. Time-Based (8 features): Active/Idle periods
6. Protocol (5 features): TCP flags, headers
7. Application Layer (15 features): Payload statistics
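The category counts above add up to the 79-element vector the model expects, which makes a cheap sanity check before calling the inference API. The sketch below is an assumption about how a client might guard the request; `FEATURE_CATEGORY_SIZES` and `build_predict_payload` are hypothetical names, and the payload shape follows the `/predict` request shown in this section.

```python
import math

# Category sizes as listed above; the dictionary keys are illustrative labels
FEATURE_CATEGORY_SIZES = {
    "flow": 13,
    "forward": 14,
    "backward": 14,
    "bidirectional": 10,
    "time_based": 8,
    "protocol": 5,
    "application": 15,
}
EXPECTED_FEATURES = sum(FEATURE_CATEGORY_SIZES.values())  # 79


def build_predict_payload(features, model_name="random_forest"):
    """Validate a feature vector and wrap it in a /predict request body."""
    if len(features) != EXPECTED_FEATURES:
        raise ValueError(
            f"expected {EXPECTED_FEATURES} features, got {len(features)}"
        )
    if not all(math.isfinite(x) for x in features):
        raise ValueError("features must be finite numbers (no NaN or inf)")
    return {"features": [float(x) for x in features], "model_name": model_name}


payload = build_predict_payload([0.0] * EXPECTED_FEATURES)
print(len(payload["features"]), payload["model_name"])  # 79 random_forest
```

Rejecting non-finite values up front is a design choice here: rate features such as bytes per second can become infinite when a flow has zero duration.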
Step 2: ML Inference API Request
POST http://ml-inference:8000/predict
Content-Type: application/json
{
"features": [
120.5, # Flow Duration (seconds)
564.23, # Fwd Packet Length Mean
4687.9, # Flow Bytes/s
... (76 more features)
],
"model_name": "random_forest"
}
Step 3: Model Prediction
Input Vector [79 dimensions]
↓
Scaling (StandardScaler)
↓
Random Forest (500 trees)
↓
Voting (majority class)
↓
Prediction: ATTACK
Confidence: 0.9856
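The voting stage can be illustrated with plain hard voting over per-tree labels. This is a simplified sketch with made-up votes; `majority_vote` is a hypothetical helper. Note that scikit-learn's `RandomForestClassifier` averages per-tree class probabilities instead of counting hard votes, so a production confidence value may differ from the vote fraction computed here.

```python
from collections import Counter


def majority_vote(tree_votes: list[str]) -> tuple[str, float]:
    """Return the most common label and the fraction of trees that chose it."""
    if not tree_votes:
        raise ValueError("at least one vote is required")
    label, count = Counter(tree_votes).most_common(1)[0]
    return label, count / len(tree_votes)


# Hypothetical votes from a 500-tree forest
votes = ["ATTACK"] * 493 + ["BENIGN"] * 7
prediction, confidence = majority_vote(votes)
print(prediction, round(confidence, 4))  # ATTACK 0.986
```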
Step 4: Response
{
"prediction": "ATTACK",
"confidence": 0.9856,
"model": "random_forest",
"inference_time_ms": 0.8,
"feature_importance": {
"Fwd Packet Length Mean": 0.152,
"Flow Bytes/s": 0.128,
"Flow Packets/s": 0.113
}
}
Step 5: Alert Enrichment
The original Wazuh alert is enriched with the ML prediction and a derived risk score:
{
... (original alert) ...,
"ml_classification": {
"prediction": "ATTACK",
"confidence": 0.9856,
"model": "random_forest",
"timestamp": "2025-10-24T10:15:33.120Z"
},
"risk_score": 98 # Calculated: 0.9856 * 100, truncated to an integer
}
Multi-Model Ensemble
The system can query multiple models for consensus:
Alert → Random Forest → ATTACK (0.9856)
  ↓
      → XGBoost       → ATTACK (0.9821)
  ↓
      → Decision Tree → ATTACK (0.9512)
  ↓
Ensemble Vote: ATTACK
Average Confidence: 0.9730
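The consensus step above can be sketched as a majority vote plus a mean confidence. This is an illustrative reduction, not the service's actual code: `ensemble_vote` is a hypothetical name, and averaging only over the models that agree with the winning label is an assumption (in the example all three agree, so it matches the plain average).

```python
from collections import Counter
from statistics import mean


def ensemble_vote(predictions: dict[str, tuple[str, float]]) -> tuple[str, float]:
    """Pick the majority label and average the confidence of agreeing models."""
    labels = [label for label, _ in predictions.values()]
    winner = Counter(labels).most_common(1)[0][0]
    agreeing = [conf for label, conf in predictions.values() if label == winner]
    return winner, mean(agreeing)


model_outputs = {
    "random_forest": ("ATTACK", 0.9856),
    "xgboost": ("ATTACK", 0.9821),
    "decision_tree": ("ATTACK", 0.9512),
}
label, avg_conf = ensemble_vote(model_outputs)
print(label, round(avg_conf, 4))  # ATTACK 0.973
```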
Performance
- Latency: <1ms per prediction (Random Forest)
- Throughput: 1,250 predictions/second (single container)
- Accuracy: 99.28% (Random Forest), 99.21% (XGBoost)
- False Positive Rate: 0.25% (Random Forest), 0.09% (XGBoost)
3. AI-Augmented Triage Flow
Complete Flow Diagram
Wazuh Alert
│
▼
Alert Triage Service
│
├─────────────────┬─────────────────┬───────────────┐
│                 │                 │               │
▼                 ▼                 ▼               ▼
ML Inference      RAG Service       Ollama LLM      Rule-Based
(Classification)  (MITRE Context)   (Analysis)      (Risk Calc)
│                 │                 │               │
│                 │                 │               │
└─────────────────┴─────────────────┴───────────────┘
                  │
                  ▼
       Enriched Alert Response
    (Risk Score, Classification,
     MITRE Techniques, Summary)
                  │
                  ▼
            TheHive Case
Step-by-Step Process
Step 1: Alert Ingestion
POST http://alert-triage:8100/triage
Content-Type: application/json
{
"alert_data": {
"rule_id": 31101,
"rule_description": "Multiple web authentication failures",
"src_ip": "192.168.1.100",
"dst_ip": "10.0.1.50",
"src_port": 54321,
"dst_port": 443,
"protocol": "TCP",
"severity": 7,
"mitre_technique": ["T1110"]
}
}
Step 2: Parallel Processing
The Alert Triage Service makes concurrent requests:
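# Parallel execution: the three calls are independent, so they run concurrently
import asyncio

async def triage_alert(alert_data):
    ml_task = call_ml_inference(alert_data)
    rag_task = retrieve_mitre_context(alert_data)
    llm_task = analyze_with_llm(alert_data)
    # gather() awaits all three coroutines and returns results in argument order
    results = await asyncio.gather(ml_task, rag_task, llm_task)
    return combine_results(results)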
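Because the LLM call is by far the slowest of the three, a practical variant bounds it with a timeout and degrades gracefully when one component fails. The following is a minimal runnable sketch under stated assumptions: the three coroutines are stubs that sleep instead of making HTTP calls, and the `llm_timeout` parameter and result layout are hypothetical.

```python
import asyncio


async def call_ml_inference(alert):
    await asyncio.sleep(0.01)  # Stub for the ML inference request
    return {"prediction": "ATTACK", "confidence": 0.9856}


async def retrieve_mitre_context(alert):
    await asyncio.sleep(0.01)  # Stub for the RAG retrieval request
    return {"results": [{"technique_id": "T1110", "similarity_score": 0.92}]}


async def analyze_with_llm(alert):
    await asyncio.sleep(0.5)  # Stub for a slow LLM generation call
    return {"response": "Brute force credential attack"}


async def triage_alert(alert, llm_timeout=0.1):
    """Run all components concurrently; keep whatever finished successfully."""
    results = await asyncio.gather(
        call_ml_inference(alert),
        retrieve_mitre_context(alert),
        asyncio.wait_for(analyze_with_llm(alert), timeout=llm_timeout),
        return_exceptions=True,  # A failure becomes a value instead of raising
    )
    names = ["ml_inference", "rag_service", "ollama_llm"]
    ok = {n: r for n, r in zip(names, results) if not isinstance(r, BaseException)}
    return {"components_used": list(ok), "results": ok}


outcome = asyncio.run(triage_alert({"rule_id": 31101}))
print(outcome["components_used"])  # ['ml_inference', 'rag_service']
```

With the stubbed 0.5 s LLM delay and a 0.1 s timeout, the LLM result is dropped and the response lists only the two components that completed.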
2a. ML Inference Request
POST http://ml-inference:8000/predict
{
"features": [...], # Extracted from alert_data
"model_name": "random_forest"
}
Response:
{
"prediction": "ATTACK",
"confidence": 0.9856
}
2b. RAG Service Request
POST http://rag-service:8000/retrieve
{
"query": "brute force credential access web authentication",
"top_k": 3
}
Response:
{
"results": [
{
"technique_id": "T1110",
"technique_name": "Brute Force",
"similarity_score": 0.92,
"description": "Adversaries may use brute force techniques...",
"tactics": ["Credential Access"],
"detection": "Monitor authentication logs for patterns..."
},
{
"technique_id": "T1110.001",
"technique_name": "Brute Force: Password Guessing",
"similarity_score": 0.89
},
{
"technique_id": "T1078",
"technique_name": "Valid Accounts",
"similarity_score": 0.74
}
]
}
2c. LLM Analysis Request
POST http://ollama-server:11434/api/generate
{
"model": "llama3.1:8b",
"prompt": "Analyze this security alert and provide a risk assessment:\n\nAlert: Multiple web authentication failures\nSource IP: 192.168.1.100\nTarget: 10.0.1.50:443\nProtocol: HTTPS\n\nMITRE Context: T1110 (Brute Force)\n\nProvide:\n1. Attack classification\n2. Recommended actions\n3. Executive summary",
"stream": false
}
Response:
{
"response": "**Attack Classification:** Brute Force Credential Attack\n\n**Recommended Actions:**\n1. Immediately block source IP 192.168.1.100 at firewall\n2. Force password reset for affected accounts\n3. Enable multi-factor authentication\n4. Review access logs for successful attempts\n\n**Executive Summary:**\nHigh-confidence brute force attack detected against web authentication. Attacker systematically attempting credential guessing from IP 192.168.1.100. Immediate blocking recommended to prevent account compromise.",
"tokens_evaluated": 1024
}
Step 3: Result Aggregation
The Alert Triage Service combines all results:
def calculate_risk_score(ml_conf, rag_similarity, severity):
    """
    Risk Score Calculation:
    - ML Confidence: 40% weight
    - MITRE Similarity: 30% weight
    - Alert Severity: 30% weight
    """
    risk = (
        ml_conf * 0.4 +
        rag_similarity * 0.3 +
        (severity / 15) * 0.3  # Normalize severity (0-15 scale)
    ) * 100
    return int(risk)  # Truncate to an integer score

# Example calculation:
ml_conf = 0.9856
rag_similarity = 0.92
severity = 7
risk_score = calculate_risk_score(ml_conf, rag_similarity, severity)
# Result: 81 (0.39424 + 0.276 + 0.14 = 0.81024)
Step 4: Final Response
{
"risk_score": 81,
"classification": "Brute Force Credential Attack",
"mitre_techniques": [
{
"id": "T1110",
"name": "Brute Force",
"confidence": 0.92
},
{
"id": "T1110.001",
"name": "Password Guessing",
"confidence": 0.89
}
],
"recommended_actions": [
"Block source IP 192.168.1.100 at firewall",
"Force password reset for affected accounts",
"Enable multi-factor authentication",
"Review access logs for successful attempts"
],
"executive_summary": "High-confidence brute force attack detected against web authentication. Attacker systematically attempting credential guessing from IP 192.168.1.100. Immediate blocking recommended to prevent account compromise.",
"ml_prediction": {
"prediction": "ATTACK",
"confidence": 0.9856,
"model": "random_forest"
},
"processing_time_ms": 3250,
"components_used": ["ml_inference", "rag_service", "ollama_llm"]
}
Performance Profile
| Component | Latency | Contribution |
|---|---|---|
| Feature Extraction | 50ms | 1.5% |
| ML Inference | 0.8ms | <1% |
| RAG Retrieval | 45ms | 1.4% |
| LLM Analysis | 3000ms | 92% |
| Aggregation | 20ms | 0.6% |
| Total | ~3250ms | 100% |
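Bottleneck: LLM inference dominates latency, accounting for roughly 92% of the total. The itemized components add up to about 3,116ms; the remainder of the ~3250ms total is not broken out in the table.
Optimization Strategies:
1. Batch processing: Queue alerts, process in batches
2. Async execution: Non-blocking LLM calls
3. Caching: Cache LLM responses for similar alerts
4. Model optimization: Use smaller/faster LLM (8B → 3B params)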
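The caching strategy can be sketched with a dictionary keyed by a hash of the fields that make two alerts "similar". Everything here is an assumption for illustration: the choice of key fields (`rule_id`, `src_ip`, `dst_port`), the `LLMResponseCache` class, and the `fake_analyze` stand-in for the real LLM call. A production cache would also need expiry and a size limit.

```python
import hashlib
import json


def cache_key(alert: dict) -> str:
    """Hash only the fields assumed to define similar alerts."""
    subset = {k: alert.get(k) for k in ("rule_id", "src_ip", "dst_port")}
    blob = json.dumps(subset, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()


class LLMResponseCache:
    """Memoize LLM analyses so repeated similar alerts skip the slow call."""

    def __init__(self, analyze):
        self._analyze = analyze
        self._store = {}
        self.hits = 0

    def get(self, alert: dict):
        key = cache_key(alert)
        if key in self._store:
            self.hits += 1
        else:
            self._store[key] = self._analyze(alert)
        return self._store[key]


calls = []


def fake_analyze(alert):
    calls.append(alert["rule_id"])  # Track how often the "LLM" is invoked
    return f"analysis for rule {alert['rule_id']}"


cache = LLMResponseCache(fake_analyze)
first = cache.get({"rule_id": 31101, "src_ip": "192.168.1.100", "dst_port": 443, "src_port": 54321})
second = cache.get({"rule_id": 31101, "src_ip": "192.168.1.100", "dst_port": 443, "src_port": 54399})
print(len(calls), cache.hits)  # 1 1
```

The second alert differs only in its ephemeral source port, so it maps to the same key and reuses the cached analysis.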
4. SOAR Orchestration Flow
Automated Response Workflow
Enriched Alert (from AI Triage)
│
▼
TheHive Case Creation
│
├──► Observable Extraction (IP, domain, hash, email)
│
▼
Cortex Analysis (Parallel)
│
├─► AbuseIPDB (IP reputation)
├─► VirusTotal (File hash)
├─► MaxMind GeoIP (Geolocation)
└─► Shodan (Port scan history)
│
└──► Results → TheHive Observables
│
▼
Shuffle Workflow Trigger
│
├──► Condition: Risk Score ≥ 80
│ │
│ ├─► Action: Block IP on firewall
│ ├─► Action: Create ticket in ServiceNow
│ └─► Action: Send Slack notification
│
└──► Condition: Risk Score < 80
│
└─► Action: Assign to analyst queue
Data Flow Through SOAR
Step 1: TheHive Case Creation
Webhook from Wazuh → TheHive:
POST http://thehive:9010/api/v1/alert
Authorization: Bearer API_KEY
{
"type": "wazuh",
"source": "AI-SOC",
"sourceRef": "wazuh-alert-12345",
"title": "Brute Force Attack - 192.168.1.100",
"description": "High-confidence brute force attack detected...",
"severity": 3, # High (TheHive scale: 1 = Low, 2 = Medium, 3 = High, 4 = Critical)
"tlp": 2, # Amber
"pap": 2, # Amber
"tags": ["brute_force", "T1110", "credential_access"],
"customFields": {
"risk_score": 81,
"ml_confidence": 0.9856,
"mitre_techniques": ["T1110", "T1110.001"]
},
"observables": [
{
"dataType": "ip",
"data": "192.168.1.100",
"tags": ["src_ip", "attacker"],
"ioc": true
},
{
"dataType": "ip",
"data": "10.0.1.50",
"tags": ["dst_ip", "victim"]
}
]
}
Step 2: Cortex Analysis
TheHive automatically triggers Cortex analyzers for each observable:
Observable: 192.168.1.100 (IP)
│
├─► AbuseIPDB Analyzer
│ └─► Result: Malicious (Confidence: 95%, Reports: 147)
│
├─► MaxMind GeoIP
│ └─► Result: Russia, Moscow (ISP: Suspected Proxy)
│
└─► Shodan
└─► Result: Open ports 22, 80, 443, 3389 (RDP exposed)
Cortex API Call:
POST http://cortex:9001/api/analyzer/AbuseIPDB/run
Authorization: Bearer API_KEY
{
"data": "192.168.1.100",
"dataType": "ip",
"tlp": 2,
"pap": 2
}
Response:
{
"status": "Success",
"artifacts": [
{
"type": "abuse_confidence",
"value": 95
},
{
"type": "total_reports",
"value": 147
},
{
"type": "country_code",
"value": "RU"
}
]
}
Step 3: Shuffle Workflow Execution
Webhook trigger from TheHive → Shuffle:
POST http://shuffle-backend:5001/api/v1/hooks/thehive_alert
{
"case_id": "~41216",
"title": "Brute Force Attack - 192.168.1.100",
"severity": 3,
"customFields": {
"risk_score": 81
},
"observables": [
{
"dataType": "ip",
"data": "192.168.1.100",
"tags": ["attacker"],
"cortex_results": {
"abuseipdb": {"confidence": 95}
}
}
]
}
Shuffle Workflow Execution:
Workflow: High-Risk Alert Response
Trigger: TheHive Case (Risk Score ≥ 80)
Actions:
  1. Extract Variables:
     - attacker_ip = case.observables[0].data
     - risk_score = case.customFields.risk_score
  2. Condition Check: risk_score ≥ 80
  3a. If TRUE (High Risk):
     - Call Firewall API: block_ip(attacker_ip)
     - Create ServiceNow Ticket:
         Priority: Critical
         Assignment: Security Team
     - Send Slack Notification:
         Channel: #security-incidents
         Message: "🚨 Critical: Brute force attack blocked. IP: {attacker_ip}"
     - Update TheHive Case:
         Status: Resolved
         Resolution: Automated blocking
  3b. If FALSE (Medium Risk):
     - Assign to Analyst Queue
     - Send Email Notification
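The branching logic of this workflow can be expressed as a small planning function. This is a sketch of the decision only, not Shuffle's workflow format: `plan_response`, `HIGH_RISK_THRESHOLD`, and the action strings are hypothetical, and the case layout follows the webhook payload shown above.

```python
HIGH_RISK_THRESHOLD = 80  # From the workflow condition (risk_score ≥ 80)


def plan_response(case: dict) -> list[str]:
    """Return the ordered actions the workflow would take for a case."""
    risk_score = case["customFields"]["risk_score"]
    attacker_ip = case["observables"][0]["data"]
    if risk_score >= HIGH_RISK_THRESHOLD:
        return [
            f"block_ip:{attacker_ip}",
            "create_servicenow_ticket",
            "send_slack_notification",
            "update_thehive_case",
        ]
    return ["assign_to_analyst_queue", "send_email_notification"]


high = plan_response({
    "customFields": {"risk_score": 92},
    "observables": [{"dataType": "ip", "data": "203.0.113.42"}],
})
low = plan_response({
    "customFields": {"risk_score": 40},
    "observables": [{"dataType": "ip", "data": "203.0.113.42"}],
})
print(high[0], low[0])  # block_ip:203.0.113.42 assign_to_analyst_queue
```

Separating the decision from the side effects keeps the threshold logic testable without calling the firewall, ServiceNow, or Slack.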
Step 4: Response Actions
Firewall Block (pfSense API):
POST https://firewall.example.com/api/v2/firewall/rule
Authorization: Bearer FIREWALL_API_KEY
{
"type": "block",
"interface": "wan",
"ipprotocol": "inet",
"protocol": "any",
"src": "192.168.1.100/32",
"dst": "any",
"descr": "Blocked by AI-SOC: Brute force attack (Case ~41216)",
"log": true
}
ServiceNow Ticket:
POST https://company.service-now.com/api/now/table/incident
Authorization: Basic SERVICE_NOW_CREDS
{
"short_description": "Security Incident: Brute Force Attack Blocked",
"description": "AI-SOC automatically blocked brute force attack from 192.168.1.100...",
"urgency": "1",
"impact": "1",
"priority": "1",
"assignment_group": "Security Operations",
"category": "Security",
"subcategory": "Intrusion Detection"
}
Slack Notification:
POST https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXX
{
"channel": "#security-incidents",
"username": "AI-SOC",
"icon_emoji": ":robot_face:",
"attachments": [
{
"color": "danger",
"title": "🚨 Critical Security Incident",
"text": "Brute Force Attack Automatically Blocked",
"fields": [
{"title": "Attacker IP", "value": "192.168.1.100", "short": true},
{"title": "Risk Score", "value": "81/100", "short": true},
{"title": "MITRE Technique", "value": "T1110 (Brute Force)", "short": true},
{"title": "Action Taken", "value": "IP Blocked + Ticket Created", "short": true}
],
"footer": "AI-SOC Autonomous Response",
"ts": 1761300932
}
]
}
End-to-End Timing
| Stage | Duration | Cumulative |
|---|---|---|
| Alert → TheHive Case | 500ms | 500ms |
| Cortex Analysis (parallel) | 2-5s | 2.5-5.5s |
| Shuffle Workflow Trigger | 200ms | 2.7-5.7s |
| Firewall API Call | 300ms | 3.0-6.0s |
| ServiceNow Ticket | 800ms | 3.8-6.8s |
| Slack Notification | 200ms | 4.0-7.0s |
| Total (Alert to Response) | ~4-7 seconds | - |
5. Observability Pipeline
Metrics Flow
All Services → Prometheus Exporters → Prometheus → Grafana
                                          ↓
                                     Alert Rules
                                          ↓
                                     AlertManager
                                          ↓
                                 Email/Slack/Shuffle
Service Metrics Collection
Example: ML Inference Metrics
The ML Inference service exposes Prometheus metrics:
# Python (FastAPI + prometheus_client)
import time

from fastapi import FastAPI
from prometheus_client import Counter, Histogram

app = FastAPI()

# Metric definitions
predictions_total = Counter(
    'ml_predictions_total',
    'Total ML predictions',
    ['model', 'prediction']
)
inference_latency = Histogram(
    'ml_inference_latency_seconds',
    'ML inference latency',
    ['model'],
    # Example sub-millisecond buckets; the library defaults start at 5 ms,
    # which is too coarse for sub-millisecond inference
    buckets=(0.0005, 0.001, 0.005, 0.01, 0.05)
)

# Usage in code (PredictionRequest and model are defined elsewhere in the service)
@app.post("/predict")
async def predict(request: PredictionRequest):
    start_time = time.time()
    prediction = model.predict(request.features)
    # Record metrics
    latency = time.time() - start_time
    inference_latency.labels(model='random_forest').observe(latency)
    predictions_total.labels(
        model='random_forest',
        prediction=prediction
    ).inc()
    return {"prediction": prediction}
Exposed Metrics (GET /metrics):
# HELP ml_predictions_total Total ML predictions
# TYPE ml_predictions_total counter
ml_predictions_total{model="random_forest",prediction="ATTACK"} 15234.0
ml_predictions_total{model="random_forest",prediction="BENIGN"} 8912.0
ml_predictions_total{model="xgboost",prediction="ATTACK"} 5621.0
# HELP ml_inference_latency_seconds ML inference latency
# TYPE ml_inference_latency_seconds histogram
ml_inference_latency_seconds_bucket{le="0.001",model="random_forest"} 8524.0
ml_inference_latency_seconds_bucket{le="0.005",model="random_forest"} 15230.0
ml_inference_latency_seconds_sum{model="random_forest"} 19.348
ml_inference_latency_seconds_count{model="random_forest"} 24146.0
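The `_sum` and `_count` series are enough to derive a mean latency, and the counters give the share of ATTACK predictions. The sketch below parses the sample exposition text by hand for illustration; `parse_samples` is a hypothetical helper, and a real consumer would normally query Prometheus instead of parsing the text format itself.

```python
EXPOSITION = """\
# TYPE ml_predictions_total counter
ml_predictions_total{model="random_forest",prediction="ATTACK"} 15234.0
ml_predictions_total{model="random_forest",prediction="BENIGN"} 8912.0
# TYPE ml_inference_latency_seconds histogram
ml_inference_latency_seconds_sum{model="random_forest"} 19.348
ml_inference_latency_seconds_count{model="random_forest"} 24146.0
"""


def parse_samples(text: str) -> dict[str, float]:
    """Map each 'name{labels}' series to its float value, skipping comments."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        series, _, value = line.rpartition(" ")
        samples[series] = float(value)
    return samples


samples = parse_samples(EXPOSITION)
rf = '{model="random_forest"}'
mean_latency_ms = (
    samples["ml_inference_latency_seconds_sum" + rf]
    / samples["ml_inference_latency_seconds_count" + rf]
    * 1000
)
attack = samples['ml_predictions_total{model="random_forest",prediction="ATTACK"}']
benign = samples['ml_predictions_total{model="random_forest",prediction="BENIGN"}']
attack_share = attack / (attack + benign)
print(f"mean latency: {mean_latency_ms:.2f} ms, ATTACK share: {attack_share:.1%}")
```

For the sample values this yields a mean of about 0.8 ms per prediction, in line with the latency figure quoted for the Random Forest model.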
Prometheus Scraping
# prometheus.yml
scrape_configs:
  - job_name: 'ml-inference'
    scrape_interval: 15s
    static_configs:
      - targets: ['ml-inference:8000']
    metrics_path: /metrics
Scrape Result:
Timestamp: 2025-10-24T10:15:45.000Z
Job: ml-inference
Instance: ml-inference:8000
Metrics:
ml_predictions_total{model="random_forest",prediction="ATTACK"} 15234
ml_inference_latency_seconds_sum{model="random_forest"} 19.348
...
Alert Rule Evaluation
# alerts/ai-soc-alerts.yml
groups:
  - name: ml_performance
    interval: 30s
    rules:
      - alert: HighMLInferenceLatency
        expr: |
          histogram_quantile(0.95,
            rate(ml_inference_latency_seconds_bucket[5m])
          ) > 0.005
        for: 5m
        labels:
          severity: warning
          component: ml_inference
        annotations:
          summary: "ML inference latency above threshold"
          description: "95th percentile latency is {{ $value }}s (threshold: 0.005s)"
      - alert: MLPredictionImbalance
        expr: |
          sum(rate(ml_predictions_total{prediction="ATTACK"}[5m]))
          /
          sum(rate(ml_predictions_total[5m]))
          > 0.5
        for: 10m
        labels:
          severity: warning
          component: ml_inference
        annotations:
          summary: "Abnormal prediction distribution"
          description: "{{ $value | humanizePercentage }} of predictions are ATTACK"
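To make the first rule concrete, the sketch below mimics how `histogram_quantile` estimates a percentile from cumulative buckets using linear interpolation inside the bucket that contains the target rank. It is a simplified re-implementation for illustration, assuming raw cumulative counts instead of per-second rates; the bucket values are made up.

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    The list must include a final (math.inf, total) bucket, mirroring the
    le="+Inf" series that Prometheus histograms expose.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    buckets = sorted(buckets)
    if len(buckets) < 2 or not math.isinf(buckets[-1][0]):
        raise ValueError("need at least one finite bucket plus +Inf")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                # Quantile falls past the last finite bucket: report its bound
                return buckets[-2][0]
            # Assume observations are spread evenly inside the bucket
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return math.nan  # Not reached for well-formed cumulative buckets


# Hypothetical cumulative counts: 60 under 1 ms, 90 under 5 ms, 100 under 10 ms
sample = [(0.001, 60.0), (0.005, 90.0), (0.01, 100.0), (math.inf, 100.0)]
p95 = histogram_quantile(0.95, sample)
print(round(p95, 4), p95 > 0.005)  # 0.0075 True
```

In this made-up case the p95 estimate of 7.5 ms exceeds the 5 ms threshold, so the alert condition would hold. The estimate can only be as precise as the bucket boundaries, which is why bucket choice matters for sub-millisecond services.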
AlertManager Routing
# alertmanager.yml
route:
  receiver: 'default'
  group_by: ['alertname']  # Populates {{ .GroupLabels.alertname }} in the templates below
  routes:
    - match:
        component: ml_inference
      receiver: 'ai-team'
receivers:
  - name: 'default'  # Fallback receiver; every receiver named in a route must be defined
  - name: 'ai-team'
    email_configs:
      - to: 'ai-team@example.com'
        headers:
          Subject: '[AI-SOC] ML Service Alert: {{ .GroupLabels.alertname }}'
    slack_configs:
      - api_url: 'https://hooks.slack.com/...'
        channel: '#ai-soc-alerts'
        title: '{{ .GroupLabels.alertname }}'
        text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'
Grafana Dashboards
Data Source Configuration:
# grafana/provisioning/datasources/prometheus.yml
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
Dashboard Query Example:
Panel: "ML Inference Latency (p95)"
Panel: "Predictions per Second"
6. End-to-End Integration Example
Complete Flow: Attack Detection to Automated Response
Scenario: SQL Injection attack against web application
┌──────────────────────────────────────────────────────────────┐
│ T+0ms: Attack Execution │
├──────────────────────────────────────────────────────────────┤
│ Attacker sends: GET /products?id=1' OR '1'='1 │
│ Target: Web Application (10.0.1.50:443) │
└─────────────────────────────┬────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ T+10ms: Network Detection (Suricata) │
├──────────────────────────────────────────────────────────────┤
│ Rule Match: ET WEB_SPECIFIC_APPS SQL Injection Attempt │
│ Severity: High │
│ EVE JSON log created: /var/log/suricata/eve.json │
└─────────────────────────────┬────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ T+50ms: Log Shipping (Filebeat) │
├──────────────────────────────────────────────────────────────┤
│ Filebeat reads eve.json → sends to Wazuh Manager │
└─────────────────────────────┬────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ T+100ms: SIEM Correlation (Wazuh Manager) │
├──────────────────────────────────────────────────────────────┤
│ 1. Decoding: Extract src_ip, dst_ip, payload │
│ 2. Rule Match: 31106 (SQL injection attempt) │
│ 3. Enrichment: GeoIP, MITRE T1190 │
│ 4. Alert Generation: Severity 12 │
└─────────────────────────────┬────────────────────────────────┘
│
├──► Wazuh Indexer (Storage)
│
└──► Webhook → AI Services
│
▼
┌──────────────────────────────────────────────────────────────┐
│ T+200ms: AI Triage (Parallel Processing) │
├──────────────────────────────────────────────────────────────┤
│ • ML Inference: ATTACK (Confidence: 0.9912) │
│ • RAG Retrieval: T1190 (Initial Access - Exploit Public) │
│ • LLM Analysis: "SQL injection attack attempt detected..." │
│ │
│ Risk Score: 92/100 │
└─────────────────────────────┬────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ T+3500ms: TheHive Case Creation │
├──────────────────────────────────────────────────────────────┤
│ Case #~41217: SQL Injection Attack │
│ Severity: Critical │
│ Observables: src_ip=203.0.113.42, dst_ip=10.0.1.50 │
│ Tags: sql_injection, T1190, web_attack │
└─────────────────────────────┬────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ T+4000ms: Cortex Analysis (Parallel) │
├──────────────────────────────────────────────────────────────┤
│ • AbuseIPDB: Malicious (Confidence: 87%) │
│ • GeoIP: China, Beijing (ISP: Hosting Provider) │
│ • URLhaus: Payload matches known exploit kit │
└─────────────────────────────┬────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ T+5000ms: Shuffle Workflow (Automated Response) │
├──────────────────────────────────────────────────────────────┤
│ Condition: Risk ≥ 90 AND sql_injection tag │
│ │
│ Actions Executed: │
│ 1. Block IP at WAF (Cloudflare API) │
│ 2. Add to threat feed (MISP) │
│ 3. Create ServiceNow P1 incident │
│ 4. Notify #security-incidents (Slack) │
│ 5. Update TheHive case: Status = Responded │
└─────────────────────────────┬────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ T+7000ms: Response Confirmation │
├──────────────────────────────────────────────────────────────┤
│ • WAF: IP 203.0.113.42 blocked (confirmed) │
│ • MISP: IOC published to threat feed │
│ • ServiceNow: INC0012345 created │
│ • Slack: Team notified │
│ │
│ Attack Neutralized: 7 seconds from detection to mitigation │
└──────────────────────────────────────────────────────────────┘
Total Timeline:
- Detection: 10ms (Suricata)
- Correlation: 100ms (Wazuh)
- AI Analysis: 3.5s (ML + RAG + LLM)
- Response: 7s (End-to-end)
Manual SOC Response (Typical):
- Detection: 10ms (same)
- Analyst notification: 5-15 minutes
- Investigation: 10-30 minutes
- Response approval: 5-15 minutes
- Total: 20-60 minutes
Improvement: 171x - 514x faster response time with AI-SOC automation (20-60 minutes manually versus 7 seconds automated)
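The improvement range follows directly from dividing the manual response window by the 7-second automated response. A short worked check, with `speedup` as an illustrative helper name:

```python
AUTOMATED_RESPONSE_SECONDS = 7  # End-to-end automated response from the timeline


def speedup(manual_minutes: float, automated_seconds: float = AUTOMATED_RESPONSE_SECONDS) -> float:
    """Ratio of manual response time to automated response time."""
    return manual_minutes * 60 / automated_seconds


best_case = speedup(20)   # Fastest typical manual response: 20 minutes
worst_case = speedup(60)  # Slowest typical manual response: 60 minutes
print(int(best_case), int(worst_case))  # 171 514
```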
Summary
Key Data Flows Documented:
1. Log Ingestion - External sources → Wazuh → OpenSearch
2. ML Classification - Alert → Feature extraction → Prediction
3. AI Triage - Alert → ML + RAG + LLM → Enriched case
4. SOAR Orchestration - Case → Analysis → Automated response
5. Observability - Service metrics → Prometheus → Grafana
Performance Characteristics:
- Log Ingestion: 20-100ms latency
- ML Inference: <1ms
- AI Triage: ~3s (LLM-dominated)
- SOAR Response: 4-7s
- End-to-End: 5-10s (detection to mitigation)
Data Formats:
- Syslog, JSON (Suricata/Zeek EVE)
- Wazuh Alert Format (JSON)
- MITRE ATT&CK (JSON schema)
- Prometheus Metrics (OpenMetrics)
- REST APIs (JSON over HTTP)
Data Flow Documentation Version: 1.0
Last Updated: October 24, 2025
Maintained By: AI-SOC Architecture Team