import numpy as np
import pandas as pd
import talib
UNIVERSE = ['*']
BOARDS = ['MainBoard']
INDUSTRIES = ['*']
ST_OPTION = 'excluded'
SELECT_INTERVAL = 15
BUY_INTERVAL = 1
SELL_INTERVAL = 1
MAX_HOLDING_NUM = 6
MAX_WEIGHT = 0.15
SINGLE_PROFIT_TAKEN = 0.2
SINGLE_STOP_LOSS = -0.1
HOLDING_PROFIT_TAKEN = None
HOLDING_STOP_LOSS = None
STRATEGY_PROFIT_TAKEN = None
STRATEGY_STOP_LOSS = None
MARKET_ENTER_SIGNALS = [
]
MARKET_PANIC_SIGNALS = [
]
FILTERS = [
{'operator': 'greater_than', 'lhs': {'name': 'return_on_equity', 'type': 'fundamental'}, 'rhs': 20},
{'operator': 'greater_than', 'lhs': {'name': 'turnover_rate', 'type': 'extra', 'parameters': ['month']}, 'rhs': 0.03},
]
SORTING_RULES = [
]
SELL_CONDITIONS = [
]
BUY_CONDITIONS = [
{'lhs': {'name': 'STOCH.slowk', 'type': 'technical', 'parameters': [5, 3, 3]}, 'operator': 'cross', 'rhs': {'name': 'STOCH.slowd', 'type': 'technical', 'parameters': [5, 3, 3]}},
{'operator': 'greater_than', 'lhs': {'name': 'turnover_rate', 'type': 'extra', 'parameters': ['month']}, 'rhs': 0.03},
]
FUNDAMENTAL_FACTORS = None
def fundamental_factor_map():
global FUNDAMENTAL_FACTORS
if FUNDAMENTAL_FACTORS is not None:
return FUNDAMENTAL_FACTORS
result = {}
result.update(fundamentals.eod_derivative_indicator.__dict__)
result.update(fundamentals.balance_sheet.__dict__)
result.update(fundamentals.cash_flow.__dict__)
result.update(fundamentals.financial_indicator.__dict__)
result.update(fundamentals.income_statement.__dict__)
FUNDAMENTAL_FACTORS = result
return result
def get_fundamental_factor(symbol, factor):
try:
factor = fundamental_factor_map()[factor]
except KeyError:
return pd.Series({})
df = get_fundamentals(query(factor).filter(fundamentals.stockcode.in_(symbol))
if df.empty:
return pd.Series({})
return df.iloc[0]
def get_turnover(symbol, count=1, fields=None):
result = get_turnover_rate(symbol, count=count, fields=fields)
if isinstance(result, pd.Series):
name =(symbol if isinstance(symbol, str) else(symbol0]
return result.to_frame(name=name)
return result
SIMPLE_OPERATOR = {
"greater_than",
"less_than",
"in_range",
"rank_in_range",
}
def apply_simple_operator(operator, data, rhs):
if operator == "greater_than":
return data.index[data > rhs].tolist()
elif operator == "less_than":
return data.index[data < rhs].tolist()
elif operator == "in_range":
vmin, vmax = rhs
return data.index[(data > vmin) & (data < vmax)].tolist()
else:
assert operator == "rank_in_range"
vmin, vmax = rhs[0] / 100.0, rhs[1] / 100.0
rank = data.rank(pct=True)
c = (rank > vmin) & (rank < vmax)
return c[c].index.tolist()
KNOWN_FILTERS = {}
def register_filter(category, detail):
def decorator(fun):
global KNOWN_FILTERS
KNOWN_FILTERS[category, detail] = fun
return fun
return decorator
@register_filter('fundamental', '*')
def fundamental_filter(symbol, rule, bar_dict):
if rule["operator"] not in SIMPLE_OPERATOR:
print('[WARNING] unknown operator in rule:', rule)
return(symbol
data = get_fundamental_factor(symbol, rule["lhs"]["name"])
return apply_simple_operator(rule["operator"], data, rule["rhs"])
@register_filter('extra', 'listed_days')
def listed_days_filter(symbol, rule, bar_dict):
data = pd.Series({
s: instruments(s).days_from_listed() for s in(symbol
})
return apply_simple_operator(rule["operator"], data, rule["rhs"])
@register_filter('extra', 'turnover_rate')
def turnover_rate_filter(symbol, rule, bar_dict):
(symbol= [s for s in(symbol if instruments(s).days_from_listed() > 0]
lhs = rule["lhs"]
if rule["operator"] in SIMPLE_OPERATOR:
data = get_turnover(symbol, fields=lhs["parameters"]).iloc[0]
return apply_simple_operator(rule["operator"], data, rule["rhs"])
if not rule['operator'] in {'greater_than_ma', 'less_than_ma', 'in_ma_range'}:
print('unknown operator for rule:', rule)
return(symbol
count = rule["rhs"][1] if isinstance(rule["rhs"], list) else rule["rhs"]
data = get_turnover(symbol, count=count, fields=rule["parameters"])
value = data.iloc[-1]
if rule["operator"] == "greater_than_ma":
return value.index[value > data.mean()].tolist()
elif rule["operator"] == "less_than_ma":
return value.index[value < data.mean()].tolist()
else:
l, r = rule["parameters"]
ma1 = data.iloc[:l].mean()
ma2 = data.mean()
condition = (value > ma1) & (value < ma2)
condition |= (value < ma1) & (value > ma2)
return condition[condition].index.tolist()
@register_filter('pricing', '*')
def pricing_filter(symbol, rule, bar_dict):
# 当天上市的股票无历史数据,不会满足条件,过滤掉
(symbol= [s for s in(symbol if instruments(s).days_from_listed() > 0]
lhs, operator, rhs = rule['lhs'], rule['operator'], rule['rhs']
if operator == 'rank_in_range':
data = pd.Series({s: history_bars(s, 1, '1d', fields=lhs['name'])[-1] for s in(symbol})
return apply_simple_operator(operator, data, rhs)
filtered(symbol = []
for s in(symbol:
lv = history_bars(s, 1, '1d', fields=lhs['name'])[-1]
if isinstance(rhs, dict):
rv = get_factor_value(s, rhs, bar_dict)
else:
rv = rhs
if operator == "greater_than":
if lv > _to_scalar(rv):
filtered(symbol.append(s)
elif operator == "less_than":
if lv < _to_scalar(rv):
filtered(symbol.append(s)
elif operator == "in_range":
l, r = rv
if l < lv < r:
filtered(symbol.append(s)
elif operator == "greater_than_ma":
ma = history_bars(s, rv, "1d", fields=lhs["name"]).mean()
if lv > ma:
filtered(symbol.append(s)
elif operator == "less_than_ma":
ma = history_bars(s, rv, "1d", fields=lhs["name"]).mean()
if lv < ma:
filtered(symbol.append(s)
elif operator == "in_ma_range":
data = history_bars(s, rv[1], "1d", fields=lhs["name"])
ma1 = data[:rv[0]].mean()
ma2 = data.mean()
if ma1 < lv < ma2 or ma2 < lv < ma1:
filtered(symbol.append(s)
else:
raise RuntimeError("不支持的操作符: {}, 因子 {}".format(operator, lhs))
return filtered(symbol
def n_day_gain_rate(stock, n):
l = history_bars(stock, n, '1d', 'close')
return (l[-1] - l[0]) / l[0]
@register_filter('pricing', 'n_day_gain_rate')
def n_day_gain_rate_filter(symbol, rule, bar_dict):
n = rule['lhs']['parameters'][0]
data = pd.Series({
s: n_day_gain_rate(s, n)
for s in(symbol if instruments(s).days_from_listed() >= n
})
return apply_simple_operator(rule['operator'], data, rule['rhs'])
@register_filter('pricing', 'n_day_loss_rate')
def n_day_loss_rate_filter(symbol, rule, bar_dict):
n = rule['lhs']['parameters'][0]
data = pd.Series({
s: -n_day_gain_rate(s, n)
for s in(symbol if instruments(s).days_from_listed() >= n
})
return apply_simple_operator(rule['operator'], data, rule['rhs'])
@register_filter('technical', '*')
def technical_filter(symbol, rule, bar_dict):
# 对于技术指标来说,当天上市的股票不会满足条件
(symbol= [s for s in(symbol if instruments(s).days_from_listed() > 0]
operator = rule["operator"]
lhs = rule["lhs"]
rhs = rule["rhs"]
filtered(symbol = []
for s in(symbol:
lv = get_technical_series(s, lhs)
rv = get_factor_value(s, rhs, bar_dict) if isinstance(rhs, dict) else rhs
if operator == "greater_than" or operator == "long":
if lv[-1] > _to_scalar(rv):
filtered(symbol.append(s)
elif operator == "less_than" or operator == "short":
if lv[-1] < _to_scalar(rv):
filtered(symbol.append(s)
elif operator == "in_range":
if rv[0] < lv[-1] < rv[1]:
filtered(symbol.append(s)
elif operator == "cross":
if len(lv) < 2 or len(rv) < 2:
continue
if lv[-2] < rv[-2] and lv[-1] > rv[-1]:
filtered(symbol.append(s)
elif operator == "reverse_cross":
if len(lv) < 2 or len(rv) < 2:
continue
if lv[-2] > rv[-2] and lv[-1] < rv[-1]:
filtered(symbol.append(s)
else:
raise RuntimeError(
"不支持的规则:{} {} {}".format(
lhs, operator, rhs))
return filtered(symbol
@register_filter('CDL', '*')
def CDL_filter(symbol, rule, bar_dict):
(symbol= [s for s in(symbol if instruments(s).days_from_listed() > 0]
name = rule["lhs"]["name"]
filtered(symbol = []
for stock in(symbol:
function = getattr(talib, name)
result = function(
history_bars(stock, 100, '1d', 'open'),
history_bars(stock, 100, '1d', 'high'),
history_bars(stock, 100, '1d', 'low'),
history_bars(stock, 100, '1d', 'close')
)
if len(result) > 0 and result[-1] != 0:
filtered(symbol.append(stock)
return filtered(symbol
def _to_scalar(v):
return v[-1] if isinstance(v, np.ndarray) else v
def _extract_input_names(d):
if "price" in d:
return [d["price"]]
elif "prices" in d:
return d["prices"]
return []
def get_technical_series(stock, factor):
name = factor["name"]
output = "real"
if "." in name:
name, output = name.split(".")
func = getattr(talib.abstract, name)
func.set_parameters(
dict(zip(func.parameters.keys(),
factor["parameters"])))
func.set_input_arrays({
name: history_bars(stock, 100, "1d", fields=name)
for name in _extract_input_names(func.input_names)
})
if output == "real":
return func.outputs
output_index = func.output_names.index(output)
return func.outputs[output_index]
def get_factor_value(stock, factor, bar_dict):
# 对于量价指标返回值
if factor["type"] == "pricing":
return history_bars(stock, 1, '1d', fields=factor['name'])[-1]
# 对于技术指标返回 series
return get_technical_series(stock, factor)
RULE_SCORE = {
'fundamental': 0,
'extra': 1,
'pricing': 2,
'technical': 3,
'CDL': 4,
}
def filter_for(rule):
lhs = rule['lhs']
try:
return KNOWN_FILTERS[lhs['type'], lhs['name']]
except KeyError:
return KNOWN_FILTERS[lhs['type'], '*']
def apply_filters(symbol, filters, bar_dict):
filters = sorted(filters, key=lambda r: RULE_SCORE[r['lhs']['type']])
for rule in filters:
if not(symbol:
return []
f = filter_for(rule)
(symbol= f(symbol, rule, bar_dict)
return(symbol
def market_panic(rules, bar_dict):
for r in rules:
index, rule = r['index'], r['rule']
f = filter_for(rule)
result = f([index], rule, bar_dict)
if result:
return True
return False
def market_enter(rules, bar_dict):
for r in rules:
index, rule = r['index'], r['rule']
f = filter_for(rule)
result = f([index], rule, bar_dict)
if not result:
return False
return True
def sort(symbol, rules, bar_dict):
if len(symbol <= 1:
return(symbol
if not rules:
return sorted(symbol
result = pd.Series(data=0.0, index(symbol)
for rule in rules:
factor = rule["factor"]
if factor["type"] == "fundamental":
data = get_fundamental_factor(symbol, factor["name"])
elif factor["type"] == "pricing":
data = pd.Series({s: getattr(bar_dict[s], factor["name"]) for s in(symbol})
elif factor["type"] == "extra":
if factor["name"] == "listed_days":
data = pd.Series({s: instruments(s).days_from_listed() for s in(symbol})
else:
data = get_turnover(symbol, fields=factor["parameters"]).iloc[0]
else:
data = pd.Series({s: (get_technical_series(s, factor)[-1] if instruments(s).days_from_listed() > 0 else np.nan) for s in(symbol})
na_option = "bottom" if rule["ascending"] else "top"
result += data.rank(method="average", ascending=rule['ascending'], na_option=na_option, pct=True)
(symbol= result.sort_values().index.tolist()
return(symbol
def _ensure_list(v):
return [] if v is None else v
def get_universe(universe, industries, boards, st_option):
(symbol= set()
if universe != ["*"]:
for index in universe:
(symbolupdate(_ensure_list(index_components(index)))
else:
(symbolupdate(all_instruments("CS").order_book_id)
if industries != ["*"]:
(symbolof_industries = set()
for ind in industries:
(symbolof_industries.update(_ensure_list(shenwan_industry(ind)))
(symbolintersection_update(symbolof_industries)
if boards != ["*"]:
(symbol= {s for s in(symbol if instruments(s).board_type in boards}
if st_option == "only":
(symbol= {s for s in(symbol if is_st_stock(s)}
elif st_option == "excluded":
(symbol= {s for s in(symbol if not is_st_stock(s)}
return list(symbol
def sell_out_all(portfolio):
for order_book_id, position in portfolio.positions.items():
if position.quantity > 0:
order_target_value(order_book_id, 0)
def init(context):
context.count = -1
context.strategy_stop = False
context.day_stop = False
context.market_panic = False
context.pool = []
run_weekly(func=rebalance_first_part, date_rule=1)
run_weekly(func=rebalance_second_part, date_rule=2)
def before_trading(context):
context.count += 1
context.day_stop = False
context.market_panic = False
def handle_bar(context, bar_dict):
if context.strategy_stop or context.day_stop or context.market_panic:
sell_out_all(context.portfolio)
return
if MARKET_PANIC_SIGNALS and market_panic(MARKET_PANIC_SIGNALS, bar_dict):
print('大盘止损触发')
context.market_panic = True
sell_out_all(context.portfolio)
return
if (HOLDING_PROFIT_TAKEN is not None or HOLDING_STOP_LOSS is not None or
STRATEGY_PROFIT_TAKEN is not None or STRATEGY_STOP_LOSS is not None):
total_returns = context.portfolio.total_returns
fired = False
if ((STRATEGY_PROFIT_TAKEN is not None and STRATEGY_PROFIT_TAKEN < total_returns) or
(STRATEGY_STOP_LOSS is not None and STRATEGY_STOP_LOSS > total_returns)):
fired = True
context.strategy_stop = True
if ((HOLDING_PROFIT_TAKEN is not None and HOLDING_PROFIT_TAKEN < total_returns) or
(HOLDING_STOP_LOSS is not None and HOLDING_STOP_LOSS > total_returns)):
fired = True
context.day_stop = True
if fired:
sell_out_all(context.portfolio)
return
if SINGLE_STOP_LOSS is None and SINGLE_PROFIT_TAKEN is None:
return
for order_book_id, position in context.portfolio.positions.items():
if position.quantity == 0:
continue
profit = bar_dict[order_book_id].last / position.avg_price - 1
if SINGLE_PROFIT_TAKEN is not None and SINGLE_PROFIT_TAKEN < profit:
print('止盈卖出:', order_book_id, position.quantity)
order_target_value(order_book_id, 0)
elif SINGLE_STOP_LOSS is not None and SINGLE_STOP_LOSS > profit:
print('止损卖出:', order_book_id, position.quantity)
order_target_value(order_book_id, 0)
def rebalance_first_part(context, bar_dict):
if context.strategy_stop:
return
if context.count % SELECT_INTERVAL == 0:
(symbol= get_universe(UNIVERSE, INDUSTRIES, BOARDS, ST_OPTION)
filtered = apply_filters(symbol, FILTERS, bar_dict)
context.pool = filtered
if MARKET_PANIC_SIGNALS and market_panic(MARKET_PANIC_SIGNALS, bar_dict):
context.market_panic = True
sell_out_all(context.portfolio)
return
if context.count % SELL_INTERVAL == 0:
need_adjust = []
max_position_value = None
if MAX_WEIGHT is not None:
max_position_value = context.portfolio.total_value * MAX_WEIGHT
need_adjust = [s for s, position in context.portfolio.positions.items()
if position.market_value > max_position_value]
if SELL_CONDITIONS:
(symbol= [s for s, position in context.portfolio.positions.items() if position.quantity > 0]
sell_list = apply_filters(symbol, SELL_CONDITIONS, bar_dict)
for s in sell_list:
order_target_value(s, 0)
if s in need_adjust:
need_adjust.remove(s)
for s in need_adjust:
order_target_value(s, max_position_value)
def rebalance_second_part(context, bar_dict):
if (context.count % BUY_INTERVAL != 0 or context.strategy_stop
or context.day_stop or context.market_panic):
return
holdings = [s for s, position in context.portfolio.positions.items() if position.quantity > 0]
if len(holdings) >= MAX_HOLDING_NUM:
print('持仓数已达到限制,不再买入')
return
(symbol= [s for s in context.pool if not is_suspended(s)]
(symbol= apply_filters(symbol, BUY_CONDITIONS, bar_dict)
if not(symbol:
return
(symbol= [s for s in(symbol if bar_dict[s].last < bar_dict[s].limit_up]
sorted(symbol = sort(symbol, SORTING_RULES, bar_dict)
buy_list = sorted(symbol:MAX_HOLDING_NUM-len(holdings)]
if not buy_list:
return
target_value = context.stock_account.cash * 0.99 / len(buy_list)
if MAX_WEIGHT is not None:
max_position_value = context.portfolio.total_value * MAX_WEIGHT
target_value = min(target_value, max_position_value)
for s in buy_list:
order_target_value(s, target_value)
求助!请问怎么修改能在同花顺mindgo 运行出来!!
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
1条回答 默认 最新
- 有问必答小助手 2021-04-16 10:18关注
你好,我是有问必答小助手。为了技术专家团更好地为您解答问题,烦请您补充下(1)问题背景详情,(2)您想解决的具体问题,(3)问题相关代码图片或者报错信息。便于技术专家团更好地理解问题,并给出解决方案。
您可以点击问题下方的【编辑】,进行补充修改问题。解决评论 打赏 举报无用 2
悬赏问题
- ¥15 file converter 转换格式失败 报错 Error marking filters as finished,如何解决?
- ¥15 ubuntu系统下挂载磁盘上执行./提示权限不够
- ¥15 Arcgis相交分析无法绘制一个或多个图形
- ¥15 关于#r语言#的问题:差异分析前数据准备,报错Error in data[, sampleName1] : subscript out of bounds请问怎么解决呀以下是全部代码:
- ¥15 seatunnel-web使用SQL组件时候后台报错,无法找到表格
- ¥15 fpga自动售货机数码管(相关搜索:数字时钟)
- ¥15 用前端向数据库插入数据,通过debug发现数据能走到后端,但是放行之后就会提示错误
- ¥30 3天&7天&&15天&销量如何统计同一行
- ¥30 帮我写一段可以读取LD2450数据并计算距离的Arduino代码
- ¥15 飞机曲面部件如机翼,壁板等具体的孔位模型