You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

841 lines
27 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
AmazingData 数据源适配器
基于银河证券星耀数智量化平台 SDK 的封装
提供统一、简洁的金融数据获取接口
"""
from calendar import calendar
import pandas as pd
from typing import List, Dict, Optional, Union, Tuple
from datetime import datetime, date
from dataclasses import dataclass
from enum import Enum
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SecurityType(Enum):
"""证券类型枚举"""
STOCK_A = "EXTRA_STOCK_A" # 沪深A股
STOCK_A_SH_SZ = "EXTRA_STOCK_A_SH_SZ" # 沪深A股沪深
INDEX_A = "EXTRA_INDEX_A" # 沪深指数
ETF = "EXTRA_ETF" # ETF
FUTURE = "EXTRA_FUTURE" # 期货
KZZ = "EXTRA_KZZ" # 可转债
GLRA = "EXTRA_GLRA" # 逆回购
HKT = "EXTRA_HKT" # 港股通
ETF_OP = "EXTRA_ETF_OP" # ETF期权
class Market(Enum):
"""市场枚举"""
SH = "SH" # 上海
SZ = "SZ" # 深圳
BJ = "BJ" # 北京
class Period(Enum):
"""周期枚举"""
MIN1 = "min1"
MIN3 = "min3"
MIN5 = "min5"
MIN10 = "min10"
MIN15 = "min15"
MIN30 = "min30"
MIN60 = "min60"
MIN120 = "min120"
DAILY = "daily"
WEEKLY = "weekly"
MONTHLY = "monthly"
SEASON = "season"
YEAR = "year"
@dataclass
class DataSourceConfig:
"""数据源配置"""
username: str
password: str
host: str
port: int
local_path: str = "./amazing_data_cache/"
use_local_cache: bool = True
class AmazingDataAdapter:
"""
AmazingData 数据源适配器
封装银河证券星耀数智 SDK提供统一的数据获取接口
"""
def __init__(self, config: DataSourceConfig):
"""
初始化适配器
Args:
config: 数据源配置
"""
self.config = config
self._ad = None
self._base_data = None
self._market_data = None
self._info_data = None
self._calendar = None
self._is_logged_in = False
def connect(self) -> bool:
"""
连接到数据源
Returns:
bool: 是否连接成功
"""
try:
import AmazingData as ad
self._ad = ad
# 登录
ad.login(
username=self.config.username,
password=self.config.password,
host=self.config.host,
port=self.config.port
)
# 初始化数据类
self._base_data = ad.BaseData()
self._info_data = ad.InfoData()
self._calendar = self._base_data.get_calendar()
self._market_data = ad.MarketData(self._calendar)
self._is_logged_in = True
logger.info("成功连接到 AmazingData 数据源")
return True
except Exception as e:
logger.error(f"连接失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self._is_logged_in and self._ad:
try:
self._ad.logout(self.config.username)
logger.info("已断开与 AmazingData 的连接")
except Exception as e:
logger.warning(f"断开连接时出错: {e}")
self._is_logged_in = False
# ==================== 基础数据接口 ====================
def get_code_list(self, security_type: SecurityType = SecurityType.STOCK_A) -> List[str]:
"""
获取代码列表
Args:
security_type: 证券类型
Returns:
证券代码列表
"""
self._check_login()
if security_type == SecurityType.FUTURE:
return self._base_data.get_future_code_list(security_type=security_type.value)
elif security_type == SecurityType.ETF_OP:
return self._base_data.get_option_code_list(security_type=security_type.value)
else:
return self._base_data.get_code_list(security_type=security_type.value)
def get_code_info(self, security_type: SecurityType = SecurityType.STOCK_A) -> pd.DataFrame:
"""
获取证券信息
Args:
security_type: 证券类型
Returns:
DataFrame 包含证券基本信息
"""
self._check_login()
return self._base_data.get_code_info(security_type=security_type.value)
def get_trading_calendar(self, market: Market = Market.SH) -> List[int]:
"""
获取交易日历
Args:
market: 市场
Returns:
交易日列表 (YYYYMMDD 格式)
"""
self._check_login()
return self._base_data.get_calendar(market=market.value)
def get_adj_factor(self, codes: List[str],
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取复权因子(单次复权)
Args:
codes: 股票代码列表
is_local: 是否使用本地缓存
Returns:
DataFrame (index: 日期, columns: 股票代码)
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._base_data.get_adj_factor(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local
)
def get_backward_factor(self, codes: List[str],
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取后复权因子
Args:
codes: 股票代码列表
is_local: 是否使用本地缓存
Returns:
DataFrame (index: 日期, columns: 股票代码)
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._base_data.get_backward_factor(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local
)
# ==================== 历史行情数据接口 ====================
def get_kline(self,
codes: Union[str, List[str]],
start_date: Union[str, int, date],
end_date: Union[str, int, date],
period: Period = Period.DAILY) -> Dict[str, pd.DataFrame]:
"""
获取历史K线数据
Args:
codes: 证券代码或代码列表
start_date: 开始日期
end_date: 结束日期
period: K线周期
Returns:
Dict[代码, DataFrame]DataFrame包含OHLCV等字段
"""
self._check_login()
if isinstance(codes, str):
codes = [codes]
start_date = self._format_date(start_date)
end_date = self._format_date(end_date)
print(f"正在获取K线数据: 代码={codes}, 日期范围={start_date}~{end_date}, 周期={period.value}")
# 获取K线数据
kline_dict = self._market_data.query_kline(
code_list=codes,
begin_date=20240302,
end_date=20240306,
period=self._ad.constant.Period.day.value)
print(f"成功获取K线数据: {len(kline_dict)} 只证券")
print(f"示例数据:\n{kline_dict[codes[0]].head()}")
return kline_dict
def get_snapshot(self,
codes: Union[str, List[str]],
start_date: Union[str, int, date],
end_date: Union[str, int, date]) -> Dict[str, pd.DataFrame]:
"""
获取历史快照数据tick级别
Args:
codes: 证券代码或代码列表
start_date: 开始日期
end_date: 结束日期
Returns:
Dict[代码, DataFrame]
"""
self._check_login()
if isinstance(codes, str):
codes = [codes]
start_date = self._format_date(start_date)
end_date = self._format_date(end_date)
snapshot_dict = self._market_data.query_snapshot(
code_list=codes,
begin_date=start_date,
end_date=end_date
)
return snapshot_dict
# ==================== 财务数据接口 ====================
def get_balance_sheet(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取资产负债表
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
return self._get_financial_data(
'get_balance_sheet', codes, start_date, end_date, is_local
)
def get_cash_flow(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取现金流量表
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
return self._get_financial_data(
'get_cash_flow', codes, start_date, end_date, is_local
)
def get_income_statement(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取利润表
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
return self._get_financial_data(
'get_income', codes, start_date, end_date, is_local
)
def get_profit_express(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取业绩快报
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_profit_express(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_profit_notice(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取业绩预告
Args:
codes: 股票代码列表
start_date: 开始报告期
end_date: 结束报告期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_profit_notice(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 股东股本数据接口 ====================
def get_top10_shareholders(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取十大股东数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_share_holder(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_shareholder_count(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取股东户数数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_holder_num(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_equity_structure(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取股本结构数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_equity_structure(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 融资融券数据接口 ====================
def get_margin_summary(self,
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取融资融券成交汇总
Args:
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_margin_summary(
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_margin_detail(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取融资融券交易明细
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_margin_detail(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 交易异动数据接口 ====================
def get_longhu_bang(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取龙虎榜数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_long_hu_bang(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
def get_block_trading(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> pd.DataFrame:
"""
获取大宗交易数据
Args:
codes: 股票代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
DataFrame
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_block_trading(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 指数数据接口 ====================
def get_index_constituents(self,
codes: List[str],
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取指数成分股
Args:
codes: 指数代码列表
is_local: 是否使用本地缓存
Returns:
Dict[指数代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_index_constituent(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local
)
def get_index_weights(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取指数成分股权重
支持上证50(000016.SH)、沪深300(000300.SH)、中证500(000905.SH)、
中证800(000906.SH)、中证1000(000852.SH)
Args:
codes: 指数代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
Dict[指数代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_index_weight(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== ETF数据接口 ====================
def get_etf_pcf(self, codes: List[str]) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""
获取ETF申赎数据
Args:
codes: ETF代码列表
Returns:
(etf_info, etf_constituents)
"""
self._check_login()
return self._base_data.get_etf_pcf(code_list=codes)
def get_fund_share(self,
codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取基金份额数据
Args:
codes: ETF代码列表
start_date: 开始日期
end_date: 结束日期
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_fund_share(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 可转债数据接口 ====================
def get_kzz_issuance(self,
codes: List[str],
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""
获取可转债发行数据
Args:
codes: 可转债代码列表
is_local: 是否使用本地缓存
Returns:
Dict[代码, DataFrame]
"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
return self._info_data.get_kzz_issuance(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local
)
# ==================== 辅助方法 ====================
def _check_login(self):
"""检查是否已登录"""
if not self._is_logged_in:
raise RuntimeError("未连接到数据源,请先调用 connect()")
def _format_date(self, d: Union[str, int, date]) -> int:
"""统一日期格式为 YYYYMMDD"""
if isinstance(d, int):
return d
elif isinstance(d, str):
return int(d.replace("-", "").replace("/", ""))
elif isinstance(d, date):
return int(d.strftime("%Y%m%d"))
else:
raise ValueError(f"不支持的日期格式: {d}")
def _get_financial_data(self, method: str, codes: List[str],
start_date: Optional[Union[str, int, date]] = None,
end_date: Optional[Union[str, int, date]] = None,
is_local: Optional[bool] = None) -> Dict[str, pd.DataFrame]:
"""通用财务数据获取方法"""
self._check_login()
is_local = is_local if is_local is not None else self.config.use_local_cache
method = getattr(self._info_data, method)
return method(
code_list=codes,
local_path=self.config.local_path,
is_local=is_local,
begin_date=self._format_date(start_date) if start_date else None,
end_date=self._format_date(end_date) if end_date else None
)
# ==================== 便捷函数 ====================
def create_adapter(username: str, password: str, host: str, port: int,
local_path: str = "./amazing_data_cache/",
use_local_cache: bool = True) -> AmazingDataAdapter:
"""
快速创建适配器实例
Args:
username: 用户名
password: 密码
host: 服务器地址
port: 服务器端口
local_path: 本地缓存路径
use_local_cache: 是否使用本地缓存
Returns:
AmazingDataAdapter 实例
"""
config = DataSourceConfig(
username=username,
password=password,
host=host,
port=port,
local_path=local_path,
use_local_cache=use_local_cache
)
return AmazingDataAdapter(config)
# ==================== 使用示例 ====================
if __name__ == "__main__":
# 示例代码
print("""
# 使用示例:
# 1. 创建适配器
adapter = create_adapter(
username='your_username',
password='your_password',
host='your_host',
port=your_port
)
# 2. 连接数据源
if adapter.connect():
# 3. 获取沪深A股代码列表
codes = adapter.get_code_list(SecurityType.STOCK_A)
print(f"获取到 {len(codes)} 只股票")
# 4. 获取历史K线数据
kline_data = adapter.get_kline(
codes=['000001.SZ', '600000.SH'],
start_date='20240101',
end_date='20241231',
period=Period.DAILY
)
# 5. 获取财务数据
balance_sheet = adapter.get_balance_sheet(
codes=['000001.SZ', '600000.SH'],
start_date='20240101',
end_date='20241231'
)
# 6. 获取指数成分股
constituents = adapter.get_index_constituents(['000300.SH']) # 沪深300
# 7. 断开连接
adapter.disconnect()
""")