问题遇到的现象和发生背景
我想在 DataHub 中设置字段级别（列级）的血缘关系。我参照官网文档和您的示例代码做了一些修改，但没有生效。能否帮忙分析一下原因，或者提供一个设置字段级别血缘关系的代码示例？
问题相关代码,请勿粘贴截图
import datetime
from typing import List, Optional

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)
from datahub.metadata.schema_classes import *
def datasetUrn(tbl):
    """Return the DataHub URN of the Hive dataset named ``tbl`` (e.g. ``"tmp.skip_test"``)."""
    hive_platform = "hive"
    urn = builder.make_dataset_urn(hive_platform, tbl)
    return urn
def fldUrn(tbl, fld):
    """Return the schema-field URN of column ``fld`` in Hive table ``tbl``."""
    parent_dataset = datasetUrn(tbl)
    field_urn = builder.make_schema_field_urn(parent_dataset, fld)
    return field_urn
def add_metadata_by_me(urn: str, desc: str, emitter: DatahubRestEmitter):
    """Set ``desc`` as the description of dataset ``urn``.

    Sends an UPSERT of the ``datasetProperties`` aspect through ``emitter``.
    The emit call is blocking, and its return value is printed.

    Args:
        urn: Dataset URN to annotate.
        desc: Description text stored in ``datasetProperties.description``.
        emitter: REST emitter connected to DataHub GMS.
    """
    proposal = MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=urn,
        aspectName="datasetProperties",
        aspect=DatasetPropertiesClass(description=desc),
    )
    print(emitter.emit(proposal))
def add_data_lineage(
    src_urns: List[str],
    dest_urn: str,
    emitter: DatahubRestEmitter,
    fine_grained_lineages: Optional[List[FineGrainedLineage]] = None,
):
    """Emit table- and column-level lineage for ``dest_urn``.

    Each URN in ``src_urns`` is recorded as a TRANSFORMED table-level upstream
    of ``dest_urn``. Column-level (fine-grained) edges are attached to the same
    ``upstreamLineage`` aspect, which is UPSERTed on ``dest_urn``.

    Args:
        src_urns: Dataset URNs of the upstream tables.
        dest_urn: Dataset URN of the downstream table; the aspect is written on it.
        emitter: REST emitter connected to DataHub GMS (blocking call).
        fine_grained_lineages: Column-level edges to attach. ``None`` keeps the
            historical default mapping ``tmp.skip_test.user_id -> tmp.skip_test2.id``;
            pass ``[]`` to emit table-level lineage only. The datasets owning the
            upstream columns should also be listed in ``src_urns`` so that the
            table-level and column-level lineage agree.
    """
    # Table-level upstreams; each gets an audit stamp in epoch milliseconds.
    upstream_tables: List[UpstreamClass] = [
        UpstreamClass(
            dataset=urn,
            type=DatasetLineageTypeClass.TRANSFORMED,
            auditStamp=AuditStampClass(
                time=int(datetime.datetime.now().timestamp() * 1000),
                actor="urn:li:corpuser:datahub",
            ),
        )
        for urn in src_urns
    ]

    if fine_grained_lineages is None:
        fine_grained_lineages = [
            FineGrainedLineage(
                upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                upstreams=[fldUrn("tmp.skip_test", "user_id")],
                # Exactly one downstream column, so the documented type is FIELD
                # (FIELD_SET is meant for a set of downstream columns).
                downstreamType=FineGrainedLineageDownstreamType.FIELD,
                downstreams=[fldUrn("tmp.skip_test2", "id")],
            ),
        ]

    upstream_lineage = UpstreamLineage(
        upstreams=upstream_tables,
        fineGrainedLineages=fine_grained_lineages,
    )
    lineage_mcp = MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=dest_urn,
        aspectName="upstreamLineage",
        aspect=upstream_lineage,
    )
    # Blocking call. UPSERT writes the aspect as a whole, so the upstreams
    # listed here become the full lineage of dest_urn.
    res = emitter.emit(lineage_mcp)
    print(res)
def add_schema_by_me(tbl: str, fields: List[str], emitter: DatahubRestEmitter):
    """Register ``fields`` as the columns of Hive table ``tbl`` (``schemaMetadata`` aspect).

    Column-level lineage edges reference schema-field URNs. If the dataset's
    schema does not list those columns, the DataHub UI has no column to attach
    the edge to, and the column lineage appears not to take effect.

    NOTE(review): UPSERT replaces the whole schema aspect. Do not run this
    against a table whose real schema was already ingested from Hive, or its
    other columns will be dropped. The string type below is a placeholder;
    use the real Hive column types.
    """
    schema = SchemaMetadataClass(
        schemaName=tbl,
        platform=builder.make_data_platform_urn("hive"),
        version=0,
        hash="",
        platformSchema=OtherSchemaClass(rawSchema=""),
        fields=[
            SchemaFieldClass(
                fieldPath=column,
                type=SchemaFieldDataTypeClass(type=StringTypeClass()),
                nativeDataType="string",
            )
            for column in fields
        ],
    )
    schema_mcp = MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=datasetUrn(tbl),
        aspectName="schemaMetadata",
        aspect=schema,
    )
    print(emitter.emit(schema_mcp))


if __name__ == "__main__":
    gms_server_url = "http://localhost:8080"
    # Build a GMS REST API emitter.
    rest_emitter = DatahubRestEmitter(gms_server_url)

    add_metadata_by_me(datasetUrn("tmp.skip_test"), "测试1", rest_emitter)
    add_metadata_by_me(datasetUrn("tmp.skip_test2"), "测试2", rest_emitter)

    # The columns must exist on both datasets before a column-level edge
    # between them can be displayed.
    add_schema_by_me("tmp.skip_test", ["user_id"], rest_emitter)
    add_schema_by_me("tmp.skip_test2", ["id"], rest_emitter)

    # Table lineage tmp.skip_test -> tmp.skip_test2 plus the column edge
    # user_id -> id emitted by add_data_lineage.
    add_data_lineage(
        src_urns=[datasetUrn("tmp.skip_test")],
        dest_urn=datasetUrn("tmp.skip_test2"),
        emitter=rest_emitter,
    )
我想要达到的结果
tmp.skip_test 表的 user_id 字段作为上游，指向下游 tmp.skip_test2 表的 id 字段（即 tmp.skip_test.user_id → tmp.skip_test2.id）。