""" 期货智析接口 - 提供期货分析数据 """ import json import logging from pathlib import Path from typing import Optional from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from app.database import get_db from app.analysis_db import get_analysis_db from app.analysis_models import FuturesAnalysis, WatchedSymbol, AIModelConfig, AnalysisSettings from app.services.cache import get_cached_data, get_latest_cached logger = logging.getLogger(__name__) router = APIRouter(prefix="/futures", tags=["期货智析"]) CONFIG_DIR = Path(__file__).resolve().parent.parent.parent / "config" SYMBOLS_CONFIG_FILE = CONFIG_DIR / "symbols_config.json" def _load_symbols_config() -> dict: """加载品种配置文件""" if not SYMBOLS_CONFIG_FILE.exists(): return {"futures": {}, "stock": {}} with open(SYMBOLS_CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) @router.get("/list") def get_futures_list(db: Session = Depends(get_db)): """获取所有期货品种列表及摘要信息(从symbols_config.json读取)""" config = _load_symbols_config() futures_config = config.get("futures", {}) if not futures_config: return {"success": True, "data": []} futures_data = [] for name, symbol_code in futures_config.items(): cached = get_cached_data(db, symbol_code, "futures") if cached and cached.get("timeframes"): all_candles = [] for period, candles in cached.get("timeframes", {}).items(): all_candles.extend(candles) if all_candles: latest_candle = all_candles[-1] open_price = float(latest_candle.get("open", 0)) close_price = float(latest_candle.get("close", 0)) high_price = float(latest_candle.get("high", 0)) low_price = float(latest_candle.get("low", 0)) change = close_price - open_price change_pct = (change / open_price * 100) if open_price > 0 else 0 futures_data.append({ "symbol": symbol_code, "name": name, "price": close_price, "change": round(change, 2), "changePct": round(change_pct, 2), "suggestion": _get_suggestion(close_price, open_price, change_pct), "suggestionType": "up" if change >= 0 else "down", "periods": _get_period_trends(all_candles), "successRate": _calc_success_rate(all_candles), "trendScore": _calc_trend_score(all_candles), "resistance": round(2 * ((high_price + low_price + close_price) / 3) - low_price, 2), "support": round(2 * ((high_price + low_price + close_price) / 3) - high_price, 2), "open": open_price, "high": high_price, "low": low_price, "volume": sum(float(c.get("volume", 0)) for c in all_candles) }) else: futures_data.append({ "symbol": symbol_code, "name": name, "price": 0, "change": 0, "changePct": 0, "suggestion": "等待数据", "suggestionType": "neutral", "periods": {"5": "neutral", "15": "neutral", "30": "neutral", "60": "neutral"}, "successRate": 0, "trendScore": 0, "resistance": 0, "support": 0, "open": 0, "high": 0, "low": 0, "volume": 0 }) return {"success": True, "data": futures_data} @router.get("/detail/{symbol}") def get_futures_detail(symbol: str, db: Session = Depends(get_db)): """获取指定期货品种的详细分析数据""" cached = get_cached_data(db, symbol, "futures") if not cached: raise HTTPException(status_code=404, detail=f"未找到 {symbol} 的缓存数据") all_candles = [] for period, candles in cached.get("timeframes", {}).items(): all_candles.extend(candles) if not all_candles: raise HTTPException(status_code=404, detail=f"未找到 {symbol} 的K线数据") latest_candle = all_candles[-1] open_price = float(latest_candle.get("open", 0)) close_price = float(latest_candle.get("close", 0)) high_price = float(latest_candle.get("high", 0)) low_price = float(latest_candle.get("low", 0)) change = close_price - open_price change_pct = (change / open_price * 100) if open_price > 0 else 0 # Pivot Point 公式计算关键点位 pp = (high_price + low_price + close_price) / 3 r1 = round(2 * pp - low_price, 2) r2 = round(pp + (high_price - low_price), 2) s1 = round(2 * pp - high_price, 2) s2 = round(pp - (high_price - low_price), 2) suggestion = _get_suggestion(close_price, open_price, change_pct) suggestion_type = "up" if change >= 0 else "down" trend_score = _calc_trend_score(all_candles) data = { "symbol": symbol, "name": _get_futures_name(symbol), "price": close_price, "change": round(change, 2), "changePct": round(change_pct, 2), "suggestion": suggestion, "suggestionType": suggestion_type, "suggestionReason": _get_suggestion_reason(symbol, suggestion), "open": open_price, "high": high_price, "low": low_price, "volume": sum(float(c.get("volume", 0)) for c in all_candles), "entryPrice": round(close_price * 0.995, 2) if change >= 0 else round(close_price * 1.005, 2), "targetPrice": r1 if change >= 0 else s1, "stopLoss": s1 if change >= 0 else r1, "riskLevel": "低" if trend_score >= 80 else "中" if trend_score >= 60 else "高", "macd": _calc_macd(all_candles), "rsi": _calc_rsi(all_candles), "boll": _calc_boll(all_candles), "kdj": _calc_kdj(all_candles), "resistances": [r1, r2], "supports": [s1, s2], "pivotPoint": round(pp, 2), "periodConsistency": _get_period_trends(all_candles) } return {"success": True, "data": data} @router.get("/kline/{symbol}") def get_kline_data(symbol: str, period: str = "15", db: Session = Depends(get_db)): """获取指定品种和周期的K线数据""" period_map = { "5": "5min", "15": "15min", "30": "30min", "60": "60min", "1440": "daily", "daily": "daily" } db_period = period_map.get(period, f"{period}min") cached = get_cached_data(db, symbol, "futures", [db_period]) if not cached or not cached.get("timeframes"): raise HTTPException(status_code=404, detail=f"未找到 {symbol} {db_period} 的缓存数据") candles = cached["timeframes"].get(db_period, []) kline_data = [] for c in candles: time_str = c.get("datetime", c.get("time", "")) if time_str and len(time_str) >= 16: time_str = time_str[:16].replace("T", " ") kline_data.append([ time_str, str(c.get("open", 0)), str(c.get("close", 0)), str(c.get("low", 0)), str(c.get("high", 0)), str(int(c.get("volume", 0))) ]) return {"success": True, "data": kline_data} def _get_futures_name(symbol: str) -> str: """根据合约代码获取品种名称""" name_map = { "AU": "黄金", "AG": "白银", "CU": "铜", "AL": "铝", "ZN": "锌", "NI": "镍", "SN": "锡", "PB": "铅", "RB": "螺纹钢", "HC": "热卷", "I": "铁矿石", "J": "焦炭", "JM": "焦煤", "ZC": "动力煤", "MA": "甲醇", "TA": "PTA", "EG": "乙二醇", "PP": "聚丙烯", "L": "塑料", "V": "PVC", "M": "豆粕", "RM": "菜粕", "C": "玉米", "CS": "淀粉", "A": "豆一", "B": "豆二", "Y": "豆油", "P": "棕榈油", "OI": "菜油", "CF": "棉花", "SR": "白糖", "AP": "苹果", "JD": "鸡蛋", "LH": "生猪", "FU": "燃料油", "LU": "低硫燃油", "SC": "原油", "EC": "集运指数", "BU": "沥青", "RU": "橡胶", "NR": "20号胶", "SP": "纸浆", "SS": "不锈钢", "SA": "纯碱", "FG": "玻璃", "UR": "尿素", "SF": "硅铁", "SM": "锰硅", "IF": "沪深300", "IC": "中证500", "IH": "上证50", "IM": "中证1000", "T": "10年期国债", "TF": "5年期国债", "TS": "2年期国债", "TL": "30年期国债", } return name_map.get(symbol, symbol) def _get_suggestion(close: float, open: float, change_pct: float) -> str: """根据价格走势给出操作建议""" if change_pct > 2: return "逢低做多" elif change_pct > 0.5: return "逢低做多" elif change_pct > -0.5: return "观望等待" elif change_pct > -2: return "逢高做空" else: return "逢高做空" def _get_suggestion_reason(symbol: str, suggestion: str) -> str: """获取建议理由""" reasons = { "逢低做多": "技术面突破,趋势明确,建议逢低介入", "逢高做空": "技术面走弱,下行压力增大", "观望等待": "多空力量均衡,等待方向明确" } return reasons.get(suggestion, "等待进一步信号") def _get_period_trends(candles: list) -> dict: """计算各周期趋势 - 根据不同周期取不同长度的K线计算""" period_config = { "5": {"bars": 10, "threshold": 0.003}, "15": {"bars": 15, "threshold": 0.005}, "30": {"bars": 20, "threshold": 0.008}, "60": {"bars": 30, "threshold": 0.01} } result = {} for period, cfg in period_config.items(): bars = cfg["bars"] threshold = cfg["threshold"] if len(candles) < bars: result[period] = "neutral" continue recent = candles[-bars:] first_close = float(recent[0].get("close", 0)) last_close = float(recent[-1].get("close", 0)) if first_close <= 0: result[period] = "neutral" continue change_pct = (last_close - first_close) / first_close if change_pct > threshold: result[period] = "up" elif change_pct < -threshold: result[period] = "down" else: result[period] = "neutral" return result def _calc_success_rate(candles: list) -> int: """计算交易成功率(简化版)""" if len(candles) < 10: return 50 wins = 0 for i in range(1, len(candles)): prev_close = float(candles[i-1].get("close", 0)) curr_close = float(candles[i].get("close", 0)) if curr_close >= prev_close: wins += 1 return int(wins / (len(candles) - 1) * 100) def _calc_trend_score(candles: list) -> int: """计算趋势评分(0-100)""" if len(candles) < 5: return 50 recent = candles[-10:] closes = [float(c.get("close", 0)) for c in recent] if len(closes) < 2: return 50 up_count = sum(1 for i in range(1, len(closes)) if closes[i] >= closes[i-1]) score = int(up_count / (len(closes) - 1) * 100) return max(0, min(100, score)) def _calc_ema(data: list, period: int) -> list: """计算EMA,返回与输入等长的列表,前面用None填充""" ema = [None] * len(data) multiplier = 2 / (period + 1) if len(data) < period: return ema ema[period - 1] = sum(data[:period]) / period for i in range(period, len(data)): ema[i] = (data[i] - ema[i-1]) * multiplier + ema[i-1] return ema def _calc_macd(candles: list) -> dict: """计算MACD指标""" if len(candles) < 26: return {"signal": "中性", "detail": "数据不足"} closes = [float(c.get("close", 0)) for c in candles] ema12 = _calc_ema(closes, 12) ema26 = _calc_ema(closes, 26) dif_list = [] for i in range(len(closes)): if ema12[i] is not None and ema26[i] is not None: dif_list.append(ema12[i] - ema26[i]) else: dif_list.append(None) # 只对有效DIF值计算DEA,避免None替换为0导致计算错误 dif_valid = [d for d in dif_list if d is not None] if dif_valid: dea_valid = _calc_ema(dif_valid, 9) dea_list = [None] * (len(dif_list) - len(dif_valid)) + dea_valid else: dea_list = [None] * len(dif_list) dif = dif_list[-1] dea = dea_list[-1] if dif is not None and dea is not None: if dif > dea: signal = "金叉" elif dif < dea: signal = "死叉" else: signal = "中性" else: signal = "中性" return {"signal": signal, "detail": f"DIF: {dif:.4f}"} def _calc_rsi(candles: list) -> dict: """计算RSI指标""" if len(candles) < 15: return {"value": 50, "status": "正常"} closes = [float(c.get("close", 0)) for c in candles[-15:]] gains = [] losses = [] for i in range(1, len(closes)): diff = closes[i] - closes[i-1] gains.append(max(0, diff)) losses.append(max(0, -diff)) avg_gain = sum(gains) / len(gains) if gains else 0 avg_loss = sum(losses) / len(losses) if losses else 0 if avg_loss == 0: rsi = 100 else: rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) rsi = int(rsi) if rsi > 70: status = "超买" elif rsi < 30: status = "超卖" else: status = "正常" return {"value": rsi, "status": status} def _calc_boll(candles: list) -> dict: """计算布林带""" if len(candles) < 20: return {"signal": "中轨", "detail": "区间: --"} closes = [float(c.get("close", 0)) for c in candles[-20:]] ma = sum(closes) / len(closes) std = (sum((c - ma) ** 2 for c in closes) / len(closes)) ** 0.5 upper = ma + 2 * std lower = ma - 2 * std current = closes[-1] if current > upper: signal = "上轨外" elif current < lower: signal = "下轨外" elif current > ma: signal = "中轨上" else: signal = "中轨" return {"signal": signal, "detail": f"区间: {lower:.0f}-{upper:.0f}"} def _calc_kdj(candles: list) -> dict: """计算KDJ指标""" if len(candles) < 9: return {"signal": "中性", "detail": "K: -- D: --"} highs = [float(c.get("high", 0)) for c in candles[-9:]] lows = [float(c.get("low", 0)) for c in candles[-9:]] closes = [float(c.get("close", 0)) for c in candles[-9:]] highest = max(highs) lowest = min(lows) current = closes[-1] if highest == lowest: rsv = 50 else: rsv = (current - lowest) / (highest - lowest) * 100 k = int(rsv * 2 / 3 + 50 / 3) d = int(k * 2 / 3 + 50 / 3) if k > d: signal = "偏多" elif k < d: signal = "偏空" else: signal = "中性" return {"signal": signal, "detail": f"K: {k} D: {d}"} # ==================== 期货智析数据管理接口 ==================== @router.get("/analysis/history/{symbol}") def get_analysis_history(symbol: str, limit: int = 10, adb: Session = Depends(get_analysis_db)): """获取品种历史分析记录""" records = adb.query(FuturesAnalysis).filter( FuturesAnalysis.symbol == symbol ).order_by( FuturesAnalysis.analysis_time.desc() ).limit(limit).all() return { "success": True, "data": [{ "id": r.id, "symbol": r.symbol, "analysis_time": r.analysis_time.isoformat(), "suggestion": r.suggestion, "suggestion_type": r.suggestion_type, "trend_score": r.trend_score, "entry_price": r.entry_price, "target_price": r.target_price, "stop_loss": r.stop_loss, "risk_level": r.risk_level } for r in records] } @router.post("/analysis/save") def save_analysis_record(data: dict, adb: Session = Depends(get_analysis_db)): """保存分析记录到数据库""" try: record = FuturesAnalysis( symbol=data.get("symbol"), suggestion=data.get("suggestion"), suggestion_type=data.get("suggestion_type"), entry_price=data.get("entry_price"), target_price=data.get("target_price"), stop_loss=data.get("stop_loss"), risk_level=data.get("risk_level"), macd_signal=data.get("macd", {}).get("signal") if data.get("macd") else None, rsi_value=data.get("rsi", {}).get("value") if data.get("rsi") else None, boll_signal=data.get("boll", {}).get("signal") if data.get("boll") else None, kdj_signal=data.get("kdj", {}).get("signal") if data.get("kdj") else None, trend_score=data.get("trend_score"), success_rate=data.get("success_rate"), resistance_levels=data.get("resistances"), support_levels=data.get("supports"), period_trends=data.get("periodConsistency") ) adb.add(record) adb.commit() return {"success": True, "message": "分析记录已保存", "id": record.id} except Exception as e: adb.rollback() logger.error(f"保存分析记录失败: {e}") return {"success": False, "message": str(e)} # ==================== 关注品种管理 ==================== @router.get("/watched") def get_watched_symbols(adb: Session = Depends(get_analysis_db)): """获取关注的品种列表""" symbols = adb.query(WatchedSymbol).order_by(WatchedSymbol.created_at.desc()).all() return { "success": True, "data": [{ "id": s.id, "symbol": s.symbol, "name": s.name, "note": s.note, "created_at": s.created_at.isoformat() } for s in symbols] } @router.post("/watched") def add_watched_symbol(data: dict, adb: Session = Depends(get_analysis_db)): """添加关注品种""" try: symbol = data.get("symbol") existing = adb.query(WatchedSymbol).filter(WatchedSymbol.symbol == symbol).first() if existing: return {"success": False, "message": "该品种已关注"} new_symbol = WatchedSymbol( symbol=symbol, name=data.get("name"), note=data.get("note") ) adb.add(new_symbol) adb.commit() return {"success": True, "message": "已添加关注", "id": new_symbol.id} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} @router.delete("/watched/{symbol}") def remove_watched_symbol(symbol: str, adb: Session = Depends(get_analysis_db)): """取消关注品种""" try: record = adb.query(WatchedSymbol).filter(WatchedSymbol.symbol == symbol).first() if not record: return {"success": False, "message": "未找到该品种"} adb.delete(record) adb.commit() return {"success": True, "message": "已取消关注"} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} # ==================== AI模型配置管理 ==================== @router.get("/ai-models") def get_ai_models(adb: Session = Depends(get_analysis_db)): """获取AI模型配置列表""" models = adb.query(AIModelConfig).order_by(AIModelConfig.created_at.desc()).all() settings = adb.query(AnalysisSettings).filter( AnalysisSettings.key == "analysis_settings" ).first() return { "success": True, "data": { "models": [{ "id": m.id, "provider": m.provider, "model_name": m.model_name, "api_base": m.api_base, "model_id": m.model_id, "temperature": m.temperature, "max_tokens": m.max_tokens, "enabled": m.enabled, "is_active": m.is_active, "created_at": m.created_at.isoformat() } for m in models], "analysis_settings": settings.value if settings else { "enable_technical_analysis": True, "enable_fundamental_analysis": False, "enable_sentiment_analysis": False, "risk_tolerance": "medium", "max_position_pct": 10 } } } @router.post("/ai-models") def save_ai_model(data: dict, adb: Session = Depends(get_analysis_db)): """保存AI模型配置""" try: if data.get("action") == "save_settings": settings = adb.query(AnalysisSettings).filter( AnalysisSettings.key == "analysis_settings" ).first() if settings: settings.value = data.get("settings", {}) else: settings = AnalysisSettings( key="analysis_settings", value=data.get("settings", {}) ) adb.add(settings) adb.commit() return {"success": True, "message": "分析设置已保存"} model_data = data.get("model", {}) model = AIModelConfig( provider=model_data.get("provider", "custom"), model_name=model_data.get("model_name", ""), api_key=model_data.get("api_key", ""), api_base=model_data.get("api_base"), model_id=model_data.get("model_id"), temperature=model_data.get("temperature", 0.7), max_tokens=model_data.get("max_tokens", 2000), enabled=model_data.get("enabled", True), is_active=model_data.get("is_active", False) ) if model.is_active: adb.query(AIModelConfig).update({"is_active": False}) adb.add(model) adb.commit() return {"success": True, "message": "AI模型已保存", "id": model.id} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} @router.put("/ai-models/{model_id}") def update_ai_model(model_id: int, data: dict, adb: Session = Depends(get_analysis_db)): """更新AI模型配置""" try: model = adb.query(AIModelConfig).filter(AIModelConfig.id == model_id).first() if not model: return {"success": False, "message": "模型不存在"} if "is_active" in data and data["is_active"]: adb.query(AIModelConfig).update({"is_active": False}) model.is_active = True else: for key, value in data.items(): if hasattr(model, key): setattr(model, key, value) adb.commit() return {"success": True, "message": "模型已更新"} except Exception as e: adb.rollback() return {"success": False, "message": str(e)} @router.delete("/ai-models/{model_id}") def delete_ai_model(model_id: int, adb: Session = Depends(get_analysis_db)): """删除AI模型配置""" try: model = adb.query(AIModelConfig).filter(AIModelConfig.id == model_id).first() if not model: return {"success": False, "message": "模型不存在"} adb.delete(model) adb.commit() return {"success": True, "message": "模型已删除"} except Exception as e: adb.rollback() return {"success": False, "message": str(e)}