"""AKShare data source adapter backed by the Sina endpoints."""
import asyncio
import functools
import time
from datetime import datetime, timedelta
from typing import List, Optional

import akshare as ak
import pandas as pd

from app.adapters.base import (
    DataSourceAdapter,
    TickData,
    KLineData,
    SymbolInfo,
    TradeCalData,
    TickCallback,
)
from app.core.logger import info, error, warning

# Underlying product codes grouped by the exchange code this adapter reports.
_FUTURES_UNDERLYINGS_BY_EXCHANGE = {
    # Shanghai Futures Exchange
    'SHFE': frozenset([
        'CU', 'AL', 'ZN', 'PB', 'NI', 'SN', 'AU', 'AG', 'RB', 'HC',
        'BU', 'RU', 'FU', 'SP', 'WR', 'SS', 'LU', 'NR',
    ]),
    # Dalian Commodity Exchange
    'DCE': frozenset([
        'A', 'B', 'M', 'Y', 'P', 'C', 'CS', 'JD', 'LH', 'JM', 'J', 'I',
        'FB', 'BB', 'RR', 'PG', 'EB', 'EG', 'V', 'PP', 'L',
    ]),
    # Zhengzhou Commodity Exchange
    'CZCE': frozenset([
        'WH', 'PM', 'CF', 'SR', 'TA', 'OI', 'RI', 'MA', 'FG', 'RS', 'RM',
        'JR', 'LR', 'SM', 'SF', 'CY', 'AP', 'CJ', 'UR', 'SA', 'PF', 'PK',
    ]),
    # China Financial Futures Exchange
    'CFFEX': frozenset(['IF', 'IC', 'IH', 'T', 'TF', 'TS', 'IM']),
    # Shanghai International Energy Exchange
    'INE': frozenset(['SC', 'BC', 'EC']),
}


