# File-viewer banner (extraction residue): 278 lines, 8.2 KiB, Python
import json
import os
from datetime import datetime, timedelta

import pandas as pd
import requests
import yfinance as yf
|
|
|
|
# Connection settings for the remote admin SQL endpoint.  Each value can be
# overridden through the environment so that real credentials never have to be
# committed; the fallbacks are the values this script originally hard-coded.
BASE_URL = os.environ.get("DURIIN_BASE_URL", "https://duriin.imbenji.net")
USERNAME = os.environ.get("DURIIN_USERNAME", "admin")
PASSWORD = os.environ.get("DURIIN_PASSWORD", "changeme")
|
|
|
|
# Upper-case markers of companies without a public listing.  is_private()
# treats a company as private when any of these occurs as a substring of its
# upper-cased ticker or name.
PRIVATE_KEYWORDS = {
    "OPENAI", "ANTHROPIC", "SPACEX", "BYTEDANCE", "HUAWEI", "DEEPSEEK",
    "MISTRAL", "COHERE", "GROQ", "INFLECTION", "STABILITY", "SCALEAI",
    "MCKINSEY", "DELOITTE", "BLUEORIGIN", "XAI", "HF", "RF", "DJI",
    "CBRS", "PRIVATE", "ORIGIN",
}
|
|
|
|
# Tickers that need an exchange suffix appended before they can be priced:
# bare ticker as stored in the database -> suffixed symbol handed to yfinance.
EXCHANGE_MAP = {
    "005930": "005930.KS",
    "000660": "000660.KS",
    "002594": "002594.SZ",
    "688981": "688981.SS",
    "1810": "1810.HK",
    "0700": "0700.HK",
    "9984": "9984.T",
    "AIR": "AIR.PA",
    "DTE": "DTE.DE",
    "HSBA": "HSBA.L",
    "RHM": "RHM.DE",
    "VOW": "VOW.DE",
}
|
|
|
|
# Directional predictions ('positive' / 'negative' only) with an event date
# from 2020-01-01 through 2026-01-01, joined to their tracked company,
# newest first and capped at 1000 rows.
SQL_QUERY = """
SELECT
    ep.id,
    ep.event_date,
    ep.direction,
    ep.magnitude,
    ep.timeframe,
    tc.name,
    tc.ticker
FROM event_predictions ep
JOIN tracked_companies tc ON ep.company_id = tc.id
WHERE ep.event_date >= '2020-01-01'
  AND ep.event_date <= '2026-01-01'
  AND ep.direction IN ('positive', 'negative')
ORDER BY ep.event_date DESC
LIMIT 1000
"""
|
|
|
|
|
|
def fetch_predictions():
    """Run SQL_QUERY against the remote admin SQL endpoint and return its rows.

    The query is POSTed as JSON to ``{BASE_URL}/admin/api/sql`` for the
    ``intelligence`` database using HTTP basic auth, and the first entry of
    the response's ``results`` list is read.

    Returns:
        The ``rows`` value of the first result, or an empty list when that
        result has no rows.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the response carries no result set, or the result
            reports an SQL error.
    """
    resp = requests.post(
        f"{BASE_URL}/admin/api/sql",
        json={"sql": SQL_QUERY, "database": "intelligence"},
        auth=(USERNAME, PASSWORD),
        timeout=30,
    )
    resp.raise_for_status()

    payload = resp.json()
    # Guard the indexing: an empty or missing "results" list would otherwise
    # surface as a bare IndexError/KeyError with no hint about the cause.
    result_sets = payload.get("results") if isinstance(payload, dict) else None
    if not result_sets:
        raise RuntimeError("SQL endpoint returned no result set")

    first = result_sets[0]
    if "error" in first:
        raise RuntimeError(f"SQL error: {first['error']}")
    # "rows" may be absent or null; callers always get a list.
    return first.get("rows") or []
|
|
|
|
|
|
def resolve_ticker(raw):
    """Turn a stored ticker into the symbol used for price lookups.

    Surrounding whitespace is stripped.  A ticker that already contains a dot
    (e.g. ``AIR.PA``, ``005930.KS``) is taken to carry its exchange suffix and
    is returned as is; otherwise an explicit override from EXCHANGE_MAP is
    applied when one exists, and the stripped ticker is returned unchanged
    when it does not.
    """
    symbol = raw.strip()

    # Already has a dot suffix: nothing to resolve.
    if "." in symbol:
        return symbol

    # Explicit override, falling back to the ticker itself.
    return EXCHANGE_MAP.get(symbol, symbol)
