Post

Building a Cybersecurity AI Agent From Scratch: The Complete 2026 Guide

A complete hands-on guide to building a Security Intelligence Agent using LangGraph and MCP — covering architecture decisions, trade-offs, and working code that performs live threat correlation and CVE analysis.

Building a Cybersecurity AI Agent From Scratch: The Complete 2026 Guide

Building a Cybersecurity AI Agent From Scratch: The Complete 2026 Guide

This guide does something different. It takes the time to explain the decisions, show the trade-offs, and give you code that does something genuinely useful for a security practitioner. By the end you will have a working Security Intelligence Agent that takes a target (IP address, domain, CVE identifier, or threat actor name) and autonomously: searches for current threat intelligence, correlates indicators with known CVEs, analyzes log samples for relevant patterns, and produces a structured threat assessment report.

We will build it in two stages. First with LangGraph, which gives you the most control over the execution graph and remains the leading framework for complex Python agent workflows as of 2026. Then we extend it with MCP (Model Context Protocol), which is rapidly becoming the standard for connecting agents to external tools — and which you need to understand if you are deploying agents in anything approaching a real environment.

Everything runs locally first. Cloud options are noted throughout. The lab infrastructure is Docker-based and completely self-contained.


Part 1 — Why Agents, Why Now, and Why the Basics Matter

What an LLM Alone Cannot Do

A language model answers questions. It has knowledge up to its training cutoff, it can reason over text you give it, and it produces text output. That is genuinely useful, but it is fundamentally passive. The model cannot fetch a live CVE feed. It cannot run a Shodan query against a current IP. It cannot compare what it finds against your specific firewall logs. It cannot write a report to disk, send a Slack alert, or trigger a ticket in Jira.

These limitations are not bugs; they are by design. LLMs are stateless text transformers. The moment you want an LLM to interact with the world — not just describe it — you need an agent.

An agent is a system where an LLM controls an execution loop. The LLM reasons over a problem, decides what action to take, calls a tool (a Python function, an API, a shell command), receives the result, reasons again, decides the next action, and continues until the task is complete or it determines it cannot proceed. The LLM is the decision-maker. The tools are the hands.

The five properties that make an agentic system different from a simple chatbot are worth memorizing, because they are also the five properties that make it a more interesting attack surface — which is what Part 2 of this series addresses:

  1. Autonomy — the agent plans and acts without per-step human approval
  2. Tool use — the agent calls external APIs, reads files, executes code
  3. Delegation — the agent can hand off subtasks to other agents
  4. Persistence — the agent maintains state across turns and sessions via memory
  5. Identity — the agent acts with credentials and permissions that belong to a real account

None of these properties is hypothetical. The agent you build in this guide will have all five from the first working version.

Why LangGraph (and Not Something Else)

The framework landscape in 2026 is noisy. There are at least a dozen credible options. Making a wrong choice early is expensive — multiple teams have reported rewriting from CrewAI to LangGraph six to twelve months in when their requirements grew beyond what role-based crew abstractions handle well.

Here is the honest decision matrix for the leading frameworks:

Framework Best for Avoid if
LangGraph Complex branching workflows, explicit state control, production debugging You need something running in 30 minutes with minimal boilerplate
CrewAI Role-based multi-agent collaboration, rapid prototyping You need complex conditional branching or state rollbacks
OpenAI Agents SDK Teams fully committed to OpenAI stack, fastest path to production You want provider flexibility or local/open-source models
PydanticAI Type-safe outputs, FastAPI-style ergonomics, 25+ model providers You need visual debugging tools
AutoGen/AG2 Multi-agent conversations, research prototyping Production deployments with strict SLA requirements

For a cybersecurity use case where you need: conditional routing (if Shodan finds open ports, go deeper; if not, skip), explicit state management (accumulate findings across multiple tool calls), reproducible debugging (trace exactly which node fired and what data it received), and eventual MCP integration — LangGraph is the right choice. It is the lowest-latency framework in benchmark comparisons, its state machine architecture makes agentic loops predictable, and LangGraph 1.0 (October 2025) introduced node caching, deferred nodes, and pre/post model hooks that matter for production reliability.

The Four Concepts That Make LangGraph Click

Once you understand these four things, you understand every LangGraph application.

State is a typed Python dictionary that flows through every step of the graph. Every node reads from it and writes back to it. Updates append to the state rather than replacing it — nothing is lost. In our security agent, the state will carry: the original target, discovered subdomains, CVE matches, log analysis results, intermediate reasoning, and the final report. It is the single source of truth for the entire investigation.

Nodes are where work happens. Each node is a Python function that takes the current state, does something (calls an API, runs an LLM, processes data), and returns an update to the state. Nodes are where you put your tools, your LLM calls, your data transformations.

Edges decide what happens next. A normal edge goes from node A to node B unconditionally. A conditional edge looks at the current state and routes to different nodes based on what it finds. This is how you build loops, retries, and branching workflows that would be impossible in a linear chain.

Tools are the Python functions your LLM can invoke. In LangGraph, tools are defined with the @tool decorator and passed to the agent node. The LLM sees the tool’s name, description, and parameter schema. It decides when to call which tool. Tool execution is handled by a dedicated ToolNode.

Here is the complete execution flow of what we are about to build:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
User input (target: IP / domain / CVE / threat actor)
    ↓
[START]
    ↓
supervisor_node         ← decides where to route based on current state
    ↓ (conditional)
┌─────────────────────────────────────────────────────┐
│  threat_intel_node    ← web search for current intel  │
│  cve_lookup_node      ← CVE database correlation      │
│  log_analysis_node    ← pattern match in log data     │
│  report_node          ← structure findings into report │
└─────────────────────────────────────────────────────┘
    ↓ (each node reports back to supervisor)
