Intermedio · Python · automatización · pefile · scripts · VirusTotal API · triage

Automatización de Análisis de Malware con Python: Scripts Prácticos

Scripts Python prácticos para automatizar análisis de malware. Parsing de PE/ELF con pefile/lief, extracción de strings e IOCs, cálculo de entropía, integración con VirusTotal API, YARA scanning automatizado, y pipeline de triage completo.

MalwareIntel Research · 8 min de lectura
Serie: Entornos de Análisis — Parte 14

Python como lenguaje del analista

Python es el lenguaje de scripting estándar en análisis de malware. Las razones son prácticas: librerías maduras para parsing de formatos binarios, APIs de servicios de threat intelligence, y la capacidad de automatizar tareas repetitivas de triage que consumen horas si se hacen manualmente.

Setup

pip install pefile lief yara-python ssdeep capstone requests
pip install vt-py   # VirusTotal API v3
pip install oletools  # Documentos Office
pip install dnfile    # .NET assemblies

Scripts prácticos

Script 1: triage automático de PE

#!/usr/bin/env python3
"""Triage automatico de archivo PE."""
import pefile
import hashlib
import math
import sys
import os
from collections import Counter
from datetime import datetime

def calculate_entropy(data):
    """Return the Shannon entropy of *data* in bits per symbol.

    *data* is any sized iterable of hashable symbols (typically ``bytes``).
    Empty or falsy input yields ``0``.
    """
    total = len(data) if data else 0
    if total == 0:
        return 0

    bits = 0
    for occurrences in Counter(data).values():
        probability = occurrences / total
        # Each symbol contributes -p * log2(p) to the total.
        bits -= probability * math.log2(probability)
    return bits

# Import names that hint at malicious capabilities, mapped to the label that
# is printed next to them in the report.
_SUSPICIOUS_APIS = {
    'CreateRemoteThread': 'CODE INJECTION',
    'VirtualAllocEx': 'REMOTE MEMORY ALLOC',
    'WriteProcessMemory': 'REMOTE MEMORY WRITE',
    'NtUnmapViewOfSection': 'PROCESS HOLLOWING',
    'SetWindowsHookEx': 'KEYLOGGER/HOOK',
    'URLDownloadToFile': 'FILE DOWNLOAD',
    'CryptEncrypt': 'ENCRYPTION',
    'AdjustTokenPrivileges': 'PRIVILEGE MANIPULATION',
    'IsDebuggerPresent': 'ANTI-DEBUG',
    'GetTickCount': 'ANTI-SANDBOX (timing)',
}


def _lookup_suspicious_api(name):
    """Return the capability label for an imported API *name*, or ``None``.

    Several of the watched APIs (e.g. ``SetWindowsHookEx``,
    ``URLDownloadToFile``) appear in import tables only with an ``A``/``W``
    suffix, so a trailing ``A`` or ``W`` is stripped for a second lookup when
    the exact name is not in the table.
    """
    label = _SUSPICIOUS_APIS.get(name)
    if label is None and name.endswith(('A', 'W')):
        label = _SUSPICIOUS_APIS.get(name[:-1])
    return label


