Automatización de Análisis de Malware con Python: Scripts Prácticos
Scripts Python prácticos para automatizar análisis de malware. Parsing de PE/ELF con pefile/lief, extracción de strings e IOCs, cálculo de entropía, integración con VirusTotal API, YARA scanning automatizado, y pipeline de triage completo.
Python como lenguaje del analista
Python es el lenguaje de scripting estándar en análisis de malware. Las razones son prácticas: librerías maduras para parsing de formatos binarios, APIs de servicios de threat intelligence, y la capacidad de automatizar tareas repetitivas de triage que consumen horas si se hacen manualmente.
Setup
pip install pefile lief yara-python ssdeep capstone requests
pip install vt-py # VirusTotal API v3
pip install oletools # Documentos Office
pip install dnfile # .NET assemblies
Scripts prácticos
Script 1: triage automático de PE
#!/usr/bin/env python3
"""Triage automatico de archivo PE."""
import pefile
import hashlib
import math
import sys
import os
from collections import Counter
from datetime import datetime
def calculate_entropy(data):
    """Return the Shannon entropy of *data* in bits per symbol.

    Args:
        data: A sized iterable of hashable symbols, typically ``bytes``.

    Returns:
        The entropy as a float. Empty input yields ``0.0``; for ``bytes``
        input the result lies between 0.0 and 8.0.
    """
    if not data:
        # Keep the return type a float even when there is nothing to measure.
        return 0.0
    length = len(data)
    # log2(length / count) is -log2(p) written so that every term is >= 0.
    return sum(
        (count / length) * math.log2(length / count)
        for count in Counter(data).values()
    )
