125 lines
4.7 KiB
Python
125 lines
4.7 KiB
Python
import httpx
|
|
import asyncio
|
|
import random
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
|
|
# Abuse.ch ThreatFox API - Real-time indicators of compromise
|
|
THREATFOX_URL = "https://threatfox-api.abuse.ch/api/v1/"
|
|
|
|
CACHE_FILE = Path(__file__).parent.parent / ".cache" / "cyber.json"
|
|
CACHE_DURATION_SEC = 300 # 5 minutes
|
|
|
|
def _get_cached_cyber():
|
|
if CACHE_FILE.exists() and (datetime.now().timestamp() - CACHE_FILE.stat().st_mtime) < CACHE_DURATION_SEC:
|
|
try:
|
|
with open(CACHE_FILE, "r") as f:
|
|
return json.load(f)
|
|
except: pass
|
|
return None
|
|
|
|
def _save_cache(data):
|
|
CACHE_FILE.parent.mkdir(exist_ok=True, parents=True)
|
|
try:
|
|
with open(CACHE_FILE, "w") as f:
|
|
json.dump(data, f)
|
|
except: pass
|
|
|
|
# Fallback geocoding for major infrastructure if IP geocoding fails or to show targets
|
|
CYBER_TARGETS = [
|
|
{"name": "AWS US-East", "lat": 39.04, "lon": -77.48},
|
|
{"name": "Google Cloud Europe", "lat": 50.11, "lon": 8.68},
|
|
{"name": "Azure East Asia", "lat": 22.28, "lon": 114.17},
|
|
{"name": "DE-CIX Frankfurt", "lat": 50.12, "lon": 8.67},
|
|
{"name": "London Internet Exchange", "lat": 51.51, "lon": -0.12},
|
|
{"name": "Equinix Ashburn", "lat": 39.03, "lon": -77.45},
|
|
]
|
|
|
|
async def _get_ip_geo(client, ip):
|
|
"""Real-time geocoding for malicious IPs using ip-api.com (free for non-commercial)."""
|
|
try:
|
|
# Rate limit is 45 requests per minute
|
|
resp = await client.get(f"http://ip-api.com/json/{ip}?fields=status,lat,lon,country", timeout=2.0)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
if data.get("status") == "success":
|
|
return data.get("lat"), data.get("lon"), data.get("country")
|
|
except: pass
|
|
return None, None, None
|
|
|
|
async def fetch_cyber_warfare(bgp_data=None):
|
|
"""
|
|
Fetches REAL cyber threat data from Abuse.ch ThreatFox.
|
|
Geocodes the malicious sources and maps them to critical infrastructure targets.
|
|
"""
|
|
cached = _get_cached_cyber()
|
|
if cached is not None:
|
|
return cached
|
|
|
|
try:
|
|
async with httpx.AsyncClient() as client:
|
|
# Query latest 20 malware/botnet indicators
|
|
payload = {"query": "get_iocs", "days": 1}
|
|
resp = await client.post(THREATFOX_URL, json=payload, timeout=10.0)
|
|
|
|
if resp.status_code != 200:
|
|
if CACHE_FILE.exists():
|
|
try:
|
|
with open(CACHE_FILE, "r") as f: return json.load(f)
|
|
except: pass
|
|
return []
|
|
|
|
data = resp.json()
|
|
if data.get("query_status") != "ok":
|
|
if CACHE_FILE.exists():
|
|
try:
|
|
with open(CACHE_FILE, "r") as f: return json.load(f)
|
|
except: pass
|
|
return []
|
|
|
|
iocs = data.get("data", [])
|
|
# Filter for IP addresses (IPv4)
|
|
ip_iocs = [i for i in iocs if i.get("threat_type") in ["botnet_cc", "payload_delivery"] and ":" not in i.get("ioc", "")]
|
|
|
|
attacks = []
|
|
# Geocode only a subset to respect ip-api limits
|
|
sample_size = min(len(ip_iocs), 8)
|
|
sampled = random.sample(ip_iocs, sample_size) if len(ip_iocs) > sample_size else ip_iocs
|
|
|
|
for ioc in sampled:
|
|
ip = ioc["ioc"].split(":")[0]
|
|
lat, lon, country = await _get_ip_geo(client, ip)
|
|
|
|
if lat and lon:
|
|
target = random.choice(CYBER_TARGETS)
|
|
intensity = random.uniform(0.4, 1.0)
|
|
|
|
attacks.append({
|
|
"id": f"real_cyb_{ioc['id']}",
|
|
"source_name": f"{ioc['threat_type'].upper()} ({country or 'Unknown'})",
|
|
"target_name": target["name"],
|
|
"startLat": lat,
|
|
"startLng": lon,
|
|
"endLat": target["lat"],
|
|
"endLng": target["lon"],
|
|
"type": ioc["malware_printable"] or "Unknown Malware",
|
|
"intensity": intensity,
|
|
"color": "#ff003c" if "botnet" in ioc["threat_type"] else "#ff8800",
|
|
"timestamp": ioc["first_seen"]
|
|
})
|
|
|
|
_save_cache(attacks)
|
|
return attacks
|
|
|
|
except Exception as e:
|
|
print(f"[CYBER] Real data fetch error: {e}")
|
|
if CACHE_FILE.exists():
|
|
try:
|
|
with open(CACHE_FILE, "r") as f: return json.load(f)
|
|
except: pass
|
|
return []
|
|
|
|
if __name__ == "__main__":
|
|
print(asyncio.run(fetch_cyber_warfare()))
|