介绍
组件基于Redisson实现的队列,队列数据持久化到Redis中,另外支持延时队列,
仓库地址:https://github.com/githubwyj/java-plugins/tree/main/spring-redisson-queue-starter
具体使用参考下方说明。
使用
1. 引入依赖
<dependency>
<groupId>cn.mnjblog</groupId>
<artifactId>spring-redisson-queue-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
2. application.yml 配置
# 配置redis连接信息
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
password: 123456
3. 定义队列枚举
需要定义队列名称和队列执行器
import cn.mnjblog.common.utils.SpringContextHolder;
import cn.mnjblog.redisson.queue.executor.QueueExecutor;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum QueueEnum {
/**
* 测试队列
*/
TEST("TEST_QUEUE", "测试队列", "testQueueExecutor",false),
/**
* 测试延时队列
*/
TEST_QUEUE("TEST_DELAY_QUEUE", "测试延时队列", "testQueueExecutor",true);
/**
* 队列名称
*/
private final String queueName;
/**
* 队列描述
*/
private final String queueDesc;
/**
* 执行器
*/
private final String executorBean;
/**
* 是否延时队列
*/
private final boolean isDelayQueue;
public <T> QueueExecutor<T> getExecutor() {
return SpringContextHolder.getBean(getExecutorBean());
}
我这里定义了一般队列和延时队列,不过执行器都用的同一个
4. 定义队列执行器
@Slf4j
@Component
@RequiredArgsConstructor
public class TestQueueExecutor implements QueueExecutor<String> {
@Override
public void execute(String text) {
log.info("获取到消息:{}", text);
}
}
5. 服务启动初始化队列
@Slf4j
@Configuration
@RequiredArgsConstructor
public class QueueConfiguration implements ApplicationListener<ApplicationStartedEvent> {
private final RedissonDelayQueueHandler redissonDelayQueueHandler;
private final RedissonQueueHandler redissonQueueHandler;
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
startRedissonQueueListen(QueueEnum.values());
}
/**
* 监听队列
*
* @param queueEnums
*/
public void startRedissonQueueListen(QueueEnum[] queueEnums) {
for (QueueEnum queueEnum : queueEnums) {
GlobalThreadPool.execute(() -> {
while (true) {
if (queueEnum.isDelayQueue()) {
Optional.ofNullable(redissonDelayQueueHandler.getQueueMessage(queueEnum.getQueueName()))
.ifPresent(o -> queueEnum.getExecutor().execute(o));
} else {
Optional.ofNullable(redissonQueueHandler.getQueueMessage(queueEnum.getQueueName()))
.ifPresent(o -> queueEnum.getExecutor().execute(o));
}
}
});
}
log.info("Redis 延迟队列启动成功");
}
}
6. 加入队列
@Slf4j
@RestController
@RequestMapping("/queue")
@AllArgsConstructor
public class QueueTestController {
public final RedissonDelayQueueHandler redissonDelayQueueHandler;
public final RedissonQueueHandler redissonQueueHandler;
/**
* 延时队列测试
*
* @param delay
* @return
*/
@PostMapping("/{delay}/{message}")
public String pushDelayQueue(@PathVariable("delay") Long delay, @PathVariable("message") String message) {
log.info("收到延时队列信息,{},{}", delay, message);
redissonDelayQueueHandler.addDelayQueueMessage(DelayQueueMessage.builder().delay(delay).queue(QueueEnum.TEST_QUEUE.getQueueName()).message(message).build());
return "ok";
}
/**
* 队列测试
*
* @param message
* @return
*/
@PostMapping("/{message}")
public String pushQueue(@PathVariable("message") String message) {
log.info("收到队列信息,{}", message);
redissonQueueHandler.addQueueMessage(QueueMessage.builder().queue(QueueEnum.TEST.getQueueName()).message(message).build());
return "ok";
}
}
注意延时队列消息的delay参数单位是秒
7. 测试一下
一般队列
curl --location --request POST 'http://localhost:8081/queue/helloworld'
日志如下:
2023-05-23 16:06:15.493 [http-nio-8081-exec-1] INFO [TestController:350] - 收到队列信息,helloworld
2023-05-23 16:06:15.504 [pool-2-thread-3] INFO [TestQueueExecutor:23] - 获取到消息:helloworld
延时队列
curl --location --request POST 'http://localhost:8081/queue/10/helloworld'
日志如下:
2023-05-23 16:10:09.608 [http-nio-8081-exec-5] INFO [TestController:337] - 收到延时队列信息,10,helloworld
2023-05-23 16:10:19.637 [pool-2-thread-4] INFO [TestQueueExecutor:23] - 获取到消息:helloworld
通过日志可以看到,10秒后进行了消息消费。
注意事项
延时队列消息的delay参数单位是秒
评论区