# File-viewer metadata captured with this listing: 265 lines, no EOL, 8 KiB, Python.
import json
import os
import time
from datetime import datetime, timedelta

import pandas as pd
import requests
import yfinance as yf

# Connection settings for the remote admin SQL endpoint used by
# fetch_predictions().  Each value may be overridden through the environment
# so that real credentials do not have to live in the source file; the
# fallbacks are the values that were previously hard-coded here.
BASE_URL = os.environ.get("DURIIN_BASE_URL", "https://duriin.imbenji.net")
USERNAME = os.environ.get("DURIIN_USERNAME", "admin")
PASSWORD = os.environ.get("DURIIN_PASSWORD", "changeme")

# Query posted to the admin SQL endpoint by fetch_predictions().
#
# Returns at most 500 predictions, newest event_date first, joined to their
# company row.  Only 'positive'/'negative' directions inside the hard-coded
# 2020-01-01 .. 2026-01-01 window are kept.  Tickers are dropped when they
# contain a dot, contain one of the listed names, or are longer than five
# characters (presumably placeholders for companies without a US listing --
# confirm against the tracked_companies data).
SQL_QUERY = """
SELECT
    ep.id,
    ep.event_date,
    ep.direction,
    ep.magnitude,
    ep.timeframe,
    substr(ep.rationale, 1, 100) as rationale,
    tc.name,
    tc.ticker
FROM event_predictions ep
JOIN tracked_companies tc ON ep.company_id = tc.id
WHERE ep.event_date >= '2020-01-01'
  AND ep.event_date <= '2026-01-01'
  AND ep.direction IN ('positive', 'negative')
  AND tc.ticker NOT LIKE '%.%'
  AND tc.ticker NOT LIKE '%ORIGIN%'
  AND tc.ticker NOT LIKE '%PRIVATE%'
  AND tc.ticker NOT LIKE '%DEEPSEEK%'
  AND tc.ticker NOT LIKE '%ANTHROPIC%'
  AND tc.ticker NOT LIKE '%OPENAI%'
  AND tc.ticker NOT LIKE '%BYTEDANCE%'
  AND tc.ticker NOT LIKE '%HUAWEI%'
  AND tc.ticker NOT LIKE '%SCALEAI%'
  AND tc.ticker NOT LIKE '%MISTRAL%'
  AND tc.ticker NOT LIKE '%COHERE%'
  AND tc.ticker NOT LIKE '%GROQ%'
  AND tc.ticker NOT LIKE '%INFLECTION%'
  AND tc.ticker NOT LIKE '%STABILITY%'
  AND tc.ticker NOT LIKE '%SPACEX%'
  AND tc.ticker NOT LIKE '%MCKINSEY%'
  AND tc.ticker NOT LIKE '%DELOITTE%'
  AND tc.ticker NOT LIKE '%XAI%'
  AND length(tc.ticker) <= 5
ORDER BY ep.event_date DESC
LIMIT 500
"""

def fetch_predictions():
    """Run SQL_QUERY on the remote admin SQL endpoint and return its rows.

    The query is POSTed as JSON to ``{BASE_URL}/admin/api/sql`` against the
    "intelligence" database, using HTTP basic auth and a 30 second timeout.

    Returns:
        The rows extracted from the JSON response (see _rows_from_payload).

    Raises:
        RuntimeError: if the first result reports an SQL error, or the
            response does not have a recognised shape.
        Whatever ``resp.raise_for_status()`` raises for an error status.
    """
    resp = requests.post(
        f"{BASE_URL}/admin/api/sql",
        json={"sql": SQL_QUERY, "database": "intelligence"},
        auth=(USERNAME, PASSWORD),
        timeout=30,
    )
    resp.raise_for_status()
    return _rows_from_payload(resp.json())


def _rows_from_payload(data):
    """Extract the row list from a decoded response of the SQL endpoint."""
    # Expected shape: {"results": [{"sql": "...", "rows": [...]}], "elapsed": 0}
    # A bare list, or a dict with a top-level "rows" key, is accepted as well.
    if isinstance(data, list):
        return data

    if isinstance(data, dict):
        results = data.get("results")
        if isinstance(results, list) and results:
            first = results[0]
            if "error" in first:
                raise RuntimeError(f"SQL error: {first['error']}")
            return first.get("rows", [])
        if "rows" in data:
            return data["rows"]

    # Anything else used to be returned unchanged, which made the caller fail
    # later with an unrelated error while iterating; fail here instead.
    raise RuntimeError(
        f"Unexpected response shape from SQL endpoint: {type(data).__name__}"
    )