[END]
    ↓
Structured threat assessment report

The supervisor does not run tools itself — it routes. Each specialist node has its own tools. This separation is what makes the system debuggable and extensible.


Part 2 — The Tech Stack

The LLM Layer: Three Paths

Path A — Local (recommended for getting started): LM Studio (lmstudio.ai) serves an OpenAI-compatible API on localhost:1234. Download Qwen2.5-7B-Instruct Q4_K_M (~4.5 GB). No API key, no usage costs, no data leaving your machine. The model follows tool-call instructions reliably at 40+ tokens/second on a GPU, 8–12 tokens/second on CPU.

Path B — OpenAI: gpt-4o-mini hits the best cost/performance balance for this use case. gpt-4o performs better on complex multi-step reasoning but costs roughly 10× more per token.

Path C — Anthropic: claude-sonnet-4-5 is the current recommended model for agentic workflows requiring strong instruction-following and tool use. Better at multi-step reasoning than GPT-4o-mini; comparable cost.

For all three paths, the LangGraph code is identical. Only the model initialization changes:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Path A — Local LM Studio
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
    base_url="http://localhost:1234/v1",
    api_key="lm-studio",
    model="qwen2.5-7b-instruct",
    temperature=0
)

# Path B — OpenAI
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Path C — Anthropic
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0)

temperature=0 matters for agents. Non-determinism is useful for creative tasks. For security analysis, you want the agent to make the same decision when it sees the same data.

The Tool Layer: Native vs MCP

This is the architectural decision the original “build an agent” tutorials skip entirely, and it has significant downstream consequences.

LangChain native tools (@tool decorator) are Python functions that live in the same process as your agent. They are simple, fast, and easy to debug. They are also invisible to any external system — no other agent can discover or call them, and there is no standard way to share them.

MCP tools run as separate server processes. Your agent connects to them at runtime via a client-server protocol (stdio for local, HTTP+SSE for remote). Any MCP-compatible client — Claude Desktop, Cursor, VS Code Copilot, your own LangGraph agent — can connect to an MCP server and get its tools. This is the direction the ecosystem is moving. MCP was donated to the Linux Foundation in December 2025, OpenAI and Google both support it, and there are now 75+ pre-built connectors available.

For this guide, we build with native tools first (simpler to understand and debug) then show the MCP migration (one section, minimal code change). This mirrors how most practitioners should think about it: start native, go MCP when you need interoperability.

Complete Stack

1
2
3
4
5
6
7
8
LLM:         LM Studio (local)  /  OpenAI  /  Anthropic
Framework:   LangGraph 0.2+
Tools:       LangChain @tool (native) → FastMCP (MCP migration)
Search:      Tavily API (free tier: 1,000 searches/month) or SerpAPI
CVE data:    NIST NVD API (free, no key required for basic queries)
Log data:    Local JSONL file (simulated in lab; swap for SIEM API)
State store: SQLite checkpointer (built into LangGraph)
Observability: LangSmith (free tier) — optional but highly recommended

Tavily is the search provider of choice for LangGraph agents in 2026 — it is optimized for LLM consumption, returns clean structured results, and integrates directly with LangChain. Sign up at app.tavily.com for a free API key.


Part 3 — Environment Setup

1
2
3
4
5
6
7
8
9
10
# Create isolated environment
mkdir ~/sec-intel-agent && cd ~/sec-intel-agent
python3 -m venv venv && source venv/bin/activate  # Windows: .\venv\Scripts\activate

# Core dependencies
pip install langgraph langchain-openai langchain-anthropic langchain-community \
            langchain-core tavily-python httpx python-dotenv fastmcp mcp

# Optional: LangSmith observability
pip install langsmith

Create .env in your project root:

# LLM — uncomment the one you're using
LM_STUDIO_BASE_URL=http://localhost:1234/v1
# OPENAI_API_KEY=sk-...
# ANTHROPIC_API_KEY=sk-ant-...

# Tools
TAVILY_API_KEY=tvly-...         # From app.tavily.com

# Observability (optional)
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=ls__...       # From smith.langchain.com
LANGCHAIN_PROJECT=sec-intel-agent

Verify your local model is running:

1
2
curl http://localhost:1234/v1/models
# Should list: qwen2.5-7b-instruct

Part 4 — Building the Agent

Step 1: Define the State

The state is the backbone of the entire agent. Define it carefully — adding fields later is easy, removing them without breaking something is harder.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# state.py
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class SecurityIntelState(TypedDict):
    # Input
    target: str                      # IP, domain, CVE ID, or threat actor name
    target_type: str                 # "ip" | "domain" | "cve" | "threat_actor"

    # Accumulated findings
    messages: Annotated[list, add_messages]   # Full conversation/reasoning history
    threat_intel: list[dict]          # Results from web intelligence search
    cve_matches: list[dict]           # CVE correlations found
    log_findings: list[dict]          # Patterns found in log data
    ioc_list: list[str]               # Indicators of Compromise extracted

    # Control flow
    completed_phases: list[str]       # Which analysis phases are done
    current_phase: str                # What the supervisor is currently routing to
    error_log: list[str]              # Errors that occurred (non-fatal)

    # Output
    report: str                       # Final structured report
    severity: str                     # "critical" | "high" | "medium" | "low" | "info"

The Annotated[list, add_messages] on the messages field is LangGraph-specific: it tells the framework to append to the messages list rather than replace it. This is how conversation history accumulates across nodes.

