W27769 2021-04-15 19:24 采纳率: 0%
浏览 144

求助!请问怎么修改能在同花顺mindgo 运行出来!!

import numpy as np
import pandas as pd
import talib

UNIVERSE = ['*']
BOARDS = ['MainBoard']
INDUSTRIES = ['*']
ST_OPTION = 'excluded'

SELECT_INTERVAL = 15
BUY_INTERVAL = 1
SELL_INTERVAL = 1

MAX_HOLDING_NUM = 6
MAX_WEIGHT = 0.15

SINGLE_PROFIT_TAKEN = 0.2
SINGLE_STOP_LOSS = -0.1
HOLDING_PROFIT_TAKEN = None
HOLDING_STOP_LOSS = None
STRATEGY_PROFIT_TAKEN = None
STRATEGY_STOP_LOSS = None

MARKET_ENTER_SIGNALS = [
    
]

MARKET_PANIC_SIGNALS = [
    
]

FILTERS = [
    
    {'operator': 'greater_than', 'lhs': {'name': 'return_on_equity', 'type': 'fundamental'}, 'rhs': 20},
    
    {'operator': 'greater_than', 'lhs': {'name': 'turnover_rate', 'type': 'extra', 'parameters': ['month']}, 'rhs': 0.03},
    
]

SORTING_RULES = [
    
]

SELL_CONDITIONS = [
    
]

BUY_CONDITIONS = [
    
    {'lhs': {'name': 'STOCH.slowk', 'type': 'technical', 'parameters': [5, 3, 3]}, 'operator': 'cross', 'rhs': {'name': 'STOCH.slowd', 'type': 'technical', 'parameters': [5, 3, 3]}},
    
    {'operator': 'greater_than', 'lhs': {'name': 'turnover_rate', 'type': 'extra', 'parameters': ['month']}, 'rhs': 0.03},
    
]


FUNDAMENTAL_FACTORS = None


def fundamental_factor_map():
    global FUNDAMENTAL_FACTORS
    if FUNDAMENTAL_FACTORS is not None:
        return FUNDAMENTAL_FACTORS

    result = {}
    result.update(fundamentals.eod_derivative_indicator.__dict__)
    result.update(fundamentals.balance_sheet.__dict__)
    result.update(fundamentals.cash_flow.__dict__)
    result.update(fundamentals.financial_indicator.__dict__)
    result.update(fundamentals.income_statement.__dict__)

    FUNDAMENTAL_FACTORS = result

    return result
    
