weixin_44777039 2022-03-31 13:17
浏览 694
已结题

datahub如何编辑字段级别的血缘关系

问题遇到的现象和发生背景

我想要设置字段级别的血缘关系, 但是我根据官网和 您的代码进行了一些修改, 可是没有生效, 您能帮忙分析一下或者给一个 设置字段级别的血缘关系 的代码实例可以吗?

问题相关代码,请勿粘贴截图

from typing import List
import datetime
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage
)
from datahub.metadata.schema_classes import *


def datasetUrn(tbl):
    return builder.make_dataset_urn("hive", tbl)


def fldUrn(tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(tbl), fld)


def add_metadata_by_me(urn: str, desc: str, emitter: DatahubRestEmitter):
    # 构造一个数据集属性对象
    dataset_properties = DatasetPropertiesClass(description=desc)

    # 构造一个MetadataChangeProposalWrapper对象
    metadata_event = MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=urn,
        aspectName="datasetProperties",
        aspect=dataset_properties,
    )

    # emit 元数据,这是一个阻塞调用
    res = emitter.emit(metadata_event)
    print(res)

def add_data_lineage(src_urns: List[str], dest_urn: str, emitter: DatahubRestEmitter):
    # 构建数据血缘上流对象UpstreamClass实例
    upstream_tables: List[UpstreamClass] = []
    for urn in src_urns:
        upstream_tables.append(
            UpstreamClass(
                dataset=urn,
                type=DatasetLineageTypeClass.TRANSFORMED,
                auditStamp=AuditStampClass(
                    time= int(datetime.datetime.now().timestamp()*1000),
                    actor="urn:li:corpuser:datahub",
                ),
            )
        )
    # 构建上流数据血缘对象实例
    fineGrainedLineages = [
        # FineGrainedLineage(
        #     upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        #     upstreams=[fldUrn("bar2", "c1"), fldUrn("bar4", "c1")],
        #     downstreamType=FineGrainedLineageDownstreamType.FIELD,
        #     downstreams=[fldUrn("bar", "c1")]),

        FineGrainedLineage(
            upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
            upstreams=[fldUrn("tmp.skip_test", "user_id")],
            downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
            downstreams=[fldUrn("tmp.skip_test2", "id")]),
    ]
    upstream_lineage = UpstreamLineage(upstreams=upstream_tables, fineGrainedLineages=fineGrainedLineages)

    # 构造一个MetadataChangeProposalWrapper对象
    lineage_mcp = MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=dest_urn,
        aspectName="upstreamLineage",
        aspect=upstream_lineage,
    )

    # emit 元数据,阻塞调用
    res = emitter.emit(lineage_mcp)
    print(res)


gms_server_url = "http://localhost:8080"

# 构建一个GMS REST API Emitter。
rest_emitter = DatahubRestEmitter(gms_server_url)

add_metadata_by_me(builder.make_dataset_urn("hive", "tmp.skip_test"), "测试1", rest_emitter)
add_metadata_by_me(builder.make_dataset_urn("hive", "tmp.skip_test2"), "测试2", rest_emitter)
# add_metadata_by_me(builder.make_dataset_urn("hive", "test.tableC"), "用户PV/UV按天统计数据", rest_emitter)

add_data_lineage(
    src_urns=[builder.make_dataset_urn("hive", "tmp.skip_test"),
              # builder.make_dataset_urn("hive", "test.tableB")
              ],
    dest_urn=builder.make_dataset_urn("hive", "tmp.skip_test2"),
    emitter=rest_emitter
)


我想要达到的结果

skip_test的user_id 下游指向 skip_test2 的 id 字段

  • 写回答

0条回答 默认 最新

    报告相同问题?

    问题事件

    • 系统已结题 4月8日
    • 创建了问题 3月31日

    悬赏问题

    • ¥15 如何让子窗口鼠标滚动独立,不要传递消息给主窗口
    • ¥15 如何能达到用ping0.cc检测成这样?如图
    • ¥15 关于#DMA固件#的问题,请各位专家解答!
    • ¥15 matlab生成的x1图不趋于稳定,之后的图像是稳定的水平线
    • ¥15 请问华为OD岗位的内部职业发展通道都有哪些,以及各个级别晋升的要求
    • ¥20 微信小程序 canvas 问题
    • ¥15 系统 24h2 专业工作站版,浏览文件夹的图库,视频,图片之类的怎样删除?
    • ¥15 怎么把512还原为520格式
    • ¥15 MATLAB的动态模态分解出现错误,以CFX非定常模拟结果为快照
    • ¥15 求高通平台Softsim调试经验