""" AmazingData 数据源适配器 - 完整测试套件 每个接口都有独立的测试用例和入口 """ import sys import traceback from datetime import datetime from amazing_data_adapter import ( AmazingDataAdapter, DataSourceConfig, SecurityType, Market, Period, create_adapter ) # ============== 配置信息 ============== USERNAME = '11200008169' PASSWORD = '11200008169@2026' HOST = '140.206.44.234' PORT = 8600 LOCAL_PATH = './amazing_data_cache/' def create_test_adapter() -> AmazingDataAdapter: """创建测试用的适配器实例""" return create_adapter( username=USERNAME, password=PASSWORD, host=HOST, port=PORT, local_path=LOCAL_PATH, use_local_cache=True ) def run_test(test_name: str, test_func): """运行单个测试并输出结果""" print(f"\n{'='*60}") print(f"测试: {test_name}") print('='*60) adapter = create_test_adapter() try: if not adapter.connect(): print("❌ 连接失败") return False test_func(adapter) print("✅ 测试通过") return True except Exception as e: print(f"❌ 测试失败: {e}") traceback.print_exc() return False finally: adapter.disconnect() # ============== 基础数据接口测试 ============== def test_get_code_list(adapter: AmazingDataAdapter): """测试: 获取代码列表""" print("\n--- 测试获取各类证券代码列表 ---") # 沪深A股 stock_codes = adapter.get_code_list(SecurityType.STOCK_A) print(f"沪深A股数量: {len(stock_codes)}") if stock_codes: print(f"示例代码: {stock_codes[:5]}") # ETF etf_codes = adapter.get_code_list(SecurityType.ETF) print(f"ETF数量: {len(etf_codes)}") if etf_codes: print(f"示例代码: {etf_codes[:3]}") # 期货 future_codes = adapter.get_code_list(SecurityType.FUTURE) print(f"期货数量: {len(future_codes)}") if future_codes: print(f"示例代码: {future_codes[:3]}") # 可转债 kzz_codes = adapter.get_code_list(SecurityType.KZZ) print(f"可转债数量: {len(kzz_codes)}") if kzz_codes: print(f"示例代码: {kzz_codes[:3]}") # 指数 index_codes = adapter.get_code_list(SecurityType.INDEX_A) print(f"沪深指数数量: {len(index_codes)}") if index_codes: print(f"示例代码: {index_codes[:3]}") def test_get_code_info(adapter: AmazingDataAdapter): """测试: 获取证券信息""" print("\n--- 测试获取证券基本信息 ---") stock_info = adapter.get_code_info(SecurityType.STOCK_A) print(f"证券信息形状: {stock_info.shape}") print(f"列名: {stock_info.columns.tolist()[:10]}") print("\n前3条数据:") print(stock_info.head(3)) def test_get_trading_calendar(adapter: AmazingDataAdapter): """测试: 获取交易日历""" print("\n--- 测试获取交易日历 ---") # 上海市场 sh_calendar = adapter.get_trading_calendar(Market.SH) print(f"上海市场交易日数量: {len(sh_calendar)}") print(f"最近5个交易日: {sh_calendar[-5:]}") # 深圳市场 sz_calendar = adapter.get_trading_calendar(Market.SZ) print(f"深圳市场交易日数量: {len(sz_calendar)}") # 北京市场 bj_calendar = adapter.get_trading_calendar(Market.BJ) print(f"北京市场交易日数量: {len(bj_calendar)}") def test_get_adj_factor(adapter: AmazingDataAdapter): """测试: 获取复权因子""" print("\n--- 测试获取复权因子 ---") codes = ['000001.SZ', '600000.SH'] adj_factor = adapter.get_adj_factor(codes, is_local=False) print(f"复权因子形状: {adj_factor.shape}") print(f"列名: {adj_factor.columns.tolist()}") print("\n前5条数据:") print(adj_factor.head()) def test_get_backward_factor(adapter: AmazingDataAdapter): """测试: 获取后复权因子""" print("\n--- 测试获取后复权因子 ---") codes = ['000001.SZ', '600000.SH', '000858.SZ'] backward_factor = adapter.get_backward_factor(codes) print(f"后复权因子形状: {backward_factor.shape}") print(f"列名: {backward_factor.columns.tolist()}") print("\n前5条数据:") print(backward_factor.head()) # ============== 历史行情数据接口测试 ============== def test_get_kline(adapter: AmazingDataAdapter): """测试: 获取历史K线数据""" print("\n--- 测试获取历史K线数据 ---") # 获取沪深A股代码列表 stock_codes = adapter.get_code_list(SecurityType.STOCK_A) sample_codes = stock_codes[:3] # 日K线 print("\n>>> 日K线数据") kline_daily = adapter.get_kline( codes=sample_codes, start_date='20241201', end_date='20241231', period=Period.DAILY ) for code, df in list(kline_daily.items())[:2]: print(f"\n{code}:") print(f" 数据条数: {len(df)}") print(f" 列名: {df.columns.tolist()}") if not df.empty: print(f" 最新数据:\n{df.tail(2)}") # 分钟K线 print("\n>>> 60分钟K线数据") kline_min = adapter.get_kline( codes=['000001.SZ'], start_date='20241201', end_date='20241231', period=Period.MIN60 ) for code, df in kline_min.items(): print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 前2条数据:\n{df.head(2)}") def test_get_snapshot(adapter: AmazingDataAdapter): """测试: 获取历史快照数据(tick级别)""" print("\n--- 测试获取历史快照数据 ---") snapshot_data = adapter.get_snapshot( codes=['000001.SZ'], start_date='20241201', end_date='20241201' # 只取一天的数据,避免数据量过大 ) print(f"快照数据形状: {snapshot_data}") for code, df in snapshot_data.items(): print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名: {df.columns.tolist()[:15]}") # 显示前15列 print(f" 前3条数据:\n{df.head(3)}") else: print(" 数据为空") # ============== 财务数据接口测试 ============== def test_get_balance_sheet(adapter: AmazingDataAdapter): """测试: 获取资产负债表""" print("\n--- 测试获取资产负债表 ---") codes = ['000001.SZ', '600000.SH'] balance_sheet = adapter.get_balance_sheet( codes=codes, start_date=20250101, end_date=20260311 ) for code, df in balance_sheet.items(): print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名(前10): {df.columns.tolist()[:10]}") print(f" 数据预览:\n{df.head(2)}") # 获取最后一个资产负债表并打印最近日期的 TOT_SHARE print("\n--- 最后一个资产负债表详情 ---") last_code = list(balance_sheet.keys())[-1] last_df = balance_sheet[last_code] if not last_df.empty: print(f"股票代码: {last_code}") print(f"资产负债表:\n{last_df}") if 'TOT_SHARE' in last_df.columns: # 按 REPORTING_PERIOD 降序排序,获取最近日期的一行 if 'REPORTING_PERIOD' in last_df.columns: # 输出日期和总股本对应关系 print("\n日期与总股本对应关系:") for idx, row in last_df.iterrows(): print(f" {row['REPORTING_PERIOD']}: {row['TOT_SHARE']}") # 获取最近日期的数据 latest_row = last_df.sort_values('REPORTING_PERIOD', ascending=False).iloc[0] latest_date = latest_row['REPORTING_PERIOD'] else: # 如果没有 REPORTING_PERIOD 列,取最后一行 print("\n日期与总股本对应关系:") for idx, row in last_df.iterrows(): print(f" {idx}: {row['TOT_SHARE']}") latest_row = last_df.iloc[-1] latest_date = latest_row.name if hasattr(latest_row, 'name') else '未知' latest_tot_share = latest_row['TOT_SHARE'] print(f"\n>>> 最近报告期 {latest_date} 的总股本: {latest_tot_share}") else: print(f"\n可用列: {last_df.columns.tolist()}") print("未找到 TOT_SHARE 列") else: print(f"{last_code} 的资产负债表为空") def test_get_cash_flow(adapter: AmazingDataAdapter): """测试: 获取现金流量表""" print("\n--- 测试获取现金流量表 ---") codes = ['000001.SZ', '600000.SH'] cash_flow = adapter.get_cash_flow( codes=codes, start_date=20240101, end_date=20241231 ) for code, df in cash_flow.items(): print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名(前10): {df.columns.tolist()[:10]}") print(f" 数据预览:\n{df.head(2)}") def test_get_income_statement(adapter: AmazingDataAdapter): """测试: 获取利润表""" print("\n--- 测试获取利润表 ---") codes = ['000001.SZ', '600000.SH'] income = adapter.get_income_statement( codes=codes, start_date=20240101, end_date=20241231 ) for code, df in income.items(): print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名(前10): {df.columns.tolist()[:10]}") print(f" 数据预览:\n{df.head(2)}") def test_get_profit_express(adapter: AmazingDataAdapter): """测试: 获取业绩快报""" print("\n--- 测试获取业绩快报 ---") codes = ['000001.SZ', '600000.SH', '000858.SZ'] profit_express = adapter.get_profit_express( codes=codes, start_date=20240101, end_date=20241231 ) print(f"数据形状: {profit_express.shape}") if not profit_express.empty: print(f"列名: {profit_express.columns.tolist()}") print(f"\n前5条数据:\n{profit_express.head()}") else: print("数据为空") def test_get_profit_notice(adapter: AmazingDataAdapter): """测试: 获取业绩预告""" print("\n--- 测试获取业绩预告 ---") codes = ['000001.SZ', '600000.SH'] profit_notice = adapter.get_profit_notice( codes=codes, start_date=20240101, end_date=20241231 ) print(f"数据形状: {profit_notice.shape}") if not profit_notice.empty: print(f"列名: {profit_notice.columns.tolist()}") print(f"\n前5条数据:\n{profit_notice.head()}") else: print("数据为空") # ============== 股东股本数据接口测试 ============== def test_get_top10_shareholders(adapter: AmazingDataAdapter): """测试: 获取十大股东数据""" print("\n--- 测试获取十大股东数据 ---") codes = ['000001.SZ', '600000.SH'] top10 = adapter.get_top10_shareholders( codes=codes, start_date=20240101, end_date=20241231 ) print(f"数据形状: {top10.shape}") if not top10.empty: print(f"列名: {top10.columns.tolist()}") print(f"\n前5条数据:\n{top10.head()}") else: print("数据为空") def test_get_shareholder_count(adapter: AmazingDataAdapter): """测试: 获取股东户数数据""" print("\n--- 测试获取股东户数数据 ---") codes = ['000001.SZ', '600000.SH'] holder_count = adapter.get_shareholder_count( codes=codes, start_date=20240101, end_date=20241231 ) print(f"数据形状: {holder_count.shape}") if not holder_count.empty: print(f"列名: {holder_count.columns.tolist()}") print(f"\n前5条数据:\n{holder_count.head()}") else: print("数据为空") def test_get_equity_structure(adapter: AmazingDataAdapter): """测试: 获取股本结构数据""" print("\n--- 测试获取股本结构数据 ---") codes = ['600000.SH'] equity = adapter.get_equity_structure( codes=codes, start_date=20250101, end_date=20251231 ) print(f"数据形状: {equity.shape}") if not equity.empty: print(f"列名: {equity.columns.tolist()}") print(f"\n前5条数据:\n{equity.head()}") # 按日期和 TOT_A_SHARE 输出 if 'TOT_A_SHARE' in equity.columns: print("\n日期与流通A股(TOT_A_SHARE)对应关系:") if 'REPORTING_PERIOD' in equity.columns: for idx, row in equity.iterrows(): print(f" {row['REPORTING_PERIOD']}: {row['TOT_A_SHARE']}") # 获取最近日期的数据 latest_row = equity.sort_values('REPORTING_PERIOD', ascending=False).iloc[0] latest_date = latest_row['REPORTING_PERIOD'] else: for idx, row in equity.iterrows(): print(f" {idx}: {row['TOT_A_SHARE']}") latest_row = equity.iloc[-1] latest_date = latest_row.name if hasattr(latest_row, 'name') else '未知' print(f"\n>>> 最近报告期 {latest_date} 的流通A股: {latest_row['TOT_A_SHARE']}") else: print(f"\n可用列: {equity.columns.tolist()}") print("未找到 TOT_A_SHARE 列") else: print("数据为空") # ============== 融资融券数据接口测试 ============== def test_get_margin_summary(adapter: AmazingDataAdapter): """测试: 获取融资融券成交汇总""" print("\n--- 测试获取融资融券成交汇总 ---") margin_summary = adapter.get_margin_summary( start_date=20240101, end_date=20241231 ) print(f"数据形状: {margin_summary.shape}") if not margin_summary.empty: print(f"列名: {margin_summary.columns.tolist()}") print(f"\n前5条数据:\n{margin_summary.head()}") else: print("数据为空") def test_get_margin_detail(adapter: AmazingDataAdapter): """测试: 获取融资融券交易明细""" print("\n--- 测试获取融资融券交易明细 ---") codes = ['000001.SZ', '600000.SH'] margin_detail = adapter.get_margin_detail( codes=codes, start_date=20241201, end_date=20241231 ) for code, df in margin_detail.items(): print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名: {df.columns.tolist()[:10]}") print(f" 数据预览:\n{df.head(2)}") # ============== 交易异动数据接口测试 ============== def test_get_longhu_bang(adapter: AmazingDataAdapter): """测试: 获取龙虎榜数据""" print("\n--- 测试获取龙虎榜数据 ---") codes = ['000001.SZ', '600000.SH'] longhu = adapter.get_longhu_bang( codes=codes, start_date=20241201, end_date=20241231 ) print(f"数据形状: {longhu.shape}") if not longhu.empty: print(f"列名: {longhu.columns.tolist()}") print(f"\n前5条数据:\n{longhu.head()}") else: print("数据为空") def test_get_block_trading(adapter: AmazingDataAdapter): """测试: 获取大宗交易数据""" print("\n--- 测试获取大宗交易数据 ---") codes = ['000001.SZ', '600000.SH'] block_trade = adapter.get_block_trading( codes=codes, start_date=20241201, end_date=20241231 ) print(f"数据形状: {block_trade.shape}") if not block_trade.empty: print(f"列名: {block_trade.columns.tolist()}") print(f"\n前5条数据:\n{block_trade.head()}") else: print("数据为空") # ============== 指数数据接口测试 ============== def test_get_index_constituents(adapter: AmazingDataAdapter): """测试: 获取指数成分股""" print("\n--- 测试获取指数成分股 ---") index_codes = ['000300.SH', '000905.SH', '000016.SH'] constituents = adapter.get_index_constituents(index_codes) for code, df in constituents.items(): print(f"\n{code}:") print(f" 成分股数量: {len(df)}") if not df.empty: print(f" 列名: {df.columns.tolist()}") print(f" 前5只成分股:\n{df.head()}") def test_get_index_weights(adapter: AmazingDataAdapter): """测试: 获取指数成分股权重""" print("\n--- 测试获取指数成分股权重 ---") index_codes = ['000300.SH', '000905.SH'] weights = adapter.get_index_weights( codes=index_codes, start_date=20241201, end_date=20241231 ) for code, df in weights.items(): print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名: {df.columns.tolist()}") print(f" 权重最大的5只股票:\n{df.nlargest(5, 'WEIGHT') if 'WEIGHT' in df.columns else df.head()}") # ============== ETF数据接口测试 ============== def test_get_etf_pcf(adapter: AmazingDataAdapter): """测试: 获取ETF申赎数据""" print("\n--- 测试获取ETF申赎数据 ---") etf_codes = ['510050.SH', '510300.SH'] etf_info, etf_constituents = adapter.get_etf_pcf(etf_codes) print(f"ETF基本信息形状: {etf_info.shape}") if not etf_info.empty: print(f"列名: {etf_info.columns.tolist()}") print(f"\nETF基本信息:\n{etf_info.head()}") print(f"\nETF成分股数据数量: {len(etf_constituents)}") for code, df in list(etf_constituents.items())[:2]: print(f"\n{code} 成分股:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名: {df.columns.tolist()[:10]}") print(f" 前3条:\n{df.head(3)}") def test_get_fund_share(adapter: AmazingDataAdapter): """测试: 获取基金份额数据""" print("\n--- 测试获取基金份额数据 ---") codes = ['510050.SH', '510300.SH'] fund_share = adapter.get_fund_share( codes=codes, start_date=20240101, end_date=20241231 ) for code, df in fund_share.items(): print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名: {df.columns.tolist()}") print(f" 数据预览:\n{df.head()}") # ============== 可转债数据接口测试 ============== def test_get_kzz_issuance(adapter: AmazingDataAdapter): """测试: 获取可转债发行数据""" print("\n--- 测试获取可转债发行数据 ---") kzz_codes = adapter.get_code_list(SecurityType.KZZ) print(f"可转债数量: {len(kzz_codes)}") if kzz_codes: sample_codes = kzz_codes[:5] print(f"测试代码: {sample_codes}") kzz_issuance = adapter.get_kzz_issuance(sample_codes) for code, df in list(kzz_issuance.items())[:3]: print(f"\n{code}:") print(f" 数据条数: {len(df)}") if not df.empty: print(f" 列名: {df.columns.tolist()[:10]}") print(f" 数据预览:\n{df.head()}") # ============== 测试入口 ============== # 所有测试用例映射 TEST_CASES = { # 基础数据接口 (5个) 'code_list': test_get_code_list, 'code_info': test_get_code_info, 'trading_calendar': test_get_trading_calendar, 'adj_factor': test_get_adj_factor, 'backward_factor': test_get_backward_factor, # 历史行情数据接口 (2个) 'kline': test_get_kline, 'snapshot': test_get_snapshot, # 财务数据接口 (5个) 'balance_sheet': test_get_balance_sheet, 'cash_flow': test_get_cash_flow, 'income_statement': test_get_income_statement, 'profit_express': test_get_profit_express, 'profit_notice': test_get_profit_notice, # 股东股本数据接口 (3个) 'top10_shareholders': test_get_top10_shareholders, 'shareholder_count': test_get_shareholder_count, 'equity_structure': test_get_equity_structure, # 融资融券数据接口 (2个) 'margin_summary': test_get_margin_summary, 'margin_detail': test_get_margin_detail, # 交易异动数据接口 (2个) 'longhu_bang': test_get_longhu_bang, 'block_trading': test_get_block_trading, # 指数数据接口 (2个) 'index_constituents': test_get_index_constituents, 'index_weights': test_get_index_weights, # ETF数据接口 (2个) 'etf_pcf': test_get_etf_pcf, 'fund_share': test_get_fund_share, # 可转债数据接口 (1个) 'kzz_issuance': test_get_kzz_issuance, } def print_usage(): """打印使用说明""" print(""" 使用方法: python test_amazing_data_adapter.py 测试用例列表: 基础数据接口: code_list - 获取代码列表 code_info - 获取证券信息 trading_calendar - 获取交易日历 adj_factor - 获取复权因子 backward_factor - 获取后复权因子 历史行情数据接口: kline - 获取历史K线数据 snapshot - 获取历史快照数据 财务数据接口: balance_sheet - 获取资产负债表 cash_flow - 获取现金流量表 income_statement - 获取利润表 profit_express - 获取业绩快报 profit_notice - 获取业绩预告 股东股本数据接口: top10_shareholders - 获取十大股东数据 shareholder_count - 获取股东户数数据 equity_structure - 获取股本结构数据 融资融券数据接口: margin_summary - 获取融资融券成交汇总 margin_detail - 获取融资融券交易明细 交易异动数据接口: longhu_bang - 获取龙虎榜数据 block_trading - 获取大宗交易数据 指数数据接口: index_constituents - 获取指数成分股 index_weights - 获取指数成分股权重 ETF数据接口: etf_pcf - 获取ETF申赎数据 fund_share - 获取基金份额数据 可转债数据接口: kzz_issuance - 获取可转债发行数据 特殊命令: all - 运行所有测试 list - 列出所有测试用例 """) def run_all_tests(): """运行所有测试""" print("\n" + "="*60) print(f"运行所有测试用例: {list(TEST_CASES.keys())}") print("="*60) results = {} for name, func in TEST_CASES.items(): print(f"\n--- 测试: {name} ---") results[name] = run_test(name, func) print(f"\n--- 测试: {name} 完成 ---") # 打印汇总 print("\n" + "="*60) print("测试结果汇总") print("="*60) passed = sum(results.values()) total = len(results) for name, result in results.items(): status = "✅ 通过" if result else "❌ 失败" print(f"{name:20s} {status}") print("-"*60) print(f"总计: {passed}/{total} 通过") if __name__ == "__main__": if len(sys.argv) < 2: print_usage() sys.exit(1) test_name = sys.argv[1].lower() if test_name == 'list': print_usage() elif test_name == 'all': run_all_tests() elif test_name in TEST_CASES: run_test(test_name, TEST_CASES[test_name]) else: print(f"未知测试用例: {test_name}") print_usage() sys.exit(1)