Storm运维:关于nextTuple

时间:2015-11-02 10:27:18   收藏:0   阅读:1022

早上发现storm有点问题,查看日志,发现

2015-10-31T20:02:23.332+0800 STDIO [ERROR] java.lang.InterruptedException
2015-10-31T20:02:23.333+0800 STDIO [ERROR] at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
2015-10-31T20:02:23.333+0800 STDIO [ERROR] at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
2015-10-31T20:02:23.333+0800 STDIO [ERROR] at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
2015-10-31T20:02:23.333+0800 STDIO [ERROR] at com.hzw.monitor.storm.spout.ActiveMQSpout.nextTuple(ActiveMQSpout.java:45)
2015-10-31T20:02:23.333+0800 STDIO [ERROR] at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:565)
2015-10-31T20:02:23.334+0800 STDIO [ERROR] at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463)
2015-10-31T20:02:23.334+0800 STDIO [ERROR] at clojure.lang.AFn.run(AFn.java:24)
2015-10-31T20:02:23.334+0800 STDIO [ERROR] at java.lang.Thread.run(Thread.java:745)

然后定位,就是看nextTuple函数。

我们用的是一个LinkedBlockingQueue,用了take,这个函数

 Message java.util.concurrent.LinkedBlockingQueue.take() throws InterruptedException

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.Specified by: take() in BlockingQueueReturns:the head of this queueThrows:InterruptedException - if interrupted while waiting

这个函数会阻塞当前线程。

 

然后nextTuple函数

 /**

     * When this method is called, Storm is requesting that the Spout emit tuples to the 

     * output collector. This method should be non-blocking, so if the Spout has no tuples

     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight

     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous

     * to have nextTuple sleep for a short amount of time (like a single millisecond)

     * so as not to waste too much CPU.

     */

    void nextTuple();

 

所以,也就是说,这里不应该用take来处理。

后来通过修改为poll(),如果拿出的消息为null,则睡眠10ms.

 

原文:http://my.oschina.net/qiangzigege/blog/524727

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!