目 录CONTENT

文章目录

基于Redisson实现的队列组件

码农街
2024-01-30 / 0 评论 / 0 点赞 / 150 阅读 / 6546 字 / 正在检测是否收录...

介绍

组件基于Redisson实现的队列,队列数据持久化到Redis中,另外支持延时队列,

仓库地址:https://github.com/githubwyj/java-plugins/tree/main/spring-redisson-queue-starter

具体使用参考下方说明。

使用

1. 引入依赖

<dependency>
    <groupId>cn.mnjblog</groupId>
    <artifactId>spring-redisson-queue-starter</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

2. application.yml 配置

# 配置redis连接信息
spring:
 redis:
  host: 127.0.0.1
  port: 6379
  database: 0
  password: 123456

3. 定义队列枚举

需要定义队列名称和队列执行器

import cn.mnjblog.common.utils.SpringContextHolder;
import cn.mnjblog.redisson.queue.executor.QueueExecutor;
import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum QueueEnum {

    /**
     * 测试队列
     */
    TEST("TEST_QUEUE", "测试队列", "testQueueExecutor",false),
    /**
     * 测试延时队列
     */
    TEST_QUEUE("TEST_DELAY_QUEUE", "测试延时队列", "testQueueExecutor",true);
    /**
     * 队列名称
     */
    private final String queueName;
    
    /**
     * 队列描述
     */
    private final String queueDesc;
    
    /**
     * 执行器
     */
    private final String executorBean;
    
    /**
     * 是否延时队列
     */
    private final boolean isDelayQueue;
    
    public <T> QueueExecutor<T> getExecutor() {
        return SpringContextHolder.getBean(getExecutorBean());
    }

我这里定义了一般队列和延时队列,不过执行器都用的同一个

4. 定义队列执行器


@Slf4j
@Component
@RequiredArgsConstructor
public class TestQueueExecutor implements QueueExecutor<String> {

    @Override
    public void execute(String text) {
        log.info("获取到消息:{}", text);
    }
}

5. 服务启动初始化队列

@Slf4j
@Configuration
@RequiredArgsConstructor
public class QueueConfiguration implements ApplicationListener<ApplicationStartedEvent> {

    private final RedissonDelayQueueHandler redissonDelayQueueHandler;
    private final RedissonQueueHandler redissonQueueHandler;

    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        startRedissonQueueListen(QueueEnum.values());
    }

   
    /**
     * 监听队列
     *
     * @param queueEnums
     */
    public void startRedissonQueueListen(QueueEnum[] queueEnums) {
        for (QueueEnum queueEnum : queueEnums) {
            GlobalThreadPool.execute(() -> {
                while (true) {
                    if (queueEnum.isDelayQueue()) {
                        Optional.ofNullable(redissonDelayQueueHandler.getQueueMessage(queueEnum.getQueueName()))
                                .ifPresent(o -> queueEnum.getExecutor().execute(o));
                    } else {
                        Optional.ofNullable(redissonQueueHandler.getQueueMessage(queueEnum.getQueueName()))
                                .ifPresent(o -> queueEnum.getExecutor().execute(o));
                    }
                }
            });
        }
        log.info("Redis 延迟队列启动成功");
    }
}

6. 加入队列

@Slf4j
@RestController
@RequestMapping("/queue")
@AllArgsConstructor
public class QueueTestController {
    public final RedissonDelayQueueHandler redissonDelayQueueHandler;
    
    public final RedissonQueueHandler redissonQueueHandler;
    
    /**
     * 延时队列测试
     *
     * @param delay
     * @return
     */
    @PostMapping("/{delay}/{message}")
    public String pushDelayQueue(@PathVariable("delay") Long delay, @PathVariable("message") String message) {
        log.info("收到延时队列信息,{},{}", delay, message);
        redissonDelayQueueHandler.addDelayQueueMessage(DelayQueueMessage.builder().delay(delay).queue(QueueEnum.TEST_QUEUE.getQueueName()).message(message).build());
        return "ok";
    }
    
    /**
     * 队列测试
     *
     * @param message
     * @return
     */
    @PostMapping("/{message}")
    public String pushQueue(@PathVariable("message") String message) {
        log.info("收到队列信息,{}", message);
        redissonQueueHandler.addQueueMessage(QueueMessage.builder().queue(QueueEnum.TEST.getQueueName()).message(message).build());
        return "ok";
    }
}

注意延时队列消息的delay参数单位是秒

7. 测试一下

  • 一般队列

curl --location --request POST 'http://localhost:8081/queue/helloworld'

日志如下:

2023-05-23 16:06:15.493 [http-nio-8081-exec-1] INFO  [TestController:350] - 收到队列信息,helloworld
2023-05-23 16:06:15.504 [pool-2-thread-3] INFO  [TestQueueExecutor:23] - 获取到消息:helloworld
  • 延时队列

curl --location --request POST 'http://localhost:8081/queue/10/helloworld'

日志如下:

2023-05-23 16:10:09.608 [http-nio-8081-exec-5] INFO  [TestController:337] - 收到延时队列信息,10,helloworld
2023-05-23 16:10:19.637 [pool-2-thread-4] INFO  [TestQueueExecutor:23] - 获取到消息:helloworld

通过日志可以看到,10秒后进行了消息消费。

注意事项

  1. 延时队列消息的delay参数单位是秒

0

评论区