博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flume的ChannelExceptio以及memeryChannel中transactionCapacity和sink的batchsize需要注意事项
阅读量:2178 次
发布时间:2019-05-01

本文共 2296 字,大约阅读时间需要 7 分钟。

最近在做flume的日志收集,在用flume的时候发现一个报错.

14 Mar 2020 14:23:58,194 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.AbstractRpcSink.process:384)  - Rpc Sink k1: Unable to get event from channel c1. Exception follows.org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count	at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:95)	at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)	at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:351)	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)	at java.lang.Thread.run(Thread.java:745)

看了一下自己的配置,发现配置里这么写的

# Name the components on this agenta2.sources = r1a2.sinks = k1a2.channels = c1# Describe/configure the sourcea2.sources.r1.type = execa2.sources.r1.command = tail -F /home/logs/spring.loga2.sources.r1.shell = /bin/bash -c# Describe the sinka2.sinks.k1.type = avroa2.sinks.k1.hostname = Linux1a2.sinks.k1.port = 4141a1.sinks.k1.batch-size = 1000# Use a channel which buffers events in memorya2.channels.c1.type = memorya2.channels.c1.capacity = 1000a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela2.sources.r1.channels = c1a2.sinks.k1.channel = c1

查阅资料发现,是batchsize和transactionCapacity 这两个参数起了冲突.

这个sink的batchsize是什么意思呢,就是sink会一次从channel中取多少个event去发送,而这个发送是要最终以事务的形式去发送的,因此这个batchsize的event会传送到一个事务的缓存队列中(takeList),这是一个双向队列,这个队列可以在事务失败时进行回滚(也就是把取出来的数据吐memeryChannel的queue中),它的初始大小就是transactionCapacity定义的大小,源码中有: takeList = new LinkedBlockingDeque(transCapacity); 源码来自https://segmentfault.com/a/1190000003586635的分享。

再看这个错误抛出的地方:

if(takeList.remainingCapacity() == 0) {

throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
“increasing capacity, or increasing thread count”);
}

在上面的情况中,sink一次取1000个events,塞到takelist中,在塞了100个后,就会引发上述异常,因此,这个错误的解决办法就是:在sink中,channel的transactionCapacity参数不能小于sink的batchsize

参考:

转载地址:http://gzjkb.baihongyu.com/

你可能感兴趣的文章
03. 交换机的Telnet远程登陆配置
查看>>
微信小程序-调用-腾讯视频-解决方案
查看>>
phpStudy安装yaf扩展
查看>>
密码 加密 加盐 常用操作记录
查看>>
TP 分页后,调用指定页。
查看>>
Oracle数据库中的(+)连接
查看>>
java-oracle中几十个实用的PL/SQL
查看>>
PLSQL常用方法汇总
查看>>
几个基本的 Sql Plus 命令 和 例子
查看>>
PLSQL单行函数和组函数详解
查看>>
Oracle PL/SQL语言初级教程之异常处理
查看>>
Oracle PL/SQL语言初级教程之游标
查看>>
Oracle PL/SQL语言初级教程之操作和控制语言
查看>>
Oracle PL/SQL语言初级教程之过程和函数
查看>>
Oracle PL/SQL语言初级教程之表和视图
查看>>
Oracle PL/SQL语言初级教程之完整性约束
查看>>
PL/SQL学习笔记
查看>>
如何分析SQL语句
查看>>
结构化查询语言(SQL)原理
查看>>
SQL教程之嵌套SELECT语句
查看>>