final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
if (wrapper == null) {
log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
return;
}
wrapper.success = true;
wrapper.latch.countDown();
}
else {
log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
}
}
//消费失败时返回
public void fail(final Object msgId) {
if (msgId instanceof Long) {
final long id = (Long) msgId;
final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
if (wrapper == null) {
log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
return;
}
wrapper.success = false;
wrapper.latch.countDown();
}
else {
log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
}
}
2.2 MetaqBolt
2.2.1 接口说明
该部分代码修改自Github上的Metaq异步生产者实例。设计这个Bolt的原因是,部分业务有这种需求,当经过storm实时处理
后,数据发送到下一个业务系统,当下一个业务系统也是从metaq拉取数据时,就需要我们把处理过的数据写入到metaq中
去,所以有了这个接口。
其读取配置文件的过程与MetaqSpout相似,但是没有组(Group)的概念,只需指定地址、目录及Topic(前提是Metaq上有
该Topic),则可以把数据写入metaq中。
2.2.1 上代码
该部分代码较简单,可以参考AsyncConsumer代码。