Avanzadomemory forensicsautomatizacionPythonYARApipelineSOC

Automatizacion de Memory Forensics: Scripts, Pipelines y YARA en Memoria

Automatizacion del analisis forense de memoria con scripts Python, Volatility 3 como libreria, VolShell interactivo, pipelines CI/CD de analisis automatico, YARA scanning de volcados de memoria e integracion de memory forensics en el workflow SOC.

MalwareIntel Research··10 min lectura
Serie: Memory Forensics — Parte 12

Por que automatizar el analisis de memoria

El analisis manual con Volatility 3, como hemos visto en los articulos anteriores, sigue un patron repetitivo: ejecutar pslist, pstree, netscan, cmdline, malfind, dlllist, handles, printkey. Para cada volcado. Cada vez. El analista repite los mismos plugins, busca los mismos patrones de anomalias, y aplica los mismos criterios de deteccion.

Esto grita automatizacion. Un script puede ejecutar todos los plugins en paralelo, aplicar reglas de deteccion de anomalias, generar un informe de triaje, y presentar al analista solo los hallazgos que requieren atencion humana. En un SOC que procesa multiples volcados de memoria por semana, la diferencia entre triaje manual (2 a 4 horas) y triaje automatizado (5 a 15 minutos) es la diferencia entre un equipo saturado y uno funcional.

Volatility 3 como libreria Python

Volatility 3 puede usarse como libreria Python, no solo como herramienta de linea de comandos. Esto permite construir scripts personalizados que ejecutan plugins, procesan resultados y generan informes programaticamente.

Estructura basica de un script

import volatility3
from volatility3.framework import contexts, automagic
from volatility3.framework.configuration import requirements
from volatility3.plugins.windows import pslist, pstree, netscan, malfind

def analyze_dump(dump_path):
    """Ejecuta analisis basico de un volcado de memoria."""
    # Crear el contexto
    ctx = contexts.Context()
    
    # Configurar la ruta del dump
    ctx.config["automagic.LayerStacker.single_location"] = (
        "file://" + dump_path
    )
    
    # Ejecutar automagic (detectar OS, cargar simbolos)
    automagics = automagic.available(ctx)
    automagic.run(automagics, ctx, pslist.PsList)
    
    # Ejecutar pslist
    plugin = pslist.PsList(ctx, "plugins.PsList")
    results = list(plugin.run())
    
    return results

Ejecucion de multiples plugins

def full_triage(dump_path):
    """Triaje completo: procesos, red, inyeccion."""
    ctx = setup_context(dump_path)
    
    results = dict(
        processes=run_plugin(ctx, pslist.PsList),
        tree=run_plugin(ctx, pstree.PsTree),
        network=run_plugin(ctx, netscan.NetScan),
        injections=run_plugin(ctx, malfind.Malfind),
    )
    
    return results

Procesamiento de resultados

Los resultados de los plugins son generadores de tuplas. Cada tupla contiene los campos del plugin:

def find_suspicious_processes(processes):
    """Detecta procesos con anomalias conocidas."""
    suspicious = []
    
    # Procesos que solo deben tener una instancia
    single_instance = ["lsass.exe", "services.exe", "wininit.exe"]
    
    # Contar instancias
    counts = dict()
    for proc in processes:
        name = proc.ImageFileName
        if name not in counts:
            counts[name] = 0
        counts[name] += 1
    
    # Detectar duplicados
    for name in single_instance:
        if counts.get(name, 0) > 1:
            suspicious.append(
                dict(
                    type="DUPLICATE_CRITICAL_PROCESS",
                    process=name,
                    count=counts[name],
                    severity="CRITICAL",
                )
            )
    
    return suspicious

VolShell: analisis interactivo

VolShell es el shell interactivo de Volatility. Permite explorar la memoria con comandos Python en un entorno REPL:

vol -f memory.raw windows.volshell

Dentro de VolShell:

# Listar procesos
for proc in self.list_processes():
    print(f"PID={proc.UniqueProcessId} Name={proc.ImageFileName}")

# Leer memoria de un proceso
proc = self.process_from_pid(5700)
data = proc.read(0x00400000, 256)

# Desensamblar
self.disassemble(0x00400000, count=20)

# Buscar strings
self.context.layers["memory_layer"].scan(
    scanner=volatility3.framework.scanners.RegExScanner(
        b"https?://[a-zA-Z0-9./]+"
    )
)

VolShell es ideal para:

  • Investigaciones ad-hoc que no siguen un patron predecible.
  • Verificar hipotesis rapidas sobre una region de memoria.
  • Aprender como Volatility accede a las estructuras internas.
  • Prototyping de logica que luego se integrara en un script.

