HBase详解
HBase架构图理解
写数据流程
读数据流程
HBase Java API基本使用
package com.qh.hbase; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; public class HbaseHH { /* * 连接hbase * */ private static Connection connection=null; static{ Configuration config=HBaseConfiguration.create(); // 设置连接参数:HBase数据库所在的主机IP config.set("hbase.zookeeper.quorum", "192.168.64.104"); // 设置连接参数:HBase数据库使用的端口 config.set("hbase.zookeeper.property.clientPort", "2181"); try { connection=ConnectionFactory.createConnection(config); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /* * 获取表列表 * */ static void List() throws IOException{ Admin admin = connection.getAdmin(); for(TableName tn:admin.listTableNames()){ System.out.println(tn.getNameAsString()); } } /** * 创建表 * 
@param tablename * @param familyNames * */ public static boolean create(String tableName,String ...familyNames ) { try { TableName tn = TableName.valueOf(tableName); Admin admin = connection.getAdmin(); if(admin.tableExists(tn)) { admin.disableTable(tn); admin.deleteTable(tn); } //创建表的描述对象 HTableDescriptor htd = new HTableDescriptor(tn); for(String family:familyNames) { HColumnDescriptor hcd = new HColumnDescriptor(family); htd.addFamily(hcd); } admin.createTable(htd); admin.close(); return true; } catch (IOException e) { e.printStackTrace(); return false; } } /* *删除表 *@param tablename * */ public static boolean deletetable(String tablename){ TableName tName =TableName.valueOf(tablename); try { Admin admin = connection.getAdmin(); admin.disableTable(tName); admin.deleteTable(tName); return true; } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return false; } } /* *描述表 *@param tablename * */ public static void describe(String tableName) { try { Admin admin = connection.getAdmin(); HTableDescriptor htd= admin.getTableDescriptor(TableName.valueOf(tableName)); System.out.println("===describe "+tableName+"==="); for(HColumnDescriptor hcd:htd.getColumnFamilies()) { System.out.println(hcd.getNameAsString()); } System.out.println("======================="); } catch (IOException e) { e.printStackTrace(); } } /* * 操作表数据 * @param tablename * @param rowkey * @param kvs * */ public static boolean put(String tableName,String rowKey,String[][] kvs) { Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); List<Put> lp = new ArrayList<>(); for(String[] kv : kvs) { Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn( Bytes.toBytes(kv[0]), Bytes.toBytes(kv[1]), Bytes.toBytes(kv[2])); lp.add(put); } table.put(lp); table.close(); return true; }catch (Exception e) { return false; } } /* * 添加数据 * @param tableName * @param key * @param kvs * @throws IOException */ public static void put1(String tableName,String 
key,String[][] kvs){ Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); //List<Put> lp = new ArrayList<Put>(); List<Put> lp = new ArrayList<Put>(); for (String[] kv : kvs) { Put put = new Put(Bytes.toBytes(key)); put.addColumn(Bytes.toBytes(kv[0]), Bytes.toBytes(kv[1]), Bytes.toBytes(kv[2])); lp.add(put); } table.put(lp); System.out.println("添加成功"); } catch (IOException e) { e.printStackTrace(); }finally { if (table!=null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 根据rowKey获取列键的值 * @param tableName * @param rowKey */ public static void get(String tableName,String rowKey) { Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); Result result= table.get(get); for(Cell cell : result.listCells()) { String family= Bytes.toString(CellUtil.cloneFamily(cell)); String qualifier= Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println(family+"\t"+qualifier+"\t"+value); } } catch (IOException e) { e.printStackTrace(); }finally { try { if(table!=null) table.close(); } catch (IOException e) { e.printStackTrace(); } } } /* * 扫描表 * @param tablename * */ public static void scan(String tableName) { Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); //列值过滤器 SingleColumnValueFilter scv = new SingleColumnValueFilter( Bytes.toBytes("grade"), Bytes.toBytes("class"), CompareOp.EQUAL, Bytes.toBytes("1")); //scan.setFilter(scv); //行健过滤器 RowFilter rowFilter = new RowFilter( CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("a02"))); //scan.setFilter(rowFilter); rowFilter = new RowFilter(CompareOp.EQUAL, new BinaryPrefixComparator("a".getBytes())); //scan.setFilter(rowFilter); rowFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("redu")); //scan.setFilter(rowFilter); rowFilter = new 
RowFilter(CompareOp.EQUAL, new RegexStringComparator(".*rr")); //scan.setFilter(rowFilter); //过滤器链 //FilterList.Operator.MUST_PASS_ALL :&& //FilterList.Operator.MUST_PASS_ONE:|| FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); filterList.addFilter(scv); filterList.addFilter(rowFilter); scan.setFilter(filterList); //设置起止rowkey //scan.setStartRow(Bytes.toBytes("jredu001")); //scan.setStopRow(Bytes.toBytes("jredu002")); ResultScanner rs = table.getScanner(scan); for(Result result:rs) { System.out.println("******************************************"); System.out.println(Bytes.toString(result.getRow()));//得到行健 for(Cell cell : result.listCells()) { String family= Bytes.toString(CellUtil.cloneFamily(cell)); String qualifier= Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println(family+"\t"+qualifier+"\t"+value); } System.out.println("******************************************"); } } catch (IOException e) { e.printStackTrace(); }finally { try { if(table!=null) table.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { // boolean cr = deletetable("score"); List(); /*String tableName="score"; String familys="subject"; boolean cr = create(tableName, familys); if(cr){ System.out.println("创建成功"); }else{ System.out.println("创建失败"); }*/ //describe("student"); // String [][] kvs ={{"grade","class","2"},{"name","name","zhang"}}; //put1("student","c02rr",kvs); // get("student", "a01"); scan("student"); } }HBase架构中各个模块的功能再次总结
hbase与mapreduce的集成
可以把hbase表中的数据作为mapreduce计算框架的输入,或者把mapreduce的计算结果输出到hbase表中。
我们以hbase中自带的mapreduce程序举例
HBase的数据迁移的importtsv的使用
HBase数据来源于日志文件或者RDBMS,把数据迁移到HBase表中。常见的有三种方法:(1)使用HBase Put API;(2)使用HBase批量加载工具;(3)自定义MapReduce job实现。
importtsv是HBase官方提供的基于mapreduce的批量数据导入工具,同时也是hbase提供的一个命令行工具,可以将存储在HDFS上的自定义分隔符(默认是\t)的数据文件,通过一条命令方便的导入到HBase中。
测试
作者:心_的方向
链接:http://www.jianshu.com/p/e2bbf23f1ba2
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。