# transflow
**Repository Path**: ycpanda/transflow
## Basic Information
- **Project Name**: transflow
- **Description**: java实现 类似于 logstash 的数据流转平台,插件化 input output
- **Primary Language**: Java
- **License**: AGPL-3.0
- **Default Branch**: master
- **Homepage**: http://sunleader1997.top:18987/#/mgmt/job
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2025-05-29
- **Last Updated**: 2025-05-29
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## Transflow
集成 reactor 类似于 logstash,插件化数据流转服务
## 为什么脑子一热
- 多个产品线研发交付后,客户厂商往往需要将一些业务数据对接到第三方的系统上,定制化版本的维护(需要基于某个版本源码进行修改打包以及部署升级)耗费时间与人力成本
- 传统ELK都是通过配置文件的方式修改数据处理与链路,学习成本较高,配置文件复杂,阅读困难,配置文件维护成本也高
例如 logstash 的配置文件, 往往无法快速理解数据链路是如何构建的
```nginx.conf
input{
file{
path => "/var/log/nginx/access.log"
start_position => "beginning"
type => "nginx_access_log"
}
kafka{
bootstrap_servers => ["localhost:9092"]
group_id => "logstash_group"
topic => "nginx_access_log"
codec => json{
support_multiple_values => true
}
type => "file_audit_log"
}
}
filter{
grok{
match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}"}
match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\""}
}
if [request] {
urldecode {
field => "request"
}
ruby {
init => "@kname = ['url_path','url_arg']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
event.append(new_event)"
}
if [url_arg] {
ruby {
init => "@kname = ['key', 'value']"
code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
}
}
}
geoip{
source => "clientip"
}
useragent{
source => "user_agent"
target => "ua"
remove_field => "user_agent"
}
date {
match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
locale => "en"
}
mutate{
remove_field => ["message","timestamp","request","url_arg"]
}
}
output{
elasticsearch {
hosts => "localhost:9200"
index => "nginx-access-log-%{+YYYY.MM.dd}"
}
if [type] == "syslog1" and [logType] == "file_audit_log"{
syslog {
facility => "local7"
appname => "nginx-access-log"
}
}
}
```
- 参考调度系统的设计理念,公司各组件/服务之间的数据交互,应该尽可能低耦合,对于复杂的业务场景下,可以动态插拔组件
- ```
例如文件传输的场景下,对于文件同步服务,仅需告诉该服务哪些文件需要同步,而对于同步之前的审批等业务,可以根据具体业务场景进行串连
```
- 参考 NIFI 后发现性能不高,二次开发困难
NIFI

### 功能特点:
- 拖拽式编辑数据流
- 脚本化数据处理
- 数据批量处理 (比方 es 批量存储)
- 数据流实时监控
- 高吞吐,基于 reactor 实现高吞吐数据流
## 演示

# 打包
* transflow-all 下 执行 mvn clean package, 最终成品在 transflow-app 的 target 下 .zip
* 注意,会打包前端资源以及plugin,plugin放在 /plugins下,也会被打进 zip
* 发布版本因为大小受限,只提供基础demo插件
# Linux 安装
```
unzip transflow-app-0.1.0-distribution.zip -d /
```
# Linux 启动
- 通过 systemctl 启动
```
systemctl start transflow
```
- 通过脚本启动
```
cd /opt/transflow
sh startup.sh start
```
- 手动启动
```
cd /opt/transflow
java -jar transflow-app-0.1.0.jar
```
# 访问页面
http://localhost:18987/#/mgmt/job