def get_fundamental_factor(symbol, factor):
    try:
        factor = fundamental_factor_map()[factor]
    except KeyError:
        return pd.Series({})

    df = get_fundamentals(query(factor).filter(fundamentals.stockcode.in_(symbol))
    if df.empty:
        return pd.Series({})
    return df.iloc[0]

def get_turnover(symbol, count=1, fields=None):
    result = get_turnover_rate(symbol, count=count, fields=fields)
    if isinstance(result, pd.Series):
        name =(symbol if isinstance(symbol, str) else(symbol0]
        return result.to_frame(name=name)

    return result


SIMPLE_OPERATOR = {
    "greater_than",
    "less_than",
    "in_range",
    "rank_in_range",
}


def apply_simple_operator(operator, data, rhs):
    if operator == "greater_than":
        return data.index[data > rhs].tolist()
    elif operator == "less_than":
        return data.index[data < rhs].tolist()
    elif operator == "in_range":
        vmin, vmax = rhs
        return data.index[(data > vmin) & (data < vmax)].tolist()
    else:
        assert operator == "rank_in_range"
        vmin, vmax = rhs[0] / 100.0, rhs[1] / 100.0
        rank = data.rank(pct=True)
        c = (rank > vmin) & (rank < vmax)
        return c[c].index.tolist()


KNOWN_FILTERS = {}


def register_filter(category, detail):
    def decorator(fun):
        global KNOWN_FILTERS
        KNOWN_FILTERS[category, detail] = fun
        return fun

    return decorator


@register_filter('fundamental', '*')
def fundamental_filter(symbol, rule, bar_dict):
    if rule["operator"] not in SIMPLE_OPERATOR:
        print('[WARNING] unknown operator in rule:', rule)
        return(symbol

    data = get_fundamental_factor(symbol, rule["lhs"]["name"])
    return apply_simple_operator(rule["operator"], data, rule["rhs"])


@register_filter('extra', 'listed_days')
def listed_days_filter(symbol, rule, bar_dict):
    data = pd.Series({
        s: instruments(s).days_from_listed() for s in(symbol
    })
    return apply_simple_operator(rule["operator"], data, rule["rhs"])


@register_filter('extra', 'turnover_rate')
def turnover_rate_filter(symbol, rule, bar_dict):
(symbol= [s for s in(symbol if instruments(s).days_from_listed() > 0]
    lhs = rule["lhs"]
    if rule["operator"] in SIMPLE_OPERATOR:
        data = get_turnover(symbol, fields=lhs["parameters"]).iloc[0]
        return apply_simple_operator(rule["operator"], data, rule["rhs"])

    if not rule['operator'] in {'greater_than_ma', 'less_than_ma', 'in_ma_range'}:
        print('unknown operator for rule:', rule)
        return(symbol

    count = rule["rhs"][1] if isinstance(rule["rhs"], list) else rule["rhs"]
    data = get_turnover(symbol, count=count, fields=rule["parameters"])
    value = data.iloc[-1]
    if rule["operator"] == "greater_than_ma":
        return value.index[value > data.mean()].tolist()
    elif rule["operator"] == "less_than_ma":
        return value.index[value < data.mean()].tolist()
    else:
        l, r = rule["parameters"]
        ma1 = data.iloc[:l].mean()
        ma2 = data.mean()
        condition = (value > ma1) & (value < ma2)
        condition |= (value < ma1) & (value > ma2)
        return condition[condition].index.tolist()


@register_filter('pricing', '*')
def pricing_filter(symbol, rule, bar_dict):
    # 当天上市的股票无历史数据,不会满足条件,过滤掉
(symbol= [s for s in(symbol if instruments(s).days_from_listed() > 0]
    lhs, operator, rhs = rule['lhs'], rule['operator'], rule['rhs']
    if operator == 'rank_in_range':
        data = pd.Series({s: history_bars(s, 1, '1d', fields=lhs['name'])[-1] for s in(symbol})
        return apply_simple_operator(operator, data, rhs)

    filtered(symbol = []
    for s in(symbol:
        lv = history_bars(s, 1, '1d', fields=lhs['name'])[-1]
        if isinstance(rhs, dict):
            rv = get_factor_value(s, rhs, bar_dict)
        else:
            rv = rhs
        if operator == "greater_than":
            if lv > _to_scalar(rv):
                filtered(symbol.append(s)
        elif operator == "less_than":
            if lv < _to_scalar(rv):
                filtered(symbol.append(s)
        elif operator == "in_range":
            l, r = rv
            if l < lv < r:
                filtered(symbol.append(s)
        elif operator == "greater_than_ma":
            ma = history_bars(s, rv, "1d", fields=lhs["name"]).mean()
            if lv > ma:
                filtered(symbol.append(s)
        elif operator == "less_than_ma":
            ma = history_bars(s, rv, "1d", fields=lhs["name"]).mean()
            if lv < ma:
                filtered(symbol.append(s)
        elif operator == "in_ma_range":
            data = history_bars(s, rv[1], "1d", fields=lhs["name"])
            ma1 = data[:rv[0]].mean()
            ma2 = data.mean()
            if ma1 < lv < ma2 or ma2 < lv < ma1:
                filtered(symbol.append(s)
        else:
            raise RuntimeError("不支持的操作符: {}, 因子 {}".format(operator, lhs))
    return filtered(symbol


def n_day_gain_rate(stock, n):
    l = history_bars(stock, n, '1d', 'close')
    return (l[-1] - l[0]) / l[0]


@register_filter('pricing', 'n_day_gain_rate')
def n_day_gain_rate_filter(symbol, rule, bar_dict):
    n = rule['lhs']['parameters'][0]
    data = pd.Series({
        s: n_day_gain_rate(s, n)
        for s in(symbol if instruments(s).days_from_listed() >= n
    })
    return apply_simple_operator(rule['operator'], data, rule['rhs'])


@register_filter('pricing', 'n_day_loss_rate')
def n_day_loss_rate_filter(symbol, rule, bar_dict):
    n = rule['lhs']['parameters'][0]
    data = pd.Series({
        s: -n_day_gain_rate(s, n)
        for s in(symbol if instruments(s).days_from_listed() >= n
    })
    return apply_simple_operator(rule['operator'], data, rule['rhs'])


@register_filter('technical', '*')
def technical_filter(symbol, rule, bar_dict):
    # 对于技术指标来说,当天上市的股票不会满足条件
(symbol= [s for s in(symbol if instruments(s).days_from_listed() > 0]
    operator = rule["operator"]
    lhs = rule["lhs"]
    rhs = rule["rhs"]
    filtered(symbol = []
    for s in(symbol:
        lv = get_technical_series(s, lhs)
        rv = get_factor_value(s, rhs, bar_dict) if isinstance(rhs, dict) else rhs
        if operator == "greater_than" or operator == "long":
            if lv[-1] > _to_scalar(rv):
                filtered(symbol.append(s)
        elif operator == "less_than" or operator == "short":
            if lv[-1] < _to_scalar(rv):
                filtered(symbol.append(s)
        elif operator == "in_range":
            if rv[0] < lv[-1] < rv[1]:
                filtered(symbol.append(s)
        elif operator == "cross":
            if len(lv) < 2 or len(rv) < 2:
                continue
            if lv[-2] < rv[-2] and lv[-1] > rv[-1]:
                filtered(symbol.append(s)
        elif operator == "reverse_cross":
            if len(lv) < 2 or len(rv) < 2:
                continue
            if lv[-2] > rv[-2] and lv[-1] < rv[-1]:
                filtered(symbol.append(s)
        else:
            raise RuntimeError(
                "不支持的规则:{} {} {}".format(
                    lhs, operator, rhs))
    return filtered(symbol
    
@register_filter('CDL', '*')
def CDL_filter(symbol, rule, bar_dict):
(symbol= [s for s in(symbol if instruments(s).days_from_listed() > 0]
    name = rule["lhs"]["name"]
    filtered(symbol = []
    for stock in(symbol:
        function = getattr(talib, name)
        result = function(
            history_bars(stock, 100, '1d', 'open'),
            history_bars(stock, 100, '1d', 'high'),
            history_bars(stock, 100, '1d', 'low'),
            history_bars(stock, 100, '1d', 'close')
        )
        if len(result) > 0 and result[-1] != 0:
            filtered(symbol.append(stock)
    return filtered(symbol


def _to_scalar(v):
    return v[-1] if isinstance(v, np.ndarray) else v


def _extract_input_names(d):
    if "price" in d:
        return [d["price"]]
    elif "prices" in d:
        return d["prices"]
    return []


def get_technical_series(stock, factor):
    name = factor["name"]
    output = "real"
    if "." in name:
        name, output = name.split(".")
    func = getattr(talib.abstract, name)
    func.set_parameters(
        dict(zip(func.parameters.keys(),
                 factor["parameters"])))
    func.set_input_arrays({
        name: history_bars(stock, 100, "1d", fields=name)
        for name in _extract_input_names(func.input_names)
    })
    if output == "real":
        return func.outputs
    output_index = func.output_names.index(output)
    return func.outputs[output_index]


def get_factor_value(stock, factor, bar_dict):
    # 对于量价指标返回值
    if factor["type"] == "pricing":
        return history_bars(stock, 1, '1d', fields=factor['name'])[-1]
    # 对于技术指标返回 series
    return get_technical_series(stock, factor)


RULE_SCORE = {
    'fundamental': 0,
    'extra': 1,
    'pricing': 2,
    'technical': 3,
    'CDL': 4,
}


def filter_for(rule):
    lhs = rule['lhs']
    try:
        return KNOWN_FILTERS[lhs['type'], lhs['name']]
    except KeyError:
        return KNOWN_FILTERS[lhs['type'], '*']


def apply_filters(symbol, filters, bar_dict):
    filters = sorted(filters, key=lambda r: RULE_SCORE[r['lhs']['type']])
    for rule in filters:
        if not(symbol:
            return []

        f = filter_for(rule)
    (symbol= f(symbol, rule, bar_dict)

    return(symbol


def market_panic(rules, bar_dict):
    for r in rules:
        index, rule = r['index'], r['rule']
        f = filter_for(rule)
        result = f([index], rule, bar_dict)
        if result:
            return True

    return False


def market_enter(rules, bar_dict):
    for r in rules:
        index, rule = r['index'], r['rule']
        f = filter_for(rule)
        result = f([index], rule, bar_dict)
        if not result:
            return False

    return True


def sort(symbol, rules, bar_dict):
    if len(symbol <= 1:
        return(symbol
    if not rules:
        return sorted(symbol

    result = pd.Series(data=0.0, index(symbol)
    for rule in rules:
        factor = rule["factor"]
        if factor["type"] == "fundamental":
            data = get_fundamental_factor(symbol, factor["name"])
        elif factor["type"] == "pricing":
            data = pd.Series({s: getattr(bar_dict[s], factor["name"]) for s in(symbol})
        elif factor["type"] == "extra":
            if factor["name"] == "listed_days":
                data = pd.Series({s: instruments(s).days_from_listed() for s in(symbol})
            else:
                data = get_turnover(symbol, fields=factor["parameters"]).iloc[0]
        else:
            data = pd.Series({s: (get_technical_series(s, factor)[-1] if instruments(s).days_from_listed() > 0 else np.nan) for s in(symbol})
        na_option = "bottom" if rule["ascending"] else "top"
        result += data.rank(method="average", ascending=rule['ascending'], na_option=na_option, pct=True)
(symbol= result.sort_values().index.tolist()
    return(symbol


def _ensure_list(v):
    return [] if v is None else v


def get_universe(universe, industries, boards, st_option):
(symbol= set()
    if universe != ["*"]:
        for index in universe:
        (symbolupdate(_ensure_list(index_components(index)))
    else:
    (symbolupdate(all_instruments("CS").order_book_id)
        
    if industries != ["*"]:
    (symbolof_industries = set()
        for ind in industries:
        (symbolof_industries.update(_ensure_list(shenwan_industry(ind)))
    (symbolintersection_update(symbolof_industries)

    if boards != ["*"]:
    (symbol= {s for s in(symbol if instruments(s).board_type in boards}
    if st_option == "only":
    (symbol= {s for s in(symbol if is_st_stock(s)}
    elif st_option == "excluded":
    (symbol= {s for s in(symbol if not is_st_stock(s)}

    return list(symbol


def sell_out_all(portfolio):
    for order_book_id, position in portfolio.positions.items():
        if position.quantity > 0:
            order_target_value(order_book_id, 0)



def init(context):
    context.count = -1
    context.strategy_stop = False
    context.day_stop = False
    context.market_panic = False
    context.pool = []
    run_weekly(func=rebalance_first_part, date_rule=1)
    run_weekly(func=rebalance_second_part, date_rule=2)

def before_trading(context):
    context.count += 1
    context.day_stop = False
    context.market_panic = False


def handle_bar(context, bar_dict):
    if context.strategy_stop or context.day_stop or context.market_panic:
        sell_out_all(context.portfolio)
        return

    if MARKET_PANIC_SIGNALS and market_panic(MARKET_PANIC_SIGNALS, bar_dict):
        print('大盘止损触发')
        context.market_panic = True
        sell_out_all(context.portfolio)
        return

    if (HOLDING_PROFIT_TAKEN is not None or HOLDING_STOP_LOSS is not None or
        STRATEGY_PROFIT_TAKEN is not None or STRATEGY_STOP_LOSS is not None):
        total_returns = context.portfolio.total_returns
        fired = False
        if ((STRATEGY_PROFIT_TAKEN is not None and STRATEGY_PROFIT_TAKEN < total_returns) or
            (STRATEGY_STOP_LOSS is not None and STRATEGY_STOP_LOSS > total_returns)):
            fired = True
            context.strategy_stop = True

        if ((HOLDING_PROFIT_TAKEN is not None and HOLDING_PROFIT_TAKEN < total_returns) or
            (HOLDING_STOP_LOSS is not None and HOLDING_STOP_LOSS > total_returns)):
            fired = True
            context.day_stop = True

        if fired:
            sell_out_all(context.portfolio)
            return

    if SINGLE_STOP_LOSS is None and SINGLE_PROFIT_TAKEN is None:
        return

    for order_book_id, position in context.portfolio.positions.items():
        if position.quantity == 0:
            continue

        profit = bar_dict[order_book_id].last / position.avg_price - 1
        if SINGLE_PROFIT_TAKEN is not None and SINGLE_PROFIT_TAKEN < profit:
            print('止盈卖出:', order_book_id, position.quantity)
            order_target_value(order_book_id, 0)
        elif SINGLE_STOP_LOSS is not None and SINGLE_STOP_LOSS > profit:
            print('止损卖出:', order_book_id, position.quantity)
            order_target_value(order_book_id, 0)


def rebalance_first_part(context, bar_dict):
    if context.strategy_stop:
        return

    if context.count % SELECT_INTERVAL == 0:
    (symbol= get_universe(UNIVERSE, INDUSTRIES, BOARDS, ST_OPTION)
        filtered = apply_filters(symbol, FILTERS, bar_dict)
        context.pool = filtered

    if MARKET_PANIC_SIGNALS and market_panic(MARKET_PANIC_SIGNALS, bar_dict):
        context.market_panic = True
        sell_out_all(context.portfolio)
        return

    if context.count % SELL_INTERVAL == 0:
        need_adjust = []
        max_position_value = None
        if MAX_WEIGHT is not None:
            max_position_value = context.portfolio.total_value * MAX_WEIGHT
            need_adjust = [s for s, position in context.portfolio.positions.items()
                           if position.market_value > max_position_value]

        if SELL_CONDITIONS:
        (symbol= [s for s, position in context.portfolio.positions.items() if position.quantity > 0]
            sell_list = apply_filters(symbol, SELL_CONDITIONS, bar_dict)
            for s in sell_list:
                order_target_value(s, 0)
                if s in need_adjust:
                    need_adjust.remove(s)

        for s in need_adjust:
            order_target_value(s, max_position_value)

def rebalance_second_part(context, bar_dict):
    if (context.count % BUY_INTERVAL != 0 or context.strategy_stop 
        or context.day_stop or context.market_panic):
        return
    
    holdings = [s for s, position in context.portfolio.positions.items() if position.quantity > 0]
    if len(holdings) >= MAX_HOLDING_NUM:
        print('持仓数已达到限制,不再买入')
        return

(symbol= [s for s in context.pool if not is_suspended(s)]
(symbol= apply_filters(symbol, BUY_CONDITIONS, bar_dict)
    if not(symbol:
        return

(symbol= [s for s in(symbol if bar_dict[s].last < bar_dict[s].limit_up]
    sorted(symbol = sort(symbol, SORTING_RULES, bar_dict)
    buy_list = sorted(symbol:MAX_HOLDING_NUM-len(holdings)]
    if not buy_list:
        return

    target_value = context.stock_account.cash * 0.99 / len(buy_list)
    if MAX_WEIGHT is not None:
        max_position_value = context.portfolio.total_value * MAX_WEIGHT
        target_value = min(target_value, max_position_value)

    for s in buy_list:
        order_target_value(s, target_value)
  • 写回答

1条回答 默认 最新

  • 有问必答小助手 2021-04-16 10:18
    关注

    你好,我是有问必答小助手。为了技术专家团更好地为您解答问题,烦请您补充下(1)问题背景详情,(2)您想解决的具体问题,(3)问题相关代码图片或者报错信息。便于技术专家团更好地理解问题,并给出解决方案。

    您可以点击问题下方的【编辑】,进行补充修改问题。

    评论

报告相同问题?

悬赏问题

  • ¥15 file converter 转换格式失败 报错 Error marking filters as finished,如何解决?
  • ¥15 ubuntu系统下挂载磁盘上执行./提示权限不够
  • ¥15 Arcgis相交分析无法绘制一个或多个图形
  • ¥15 关于#r语言#的问题:差异分析前数据准备,报错Error in data[, sampleName1] : subscript out of bounds请问怎么解决呀以下是全部代码:
  • ¥15 seatunnel-web使用SQL组件时候后台报错,无法找到表格
  • ¥15 fpga自动售货机数码管(相关搜索:数字时钟)
  • ¥15 用前端向数据库插入数据,通过debug发现数据能走到后端,但是放行之后就会提示错误
  • ¥30 3天&7天&&15天&销量如何统计同一行
  • ¥30 帮我写一段可以读取LD2450数据并计算距离的Arduino代码
  • ¥15 飞机曲面部件如机翼,壁板等具体的孔位模型