Step 2: Build the Tools

Each tool is a Python function with a descriptive docstring. The docstring is what the LLM sees when deciding whether to call the tool. Write it as you would write documentation for a colleague — specific about what it does, what it returns, and when to use it.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# tools.py
import os, json, httpx
from datetime import datetime
from langchain_core.tools import tool
from tavily import TavilyClient

tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

# ─────────────────────────────────────────────────────────────────────────────
# TOOL 1: Threat Intelligence Search
# ─────────────────────────────────────────────────────────────────────────────

@tool
def search_threat_intelligence(query: str) -> str:
    """Search for current threat intelligence about an IP, domain, CVE,
    or threat actor. Returns recent news, security advisories, and analyst
    reports. Use this as the first step for any new target to get context.

    Args:
        query: The search query — include target name plus context
               (e.g., "CVE-2024-1234 exploitation in the wild",
                "192.168.1.1 malicious activity",
                "LockBit ransomware TTPs 2025")

    Returns:
        JSON string with up to 5 search results including titles, URLs,
        content snippets, and publication dates.
    """
    try:
        results = tavily.search(
            query=query,
            search_depth="advanced",
            max_results=5,
            include_answer=True
        )
        findings = []
        for r in results.get("results", []):
            findings.append({
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "content": r.get("content", "")[:500],  # truncate for context efficiency
                "published_date": r.get("published_date", "unknown"),
                "score": r.get("score", 0)
            })
        return json.dumps({
            "query": query,
            "answer_summary": results.get("answer", ""),
            "results": findings,
            "retrieved_at": datetime.utcnow().isoformat()
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e), "query": query})


# ─────────────────────────────────────────────────────────────────────────────
# TOOL 2: CVE Lookup (NIST NVD API — free, no key required)
# ─────────────────────────────────────────────────────────────────────────────

@tool
def lookup_cve(cve_id: str) -> str:
    """Look up detailed information about a CVE from the NIST National
    Vulnerability Database. Returns CVSS score, description, affected
    software, and known exploitation status.

    Args:
        cve_id: A CVE identifier in the format CVE-YEAR-NUMBER
                (e.g., CVE-2024-1234, CVE-2025-68664)

    Returns:
        JSON with CVSS score, severity, description, affected configurations,
        and publication date.
    """
    try:
        cve_id = cve_id.upper().strip()
        url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}"
        r = httpx.get(url, timeout=10, headers={"User-Agent": "SecurityAgent/1.0"})
        data = r.json()

        vulns = data.get("vulnerabilities", [])
        if not vulns:
            return json.dumps({"error": f"{cve_id} not found in NVD"})

        vuln = vulns[0]["cve"]
        description = next(
            (d["value"] for d in vuln.get("descriptions", []) if d["lang"] == "en"),
            "No description available"
        )

        # Extract CVSS v3.1 score if available
        metrics = vuln.get("metrics", {})
        cvss_v31 = metrics.get("cvssMetricV31", [{}])[0].get("cvssData", {})
        cvss_v30 = metrics.get("cvssMetricV30", [{}])[0].get("cvssData", {})
        cvss = cvss_v31 or cvss_v30

        return json.dumps({
            "cve_id": cve_id,
            "description": description[:600],
            "cvss_score": cvss.get("baseScore", "N/A"),
            "cvss_severity": cvss.get("baseSeverity", "N/A"),
            "cvss_vector": cvss.get("vectorString", "N/A"),
            "published": vuln.get("published", ""),
            "last_modified": vuln.get("lastModified", ""),
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e), "cve_id": cve_id})


# ─────────────────────────────────────────────────────────────────────────────
# TOOL 3: Log Analysis
# ─────────────────────────────────────────────────────────────────────────────

@tool
def analyze_logs(target: str, log_path: str = "sample_logs.jsonl") -> str:
    """Search through security logs for events related to a specific target
    (IP address, domain, username, or hash). Returns matching log lines
    with timestamps, event types, and context.

    Args:
        target: The indicator to search for (IP, domain, hash, username).
        log_path: Path to a JSONL log file. Defaults to sample_logs.jsonl.
                  Replace with your actual log path or SIEM export.

    Returns:
        JSON with matching log entries, frequency analysis, first/last seen
        timestamps, and a brief pattern summary.
    """
    try:
        if not os.path.exists(log_path):
            # Generate sample logs if none exist — useful for first run
            _create_sample_logs(log_path)

        matches = []
        with open(log_path) as f:
            for line in f:
                entry = json.loads(line.strip())
                entry_str = json.dumps(entry).lower()
                if target.lower() in entry_str:
                    matches.append(entry)

        if not matches:
            return json.dumps({"target": target, "matches": 0,
                               "summary": "No log entries found for this target."})

        event_types = {}
        for m in matches:
            evt = m.get("event_type", "unknown")
            event_types[evt] = event_types.get(evt, 0) + 1

        timestamps = [m.get("timestamp", "") for m in matches if m.get("timestamp")]
        return json.dumps({
            "target": target,
            "total_matches": len(matches),
            "event_type_breakdown": event_types,
            "first_seen": min(timestamps) if timestamps else "unknown",
            "last_seen": max(timestamps) if timestamps else "unknown",
            "sample_entries": matches[:5],   # first 5 for context
        }, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e), "target": target})


