diff --git a/fire-connectors/flink-connectors/flink-format/zdtp-format/pom.xml b/fire-connectors/flink-connectors/flink-format/zdtp-format/pom.xml index 86916cf4c46c6cca0c9bebe78192875598be8c86..0cf31d9ac2ed0b1e5cd42d8d444ca559e180b315 100644 --- a/fire-connectors/flink-connectors/flink-format/zdtp-format/pom.xml +++ b/fire-connectors/flink-connectors/flink-format/zdtp-format/pom.xml @@ -15,4 +15,31 @@ UTF-8 + + + org.apache.flink + flink-json + ${flink.version} + ${maven.scope} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + src/main/resources + true + + + diff --git a/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java new file mode 100644 index 0000000000000000000000000000000000000000..b0a6c9cd5dd3ba273a73c29a06b21a55dd9787e7 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java @@ -0,0 +1,215 @@ +package com.zto.fire.flink.formats.json; + +import com.zto.fire.flink.formats.json.ZDTPJsonDeserializationSchema.MetadataConverter; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +public class ZDTPJsonDecodingFormat implements DecodingFormat> { + private List metadataKeys; + private final boolean ignoreParseErrors; + + private final boolean cdcWriteHive; + + private final TimestampFormat timestampFormatOption; + + public ZDTPJsonDecodingFormat(boolean ignoreParseErrors, boolean cdcWriteHive, TimestampFormat timestampFormatOption) { + this.ignoreParseErrors = ignoreParseErrors; + this.cdcWriteHive = cdcWriteHive; + this.timestampFormatOption = timestampFormatOption; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public DeserializationSchema createRuntimeDecoder(DynamicTableSource.Context context, DataType physicalDataType) { + + final List readableMetadata = + Stream.of(ReadableMetadata.values()) + .filter(m -> metadataKeys.contains(m.key)) + .collect(Collectors.toList()); + + + final List metadataFields = + readableMetadata.stream() + .map(m -> DataTypes.FIELD(m.key, m.dataType)) + .collect(Collectors.toList()); + + final DataType producedDataType = DataTypeUtils.appendRowFields(physicalDataType, metadataFields); + + final TypeInformation producedTypeInfo = + context.createTypeInformation(producedDataType); + + + return new ZDTPJsonDeserializationSchema( + physicalDataType, readableMetadata, producedTypeInfo, ignoreParseErrors, timestampFormatOption); + } + + @Override + public 
ChangelogMode getChangelogMode() { + return cdcWriteHive ? ChangelogMode.insertOnly() : ChangelogMode.all(); + } + + @Override + public Map listReadableMetadata() { + final Map metadataMap = new LinkedHashMap<>(); + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); + return metadataMap; + } + + // 所有 json中的metadataKeys + @Override + public void applyReadableMetadata(List metadataKeys) { + this.metadataKeys = metadataKeys; + } + + /** + * zdtp 字段中的metadata + * { + * "schema":"route_order", + * "table":"route_order_001", + * "gtid":"ldjfdlfj-ldfjdl", + * "logFile":"0002334.bin", + * "offset":"dfle-efe", + * "pos":12384384, + * "when":123943434383, + * "before":{}, + * "after":{} + * } + */ + enum ReadableMetadata { + SCHEMA( + "schema", + DataTypes.STRING().nullable(), + DataTypes.FIELD("schema", DataTypes.STRING()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int pos) { + return row.getString(pos); + } + }), + TABLE( + "table", + DataTypes.STRING().nullable(), + DataTypes.FIELD("table", DataTypes.STRING()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int pos) { + return row.getString(pos); + } + }), + GTID( + "gtid", + DataTypes.STRING().nullable(), + DataTypes.FIELD("gtid", DataTypes.STRING()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int pos) { + return row.getString(pos); + } + }), + LOGFILE( + "logFile", + DataTypes.STRING().nullable(), + DataTypes.FIELD("logFile", DataTypes.STRING()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int pos) { + return row.getString(pos); + } + }), + OFFSET( + "offset", + DataTypes.STRING().nullable(), + DataTypes.FIELD("offset", DataTypes.STRING()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int pos) { + return row.getString(pos); + } + }), + POS( + "pos", + DataTypes.BIGINT().nullable(), + DataTypes.FIELD("pos", DataTypes.BIGINT()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int pos) { + return row.getLong(pos); + } + }), + WHEN( + "when", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), + DataTypes.FIELD("when", DataTypes.BIGINT()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int pos) { + if (row.isNullAt(pos)) return null; + return TimestampData.fromEpochMillis(row.getLong(pos)); + } + }), + // 将binlog中的RowKind带出去,当作元数据 + ROW_KIND( + "row_kind", + DataTypes.STRING().notNull(), + DataTypes.FIELD("row_kind", DataTypes.STRING()), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + @Override + public Object convert(GenericRowData row, int pos) { + + return StringData.fromString(row.getRowKind().toString()); + } + } + ) + ; + + final String key; + + final DataType dataType; + + final DataTypes.Field requiredJsonField; + + final MetadataConverter converter; + + ReadableMetadata(String key, DataType dataType, DataTypes.Field requiredJsonField, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + 
this.requiredJsonField = requiredJsonField; + this.converter = converter; + } + } +} diff --git a/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..13e3318a061697670dd76ac5649da7111f883dd5 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java @@ -0,0 +1,214 @@ +package com.zto.fire.flink.formats.json; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static java.lang.String.format; + + +public class ZDTPJsonDeserializationSchema implements DeserializationSchema { + private static final long serialVersionUID = 1L; + private static final String OP_TYPE = "op_type"; + private static final String BEFORE = "before"; + private static final String AFTER = "after"; + private static final String OP_INSERT = "I"; + private static final String OP_UPDATE = "U"; + private static final String OP_DELETE = "D"; + + /** Flag that indicates that an additional projection is required for metadata. */ + private final boolean hasMetadata; + + private final JsonRowDataDeserializationSchema jsonDeserializer; + + /** Metadata to be extracted for every record. 
*/ + private final MetadataConverter[] metadataConverters; + + private final TypeInformation producedTypeInfo; + + private final List fieldNames; + + private final boolean ignoreParseErrors; + + + private final int fieldCount; + + public ZDTPJsonDeserializationSchema( + DataType physicalDataType, + List readableMetadata, + TypeInformation producedTypeInfo, + boolean ignoreParseErrors, + TimestampFormat timestampFormatOption) { + final RowType jsonRowType = createJsonRowType(physicalDataType, readableMetadata); + this.jsonDeserializer = + new JsonRowDataDeserializationSchema( + jsonRowType, + producedTypeInfo, + false, + ignoreParseErrors, + timestampFormatOption); + this.hasMetadata = readableMetadata.size() > 0; + + this.metadataConverters = createMetadataConverters(jsonRowType, readableMetadata); + + this.producedTypeInfo = producedTypeInfo; + + this.ignoreParseErrors = ignoreParseErrors; + + final RowType physicalRowType = (RowType) physicalDataType.getLogicalType(); + + this.fieldNames = physicalRowType.getFieldNames(); + + this.fieldCount = physicalRowType.getFieldCount(); + } + @Override + public RowData deserialize(byte[] message) throws IOException { + throw new RuntimeException( + "Please invoke DeserializationSchema#deserialize(byte[], Collector) instead."); + } + + @Override + public void deserialize(byte[] message, Collector collector) throws IOException { + try { + GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message); + String type = row.getString(0).toString(); + GenericRowData before = (GenericRowData) row.getField(1); // before filed + GenericRowData after = (GenericRowData) row.getField(2); // after field + if (OP_INSERT.equals(type)) { + // "data" field is a row, contains inserted rows + after.setRowKind(RowKind.INSERT); + emitRow(row, after, collector); + } else if (OP_UPDATE.equals(type)) { + before.setRowKind(RowKind.UPDATE_BEFORE); + after.setRowKind(RowKind.UPDATE_AFTER); + emitRow(row, before, collector); + emitRow(row, after, collector); + } else if (OP_DELETE.equals(type)) { + before.setRowKind(RowKind.DELETE); + emitRow(row, before, collector); + } else { + if (!ignoreParseErrors) { + throw new IOException( + format( + "Unknown \"type\" value \"%s\". 
The Maxwell JSON message is '%s'", + type, new String(message))); + } + } + }catch (Throwable t) { + if (!ignoreParseErrors) { + throw new IOException( + format("Corrupt ZDTP JSON message '%s'.", new String(message)), t); + } + } + } + + @Override + public boolean isEndOfStream(RowData nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return producedTypeInfo; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + + if (obj == null || getClass() != obj.getClass()) return false; + + ZDTPJsonDeserializationSchema that = (ZDTPJsonDeserializationSchema) obj; + return ignoreParseErrors == that.ignoreParseErrors + && fieldCount == that.fieldCount + && Objects.equals(jsonDeserializer, that.jsonDeserializer) + && Objects.equals(producedTypeInfo, that.producedTypeInfo); + } + + @Override + public int hashCode() { + return Objects.hash( + jsonDeserializer, + producedTypeInfo, + ignoreParseErrors, + fieldCount); + } + + private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector collector) { + if (!hasMetadata) { + collector.collect(physicalRow); + return; + } + int physicalArity = physicalRow.getArity(); + int metadataArity = metadataConverters.length; + + GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity); + + for (int physicalPos = 0; physicalPos < physicalArity; physicalPos ++) { + producedRow.setField(physicalPos, physicalRow.getField(physicalPos)); + } + RowKind rootRowRowKind = rootRow.getRowKind(); + // `row_kind` STRING METADATA FROM 'value.row_kind' VIRTUAL 从rootRow convert 元数据时, + // 由于rootRow的RowKind总是为 INSERT, 获取不到预期的结果,所以每次设置为physicalRow的 RowKind + rootRow.setRowKind(physicalRow.getRowKind()); + for (int metadataPos = 0; metadataPos < metadataArity; metadataPos ++) { + producedRow.setField(physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow)); + } + // 还原rootRow的RowKind + rootRow.setRowKind(rootRowRowKind); + collector.collect(producedRow); + } + + private static RowType createJsonRowType( + DataType physicalDataType, List readableMetadata) { + + DataType root = + DataTypes.ROW( + DataTypes.FIELD(OP_TYPE, DataTypes.STRING()), + DataTypes.FIELD(BEFORE, physicalDataType), + DataTypes.FIELD(AFTER, physicalDataType)); + // append fields that are required for reading metadata in the root + final List rootMetadataFields = + readableMetadata.stream() + .map(m -> m.requiredJsonField) + .distinct() + .collect(Collectors.toList()); + return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType(); + } + + private static MetadataConverter[] createMetadataConverters( + RowType jsonRowType, List requestedMetadata) { + return requestedMetadata.stream() + .map(m -> convert(jsonRowType, m)) +// .map(m -> (MetadataConverter) (row, pos) -> m.converter.convert(row, jsonRowType.getFieldNames().indexOf(m.requiredJsonField.getName()))) + .toArray(MetadataConverter[]::new); + } + + private static MetadataConverter convert(RowType jsonRowType, ZDTPJsonDecodingFormat.ReadableMetadata metadata) { + final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName()); + return (root, unused) -> metadata.converter.convert(root, pos); + } + + interface MetadataConverter extends Serializable { + default Object convert(GenericRowData row) { + return convert(row, -1); + } + + Object convert(GenericRowData row, int pos); + } +} diff --git 
a/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..b2b04f57dce509ab2642842f54091d8dca84a5e7 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java @@ -0,0 +1,69 @@ +package com.zto.fire.flink.formats.json; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonFormatOptionsUtil; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static com.zto.fire.flink.formats.json.ZDTPJsonOptions.*; + + +/** + * 描述:flink 官网只提供了 canal maxwell debezium三种CDC connector, + * 由于目前使用的是中通自己开发的ZDTP 组件采集的binlog, + * 所以要写一个关于ZDTP组件来解析CDC消息, + * create table rocket_mq_table( + * `bill_code` string, + * `row_kind` STRING METADATA FROM 'value.row_kind' VIRTUAL, // 从binlog中提取的rowKind + * `mq_topic` STRING METADATA FROM 'topic' VIRTUAL, // topic + * `mq_broker` STRING METADATA FROM 'broker' VIRTUAL, // broker + * `id` bigint, + * customize_id string + * ) WITH ( + * 'connector' = '...', + * 'format' = 'zdtp-json', + * 'zdtp-json.cdc-write-hive' = 'true', // 添加此参数,下游可以直接insert 到hive + * ...... 
+ * ) + */ +public class ZDTPJsonFormatFactory implements DeserializationFormatFactory { + private static final String IDENTIFIER = "zdtp-json"; + @Override + public DecodingFormat> createDecodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { + final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS); + final boolean cdcWriteHive = formatOptions.get(CDC_WRITE_HIVE); + final TimestampFormat timestampFormatOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions); + + return new ZDTPJsonDecodingFormat(ignoreParseErrors, cdcWriteHive, timestampFormatOption); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(IGNORE_PARSE_ERRORS); + options.add(TIMESTAMP_FORMAT); + options.add(JSON_MAP_NULL_KEY_MODE); + options.add(JSON_MAP_NULL_KEY_LITERAL); + return options; + } +} diff --git a/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java new file mode 100644 index 0000000000000000000000000000000000000000..d3dcbd972a236eefabe1f7d05ca76972420b2792 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java @@ -0,0 +1,25 @@ +package com.zto.fire.flink.formats.json; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.formats.json.JsonFormatOptions; + + +public class ZDTPJsonOptions { + public static final ConfigOption IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS; + + public static final ConfigOption TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT; + + public static final ConfigOption JSON_MAP_NULL_KEY_MODE = JsonFormatOptions.MAP_NULL_KEY_MODE; + + public static final ConfigOption JSON_MAP_NULL_KEY_LITERAL = JsonFormatOptions.MAP_NULL_KEY_LITERAL; + + public static final ConfigOption CDC_WRITE_HIVE = + ConfigOptions.key("cdc-write-hive") + .booleanType() + .defaultValue(false) + .withDescription( + "原生的maxwell-json canal-json debezium-json 因为其getChangelogMode " + + "返回值为ChangelogMode.all(),当下游是hive时,flink框架会判断报错,因为hive" + + "只支持insert模式,添加这个参数主要针对binlog写hive的场景。"); +} diff --git a/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000000000000000000000000000000000..e6c70e15c5f34e6c04d08b34e6786a236a4bc7d3 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-format/zdtp-format/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,3 @@ +## 解析中通zdtp采集的binlog,添加了带出binlog中RowKind功能以及在table properties中添加参数, +## 可以让binlog下游写hive +com.zto.fire.flink.formats.json.ZDTPJsonFormatFactory diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/DynamicRocketMQDeserializationSchema.java 
b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/DynamicRocketMQDeserializationSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..cc567ab147a6c16b5a0a7b42f147ecacf0881c4e --- /dev/null +++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/DynamicRocketMQDeserializationSchema.java @@ -0,0 +1,247 @@ +package com.zto.fire.flink.sql.connector.rocketmq; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.DeserializationException; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.apache.rocketmq.common.message.MessageExt; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + + +public class DynamicRocketMQDeserializationSchema implements RocketMQDeserializationSchema { + + private final @Nullable DeserializationSchema keyDeserialization; + + private final DeserializationSchema valueDeserialization; + + private final boolean hasMetadata; + + private final KeyBufferCollector keyBufferCollector; + + private final OutputProjectionCollector outputCollector; + + private final TypeInformation producedTypeInfo; + + private final boolean upsertMode; + + public DynamicRocketMQDeserializationSchema( + int physicalArity, + DeserializationSchema keyDeserialization, + int[] keyProjection, + DeserializationSchema valueDeserialization, + int[] valueProjection, + boolean hasMetadata, + MetadataConverter[] metadataConverters, + TypeInformation producedTypeInfo, + boolean upsertMode) { + this.keyDeserialization = keyDeserialization; + this.valueDeserialization = valueDeserialization; + this.hasMetadata = hasMetadata; + this.producedTypeInfo = producedTypeInfo; + this.keyBufferCollector = new KeyBufferCollector(); + this.outputCollector = new OutputProjectionCollector( + physicalArity, keyProjection, valueProjection, metadataConverters, upsertMode); + + this.upsertMode = upsertMode; + } + + @Override + public void open(DeserializationSchema.InitializationContext context) throws Exception { + if (keyDeserialization != null) keyDeserialization.open(context); + + valueDeserialization.open(context); + } + + @Override + public RowData deserialize(MessageExt message) throws Exception { + throw new IllegalStateException("A collector is required for deserializing."); + } + + @Override + public void deserialize(MessageExt message, Collector collector) throws IOException { + // shortcut in case no output projection is required, + // also not for a cartesian product with the keys + if (keyDeserialization == null && !hasMetadata) { + valueDeserialization.deserialize(message.getBody(), collector); + return; + } + // buffer key(s) + if (keyDeserialization != null) { + keyDeserialization.deserialize(message.getKeys().getBytes(), keyBufferCollector); + } + + // project output while emitting values + outputCollector.inputRecord = message; + outputCollector.physicalKeyRows = keyBufferCollector.buffer; + outputCollector.outputCollector = collector; + + if (message.getBody() == null && upsertMode) { + // collect tombstone messages in upsert mode by hand + outputCollector.collect(null); + } else { + 
valueDeserialization.deserialize(message.getBody(), outputCollector); + } + keyBufferCollector.buffer.clear(); + } + + @Override + public boolean isEndOfStream(RowData record) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return producedTypeInfo; + } + + interface MetadataConverter extends Serializable { + Object read(MessageExt messageExt); + } + + private static final class KeyBufferCollector implements Collector, Serializable { + private static final long serialVersionUID = 1L; + private final List buffer = new ArrayList<>(); + @Override + public void collect(RowData record) { + buffer.add(record); + } + + @Override + public void close() { + // do nothing + } + } + + private static final class OutputProjectionCollector + implements Collector, Serializable { + + private static final long serialVersionUID = 1L; + + private final int physicalArity; + + private final int[] keyProjection; + + private final int[] valueProjection; + + private final MetadataConverter[] metadataConverters; + + private final boolean upsertMode; + + private transient MessageExt inputRecord; + + private transient List physicalKeyRows; + + private transient Collector outputCollector; + + OutputProjectionCollector( + int physicalArity, + int[] keyProjection, + int[] valueProjection, + MetadataConverter[] metadataConverters, + boolean upsertMode) { + this.physicalArity = physicalArity; + this.keyProjection = keyProjection; + this.valueProjection = valueProjection; + this.metadataConverters = metadataConverters; + this.upsertMode = upsertMode; + } + + @Override + public void collect(RowData physicalValueRow) { + // no key defined + if (keyProjection.length == 0) { + emitRow(null, (GenericRowData) physicalValueRow); + return; + } + + // otherwise emit a value for each key + for (RowData physicalKeyRow : physicalKeyRows) { + emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow); + } + } + + @Override + public void close() { + // nothing to do + } + + private void emitRow( + @Nullable GenericRowData physicalKeyRow, + @Nullable GenericRowData physicalValueRow) { + final RowKind rowKind; + if (physicalValueRow == null) { + if (upsertMode) { + rowKind = RowKind.DELETE; + } else { + throw new DeserializationException( + "Invalid null value received in non-upsert mode. 
Could not to set row kind for output record."); + } + } else { + rowKind = physicalValueRow.getRowKind(); + } + + final int metadataArity = metadataConverters.length; + // key中的值和 before after 中的值位置不变, value中的metadata 放后面,connector中的metadata放最后 + /** + * create table orders( + * `tb` STRING metadata FROM 'value.table', + * id string, + * `db` STRING METADATA FROM 'value.schema', + * name STRING, + * k_age int, + * description string, + * `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', + * k_kk string, + * weight float + * ) WITH ( + * 'connector' = 'rocket-mq', + * 'topic' = 'test0', + * 'properties.bootstrap.servers' = 'localhost:9876', + * 'properties.group.id' = 'test_read', + * 'scan.startup.mode' = 'latest-offset', + * 'format' = 'maxwell-json', + * 'key.format' = 'json', + * 'key.fields-prefix' = 'k_', + * 'key.fields' = 'k_age;k_kk', + * 'value.fields-include' = 'EXCEPT_KEY' + * ) + * + * 应该位置如下设置值 + * + * 0 1 2 3 4 5 6 7 8 + * id, name, k_age, description, k_kk, weight, tb, db, event_time + */ + + final GenericRowData producedRow = + new GenericRowData(rowKind, physicalArity + metadataArity); + + for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) { + assert physicalKeyRow != null; + producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos)); + } + + if (physicalValueRow != null) { + for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) { + producedRow.setField( + valueProjection[valuePos], physicalValueRow.getField(valuePos)); + } + } + + // 为什么connector 的metadata在最后一位? + for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) { + producedRow.setField( + physicalArity + metadataPos, + metadataConverters[metadataPos].read(inputRecord)); + } + outputCollector.collect(producedRow); + } + } +} diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQDynamicSource.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQDynamicSource.java new file mode 100644 index 0000000000000000000000000000000000000000..a4d91fac968bd9d620fff90fcbbe3e9f7572f758 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQDynamicSource.java @@ -0,0 +1,367 @@ +package com.zto.fire.flink.sql.connector.rocketmq; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.rocketmq.common.message.MessageExt; + +import javax.annotation.Nullable; +import java.util.*; +import java.util.stream.Collectors; 
+import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * , SupportsReadingMetadata, SupportsWatermarkPushDown + */ +public class FireRocketMQDynamicSource implements ScanTableSource + , SupportsReadingMetadata + , SupportsWatermarkPushDown { + private static final String VALUE_METADATA_PREFIX = "value."; + + /** + * Data type that describes the final output of the source. + */ + protected DataType producedDataType; + + private final DataType physicalDataType; + + private final DecodingFormat> keyDecodingFormat; + + private final DecodingFormat> valueDecodingFormat; + + protected final int[] keyProjection; + + /** + * Indices that determine the value fields and the target position in the produced row. + */ + protected final int[] valueProjection; + + /** + * Prefix that needs to be removed from fields when constructing the physical data type. + */ + protected final @Nullable + String keyPrefix; + + /** + * Watermark strategy that is used to generate per-partition watermark. + */ + protected @Nullable + WatermarkStrategy watermarkStrategy; + + private final Properties properties; + + private final boolean upsertMode; + + private List metadataKeys; + + public FireRocketMQDynamicSource( + DataType physicalDataType, + DecodingFormat> keyDecodingFormat, + DecodingFormat> valueDecodingFormat, + int[] keyProjection, + int[] valueProjection, + String keyPrefix, + Properties properties, + boolean upsertMode) { + this.physicalDataType = physicalDataType; + this.producedDataType = physicalDataType; + this.keyDecodingFormat = keyDecodingFormat; + this.valueDecodingFormat = valueDecodingFormat; + this.keyProjection = keyProjection; + this.valueProjection = valueProjection; + this.keyPrefix = keyPrefix; + this.properties = properties; + this.upsertMode = upsertMode; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public ChangelogMode getChangelogMode() { + return valueDecodingFormat.getChangelogMode(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + final DeserializationSchema keyDeserialization = + createDeserialization(runtimeProviderContext, keyDecodingFormat, keyProjection, keyPrefix); + + final DeserializationSchema valueDeserialization = + createDeserialization(runtimeProviderContext, valueDecodingFormat, valueProjection, null); + + final TypeInformation typeInformation = runtimeProviderContext.createTypeInformation(physicalDataType); + + + final FireRocketMQSource sourceFunction = createRocketMQConsumer(keyDeserialization, valueDeserialization, typeInformation); + + return SourceFunctionProvider.of(sourceFunction, false); + } + + @Override + public DynamicTableSource copy() { + FireRocketMQDynamicSource copy = new FireRocketMQDynamicSource( + physicalDataType, keyDecodingFormat, valueDecodingFormat, keyProjection, valueProjection, keyPrefix, properties, upsertMode); + copy.producedDataType = producedDataType; + copy.metadataKeys = metadataKeys; + copy.watermarkStrategy = watermarkStrategy; + return copy; + } + + @Override + public String asSummaryString() { + return "Rocket MQ Table Source"; + } + + @Override + public Map listReadableMetadata() { + + final Map metadataMap = new LinkedHashMap<>(); + + // according to convention, the order of the final row must be + // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA + // where the format metadata has highest precedence + + // add value format metadata with prefix + valueDecodingFormat + .listReadableMetadata() + .forEach((key, value) -> 
metadataMap.put(VALUE_METADATA_PREFIX + key, value)); + + // 添加 连接器的 metadata,此处为rocket mq + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType)); + + return metadataMap; + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + // 分开连接器和 json中自带的metadata + + // 找到json中可以提取的 metadata 的所有 key + final List formatMetadataKeys = + metadataKeys.stream() + .filter(k -> k.startsWith(VALUE_METADATA_PREFIX)) + .collect(Collectors.toList()); + + // connector metadata key + final List connectorMetadataKeys = metadataKeys.stream() + .filter(k -> !k.startsWith(VALUE_METADATA_PREFIX)) + .collect(Collectors.toList()); + + // push down format metadata + // 拿到所有json中可以获取的metadata + final Map formatMetadata = valueDecodingFormat.listReadableMetadata(); + if (formatMetadata.size() > 0) { + // 去掉 value.前缀 + final List requestedFormatMetadataKeys = + formatMetadataKeys.stream() + .map(k -> k.substring(VALUE_METADATA_PREFIX.length())) + .collect(Collectors.toList()); + valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys); + } + + this.metadataKeys = connectorMetadataKeys; + this.producedDataType = producedDataType; + } + + @Override + public void applyWatermark(WatermarkStrategy watermarkStrategy) { + this.watermarkStrategy = watermarkStrategy; + } + + // 重写 equals + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final FireRocketMQDynamicSource that = (FireRocketMQDynamicSource) o; + return Objects.equals(producedDataType, that.producedDataType) + && Objects.equals(metadataKeys, that.metadataKeys) + && Objects.equals(physicalDataType, that.physicalDataType) + && Objects.equals(keyDecodingFormat, that.keyDecodingFormat) + && Objects.equals(valueDecodingFormat, that.valueDecodingFormat) + && Arrays.equals(keyProjection, that.keyProjection) + && Arrays.equals(valueProjection, that.valueProjection) + && Objects.equals(keyPrefix, that.keyPrefix) + && Objects.equals(properties, that.properties) + && Objects.equals(upsertMode, that.upsertMode) + && Objects.equals(watermarkStrategy, that.watermarkStrategy); + } + + @Override + public int hashCode() { + return Objects.hash( + producedDataType, + metadataKeys, + physicalDataType, + keyDecodingFormat, + valueDecodingFormat, + keyProjection, + valueProjection, + keyPrefix, + properties, + upsertMode, + watermarkStrategy); + } + + protected FireRocketMQSource createRocketMQConsumer( + DeserializationSchema keyDeserialization, + DeserializationSchema valueDeserialization, + TypeInformation producedTypeInfo) { + + final DynamicRocketMQDeserializationSchema.MetadataConverter[] metadataConverters = + Stream.of(ReadableMetadata.values()) + .filter(m -> metadataKeys.contains(m.key)) + .map(m -> m.converter) + .toArray(DynamicRocketMQDeserializationSchema.MetadataConverter[]::new); + + // 是否含有 connector metadata + final boolean hasMetadata = metadataKeys.size() > 0; + + // adjust physical arity with value format's metadata + final int adjustedPhysicalArity = + producedDataType.getChildren().size() - metadataKeys.size(); + + // adjust value format projection to include value format's metadata columns at the end + + final int[] adjustedValueProjection = + IntStream.concat( + IntStream.of(valueProjection), + IntStream.range( + keyProjection.length + valueProjection.length, + adjustedPhysicalArity)) + .toArray(); + + final RocketMQDeserializationSchema 
rocketMQDeserializer = + new DynamicRocketMQDeserializationSchema( + adjustedPhysicalArity, + keyDeserialization, + keyProjection, + valueDeserialization, + adjustedValueProjection, + hasMetadata, + metadataConverters, + producedTypeInfo, + upsertMode); + + return new FireRocketMQSource<>(rocketMQDeserializer, properties); + } + + private @Nullable + DeserializationSchema createDeserialization( + Context context, + @Nullable DecodingFormat> format, + int[] projection, + @Nullable String prefix) { + if (format == null) { + return null; + } + DataType physicalFormatDataType = + DataTypeUtils.projectRow(this.physicalDataType, projection); + if (prefix != null) { + physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); + } + return format.createRuntimeDecoder(context, physicalFormatDataType); + } + + enum ReadableMetadata { + TOPIC( + "topic", + DataTypes.STRING().notNull(), + new DynamicRocketMQDeserializationSchema.MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(MessageExt msg) { + + return StringData.fromString(msg.getTopic()); + } + }), + TIMESTAMP( + "timestamp", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), + new DynamicRocketMQDeserializationSchema.MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(MessageExt msg) { + return TimestampData.fromEpochMillis(msg.getBornTimestamp()); + } + }), + + OFFSET( + "offset", + DataTypes.BIGINT().notNull(), + new DynamicRocketMQDeserializationSchema.MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(MessageExt msg) { + return msg.getQueueOffset(); + } + }), + + TAGS("tags", + DataTypes.STRING().notNull(), + new DynamicRocketMQDeserializationSchema.MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(MessageExt msg) { + return StringData.fromString(msg.getTags()); + } + }), + BROKER( + "broker", + DataTypes.STRING().notNull(), + new DynamicRocketMQDeserializationSchema.MetadataConverter() { + private static final long serialVersionUID = 1L; + @Override + public Object read(MessageExt messageExt) { + return StringData.fromString(messageExt.getBrokerName()); + } + } + ), + QUEUE_ID( + "queue_id", + DataTypes.INT().notNull(), + new DynamicRocketMQDeserializationSchema.MetadataConverter() { + private static final long serialVersionUID = 1L; + @Override + public Object read(MessageExt messageExt) { + return messageExt.getQueueId(); + } + } + ) + ; + + public final String key; + + public final DataType dataType; + + public final DynamicRocketMQDeserializationSchema.MetadataConverter converter; + + ReadableMetadata(String key, DataType dataType, DynamicRocketMQDeserializationSchema.MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + } +} diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQDynamicTableFactory.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQDynamicTableFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..53daa7f71e4d2d157aae4e08833688c0d86e681d --- /dev/null +++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQDynamicTableFactory.java @@ -0,0 +1,185 @@ +package 
com.zto.fire.flink.sql.connector.rocketmq; + +import com.zto.fire.common.enu.Operation; +import com.zto.fire.common.lineage.parser.connector.MQDatasource; +import com.zto.fire.common.lineage.parser.connector.RocketmqConnectorParser; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.rocketmq.flink.RocketMQConfig; +import scala.collection.JavaConverters; + +import java.util.*; + +import static com.zto.fire.common.enu.Datasource.ROCKETMQ; +import static com.zto.fire.flink.sql.connector.rocketmq.FireRocketMQOptions.*; + +/** + * 描述:本类通过java SPI机制,对flink 的 table connector 进行扩展, + * 配置在source/META-INF/services/org.apache.flink.table.table.factories.Factory中, + * create table rocket_mq_table( + * ....... + * ) WITH ( + * 'connector' = 'rocketmq', + * ...... + * ) + * 时间:19/8/2021 下午5:18 + */ +public class FireRocketMQDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final String IDENTIFIER = "rocketmq"; + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig tableOptions = helper.getOptions(); + + // 通过 key.format 找到针对 MQ key 的解析格式,如 'key.format' = 'json' + Optional>> keyDecodingFormat = + getKeyDecodingFormat(helper); + + final DecodingFormat> valueDecodingFormat = + getValueDecodingFormat(helper); + + // 验证主键和value的模式是否冲突 + validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); + + // ROW<`k_age` INT, `k_kk` STRING, `id` STRING, `name` STRING, `description` STRING, `weight` FLOAT> NOT NULL + final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); + + final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + + final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + + Properties properties = getRocketMQStartupProperties(tableOptions); + + return new FireRocketMQDynamicSource( + physicalDataType, + keyDecodingFormat.orElse(null), + valueDecodingFormat, + keyProjection, + valueProjection, + keyPrefix, + properties, + false); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + final Set> options = new HashSet<>(); + options.add(NAME_SERVER); + options.add(TOPIC); + return options; + } + + 
@Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(SCAN_START_MODE); + options.add(CONSUMER_GROUP); + options.add(FactoryUtil.FORMAT); + options.add(VALUE_FORMAT); + options.add(VALUE_FIELDS_INCLUDE); + options.add(KEY_FORMAT); + options.add(KEY_FIELDS_PREFIX); + options.add(KEY_FIELDS); + options.add(TAGS); + return options; + } + + // rocket mq consumer 需要的参数 + private static Properties getRocketMQStartupProperties(ReadableConfig tableOptions) { + String nameServers = tableOptions.get(NAME_SERVER); + String topic = tableOptions.get(TOPIC); + String consumerGroup = tableOptions.get(CONSUMER_GROUP); + + Set set = new HashSet<>(); + set.add(Operation.SOURCE); + + MQDatasource mqDatasource = new MQDatasource("rocketmq", nameServers, topic, consumerGroup, set); + // 消费rocketmq埋点信息 + RocketmqConnectorParser.addDatasource(ROCKETMQ, mqDatasource); + + String scanStartMode = tableOptions.get(SCAN_START_MODE); + Properties properties = new Properties(); + properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServers); + properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, topic); + properties.setProperty(RocketMQConfig.CONSUMER_GROUP, consumerGroup); + properties.setProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, scanStartMode); + String tags = tableOptions.get(TAGS); + if (tags != null) + properties.setProperty(RocketMQConfig.CONSUMER_TAG, tags); + return properties; + } + + private static Optional>> getKeyDecodingFormat( + FactoryUtil.TableFactoryHelper helper) { + final Optional>> keyDecodingFormat = + helper.discoverOptionalDecodingFormat( + DeserializationFormatFactory.class, KEY_FORMAT); + keyDecodingFormat.ifPresent( + format -> { + if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) { + throw new ValidationException( + String.format( + "A key format should only deal with INSERT-only records. 
" + + "But %s has a changelog mode of %s.", + helper.getOptions().get(KEY_FORMAT), + format.getChangelogMode())); + } + }); + return keyDecodingFormat; + } + + // 'format' = 'zdtp-json' or 'value.format' = 'zdtp-json' + private static DecodingFormat> getValueDecodingFormat( + FactoryUtil.TableFactoryHelper helper) { + return helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT) + .orElseGet( + () -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT)); + + } + + private static void validatePKConstraints( + ObjectIdentifier tableName, CatalogTable catalogTable, Format format) { + if (catalogTable.getSchema().getPrimaryKey().isPresent() + && format.getChangelogMode().containsOnly(RowKind.INSERT)) { + Configuration options = Configuration.fromMap(catalogTable.getOptions()); + String formatName = + options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT)); + throw new ValidationException( + String.format( + "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + + " on the table, because it can't guarantee the semantic of primary key.", + tableName.asSummaryString(), formatName)); + } + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { +// return new RocketMQDynamicSink(); + return null; + } +} diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQOptions.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQOptions.java new file mode 100644 index 0000000000000000000000000000000000000000..28c64a9975c233aeaf094feb8820afcde1947fa0 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQOptions.java @@ -0,0 +1,240 @@ +package com.zto.fire.flink.sql.connector.rocketmq; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; + +import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; + +public class FireRocketMQOptions { + private FireRocketMQOptions(){} + + public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; + public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; + + public static final ConfigOption SCAN_START_MODE = + ConfigOptions.key("scan.startup.mode") + .stringType() + .defaultValue("latest-offset") + .withDescription("指定rocket mq 消费偏移量,目前只支持(earliest-offset) 和 (latest-offset)"); + + + public static final ConfigOption NAME_SERVER = + ConfigOptions.key("properties.bootstrap.servers") + .stringType() + .noDefaultValue() + .withDescription(""); + + + public static final ConfigOption TOPIC = + ConfigOptions.key("topic") + .stringType() + .noDefaultValue() + .withDescription(""); + + public static 
final ConfigOption CONSUMER_GROUP = + ConfigOptions.key("properties.group.id") + .stringType() + .noDefaultValue() + .withDescription(""); + + public static final ConfigOption TAGS = + ConfigOptions.key("tags") + .stringType() + .noDefaultValue() + .withDescription("指定rocket mq 消费 tag,用逗号分隔"); + + public static final ConfigOption KEY_FORMAT = + ConfigOptions.key("key" + FORMAT_SUFFIX) + .stringType() + .noDefaultValue() + .withDescription( + "Defines the format identifier for encoding key data. " + + "The identifier is used to discover a suitable format factory."); + + public static final ConfigOption> KEY_FIELDS = + ConfigOptions.key("key.fields") + .stringType() + .asList() + .defaultValues() + .withDescription( + "Defines an explicit list of physical columns from the table schema " + + "that configure the data type for the key format. By default, this list is " + + "empty and thus a key is undefined."); + + public static final ConfigOption VALUE_FIELDS_INCLUDE = + ConfigOptions.key("value.fields-include") + .enumType(ValueFieldsStrategy.class) + .defaultValue(ValueFieldsStrategy.ALL) + .withDescription( + "Defines a strategy how to deal with key columns in the data type of " + + "the value format. By default, '" + + ValueFieldsStrategy.ALL + + "' physical " + + "columns of the table schema will be included in the value format which " + + "means that key columns appear in the data type for both the key and value " + + "format."); + + public static final ConfigOption VALUE_FORMAT = + ConfigOptions.key("value" + FORMAT_SUFFIX) + .stringType() + .noDefaultValue() + .withDescription( + "Defines the format identifier for encoding value data. " + + "The identifier is used to discover a suitable format factory."); + + public static final ConfigOption KEY_FIELDS_PREFIX = + ConfigOptions.key("key.fields-prefix") + .stringType() + .noDefaultValue() + .withDescription( + "Defines a custom prefix for all fields of the key format to avoid " + + "name clashes with fields of the value format. By default, the prefix is empty. " + + "If a custom prefix is defined, both the table schema and " + + "'" + + KEY_FIELDS.key() + + "' will work with prefixed names. When constructing " + + "the data type of the key format, the prefix will be removed and the " + + "non-prefixed names will be used within the key format. Please note that this " + + "option requires that '" + + VALUE_FIELDS_INCLUDE.key() + + "' must be '" + + ValueFieldsStrategy.EXCEPT_KEY + + "'."); + + +// public static StartupMode getStartupMode(ReadableConfig tableOptions) { +// StartupMode startupMode = tableOptions.getOptional(SCAN_START_MODE) +// .map(modeString -> { +// switch (modeString) { +// case (SCAN_STARTUP_MODE_VALUE_EARLIEST): +// return StartupMode.EARLIEST; +// case (SCAN_STARTUP_MODE_VALUE_LATEST): +// return StartupMode.LATEST; +// default:throw new TableException("Unsupported scan.start.mode:" + modeString); +// } +// }).orElse(StartupMode.LATEST); +// return startupMode; +// } + + /** + * Creates an array of indices that determine which physical fields of the table schema to + * include in the key format and the order that those fields have in the key format. + * + *
<p>
See {@link #KEY_FORMAT}, {@link #KEY_FIELDS}, and {@link #KEY_FIELDS_PREFIX} for more + * information. + */ + public static int[] createKeyFormatProjection( + ReadableConfig options, DataType physicalDataType) { + final LogicalType physicalType = physicalDataType.getLogicalType(); + Preconditions.checkArgument( + hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type expected."); + final Optional optionalKeyFormat = options.getOptional(KEY_FORMAT); + final Optional> optionalKeyFields = options.getOptional(KEY_FIELDS);// Optional[[k_age, k_kk]] + + if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) { + throw new ValidationException( + String.format( + "The option '%s' can only be declared if a key format is defined using '%s'.", + KEY_FIELDS.key(), KEY_FORMAT.key())); + } else if (optionalKeyFormat.isPresent() + && (!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) { + throw new ValidationException( + String.format( + "A key format '%s' requires the declaration of one or more of key fields using '%s'.", + KEY_FORMAT.key(), KEY_FIELDS.key())); + } + + if (!optionalKeyFormat.isPresent()) { + return new int[0]; + } + + final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse(""); + + final List keyFields = optionalKeyFields.get(); + final List physicalFields = LogicalTypeChecks.getFieldNames(physicalType); + return keyFields.stream() + .mapToInt( + keyField -> { + final int pos = physicalFields.indexOf(keyField); + // check that field name exists + if (pos < 0) { + throw new ValidationException( + String.format( + "Could not find the field '%s' in the table schema for usage in the key format. " + + "A key field must be a regular, physical column. " + + "The following columns can be selected in the '%s' option:\n" + + "%s", + keyField, KEY_FIELDS.key(), physicalFields)); + } + // check that field name is prefixed correctly + if (!keyField.startsWith(keyPrefix)) { + throw new ValidationException( + String.format( + "All fields in '%s' must be prefixed with '%s' when option '%s' " + + "is set but field '%s' is not prefixed.", + KEY_FIELDS.key(), + keyPrefix, + KEY_FIELDS_PREFIX.key(), + keyField)); + } + return pos; + }) + .toArray(); + } + + /** + * Creates an array of indices that determine which physical fields of the table schema to + * include in the value format. + * + *
<p>
See {@link #VALUE_FORMAT}, {@link #VALUE_FIELDS_INCLUDE}, and {@link #KEY_FIELDS_PREFIX} + * for more information. + */ + public static int[] createValueFormatProjection( + ReadableConfig options, DataType physicalDataType) { + final LogicalType physicalType = physicalDataType.getLogicalType(); + Preconditions.checkArgument( + hasRoot(physicalType, LogicalTypeRoot.ROW), "Row data type expected."); + final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType); + final IntStream physicalFields = IntStream.range(0, physicalFieldCount); + + final String keyPrefix = options.getOptional(KEY_FIELDS_PREFIX).orElse(""); + + final ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE); + if (strategy == ValueFieldsStrategy.ALL) { + if (keyPrefix.length() > 0) { + throw new ValidationException( + String.format( + "A key prefix is not allowed when option '%s' is set to '%s'. " + + "Set it to '%s' instead to avoid field overlaps.", + VALUE_FIELDS_INCLUDE.key(), + ValueFieldsStrategy.ALL, + ValueFieldsStrategy.EXCEPT_KEY)); + } + return physicalFields.toArray(); + } else if (strategy == ValueFieldsStrategy.EXCEPT_KEY) { + final int[] keyProjection = createKeyFormatProjection(options, physicalDataType); + return physicalFields + .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> k == pos)) + .toArray(); + } + throw new TableException("Unknown value fields strategy:" + strategy); + } + + enum ValueFieldsStrategy{ + EXCEPT_KEY, + ALL, + } +} diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQSource.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQSource.java new file mode 100644 index 0000000000000000000000000000000000000000..b9c1d0dfed5c8053feeecaf8c2be66d838b5dae8 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/FireRocketMQSource.java @@ -0,0 +1,418 @@ +package com.zto.fire.flink.sql.connector.rocketmq; + + +import org.apache.commons.collections.map.LinkedMap; +import org.apache.commons.lang3.Validate; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import 
org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.RocketMQConfig; +import org.apache.rocketmq.flink.RocketMQUtils; +import org.apache.rocketmq.flink.RunningChecker; +import org.apache.rocketmq.flink.util.MetricUtils; +import org.apache.rocketmq.flink.util.RetryUtil; +import org.apache.rocketmq.flink.watermark.WaterMarkForAll; +import org.apache.rocketmq.flink.watermark.WaterMarkPerQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.ManagementFactory; +import java.util.*; +import java.util.concurrent.*; + +/** + * 描述:{@link org.apache.rocketmq.client.consumer.DefaultMQPullConsumer} 弃用, + * 推荐使用 {@link DefaultLitePullConsumer} + * 时间:3/11/2021 上午9:31 + */ +public class FireRocketMQSource extends RichParallelSourceFunction + implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable { + private static final Logger LOG = LoggerFactory.getLogger(FireRocketMQSource.class); + private static final long serialVersionUID = 1L; + // state name + private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states"; + private transient ListState> unionOffsetStates; + private LinkedMap pendingOffsetsToCommit = new LinkedMap(); + private RunningChecker runningChecker; + private transient volatile boolean restored; + private transient boolean enableCheckpoint; + private RocketMQCollector rocketMQCollector; + private List assignQueues; + private ExecutorService executor; + private DefaultLitePullConsumer consumer; + private final RocketMQDeserializationSchema deserializer; + private final Properties props; + private String topic; + private String group; + private final Map offsetTable = new ConcurrentHashMap<>(); + private ScheduledExecutorService timer; + // watermark in source + private WaterMarkPerQueue waterMarkPerQueue; + private WaterMarkForAll waterMarkForAll; + private Meter tpsMetric; + + private boolean sendAllTag; + private Set tagSet; + + public FireRocketMQSource(RocketMQDeserializationSchema deserializer, Properties props) { + this.deserializer = deserializer; + this.props = props; + String tags = props.getProperty(RocketMQConfig.CONSUMER_TAG); + // 如果没传tag 或者 tag 为 *,则表示发送所有数据 + if (tags == null || "*".equals(tags.trim())) { + sendAllTag = true; + } else { + tagSet = new HashSet<>(); + // 可以指定过滤多个tag,使用逗号分隔 如:accuracy1,accuracy2 + String[] split = tags.split(","); + for (String s : split) { + tagSet.add(s.trim()); + } + } + } + + @Override + public void open(Configuration parameters) throws Exception { + LOG.debug("source run ......"); + + Validate.notEmpty(props, "Consumer properties can not be empty"); + + runningChecker = new RunningChecker(); + runningChecker.setRunning(true); + this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC); + this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP); + + Validate.notEmpty(topic, "Consumer topic can not be empty"); + Validate.notEmpty(group, "Consumer group can not be empty"); + this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled(); + + rocketMQCollector = new RocketMQCollector(); + + waterMarkPerQueue = new WaterMarkPerQueue(5000); + + waterMarkForAll = new WaterMarkForAll(5000); + + Counter outputCounter = getRuntimeContext().getMetricGroup() + .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter()); + tpsMetric = getRuntimeContext().getMetricGroup() + 
.meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60)); + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("rmq-pull-thread-%d").build(); + executor = Executors.newCachedThreadPool(threadFactory); + + timer = Executors.newSingleThreadScheduledExecutor(); + // 初始化consumer 并分配队列 + startConsumer(); + } + + @Override + public void run(SourceContext context) throws Exception { + // 只有当consumer分配到queue时,才进行消费 + if (!assignQueues.isEmpty()) { + timer.scheduleAtFixedRate(() -> { + context.emitWatermark(waterMarkPerQueue.getCurrentWatermark()); + context.emitWatermark(waterMarkForAll.getCurrentWatermark()); + }, 5, 5, TimeUnit.SECONDS); + + this.executor.execute(() -> RetryUtil.call(() -> { + while (runningChecker.isRunning()) { + List messages = consumer.poll(); + for (MessageExt msg : messages) { + deserializer.deserialize(msg, rocketMQCollector); + // get record from collector and use sourceContext emit it to down task + Queue records = rocketMQCollector.getRecords(); + synchronized (context.getCheckpointLock()) { + OUT record; + while ((record = records.poll()) != null) { + String tags = msg.getTags(); + // 如果是发送所有消息(未指定tag或者tag为*) 或者 指定tag了,并且消息的tag在指定的tag之内,则下发消息 + if (sendAllTag || tagSet.contains(tags)) { + context.collectWithTimestamp(record, msg.getBornTimestamp()); + } + } + // record offset to offset table + recordBrokerOffset(msg); + + // update max eventTime per queue + waterMarkPerQueue.extractTimestamp(buildMessageQueue(msg), msg.getBornTimestamp()); + waterMarkForAll.extractTimestamp(msg.getBornTimestamp()); + tpsMetric.markEvent(); + } + } + } + return true; + }, "RuntimeException")); + } + + awaitTermination(); + } + + private void awaitTermination() throws InterruptedException { + while (runningChecker.isRunning()) { + Thread.sleep(50); + } + } + + @Override + public void cancel() { + LOG.debug("cancel ..."); + runningChecker.setRunning(false); + + if (consumer != null) { + consumer.shutdown(); + } + + if (offsetTable != null) { + offsetTable.clear(); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); + if (posInMap == -1) { + LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId); + return; + } + + Map offsets = (Map) pendingOffsetsToCommit.remove(posInMap); + + // remove older checkpoints in map + for (int i = 0; i < posInMap; i++) { + pendingOffsetsToCommit.remove(0); + } + + if (offsets == null || offsets.size() == 0) { + LOG.debug("Checkpoint state was empty."); + return; + } + + for (Map.Entry entry : offsets.entrySet()) { + // MessageQueue中记录的 offset 会比此消息实际offset少1,所以在更新的时候需要 + 1 + LOG.info("update {} to {}", entry.getKey(), entry.getValue() + 1); +// consumer.commitSync(); +// consumer.getOffsetStore().updateConsumeOffsetToBroker(entry.getKey(), entry.getValue(), true); + consumer.getOffsetStore().updateOffset(entry.getKey(), entry.getValue() + 1, false); + } + } + + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + // called when a snapshot for a checkpoint is requested + LOG.info("Snapshotting state {} ...", context.getCheckpointId()); + if (!runningChecker.isRunning()) { + LOG.info("snapshotState() called on closed source; returning null."); + return; + } + + // Discovery topic Route change when snapshot + // 每次进行snapshot时,进行rocket-mq queue检查, + // 如果有新增的queue,则进行报错,需要进行重启重新分配queue。 + + // todo 这里可以当发现queue增加后,直接进行分配消费 + 
RetryUtil.call(() -> { + List newQueues = getAssignQueues(); + Collections.sort(newQueues); + if (!this.assignQueues.equals(newQueues)) { + throw new RuntimeException("topic route changed, this task need to be restarted!"); + } + return true; + }, "RuntimeException due to topic route changed"); + + + unionOffsetStates.clear(); + Map currentOffset = new HashMap<>(assignQueues.size()); + for (Map.Entry entry : offsetTable.entrySet()) { + LOG.info("snapshot {}, offset {}", entry.getKey(), entry.getValue()); + unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue())); + currentOffset.put(entry.getKey(), entry.getValue()); + } + pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffset); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + LOG.info("initialize State ..."); + unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>( + OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint>() { + }))); + this.restored = context.isRestored(); + if (restored) { + unionOffsetStates.get().forEach(t -> offsetTable.put(t.f0, t.f1)); + LOG.info("Restore from state, {}", offsetTable); + } else { + LOG.info("No restored from state ......"); + } + } + + @Override + public TypeInformation getProducedType() { + return deserializer.getProducedType(); + } + + private void recordBrokerOffset(MessageExt message) { + MessageQueue mq = buildMessageQueue(message); + long queueOffset = message.getQueueOffset(); + offsetTable.put(mq, queueOffset); + // 如果没有启用chk,并且没有启用自动提交,那么每次要提交offset + if (!enableCheckpoint && !consumer.isAutoCommit()) { + consumer.commitSync(); + } + } + + private MessageQueue buildMessageQueue(MessageExt message) { + String topic = message.getTopic(); + String brokerName = message.getBrokerName(); + int queueId = message.getQueueId(); + return new MessageQueue(topic, brokerName, queueId); + } + + private void startConsumer() throws MQClientException { + LOG.info("consumer start "); + this.consumer = new DefaultLitePullConsumer(this.group, RocketMQConfig.buildAclRPCHook(props)); + String nameServers = props.getProperty(RocketMQConfig.NAME_SERVER_ADDR); + Validate.notEmpty(nameServers); + this.consumer.setNamesrvAddr(nameServers); + this.consumer.setPollNameServerInterval(RocketMQUtils.getInteger(props, + RocketMQConfig.NAME_SERVER_POLL_INTERVAL, RocketMQConfig.DEFAULT_NAME_SERVER_POLL_INTERVAL)); + this.consumer.setHeartbeatBrokerInterval(RocketMQUtils.getInteger(props, + RocketMQConfig.BROKER_HEART_BEAT_INTERVAL, RocketMQConfig.DEFAULT_BROKER_HEART_BEAT_INTERVAL)); + String runtimeName = ManagementFactory.getRuntimeMXBean().getName(); + int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask(); + String instanceName = RocketMQUtils.getInstanceName(runtimeName, topic, group, + String.valueOf(indexOfThisSubTask), String.valueOf(System.nanoTime())); + this.consumer.setInstanceName(instanceName); + boolean autoCommit = RocketMQUtils.getBoolean(props, RocketMQConfig.OFFSET_AUTO_COMMIT, false); + this.consumer.setAutoCommit(autoCommit); + this.consumer.start(); + this.assignQueues = getAssignQueues(); + // 如果此task分配的队列为空,则停止此consumer + if (!this.assignQueues.isEmpty()) { + this.consumer.assign(this.assignQueues); + // 从offsetTable中移除不在本task 中分配的队列,做snapshot, + // 并从恢复的offset中,seek到上次的offset + removeUnAssignQueues(); + + // 每个MessageQueue指定offset消费 + perQueueSeekToSpecialOffset(); + } else { + this.offsetTable.clear(); + } + } + + private List getAssignQueues() throws 
MQClientException { + final RuntimeContext ctx = getRuntimeContext(); + int taskNumber = ctx.getNumberOfParallelSubtasks(); + int taskIndex = ctx.getIndexOfThisSubtask(); + Collection totalQueues = this.consumer.fetchMessageQueues(this.topic); + List shouldAssignQueues = RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex); + return shouldAssignQueues; + } + + // 从offsetTable中移除此consumer未消费的信息 + private void removeUnAssignQueues() throws MQClientException { + // offset table 开始从状态恢复,保存的是全部queue的信息,移除多余的 + this.offsetTable.forEach((k, v) -> { + if (!this.assignQueues.contains(k)) { + this.offsetTable.remove(k); + } + }); + } + + // 根据不同策略,指定不同offset + private void perQueueSeekToSpecialOffset() throws MQClientException { + for (MessageQueue mq : this.assignQueues) { + if (this.offsetTable.containsKey(mq)) { + long nextOffset = this.offsetTable.get(mq) + 1; + LOG.info("consumer seek {} from state, offset {}", mq, nextOffset); + this.consumer.seek(mq, nextOffset); + } else { + // 1、如果是从状态恢复,但是找不到,那么这个offset 就是新加入的 + // 2、 另一种可能是,当前这种策略,每个offset table中只保留当前TaskManager的 MQ offset, + // 如果在这段时间,某个mq没有数据尽量,checkpoint时没有此mq的数据,这时也找不到,如果设置从seekToBegin就会有问题 + // 目前在生产环境,这种情况应该不会出现。 + if (this.restored) { + this.consumer.seekToBegin(mq); + LOG.info("restore but not found in offsetTable, seek {} to begin", mq); + } else { + // 不是restored,那么就是直接重启,根据提供的策略选择offset + String offsetFrom = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, "latest"); +// Offset initialOffsetFrom = Offset.valueOf(offsetFrom); + switch (offsetFrom) { + case "earliest-offset": + this.consumer.seekToBegin(mq); + LOG.info("{} seek to begin ......", mq); + break; + case "latest-offset": + this.consumer.seekToEnd(mq); + LOG.info("{} seek to end ......", mq); + break; + case "store": + long storedOffset = this.consumer.getOffsetStore().readOffset(mq, ReadOffsetType.READ_FROM_STORE); + LOG.info("{} seek to stored offset {}", mq, storedOffset); + this.consumer.seek(mq, storedOffset); + break; + default: + throw new RuntimeException(String.format("Not supported Offset [%s]", offsetFrom)); + } + } + } + } + } + + // 用来保存值 + private class RocketMQCollector implements Collector { + private final Queue records = new ArrayDeque<>(); + private boolean endOfStreamSignalled = false; + + @Override + public void collect(OUT record) { + // do not emit subsequent elements if the end of the stream reached + if (endOfStreamSignalled || deserializer.isEndOfStream(record)) { + endOfStreamSignalled = true; + return; + } + records.add(record); + } + + public Queue getRecords() { + return records; + } + + public boolean isEndOfStreamSignalled() { + return endOfStreamSignalled; + } + + public void setEndOfStreamSignalled(boolean endOfStreamSignalled) { + this.endOfStreamSignalled = endOfStreamSignalled; + } + + @Override + public void close() { + + } + } +} diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/RocketMQDeserializationSchema.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/RocketMQDeserializationSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..9e284650c1576c6676b29093cd8c50236c872b93 --- /dev/null +++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/com/zto/fire/flink/sql/connector/rocketmq/RocketMQDeserializationSchema.java @@ -0,0 +1,22 @@ +package com.zto.fire.flink.sql.connector.rocketmq; + +import 
org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; +import org.apache.rocketmq.common.message.MessageExt; + +import java.io.Serializable; + +public interface RocketMQDeserializationSchema extends Serializable, ResultTypeQueryable { + + default void open(DeserializationSchema.InitializationContext context) throws Exception {} + + T deserialize(MessageExt message) throws Exception; + + default void deserialize(MessageExt message, Collector collector) throws Exception { + T record = deserialize(message); + if (record != null) collector.collect(record); + } + + boolean isEndOfStream(T record); +} diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java index 29ae2b4b164738fef0e2aa8f5aca7a93b7b74cfe..2088439d61446e1e6e72661b4a40f8a036550921 100644 --- a/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java +++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java @@ -52,6 +52,7 @@ public class RocketMQConfig { public static final String PRODUCER_RETRY_TIMES = "producer.retry.times"; public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3; + public static final String OFFSET_AUTO_COMMIT = "offset.auto.commit"; public static final String PRODUCER_TIMEOUT = "producer.timeout"; public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds diff --git a/fire-connectors/flink-connectors/flink-rocketmq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fire-connectors/flink-connectors/flink-rocketmq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index a49aff8a65c4fbe16505a29422c49a20dac406d5..0db6809a4432ce9e6d24c4998fef1bab20a84d7d 100644 --- a/fire-connectors/flink-connectors/flink-rocketmq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/fire-connectors/flink-connectors/flink-rocketmq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -1 +1,3 @@ -com.zto.fire.flink.sql.connector.rocketmq.RocketMQDynamicTableFactory \ No newline at end of file +com.zto.fire.flink.sql.connector.rocketmq.RocketMQDynamicTableFactory + +com.zto.fire.flink.sql.connector.rocketmq.FireRocketMQDynamicTableFactory \ No newline at end of file diff --git a/fire-engines/fire-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fire-engines/fire-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index a49aff8a65c4fbe16505a29422c49a20dac406d5..0160d6485a3617bb24b7e4c0f1911a84bf3728fc 100644 --- a/fire-engines/fire-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/fire-engines/fire-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -1 +1,3 @@ -com.zto.fire.flink.sql.connector.rocketmq.RocketMQDynamicTableFactory \ No newline at end of file +com.zto.fire.flink.sql.connector.rocketmq.RocketMQDynamicTableFactory + +com.zto.fire.flink.formats.json.ZDTPJsonFormatFactory \ No newline at end of file diff --git a/fire-examples/flink-examples/pom.xml b/fire-examples/flink-examples/pom.xml index 
3467d24f25868d3ec79e83859738b705d4d50b5e..96eb8ad960eda7d28f2444ae3471e472c699c525 100644 --- a/fire-examples/flink-examples/pom.xml +++ b/fire-examples/flink-examples/pom.xml @@ -31,6 +31,11 @@ + + com.zto.fire + fire-connector-flink-format-zdtp_1.14_2.12 + ${fire.version} + org.apache.kafka kafka_${scala.binary.version} diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/Test.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/Test.scala index 68c40f6f3d9c9359de081737342d4dd867be9484..3148140c97fee3e9967621d61edbe27dc5d2d750 100644 --- a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/Test.scala +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/Test.scala @@ -17,7 +17,7 @@ package com.zto.fire.examples.flink -import com.zto.VersionTest +//import com.zto.VersionTest import com.zto.fire._ import com.zto.fire.common.anno.Config import com.zto.fire.common.enu.Operation @@ -42,20 +42,20 @@ object Test extends FlinkStreaming { @Process def kafkaSource: Unit = { - println("version=" + VersionTest.version()) - val stream = this.fire.addSourceLineage(new SourceFunction[Int] { - val random = new Random() - override def run(ctx: SourceFunction.SourceContext[Int]): Unit = { - while (true) { - ctx.collect(random.nextInt(1000)) - Thread.sleep(1000) - } - } - - override def cancel(): Unit = ??? - })(MQDatasource("kafka", "localhost:9092", "fire", "fire"), Operation.SOURCE) - - - stream.print() +// println("version=" + VersionTest.version()) +// val stream = this.fire.addSourceLineage(new SourceFunction[Int] { +// val random = new Random() +// override def run(ctx: SourceFunction.SourceContext[Int]): Unit = { +// while (true) { +// ctx.collect(random.nextInt(1000)) +// Thread.sleep(1000) +// } +// } +// +// override def cancel(): Unit = ??? 
+// })(MQDatasource("kafka", "localhost:9092", "fire", "fire"), Operation.SOURCE) +// +// +// stream.print() } } \ No newline at end of file diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/ZTORouteBillCDC.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/ZTORouteBillCDC.scala new file mode 100644 index 0000000000000000000000000000000000000000..4795c58a8780a8469758308a05412cead66301bd --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/ZTORouteBillCDC.scala @@ -0,0 +1,320 @@ +package com.zto.fire.examples.flink + +import com.zto.fire.common.anno.Config +import com.zto.fire.flink.FlinkStreaming +import com.zto.fire.flink.anno.Streaming +import com.zto.fire._ + +/** + * 运单宽表binlog cdc 数据 + */ + +@Config( + """ + |flink.stream.number.execution.retries=3 + |flink.stream.time.characteristic=ProcessingTime + |flink.force.kryo.enable=false + |flink.force.avro.enable=false + |flink.generic.types.enable=true + |flink.execution.mode=pipelined + |flink.auto.type.registration.enable=true + |flink.auto.generate.uid.enable=true + |flink.max.parallelism=1536 + |""") +@Streaming(interval = 300, timeout = 1800, unaligned = false, pauseBetween = 180, + concurrent = 1, failureNumber = 3, parallelism = 3, autoStart = true, disableOperatorChaining = true) +object ZTORouteBillCDC extends FlinkStreaming { + override def process(): Unit = { + +// val dbTable = parameter.get("dbTable", "lyznhb_ml.zto_route_bill_cdc_test") +// println(s"dbTable : $dbTable") + + val rocketMQTableName = "zto_route_bill_cdc" + val kafkaSql = + s""" + |create table $rocketMQTableName ( + | `row_kind` STRING METADATA FROM 'value.row_kind' VIRTUAL, + | `mq_topic` STRING METADATA FROM 'topic' VIRTUAL, + | `mq_broker` STRING METADATA FROM 'broker' VIRTUAL, + | `mq_queue_id` INT METADATA FROM 'queue_id' VIRTUAL, + | `mq_offset` BIGINT METADATA FROM 'offset' VIRTUAL, + | `mq_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, + | `id` bigint, + | `customize_id` STRING, + | `bill_code` STRING, + | `order_code` STRING, + | `fst_code` STRING, + | `operate_weight` decimal(10, 2), + | `estimate_weight` decimal(10, 2), + | `estimate_volume` decimal(10, 2), + | `goods_type` STRING, + | `rec_site_id` BIGINT, + | `disp_site_id` BIGINT, + | `rec_scan_time` STRING, + | `disp_scan_time` STRING, + | `sign_scan_time` STRING, + | `plan_sign_time` STRING, + | `route_code` STRING, + | `route_index` INT, + | `route_type` STRING, + | `ori_site_id` BIGINT, + | `des_site_id` BIGINT, + | `ori_unload_code` STRING, + | `des_unload_code` STRING, + | `send_site_id` BIGINT, + | `come_site_id` BIGINT, + | `send_scan_time` STRING, + | `come_scan_time` STRING, + | `last_send_time` STRING, + | `last_come_time` STRING, + | `crossing_type` INT, + | `line_name` STRING, + | `line_number` STRING, + | `time_cost` INT, + | `transport_type` STRING, + | `unload_port` STRING, + | `owner_bag_no` STRING, + | `exec_status` INT, + | `truck_number` STRING, + | `truck_tail_number` STRING, + | `truck_sign_code` STRING, + | `batch_number` STRING, + | `schedule_id` STRING, + | `plan_send_time` STRING, + | `plan_arrive_time` STRING, + | `actual_send_time` STRING, + | `actual_arrive_time` STRING, + | `start_hours` STRING, + | `operate_time` INT, + | `clear_store_time` STRING, + | `receive_time` STRING, + | `dispatch_time` STRING, + | `exception_type` INT, + | `event_id` STRING, + | `event_type` INT, + | `is_blocked` INT, + | `is_deleted` INT, + | `remark` STRING, + | `created_by` 
STRING, + | `created_on` STRING, + | `modified_by` STRING, + | `modified_on` STRING, + | `subarea_time` STRING, + | `disp_way_in_time` STRING, + | `route_situation` STRING, + | `is_problem` INT, + | `first_disp_time` STRING, + | `first_disp_way_in_time` STRING, + | `promise_time` STRING, + | `intercept_status` INT, + | `value_added_type` INT, + | `last_rec_scan_time` STRING, + | `clerk_rec_scan_time` STRING, + | `packet_exception_type` BIGINT, + | `packet_exception_effect` BIGINT, + | `packet_exception_time` STRING, + | `assessment_type` INT, + | `assessment_site_id` BIGINT, + | `predicate_assessment_time` STRING + |) WITH ( + | 'connector' = 'rocketmq', + | 'topic' = 'ROUTE_BILL_BINLOG', + | 'properties.bootstrap.servers' = '10.9.15.40:9876;10.9.63.237:9876', + | 'properties.group.id' = 'ROUTE_BILL_BINLOG_FLINK_CONSUMER_VERIFY', + | 'scan.startup.mode' = 'earliest-offset', + | 'zdtp-json.cdc-write-hive' = 'false', + | 'format' = 'zdtp-json' + | ) + """.stripMargin + + this.fire.sql(kafkaSql) + + val printSql = + """ + |create table print_table( + | `row_kind` STRING, + | `mq_topic` STRING, + | `mq_broker` STRING , + | `mq_queue_id` INT , + | `mq_offset` BIGINT , + | `mq_timestamp` STRING , + | `id` bigint, + | `customize_id` STRING, + | `bill_code` STRING, + | `order_code` STRING, + | `fst_code` STRING, + | `operate_weight` decimal(10, 2), + | `estimate_weight` decimal(10, 2), + | `estimate_volume` decimal(10, 2), + | `goods_type` STRING, + | `rec_site_id` BIGINT, + | `disp_site_id` BIGINT, + | `rec_scan_time` STRING, + | `disp_scan_time` STRING, + | `sign_scan_time` STRING, + | `plan_sign_time` STRING, + | `route_code` STRING, + | `route_index` INT, + | `route_type` STRING, + | `ori_site_id` BIGINT, + | `des_site_id` BIGINT, + | `ori_unload_code` STRING, + | `des_unload_code` STRING, + | `send_site_id` BIGINT, + | `come_site_id` BIGINT, + | `send_scan_time` STRING, + | `come_scan_time` STRING, + | `last_send_time` STRING, + | `last_come_time` STRING, + | `crossing_type` INT, + | `line_name` STRING, + | `line_number` STRING, + | `time_cost` INT, + | `transport_type` STRING, + | `unload_port` STRING, + | `owner_bag_no` STRING, + | `exec_status` INT, + | `truck_number` STRING, + | `truck_tail_number` STRING, + | `truck_sign_code` STRING, + | `batch_number` STRING, + | `schedule_id` STRING, + | `plan_send_time` STRING, + | `plan_arrive_time` STRING, + | `actual_send_time` STRING, + | `actual_arrive_time` STRING, + | `start_hours` STRING, + | `operate_time` INT, + | `clear_store_time` STRING, + | `receive_time` STRING, + | `dispatch_time` STRING, + | `exception_type` INT, + | `event_id` STRING, + | `event_type` INT, + | `is_blocked` INT, + | `is_deleted` INT, + | `remark` STRING, + | `created_by` STRING, + | `created_on` STRING, + | `modified_by` STRING, + | `modified_on` STRING, + | `subarea_time` STRING, + | `disp_way_in_time` STRING, + | `route_situation` STRING, + | `is_problem` INT, + | `first_disp_time` STRING, + | `first_disp_way_in_time` STRING, + | `promise_time` STRING, + | `intercept_status` INT, + | `value_added_type` INT, + | `last_rec_scan_time` STRING, + | `clerk_rec_scan_time` STRING, + | `packet_exception_type` BIGINT, + | `packet_exception_effect` BIGINT, + | `packet_exception_time` STRING, + | `assessment_type` INT, + | `assessment_site_id` BIGINT, + | `predicate_assessment_time` STRING + |) WITH ( + | 'connector' = 'print' + |) + |""".stripMargin + + this.fire.sql(printSql) + // checkToHive() + + val insertSql = + s""" + |insert into + |print_table + |select + | row_kind, 
+ | mq_topic, + | mq_broker, + | mq_queue_id, + | mq_offset, + | DATE_FORMAT(mq_timestamp, 'yyyy-MM-dd HH:mm:ss.SSS') mq_timestamp, + | id, + | customize_id, + | bill_code, + | order_code, + | fst_code, + | operate_weight, + | estimate_weight, + | estimate_volume, + | goods_type, + | rec_site_id, + | disp_site_id, + | rec_scan_time, + | disp_scan_time, + | sign_scan_time, + | plan_sign_time, + | route_code, + | route_index, + | route_type, + | ori_site_id, + | des_site_id, + | ori_unload_code, + | des_unload_code, + | send_site_id, + | come_site_id, + | send_scan_time, + | come_scan_time, + | last_send_time, + | last_come_time, + | crossing_type, + | line_name, + | line_number, + | time_cost, + | transport_type, + | unload_port, + | owner_bag_no, + | exec_status, + | truck_number, + | truck_tail_number, + | truck_sign_code, + | batch_number, + | schedule_id, + | plan_send_time, + | plan_arrive_time, + | actual_send_time, + | actual_arrive_time, + | start_hours, + | operate_time, + | clear_store_time, + | receive_time, + | dispatch_time, + | exception_type, + | event_id, + | event_type, + | is_blocked, + | is_deleted, + | remark, + | created_by, + | created_on, + | modified_by, + | modified_on, + | subarea_time, + | disp_way_in_time, + | route_situation, + | is_problem, + | first_disp_time, + | first_disp_way_in_time, + | promise_time, + | intercept_status, + | value_added_type, + | last_rec_scan_time, + | clerk_rec_scan_time, + | packet_exception_type, + | packet_exception_effect, + | packet_exception_time, + | assessment_type, + | assessment_site_id, + | predicate_assessment_time + |from + |default_catalog.default_database.$rocketMQTableName + """.stripMargin + this.fire.sql(insertSql) + + } +} diff --git a/fire-examples/spark-examples/src/main/scala/com/zto/fire/examples/spark/Test.scala b/fire-examples/spark-examples/src/main/scala/com/zto/fire/examples/spark/Test.scala index c76c877381dd1d400bbd9f8605ae8c0d634f052d..bd606b11efc698c26fba82360cb12efe7416bf88 100644 --- a/fire-examples/spark-examples/src/main/scala/com/zto/fire/examples/spark/Test.scala +++ b/fire-examples/spark-examples/src/main/scala/com/zto/fire/examples/spark/Test.scala @@ -46,10 +46,10 @@ object Test extends SparkStreaming { val stream = this.fire.createKafkaDirectStream() stream.foreachRDD(rdd => { rdd.foreachPartition(it => { - LineageManager.addPrintLineage(Operation.SINK) +// LineageManager.addPrintLineage(Operation.SINK) }) }) - LineageManager.addMySQLLineage("jdbc://localhost:3306/fire", "t_user", "root", Operation.INSERT_INTO) +// LineageManager.addMySQLLineage("jdbc://localhost:3306/fire", "t_user", "root", Operation.INSERT_INTO) stream.print() LineageManager.addSql("""select * from tmp.baseorganize""") LineageManager.print(10)