# Memo for get_price(): (ticker, date_str) -> close price, or None when the
# lookup produced nothing.  Failures are cached too, so a bad ticker/date is
# only requested once per run.
price_cache = {}


def get_price(ticker, date_str):
    """Return the adjusted close of *ticker* around *date_str*, or None.

    Price history is requested for the window from 5 days before to 5 days
    after the date.  The close of the first bar on or after the date is used;
    if the window holds no such bar, the last bar before the date is used
    instead.  Results (including None) are memoised in ``price_cache``.

    Args:
        ticker: symbol passed to ``yf.Ticker``.
        date_str: target date formatted as "YYYY-MM-DD".

    Returns:
        The close price as a float, or None when the window is empty or the
        lookup raised.
    """
    key = (ticker, date_str)
    if key in price_cache:
        return price_cache[key]

    try:
        dt = datetime.strptime(date_str, "%Y-%m-%d")
        hist = yf.Ticker(ticker).history(
            start=(dt - timedelta(days=5)).strftime("%Y-%m-%d"),
            end=(dt + timedelta(days=5)).strftime("%Y-%m-%d"),
            auto_adjust=True,
        )
        price = None if hist.empty else _closest_close(hist, pd.Timestamp(dt))
    except Exception:
        # Deliberately best-effort: a malformed date or any failure inside the
        # data provider marks this (ticker, date) as unavailable rather than
        # aborting the whole run.
        price = None

    price_cache[key] = price
    return price


def _closest_close(hist, target):
    """Pick the close on the first bar at/after *target*, else the last bar before it."""
    index = hist.index
    if index.tz is not None:
        # Compare against the naive target timestamp.
        index = index.tz_localize(None)

    closes = hist["Close"]
    on_or_after = closes[index >= target]
    if not on_or_after.empty:
        return float(on_or_after.iloc[0])
    # No bar on or after the target: every row precedes it, so the final row
    # is the nearest one (assumes rows are in ascending date order, as the
    # first-row pick above does as well).
    return float(closes.iloc[-1])

def add_trading_days(date_str, n):
    """Return the date *n* weekdays after *date_str*, formatted "YYYY-MM-DD".

    Only Saturdays and Sundays are skipped; market holidays are not taken
    into account.  With ``n <= 0`` the input date is returned unchanged.

    Args:
        date_str: start date formatted as "YYYY-MM-DD".
        n: number of Monday-to-Friday days to advance.

    Raises:
        ValueError: if *date_str* does not match "%Y-%m-%d".
    """
    dt = datetime.strptime(date_str, "%Y-%m-%d")
    remaining = n
    while remaining > 0:
        dt += timedelta(days=1)
        if dt.weekday() < 5:  # Monday=0 .. Friday=4
            remaining -= 1
    return dt.strftime("%Y-%m-%d")