YARA scanning de volcados de memoria

Plugin yarascan

vol -f memory.raw yarascan.YaraScan --yara-file /path/to/rules.yar

yarascan aplica reglas YARA contra el contenido de la memoria, mapeando los hallazgos a procesos especificos:

Rule: CobaltStrike_Beacon
Owner: rundll32.exe (PID 5800)
Address: 0x000001f0a500

Reglas YARA para memoria

Las reglas YARA disenadas para analisis de memoria difieren de las de disco. En memoria, el malware esta desempaquetado y ejecutandose, lo que expone strings y patrones que en disco estan ofuscados:

rule Cobalt_Strike_Beacon_Memory
{
    meta:
        description = "Detecta Cobalt Strike beacon en memoria"
        author = "MalwareIntel Research"
        
    strings:
        $config1 = "%.4x%.4x%.4x%.4x%.4x%.4x%.4x%.4x"
        $config2 = "%s (admin)" wide
        $config3 = "beacon.dll" ascii
        $pipe = "\\\\.\\pipe\\" ascii
        $sleep = "sleeptime" ascii
        
    condition:
        3 of them
}
rule Generic_Shellcode_x64
{
    meta:
        description = "Shellcode x64 generico en memoria"
        
    strings:
        $prologue1 = { fc 48 83 e4 f0 }
        $prologue2 = { 48 31 c9 65 48 8b }
        $api_hash = { 41 51 41 50 52 51 56 48 31 d2 65 }
        
    condition:
        any of them
}

Repositorios de reglas YARA para memoria

  • YARA-Rules: https://github.com/Yara-Rules/rules. Coleccion comunitaria amplia.
  • Elastic YARA Rules: reglas de deteccion del equipo de Elastic Security.
  • Malpedia YARA: reglas asociadas a familias documentadas en Malpedia.
  • Florian Roth's signature-base: https://github.com/Neo23x0/signature-base. Una de las colecciones mas completas.
  • CAPE Community: reglas extraidas del sandbox CAPE.

Script de triaje automatizado

Un script de triaje que ejecuta los analisis mas importantes y genera un informe:

#!/usr/bin/env python3
"""
memory_triage.py - Triaje automatizado de volcados de memoria
Ejecuta plugins clave de Volatility 3 y genera informe de hallazgos
"""

import sys
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime


def run_volatility(dump_path, plugin, extra_args=None):
    """Ejecuta un plugin de Volatility y retorna la salida."""
    cmd = ["vol", "-f", str(dump_path), "-r", "json", plugin]
    if extra_args:
        cmd.extend(extra_args)
    
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=600,
    )
    
    if result.returncode == 0:
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError:
            return None
    return None


def check_process_anomalies(processes):
    """Detecta anomalias en la lista de procesos."""
    findings = []
    
    # Procesos criticos y sus padres esperados
    expected_parents = dict(
        services_exe="wininit.exe",
        lsass_exe="wininit.exe",
        svchost_exe="services.exe",
    )
    
    # Procesos que solo deben existir una vez
    single_instance = [
        "lsass.exe", "services.exe", 
        "wininit.exe", "csrss.exe",
    ]
    
    name_counts = dict()
    for proc in processes:
        name = proc.get("ImageFileName", "")
        name_counts[name] = name_counts.get(name, 0) + 1
    
    # Detectar duplicados criticos
    if name_counts.get("lsass.exe", 0) > 1:
        findings.append(dict(
            severity="CRITICAL",
            type="DUPLICATE_LSASS",
            detail=f"lsass.exe aparece {name_counts['lsass.exe']} veces",
        ))
    
    return findings


def check_network_anomalies(connections):
    """Detecta conexiones de red sospechosas."""
    findings = []
    
    suspicious_processes = [
        "powershell.exe", "cmd.exe", "rundll32.exe",
        "regsvr32.exe", "mshta.exe", "wscript.exe",
        "cscript.exe", "certutil.exe",
    ]
    
    for conn in connections:
        owner = conn.get("Owner", "")
        state = conn.get("State", "")
        foreign = conn.get("ForeignAddr", "")
        
        # Proceso sospechoso con conexion externa
        if (owner.lower() in suspicious_processes 
                and state == "ESTABLISHED"
                and not foreign.startswith(("10.", "192.168.", "172."))):
            findings.append(dict(
                severity="HIGH",
                type="SUSPICIOUS_OUTBOUND",
                detail=f"{owner} conectado a {foreign}",
                pid=conn.get("PID"),
            ))
    
    return findings