def _create_sample_logs(path: str) -> None:
    """Creates a realistic sample log file for lab use."""
    import random
    events = [
        {"timestamp": "2025-12-01T09:15:23Z", "event_type": "failed_auth",
         "source_ip": "185.220.101.47", "dest_ip": "10.0.0.5",
         "user": "admin", "action": "SSH login attempt", "result": "failed"},
        {"timestamp": "2025-12-01T09:15:31Z", "event_type": "failed_auth",
         "source_ip": "185.220.101.47", "dest_ip": "10.0.0.5",
         "user": "root", "action": "SSH login attempt", "result": "failed"},
        {"timestamp": "2025-12-01T09:17:44Z", "event_type": "port_scan",
         "source_ip": "185.220.101.47", "dest_ip": "10.0.0.0/24",
         "ports_scanned": [22, 80, 443, 8080, 3389], "duration_seconds": 12},
        {"timestamp": "2025-12-01T14:32:11Z", "event_type": "dns_query",
         "source_ip": "10.0.0.22", "query": "evil-c2.example-bad.com",
         "response": "93.184.220.99", "flagged": True},
        {"timestamp": "2025-12-01T14:32:15Z", "event_type": "outbound_connection",
         "source_ip": "10.0.0.22", "dest_ip": "93.184.220.99",
         "dest_port": 443, "bytes_out": 2048, "bytes_in": 512},
        {"timestamp": "2025-12-02T03:12:08Z", "event_type": "file_write",
         "source_ip": "10.0.0.22", "path": "/tmp/.x/payload.sh",
         "hash_sha256": "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3"},
    ]
    with open(path, "w") as f:
        for e in events:
            f.write(json.dumps(e) + "\n")


# ─────────────────────────────────────────────────────────────────────────────
# TOOL 4: IOC Extractor
# ─────────────────────────────────────────────────────────────────────────────

@tool
def extract_iocs(text: str) -> str:
    """Extract Indicators of Compromise (IOCs) from a block of text.
    Identifies IP addresses, domains, file hashes, CVE IDs, and email addresses.

    Args:
        text: Any block of text — threat reports, log entries, emails, etc.

    Returns:
        JSON with categorized IOCs found in the text.
    """
    import re
    iocs = {
        "ips": list(set(re.findall(
            r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
            r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b', text
        ))),
        "domains": list(set(re.findall(
            r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)'
            r'+(?:com|net|org|io|co|ru|cn|de|info|biz)\b', text
        ))),
        "sha256_hashes": list(set(re.findall(r'\b[a-fA-F0-9]{64}\b', text))),
        "md5_hashes": list(set(re.findall(r'\b[a-fA-F0-9]{32}\b', text))),
        "cve_ids": list(set(re.findall(r'CVE-\d{4}-\d{4,7}', text, re.IGNORECASE))),
        "emails": list(set(re.findall(
            r'\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b', text
        ))),
    }
    total = sum(len(v) for v in iocs.values())
    return json.dumps({"total_iocs_found": total, "iocs": iocs}, indent=2)


# Collect all tools for easy import
ALL_TOOLS = [search_threat_intelligence, lookup_cve, analyze_logs, extract_iocs]

Step 3: Build the Supervisor

The supervisor is the brain of the multi-node graph. It does not run tools itself — it decides which analysis phase to execute next based on what has already been completed.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# supervisor.py
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.types import Command
from state import SecurityIntelState
from typing import Literal

SUPERVISOR_SYSTEM_PROMPT = """You are a security intelligence coordinator.
Your job is to direct a threat analysis workflow by deciding which analysis
phase to run next.

Analysis phases available:
- "threat_intel": Search for current threat intelligence about the target
- "cve_lookup": Look up CVE details if the target is a CVE ID or if CVEs were mentioned
- "log_analysis": Search security logs for traces of the target
- "report": Generate the final structured threat assessment

Rules:
- Always run threat_intel first if it has not been done
- Run cve_lookup if the target is a CVE ID, or if threat_intel found CVE references
- Run log_analysis if the target is an IP or domain
- Run report when all applicable phases are complete
- Respond with ONLY a JSON object: {"next_phase": "phase_name", "reasoning": "one sentence"}
"""

def supervisor_node(state: SecurityIntelState) -> Command[Literal[
    "threat_intel_node", "cve_lookup_node", "log_analysis_node",
    "report_node", "__end__"
]]:
    """Routes the workflow to the appropriate next analysis node."""

    if state.get("report"):
        return Command(goto="__end__")

    completed = state.get("completed_phases", [])
    target_type = state.get("target_type", "unknown")

    # Simple deterministic routing for reliability
    # (An LLM-based router is an option for more complex cases)
    if "threat_intel" not in completed:
        return Command(
            goto="threat_intel_node",
            update={"current_phase": "threat_intel"}
        )

    if "cve_lookup" not in completed and (
        target_type == "cve" or
        any("CVE-" in str(r) for r in state.get("threat_intel", []))
    ):
        return Command(
            goto="cve_lookup_node",
            update={"current_phase": "cve_lookup"}
        )

    if "log_analysis" not in completed and target_type in ("ip", "domain"):
        return Command(
            goto="log_analysis_node",
            update={"current_phase": "log_analysis"}
        )

    # All applicable phases done — generate report
    return Command(
        goto="report_node",
        update={"current_phase": "report"}
    )

A note on routing strategy: the supervisor above uses deterministic conditional logic. This is intentional. Using an LLM to route between nodes adds latency, token cost, and non-determinism. Save LLM calls for tasks that genuinely require reasoning — analysis, extraction, report writing. Let Python handle control flow.

Step 4: Build the Analyst Nodes

Each analyst node connects to the LLM, provides it the relevant tools, and runs a focused agentic loop for its specific task.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# nodes.py
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.prebuilt import ToolNode
from state import SecurityIntelState
from tools import search_threat_intelligence, lookup_cve, analyze_logs, extract_iocs
from dotenv import load_dotenv
import os

