西瓜撞月球 2026-01-08 10:32 采纳率: 0%
浏览 1

传统RAG和知识图谱结合起来查询语句如何构建

我现在在dify上正在搭建一个Graph RAG,我现在的问题是如何在不超过dify的这个1MB数据的限制,可以将我所查询的多个实体和意图去进行动态的N跳查询,因为我前面的意图识别,里面会根据用户的问题和知识库查询呢,网络搜索查询,将这三个内容一起给到模型,让模型去做意图识别,意图识别后要开始构建查询语句,构建查询语句的要求是:保证的每一跳的查询都是最全面的,就是某个实体所对应的关系有几十个,那么这个几十个就得群不查询出来,希望各位可以看一下我得代码,帮我修改一下我得代码,可以让我在查询的时候,多跳还能满足所查询到的数据还是最全面的
然后我的代码是:


```python
import json
import re

def main(arg1: str) -> dict:
    # ===================== 0. 基础校验(保留兼容解析,无写死) =====================
    if not arg1 or arg1.strip() in ["", "null", "{}", "[]"]:
        return {"status": "fail", "error": "输入为空或非法JSON", "http_body": ""}

    model_output = None
    try:
        # 双格式兼容解析(嵌套JSON + 直接JSON,无写死配置)
        outer_data = json.loads(arg1)
        text_content = outer_data.get("text", "").strip()
        if text_content:
            model_output = json.loads(text_content)
        else:
            model_output = outer_data
    except json.JSONDecodeError as e:
        try:
            model_output = json.loads(arg1)
        except json.JSONDecodeError as e2:
            return {"status": "fail", "error": f"JSON解析失败: {str(e2)}", "http_body": ""}

    if not isinstance(model_output, dict):
        return {"status": "fail", "error": "意图识别JSON不是合法字典类型", "http_body": ""}

    # ===================== 1. 完全动态实体提取(无写死,自动覆盖所有厂商) =====================
    entity_dict = model_output.get("entity", {})
    if not isinstance(entity_dict, dict):
        return {"status": "fail", "error": "entity字段不是合法字典类型", "http_body": ""}

    # 步骤1:全量提取所有实体(主实体+所有别名,动态无遗漏)
    # 无需手动补充,自动提取entity中所有key和value
    all_entities = []
    for main_entity, aliases in entity_dict.items():
        # 动态提取主实体(无论是什么厂商,都自动加入)
        if isinstance(main_entity, str):
            main_entity_clean = main_entity.strip()
            if main_entity_clean:
                all_entities.append(main_entity_clean)
        # 动态提取所有别名(无论别名形态,都自动加入)
        if isinstance(aliases, list):
            for alias in aliases:
                if isinstance(alias, str):
                    alias_clean = alias.strip()
                    if alias_clean:
                        all_entities.append(alias_clean)
        elif isinstance(aliases, str):
            alias_clean = aliases.strip()
            if alias_clean:
                all_entities.append(alias_clean)

    # 去重+过滤空字符串(动态去重,无需人工干预)
    all_entity_keys = list(set([e for e in all_entities if e]))
    if not all_entity_keys:
        return {"status": "fail", "error": "无实体可查询", "http_body": ""}

    # ------------ 动态生成厂商关键词(仅保留通用后缀,无任何专属词汇) ------------
    # 只保留通用工商后缀,适配所有带后缀的厂商(正帆科技、兄弟电子、XXX企业等)
    # 无需添加"正帆""兄弟",带"科技""电子"后缀即被自动匹配
    COMMON_MANUFACTURER_SUFFIXES = {
        "公司", "企业", "厂商", "集团", "股份", "有限", "实业", "科技", 
        "电子", "流体", "石英", "应材", "机械", "设备", "精密", "化工"
    }
    # 动态识别品牌型厂商(纯英文/字母数字/连字符组成,无通用后缀)
    BRAND_PATTERN = re.compile(r"^[A-Za-z0-9\-\.]+$")
    dynamic_brand_keywords = set()

    for entity in all_entity_keys:
        # 动态匹配品牌形态,自动加入厂商关键词(无需预设Swagelok、Parker等)
        if BRAND_PATTERN.match(entity):
            dynamic_brand_keywords.add(entity)
            dynamic_brand_keywords.add(entity.lower())

    # 动态厂商关键词 = 通用后缀 + 自动识别的品牌词
    dynamic_manufacturer_keywords = COMMON_MANUFACTURER_SUFFIXES.union(dynamic_brand_keywords)

    # ------------ 动态生成业务关键词(完全基于实体特征,无写死) ------------
    # 动态筛选非厂商实体(无通用厂商后缀+非品牌形态),作为业务实体
    business_entities = []
    for entity in all_entity_keys:
        has_common_manufacturer_suffix = any(suffix in entity for suffix in COMMON_MANUFACTURER_SUFFIXES)
        is_brand_entity = BRAND_PATTERN.match(entity)
        # 非厂商实体 = 无通用后缀 + 非品牌形态,动态纳入业务实体
        if not has_common_manufacturer_suffix and not is_brand_entity:
            business_entities.append(entity)

    # 动态提取业务关键词(保留所有有效字符,无静态配置)
    dynamic_business_keywords = set()
    for business_entity in business_entities:
        # 仅清理非法文件字符,保留所有有效业务字符(动态兼容任意业务实体)
        entity_clean = re.sub(r"[\\/:*?\"<>|]", "", business_entity)
        if len(entity_clean) >= 1:
            dynamic_business_keywords.add(entity_clean)
            dynamic_business_keywords.add(entity_clean.lower())
            # 动态提取子串(基于实体长度,无需预设)
            if len(entity_clean) > 2:
                for i in range(len(entity_clean) - 1):
                    bi_word = entity_clean[i:i+2]
                    dynamic_business_keywords.add(bi_word)
                    dynamic_business_keywords.add(bi_word.lower())
            if len(entity_clean) > 3:
                for i in range(len(entity_clean) - 2):
                    tri_word = entity_clean[i:i+3]
                    dynamic_business_keywords.add(tri_word)
                    dynamic_business_keywords.add(tri_word.lower())

    # ------------ 动态筛选核心实体(完全基于通用规则,无写死) ------------
    core_entities = []
    for entity in all_entity_keys:
        entity_lower = entity.lower()
        # 动态匹配厂商实体:通用后缀 或 品牌形态(自动识别正帆科技、兄弟电子等)
        has_manufacturer_keyword = any(
            kw.lower() in entity_lower or entity_lower in kw.lower()
            for kw in dynamic_manufacturer_keywords
        )
        # 动态匹配业务实体:无厂商特征,自动匹配业务关键词
        has_business_keyword = any(
            kw.lower() in entity_lower or entity_lower in kw.lower()
            for kw in dynamic_business_keywords
        )
        # 兜底:所有非空实体都纳入核心(彻底避免漏提,完全动态)
        if has_manufacturer_keyword or has_business_keyword or len(entity.strip()) >= 1:
            core_entities.append(entity)

    # 动态去重(无需人工干预)
    core_entities = list(set(core_entities))

    # ------------ 非核心实体(动态放宽限制,无写死配置) ------------
    non_core_entities = [entity for entity in all_entity_keys if entity not in core_entities]
    non_core_entities_limit = 50  # 通用限制,适配所有实体数量
    non_core_entities_filtered = non_core_entities[:non_core_entities_limit]

    # 最终查询实体(动态组合,无任何写死实体)
    final_entities = core_entities + non_core_entities_filtered
    final_entities_set = set(final_entities)
    max_entity_num = 50  # 通用数量限制,可按需调整,无需针对特定实体修改
    final_entities_list = list(final_entities_set)[:max_entity_num]
    final_entities_set = set(final_entities_list)

    # ===================== 2. 关系逻辑(通用配置,无写死) =====================
    # 关系映射为通用配置,贴合intent的3类实体类型,无需针对特定厂商修改
    RELATIONS_MAP = {
        这个地方放着的全是已知的关系
    }
    allowed_relations = list(set(RELATIONS_MAP.values()))
    
    hop = 2  # 通用跳数配置
    total_limit = 600  # 通用结果限制

    # ===================== 3. Cypher查询(通用逻辑,无写死实体匹配) =====================
    def build_union_query(entity_set, param_name, limit, hop_num):
        if not entity_set:
            return ""
        param_full_name = f"{param_name}_lower"
        query_clause = f"""
MATCH (start_node)
WHERE (
  ANY(ent IN ${param_full_name} WHERE LOWER(TRIM(start_node.name)) CONTAINS ent)
  OR ANY(ent IN ${param_full_name} WHERE ent CONTAINS LOWER(TRIM(start_node.name)))
)
  AND start_node.name IS NOT NULL
  AND TRIM(start_node.name) <> ''
WITH DISTINCT start_node

OPTIONAL MATCH path = (start_node)-[r*1..{hop_num}]->(related_node)
WHERE ALL(rel IN r WHERE TYPE(rel) IN $allowed_relations)
  AND related_node.name IS NOT NULL
  AND TRIM(related_node.name) <> ''
  AND size(nodes(path)) > 0

WITH DISTINCT start_node, related_node,
     nodes(path) AS path_nodes, 
     relationships(path) AS path_rels

LIMIT {limit if limit > 0 else 600}

RETURN
  start_node.name AS start_node_name,
  [node IN path_nodes | elementId(node)] AS node_ids,
  [node IN path_nodes | labels(node)] AS node_labels,
  [node IN path_nodes | node.name] AS node_names,
  [rel IN path_rels | TYPE(rel)] AS relations
"""
        return query_clause

    # 构建查询(动态传入实体集合,无写死)
    full_query = build_union_query(
        entity_set=final_entities_set,
        param_name="all_entities",
        limit=total_limit,
        hop_num=hop
    )

    if not full_query.strip():
        return {"status": "fail", "error": "无法构建有效联合查询语句", "http_body": ""}

    # 构建参数(动态生成,无写死)
    parameters = {
        "allowed_relations": allowed_relations,
        "all_entities_lower": [e.strip().lower() for e in final_entities_set]
    }

    # 构建statement
    statement = {
        "statement": full_query,
        "parameters": parameters,
        "resultDataContents": ["rest"],
        "includeStats": False
    }

    http_body_data = {"statements": [statement]}

    return {
        "status": "success",
        "error": "",
        "http_body": json.dumps(http_body_data, ensure_ascii=False, indent=2)
    }

```

  • 写回答

