SpringBoot整合RabbitMQ

时间:2018-12-29 10:47:25   收藏:0   阅读:182

1 整合RabbitMQ

1.1 RabbitMQ的相关概念

}
##### 1.2.2 接收消息 * 通过注解的方式主动监听接收 > 声明我要监听哪个queue即可java
@RabbitListener(queues = "demo-user-add")
public void getMsg(String msg) throws Exception{
log.info("获取消息{}", msg);
User user = (User) JSONObject.toBean(JSONObject.fromObject(msg), User.class);
userService.addUser(user);
}
* 被动接收java
String data = (String) this.amqpTemplate.receiveAndConvert(queueName);
#### 1.3 模拟高并发取值java
@Test
public void testGetMsg() throws Exception{
ExecutorService service = Executors.newCachedThreadPool(); //创建一个线程池
final CountDownLatch beginCountDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(100);

for (int i = 0; i < 100; i++) {
    Runnable runnable = new Runnable() {
        int index = 1;
        @Override
        public void run() {
            try {
                /**
                 *如果调用对象上的await()方法,那么调用者就会一直阻塞在这里,直到别人通过cutDown方法,将计数减到0,才可以继续执行。
                 * 这里先调用beginCountDownLatch的await方法,等到循环结束后,内存中就有100个线程等待去运行。
                 * 所以等到beginCountDownLatch调用countDown的时候,100个线程就开始执行
                 */
                beginCountDownLatch.await();
                log.info("------->index:{}", index);
                String data = (String) amqpTemplate.receiveAndConvert("demo-test");
                log.info("==============>消息n内容:{}", data);

                countDownLatch.countDown();
                index++;

            } catch (Exception e) {
                e.printStackTrace();
            }


        }
    };
    service.execute(runnable);
}
//释放主线程,之前声明的100个线程就开始执行
beginCountDownLatch.countDown();
//
countDownLatch.await();

}
```

原文:https://www.cnblogs.com/KevinStark/p/10193722.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!