You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
data_collector/futures_data_collector - 副本.py

682 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
================================================================================
期货/股票多周期数据获取与技术指标计算脚本
================================================================================
【功能介绍】
本脚本用于获取期货或股票的多周期K线数据,并自动计算技术指标,输出JSON格式,
可直接用于技术分析Skill的输入。
【核心功能】
1. 多周期数据获取
- 5分钟K线(至少50根)
- 15分钟K线(至少50根)
- 30分钟K线(至少50根)
- 60分钟K线(至少50根)
- 日K线(至少50根)
2. 技术指标自动计算(每根K线都包含):
- MA10: 10日/周期移动平均线
- MA20: 20日/周期移动平均线
- MACD_DIF: MACD快线
- MACD_DEA: MACD慢线(信号线)
- MACD_HISTOGRAM: MACD柱状图
3. 输出格式
每个周期为一个数组,数组中每项包含:
{
"time": "K线时间",
"open": 开盘价,
"high": 最高价,
"low": 最低价,
"close": 收盘价,
"volume": 成交量,
"ma10": MA10值,
"ma20": MA20值,
"macd_dif": DIF值,
"macd_dea": DEA值,
"macd_histogram": 柱状图值
}
【使用方法】
1. 安装依赖
pip install akshare pandas
2. 运行脚本
# 获取期货数据(默认)
python futures_data_collector.py --symbol SN2504 --type futures
# 获取股票数据
python futures_data_collector.py --symbol 000001 --type stock
python futures_data_collector.py --symbol 600000 --type stock --output stock_data.json
3. 参数说明
--symbol: 代码(必填)
期货格式:品种代码 + 年份 + 月份,如 SN2504(沪锡)、AG2506(沪银)
股票格式:6位数字代码,如 000001(平安银行)、600000(浦发银行)
--type: 数据类型(可选,默认 futures)
可选值:futures(期货)、stock(股票)
--output: 输出文件名(可选)
默认格式:{代码}_{时间戳}.json
【常见代码对照】
期货:
SN2504 - 沪锡(上海期货交易所)
AG2506 - 沪银(上海期货交易所)
LC2505 - 碳酸锂(广州期货交易所)
NI2505 - 沪镍(上海期货交易所)
股票:
000001 - 平安银行
600000 - 浦发银行
000858 - 五粮液
600519 - 贵州茅台
【输出示例】
{
"symbol": "SN2504",
"current_price": 22250,
"timestamp": "2026-03-07T22:15:00+08:00",
"timeframes": {
"60min": [
{
"time": "2026-03-07 14:00",
"open": 22100,
"high": 22300,
"low": 22050,
"close": 22250,
"volume": 12500,
"ma10": 22180.5,
"ma20": 22000.3,
"macd_dif": 0.0523,
"macd_dea": 0.0312,
"macd_histogram": 0.0422
}
],
"30min": [ ... ],
"15min": [ ... ],
"5min": [ ... ],
"daily": [ ... ]
}
}
【注意事项】
1. 数据源使用akshare,数据可能有延迟或频率限制
2. 分钟数据受交易所限制,可能无法获取太多历史数据
3. 如遇数据获取失败,请检查合约代码是否正确
4. 脚本会自动过滤掉数据不足的周期
【作者】OpenClaw Assistant
【日期】2026-03-07
================================================================================
"""
import argparse
import json
import os
import warnings
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Sequence

import akshare as ak
import pandas as pd
# Suppress ALL warnings process-wide, not only noisy library ones.
# NOTE(review): this also hides warnings that may point at real data issues.
warnings.filterwarnings('ignore')
# "Clear the cache": rebinds a `cache` attribute on the akshare module.
# NOTE(review): confirm akshare actually reads `ak.cache`; if it does not,
# this assignment has no effect.
ak.cache = {}
# Output directory: a `data` folder next to this script. It is created at
# import time (a module-level side effect) so main() can write into it.
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
os.makedirs(DATA_DIR, exist_ok=True)
def calculate_ma(df: pd.DataFrame, periods: Sequence[int] = (10, 20)) -> pd.DataFrame:
    """Add simple moving-average columns of ``close`` to *df*.

    One column named ``MA{period}`` is written per entry in *periods*.
    Because ``min_periods=1`` is used, the first ``period - 1`` rows hold the
    mean of however many closes are available so far rather than NaN.

    Args:
        df: Frame with a numeric ``close`` column. It is modified in place.
        periods: Window lengths in bars. Defaults to ``(10, 20)``. The default
            is an immutable tuple, so no state can leak between calls the way
            a shared list default could.

    Returns:
        The same *df* object with the MA columns added.
    """
    for period in periods:
        df[f'MA{period}'] = df['close'].rolling(window=period, min_periods=1).mean()
    return df
def calculate_macd(df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
    """Add MACD columns (DIF, DEA, histogram and a text signal) to *df*.

    Args:
        df: Frame with a numeric ``close`` column. It is modified in place.
        fast: Span of the fast EMA of ``close``.
        slow: Span of the slow EMA of ``close``.
        signal: Span of the EMA applied to DIF to obtain DEA.

    Returns:
        The same *df* with the following columns added:

        - ``macd_dif``: ``EMA(fast) - EMA(slow)``.
        - ``macd_dea``: EMA of DIF.
        - ``macd_histogram``: ``2 * (DIF - DEA)``.
        - ``macd_signal``: ``'bullish'``, ``'bearish'`` or ``'neutral'``.
    """
    # adjust=False selects the recursive EMA form: ema_t = a*x_t + (1-a)*ema_{t-1}.
    ema_fast = df['close'].ewm(span=fast, adjust=False).mean()
    ema_slow = df['close'].ewm(span=slow, adjust=False).mean()
    dif = ema_fast - ema_slow
    dea = dif.ewm(span=signal, adjust=False).mean()
    hist = (dif - dea) * 2
    df['macd_dif'] = dif
    df['macd_dea'] = dea
    df['macd_histogram'] = hist

    # Vectorised labelling instead of a per-row DataFrame.apply(axis=1).
    # Comparisons involving NaN are False, so such rows stay 'neutral'.
    label = pd.Series('neutral', index=df.index, dtype=object)
    label = label.mask((dif > dea) & (hist > 0), 'bullish')
    label = label.mask((dif < dea) & (hist < 0), 'bearish')
    df['macd_signal'] = label
    return df
def get_current_time() -> datetime:
    """Return the current Beijing (UTC+8) wall-clock time, truncated to whole seconds.

    The result is naive (``tzinfo=None``), so it can be compared directly with
    the naive bar timestamps that ``pd.to_datetime`` produces elsewhere in
    this script. An explicit UTC+8 offset is used instead of plain
    ``datetime.now()``. This keeps the "future data" cutoff correct even when
    the host is not configured for China time (China has no DST, so a fixed
    offset is exact).
    """
    beijing = timezone(timedelta(hours=8))
    return datetime.now(beijing).replace(tzinfo=None, microsecond=0)
def filter_future_data(df: pd.DataFrame, current_time: Optional[datetime] = None) -> pd.DataFrame:
    """Drop rows whose ``datetime`` lies after *current_time*.

    Args:
        df: Bars with a ``datetime`` column (any format accepted by
            ``pd.to_datetime``). A frame without that column is returned
            unchanged. The caller's frame is never modified.
        current_time: Inclusive cutoff. Defaults to ``get_current_time()``.

    Returns:
        A new frame holding only rows with ``datetime <= current_time``, with
        the ``datetime`` column converted to datetime64. The number of removed
        rows is printed when it is non-zero.
    """
    if current_time is None:
        current_time = get_current_time()
    if 'datetime' not in df.columns:
        return df
    # Work on a copy so converting the column does not mutate the caller's frame.
    df = df.copy()
    df['datetime'] = pd.to_datetime(df['datetime'])
    original_count = len(df)
    df = df[df['datetime'] <= current_time].copy()
    filtered_count = original_count - len(df)
    if filtered_count > 0:
        print(f" 过滤了 {filtered_count} 条未来数据")
    return df
def extend_night_session_data(df: pd.DataFrame, symbol: str, period: str) -> pd.DataFrame:
    """Sort bars chronologically and warn when the night session looks incomplete.

    Despite its name, this function does not fetch or append any bars. It
    sorts *df* by ``datetime`` (resetting the index). It then prints a warning
    when the newest bar falls inside the 21:00-02:30 night window and no bar
    anywhere in *df* is stamped exactly 02:30.

    Args:
        df: Bars with a ``datetime`` column. That column is converted to
            datetime64 on *df* itself.
        symbol: Contract code (currently unused).
        period: Bar period (currently unused).

    Returns:
        *df* itself when it is empty or has no ``datetime`` column. Otherwise a
        sorted copy with a fresh 0..n-1 index.
    """
    if df.empty or 'datetime' not in df.columns:
        return df

    df['datetime'] = pd.to_datetime(df['datetime'])
    ordered = df.sort_values('datetime').reset_index(drop=True)

    newest = ordered['datetime'].iloc[-1]
    hour, minute = newest.hour, newest.minute
    # Night window: 21:00-23:59, 00:00-01:59 and 02:00-02:30 of the next day.
    in_night_window = hour >= 21 or hour < 2 or (hour == 2 and minute <= 30)
    if not in_night_window:
        return ordered

    # NOTE(review): the 02:30 check scans the whole history, so a 02:30 bar
    # from any earlier night counts as "complete". Contracts whose night
    # session ends before 02:30 will also always trigger the warning.
    if any(ts.hour == 2 and ts.minute == 30 for ts in ordered['datetime']):
        return ordered

    print(" 注意: 夜盘数据可能不完整缺少02:30及之前的数据")
    return ordered
def get_minute_data(symbol: str, period: str) -> pd.DataFrame:
    """Download futures minute bars via ``ak.futures_zh_minute_sina`` and clean them.

    Steps:

    1. Fetch the bars.
    2. Rename ``day`` to ``datetime``.
    3. Coerce OHLCV to numbers (bad values become NaN) and parse timestamps.
    4. Drop rows later than "now".
    5. Pass the result through ``extend_night_session_data``, which sorts it
       and may print a night-session hint.

    A warning is printed when fewer than 50 bars remain.

    Args:
        symbol: Contract code, e.g. ``"SN2504"``.
        period: Minute period as a string: ``"5"``, ``"15"``, ``"30"`` or ``"60"``.

    Returns:
        The cleaned bars, or an empty DataFrame if any step raises (the error
        is printed, not re-raised).
    """
    try:
        # The cutoff is taken before the download; bars stamped after it are dropped.
        cutoff = get_current_time()
        bars = ak.futures_zh_minute_sina(symbol=symbol, period=period)
        bars = bars.rename(columns={'day': 'datetime'})
        for column in ('open', 'high', 'low', 'close', 'volume'):
            bars[column] = pd.to_numeric(bars[column], errors='coerce')
        bars['datetime'] = pd.to_datetime(bars['datetime'])
        bars = filter_future_data(bars, cutoff)
        bars = extend_night_session_data(bars, symbol, period)
        if len(bars) < 50:
            print(f" 警告: {period}分钟只获取到{len(bars)}根K线建议检查数据源")
        return bars
    except Exception as err:
        print(f" 获取{period}分钟数据失败: {err}")
        return pd.DataFrame()
def get_daily_data(symbol: str, days: int = 60) -> pd.DataFrame:
    """Download futures daily bars via ``ak.futures_zh_daily_sina`` and keep the latest *days*.

    The ``date`` column is renamed to ``datetime`` and OHLCV is coerced to
    numbers. Rows are sorted chronologically, rows after "now" are dropped,
    and only the last *days* rows are kept (with a fresh 0..n-1 index).

    Args:
        symbol: Contract code, e.g. ``"SN2504"``.
        days: Number of most recent daily bars to keep.

    Returns:
        The trimmed bars, or an empty DataFrame if any step raises (the error
        is printed, not re-raised).
    """
    try:
        cutoff = get_current_time()
        raw = ak.futures_zh_daily_sina(symbol=symbol)
        daily = raw.rename(columns={'date': 'datetime'})
        for column in ('open', 'high', 'low', 'close', 'volume'):
            daily[column] = pd.to_numeric(daily[column], errors='coerce')
        daily['datetime'] = pd.to_datetime(daily['datetime'])
        daily = daily.sort_values('datetime').reset_index(drop=True)
        daily = filter_future_data(daily, cutoff)
        return daily.tail(days).reset_index(drop=True)
    except Exception as err:
        print(f" 获取日K数据失败: {err}")
        return pd.DataFrame()
def get_stock_minute_data(symbol: str, period: str) -> pd.DataFrame:
    """Download A-share minute bars via ``ak.stock_zh_a_minute`` and clean them.

    The bare stock code is first turned into the exchange-prefixed form the
    API expects. Then ``day`` is renamed to ``datetime``, OHLCV is coerced to
    numbers, and bars are sorted chronologically. Finally, rows later than
    "now" are dropped. A warning is printed when fewer than 50 bars remain.

    Args:
        symbol: Stock code, e.g. ``"000001"``. A code that already carries an
            exchange prefix (``sh``/``sz``/``bj``, any case) is passed through.
        period: Minute period as a string: ``"5"``, ``"15"``, ``"30"`` or ``"60"``.

    Returns:
        The cleaned bars, or an empty DataFrame if any step raises (the error
        is printed, not re-raised).
    """
    try:
        # The cutoff is taken before the download; bars stamped after it are dropped.
        current_time = get_current_time()
        # stock_zh_a_minute expects codes such as sh600000 / sz000001.
        # The previous rule ("6" -> sh, everything else -> sz) mis-routed:
        #   - Shanghai funds (5xxxxx) and B-shares (900xxx), which belong to sh;
        #   - Beijing Stock Exchange codes (4xxxxx / 8xxxxx / 920xxx), which belong to bj.
        # NOTE(review): confirm the Sina endpoint serves minute bars for bj codes.
        code = symbol.strip()
        if code[:2].lower() in ('sh', 'sz', 'bj'):
            full_symbol = code.lower()
        elif code.startswith(('4', '8', '92')):
            full_symbol = f"bj{code}"
        elif code.startswith(('5', '6', '9')):
            full_symbol = f"sh{code}"
        else:
            full_symbol = f"sz{code}"
        df = ak.stock_zh_a_minute(symbol=full_symbol, period=period)
        df = df.rename(columns={'day': 'datetime'})
        for col in ('open', 'high', 'low', 'close', 'volume'):
            df[col] = pd.to_numeric(df[col], errors='coerce')
        df['datetime'] = pd.to_datetime(df['datetime'])
        # Indicators downstream (EMA, tail(50)) assume chronological order;
        # sort explicitly, as the futures and daily fetchers do.
        df = df.sort_values('datetime').reset_index(drop=True)
        df = filter_future_data(df, current_time)
        if len(df) < 50:
            print(f" 警告: {period}分钟只获取到{len(df)}根K线建议检查数据源")
        return df
    except Exception as e:
        print(f" 获取{period}分钟数据失败: {e}")
        return pd.DataFrame()
def get_stock_daily_data(symbol: str, days: int = 60) -> pd.DataFrame:
    """Download A-share daily bars via ``ak.stock_zh_a_hist`` and keep the latest *days*.

    The Chinese column headers are mapped to ``datetime``/OHLCV names, and
    OHLCV is coerced to numbers. Rows are sorted chronologically, rows after
    "now" are dropped, and only the last *days* rows are kept (with a fresh
    0..n-1 index).

    Args:
        symbol: Six-digit stock code, e.g. ``"000001"``.
        days: Number of most recent daily bars to keep.

    Returns:
        The trimmed bars, or an empty DataFrame if any step raises (the error
        is printed, not re-raised).
    """
    column_map = {
        '日期': 'datetime',
        '开盘': 'open',
        '最高': 'high',
        '最低': 'low',
        '收盘': 'close',
        '成交量': 'volume'
    }
    try:
        cutoff = get_current_time()
        # Look back 2 * days calendar days to get enough history: weekends and
        # holidays mean there are fewer trading days than calendar days.
        first_day = (cutoff - timedelta(days=days * 2)).strftime('%Y%m%d')
        last_day = cutoff.strftime('%Y%m%d')
        raw = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=first_day, end_date=last_day)
        daily = raw.rename(columns=column_map)
        for column in ('open', 'high', 'low', 'close', 'volume'):
            daily[column] = pd.to_numeric(daily[column], errors='coerce')
        daily['datetime'] = pd.to_datetime(daily['datetime'])
        daily = daily.sort_values('datetime').reset_index(drop=True)
        daily = filter_future_data(daily, cutoff)
        return daily.tail(days).reset_index(drop=True)
    except Exception as err:
        print(f" 获取日K数据失败: {err}")
        return pd.DataFrame()
def process_data(df: pd.DataFrame, timeframe: str) -> List[Dict]:
    """Compute indicators on *df* and return its last 50 bars as JSON-ready dicts.

    Args:
        df: Bars with ``datetime``, ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns, in chronological order. MA/MACD columns are
            added to it in place.
        timeframe: Timeframe label (e.g. ``"5min"``); currently unused.

    Returns:
        Up to 50 dicts for the most recent bars, or ``[]`` when *df* has fewer
        than 10 rows. Missing values are replaced so that ``json.dumps`` never
        emits the non-standard ``NaN`` token:

        - prices and MAs become ``None``;
        - volume and MACD values become ``0``.
    """
    if df.empty or len(df) < 10:
        return []

    # Indicators are computed over the full history; only the output is
    # trimmed, so the returned bars benefit from all available look-back.
    df = calculate_ma(df)
    df = calculate_macd(df)

    def _rounded(value, ndigits, missing=None):
        """Return *value* rounded to *ndigits*, or *missing* when it is None/NaN."""
        if pd.isna(value):
            return missing
        return round(float(value), ndigits)

    def _to_candle(row) -> Dict:
        """Convert one indicator-enriched row into the output dict."""
        return {
            "time": str(row['datetime']),
            "open": _rounded(row['open'], 2),
            "high": _rounded(row['high'], 2),
            "low": _rounded(row['low'], 2),
            "close": _rounded(row['close'], 2),
            "volume": int(row['volume']) if not pd.isna(row['volume']) else 0,
            "ma10": _rounded(row.get('MA10'), 2),
            "ma20": _rounded(row.get('MA20'), 2),
            "macd_dif": _rounded(row.get('macd_dif'), 4, 0),
            "macd_dea": _rounded(row.get('macd_dea'), 4, 0),
            "macd_histogram": _rounded(row.get('macd_histogram'), 4, 0),
        }

    # tail(50) already returns the whole frame when it has 50 rows or fewer.
    return [_to_candle(row) for _, row in df.tail(50).iterrows()]
def _collect_all_timeframes(symbol: str, kind: str, label: str, fetch_minute, fetch_daily) -> Dict:
    """Shared driver behind collect_futures_data / collect_stock_data.

    Fetches the 60/30/15/5-minute and daily bars, keeps every timeframe that
    yields at least 50 bars, and prints progress for each step.

    Args:
        symbol: Instrument code passed through to the fetchers.
        kind: Value stored under ``"type"`` in the result (``"futures"``/``"stock"``).
        label: Chinese instrument label shown in the progress banner.
        fetch_minute: Callable ``(symbol, period) -> DataFrame`` for minute bars.
        fetch_daily: Callable ``(symbol, days=...) -> DataFrame`` for daily bars.

    Returns:
        Dict with ``symbol``, ``type``, ``current_price``, ``timestamp`` and
        ``timeframes`` keys. ``current_price`` is the last close of the first
        minute timeframe that succeeded, or ``None``.
    """
    print(f"\n正在获取{label} {symbol} 的多周期数据...")
    print(f"当前时间: {get_current_time().strftime('%Y-%m-%d %H:%M:%S')}")
    print("-" * 50)
    # The timestamp is labelled "+08:00", so compute it in UTC+8 explicitly
    # instead of trusting the host's local time zone.
    beijing_now = datetime.now(timezone(timedelta(hours=8)))
    result = {
        "symbol": symbol,
        "type": kind,
        "current_price": None,
        "timestamp": beijing_now.strftime("%Y-%m-%dT%H:%M:%S+08:00"),
        "timeframes": {}
    }
    periods = [
        ("60min", "60"),
        ("30min", "30"),
        ("15min", "15"),
        ("5min", "5")
    ]
    for tf_name, tf_period in periods:
        print(f"获取 {tf_name} 数据...")
        try:
            df = fetch_minute(symbol, tf_period)
            if not df.empty and len(df) >= 50:
                candles = process_data(df, tf_name)
                if candles:
                    result["timeframes"][tf_name] = candles
                    # Current price = latest close of the first timeframe that succeeds.
                    if result["current_price"] is None:
                        result["current_price"] = candles[-1]["close"]
                    print(f" [OK] 成功获取 {len(candles)} 根K线")
            else:
                print(f" [FAIL] 数据不足或获取失败 (获取到{len(df)}根)")
        except Exception as e:
            print(f" [ERROR] 错误: {e}")
    print("获取 daily 数据...")
    try:
        df_daily = fetch_daily(symbol, days=60)
        if not df_daily.empty and len(df_daily) >= 50:
            candles = process_data(df_daily, "daily")
            if candles:
                result["timeframes"]["daily"] = candles
                print(f" [OK] 成功获取 {len(candles)} 根K线")
        else:
            print(f" [FAIL] 数据不足或获取失败 (获取到{len(df_daily)}根)")
    except Exception as e:
        print(f" [ERROR] 错误: {e}")
    print("-" * 50)
    return result


def collect_futures_data(symbol: str) -> Dict:
    """Collect multi-timeframe bars and indicators for a futures contract.

    Args:
        symbol: Contract code, e.g. ``"SN2504"``.

    Returns:
        The JSON-ready result dict (see ``_collect_all_timeframes``).
    """
    return _collect_all_timeframes(symbol, "futures", "期货", get_minute_data, get_daily_data)


def collect_stock_data(symbol: str) -> Dict:
    """Collect multi-timeframe bars and indicators for an A-share stock.

    Args:
        symbol: Stock code, e.g. ``"000001"``.

    Returns:
        The JSON-ready result dict (see ``_collect_all_timeframes``).
    """
    return _collect_all_timeframes(symbol, "stock", "股票", get_stock_minute_data, get_stock_daily_data)
def main() -> int:
    """Command-line entry point: fetch data, print the JSON and save it under DATA_DIR.

    Returns:
        Process exit status: 0 on success, 1 when no timeframe could be
        fetched (so shell scripts and schedulers can detect the failure).
    """
    parser = argparse.ArgumentParser(description='期货/股票多周期数据获取与技术指标计算')
    parser.add_argument('--symbol', type=str, required=True,
                        help='代码,期货如 SN2504(沪锡), 股票如 000001(平安银行)')
    parser.add_argument('--type', type=str, default='futures', choices=['futures', 'stock'],
                        help='数据类型futures(期货)、stock(股票),默认为 futures')
    parser.add_argument('--output', type=str, default=None,
                        help='输出JSON文件名默认为 代码_时间戳.json')
    args = parser.parse_args()

    if args.type == 'stock':
        data = collect_stock_data(args.symbol)
    else:
        data = collect_futures_data(args.symbol)

    if not data["timeframes"]:
        print("\n错误: 未能获取到任何数据,请检查代码是否正确")
        if args.type == 'stock':
            print("常见股票代码示例:")
            print(" 000001 - 平安银行")
            print(" 600000 - 浦发银行")
            print(" 000858 - 五粮液")
            print(" 600519 - 贵州茅台")
        else:
            print("常见期货合约代码示例:")
            print(" SN2504 - 沪锡2504")
            print(" AG2506 - 沪银2506")
            print(" LC2505 - 碳酸锂2505")
            print(" NI2505 - 沪镍2505")
        return 1

    print("\n" + "="*60)
    print("JSON 输出:")
    print("="*60)
    json_output = json.dumps(data, ensure_ascii=False, indent=2)
    print(json_output)

    if args.output:
        # A user-supplied name is also placed under DATA_DIR (os.path.join
        # lets an absolute --output path override DATA_DIR).
        filename = os.path.join(DATA_DIR, args.output)
    else:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = os.path.join(DATA_DIR, f"{data['symbol']}_{timestamp}.json")
    # --output may include sub-directories (e.g. "stock/x.json"); create them
    # so open() does not fail with FileNotFoundError.
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(json_output)
    print("\n" + "="*60)
    print(f"[OK] 数据已保存到: {filename}")
    print(f"[OK] 共获取 {len(data['timeframes'])} 个周期数据")
    print("="*60)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())