diff --git a/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala b/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala index f930b2d37c6f1fcf3807c7c72e9ffd09fe02fc8f..8f2b0cc0eb6c565b64e90509c43cbf48df0444e9 100644 --- a/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala +++ b/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala @@ -57,25 +57,30 @@ private[fire] class LineageManager extends Logging { private[this] def lineageParse(): Unit = { if (lineageEnable) { this.parserExecutor.scheduleWithFixedDelay(new Runnable { - override def run(): Unit = { - if (!lineageEnable || (parseCount.incrementAndGet() >= lineageRunCount && !parserExecutor.isShutdown)) { - disableLineage() - parserExecutor.shutdown() - printLog("实时血缘解析任务退出!") - } - - // 1. 解析jdbc sql语句 - parseJdbcSql() - printLog(s"完成第${parseCount}/${lineageRunCount}次解析JDBC中的血缘信息") - - // 2. 将SQL中使用到的的表血缘信息映射到数据源中 - LineageManager.mapTableToDatasource(SQLLineageManager.getSQLLineage.getTables) - printLog(s"完成第${parseCount}/${lineageRunCount}次异步解析SQL埋点中的表信息") - } + override def run(): Unit = executeLineageParse() }, lineageRunInitialDelay, lineageRunPeriod, TimeUnit.SECONDS) } } + /** + * 触发合并 + */ + private[this] def executeLineageParse(): Unit = { + if (!lineageEnable || (parseCount.incrementAndGet() >= lineageRunCount && !parserExecutor.isShutdown)) { + disableLineage() + parserExecutor.shutdown() + printLog("实时血缘解析任务退出!") + } + + // 1. 解析jdbc sql语句 + parseJdbcSql() + printLog(s"完成第${parseCount}/${lineageRunCount}次解析JDBC中的血缘信息") + + // 2. 将SQL中使用到的的表血缘信息映射到数据源中 + LineageManager.mapTableToDatasource(SQLLineageManager.getSQLLineage.getTables) + printLog(s"完成第${parseCount}/${lineageRunCount}次异步解析SQL埋点中的表信息") + } + /** * 解析来自于jdbc的sql血缘 */ @@ -127,7 +132,11 @@ private[fire] class LineageManager extends Logging { /** * 获取所有使用到的数据源 */ - private[fire] def get: JConcurrentHashMap[Datasource, JHashSet[DatasourceDesc]] = this.lineageMap + private[fire] def get: JConcurrentHashMap[Datasource, JHashSet[DatasourceDesc]] = { + //返回前,手动触发下合并 + executeLineageParse() + this.lineageMap + } } /**