Agent skill

implementing-stix-taxii-feed-integration

STIX (Structured Threat Information eXpression) and TAXII (Trusted Automated eXchange of Intelligence Information) are OASIS open standards for representing and transporting cyber threat intelligence.

Stars 4,300
Forks 470

Install this agent skill to your Project

npx add-skill https://github.com/mukul975/Anthropic-Cybersecurity-Skills/tree/main/skills/implementing-stix-taxii-feed-integration

SKILL.md

Implementing STIX/TAXII Feed Integration

Overview

STIX (Structured Threat Information eXpression) and TAXII (Trusted Automated eXchange of Intelligence Information) are OASIS open standards for representing and transporting cyber threat intelligence. This skill covers implementing a STIX/TAXII 2.1 feed consumer and producer using Python, configuring TAXII server discovery, collection management, polling for new intelligence, parsing STIX 2.1 objects, and integrating feeds into SIEM and TIP platforms.

When to Use

  • When deploying or configuring implementing stix taxii feed integration capabilities in your environment
  • When establishing security controls aligned to compliance requirements
  • When building or improving security architecture for this domain
  • When conducting security assessments that require this implementation

Prerequisites

  • Python 3.9+ with taxii2-client, stix2, cti-taxii-client libraries
  • Understanding of STIX 2.1 data model (SDOs, SCOs, SROs)
  • Understanding of TAXII 2.1 protocol (discovery, API roots, collections)
  • Network access to TAXII servers (MITRE ATT&CK TAXII, Anomali STAXX)
  • Optional: medallion for running a local TAXII 2.1 server

Key Concepts

TAXII 2.1 Architecture

TAXII defines a RESTful API with three service types:

  • Discovery: Returns information about available API roots
  • API Root: Contains collections and serves as the main interaction point
  • Collection: A logical grouping of STIX objects accessible via GET/POST

STIX 2.1 Object Model

STIX objects are categorized as:

  • SDOs (STIX Domain Objects): Indicator, Malware, Threat Actor, Campaign, Attack Pattern, Tool, Infrastructure, Vulnerability, Identity, Location, Note, Opinion, Report, Grouping
  • SCOs (STIX Cyber Observables): IPv4-Addr, Domain-Name, URL, File, Email-Addr, Process, Network-Traffic, Artifact
  • SROs (STIX Relationship Objects): Relationship, Sighting
  • Meta Objects: Marking Definition (TLP), Language Content, Extension Definition

STIX Bundle

A Bundle is a collection of STIX objects transmitted together. Bundles have a unique ID and contain an array of objects. TAXII collections serve bundles in response to GET requests.

Workflow

Step 1: TAXII Server Discovery

python
from taxii2client.v21 import Server, Collection, as_pages

# Connect to MITRE ATT&CK TAXII server
server = Server("https://cti-taxii.mitre.org/taxii2/", user="", password="")

print(f"Title: {server.title}")
print(f"Description: {server.description}")

# List API roots
for api_root in server.api_roots:
    print(f"\nAPI Root: {api_root.title}")
    print(f"  URL: {api_root.url}")

    # List collections
    for collection in api_root.collections:
        print(f"  Collection: {collection.title} (ID: {collection.id})")
        print(f"    Can Read: {collection.can_read}")
        print(f"    Can Write: {collection.can_write}")

Step 2: Fetch STIX Objects from Collection

python
from taxii2client.v21 import Collection, as_pages
import json

# Connect to Enterprise ATT&CK collection
ENTERPRISE_ATTACK_ID = "95ecc380-afe9-11e4-9b6c-751b66dd541e"
collection = Collection(
    f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/",
    user="",
    password="",
)

print(f"Collection: {collection.title}")

# Fetch all objects (paginated)
all_objects = []
for envelope in as_pages(collection.get_objects, per_request=50):
    objects = envelope.get("objects", [])
    all_objects.extend(objects)
    print(f"  Fetched {len(objects)} objects (total: {len(all_objects)})")

print(f"\nTotal objects retrieved: {len(all_objects)}")

# Categorize by type
type_counts = {}
for obj in all_objects:
    obj_type = obj.get("type", "unknown")
    type_counts[obj_type] = type_counts.get(obj_type, 0) + 1

for obj_type, count in sorted(type_counts.items()):
    print(f"  {obj_type}: {count}")

Step 3: Parse STIX 2.1 Objects with stix2 Library

python
from stix2 import parse, Filter, MemoryStore

# Load objects into a MemoryStore for querying
store = MemoryStore(stix_data=all_objects)

# Query for all indicators
indicators = store.query([Filter("type", "=", "indicator")])
print(f"Indicators: {len(indicators)}")

for ind in indicators[:5]:
    print(f"  {ind.name}: {ind.pattern}")

# Query for malware
malware_list = store.query([Filter("type", "=", "malware")])
print(f"\nMalware families: {len(malware_list)}")

# Query for threat actors
actors = store.query([Filter("type", "=", "intrusion-set")])
print(f"Threat actors: {len(actors)}")

# Find relationships for a specific object
def get_related(store, source_id):
    relationships = store.query([
        Filter("type", "=", "relationship"),
        Filter("source_ref", "=", source_id),
    ])
    return relationships

# Example: Get all techniques used by APT28
apt28 = store.query([
    Filter("type", "=", "intrusion-set"),
    Filter("name", "=", "APT28"),
])
if apt28:
    rels = get_related(store, apt28[0].id)
    for rel in rels:
        target = store.get(rel.target_ref)
        if target:
            print(f"  {rel.relationship_type} -> {target.name} ({target.type})")

