Hbase BufferedMutator批量写入
2024-12-30 14:28:42 # hbase # 问题汇总

Hbase BufferedMutator批量写入

hbase 的bufferedMutator功能主要是批量写入数据到hbase表中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Configuration conf =  HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zookeeperHost");
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
for (int i = 0; i < e.getNumExceptions(); i++) {
LOG.info("Failed to sent put " + e.getRow(i) + ".");
}
}
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
.listener(listener);
params.writeBufferSize(123123L);
try {
Connection conn = ConnectionFactory.createConnection(conf);
BufferedMutator mutator = conn.getBufferedMutator(params);
Put p = new Put(Bytes.toBytes("someRow"));
p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value"));
mutator.mutate(p);
mutator.close();
conn.close();
} catch (IOException e1) {
e1.printStackTrace();
}

这个代码就是

首先对hbase配置了初始化,然后定义了一个监听器,监听批量操作里面的异常,RetriesExhaustedWithDetailsException表示多次重试后仍然无法完成的操作

然后设置了缓冲区的大小是123123字节,当数据达到这个值的时候就会自动触发批量写入操作

然后try catch里面的就是创建连接,然后批量缓冲写入,最后关闭bufferedMutator,并且确保缓冲区中的剩余数据写入hbase