|
|
|
|
|
|
def is_private(ticker, name):
    """Heuristically decide whether a company cannot be priced as a public stock.

    Args:
        ticker: Ticker as stored for the company; may be None or blank.
        name: Company name; may be None.

    Returns:
        True when the ticker is missing or blank, when any PRIVATE_KEYWORDS
        entry occurs as a substring of the upper-cased ticker or name, or when
        the ticker looks like a placeholder (more than six characters, letters
        only).  False otherwise.
    """
    # A missing ticker used to crash on ``.upper()``.  Such rows cannot be
    # priced at all, so they are filtered out together with private companies.
    symbol = (ticker or "").strip().upper()
    if not symbol:
        return True

    company = (name or "").upper()
    # NOTE(review): this is plain substring matching, so short keywords can
    # hit unrelated public names ("RF" in "NORFOLK", "COHERE" in "COHERENT").
    if any(kw in symbol or kw in company for kw in PRIVATE_KEYWORDS):
        return True

    # Looks like a private-company placeholder: long, letters only, no digits.
    return len(symbol) > 6 and symbol.isalpha()
|
|
|
|
|
|
# (ticker, date_str) -> close price, or None when the lookup failed.
# Failures are cached too, so a symbol without data is only requested once.
price_cache = {}


def _pick_close(hist, target):
    """Return the close nearest to ``target`` from a price-history frame.

    Prefers the first session on or after ``target``; when the window holds
    none, falls back to the latest session before it.  Rows without a close
    are ignored.  Returns None when no usable close remains.
    """
    closes = hist["Close"].dropna()
    if closes.empty:
        return None

    # Compare on naive timestamps; ``target`` is built from a naive datetime.
    if getattr(closes.index, "tz", None) is not None:
        closes.index = closes.index.tz_localize(None)

    on_or_after = closes[closes.index >= target]
    if not on_or_after.empty:
        return float(on_or_after.iloc[0])
    # The index is chronological, so the last row is the most recent earlier
    # close (the first row would be the oldest one in the window).
    return float(closes.iloc[-1])


def get_price(ticker, date_str):
    """Return the adjusted close for ``ticker`` on or near ``date_str``.

    History is requested from yfinance for a window of seven days either side
    of the date, and the close of the first session on or after the date is
    used, falling back to the latest earlier session in the window.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.
        date_str: Target date formatted ``YYYY-MM-DD``.

    Returns:
        The close as a float, or None when the date cannot be parsed, the
        download fails, or the window holds no usable close.  Results,
        including None, are memoized in ``price_cache``.
    """
    key = (ticker, date_str)
    if key in price_cache:
        return price_cache[key]

    price = None
    try:
        day = datetime.strptime(date_str, "%Y-%m-%d")
        window = timedelta(days=7)
        hist = yf.Ticker(ticker).history(
            start=(day - window).strftime("%Y-%m-%d"),
            end=(day + window).strftime("%Y-%m-%d"),
            auto_adjust=True,
        )
        if not hist.empty:
            price = _pick_close(hist, pd.Timestamp(day))
    except Exception:
        # Deliberate best-effort: any failure (bad date, download or parsing
        # error) means "no price" for this lookup rather than a crash.
        price = None

    price_cache[key] = price
    return price
|
|
|
|
|
|
def add_trading_days(date_str, n):
    """Return the date ``n`` weekdays after ``date_str``, as ``YYYY-MM-DD``.

    Only Saturdays and Sundays are skipped; exchange holidays are not known
    here and count as trading days.  The start date itself is never counted,
    and ``n <= 0`` returns the start date unchanged.

    Raises:
        ValueError: If ``date_str`` is not formatted ``YYYY-MM-DD``.
    """
    day = datetime.strptime(date_str, "%Y-%m-%d")
    remaining = n
    while remaining > 0:
        day += timedelta(days=1)
        if day.weekday() < 5:  # Monday=0 ... Friday=4
            remaining -= 1
    return day.strftime("%Y-%m-%d")
|
|
|
|
|
|
def main():
    """Backtest the stored directional predictions and report their accuracy.

    Fetches the predictions, drops companies without a usable public ticker,
    prices each remaining prediction on its event date and 5, 10 and 20
    weekdays later, prints accuracy summaries, and writes every evaluated row
    to ``backtest_results_full.csv``.
    """
    print("Fetching predictions from remote...")
    raw_preds = fetch_predictions()
    print(f"Got {len(raw_preds)} predictions from DB")

    # Filter private companies.  Rows without a ticker cannot be priced
    # either, so they are dropped here instead of crashing further down.
    preds = [
        p for p in raw_preds
        if p["ticker"] and not is_private(p["ticker"], p.get("name", ""))
    ]
    print(f"After filtering private companies: {len(preds)}\n")

    results = []
    skipped = 0
    for i, p in enumerate(preds):
        if i > 0 and i % 50 == 0:
            print(f" Progress: {i}/{len(preds)} — evaluated: {len(results)}, skipped: {skipped}")
        row = _evaluate_prediction(p)
        if row is None:
            skipped += 1
        else:
            results.append(row)

    # Explicit columns keep the frame usable (and the CSV header intact) even
    # when not a single prediction could be evaluated.
    df = pd.DataFrame(results, columns=_RESULT_COLUMNS)

    _print_report(df, fetched=len(raw_preds), filtered=len(preds), skipped=skipped)

    df.to_csv("backtest_results_full.csv", index=False)
    print(f"\nFull results saved to backtest_results_full.csv ({len(df)} rows)")


