876 lines
31 KiB
Python
876 lines
31 KiB
Python
import asyncio
|
||
import json
|
||
import re
|
||
import shutil
|
||
from datetime import datetime, timezone
|
||
|
||
import httpx
|
||
|
||
# Local Ollama HTTP generate endpoint, used by _query_ollama when the
# "ollama" backend is selected.
OLLAMA_URL = "http://localhost:11434/api/generate"
# Model tag sent to Ollama; analyze_threat also reports it as the result "source".
MODEL_NAME = "llama3.2"
|
||
|
||
# LLM backend selection:
#   "gemini" -> Gemini CLI, "claude" -> Claude Code CLI (both run as `<tool> -p <prompt>`;
#               per the original notes, no API key is needed when already logged in),
#   "ollama" -> local HTTP API at OLLAMA_URL serving MODEL_NAME.
# Auto-detect order: the first tool found on PATH wins, i.e. gemini > claude > ollama.
# NOTE(review): an earlier comment claimed "claude > gemini"; the code has always
# checked gemini first. Pass a different `preference` tuple to change the order.
def _detect_cli_backend(preference: tuple[str, ...] = ("gemini", "claude")) -> str:
    """Return the first executable in *preference* found on PATH, else ``"ollama"``.

    Args:
        preference: Candidate CLI tool names, most preferred first.

    Returns:
        The chosen tool name, or ``"ollama"`` when none of the candidates is installed.
    """
    return next((tool for tool in preference if shutil.which(tool)), "ollama")


