KEY_FIELDS_PREFIX =
+ ConfigOptions.key("key.fields-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines a custom prefix for all fields of the key format to avoid "
+ + "name clashes with fields of the value format. By default, the prefix is empty. "
+ + "If a custom prefix is defined, both the table schema and "
+ + "'"
+ + KEY_FIELDS.key()
+ + "' will work with prefixed names. When constructing "
+ + "the data type of the key format, the prefix will be removed and the "
+ + "non-prefixed names will be used within the key format. Please note that this "
+ + "option requires that '"
+ + VALUE_FIELDS_INCLUDE.key()
+ + "' must be '"
+ + ValueFieldsStrategy.EXCEPT_KEY
+ + "'.");
+
+
+// public static StartupMode getStartupMode(ReadableConfig tableOptions) {
+// StartupMode startupMode = tableOptions.getOptional(SCAN_START_MODE)
+// .map(modeString -> {
+// switch (modeString) {
+// case (SCAN_STARTUP_MODE_VALUE_EARLIEST):
+// return StartupMode.EARLIEST;
+// case (SCAN_STARTUP_MODE_VALUE_LATEST):
+// return StartupMode.LATEST;
+// default:throw new TableException("Unsupported scan.start.mode:" + modeString);
+// }
+// }).orElse(StartupMode.LATEST);
+// return startupMode;
+// }
+
+ /**
+ * Creates an array of indices that determine which physical fields of the table schema to
+ * include in the key format and the order that those fields have in the key format.
+ *
+ * See {@link #KEY_FORMAT}, {@link #KEY_FIELDS}, and {@link #KEY_FIELDS_PREFIX} for more
+ * information.
+ */
+ public static int[] createKeyFormatProjection(
+ ReadableConfig options, DataType physicalDataType) {
+ final LogicalType physicalType = physicalDataType.getLogicalType();
+ Preconditions.checkArgument(
+ hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type expected.");
+ final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT);
+ final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS); // e.g. Optional[[k_age, k_kk]]
+
+ if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "The option '%s' can only be declared if a key format is defined using '%s'.",
+ KEY_FIELDS.key(), KEY_FORMAT.key()));
+ } else if (optionalKeyFormat.isPresent()
+ && (!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) {
+ throw new ValidationException(
+ String.format(
+ "A key format '%s' requires the declaration of one or more of key fields using '%s'.",
+ KEY_FORMAT.key(), KEY_FIELDS.key()));
+ }
+
+ if (!optionalKeyFormat.isPresent()) {
+ return new int[0];
+ }
+
+ final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse("");
+
+ final List<String> keyFields = optionalKeyFields.get();
+ final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
+ return keyFields.stream()
+ .mapToInt(
+ keyField -> {
+ final int pos = physicalFields.indexOf(keyField);
+ // check that field name exists
+ if (pos < 0) {
+ throw new ValidationException(
+ String.format(
+ "Could not find the field '%s' in the table schema for usage in the key format. "
+ + "A key field must be a regular, physical column. "
+ + "The following columns can be selected in the '%s' option:\n"
+ + "%s",
+ keyField, KEY_FIELDS.key(), physicalFields));
+ }
+ // check that field name is prefixed correctly
+ if (!keyField.startsWith(keyPrefix)) {
+ throw new ValidationException(
+ String.format(
+ "All fields in '%s' must be prefixed with '%s' when option '%s' "
+ + "is set but field '%s' is not prefixed.",
+ KEY_FIELDS.key(),
+ keyPrefix,
+ KEY_FIELDS_PREFIX.key(),
+ keyField));
+ }
+ return pos;
+ })
+ .toArray();
+ }
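+
+ // Worked example with a hypothetical schema (k_id, k_name, payload): given
+ // 'key.fields' = 'k_id;k_name' and 'key.fields-prefix' = 'k_', this method returns
+ // [0, 1], i.e. the positions of the key columns in the order they are listed.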
+
+ /**
+ * Creates an array of indices that determine which physical fields of the table schema to
+ * include in the value format.
+ *
+ * See {@link #VALUE_FORMAT}, {@link #VALUE_FIELDS_INCLUDE}, and {@link #KEY_FIELDS_PREFIX}
+ * for more information.
+ */
+ public static int[] createValueFormatProjection(
+ ReadableConfig options, DataType physicalDataType) {
+ final LogicalType physicalType = physicalDataType.getLogicalType();
+ Preconditions.checkArgument(
+ hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type expected.");
+ final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
+ final IntStream physicalFields = IntStream.range(0, physicalFieldCount);
+
+ final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse("");
+
+ final ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE);
+ if (strategy == ValueFieldsStrategy.ALL) {
+ if (keyPrefix.length() > 0) {
+ throw new ValidationException(
+ String.format(
+ "A key prefix is not allowed when option '%s' is set to '%s'. "
+ + "Set it to '%s' instead to avoid field overlaps.",
+ VALUE_FIELDS_INCLUDE.key(),
+ ValueFieldsStrategy.ALL,
+ ValueFieldsStrategy.EXCEPT_KEY));
+ }
+ return physicalFields.toArray();
+ } else if (strategy == ValueFieldsStrategy.EXCEPT_KEY) {
+ final int[] keyProjection = createKeyFormatProjection(options, physicalDataType);
+ return physicalFields
+ .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> k == pos))
+ .toArray();
+ }
+ throw new TableException("Unknown value fields strategy:" + strategy);
+ }
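+
+ // Continuing the example above: EXCEPT_KEY yields the value projection [2] (only
+ // 'payload'), whereas ALL would yield [0, 1, 2] and is rejected whenever a key prefix
+ // is configured, to avoid overlapping fields between the key and value formats.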
+
+ enum ValueFieldsStrategy {
+ EXCEPT_KEY,
+ ALL
+ }
+}
diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQSource.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..b9c1d0dfed5c8053feeecaf8c2be66d838b5dae8
--- /dev/null
+++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQSource.java
@@ -0,0 +1,418 @@
+package com.zto.fire.flink.sql.connector.rocketmq;
+
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.RocketMQConfig;
+import org.apache.rocketmq.flink.RocketMQUtils;
+import org.apache.rocketmq.flink.RunningChecker;
+import org.apache.rocketmq.flink.util.MetricUtils;
+import org.apache.rocketmq.flink.util.RetryUtil;
+import org.apache.rocketmq.flink.watermark.WaterMarkForAll;
+import org.apache.rocketmq.flink.watermark.WaterMarkPerQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Description: {@link org.apache.rocketmq.client.consumer.DefaultMQPullConsumer} is deprecated;
+ * {@link DefaultLitePullConsumer} is recommended instead.
+ * Date: 3/11/2021 9:31 AM
+ */
+public class FireRocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
+ implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
+ private static final Logger LOG = LoggerFactory.getLogger(FireRocketMQSource.class);
+ private static final long serialVersionUID = 1L;
+ // state name
+ private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
+ private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
+ private LinkedMap pendingOffsetsToCommit = new LinkedMap();
+ private RunningChecker runningChecker;
+ private transient volatile boolean restored;
+ private transient boolean enableCheckpoint;
+ private RocketMQCollector rocketMQCollector;
+ private List<MessageQueue> assignQueues;
+ private ExecutorService executor;
+ private DefaultLitePullConsumer consumer;
+ private final RocketMQDeserializationSchema<OUT> deserializer;
+ private final Properties props;
+ private String topic;
+ private String group;
+ private final Map<MessageQueue, Long> offsetTable = new ConcurrentHashMap<>();
+ private ScheduledExecutorService timer;
+ // watermark in source
+ private WaterMarkPerQueue waterMarkPerQueue;
+ private WaterMarkForAll waterMarkForAll;
+ private Meter tpsMetric;
+
+ private boolean sendAllTag;
+ private Set<String> tagSet;
+
+ public FireRocketMQSource(RocketMQDeserializationSchema<OUT> deserializer, Properties props) {
+ this.deserializer = deserializer;
+ this.props = props;
+ String tags = props.getProperty(RocketMQConfig.CONSUMER_TAG);
+ // if no tag was given, or the tag is "*", forward all messages
+ if (tags == null || "*".equals(tags.trim())) {
+ sendAllTag = true;
+ } else {
+ tagSet = new HashSet<>();
+ // multiple tags can be specified, separated by commas, e.g. accuracy1,accuracy2
+ String[] split = tags.split(",");
+ for (String s : split) {
+ tagSet.add(s.trim());
+ }
+ }
+ }
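+
+ // Example: setting the property behind RocketMQConfig.CONSUMER_TAG to "accuracy1,accuracy2"
+ // forwards only messages tagged accuracy1 or accuracy2; leaving it unset or using "*"
+ // forwards every message.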
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ LOG.debug("source run ......");
+
+ Validate.notEmpty(props, "Consumer properties can not be empty");
+
+ runningChecker = new RunningChecker();
+ runningChecker.setRunning(true);
+ this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
+ this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
+
+ Validate.notEmpty(topic, "Consumer topic can not be empty");
+ Validate.notEmpty(group, "Consumer group can not be empty");
+ this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
+
+ rocketMQCollector = new RocketMQCollector();
+
+ waterMarkPerQueue = new WaterMarkPerQueue(5000);
+
+ waterMarkForAll = new WaterMarkForAll(5000);
+
+ Counter outputCounter = getRuntimeContext().getMetricGroup()
+ .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
+ tpsMetric = getRuntimeContext().getMetricGroup()
+ .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
+ final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("rmq-pull-thread-%d").build();
+ executor = Executors.newCachedThreadPool(threadFactory);
+
+ timer = Executors.newSingleThreadScheduledExecutor();
+ // initialize the consumer and assign its message queues
+ startConsumer();
+ }
+
+ @Override
+ public void run(SourceContext<OUT> context) throws Exception {
+ // consume only if this consumer has been assigned at least one queue
+ if (!assignQueues.isEmpty()) {
+ timer.scheduleAtFixedRate(() -> {
+ context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
+ context.emitWatermark(waterMarkForAll.getCurrentWatermark());
+ }, 5, 5, TimeUnit.SECONDS);
+
+ this.executor.execute(() -> RetryUtil.call(() -> {
+ while (runningChecker.isRunning()) {
+ List<MessageExt> messages = consumer.poll();
+ for (MessageExt msg : messages) {
+ deserializer.deserialize(msg, rocketMQCollector);
+ // get record from collector and use sourceContext emit it to down task
+ Queue<OUT> records = rocketMQCollector.getRecords();
+ synchronized (context.getCheckpointLock()) {
+ OUT record;
+ while ((record = records.poll()) != null) {
+ String tags = msg.getTags();
+ // emit the record when all tags are forwarded (no tag given or tag is "*"), or when the message's tag is among the configured tags
+ if (sendAllTag || tagSet.contains(tags)) {
+ context.collectWithTimestamp(record, msg.getBornTimestamp());
+ }
+ }
+ // record offset to offset table
+ recordBrokerOffset(msg);
+
+ // update max eventTime per queue
+ waterMarkPerQueue.extractTimestamp(buildMessageQueue(msg), msg.getBornTimestamp());
+ waterMarkForAll.extractTimestamp(msg.getBornTimestamp());
+ tpsMetric.markEvent();
+ }
+ }
+ }
+ return true;
+ }, "RuntimeException"));
+ }
+
+ awaitTermination();
+ }
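+
+ // Data flow of run(): poll() -> the deserializer fills the RocketMQCollector -> buffered records
+ // are drained and emitted under the checkpoint lock together with the per-message offset
+ // bookkeeping, so a checkpoint always observes a consistent offsetTable.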
+
+ private void awaitTermination() throws InterruptedException {
+ while (runningChecker.isRunning()) {
+ Thread.sleep(50);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ LOG.debug("cancel ...");
+ runningChecker.setRunning(false);
+
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+
+ if (offsetTable != null) {
+ offsetTable.clear();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ return;
+ }
+
+ Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ if (offsets == null || offsets.size() == 0) {
+ LOG.debug("Checkpoint state was empty.");
+ return;
+ }
+
+ for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
+ // the offset stored per MessageQueue is one less than the actual offset of this message, so add 1 when updating
+ LOG.info("update {} to {}", entry.getKey(), entry.getValue() + 1);
+// consumer.commitSync();
+// consumer.getOffsetStore().updateConsumeOffsetToBroker(entry.getKey(), entry.getValue(), true);
+ consumer.getOffsetStore().updateOffset(entry.getKey(), entry.getValue() + 1, false);
+ }
+ }
+
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ // called when a snapshot for a checkpoint is requested
+ LOG.info("Snapshotting state {} ...", context.getCheckpointId());
+ if (!runningChecker.isRunning()) {
+ LOG.info("snapshotState() called on closed source; returning null.");
+ return;
+ }
+
+ // Detect topic route changes when taking a snapshot:
+ // on every snapshot the RocketMQ queues are checked again; if new queues have appeared,
+ // an error is raised so that the job is restarted and the queues are reassigned.
+
+ // TODO: when new queues are detected, they could be assigned and consumed directly instead of failing
+ RetryUtil.call(() -> {
+ List<MessageQueue> newQueues = getAssignQueues();
+ Collections.sort(newQueues);
+ if (!this.assignQueues.equals(newQueues)) {
+ throw new RuntimeException("topic route changed, this task need to be restarted!");
+ }
+ return true;
+ }, "RuntimeException due to topic route changed");
+
+
+ unionOffsetStates.clear();
+ Map<MessageQueue, Long> currentOffset = new HashMap<>(assignQueues.size());
+ for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+ LOG.info("snapshot {}, offset {}", entry.getKey(), entry.getValue());
+ unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+ currentOffset.put(entry.getKey(), entry.getValue());
+ }
+ pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffset);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ LOG.info("initialize State ...");
+ unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
+ OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
+ })));
+ this.restored = context.isRestored();
+ if (restored) {
+ unionOffsetStates.get().forEach(t -> offsetTable.put(t.f0, t.f1));
+ LOG.info("Restore from state, {}", offsetTable);
+ } else {
+ LOG.info("No restored from state ......");
+ }
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializer.getProducedType();
+ }
+
+ private void recordBrokerOffset(MessageExt message) {
+ MessageQueue mq = buildMessageQueue(message);
+ long queueOffset = message.getQueueOffset();
+ offsetTable.put(mq, queueOffset);
+ // if checkpointing is not enabled and auto-commit is disabled, commit the offset after every record
+ if (!enableCheckpoint && !consumer.isAutoCommit()) {
+ consumer.commitSync();
+ }
+ }
+
+ private MessageQueue buildMessageQueue(MessageExt message) {
+ String topic = message.getTopic();
+ String brokerName = message.getBrokerName();
+ int queueId = message.getQueueId();
+ return new MessageQueue(topic, brokerName, queueId);
+ }
+
+ private void startConsumer() throws MQClientException {
+ LOG.info("consumer start ");
+ this.consumer = new DefaultLitePullConsumer(this.group, RocketMQConfig.buildAclRPCHook(props));
+ String nameServers = props.getProperty(RocketMQConfig.NAME_SERVER_ADDR);
+ Validate.notEmpty(nameServers);
+ this.consumer.setNamesrvAddr(nameServers);
+ this.consumer.setPollNameServerInterval(RocketMQUtils.getInteger(props,
+ RocketMQConfig.NAME_SERVER_POLL_INTERVAL, RocketMQConfig.DEFAULT_NAME_SERVER_POLL_INTERVAL));
+ this.consumer.setHeartbeatBrokerInterval(RocketMQUtils.getInteger(props,
+ RocketMQConfig.BROKER_HEART_BEAT_INTERVAL, RocketMQConfig.DEFAULT_BROKER_HEART_BEAT_INTERVAL));
+ String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
+ int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
+ String instanceName = RocketMQUtils.getInstanceName(runtimeName, topic, group,
+ String.valueOf(indexOfThisSubTask), String.valueOf(System.nanoTime()));
+ this.consumer.setInstanceName(instanceName);
+ boolean autoCommit = RocketMQUtils.getBoolean(props, RocketMQConfig.OFFSET_AUTO_COMMIT, false);
+ this.consumer.setAutoCommit(autoCommit);
+ this.consumer.start();
+ this.assignQueues = getAssignQueues();
+ // only assign and seek when this task actually has queues; otherwise clear the offset table
+ if (!this.assignQueues.isEmpty()) {
+ this.consumer.assign(this.assignQueues);
+ // remove queues that are not assigned to this task from the offsetTable (it is also used for snapshots),
+ // and seek each queue back to the offset restored from state
+ removeUnAssignQueues();
+
+ // seek every MessageQueue to its designated start offset
+ perQueueSeekToSpecialOffset();
+ } else {
+ this.offsetTable.clear();
+ }
+ }
+
+ private List<MessageQueue> getAssignQueues() throws MQClientException {
+ final RuntimeContext ctx = getRuntimeContext();
+ int taskNumber = ctx.getNumberOfParallelSubtasks();
+ int taskIndex = ctx.getIndexOfThisSubtask();
+ Collection<MessageQueue> totalQueues = this.consumer.fetchMessageQueues(this.topic);
+ List<MessageQueue> shouldAssignQueues = RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
+ return shouldAssignQueues;
+ }
+
+ // remove queues that this consumer does not consume from the offsetTable
+ private void removeUnAssignQueues() throws MQClientException {
+ // the offset table restored from state initially holds entries for all queues; remove the extra ones
+ this.offsetTable.forEach((k, v) -> {
+ if (!this.assignQueues.contains(k)) {
+ this.offsetTable.remove(k);
+ }
+ });
+ }
+
+ // choose the start offset per queue according to the configured strategy
+ private void perQueueSeekToSpecialOffset() throws MQClientException {
+ for (MessageQueue mq : this.assignQueues) {
+ if (this.offsetTable.containsKey(mq)) {
+ long nextOffset = this.offsetTable.get(mq) + 1;
+ LOG.info("consumer seek {} from state, offset {}", mq, nextOffset);
+ this.consumer.seek(mq, nextOffset);
+ } else {
+ // 1. If state was restored but this queue is not found, the queue was newly added.
+ // 2. Alternatively, with the current strategy the offset table only keeps the MQ offsets of this TaskManager;
+ // if a queue received no data during that period, there is no entry for it at checkpoint time, so it
+ // cannot be found either, in which case seeking to the beginning would be problematic.
+ // In production this situation should currently not occur.
+ if (this.restored) {
+ this.consumer.seekToBegin(mq);
+ LOG.info("restore but not found in offsetTable, seek {} to begin", mq);
+ } else {
+ // not restored from state, i.e. a fresh start: pick the offset according to the configured strategy
+ String offsetFrom = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, "latest-offset");
+// Offset initialOffsetFrom = Offset.valueOf(offsetFrom);
+ switch (offsetFrom) {
+ case "earliest-offset":
+ this.consumer.seekToBegin(mq);
+ LOG.info("{} seek to begin ......", mq);
+ break;
+ case "latest-offset":
+ this.consumer.seekToEnd(mq);
+ LOG.info("{} seek to end ......", mq);
+ break;
+ case "store":
+ long storedOffset = this.consumer.getOffsetStore().readOffset(mq, ReadOffsetType.READ_FROM_STORE);
+ LOG.info("{} seek to stored offset {}", mq, storedOffset);
+ this.consumer.seek(mq, storedOffset);
+ break;
+ default:
+ throw new RuntimeException(String.format("Not supported Offset [%s]", offsetFrom));
+ }
+ }
+ }
+ }
+ }
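+
+ // To summarize, RocketMQConfig.CONSUMER_OFFSET_RESET_TO accepts "earliest-offset",
+ // "latest-offset" and "store" (read the offset back from the broker-side offset store);
+ // any other value fails the source during startup.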
+
+ // buffers deserialized records until they are emitted downstream
+ private class RocketMQCollector implements Collector<OUT> {
+ private final Queue<OUT> records = new ArrayDeque<>();
+ private boolean endOfStreamSignalled = false;
+
+ @Override
+ public void collect(OUT record) {
+ // do not emit subsequent elements if the end of the stream reached
+ if (endOfStreamSignalled || deserializer.isEndOfStream(record)) {
+ endOfStreamSignalled = true;
+ return;
+ }
+ records.add(record);
+ }
+
+ public Queue<OUT> getRecords() {
+ return records;
+ }
+
+ public boolean isEndOfStreamSignalled() {
+ return endOfStreamSignalled;
+ }
+
+ public void setEndOfStreamSignalled(boolean endOfStreamSignalled) {
+ this.endOfStreamSignalled = endOfStreamSignalled;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+}
diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/RocketMQDeserializationSchema.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/RocketMQDeserializationSchema.java
new file mode 100644
index 0000000000000000000000000000000000000000..9e284650c1576c6676b29093cd8c50236c872b93
--- /dev/null
+++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/RocketMQDeserializationSchema.java
@@ -0,0 +1,22 @@
+package com.zto.fire.flink.sql.connector.rocketmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.io.Serializable;
+
+public interface RocketMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+ default void open(DeserializationSchema.InitializationContext context) throws Exception {}
+
+ T deserialize(MessageExt message) throws Exception;
+
+ default void deserialize(MessageExt message, Collector<T> collector) throws Exception {
+ T record = deserialize(message);
+ if (record != null) collector.collect(record);
+ }
+
+ boolean isEndOfStream(T record);
+}
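+
+// A minimal illustrative implementation (not part of this connector) that emits the raw
+// message body as a UTF-8 string could look like this sketch:
+//
+//   public class StringDeserializationSchema implements RocketMQDeserializationSchema<String> {
+//       @Override
+//       public String deserialize(MessageExt message) {
+//           return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
+//       }
+//
+//       @Override
+//       public boolean isEndOfStream(String record) {
+//           return false;
+//       }
+//
+//       @Override
+//       public org.apache.flink.api.common.typeinfo.TypeInformation<String> getProducedType() {
+//           return org.apache.flink.api.common.typeinfo.Types.STRING;
+//       }
+//   }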
diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
index 29ae2b4b164738fef0e2aa8f5aca7a93b7b74cfe..2088439d61446e1e6e72661b4a40f8a036550921 100644
--- a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
+++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
@@ -52,6 +52,7 @@ public class RocketMQConfig {
public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;
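+ // whether the lite pull consumer commits offsets automatically; read by FireRocketMQSource#startConsumer() (default: false)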
+ public static final String OFFSET_AUTO_COMMIT = "offset.auto.commit";
public static final String PRODUCER_TIMEOUT = "producer.timeout";
public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fire-connectors/flink-connectors/flink-rocketmq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a49aff8a65c4fbe16505a29422c49a20dac406d5..0db6809a4432ce9e6d24c4998fef1bab20a84d7d 100644
--- a/fire-connectors/flink-connectors/flink-rocketmq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -1 +1,3 @@
-com.zto.fire.flink.sql.connector.rocketmq.RocketMQDynamicTableFactory
\ No newline at end of file
+com.zto.fire.flink.sql.connector.rocketmq.RocketMQDynamicTableFactory
+
+com.zto.fire.flink.sql.connector.rocketmq.FireRocketMQDynamicTableFactory
\ No newline at end of file
diff --git a/fire-engines/fire-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fire-engines/fire-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a49aff8a65c4fbe16505a29422c49a20dac406d5..0160d6485a3617bb24b7e4c0f1911a84bf3728fc 100644
--- a/fire-engines/fire-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/fire-engines/fire-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -1 +1,3 @@
-com.zto.fire.flink.sql.connector.rocketmq.RocketMQDynamicTableFactory
\ No newline at end of file
+com.zto.fire.flink.sql.connector.rocketmq.RocketMQDynamicTableFactory
+
+com.zto.fire.flink.formats.json.ZDTPJsonFormatFactory
\ No newline at end of file
diff --git a/fire-examples/flink-examples/pom.xml b/fire-examples/flink-examples/pom.xml
index 3467d24f25868d3ec79e83859738b705d4d50b5e..96eb8ad960eda7d28f2444ae3471e472c699c525 100644
--- a/fire-examples/flink-examples/pom.xml
+++ b/fire-examples/flink-examples/pom.xml
@@ -31,6 +31,11 @@
+        <dependency>
+            <groupId>com.zto.fire</groupId>
+            <artifactId>fire-connector-flink-format-zdtp_1.14_2.12</artifactId>
+            <version>${fire.version}</version>
+        </dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/Test.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/Test.scala
index 68c40f6f3d9c9359de081737342d4dd867be9484..3148140c97fee3e9967621d61edbe27dc5d2d750 100644
--- a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/Test.scala
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/Test.scala
@@ -17,7 +17,7 @@
package com.zto.fire.examples.flink
-import com.zto.VersionTest
+//import com.zto.VersionTest
import com.zto.fire._
import com.zto.fire.common.anno.Config
import com.zto.fire.common.enu.Operation
@@ -42,20 +42,20 @@ object Test extends FlinkStreaming {
@Process
def kafkaSource: Unit = {
- println("version=" + VersionTest.version())
- val stream = this.fire.addSourceLineage(new SourceFunction[Int] {
- val random = new Random()
- override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
- while (true) {
- ctx.collect(random.nextInt(1000))
- Thread.sleep(1000)
- }
- }
-
- override def cancel(): Unit = ???
- })(MQDatasource("kafka", "localhost:9092", "fire", "fire"), Operation.SOURCE)
-
-
- stream.print()
+// println("version=" + VersionTest.version())
+// val stream = this.fire.addSourceLineage(new SourceFunction[Int] {
+// val random = new Random()
+// override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
+// while (true) {
+// ctx.collect(random.nextInt(1000))
+// Thread.sleep(1000)
+// }
+// }
+//
+// override def cancel(): Unit = ???
+// })(MQDatasource("kafka", "localhost:9092", "fire", "fire"), Operation.SOURCE)
+//
+//
+// stream.print()
}
}
\ No newline at end of file
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/ZTORouteBillCDC.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/ZTORouteBillCDC.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4795c58a8780a8469758308a05412cead66301bd
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/ZTORouteBillCDC.scala
@@ -0,0 +1,320 @@
+package com.zto.fire.examples.flink
+
+import com.zto.fire.common.anno.Config
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import com.zto.fire._
+
+/**
+ * CDC binlog data for the waybill wide table
+ */
+
+@Config(
+ """
+ |flink.stream.number.execution.retries=3
+ |flink.stream.time.characteristic=ProcessingTime
+ |flink.force.kryo.enable=false
+ |flink.force.avro.enable=false
+ |flink.generic.types.enable=true
+ |flink.execution.mode=pipelined
+ |flink.auto.type.registration.enable=true
+ |flink.auto.generate.uid.enable=true
+ |flink.max.parallelism=1536
+ |""")
+@Streaming(interval = 300, timeout = 1800, unaligned = false, pauseBetween = 180,
+ concurrent = 1, failureNumber = 3, parallelism = 3, autoStart = true, disableOperatorChaining = true)
+object ZTORouteBillCDC extends FlinkStreaming {
+ override def process(): Unit = {
+
+// val dbTable = parameter.get("dbTable", "lyznhb_ml.zto_route_bill_cdc_test")
+// println(s"dbTable : $dbTable")
+
+ val rocketMQTableName = "zto_route_bill_cdc"
+ val rocketMQSql =
+ s"""
+ |create table $rocketMQTableName (
+ | `row_kind` STRING METADATA FROM 'value.row_kind' VIRTUAL,
+ | `mq_topic` STRING METADATA FROM 'topic' VIRTUAL,
+ | `mq_broker` STRING METADATA FROM 'broker' VIRTUAL,
+ | `mq_queue_id` INT METADATA FROM 'queue_id' VIRTUAL,
+ | `mq_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
+ | `mq_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
+ | `id` bigint,
+ | `customize_id` STRING,
+ | `bill_code` STRING,
+ | `order_code` STRING,
+ | `fst_code` STRING,
+ | `operate_weight` decimal(10, 2),
+ | `estimate_weight` decimal(10, 2),
+ | `estimate_volume` decimal(10, 2),
+ | `goods_type` STRING,
+ | `rec_site_id` BIGINT,
+ | `disp_site_id` BIGINT,
+ | `rec_scan_time` STRING,
+ | `disp_scan_time` STRING,
+ | `sign_scan_time` STRING,
+ | `plan_sign_time` STRING,
+ | `route_code` STRING,
+ | `route_index` INT,
+ | `route_type` STRING,
+ | `ori_site_id` BIGINT,
+ | `des_site_id` BIGINT,
+ | `ori_unload_code` STRING,
+ | `des_unload_code` STRING,
+ | `send_site_id` BIGINT,
+ | `come_site_id` BIGINT,
+ | `send_scan_time` STRING,
+ | `come_scan_time` STRING,
+ | `last_send_time` STRING,
+ | `last_come_time` STRING,
+ | `crossing_type` INT,
+ | `line_name` STRING,
+ | `line_number` STRING,
+ | `time_cost` INT,
+ | `transport_type` STRING,
+ | `unload_port` STRING,
+ | `owner_bag_no` STRING,
+ | `exec_status` INT,
+ | `truck_number` STRING,
+ | `truck_tail_number` STRING,
+ | `truck_sign_code` STRING,
+ | `batch_number` STRING,
+ | `schedule_id` STRING,
+ | `plan_send_time` STRING,
+ | `plan_arrive_time` STRING,
+ | `actual_send_time` STRING,
+ | `actual_arrive_time` STRING,
+ | `start_hours` STRING,
+ | `operate_time` INT,
+ | `clear_store_time` STRING,
+ | `receive_time` STRING,
+ | `dispatch_time` STRING,
+ | `exception_type` INT,
+ | `event_id` STRING,
+ | `event_type` INT,
+ | `is_blocked` INT,
+ | `is_deleted` INT,
+ | `remark` STRING,
+ | `created_by` STRING,
+ | `created_on` STRING,
+ | `modified_by` STRING,
+ | `modified_on` STRING,
+ | `subarea_time` STRING,
+ | `disp_way_in_time` STRING,
+ | `route_situation` STRING,
+ | `is_problem` INT,
+ | `first_disp_time` STRING,
+ | `first_disp_way_in_time` STRING,
+ | `promise_time` STRING,
+ | `intercept_status` INT,
+ | `value_added_type` INT,
+ | `last_rec_scan_time` STRING,
+ | `clerk_rec_scan_time` STRING,
+ | `packet_exception_type` BIGINT,
+ | `packet_exception_effect` BIGINT,
+ | `packet_exception_time` STRING,
+ | `assessment_type` INT,
+ | `assessment_site_id` BIGINT,
+ | `predicate_assessment_time` STRING
+ |) WITH (
+ | 'connector' = 'rocketmq',
+ | 'topic' = 'ROUTE_BILL_BINLOG',
+ | 'properties.bootstrap.servers' = '10.9.15.40:9876;10.9.63.237:9876',
+ | 'properties.group.id' = 'ROUTE_BILL_BINLOG_FLINK_CONSUMER_VERIFY',
+ | 'scan.startup.mode' = 'earliest-offset',
+ | 'zdtp-json.cdc-write-hive' = 'false',
+ | 'format' = 'zdtp-json'
+ | )
+ """.stripMargin
+
+ this.fire.sql(rocketMQSql)
+
+ val printSql =
+ """
+ |create table print_table(
+ | `row_kind` STRING,
+ | `mq_topic` STRING,
+ | `mq_broker` STRING ,
+ | `mq_queue_id` INT ,
+ | `mq_offset` BIGINT ,
+ | `mq_timestamp` STRING ,
+ | `id` bigint,
+ | `customize_id` STRING,
+ | `bill_code` STRING,
+ | `order_code` STRING,
+ | `fst_code` STRING,
+ | `operate_weight` decimal(10, 2),
+ | `estimate_weight` decimal(10, 2),
+ | `estimate_volume` decimal(10, 2),
+ | `goods_type` STRING,
+ | `rec_site_id` BIGINT,
+ | `disp_site_id` BIGINT,
+ | `rec_scan_time` STRING,
+ | `disp_scan_time` STRING,
+ | `sign_scan_time` STRING,
+ | `plan_sign_time` STRING,
+ | `route_code` STRING,
+ | `route_index` INT,
+ | `route_type` STRING,
+ | `ori_site_id` BIGINT,
+ | `des_site_id` BIGINT,
+ | `ori_unload_code` STRING,
+ | `des_unload_code` STRING,
+ | `send_site_id` BIGINT,
+ | `come_site_id` BIGINT,
+ | `send_scan_time` STRING,
+ | `come_scan_time` STRING,
+ | `last_send_time` STRING,
+ | `last_come_time` STRING,
+ | `crossing_type` INT,
+ | `line_name` STRING,
+ | `line_number` STRING,
+ | `time_cost` INT,
+ | `transport_type` STRING,
+ | `unload_port` STRING,
+ | `owner_bag_no` STRING,
+ | `exec_status` INT,
+ | `truck_number` STRING,
+ | `truck_tail_number` STRING,
+ | `truck_sign_code` STRING,
+ | `batch_number` STRING,
+ | `schedule_id` STRING,
+ | `plan_send_time` STRING,
+ | `plan_arrive_time` STRING,
+ | `actual_send_time` STRING,
+ | `actual_arrive_time` STRING,
+ | `start_hours` STRING,
+ | `operate_time` INT,
+ | `clear_store_time` STRING,
+ | `receive_time` STRING,
+ | `dispatch_time` STRING,
+ | `exception_type` INT,
+ | `event_id` STRING,
+ | `event_type` INT,
+ | `is_blocked` INT,
+ | `is_deleted` INT,
+ | `remark` STRING,
+ | `created_by` STRING,
+ | `created_on` STRING,
+ | `modified_by` STRING,
+ | `modified_on` STRING,
+ | `subarea_time` STRING,
+ | `disp_way_in_time` STRING,
+ | `route_situation` STRING,
+ | `is_problem` INT,
+ | `first_disp_time` STRING,
+ | `first_disp_way_in_time` STRING,
+ | `promise_time` STRING,
+ | `intercept_status` INT,
+ | `value_added_type` INT,
+ | `last_rec_scan_time` STRING,
+ | `clerk_rec_scan_time` STRING,
+ | `packet_exception_type` BIGINT,
+ | `packet_exception_effect` BIGINT,
+ | `packet_exception_time` STRING,
+ | `assessment_type` INT,
+ | `assessment_site_id` BIGINT,
+ | `predicate_assessment_time` STRING
+ |) WITH (
+ | 'connector' = 'print'
+ |)
+ |""".stripMargin
+
+ this.fire.sql(printSql)
+ // checkToHive()
+
+ val insertSql =
+ s"""
+ |insert into
+ |print_table
+ |select
+ | row_kind,
+ | mq_topic,
+ | mq_broker,
+ | mq_queue_id,
+ | mq_offset,
+ | DATE_FORMAT(mq_timestamp, 'yyyy-MM-dd HH:mm:ss.SSS') mq_timestamp,
+ | id,
+ | customize_id,
+ | bill_code,
+ | order_code,
+ | fst_code,
+ | operate_weight,
+ | estimate_weight,
+ | estimate_volume,
+ | goods_type,
+ | rec_site_id,
+ | disp_site_id,
+ | rec_scan_time,
+ | disp_scan_time,
+ | sign_scan_time,
+ | plan_sign_time,
+ | route_code,
+ | route_index,
+ | route_type,
+ | ori_site_id,
+ | des_site_id,
+ | ori_unload_code,
+ | des_unload_code,
+ | send_site_id,
+ | come_site_id,
+ | send_scan_time,
+ | come_scan_time,
+ | last_send_time,
+ | last_come_time,
+ | crossing_type,
+ | line_name,
+ | line_number,
+ | time_cost,
+ | transport_type,
+ | unload_port,
+ | owner_bag_no,
+ | exec_status,
+ | truck_number,
+ | truck_tail_number,
+ | truck_sign_code,
+ | batch_number,
+ | schedule_id,
+ | plan_send_time,
+ | plan_arrive_time,
+ | actual_send_time,
+ | actual_arrive_time,
+ | start_hours,
+ | operate_time,
+ | clear_store_time,
+ | receive_time,
+ | dispatch_time,
+ | exception_type,
+ | event_id,
+ | event_type,
+ | is_blocked,
+ | is_deleted,
+ | remark,
+ | created_by,
+ | created_on,
+ | modified_by,
+ | modified_on,
+ | subarea_time,
+ | disp_way_in_time,
+ | route_situation,
+ | is_problem,
+ | first_disp_time,
+ | first_disp_way_in_time,
+ | promise_time,
+ | intercept_status,
+ | value_added_type,
+ | last_rec_scan_time,
+ | clerk_rec_scan_time,
+ | packet_exception_type,
+ | packet_exception_effect,
+ | packet_exception_time,
+ | assessment_type,
+ | assessment_site_id,
+ | predicate_assessment_time
+ |from
+ |default_catalog.default_database.$rocketMQTableName
+ """.stripMargin
+ this.fire.sql(insertSql)
+
+ }
+}
diff --git a/fire-examples/spark-examples/src/main/scala/com/zto/fire/examples/spark/Test.scala b/fire-examples/spark-examples/src/main/scala/com/zto/fire/examples/spark/Test.scala
index c76c877381dd1d400bbd9f8605ae8c0d634f052d..bd606b11efc698c26fba82360cb12efe7416bf88 100644
--- a/fire-examples/spark-examples/src/main/scala/com/zto/fire/examples/spark/Test.scala
+++ b/fire-examples/spark-examples/src/main/scala/com/zto/fire/examples/spark/Test.scala
@@ -46,10 +46,10 @@ object Test extends SparkStreaming {
val stream = this.fire.createKafkaDirectStream()
stream.foreachRDD(rdd => {
rdd.foreachPartition(it => {
- LineageManager.addPrintLineage(Operation.SINK)
+// LineageManager.addPrintLineage(Operation.SINK)
})
})
- LineageManager.addMySQLLineage("jdbc://localhost:3306/fire", "t_user", "root", Operation.INSERT_INTO)
+// LineageManager.addMySQLLineage("jdbc://localhost:3306/fire", "t_user", "root", Operation.INSERT_INTO)
stream.print()
LineageManager.addSql("""select * from tmp.baseorganize""")
LineageManager.print(10)