HBase Learning Notes: Aggregation Functions
Aggregation functions can be implemented with HBase's coprocessor feature. There are two ways to register the coprocessor:
1. Edit hbase-site.xml and add the following property:
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
2. Register the coprocessor on the table through the client API:
String coprocessorClassName = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
// The table must be disabled before its descriptor can be modified.
admin.disableTable(tableName);
HTableDescriptor htd = admin.getTableDescriptor(tableName);
htd.addCoprocessor(coprocessorClassName);
admin.modifyTable(tableName, htd);
admin.enableTable(tableName);
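For reference, releases of HBase that support online schema changes also let you attach the coprocessor from the HBase shell. A hedged sketch (the table name 'mytable' is a placeholder; the coprocessor attribute value follows the 'jar-path|class|priority|arguments' format, with the jar path and priority left empty here because the class ships with HBase):

```
disable 'mytable'
alter 'mytable', METHOD => 'table_att', 'coprocessor' => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
enable 'mytable'
```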
/** Count the rows of a table. */
public static long rowCount(String tableName, String family) {
    AggregationClient ac = new AggregationClient(configuration);
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(family));
    // FirstKeyOnlyFilter returns only the first KeyValue of each row,
    // which is sufficient for counting and reduces network traffic.
    scan.setFilter(new FirstKeyOnlyFilter());
    long rowCount = 0;
    try {
        rowCount = ac.rowCount(Bytes.toBytes(tableName), new LongColumnInterpreter(), scan);
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    return rowCount;
}
/** Sum the values of a column. */
public static double sum(String tableName, String family, String qualifier) {
    AggregationClient ac = new AggregationClient(configuration);
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    double sum = 0;
    try {
        sum = ac.sum(Bytes.toBytes(tableName), new DoubleColumnInterpreter(), scan);
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    return sum;
}
/** Average the values of a column. */
public static double avg(String tableName, String family, String qualifier) {
    AggregationClient ac = new AggregationClient(configuration);
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    double avg = 0;
    try {
        avg = ac.avg(Bytes.toBytes(tableName), new DoubleColumnInterpreter(), scan);
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    return avg;
}
Note that for sum and avg the ColumnInterpreter is DoubleColumnInterpreter, not the LongColumnInterpreter that ships with HBase. DoubleColumnInterpreter is a custom class implementing the ColumnInterpreter interface:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;

/**
 * A ColumnInterpreter that reads cell values as doubles. It assumes the
 * values are stored as UTF-8 strings (e.g. "3.14"), not as the binary
 * form produced by Bytes.toBytes(double).
 */
public class DoubleColumnInterpreter implements ColumnInterpreter<Double, Double> {

    @Override
    public void write(DataOutput out) throws IOException {
        // No state to serialize.
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // No state to deserialize.
    }

    @Override
    public Double getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
            throws IOException {
        if (kv == null)
            return null;
        return Double.valueOf(new String(kv.getValue()));
    }

    @Override
    public Double add(Double l1, Double l2) {
        if (l1 == null ^ l2 == null) {
            return l1 == null ? l2 : l1; // exactly one is null
        } else if (l1 == null) {
            return null; // both are null
        }
        return l1 + l2;
    }

    @Override
    public Double getMaxValue() {
        return null;
    }

    @Override
    public Double getMinValue() {
        return null;
    }

    @Override
    public Double multiply(Double o1, Double o2) {
        if (o1 == null ^ o2 == null) {
            return o1 == null ? o2 : o1; // exactly one is null
        } else if (o1 == null) {
            return null; // both are null
        }
        return o1 * o2;
    }

    @Override
    public Double increment(Double o) {
        return null; // not needed for sum/avg
    }

    @Override
    public Double castToReturnType(Double o) {
        return o; // already a Double; returning it directly avoids an NPE on null
    }

    @Override
    public int compare(Double l1, Double l2) {
        if (l1 == null ^ l2 == null) {
            return l1 == null ? -1 : 1; // exactly one is null
        } else if (l1 == null) {
            return 0; // both are null
        }
        return l1.compareTo(l2); // natural ordering
    }

    @Override
    public double divideForAvg(Double o, Long l) {
        return (o == null || l == null) ? Double.NaN
                : (o.doubleValue() / l.doubleValue());
    }
}
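The XOR null-handling in add() can be confusing at first sight. This small, self-contained sketch (plain Java, no HBase dependencies; the class name is illustrative) shows how the pattern behaves when one or both partial aggregates are missing:

```java
public class NullTolerantAdd {
    // Same null-handling pattern as DoubleColumnInterpreter.add():
    // exactly one operand null -> return the non-null one;
    // both null -> null; otherwise the ordinary sum.
    static Double add(Double l1, Double l2) {
        if (l1 == null ^ l2 == null) {
            return l1 == null ? l2 : l1;
        } else if (l1 == null) {
            return null;
        }
        return l1 + l2;
    }

    public static void main(String[] args) {
        System.out.println(add(1.5, 2.5));   // prints 4.0
        System.out.println(add(null, 2.5));  // prints 2.5
        System.out.println(add(null, null)); // prints null
    }
}
```

This matters because regions with no matching cells report a null partial result, and the merge step must not turn a valid partial sum into null or a NullPointerException.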
Package this class into a jar, put the jar into the lib directory under HBASE_HOME, and restart HBase; the aggregation functions above can then be called.
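The packaging step can look roughly like the following sketch (paths and the jar name are placeholders; the compile classpath depends on your HBase version and environment, so adjust accordingly):

```
# Compile against the HBase client jars (classpath is environment-specific)
javac -cp "$HBASE_HOME/lib/*" DoubleColumnInterpreter.java

# Package the class into a jar
jar cf double-column-interpreter.jar DoubleColumnInterpreter.class

# Drop it into HBase's lib directory and restart the cluster
cp double-column-interpreter.jar "$HBASE_HOME/lib/"
"$HBASE_HOME/bin/stop-hbase.sh" && "$HBASE_HOME/bin/start-hbase.sh"
```

The jar must be on the RegionServer classpath (not just the client's), because getValue(), add(), and the other interpreter callbacks run server-side inside the AggregateImplementation coprocessor.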