微信扫码
添加专属顾问
我要投稿
股票市场分析的智能助手,助你把握投资脉搏。 核心内容: 1. 股票分析系统的功能介绍与重要性 2. 利用dify实现股票数据实时获取与分析 3. 工作流制作步骤详解与实际操作演示
股票分析系统是一种综合性的工具,旨在帮助投资者深入分析股票市场数据,掌握市场趋势,评估股票的风险和价值,并制定科学的投资策略。它通常被称为证券决策分析系统,是投资者进行股票投资时不可或缺的工具之一。
技术分析:通过图表(如K线图、蜡烛图、折线图等)和趋势分析,预测股票价格的未来走势。技术分析的核心是利用历史数据来识别市场模式和趋势。
基本分析:研究公司的基本面数据,如盈利能力、财务状况、行业前景等,以评估股票的内在价值。
财务分析:提供财务报表和指标分析,帮助用户深入了解公司的财务状况。
市场情报:提供最新的市场新闻和研究报告,帮助用户了解市场动态和投资趋势。
风险管理系统:评估股票的风险水平,帮助投资者规避潜在的投资风险。
选股系统:通过算法和模型筛选出具有潜力的股票,节省投资者的时间和精力。
盘后分析:对当天的市场表现进行总结和分析,为第二天的投资决策提供参考。
实时信息推送:及时推送市场动态、新闻资讯和个股信息,帮助投资者快速做出反应。
智能选股:利用人工智能算法分析大量数据,推荐最适合的股票。
之前给大家做一个简单的echarts工作流,这个工作流用到的数据就是简单的股票数据,当时这数据是写死的。当时就觉的做的不好,今天就带大家使用dify来实现一个实时获取真实的股票交易数据,利用大模型的能力实现股票的决策分析和选股推荐。下面给大家看一下工作流整体效果。
接下来我们介绍一下这个工作流是如何制作的。
开始节点我们这里方便用户使用做了2个输入,一个是用户手工输入股票代码,一个是用户通过下拉选项选择股票代码。
有的小伙伴可能对股票代码不了解,大家可以在东方财富网https://quote.eastmoney.com/ 搜索股票代码。
我们这里提供部分设定好的股票让小伙伴下拉选择。
手工输入比较简单就是文本输入的内容。
另外的一个值就是下拉选择判断A股票、港股、美股、ETF、LOF
这样我们就完成了开始节点的配置。
这个条件分支主要目的是区分用户手工输入还是从下拉选项都认定是用户输入的信息。这里我们做了逻辑判断
如果用户2个值都没有输入我们给它返回一个错误提醒回复。
这里我们需要把用户输入的股票代码,不管是手工输入还是下拉选择,对于后面流程来说都是股票代码输入参数。这样我们可以利用变量聚合器进行参数的合并处理。
这个地方就是一个http请求接口。使用post方式请求。请求一个股票请求接口 http://127.0.0.1:8080/analyze-stock/
这里我们为了数据请求安全增加了接口请求鉴权(这个类似openai api请求)
这个apikey 我们可以在环境变量 env 设定好。
http请求 body部分 就是我们开始节点输入2个参数 1个是股票代码,1个是股票市场(A股票、港股、美股、ETF、LOF)。这里我们使用json值作为body体参数传入。
整体的http请求股票接口设置如下图
那么它的服务端代码是如何实现的呢?我们也给大家贴一下实现的代码
stock_analysis_api.py
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from datetime import datetime, timedelta
import pandas as pd
import json
import akshare as ak
app = FastAPI()
# 参数配置
params = {
'ma_periods': {'short': 5, 'medium': 20, 'long': 60},
'rsi_period': 14,
'bollinger_period': 20,
'bollinger_std': 2,
'volume_ma_period': 20,
'atr_period': 14
}
# 鉴权 Token 验证
def verify_auth_token(authorization: str = Header(None)):
"""
验证 Authorization Header 中的 Bearer Token
"""
if not authorization:
raise HTTPException(status_code=401, detail="Missing Authorization Header")
scheme, _, token = authorization.partition(" ")
if scheme.lower() != "bearer":
raise HTTPException(status_code=401, detail="Invalid Authorization Scheme")
# 这里可以替换为实际的 Token 验证逻辑
valid_tokens = ["sk-zhouhui1122444", "zhouhui11224555"] # 示例有效 Token 列表
if token not in valid_tokens:
raise HTTPException(status_code=403, detail="Invalid or Expired Token")
return token
class StockAnalysisRequest(BaseModel):
stock_code: str
market_type: str = 'A'
start_date: str = None
end_date: str = None
def calculate_score(df):
"""计算评分"""
try:
score = 0
latest = df.iloc[-1]
# 趋势得分 (30分)
if latest['MA5'] > latest['MA20']:
score += 15
if latest['MA20'] > latest['MA60']:
score += 15
# RSI得分 (20分)
if 30 <= latest['RSI'] <= 70:
score += 20
elif latest['RSI'] < 30: # 超卖
score += 15
# MACD得分 (20分)
if latest['MACD'] > latest['Signal']:
score += 20
# 成交量得分 (30分)
if latest['Volume_Ratio'] > 1.5:
score += 30
elif latest['Volume_Ratio'] > 1:
score += 15
return score
except Exception as e:
print(f"计算评分时出错: {str(e)}")
raise
def calculate_indicators(df):
"""计算技术指标"""
try:
# 计算移动平均线
df['MA5'] = calculate_ema(df['close'], params['ma_periods']['short'])
df['MA20'] = calculate_ema(df['close'], params['ma_periods']['medium'])
df['MA60'] = calculate_ema(df['close'], params['ma_periods']['long'])
# 计算RSI
df['RSI'] = calculate_rsi(df['close'], params['rsi_period'])
# 计算MACD
df['MACD'], df['Signal'], df['MACD_hist'] = calculate_macd(df['close'])
# 计算布林带
df['BB_upper'], df['BB_middle'], df['BB_lower'] = calculate_bollinger_bands(
df['close'],
params['bollinger_period'],
params['bollinger_std']
)
# 成交量分析
df['Volume_MA'] = df['volume'].rolling(window=params['volume_ma_period']).mean()
df['Volume_Ratio'] = df['volume'] / df['Volume_MA']
# 计算ATR和波动率
df['ATR'] = calculate_atr(df, params['atr_period'])
df['Volatility'] = df['ATR'] / df['close'] * 100
# 动量指标
df['ROC'] = df['close'].pct_change(periods=10) * 100
return df
except Exception as e:
print(f"计算技术指标时出错: {str(e)}")
raise
def _truncate_json_for_logging(json_obj, max_length=500):
"""截断JSON对象用于日志记录,避免日志过大"""
json_str = json.dumps(json_obj, ensure_ascii=False)
if len(json_str) <= max_length:
return json_str
return json_str[:max_length] + f"... [截断,总长度: {len(json_str)}字符]"
def get_stock_data(stock_code, market_type='A', start_date=None, end_date=None):
"""获取股票或基金数据"""
if start_date is None:
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
if end_date is None:
end_date = datetime.now().strftime('%Y%m%d')
try:
# 验证股票代码格式
if market_type == 'A':
valid_prefixes = ['0', '3', '6', '688', '8']
valid_format = False
for prefix in valid_prefixes:
if stock_code.startswith(prefix):
valid_format = True
break
if not valid_format:
error_msg = f"无效的A股股票代码格式: {stock_code}。A股代码应以0、3、6、688或8开头"
raise ValueError(error_msg)
df = ak.stock_zh_a_hist(
symbol=stock_code,
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
elif market_type == 'HK':
df = ak.stock_hk_daily(
symbol=stock_code,
adjust="qfq"
)
elif market_type == 'US':
df = ak.stock_us_hist(
symbol=stock_code,
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
elif market_type == 'ETF':
df = ak.fund_etf_hist_em(
symbol=stock_code,
period="daily",
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
elif market_type == 'LOF':
df = ak.fund_lof_hist_em(
symbol=stock_code,
period="daily",
start_date=start_date,
end_date=end_date,
adjust="qfq"
)
else:
raise ValueError(f"不支持的市场类型: {market_type}")
# 重命名列名以匹配分析需求
df = df.rename(columns={
"日期": "date",
"开盘": "open",
"收盘": "close",
"最高": "high",
"最低": "low",
"成交量": "volume"
})
# 确保日期格式正确
df['date'] = pd.to_datetime(df['date'])
# 数据类型转换
numeric_columns = ['open', 'close', 'high', 'low', 'volume']
df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# 删除空值
df = df.dropna()
return df.sort_values('date')
except Exception as e:
raise Exception(f"获取数据失败: {str(e)}")
def calculate_ema(series, period):
"""计算指数移动平均线"""
return series.ewm(span=period, adjust=False).mean()
def calculate_rsi(series, period):
"""计算RSI指标"""
delta = series.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
return 100 - (100 / (1 + rs))
def calculate_macd(series):
"""计算MACD指标"""
exp1 = series.ewm(span=12, adjust=False).mean()
exp2 = series.ewm(span=26, adjust=False).mean()
macd = exp1 - exp2
signal = macd.ewm(span=9, adjust=False).mean()
hist = macd - signal
return macd, signal, hist
def calculate_bollinger_bands(series, period, std_dev):
"""计算布林带"""
middle = series.rolling(window=period).mean()
std = series.rolling(window=period).std()
upper = middle + (std * std_dev)
lower = middle - (std * std_dev)
return upper, middle, lower
def calculate_atr(df, period):
"""计算ATR指标"""
high = df['high']
low = df['low']
close = df['close'].shift(1)
tr1 = high - low
tr2 = abs(high - close)
tr3 = abs(low - close)
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
return tr.rolling(window=period).mean()
def calculate_indicators(df):
"""计算技术指标"""
try:
# 计算移动平均线
df['MA5'] = calculate_ema(df['close'], params['ma_periods']['short'])
df['MA20'] = calculate_ema(df['close'], params['ma_periods']['medium'])
df['MA60'] = calculate_ema(df['close'], params['ma_periods']['long'])
# 计算RSI
df['RSI'] = calculate_rsi(df['close'], params['rsi_period'])
# 计算MACD
df['MACD'], df['Signal'], df['MACD_hist'] = calculate_macd(df['close'])
# 计算布林带
df['BB_upper'], df['BB_middle'], df['BB_lower'] = calculate_bollinger_bands(
df['close'],
params['bollinger_period'],
params['bollinger_std']
)
# 成交量分析
df['Volume_MA'] = df['volume'].rolling(window=params['volume_ma_period']).mean()
df['Volume_Ratio'] = df['volume'] / df['Volume_MA']
# 计算ATR和波动率
df['ATR'] = calculate_atr(df, params['atr_period'])
df['Volatility'] = df['ATR'] / df['close'] * 100
# 动量指标
df['ROC'] = df['close'].pct_change(periods=10) * 100
return df
except Exception as e:
print(f"计算技术指标时出错: {str(e)}")
raise
def calculate_score(df):
"""计算评分"""
try:
score = 0
latest = df.iloc[-1]
# 趋势得分 (30分)
if latest['MA5'] > latest['MA20']:
score += 15
if latest['MA20'] > latest['MA60']:
score += 15
# RSI得分 (20分)
if 30 <= latest['RSI'] <= 70:
score += 20
elif latest['RSI'] < 30: # 超卖
score += 15
# MACD得分 (20分)
if latest['MACD'] > latest['Signal']:
score += 20
# 成交量得分 (30分)
if latest['Volume_Ratio'] > 1.5:
score += 30
elif latest['Volume_Ratio'] > 1:
score += 15
return score
except Exception as e:
print(f"计算评分时出错: {str(e)}")
raise
def get_recommendation(score):
"""根据得分给出建议"""
if score >= 80:
return '强烈推荐买入'
elif score >= 60:
return '建议买入'
elif score >= 40:
return '观望'
elif score >= 20:
return '建议卖出'
else:
return '强烈建议卖出'
@app.post("/analyze-stock/")
async def analyze_stock(request: StockAnalysisRequest, auth_token: str = Depends(verify_auth_token)):
try:
# 获取股票数据
stock_data = get_stock_data(
request.stock_code,
request.market_type,
request.start_date,
request.end_date
)
print(stock_data)
# 计算技术指标
stock_data = calculate_indicators(stock_data)
# 计算评分
score = calculate_score(stock_data)
# 获取最新数据
latest = stock_data.iloc[-1]
prev = stock_data.iloc[-2]
# 生成技术指标概要
technical_summary = {
'trend': 'upward' if latest['MA5'] > latest['MA20'] else 'downward',
'volatility': f"{latest['Volatility']:.2f}%",
'volume_trend': 'increasing' if latest['Volume_Ratio'] > 1 else 'decreasing',
'rsi_level': latest['RSI']
}
# 获取近14日交易数据
recent_data = stock_data.tail(14).to_dict('records')
# 生成报告
report = {
'stock_code': request.stock_code,
'market_type': request.market_type,
'analysis_date': datetime.now().strftime('%Y-%m-%d'),
'score': score,
'price': latest['close'],
'price_change': (latest['close'] - prev['close']) / prev['close'] * 100,
'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
'rsi': latest['RSI'] if not pd.isna(latest['RSI']) else None,
'macd_signal': 'BUY' if latest['MACD'] > latest['Signal'] else 'SELL',
'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL',
'recommendation': get_recommendation(score)
}
# 返回结果
return {
"technical_summary": technical_summary,
"recent_data": recent_data,
"report": report
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8085)
关于详细的代码和依赖包大家可以看我开源的项目,我会上传到github中,文末会提供下载链接地址。
请求完股票数据服务端接口后,我们需要对它返回值进行处理。输入参数 就是body返回值。
处理代码如下
import json
def main(arg1: str) -> str: # 修改返回类型为 str
# 修正后的代码(修复缩进问题)
data = json.loads(arg1)
technical_summary = data['technical_summary']
recent_data = data['recent_data']
report = data['report']
# 将结果转换为JSON字符串返回
return {
"technical_summary": json.dumps(technical_summary,ensure_ascii=False, indent=2),
"recent_data": json.dumps(recent_data,ensure_ascii=False, indent=2),
"report": json.dumps(report,ensure_ascii=False, indent=2)
}
这个代码返回三个JSON返回值。所以有3个返回参数technical_summary、recent_data、report
这里我们又用到了条件分支判断组件了。
这个地方的目的是根据股票市场进行IF条件判断,后面的大模型会根据不同的股票进行分析和总结。所以这里我们需要有个分支判断
参数值就是开始节点中marketType
接下来这个地方分别有4个llm大语言模型分别对A股票、港股、美股、基金(ETF、LOF)进行处理。这个4个地方流程一样,我这里就以A股为案例讲解。这里我们用到了书生浦语的internlm3-8b-instruct模型,这个模型速度还是挺快的。
系统提示词
分析A股 {{#1741661741635.output#}}:
技术指标概要:
{{#1741663437132.technical_summary#}}
近14日交易数据:
{{#1741663437132.recent_data#}}
请提供:
1. 趋势分析(包含支撑位和压力位)
2. 成交量分析及其含义
3. 风险评估(包含波动率分析)
4. 短期和中期目标价位
5. 关键技术位分析
6. 具体交易建议(包含止损位)
请基于技术指标和A股市场特点进行分析,给出具体数据支持。
这个地方的作用主要利用上面的股票接口返回的technical_summary、recent_data这些股票数据结合大模型推理能力进行股票分析和总结。让AI给我做出股票分析和决策。
我这里4个股票市场分别用了4个模型,提示词都比较类似。这里就不详细展开。大家也可以去我开源项目中下载完整的DSL 分析。
上面4个模型分别对应4个回复输出。
有个小伙伴可以会问了,你这个4个节点感觉有点冗余,中间增加一个变量聚合器接受4个模型的输出,然后在利用变量聚合器和一个直接回复不就解决了吗?弄这4个感觉好冗余。 增加变量聚合器的确可以输出。不过这里有个小技巧就是,直接回复和大模型对接是可以实现流式输出的,如果对接一个变量聚合器在转到直接回复会导致流式输出失效,用户体验感查。因为这个工作流这个地方是最费时间的,如果20-30秒还没有输出用户会觉的有问题放弃。有了流式输出效果会好很多,大家可以自己去尝试。
以上我们就完成了工作流的搭建。工作流整体面貌如下
接下来我们可以把这个工作流分享出去,分享的地址如下:http://dify.duckcloud.fun/chat/hVjAUsFtg0PNknxe
我们在工作流中测试一下:
以上我们就完成了工作流的测试。
相关资料和文档可以看我开源的项目 https://github.com/wwwzhouhui/dify-for-dsl
这个工作流服务端代码主要参考了https://github.com/lanzhihong6/stock-scanner和 https://github.com/akfamily/akshare 2个项目。
其中Akshare项目提供的股票数据接口,stock-scanner提供了后端服务代码以及提示词,提供了很好的思路。再次感谢2个项目的开源作者无私奉献。
今天主要带大家使用 Dify 实现了一个实时获取真实股票交易数据,并利用大模型能力进行股票决策分析和选股推荐的工作流。详细介绍了整个工作流的实现步骤,工作流包含开始节点、条件分支、变量聚合器、HTTP 请求股票接口等部分。本次工作流内容比较多有一点难度,这里主要涉及到 Dify 和 FastAPI 等工具的使用,代码中还运用了 Akshare 库获取股票数据,以及对多种技术指标的计算。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-12-24
2024-04-25
2024-07-16
2024-04-24
2024-07-20
2024-05-08
2024-05-09
2024-06-21
2024-05-07
2024-08-06