def main():
    """Backtest the fetched predictions against realised price moves.

    For every prediction the close on the event date is compared with the
    close 5, 10 and 20 weekdays later.  A prediction counts as correct at a
    horizon when the return has the predicted sign ("positive" -> above zero,
    anything else -> below zero).  Prints overall accuracy, 10-day accuracy
    broken down by magnitude, direction and timeframe, a sample table of up to
    30 rows, and writes every evaluated row to ``backtest_results.csv``.
    """
    print("Fetching predictions from remote...")
    preds = fetch_predictions()
    print(f"Got {len(preds)} predictions\n")

    results = []
    skipped = 0

    for i, p in enumerate(preds):
        if i > 0 and i % 50 == 0:
            print(f" Progress: {i}/{len(preds)} — skipped so far: {skipped}")

        ticker = p["ticker"]
        event_date = p["event_date"][:10]  # trim time if present
        direction = p["direction"]

        date_5d = add_trading_days(event_date, 5)
        date_10d = add_trading_days(event_date, 10)
        date_20d = add_trading_days(event_date, 20)

        price_0 = get_price(ticker, event_date)
        # A missing base price cannot be evaluated; a zero one would divide
        # by zero in the return calculation.
        if not price_0:
            skipped += 1
            continue

        price_5 = get_price(ticker, date_5d)
        price_10 = get_price(ticker, date_10d)
        price_20 = get_price(ticker, date_20d)

        r5 = _pct_return(price_5, price_0)
        r10 = _pct_return(price_10, price_0)
        r20 = _pct_return(price_20, price_0)

        # Key order defines the CSV column order.
        results.append({
            "id": p["id"],
            "ticker": ticker,
            "name": p["name"],
            "event_date": event_date,
            "direction": direction,
            "magnitude": p.get("magnitude", ""),
            "timeframe": p.get("timeframe", ""),
            "rationale": p.get("rationale", ""),
            "price_0": round(price_0, 4),
            "price_5d": _round4(price_5),
            "price_10d": _round4(price_10),
            "price_20d": _round4(price_20),
            "5d_return": _round4(r5),
            "10d_return": _round4(r10),
            "20d_return": _round4(r20),
            "correct_5d": _is_correct(r5, direction),
            "correct_10d": _is_correct(r10, direction),
            "correct_20d": _is_correct(r20, direction),
        })

    df = pd.DataFrame(results)

    print(f"\n{'='*60}")
    print("BACKTEST RESULTS")
    print(f"{'='*60}")
    print(f"Total predictions fetched: {len(preds)}")
    print(f"Skipped (no price data): {skipped}")
    print(f"Evaluated: {len(df)}")
    print(f"Random baseline: 50.0%")
    print()

    if df.empty:
        # With no rows the frame has no columns, so every lookup below would
        # raise KeyError.
        print("No predictions could be evaluated; nothing to report.")
        return

    a5, n5 = _accuracy(df, "correct_5d")
    a10, n10 = _accuracy(df, "correct_10d")
    a20, n20 = _accuracy(df, "correct_20d")

    print("OVERALL DIRECTIONAL ACCURACY")
    print(f" 5-day: {a5:.1f}% (n={n5})")
    print(f" 10-day: {a10:.1f}% (n={n10})")
    print(f" 20-day: {a20:.1f}% (n={n20})")
    print()

    # by magnitude
    print("BY MAGNITUDE (10-day accuracy)")
    _print_breakdown(df, "magnitude", sorted(df["magnitude"].dropna().unique()), 12)
    print()

    # by direction -- must use the labels stored in the data
    print("BY DIRECTION (10-day accuracy)")
    _print_breakdown(df, "direction", ["positive", "negative"], 8)
    print()

    # by timeframe
    print("BY TIMEFRAME (10-day accuracy)")
    _print_breakdown(df, "timeframe", sorted(df["timeframe"].dropna().unique()), 12)
    print()

    # sample table
    sample = df[df["correct_10d"].notna()].head(30)
    print("SAMPLE (30 predictions)")
    print(f"{'Company':<12} {'Date':<12} {'Dir':<5} {'Mag':<8} {'5d%':>7} {'10d%':>7} {'20d%':>7} Correct@10d")
    print("-" * 75)
    for _, row in sample.iterrows():
        r5s = _fmt_return(row["5d_return"])
        r10s = _fmt_return(row["10d_return"])
        r20s = _fmt_return(row["20d_return"])
        ok = "YES" if row["correct_10d"] else "NO"
        name_short = str(row["ticker"])[:11]
        print(f"{name_short:<12} {row['event_date']:<12} {row['direction']:<5} {str(row['magnitude']):<8} {r5s:>7} {r10s:>7} {r20s:>7} {ok}")

    df.to_csv("backtest_results.csv", index=False)
    print(f"\nFull results saved to backtest_results.csv ({len(df)} rows)")


def _pct_return(px, base):
    """Percentage change from *base* to *px*, or None when *px* is missing."""
    if px is None:
        return None
    return (px - base) / base * 100


def _is_correct(ret, direction):
    """Whether *ret* has the predicted sign; None when *ret* is missing."""
    if ret is None:
        return None
    if direction == "positive":
        return ret > 0
    return ret < 0


def _round4(value):
    """Round to four decimals, passing None through (0.0 is a valid value)."""
    return round(value, 4) if value is not None else None


def _accuracy(df, col):
    """Share of True values (in percent) and row count for a correct_* column."""
    sub = df[df[col].notna()]
    if sub.empty:
        return 0, 0
    return sub[col].astype(bool).mean() * 100, len(sub)


def _print_breakdown(df, column, values, width):
    """Print 10-day accuracy for each of *values* found in *column*."""
    for value in values:
        sub = df[(df[column] == value) & df["correct_10d"].notna()]
        if sub.empty:
            continue
        pct = sub["correct_10d"].astype(bool).mean() * 100
        print(f" {value:<{width}} {pct:.1f}% (n={len(sub)})")


def _fmt_return(value):
    """Format a return for the sample table; missing values show as N/A."""
    # Missing returns come back from the DataFrame as NaN, not None.
    return f"{value:+.2f}" if pd.notna(value) else "N/A"

# Run the backtest only when executed as a script, not when imported.
if __name__ == "__main__":
    main()