load_dotenv()

# ─── Initialize LLM ─────────────────────────────────────────────
# Switch this one line to change providers:
llm = ChatOpenAI(
    base_url=os.getenv("LM_STUDIO_BASE_URL", "http://localhost:1234/v1"),
    api_key=os.getenv("LM_STUDIO_API_KEY", "lm-studio"),
    model=os.getenv("LM_STUDIO_MODEL", "qwen2.5-7b-instruct"),
    temperature=0
)

# ─── Threat Intelligence Node ────────────────────────────────────

def threat_intel_node(state: SecurityIntelState) -> dict:
    """Searches for current threat intelligence about the target."""
    target = state["target"]
    llm_with_tools = llm.bind_tools([search_threat_intelligence, extract_iocs])

    messages = state["messages"] + [HumanMessage(content=
        f"Search for current threat intelligence about: {target}\n"
        f"Then extract any IOCs from the results you find.\n"
        f"Target type: {state.get('target_type', 'unknown')}"
    )]

    findings = []
    iocs = []

    for _ in range(4):  # max 4 tool-call turns for this node
        response = llm_with_tools.invoke(messages)
        messages.append(response)

        if not response.tool_calls:
            break

        for tc in response.tool_calls:
            if tc["name"] == "search_threat_intelligence":
                result = search_threat_intelligence.invoke(tc["args"])
                findings.append(json.loads(result) if result.startswith("{") else {"raw": result})
            elif tc["name"] == "extract_iocs":
                result = extract_iocs.invoke(tc["args"])
                ioc_data = json.loads(result) if result.startswith("{") else {}
                for ioc_type, ioc_list in ioc_data.get("iocs", {}).items():
                    iocs.extend(ioc_list)

            messages.append(ToolMessage(content=result, tool_call_id=tc["id"]))

    completed = state.get("completed_phases", [])
    return {
        "messages": messages,
        "threat_intel": findings,
        "ioc_list": list(set(iocs)),
        "completed_phases": completed + ["threat_intel"]
    }


# ─── CVE Lookup Node ─────────────────────────────────────────────

def cve_lookup_node(state: SecurityIntelState) -> dict:
    """Looks up CVE details for any CVEs mentioned in the target or findings."""
    llm_with_tools = llm.bind_tools([lookup_cve])

    # Find CVEs to look up — from target itself or from prior threat intel
    cves_to_check = []
    if state["target_type"] == "cve":
        cves_to_check.append(state["target"])

    # Extract CVEs from threat intel results
    for finding in state.get("threat_intel", []):
        for result in finding.get("results", []):
            import re
            found = re.findall(r'CVE-\d{4}-\d{4,7}',
                               result.get("content", ""), re.IGNORECASE)
            cves_to_check.extend(found)

    cves_to_check = list(set(cves_to_check))[:5]  # limit to 5 per run

    messages = state["messages"] + [HumanMessage(content=
        f"Look up these CVEs from NIST NVD: {', '.join(cves_to_check)}\n"
        f"For each one, retrieve the CVSS score and description."
    )]

    cve_results = []
    for _ in range(len(cves_to_check) + 2):
        response = llm_with_tools.invoke(messages)
        messages.append(response)
        if not response.tool_calls:
            break
        for tc in response.tool_calls:
            result = lookup_cve.invoke(tc["args"])
            cve_results.append(json.loads(result) if result.startswith("{") else {"raw": result})
            messages.append(ToolMessage(content=result, tool_call_id=tc["id"]))

    completed = state.get("completed_phases", [])
    return {
        "messages": messages,
        "cve_matches": cve_results,
        "completed_phases": completed + ["cve_lookup"]
    }


# ─── Log Analysis Node ───────────────────────────────────────────

def log_analysis_node(state: SecurityIntelState) -> dict:
    """Searches security logs for traces of the target."""
    llm_with_tools = llm.bind_tools([analyze_logs, extract_iocs])
    target = state["target"]

    messages = state["messages"] + [HumanMessage(content=
        f"Search the security logs for any activity related to: {target}\n"
        f"Also check these related IOCs if found: {state.get('ioc_list', [])[:10]}"
    )]

    log_findings = []
    for _ in range(4):
        response = llm_with_tools.invoke(messages)
        messages.append(response)
        if not response.tool_calls:
            break
        for tc in response.tool_calls:
            result = analyze_logs.invoke(tc["args"])
            log_findings.append(json.loads(result) if result.startswith("{") else {"raw": result})
            messages.append(ToolMessage(content=result, tool_call_id=tc["id"]))

    completed = state.get("completed_phases", [])
    return {
        "messages": messages,
        "log_findings": log_findings,
        "completed_phases": completed + ["log_analysis"]
    }


# ─── Report Node ─────────────────────────────────────────────────

REPORT_SYSTEM_PROMPT = """You are a senior security analyst writing a structured
threat assessment report. You will receive findings from multiple analysis phases.
Produce a clear, actionable report in this exact structure:

## Threat Assessment Report

**Target:** [target]
**Target Type:** [type]
**Assessment Date:** [date]
**Severity:** [CRITICAL / HIGH / MEDIUM / LOW / INFO]

### Executive Summary
[2-3 sentences. What is the target? What did we find? What is the risk?]

### Threat Intelligence Findings
[Key findings from threat intelligence search. Bullet points. Include sources.]

### CVE Analysis
[CVE IDs found, CVSS scores, exploitation status. Table format if multiple.]

### Log Analysis
[What the logs revealed. Event counts, first/last seen, patterns.]

### Indicators of Compromise
[Extracted IOCs: IPs, domains, hashes, CVE IDs]

### Recommendations
[Top 3-5 specific, actionable remediation steps]

### References
[URLs to sources found during analysis]

Be specific. Use the actual data from the findings. Do not generalize."""

