From b06a153458fbb3bc3c7503f8a3fdd73ca176e0a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=AB=8B=E5=AE=8F?= <2198083211@qq.com> Date: Mon, 27 Nov 2023 17:55:16 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(DataTransferNode=E3=80=81pom.xml):=20?= =?UTF-8?q?=E5=88=A4=E6=96=AD=E6=98=AF=E5=90=A6=E9=9C=80=E8=A6=81Seatunnel?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C=E5=AE=8C=E6=AF=95=E6=89=8D?= =?UTF-8?q?=E6=89=A7=E8=A1=8CDataTransferNode=E7=9A=84=E4=B8=8B=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E8=8A=82=E7=82=B9=E3=80=81=E8=A7=A3=E5=86=B3=E5=BC=95?= =?UTF-8?q?=E5=85=A5hive=E6=95=B0=E6=8D=AE=E5=BA=93=E9=A9=B1=E5=8A=A8?= =?UTF-8?q?=E5=90=8E=E5=A4=A7=E9=87=8F=E4=BE=9D=E8=B5=96=E5=86=B2=E7=AA=81?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/liteFlow/model/DataTransferModel.java | 4 +- .../liteFlow/model/LiteFlowNodeLogModel.java | 6 +- .../app/liteFlow/node/DataTransferNode.java | 130 +++++++++++++++--- .../ApiAuthenticationInterceptor.java | 2 +- pom.xml | 7 + 5 files changed, 127 insertions(+), 22 deletions(-) diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java index 2dc26f8..e02247e 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/DataTransferModel.java @@ -2,6 +2,8 @@ package supie.webadmin.app.liteFlow.model; import lombok.Data; +import java.util.Map; + /** * 描述:DataTransfer组件的相关信息 * @@ -15,7 +17,7 @@ public class DataTransferModel { /** * jobId */ - private String jobId; + private Long jobId; /** * jobName */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java index cc7806a..238176e 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/model/LiteFlowNodeLogModel.java @@ -36,15 +36,15 @@ public class LiteFlowNodeLogModel { private String logMessage; public static LiteFlowNodeLogModel info(String nodeId, String nodeTag, String logMessage) { - return new LiteFlowNodeLogModel("INFO", nodeId, nodeTag, new Date(),logMessage); + return new LiteFlowNodeLogModel("INFO", nodeId, nodeTag, new Date(), logMessage); } public static LiteFlowNodeLogModel warn(String nodeId, String nodeTag, String logMessage) { - return new LiteFlowNodeLogModel("WARN", nodeId, nodeTag, new Date(),logMessage); + return new LiteFlowNodeLogModel("WARN", nodeId, nodeTag, new Date(), logMessage); } public static LiteFlowNodeLogModel error(String nodeId, String nodeTag, String logMessage) { - return new LiteFlowNodeLogModel("ERROR", nodeId, nodeTag, new Date(),logMessage); + return new LiteFlowNodeLogModel("ERROR", nodeId, nodeTag, new Date(), logMessage); } public LiteFlowNodeLogModel(String nodeId, String nodeTag, String logMessage) { diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java index f1b6090..8634033 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java @@ -1,12 +1,16 @@ package supie.webadmin.app.liteFlow.node; import cn.hutool.core.date.DateUtil; +import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.URLUtil; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import cn.hutool.json.JSONUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.yomahub.liteflow.annotation.LiteflowComponent; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -46,10 +50,6 @@ public class DataTransferNode extends BaseNode { * Seatunnel 配置信息 */ private SeatunnelConfig seatunnelConfigModel; - /** - * 临时文件路径 - */ - private String tempFilePath; @Autowired private SeatunnelConfigMapper seatunnelConfigMapper; @@ -74,20 +74,103 @@ public class DataTransferNode extends BaseNode { //判断什么方式调用 Seatunnel if (this.seatunnelConfigModel.getCallMode() == 1) { // 通过接口的方式调用 Seatunnel - nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用RestApi方式调用Seatunnel。")); + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用RestApi方式调用Seatunnel.")); restApiSubmitJob(); } else if (this.seatunnelConfigModel.getCallMode() == 2) { // 通过 SSH 方式调用 Seatunnel - nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用SSH方式调用Seatunnel。")); + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用SSH方式执行Seatunnel任务.")); sshSubmitJob(); } } + /** + *

seatunnel任务已经调用并开始执行.判断是否需要等待任务执行完成再执行下一个节点.

+ * 根据(seaTunnelConfig -> env-> job.mode)的值判断是否需要等待执行完再执行下一节点. + * BATCH:需要等待执行完再执行下一个节点,其他值不用管,每次执行也都执行一次. + * + * @author 王立宏 + * @date 2023/11/27 11:17 + */ + @Override + public void afterProcess() { + try { + JsonNode jsonNode = new JsonMapper().readTree(dataTransferModel.getSeaTunnelConfig()); + String envJobModelValue = jsonNode.get("env").get("job.mode").asText(); + if ("BATCH".equals(envJobModelValue)) { + // 判断该任务是否执行完毕(Api调用判断、SSH执行判断). + if (this.seatunnelConfigModel.getCallMode() == 2) { + // TODO SSH方式执行Seatunnel任务, 当前只能够等待任务执行完成才可以执行下一组件 + // 非等待任务则开启一个线程来执行 SSH 命令,log日志则保存在 Seatunnel 路径的log文件夹中 + return; + } + // API调用判断 + long startTime = System.currentTimeMillis(); + int sleepTime = 1000; //休眠时间 + while (true) { + StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri()); + url.append("/hazelcast/rest/maps/running-job/").append(dataTransferModel.getJobId()); + HttpResponse execute; + try { + execute = HttpRequest.get(url.toString()).execute(); + } catch (Exception e) { + String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage(); + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage)); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage)); + } + if (!execute.isOk()) { + nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag, + "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行信息获取失败,失败信息为:" + execute.body())); + if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) { + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!")); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!")); + } + continue; + } + String body = URLUtil.decode(execute.body()); + // bodyJsonNode => {} 或 {"jobId":"","jobName":"","jobStatus":"","envOptions":{},"createTime":"","jobDag":{"vertices":[],"edges":[]},"pluginJarsUrls":[],"isStartWithSavePoint":false,"metrics":{"sourceReceivedCount":"","sinkWriteCount":""}} + JsonNode bodyJsonNode = new JsonMapper().readTree(body); + if (bodyJsonNode.size() == 0) { + // 未查询到该jobId所对应的任务信息{},认为该任务已经执行完毕。 + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!")); + return; + } + if ("RUNNING".equals(bodyJsonNode.get("jobStatus").asText())) { + // 任务运行中,休眠5秒后继续判断是否完成. + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行中...")); + if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) { + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!")); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!")); + } + ThreadUtil.sleep(sleepTime); + //每次休眠后都追加3秒的时间,直至休眠时间大于1分钟。 + if (sleepTime < 60000) sleepTime = sleepTime + 3000; + } else { + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!")); + return; + } + } + } else if ("STREAMING".equals(envJobModelValue)) { + // 无需等待Seatunnel任务执行完成,直接执行下一个节点. + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "env节点下的job.mode节点值为\"STREAMING\", 不关心执行结果, 直接执行下一组件节点.")); + return; + } + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "'env'节点下的'job.mode'节点值为'" + envJobModelValue + "', 不关心该数据传输组件中的 Seatunnel 执行结果, 直接执行下一组件节点.")); + } catch (JsonProcessingException e) { + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, e.getMessage())); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage())); + } + } + private void restApiSubmitJob() { if (seatunnelConfigModel.getSubmitJobUrl() == null) { seatunnelConfigModel.setSubmitJobUrl(new SeatunnelConfig().getSubmitJobUrl()); nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, - "未配置Seatunnel提交Job的接口地址,使用默认地址:" + seatunnelConfigModel.getSubmitJobUrl())); + "未配置Seatunnel提交Job的接口地址,使用默认地址:" + seatunnelConfigModel.getSubmitJobUrl())); } StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri()); // 判断字符串第一个字符是否为"/" @@ -108,14 +191,14 @@ public class DataTransferNode extends BaseNode { url.append("isStartWithSavePoint=").append(dataTransferModel.getIsStartWithSavePoint()); } nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "提交Job:" + url.toString())); - HttpResponse execute = null; + HttpResponse execute; try { execute = HttpRequest.post(url.toString()) .body(dataTransferModel.getSeaTunnelConfig()) .execute(); } catch (Exception e) { - String errorMessage = "RestApi(" + url.toString() + ")调用报错:" + e.getMessage(); - nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败:" + errorMessage)); + String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage(); + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage)); throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage)); } String body = URLUtil.decode(execute.body()); @@ -124,10 +207,21 @@ public class DataTransferNode extends BaseNode { devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, body); if (!execute.isOk()) { // 失败 - nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败:" + body)); + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + body)); throw new MyLiteFlowException(new ErrorMessageModel(getClass(), body)); } else { - nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "执行成功:" + body)); + // body => {"jobId":733584788375666689,"jobName":"rest_api_test"} + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "任务调用成功,返回值为: " + body)); + try { + // 成功,设置jobId,jobId将在该 DataTransferNode 组件的 afterProcess() 方法使用. + JsonNode jsonNode = new JsonMapper().readTree(JSONUtil.toJsonStr(body)); + Long jobId = jsonNode.get("jobId").asLong(); + dataTransferModel.setJobId(jobId); + } catch (Exception e) { + String errorMessage = "设置jobId失败,失败学习为: " + e.getMessage(); + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, errorMessage)); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage)); + } } } @@ -137,14 +231,16 @@ public class DataTransferNode extends BaseNode { RemoteShell remoteShell = new RemoteShellSshjImpl( remoteHost.getHostIp(), remoteHost.getHostPort(), remoteHost.getLoginName(), remoteHost.getPassword(), null); - - tempFilePath = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-") + /** + * 临时文件路径 + */ + String tempFilePath = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-") + RandomUtil.randomString(5) + "-config.json"; contentWriteToFile(tempFilePath, JSONUtil.toJsonStr(dataTransferModel.getSeaTunnelConfig())); // 上传配置文件(v2.batch.config.template)至 seatunnel 的 ./config/ 中 - String remoteConfigName = "v2.supie.config.json"; + String remoteConfigName = "v2-supie-config" + DateUtil.format(new Date(), "-yyyy-MM-dd-HH-mm-ss-SSS") + ".json"; String remoteFilePath = seatunnelConfigModel.getSeatunnelPath() + "/config/" + remoteConfigName; - nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "上传Seatunnel配置文件,remoteFilePath:" + remoteFilePath + "。")); + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "上传Seatunnel配置文件,remoteFilePath:" + remoteFilePath + ".")); remoteShell.uploadFile(tempFilePath, remoteFilePath); // 执行命令 nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, @@ -155,7 +251,7 @@ public class DataTransferNode extends BaseNode { "sh bin/seatunnel.sh --config config/" + remoteConfigName + " -e local"); remoteShell.close(); // 存储执行结果信息 - if (resultMsg == null) resultMsg = "无回执结果信息!"; + if (resultMsg == null) resultMsg = "无回执结果信息, 请检查!"; nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag, resultMsg)); devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, resultMsg); // 删除创建的临时文件 diff --git a/application-webadmin/src/main/java/supie/webadmin/interceptor/ApiAuthenticationInterceptor.java b/application-webadmin/src/main/java/supie/webadmin/interceptor/ApiAuthenticationInterceptor.java index 142aa49..9878406 100644 --- a/application-webadmin/src/main/java/supie/webadmin/interceptor/ApiAuthenticationInterceptor.java +++ b/application-webadmin/src/main/java/supie/webadmin/interceptor/ApiAuthenticationInterceptor.java @@ -103,7 +103,7 @@ public class ApiAuthenticationInterceptor implements HandlerInterceptor { /** * 获取验证信息. * 先从redis获取,若redis中不存在则从数据库中查找并存入redis. - * TODO 相应的权限有变动修改则删除redis中缓存的权限信息。 + * 相应的权限有变动修改则删除redis中缓存的权限信息。 * * @param url 路由地址 * @return 验证信息集 diff --git a/pom.xml b/pom.xml index 67aaf52..8c1d757 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,13 @@ org.apache.hive hive-jdbc 3.1.2 + + + + * + * + + -- Gitee From 020b3edc742865458ee5b8545ae82ec0865fb186 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=AB=8B=E5=AE=8F?= <2198083211@qq.com> Date: Wed, 29 Nov 2023 16:47:20 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(Strategy=E3=80=81StandardQuatity?= =?UTF-8?q?=E3=80=81ModelDesginField=E3=80=81StandardMain=E3=80=81Customiz?= =?UTF-8?q?eRoute=E3=80=81DefinitionIndex=E3=80=81DataTransferNode):=20?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=E5=8F=98=E6=9B=B4=E3=80=81=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E6=96=B0=E5=A2=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、StandardQuatity、ModelDesginField、StandardMain、CustomizeRoute、DefinitionIndex、DataTransferNode字段变更 2、Strategy:数据库管理模块适应Doris数据库。 3、DataTransferNode:SSH方式执行新任务,判断是否需要等待任务结束再执行下一组件。 --- .../app/dao/mapper/CustomizeRouteMapper.xml | 10 +- .../app/dao/mapper/DefinitionIndexMapper.xml | 40 +--- .../app/dao/mapper/ModelDesginFieldMapper.xml | 10 +- .../app/dao/mapper/StandardMainMapper.xml | 11 +- .../app/dao/mapper/StandardQuatityMapper.xml | 21 +- .../webadmin/app/dto/CustomizeRouteDto.java | 6 + .../webadmin/app/dto/DefinitionIndexDto.java | 32 +-- .../webadmin/app/dto/ModelDesginFieldDto.java | 6 + .../webadmin/app/dto/StandardMainDto.java | 6 + .../webadmin/app/dto/StandardQuatityDto.java | 18 +- .../app/liteFlow/node/DataTransferNode.java | 190 ++++++++++++------ .../webadmin/app/model/CustomizeRoute.java | 5 + .../webadmin/app/model/DefinitionIndex.java | 26 +-- .../webadmin/app/model/ModelDesginField.java | 5 + .../webadmin/app/model/StandardMain.java | 5 + .../webadmin/app/model/StandardQuatity.java | 13 +- .../strategyImpl/BaseDataSource.java | 32 +-- .../strategyImpl/DataSourceDoris.java | 54 +++++ .../webadmin/app/vo/CustomizeRouteVo.java | 6 + .../webadmin/app/vo/DefinitionIndexVo.java | 29 +-- .../webadmin/app/vo/ModelDesginFieldVo.java | 6 + .../supie/webadmin/app/vo/StandardMainVo.java | 7 + .../webadmin/app/vo/StandardQuatityVo.java | 18 +- 23 files changed, 338 insertions(+), 218 deletions(-) diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/CustomizeRouteMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/CustomizeRouteMapper.xml index a70dfaf..92011f7 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/CustomizeRouteMapper.xml +++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/CustomizeRouteMapper.xml @@ -20,6 +20,7 @@ + @@ -41,7 +42,8 @@ database_name, sql_script, parameter, - process_id) + process_id, + definition_index_id) VALUES (#{item.id}, @@ -61,7 +63,8 @@ #{item.databaseName}, #{item.sqlScript}, #{item.parameter}, - #{item.processId}) + #{item.processId}, + #{item.definitionIndexId}) @@ -138,6 +141,9 @@ AND sdt_customize_route.process_id = #{customizeRouteFilter.processId} + + AND sdt_customize_route.definition_index_id = #{customizeRouteFilter.definitionIndexId} + AND CONCAT(IFNULL(sdt_customize_route.name,''), IFNULL(sdt_customize_route.route_describe,''), IFNULL(sdt_customize_route.url,''), IFNULL(sdt_customize_route.database_name,''), IFNULL(sdt_customize_route.sql_script,''), IFNULL(sdt_customize_route.parameter,'')) LIKE #{safeCustomizeRouteSearchString} diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/DefinitionIndexMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/DefinitionIndexMapper.xml index 4ffa2a8..d986f4e 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/DefinitionIndexMapper.xml +++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/DefinitionIndexMapper.xml @@ -18,14 +18,10 @@ - + - - - - @@ -47,14 +43,10 @@ index_level, process_id, index_description, - model_logical_id, + customize_route_id, model_desgin_field_id, data_type, product_period, - caliber_calculate_function, - caliber_measure_unit, - caliber_precision, - caliber_description, project_id) VALUES @@ -74,14 +66,10 @@ #{item.indexLevel}, #{item.processId}, #{item.indexDescription}, - #{item.modelLogicalId}, + #{item.customizeRouteId}, #{item.modelDesginFieldId}, #{item.dataType}, #{item.productPeriod}, - #{item.caliberCalculateFunction}, - #{item.caliberMeasureUnit}, - #{item.caliberPrecision}, - #{item.caliberDescription}, #{item.projectId}) @@ -154,8 +142,8 @@ AND sdt_definition_index.index_description LIKE #{safeDefinitionIndexIndexDescription} - - AND sdt_definition_index.model_logical_id = #{definitionIndexFilter.modelLogicalId} + + AND sdt_definition_index.customize_route_id = #{definitionIndexFilter.customizeRouteId} AND sdt_definition_index.model_desgin_field_id = #{definitionIndexFilter.modelDesginFieldId} @@ -166,25 +154,9 @@ AND sdt_definition_index.product_period = #{definitionIndexFilter.productPeriod} - - - AND sdt_definition_index.caliber_calculate_function LIKE #{safeDefinitionIndexCaliberCalculateFunction} - - - - AND sdt_definition_index.caliber_measure_unit LIKE #{safeDefinitionIndexCaliberMeasureUnit} - - - - AND sdt_definition_index.caliber_precision LIKE #{safeDefinitionIndexCaliberPrecision} - - - - AND sdt_definition_index.caliber_description LIKE #{safeDefinitionIndexCaliberDescription} - - AND CONCAT(IFNULL(sdt_definition_index.index_type,''), IFNULL(sdt_definition_index.index_name,''), IFNULL(sdt_definition_index.index_en_name,''), IFNULL(sdt_definition_index.index_code,''), IFNULL(sdt_definition_index.index_level,''), IFNULL(sdt_definition_index.index_description,''), IFNULL(sdt_definition_index.data_type,''), IFNULL(sdt_definition_index.product_period,''), IFNULL(sdt_definition_index.caliber_calculate_function,''), IFNULL(sdt_definition_index.caliber_measure_unit,''), IFNULL(sdt_definition_index.caliber_precision,''), IFNULL(sdt_definition_index.caliber_description,'')) LIKE #{safeDefinitionIndexSearchString} + AND CONCAT(IFNULL(sdt_definition_index.index_type,''), IFNULL(sdt_definition_index.index_name,''), IFNULL(sdt_definition_index.index_en_name,''), IFNULL(sdt_definition_index.index_code,''), IFNULL(sdt_definition_index.index_level,''), IFNULL(sdt_definition_index.index_description,''), IFNULL(sdt_definition_index.data_type,''), IFNULL(sdt_definition_index.product_period,'')) LIKE #{safeDefinitionIndexSearchString} diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/ModelDesginFieldMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/ModelDesginFieldMapper.xml index 9a1d8f2..94d1fc6 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/ModelDesginFieldMapper.xml +++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/ModelDesginFieldMapper.xml @@ -33,6 +33,7 @@ + @@ -72,7 +73,8 @@ model_quote_standard, model_field_script, show_order, - process_id) + process_id, + standard_main_id) VALUES (#{item.id}, @@ -105,7 +107,8 @@ #{item.modelQuoteStandard}, #{item.modelFieldScript}, #{item.showOrder}, - #{item.processId}) + #{item.processId}, + #{item.standardMainId}) @@ -234,6 +237,9 @@ AND sdt_model_desgin_field.process_id = #{modelDesginFieldFilter.processId} + + AND sdt_model_desgin_field.standard_main_id = #{modelDesginFieldFilter.standardMainId} + AND CONCAT(IFNULL(sdt_model_desgin_field.model_field_name,''), IFNULL(sdt_model_desgin_field.model_field_code,''), IFNULL(sdt_model_desgin_field.model_field_type,''), IFNULL(sdt_model_desgin_field.model_field_index,''), IFNULL(sdt_model_desgin_field.model_field_meta_standard,''), IFNULL(sdt_model_desgin_field.model_field_value_standard,''), IFNULL(sdt_model_desgin_field.model_field_description,''), IFNULL(sdt_model_desgin_field.model_field_key,''), IFNULL(sdt_model_desgin_field.model_field_ppartition,''), IFNULL(sdt_model_desgin_field.model_field_source_name,''), IFNULL(sdt_model_desgin_field.model_field_source_type,''), IFNULL(sdt_model_desgin_field.model_field_source_table,''), IFNULL(sdt_model_desgin_field.model_field_mapping,''), IFNULL(sdt_model_desgin_field.model_quote_standard,''), IFNULL(sdt_model_desgin_field.model_field_script,'')) LIKE #{safeModelDesginFieldSearchString} diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardMainMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardMainMapper.xml index f7246d3..313a10e 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardMainMapper.xml +++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardMainMapper.xml @@ -20,6 +20,7 @@ + @@ -41,7 +42,8 @@ standard_english, standard_description, standard_input_mode, - standard_status) + standard_status, + standard_regular) VALUES (#{item.id}, @@ -61,7 +63,8 @@ #{item.standardEnglish}, #{item.standardDescription}, #{item.standardInputMode}, - #{item.standardStatus}) + #{item.standardStatus}, + #{item.standardRegular}) @@ -139,6 +142,10 @@ AND sdt_standard_main.standard_status LIKE #{safeStandardMainStandardStatus} + + + AND sdt_standard_main.standard_regular LIKE #{safeStandardMainStandardRegular} + AND CONCAT(IFNULL(sdt_standard_main.standard_name,''), IFNULL(sdt_standard_main.standard_code,''), IFNULL(sdt_standard_main.standard_type,''), IFNULL(sdt_standard_main.standard_english,''), IFNULL(sdt_standard_main.standard_description,''), IFNULL(sdt_standard_main.standard_input_mode,''), IFNULL(sdt_standard_main.standard_status,'')) LIKE #{safeStandardMainSearchString} diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardQuatityMapper.xml b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardQuatityMapper.xml index 0e37114..dd4e015 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardQuatityMapper.xml +++ b/application-webadmin/src/main/java/supie/webadmin/app/dao/mapper/StandardQuatityMapper.xml @@ -15,8 +15,9 @@ - - + + + @@ -42,6 +43,7 @@ standard_qaulity_re, standard_quality_lang, standard_quality_not_null, + custom_judgment, standard_quality_quality_center_id) VALUES @@ -60,6 +62,7 @@ #{item.standardQaulityRe}, #{item.standardQualityLang}, #{item.standardQualityNotNull}, + #{item.customJudgment}, #{item.standardQualityQualityCenterId}) @@ -120,13 +123,15 @@ AND sdt_standard_quatity.standard_qaulity_re LIKE #{safeStandardQuatityStandardQaulityRe} - - - AND sdt_standard_quatity.standard_quality_lang LIKE #{safeStandardQuatityStandardQualityLang} + + AND sdt_standard_quatity.standard_quality_lang = #{standardQuatityFilter.standardQualityLang} - - - AND sdt_standard_quatity.standard_quality_not_null LIKE #{safeStandardQuatityStandardQualityNotNull} + + AND sdt_standard_quatity.standard_quality_not_null = #{standardQuatityFilter.standardQualityNotNull} + + + + AND sdt_standard_quatity.custom_judgment LIKE #{safeStandardQuatityCustomJudgment} AND sdt_standard_quatity.standard_quality_quality_center_id = #{standardQuatityFilter.standardQualityQualityCenterId} diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java index 0d518fe..a539f06 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/CustomizeRouteDto.java @@ -98,6 +98,12 @@ public class CustomizeRouteDto { @ApiModelProperty(value = "业务规程ID") private Long processId; + /** + * 指标ID。 + */ + @ApiModelProperty(value = "指标ID") + private Long definitionIndexId; + /** * updateTime 范围过滤起始值(>=)。 */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java index 983f91d..6afce37 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/DefinitionIndexDto.java @@ -92,10 +92,10 @@ public class DefinitionIndexDto { private String indexDescription; /** - * 关联明细表id。 + * 动态路由id。 */ - @ApiModelProperty(value = "关联明细表id") - private Long modelLogicalId; + @ApiModelProperty(value = "动态路由id") + private Long customizeRouteId; /** * 关联字段。 @@ -115,30 +115,6 @@ public class DefinitionIndexDto { @ApiModelProperty(value = "生产周期") private String productPeriod; - /** - * 计算函数。 - */ - @ApiModelProperty(value = "计算函数") - private String caliberCalculateFunction; - - /** - * 度量单位。 - */ - @ApiModelProperty(value = "度量单位") - private String caliberMeasureUnit; - - /** - * 度量精度。 - */ - @ApiModelProperty(value = "度量精度") - private String caliberPrecision; - - /** - * 口径说明。 - */ - @ApiModelProperty(value = "口径说明") - private String caliberDescription; - /** * updateTime 范围过滤起始值(>=)。 */ @@ -164,7 +140,7 @@ public class DefinitionIndexDto { private String createTimeEnd; /** - * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / caliber_calculate_function / caliber_measure_unit / caliber_precision / caliber_description LIKE搜索字符串。 + * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / LIKE搜索字符串。 */ @ApiModelProperty(value = "LIKE模糊搜索字符串") private String searchString; diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java index a26b65a..c40975c 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/ModelDesginFieldDto.java @@ -173,6 +173,12 @@ public class ModelDesginFieldDto { @ApiModelProperty(value = "业务过程id") private Long processId; + /** + * 数据标准主表ID standard_main_id。 + */ + @ApiModelProperty(value = "数据标准主表ID") + private Long standardMainId; + /** * updateTime 范围过滤起始值(>=)。 */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java index 862cc4c..ccace30 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardMainDto.java @@ -97,6 +97,12 @@ public class StandardMainDto { @ApiModelProperty(value = "标准状态") private String standardStatus; + /** + * 正则表达式。 + */ + @ApiModelProperty(value = "正则表达式") + private String standardRegular; + /** * updateTime 范围过滤起始值(>=)。 */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java index 65b70ad..6e46c3a 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/dto/StandardQuatityDto.java @@ -68,16 +68,22 @@ public class StandardQuatityDto { private String standardQaulityRe; /** - * 质量校验长度限制。 + * 质量校验长度限制(正数->大等于。负数->小等于)。 */ - @ApiModelProperty(value = "质量校验长度限制") - private String standardQualityLang; + @ApiModelProperty(value = "质量校验长度限制(正数->大等于。负数->小等于)") + private Integer standardQualityLang; /** - * 质量校验不为空。 + * 质量校验不为空(1:不为空。0:可为空)。 */ - @ApiModelProperty(value = "质量校验不为空") - private String standardQualityNotNull; + @ApiModelProperty(value = "质量校验不为空(1:不为空。0:可为空)") + private Integer standardQualityNotNull; + + /** + * SQL条件。 + */ + @ApiModelProperty(value = "SQL条件") + private String customJudgment; /** * 质量校验中心关联规则。 diff --git a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java index 8634033..7e06406 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/liteFlow/node/DataTransferNode.java @@ -51,11 +51,14 @@ public class DataTransferNode extends BaseNode { */ private SeatunnelConfig seatunnelConfigModel; + String envJobModelValue = null; + @Autowired private SeatunnelConfigMapper seatunnelConfigMapper; @Autowired private RemoteHostMapper remoteHostMapper; + @Override public void beforeProcess() { dataTransferModel = JSONUtil.toBean(devLiteflowNode.getFieldJsonData(), DataTransferModel.class); @@ -71,15 +74,38 @@ public class DataTransferNode extends BaseNode { @Override public void process() throws Exception { + try { + JsonNode jsonNode = new JsonMapper().readTree(dataTransferModel.getSeaTunnelConfig()); + envJobModelValue = jsonNode.get("env").get("job.mode").asText(); + } catch (JsonProcessingException e) { + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, e.getMessage())); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage())); + } //判断什么方式调用 Seatunnel if (this.seatunnelConfigModel.getCallMode() == 1) { // 通过接口的方式调用 Seatunnel nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用RestApi方式调用Seatunnel.")); restApiSubmitJob(); } else if (this.seatunnelConfigModel.getCallMode() == 2) { + // 判断是否需要等待 Seatunnel 任务执行完毕 // 通过 SSH 方式调用 Seatunnel nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, "使用SSH方式执行Seatunnel任务.")); - sshSubmitJob(); + if ("BATCH".equals(envJobModelValue)) { + // BATCH:需要等待执行完再执行下一个节点 + sshSubmitJob(); + } else { + String logFileName = "RunSeatunnelTask_" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS") + ".log"; + String message = "env节点下的job.mode节点值为[" + envJobModelValue + + "], 使用异步方式执行, Seatunnel任务执行日志[" + logFileName + "]将保存在Seatunnel安装位置下的 taskLog 目录下."; + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, message)); +// String a = "nohup ./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local > ./taskLog/RunSeatunnelTask.log 2>&1 &"; +// String b = "./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local | tee ./taskLog/seatunnel.log && sync"; + // 开启一个线程 + // 执行远程命令,并保存日志至 + new Thread(() -> { + sshSubmitJobAndUploadLogFil(logFileName); + }).start(); + } } } @@ -93,77 +119,75 @@ public class DataTransferNode extends BaseNode { */ @Override public void afterProcess() { - try { - JsonNode jsonNode = new JsonMapper().readTree(dataTransferModel.getSeaTunnelConfig()); - String envJobModelValue = jsonNode.get("env").get("job.mode").asText(); - if ("BATCH".equals(envJobModelValue)) { - // 判断该任务是否执行完毕(Api调用判断、SSH执行判断). - if (this.seatunnelConfigModel.getCallMode() == 2) { - // TODO SSH方式执行Seatunnel任务, 当前只能够等待任务执行完成才可以执行下一组件 - // 非等待任务则开启一个线程来执行 SSH 命令,log日志则保存在 Seatunnel 路径的log文件夹中 - return; + if ("BATCH".equals(envJobModelValue)) { + // 判断该任务是否执行完毕(Api调用判断、SSH执行判断). + if (this.seatunnelConfigModel.getCallMode() == 2) { + // SSH方式执行Seatunnel任务, 当前只能够等待任务执行完成才可以执行下一组件 + // 非等待任务则开启一个线程来执行 SSH 命令,log日志则保存在 Seatunnel 路径的log文件夹中 + return; + } + // API调用判断 + long startTime = System.currentTimeMillis(); + int sleepTime = 1000; //休眠时间 + while (true) { + StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri()); + url.append("/hazelcast/rest/maps/running-job/").append(dataTransferModel.getJobId()); + HttpResponse execute; + try { + execute = HttpRequest.get(url.toString()).execute(); + } catch (Exception e) { + String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage(); + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage)); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage)); } - // API调用判断 - long startTime = System.currentTimeMillis(); - int sleepTime = 1000; //休眠时间 - while (true) { - StringBuilder url = new StringBuilder(seatunnelConfigModel.getLocalhostUri()); - url.append("/hazelcast/rest/maps/running-job/").append(dataTransferModel.getJobId()); - HttpResponse execute; - try { - execute = HttpRequest.get(url.toString()).execute(); - } catch (Exception e) { - String errorMessage = "SeatunnelRestApi(" + url.toString() + ")调用报错: " + e.getMessage(); - nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "执行失败: " + errorMessage)); - throw new MyLiteFlowException(new ErrorMessageModel(getClass(), errorMessage)); + if (!execute.isOk()) { + nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag, + "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行信息获取失败,失败信息为:" + execute.body())); + if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) { + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!")); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!")); } - if (!execute.isOk()) { - nodeLog.add(LiteFlowNodeLogModel.warn(nodeId, nodeTag, - "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行信息获取失败,失败信息为:" + execute.body())); - if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) { - nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!")); - throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!")); - } - continue; - } - String body = URLUtil.decode(execute.body()); - // bodyJsonNode => {} 或 {"jobId":"","jobName":"","jobStatus":"","envOptions":{},"createTime":"","jobDag":{"vertices":[],"edges":[]},"pluginJarsUrls":[],"isStartWithSavePoint":false,"metrics":{"sourceReceivedCount":"","sinkWriteCount":""}} - JsonNode bodyJsonNode = new JsonMapper().readTree(body); - if (bodyJsonNode.size() == 0) { - // 未查询到该jobId所对应的任务信息{},认为该任务已经执行完毕。 - nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, - "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!")); - return; - } - if ("RUNNING".equals(bodyJsonNode.get("jobStatus").asText())) { - // 任务运行中,休眠5秒后继续判断是否完成. - nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, - "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行中...")); - if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) { - nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!")); - throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!")); - } - ThreadUtil.sleep(sleepTime); - //每次休眠后都追加3秒的时间,直至休眠时间大于1分钟。 - if (sleepTime < 60000) sleepTime = sleepTime + 3000; - } else { - nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, - "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!")); - return; + continue; + } + String body = URLUtil.decode(execute.body()); + // bodyJsonNode => {} 或 {"jobId":"","jobName":"","jobStatus":"","envOptions":{},"createTime":"","jobDag":{"vertices":[],"edges":[]},"pluginJarsUrls":[],"isStartWithSavePoint":false,"metrics":{"sourceReceivedCount":"","sinkWriteCount":""}} + JsonNode bodyJsonNode; + try { + bodyJsonNode = new JsonMapper().readTree(body); + } catch (JsonProcessingException e) { + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage())); + } + if (bodyJsonNode.size() == 0) { + // 未查询到该jobId所对应的任务信息{},认为该任务已经执行完毕。 + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!")); + return; + } + if ("RUNNING".equals(bodyJsonNode.get("jobStatus").asText())) { + // 任务运行中,休眠5秒后继续判断是否完成. + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行中...")); + if ((System.currentTimeMillis() - startTime) > 60 * 60 * 1000) { + nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, "任务执行超时(1小时),请检查!")); + throw new MyLiteFlowException(new ErrorMessageModel(getClass(), "任务执行超时(1小时),请检查!")); } + ThreadUtil.sleep(sleepTime); + //每次休眠后都追加3秒的时间,直至休眠时间大于1分钟。 + if (sleepTime < 60000) sleepTime = sleepTime + 3000; + } else { + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "Seatunnel任务[jobId:" + dataTransferModel.getJobId() + "]执行完毕!")); + return; } - } else if ("STREAMING".equals(envJobModelValue)) { - // 无需等待Seatunnel任务执行完成,直接执行下一个节点. - nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, - "env节点下的job.mode节点值为\"STREAMING\", 不关心执行结果, 直接执行下一组件节点.")); - return; } + } else if ("STREAMING".equals(envJobModelValue)) { + // 无需等待Seatunnel任务执行完成,直接执行下一个节点. nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, - "'env'节点下的'job.mode'节点值为'" + envJobModelValue + "', 不关心该数据传输组件中的 Seatunnel 执行结果, 直接执行下一组件节点.")); - } catch (JsonProcessingException e) { - nodeLog.add(LiteFlowNodeLogModel.error(nodeId, nodeTag, e.getMessage())); - throw new MyLiteFlowException(new ErrorMessageModel(getClass(), e.getMessage())); + "env节点下的job.mode节点值为\"STREAMING\", 不关心执行结果, 直接执行下一组件节点.")); + return; } + nodeLog.add(LiteFlowNodeLogModel.info(nodeId, nodeTag, + "'env'节点下的'job.mode'节点值为'" + envJobModelValue + "', 不关心该数据传输组件中的 Seatunnel 执行结果, 直接执行下一组件节点.")); } private void restApiSubmitJob() { @@ -259,6 +283,42 @@ public class DataTransferNode extends BaseNode { file.delete(); } + private void sshSubmitJobAndUploadLogFil(String logFileName) { + // 根据项目ID 获取到该项目的远程服务器的配置 + RemoteHost remoteHost = remoteHostMapper.selectById(seatunnelConfigModel.getRemoteHostId()); + RemoteShell remoteShell = new RemoteShellSshjImpl( + remoteHost.getHostIp(), remoteHost.getHostPort(), + remoteHost.getLoginName(), remoteHost.getPassword(), null); + /** + * 临时文件路径 + */ + String tempFilePathOfConfig = "./tempFolder/" + DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss-SSS-") + + RandomUtil.randomString(5) + "-config.json"; + contentWriteToFile(tempFilePathOfConfig, JSONUtil.toJsonStr(dataTransferModel.getSeaTunnelConfig())); + // 上传配置文件(v2.batch.config.template)至 seatunnel 的 ./config/ 中 + String remoteConfigName = "v2-supie-config" + DateUtil.format(new Date(), "-yyyy-MM-dd-HH-mm-ss-SSS") + ".json"; + String remoteFilePathOfConfig = seatunnelConfigModel.getSeatunnelPath() + "/config/" + remoteConfigName; + remoteShell.uploadFile(tempFilePathOfConfig, remoteFilePathOfConfig); + // 执行命令 + String resultMsg = remoteShell.execCommands( + "cd " + seatunnelConfigModel.getSeatunnelPath(), + "sh bin/seatunnel.sh --config config/" + remoteConfigName + " -e local"); + // 日志保存及上传 + String tempFilePathOfLog = "./tempFolder/" + logFileName; + String remoteFilePathOfLog = seatunnelConfigModel.getSeatunnelPath() + "/taskLog/" + logFileName; + contentWriteToFile(tempFilePathOfLog, resultMsg); + remoteShell.uploadFile(tempFilePathOfLog, remoteFilePathOfLog); + remoteShell.close(); + // 存储执行结果信息 + if (resultMsg == null) resultMsg = "无回执结果信息, 请检查!"; + devLiteflowNodeMapper.setExecutionMessage(this.rulerId, this.nodeId, this.nodeTag, resultMsg); + // 删除创建的临时文件 + File configFile = new File(tempFilePathOfConfig); + configFile.delete(); + File logFile = new File(tempFilePathOfLog); + logFile.delete(); + } + /** * 写入文件输入流 * diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java b/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java index bf8e336..0cd0230 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/model/CustomizeRoute.java @@ -96,6 +96,11 @@ public class CustomizeRoute extends BaseModel { */ private Long processId; + /** + * 指标ID。 + */ + private Long definitionIndexId; + /** * updateTime 范围过滤起始值(>=)。 */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java b/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java index b2a8f9c..c090701 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/model/DefinitionIndex.java @@ -92,9 +92,9 @@ public class DefinitionIndex extends BaseModel { private String indexDescription; /** - * 关联明细表id。 + * 动态路由id。 */ - private Long modelLogicalId; + private Long customizeRouteId; /** * 关联字段。 @@ -111,26 +111,6 @@ public class DefinitionIndex extends BaseModel { */ private String productPeriod; - /** - * 计算函数。 - */ - private String caliberCalculateFunction; - - /** - * 度量单位。 - */ - private String caliberMeasureUnit; - - /** - * 度量精度。 - */ - private String caliberPrecision; - - /** - * 口径说明。 - */ - private String caliberDescription; - /** * updateTime 范围过滤起始值(>=)。 */ @@ -156,7 +136,7 @@ public class DefinitionIndex extends BaseModel { private String createTimeEnd; /** - * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / caliber_calculate_function / caliber_measure_unit / caliber_precision / caliber_description LIKE搜索字符串。 + * index_type / index_name / index_en_name / index_code / index_level / index_description / data_type / product_period / LIKE搜索字符串。 */ @TableField(exist = false) private String searchString; diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java b/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java index 262686a..1169fbf 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/model/ModelDesginField.java @@ -171,6 +171,11 @@ public class ModelDesginField extends BaseModel { */ private Long processId; + /** + * 数据标准主表ID standard_main_id。 + */ + private Long standardMainId; + /** * updateTime 范围过滤起始值(>=)。 */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java index b2c5c62..584561e 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardMain.java @@ -96,6 +96,11 @@ public class StandardMain extends BaseModel { */ private String standardStatus; + /** + * 正则表达式。 + */ + private String standardRegular; + /** * updateTime 范围过滤起始值(>=)。 */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java index b123950..d2cb9e4 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/model/StandardQuatity.java @@ -72,14 +72,19 @@ public class StandardQuatity extends BaseModel { private String standardQaulityRe; /** - * 质量校验长度限制。 + * 质量校验长度限制(正数->大等于。负数->小等于)。 */ - private String standardQualityLang; + private Integer standardQualityLang; /** - * 质量校验不为空。 + * 质量校验不为空(1:不为空。0:可为空)。 */ - private String standardQualityNotNull; + private Integer standardQualityNotNull; + + /** + * SQL条件。 + */ + private String customJudgment; /** * 质量校验中心关联规则。 diff --git a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java index 2ea93e0..ba1e9c8 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/BaseDataSource.java @@ -335,21 +335,29 @@ public class BaseDataSource { tableName = sqlList.get(1); } ResultSet resultSet = metaData.getColumns(databaseName, schemaPattern, tableName, null); + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + int columnCount = resultSetMetaData.getColumnCount(); resultData = new ArrayList<>(); while (resultSet.next()) { HashMap dataTypeMap = new HashMap<>(); - // 字段名 - String columnName = resultSet.getString("COLUMN_NAME"); - // 字段类型 - String dataType = resultSet.getString("TYPE_NAME"); - // 字段大小 - int columnSize = resultSet.getInt("COLUMN_SIZE"); - // 字段注释 - String columnComment = resultSet.getString("REMARKS"); - dataTypeMap.put("fieldName",columnName); - dataTypeMap.put("typeName",dataType); - dataTypeMap.put("columnSize",columnSize); - dataTypeMap.put("remarks",columnComment); + for (int i = 1; i <= columnCount; i++) { + String tableFieldName = resultSetMetaData.getColumnLabel(i); + dataTypeMap.put(tableFieldName, resultSet.getObject(tableFieldName)); + } +// // 字段名 +// String columnName = resultSet.getString("COLUMN_NAME"); +// // 字段类型 +// String dataType = resultSet.getString("TYPE_NAME"); +// // 字段大小 +// int columnSize = resultSet.getInt("COLUMN_SIZE"); +// // 字段注释 +// String columnComment = resultSet.getString("REMARKS"); +// String nullable = resultSet.getString("NULLABLE"); +// dataTypeMap.put("fieldName", columnName); +// dataTypeMap.put("typeName", dataType); +// dataTypeMap.put("columnSize", columnSize); +// dataTypeMap.put("remarks", columnComment); +// dataTypeMap.put("nullable", nullable); resultData.add(dataTypeMap); } resultSet.close(); diff --git a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java index 067ca12..b966455 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/service/databasemanagement/strategyImpl/DataSourceDoris.java @@ -1,5 +1,6 @@ package supie.webadmin.app.service.databasemanagement.strategyImpl; +import cn.hutool.core.text.StrSplitter; import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -9,6 +10,11 @@ import supie.webadmin.app.service.databasemanagement.Strategy; import supie.webadmin.app.service.databasemanagement.StrategyFactory; import javax.annotation.PostConstruct; +import java.sql.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * 描述: @@ -58,4 +64,52 @@ public class DataSourceDoris extends BaseDataSource implements Strategy { initConnection(); } + /** + * 查询数据库数据的字段名及类型 + * + * @param databaseName + * @param tableName + * @return + */ + @Override + public List> queryTableFields(String databaseName, String tableName) { + List> resultData; + try { + // 查询表结构 + Statement statement = connection.createStatement(); + String query = "SHOW COLUMNS FROM " + tableName; + ResultSet resultSet = statement.executeQuery(query); + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + resultData = new ArrayList<>(); + while (resultSet.next()) { + // 获取到字段名称 (Field、Type、Null、Key、Default) + HashMap dataTypeMap = new HashMap<>(); + for (int i = 1; i <= columnCount; i++) { + String tableFieldName = metaData.getColumnLabel(i); + dataTypeMap.put(tableFieldName, resultSet.getObject(tableFieldName)); + } +// // 字段名 +// String columnName = resultSet.getString("Field"); +// // 字段类型 +// String dataType = resultSet.getString("Type"); +// // 字段大小 +// int columnSize = resultSet.getInt("COLUMN_SIZE"); +// // 字段注释 +// String columnComment = resultSet.getString("Default"); +// String nullable = resultSet.getString("Null"); +// dataTypeMap.put("fieldName", columnName); +// dataTypeMap.put("typeName", dataType); +// dataTypeMap.put("columnSize", columnSize); +// dataTypeMap.put("remarks", columnComment); +// dataTypeMap.put("nullable", nullable); + resultData.add(dataTypeMap); + } + resultSet.close(); + statement.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + return resultData; + } } diff --git a/application-webadmin/src/main/java/supie/webadmin/app/vo/CustomizeRouteVo.java b/application-webadmin/src/main/java/supie/webadmin/app/vo/CustomizeRouteVo.java index b18349b..1516ae2 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/vo/CustomizeRouteVo.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/vo/CustomizeRouteVo.java @@ -73,6 +73,12 @@ public class CustomizeRouteVo extends BaseVo { @ApiModelProperty(value = "存算引擎项目ID") private Long projectId; + /** + * 指标ID。 + */ + @ApiModelProperty(value = "指标ID") + private Long definitionIndexId; + /** * 目标数据库名称。 */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/vo/DefinitionIndexVo.java b/application-webadmin/src/main/java/supie/webadmin/app/vo/DefinitionIndexVo.java index 618ec9b..4ab6926 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/vo/DefinitionIndexVo.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/vo/DefinitionIndexVo.java @@ -93,10 +93,10 @@ public class DefinitionIndexVo extends BaseVo { private String indexDescription; /** - * 关联明细表id。 + * 动态路由id。 */ - @ApiModelProperty(value = "关联明细表id") - private Long modelLogicalId; + @ApiModelProperty(value = "动态路由id") + private Long customizeRouteId; /** * 关联字段。 @@ -116,27 +116,4 @@ public class DefinitionIndexVo extends BaseVo { @ApiModelProperty(value = "生产周期") private String productPeriod; - /** - * 计算函数。 - */ - @ApiModelProperty(value = "计算函数") - private String caliberCalculateFunction; - - /** - * 度量单位。 - */ - @ApiModelProperty(value = "度量单位") - private String caliberMeasureUnit; - - /** - * 度量精度。 - */ - @ApiModelProperty(value = "度量精度") - private String caliberPrecision; - - /** - * 口径说明。 - */ - @ApiModelProperty(value = "口径说明") - private String caliberDescription; } diff --git a/application-webadmin/src/main/java/supie/webadmin/app/vo/ModelDesginFieldVo.java b/application-webadmin/src/main/java/supie/webadmin/app/vo/ModelDesginFieldVo.java index 2cec467..4d1ae63 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/vo/ModelDesginFieldVo.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/vo/ModelDesginFieldVo.java @@ -176,6 +176,12 @@ public class ModelDesginFieldVo extends BaseVo { @ApiModelProperty(value = "业务过程id") private Long processId; + /** + * 数据标准主表ID standard_main_id。 + */ + @ApiModelProperty(value = "数据标准主表ID") + private Long standardMainId; + /** * id 的多对多关联表数据对象,数据对应类型为DefinitionIndexModelFieldRelationVo。 */ diff --git a/application-webadmin/src/main/java/supie/webadmin/app/vo/StandardMainVo.java b/application-webadmin/src/main/java/supie/webadmin/app/vo/StandardMainVo.java index 31dda04..bccc6b4 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/vo/StandardMainVo.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/vo/StandardMainVo.java @@ -96,4 +96,11 @@ public class StandardMainVo extends BaseVo { */ @ApiModelProperty(value = "标准状态") private String standardStatus; + + /** + * 正则表达式。 + */ + @ApiModelProperty(value = "正则表达式") + private String standardRegular; + } diff --git a/application-webadmin/src/main/java/supie/webadmin/app/vo/StandardQuatityVo.java b/application-webadmin/src/main/java/supie/webadmin/app/vo/StandardQuatityVo.java index 057a86d..a9211fd 100644 --- a/application-webadmin/src/main/java/supie/webadmin/app/vo/StandardQuatityVo.java +++ b/application-webadmin/src/main/java/supie/webadmin/app/vo/StandardQuatityVo.java @@ -69,16 +69,22 @@ public class StandardQuatityVo extends BaseVo { private String standardQaulityRe; /** - * 质量校验长度限制。 + * 质量校验长度限制(正数->大等于。负数->小等于)。 */ - @ApiModelProperty(value = "质量校验长度限制") - private String standardQualityLang; + @ApiModelProperty(value = "质量校验长度限制(正数->大等于。负数->小等于)") + private Integer standardQualityLang; /** - * 质量校验不为空。 + * 质量校验不为空(1:不为空。0:可为空)。 */ - @ApiModelProperty(value = "质量校验不为空") - private String standardQualityNotNull; + @ApiModelProperty(value = "质量校验不为空(1:不为空。0:可为空)") + private Integer standardQualityNotNull; + + /** + * SQL条件。 + */ + @ApiModelProperty(value = "SQL条件") + private String customJudgment; /** * 质量校验中心关联规则。 -- Gitee