AI orchestration for security leverages autonomous agents and coordinated workflows to automate complex security tasks that traditionally required significant human effort. Security engineers design AI-powered systems that can triage alerts, investigate incidents, and execute response actions while maintaining appropriate human oversight and control.
Modern AI orchestration goes beyond simple automation by enabling adaptive, context-aware decision-making. Agents can reason about security events, correlate information across multiple sources, and take actions based on organizational policies and threat intelligence. This capability transforms security operations from reactive alert processing to proactive threat management.
According to the SANS 2024 SOC Survey, security operations centers handle an average of 11,000 alerts daily, with analysts spending 25-30% of their time on false positives. AI orchestration can reduce this burden by automating initial triage, enrichment, and response for routine incidents.
Why AI Orchestration Matters for Security
Security operations face unique challenges that make AI orchestration essential:
| Challenge | Impact on SOC Operations | AI Orchestration Solution |
|---|---|---|
| Alert fatigue | 70% of analysts report burnout | Automated triage and prioritization |
| Skill shortage | 3.5M unfilled cybersecurity positions | Amplify analyst capabilities with AI assistance |
| Dwell time | Average 204 days to detect breaches | Continuous automated threat hunting |
| Manual enrichment | 15-30 minutes per alert investigation | Parallel automated data gathering |
| Inconsistent response | Playbook adherence varies by analyst | Standardized AI-driven response execution |
| 24/7 coverage requirements | Staffing gaps during off-hours | Always-on automated monitoring and response |
| Tool sprawl | Average SOC uses 25+ security tools | Unified orchestration layer across tools |
Core Concepts
AI orchestration in security contexts involves several key architectural patterns, informed by frameworks such as the NIST Cybersecurity Framework and MITRE ATT&CK:
| Pattern | Description | Security Application | Autonomy Level |
|---|---|---|---|
| Single Agent | Autonomous AI handling specific tasks | Alert triage, log analysis | High |
| Multi-Agent | Coordinated agents with specialized roles | Complex incident investigation | Medium-High |
| Human-in-the-Loop | AI recommendations with human approval | High-impact response actions | Low-Medium |
| Workflow Automation | AI-enhanced SOAR playbooks | Automated enrichment and response | Configurable |
| Continuous Learning | Feedback-driven improvement | Detection tuning, false positive reduction | Medium |
| Supervisor-Worker | Hierarchical agent coordination | Large-scale threat hunting campaigns | Medium-High |
| Consensus-Based | Multiple agents voting on decisions | Critical security classifications | Medium |
Agent Architecture Patterns
Designing effective security AI agents requires careful consideration of autonomy levels, tool access, and safety constraints. The OWASP AI Security Guidelines recommend implementing defense-in-depth for AI systems.
Single-Agent Systems
Single-agent systems excel at focused, well-defined security tasks where context is limited and actions are bounded.
Best suited for:
- Alert classification and prioritization
- Log parsing and anomaly detection
- IOC extraction and enrichment
- Vulnerability scanning orchestration
- Compliance checking automation
```python
from anthropic import Anthropic
from datetime import datetime
import json


class SecurityTriageAgent:
    """Single agent for automated alert triage and classification."""

    def __init__(self):
        self.client = Anthropic()
        self.severity_thresholds = {
            "critical": 9.0,
            "high": 7.0,
            "medium": 4.0,
            "low": 0.0
        }

    def triage_alert(self, alert: dict) -> dict:
        """Analyze and classify a security alert."""
        prompt = f"""Analyze this security alert and provide triage assessment.

ALERT DATA:
{json.dumps(alert, indent=2)}

Provide your analysis in JSON format:
{{
  "classification": "true_positive|false_positive|suspicious",
  "severity": "critical|high|medium|low",
  "confidence": 0.0-1.0,
  "attack_technique": "MITRE ATT&CK technique ID if applicable",
  "recommended_actions": ["list of immediate actions"],
  "enrichment_needed": ["list of additional data sources to query"],
  "reasoning": "brief explanation of classification"
}}"""
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1000,
            messages=[{"role": "user", "content": prompt}]
        )
        return json.loads(response.content[0].text)

    def batch_triage(self, alerts: list[dict]) -> list[dict]:
        """Process multiple alerts with priority ordering."""
        results = []
        for alert in alerts:
            triage = self.triage_alert(alert)
            triage["original_alert"] = alert
            triage["triaged_at"] = datetime.utcnow().isoformat()
            results.append(triage)
        # Sort by severity and confidence
        severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        results.sort(key=lambda x: (severity_order[x["severity"]], -x["confidence"]))
        return results
```
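A minimal usage sketch, assuming `ANTHROPIC_API_KEY` is set in the environment. The alert field names below are illustrative, not a specific SIEM schema:

```python
agent = SecurityTriageAgent()

# Hypothetical alerts; real input would come from your SIEM export
alerts = [
    {"rule": "Possible credential stuffing", "src_ip": "203.0.113.7", "failures": 412},
    {"rule": "New scheduled task created", "host": "WS-1042", "user": "svc_backup"},
]

for triaged in agent.batch_triage(alerts):
    print(triaged["severity"], triaged["classification"], triaged["reasoning"])
```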
Multi-Agent Coordination
Multi-agent systems distribute complex security tasks across specialized agents, enabling parallel processing and domain-specific expertise aligned with MITRE ATT&CK-based detection practices.
Architecture patterns:
| Pattern | Coordination Method | Use Case | Complexity |
|---|---|---|---|
| Pipeline | Sequential handoff | Staged incident investigation | Low |
| Parallel | Concurrent execution | Multi-source enrichment | Medium |
| Hierarchical | Supervisor delegation | Complex threat hunting | High |
| Blackboard | Shared knowledge base | Collaborative analysis | High |
| Auction-based | Task bidding | Dynamic workload distribution | Medium |
```python
from enum import Enum
from dataclasses import dataclass
from typing import Callable
import asyncio


class AgentRole(Enum):
    TRIAGE = "triage"
    ENRICHMENT = "enrichment"
    THREAT_INTEL = "threat_intel"
    RESPONSE = "response"
    SUPERVISOR = "supervisor"


@dataclass
class AgentMessage:
    sender: AgentRole
    recipient: AgentRole
    content: dict
    priority: int = 0


class SecurityAgentOrchestrator:
    """Coordinate multiple specialized security agents."""

    def __init__(self):
        self.agents = {}
        self.message_queue = asyncio.Queue()
        self.investigation_state = {}

    def register_agent(self, role: AgentRole, handler: Callable):
        """Register a specialized agent handler."""
        self.agents[role] = handler

    async def investigate_incident(self, incident: dict) -> dict:
        """Orchestrate multi-agent incident investigation."""
        # Phase 1: Initial triage
        triage_result = await self.agents[AgentRole.TRIAGE](incident)

        # Phase 2: Parallel enrichment
        enrichment_tasks = [
            self.agents[AgentRole.ENRICHMENT](incident, triage_result),
            self.agents[AgentRole.THREAT_INTEL](incident, triage_result)
        ]
        enrichment_results = await asyncio.gather(*enrichment_tasks)

        # Phase 3: Supervisor synthesis
        combined_context = {
            "incident": incident,
            "triage": triage_result,
            "enrichment": enrichment_results[0],
            "threat_intel": enrichment_results[1]
        }

        # Phase 4: Response recommendation
        response = await self.agents[AgentRole.SUPERVISOR](combined_context)
        return response
```
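A sketch of wiring the orchestrator to stub handlers; the handler bodies are placeholders standing in for real agents, but their signatures match what the orchestrator calls:

```python
orchestrator = SecurityAgentOrchestrator()

# Stub handlers; real ones would wrap LLM calls and tool integrations
async def triage_stub(incident):
    return {"severity": "high", "classification": "suspicious"}

async def enrich_stub(incident, triage):
    return {"asset_owner": "unknown"}

async def intel_stub(incident, triage):
    return {"ioc_matches": []}

async def supervisor_stub(context):
    return {"recommended_action": "isolate_host", "context": context}

orchestrator.register_agent(AgentRole.TRIAGE, triage_stub)
orchestrator.register_agent(AgentRole.ENRICHMENT, enrich_stub)
orchestrator.register_agent(AgentRole.THREAT_INTEL, intel_stub)
orchestrator.register_agent(AgentRole.SUPERVISOR, supervisor_stub)

result = asyncio.run(orchestrator.investigate_incident({"id": "INC-1234"}))
print(result["recommended_action"])
```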
Human-in-the-Loop Design
Human oversight is essential for high-impact security decisions per NIST AI Risk Management Framework guidelines.
Approval thresholds by action type:
| Action Category | Example Actions | Approval Required | Timeout Behavior |
|---|---|---|---|
| Read-only enrichment | WHOIS lookup, reputation check | None | Auto-proceed |
| Low-impact containment | Block IP at edge firewall | Optional | Auto-proceed |
| Medium-impact response | Disable user account temporarily | Recommended | Queue for review |
| High-impact remediation | Isolate production server | Required | Block until approved |
| Critical actions | Wipe endpoint, revoke all sessions | Dual approval | Escalate to on-call |
```python
from enum import Enum
from dataclasses import dataclass, field
import asyncio


class ApprovalLevel(Enum):
    NONE = "none"
    OPTIONAL = "optional"
    RECOMMENDED = "recommended"
    REQUIRED = "required"
    DUAL_REQUIRED = "dual_required"


@dataclass
class PendingAction:
    action_id: str
    action_type: str
    target: str
    reasoning: str
    approval_level: ApprovalLevel
    timeout_seconds: int
    approvals: list = field(default_factory=list)


class HumanInTheLoopController:
    """Manage human approval workflows for AI-recommended actions."""

    def __init__(self, notification_service, approval_store):
        self.notifications = notification_service
        self.approvals = approval_store
        self.action_policies = self._load_policies()

    def _load_policies(self) -> dict:
        # Minimal placeholder; production systems would load approval policies from config
        return {}

    async def request_approval(self, action: PendingAction) -> bool:
        """Request human approval for a security action."""
        if action.approval_level == ApprovalLevel.NONE:
            return True

        # Store pending action
        await self.approvals.store(action)

        # Notify appropriate approvers
        await self.notifications.send_approval_request(
            action=action,
            channels=["slack", "pagerduty", "email"]
        )

        # Wait for approval with timeout
        try:
            approved = await asyncio.wait_for(
                self._wait_for_approval(action),
                timeout=action.timeout_seconds
            )
            return approved
        except asyncio.TimeoutError:
            return self._handle_timeout(action)

    async def _wait_for_approval(self, action: PendingAction) -> bool:
        # Poll the approval store until a decision lands (assumed store interface)
        while True:
            decision = await self.approvals.get_decision(action.action_id)
            if decision is not None:
                return decision
            await asyncio.sleep(5)

    def _handle_timeout(self, action: PendingAction) -> bool:
        """Handle approval timeout based on policy."""
        if action.approval_level in [ApprovalLevel.REQUIRED, ApprovalLevel.DUAL_REQUIRED]:
            self.notifications.escalate(action)
            return False
        return action.approval_level == ApprovalLevel.OPTIONAL
```
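A usage sketch with in-memory stand-ins for the notifier and approval store; both are assumed interfaces, and real deployments would back them with Slack/PagerDuty and a database:

```python
class ConsoleNotifier:
    # Stub notifier: prints instead of paging
    async def send_approval_request(self, action, channels):
        print(f"[approval needed] {action.action_type} on {action.target} via {channels}")

    def escalate(self, action):
        print(f"[escalation] {action.action_id} unanswered, paging on-call")


class MemoryApprovalStore:
    # Stub store: never records a decision, so REQUIRED actions time out
    def __init__(self):
        self.decisions = {}

    async def store(self, action):
        self.decisions.setdefault(action.action_id, None)

    async def get_decision(self, action_id):
        return self.decisions.get(action_id)


async def main():
    controller = HumanInTheLoopController(ConsoleNotifier(), MemoryApprovalStore())
    action = PendingAction(
        action_id="act-0091",
        action_type="isolate_host",
        target="srv-db-02",
        reasoning="EDR flagged ransomware precursor activity",
        approval_level=ApprovalLevel.REQUIRED,
        timeout_seconds=6,
    )
    approved = await controller.request_approval(action)
    print("approved:", approved)  # False: timeout on a REQUIRED action escalates

asyncio.run(main())
```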
Security Operations Use Cases
AI orchestration enables automation across the security operations lifecycle, from initial detection through containment and recovery. These patterns align with NIST SP 800-61 Computer Security Incident Handling Guide.
Alert Triage and Prioritization
Automated alert triage can reduce analyst workload by an estimated 60-80% while improving detection accuracy.
```python
from anthropic import Anthropic
import json


class AlertTriageOrchestrator:
    """Orchestrate alert triage across multiple data sources."""

    def __init__(self):
        self.client = Anthropic()
        self.enrichment_sources = []

    async def triage_with_enrichment(self, alert: dict) -> dict:
        """Full triage workflow with parallel enrichment."""
        # Parallel enrichment queries
        enrichment_data = await self._gather_enrichment(alert)

        # AI-powered classification with full context
        prompt = f"""You are a security analyst performing alert triage.

ALERT:
{json.dumps(alert, indent=2)}

ENRICHMENT DATA:
{json.dumps(enrichment_data, indent=2)}

Classify this alert considering:
1. Is this a true positive, false positive, or requires investigation?
2. What is the business impact if this is a real attack?
3. What MITRE ATT&CK techniques are indicated?
4. What immediate actions should be taken?

Respond in JSON format with classification, severity (critical/high/medium/low),
confidence (0-1), mitre_techniques, and recommended_actions."""
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1000,
            messages=[{"role": "user", "content": prompt}]
        )
        result = json.loads(response.content[0].text)
        result["enrichment"] = enrichment_data
        return result
```
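The class above leaves `_gather_enrichment` undefined. One plausible shape for it, fanning out to registered sources with `asyncio.gather`, is sketched below; it assumes each entry in `self.enrichment_sources` is an async callable that takes the alert:

```python
import asyncio

# Hypothetical implementation of the _gather_enrichment method referenced above
async def _gather_enrichment(self, alert: dict) -> dict:
    results = await asyncio.gather(
        *(source(alert) for source in self.enrichment_sources),
        return_exceptions=True,
    )
    # Key results by source name; record failures instead of aborting the triage
    return {
        getattr(source, "__name__", f"source_{i}"): (
            {"error": str(result)} if isinstance(result, Exception) else result
        )
        for i, (source, result) in enumerate(zip(self.enrichment_sources, results))
    }

AlertTriageOrchestrator._gather_enrichment = _gather_enrichment  # attach to the class
```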
Automated Incident Investigation
AI agents can conduct thorough investigations that would take analysts hours, completing them in minutes. This follows the SANS Incident Handler’s Handbook methodology.
Investigation workflow phases:
| Phase | AI Actions | Human Touchpoints | Time Savings |
|---|---|---|---|
| Identification | Correlate alerts, identify scope | Confirm incident declaration | 80% |
| Scoping | Query all relevant logs, map affected systems | Review scope assessment | 70% |
| Evidence Gathering | Collect logs, memory dumps, network captures | Approve forensic actions | 60% |
| Timeline Building | Reconstruct attack sequence | Validate timeline accuracy | 75% |
| Attribution | Match TTPs to threat actors | Confirm attribution | 50% |
| Reporting | Generate incident report draft | Review and finalize | 85% |
```python
from anthropic import Anthropic  # added: used by the analysis phase below


class IncidentInvestigationAgent:
    """Automated incident investigation with evidence collection."""

    def __init__(self, siem_client, edr_client, ti_client):
        self.siem = siem_client
        self.edr = edr_client
        self.threat_intel = ti_client
        self.client = Anthropic()

    async def investigate(self, incident_id: str, initial_indicators: list) -> dict:
        """Conduct comprehensive incident investigation."""
        investigation = {
            "incident_id": incident_id,
            "timeline": [],
            "affected_assets": set(),
            "iocs": set(),
            "ttps": [],
            "recommendations": []
        }

        # Phase 1: Expand IOCs through pivoting
        expanded_iocs = await self._pivot_on_indicators(initial_indicators)
        investigation["iocs"].update(expanded_iocs)

        # Phase 2: Query all relevant data sources
        siem_results = await self.siem.search(expanded_iocs, hours=72)
        edr_results = await self.edr.hunt(expanded_iocs)
        ti_matches = await self.threat_intel.lookup(expanded_iocs)

        # Phase 3: Build investigation timeline
        timeline = self._build_timeline(siem_results, edr_results)
        investigation["timeline"] = timeline

        # Phase 4: AI analysis and recommendation
        analysis = await self._analyze_investigation(investigation, ti_matches)
        return analysis

    # Internal helpers (_pivot_on_indicators, _build_timeline,
    # _analyze_investigation) are elided in this sketch.
```
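The three phase-2 lookups are independent, so they can be fanned out concurrently. A sketch of a helper doing that, shown standalone but intended to live on `IncidentInvestigationAgent` (the client interfaces are the same assumptions as above):

```python
import asyncio

# Hypothetical variant of phase 2 that runs the three queries concurrently
async def _query_sources(self, expanded_iocs):
    return await asyncio.gather(
        self.siem.search(expanded_iocs, hours=72),
        self.edr.hunt(expanded_iocs),
        self.threat_intel.lookup(expanded_iocs),
    )
```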
Threat Hunting Assistance
AI-powered threat hunting enables proactive detection of threats that evade traditional detection methods, aligned with MITRE ATT&CK-based hunting.
```python
from anthropic import Anthropic  # added: required by the generation call below
import json


class ThreatHuntingAgent:
    """AI-assisted threat hunting across enterprise data."""

    def __init__(self):
        self.client = Anthropic()
        self.hunt_hypotheses = self._load_hunt_library()

    def _load_hunt_library(self) -> list:
        # Placeholder; real deployments would load hypotheses from a hunt library
        return []

    def generate_hunt_queries(self, hypothesis: str, data_sources: list) -> list:
        """Generate hunting queries for a given hypothesis."""
        prompt = f"""Generate threat hunting queries for this hypothesis:

HYPOTHESIS: {hypothesis}

AVAILABLE DATA SOURCES: {json.dumps(data_sources)}

For each data source, provide:
1. Query syntax appropriate for that platform
2. Expected indicators if the hypothesis is true
3. False positive considerations
4. MITRE ATT&CK technique coverage

Format as JSON array of hunt queries."""
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2000,
            messages=[{"role": "user", "content": prompt}]
        )
        return json.loads(response.content[0].text)

    async def execute_hunt(self, queries: list) -> dict:
        """Execute hunting queries and analyze results."""
        results = []
        for query in queries:
            query_results = await self._execute_query(query)
            if query_results:
                analysis = await self._analyze_hunt_results(query, query_results)
                results.append(analysis)
        return {"findings": results, "summary": self._summarize_hunt(results)}

    # _execute_query, _analyze_hunt_results, and _summarize_hunt are elided here;
    # they would wrap platform-specific search APIs and result summarization.
```
Response Automation
Automated response orchestration executes containment and remediation actions based on playbooks and AI recommendations, integrating with SOAR platforms like Splunk SOAR, Palo Alto XSOAR, and IBM QRadar SOAR.
```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ResponseAction(Enum):
    BLOCK_IP = "block_ip"
    ISOLATE_HOST = "isolate_host"
    DISABLE_USER = "disable_user"
    QUARANTINE_FILE = "quarantine_file"
    RESET_PASSWORD = "reset_password"
    REVOKE_SESSIONS = "revoke_sessions"


@dataclass
class ResponsePlan:
    actions: list[ResponseAction]
    targets: list[str]
    reasoning: str
    approval_required: bool
    rollback_plan: Optional[str] = None


class ResponseOrchestrator:
    """Execute automated response actions with safety controls."""

    def __init__(self, action_handlers: dict, approval_controller):
        self.handlers = action_handlers
        self.approvals = approval_controller
        self.executed_actions = []

    async def execute_response_plan(self, plan: ResponsePlan) -> dict:
        """Execute a response plan with appropriate approvals."""
        results = {"success": [], "failed": [], "pending_approval": []}

        for action, target in zip(plan.actions, plan.targets):
            # Check if approval is needed
            if plan.approval_required:
                approved = await self.approvals.request_approval(action, target)
                if not approved:
                    results["pending_approval"].append({"action": action, "target": target})
                    continue

            # Execute action with rollback capability
            try:
                result = await self.handlers[action](target)
                self.executed_actions.append({
                    "action": action,
                    "target": target,
                    "result": result,
                    "rollback": plan.rollback_plan
                })
                results["success"].append({"action": action, "target": target})
            except Exception as e:
                results["failed"].append({"action": action, "target": target, "error": str(e)})

        return results
```
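Wiring a single handler and executing a plan; the firewall handler is a stub, and the auto-approving controller exists only to make the sketch runnable:

```python
import asyncio

async def block_ip_handler(target: str) -> dict:
    # Stub: a real handler would call your firewall or EDR API here
    return {"status": "blocked", "target": target}

class AutoApprove:
    # Stand-in controller that approves everything; never use in production
    async def request_approval(self, action, target) -> bool:
        return True

orchestrator = ResponseOrchestrator(
    action_handlers={ResponseAction.BLOCK_IP: block_ip_handler},
    approval_controller=AutoApprove(),
)

plan = ResponsePlan(
    actions=[ResponseAction.BLOCK_IP],
    targets=["203.0.113.7"],
    reasoning="Repeated credential stuffing from this address",
    approval_required=True,
    rollback_plan="Remove firewall rule for 203.0.113.7",
)

print(asyncio.run(orchestrator.execute_response_plan(plan)))
```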
Implementation Considerations
Effective AI orchestration requires robust integration with security tools across the enterprise. Follow OWASP API Security Top 10 guidelines for secure API integration.
Common integration targets:
| Tool Category | Integration Method | Data Format | Authentication |
|---|---|---|---|
| Splunk | REST API | JSON | Token/OAuth |
| Microsoft Sentinel | REST API/SDK | JSON | Azure AD |
| CrowdStrike Falcon | REST API | JSON | OAuth2 |
| Elastic Security | REST API | JSON | API Key |
| VirusTotal | REST API | JSON | API Key |
| MISP | REST API | STIX/MISP JSON | API Key |
```python
from dataclasses import dataclass
from typing import Protocol
import asyncio


class SecurityToolClient(Protocol):
    """Protocol for security tool integrations."""
    async def query(self, query: str) -> dict: ...
    async def execute_action(self, action: str, target: str) -> dict: ...


@dataclass
class ToolCredentials:
    api_key: str
    base_url: str
    timeout: int = 30


class SecurityToolOrchestrator:
    """Unified interface for security tool integrations."""

    def __init__(self):
        self.tools: dict[str, SecurityToolClient] = {}

    def register_tool(self, name: str, client: SecurityToolClient):
        self.tools[name] = client

    async def parallel_query(self, query: str, tools: list[str]) -> dict:
        """Execute query across multiple tools in parallel."""
        names = [name for name in tools if name in self.tools]
        # Fan out concurrently rather than awaiting each tool in sequence
        responses = await asyncio.gather(
            *(self.tools[name].query(query) for name in names),
            return_exceptions=True
        )
        return dict(zip(names, responses))
```
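A minimal client satisfying `SecurityToolClient`, sketched against a hypothetical REST endpoint; the `/search` and `/actions` paths and payload shapes are assumptions, not any vendor's real API:

```python
import httpx

class GenericRestToolClient:
    """Illustrative REST client; endpoint paths and payloads are hypothetical."""

    def __init__(self, creds: ToolCredentials):
        self.creds = creds

    async def _post(self, path: str, payload: dict) -> dict:
        async with httpx.AsyncClient(base_url=self.creds.base_url,
                                     timeout=self.creds.timeout) as client:
            resp = await client.post(
                path,
                json=payload,
                headers={"Authorization": f"Bearer {self.creds.api_key}"},
            )
            resp.raise_for_status()
            return resp.json()

    async def query(self, query: str) -> dict:
        return await self._post("/search", {"query": query})

    async def execute_action(self, action: str, target: str) -> dict:
        return await self._post("/actions", {"action": action, "target": target})
```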
Safety and Guardrails
AI agents operating in security contexts require strict guardrails to prevent unintended harm. These align with NIST AI Risk Management Framework principles.
Essential guardrails:
| Guardrail Category | Implementation | Example |
|---|---|---|
| Action boundaries | Allowlist of permitted actions | Block actions not in approved list |
| Rate limiting | Throttle automated actions | Max 100 IP blocks per hour |
| Blast radius limits | Restrict scope of automated responses | Cannot isolate more than 5 hosts at once |
| Rollback requirements | Require reversibility for all actions | Store undo commands for each action |
| Escalation triggers | Force human review for anomalies | Unusual action patterns trigger review |
| Audit logging | Complete audit trail of all decisions | Log reasoning for every AI decision |
```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class GuardrailConfig:
    max_actions_per_hour: int = 100
    max_concurrent_isolations: int = 5
    require_rollback_plan: bool = True
    high_impact_threshold: float = 0.8


class SafetyGuardrails:
    """Enforce safety constraints on AI agent actions."""

    def __init__(self, config: GuardrailConfig):
        self.config = config
        self.action_log = []  # entries: {"action": str, "target": str, "at": datetime}
        self.permitted_actions = set()

    def validate_action(self, action: str, target: str, context: dict) -> tuple[bool, str]:
        """Validate action against all guardrails."""
        # Check action is permitted
        if action not in self.permitted_actions:
            return False, f"Action {action} not in permitted list"

        # Check rate limits
        recent_actions = self._count_recent_actions(hours=1)
        if recent_actions >= self.config.max_actions_per_hour:
            return False, "Rate limit exceeded"

        # Check blast radius
        if action == "isolate_host":
            current_isolations = self._count_active_isolations()
            if current_isolations >= self.config.max_concurrent_isolations:
                return False, "Maximum concurrent isolations reached"

        # Check rollback plan exists
        if self.config.require_rollback_plan and "rollback_plan" not in context:
            return False, "Rollback plan required but not provided"

        return True, "Validation passed"

    def _count_recent_actions(self, hours: int) -> int:
        # Count logged actions inside the rolling window
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        return sum(1 for entry in self.action_log if entry["at"] >= cutoff)

    def _count_active_isolations(self) -> int:
        # Minimal sketch: in practice, query the EDR for hosts that remain isolated
        return sum(1 for entry in self.action_log if entry["action"] == "isolate_host")
```
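A quick usage sketch; the action name, target, and rollback text are illustrative:

```python
guardrails = SafetyGuardrails(GuardrailConfig(max_actions_per_hour=50))
guardrails.permitted_actions.add("block_ip")

ok, reason = guardrails.validate_action(
    "block_ip", "203.0.113.7", context={"rollback_plan": "unblock 203.0.113.7"}
)
print(ok, reason)  # True "Validation passed"
```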
Observability and Audit
Complete observability is essential for security AI systems and is expected by audits such as SOC 2 Type II. Every decision must be traceable and explainable.
```python
from datetime import datetime
import structlog


class AISecurityAuditLogger:
    """Comprehensive audit logging for AI security operations."""

    def __init__(self, log_backend):
        self.logger = structlog.get_logger()
        self.backend = log_backend

    def log_decision(self, decision: dict):
        """Log an AI decision with full context for audit."""
        audit_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "decision_id": decision.get("id"),
            "decision_type": decision.get("type"),
            "input_context": decision.get("context"),
            "reasoning": decision.get("reasoning"),
            "output": decision.get("output"),
            "confidence": decision.get("confidence"),
            "model_version": decision.get("model"),
            "human_override": decision.get("overridden", False)
        }
        self.logger.info("ai_decision", **audit_record)
        self.backend.store(audit_record)

    def log_action(self, action: dict):
        """Log an executed action with rollback information."""
        action_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "action_id": action.get("id"),
            "action_type": action.get("type"),
            "target": action.get("target"),
            "triggered_by": action.get("decision_id"),
            "result": action.get("result"),
            "rollback_available": action.get("rollback") is not None
        }
        self.logger.info("ai_action", **action_record)
        self.backend.store(action_record)
```
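Recording one decision with an in-memory backend; the backend interface is an assumption, and production systems would write to append-only storage or a SIEM:

```python
class MemoryBackend:
    # Stub backend for the sketch; swap in WORM storage or a SIEM sink
    def __init__(self):
        self.records = []

    def store(self, record: dict):
        self.records.append(record)

audit = AISecurityAuditLogger(MemoryBackend())
audit.log_decision({
    "id": "dec-581",
    "type": "alert_triage",
    "context": {"alert_id": "a-10234"},
    "reasoning": "Matched known-benign admin script",
    "output": "false_positive",
    "confidence": 0.93,
    "model": "claude-sonnet-4-20250514",
})
```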
Metrics and Evaluation
Track these metrics to measure AI orchestration effectiveness, aligned with security operations KPIs from SANS SOC Metrics:
| Metric | Description | Target | Measurement Method |
|---|---|---|---|
| Mean Time to Triage (MTTT) | Time from alert to initial classification | < 5 minutes | Timestamp delta |
| Mean Time to Respond (MTTR) | Time from detection to containment | < 30 minutes | Incident lifecycle |
| Automation Rate | Percentage of alerts handled without human intervention | > 70% low severity | Action attribution |
| False Positive Reduction | Decrease in analyst time on false positives | > 50% reduction | Before/after compare |
| Investigation Completeness | Percentage of relevant context gathered automatically | > 80% | Checklist coverage |
| Human Override Rate | Frequency of analyst corrections to AI decisions | < 10% | Override tracking |
| Mean Time to Detect (MTTD) | Time from attack start to detection | < 1 hour | Timeline analysis |
| Cost per Incident | Total cost including AI and human effort | 40% reduction | Resource tracking |
| Agent Accuracy | Precision and recall of AI classifications | > 95% precision | Confusion matrix |
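A minimal sketch of computing two of these from incident records; the `created_at`, `triaged_at`, and `human_touched` field names are illustrative:

```python
from datetime import datetime

def mean_time_to_triage(incidents: list[dict]) -> float:
    """Average seconds from alert creation to first classification (MTTT)."""
    deltas = [
        (datetime.fromisoformat(i["triaged_at"]) - datetime.fromisoformat(i["created_at"]))
        .total_seconds()
        for i in incidents
        if i.get("triaged_at")
    ]
    return sum(deltas) / len(deltas) if deltas else 0.0

def automation_rate(incidents: list[dict]) -> float:
    """Fraction of incidents closed without human intervention."""
    if not incidents:
        return 0.0
    automated = sum(1 for i in incidents if not i.get("human_touched", True))
    return automated / len(incidents)
```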
Anti-Patterns to Avoid
Security AI orchestration introduces unique risks that require careful mitigation:
- Unconstrained autonomy: AI agents must operate within defined boundaries with appropriate oversight. Define explicit action allowlists and implement circuit breakers per NIST SP 800-53 AC-6 (Least Privilege).
- Opaque decision-making: All AI actions must be explainable and auditable. Implement structured reasoning logs that capture the decision context, evidence considered, and confidence levels.
- Single point of failure: AI systems should degrade gracefully. Implement fallback workflows that route to human analysts when AI systems are unavailable or confidence is low, as shown in the sketch after this list.
- Insufficient testing: AI workflows require extensive testing with adversarial scenarios. Test against MITRE ATLAS adversarial ML techniques and red team AI decision-making.
- Alert flooding attacks: Adversaries may attempt to overwhelm AI systems with false positives. Implement rate limiting and anomaly detection on alert volumes.
- Feedback loop manipulation: If AI learns from analyst feedback, adversaries may attempt to poison the training data. Validate feedback sources and implement anomaly detection on model drift.
- Over-reliance on automation: Maintain analyst skills by ensuring meaningful human involvement. Rotate automated alert categories to keep analysts engaged with diverse incidents.
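For the graceful-degradation point above, a minimal circuit-breaker sketch: after repeated AI failures or low-confidence outputs, work is routed to the human queue. The thresholds, `ai_triage` callable, and `human_queue` list are assumptions:

```python
class AICircuitBreaker:
    """Route alerts to human analysts after repeated AI failures or low confidence."""

    def __init__(self, failure_threshold: int = 3, min_confidence: float = 0.6):
        self.failure_threshold = failure_threshold
        self.min_confidence = min_confidence
        self.consecutive_failures = 0

    def handle(self, alert: dict, ai_triage, human_queue: list) -> dict:
        if self.consecutive_failures >= self.failure_threshold:
            human_queue.append(alert)  # breaker open: skip AI entirely
            return {"routed_to": "human", "reason": "circuit open"}
        try:
            result = ai_triage(alert)
        except Exception:
            self.consecutive_failures += 1
            human_queue.append(alert)
            return {"routed_to": "human", "reason": "ai error"}
        self.consecutive_failures = 0  # success closes the breaker
        if result.get("confidence", 0.0) < self.min_confidence:
            human_queue.append(alert)
            return {"routed_to": "human", "reason": "low confidence"}
        return {"routed_to": "automated", "result": result}
```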
Related Tools and Frameworks
| Tool | Purpose | Integration Type |
|---|---|---|
| LangChain | Agent framework and tool orchestration | Python SDK |
| LangGraph | Multi-agent graph workflows | Python SDK |
| AutoGen | Multi-agent conversation framework | Python SDK |
| CrewAI | Role-based multi-agent orchestration | Python SDK |
| Anthropic Claude | LLM for reasoning and analysis | REST API |
| Splunk SOAR | SOAR platform integration | REST API |
| Palo Alto XSOAR | SOAR platform integration | REST API |
| TheHive | Incident response platform | REST API |
| MISP | Threat intelligence platform | REST API |
| OpenCTI | Threat intelligence management | GraphQL API |
| Velociraptor | Endpoint visibility and collection | gRPC/REST API |
| Shuffle | Open-source SOAR automation | REST API |