def generate_report(dump_path, all_findings):
    """Genera informe de triaje."""
    report = dict(
        timestamp=datetime.utcnow().isoformat(),
        dump_file=str(dump_path),
        dump_sha256=hashlib.sha256(
            Path(dump_path).read_bytes()
        ).hexdigest() if Path(dump_path).stat().st_size < 1e9 else "TOO_LARGE",
        total_findings=len(all_findings),
        critical=len([f for f in all_findings if f["severity"] == "CRITICAL"]),
        high=len([f for f in all_findings if f["severity"] == "HIGH"]),
        findings=all_findings,
    )
    
    return report


def main():
    dump_path = sys.argv[1]
    
    print(f"[*] Triaje de: {dump_path}")
    print("[*] Ejecutando plugins...")
    
    # Ejecutar plugins en secuencia
    processes = run_volatility(dump_path, "windows.pslist")
    network = run_volatility(dump_path, "windows.netscan")
    injections = run_volatility(dump_path, "windows.malfind")
    
    # Analizar resultados
    findings = []
    
    if processes:
        findings.extend(check_process_anomalies(processes))
    
    if network:
        findings.extend(check_network_anomalies(network))
    
    if injections:
        for inj in injections:
            findings.append(dict(
                severity="HIGH",
                type="CODE_INJECTION",
                detail=f"Codigo inyectado en {inj.get('Process')} (PID {inj.get('PID')})",
                pid=inj.get("PID"),
            ))
    
    # Generar informe
    report = generate_report(dump_path, findings)
    
    # Salida
    output_file = f"triage_{Path(dump_path).stem}.json"
    with open(output_file, "w") as f:
        json.dump(report, f, indent=2)
    
    print(f"[*] Informe generado: {output_file}")
    print(f"[*] Hallazgos: {report['critical']} CRITICAL, {report['high']} HIGH")
    
    return 1 if report["critical"] > 0 else 0


if __name__ == "__main__":
    sys.exit(main())

Pipeline CI/CD de analisis

En un SOC maduro, los volcados de memoria pueden procesarse automaticamente cuando se depositan en un almacenamiento compartido:

Arquitectura del pipeline

[Adquisicion]
     |
     v
[Storage] -- volcado depositado en bucket S3/MinIO
     |
     v
[Trigger] -- watcher detecta nuevo fichero
     |
     v
[Triaje automatico]
  |-- volatility plugins basicos
  |-- YARA scanning
  |-- deteccion de anomalias
     |
     v
[Informe de triaje] -- JSON con hallazgos
     |
     v
[Notificacion] -- alerta al analista si hay hallazgos CRITICAL/HIGH
     |
     v
[Analisis manual] -- analista profundiza en hallazgos

Implementacion con scripts

Un watcher simple que monitoriza un directorio:

#!/usr/bin/env python3
"""
watch_dumps.py - Monitoriza directorio de volcados y lanza triaje
"""

import time
import subprocess
from pathlib import Path

WATCH_DIR = Path("/data/memory-dumps/incoming")
PROCESSED_DIR = Path("/data/memory-dumps/processed")
REPORTS_DIR = Path("/data/memory-dumps/reports")


def process_dump(dump_path):
    """Ejecuta el triaje automatizado."""
    print(f"[+] Procesando: {dump_path.name}")
    
    result = subprocess.run(
        ["python3", "memory_triage.py", str(dump_path)],
        capture_output=True,
        text=True,
        timeout=3600,
    )
    
    if result.returncode != 0:
        send_alert(
            f"CRITICAL findings in {dump_path.name}"
        )
    
    # Mover a procesado
    dump_path.rename(PROCESSED_DIR / dump_path.name)


def send_alert(message):
    """Envia alerta via webhook."""
    # Integrar con Slack, Teams, Telegram, etc.
    print(f"[ALERT] {message}")


def main():
    WATCH_DIR.mkdir(parents=True, exist_ok=True)
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    
    processed = set()
    
    while True:
        for dump_file in WATCH_DIR.glob("*.raw"):
            if dump_file.name not in processed:
                process_dump(dump_file)
                processed.add(dump_file.name)
        
        for dump_file in WATCH_DIR.glob("*.lime"):
            if dump_file.name not in processed:
                process_dump(dump_file)
                processed.add(dump_file.name)
        
        time.sleep(30)


if __name__ == "__main__":
    main()

Integracion con el workflow SOC

Nivel 1: triaje automatico

El pipeline automatizado ejecuta el triaje y genera un informe con hallazgos clasificados por severidad. El analista SOC N1 recibe la notificacion y revisa el informe. Si hay hallazgos CRITICAL o HIGH, escala a N2.

Nivel 2: analisis manual guiado