def analyze_pe(filepath):
    """Print a static triage report for the PE file at *filepath*.

    The report goes to stdout in sections: hashes, PE headers, sections (with
    entropy and R/W/X flags), suspicious imports, PDB path(s) and a final list
    of heuristic verdicts. If the data cannot be parsed as a PE, the hashes
    are still printed, followed by an error line.

    Args:
        filepath: Path of the file to analyse. The whole file is read into
            memory.

    Returns:
        None.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Imported locally because the file-level import only brings in `datetime`.
    from datetime import timezone

    with open(filepath, 'rb') as f:
        data = f.read()

    # Hashes
    print("=== HASHES ===")
    print(f"MD5: {hashlib.md5(data).hexdigest()}")
    print(f"SHA1: {hashlib.sha1(data).hexdigest()}")
    print(f"SHA256: {hashlib.sha256(data).hexdigest()}")
    print(f"Size: {len(data)} bytes")

    try:
        pe = pefile.PE(data=data)
    except pefile.PEFormatError:
        print("ERROR: Not a valid PE file")
        return

    # Headers
    print("\n=== PE HEADERS ===")
    timestamp = pe.FILE_HEADER.TimeDateStamp
    # Timezone-aware replacement for the deprecated datetime.utcfromtimestamp().
    compile_time = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    print(f"Compile time: {compile_time:%Y-%m-%d %H:%M:%S} UTC")
    print(f"Entry point: {hex(pe.OPTIONAL_HEADER.AddressOfEntryPoint)}")
    print(f"Image base: {hex(pe.OPTIONAL_HEADER.ImageBase)}")
    machine = {0x14c: "x86", 0x8664: "x64", 0xaa64: "ARM64"}.get(
        pe.FILE_HEADER.Machine, hex(pe.FILE_HEADER.Machine))
    print(f"Machine: {machine}")

    # Sections with entropy
    print("\n=== SECTIONS ===")
    print(f"{'Name':<10} {'VSize':>10} {'RSize':>10} {'Entropy':>8} {'Flags'}")
    for section in pe.sections:
        name = section.Name.decode('utf-8', errors='replace').rstrip('\x00')
        entropy = section.get_entropy()
        flags = []
        # Characteristics bits tested here: 0x20000000 -> 'X',
        # 0x40000000 -> 'R', 0x80000000 -> 'W'.
        if section.Characteristics & 0x20000000:
            flags.append('X')
        if section.Characteristics & 0x40000000:
            flags.append('R')
        if section.Characteristics & 0x80000000:
            flags.append('W')
        flag_str = ''.join(flags)
        suspicious = " [!HIGH ENTROPY]" if entropy > 7.0 else ""
        suspicious += " [!WRITABLE CODE]" if 'X' in flag_str and 'W' in flag_str else ""
        print(f"{name:<10} {section.Misc_VirtualSize:>10} {section.SizeOfRawData:>10} "
              f"{entropy:>8.2f} {flag_str}{suspicious}")

    # Imports
    print("\n=== IMPORTS ===")
    suspicious_apis = {
        'CreateRemoteThread': 'CODE INJECTION',
        'VirtualAllocEx': 'REMOTE MEMORY ALLOC',
        'WriteProcessMemory': 'REMOTE MEMORY WRITE',
        'NtUnmapViewOfSection': 'PROCESS HOLLOWING',
        'SetWindowsHookEx': 'KEYLOGGER/HOOK',
        'URLDownloadToFile': 'FILE DOWNLOAD',
        'CryptEncrypt': 'ENCRYPTION',
        'AdjustTokenPrivileges': 'PRIVILEGE MANIPULATION',
        'IsDebuggerPresent': 'ANTI-DEBUG',
        'GetTickCount': 'ANTI-SANDBOX (timing)',
    }
    # pefile only sets DIRECTORY_ENTRY_IMPORT when an import directory exists.
    import_entries = getattr(pe, 'DIRECTORY_ENTRY_IMPORT', [])
    if import_entries:
        for entry in import_entries:
            dll = entry.dll.decode('utf-8', errors='replace')
            for imp in entry.imports:
                if not imp.name:
                    # Import by ordinal: there is no name to look up.
                    continue
                name = imp.name.decode('utf-8', errors='replace')
                label = suspicious_apis.get(name)
                if label is None and name.endswith(('A', 'W')):
                    # Many Win32 functions are exported with an ANSI/Unicode
                    # suffix (e.g. URLDownloadToFileA / URLDownloadToFileW),
                    # so retry the lookup without that trailing letter.
                    label = suspicious_apis.get(name[:-1])
                if label is not None:
                    print(f" [!] {dll}::{name} - {label}")
    else:
        print(" No imports (packed or statically linked)")

    # PDB path
    if hasattr(pe, 'DIRECTORY_ENTRY_DEBUG'):
        for dbg in pe.DIRECTORY_ENTRY_DEBUG:
            if hasattr(dbg.entry, 'PdbFileName'):
                # Drop any NUL terminator kept in the raw field.
                pdb = dbg.entry.PdbFileName.decode('utf-8', errors='replace').rstrip('\x00')
                print(f"\n=== PDB PATH ===\n {pdb}")

    # Verdicts
    print("\n=== VERDICTS ===")
    total_entropy = calculate_entropy(data)
    if total_entropy > 7.0:
        print(" [!] HIGH ENTROPY: Likely packed/encrypted")
    # Counts imported DLLs (import descriptors), not individual functions; a
    # file with no import directory at all counts as zero.
    if len(import_entries) < 3:
        print(" [!] FEW IMPORTS: Likely packed or uses dynamic resolution")
    if timestamp == 0:
        print(" [!] ZEROED TIMESTAMP: Compilation time removed")
    elif compile_time.year < 2000 or compile_time > datetime.now(timezone.utc):
        # A non-zero stamp before 2000 or later than "now" is implausible.
        print(" [!] SUSPICIOUS TIMESTAMP: Likely tampered")
if __name__ == '__main__':
    # Exactly one argument is expected: the path of the PE file to analyse.
    if len(sys.argv) == 2:
        analyze_pe(sys.argv[1])
    else:
        print(f"Usage: {sys.argv[0]} <pe_file>")
        sys.exit(1)
Script 2: extracción de IOCs
#!/usr/bin/env python3
"""Extraer IOCs de un archivo binario."""
import re
import sys
def extract_iocs(filepath):
    """Extract indicators of compromise from the printable strings of a file.

    Both ASCII and UTF-16LE ("wide") strings of at least six printable
    characters are collected, then searched with regular expressions for URLs,
    IPv4 addresses, e-mail addresses, registry keys, Windows file paths and
    Bitcoin-style addresses. Host names taken from the URLs are stored under
    ``'domains'``. Non-empty categories are printed to stdout.

    Args:
        filepath: Path of the file to scan. The whole file is read into memory.

    Returns:
        A dict mapping each IOC category name to a set of strings. The
        ``'mutexes'`` key is present but no extractor in this function fills
        it.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Imported locally because the script's import block does not provide it.
    from urllib.parse import urlsplit

    with open(filepath, 'rb') as f:
        data = f.read()

    # ASCII strings (min 6 chars)
    ascii_strings = re.findall(rb'[\x20-\x7e]{6,}', data)
    # UTF-16LE strings (min 6 chars): printable byte + NUL pairs. Taking every
    # second byte yields the ASCII text.
    wide_strings = [raw[::2] for raw in re.findall(rb'(?:[\x20-\x7e]\x00){6,}', data)]
    text = b'\n'.join(ascii_strings + wide_strings).decode('utf-8', errors='replace')

    iocs = {'urls': set(), 'ips': set(), 'emails': set(),
            'domains': set(), 'registry': set(), 'files': set(),
            'mutexes': set(), 'bitcoin': set()}

    # URLs (':' is allowed so that an explicit port does not truncate the match)
    for url in re.findall(r'https?://[a-zA-Z0-9.\-/?=&_%#@!:~+]+', text):
        iocs['urls'].add(url)
        try:
            host = urlsplit(url).hostname
        except ValueError:
            host = None
        # Record the host as a domain unless it is missing, dot-less, or an
        # IPv4 literal.
        if host and '.' in host and not re.fullmatch(r'[0-9.]+', host):
            iocs['domains'].add(host)

    # IPs (IPv4): every octet must fit in a byte; 0.x.x.x and the whole
    # 127.x.x.x loopback range are ignored.
    for ip in re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', text):
        if all(int(o) <= 255 for o in ip.split('.')) and not ip.startswith(('0.', '127.')):
            iocs['ips'].add(ip)

    # Emails
    iocs['emails'].update(
        re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', text))

    # Registry keys
    iocs['registry'].update(
        re.findall(r'(?:HKLM|HKCU|HKCR)\\[A-Za-z0-9\\_ -]+', text))

    # File paths
    iocs['files'].update(re.findall(r'[A-Z]:\\[A-Za-z0-9\\\._ -]+', text))

    # Bitcoin addresses
    iocs['bitcoin'].update(
        re.findall(r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b', text))

    # Print results
    for ioc_type, values in iocs.items():
        if values:
            print(f"\n=== {ioc_type.upper()} ({len(values)}) ===")
            for v in sorted(values):
                print(f" {v}")

    return iocs
if __name__ == '__main__':
    # Require exactly one argument so a missing path produces a usage message
    # instead of an IndexError traceback.
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <file>")
        sys.exit(1)
    extract_iocs(sys.argv[1])
Script 3: VirusTotal lookup
#!/usr/bin/env python3
"""Lookup de hash en VirusTotal."""
import vt
import hashlib
import sys
import os
# VirusTotal API key, read from the VT_API_KEY environment variable. When the
# variable is unset the literal placeholder 'YOUR_API_KEY' is used instead.
API_KEY = os.environ.get('VT_API_KEY', 'YOUR_API_KEY')
def vt_lookup(filepath):
    """Look up a file's SHA-256 on VirusTotal and print a detection summary.

    Only the hash is sent, as part of the ``/files/<sha256>`` object path.
    The summary shows the malicious/total engine counts, the first submission
    date, the suggested threat label when present, and each engine result
    whose category is ``'malicious'``.

    Args:
        filepath: Path of the local file whose hash is looked up.

    Returns:
        None.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with open(filepath, 'rb') as f:
        # Hash in 1 MiB chunks so a large sample is never held in memory whole.
        for chunk in iter(lambda: f.read(1 << 20), b''):
            digest.update(chunk)
    sha256 = digest.hexdigest()
    print(f"SHA256: {sha256}")
    print("Looking up on VirusTotal...")

    # The client is a context manager: leaving the block closes it, including
    # on the early return below.
    with vt.Client(API_KEY) as client:
        try:
            file_info = client.get_object(f"/files/{sha256}")
        except vt.error.APIError as e:
            # APIError carries the API's error code; compare it directly
            # rather than searching the exception's string form.
            if e.code == 'NotFoundError':
                print("Not found in VirusTotal (unknown sample)")
            else:
                print(f"API Error: {e}")
            return

        stats = file_info.last_analysis_stats
        total = sum(stats.values())
        malicious = stats.get('malicious', 0)
        print(f"\nDetections: {malicious}/{total}")
        print(f"First seen: {file_info.first_submission_date}")
        if hasattr(file_info, 'popular_threat_classification'):
            label = file_info.popular_threat_classification.get('suggested_threat_label', 'N/A')
            print(f"Classification: {label}")
        print("\nTop detections:")
        for engine, result in file_info.last_analysis_results.items():
            if result['category'] == 'malicious':
                print(f" {engine}: {result['result']}")
if __name__ == '__main__':
    # Require exactly one argument so a missing path produces a usage message
    # instead of an IndexError traceback.
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <file>")
        sys.exit(1)
    vt_lookup(sys.argv[1])
Script 4: YARA scanner batch
#!/usr/bin/env python3
"""Escanear directorio con reglas YARA."""
import yara
import os
import sys
def scan_directory(rules_path, target_dir):
    """Scan every file under *target_dir* with YARA rules and print the hits.

    Args:
        rules_path: Either a single rule file, or a directory whose ``.yar`` /
            ``.yara`` files are compiled together (each file name is used as
            its namespace).
        target_dir: Directory that is walked recursively.

    Returns:
        The number of files that matched at least one rule. This is the value
        printed as "Total matches". Returns 0 without scanning when
        *rules_path* is a directory containing no rule files.

    Raises:
        yara.Error: If the rules cannot be compiled.
    """
    # Compile rules
    if os.path.isdir(rules_path):
        # Sorted so the set of namespaces is built in a stable order.
        rule_files = {
            name: os.path.join(rules_path, name)
            for name in sorted(os.listdir(rules_path))
            if name.endswith(('.yar', '.yara'))
        }
        if not rule_files:
            print(f"No .yar/.yara rule files found in {rules_path}", file=sys.stderr)
            return 0
        rules = yara.compile(filepaths=rule_files)
    else:
        rules = yara.compile(filepath=rules_path)

    print(f"Scanning {target_dir} with YARA rules...")
    matches_found = 0
    skipped = 0
    for root, _dirs, files in os.walk(target_dir):
        for filename in files:
            filepath = os.path.join(root, filename)
            try:
                matches = rules.match(filepath, timeout=30)
            except yara.Error as e:
                # Best effort: a file that cannot be scanned is reported on
                # stderr and counted, and the scan goes on.
                skipped += 1
                print(f"[SKIP] {filepath}: {e}", file=sys.stderr)
                continue
            if not matches:
                continue
            matches_found += 1
            print(f"\n[MATCH] {filepath}")
            for match in matches:
                print(f" Rule: {match.rule}")
                for s in match.strings[:5]:  # Max 5 strings
                    print(f" String: {s}")

    print(f"\nTotal matches: {matches_found}")
    if skipped:
        print(f"Files skipped due to scan errors: {skipped}", file=sys.stderr)
    return matches_found
if __name__ == '__main__':
    # Two arguments are expected: the rules path and the directory to scan.
    if len(sys.argv) == 3:
        scan_directory(sys.argv[1], sys.argv[2])
    else:
        print(f"Usage: {sys.argv[0]} <rules_path> <target_dir>")
        sys.exit(1)
Script 5: pipeline de triage completo
#!/usr/bin/env python3
"""Pipeline completo de triage de malware."""
import hashlib
import math
import os
import sys
import json
from collections import Counter
from datetime import datetime
try:
import pefile
import yara
import requests
except ImportError as e:
print(f"Missing dependency: {e}. Install with pip.")
sys.exit(1)
class MalwareTriage:
    """Run a static triage pass over one file and collect the findings.

    The constructor reads the whole file into memory. ``run()`` then fills
    ``self.results`` with hashes, a magic-number file type, PE details (PE
    files only), IOCs found in printable strings, heuristic verdicts, an
    additive risk score and a classification string.
    """

    # (magic prefix, label) pairs, tested in order by identify_type().
    _MAGIC_TYPES = (
        (b'MZ', 'PE'),
        (b'\x7fELF', 'ELF'),
        (b'%PDF', 'PDF'),
        (b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1', 'OLE (Office)'),
        (b'PK', 'ZIP/OOXML'),
    )

    # Import names that analyze_pe() counts as process-injection indicators.
    _INJECTION_APIS = frozenset((
        'CreateRemoteThread', 'VirtualAllocEx', 'WriteProcessMemory',
        'NtUnmapViewOfSection', 'QueueUserAPC',
    ))

    def __init__(self, filepath):
        """Read *filepath* into memory and set up an empty results dict.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        self.filepath = filepath
        self.filename = os.path.basename(filepath)
        with open(filepath, 'rb') as f:
            self.data = f.read()
        self.results = {
            'filename': self.filename,
            'size': len(self.data),
            'hashes': {},
            'file_type': '',
            'pe_info': {},
            'iocs': {},
            'verdicts': [],
            'risk_score': 0
        }

    def calculate_hashes(self):
        """Store the MD5, SHA-1 and SHA-256 hex digests in results['hashes']."""
        self.results['hashes'] = {
            'md5': hashlib.md5(self.data).hexdigest(),
            'sha1': hashlib.sha1(self.data).hexdigest(),
            'sha256': hashlib.sha256(self.data).hexdigest()
        }

    def calculate_entropy(self, data=None):
        """Return the Shannon entropy of *data* in bits per symbol.

        Args:
            data: Symbols to measure, typically ``bytes``; defaults to the
                whole file.

        Returns:
            A float; ``0.0`` for empty input.
        """
        if data is None:
            data = self.data
        if not data:
            return 0.0
        length = len(data)
        # log2(length / c) is -log2(p) written so that every term is >= 0,
        # which keeps single-symbol input from producing -0.0.
        return sum((c / length) * math.log2(length / c)
                   for c in Counter(data).values())

    def identify_type(self):
        """Set results['file_type'] from the leading magic bytes."""
        for magic, label in self._MAGIC_TYPES:
            if self.data.startswith(magic):
                self.results['file_type'] = label
                return
        self.results['file_type'] = 'Unknown'

    def analyze_pe(self):
        """Collect PE details and add PE-specific verdicts and score.

        Does nothing unless identify_type() labelled the file 'PE'. When
        parsing fails, results['pe_info'] receives a single 'error' entry and
        no verdicts are added.
        """
        if self.results['file_type'] != 'PE':
            return
        # Imported locally because the file-level import only brings in
        # `datetime`.
        from datetime import timezone

        try:
            pe = pefile.PE(data=self.data)
        except Exception as exc:
            # Stay best-effort on malformed samples, but record the reason and
            # let KeyboardInterrupt / SystemExit propagate.
            self.results['pe_info'] = {'error': f"{type(exc).__name__}: {exc}"}
            return

        info = {}
        # Timezone-aware replacement for the deprecated utcfromtimestamp(); the
        # stored text keeps the 'YYYY-MM-DD HH:MM:SS' form.
        compiled = datetime.fromtimestamp(pe.FILE_HEADER.TimeDateStamp, tz=timezone.utc)
        info['compile_time'] = compiled.strftime('%Y-%m-%d %H:%M:%S')
        info['entry_point'] = hex(pe.OPTIONAL_HEADER.AddressOfEntryPoint)

        info['sections'] = []
        for s in pe.sections:
            name = s.Name.decode('utf-8', errors='replace').rstrip('\x00')
            entropy = s.get_entropy()
            info['sections'].append({
                'name': name, 'entropy': round(entropy, 2),
                'raw_size': s.SizeOfRawData, 'virtual_size': s.Misc_VirtualSize
            })
            if entropy > 7.0:
                self.results['verdicts'].append(f"High entropy section: {name} ({entropy:.2f})")
                self.results['risk_score'] += 20

        # Imports. A file with no import directory counts as zero imports so
        # that it is not exempt from the few-imports check.
        import_entries = getattr(pe, 'DIRECTORY_ENTRY_IMPORT', [])
        info['import_count'] = sum(len(e.imports) for e in import_entries)
        suspicious_imports = []
        for entry in import_entries:
            for imp in entry.imports:
                if not imp.name:
                    continue
                api = imp.name.decode('utf-8', 'replace')
                if api in self._INJECTION_APIS:
                    suspicious_imports.append(api)

        if info['import_count'] < 5:
            self.results['verdicts'].append("Very few imports (possible packing)")
            self.results['risk_score'] += 15
        if suspicious_imports:
            info['suspicious_imports'] = suspicious_imports
            self.results['verdicts'].append(f"Injection APIs: {', '.join(suspicious_imports)}")
            self.results['risk_score'] += 30

        self.results['pe_info'] = info

    def extract_iocs(self):
        """Find URLs, IPv4 addresses and e-mails in the file's ASCII strings.

        Each list is sorted before it is capped (20 URLs, 20 IPs, 10 e-mails)
        so the selection is reproducible. Finding any URL adds 10 to the risk
        score, and so does finding any IP.
        """
        import re
        strings = re.findall(rb'[\x20-\x7e]{6,}', self.data)
        text = b'\n'.join(strings).decode('utf-8', errors='replace')

        urls = set(re.findall(r'https?://[^\s<>"\']+', text))
        ips = {
            ip for ip in re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', text)
            if not ip.startswith(('0.', '127.', '255.'))
            # Reject dotted quads whose octets do not fit in a byte.
            and all(int(octet) <= 255 for octet in ip.split('.'))
        }
        emails = set(re.findall(r'[\w.+-]+@[\w-]+\.[\w.]+', text))

        self.results['iocs'] = {
            'urls': sorted(urls)[:20],
            'ips': sorted(ips)[:20],
            'emails': sorted(emails)[:10]
        }
        if self.results['iocs']['urls']:
            self.results['risk_score'] += 10
        if self.results['iocs']['ips']:
            self.results['risk_score'] += 10

    def run(self):
        """Run every analysis step and return the results dict.

        Thresholds: a score of 50 or more is HIGH RISK, 25 or more is MEDIUM
        RISK, anything lower is LOW RISK. Whole-file entropy above 7.0 adds 15.
        """
        # Start from a clean slate so that calling run() again does not add
        # the same verdicts and score increments a second time.
        self.results['verdicts'] = []
        self.results['risk_score'] = 0

        self.calculate_hashes()
        self.identify_type()
        self.analyze_pe()
        self.extract_iocs()

        # Overall entropy
        total_entropy = self.calculate_entropy()
        self.results['total_entropy'] = round(total_entropy, 2)
        if total_entropy > 7.0:
            self.results['risk_score'] += 15

        # Risk classification
        score = self.results['risk_score']
        if score >= 50:
            self.results['classification'] = 'HIGH RISK - Likely malicious'
        elif score >= 25:
            self.results['classification'] = 'MEDIUM RISK - Suspicious'
        else:
            self.results['classification'] = 'LOW RISK - Possibly benign'
        return self.results
if __name__ == '__main__':
    # One argument is expected: the path of the file to triage. The report is
    # printed as indented JSON.
    if len(sys.argv) == 2:
        report = MalwareTriage(sys.argv[1]).run()
        print(json.dumps(report, indent=2))
    else:
        print(f"Usage: {sys.argv[0]} <file>")
        sys.exit(1)
Librerías útiles
| Librería | pip install | Uso |
|---|---|---|
| pefile | pefile | PE parsing completo |
| lief | lief | PE/ELF/MachO parsing |
| yara-python | yara-python | YARA scanning |
| capstone | capstone | Disassembly (x86, ARM, MIPS) |
| unicorn | unicorn | CPU emulation |
| vt-py | vt-py | VirusTotal API v3 |
| oletools | oletools | Office document analysis |
| dnfile | dnfile | .NET assembly parsing |
| ssdeep | ssdeep | Fuzzy hashing |
| pycryptodomex | pycryptodomex | Crypto (descifrar configs) |
| requests | requests | HTTP (APIs de threat intel) |
| scapy | scapy | Network packet analysis |
Fuentes y referencias
- erocarrera. "pefile: Python PE Parsing." https://github.com/erocarrera/pefile
- LIEF. "Library to Instrument Executable Formats." https://lief-project.github.io/
- Alvarez, V. "yara-python." https://github.com/VirusTotal/yara-python
- VirusTotal. "VT API v3 Documentation." https://docs.virustotal.com/
- Seitz, J. & Arnold, T. "Black Hat Python." No Starch Press, 2021.
Preguntas frecuentes
Libros recomendados
Artículos relacionados
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.