# Resolved once at import time; every LLM call in this module uses this backend.
_CLI_BACKEND = _detect_cli_backend()
|
||
|
||
|
||
async def _query_cli(prompt: str) -> dict | None:
|
||
"""Call claude or gemini CLI as subprocess, parse JSON from response."""
|
||
try:
|
||
cmd = [_CLI_BACKEND, "-p", prompt]
|
||
proc = await asyncio.create_subprocess_exec(
|
||
*cmd,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30.0)
|
||
raw = stdout.decode().strip()
|
||
if not raw:
|
||
return None
|
||
# Strip markdown code fences if present
|
||
if "```json" in raw:
|
||
raw = raw.split("```json", 1)[1].split("```", 1)[0].strip()
|
||
elif "```" in raw:
|
||
raw = raw.split("```", 1)[1].split("```", 1)[0].strip()
|
||
# Extract first JSON object if there's surrounding text
|
||
m = re.search(r'\{.*\}', raw, re.DOTALL)
|
||
if m:
|
||
raw = m.group(0)
|
||
parsed = json.loads(raw)
|
||
return parsed if isinstance(parsed, dict) else None
|
||
except Exception as exc:
|
||
print(f"[AI] CLI query failed ({_CLI_BACKEND}): {exc}")
|
||
return None
|
||
# Valid threat_level labels, least to most severe (the order _threat_from_score
# assigns them as the score rises). _sanitize_llm_output replaces any LLM-supplied
# level outside this tuple with the heuristic level.
THREAT_LEVELS = ("LOW", "GUARDED", "ELEVATED", "HIGH", "SEVERE")
|
||
|
||
# Headline keyword -> weight. _score_from_signals searches the lower-cased text of
# the first 20 headlines for each keyword and adds weight × min(hits, 3), so keys
# must stay lower-case and any single keyword contributes at most 3 × its weight.
KEYWORD_SCORES = {
    "war": 4,
    "missile": 4,
    "strike": 4,
    "attack": 4,
    "explosion": 3,
    "drone": 3,
    "nuclear": 5,
    "cyber": 3,
    "hijack": 4,
    "sanction": 2,
    "military": 3,
    "troops": 3,
    "border": 2,
    "earthquake": 2,
    "tsunami": 4,
}
|
||
|
||
# Hotspot regions with bounding boxes for spatial movement analysis.
# bbox = (lat_min, lon_min, lat_max, lon_max); _in_bbox tests it with inclusive edges.
# Per-region fields, as consumed by _heuristic_predictions:
#   name / lat / lon -> "location" label and coordinates of the emitted prediction
#   bbox             -> box used to count aircraft, vessels and GPS interference zones
#   keywords         -> terms searched for in lower-cased headlines and conflict
#                       titles, so they must stay lower-case
#   baseline         -> default event text (and reason when no signal fires)
HOTSPOT_REGIONS = {
    "ukraine": {
        "name": "Ukraine / Black Sea", "lat": 48.38, "lon": 31.17,
        "bbox": (44, 25, 52, 40),
        "keywords": ["ukraine", "kyiv", "kharkiv", "crimea", "donbas", "odesa", "bakhmut", "zaporizhzhia"],
        "baseline": "Escalation risk around frontline",
    },
    "russia": {
        "name": "Western Russia", "lat": 55.76, "lon": 37.62,
        "bbox": (52, 30, 62, 48),
        "keywords": ["russia", "moscow", "kursk"],
        "baseline": "Military posture shifts",
    },
    "israel": {
        "name": "Israel / Palestine", "lat": 31.05, "lon": 34.85,
        "bbox": (29, 33, 33.5, 36),
        "keywords": ["israel", "gaza", "west bank", "tel aviv", "jerusalem", "rafah", "idf", "hamas", "hezbollah"],
        "baseline": "Cross-border retaliation risk",
    },
    "iran": {
        "name": "Iran", "lat": 32.43, "lon": 53.69,
        "bbox": (25, 44, 40, 63),
        "keywords": ["iran", "tehran", "irgc"],
        "baseline": "Strategic force posture shift",
    },
    "syria_lebanon": {
        "name": "Syria / Lebanon", "lat": 34.80, "lon": 37.00,
        "bbox": (32, 35, 37.5, 42),
        "keywords": ["syria", "damascus", "lebanon", "aleppo"],
        "baseline": "Proxy activity risk",
    },
    "taiwan": {
        "name": "Taiwan Strait", "lat": 23.70, "lon": 120.96,
        "bbox": (21, 117, 27, 124),
        "keywords": ["taiwan", "taipei", "taiwan strait"],
        "baseline": "Maritime and airspace pressure",
    },
    "south_china_sea": {
        "name": "South China Sea", "lat": 12.0, "lon": 113.0,
        "bbox": (5, 107, 22, 120),
        "keywords": ["south china sea", "spratly", "paracel"],
        "baseline": "Naval maneuvering risk",
    },
    "korea": {
        "name": "Korean Peninsula", "lat": 37.57, "lon": 126.98,
        "bbox": (33, 124, 43, 131),
        "keywords": ["korea", "pyongyang", "dprk", "north korea", "south korea", "seoul"],
        "baseline": "Missile test / military response risk",
    },
    "red_sea": {
        "name": "Red Sea / Bab-el-Mandeb", "lat": 15.0, "lon": 42.0,
        "bbox": (10, 38, 22, 46),
        "keywords": ["red sea", "houthi", "yemen", "bab-el-mandeb", "aden"],
        "baseline": "Shipping disruption risk",
    },
    "hormuz": {
        "name": "Strait of Hormuz", "lat": 26.58, "lon": 56.25,
        "bbox": (24, 54, 28, 58),
        "keywords": ["hormuz", "persian gulf"],
        "baseline": "Energy transit disruption risk",
    },
    "suez": {
        "name": "Suez Canal", "lat": 29.93, "lon": 32.55,
        "bbox": (28, 31, 32, 34),
        "keywords": ["suez"],
        "baseline": "Maritime chokepoint risk",
    },
    "sahel": {
        "name": "Sahel / West Africa", "lat": 14.0, "lon": 0.0,
        "bbox": (8, -10, 20, 15),
        "keywords": ["mali", "niger", "burkina faso", "sahel", "nigeria"],
        "baseline": "Insurgency / instability risk",
    },
}
|
||
|
||
# ── Region definitions for grouping events into theaters ──
# Theater name -> headline keywords, matched case-insensitively by _match_region.
# _match_region returns the FIRST theater (in dict order) with a matching keyword,
# so a headline naming places from several theaters is assigned to the earliest one.
REGION_DEFS: dict[str, list[str]] = {
    "Eastern Europe": [
        "Ukraine", "Russia", "Crimea", "Donbas", "Donbass", "Kharkiv", "Kyiv",
        "Kiev", "Odesa", "Mariupol", "Bakhmut", "Zaporizhzhia", "Kherson",
        "Moldova", "Belarus", "Avdiivka",
    ],
    "Middle East": [
        "Israel", "Gaza", "Palestine", "Lebanon", "Hezbollah", "Syria",
        "Damascus", "Iran", "Tehran", "Iraq", "Baghdad", "Jordan",
        "West Bank", "Rafah", "Jenin", "Nablus", "Jerusalem", "Tel Aviv",
    ],
    "Red Sea & Horn of Africa": [
        "Red Sea", "Yemen", "Houthi", "Somalia", "Sudan", "Ethiopia",
        "Bab-el-Mandeb", "Suez",
    ],
    "East Asia": [
        "China", "Taiwan", "Taipei", "North Korea", "Pyongyang",
        "South Korea", "Japan", "South China Sea", "Taiwan Strait",
        "East China Sea",
    ],
    "South Asia": [
        "India", "Pakistan", "Afghanistan", "Kashmir", "Myanmar", "Kabul",
    ],
    "Sub-Saharan Africa": [
        "Mali", "Niger", "Nigeria", "Congo", "Sahel", "Burkina Faso",
        "Mozambique", "Libya",
    ],
    "Europe / NATO": [
        "NATO", "Poland", "Finland", "Sweden", "Norway", "Baltic",
        "Germany", "France", "UK", "Serbia", "Kosovo", "Georgia", "Romania",
    ],
}
|
||
|
||
# Sort rank for regional-brief statuses and key-driver weights: lower ranks sort
# first (most severe at the top). Call sites use .get(label, 5), so unlisted
# labels sort last.
_WEIGHT_ORDER = {"CRITICAL": 0, "HIGH": 1, "ELEVATED": 2, "GUARDED": 3, "ACTIVE": 4}
|
||
|
||
|
||
def _match_region(title: str) -> str | None:
|
||
"""Match a headline to a geopolitical region."""
|
||
tl = title.lower()
|
||
for region, keywords in REGION_DEFS.items():
|
||
for kw in keywords:
|
||
if kw.lower() in tl:
|
||
return region
|
||
return None
|
||
|
||
|
||
def _regional_briefs(news_items: list, conflicts: list) -> list[dict]:
    """Bucket conflict events and headlines into theaters and rate each theater.

    Conflict events are kept as-is. The first 30 news items become INFO/INTEL
    pseudo-events. Status per theater:
      CRITICAL (>= 3 critical), HIGH (>= 1 critical or >= 3 high),
      ELEVATED (>= 1 high), otherwise ACTIVE.
    Results are ordered by status rank, then by event count (descending), and
    capped at six.
    """
    by_region: dict[str, list] = {}

    for conflict in conflicts or []:
        theater = _match_region(conflict.get("title", ""))
        if theater:
            by_region.setdefault(theater, []).append(conflict)

    for news in (news_items or [])[:30]:
        theater = _match_region(news.get("title", ""))
        if theater:
            by_region.setdefault(theater, []).append(
                {"title": news["title"], "severity": "INFO", "event_type": "INTEL"}
            )

    results = []
    for theater, evs in by_region.items():
        tally: dict[str, int] = {}
        for ev in evs:
            sev = ev.get("severity", "INFO")
            tally[sev] = tally.get(sev, 0) + 1
        n_crit = tally.get("CRITICAL", 0)
        n_high = tally.get("HIGH", 0)

        if n_crit >= 3:
            status = "CRITICAL"
        elif n_crit >= 1 or n_high >= 3:
            status = "HIGH"
        elif n_high >= 1:
            status = "ELEVATED"
        else:
            status = "ACTIVE"

        # Up to two CRITICAL/HIGH events label the brief.
        headline_events = [
            ev for ev in evs if ev.get("severity") in ("CRITICAL", "HIGH")
        ][:2]
        if headline_events:
            summary = "; ".join(
                ev.get("event_type", ev.get("title", "")[:40]) for ev in headline_events
            )
        else:
            summary = f"{len(evs)} events tracked in region"

        results.append({
            "region": theater,
            "status": status,
            "event_count": len(evs),
            "critical": n_crit,
            "high": n_high,
            "summary": summary[:120],
        })

    ranked = sorted(
        results,
        key=lambda brief: (_WEIGHT_ORDER.get(brief["status"], 5), -brief["event_count"]),
    )
    return ranked[:6]
