feat: 增加5分钟分析

master^2
Lxy 1 week ago
parent fb8c4f6587
commit 41a2bec6e3

@ -95,6 +95,7 @@ class AIAnalysisCache(AnalysisBase):
id = Column(Integer, primary_key=True, autoincrement=True) id = Column(Integer, primary_key=True, autoincrement=True)
symbol = Column(String(32), nullable=False, index=True, comment="品种合约代码") symbol = Column(String(32), nullable=False, index=True, comment="品种合约代码")
analysis_data = Column(JSON, nullable=False, comment="AI分析结果数据") analysis_data = Column(JSON, nullable=False, comment="AI分析结果数据")
kline_timestamp = Column(DateTime, nullable=True, comment="分析时K线数据的时间戳")
created_at = Column(DateTime, nullable=False, default=datetime.now, index=True, comment="分析时间") created_at = Column(DateTime, nullable=False, default=datetime.now, index=True, comment="分析时间")
def __repr__(self): def __repr__(self):

@ -818,11 +818,12 @@ def run_ai_analysis(symbol: str, db: Session = Depends(get_db), analysis_db: Ses
@router.get("/ai-analysis/{symbol}") @router.get("/ai-analysis/{symbol}")
def get_ai_analysis(symbol: str, force_refresh: bool = False, db: Session = Depends(get_db), analysis_db: Session = Depends(get_analysis_db)): def get_ai_analysis(symbol: str, force_refresh: bool = False, db: Session = Depends(get_db), analysis_db: Session = Depends(get_analysis_db)):
"""获取AI分析结果可选择是否强制刷新""" """获取AI分析结果智能判断是否需要重新分析"""
try: try:
analyzer = AIFuturesAnalyzer(db, analysis_db) analyzer = AIFuturesAnalyzer(db, analysis_db)
if force_refresh: if force_refresh:
logger.info(f"强制刷新: {symbol}")
result = analyzer.analyze(symbol) result = analyzer.analyze(symbol)
if result.get("success"): if result.get("success"):
return { return {
@ -836,8 +837,35 @@ def get_ai_analysis(symbol: str, force_refresh: bool = False, db: Session = Depe
"error": result.get("error", "AI分析失败") "error": result.get("error", "AI分析失败")
} }
# 获取最新缓存
cache = analyzer.get_latest_cache(symbol) cache = analyzer.get_latest_cache(symbol)
if cache: if cache:
# 智能判断是否需要重新分析
if analyzer.should_reanalyze(symbol, cache):
logger.info(f"检测到数据变化或超时,自动重新分析: {symbol}")
result = analyzer.analyze(symbol)
if result.get("success"):
return {
"success": True,
"data": result["data"],
"is_cached": False
}
else:
# 如果重新分析失败,返回旧缓存
logger.warning(f"重新分析失败,返回旧缓存: {symbol}")
return {
"success": True,
"data": {
"id": cache.id,
"symbol": cache.symbol,
"analysis_time": cache.created_at.isoformat(),
"result": cache.analysis_data
},
"is_cached": True,
"warning": "分析数据可能不是最新的"
}
# 返回缓存数据
return { return {
"success": True, "success": True,
"data": { "data": {
@ -849,6 +877,7 @@ def get_ai_analysis(symbol: str, force_refresh: bool = False, db: Session = Depe
"is_cached": True "is_cached": True
} }
# 没有缓存,执行分析
result = analyzer.analyze(symbol) result = analyzer.analyze(symbol)
if result.get("success"): if result.get("success"):
return { return {

@ -382,11 +382,12 @@ class AIFuturesAnalyzer:
logger.error(f"解析AI响应失败: {e}") logger.error(f"解析AI响应失败: {e}")
return None return None
def save_analysis_cache(self, symbol: str, analysis_data: Dict) -> AIAnalysisCache: def save_analysis_cache(self, symbol: str, analysis_data: Dict, kline_timestamp: datetime) -> AIAnalysisCache:
"""保存AI分析结果到缓存""" """保存AI分析结果到缓存"""
cache = AIAnalysisCache( cache = AIAnalysisCache(
symbol=symbol, symbol=symbol,
analysis_data=analysis_data, analysis_data=analysis_data,
kline_timestamp=kline_timestamp,
created_at=datetime.now() created_at=datetime.now()
) )
@ -402,10 +403,61 @@ class AIFuturesAnalyzer:
AIAnalysisCache.symbol == symbol AIAnalysisCache.symbol == symbol
).order_by(AIAnalysisCache.created_at.desc()).first() ).order_by(AIAnalysisCache.created_at.desc()).first()
def get_latest_kline_timestamp(self, symbol: str) -> Optional[datetime]:
"""获取当前K线数据的最新时间戳"""
cached_data = get_cached_data(
self.db,
symbol,
"futures",
["5min", "15min", "30min", "60min", "daily"]
)
if not cached_data or not cached_data.get("timeframes"):
return None
latest_time = None
for period, candles in cached_data.get("timeframes", {}).items():
if candles and len(candles) > 0:
last_candle = candles[-1]
time_str = last_candle.get("datetime", last_candle.get("time", ""))
if time_str:
try:
candle_time = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
if latest_time is None or candle_time > latest_time:
latest_time = candle_time
except:
pass
return latest_time
def should_reanalyze(self, symbol: str, cache: AIAnalysisCache) -> bool:
"""判断是否需要重新分析"""
if not cache:
return True
# 1. 检查时间是否超过15分钟
time_since_analysis = (datetime.now() - cache.created_at).total_seconds()
if time_since_analysis > 900: # 15分钟 = 900秒
logger.info(f"分析时间已超过15分钟需要重新分析")
return True
# 2. 检查K线数据是否有变化
current_kline_time = self.get_latest_kline_timestamp(symbol)
if current_kline_time and cache.kline_timestamp:
if current_kline_time > cache.kline_timestamp:
logger.info(f"K线数据已更新当前: {current_kline_time}, 缓存: {cache.kline_timestamp}),需要重新分析")
return True
return False
def analyze(self, symbol: str) -> Dict: def analyze(self, symbol: str) -> Dict:
"""执行完整的AI分析流程""" """执行完整的AI分析流程"""
logger.info(f"===== 开始AI分析: {symbol} =====") logger.info(f"===== 开始AI分析: {symbol} =====")
# 获取当前K线数据的时间戳
kline_timestamp = self.get_latest_kline_timestamp(symbol)
logger.info(f"当前K线数据时间戳: {kline_timestamp}")
model = self.get_active_model() model = self.get_active_model()
if not model: if not model:
logger.error("未找到激活的AI模型配置") logger.error("未找到激活的AI模型配置")
@ -450,8 +502,8 @@ class AIFuturesAnalyzer:
logger.info(f"AI响应解析成功") logger.info(f"AI响应解析成功")
cache = self.save_analysis_cache(symbol, analysis_result) cache = self.save_analysis_cache(symbol, analysis_result, kline_timestamp)
logger.info(f"分析结果已保存到缓存ID: {cache.id}") logger.info(f"分析结果已保存到缓存ID: {cache.id}, K线时间戳: {kline_timestamp}")
logger.info(f"===== AI分析完成: {symbol} =====") logger.info(f"===== AI分析完成: {symbol} =====")
return { return {

Binary file not shown.

@ -0,0 +1,31 @@
"""
更新AI分析缓存表结构
添加 kline_timestamp 字段
"""
import sqlite3
from pathlib import Path
project_root = Path(__file__).parent
ANALYSIS_DB_PATH = project_root / "data" / "futures_analysis.db"
def add_kline_timestamp_column():
"""添加 kline_timestamp 列"""
conn = sqlite3.connect(str(ANALYSIS_DB_PATH))
cursor = conn.cursor()
# 检查列是否已存在
cursor.execute("PRAGMA table_info(ai_analysis_cache)")
columns = [col[1] for col in cursor.fetchall()]
if 'kline_timestamp' in columns:
print("✅ kline_timestamp 列已存在")
else:
print("添加 kline_timestamp 列...")
cursor.execute("ALTER TABLE ai_analysis_cache ADD COLUMN kline_timestamp DATETIME")
conn.commit()
print("✅ kline_timestamp 列添加成功")
conn.close()
if __name__ == "__main__":
add_kline_timestamp_column()
Loading…
Cancel
Save