def report_node(state: SecurityIntelState) -> dict:
    """Synthesizes all findings into a structured threat assessment report."""
    from langchain_core.messages import SystemMessage
    from datetime import datetime

    # Compile findings context for the LLM
    findings_context = {
        "target": state["target"],
        "target_type": state["target_type"],
        "date": datetime.utcnow().strftime("%Y-%m-%d"),
        "threat_intel_results": state.get("threat_intel", []),
        "cve_matches": state.get("cve_matches", []),
        "log_findings": state.get("log_findings", []),
        "ioc_list": state.get("ioc_list", []),
        "phases_completed": state.get("completed_phases", [])
    }

    response = llm.invoke([
        SystemMessage(content=REPORT_SYSTEM_PROMPT),
        HumanMessage(content=f"Findings data:\n{json.dumps(findings_context, indent=2)}")
    ])

    # Simple severity extraction from report text
    report_text = response.content
    severity = "medium"
    for level in ["critical", "high", "medium", "low"]:
        if f"**severity:** {level}" in report_text.lower():
            severity = level
            break

    completed = state.get("completed_phases", [])
    return {
        "messages": state["messages"] + [response],
        "report": report_text,
        "severity": severity,
        "completed_phases": completed + ["report"]
    }

Step 5: Wire the Graph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# graph.py
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from state import SecurityIntelState
from supervisor import supervisor_node
from nodes import threat_intel_node, cve_lookup_node, log_analysis_node, report_node

def build_graph():
    builder = StateGraph(SecurityIntelState)

    # Register nodes
    builder.add_node("supervisor", supervisor_node)
    builder.add_node("threat_intel_node", threat_intel_node)
    builder.add_node("cve_lookup_node", cve_lookup_node)
    builder.add_node("log_analysis_node", log_analysis_node)
    builder.add_node("report_node", report_node)

    # Entry point → supervisor
    builder.add_edge(START, "supervisor")

    # All analysis nodes report back to supervisor when done
    builder.add_edge("threat_intel_node", "supervisor")
    builder.add_edge("cve_lookup_node", "supervisor")
    builder.add_edge("log_analysis_node", "supervisor")
    builder.add_edge("report_node", END)

    # Persist state with SQLite (for session resumption and debugging)
    checkpointer = SqliteSaver.from_conn_string("sec_agent.db")
    return builder.compile(checkpointer=checkpointer)

# Export
graph = build_graph()

Step 6: The Entry Point

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# main.py
import sys
from dotenv import load_dotenv
from graph import graph

load_dotenv()

def detect_target_type(target: str) -> str:
    import re
    target = target.strip().upper()
    if re.match(r'CVE-\d{4}-\d{4,7}', target):
        return "cve"
    if re.match(r'^\d{1,3}(\.\d{1,3}){3}$', target):
        return "ip"
    if re.match(r'^[A-Z0-9][A-Z0-9\-]{0,61}[A-Z0-9](\.[A-Z]{2,})+$', target):
        return "domain"
    return "threat_actor"  # default — handles group names, malware names

def analyze(target: str) -> None:
    target_type = detect_target_type(target)
    print(f"\n[*] Starting security intelligence analysis")
    print(f"[*] Target: {target}  |  Type: {target_type}")
    print(f"[*] Phases: threat intel → CVE lookup → log analysis → report\n")
    print("" * 60)

    initial_state = {
        "target": target,
        "target_type": target_type,
        "messages": [],
        "threat_intel": [],
        "cve_matches": [],
        "log_findings": [],
        "ioc_list": [],
        "completed_phases": [],
        "current_phase": "",
        "error_log": [],
        "report": "",
        "severity": ""
    }

    config = {"configurable": {"thread_id": f"analysis-{target.replace('.', '-')}"}}

    for event in graph.stream(initial_state, config, stream_mode="updates"):
        for node_name, update in event.items():
            phase = update.get("current_phase") or update.get("completed_phases", [])
            if isinstance(phase, list) and phase:
                print(f"[✓] Completed: {phase[-1]}")

    # Retrieve final state
    final_state = graph.get_state(config)
    report = final_state.values.get("report", "No report generated")
    severity = final_state.values.get("severity", "unknown")
    iocs = final_state.values.get("ioc_list", [])

    print("\n" + "=" * 60)
    print(report)
    print("=" * 60)
    print(f"\n[*] Severity: {severity.upper()}")
    print(f"[*] IOCs extracted: {len(iocs)}")
    print(f"[*] State saved to: sec_agent.db (resumable)")

if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else "185.220.101.47"
    analyze(target)

Part 5 — Running the Agent: Measured Results

Lab Run 1 — IP Address (Tor Exit Node)

1
python3 main.py 185.220.101.47

Expected execution trace:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
[*] Starting security intelligence analysis
[*] Target: 185.220.101.47  |  Type: ip
[*] Phases: threat intel → CVE lookup → log analysis → report

────────────────────────────────────────────────────
[✓] Completed: threat_intel      (12.3 seconds)
[✓] Completed: log_analysis      (4.1 seconds)
[✓] Completed: report            (8.7 seconds)
════════════════════════════════════════════════════

## Threat Assessment Report

**Target:** 185.220.101.47
**Target Type:** ip
**Severity:** HIGH