|
||
|
||
|
||
def _key_drivers(
|
||
score: int, reasons: list[str], gps_data: list, bgp_data: dict,
|
||
conflicts: list, space_weather: dict, planes: list = None, ships: list = None,
|
||
) -> list[dict]:
|
||
"""Identify and rank the top threat signal drivers."""
|
||
drivers: list[dict] = []
|
||
|
||
# GPS Interference
|
||
gps_count = len(gps_data or [])
|
||
if gps_count:
|
||
drivers.append({
|
||
"signal": "GPS Interference",
|
||
"detail": f"{gps_count} active jamming zone{'s' if gps_count > 1 else ''} detected via ADS-B navigation anomalies",
|
||
"weight": "CRITICAL" if gps_count >= 3 else "HIGH",
|
||
"icon": "⬡",
|
||
})
|
||
|
||
# Military Conflicts
|
||
if conflicts:
|
||
critical = sum(1 for c in conflicts if c.get("severity") == "CRITICAL")
|
||
high = sum(1 for c in conflicts if c.get("severity") == "HIGH")
|
||
active_regions = set(
|
||
_match_region(c.get("title", "")) for c in conflicts
|
||
if _match_region(c.get("title", ""))
|
||
)
|
||
if critical or high:
|
||
drivers.append({
|
||
"signal": "Military Activity",
|
||
"detail": f"{critical} CRITICAL, {high} HIGH severity events across {len(active_regions)} region{'s' if len(active_regions) != 1 else ''}",
|
||
"weight": "CRITICAL" if critical >= 5 else "HIGH" if critical >= 1 else "ELEVATED",
|
||
"icon": "⚔",
|
||
})
|
||
|
||
# BGP Routing
|
||
bgp_status = str((bgp_data or {}).get("status", "STABLE")).upper()
|
||
if bgp_status in ("ELEVATED", "CRITICAL"):
|
||
updates = (bgp_data or {}).get("total_updates", 0)
|
||
drivers.append({
|
||
"signal": "BGP Routing",
|
||
"detail": f"Internet routing instability: {updates:,} updates/hour ({bgp_status})",
|
||
"weight": bgp_status,
|
||
"icon": "📡",
|
||
})
|
||
|
||
# Space Weather
|
||
sw = space_weather or {}
|
||
kp = sw.get("kp_index", 0)
|
||
if isinstance(kp, (int, float)) and kp >= 4:
|
||
drivers.append({
|
||
"signal": "Space Weather",
|
||
"detail": f"Geomagnetic storm Kp={kp:.1f} — {sw.get('description', 'HF radio / satellite disruption risk')}",
|
||
"weight": "CRITICAL" if kp >= 7 else "HIGH" if kp >= 5 else "ELEVATED",
|
||
"icon": "☀",
|
||
})
|
||
|
||
# News keywords
|
||
keyword_drivers = [r for r in reasons if "×" in r]
|
||
if keyword_drivers:
|
||
top_kw = keyword_drivers[:3]
|
||
drivers.append({
|
||
"signal": "Intel Keywords",
|
||
"detail": f"High-weight terms in news: {', '.join(top_kw)}",
|
||
"weight": "ELEVATED" if score >= 8 else "GUARDED",
|
||
"icon": "📰",
|
||
})
|
||
|
||
# Military air movement
|
||
mil_count = sum(1 for p in (planes or []) if p.get("military"))
|
||
total_planes = len(planes or [])
|
||
if mil_count >= 5:
|
||
drivers.append({
|
||
"signal": "Military Air Movement",
|
||
"detail": f"{mil_count} military aircraft tracked globally ({total_planes:,} total airframes monitored)",
|
||
"weight": "HIGH" if mil_count >= 30 else "ELEVATED" if mil_count >= 15 else "GUARDED",
|
||
"icon": "✈",
|
||
})
|
||
|
||
# Maritime movement
|
||
ship_count = len(ships or [])
|
||
if ship_count >= 10:
|
||
tankers = sum(1 for s in (ships or []) if str(s.get("type", "")).lower() in ("tanker", "lng carrier"))
|
||
drivers.append({
|
||
"signal": "Maritime Activity",
|
||
"detail": f"{ship_count} vessels tracked globally" + (f" ({tankers} tankers near chokepoints)" if tankers >= 5 else ""),
|
||
"weight": "ELEVATED" if ship_count >= 100 else "GUARDED",
|
||
"icon": "🚢",
|
||
})
|
||
|
||
drivers.sort(key=lambda d: _WEIGHT_ORDER.get(d["weight"], 5))
|
||
return drivers[:6]
|
||
|
||
|
||
def _correlations(
    gps_data: list, conflicts: list, bgp_data: dict, space_weather: dict,
) -> list[str]:
    """Find cross-domain signal correlations.

    Checks, in order:
      - a GPS zone inside a rough Eastern-Europe or Middle-East box while that
        theater has conflict events,
      - BGP status ELEVATED/CRITICAL during any conflict,
      - Kp >= 4 while GPS zones exist,
      - Kp >= 5 on its own.

    Returns at most four sentences, or a single "no correlations" sentence when
    none apply.
    """
    out: list[str] = []
    conflict_list = conflicts or []

    conflict_regions = {
        region for c in conflict_list if (region := _match_region(c.get("title", "")))
    }

    # Only numeric coordinates can be range-compared; skip zones whose lat/lon is
    # None or non-numeric instead of raising TypeError.
    zones = [
        (lat, lon)
        for lat, lon in ((z.get("lat", 0), z.get("lon", 0)) for z in (gps_data or []))
        if isinstance(lat, (int, float)) and isinstance(lon, (int, float))
    ]

    if "Eastern Europe" in conflict_regions and any(
        35 < lat < 55 and 25 < lon < 45 for lat, lon in zones
    ):
        out.append(
            "GPS jamming in Eastern Europe coincides with active military operations "
            "— likely electronic warfare activity"
        )
    if "Middle East" in conflict_regions and any(
        25 < lat < 40 and 30 < lon < 50 for lat, lon in zones
    ):
        out.append(
            "GPS interference near Middle Eastern conflict zones suggests coordinated "
            "electronic warfare alongside kinetic operations"
        )

    bgp_status = str((bgp_data or {}).get("status", "STABLE")).upper()
    if bgp_status in ("ELEVATED", "CRITICAL") and conflict_list:
        out.append(
            f"BGP routing instability ({bgp_status}) during active military operations "
            "may indicate cyber operations or infrastructure targeting"
        )

    sw = space_weather or {}
    kp = sw.get("kp_index", 0) if isinstance(sw.get("kp_index"), (int, float)) else 0
    if kp >= 4 and gps_data:
        out.append(
            f"Elevated geomagnetic activity (Kp={kp:.1f}) may amplify GPS signal "
            "degradation in existing interference zones"
        )
    if kp >= 5:
        out.append(
            f"Geomagnetic storm (Kp={kp:.1f}) affecting HF radio propagation — "
            "military comms may shift to satellite links"
        )

    if not out:
        out.append("No significant cross-domain signal correlations detected at this time")
    return out[:4]
