I have recently been using Flume for log collection, and ran into the following error.
14 Mar 2020 14:23:58,194 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.AbstractRpcSink.process:384) - Rpc Sink k1: Unable to get event from channel c1. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:95)
    at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
    at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:351)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
Looking at my configuration, this is what I had:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /home/logs/spring.log
a2.sources.r1.shell = /bin/bash -c

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = Linux1
a2.sinks.k1.port = 4141
a2.sinks.k1.batch-size = 1000

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
After some reading, I found that two parameters were in conflict: the sink's batch-size and the channel's transactionCapacity.
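A mismatch like this can be caught before the agent is started by comparing the two values directly. The following is only a minimal sketch, not part of Flume: the class BatchSizeCheck and its methods are hypothetical names, it assumes the agent config can be read as a standard Java properties file, and it assumes a fallback of 100 for both settings when they are omitted (the defaults listed in the Flume user guide for the Avro sink and the memory channel).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class BatchSizeCheck {
    // Assumed defaults (per the Flume user guide) when a key is not set.
    static final int DEFAULT_BATCH_SIZE = 100;
    static final int DEFAULT_TRANSACTION_CAPACITY = 100;

    /** Returns true when the sink's batch-size fits into the channel's transactionCapacity. */
    static boolean batchFits(Properties conf, String agent, String sink, String channel) {
        int batchSize = intProp(conf,
                agent + ".sinks." + sink + ".batch-size", DEFAULT_BATCH_SIZE);
        int txCapacity = intProp(conf,
                agent + ".channels." + channel + ".transactionCapacity",
                DEFAULT_TRANSACTION_CAPACITY);
        return batchSize <= txCapacity;
    }

    /** Reads an integer property, falling back to the given default when the key is absent. */
    static int intProp(Properties conf, String key, int fallback) {
        String value = conf.getProperty(key);
        return value == null ? fallback : Integer.parseInt(value.trim());
    }

    /** Parses config text in properties format; lines starting with '#' are comments. */
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties conf = parse(
                "a2.sinks.k1.batch-size = 1000\n"
              + "a2.channels.c1.transactionCapacity = 100\n");
        System.out.println(batchFits(conf, "a2", "k1", "c1")); // false: 1000 > 100
    }
}
```

The check only compares one sink against one channel; a real agent file may define several of each, so the agent, sink, and channel names are passed in rather than discovered.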
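What does the sink's batch-size mean? It is the number of events the sink takes from the channel at a time in order to send them. The send ultimately happens inside a transaction, so those batch-size events are first moved into the transaction's buffer queue, the takeList. This is a double-ended queue, which allows a failed transaction to be rolled back (that is, the events that were taken are put back into the MemoryChannel's queue). Its capacity is the value defined by transactionCapacity; the source contains: takeList = new LinkedBlockingDeque(transCapacity); (source as shared at https://segmentfault.com/a/1190000003586635). Now look at where the error is thrown: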
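if (takeList.remainingCapacity() == 0) {
    throw new ChannelException("Take list for MemoryTransaction, capacity " +
        takeList.size() + " full, consider committing more frequently, " +
        "increasing capacity, or increasing thread count");
}

In the case above, the sink tries to take 1000 events per batch and put them into the takeList. Once 100 events have been added the list is full, and the next take raises the exception shown earlier. The fix follows directly: the channel's transactionCapacity must not be smaller than the batch-size of the sink that reads from it. Here that means either raising a2.channels.c1.transactionCapacity to at least 1000, or lowering the sink's batch-size to 100 or less.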
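The behaviour can be reproduced outside Flume with a bounded LinkedBlockingDeque. This is a simplified sketch rather than Flume's actual code: TakeListDemo and takesBeforeFull are hypothetical names, integers stand in for events, and the method returns a count where the real channel would throw ChannelException.

```java
import java.util.concurrent.LinkedBlockingDeque;

class TakeListDemo {
    /**
     * Mimics a sink's take loop against a bounded take list.
     * Returns how many events were taken before the list was full;
     * a result equal to batchSize means the whole batch fit.
     */
    static int takesBeforeFull(int transactionCapacity, int batchSize) {
        LinkedBlockingDeque<Integer> takeList =
                new LinkedBlockingDeque<>(transactionCapacity);
        for (int i = 0; i < batchSize; i++) {
            // Same guard as in the MemoryChannel snippet: no free slot left.
            if (takeList.remainingCapacity() == 0) {
                return i; // the real channel throws ChannelException here
            }
            takeList.offer(i); // stand-in for moving one event into the take list
        }
        return batchSize;
    }

    public static void main(String[] args) {
        System.out.println(takesBeforeFull(100, 1000));  // 100: fails on the 101st take
        System.out.println(takesBeforeFull(1000, 1000)); // 1000: the whole batch fits
    }
}
```

With transactionCapacity equal to batch-size the loop completes, which is why the rule is "not smaller than" rather than "strictly greater than".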
Reposted from: http://gzjkb.baihongyu.com/