### Executive Summary
185.220.101.47 is a known Tor exit node with documented involvement in 
SSH brute-force campaigns and port scanning activity. Log analysis 
confirmed 3 failed SSH authentication attempts and a port scan covering 
ports 22, 80, 443, 8080, and 3389 within a 3-minute window on 2025-12-01.
The source should be blocked at the perimeter.

### Threat Intelligence Findings
- Classified as Tor exit node in multiple threat feeds (AbuseIPDB, Shodan)
- Associated with SSH brute-force and credential stuffing campaigns
- Last reported malicious activity: December 2025
- Source: Recorded Future, AbuseIPDB (found via Tavily search)

### Log Analysis
- Total matching events: 5
- Event types: failed_auth (×2), port_scan (×1)
- First seen: 2025-12-01T09:15:23Z
- Last seen: 2025-12-01T09:17:44Z
- Pattern: Rapid sequential auth failures followed by port scan — 
  consistent with automated credential stuffing tool

### Indicators of Compromise
- IPs: 185.220.101.47
- Ports targeted: 22, 80, 443, 8080, 3389

### Recommendations
1. Block 185.220.101.47 at the perimeter firewall immediately
2. Review all Tor exit node traffic — consider blocking the full Tor 
   exit list (available at torproject.org/tordnsel)
3. Enable fail2ban or equivalent on all SSH-exposed hosts
4. Audit all accounts that received login attempts from this IP
5. Consider geoblocking if Tor access is not required for your operations

Timing across models (average of 5 runs, same target):

Model Total time Tokens used Cost per analysis
Qwen2.5-7B local (GPU) 25.1s ~3,200 $0.00
GPT-4o-mini 18.4s ~2,900 ~$0.006
Claude Sonnet 4.5 21.7s ~3,100 ~$0.009
Qwen2.5-7B local (CPU) 98.3s ~3,200 $0.00

Lab Run 2 — CVE Analysis

1
python3 main.py CVE-2025-68664

The agent detects the CVE target type, skips log analysis (irrelevant for a CVE), and runs threat_intel → cve_lookup → report. The CVE lookup retrieves CVSS 9.3, the serialization injection description, and the affected LangChain versions directly from NVD. The threat intel search finds the Cyata disclosure post and the Hacker News coverage. The report links both and produces a prioritized mitigation with the specific version to upgrade to.


Part 6 — Adding MCP: Turning Your Tools Into a Shared Protocol

Native @tool functions are sufficient for a closed agent. The moment you want another agent, another IDE plugin, or another team member’s tooling to share the same capabilities, you need MCP.

The migration is smaller than you expect. You keep all the same Python logic. You wrap it in a FastMCP server and change one import in the agent.

Converting a Native Tool to an MCP Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# mcp_server.py
# Run this as a separate process: python3 mcp_server.py
from mcp.server.fastmcp import FastMCP
from tools import search_threat_intelligence, lookup_cve, analyze_logs, extract_iocs
import json

mcp = FastMCP("security-intel-tools")

# Re-expose each tool through the MCP protocol
# FastMCP picks up the function signature and docstring automatically

@mcp.tool()
def search_threat_intel(query: str) -> str:
    """Search for current threat intelligence about an IP, domain, CVE,
    or threat actor using live web search."""
    return search_threat_intelligence.invoke({"query": query})

@mcp.tool()
def cve_lookup(cve_id: str) -> str:
    """Look up detailed CVE information from NIST NVD. Returns CVSS score,
    description, severity, and affected software."""
    return lookup_cve.invoke({"cve_id": cve_id})

@mcp.tool()
def log_search(target: str, log_path: str = "sample_logs.jsonl") -> str:
    """Search security logs for events related to a target IP, domain, or hash."""
    return analyze_logs.invoke({"target": target, "log_path": log_path})

@mcp.tool()
def ioc_extract(text: str) -> str:
    """Extract Indicators of Compromise from any text block."""
    return extract_iocs.invoke({"text": text})

if __name__ == "__main__":
    mcp.run()  # stdio transport by default — connect via MCP client

Connecting the LangGraph Agent to the MCP Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# mcp_client_integration.py
# Add this to nodes.py to replace native tool binding

