leftPush消息入隊,rightPop對應,消息出隊。
rightPop(RedisConstant.MQ_LIST, 0L, TimeUnit.SECONDS)阻塞出隊,0表示永久阻塞
@Service
public class RedisService {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public Object publish() {OrderDTO dto = new OrderDTO();dto.setId(1);dto.setCreateTime(new Date());dto.setMoney("12.34");dto.setOrderNo("orderNo1");String s = JSON.toJSONString(dto);ListOperations<String, String> listOperations = redisTemplate.opsForList();//leftPush和rightPop對應,左邊入隊,右邊出隊listOperations.leftPush(RedisConstant.MQ_LIST, s);//因為出隊是阻塞讀取的,所以上一步入隊后,數據立刻就被驅走了,下一步size=0Long size = listOperations.size(RedisConstant.MQ_LIST);List<String> list = new ArrayList<>();if (size != null && size > 0) {list = listOperations.range(RedisConstant.MQ_LIST, 0, size - 1);}return list;}
}
測試
@RestController
@RequestMapping("redisList")
public class RedisListController {@Autowiredprivate RedisService redisService;@GetMapping("publish")public Object publish() {return redisService.publish();}
}
@Component
public class RedisConsumeTask {@Autowiredprivate RedisService redisService;@TaskLock(RedisConstant.CONSUME_REDIS_LIST)@Scheduled(cron = "0/10 * * * * ?")public void consumeMqList() {redisService.consumeMqList();}
}@Service
@Slf4j
public class RedisService {@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void consumeMqList() {ListOperations<String, String> listOperations = redisTemplate.opsForList();//0時間,表示阻塞永久//待機一小時后,再次發消息,消費不了了,阻塞有問題啊。還得輪尋啊//String s = listOperations.rightPop(RedisConstant.MQ_LIST, 0L, TimeUnit.SECONDS);String s = listOperations.rightPop(RedisConstant.MQ_LIST);if (s == null) {return;}log.info("{} = {}", RedisConstant.MQ_LIST, s);OrderDTO dto = JSON.parseObject(s, OrderDTO.class);log.info("dto = {}", dto);}
}
@Component
@Aspect
public class TaskLockAop {@Autowiredprivate RedisLockRegistry redisLockRegistry;@Around("execution(@TaskLock * * (..))")public Object taskAround(ProceedingJoinPoint pjp) throws Throwable {TaskLock taskAnnotation = ((MethodSignature)pjp.getSignature()).getMethod().getAnnotation(TaskLock.class);String lockKey = taskAnnotation.value();Lock lock = redisLockRegistry.obtain(lockKey);try {lock.tryLock(30L, TimeUnit.SECONDS);System.out.println("任務開始, " + lockKey + ", " + new Date());return pjp.proceed();} finally {lock.unlock();System.out.println("任務結束, " + lockKey + ", " + new Date());}}}
http://localhost:9040/redisList/publish
redis的list、["{“createTime”:1574394538430,“id”:1,“money”:“12.34”,“orderNo”:“orderNo1”}"]
下面一直阻塞,任務開始了,不收到消息,永遠不會結束。
阻塞有問題,改用輪詢了。
先啟動發送消息服務,發送消息。后啟動消費消息服務,可以消費消息。這一點,比發布訂閱要穩定。
redis做數據庫,關聯項目https://github.com/mingwulipo/cloud-demo.git
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态