# Evaluation horizons, in weekdays after the event date.
_HORIZONS = (5, 10, 20)

# Column order of the result frame and of the CSV written by main().
_RESULT_COLUMNS = [
    "id", "ticker", "name", "event_date", "direction", "magnitude",
    "timeframe", "price_0", "price_5d", "price_10d", "price_20d",
    "5d_return", "10d_return", "20d_return",
    "correct_5d", "correct_10d", "correct_20d",
]


def _clean_price(price):
    """Map a missing or NaN price to None; pass any other price through."""
    if price is None or pd.isna(price):
        return None
    return price


def _evaluate_prediction(p):
    """Build the result row for one prediction, or None without a baseline price."""
    ticker = resolve_ticker(p["ticker"])
    event_date = p["event_date"][:10]
    direction = p["direction"]

    price_0 = _clean_price(get_price(ticker, event_date))
    # A missing baseline cannot be evaluated; a zero one would divide by zero.
    if not price_0:
        return None

    row = {
        "id": p["id"],
        "ticker": ticker,
        "name": p["name"],
        "event_date": event_date,
        "direction": direction,
        "magnitude": p.get("magnitude", ""),
        "timeframe": p.get("timeframe", ""),
        "price_0": round(price_0, 4),
    }
    for days in _HORIZONS:
        later = _clean_price(get_price(ticker, add_trading_days(event_date, days)))
        if later is None:
            pct = None
            hit = None
        else:
            pct = (later - price_0) / price_0 * 100
            # A flat move (0%) counts as wrong for either direction.
            hit = (pct > 0) if direction == "positive" else (pct < 0)
        row[f"price_{days}d"] = None if later is None else round(later, 4)
        row[f"{days}d_return"] = None if pct is None else round(pct, 4)
        row[f"correct_{days}d"] = hit
    return row


def _accuracy(df, col):
    """Return (accuracy in percent, sample size) over the scored rows of ``col``."""
    scored = df[col].dropna()
    if scored.empty:
        return 0.0, 0
    return scored.astype(float).mean() * 100, len(scored)


def _print_breakdown(df, title, column, values, width):
    """Print the 10-day accuracy for each of ``values`` found in ``column``."""
    print(title)
    for value in values:
        sub = df[(df[column] == value) & df["correct_10d"].notna()]
        if sub.empty:
            continue
        pct = sub["correct_10d"].astype(float).mean() * 100
        print(f" {value:<{width}} {pct:.1f}% (n={len(sub)})")
    print()


def _format_pct(value):
    """Format a percentage return with sign, or 'N/A' when it is missing."""
    # Missing returns arrive as NaN once stored in a DataFrame, so an
    # ``is not None`` test would never produce "N/A".
    return f"{value:+.2f}" if pd.notna(value) else "N/A"


def _print_report(df, fetched, filtered, skipped):
    """Print the summary, the accuracy breakdowns and a sample of rows."""
    print(f"\n{'='*62}")
    print("BACKTEST RESULTS — ALL TRACKED COMPANIES")
    print(f"{'='*62}")
    print(f"Total predictions fetched: {fetched}")
    print(f"After private-company filter: {filtered}")
    print(f"Skipped (no price data): {skipped}")
    print(f"Evaluated: {len(df)}")
    print("Random baseline: 50.0%")
    print()

    print("OVERALL DIRECTIONAL ACCURACY")
    for days in _HORIZONS:
        pct, count = _accuracy(df, f"correct_{days}d")
        print(f" {days}-day: {pct:.1f}% (n={count})")
    print()

    _print_breakdown(df, "BY MAGNITUDE (10-day accuracy)", "magnitude",
                     ["low", "medium", "high"], 10)
    _print_breakdown(df, "BY DIRECTION (10-day accuracy)", "direction",
                     ["positive", "negative"], 12)
    _print_breakdown(df, "BY TIMEFRAME (10-day accuracy)", "timeframe",
                     ["short", "medium", "long"], 10)

    # Rows keep the order in which the predictions were fetched.
    sample = df[df["correct_10d"].notna()].head(30)
    print("SAMPLE (30 most recent predictions)")
    print(f"{'Ticker':<12} {'Date':<12} {'Dir':<10} {'Mag':<8} {'5d%':>7} {'10d%':>7} {'20d%':>7} @10d")
    print("-" * 72)
    for _, row in sample.iterrows():
        r5s = _format_pct(row["5d_return"])
        r10s = _format_pct(row["10d_return"])
        r20s = _format_pct(row["20d_return"])
        ok = "YES" if row["correct_10d"] else "NO"
        print(f"{str(row['ticker']):<12} {row['event_date']:<12} {row['direction']:<10} {str(row['magnitude']):<8} {r5s:>7} {r10s:>7} {r20s:>7} {ok}")
|
|
|
|
|
|
# Run the backtest only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|