El analista N2 usa el informe de triaje como punto de partida. En lugar de ejecutar todos los plugins manualmente, se concentra en los hallazgos que el triaje marco como sospechosos. Profundiza con plugins adicionales, extrae artefactos y construye la timeline.

Nivel 3: analisis profundo

El analista N3 / forensic analyst realiza analisis de rootkits, reverse engineering de codigo extraido, analisis de configuracion de C2, y correlacion con inteligencia de amenazas. Produce el informe forense completo con IOCs y recomendaciones.

Feedback loop

Los IOCs extraidos del analisis de memoria alimentan:

  • Las reglas YARA del pipeline (para detectar la misma amenaza en futuros volcados).
  • El SIEM (para buscar retroactivamente en logs historicos).
  • El EDR (para buscar en otros endpoints).
  • La plataforma de threat intelligence (para compartir con la comunidad via STIX/TAXII).

Herramientas complementarias

Orochi

Orochi es una plataforma web colaborativa para analisis de memoria basada en Volatility 3. Permite:

  • Subir volcados de memoria y ejecutar plugins desde la interfaz web.
  • Compartir resultados entre analistas.
  • Mantener un historial de analisis.
  • Ejecutar YARA scans desde la interfaz.

URL: https://github.com/LDO-CERT/orochi

VolWeb

Interfaz web para Volatility con visualizaciones interactivas del arbol de procesos, conexiones de red y timeline.

MemProcFS

MemProcFS (Memory Process File System) monta un volcado de memoria como un sistema de ficheros virtual. Los procesos aparecen como carpetas, las DLLs como ficheros, y la informacion de red como ficheros de texto. Esto permite usar herramientas estandar de analisis de ficheros sobre la memoria.

./memprocfs -device memory.raw -mount /mnt/memfs
ls /mnt/memfs/pid/5700/
# cmdline.txt  dlls/  handles/  maps/  mem/  threads/
cat /mnt/memfs/pid/5700/cmdline.txt

AVML + Volatility en Docker

Para estandarizar el entorno de analisis:

FROM python:3.12-slim
RUN pip install volatility3 yara-python pycryptodome
RUN mkdir /symbols /dumps /reports
COPY rules/ /rules/
COPY scripts/ /scripts/
ENTRYPOINT ["python3", "/scripts/memory_triage.py"]
docker run -v /data/dumps:/dumps -v /data/reports:/reports \
  memory-triage /dumps/WKSTN-FIN-042.raw

Consideraciones de rendimiento

Paralelizacion de plugins

Los plugins de Volatility son independientes entre si. Pueden ejecutarse en paralelo:

from concurrent.futures import ProcessPoolExecutor

plugins_to_run = [
    "windows.pslist",
    "windows.netscan",
    "windows.malfind",
    "windows.cmdline",
    "windows.dlllist",
]

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = dict()
    for plugin in plugins_to_run:
        future = executor.submit(
            run_volatility, dump_path, plugin
        )
        futures[plugin] = future
    
    results = dict()
    for plugin, future in futures.items():
        results[plugin] = future.result()

Almacenamiento

Los volcados de memoria son ficheros grandes. Consideraciones:

  • Almacenamiento SSD para el dump activo (acelera el analisis significativamente).
  • Compresion (zstd, lz4) para almacenamiento a largo plazo.
  • Politica de retencion: cuanto tiempo conservar los dumps despues del analisis.

Cache de simbolos

Las tablas de simbolos de Windows se descargan la primera vez. Cachear en un directorio compartido evita descargas repetidas:

export VOLATILITY_SYMBOLS=/shared/volatility-symbols/

Conclusion de la serie

A lo largo de 12 articulos hemos cubierto el ciclo completo del analisis forense de memoria:

  1. Conceptos fundamentales y por que la memoria es evidencia critica.
  2. Herramientas y tecnicas de adquisicion para Windows y Linux.
  3. Instalacion y configuracion de Volatility 3.
  4. Analisis de procesos y deteccion de anomalias.
  5. DLLs, handles y tecnicas de inyeccion.
  6. Artefactos de red y deteccion de comunicaciones C2.
  7. malfind y deteccion de codigo inyectado.
  8. Registro de Windows en memoria.
  9. Deteccion de rootkits de kernel.
  10. Analisis de memoria Linux.
  11. Caso de estudio completo.
  12. Automatizacion y pipelines.

La memoria RAM es el campo de batalla donde el malware vive. Dominar su analisis es una de las habilidades mas valiosas que un profesional de seguridad puede desarrollar. Las herramientas son open source y los volcados de practica estan disponibles en CTFs y labs como MemLabs. La inversion es tiempo y practica.

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.