# NettyPusher **Repository Path**: CoderZone/NettyPusher ## Basic Information - **Project Name**: NettyPusher - **Description**: AI 生成......... - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-03-19 - **Last Updated**: 2025-03-21 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Spring Boot Netty Push SDK 一个基于Netty的高性能实时消息推送SDK,专为Spring Boot应用设计。 ## 特性 - 基于WebSocket的实时双向通信 - 支持单用户消息推送、群组消息和广播消息 - 自动心跳检测和连接管理 - 支持SSL/TLS加密 - 与Spring Boot无缝集成 - 高并发、低延迟设计 - 内置消息队列,异步处理消息 - 实时性能监控和指标收集 ## 快速开始 ### 1. 添加依赖 ```xml com.netty spring-boot-netty-push 1.0.0 ``` ### 2. 配置属性 在`application.properties`或`application.yml`中添加配置: ```properties # 启用Netty推送服务 netty.push.enabled=true # WebSocket服务器端口 netty.push.port=8888 # WebSocket路径 netty.push.websocket-path=/ws # 连接空闲超时时间(秒) netty.push.idle-timeout-seconds=180 # 心跳间隔时间(秒) netty.push.heartbeat-interval-seconds=60 # 最大帧长度 netty.push.max-frame-size=65536 # 工作线程数 netty.push.worker-threads=8 # 消息队列大小 netty.push.message-queue-size=10000 # 消息处理线程数 netty.push.message-handler-threads=4 # 是否开启SSL netty.push.ssl-enabled=false # SSL证书路径(SSL启用时需配置) # netty.push.certificate-path=/path/to/cert.pem # SSL私钥路径(SSL启用时需配置) # netty.push.private-key-path=/path/to/key.pem ``` ### 3. 使用推送服务 ```java import com.netty.push.service.PushService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @RestController public class MessageController { @Autowired private PushService pushService; @GetMapping("/send/{userId}") public String sendToUser(@PathVariable String userId, String content) { boolean success = pushService.sendToUser(userId, content); return success ? "发送成功" : "发送失败,用户可能不在线"; } @GetMapping("/send/group/{groupId}") public String sendToGroup(@PathVariable String groupId, String content) { boolean success = pushService.sendToGroup(groupId, content); return success ? "发送成功" : "发送失败"; } @GetMapping("/broadcast") public String broadcast(String content) { pushService.broadcast(content); return "广播发送成功"; } @GetMapping("/group/add") public String addToGroup(String groupId, String userId) { pushService.addUserToGroup(groupId, userId); return "添加成功"; } @GetMapping("/online/{userId}") public boolean isUserOnline(@PathVariable String userId) { return pushService.isUserOnline(userId); } @GetMapping("/stats/online") public int getOnlineCount() { return pushService.getOnlineCount(); } } ``` ## 客户端集成 ### JavaScript客户端 ```javascript // 创建WebSocket连接 const userId = 'user123'; const ws = new WebSocket(`ws://localhost:8888/ws/${userId}`); // 连接建立时的处理 ws.onopen = function() { console.log('连接已建立'); }; // 接收消息的处理 ws.onmessage = function(event) { const message = JSON.parse(event.data); console.log('收到消息:', message); // 根据消息类型处理 switch(message.messageType) { case 'USER': console.log(`收到来自${message.senderId}的消息: ${message.content}`); break; case 'GROUP': console.log(`收到群组${message.receiverId}的消息: ${message.content}`); break; case 'BROADCAST': console.log(`收到广播消息: ${message.content}`); break; case 'HEARTBEAT': // 收到心跳消息,回复ACK sendMessage({ messageType: 'ACK', timestamp: Date.now() }); break; } }; // 发送消息 function sendMessage(message) { ws.send(JSON.stringify(message)); } // 发送用户消息 function sendToUser(receiverId, content) { sendMessage({ messageType: 'USER', receiverId: receiverId, content: content, timestamp: Date.now() }); } // 发送群组消息 function sendToGroup(groupId, content) { sendMessage({ messageType: 'GROUP', receiverId: groupId, content: content, timestamp: Date.now() }); } // 连接关闭的处理 ws.onclose = function() { console.log('连接已关闭'); }; // 连接错误的处理 ws.onerror = function(error) { console.error('WebSocket错误:', error); }; ``` ### Android客户端 使用OkHttp库实现WebSocket客户端: ```java import okhttp3.*; import okio.ByteString; import org.json.JSONObject; public class WebSocketClient { private final WebSocket webSocket; private final String userId; public WebSocketClient(String userId) { this.userId = userId; OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url("ws://your-server:8888/ws/" + userId) .build(); webSocket = client.newWebSocket(request, new WebSocketListener() { @Override public void onOpen(WebSocket webSocket, Response response) { System.out.println("连接已建立"); } @Override public void onMessage(WebSocket webSocket, String text) { try { JSONObject message = new JSONObject(text); System.out.println("收到消息: " + message); // 处理消息... } catch (Exception e) { e.printStackTrace(); } } @Override public void onClosed(WebSocket webSocket, int code, String reason) { System.out.println("连接已关闭"); } @Override public void onFailure(WebSocket webSocket, Throwable t, Response response) { System.err.println("WebSocket错误: " + t.getMessage()); } }); } public void sendMessage(JSONObject message) { webSocket.send(message.toString()); } public void close() { webSocket.close(1000, "正常关闭"); } } ``` ## 高级配置 ### 消息队列配置 消息队列用于异步处理消息,避免阻塞IO线程,提高系统吞吐量。 ```properties # 消息队列大小,默认10000 netty.push.message-queue-size=10000 # 消息处理线程数,默认为4 netty.push.message-handler-threads=4 ``` ### SSL配置 启用SSL可以保证WebSocket通信的安全性。 ```properties # 启用SSL netty.push.ssl-enabled=true # SSL证书路径 netty.push.certificate-path=/path/to/cert.pem # SSL私钥路径 netty.push.private-key-path=/path/to/key.pem ``` ### 性能调优 ```properties # 增加工作线程数以提高并发处理能力 netty.push.worker-threads=16 # 增加最大帧长度以支持更大的消息 netty.push.max-frame-size=131072 # 减少心跳间隔以提高连接稳定性 netty.push.heartbeat-interval-seconds=30 ``` ## 性能监控 SDK内置了性能监控功能,可以实时监控系统的运行状况。 ### 监控指标 - 消息统计:总消息数、成功消息数、失败消息数、各类型消息数 - 连接统计:总连接数、活跃连接数、峰值连接数 - 性能指标:消息编解码延迟、消息处理延迟、消息发送延迟 - 数据传输:接收字节数、发送字节数 - 错误统计:总错误数、各类型错误数 ### 获取监控数据 可以通过创建自定义端点来暴露监控数据: ```java import com.netty.push.metrics.PushMetrics; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/metrics") public class MetricsController { @GetMapping("/push") public Map getPushMetrics() { return PushMetrics.getInstance().getMetricsData(); } } ``` ## 最佳实践 ### 连接管理 - 使用唯一的用户ID作为连接标识 - 在用户登录后建立WebSocket连接 - 在用户登出时主动关闭连接 - 使用心跳机制检测连接状态 ### 消息处理 - 使用消息队列异步处理消息,避免阻塞IO线程 - 对大量消息进行批量处理 - 设置合理的消息队列大小和处理线程数 ### 安全性 - 在生产环境中启用SSL加密 - 实现消息签名和验证机制 - 限制单个用户的连接数和消息频率 ### 扩展性 - 使用集群部署多个推送服务实例 - 使用Redis等分布式系统管理用户连接和群组信息 - 使用消息中间件如Kafka实现跨实例消息分发 ## 常见问题 ### 连接断开问题 **问题**:WebSocket连接经常断开 **解决方案**: - 减少心跳间隔时间 - 增加连接空闲超时时间 - 检查网络环境,特别是代理和防火墙设置 ### 消息延迟问题 **问题**:消息发送延迟较高 **解决方案**: - 增加工作线程数 - 增加消息处理线程数 - 优化消息处理逻辑 - 使用更高性能的服务器 ### 内存占用问题 **问题**:系统内存占用过高 **解决方案**: - 减小消息队列大小 - 限制最大连接数 - 定期清理长时间不活跃的连接 - 优化消息对象,减少不必要的字段 ## 许可证 本项目采用MIT许可证。详情请参阅LICENSE文件。