diff --git a/src/main/java/tech/smartboot/redisun/Redisun.java b/src/main/java/tech/smartboot/redisun/Redisun.java index cf8d20873cf4287ce0eb77ad0b447dcbbc2226de..87ad8fe28f88988ce7c887a4a6e3c8d99bfe0a1c 100644 --- a/src/main/java/tech/smartboot/redisun/Redisun.java +++ b/src/main/java/tech/smartboot/redisun/Redisun.java @@ -23,6 +23,8 @@ import tech.smartboot.redisun.cmd.LPopCommand; import tech.smartboot.redisun.cmd.LPushCommand; import tech.smartboot.redisun.cmd.MGetCommand; import tech.smartboot.redisun.cmd.MSetCommand; +import tech.smartboot.redisun.cmd.PSubscribeCommand; +import tech.smartboot.redisun.cmd.PUnsubscribeCommand; import tech.smartboot.redisun.cmd.PublishCommand; import tech.smartboot.redisun.cmd.RPopCommand; import tech.smartboot.redisun.cmd.RPushCommand; @@ -83,6 +85,9 @@ public final class Redisun { private final BufferPagePool bufferPagePool = new BufferPagePool(Runtime.getRuntime().availableProcessors(), true); private volatile AioQuickClient currentClient; + /** + * 处理 Redis 的订阅和发布功能 + */ private volatile RedisunPubSub pubSub; /** @@ -626,7 +631,6 @@ public final class Redisun { }); } - /** * 关闭Redisun客户端,释放资源 */ @@ -1221,11 +1225,17 @@ public final class Redisun { currentClient = null; } pubSub = new RedisunPubSub(this, client); + AioSession session = client.getSession(); + RedisSession redisSession = session.getAttachment(); + redisSession.setPubSub(pubSub); } return pubSub; } synchronized void releasePubSub() { + if (pubSub == null) { + return; + } AioSession session = pubSub.getClient().getSession(); RedisSession redisSession = session.getAttachment(); redisSession.setPubSub(null); @@ -1233,6 +1243,10 @@ public final class Redisun { pubSub = null; } + /** + * 取消订阅给定的一个或多个频道 + * @param channels 要取消订阅的频道列表,如果为空则取消所有 频道订阅 + */ public void unsubscribe(String... channels) { try { if (pubSub == null) { @@ -1251,28 +1265,23 @@ public final class Redisun { /** * 订阅给定的一个或多个频道 - * 注意:一旦进入订阅状态,连接就不能用于执行其他命令,直到取消订阅 + * 注意:一个 Redisun 对象只分配一个TCP连接进行订阅 * * @param pubsub 消息回调处理类 * @param channels 要订阅的频道列表 - * @return 订阅对象 */ public void subscribe(Subscriber pubsub, String... channels) { if (channels == null || channels.length == 0) { throw new RedisunException("Channels must not be null or empty"); } - if (pubsub == null) { throw new RedisunException("Subscriber must not be null"); } - try { RedisunPubSub redisunPubSub = redisunPubSub(); redisunPubSub.subscribe(pubsub, channels); AioSession session = redisunPubSub.getClient().getSession(); - RedisSession redisSession = session.getAttachment(); - redisSession.setPubSub(redisunPubSub); - // 执行订阅命令 + // 执行频道订阅命令 synchronized (redisunPubSub.getClient()) { new SubscribeCommand(channels).writeTo(session.writeBuffer()); } @@ -1282,4 +1291,51 @@ public final class Redisun { } } + /** + * 订阅给定的一个或多个频道的模式 + * 注意:一个 Redisun 对象只分配一个TCP连接进行订阅 + * + * @param pubsub 模式匹配消息回调处理类 + * @param patterns 要订阅的频道模式列表(支持通配符 * 和 ?) + */ + public void pSubscribe(Subscriber pubsub, String... patterns) { + if (patterns == null || patterns.length == 0) { + throw new RedisunException("Patterns must not be null or empty"); + } + if (pubsub == null) { + throw new RedisunException("PSubscribe must not be null"); + } + try { + RedisunPubSub redisunPubSub = redisunPubSub(); + redisunPubSub.pSubscribe(pubsub, patterns); + AioSession session = redisunPubSub.getClient().getSession(); + // 执行模式订阅命令 + synchronized (redisunPubSub.getClient()) { + new PSubscribeCommand(patterns).writeTo(session.writeBuffer()); + } + session.writeBuffer().flush(); + } catch (Throwable e) { + throw new RedisunException(e); + } + } + + /** + * 取消订阅给定的一个或多个频道的模式 + * @param patterns 要取消订阅的模式列表,如果为空则取消所有 模式订阅 + */ + public void pUnsubscribe(String... patterns) { + try { + if (pubSub == null) { + return; + } + AioSession session = pubSub.getClient().getSession(); + // 执行订阅命令 + synchronized (pubSub.getClient()) { + new PUnsubscribeCommand(patterns).writeTo(session.writeBuffer()); + } + session.writeBuffer().flush(); + } catch (Throwable e) { + throw new RedisunException(e); + } + } } diff --git a/src/main/java/tech/smartboot/redisun/RedisunPubSub.java b/src/main/java/tech/smartboot/redisun/RedisunPubSub.java index a5d43099ad17994bbac4d26295ef8c40448d70fd..bb80e7ca807ae848179ff003ca27974dd91a85e4 100644 --- a/src/main/java/tech/smartboot/redisun/RedisunPubSub.java +++ b/src/main/java/tech/smartboot/redisun/RedisunPubSub.java @@ -18,13 +18,18 @@ import java.util.concurrent.ConcurrentHashMap; * @version v1.0 2025-12-07 */ class RedisunPubSub { - + // 频道订阅者(subscribe) private final Map subscribers = new ConcurrentHashMap<>(); private final Map pending = new ConcurrentHashMap<>(); + // 模式订阅者(psubscribe)【glob 风格的正则表达式订阅】 + private final Map pSubscribers = new ConcurrentHashMap<>(); + private final Map pPending = new ConcurrentHashMap<>(); + + // 一个 Redisun 对象,对应一个 RedisunPubSub 管理对象,绑定一个 AioQuickClient 客户端对象 private final Redisun redisun; private final AioQuickClient client; - private boolean close = false; - private boolean subscribed = false; + private volatile boolean close = false; + private volatile boolean subscribed = false; public RedisunPubSub(Redisun redisun, AioQuickClient client) { this.redisun = redisun; @@ -41,6 +46,12 @@ class RedisunPubSub { } } + void pSubscribe(Subscriber pubsub, String... patterns) { + for (String pattern : patterns) { + pPending.put(pattern, pubsub); + } + } + /** * 处理来自服务端的订阅/取消订阅消息 */ @@ -59,7 +70,9 @@ class RedisunPubSub { System.err.println("Invalid message type"); return; } - subscribed = true; + if (! subscribed) { + subscribed = true; + } String messageType = ((BulkStrings) messageTypeResp).getValue(); // 接收来自服务器的发布消息 @@ -67,9 +80,19 @@ class RedisunPubSub { handleMessagePush(arrays); return; } + // 接收来自服务器的模式订阅消息 + if ("pmessage".equals(messageType)) { + handlePMessagePush(arrays); + return; + } // 订阅: 订阅成功 或者 取消订阅成功 if ("subscribe".equals(messageType) || "unsubscribe".equals(messageType)) { handleSubscriptionConfirmation(messageType, arrays); + return; + } + // 模式订阅: 模式订阅成功 或者 取消模式订阅成功 + if ("psubscribe".equals(messageType) || "punsubscribe".equals(messageType)) { + handlePatternSubscriptionConfirmation(messageType, arrays); } } @@ -79,41 +102,44 @@ class RedisunPubSub { } // 通知所有订阅者发生错误 - for (Subscriber subscriber : subscribers.values()) { - try { - subscriber.onError(ex); - } catch (Exception e) { - // 忽略订阅者内部错误 - } - } - - for (Subscriber subscriber : pending.values()) { - try { - subscriber.onError(ex); - } catch (Exception e) { - // 忽略订阅者内部错误 - } - } - + subscribers.values().forEach(v -> subscriberOnError(ex, v)); + pending.values().forEach(v -> subscriberOnError(ex, v)); + pSubscribers.values().forEach(v -> subscriberOnError(ex, v)); + pPending.values().forEach(v -> subscriberOnError(ex, v)); + + // 组合旧回调对象的订阅频道 Map> oldSubscribers = new ConcurrentHashMap<>(); - for (Map.Entry entry : subscribers.entrySet()) { - Set keys = oldSubscribers.computeIfAbsent(entry.getValue(), k -> new HashSet<>()); - keys.add(entry.getKey()); - } - for (Map.Entry entry : pending.entrySet()) { - Set keys = oldSubscribers.computeIfAbsent(entry.getValue(), k -> new HashSet<>()); - keys.add(entry.getKey()); - } - // 重新订阅 + extractedHandle(oldSubscribers, subscribers); + extractedHandle(oldSubscribers, pending); + Map> pOldSubscribers = new ConcurrentHashMap<>(); + extractedHandle(pOldSubscribers, pSubscribers); + extractedHandle(pOldSubscribers, pPending); + + // 释放旧订阅资源 redisun.releasePubSub(); - for (Map.Entry> entry : oldSubscribers.entrySet()) { - redisun.subscribe(entry.getKey(), entry.getValue().toArray(new String[0])); + // 重新订阅 + oldSubscribers.forEach((key, value) + -> redisun.subscribe(key, value.toArray(new String[0]))); + pOldSubscribers.forEach((key, value) + -> redisun.pSubscribe(key, value.toArray(new String[0]))); + } + + private void extractedHandle(Map> pack, Map source) { + source.forEach((key, value) + -> pack.computeIfAbsent(value, k -> new HashSet<>()).add(key)); + } + + private void subscriberOnError(Throwable ex, Subscriber subscriber) { + try { + subscriber.onError(ex); + } catch (Exception ignored) { + // 忽略订阅者内部错误 } } /** - * 处理消息推送 + * 处理频道订阅消息推送 */ private void handleMessagePush(Arrays arrays) { RESP channelResp = arrays.getValue().get(1); @@ -135,7 +161,35 @@ class RedisunPubSub { } /** - * 处理订阅/取消订阅确认消息 + * 处理模式订阅消息推送 + */ + private void handlePMessagePush(Arrays arrays) { + if (arrays.getValue().size() < 4) { + System.err.println("Invalid pmessage format"); + return; + } + RESP patternResp = arrays.getValue().get(1); + RESP channelResp = arrays.getValue().get(2); + RESP messageResp = arrays.getValue().get(3); + if (!(patternResp instanceof BulkStrings) || !(channelResp instanceof BulkStrings) || !(messageResp instanceof BulkStrings)) { + return; + } + String pattern = ((BulkStrings) patternResp).getValue(); + String channel = ((BulkStrings) channelResp).getValue(); + String message = ((BulkStrings) messageResp).getValue(); + // 调用模式订阅回调 + try { + Subscriber subscriber = pSubscribers.get(pattern); + if (subscriber != null) { + subscriber.onPMessage(pattern, channel, message); + } + } catch (Exception e) { + System.err.println("Error handling pmessage push: " + e.getMessage()); + } + } + + /** + * 处理频道订阅/取消订阅确认消息 */ private void handleSubscriptionConfirmation(String messageType, Arrays arrays) { // 需要从订阅列表中移除相应的频道 @@ -175,9 +229,58 @@ class RedisunPubSub { } } // 如果订阅列表为空,则释放订阅资源 - if (subscribers.isEmpty() && pending.isEmpty()) { - redisun.releasePubSub(); + releasePubSub(); + } + } + + /** + * 处理模式订阅/取消模式订阅确认消息 + */ + private void handlePatternSubscriptionConfirmation(String messageType, Arrays arrays) { + RESP patternResp = arrays.getValue().get(1); + if (!(patternResp instanceof BulkStrings)) { + return; + } + String pattern = ((BulkStrings) patternResp).getValue(); + if ("psubscribe".equals(messageType)){ + Subscriber subscriber = pPending.remove(pattern); + if (subscriber != null) { + pSubscribers.put(pattern, subscriber); + try { + subscriber.onPSubscribe(pattern); + } catch (Exception e) { + System.err.println("Error in onPSubscribe callback: " + e.getMessage()); + } } + return; + } + if ("punsubscribe".equals(messageType)) { + Subscriber subscriber = pSubscribers.remove(pattern); + if (subscriber != null) { + try { + subscriber.onUnsubscribe(pattern); + } catch (Exception e) { + System.err.println("Error in onUnsubscribe callback: " + e.getMessage()); + } + } + + subscriber = pPending.remove(pattern); + if (subscriber != null) { + try { + subscriber.onPUnsubscribe(pattern); + } catch (Exception e) { + System.err.println("Error in onPUnsubscribe callback: " + e.getMessage()); + } + } + releasePubSub(); + } + } + + private void releasePubSub() { + // 如果订阅列表为空,则释放订阅资源 + if (subscribers.isEmpty() && pending.isEmpty() + && pSubscribers.isEmpty() && pPending.isEmpty()) { + redisun.releasePubSub(); } } @@ -192,6 +295,8 @@ class RedisunPubSub { close = true; subscribers.clear(); pending.clear(); + pSubscribers.clear(); + pPending.clear(); redisun.releasePubSub(); } diff --git a/src/main/java/tech/smartboot/redisun/Subscriber.java b/src/main/java/tech/smartboot/redisun/Subscriber.java index 3b944e0e9e58ac02dd99636c0b0e395fc58708b7..03e5f721c131a7783344733734659178692be305 100644 --- a/src/main/java/tech/smartboot/redisun/Subscriber.java +++ b/src/main/java/tech/smartboot/redisun/Subscriber.java @@ -36,4 +36,31 @@ public interface Subscriber { */ default void onError(Throwable throwable) { } -} \ No newline at end of file + + /** + * 接收到模式订阅消息的回调方法 + * + * @param pattern 模式 + * @param channel 频道名称 + * @param message 订阅消息 + */ + default void onPMessage(String pattern, String channel, String message){ + onMessage(channel, message); + } + + /** + * 接收到模式订阅确认消息的回调方法 + * + * @param pattern 模式 + */ + default void onPSubscribe(String pattern) { + } + + /** + * 接收到取消模式订阅确认消息的回调方法 + * + * @param pattern 模式 + */ + default void onPUnsubscribe(String pattern) { + } +} diff --git a/src/main/java/tech/smartboot/redisun/cmd/PSubscribeCommand.java b/src/main/java/tech/smartboot/redisun/cmd/PSubscribeCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..e48f5d24e7a2b71da1a26ed8bf6f93149358ba78 --- /dev/null +++ b/src/main/java/tech/smartboot/redisun/cmd/PSubscribeCommand.java @@ -0,0 +1,40 @@ +package tech.smartboot.redisun.cmd; + +import tech.smartboot.redisun.Command; +import tech.smartboot.redisun.resp.BulkStrings; +import tech.smartboot.redisun.resp.RESP; + +import java.util.ArrayList; +import java.util.List; + +/** + * 订阅一个或多个符合给定模式的频道。 + * Available since: Redis Open Source 2.0.0 + * + * @author dufuzhong + * @version v1.0 2025-12-09 + * @see Redis PSUBSCRIBE Command + */ +public class PSubscribeCommand extends Command { + private static final BulkStrings CMD_PSUBSCRIBE = BulkStrings.of("PSUBSCRIBE"); + private final String[] patterns; + + /** + * 创建 PSUBSCRIBE 命令 + * + * @param patterns 要订阅的频道模式列表(支持通配符 * 和 ?) + */ + public PSubscribeCommand(String... patterns) { + this.patterns = patterns; + } + + @Override + protected List buildParams() { + List param = new ArrayList<>(patterns.length + 1); + param.add(CMD_PSUBSCRIBE); + for (String pattern : patterns) { + param.add(RESP.ofString(pattern)); + } + return param; + } +} diff --git a/src/main/java/tech/smartboot/redisun/cmd/PUnsubscribeCommand.java b/src/main/java/tech/smartboot/redisun/cmd/PUnsubscribeCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..52f5150e065c79fe98fbc78d624f69ee5f5846d9 --- /dev/null +++ b/src/main/java/tech/smartboot/redisun/cmd/PUnsubscribeCommand.java @@ -0,0 +1,41 @@ +package tech.smartboot.redisun.cmd; + +import tech.smartboot.redisun.Command; +import tech.smartboot.redisun.resp.BulkStrings; +import tech.smartboot.redisun.resp.RESP; + +import java.util.ArrayList; +import java.util.List; + +/** + * 取消订阅所有给定模式的频道 + * Available since: Redis Open Source 2.0.0 + * + * @author dufuzhong + * @version v1.0 2025-12-09 + * @see Redis PUNSUBSCRIBE Command + */ +public class PUnsubscribeCommand extends Command { + private static final BulkStrings CMD_PUNSUBSCRIBE = BulkStrings.of("PUNSUBSCRIBE"); + private final String[] patterns; + + /** + * 创建 PUNSUBSCRIBE 命令 + * 如果不指定模式,则取消订阅所有模式 + * + * @param patterns 要取消订阅的模式列表,如果为空则取消所有模式订阅 + */ + public PUnsubscribeCommand(String... patterns) { + this.patterns = patterns != null ? patterns : new String[0]; + } + + @Override + protected List buildParams() { + List param = new ArrayList<>(patterns.length + 1); + param.add(CMD_PUNSUBSCRIBE); + for (String pattern : patterns) { + param.add(RESP.ofString(pattern)); + } + return param; + } +} diff --git a/src/test/java/tech/smartboot/redisun/test/RedisunTest.java b/src/test/java/tech/smartboot/redisun/test/RedisunTest.java index a7d137a4d575894632c0a8cbc55e720574830076..278d2e64342a32361c9fc0b65e0a4a4577e9b2bf 100644 --- a/src/test/java/tech/smartboot/redisun/test/RedisunTest.java +++ b/src/test/java/tech/smartboot/redisun/test/RedisunTest.java @@ -1380,15 +1380,15 @@ public class RedisunTest { // 等待订阅建立 Thread.sleep(1000); - + Assert.assertTrue("onSubscribe should be called", subscribed.get()); // 取消订阅 redisunSubscriber.unsubscribe(channel); Thread.sleep(1000); - + Assert.assertTrue("onUnsubscribe should be called", unsubscribed.get()); - + redisunSubscriber.close(); } @@ -1415,18 +1415,18 @@ public class RedisunTest { // 创建一个会断开连接的订阅者来触发错误回调 Redisun redisunSubscriber = Redisun.create(opt -> opt.debug(true).setAddress("127.0.0.1:6379")); redisunSubscriber.subscribe(subscriber, channel); - + // 等待订阅建立 Thread.sleep(1000); - + // 模拟连接中断(这可能不会总是触发错误回调,取决于具体实现) // 我们只是验证框架能够处理这种情形 - + redisunSubscriber.close(); - + // 等待可能的错误回调 Thread.sleep(1000); - + // 错误回调可能不会被触发,因为我们正常关闭了连接 // 但我们至少验证了代码路径是可行的 System.out.println("Error callback test completed"); @@ -1510,4 +1510,33 @@ public class RedisunTest { redisunSubscriber.close(); } + /** + * 模式订阅的测试用例,取消订阅 + */ + @Test + public void testPUnsubscribe() throws InterruptedException { + System.out.println("\n=== 测试模式订阅 ==="); + final StringBuilder receivedChannel = new StringBuilder(); + redisun.pSubscribe((channel, message) -> { + receivedChannel.append(channel); + System.out.println("订阅消息:" + channel + message); + }, + "channel:*"); + Thread.sleep(2000); + + String channel = "channel:123"; + int publish = redisun.publish(channel, "你好"); + Assert.assertEquals(1, publish); + Thread.sleep(1000); + + System.out.println("=== 测试取消模式订阅 ==="); + redisun.pUnsubscribe("channel:*"); + Thread.sleep(1000); + + publish = redisun.publish("channel:456", "你好"); + Assert.assertEquals(0, publish); + Assert.assertEquals(channel, receivedChannel.toString()); + System.out.println("=== 测试总结 ==="); + System.out.println("模式订阅测试通过!"); + } }