#!/usr/bin/env python3 """ amazingData SDK 连接测试脚本 测试与银河证券星耀数智量化平台的连接 """ import sys import os # 添加项目路径 sys.path.insert(0, '/app/working/workspaces/developer/projects/20260330_kline_system/backend') from app.config import settings from app.services.amazing_data_service import amazing_data_service def test_connection(): """测试 SDK 连接""" print("=" * 60) print("amazingData SDK 连接测试") print("=" * 60) print() # 显示配置 print("📋 配置信息:") print(f" Host: {settings.AMAZING_DATA_HOST}") print(f" Port: {settings.AMAZING_DATA_PORT}") print(f" Account: {settings.AMAZING_DATA_ACCOUNT}") print(f" Environment: {settings.AMAZING_DATA_ENV}") print() # 测试连接 print("🔌 正在连接 amazingData 数据源...") try: success = amazing_data_service.connect() if success: print("✅ 连接成功!") print() # 测试获取数据 print("📊 测试获取数据...") try: # 获取期货代码列表 codes = amazing_data_service.get_security_codes(security_type="EXTRA_FUTURE") if codes: print(f"✅ 获取期货代码列表成功,共 {len(codes)} 个品种") print(f" 示例:{codes[:5] if len(codes) >= 5 else codes}") else: print("⚠️ 代码列表为空") except Exception as e: print(f"❌ 获取数据失败:{e}") # 断开连接 print() print("🔌 正在断开连接...") amazing_data_service.disconnect() print("✅ 已断开连接") return True else: print("❌ 连接失败") return False except Exception as e: print(f"❌ 连接异常:{e}") import traceback traceback.print_exc() return False if __name__ == "__main__": success = test_connection() print() print("=" * 60) if success: print("🎉 测试通过!amazingData SDK 集成成功!") else: print("⚠️ 测试失败,请检查网络和账号配置") print("=" * 60) sys.exit(0 if success else 1)