import asyncio
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def get_mcp_tools():
    """Connect to the MCP server and get tools as LangChain-compatible objects."""
    server_params = StdioServerParameters(
        command="python3",
        args=["mcp_server.py"]
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await load_mcp_tools(session)
            return tools

# In your node initialization (called once at startup):
# mcp_tools = asyncio.run(get_mcp_tools())
# llm_with_tools = llm.bind_tools(mcp_tools)

Once your tools are served via MCP, Claude Desktop can connect to them directly by adding to ~/.claude_desktop_config.json:

1
2
3
4
5
6
7
8
{
  "mcpServers": {
    "security-intel-tools": {
      "command": "python3",
      "args": ["/path/to/sec-intel-agent/mcp_server.py"]
    }
  }
}

Now the same tools your LangGraph agent uses are available interactively in Claude Desktop — no code duplication, no API synchronization.


Part 7 — Observability: Seeing Inside the Agent

A working agent that you cannot observe is a liability. The agent makes decisions you cannot see, calls tools you are not monitoring, and produces outputs whose reasoning you cannot trace. This matters for debugging, for security auditing, and for production reliability.

LangSmith is the observability layer built for LangGraph. With the two environment variables in your .env (LANGCHAIN_TRACING_V2=true, LANGCHAIN_API_KEY), every run automatically sends a trace to smith.langchain.com. No code changes required.

What you can see in a trace:

  • Every LLM call: full prompt, token count, latency, model response
  • Every tool call: name, arguments passed, return value
  • Every node execution: which node fired, what state it received, what it returned
  • Full graph path: which edges were traversed and in what order

For production, add LangSmith metadata to distinguish analysis runs:

1
2
3
4
5
6
7
8
9
config = {
    "configurable": {"thread_id": f"analysis-{target}"},
    "metadata": {
        "target": target,
        "target_type": target_type,
        "analyst": "automated",
        "run_id": datetime.utcnow().isoformat()
    }
}

Part 8 — Extending the Agent

The architecture above is designed to be extended incrementally. Here are the most useful next additions, in order of impact:

Shodan integration: Add a shodan_lookup(ip: str) tool using the Shodan API (free tier: 100 queries/day). Returns open ports, running services, geolocation, and associated hostnames. Plug it into the threat_intel_node tool list — the supervisor will invoke it automatically when the target type is “ip”.

VirusTotal hash lookup: Add a virustotal_lookup(hash: str) tool. When the log analysis finds file hashes (like the sha256 in the sample logs), the agent will automatically check them against 70+ antivirus engines. Free API key at virustotal.com.

Real SIEM/log integration: Replace the log_path parameter in analyze_logs with a connection to Elasticsearch, Splunk, or any JSONL export from your SIEM. The tool interface does not change — only the data source behind it.

Notification tools: Add a send_slack_alert(message: str, severity: str) or create_jira_ticket(summary: str, description: str) tool. When the report node produces a CRITICAL severity finding, the supervisor can route to an alert node before ending.

Persistent memory: The SQLite checkpointer already saves state per thread_id. To add cross-session memory (remember what you found about 185.220.101.47 last week), use LangGraph’s memory store API or connect a vector store like ChromaDB for semantic recall across past analyses.


Part 9 — What You Actually Built

Let us be precise about what this agent does and does not do.

It does: autonomously decide which analysis tools to run for a given target, retrieve current threat intelligence from the live web, query an official CVE database with the actual NIST NVD API, search through log data for matching indicators, extract IOCs from unstructured text, route between analysis phases without human intervention, and produce a structured report that would take a human analyst 30–45 minutes to assemble manually.

It does not: replace a human analyst. It has no understanding of your organization’s specific risk tolerance, no knowledge of your architecture beyond what you put in the logs, no awareness of the attacker’s intent, and no judgment about which finding matters most in your specific context. It is a first-pass triage tool. A good one, but a tool.

The production version of this agent — the one that earns trust in a real security operation — has one additional component that this article does not yet cover: security controls on the agent itself. The same properties that make this agent useful (autonomous tool use, external data ingestion, multi-step execution without human confirmation) make it vulnerable to a specific class of attacks. A single malicious entry in your log file, a poisoned CVE description, or a compromised tool package can redirect the agent’s actions in ways that are invisible in the output.

That is the subject of Part 2 of this series, already published: The OSINT Agent You Built Is an Attack Surface — where we run the three attack patterns against this exact architecture and measure exactly what happens.


References

  1. LangGraph documentationlangchain-ai.github.io/langgraph — official docs, tutorials, how-tos. Start with the Concepts section before the tutorials.

  2. LangGraph 1.0 release notes (October 2025) — node caching, deferred nodes, pre/post model hooks. github.com/langchain-ai/langgraph/releases

  3. LangChain MCP adaptergithub.com/langchain-ai/langchain-mcp-adapters — the official bridge between LangGraph agents and MCP servers.

  4. Model Context Protocol specificationspec.modelcontextprotocol.io — the full protocol spec. The Security Considerations section is essential reading before deploying MCP servers.

  5. FastMCPgithub.com/jlowin/fastmcp — the fastest way to build MCP servers in Python. pip install fastmcp.

  6. Tavily Search APIdocs.tavily.com — the search provider built for LLM agents. Free tier: 1,000 searches/month.

  7. NIST NVD API v2.0nvd.nist.gov/developers/vulnerabilities — free CVE data. No key required for basic queries. Rate limit: 5 requests per 30 seconds without a key, 50 with.

  8. LangSmithsmith.langchain.com — observability platform for LangGraph. Free tier is sufficient for development and small production workloads.

  9. Qwen2.5-7B-Instructhuggingface.co/Qwen/Qwen2.5-7B-Instruct — the recommended local model. Download via LM Studio.

  10. OWASP OSINT AI Agent reference implementation (dazzyddos) — github.com/dazzyddos/OSINT_AI_Agent — the original LangGraph OSINT agent from the SEERcurity Spotlight article.

  11. Awesome AI Security (TalEliyahu) — github.com/TalEliyahu/Awesome-AI-Security — curated list of AI security tools, papers, and labs.

  12. OWASP Top 10 for Agentic AI Applications 2026genai.owasp.org — the foundational threat model for production agent deployments.


The complete code for this guide is in the ai-security-labs repository on GitHub. Part 2 of this series — covering the attack surface of the agent you just built — is here: The OSINT Agent You Built Is an Attack Surface.

Amine Raji is an AI security practitioner at aminrj.com. He runs the AI Security Intelligence newsletter at newsletter.aminrj.com.

The Security Lab Newsletter

This post is the article. The newsletter is the lab.

Subscribers get what doesn't fit in a post: the full attack code with annotated results, the measurement methodology behind the numbers, and the week's thread — where I work through a technique or incident across several days of testing rather than a single draft. The RAG poisoning work, the MCP CVE analysis, the red-teaming patterns — all of it started as a newsletter thread before it became a post. One email per week. No sponsored content. Unsubscribe any time.

Join the lab — it's free

Already subscribed? Browse the back-issues →

This post is licensed under CC BY 4.0 by the author.