|
||
|
||
|
||
def _watch_items(conflicts: list, gps_data: list, space_weather: dict) -> list[str]:
    """Build up to five analyst watch items.

    The first 15 CRITICAL conflicts are grouped by theater ("Global" when no
    theater matches) and listed busiest-first. GPS zones and active space-weather
    alerts add one item each. A standing-posture item is returned when nothing
    else applies.
    """
    watch: list[str] = []

    severe = [c for c in (conflicts or []) if c.get("severity") == "CRITICAL"]
    grouped: dict[str, list] = {}
    for event in severe[:15]:
        theater = _match_region(event.get("title", "")) or "Global"
        grouped.setdefault(theater, []).append(event)
    for theater, evts in sorted(grouped.items(), key=lambda kv: -len(kv[1])):
        plural = "s" if len(evts) > 1 else ""
        watch.append(
            f"Monitor {theater}: {len(evts)} critical event{plural} "
            f"— {evts[0].get('event_type', 'military activity')}"
        )

    if gps_data:
        zone_plural = "s" if len(gps_data) > 1 else ""
        watch.append(
            f"Track {len(gps_data)} GPS interference zone{zone_plural} "
            "— assess impact on aviation safety and military navigation"
        )

    if (space_weather or {}).get("alerts"):
        watch.append("Space weather alerts active — monitor satellite communication reliability")

    return (watch or ["Maintain standard surveillance posture across all domains"])[:5]