class AKShareAdapter(DataSourceAdapter):
    """Data source adapter that pulls market data through AKShare (Sina API).

    All AKShare calls are blocking, so they are dispatched to the default
    executor through ``_fetch_with_retry``. The individual fetchers are
    best-effort: they log the failure and return an empty list instead of
    propagating the exception.
    """

    # Symbol suffixes that identify a stock (as opposed to "CODE.EXCHANGE"
    # futures symbols such as "CU2504.SHFE").
    _STOCK_SUFFIXES = (".SH", ".SZ", ".BJ")
    _DAILY_FREQS = ("1d", "day", "D", "")
    _MINUTE_FREQS = ("1m", "5m", "15m", "30m", "60m")
    # Substrings of an error message that mark a failure as transient.
    _RETRYABLE_MARKERS = ('connection', 'timeout', 'remote', 'reset', 'closed')

    def __init__(self):
        self.config = {}
        self._connected = False
        self._max_retries = 3
        self._retry_delay = 2  # seconds

    async def connect(self, config: dict) -> None:
        """Store ``config`` and mark the adapter connected (no auth needed)."""
        self.config = config
        self._connected = True
        info("AKShare adapter connected (Sina API)")

    async def subscribe_ticks(self, symbols: List[str], callback: TickCallback) -> None:
        """Always raise: this adapter offers no real-time tick push.

        Raises:
            NotImplementedError: unconditionally.
        """
        raise NotImplementedError("AKShare does not support real-time tick subscription")

    async def fetch_klines(
        self,
        symbol: str,
        start: str,
        end: str,
        freq: str
    ) -> List[KLineData]:
        """Fetch historical K-lines, dispatching on the symbol format.

        Args:
            symbol: ``"000001.SZ"``-style stock symbol (suffix ``.SH``,
                ``.SZ`` or ``.BJ``) or ``"CU2504.SHFE"``-style futures symbol.
            start: Range start, forwarded to the stock/futures fetcher.
            end: Range end, forwarded to the stock/futures fetcher.
            freq: Bar frequency, e.g. ``"1d"`` or ``"5m"``.

        Returns:
            The bars produced by the stock or futures fetcher.

        Raises:
            ValueError: if the symbol has no ``"."`` or ``freq`` is not
                supported by the selected fetcher.
        """
        info(f"Fetching KLines from Sina for {symbol} [{freq}] from {start} to {end}")
        # Match on the suffix, not a substring: ".SH" is also contained in
        # futures symbols such as "CU2504.SHFE", which must not be treated
        # as Shanghai stocks.
        if symbol.endswith(self._STOCK_SUFFIXES):
            return await self._fetch_stock_klines(symbol, start, end, freq)
        if "." in symbol:
            # Futures format: CU2504.SHFE
            return await self._fetch_futures_klines(symbol, start, end, freq)
        raise ValueError(f"Unknown symbol format: {symbol}")

    async def _fetch_stock_klines(
        self,
        symbol: str,
        start: str,
        end: str,
        freq: str
    ) -> List[KLineData]:
        """Fetch stock K-lines, choosing the daily or minute Sina endpoint.

        Raises:
            ValueError: if ``freq`` is neither a daily nor a minute alias.
        """
        # Convert symbol format: 000001.SZ -> sz000001
        ts_code = self._normalize_stock_symbol(symbol)

        if freq in self._DAILY_FREQS:
            return await self._fetch_stock_daily_sina(ts_code, symbol, start, end)
        if freq in self._MINUTE_FREQS:
            return await self._fetch_stock_minute_sina(ts_code, symbol, start, end, freq)
        raise ValueError(f"Unsupported frequency: {freq}")

    def _normalize_stock_symbol(self, symbol: str) -> str:
        """Convert ``000001.SZ`` to ``sz000001``.

        Symbols without a ``"."`` are returned unchanged; an unrecognised
        exchange suffix falls back to the ``"sz"`` prefix.
        """
        if "." not in symbol:
            return symbol
        code, exchange = symbol.split(".")
        prefix_by_exchange = {"SH": "sh", "SZ": "sz", "BJ": "bj"}
        return prefix_by_exchange.get(exchange, "sz") + code

    def _denormalize_stock_symbol(self, symbol: str) -> str:
        """Convert ``sz000001`` back to ``000001.SZ``.

        Symbols without an ``sh``/``sz``/``bj`` prefix are returned unchanged.
        """
        for prefix, suffix in (("sh", ".SH"), ("sz", ".SZ"), ("bj", ".BJ")):
            if symbol.startswith(prefix):
                return symbol[2:] + suffix
        return symbol

    @staticmethod
    def _to_epoch_seconds(value) -> int:
        """Convert a date-like value to whole epoch seconds.

        Naive values are interpreted in the process's local time zone
        (``datetime.timestamp`` semantics), which is what the string-parsing
        paths of this adapter have always done. Going through
        ``to_pydatetime`` matters: calling ``.timestamp()`` directly on a
        naive ``pd.Timestamp`` would interpret it as UTC instead and yield a
        different epoch on hosts whose local zone is not UTC.
        """
        return int(pd.Timestamp(value).to_pydatetime().timestamp())

    @staticmethod
    def _to_int(value) -> int:
        """Convert ``value`` to int, accepting float-formatted strings."""
        try:
            return int(value)
        except (TypeError, ValueError):
            # e.g. "12345.0", which int() rejects but float() accepts.
            return int(float(value))

    @staticmethod
    def _filter_by_date_range(df, column: str, start: str, end: str):
        """Keep rows whose ``column`` falls within ``[start, end]`` (whole days).

        Args:
            df: Frame to filter; it is not modified.
            column: Name of the date/datetime column.
            start: First day to keep, ``YYYYMMDD``.
            end: Last day to keep, ``YYYYMMDD``.

        Returns:
            A new frame with ``column`` converted by ``pd.to_datetime``.
        """
        start_dt = datetime.strptime(start, "%Y%m%d")
        # Exclusive upper bound at the start of the following day, so that
        # intraday rows on the end date itself are kept.
        end_exclusive = datetime.strptime(end, "%Y%m%d") + timedelta(days=1)
        converted = df.assign(**{column: pd.to_datetime(df[column])})
        in_range = (converted[column] >= start_dt) & (converted[column] < end_exclusive)
        return converted[in_range]

    async def _fetch_with_retry(self, func, *args, **kwargs):
        """Run blocking ``func(*args, **kwargs)`` in the executor, with retries.

        Only failures whose message contains one of ``_RETRYABLE_MARKERS``
        are retried; any other exception, and the final failed attempt, is
        re-raised unchanged.

        Returns:
            Whatever ``func`` returns.
        """
        # Always make at least one attempt, even if _max_retries is 0.
        attempts = max(1, self._max_retries)
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)

        for attempt in range(attempts):
            try:
                return await loop.run_in_executor(None, call)
            except Exception as e:
                error_msg = str(e).lower()
                retryable = any(marker in error_msg for marker in self._RETRYABLE_MARKERS)
                if not retryable or attempt >= attempts - 1:
                    # Non-retryable error, or retries exhausted.
                    raise
                warning(f"Sina API request failed (attempt {attempt + 1}/{self._max_retries}): {e}")
                # Linear backoff: delay, 2 * delay, 3 * delay, ...
                await asyncio.sleep(self._retry_delay * (attempt + 1))

    async def _fetch_stock_daily_sina(
        self,
        ts_code: str,
        original_symbol: str,
        start_date: str,
        end_date: str
    ) -> List[KLineData]:
        """Fetch forward-adjusted daily stock bars via ``ak.stock_zh_a_daily``.

        Args:
            ts_code: Sina-style code, e.g. ``sz000001``.
            original_symbol: Symbol to stamp on the returned bars.
            start_date: Passed through to AKShare unchanged.
            end_date: Passed through to AKShare unchanged.

        Returns:
            One ``KLineData`` per row, or ``[]`` on no data or any failure.
        """
        try:
            df = await self._fetch_with_retry(
                ak.stock_zh_a_daily,
                symbol=ts_code,
                start_date=start_date,
                end_date=end_date,
                adjust="qfq"  # forward-adjusted prices
            )

            if df is None or df.empty:
                warning(f"No data returned from Sina for {original_symbol}")
                return []

            results = [
                KLineData(
                    symbol=original_symbol,
                    time=self._to_epoch_seconds(row['date']),
                    open=float(row['open']),
                    high=float(row['high']),
                    low=float(row['low']),
                    close=float(row['close']),
                    volume=self._to_int(row['volume']),
                    amount=float(row.get('amount', 0))
                )
                for _, row in df.iterrows()
            ]

            info(f"Fetched {len(results)} daily klines from Sina for {original_symbol}")
            return results

        except Exception as e:
            error(f"Failed to fetch stock daily from Sina for {original_symbol}: {e}")
            # Best-effort: a failed Sina request yields an empty result.
            return []

    async def _fetch_stock_minute_sina(
        self,
        ts_code: str,
        original_symbol: str,
        start_date: str,
        end_date: str,
        freq: str
    ) -> List[KLineData]:
        """Fetch forward-adjusted minute stock bars via ``ak.stock_zh_a_minute``.

        Args:
            ts_code: Sina-style code, e.g. ``sz000001``.
            original_symbol: Symbol to stamp on the returned bars.
            start_date: First day to keep, ``YYYYMMDD``.
            end_date: Last day to keep (inclusive), ``YYYYMMDD``.
            freq: Minute frequency such as ``"5m"``.

        Returns:
            One ``KLineData`` per row in range, or ``[]`` on no data or any
            failure.
        """
        try:
            df = await self._fetch_with_retry(
                ak.stock_zh_a_minute,
                symbol=ts_code,
                period=freq.replace("m", ""),  # 1m -> 1
                adjust="qfq"
            )

            if df is None or df.empty:
                return []

            # NOTE(review): the timestamp column appears to be named 'day' in
            # some akshare releases and 'date' in others - accept either;
            # confirm against the installed version.
            time_column = 'day' if 'day' in df.columns else 'date'
            df = self._filter_by_date_range(df, time_column, start_date, end_date)

            return [
                KLineData(
                    symbol=original_symbol,
                    time=self._to_epoch_seconds(row[time_column]),
                    open=float(row['open']),
                    high=float(row['high']),
                    low=float(row['low']),
                    close=float(row['close']),
                    volume=self._to_int(row['volume']),
                    amount=float(row.get('amount', 0))
                )
                for _, row in df.iterrows()
            ]

        except Exception as e:
            error(f"Failed to fetch stock minute from Sina for {original_symbol}: {e}")
            return []

    async def _fetch_futures_klines(
        self,
        symbol: str,
        start: str,
        end: str,
        freq: str
    ) -> List[KLineData]:
        """Fetch futures K-lines, choosing the daily or minute Sina endpoint.

        Raises:
            ValueError: if ``freq`` is neither a daily nor a minute alias.
        """
        if freq in self._DAILY_FREQS:
            return await self._fetch_futures_daily_sina(symbol, start, end)
        if freq in self._MINUTE_FREQS:
            return await self._fetch_futures_minute_sina(symbol, start, end, freq)
        raise ValueError(f"Unsupported frequency: {freq}")

    async def _fetch_futures_daily_sina(
        self,
        symbol: str,
        start_date: str,
        end_date: str
    ) -> List[KLineData]:
        """Fetch daily futures bars, including open interest from ``hold``.

        Args:
            symbol: ``CODE.EXCHANGE`` symbol, e.g. ``CU2504.SHFE``.
            start_date: Passed through to AKShare unchanged.
            end_date: Passed through to AKShare unchanged.

        Returns:
            One ``KLineData`` per row, or ``[]`` on no data or any failure.
        """
        try:
            # Parse the contract code: CU2504.SHFE -> cu2504
            contract_code, _ = symbol.split(".")
            contract_code = contract_code.lower()

            # NOTE(review): confirm that the installed akshare exposes
            # `futures_zh_daily` with start_date/end_date parameters. If the
            # attribute is missing, the AttributeError is swallowed by the
            # handler below and this method silently returns [].
            df = await self._fetch_with_retry(
                ak.futures_zh_daily,
                symbol=contract_code,
                start_date=start_date,
                end_date=end_date
            )

            if df is None or df.empty:
                return []

            return [
                KLineData(
                    symbol=symbol,
                    time=self._to_epoch_seconds(row['date']),
                    open=float(row['open']),
                    high=float(row['high']),
                    low=float(row['low']),
                    close=float(row['close']),
                    volume=self._to_int(row['volume']),
                    amount=float(row.get('amount', 0)),
                    open_interest=self._to_int(row.get('hold', 0))
                )
                for _, row in df.iterrows()
            ]

        except Exception as e:
            error(f"Failed to fetch futures daily from Sina for {symbol}: {e}")
            return []

    async def _fetch_futures_minute_sina(
        self,
        symbol: str,
        start_date: str,
        end_date: str,
        freq: str
    ) -> List[KLineData]:
        """Fetch minute futures bars via ``ak.futures_zh_minute_sina``.

        ``amount`` and ``open_interest`` are always reported as 0.

        Args:
            symbol: ``CODE.EXCHANGE`` symbol, e.g. ``CU2504.SHFE``.
            start_date: First day to keep, ``YYYYMMDD``.
            end_date: Last day to keep (inclusive), ``YYYYMMDD``.
            freq: Minute frequency such as ``"5m"``.

        Returns:
            One ``KLineData`` per row in range, or ``[]`` on no data or any
            failure.
        """
        try:
            # Parse the contract code: CU2504.SHFE -> cu2504
            contract_code, _ = symbol.split(".")
            contract_code = contract_code.lower()

            df = await self._fetch_with_retry(
                ak.futures_zh_minute_sina,
                symbol=contract_code,
                period=freq.replace("m", "")
            )

            if df is None or df.empty:
                return []

            df = self._filter_by_date_range(df, 'datetime', start_date, end_date)

            return [
                KLineData(
                    symbol=symbol,
                    # Same naive-local-time conversion as every other path.
                    time=self._to_epoch_seconds(row['datetime']),
                    open=float(row['open']),
                    high=float(row['high']),
                    low=float(row['low']),
                    close=float(row['close']),
                    volume=self._to_int(row['volume']),
                    amount=0,
                    open_interest=0
                )
                for _, row in df.iterrows()
            ]

        except Exception as e:
            error(f"Failed to fetch futures minute from Sina for {symbol}: {e}")
            return []

    async def fetch_symbols(self, asset_type: str) -> List[SymbolInfo]:
        """Fetch the instrument list for ``"stock"`` or ``"futures"``.

        Raises:
            ValueError: for any other ``asset_type``.
        """
        if asset_type == "stock":
            return await self._fetch_stock_symbols_sina()
        if asset_type == "futures":
            return await self._fetch_futures_symbols_sina()
        raise ValueError(f"Unsupported asset type: {asset_type}")

    def _classify_stock_code(self, code: str):
        """Return ``(symbol_id, exchange)`` for a listing code.

        Handles both market-prefixed codes (``sh600000`` -> ``600000.SH``)
        and bare numeric codes, where the exchange is inferred from the
        leading digits.
        """
        lowered = code.lower()
        if lowered[:2] in ("sh", "sz", "bj") and lowered[2:].isdigit():
            # NOTE(review): Sina spot listings appear to carry a market
            # prefix in recent akshare releases - confirm. Without this
            # branch such codes would all be labelled ".SZ".
            symbol_id = self._denormalize_stock_symbol(lowered)
            return symbol_id, symbol_id[-2:]

        if code.startswith("92"):
            # Beijing Stock Exchange codes in the 920xxx range; checked
            # before the generic '9' -> SH rule below.
            exchange = "BJ"
        elif code.startswith(('6', '5', '9')):
            exchange = "SH"
        elif code.startswith(('8', '4')):
            exchange = "BJ"
        else:
            exchange = "SZ"
        return f"{code}.{exchange}", exchange

    async def _fetch_stock_symbols_sina(self) -> List[SymbolInfo]:
        """Fetch the A-share list via ``ak.stock_zh_a_spot``.

        Returns:
            One ``SymbolInfo`` per row, or ``[]`` on no data or any failure.
        """
        try:
            df = await self._fetch_with_retry(ak.stock_zh_a_spot)

            if df is None or df.empty:
                return []

            results = []
            for _, row in df.iterrows():
                ts_code, exchange = self._classify_stock_code(str(row['代码']))
                results.append(SymbolInfo(
                    symbol_id=ts_code,
                    name=str(row['名称']),
                    exchange=exchange
                ))

            info(f"Fetched {len(results)} stock symbols from Sina")
            return results

        except Exception as e:
            error(f"Failed to fetch stock symbols from Sina: {e}")
            return []

    async def _fetch_futures_symbols_sina(self) -> List[SymbolInfo]:
        """Fetch the futures contract list via ``ak.futures_zh_realtime``.

        The underlying is the alphabetic part of the contract symbol and the
        contract month is its digits.

        Returns:
            One ``SymbolInfo`` per row, or ``[]`` on no data or any failure.
        """
        try:
            # NOTE(review): confirm that `futures_zh_realtime` accepts a
            # `subscribe_list` keyword in the installed akshare; a TypeError
            # here is swallowed below and yields [].
            df = await self._fetch_with_retry(ak.futures_zh_realtime, subscribe_list=["0", "1", "2", "3"])

            if df is None or df.empty:
                return []

            results = []
            for _, row in df.iterrows():
                symbol = str(row['symbol'])
                underlying = ''.join(c for c in symbol if c.isalpha()).upper()
                contract_month = ''.join(c for c in symbol if c.isdigit())
                exchange = self._get_futures_exchange(underlying)

                results.append(SymbolInfo(
                    symbol_id=f"{symbol.upper()}.{exchange}",
                    name=str(row.get('name', symbol)),
                    exchange=exchange,
                    underlying=underlying,
                    contract_month=contract_month
                ))

            info(f"Fetched {len(results)} futures symbols from Sina")
            return results

        except Exception as e:
            error(f"Failed to fetch futures symbols from Sina: {e}")
            return []

    def _get_futures_exchange(self, underlying: str) -> str:
        """Map an upper-case product code to its exchange code.

        Product codes that are not in the lookup table default to ``'SHFE'``.
        """
        for exchange, underlyings in _FUTURES_UNDERLYINGS_BY_EXCHANGE.items():
            if underlying in underlyings:
                return exchange
        return 'SHFE'  # default: Shanghai

    async def fetch_trading_calendar(
        self,
        exchange: str,
        start: str,
        end: str
    ) -> List[TradeCalData]:
        """Fetch trading days in ``[start, end]`` via Sina's calendar.

        Args:
            exchange: Accepted for interface compatibility; not used - the
                same calendar is returned for every exchange.
            start: First day to keep, ``YYYYMMDD``.
            end: Last day to keep (inclusive), ``YYYYMMDD``.

        Returns:
            One ``TradeCalData`` (``is_trading_day=True``) per trading day in
            range, or ``[]`` on no data or any failure.
        """
        try:
            df = await self._fetch_with_retry(ak.tool_trade_date_hist_sina)

            if df is None or df.empty:
                return []

            df = self._filter_by_date_range(df, 'trade_date', start, end)

            return [
                TradeCalData(date=row['trade_date'], is_trading_day=True)
                for _, row in df.iterrows()
            ]

        except Exception as e:
            error(f"Failed to fetch trading calendar from Sina: {e}")
            return []

    async def health_check(self) -> bool:
        """Return True if connected and the stock list can be fetched."""
        try:
            if not self._connected:
                return False

            # Probe by fetching the stock list.
            # NOTE(review): this downloads the full spot listing on every
            # check - consider a lighter endpoint if checks run frequently.
            df = await self._fetch_with_retry(ak.stock_zh_a_spot)
            return df is not None and not df.empty

        except Exception as e:
            error(f"Health check failed: {e}")
            return False

    async def close(self) -> None:
        """Mark the adapter disconnected."""
        self._connected = False
        info("AKShare adapter closed (Sina API)")