Step 4: Implement Custom TAXII Consumer

python
from taxii2client.v21 import Collection, as_pages
from stix2 import parse, Bundle
from datetime import datetime, timedelta
import json

class TAXIIConsumer:
    """Consume STIX/TAXII 2.1 feeds and extract IOCs."""

    def __init__(self, collection_url, user="", password=""):
        self.collection = Collection(collection_url, user=user, password=password)
        self.last_poll = None

    def poll_new_objects(self, added_after=None):
        """Poll for objects added after a specific timestamp."""
        if added_after is None:
            added_after = (
                self.last_poll or
                (datetime.utcnow() - timedelta(days=1)).strftime(
                    "%Y-%m-%dT%H:%M:%S.000Z"
                )
            )

        all_objects = []
        kwargs = {"added_after": added_after}

        for envelope in as_pages(
            self.collection.get_objects, per_request=100, **kwargs
        ):
            objects = envelope.get("objects", [])
            all_objects.extend(objects)

        self.last_poll = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
        return all_objects

    def extract_indicators(self, objects):
        """Extract actionable indicators from STIX objects."""
        indicators = []
        for obj in objects:
            if obj.get("type") == "indicator":
                indicators.append({
                    "id": obj.get("id"),
                    "name": obj.get("name", ""),
                    "pattern": obj.get("pattern", ""),
                    "pattern_type": obj.get("pattern_type", ""),
                    "valid_from": obj.get("valid_from", ""),
                    "valid_until": obj.get("valid_until", ""),
                    "indicator_types": obj.get("indicator_types", []),
                    "confidence": obj.get("confidence", 0),
                    "labels": obj.get("labels", []),
                })
        return indicators

    def extract_observables(self, objects):
        """Extract STIX Cyber Observables."""
        observables = []
        observable_types = {
            "ipv4-addr", "ipv6-addr", "domain-name", "url",
            "file", "email-addr", "network-traffic",
        }
        for obj in objects:
            if obj.get("type") in observable_types:
                observables.append({
                    "type": obj["type"],
                    "value": obj.get("value", ""),
                    "id": obj.get("id"),
                })
        return observables


# Usage
consumer = TAXIIConsumer(
    f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/"
)
new_objects = consumer.poll_new_objects()
indicators = consumer.extract_indicators(new_objects)
print(f"New indicators: {len(indicators)}")

Step 5: Set Up Local TAXII Server with Medallion

python
# medallion configuration (medallion.conf)
TAXII_CONFIG = {
    "backend": {
        "module_class": "MemoryBackend",
    },
    "users": {
        "admin": "admin_password",
        "readonly": "readonly_password",
    },
    "taxii": {
        "max_content_length": 10485760,
    },
}

# Run medallion server:
# pip install medallion
# python -m medallion --config medallion.conf --port 5000

# Add objects to local TAXII server
import requests

def push_to_taxii(server_url, collection_id, stix_bundle, user, password):
    """Push STIX bundle to a TAXII 2.1 collection."""
    url = f"{server_url}/collections/{collection_id}/objects/"
    headers = {
        "Content-Type": "application/stix+json;version=2.1",
        "Accept": "application/taxii+json;version=2.1",
    }
    response = requests.post(
        url,
        json=stix_bundle,
        headers=headers,
        auth=(user, password),
        timeout=30,
    )
    return response.json()

Validation Criteria

  • TAXII server discovery returns valid API roots and collections
  • STIX objects fetched and parsed correctly from TAXII collections
  • Indicators extracted with valid STIX patterns
  • Pagination handled correctly for large collections
  • Consumer tracks polling state for incremental updates
  • Local TAXII server accepts and serves STIX bundles

References

Expand your agent's capabilities with these related and highly-rated skills.

mukul975/Anthropic-Cybersecurity-Skills

mapping-mitre-attack-techniques

Maps observed adversary behaviors, security alerts, and detection rules to MITRE ATT&CK techniques and sub-techniques to quantify detection coverage and guide control prioritization. Use when building an ATT&CK-based coverage heatmap, tagging SIEM alerts with technique IDs, aligning security controls to adversary playbooks, or reporting threat exposure to executives. Activates for requests involving ATT&CK Navigator, Sigma rules, MITRE D3FEND, or coverage gap analysis.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

hunting-for-spearphishing-indicators

Hunt for spearphishing campaign indicators across email logs, endpoint telemetry, and network data to detect targeted email attacks.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

analyzing-malicious-url-with-urlscan

URLScan.io is a free service for scanning and analyzing suspicious URLs. It captures screenshots, DOM content, HTTP transactions, JavaScript behavior, and network connections of web pages in an isolat

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

implementing-zero-standing-privilege-with-cyberark

Deploy CyberArk Secure Cloud Access to eliminate standing privileges in hybrid and multi-cloud environments using just-in-time access with time, entitlement, and approval controls.

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

implementing-pam-for-database-access

Deploy privileged access management for database systems including Oracle, SQL Server, PostgreSQL, and MySQL. Covers session proxy configuration, credential vaulting, query auditing, dynamic credentia

4,300 470
Explore
mukul975/Anthropic-Cybersecurity-Skills

detecting-t1003-credential-dumping-with-edr

Detect OS credential dumping techniques targeting LSASS memory, SAM database, NTDS.dit, and cached credentials using EDR telemetry, Sysmon process access monitoring, and Windows security event correlation.

4,300 470
Explore

Didn't find tool you were looking for?

Be as detailed as possible for better results