|
||
|
||
|
||
def _threat_from_score(score: int) -> str:
    """Map a numeric threat score to a threat-level label.

    Thresholds: >= 18 SEVERE, >= 13 HIGH, >= 8 ELEVATED, >= 4 GUARDED, else LOW.
    """
    # (minimum score, label), checked from most to least severe.
    for floor, label in ((18, "SEVERE"), (13, "HIGH"), (8, "ELEVATED"), (4, "GUARDED")):
        if score >= floor:
            return label
    return "LOW"
|
||
|
||
|
||
def _score_from_signals(news_items, gps_data, bgp_data) -> tuple[int, list[str]]:
|
||
score = 0
|
||
reasons: list[str] = []
|
||
|
||
titles = [str(item.get("title", "")).lower() for item in news_items[:20]]
|
||
title_blob = " ".join(titles)
|
||
|
||
for kw, weight in KEYWORD_SCORES.items():
|
||
hits = title_blob.count(kw)
|
||
if hits:
|
||
inc = min(hits, 3) * weight
|
||
score += inc
|
||
reasons.append(f"{kw}×{hits}")
|
||
|
||
gps_count = len(gps_data or [])
|
||
if gps_count:
|
||
bump = min(gps_count, 12)
|
||
score += bump
|
||
reasons.append(f"gps_anomalies={gps_count}")
|
||
|
||
bgp_status = str((bgp_data or {}).get("status", "STABLE")).upper()
|
||
if bgp_status == "ELEVATED":
|
||
score += 3
|
||
reasons.append("bgp=elevated")
|
||
elif bgp_status == "CRITICAL":
|
||
score += 6
|
||
reasons.append("bgp=critical")
|
||
elif bgp_status == "OFFLINE":
|
||
score += 2
|
||
reasons.append("bgp=offline")
|
||
|
||
return score, reasons
|
||
|
||
|
||
def _probability(score: int, offset: int) -> str:
    """Convert a threat score plus offset into a percentage string clamped to 20-95%."""
    estimate = 35 + score * 3 + offset
    capped = min(95, estimate)
    return f"{max(20, capped)}%"
|
||
|
||
|
||
def _in_bbox(lat: float, lon: float, bbox: tuple) -> bool:
    """Return True if (lat, lon) lies in bbox = (lat_min, lon_min, lat_max, lon_max), edges inclusive."""
    if not bbox[0] <= lat <= bbox[2]:
        return False
    return bbox[1] <= lon <= bbox[3]
