From 5a00340ab87d2e91a9c16c1b86e1e7464ad8dcde Mon Sep 17 00:00:00 2001 From: xudongjun999 Date: Fri, 19 Apr 2024 15:42:20 +0800 Subject: [PATCH 1/2] =?UTF-8?q?[fire-1141]=20flink=E8=A1=80=E7=BC=98?= =?UTF-8?q?=E9=87=87=E9=9B=86=E6=97=B6=E5=87=BA=E7=8E=B0=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../zto/fire/common/lineage/LineageManager.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala b/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala index 2c24027..b554eb9 100644 --- a/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala +++ b/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala @@ -164,10 +164,11 @@ object LineageManager extends Logging { def print(interval: Long = 60, pretty: Boolean = true): Unit = { ThreadUtils.schedule({ val lineage = FireUtils.invokeEngineApi[Lineage](this.sparkLineageAccumulatorManager, this.flinkLineageAccumulatorManager, "getValue") - val jsonLineage = FirePS1Conf.wrap(s""" - |------------------- 血缘信息(${DateFormatUtils.formatCurrentDateTime()}):---------------------- - |${JSONUtils.toJSONString(lineage, pretty)} - |""".stripMargin, FirePS1Conf.PINK) + val jsonLineage = FirePS1Conf.wrap( + s""" + |------------------- 血缘信息(${DateFormatUtils.formatCurrentDateTime()}):---------------------- + |${JSONUtils.toJSONString(lineage, pretty)} + |""".stripMargin, FirePS1Conf.PINK) println(jsonLineage) logInfo(jsonLineage) @@ -244,7 +245,7 @@ object LineageManager extends Logging { LineageManager.printLog(log) } }) - } (this.logger, catchLog = "将SQLTable血缘信息映射为Datasource数据源信息失败!", isThrow = false, hook = false) + }(this.logger, catchLog = "将SQLTable血缘信息映射为Datasource数据源信息失败!", isThrow = false, hook = false) } /** @@ -348,7 +349,7 @@ object LineageManager extends Logging { */ def addPaimonLineage(cluster: String, tableName: String, primaryKey: String, bucketKey: String, partitionField: String, operations: Operation*): Unit = { - this.addLineage(PaimonDatasource(Datasource.PAIMON.toString, cluster, tableName, primaryKey , bucketKey, partitionField), operations: _*) + this.addLineage(PaimonDatasource(Datasource.PAIMON.toString, cluster, tableName, primaryKey, bucketKey, partitionField), operations: _*) } /** @@ -877,6 +878,7 @@ object LineageManager extends Logging { /** * 为指定数据源添加Operation类型 + * * @param datasource * 数据源 * @param targetOperations @@ -930,7 +932,7 @@ object LineageManager extends Logging { /** * 将目标DataSourceDesc中的operation合并到set中 */ - private[fire] def mergeSet(set: JHashSet[DatasourceDesc], datasourceDesc: DatasourceDesc): Unit = { + private[fire] def mergeSet(set: JHashSet[DatasourceDesc], datasourceDesc: DatasourceDesc): Unit = this.synchronized { if (set.isEmpty || !set.contains(datasourceDesc)) { set.add(datasourceDesc) return -- Gitee From 56ad4926499e835ee218731bde2b0d9556ce0bf4 Mon Sep 17 00:00:00 2001 From: xudongjun999 Date: Fri, 19 Apr 2024 15:51:51 +0800 Subject: [PATCH 2/2] =?UTF-8?q?[fire-1141]=20flink=E8=A1=80=E7=BC=98?= =?UTF-8?q?=E9=87=87=E9=9B=86=E6=97=B6=E5=87=BA=E7=8E=B0=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../zto/fire/common/lineage/LineageManager.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala b/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala index b554eb9..de335be 100644 --- a/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala +++ b/fire-common/src/main/scala/com/zto/fire/common/lineage/LineageManager.scala @@ -164,11 +164,10 @@ object LineageManager extends Logging { def print(interval: Long = 60, pretty: Boolean = true): Unit = { ThreadUtils.schedule({ val lineage = FireUtils.invokeEngineApi[Lineage](this.sparkLineageAccumulatorManager, this.flinkLineageAccumulatorManager, "getValue") - val jsonLineage = FirePS1Conf.wrap( - s""" - |------------------- 血缘信息(${DateFormatUtils.formatCurrentDateTime()}):---------------------- - |${JSONUtils.toJSONString(lineage, pretty)} - |""".stripMargin, FirePS1Conf.PINK) + val jsonLineage = FirePS1Conf.wrap(s""" + |------------------- 血缘信息(${DateFormatUtils.formatCurrentDateTime()}):---------------------- + |${JSONUtils.toJSONString(lineage, pretty)} + |""".stripMargin, FirePS1Conf.PINK) println(jsonLineage) logInfo(jsonLineage) @@ -245,7 +244,7 @@ object LineageManager extends Logging { LineageManager.printLog(log) } }) - }(this.logger, catchLog = "将SQLTable血缘信息映射为Datasource数据源信息失败!", isThrow = false, hook = false) + } (this.logger, catchLog = "将SQLTable血缘信息映射为Datasource数据源信息失败!", isThrow = false, hook = false) } /** @@ -349,7 +348,7 @@ object LineageManager extends Logging { */ def addPaimonLineage(cluster: String, tableName: String, primaryKey: String, bucketKey: String, partitionField: String, operations: Operation*): Unit = { - this.addLineage(PaimonDatasource(Datasource.PAIMON.toString, cluster, tableName, primaryKey, bucketKey, partitionField), operations: _*) + this.addLineage(PaimonDatasource(Datasource.PAIMON.toString, cluster, tableName, primaryKey , bucketKey, partitionField), operations: _*) } /** @@ -878,7 +877,6 @@ object LineageManager extends Logging { /** * 为指定数据源添加Operation类型 - * * @param datasource * 数据源 * @param targetOperations -- Gitee