def analyze_pe(filepath):
    """Print a quick triage report for the PE file at *filepath*.

    The report is written to stdout and contains: file hashes and size,
    basic PE header fields, a per-section table with entropy and R/W/X flags,
    imports that match the suspicious-API table, any PDB path found in the
    debug directory, and a list of heuristic verdicts.

    If the file cannot be parsed as a PE, the hashes are still printed,
    followed by an error line, and the function returns early.

    Args:
        filepath: Path of the file to analyse.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Function-scope import: only ``datetime`` itself is imported at file level.
    from datetime import timezone

    with open(filepath, 'rb') as f:
        data = f.read()

    # Hashes
    print("=== HASHES ===")
    print(f"MD5:    {hashlib.md5(data).hexdigest()}")
    print(f"SHA1:   {hashlib.sha1(data).hexdigest()}")
    print(f"SHA256: {hashlib.sha256(data).hexdigest()}")
    print(f"Size:   {len(data)} bytes")

    try:
        pe = pefile.PE(data=data)
    except pefile.PEFormatError:
        print("ERROR: Not a valid PE file")
        return

    # Headers
    print("\n=== PE HEADERS ===")
    timestamp = pe.FILE_HEADER.TimeDateStamp
    # utcfromtimestamp() is deprecated; build an aware UTC datetime and drop
    # the tzinfo again so the printed format stays "YYYY-MM-DD HH:MM:SS".
    compile_time = datetime.fromtimestamp(timestamp, tz=timezone.utc).replace(tzinfo=None)
    print(f"Compile time:  {compile_time} UTC")
    print(f"Entry point:   {hex(pe.OPTIONAL_HEADER.AddressOfEntryPoint)}")
    print(f"Image base:    {hex(pe.OPTIONAL_HEADER.ImageBase)}")
    machine = {0x14c: "x86", 0x8664: "x64", 0xaa64: "ARM64"}.get(
        pe.FILE_HEADER.Machine, hex(pe.FILE_HEADER.Machine))
    print(f"Machine:       {machine}")

    # Sections with entropy
    print("\n=== SECTIONS ===")
    print(f"{'Name':<10} {'VSize':>10} {'RSize':>10} {'Entropy':>8} {'Flags'}")
    for section in pe.sections:
        name = section.Name.decode('utf-8', errors='replace').rstrip('\x00')
        entropy = section.get_entropy()
        flags = []
        # Section characteristic bits: execute / read / write.
        if section.Characteristics & 0x20000000:
            flags.append('X')
        if section.Characteristics & 0x40000000:
            flags.append('R')
        if section.Characteristics & 0x80000000:
            flags.append('W')
        flag_str = ''.join(flags)
        suspicious = " [!HIGH ENTROPY]" if entropy > 7.0 else ""
        if 'X' in flag_str and 'W' in flag_str:
            suspicious += " [!WRITABLE CODE]"
        print(f"{name:<10} {section.Misc_VirtualSize:>10} {section.SizeOfRawData:>10} "
              f"{entropy:>8.2f} {flag_str}{suspicious}")

    # Imports
    print("\n=== IMPORTS ===")
    if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in pe.DIRECTORY_ENTRY_IMPORT:
            dll = entry.dll.decode('utf-8', errors='replace')
            for imp in entry.imports:
                if not imp.name:
                    # Imported by ordinal only; there is no name to match.
                    continue
                name = imp.name.decode('utf-8', errors='replace')
                label = _lookup_suspicious_api(name)
                if label is not None:
                    print(f"  [!] {dll}::{name} - {label}")
    else:
        print("  No imports (packed or statically linked)")

    # PDB path
    if hasattr(pe, 'DIRECTORY_ENTRY_DEBUG'):
        for dbg in pe.DIRECTORY_ENTRY_DEBUG:
            if hasattr(dbg.entry, 'PdbFileName'):
                pdb = dbg.entry.PdbFileName.decode('utf-8', errors='replace')
                print(f"\n=== PDB PATH ===\n  {pdb}")

    # Verdicts
    print("\n=== VERDICTS ===")
    total_entropy = calculate_entropy(data)
    if total_entropy > 7.0:
        print("  [!] HIGH ENTROPY: Likely packed/encrypted")
    # NOTE: this counts import descriptors (one per DLL), not individual functions.
    if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT') and len(pe.DIRECTORY_ENTRY_IMPORT) < 3:
        print("  [!] FEW IMPORTS: Likely packed or uses dynamic resolution")
    if timestamp == 0:
        print("  [!] ZEROED TIMESTAMP: Compilation time removed")
    elif compile_time.year > 2030 or compile_time.year < 2000:
        # A zeroed timestamp is reported once above rather than a second time
        # here as an out-of-range (1970) date.
        print("  [!] SUSPICIOUS TIMESTAMP: Likely tampered")

if __name__ == '__main__':
    # Exactly one positional argument (the PE file path) is expected.
    if len(sys.argv) == 2:
        analyze_pe(sys.argv[1])
    else:
        print(f"Usage: {sys.argv[0]} <pe_file>")
        sys.exit(1)

Script 2: extracción de IOCs

#!/usr/bin/env python3
"""Extraer IOCs de un archivo binario."""
import re
import sys

def extract_iocs(filepath):
    """Extract indicators of compromise from the printable strings of a file.

    The file is read as raw bytes, runs of at least six printable ASCII
    characters are collected, and a set of regular expressions is applied to
    them. Every non-empty category is printed to stdout in sorted order.

    Args:
        filepath: Path of the file to scan.

    Returns:
        dict mapping category name (``'urls'``, ``'ips'``, ``'emails'``,
        ``'domains'``, ``'registry'``, ``'files'``, ``'mutexes'``,
        ``'bitcoin'``) to a set of strings. ``'domains'`` and ``'mutexes'``
        are present for a stable schema but no extractor fills them.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(filepath, 'rb') as f:
        data = f.read()

    # Extract ASCII strings (min 6 chars)
    ascii_strings = re.findall(b'[\x20-\x7e]{6,}', data)
    text = b'\n'.join(ascii_strings).decode('utf-8', errors='replace')

    iocs = {'urls': set(), 'ips': set(), 'emails': set(),
            'domains': set(), 'registry': set(), 'files': set(),
            'mutexes': set(), 'bitcoin': set()}

    # URLs. ':' is part of the allowed set so that an explicit port
    # (http://host:8080/path) does not truncate the match at the host name.
    iocs['urls'].update(
        re.findall(r'https?://[a-zA-Z0-9\.\-/\?=&_%#@!:~+]+', text))

    # IPs (IPv4): every octet must be <= 255; 0.x.x.x and 127.0.0.1 are dropped.
    for ip in re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', text):
        octets = [int(o) for o in ip.split('.')]
        if all(0 <= o <= 255 for o in octets) and not ip.startswith('0.') and ip != '127.0.0.1':
            iocs['ips'].add(ip)

    # Emails
    iocs['emails'].update(
        re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', text))

    # Registry keys
    iocs['registry'].update(
        re.findall(r'(?:HKLM|HKCU|HKCR)\\[A-Za-z0-9\\_ -]+', text))

    # File paths
    iocs['files'].update(re.findall(r'[A-Z]:\\[A-Za-z0-9\\\._ -]+', text))

    # Bitcoin addresses
    iocs['bitcoin'].update(
        re.findall(r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b', text))

    # Print results
    for ioc_type, values in iocs.items():
        if values:
            print(f"\n=== {ioc_type.upper()} ({len(values)}) ===")
            for v in sorted(values):
                print(f"  {v}")

    return iocs

if __name__ == '__main__':
    # Validate the command line instead of failing with an IndexError
    # traceback when the file argument is missing.
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <file>")
        sys.exit(1)
    extract_iocs(sys.argv[1])

Script 3: VirusTotal lookup

#!/usr/bin/env python3
"""Lookup de hash en VirusTotal."""
import vt
import hashlib
import sys
import os

# API key is read from the VT_API_KEY environment variable; the literal
# 'YOUR_API_KEY' is only a placeholder fallback used when the variable is unset.
API_KEY = os.environ.get('VT_API_KEY', 'YOUR_API_KEY')

def vt_lookup(filepath):
    """Look up the SHA-256 of a local file on VirusTotal and print a summary.

    Only the hash is sent; the file itself is not uploaded. The summary
    contains the detection ratio, first submission date, the suggested threat
    label when the report has one, and every engine result whose category is
    ``'malicious'``. API errors are printed, not raised.

    Args:
        filepath: Path of the file whose hash is looked up.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Hash in 1 MiB chunks so large samples are not loaded into memory at once.
    digest = hashlib.sha256()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(1 << 20), b''):
            digest.update(chunk)
    sha256 = digest.hexdigest()

    print(f"SHA256: {sha256}")
    print("Looking up on VirusTotal...")

    client = vt.Client(API_KEY)
    try:
        file_info = client.get_object(f"/files/{sha256}")
        stats = file_info.last_analysis_stats
        total = sum(stats.values())
        malicious = stats.get('malicious', 0)

        print(f"\nDetections: {malicious}/{total}")
        print(f"First seen: {file_info.first_submission_date}")

        if hasattr(file_info, 'popular_threat_classification'):
            label = file_info.popular_threat_classification.get('suggested_threat_label', 'N/A')
            print(f"Classification: {label}")

        print("\nTop detections:")
        for engine, result in file_info.last_analysis_results.items():
            if result['category'] == 'malicious':
                print(f"  {engine}: {result['result']}")

    except vt.error.APIError as e:
        # Compare the structured error code rather than searching the
        # exception's string form, which is not a stable interface.
        if e.code == 'NotFoundError':
            print("Not found in VirusTotal (unknown sample)")
        else:
            print(f"API Error: {e}")
    finally:
        client.close()

if __name__ == '__main__':
    # Validate the command line instead of failing with an IndexError
    # traceback when the file argument is missing.
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <file>")
        sys.exit(1)
    vt_lookup(sys.argv[1])

Script 4: YARA scanner batch

#!/usr/bin/env python3
"""Escanear directorio con reglas YARA."""
import yara
import os
import sys

def scan_directory(rules_path, target_dir):
    """Scan every file under *target_dir* with YARA rules and print matches.

    Args:
        rules_path: Either a single rule file, or a directory whose ``.yar`` /
            ``.yara`` files are compiled together (each file name is used as
            its namespace).
        target_dir: Directory that is walked recursively.

    Files that raise ``yara.Error`` while being scanned are skipped; each one
    is reported on stderr and the number of skipped files is printed at the
    end, so an incomplete scan is visible.
    """
    # Compile rules
    if os.path.isdir(rules_path):
        # Sorted so rule files are compiled in the same order on every run.
        rule_files = {
            name: os.path.join(rules_path, name)
            for name in sorted(os.listdir(rules_path))
            if name.endswith(('.yar', '.yara'))
        }
        rules = yara.compile(filepaths=rule_files)
    else:
        rules = yara.compile(filepath=rules_path)

    print(f"Scanning {target_dir} with YARA rules...")
    matches_found = 0
    skipped = 0

    for root, _dirs, files in os.walk(target_dir):
        for filename in files:
            filepath = os.path.join(root, filename)
            try:
                matches = rules.match(filepath, timeout=30)
            except yara.Error as exc:
                # Best effort: keep scanning, but do not hide the failure.
                skipped += 1
                print(f"[SKIP] {filepath}: {exc}", file=sys.stderr)
                continue
            if matches:
                matches_found += 1
                print(f"\n[MATCH] {filepath}")
                for match in matches:
                    print(f"  Rule: {match.rule}")
                    for s in match.strings[:5]:  # Max 5 strings
                        print(f"    String: {s}")

    print(f"\nTotal matches: {matches_found}")
    if skipped:
        print(f"Skipped (scan errors): {skipped}")

if __name__ == '__main__':
    # Two positional arguments are expected: the rules path and the target directory.
    if len(sys.argv) == 3:
        scan_directory(sys.argv[1], sys.argv[2])
    else:
        print(f"Usage: {sys.argv[0]} <rules_path> <target_dir>")
        sys.exit(1)

Script 5: pipeline de triage completo

#!/usr/bin/env python3
"""Pipeline completo de triage de malware."""
import hashlib
import math
import os
import sys
import json
from collections import Counter
from datetime import datetime

try:
    import pefile
    import yara
    import requests
except ImportError as e:
    print(f"Missing dependency: {e}. Install with pip.")
    sys.exit(1)

class MalwareTriage:
    """Static triage of a single file: hashes, type, PE details, IOCs, score.

    The file is read fully into memory on construction. ``run()`` executes
    every step and returns ``self.results``, a JSON-serialisable dict.
    """

    # (magic prefix, label) pairs checked in order by identify_type().
    _MAGIC_TYPES = (
        (b'MZ', 'PE'),
        (b'\x7fELF', 'ELF'),
        (b'%PDF', 'PDF'),
        (b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1', 'OLE (Office)'),
        (b'PK', 'ZIP/OOXML'),
    )

    def __init__(self, filepath):
        """Read *filepath* and initialise an empty result structure.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        self.filepath = filepath
        self.filename = os.path.basename(filepath)
        with open(filepath, 'rb') as f:
            self.data = f.read()
        self.results = {
            'filename': self.filename,
            'size': len(self.data),
            'hashes': {},
            'file_type': '',
            'pe_info': {},
            'iocs': {},
            'verdicts': [],
            'risk_score': 0
        }

    def calculate_hashes(self):
        """Store the MD5, SHA-1 and SHA-256 hex digests in ``results['hashes']``."""
        self.results['hashes'] = {
            'md5': hashlib.md5(self.data).hexdigest(),
            'sha1': hashlib.sha1(self.data).hexdigest(),
            'sha256': hashlib.sha256(self.data).hexdigest()
        }

    def calculate_entropy(self, data=None):
        """Return the Shannon entropy (bits per byte) of *data*.

        Defaults to the whole file when *data* is ``None``; empty input
        yields ``0``.
        """
        if data is None:
            data = self.data
        if not data:
            return 0
        length = len(data)
        # Summing the negated terms (instead of negating the sum) avoids
        # returning -0.0 for single-symbol input, which would otherwise show
        # up as "-0.0" in the JSON report.
        return sum(-(c / length) * math.log2(c / length)
                   for c in Counter(data).values())

    def identify_type(self):
        """Set ``results['file_type']`` from the file's leading magic bytes."""
        for magic, label in self._MAGIC_TYPES:
            if self.data.startswith(magic):
                self.results['file_type'] = label
                return
        self.results['file_type'] = 'Unknown'

    def analyze_pe(self):
        """Collect PE header, section and import information.

        Does nothing unless ``identify_type()`` classified the file as PE.
        Adds verdicts and risk points for high-entropy sections (+20 each),
        fewer than five imported functions (+15) and process-injection APIs
        (+30). A file that fails to parse leaves ``pe_info`` holding only a
        ``'parse_error'`` message.
        """
        if self.results['file_type'] != 'PE':
            return
        try:
            pe = pefile.PE(data=self.data)
        except Exception as exc:
            # Stay best-effort on malformed samples, but do not use a bare
            # ``except`` (it would also swallow KeyboardInterrupt/SystemExit),
            # and record why the PE details are missing.
            self.results['pe_info'] = {'parse_error': str(exc)}
            return

        # Function-scope import: only ``datetime`` itself is imported at file level.
        from datetime import timezone

        info = {}
        # utcfromtimestamp() is deprecated; build an aware UTC datetime and
        # drop tzinfo so the string keeps the "YYYY-MM-DD HH:MM:SS" form.
        compiled = datetime.fromtimestamp(pe.FILE_HEADER.TimeDateStamp, tz=timezone.utc)
        info['compile_time'] = str(compiled.replace(tzinfo=None))
        info['entry_point'] = hex(pe.OPTIONAL_HEADER.AddressOfEntryPoint)
        info['sections'] = []

        for s in pe.sections:
            name = s.Name.decode('utf-8', errors='replace').rstrip('\x00')
            entropy = s.get_entropy()
            info['sections'].append({
                'name': name, 'entropy': round(entropy, 2),
                'raw_size': s.SizeOfRawData, 'virtual_size': s.Misc_VirtualSize
            })
            if entropy > 7.0:
                self.results['verdicts'].append(f"High entropy section: {name} ({entropy:.2f})")
                self.results['risk_score'] += 20

        # Imports
        suspicious_imports = []
        injection_apis = ['CreateRemoteThread', 'VirtualAllocEx', 'WriteProcessMemory',
                         'NtUnmapViewOfSection', 'QueueUserAPC']
        if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
            info['import_count'] = sum(len(e.imports) for e in pe.DIRECTORY_ENTRY_IMPORT)
            for entry in pe.DIRECTORY_ENTRY_IMPORT:
                for imp in entry.imports:
                    if not imp.name:
                        # Imported by ordinal only; no name to compare.
                        continue
                    # Decode once, leniently, and reuse the same string for
                    # both the comparison and the report.
                    api = imp.name.decode('utf-8', 'replace')
                    if api in injection_apis:
                        suspicious_imports.append(api)
            if info['import_count'] < 5:
                self.results['verdicts'].append("Very few imports (possible packing)")
                self.results['risk_score'] += 15

        if suspicious_imports:
            info['suspicious_imports'] = suspicious_imports
            self.results['verdicts'].append(f"Injection APIs: {', '.join(suspicious_imports)}")
            self.results['risk_score'] += 30

        self.results['pe_info'] = info

    def extract_iocs(self):
        """Extract URLs, IPv4 addresses and e-mail addresses from ASCII strings.

        Results are de-duplicated, sorted (so the truncated lists are the same
        on every run) and capped at 20 URLs, 20 IPs and 10 e-mails. Finding
        any URL or any IP adds 10 risk points each.
        """
        import re

        strings = re.findall(b'[\x20-\x7e]{6,}', self.data)
        text = b'\n'.join(strings).decode('utf-8', errors='replace')

        def is_plausible_ip(candidate):
            # Reject octets above 255 and the 0/8, 127/8 and 255/8 prefixes.
            if candidate.startswith(('0.', '127.', '255.')):
                return False
            return all(int(octet) <= 255 for octet in candidate.split('.'))

        urls = set(re.findall(r'https?://[^\s<>"\']+', text))
        ips = {ip for ip in re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', text)
               if is_plausible_ip(ip)}
        emails = set(re.findall(r'[\w.+-]+@[\w-]+\.[\w.]+', text))

        self.results['iocs'] = {
            'urls': sorted(urls)[:20],
            'ips': sorted(ips)[:20],
            'emails': sorted(emails)[:10]
        }

        if self.results['iocs']['urls']:
            self.results['risk_score'] += 10
        if self.results['iocs']['ips']:
            self.results['risk_score'] += 10

    def run(self):
        """Run every triage step and return the populated ``results`` dict.

        Whole-file entropy above 7.0 adds 15 risk points. The final score is
        classified as HIGH (>= 50), MEDIUM (>= 25) or LOW risk.
        """
        self.calculate_hashes()
        self.identify_type()
        self.analyze_pe()
        self.extract_iocs()

        # Overall entropy
        total_entropy = self.calculate_entropy()
        self.results['total_entropy'] = round(total_entropy, 2)
        if total_entropy > 7.0:
            self.results['risk_score'] += 15

        # Risk classification
        score = self.results['risk_score']
        if score >= 50:
            self.results['classification'] = 'HIGH RISK - Likely malicious'
        elif score >= 25:
            self.results['classification'] = 'MEDIUM RISK - Suspicious'
        else:
            self.results['classification'] = 'LOW RISK - Possibly benign'

        return self.results

if __name__ == '__main__':
    # Exactly one positional argument (the sample path) is expected.
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <file>")
        sys.exit(1)

    # Run the full pipeline and dump the report as indented JSON.
    report = MalwareTriage(sys.argv[1]).run()
    print(json.dumps(report, indent=2))

Librerías útiles

| Librería | pip install | Uso |
|---|---|---|
| pefile | pefile | PE parsing completo |
| lief | lief | PE/ELF/MachO parsing |
| yara-python | yara-python | YARA scanning |
| capstone | capstone | Disassembly (x86, ARM, MIPS) |
| unicorn | unicorn | CPU emulation |
| vt-py | vt-py | VirusTotal API v3 |
| oletools | oletools | Office document analysis |
| dnfile | dnfile | .NET assembly parsing |
| ssdeep | ssdeep | Fuzzy hashing |
| pycryptodomex | pycryptodomex | Crypto (descifrar configs) |
| requests | requests | HTTP (APIs de threat intel) |
| scapy | scapy | Network packet analysis |

Fuentes y referencias

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.