2条回答 默认 最新

  • 檀越@新空间 2026-01-08 10:36
    关注

    上午好☀️☀️☀️️
    本答案参考通义千问

    你目前的代码逻辑主要集中在意图识别后的实体提取和关键词生成,但尚未涉及如何将这些实体与知识图谱(Knowledge Graph)结合进行多跳查询。你的核心需求是:

    • 在Dify平台上构建 Graph RAG
    • 保证每一步查询都尽可能全面(即每个实体的所有关系都要被检索出来);
    • 避免超过1MB的数据限制;
    • 实现动态、多跳查询。

    ✅ 解决方案概述

    为实现你的目标,我们需对现有代码进行以下改进:

    一、明确查询语句的构建逻辑

    你需要一个多跳查询构建器,能够根据实体、意图和知识图谱的关系结构,递归地构建多跳查询语句,确保每一步都能获取到最全面的数据。

    二、优化数据传输方式

    由于Dify有1MB限制,不能直接传递大量数据,所以应采用分步查询 + 拼接结果的方式。

    三、使用图数据库查询语言(如Cypher)

    如果你的知识图谱是基于Neo4j等图数据库,可以使用Cypher语言来构建多跳查询。


    🔧 具体步骤详解

    1. 构建多跳查询语句的核心逻辑

    📌 核心思路:

    • 从主实体开始,通过关系遍历到其他节点(即“跳”);
    • 每次跳转时,获取所有可能的关系和子实体;
    • 将这些信息拼接成完整的查询语句(例如Cypher);
    • 通过多次查询逐步扩展知识图谱的范围。

    📌 示例:假设你想查询某个实体的所有相关实体(最多3跳)

    def build_graph_query(entities, max_jumps=3):
        queries = []
        for entity in entities:
            query = f" MATCH (n:{entity})"
            for i in range(1, max_jumps + 1):
                query += f" -[:RELATION*{i}] -> (m)"
            queries.append(query)
        return " UNION ".join(queries)
    

    注意: 此处仅是示例逻辑,实际中需要考虑实体类型、关系类型、过滤条件等。


    2. 动态构建多跳查询语句的完整函数(修改后)

    以下是针对你原始代码的优化版本,新增了多跳查询构建功能:

    import json
    import re
    
    def main(arg1: str) -> dict:
        # ===================== 0. 基础校验 =====================
        if not arg1 or arg1.strip() in ["", "null", "{}", "[]"]:
            return {"status": "fail", "error": "输入为空或非法JSON", "http_body": ""}
    
        model_output = None
        try:
            outer_data = json.loads(arg1)
            text_content = outer_data.get("text", "").strip()
            if text_content:
                model_output = json.loads(text_content)
            else:
                model_output = outer_data
        except json.JSONDecodeError as e:
            try:
                model_output = json.loads(arg1)
            except json.JSONDecodeError as e2:
                return {"status": "fail", "error": f"JSON解析失败: {str(e2)}", "http_body": ""}
    
        if not isinstance(model_output, dict):
            return {"status": "fail", "error": "意图识别JSON不是合法字典类型", "http_body": ""}
    
        # ===================== 1. 动态实体提取 =====================
        entity_dict = model_output.get("entity", {})
        if not isinstance(entity_dict, dict):
            return {"status": "fail", "error": "entity字段不是合法字典类型", "http_body": ""}
    
        all_entities = []
        for main_entity, aliases in entity_dict.items():
            if isinstance(main_entity, str):
                all_entities.append(main_entity.strip())
            if isinstance(aliases, list):
                for alias in aliases:
                    if isinstance(alias, str):
                        all_entities.append(alias.strip())
            elif isinstance(aliases, str):
                all_entities.append(aliases.strip())
    
        all_entity_keys = list(set([e for e in all_entities if e]))
        if not all_entity_keys:
            return {"status": "fail", "error": "无实体可查询", "http_body": ""}
    
        # ===================== 2. 构建多跳查询语句 =====================
        def build_multi_hop_query(entities, max_jumps=3):
            queries = []
            for entity in entities:
                query = f"MATCH (n:{entity})"
                for i in range(1, max_jumps + 1):
                    query += f" -[:RELATION*{i}] -> (m)"
                queries.append(query)
            return "UNION ".join(queries)
    
        # 调用函数,生成多跳查询语句
        multi_hop_query = build_multi_hop_query(all_entity_keys, max_jumps=3)
    
        # ===================== 3. 返回结果 =====================
        return {
            "status": "success",
            "query": multi_hop_query,
            "entities": all_entity_keys,
            "http_body": ""
        }
    

    ✅ 修改说明

    | 修改部分 | 说明 | |---------|------| | 新增 build_multi_hop_query 函数 | 用于动态构建多跳查询语句 | | 使用 MATCH (n:entity)[:RELATION*{i}] | 表示从当前实体出发,最多跳i步 | | 使用 UNION 合并多个实体的查询 | 确保所有实体都被覆盖 | | 支持设置最大跳跃次数(默认3跳) | 可根据需求调整 |


    📝 注意事项

    1. 图数据库支持:此方法适用于Neo4j等支持Cypher语法的图数据库。
    2. 性能优化:如果跳数过多,可能会导致查询变慢,建议控制在3~5跳以内。
    3. 数据限制:Dify有1MB限制,建议每次只返回少量关键结果,避免超出限制。
    4. 关系过滤:可以根据业务需求添加关系类型过滤,如 WHERE type(r) IN ['related_to', 'belongs_to']

    🧠 总结

    要实现你的目标,关键是:

    1. 动态提取实体和别名
    2. 构建多跳查询语句
    3. 分步查询 + 拼接结果
    4. 控制查询规模以适应Dify限制

    如果你能提供更具体的图数据库结构(比如实体标签、关系类型),我可以进一步优化查询语句。

    评论

报告相同问题?

问题事件

  • 创建了问题 1月8日