|
||
|
||
|
||
def _heuristic_predictions(
    news_items, gps_data, score: int,
    conflicts=None, planes=None, ships=None,
) -> list[dict]:
    """
    Generate predicted hotspots from HOTSPOT_REGIONS by cross-referencing:
      - news keyword mentions in the first 30 headlines (intel signal)
      - CRITICAL/HIGH conflict events whose title names the region (OSINT signal)
      - military aircraft inside the region bbox (movement signal)
      - vessels / tankers inside the region bbox (maritime signal)
      - GPS interference zones inside the region bbox (EW signal)

    Region keywords are matched at the start of a word. So "mali" does not fire
    inside "somalia" and "aden" not inside "laden", while "russian" or "houthis"
    still count for "russia" / "houthi".

    Args:
        news_items: Dicts with an optional "title".
        gps_data: Zones with "lat"/"lon".
        score: Global threat score, added into every region's probability.
        conflicts: Optional dicts with "title"/"severity".
        planes: Optional dicts with "lat"/"lon"/"military".
        ships: Optional dicts with "lat"/"lon"/"type".

    Returns:
        Up to four prediction dicts (location, lat, lon, event, reason,
        probability), strongest combined signal first. Returns a single
        "Global Monitoring" fallback when no region totals at least 1.0.
    """
    titles = [str(item.get("title", "")).lower() for item in (news_items or [])[:30]]
    title_blob = " ".join(titles)
    planes = planes or []
    ships = ships or []
    # Lower-case each conflict title once instead of once per region.
    conflict_rows = [
        (str(c.get("title") or "").lower(), c.get("severity", ""))
        for c in (conflicts or [])
    ]

    def mentions(keyword: str, text: str) -> bool:
        # Word-start anchored so short keywords don't match inside longer words.
        return re.search(r"\b" + re.escape(keyword), text) is not None

    def is_num(value) -> bool:
        return isinstance(value, (int, float))

    scored: list[tuple[float, str, dict]] = []

    for key, region in HOTSPOT_REGIONS.items():
        bbox = region["bbox"]
        keywords = region["keywords"]

        # ── 1. News keyword mentions (distinct keywords found) ──
        matching_kws = [kw for kw in keywords if mentions(kw, title_blob)]
        news_hits = len(matching_kws)
        news_score = news_hits * 3.0

        # ── 2. Conflict event severity ──
        conflict_crit = 0
        conflict_high = 0
        for ct, sev in conflict_rows:
            if any(mentions(kw, ct) for kw in keywords):
                if sev == "CRITICAL":
                    conflict_crit += 1
                elif sev == "HIGH":
                    conflict_high += 1
        conflict_score = conflict_crit * 5.0 + conflict_high * 2.0

        # ── 3. Aircraft inside the region ──
        mil_count = 0
        civ_count = 0
        for p in planes:
            plat, plon = p.get("lat"), p.get("lon")
            if is_num(plat) and is_num(plon) and _in_bbox(plat, plon, bbox):
                if p.get("military"):
                    mil_count += 1
                else:
                    civ_count += 1
        mil_score = min(mil_count * 2.0, 16.0)

        # ── 4. Ship density inside the region ──
        ship_count = 0
        tanker_count = 0
        for s in ships:
            slat, slon = s.get("lat"), s.get("lon")
            if is_num(slat) and is_num(slon) and _in_bbox(slat, slon, bbox):
                ship_count += 1
                if str(s.get("type", "")).lower() in ("tanker", "lng carrier"):
                    tanker_count += 1
        ship_score = min(ship_count * 0.8 + tanker_count * 1.2, 10.0)

        # ── 5. GPS interference overlap ──
        gps_overlap = 0
        for z in (gps_data or []):
            zlat, zlon = z.get("lat", 0), z.get("lon", 0)
            if is_num(zlat) and is_num(zlon) and _in_bbox(zlat, zlon, bbox):
                gps_overlap += 1
        ew_score = gps_overlap * 4.0

        total = news_score + conflict_score + mil_score + ship_score + ew_score
        if total < 1.0:
            continue

        # Short signal list appended to the region's baseline event text.
        signals = []
        if news_hits:
            signals.append(f"{news_hits} intel mentions")
        if conflict_crit or conflict_high:
            parts = []
            if conflict_crit:
                parts.append(f"{conflict_crit} CRITICAL")
            if conflict_high:
                parts.append(f"{conflict_high} HIGH")
            signals.append(f"{'+'.join(parts)} events")
        if mil_count:
            signals.append(f"{mil_count} military aircraft tracked")
        if ship_count:
            desc = f"{ship_count} vessels"
            if tanker_count:
                desc += f" ({tanker_count} tankers)"
            signals.append(desc)
        if gps_overlap:
            signals.append(f"{gps_overlap} GPS jamming zone{'s' if gps_overlap > 1 else ''}")

        event = region["baseline"]
        if signals:
            event += " — " + ", ".join(signals[:3])

        prob_pct = max(25, min(95, int(30 + total * 2.5 + score * 1.5)))

        # Narrative reasoning built from the active signals.
        reason_parts = []
        if news_hits:
            shown_kws = matching_kws[:3]
            reason_parts.append(
                f"Multiple intelligence feeds ({news_hits}) reference "
                f"{', '.join(shown_kws) if shown_kws else 'this region'}, "
                f"indicating heightened international attention."
            )
        if conflict_crit:
            reason_parts.append(
                f"{conflict_crit} CRITICAL-severity conflict event{'s' if conflict_crit > 1 else ''} "
                f"detected via OSINT, suggesting active kinetic operations or imminent escalation."
            )
        if conflict_high:
            reason_parts.append(
                f"{conflict_high} HIGH-severity event{'s' if conflict_high > 1 else ''} "
                f"reported in the area, indicating elevated security posture."
            )
        if mil_count:
            reason_parts.append(
                f"ADS-B tracking shows {mil_count} military aircraft operating within the region"
                f"{f' alongside {civ_count} civilian flights' if civ_count > 20 else ''}, "
                f"{'which represents unusual concentration' if mil_count >= 5 else 'consistent with ongoing monitoring'}."
            )
        if ship_count:
            s_detail = f"{ship_count} vessels tracked"
            if tanker_count:
                s_detail += f" including {tanker_count} tanker{'s' if tanker_count > 1 else ''}"
            reason_parts.append(
                f"Maritime surveillance identifies {s_detail} in strategic waters, "
                f"{'raising chokepoint disruption concerns' if tanker_count >= 2 else 'indicating normal commerce flow'}."
            )
        if gps_overlap:
            reason_parts.append(
                f"Active GPS jamming/spoofing detected ({gps_overlap} zone{'s' if gps_overlap > 1 else ''}), "
                f"a strong indicator of electronic warfare activity in the area."
            )
        if not reason_parts:
            reason_parts.append(region["baseline"])

        reason = " ".join(reason_parts)

        scored.append((total, key, {
            "location": region["name"],
            "lat": region["lat"],
            "lon": region["lon"],
            "event": event[:200],
            "reason": reason[:500],
            "probability": f"{prob_pct}%",
        }))

    # Stable sort: equal totals keep HOTSPOT_REGIONS order.
    scored.sort(key=lambda x: x[0], reverse=True)
    predictions = [item for _, _, item in scored[:4]]

    # Fallback if nothing ranked
    if not predictions:
        predictions.append({
            "location": "Global Monitoring",
            "lat": 20.0, "lon": 10.0,
            "event": "No dominant hotspot; maintain broad surveillance",
            "reason": "Current intelligence signals do not indicate a concentrated threat in any single region. Continuing broad-spectrum monitoring across all sensor feeds.",
            "probability": _probability(score, 0),
        })

    return predictions
