from typing import List, Optional from datetime import datetime, timedelta from sqlalchemy.orm import Session from sqlalchemy import and_ import pandas as pd import logging from app.models import KlineDaily, KlineWeekly, KlineIntraday, ContractInfo from app.services.datasource.manager import DataSourceManager from app.database import SessionLocal logger = logging.getLogger(__name__) class KlineService: """K线数据服务:负责从数据源拉取数据、存储到数据库、查询缓存""" def __init__(self): self.manager = DataSourceManager() def _ensure_source(self): """确保数据源已加载""" source = self.manager.get_primary_source() if not source: raise Exception("没有可用的数据源,请先在管理后台配置并启用数据源") return source # ========== 同步数据 ========== def sync_daily( self, symbol: str, start_date: str, end_date: str ) -> int: """同步日K线数据到数据库""" logger.info(f"[同步日K线] 开始同步 symbol={symbol}, start_date={start_date}, end_date={end_date}") source = self._ensure_source() logger.info(f"[同步日K线] 使用数据源: {source.name}") df = source.get_kline_daily(symbol, start_date, end_date) logger.info(f"[同步日K线] 从数据源获取到 {len(df)} 条记录") if df.empty: logger.warning(f"[同步日K线] 数据源返回空数据,symbol={symbol}") return 0 db = SessionLocal() count = 0 try: for _, row in df.iterrows(): kline = db.query(KlineDaily).filter( and_( KlineDaily.symbol == symbol, KlineDaily.trade_date == row["trade_date"] ) ).first() if kline: kline.open = row.get("open") kline.high = row.get("high") kline.low = row.get("low") kline.close = row.get("close") kline.volume = row.get("volume") kline.turnover = row.get("turnover") kline.open_interest = row.get("open_interest") kline.settle = row.get("settle") kline.pre_settle = row.get("pre_settle") kline.updated_at = datetime.utcnow() else: kline = KlineDaily( symbol=symbol, trade_date=row["trade_date"], open=row.get("open"), high=row.get("high"), low=row.get("low"), close=row.get("close"), volume=row.get("volume"), turnover=row.get("turnover"), open_interest=row.get("open_interest"), settle=row.get("settle"), pre_settle=row.get("pre_settle"), ) db.add(kline) count += 1 db.commit() logger.info(f"[同步日K线] 成功同步 {count} 条记录到数据库") except Exception as e: db.rollback() logger.error(f"[同步日K线] 同步失败: {e}", exc_info=True) raise finally: db.close() return count def sync_weekly( self, symbol: str, start_date: str, end_date: str ) -> int: """同步周K线数据""" source = self._ensure_source() df = source.get_kline_weekly(symbol, start_date, end_date) if df.empty: return 0 db = SessionLocal() count = 0 try: for _, row in df.iterrows(): kline = db.query(KlineWeekly).filter( and_( KlineWeekly.symbol == symbol, KlineWeekly.trade_date == row["trade_date"] ) ).first() if kline: kline.open = row.get("open") kline.high = row.get("high") kline.low = row.get("low") kline.close = row.get("close") kline.volume = row.get("volume") kline.turnover = row.get("turnover") kline.open_interest = row.get("open_interest") kline.updated_at = datetime.utcnow() else: kline = KlineWeekly( symbol=symbol, trade_date=row["trade_date"], open=row.get("open"), high=row.get("high"), low=row.get("low"), close=row.get("close"), volume=row.get("volume"), turnover=row.get("turnover"), open_interest=row.get("open_interest"), ) db.add(kline) count += 1 db.commit() except Exception: db.rollback() raise finally: db.close() return count def sync_intraday( self, symbol: str, period: str, start_date: str, end_date: str ) -> int: """同步分钟级K线数据""" source = self._ensure_source() df = source.get_kline_intraday(symbol, period, start_date, end_date) if df.empty: return 0 db = SessionLocal() count = 0 try: for _, row in df.iterrows(): kline = db.query(KlineIntraday).filter( and_( KlineIntraday.symbol == symbol, KlineIntraday.period == period, KlineIntraday.trade_time == row["trade_time"] ) ).first() if kline: kline.open = row.get("open") kline.high = row.get("high") kline.low = row.get("low") kline.close = row.get("close") kline.volume = row.get("volume") kline.turnover = row.get("turnover") kline.open_interest = row.get("open_interest") kline.updated_at = datetime.utcnow() else: kline = KlineIntraday( symbol=symbol, period=period, trade_time=row["trade_time"], open=row.get("open"), high=row.get("high"), low=row.get("low"), close=row.get("close"), volume=row.get("volume"), turnover=row.get("turnover"), open_interest=row.get("open_interest"), ) db.add(kline) count += 1 db.commit() except Exception: db.rollback() raise finally: db.close() return count # ========== 查询数据 ========== def get_kline( self, symbol: str, period: str, start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 500 ) -> List[dict]: """查询K线数据(优先查库,如果数据库没有数据则自动同步)""" logger.info(f"[查询K线] 开始查询 symbol={symbol}, period={period}, start_date={start_date}, end_date={end_date}, limit={limit}") db = SessionLocal() try: if period == "daily": items = self._query_daily(db, symbol, start_date, end_date, limit) elif period == "weekly": items = self._query_weekly(db, symbol, start_date, end_date, limit) else: items = self._query_intraday(db, symbol, period, start_date, end_date, limit) # 如果数据库中没有数据,自动同步 if len(items) == 0: logger.info(f"[查询K线] 数据库中没有 {symbol} 的 {period} K线数据,开始自动同步") try: sync_start = start_date or "2020-01-01" sync_end = end_date or datetime.now().strftime("%Y-%m-%d") logger.info(f"[查询K线] 自动同步日期范围: {sync_start} ~ {sync_end}") if period == "daily": count = self.sync_daily(symbol, sync_start, sync_end) elif period == "weekly": count = self.sync_weekly(symbol, sync_start, sync_end) else: count = self.sync_intraday(symbol, period, sync_start, sync_end) if count > 0: logger.info(f"[查询K线] 自动同步成功,共同步 {count} 条记录,重新查询数据库") # 重新查询数据库获取同步后的数据 if period == "daily": items = self._query_daily(db, symbol, start_date, end_date, limit) elif period == "weekly": items = self._query_weekly(db, symbol, start_date, end_date, limit) else: items = self._query_intraday(db, symbol, period, start_date, end_date, limit) else: logger.warning(f"[查询K线] 自动同步完成,但数据源返回空数据") except Exception as e: logger.error(f"[查询K线] 自动同步失败: {e}", exc_info=True) # 同步失败不影响查询,继续返回空结果 return items finally: db.close() def _query_daily(self, db: Session, symbol: str, start_date: str, end_date: str, limit: int) -> List[dict]: logger.info(f"[查询日K线] symbol={symbol}, start_date={start_date}, end_date={end_date}, limit={limit}") query = db.query(KlineDaily).filter(KlineDaily.symbol == symbol) if start_date: query = query.filter(KlineDaily.trade_date >= start_date) if end_date: query = query.filter(KlineDaily.trade_date <= end_date) query = query.order_by(KlineDaily.trade_date.desc()).limit(limit) items = query.all() logger.info(f"[查询日K线] 从数据库查询到 {len(items)} 条记录") return [ { "trade_time": item.trade_date, "open": item.open, "high": item.high, "low": item.low, "close": item.close, "volume": item.volume, "turnover": item.turnover, "open_interest": item.open_interest, } for item in items ] def _query_weekly(self, db: Session, symbol: str, start_date: str, end_date: str, limit: int) -> List[dict]: query = db.query(KlineWeekly).filter(KlineWeekly.symbol == symbol) if start_date: query = query.filter(KlineWeekly.trade_date >= start_date) if end_date: query = query.filter(KlineWeekly.trade_date <= end_date) query = query.order_by(KlineWeekly.trade_date.desc()).limit(limit) items = query.all() return [ { "trade_time": item.trade_date, "open": item.open, "high": item.high, "low": item.low, "close": item.close, "volume": item.volume, "turnover": item.turnover, "open_interest": item.open_interest, } for item in items ] def _query_intraday(self, db: Session, symbol: str, period: str, start_date: str, end_date: str, limit: int) -> List[dict]: query = db.query(KlineIntraday).filter( and_( KlineIntraday.symbol == symbol, KlineIntraday.period == period ) ) if start_date: query = query.filter(KlineIntraday.trade_time >= start_date) if end_date: query = query.filter(KlineIntraday.trade_time <= end_date) query = query.order_by(KlineIntraday.trade_time.desc()).limit(limit) items = query.all() return [ { "trade_time": item.trade_time, "open": item.open, "high": item.high, "low": item.low, "close": item.close, "volume": item.volume, "turnover": item.turnover, "open_interest": item.open_interest, } for item in items ] kline_service = KlineService()