From 8d88df8907ea8c9547107300079f861baf01c893 Mon Sep 17 00:00:00 2001 From: zhanggougou <15651908511@163.com> Date: Mon, 21 Jul 2025 09:08:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=A1=80=E7=BC=98=E5=81=B6?= =?UTF-8?q?=E5=8F=91=E4=B8=A2=E5=A4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fire/common/lineage/LineageManager.scala | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala b/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala index f930b2d..8f2b0cc 100644 --- a/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala +++ b/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala @@ -57,25 +57,30 @@ private[fire] class LineageManager extends Logging { private[this] def lineageParse(): Unit = { if (lineageEnable) { this.parserExecutor.scheduleWithFixedDelay(new Runnable { - override def run(): Unit = { - if (!lineageEnable || (parseCount.incrementAndGet() >= lineageRunCount && !parserExecutor.isShutdown)) { - disableLineage() - parserExecutor.shutdown() - printLog("实时血缘解析任务退出!") - } - - // 1. 解析jdbc sql语句 - parseJdbcSql() - printLog(s"完成第${parseCount}/${lineageRunCount}次解析JDBC中的血缘信息") - - // 2. 将SQL中使用到的的表血缘信息映射到数据源中 - LineageManager.mapTableToDatasource(SQLLineageManager.getSQLLineage.getTables) - printLog(s"完成第${parseCount}/${lineageRunCount}次异步解析SQL埋点中的表信息") - } + override def run(): Unit = executeLineageParse() }, lineageRunInitialDelay, lineageRunPeriod, TimeUnit.SECONDS) } } + /** + * 触发合并 + */ + private[this] def executeLineageParse(): Unit = { + if (!lineageEnable || (parseCount.incrementAndGet() >= lineageRunCount && !parserExecutor.isShutdown)) { + disableLineage() + parserExecutor.shutdown() + printLog("实时血缘解析任务退出!") + } + + // 1. 解析jdbc sql语句 + parseJdbcSql() + printLog(s"完成第${parseCount}/${lineageRunCount}次解析JDBC中的血缘信息") + + // 2. 将SQL中使用到的的表血缘信息映射到数据源中 + LineageManager.mapTableToDatasource(SQLLineageManager.getSQLLineage.getTables) + printLog(s"完成第${parseCount}/${lineageRunCount}次异步解析SQL埋点中的表信息") + } + /** * 解析来自于jdbc的sql血缘 */ @@ -127,7 +132,11 @@ private[fire] class LineageManager extends Logging { /** * 获取所有使用到的数据源 */ - private[fire] def get: JConcurrentHashMap[Datasource, JHashSet[DatasourceDesc]] = this.lineageMap + private[fire] def get: JConcurrentHashMap[Datasource, JHashSet[DatasourceDesc]] = { + //返回前,手动触发下合并 + executeLineageParse() + this.lineageMap + } } /** -- Gitee