|
||
|
||
|
||
def _sanitize_prediction(item: dict, fallback_prob: str) -> dict | None:
|
||
if not isinstance(item, dict):
|
||
return None
|
||
lat = item.get("lat")
|
||
lon = item.get("lon")
|
||
try:
|
||
lat = float(lat)
|
||
lon = float(lon)
|
||
except Exception:
|
||
return None
|
||
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
|
||
return None
|
||
|
||
location = str(item.get("location", "Unknown Location")).strip()[:80]
|
||
event = str(item.get("event", "Potential activity shift")).strip()[:180]
|
||
prob = str(item.get("probability", fallback_prob)).strip()[:8]
|
||
if not re.match(r"^\d{1,3}%$", prob):
|
||
prob = fallback_prob
|
||
|
||
return {
|
||
"location": location or "Unknown Location",
|
||
"lat": lat,
|
||
"lon": lon,
|
||
"event": event or "Potential activity shift",
|
||
"reason": str(item.get("reason", "")).strip()[:500],
|
||
"probability": prob,
|
||
}
|
||
|
||
|
||
def _sanitize_llm_output(raw: dict, fallback_level: str, fallback_predictions: list[dict]) -> dict:
    """Validate an LLM response dict and merge it with heuristic fallbacks.

    - summary: stripped and truncated to 220 chars; a fixed notice when empty/null.
    - threat_level: upper-cased; replaced by *fallback_level* unless in THREAT_LEVELS.
    - predictions:
        - the first four entries are cleaned via _sanitize_prediction;
        - a blank reason is filled from the heuristic prediction with the same
          location (case-insensitive);
        - a missing or non-list "predictions" value is treated as empty;
        - with fewer than two valid predictions, *fallback_predictions* is used
          instead.

    Returns a dict with keys summary, threat_level and predictions.
    """
    summary = str(raw.get("summary") or "").strip()
    if not summary:
        summary = "AI produced no summary; heuristic assessment applied."
    summary = summary[:220]

    level = str(raw.get("threat_level") or "").upper().strip()
    if level not in THREAT_LEVELS:
        level = fallback_level

    # Lookup of heuristic reasons by location name, used to fill blank LLM reasons.
    heuristic_reasons: dict[str, str] = {}
    for fp in fallback_predictions:
        loc = str(fp.get("location", "")).lower().strip()
        if loc and fp.get("reason"):
            heuristic_reasons[loc] = fp["reason"]

    # Resolved once; guarded so an empty fallback list can't raise IndexError.
    fallback_prob = fallback_predictions[0]["probability"] if fallback_predictions else "50%"

    raw_predictions = raw.get("predictions")
    if not isinstance(raw_predictions, list):
        # e.g. "predictions": null or a bare object; slicing those would fail or misbehave.
        raw_predictions = []

    clean_predictions: list[dict] = []
    for p in raw_predictions[:4]:
        cleaned = _sanitize_prediction(p, fallback_prob)
        if not cleaned:
            continue
        if not cleaned.get("reason"):
            loc_key = str(cleaned.get("location", "")).lower().strip()
            cleaned["reason"] = heuristic_reasons.get(loc_key, "")
        clean_predictions.append(cleaned)

    if len(clean_predictions) < 2:
        clean_predictions = fallback_predictions

    return {
        "summary": summary,
        "threat_level": level,
        "predictions": clean_predictions,
    }
|
||
|
||
|
||
async def _query_ollama(prompt: str) -> dict | None:
    """POST *prompt* to the local Ollama generate endpoint and decode its JSON answer.

    Requests a non-streamed, JSON-formatted completion from MODEL_NAME (20 s timeout).

    Returns:
        The decoded object when the model's ``response`` text is a JSON object.
        ``None`` for a non-200 status, an empty response, or response text that
        is not a JSON object.

    Transport errors and a non-JSON HTTP body propagate to the caller.
    """
    request_body = {
        "model": MODEL_NAME,
        "prompt": prompt,
        "stream": False,
        "format": "json",
    }
    async with httpx.AsyncClient() as client:
        reply = await client.post(OLLAMA_URL, json=request_body, timeout=20.0)

    # The body is fully read by post(), so it is still available after the client closes.
    if reply.status_code != 200:
        return None

    text = reply.json().get("response", "")
    if not text:
        return None

    try:
        decoded = json.loads(text)
    except Exception:
        return None
    return decoded if isinstance(decoded, dict) else None
