#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ================================================================================ 期货/股票多周期数据获取与技术指标计算脚本 ================================================================================ 【功能介绍】 本脚本用于获取期货或股票的多周期K线数据,并自动计算技术指标,输出JSON格式, 可直接用于技术分析Skill的输入。 【核心功能】 1. 多周期数据获取 - 5分钟K线(至少50根) - 15分钟K线(至少50根) - 30分钟K线(至少50根) - 60分钟K线(至少50根) - 日K线(至少50根) 2. 技术指标自动计算(每根K线都包含) - MA10: 10日/周期移动平均线 - MA20: 20日/周期移动平均线 - MACD_DIF: MACD快线 - MACD_DEA: MACD慢线(信号线) - MACD_HISTOGRAM: MACD柱状图 3. 输出格式 每个周期为一个数组,数组中每项包含: { "time": "K线时间", "open": 开盘价, "high": 最高价, "low": 最低价, "close": 收盘价, "volume": 成交量, "ma10": MA10值, "ma20": MA20值, "macd_dif": DIF值, "macd_dea": DEA值, "macd_histogram": 柱状图值 } 【使用方法】 1. 安装依赖 pip install akshare pandas 2. 运行脚本 # 获取期货数据(默认) python futures_data_collector.py --symbol SN2504 --type futures # 获取股票数据 python futures_data_collector.py --symbol 000001 --type stock python futures_data_collector.py --symbol 600000 --type stock --output stock_data.json 3. 参数说明 --symbol: 代码(必填) 期货格式:品种代码 + 年份 + 月份,如 SN2504(沪锡)、AG2506(沪银) 股票格式:6位数字代码,如 000001(平安银行)、600000(浦发银行) --type: 数据类型(可选,默认 futures) 可选值:futures(期货)、stock(股票) --output: 输出文件名(可选) 默认格式:{代码}_{时间戳}.json 【常见代码对照】 期货: SN2504 - 沪锡(上海期货交易所) AG2506 - 沪银(上海期货交易所) LC2505 - 碳酸锂(广州期货交易所) NI2505 - 沪镍(上海期货交易所) 股票: 000001 - 平安银行 600000 - 浦发银行 000858 - 五粮液 600519 - 贵州茅台 【输出示例】 { "symbol": "SN2504", "current_price": 223500, "timestamp": "2026-03-07T22:15:00+08:00", "timeframes": { "60min": [ { "time": "2026-03-07 14:00", "open": 22100, "high": 22300, "low": 22050, "close": 22250, "volume": 12500, "ma10": 22180.5, "ma20": 22000.3, "macd_dif": 0.0523, "macd_dea": 0.0312, "macd_histogram": 0.0422 } ], "30min": [ ... ], "15min": [ ... ], "5min": [ ... ], "daily": [ ... ] } } 【注意事项】 1. 数据源使用akshare,数据可能有延迟或频率限制 2. 分钟数据受交易所限制,可能无法获取太多历史数据 3. 如遇数据获取失败,请检查合约代码是否正确 4. 脚本会自动过滤掉数据不足的周期 【作者】OpenClaw Assistant 【日期】2026-03-07 ================================================================================ """ import akshare as ak import pandas as pd import json import argparse import os from datetime import datetime, timedelta from typing import Dict, List import warnings warnings.filterwarnings('ignore') # 清除缓存 ak.cache = {} # 数据目录配置 DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data') os.makedirs(DATA_DIR, exist_ok=True) def calculate_ma(df: pd.DataFrame, periods: List[int] = [10, 20]) -> pd.DataFrame: """计算移动平均线""" for period in periods: df[f'MA{period}'] = df['close'].rolling(window=period, min_periods=1).mean() return df def calculate_macd(df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame: """计算MACD指标""" ema_fast = df['close'].ewm(span=fast, adjust=False).mean() ema_slow = df['close'].ewm(span=slow, adjust=False).mean() df['macd_dif'] = ema_fast - ema_slow df['macd_dea'] = df['macd_dif'].ewm(span=signal, adjust=False).mean() df['macd_histogram'] = (df['macd_dif'] - df['macd_dea']) * 2 # MACD信号判断 df['macd_signal'] = df.apply(lambda row: 'bullish' if row['macd_dif'] > row['macd_dea'] and row['macd_histogram'] > 0 else 'bearish' if row['macd_dif'] < row['macd_dea'] and row['macd_histogram'] < 0 else 'neutral', axis=1) return df def get_current_time() -> datetime: """获取当前北京时间(去除微秒)""" return datetime.now().replace(microsecond=0) def filter_future_data(df: pd.DataFrame, current_time: datetime = None) -> pd.DataFrame: """ 过滤掉未来数据 Args: df: 包含datetime列的DataFrame current_time: 当前时间,默认为系统当前时间 Returns: 过滤后的DataFrame """ if current_time is None: current_time = get_current_time() if 'datetime' not in df.columns: return df # 确保datetime列是datetime类型 df['datetime'] = pd.to_datetime(df['datetime']) # 过滤掉大于当前时间的数据(未来数据) original_count = len(df) df = df[df['datetime'] <= current_time].copy() filtered_count = original_count - len(df) if filtered_count > 0: print(f" 过滤了 {filtered_count} 条未来数据") return df def extend_night_session_data(df: pd.DataFrame, symbol: str, period: str) -> pd.DataFrame: """ 尝试获取完整的夜盘数据 期货夜盘时间:21:00 - 02:30(次日) """ if df.empty or 'datetime' not in df.columns: return df df['datetime'] = pd.to_datetime(df['datetime']) df = df.sort_values('datetime').reset_index(drop=True) # 检查是否需要补充夜盘数据 # 获取最后一条数据的时间 last_time = df['datetime'].iloc[-1] last_hour = last_time.hour last_minute = last_time.minute # 如果最后数据在 21:00-23:59 或 00:00-02:30 范围内,可能需要补充 # 夜盘结束时间是 02:30 is_night_session = ( (last_hour >= 21) or # 21:00 - 23:59 (last_hour < 2) or # 00:00 - 01:59 (last_hour == 2 and last_minute <= 30) # 02:00 - 02:30 ) if not is_night_session: return df # 检查是否包含 02:30 的数据(对于5分钟、15分钟等周期) has_0230 = False for dt in df['datetime']: if dt.hour == 2 and dt.minute == 30: has_0230 = True break # 如果已经有 02:30 的数据,说明夜盘完整 if has_0230: return df # 尝试通过获取历史数据来补充夜盘 # 由于akshare限制,我们记录警告信息 print(f" 注意: 夜盘数据可能不完整(缺少02:30及之前的数据)") return df def get_minute_data(symbol: str, period: str) -> pd.DataFrame: """ 获取期货分钟K线数据,过滤未来数据,确保夜盘完整 Args: symbol: 合约代码,如 "SN2504" period: 分钟周期,"5", "15", "30", "60" Returns: DataFrame with OHLCV data """ try: # 获取当前时间(用于过滤未来数据) current_time = get_current_time() # 使用akshare获取分钟数据 df = ak.futures_zh_minute_sina(symbol=symbol, period=period) # 重命名列 df = df.rename(columns={ 'day': 'datetime', 'open': 'open', 'high': 'high', 'low': 'low', 'close': 'close', 'volume': 'volume' }) # 转换数据类型 for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') # 确保datetime列是datetime类型 df['datetime'] = pd.to_datetime(df['datetime']) # 过滤未来数据 df = filter_future_data(df, current_time) # 尝试补充夜盘数据 df = extend_night_session_data(df, symbol, period) # 确保至少50根K线 if len(df) < 50: print(f" 警告: {period}分钟只获取到{len(df)}根K线,建议检查数据源") return df except Exception as e: print(f" 获取{period}分钟数据失败: {e}") return pd.DataFrame() def get_daily_data(symbol: str, days: int = 60) -> pd.DataFrame: """ 获取期货日K线数据,过滤未来数据 Args: symbol: 合约代码 days: 获取天数 Returns: DataFrame with OHLCV data """ try: # 获取当前时间(用于过滤未来数据) current_time = get_current_time() # 获取日K数据(获取较多历史数据) df = ak.futures_zh_daily_sina(symbol=symbol) # 重命名列 df = df.rename(columns={ 'date': 'datetime', 'open': 'open', 'high': 'high', 'low': 'low', 'close': 'close', 'volume': 'volume' }) # 转换数据类型 for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') # 排序 df['datetime'] = pd.to_datetime(df['datetime']) df = df.sort_values('datetime').reset_index(drop=True) # 过滤未来数据(只保留今天及之前的数据) df = filter_future_data(df, current_time) # 取最近N天 df = df.tail(days).reset_index(drop=True) return df except Exception as e: print(f" 获取日K数据失败: {e}") return pd.DataFrame() def get_stock_minute_data(symbol: str, period: str) -> pd.DataFrame: """ 获取股票分钟K线数据 Args: symbol: 股票代码,如 "000001" period: 分钟周期,"5", "15", "30", "60" Returns: DataFrame with OHLCV data """ try: # 获取当前时间(用于过滤未来数据) current_time = get_current_time() # 使用akshare获取股票分钟数据 # stock_zh_a_minute 需要 symbol 格式为 sh600000 或 sz000001 if symbol.startswith('6'): full_symbol = f"sh{symbol}" else: full_symbol = f"sz{symbol}" df = ak.stock_zh_a_minute(symbol=full_symbol, period=period) # 重命名列 df = df.rename(columns={ 'day': 'datetime', 'open': 'open', 'high': 'high', 'low': 'low', 'close': 'close', 'volume': 'volume' }) # 转换数据类型 for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') # 确保datetime列是datetime类型 df['datetime'] = pd.to_datetime(df['datetime']) # 过滤未来数据 df = filter_future_data(df, current_time) # 确保至少50根K线 if len(df) < 50: print(f" 警告: {period}分钟只获取到{len(df)}根K线,建议检查数据源") return df except Exception as e: print(f" 获取{period}分钟数据失败: {e}") return pd.DataFrame() def get_stock_daily_data(symbol: str, days: int = 60) -> pd.DataFrame: """ 获取股票日K线数据 Args: symbol: 股票代码,如 "000001" days: 获取天数 Returns: DataFrame with OHLCV data """ try: # 获取当前时间(用于过滤未来数据) current_time = get_current_time() # 计算开始日期(获取足够的历史数据) end_date = current_time.strftime('%Y%m%d') start_date = (current_time - timedelta(days=days*2)).strftime('%Y%m%d') # 获取日K数据 df = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=start_date, end_date=end_date) # 重命名列 df = df.rename(columns={ '日期': 'datetime', '开盘': 'open', '最高': 'high', '最低': 'low', '收盘': 'close', '成交量': 'volume' }) # 转换数据类型 for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') # 排序 df['datetime'] = pd.to_datetime(df['datetime']) df = df.sort_values('datetime').reset_index(drop=True) # 过滤未来数据(只保留今天及之前的数据) df = filter_future_data(df, current_time) # 取最近N天 df = df.tail(days).reset_index(drop=True) return df except Exception as e: print(f" 获取日K数据失败: {e}") return pd.DataFrame() def process_data(df: pd.DataFrame, timeframe: str) -> List[Dict]: """ 处理数据,计算指标并格式化输出 每个周期返回一个数组,每项包含交易数据+计算指标 Args: df: K线DataFrame timeframe: 周期名称 Returns: 格式化后的K线数组,每项包含指标 """ if df.empty or len(df) < 10: return [] # 计算技术指标 df = calculate_ma(df) df = calculate_macd(df) # 格式化K线数据(取最近50根或全部) candles = [] df_tail = df.tail(50) if len(df) > 50 else df for _, row in df_tail.iterrows(): candle = { "time": str(row['datetime']), "open": round(float(row['open']), 2), "high": round(float(row['high']), 2), "low": round(float(row['low']), 2), "close": round(float(row['close']), 2), "volume": int(row['volume']) if not pd.isna(row['volume']) else 0, "ma10": round(float(row['MA10']), 2) if not pd.isna(row.get('MA10')) else None, "ma20": round(float(row['MA20']), 2) if not pd.isna(row.get('MA20')) else None, "macd_dif": round(float(row['macd_dif']), 4) if not pd.isna(row.get('macd_dif')) else 0, "macd_dea": round(float(row['macd_dea']), 4) if not pd.isna(row.get('macd_dea')) else 0, "macd_histogram": round(float(row['macd_histogram']), 4) if not pd.isna(row.get('macd_histogram')) else 0 } candles.append(candle) return candles def collect_futures_data(symbol: str) -> Dict: """ 收集期货多周期完整数据 Args: symbol: 合约代码,如 "SN2504" Returns: 完整的JSON格式数据 """ print(f"\n正在获取期货 {symbol} 的多周期数据...") print(f"当前时间: {get_current_time().strftime('%Y-%m-%d %H:%M:%S')}") print("-" * 50) result = { "symbol": symbol, "type": "futures", "current_price": None, "timestamp": datetime.now().strftime("%Y-%m-%dT%H:%M:%S+08:00"), "timeframes": {} } # 获取各周期数据 periods = [ ("60min", "60"), ("30min", "30"), ("15min", "15"), ("5min", "5") ] for tf_name, tf_period in periods: print(f"获取 {tf_name} 数据...") try: df = get_minute_data(symbol, tf_period) if not df.empty and len(df) >= 50: candles = process_data(df, tf_name) if candles: result["timeframes"][tf_name] = candles # 设置当前价格为最新收盘价 if result["current_price"] is None: result["current_price"] = candles[-1]["close"] print(f" [OK] 成功获取 {len(candles)} 根K线") else: print(f" [FAIL] 数据不足或获取失败 (获取到{len(df)}根)") except Exception as e: print(f" [ERROR] 错误: {e}") # 获取日K数据 print("获取 daily 数据...") try: df_daily = get_daily_data(symbol, days=60) if not df_daily.empty and len(df_daily) >= 50: candles = process_data(df_daily, "daily") if candles: result["timeframes"]["daily"] = candles print(f" [OK] 成功获取 {len(candles)} 根K线") else: print(f" [FAIL] 数据不足或获取失败 (获取到{len(df_daily)}根)") except Exception as e: print(f" [ERROR] 错误: {e}") print("-" * 50) return result def collect_stock_data(symbol: str) -> Dict: """ 收集股票多周期完整数据 Args: symbol: 股票代码,如 "000001" Returns: 完整的JSON格式数据 """ print(f"\n正在获取股票 {symbol} 的多周期数据...") print(f"当前时间: {get_current_time().strftime('%Y-%m-%d %H:%M:%S')}") print("-" * 50) result = { "symbol": symbol, "type": "stock", "current_price": None, "timestamp": datetime.now().strftime("%Y-%m-%dT%H:%M:%S+08:00"), "timeframes": {} } # 获取各周期数据 periods = [ ("60min", "60"), ("30min", "30"), ("15min", "15"), ("5min", "5") ] for tf_name, tf_period in periods: print(f"获取 {tf_name} 数据...") try: df = get_stock_minute_data(symbol, tf_period) if not df.empty and len(df) >= 50: candles = process_data(df, tf_name) if candles: result["timeframes"][tf_name] = candles # 设置当前价格为最新收盘价 if result["current_price"] is None: result["current_price"] = candles[-1]["close"] print(f" [OK] 成功获取 {len(candles)} 根K线") else: print(f" [FAIL] 数据不足或获取失败 (获取到{len(df)}根)") except Exception as e: print(f" [ERROR] 错误: {e}") # 获取日K数据 print("获取 daily 数据...") try: df_daily = get_stock_daily_data(symbol, days=60) if not df_daily.empty and len(df_daily) >= 50: candles = process_data(df_daily, "daily") if candles: result["timeframes"]["daily"] = candles print(f" [OK] 成功获取 {len(candles)} 根K线") else: print(f" [FAIL] 数据不足或获取失败 (获取到{len(df_daily)}根)") except Exception as e: print(f" [ERROR] 错误: {e}") print("-" * 50) return result def main(): parser = argparse.ArgumentParser(description='期货/股票多周期数据获取与技术指标计算') parser.add_argument('--symbol', type=str, required=True, help='代码,期货如 SN2504(沪锡), 股票如 000001(平安银行)') parser.add_argument('--type', type=str, default='futures', choices=['futures', 'stock'], help='数据类型:futures(期货)、stock(股票),默认为 futures') parser.add_argument('--output', type=str, default=None, help='输出JSON文件名,默认为 代码_时间戳.json') args = parser.parse_args() # 根据类型获取数据 if args.type == 'stock': data = collect_stock_data(args.symbol) else: data = collect_futures_data(args.symbol) # 检查是否获取到数据 if not data["timeframes"]: print("\n错误: 未能获取到任何数据,请检查代码是否正确") if args.type == 'stock': print("常见股票代码示例:") print(" 000001 - 平安银行") print(" 600000 - 浦发银行") print(" 000858 - 五粮液") print(" 600519 - 贵州茅台") else: print("常见期货合约代码示例:") print(" SN2504 - 沪锡2504") print(" AG2506 - 沪银2506") print(" LC2505 - 碳酸锂2505") print(" NI2505 - 沪镍2505") return # 打印JSON到控制台 print("\n" + "="*60) print("JSON 输出:") print("="*60) json_output = json.dumps(data, ensure_ascii=False, indent=2) print(json_output) # 保存到文件(统一放到 data 目录) if args.output: # 如果用户指定了文件名,也放到 data 目录下 filename = os.path.join(DATA_DIR, args.output) else: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(DATA_DIR, f"{data['symbol']}_{timestamp}.json") with open(filename, 'w', encoding='utf-8') as f: f.write(json_output) print("\n" + "="*60) print(f"[OK] 数据已保存到: {filename}") print(f"[OK] 共获取 {len(data['timeframes'])} 个周期数据") print("="*60) if __name__ == "__main__": main()