# NettyPusher
**Repository Path**: CoderZone/NettyPusher
## Basic Information
- **Project Name**: NettyPusher
- **Description**: AI 生成.........
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-03-19
- **Last Updated**: 2025-03-21
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Spring Boot Netty Push SDK
一个基于Netty的高性能实时消息推送SDK,专为Spring Boot应用设计。
## 特性
- 基于WebSocket的实时双向通信
- 支持单用户消息推送、群组消息和广播消息
- 自动心跳检测和连接管理
- 支持SSL/TLS加密
- 与Spring Boot无缝集成
- 高并发、低延迟设计
- 内置消息队列,异步处理消息
- 实时性能监控和指标收集
## 快速开始
### 1. 添加依赖
```xml
com.netty
spring-boot-netty-push
1.0.0
```
### 2. 配置属性
在`application.properties`或`application.yml`中添加配置:
```properties
# 启用Netty推送服务
netty.push.enabled=true
# WebSocket服务器端口
netty.push.port=8888
# WebSocket路径
netty.push.websocket-path=/ws
# 连接空闲超时时间(秒)
netty.push.idle-timeout-seconds=180
# 心跳间隔时间(秒)
netty.push.heartbeat-interval-seconds=60
# 最大帧长度
netty.push.max-frame-size=65536
# 工作线程数
netty.push.worker-threads=8
# 消息队列大小
netty.push.message-queue-size=10000
# 消息处理线程数
netty.push.message-handler-threads=4
# 是否开启SSL
netty.push.ssl-enabled=false
# SSL证书路径(SSL启用时需配置)
# netty.push.certificate-path=/path/to/cert.pem
# SSL私钥路径(SSL启用时需配置)
# netty.push.private-key-path=/path/to/key.pem
```
### 3. 使用推送服务
```java
import com.netty.push.service.PushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class MessageController {
@Autowired
private PushService pushService;
@GetMapping("/send/{userId}")
public String sendToUser(@PathVariable String userId, String content) {
boolean success = pushService.sendToUser(userId, content);
return success ? "发送成功" : "发送失败,用户可能不在线";
}
@GetMapping("/send/group/{groupId}")
public String sendToGroup(@PathVariable String groupId, String content) {
boolean success = pushService.sendToGroup(groupId, content);
return success ? "发送成功" : "发送失败";
}
@GetMapping("/broadcast")
public String broadcast(String content) {
pushService.broadcast(content);
return "广播发送成功";
}
@GetMapping("/group/add")
public String addToGroup(String groupId, String userId) {
pushService.addUserToGroup(groupId, userId);
return "添加成功";
}
@GetMapping("/online/{userId}")
public boolean isUserOnline(@PathVariable String userId) {
return pushService.isUserOnline(userId);
}
@GetMapping("/stats/online")
public int getOnlineCount() {
return pushService.getOnlineCount();
}
}
```
## 客户端集成
### JavaScript客户端
```javascript
// 创建WebSocket连接
const userId = 'user123';
const ws = new WebSocket(`ws://localhost:8888/ws/${userId}`);
// 连接建立时的处理
ws.onopen = function() {
console.log('连接已建立');
};
// 接收消息的处理
ws.onmessage = function(event) {
const message = JSON.parse(event.data);
console.log('收到消息:', message);
// 根据消息类型处理
switch(message.messageType) {
case 'USER':
console.log(`收到来自${message.senderId}的消息: ${message.content}`);
break;
case 'GROUP':
console.log(`收到群组${message.receiverId}的消息: ${message.content}`);
break;
case 'BROADCAST':
console.log(`收到广播消息: ${message.content}`);
break;
case 'HEARTBEAT':
// 收到心跳消息,回复ACK
sendMessage({
messageType: 'ACK',
timestamp: Date.now()
});
break;
}
};
// 发送消息
function sendMessage(message) {
ws.send(JSON.stringify(message));
}
// 发送用户消息
function sendToUser(receiverId, content) {
sendMessage({
messageType: 'USER',
receiverId: receiverId,
content: content,
timestamp: Date.now()
});
}
// 发送群组消息
function sendToGroup(groupId, content) {
sendMessage({
messageType: 'GROUP',
receiverId: groupId,
content: content,
timestamp: Date.now()
});
}
// 连接关闭的处理
ws.onclose = function() {
console.log('连接已关闭');
};
// 连接错误的处理
ws.onerror = function(error) {
console.error('WebSocket错误:', error);
};
```
### Android客户端
使用OkHttp库实现WebSocket客户端:
```java
import okhttp3.*;
import okio.ByteString;
import org.json.JSONObject;
public class WebSocketClient {
private final WebSocket webSocket;
private final String userId;
public WebSocketClient(String userId) {
this.userId = userId;
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("ws://your-server:8888/ws/" + userId)
.build();
webSocket = client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
System.out.println("连接已建立");
}
@Override
public void onMessage(WebSocket webSocket, String text) {
try {
JSONObject message = new JSONObject(text);
System.out.println("收到消息: " + message);
// 处理消息...
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
System.out.println("连接已关闭");
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
System.err.println("WebSocket错误: " + t.getMessage());
}
});
}
public void sendMessage(JSONObject message) {
webSocket.send(message.toString());
}
public void close() {
webSocket.close(1000, "正常关闭");
}
}
```
## 高级配置
### 消息队列配置
消息队列用于异步处理消息,避免阻塞IO线程,提高系统吞吐量。
```properties
# 消息队列大小,默认10000
netty.push.message-queue-size=10000
# 消息处理线程数,默认为4
netty.push.message-handler-threads=4
```
### SSL配置
启用SSL可以保证WebSocket通信的安全性。
```properties
# 启用SSL
netty.push.ssl-enabled=true
# SSL证书路径
netty.push.certificate-path=/path/to/cert.pem
# SSL私钥路径
netty.push.private-key-path=/path/to/key.pem
```
### 性能调优
```properties
# 增加工作线程数以提高并发处理能力
netty.push.worker-threads=16
# 增加最大帧长度以支持更大的消息
netty.push.max-frame-size=131072
# 减少心跳间隔以提高连接稳定性
netty.push.heartbeat-interval-seconds=30
```
## 性能监控
SDK内置了性能监控功能,可以实时监控系统的运行状况。
### 监控指标
- 消息统计:总消息数、成功消息数、失败消息数、各类型消息数
- 连接统计:总连接数、活跃连接数、峰值连接数
- 性能指标:消息编解码延迟、消息处理延迟、消息发送延迟
- 数据传输:接收字节数、发送字节数
- 错误统计:总错误数、各类型错误数
### 获取监控数据
可以通过创建自定义端点来暴露监控数据:
```java
import com.netty.push.metrics.PushMetrics;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/metrics")
public class MetricsController {
@GetMapping("/push")
public Map getPushMetrics() {
return PushMetrics.getInstance().getMetricsData();
}
}
```
## 最佳实践
### 连接管理
- 使用唯一的用户ID作为连接标识
- 在用户登录后建立WebSocket连接
- 在用户登出时主动关闭连接
- 使用心跳机制检测连接状态
### 消息处理
- 使用消息队列异步处理消息,避免阻塞IO线程
- 对大量消息进行批量处理
- 设置合理的消息队列大小和处理线程数
### 安全性
- 在生产环境中启用SSL加密
- 实现消息签名和验证机制
- 限制单个用户的连接数和消息频率
### 扩展性
- 使用集群部署多个推送服务实例
- 使用Redis等分布式系统管理用户连接和群组信息
- 使用消息中间件如Kafka实现跨实例消息分发
## 常见问题
### 连接断开问题
**问题**:WebSocket连接经常断开
**解决方案**:
- 减少心跳间隔时间
- 增加连接空闲超时时间
- 检查网络环境,特别是代理和防火墙设置
### 消息延迟问题
**问题**:消息发送延迟较高
**解决方案**:
- 增加工作线程数
- 增加消息处理线程数
- 优化消息处理逻辑
- 使用更高性能的服务器
### 内存占用问题
**问题**:系统内存占用过高
**解决方案**:
- 减小消息队列大小
- 限制最大连接数
- 定期清理长时间不活跃的连接
- 优化消息对象,减少不必要的字段
## 许可证
本项目采用MIT许可证。详情请参阅LICENSE文件。