|
||
|
||
|
||
async def analyze_threat(
    news_items,
    gps_data=None,
    bgp_data=None,
    conflicts=None,
    space_weather=None,
    ships=None,
    planes=None,
):
    """Produce the full threat-assessment payload from all available feeds.

    Heuristics build:
      - a score and threat level,
      - hotspot predictions,
      - key drivers and regional briefs,
      - correlations and watch items.

    The configured backend (_CLI_BACKEND: a CLI tool, or Ollama) is then asked for
    a summary, level and predictions. LLM output is sanitized and merged with the
    heuristic sections. If the LLM returns nothing or raises, the heuristic result
    is returned with ``source="heuristic"``.

    Every argument may be None; news_items may be None when only conflict events
    are available.

    Returns:
        A dict with summary, threat_level, predictions, key_drivers,
        regional_briefs, correlations, watch_items, source and an ISO-8601 UTC
        generated_at.
    """
    # Normalise once so slicing/len() below never see None.
    news_items = news_items or []
    gps_data = gps_data or []
    bgp_data = bgp_data or {}
    conflicts = conflicts or []
    space_weather = space_weather or {}
    ships = ships or []
    planes = planes or []

    if not news_items and not conflicts:
        return {
            "summary": "Awaiting intelligence data...",
            "threat_level": "LOW",
            "predictions": [],
            "key_drivers": [],
            "regional_briefs": [],
            "correlations": [],
            "watch_items": ["Maintain standard surveillance posture across all domains"],
            "source": "heuristic",
            "generated_at": datetime.now(timezone.utc).isoformat(),
        }

    score, reasons = _score_from_signals(news_items, gps_data, bgp_data)

    # Boost score from conflicts (capped at +10).
    if conflicts:
        crit = sum(1 for c in conflicts if c.get("severity") == "CRITICAL")
        high = sum(1 for c in conflicts if c.get("severity") == "HIGH")
        conflict_bump = min(crit * 2 + high, 10)
        score += conflict_bump
        if conflict_bump:
            reasons.append(f"conflicts={crit}C/{high}H")

    base_level = _threat_from_score(score)
    base_predictions = _heuristic_predictions(
        news_items, gps_data, score,
        conflicts=conflicts,
        planes=planes,
        ships=ships,
    )

    # Enhanced intel sections (identical for the LLM and heuristic paths).
    regional = _regional_briefs(news_items, conflicts)
    drivers = _key_drivers(score, reasons, gps_data, bgp_data, conflicts, space_weather, planes=planes, ships=ships)
    corr = _correlations(gps_data, conflicts, bgp_data, space_weather)
    watch = _watch_items(conflicts, gps_data, space_weather)

    # Movement and hotspot context for the LLM prompt.
    mil_planes_total = sum(1 for p in planes if p.get("military"))
    ship_total = len(ships)
    movement_ctx = (
        f"Military aircraft tracked: {mil_planes_total}. "
        f"Vessels tracked: {ship_total}. "
    )
    hotspot_ctx = " | ".join(
        f"{p['location']}({p['probability']}): {p['event'][:60]}"
        for p in base_predictions[:3]
    )

    headlines = " | ".join(str(item.get('title', ''))[:80] for item in news_items[:6])
    prompt = (
        f"STRATCOM AI. Strict JSON only, no markdown.\n"
        f"News: {headlines}\n"
        f"Signals: GPS={len(gps_data)}, BGP={bgp_data.get('status','?')}, {movement_ctx.strip()}\n"
        f"Hotspots: {hotspot_ctx}\n"
        f'Return: {{"summary":"<1 sentence>","threat_level":"LOW|GUARDED|ELEVATED|HIGH|SEVERE",'
        f'"predictions":[{{"location":"","lat":0,"lon":0,"event":"","probability":"NN%","reason":""}}]}}'
    )

    try:
        if _CLI_BACKEND != "ollama":
            llm_raw = await _query_cli(prompt)
            source_name = _CLI_BACKEND
        else:
            llm_raw = await _query_ollama(prompt)
            source_name = MODEL_NAME
        if llm_raw:
            sanitized = _sanitize_llm_output(llm_raw, base_level, base_predictions)
            sanitized["source"] = source_name
            sanitized["generated_at"] = datetime.now(timezone.utc).isoformat()
            sanitized["key_drivers"] = drivers
            sanitized["regional_briefs"] = regional
            sanitized["correlations"] = corr
            sanitized["watch_items"] = watch
            return sanitized
    except Exception as exc:
        # Best-effort LLM enrichment: any failure falls through to the heuristic result.
        print(f"[AI] LLM path failed ({_CLI_BACKEND}), fallback engaged: {exc}")

    summary = f"{base_level} risk posture based on {len(news_items)} intelligence items"
    if conflicts:
        summary += f" + {len(conflicts)} military events"
    if reasons:
        summary += f"; key drivers: {', '.join(reasons[:4])}."
    else:
        summary += "."

    return {
        "summary": summary[:220],
        "threat_level": base_level,
        "predictions": base_predictions,
        "key_drivers": drivers,
        "regional_briefs": regional,
        "correlations": corr,
        "watch_items": watch,
        "source": "heuristic",
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
|
||
|
||
|
||
if __name__ == "__main__":
    # Manual smoke run: two headlines, one GPS anomaly (inside the "israel"
    # hotspot bbox) and elevated BGP churn. Uses whichever LLM backend was detected.
    sample_news = [
        {"title": "Missile strike reported near border region"},
        {"title": "Cyberattack impacts telecom routing"},
    ]
    report = asyncio.run(
        analyze_threat(
            sample_news,
            gps_data=[{"lat": 32.1, "lon": 35.0}],
            bgp_data={"status": "ELEVATED"},
        )
    )
    print(report)
|