From b06a153458fbb3bc3c7503f8a3fdd73ca176e0a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=AB=8B=E5=AE=8F?= <2198083211@qq.com>
Date: Mon, 27 Nov 2023 17:55:16 +0800
Subject: [PATCH 1/2] feat(DataTransferNode, pom.xml): decide whether the Seatunnel job must finish before DataTransferNode's next node runs; resolve the large number of dependency conflicts introduced by the hive JDBC driver
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../app/liteFlow/model/DataTransferModel.java | 4 +-
.../liteFlow/model/LiteFlowNodeLogModel.java | 6 +-
.../app/liteFlow/node/DataTransferNode.java | 130 +++++++++++++++---
.../ApiAuthenticationInterceptor.java | 2 +-
pom.xml | 7 +
5 files changed, 127 insertions(+), 22 deletions(-)
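
Note: the afterProcess() logic added below keys off the env -> job.mode value inside the SeaTunnel job configuration JSON, and only a BATCH job triggers the completion polling. For reference, a minimal standalone sketch of that check using the same Jackson JsonMapper/JsonNode calls the patch uses; the embedded config string is an illustrative fragment, not taken from the patch:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.json.JsonMapper;

    public class JobModeProbe {
        public static void main(String[] args) throws Exception {
            // Illustrative SeaTunnel v2 config fragment; in the patch this JSON comes from
            // dataTransferModel.getSeaTunnelConfig().
            String seaTunnelConfig = "{\"env\":{\"job.mode\":\"BATCH\",\"parallelism\":1},"
                    + "\"source\":[],\"sink\":[]}";
            JsonNode root = new JsonMapper().readTree(seaTunnelConfig);
            String jobMode = root.get("env").get("job.mode").asText();
            if ("BATCH".equals(jobMode)) {
                System.out.println("BATCH job: wait for completion before running the next node");
            } else {
                System.out.println(jobMode + " job: run the next node without waiting");
            }
        }
    }
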
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java
index 2dc26f8..e02247e 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java
@@ -2,6 +2,8 @@ package supie.webadmin.app.liteFlow.model;
import lombok.Data;
+import java.util.Map;
+
/**
* 描述:DataTransfer组件的相关信息
*
@@ -15,7 +17,7 @@ public class DataTransferModel {
/**
* jobId
*/
- private String jobId;
+ private Long jobId;
/**
* jobName
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java
index cc7806a..238176e 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java
@@ -36,15 +36,15 @@ public class LiteFlowNodeLogModel {
private String logMessage;
public static LiteFlowNodeLogModel info(String nodeId, String nodeTag, String logMessage) {
- return new LiteFlowNodeLogModel("INFO", nodeId, nodeTag, new Date(),logMessage);
+ return new LiteFlowNodeLogModel("INFO", nodeId, nodeTag, new Date(), logMessage);
}
public static LiteFlowNodeLogModel warn(String nodeId, String nodeTag, String logMessage) {
- return new LiteFlowNodeLogModel("WARN", nodeId, nodeTag, new Date(),logMessage);
+ return new LiteFlowNodeLogModel("WARN", nodeId, nodeTag, new Date(), logMessage);
}
public static LiteFlowNodeLogModel error(String nodeId, String nodeTag, String logMessage) {
- return new LiteFlowNodeLogModel("ERROR", nodeId, nodeTag, new Date(),logMessage);
+ return new LiteFlowNodeLogModel("ERROR", nodeId, nodeTag, new Date(), logMessage);
}
public LiteFlowNodeLogModel(String nodeId, String nodeTag, String logMessage) {
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
index f1b6090..8634033 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
@@ -1,12 +1,16 @@
package supie.webadmin.app.liteFlow.node;
import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONUtil;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,10 +50,6 @@ public class DataTransferNode extends BaseNode {
* Seatunnel 配置信息
*/
private SeatunnelConfig seatunnelConfigModel;
- /**
- * 临时文件路径
- */
- private String tempFilePath;
@Autowired
private SeatunnelConfigMapper seatunnelConfigMapper;
@@ -74,20 +74,103 @@ public class DataTransferNode extends BaseNode {
//判断什么方式调用 Seatunnel
if (this.seatunnelConfigModel.getCallMode() == 1) {
// 通过接口的方式调用 Seatunnel
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用RestApi方式调用Seatunnel。"));
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用RestApi方式调用Seatunnel."));
restApiSubmitJob();
} else if (this.seatunnelConfigModel.getCallMode() == 2) {
// 通过 SSH 方式调用 Seatunnel
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用SSH方式调用Seatunnel。"));
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用SSH方式执行Seatunnel任务."));
sshSubmitJob();
}
}
+ /**
+     * Seatunnel任务已经调用并开始执行, 判断是否需要等待任务执行完成再执行下一个节点.
+     * 根据(seaTunnelConfig -> env -> job.mode)的值判断是否需要等待执行完毕再执行下一节点.
+     * BATCH: 需要等待任务执行完毕再执行下一个节点; 其他取值不用等待, 直接执行下一个节点.
+ *
+ * @author 王立宏
+ * @date 2023/11/27 11:17
+ */
+ @Override
+ public void afterProcess() {
+ try {
+ JsonNode jsonNode = new JsonMapper().readTree(dataTransferModel.getSeaTunnelConfig());
+ String envJobModelValue = jsonNode.get("env").get("job.mode").asText();
+ if ("BATCH".equals(envJobModelValue)) {
+ // 判断该任务是否执行完毕(Api调用判断、SSH执行判断).
+ if (this.seatunnelConfigModel.getCallMode() == 2) {
+ // TODO SSH方式执行Seatunnel任务, 当前只能够等待任务执行完成才可以执行下一组件
+ // 非等待任务则开启一个线程来执行 SSH 命令,log日志则保存在 Seatunnel 路径的log文件夹中
+ return;
+ }
+ // API调用判断
+ long startTime = System.currentTimeMillis();
+ int sleepTime = 1000; //休眠时间
+ while (true) {
+ StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri());
+ url.append("/hazelcast/rest/maps/running-job/").append(dataTransferModel.getJobId());
+ HttpResponse execute;
+ try {
+ execute = HttpRequest.get(url.toString()).execute();
+ } catch (Exception e) {
+ String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage();
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage));
+ }
+ if (!execute.isOk()) {
+ nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行信息获取失败,失败信息为:" + execute.body()));
+ if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!"));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!"));
+ }
+ continue;
+ }
+ String body = URLUtil.decode(execute.body());
+ // bodyJsonNode => {} 或 {"jobId":"","jobName":"","jobStatus":"","envOptions":{},"createTime":"","jobDag":{"vertices":[],"edges":[]},"pluginJarsUrls":[],"isStartWithSavePoint":false,"metrics":{"sourceReceivedCount":"","sinkWriteCount":""}}
+ JsonNode bodyJsonNode = new JsonMapper().readTree(body);
+ if (bodyJsonNode.size() == 0) {
+ // 未查询到该jobId所对应的任务信息{},认为该任务已经执行完毕。
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!"));
+ return;
+ }
+ if ("RUNNING".equals(bodyJsonNode.get("jobStatus").asText())) {
+ // 任务运行中,休眠5秒后继续判断是否完成.
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行中..."));
+ if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!"));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!"));
+ }
+ ThreadUtil.sleep(sleepTime);
+ //每次休眠后都追加3秒的时间,直至休眠时间大于1分钟。
+ if (sleepTime < 60000) sleepTime = sleepTime + 3000;
+ } else {
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!"));
+ return;
+ }
+ }
+ } else if ("STREAMING".equals(envJobModelValue)) {
+ // 无需等待Seatunnel任务执行完成,直接执行下一个节点.
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "env节点下的job.mode节点值为\"STREAMING\", 不关心执行结果, 直接执行下一组件节点."));
+ return;
+ }
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "'env'节点下的'job.mode'节点值为'" + envJobModelValue + "', 不关心该数据传输组件中的 Seatunnel 执行结果, 直接执行下一组件节点."));
+ } catch (JsonProcessingException e) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, e.getMessage()));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage()));
+ }
+ }
+
private void restApiSubmitJob() {
if (seatunnelConfigModel.getSubmitJobUrl() == null) {
seatunnelConfigModel.setSubmitJobUrl(new SeatunnelConfig().getSubmitJobUrl());
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
- "未配置Seatunnel提交Job的接口地址,使用默认地址:" + seatunnelConfigModel.getSubmitJobUrl()));
+ "未配置Seatunnel提交Job的接口地址,使用默认地址:" + seatunnelConfigModel.getSubmitJobUrl()));
}
StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri());
// 判断字符串第一个字符是否为"/"
@@ -108,14 +191,14 @@ public class DataTransferNode extends BaseNode {
url.append("isStartWithSavePoint=").append(dataTransferModel.getIsStartWithSavePoint());
}
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "提交Job:" + url.toString()));
- HttpResponse execute = null;
+ HttpResponse execute;
try {
execute = HttpRequest.post(url.toString())
.body(dataTransferModel.getSeaTunnelConfig())
.execute();
} catch (Exception e) {
- String errorMessage = "RestApi(" + url.toString() + ")调用报错:" + e.getMessage();
- nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败:" + errorMessage));
+ String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage();
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage));
throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage));
}
String body = URLUtil.decode(execute.body());
@@ -124,10 +207,21 @@ public class DataTransferNode extends BaseNode {
devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, body);
if (!execute.isOk()) {
// 失败
- nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败:" + body));
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + body));
throw new MyLiteFlowException(new ErrorMessageModel(getClass(), body));
} else {
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "执行成功:" + body));
+ // body => {"jobId":733584788375666689,"jobName":"rest_api_test"}
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "任务调用成功,返回值为: " + body));
+ try {
+ // 成功,设置jobId,jobId将在该 DataTransferNode 组件的 afterProcess() 方法使用.
+ JsonNode jsonNode = new JsonMapper().readTree(JSONUtil.toJsonStr(body));
+ Long jobId = jsonNode.get("jobId").asLong();
+ dataTransferModel.setJobId(jobId);
+ } catch (Exception e) {
+                String errorMessage = "设置jobId失败, 失败信息为: " + e.getMessage();
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, errorMessage));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage));
+ }
}
}
@@ -137,14 +231,16 @@ public class DataTransferNode extends BaseNode {
RemoteShell remoteShell = new RemoteShellSshjImpl(
remoteHost.getHostIp(), remoteHost.getHostPort(),
remoteHost.getLoginName(), remoteHost.getPassword(), null);
-
- tempFilePath = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-")
+ /**
+ * 临时文件路径
+ */
+ String tempFilePath = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-")
+ RandomUtil.randomString(5) + "-config.json";
contentWriteToFile(tempFilePath, JSONUtil.toJsonStr(dataTransferModel.getSeaTunnelConfig()));
// 上传配置文件(v2.batch.config.template)至 seatunnel 的 ./config/ 中
- String remoteConfigName = "v2.supie.config.json";
+ String remoteConfigName = "v2-supie-config" + DateUtil.format(new Date(), "-yyyy-MM-dd-HH-mm-ss-SSS") + ".json";
String remoteFilePath = seatunnelConfigModel.getSeatunnelPath() + "/config/" + remoteConfigName;
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "上传Seatunnel配置文件,remoteFilePath:" + remoteFilePath + "。"));
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "上传Seatunnel配置文件,remoteFilePath:" + remoteFilePath + "."));
remoteShell.uploadFile(tempFilePath, remoteFilePath);
// 执行命令
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
@@ -155,7 +251,7 @@ public class DataTransferNode extends BaseNode {
"sh bin/seatunnel.sh --config config/" + remoteConfigName + " -e local");
remoteShell.close();
// 存储执行结果信息
- if (resultMsg == null) resultMsg = "无回执结果信息!";
+ if (resultMsg == null) resultMsg = "无回执结果信息, 请检查!";
nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag, resultMsg));
devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, resultMsg);
// 删除创建的临时文件
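
For reference, the completion check added in afterProcess() above can be read as the following standalone sketch: poll /hazelcast/rest/maps/running-job/{jobId}, treat an empty JSON object as "job finished", treat jobStatus == RUNNING as "still running", and back off between polls. Method and parameter names here are illustrative; the patch does this inline and reports through nodeLog/MyLiteFlowException instead of IllegalStateException.

    import cn.hutool.core.thread.ThreadUtil;
    import cn.hutool.http.HttpRequest;
    import cn.hutool.http.HttpResponse;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.json.JsonMapper;

    public class RunningJobPoller {

        /** Polls until the job leaves the running-job map or stops reporting RUNNING; throws after one hour. */
        public static void waitForCompletion(String localhostUri, long jobId) throws Exception {
            long startTime = System.currentTimeMillis();
            int sleepTime = 1000; // grows by 3 seconds per round, staying around one minute at most
            while (true) {
                if (System.currentTimeMillis() - startTime > 60 * 60 * 1000) {
                    throw new IllegalStateException("Seatunnel job " + jobId + " still running after 1 hour");
                }
                HttpResponse response = HttpRequest
                        .get(localhostUri + "/hazelcast/rest/maps/running-job/" + jobId)
                        .execute();
                if (response.isOk()) {
                    JsonNode body = new JsonMapper().readTree(response.body());
                    // An empty object means the job is no longer in the running-job map, i.e. it finished.
                    if (body.size() == 0 || !"RUNNING".equals(body.get("jobStatus").asText())) {
                        return;
                    }
                }
                ThreadUtil.sleep(sleepTime);
                if (sleepTime < 60000) {
                    sleepTime += 3000;
                }
            }
        }
    }
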
diff --git a/application-webadmin/src/main/java/supie/webadmin/interceptor/ApiAuthenticationInterceptor.java b/application-webadmin/src/main/java/supie/webadmin/interceptor/ApiAuthenticationInterceptor.java
index 142aa49..9878406 100644
--- a/application-webadmin/src/main/java/supie/webadmin/interceptor/ApiAuthenticationInterceptor.java
+++ b/application-webadmin/src/main/java/supie/webadmin/interceptor/ApiAuthenticationInterceptor.java
@@ -103,7 +103,7 @@ public class ApiAuthenticationInterceptor implements HandlerInterceptor {
/**
* 获取验证信息.
* 先从redis获取,若redis中不存在则从数据库中查找并存入redis.
- * TODO 相应的权限有变动修改则删除redis中缓存的权限信息。
+ * 相应的权限有变动修改则删除redis中缓存的权限信息。
*
* @param url 路由地址
* @return 验证信息集
diff --git a/pom.xml b/pom.xml
index 67aaf52..8c1d757 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,13 @@
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-jdbc</artifactId>
             <version>3.1.2</version>
+            <exclusions>
+                <!-- 排除 hive-jdbc 带来的全部传递依赖, 解决引入 hive 驱动后的大量依赖冲突 -->
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
--
Gitee
From 020b3edc742865458ee5b8545ae82ec0865fb186 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E7=AB=8B=E5=AE=8F?= <2198083211@qq.com>
Date: Wed, 29 Nov 2023 16:47:20 +0800
Subject: [PATCH 2/2] feat(Strategy, StandardQuatity, ModelDesginField, StandardMain, CustomizeRoute, DefinitionIndex, DataTransferNode): field changes and new features
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
1. Field changes in StandardQuatity, ModelDesginField, StandardMain, CustomizeRoute, DefinitionIndex, and DataTransferNode.
2. Strategy: adapt the database management module to the Doris database.
3. DataTransferNode: run the task over SSH and decide whether to wait for it to finish before executing the next component.
---
.../app/dao/mapper/CustomizeRouteMapper.xml | 10 +-
.../app/dao/mapper/DefinitionIndexMapper.xml | 40 +---
.../app/dao/mapper/ModelDesginFieldMapper.xml | 10 +-
.../app/dao/mapper/StandardMainMapper.xml | 11 +-
.../app/dao/mapper/StandardQuatityMapper.xml | 21 +-
.../webadmin/app/dto/CustomizeRouteDto.java | 6 +
.../webadmin/app/dto/DefinitionIndexDto.java | 32 +--
.../webadmin/app/dto/ModelDesginFieldDto.java | 6 +
.../webadmin/app/dto/StandardMainDto.java | 6 +
.../webadmin/app/dto/StandardQuatityDto.java | 18 +-
.../app/liteFlow/node/DataTransferNode.java | 190 ++++++++++++------
.../webadmin/app/model/CustomizeRoute.java | 5 +
.../webadmin/app/model/DefinitionIndex.java | 26 +--
.../webadmin/app/model/ModelDesginField.java | 5 +
.../webadmin/app/model/StandardMain.java | 5 +
.../webadmin/app/model/StandardQuatity.java | 13 +-
.../strategyImpl/BaseDataSource.java | 32 +--
.../strategyImpl/DataSourceDoris.java | 54 +++++
.../webadmin/app/vo/CustomizeRouteVo.java | 6 +
.../webadmin/app/vo/DefinitionIndexVo.java | 29 +--
.../webadmin/app/vo/ModelDesginFieldVo.java | 6 +
.../supie/webadmin/app/vo/StandardMainVo.java | 7 +
.../webadmin/app/vo/StandardQuatityVo.java | 18 +-
23 files changed, 338 insertions(+), 218 deletions(-)
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/CustomizeRouteMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/CustomizeRouteMapper.xml
index a70dfaf..92011f7 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/CustomizeRouteMapper.xml
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/CustomizeRouteMapper.xml
@@ -20,6 +20,7 @@
+
@@ -41,7 +42,8 @@
database_name,
sql_script,
parameter,
- process_id)
+ process_id,
+ definition_index_id)
VALUES
(#{item.id},
@@ -61,7 +63,8 @@
#{item.databaseName},
#{item.sqlScript},
#{item.parameter},
- #{item.processId})
+ #{item.processId},
+ #{item.definitionIndexId})
@@ -138,6 +141,9 @@
AND sdt_customize_route.process_id = #{customizeRouteFilter.processId}
+
+ AND sdt_customize_route.definition_index_id = #{customizeRouteFilter.definitionIndexId}
+
AND CONCAT(IFNULL(sdt_customize_route.name,''), IFNULL(sdt_customize_route.route_describe,''), IFNULL(sdt_customize_route.url,''), IFNULL(sdt_customize_route.database_name,''), IFNULL(sdt_customize_route.sql_script,''), IFNULL(sdt_customize_route.parameter,'')) LIKE #{safeCustomizeRouteSearchString}
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/DefinitionIndexMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/DefinitionIndexMapper.xml
index 4ffa2a8..d986f4e 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/DefinitionIndexMapper.xml
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/DefinitionIndexMapper.xml
@@ -18,14 +18,10 @@
-
+
-
-
-
-
@@ -47,14 +43,10 @@
index_level,
process_id,
index_description,
- model_logical_id,
+ customize_route_id,
model_desgin_field_id,
data_type,
product_period,
- caliber_calculate_function,
- caliber_measure_unit,
- caliber_precision,
- caliber_description,
project_id)
VALUES
@@ -74,14 +66,10 @@
#{item.indexLevel},
#{item.processId},
#{item.indexDescription},
- #{item.modelLogicalId},
+ #{item.customizeRouteId},
#{item.modelDesginFieldId},
#{item.dataType},
#{item.productPeriod},
- #{item.caliberCalculateFunction},
- #{item.caliberMeasureUnit},
- #{item.caliberPrecision},
- #{item.caliberDescription},
#{item.projectId})
@@ -154,8 +142,8 @@
AND sdt_definition_index.index_description LIKE #{safeDefinitionIndexIndexDescription}
-
- AND sdt_definition_index.model_logical_id = #{definitionIndexFilter.modelLogicalId}
+
+ AND sdt_definition_index.customize_route_id = #{definitionIndexFilter.customizeRouteId}
AND sdt_definition_index.model_desgin_field_id = #{definitionIndexFilter.modelDesginFieldId}
@@ -166,25 +154,9 @@
AND sdt_definition_index.product_period = #{definitionIndexFilter.productPeriod}
-
-
- AND sdt_definition_index.caliber_calculate_function LIKE #{safeDefinitionIndexCaliberCalculateFunction}
-
-
-
- AND sdt_definition_index.caliber_measure_unit LIKE #{safeDefinitionIndexCaliberMeasureUnit}
-
-
-
- AND sdt_definition_index.caliber_precision LIKE #{safeDefinitionIndexCaliberPrecision}
-
-
-
- AND sdt_definition_index.caliber_description LIKE #{safeDefinitionIndexCaliberDescription}
-
- AND CONCAT(IFNULL(sdt_definition_index.index_type,''), IFNULL(sdt_definition_index.index_name,''), IFNULL(sdt_definition_index.index_en_name,''), IFNULL(sdt_definition_index.index_code,''), IFNULL(sdt_definition_index.index_level,''), IFNULL(sdt_definition_index.index_description,''), IFNULL(sdt_definition_index.data_type,''), IFNULL(sdt_definition_index.product_period,''), IFNULL(sdt_definition_index.caliber_calculate_function,''), IFNULL(sdt_definition_index.caliber_measure_unit,''), IFNULL(sdt_definition_index.caliber_precision,''), IFNULL(sdt_definition_index.caliber_description,'')) LIKE #{safeDefinitionIndexSearchString}
+ AND CONCAT(IFNULL(sdt_definition_index.index_type,''), IFNULL(sdt_definition_index.index_name,''), IFNULL(sdt_definition_index.index_en_name,''), IFNULL(sdt_definition_index.index_code,''), IFNULL(sdt_definition_index.index_level,''), IFNULL(sdt_definition_index.index_description,''), IFNULL(sdt_definition_index.data_type,''), IFNULL(sdt_definition_index.product_period,'')) LIKE #{safeDefinitionIndexSearchString}
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/ModelDesginFieldMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/ModelDesginFieldMapper.xml
index 9a1d8f2..94d1fc6 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/ModelDesginFieldMapper.xml
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/ModelDesginFieldMapper.xml
@@ -33,6 +33,7 @@
+
@@ -72,7 +73,8 @@
model_quote_standard,
model_field_script,
show_order,
- process_id)
+ process_id,
+ standard_main_id)
VALUES
(#{item.id},
@@ -105,7 +107,8 @@
#{item.modelQuoteStandard},
#{item.modelFieldScript},
#{item.showOrder},
- #{item.processId})
+ #{item.processId},
+ #{item.standardMainId})
@@ -234,6 +237,9 @@
AND sdt_model_desgin_field.process_id = #{modelDesginFieldFilter.processId}
+
+ AND sdt_model_desgin_field.standard_main_id = #{modelDesginFieldFilter.standardMainId}
+
AND CONCAT(IFNULL(sdt_model_desgin_field.model_field_name,''), IFNULL(sdt_model_desgin_field.model_field_code,''), IFNULL(sdt_model_desgin_field.model_field_type,''), IFNULL(sdt_model_desgin_field.model_field_index,''), IFNULL(sdt_model_desgin_field.model_field_meta_standard,''), IFNULL(sdt_model_desgin_field.model_field_value_standard,''), IFNULL(sdt_model_desgin_field.model_field_description,''), IFNULL(sdt_model_desgin_field.model_field_key,''), IFNULL(sdt_model_desgin_field.model_field_ppartition,''), IFNULL(sdt_model_desgin_field.model_field_source_name,''), IFNULL(sdt_model_desgin_field.model_field_source_type,''), IFNULL(sdt_model_desgin_field.model_field_source_table,''), IFNULL(sdt_model_desgin_field.model_field_mapping,''), IFNULL(sdt_model_desgin_field.model_quote_standard,''), IFNULL(sdt_model_desgin_field.model_field_script,'')) LIKE #{safeModelDesginFieldSearchString}
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardMainMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardMainMapper.xml
index f7246d3..313a10e 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardMainMapper.xml
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardMainMapper.xml
@@ -20,6 +20,7 @@
+
@@ -41,7 +42,8 @@
standard_english,
standard_description,
standard_input_mode,
- standard_status)
+ standard_status,
+ standard_regular)
VALUES
(#{item.id},
@@ -61,7 +63,8 @@
#{item.standardEnglish},
#{item.standardDescription},
#{item.standardInputMode},
- #{item.standardStatus})
+ #{item.standardStatus},
+ #{item.standardRegular})
@@ -139,6 +142,10 @@
AND sdt_standard_main.standard_status LIKE #{safeStandardMainStandardStatus}
+
+
+ AND sdt_standard_main.standard_regular LIKE #{safeStandardMainStandardRegular}
+
AND CONCAT(IFNULL(sdt_standard_main.standard_name,''), IFNULL(sdt_standard_main.standard_code,''), IFNULL(sdt_standard_main.standard_type,''), IFNULL(sdt_standard_main.standard_english,''), IFNULL(sdt_standard_main.standard_description,''), IFNULL(sdt_standard_main.standard_input_mode,''), IFNULL(sdt_standard_main.standard_status,'')) LIKE #{safeStandardMainSearchString}
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardQuatityMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardQuatityMapper.xml
index 0e37114..dd4e015 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardQuatityMapper.xml
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardQuatityMapper.xml
@@ -15,8 +15,9 @@
-
-
+
+
+
@@ -42,6 +43,7 @@
standard_qaulity_re,
standard_quality_lang,
standard_quality_not_null,
+ custom_judgment,
standard_quality_quality_center_id)
VALUES
@@ -60,6 +62,7 @@
#{item.standardQaulityRe},
#{item.standardQualityLang},
#{item.standardQualityNotNull},
+ #{item.customJudgment},
#{item.standardQualityQualityCenterId})
@@ -120,13 +123,15 @@
AND sdt_standard_quatity.standard_qaulity_re LIKE #{safeStandardQuatityStandardQaulityRe}
-
-
- AND sdt_standard_quatity.standard_quality_lang LIKE #{safeStandardQuatityStandardQualityLang}
+
+ AND sdt_standard_quatity.standard_quality_lang = #{standardQuatityFilter.standardQualityLang}
-
-
- AND sdt_standard_quatity.standard_quality_not_null LIKE #{safeStandardQuatityStandardQualityNotNull}
+
+ AND sdt_standard_quatity.standard_quality_not_null = #{standardQuatityFilter.standardQualityNotNull}
+
+
+
+ AND sdt_standard_quatity.custom_judgment LIKE #{safeStandardQuatityCustomJudgment}
AND sdt_standard_quatity.standard_quality_quality_center_id = #{standardQuatityFilter.standardQualityQualityCenterId}
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java
index 0d518fe..a539f06 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java
@@ -98,6 +98,12 @@ public class CustomizeRouteDto {
@ApiModelProperty(value = "业务规程ID")
private Long processId;
+ /**
+ * 指标ID。
+ */
+ @ApiModelProperty(value = "指标ID")
+ private Long definitionIndexId;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java
index 983f91d..6afce37 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java
@@ -92,10 +92,10 @@ public class DefinitionIndexDto {
private String indexDescription;
/**
- * 关联明细表id。
+ * 动态路由id。
*/
- @ApiModelProperty(value = "关联明细表id")
- private Long modelLogicalId;
+ @ApiModelProperty(value = "动态路由id")
+ private Long customizeRouteId;
/**
* 关联字段。
@@ -115,30 +115,6 @@ public class DefinitionIndexDto {
@ApiModelProperty(value = "生产周期")
private String productPeriod;
- /**
- * 计算函数。
- */
- @ApiModelProperty(value = "计算函数")
- private String caliberCalculateFunction;
-
- /**
- * 度量单位。
- */
- @ApiModelProperty(value = "度量单位")
- private String caliberMeasureUnit;
-
- /**
- * 度量精度。
- */
- @ApiModelProperty(value = "度量精度")
- private String caliberPrecision;
-
- /**
- * 口径说明。
- */
- @ApiModelProperty(value = "口径说明")
- private String caliberDescription;
-
/**
* updateTime 范围过滤起始值(>=)。
*/
@@ -164,7 +140,7 @@ public class DefinitionIndexDto {
private String createTimeEnd;
/**
- * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / caliber_calculate_function / caliber_measure_unit / caliber_precision / caliber_description LIKE搜索字符串。
+ * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / LIKE搜索字符串。
*/
@ApiModelProperty(value = "LIKE模糊搜索字符串")
private String searchString;
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java
index a26b65a..c40975c 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java
@@ -173,6 +173,12 @@ public class ModelDesginFieldDto {
@ApiModelProperty(value = "业务过程id")
private Long processId;
+ /**
+ * 数据标准主表ID standard_main_id。
+ */
+ @ApiModelProperty(value = "数据标准主表ID")
+ private Long standardMainId;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java
index 862cc4c..ccace30 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java
@@ -97,6 +97,12 @@ public class StandardMainDto {
@ApiModelProperty(value = "标准状态")
private String standardStatus;
+ /**
+ * 正则表达式。
+ */
+ @ApiModelProperty(value = "正则表达式")
+ private String standardRegular;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java
index 65b70ad..6e46c3a 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java
@@ -68,16 +68,22 @@ public class StandardQuatityDto {
private String standardQaulityRe;
/**
- * 质量校验长度限制。
+ * 质量校验长度限制(正数->大等于。负数->小等于)。
*/
- @ApiModelProperty(value = "质量校验长度限制")
- private String standardQualityLang;
+ @ApiModelProperty(value = "质量校验长度限制(正数->大等于。负数->小等于)")
+ private Integer standardQualityLang;
/**
- * 质量校验不为空。
+ * 质量校验不为空(1:不为空。0:可为空)。
*/
- @ApiModelProperty(value = "质量校验不为空")
- private String standardQualityNotNull;
+ @ApiModelProperty(value = "质量校验不为空(1:不为空。0:可为空)")
+ private Integer standardQualityNotNull;
+
+ /**
+ * SQL条件。
+ */
+ @ApiModelProperty(value = "SQL条件")
+ private String customJudgment;
/**
* 质量校验中心关联规则。
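
The two comments above redefine standardQualityLang and standardQualityNotNull as Integers ("正数->大等于。负数->小等于", "1:不为空。0:可为空"). One plausible reading of those rules, as a hedged sketch only — the class and method names are illustrative and not part of the patch:

    public class StandardQualityCheck {

        /**
         * A positive lengthLimit is read as value.length() >= lengthLimit, a negative one as
         * value.length() <= |lengthLimit|; notNull == 1 means the value must be present.
         */
        public static boolean isValid(String value, Integer lengthLimit, Integer notNull) {
            if (value == null || value.isEmpty()) {
                // A missing value is acceptable only when the not-null flag is absent or 0.
                return notNull == null || notNull != 1;
            }
            if (lengthLimit == null || lengthLimit == 0) {
                return true;
            }
            return lengthLimit > 0 ? value.length() >= lengthLimit : value.length() <= -lengthLimit;
        }
    }
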
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
index 8634033..7e06406 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java
@@ -51,11 +51,14 @@ public class DataTransferNode extends BaseNode {
*/
private SeatunnelConfig seatunnelConfigModel;
+    /**
+     * seaTunnelConfig 中 env -> job.mode 的取值(如 BATCH、STREAMING), 在 process() 中解析, 供 afterProcess() 判断是否需要等待任务执行完毕.
+     */
+    private String envJobModelValue = null;
+
@Autowired
private SeatunnelConfigMapper seatunnelConfigMapper;
@Autowired
private RemoteHostMapper remoteHostMapper;
+
@Override
public void beforeProcess() {
dataTransferModel = JSONUtil.toBean(devLiteflowNode.getFieldJsonData(), DataTransferModel.class);
@@ -71,15 +74,38 @@ public class DataTransferNode extends BaseNode {
@Override
public void process() throws Exception {
+ try {
+ JsonNode jsonNode = new JsonMapper().readTree(dataTransferModel.getSeaTunnelConfig());
+ envJobModelValue = jsonNode.get("env").get("job.mode").asText();
+ } catch (JsonProcessingException e) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, e.getMessage()));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage()));
+ }
//判断什么方式调用 Seatunnel
if (this.seatunnelConfigModel.getCallMode() == 1) {
// 通过接口的方式调用 Seatunnel
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用RestApi方式调用Seatunnel."));
restApiSubmitJob();
} else if (this.seatunnelConfigModel.getCallMode() == 2) {
+ // 判断是否需要等待 Seatunnel 任务执行完毕
// 通过 SSH 方式调用 Seatunnel
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用SSH方式执行Seatunnel任务."));
- sshSubmitJob();
+ if ("BATCH".equals(envJobModelValue)) {
+ // BATCH:需要等待执行完再执行下一个节点
+ sshSubmitJob();
+ } else {
+ String logFileName = "RunSeatunnelTask_" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS") + ".log";
+ String message = "env节点下的job.mode节点值为[" + envJobModelValue +
+ "], 使用异步方式执行, Seatunnel任务执行日志[" + logFileName + "]将保存在Seatunnel安装位置下的 taskLog 目录下.";
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, message));
+// String a = "nohup ./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local > ./taskLog/RunSeatunnelTask.log 2>&1 &";
+// String b = "./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local | tee ./taskLog/seatunnel.log && sync";
+ // 开启一个线程
+ // 执行远程命令,并保存日志至
+ new Thread(() -> {
+                    sshSubmitJobAndUploadLogFile(logFileName);
+ }).start();
+ }
}
}
@@ -93,77 +119,75 @@ public class DataTransferNode extends BaseNode {
*/
@Override
public void afterProcess() {
- try {
- JsonNode jsonNode = new JsonMapper().readTree(dataTransferModel.getSeaTunnelConfig());
- String envJobModelValue = jsonNode.get("env").get("job.mode").asText();
- if ("BATCH".equals(envJobModelValue)) {
- // 判断该任务是否执行完毕(Api调用判断、SSH执行判断).
- if (this.seatunnelConfigModel.getCallMode() == 2) {
- // TODO SSH方式执行Seatunnel任务, 当前只能够等待任务执行完成才可以执行下一组件
- // 非等待任务则开启一个线程来执行 SSH 命令,log日志则保存在 Seatunnel 路径的log文件夹中
- return;
+ if ("BATCH".equals(envJobModelValue)) {
+ // 判断该任务是否执行完毕(Api调用判断、SSH执行判断).
+ if (this.seatunnelConfigModel.getCallMode() == 2) {
+ // SSH方式执行Seatunnel任务, 当前只能够等待任务执行完成才可以执行下一组件
+ // 非等待任务则开启一个线程来执行 SSH 命令,log日志则保存在 Seatunnel 路径的log文件夹中
+ return;
+ }
+ // API调用判断
+ long startTime = System.currentTimeMillis();
+ int sleepTime = 1000; //休眠时间
+ while (true) {
+ StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri());
+ url.append("/hazelcast/rest/maps/running-job/").append(dataTransferModel.getJobId());
+ HttpResponse execute;
+ try {
+ execute = HttpRequest.get(url.toString()).execute();
+ } catch (Exception e) {
+ String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage();
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage));
}
- // API调用判断
- long startTime = System.currentTimeMillis();
- int sleepTime = 1000; //休眠时间
- while (true) {
- StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri());
- url.append("/hazelcast/rest/maps/running-job/").append(dataTransferModel.getJobId());
- HttpResponse execute;
- try {
- execute = HttpRequest.get(url.toString()).execute();
- } catch (Exception e) {
- String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage();
- nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage));
- throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage));
+ if (!execute.isOk()) {
+ nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行信息获取失败,失败信息为:" + execute.body()));
+ if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!"));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!"));
}
- if (!execute.isOk()) {
- nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag,
- "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行信息获取失败,失败信息为:" + execute.body()));
- if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) {
- nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!"));
- throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!"));
- }
- continue;
- }
- String body = URLUtil.decode(execute.body());
- // bodyJsonNode => {} 或 {"jobId":"","jobName":"","jobStatus":"","envOptions":{},"createTime":"","jobDag":{"vertices":[],"edges":[]},"pluginJarsUrls":[],"isStartWithSavePoint":false,"metrics":{"sourceReceivedCount":"","sinkWriteCount":""}}
- JsonNode bodyJsonNode = new JsonMapper().readTree(body);
- if (bodyJsonNode.size() == 0) {
- // 未查询到该jobId所对应的任务信息{},认为该任务已经执行完毕。
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
- "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!"));
- return;
- }
- if ("RUNNING".equals(bodyJsonNode.get("jobStatus").asText())) {
- // 任务运行中,休眠5秒后继续判断是否完成.
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
- "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行中..."));
- if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) {
- nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!"));
- throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!"));
- }
- ThreadUtil.sleep(sleepTime);
- //每次休眠后都追加3秒的时间,直至休眠时间大于1分钟。
- if (sleepTime < 60000) sleepTime = sleepTime + 3000;
- } else {
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
- "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!"));
- return;
+ continue;
+ }
+ String body = URLUtil.decode(execute.body());
+ // bodyJsonNode => {} 或 {"jobId":"","jobName":"","jobStatus":"","envOptions":{},"createTime":"","jobDag":{"vertices":[],"edges":[]},"pluginJarsUrls":[],"isStartWithSavePoint":false,"metrics":{"sourceReceivedCount":"","sinkWriteCount":""}}
+ JsonNode bodyJsonNode;
+ try {
+ bodyJsonNode = new JsonMapper().readTree(body);
+ } catch (JsonProcessingException e) {
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage()));
+ }
+ if (bodyJsonNode.size() == 0) {
+ // 未查询到该jobId所对应的任务信息{},认为该任务已经执行完毕。
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!"));
+ return;
+ }
+ if ("RUNNING".equals(bodyJsonNode.get("jobStatus").asText())) {
+                // 任务运行中, 休眠一段时间后继续轮询是否完成(初始1秒, 每轮递增3秒, 最长约1分钟).
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行中..."));
+ if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) {
+ nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!"));
+ throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!"));
}
+ ThreadUtil.sleep(sleepTime);
+ //每次休眠后都追加3秒的时间,直至休眠时间大于1分钟。
+ if (sleepTime < 60000) sleepTime = sleepTime + 3000;
+ } else {
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!"));
+ return;
}
- } else if ("STREAMING".equals(envJobModelValue)) {
- // 无需等待Seatunnel任务执行完成,直接执行下一个节点.
- nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
- "env节点下的job.mode节点值为\"STREAMING\", 不关心执行结果, 直接执行下一组件节点."));
- return;
}
+ } else if ("STREAMING".equals(envJobModelValue)) {
+ // 无需等待Seatunnel任务执行完成,直接执行下一个节点.
nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
- "'env'节点下的'job.mode'节点值为'" + envJobModelValue + "', 不关心该数据传输组件中的 Seatunnel 执行结果, 直接执行下一组件节点."));
- } catch (JsonProcessingException e) {
- nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, e.getMessage()));
- throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage()));
+ "env节点下的job.mode节点值为\"STREAMING\", 不关心执行结果, 直接执行下一组件节点."));
+ return;
}
+ nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag,
+ "'env'节点下的'job.mode'节点值为'" + envJobModelValue + "', 不关心该数据传输组件中的 Seatunnel 执行结果, 直接执行下一组件节点."));
}
private void restApiSubmitJob() {
@@ -259,6 +283,42 @@ public class DataTransferNode extends BaseNode {
file.delete();
}
+    private void sshSubmitJobAndUploadLogFile(String logFileName) {
+ // 根据项目ID 获取到该项目的远程服务器的配置
+ RemoteHost remoteHost = remoteHostMapper.selectById(seatunnelConfigModel.getRemoteHostId());
+ RemoteShell remoteShell = new RemoteShellSshjImpl(
+ remoteHost.getHostIp(), remoteHost.getHostPort(),
+ remoteHost.getLoginName(), remoteHost.getPassword(), null);
+ /**
+ * 临时文件路径
+ */
+ String tempFilePathOfConfig = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-")
+ + RandomUtil.randomString(5) + "-config.json";
+ contentWriteToFile(tempFilePathOfConfig, JSONUtil.toJsonStr(dataTransferModel.getSeaTunnelConfig()));
+ // 上传配置文件(v2.batch.config.template)至 seatunnel 的 ./config/ 中
+ String remoteConfigName = "v2-supie-config" + DateUtil.format(new Date(), "-yyyy-MM-dd-HH-mm-ss-SSS") + ".json";
+ String remoteFilePathOfConfig = seatunnelConfigModel.getSeatunnelPath() + "/config/" + remoteConfigName;
+ remoteShell.uploadFile(tempFilePathOfConfig, remoteFilePathOfConfig);
+ // 执行命令
+ String resultMsg = remoteShell.execCommands(
+ "cd " + seatunnelConfigModel.getSeatunnelPath(),
+ "sh bin/seatunnel.sh --config config/" + remoteConfigName + " -e local");
+ // 日志保存及上传
+ String tempFilePathOfLog = "./tempFolder/" + logFileName;
+ String remoteFilePathOfLog = seatunnelConfigModel.getSeatunnelPath() + "/taskLog/" + logFileName;
+ contentWriteToFile(tempFilePathOfLog, resultMsg);
+ remoteShell.uploadFile(tempFilePathOfLog, remoteFilePathOfLog);
+ remoteShell.close();
+ // 存储执行结果信息
+ if (resultMsg == null) resultMsg = "无回执结果信息, 请检查!";
+ devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, resultMsg);
+ // 删除创建的临时文件
+ File configFile = new File(tempFilePathOfConfig);
+ configFile.delete();
+ File logFile = new File(tempFilePathOfLog);
+ logFile.delete();
+ }
+
/**
* 写入文件输入流
*
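
The commented-out nohup/tee commands in process() above hint at an alternative to sshSubmitJobAndUploadLogFile: let the remote host run seatunnel.sh in the background and write the log itself, instead of capturing the output locally and uploading it afterwards. A hedged sketch of how that remote command could be composed; all paths and file names are placeholders, not values from the patch:

    public class AsyncSeatunnelCommand {

        /** Builds a background (nohup) SeaTunnel run whose log stays on the remote host. */
        public static String build(String seatunnelPath, String remoteConfigName, String logFileName) {
            return "cd " + seatunnelPath
                    + " && mkdir -p taskLog"
                    + " && nohup sh bin/seatunnel.sh --config config/" + remoteConfigName
                    + " -e local > taskLog/" + logFileName + " 2>&1 &";
        }

        public static void main(String[] args) {
            System.out.println(build("/opt/seatunnel", "v2-supie-config.json", "RunSeatunnelTask.log"));
        }
    }
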
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java b/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java
index bf8e336..0cd0230 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java
@@ -96,6 +96,11 @@ public class CustomizeRoute extends BaseModel {
*/
private Long processId;
+ /**
+ * 指标ID。
+ */
+ private Long definitionIndexId;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java b/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java
index b2a8f9c..c090701 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java
@@ -92,9 +92,9 @@ public class DefinitionIndex extends BaseModel {
private String indexDescription;
/**
- * 关联明细表id。
+ * 动态路由id。
*/
- private Long modelLogicalId;
+ private Long customizeRouteId;
/**
* 关联字段。
@@ -111,26 +111,6 @@ public class DefinitionIndex extends BaseModel {
*/
private String productPeriod;
- /**
- * 计算函数。
- */
- private String caliberCalculateFunction;
-
- /**
- * 度量单位。
- */
- private String caliberMeasureUnit;
-
- /**
- * 度量精度。
- */
- private String caliberPrecision;
-
- /**
- * 口径说明。
- */
- private String caliberDescription;
-
/**
* updateTime 范围过滤起始值(>=)。
*/
@@ -156,7 +136,7 @@ public class DefinitionIndex extends BaseModel {
private String createTimeEnd;
/**
- * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / caliber_calculate_function / caliber_measure_unit / caliber_precision / caliber_description LIKE搜索字符串。
+ * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / LIKE搜索字符串。
*/
@TableField(exist = false)
private String searchString;
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java b/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java
index 262686a..1169fbf 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java
@@ -171,6 +171,11 @@ public class ModelDesginField extends BaseModel {
*/
private Long processId;
+ /**
+ * 数据标准主表ID standard_main_id。
+ */
+ private Long standardMainId;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java
index b2c5c62..584561e 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java
@@ -96,6 +96,11 @@ public class StandardMain extends BaseModel {
*/
private String standardStatus;
+ /**
+ * 正则表达式。
+ */
+ private String standardRegular;
+
/**
* updateTime 范围过滤起始值(>=)。
*/
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java
index b123950..d2cb9e4 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java
@@ -72,14 +72,19 @@ public class StandardQuatity extends BaseModel {
private String standardQaulityRe;
/**
- * 质量校验长度限制。
+ * 质量校验长度限制(正数->大等于。负数->小等于)。
*/
- private String standardQualityLang;
+ private Integer standardQualityLang;
/**
- * 质量校验不为空。
+ * 质量校验不为空(1:不为空。0:可为空)。
*/
- private String standardQualityNotNull;
+ private Integer standardQualityNotNull;
+
+ /**
+ * SQL条件。
+ */
+ private String customJudgment;
/**
* 质量校验中心关联规则。
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java
index 2ea93e0..ba1e9c8 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java
@@ -335,21 +335,29 @@ public class BaseDataSource {
tableName = sqlList.get(1);
}
ResultSet resultSet = metaData.getColumns(databaseName, schemaPattern, tableName, null);
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
resultData = new ArrayList<>();
while (resultSet.next()) {
HashMap dataTypeMap = new HashMap<>();
- // 字段名
- String columnName = resultSet.getString("COLUMN_NAME");
- // 字段类型
- String dataType = resultSet.getString("TYPE_NAME");
- // 字段大小
- int columnSize = resultSet.getInt("COLUMN_SIZE");
- // 字段注释
- String columnComment = resultSet.getString("REMARKS");
- dataTypeMap.put("fieldName",columnName);
- dataTypeMap.put("typeName",dataType);
- dataTypeMap.put("columnSize",columnSize);
- dataTypeMap.put("remarks",columnComment);
+ for (int i = 1; i <= columnCount; i++) {
+ String tableFieldName = resultSetMetaData.getColumnLabel(i);
+ dataTypeMap.put(tableFieldName, resultSet.getObject(tableFieldName));
+ }
+// // 字段名
+// String columnName = resultSet.getString("COLUMN_NAME");
+// // 字段类型
+// String dataType = resultSet.getString("TYPE_NAME");
+// // 字段大小
+// int columnSize = resultSet.getInt("COLUMN_SIZE");
+// // 字段注释
+// String columnComment = resultSet.getString("REMARKS");
+// String nullable = resultSet.getString("NULLABLE");
+// dataTypeMap.put("fieldName", columnName);
+// dataTypeMap.put("typeName", dataType);
+// dataTypeMap.put("columnSize", columnSize);
+// dataTypeMap.put("remarks", columnComment);
+// dataTypeMap.put("nullable", nullable);
resultData.add(dataTypeMap);
}
resultSet.close();
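
The BaseDataSource change above switches from reading a fixed set of columns (COLUMN_NAME, TYPE_NAME, COLUMN_SIZE, REMARKS) to copying every column that DatabaseMetaData.getColumns() returns, so driver-specific metadata (for example from Doris over the MySQL protocol) is preserved. A standalone sketch of that generic pattern; the class and method names are illustrative:

    import java.sql.*;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GenericColumnMetadata {

        /** Copies whatever metadata columns the driver exposes for each field of the given table. */
        public static List<Map<String, Object>> describeTable(Connection connection,
                                                              String databaseName,
                                                              String tableName) throws SQLException {
            List<Map<String, Object>> rows = new ArrayList<>();
            DatabaseMetaData metaData = connection.getMetaData();
            try (ResultSet resultSet = metaData.getColumns(databaseName, null, tableName, null)) {
                ResultSetMetaData rsMeta = resultSet.getMetaData();
                int columnCount = rsMeta.getColumnCount();
                while (resultSet.next()) {
                    Map<String, Object> row = new HashMap<>();
                    // COLUMN_NAME, TYPE_NAME, NULLABLE, REMARKS, ... whatever this driver returns.
                    for (int i = 1; i <= columnCount; i++) {
                        row.put(rsMeta.getColumnLabel(i), resultSet.getObject(i));
                    }
                    rows.add(row);
                }
            }
            return rows;
        }
    }
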
diff --git a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java
index 067ca12..b966455 100644
--- a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java
+++ b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java
@@ -1,5 +1,6 @@
package supie.webadmin.app.service.databasemanagement.strategyImpl;
+import cn.hutool.core.text.StrSplitter;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +10,11 @@ import supie.webadmin.app.service.databasemanagement.Strategy;
import supie.webadmin.app.service.databasemanagement.StrategyFactory;
import javax.annotation.PostConstruct;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* 描述:
@@ -58,4 +64,52 @@ public class DataSourceDoris extends BaseDataSource implements Strategy {
initConnection();
}
+ /**
+ * 查询数据库数据的字段名及类型
+ *
+     * @param databaseName 数据库名
+     * @param tableName 表名
+     * @return 字段名、类型等字段元数据列表
+ */
+ @Override
+ public List