欢迎投稿

今日深度:

HBase之Filter,

HBase之Filter,


Filter的作用是谓词下推,就是在Scan查询数据时,将过滤数据的操作放到服务端进行,减少数据的传输,减少网络IO。

介绍Filter使用方法的文章很多,就不再赘述了,主要记录下如何自定义Filter。

解析

在一次Scan的过程中,Filter存在于2个地方:

RegionScannerImpl中调用Filter的方法:

ScanQueryMatcher中调用Filter的方法:

Filter还有2个方法,用于RPC时,将Filter发送到服务端

Protobuf安装

既然要发送实例到服务端,就需要Protobuf了。HBase1.3.2版本使用的是protobuf2.5.0,所以这里装个2.5.0。其他版本在Git Release上找下,安装流程是一样的。2.x和3.x生成的Java代码并不兼容,已有3.x也按照下面步骤执行就行。

基本示例

先写个Filter.proto文件,版本proto2。外部类名字不能和内部类一样,名就取个Filter。

syntax = "proto2";

option java_package = "cn.edu.bupt.hbase.protobuf.generated";
option java_outer_classname = "Filter";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message AnFilter {
}

执行protoc编译命令,生成Protobuf版的Filter类。

protoc --proto_path=src/main/protobuf/ --java_out=src/main/java/ src/main/protobuf/Filter.proto

写个Java的Filter类,继承FilterBase,引用前面生成的Filter.AnFilter。

public class AnFilter extends FilterBase {

    @Override
    public ReturnCode filterKeyValue(Cell cell) throws IOException {
        // 默认值
        return null;
    }

    public byte[] toByteArray() {
        // 使用Filter.AnFilter
        Filter.AnFilter.Builder builder = Filter.AnFilter.newBuilder();
        return builder.build().toByteArray();
    }

    public static AnFilter parseFrom(byte[] pbBytes) throws DeserializationException {
        // 这里暂时不解析了,直接new一个
        return new AnFilter();
    }
}

把代码打包,放入HBase的classpath里,重启HBase。

在ProtobufUtil.toFilter打个断点,客户端执行Scan代码,即进入断点,看到成功在Server端生成了Filter。

进阶示例

让我们写个a+b=c的Filter,并且只返回一行。随便放些数据进去

f:a f:b f:v
row0 value0
row1 1 1
row2 2 2 value2
row3 3 value3
row4 4
row5 5 5 value5
row6 4 6 value6

更改Filter.proto,增加一个变量c,这个变量是要传给服务端的

message AnFilter {
    required int32 c = 1;
}

重新执行protoc编译命令,编译生成Filter。

修改Java的AnFilter,也增加上变量c,修改toByteArray把c设置进去、parseFrom再从byte[]里把c读出来。

public class AnFilter extends FilterBase {
    private int c;
    
    public AnFilter(int c) {
        this.c = c;
    }
    
    public byte[] toByteArray() {
        Filter.AnFilter.Builder builder = Filter.AnFilter.newBuilder();
        builder.setC(c);
        return builder.build().toByteArray();
    }

    public static AnFilter parseFrom(byte[] pbBytes) throws DeserializationException {
        Filter.AnFilter filter;
        try {
            filter = Filter.AnFilter.parseFrom(pbBytes);
        } catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return new AnFilter(filter.getC());
    }

首先要找到a+b=c的行,找到qualifier是a,记录下值;找到qualifier是b,记录下值。a、b都找到了,判断下和是不是等于c,等于就返回ReturnCode.INCLUDE,把当前行找齐;否则跳过,返回ReturnCode.NEXT_ROW,继续找下一行。如果这一行还没找到a、b,还是得先返回ReturnCode.INCLUDE,不然这列就不会包含进去了。

private int sum;

private boolean hasA;
private boolean hasB;
private boolean foundC;

@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
    String qualifier = Bytes.toString(cell.getQualifierArray(), 
                                      cell.getQualifierOffset(),
                                      cell.getQualifierLength());
    if ("a".equals(qualifier)) {
        sum += Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), 
                           cell.getValueLength());
        hasA = true;
    } else if ("b".equals(qualifier)) {
        sum += Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), 
                           cell.getValueLength());
        hasB = true;
    }
    if (hasA && hasB) {
        if (sum == c) {
            foundC = true;
            return ReturnCode.INCLUDE;
        } else {
            return ReturnCode.NEXT_ROW;
        }
    }
    return ReturnCode.INCLUDE;
}

那列都包含进去了,如果后面发现a+b!=c怎么办?没事,还有filterRow方法,没找到就返回true,把当前列过滤掉。

@Override
public boolean filterRow() throws IOException {
    return !foundC;
}

一行执行完成,要重置下临时状态。

@Override
public void reset() throws IOException {
    sum = 0;
    hasA = false;
    hasB = false;
}

找齐了这行就不继续找了,filterAllRemaining返回true。找到了a、b,不能直接返回true,因为可能还有其他列,filterAllRemaining是每个cell都执行的方法。所以在找齐一列的时候设置下值。

private boolean filterAllRemaining;
@Override
public void reset() throws IOException {
	……
    filterAllRemaining = foundC;
}
@Override
public boolean filterAllRemaining() throws IOException {
    return filterAllRemaining;
}

打包放进HBase,重启HBase,客户端执行代码。

scan.setFilter(new AnFilter(10));

返回找到的那行。

row5	5	5	value5

-END-

www.htsjk.Com true http://www.htsjk.com/hbase/42376.html NewsArticle HBase之Filter, Filter的作用是谓词下推,就是在Scan查询数据时,将过滤数据的操作放到服务端进行,减少数据的传输,减少网络IO。 介绍Filter使用方法的文章很多,就不再赘述了,主要记...
相关文章
    暂无相关文章
评论暂时关闭