# transflow **Repository Path**: ycpanda/transflow ## Basic Information - **Project Name**: transflow - **Description**: java实现 类似于 logstash 的数据流转平台,插件化 input output - **Primary Language**: Java - **License**: AGPL-3.0 - **Default Branch**: master - **Homepage**: http://sunleader1997.top:18987/#/mgmt/job - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2025-05-29 - **Last Updated**: 2025-05-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## Transflow 集成 reactor 类似于 logstash,插件化数据流转服务 ## 为什么脑子一热 - 多个产品线研发交付后,客户厂商往往需要将一些业务数据对接到第三方的系统上,定制化版本的维护(需要基于某个版本源码进行修改打包以及部署升级)耗费时间与人力成本 - 传统ELK都是通过配置文件的方式修改数据处理与链路,学习成本较高,配置文件复杂,阅读困难,配置文件维护成本也高
例如 logstash 的配置文件, 往往无法快速理解数据链路是如何构建的 ```nginx.conf input{ file{ path => "/var/log/nginx/access.log" start_position => "beginning" type => "nginx_access_log" } kafka{ bootstrap_servers => ["localhost:9092"] group_id => "logstash_group" topic => "nginx_access_log" codec => json{ support_multiple_values => true } type => "file_audit_log" } } filter{ grok{ match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}"} match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\""} } if [request] { urldecode { field => "request" } ruby { init => "@kname = ['url_path','url_arg']" code => " new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))]) event.append(new_event)" } if [url_arg] { ruby { init => "@kname = ['key', 'value']" code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})" } } } geoip{ source => "clientip" } useragent{ source => "user_agent" target => "ua" remove_field => "user_agent" } date { match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"] locale => "en" } mutate{ remove_field => ["message","timestamp","request","url_arg"] } } output{ elasticsearch { hosts => "localhost:9200" index => "nginx-access-log-%{+YYYY.MM.dd}" } if [type] == "syslog1" and [logType] == "file_audit_log"{ syslog { facility => "local7" appname => "nginx-access-log" } } } ```
- 参考调度系统的设计理念,公司各组件/服务之间的数据交互,应该尽可能低耦合,对于复杂的业务场景下,可以动态插拔组件 - ``` 例如文件传输的场景下,对于文件同步服务,仅需告诉该服务哪些文件需要同步,而对于同步之前的审批等业务,可以根据具体业务场景进行串连 ``` - 参考 NIFI 后发现性能不高,二次开发困难
NIFI ![img.png](nifi.png)
### 功能特点: - 拖拽式编辑数据流 - 脚本化数据处理 - 数据批量处理 (比方 es 批量存储) - 数据流实时监控 - 高吞吐,基于 reactor 实现高吞吐数据流 ## 演示 ![gif](20250327183942.gif) # 打包 * transflow-all 下 执行 mvn clean package, 最终成品在 transflow-app 的 target 下 .zip * 注意,会打包前端资源以及plugin,plugin放在 /plugins下,也会被打进 zip * 发布版本因为大小受限,只提供基础demo插件 # Linux 安装 ``` unzip transflow-app-0.1.0-distribution.zip -d / ``` # Linux 启动 - 通过 systemctl 启动 ``` systemctl start transflow ``` - 通过脚本启动 ``` cd /opt/transflow sh startup.sh start ``` - 手动启动 ``` cd /opt/transflow java -jar transflow-app-0.1.0.jar ``` # 访问页面 http://localhost:18987/#/mgmt/job