HBase


I. HBase Architecture

1.1 HMaster
Functions:

1.2 RegionServer
Functions:

Components:

1.3 HBase architecture

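HBase ships with a built-in ZooKeeper, but in practice a separate ZooKeeper ensemble is usually used to supervise the master and the RegionServers. Through leader election, ZooKeeper guarantees that only one HMaster is active in the cluster at any time. HMaster and HRegionServer register with ZooKeeper when they start. ZooKeeper also stores the addressing entry point for all HRegions, monitors HRegionServers going online and offline in real time and notifies the HMaster, and stores the HBase schema and table metadata. By default, HBase manages the ZooKeeper instance itself. Introducing ZooKeeper means the HMaster is no longer a single point of failure.

Usually two HMasters are started. A non-active HMaster periodically communicates with the active HMaster to fetch its latest state, which keeps the standby up to date. For the same reason, starting many masters only adds load to the active HMaster.

A RegionServer can host multiple Regions. Each RegionServer maintains one HLog, plus multiple HFiles and their corresponding MemStores. RegionServers run on DataNodes, and there can be as many RegionServers as there are DataNodes.
See the following architecture diagram: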
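
The relationship between a RegionServer, its single HLog, and the per-Region MemStores and HFiles can be sketched with a toy in-memory model. This is not HBase code: `ToyRegionServer`, its fields, and the `flushThreshold` parameter are hypothetical names made up for illustration, and the model keeps one MemStore per Region for brevity (HBase itself keeps one per column family within a Region). It assumes the usual write order of log first, then MemStore, then a flush to an immutable file once the MemStore fills up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only, not HBase code. All names here are hypothetical.
class ToyRegionServer {
    // One write-ahead log (HLog) shared by every Region on this server.
    final List<String> hlog = new ArrayList<>();
    // Each Region has its own sorted in-memory MemStore ...
    final Map<String, TreeMap<String, String>> memStores = new TreeMap<>();
    // ... and its own list of flushed, immutable files (stand-ins for HFiles).
    final Map<String, List<TreeMap<String, String>>> hfiles = new TreeMap<>();
    final int flushThreshold;

    ToyRegionServer(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    void addRegion(String region) {
        memStores.put(region, new TreeMap<>());
        hfiles.put(region, new ArrayList<>());
    }

    void put(String region, String rowKey, String value) {
        // 1. Append the edit to the shared log.
        hlog.add(region + "/" + rowKey + "=" + value);
        // 2. Apply it to the Region's MemStore.
        TreeMap<String, String> mem = memStores.get(region);
        mem.put(rowKey, value);
        // 3. Once the MemStore is full, flush it to a new file and start over.
        if (mem.size() >= flushThreshold) {
            hfiles.get(region).add(new TreeMap<>(mem));
            mem.clear();
        }
    }

    String get(String region, String rowKey) {
        // Check the MemStore first, then the files from newest to oldest.
        String value = memStores.get(region).get(rowKey);
        if (value != null) {
            return value;
        }
        List<TreeMap<String, String>> files = hfiles.get(region);
        for (int i = files.size() - 1; i >= 0; i--) {
            value = files.get(i).get(rowKey);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        ToyRegionServer rs = new ToyRegionServer(2);
        rs.addRegion("r1");
        rs.addRegion("r2");
        rs.put("r1", "1001", "Thomas");
        rs.put("r2", "2001", "Janna");
        rs.put("r1", "1002", "Nick"); // second row in r1 triggers a flush
        System.out.println("log entries: " + rs.hlog.size());
        System.out.println("files in r1: " + rs.hfiles.get("r1").size());
        System.out.println("get r1/1001: " + rs.get("r1", "1001"));
    }
}
```

The point of the sketch is the cardinality: every write on the server lands in the same log, while MemStores and flushed files are tracked separately for each Region.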

3. HBase physical storage structure

II. Table Operations

1) Create a table
hbase(main) > create 'student','info'
2) Insert data into the table
hbase(main) > put 'student','1001','info:name','Thomas'
hbase(main) > put 'student','1001','info:sex','male'
hbase(main) > put 'student','1001','info:age','18'
hbase(main) > put 'student','1002','info:name','Janna'
hbase(main) > put 'student','1002','info:sex','female'
hbase(main) > put 'student','1002','info:age','20'
3) Scan the table data
hbase(main) > scan 'student'
hbase(main) > scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main) > scan 'student',{STARTROW => '1001'}
4) View the table structure
hbase(main) > describe 'student'
5) Update the data of specific fields
hbase(main) > put 'student','1001','info:name','Nick'
hbase(main) > put 'student','1001','info:age','100'
6) View the data of a specific row or a specific "column family:column"
hbase(main) > get 'student','1001'
hbase(main) > get 'student','1001','info:name'
7) Delete data
Delete all data for a rowkey:
hbase(main) > deleteall 'student','1001'

Delete one column of a rowkey:

hbase(main) > delete 'student','1002','info:sex'
8) Clear the table data
hbase(main) > truncate 'student'

Note: clearing a table happens in this order: the table is disabled first, then truncated.
9) Drop a table
The table must first be put into the disabled state:

hbase(main) > disable 'student'

Only then can the table be dropped:

hbase(main) > drop 'student'

Note: dropping a table directly fails with the error: Drop the named table. Table must first be disabled
ERROR: Table student is enabled. Disable it first.
10) Count the rows in the table

hbase(main) > count 'student'
11) Alter table information
Keep 3 versions of the data in the info column family:
hbase(main) > alter 'student',{NAME=>'info',VERSIONS=>3}
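
The effect of `put` on an existing cell and of `VERSIONS=>3` can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not HBase code: `ToyTable` and its methods are hypothetical names, the timestamp is a simple counter, and surplus versions are trimmed immediately on write, whereas HBase removes them later in the background.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Toy model only, not HBase code. All names here are hypothetical.
class ToyTable {
    private final int maxVersions;
    // rowKey -> ("family:qualifier" -> (timestamp, newest first -> value))
    private final TreeMap<String, TreeMap<String, TreeMap<Long, String>>> rows = new TreeMap<>();
    private long clock = 0; // stand-in for the server-assigned timestamp

    ToyTable(int maxVersions) {
        this.maxVersions = maxVersions;
    }

    // Like the shell's put: writing to an existing cell adds a newer version.
    void put(String row, String column, String value) {
        TreeMap<String, TreeMap<Long, String>> columns = rows.get(row);
        if (columns == null) {
            columns = new TreeMap<String, TreeMap<Long, String>>();
            rows.put(row, columns);
        }
        TreeMap<Long, String> versions = columns.get(column);
        if (versions == null) {
            versions = new TreeMap<Long, String>(Collections.<Long>reverseOrder());
            columns.put(column, versions);
        }
        versions.put(++clock, value);
        // Keep only the newest maxVersions entries (the map is sorted newest first).
        while (versions.size() > maxVersions) {
            versions.pollLastEntry();
        }
    }

    // Like the shell's get on one column: returns the newest version, or null.
    String get(String row, String column) {
        List<String> versions = getVersions(row, column);
        return versions.isEmpty() ? null : versions.get(0);
    }

    // All retained versions of a cell, newest first.
    List<String> getVersions(String row, String column) {
        TreeMap<String, TreeMap<Long, String>> columns = rows.get(row);
        if (columns == null || !columns.containsKey(column)) {
            return new ArrayList<String>();
        }
        return new ArrayList<String>(columns.get(column).values());
    }

    // Like the shell's deleteall on a rowkey: removes the whole row.
    void deleteAll(String row) {
        rows.remove(row);
    }

    // Like the shell's count: number of rows.
    int count() {
        return rows.size();
    }

    public static void main(String[] args) {
        ToyTable student = new ToyTable(3);
        student.put("1001", "info:name", "Thomas");
        student.put("1001", "info:name", "Nick"); // an update is just another put
        student.put("1002", "info:name", "Janna");
        System.out.println(student.get("1001", "info:name"));
        System.out.println(student.getVersions("1001", "info:name"));
        System.out.println(student.count());
    }
}
```

In this model an "update" never overwrites anything in place: the second `put` simply becomes the newest version, and older versions stay readable until they fall outside the version limit.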

III. Read and Write Path

1. Read and write flow

IV. API Operations

1. After creating a new project, add the dependencies to pom.xml:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0</version>
</dependency>

2. Operations

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HbaseApi {
    public static Configuration conf;

    static {
        // Create the configuration through the HBaseConfiguration factory method
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master01");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }
    /**
     * Create a namespace.
     *
     * @param namespaceNameStr name of the namespace to create
     * @throws IOException if the request to HBase fails
     */
    public static void createNamespace(String namespaceNameStr) throws IOException {
        // try-with-resources closes the Admin first, then the Connection
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespaceNameStr);
            NamespaceDescriptor nd = builder.build();
            admin.createNamespace(nd);
            System.out.println("Namespace " + namespaceNameStr + " created");
        }
    }
    /**
     * Delete a namespace.
     *
     * @param namespaceNameStr name of the namespace to delete
     * @throws IOException if the request to HBase fails
     */
    public static void deleteNamespace(String namespaceNameStr) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            admin.deleteNamespace(namespaceNameStr);
            System.out.println("Namespace " + namespaceNameStr + " deleted");
        }
    }

    /**
     * List all namespaces.
     *
     * @throws IOException if the request to HBase fails
     */
    public static void scanAllNamespace() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
            for (NamespaceDescriptor name : list) {
                System.out.println("Namespace: " + name);
            }
        }
    }

    /**
     * Check whether a table exists (new API).
     *
     * @param tableName name of the table
     * @return true if the table exists, false otherwise
     */
    public static boolean isTableExist(String tableName) throws IOException {
        // With the new API, tables are managed through an Admin obtained from a Connection
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        }
    }

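    /**
     * Check whether a table exists, using the old API.
     * The old API stays compatible across a certain range of versions.
     *
     * @param tableName name of the table
     * @return true if the table exists, false otherwise
     * @throws IOException if the request to HBase fails
     */
    public static boolean isTableExistOld(String tableName) throws IOException {
        // With the old API, managing tables requires an HBaseAdmin object
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            return admin.tableExists(tableName);
        }
    }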

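    /**
     * Create a table.
     *
     * @param tableName    name of the table
     * @param columnFamily one or more column family names
     * @throws IOException if the request to HBase fails
     */
    public static void createTable(String tableName, String... columnFamily) throws IOException {
        // Tables are managed through an admin object; table data is accessed through a table object
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            // Check whether the table already exists
            if (isTableExist(tableName)) {
                System.out.println("Table " + tableName + " already exists");
                //System.exit(0);
            } else {
                // HTableDescriptor describes the table; the name is wrapped in a TableName.
                // A table is defined by its column families, so those are all it needs.
                HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
                // Add each column family
                for (String cf : columnFamily) {
                    descriptor.addFamily(new HColumnDescriptor(cf));
                }
                // Create the table from the descriptor
                admin.createTable(descriptor);
                System.out.println("Table " + tableName + " created");
            }
        }
    }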

    /**
     * Drop a table.
     *
     * @param tableName name of the table
     * @throws IOException if the request to HBase fails
     */
    public static void dropTable(String tableName) throws IOException {
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            if (isTableExist(tableName)) {
                // A table must be disabled before it can be deleted
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
                System.out.println("Table " + tableName + " dropped");
            } else {
                System.out.println("Table " + tableName + " does not exist");
            }
        }
    }

    /**
     * Insert a single cell.
     *
     * @param tableName    name of the table
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        value to store
     * @throws IOException if the request to HBase fails
     */
    public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value) throws
            IOException {
        // Create the HTable object (old API); try-with-resources closes it
        try (HTable hTable = new HTable(conf, tableName)) {
            /* New API:
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf(tableName));*/
            // The row key must be a byte array, hence Bytes.toBytes
            Put put = new Put(Bytes.toBytes(rowKey));
            // Each addColumn call adds one cell to the Put; call it repeatedly to add several cells
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
            /* Deprecated equivalent:
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));*/
            // hTable.put accepts either a single Put or a List<Put>
            hTable.put(put);
        }
        System.out.println("Data inserted");
    }

    /**
     * Delete multiple rows.
     *
     * @param tableName name of the table
     * @param rows      row keys to delete (overload this method as the business case requires)
     * @throws IOException if the request to HBase fails
     */
    public static void deleteMultiRow(String tableName, String... rows) throws IOException {
        try (HTable hTable = new HTable(conf, tableName)) {
            List<Delete> deleteList = new ArrayList<Delete>();
            for (String row : rows) {
                Delete delete = new Delete(Bytes.toBytes(row));
                deleteList.add(delete);
            }
            hTable.delete(deleteList);
        }
        System.out.println("Data deleted");
    }

    /**
     * Print the data of all rows.
     *
     * @param tableName name of the table
     * @throws IOException if the request to HBase fails
     */
    public static void getAllRows(String tableName) throws IOException {
        // Scan object that describes what to read from the regions
        Scan scan = new Scan();
        // scan.setMaxVersions();  // return all versions of each cell
        // try-with-resources closes the ResultScanner first, then the HTable
        try (HTable hTable = new HTable(conf, tableName);
             ResultScanner resultScanner = hTable.getScanner(scan)) {
            for (Result result : resultScanner) {
                Cell[] cells = result.rawCells();
                for (Cell cell : cells) {
                    // Row key
                    System.out.println("Row key: " + Bytes.toString(CellUtil.cloneRow(cell)));
                    // Column family
                    System.out.println("Column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
                    System.out.println("Column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                    System.out.println("Value: " + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }

    /**
     * Print the data of the row with the given row key.
     *
     * @param tableName name of the table
     * @param rowKey    row key
     * @throws IOException if the request to HBase fails
     */
    public static void getRow(String tableName, String rowKey) throws IOException {
        try (HTable table = new HTable(conf, tableName)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            // get.setMaxVersions();  // return all versions
            // get.setTimeStamp();    // return only the version with the given timestamp
            Result result = table.get(get);
            for (Cell cell : result.rawCells()) {
                System.out.println("Row key: " + Bytes.toString(result.getRow()));
                System.out.println("Column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("Column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("Value: " + Bytes.toString(CellUtil.cloneValue(cell)));
                System.out.println("Timestamp: " + cell.getTimestamp());
            }
        }
    }

    /**
     * Print the data of one "column family:column" in a given row.
     *
     * @param tableName name of the table
     * @param rowKey    row key
     * @param family    column family
     * @param qualifier column qualifier
     * @throws IOException if the request to HBase fails
     */
    public static void getRowQualifier(String tableName, String rowKey, String family, String
            qualifier) throws IOException {
        try (HTable table = new HTable(conf, tableName)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            Result result = table.get(get);
            for (Cell cell : result.rawCells()) {
                System.out.println("Row key: " + Bytes.toString(result.getRow()));
                System.out.println("Column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("Column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("Value: " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }


    public static void main(String[] args) throws IOException {
     /*   scanAllNamespace();
        createNamespace("aa");
        scanAllNamespace();
        deleteNamespace("aa");*/
        // 1. Test isTableExist
        //    System.out.println(isTableExist("TEST_HBASE_PHOENIX"));
        //  createTable("staffe", "cf1", "cf2");
        //  dropTable("staffe");
        // System.out.println(isTableExist("staffe"));
        // addRowData("staffe", "1001", "cf1", "name", "nick");
        //  addRowData("staffe", "1002", "cf1", "name", "1002");
        //  addRowData("staffe", "1003", "cf1", "name", "1003");
        //   deleteMultiRow("staffe", "1001", "1002");
        //  getAllRows("staffe");
        //  getRow("staffe", "1001");
        //  getRowQualifier("staffe", "1001", "cf1", "name");
    }
}
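
The class above passes every row key through `Bytes.toBytes` because HBase stores and compares keys as raw bytes. One consequence worth seeing in isolation is that numeric-looking row keys sort byte by byte rather than numerically. The following stdlib-only sketch illustrates this; `RowKeyOrder` and its methods are hypothetical names, and `String.getBytes(UTF_8)` is used here as a stand-in for `Bytes.toBytes(String)`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only. RowKeyOrder is a hypothetical helper, not part of HBase.
class RowKeyOrder {
    // Compare two byte arrays lexicographically, treating each byte as unsigned.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        // When one key is a prefix of the other, the shorter key sorts first.
        return a.length - b.length;
    }

    // Sort string keys by the byte order of their UTF-8 encoding.
    static List<String> sortAsRowKeys(String... keys) {
        List<String> sorted = new ArrayList<String>(Arrays.asList(keys));
        sorted.sort((l, r) -> compareBytes(
                l.getBytes(StandardCharsets.UTF_8),
                r.getBytes(StandardCharsets.UTF_8)));
        return sorted;
    }

    public static void main(String[] args) {
        // Byte order, not numeric order: prints [10, 1001, 1002, 9]
        System.out.println(sortAsRowKeys("9", "10", "1001", "1002"));
        // Zero-padding to a fixed width restores numeric order: prints [0009, 0010, 1001, 1002]
        System.out.println(sortAsRowKeys("0009", "0010", "1001", "1002"));
    }
}
```

This is why fixed-width keys such as "1001" and "1002" are a convenient choice for the examples: their byte order and their numeric order agree.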

package hbase


import java.io.IOException
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, HBaseAdmin, HTable, Put, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

object HbaseScalaUtilsApi {
  // HBase connection settings
  val hBaseconf = HBaseConfiguration.create
  hBaseconf.set("hbase.zookeeper.property.clientPort", "2181")
  hBaseconf.set("hbase.zookeeper.quorum", "master01,slave01,slave02")
  hBaseconf.set("hbase.master", "master01")
  hBaseconf.set("zookeeper.znode.parent", "/hbase")
  val connection = ConnectionFactory.createConnection(hBaseconf)
  val hAdmin = connection.getAdmin

  /**
    * Check whether a table exists.
    *
    * @param tableName name of the table
    * @return true if the table exists, false otherwise
    */
  def isTableExist(tableName: String): Boolean = {
    var result = false
    val tName = TableName.valueOf(tableName)
    if (hAdmin.tableExists(tName)) {
      result = true
      println("Table already exists")
    } else {
      println("Table does not exist")
    }
    result
  }

  /**
    * Create a table.
    *
    * @param tableName     name of the table
    * @param columnFamilys column family names
    */
  def createTable(tableName: String, columnFamilys: Array[String]) = {
    // Name of the table to operate on
    val tName = TableName.valueOf(tableName)
    // Create the HBase table only if it does not exist yet
    if (!hAdmin.tableExists(tName)) {
      // Descriptor that holds the table schema
      val descriptor = new HTableDescriptor(tName)
      // Add the column families; the array carries one or more family names
      for (columnFamily <- columnFamilys) {
        descriptor.addFamily(new HColumnDescriptor(columnFamily))
      }
      // Create the table
      hAdmin.createTable(descriptor)
      println("create successful!!")
    } else {
      println("create fail!!")
    }
  }

  /**
    * Drop a table.
    *
    * @param tableName name of the table
    * @return true if the table existed and was dropped, false otherwise
    */
  def dropTable(tableName: String): Boolean = {
    var status = false
    val tName = TableName.valueOf(tableName)
    if (hAdmin.tableExists(tName)) {
      // A table must be disabled before it can be deleted
      hAdmin.disableTable(tName)
      hAdmin.deleteTable(tName)
      println("drop successful!!")
      status = true
    } else {
      println("drop fail!!")
    }
    status
  }

  /**
    * Put a single cell.
    *
    * @param tableName    name of the table
    * @param rowkey       row key
    * @param columnFamily column family
    * @param column       column qualifier
    * @param value        value to store
    */
  def addRowData(tableName: String, rowkey: String, columnFamily: String, column: String, value: String) = {
    val table = connection.getTable(TableName.valueOf(tableName))
    // Build the Put for this row key
    val puts = new Put(rowkey.getBytes())
    // Add the column family, column name, and value
    puts.addColumn(columnFamily.getBytes(), column.getBytes(), value.getBytes())
    // Write the data to the table, then release the table handle
    table.put(puts)
    table.close()
    println("put successful!!")
  }

  /**
    * Print the values of one column of a column family for every row in the table.
    * scan( "staffe" "cf1" "name")
    *
    * @param tableName    name of the table
    * @param columnFamily column family
    * @param column       column qualifier
    * @return
    */
  def getAllColumnRows(tableName: String, columnFamily: String, column: String) = {
    val table = connection.getTable(TableName.valueOf(tableName))
    // Define the Scan object
    val scan = new Scan()
    // Restrict the scan to the column family
    scan.addFamily(columnFamily.getBytes())
    // Open a scanner over the table
    val scanner = table.getScanner(scan)
    var result = scanner.next()
    // Print rows until the scanner is exhausted
    while (result != null) {
      println(s"rowkey:${Bytes.toString(result.getRow)},column family:${columnFamily}:${column},value:${Bytes.toString(result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column)))}")
      result = scanner.next()
    }
    // Always close the ResultScanner when the scan is done; otherwise the RegionServer
    // may run into problems because the server-side resources are never released
    scanner.close()
    table.close()
  }

  /**
    * Get all data in a table.
    *
    * @param tableName name of the table
    * @return one formatted string per cell
    */
  def getAllRows(tableName: String): ListBuffer[String] = {
    var table: Table = null
    val list = new ListBuffer[String]
    try {
      table = connection.getTable(TableName.valueOf(tableName))
      val results: ResultScanner = table.getScanner(new Scan)
      // JavaConverters replaces the deprecated implicit JavaConversions
      import scala.collection.JavaConverters._
      for (result <- results.asScala) {
        for (cell <- result.rawCells) {
          val row: String = Bytes.toString(cell.getRowArray, cell.getRowOffset, cell.getRowLength)
          val family: String = Bytes.toString(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength)
          val colName: String = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
          val value: String = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          val context: String = "rowkey:" + row + "," + "column family:" + family + "," + "column:" + colName + "," + "value:" + value
          list += context
        }
      }
      results.close()
    } catch {
      case e: IOException =>
        e.printStackTrace()
    } finally {
      if (table != null) table.close()
    }
    list
  }


  /**
    * Delete one column of a row.
    * delete "staffe" "1001" "cf1" "name"
    *
    * @param tableName    name of the table
    * @param rowkey       row key
    * @param columnFamily column family
    * @param column       column qualifier
    */
  def deleteRecord(tableName: String, rowkey: String, columnFamily: String, column: String) = {
    val table = connection.getTable(TableName.valueOf(tableName))
    val info = new Delete(Bytes.toBytes(rowkey))
    info.addColumn(columnFamily.getBytes(), column.getBytes())
    table.delete(info)
    table.close()
    println("delete successful!!")
  }

  /**
    * Delete multiple rows.
    *
    * @param tableName name of the table
    * @param rows      row keys to delete
    */
  def delMultiRows(tableName: String, rows: Array[String]): Unit = {
    val table = connection.getTable(TableName.valueOf(tableName))
    // yield collects one Delete per row key into a new array
    val deleteList = for (row <- rows) yield new Delete(Bytes.toBytes(row))
    deleteList.foreach(x => {
      table.delete(x)
    })
    table.close()
    println("Multiple rows deleted")

  }

  // Close the connection
  def close() = {
    if (connection != null) {
      try {
        connection.close()
        println("Connection closed")
      } catch {
        case e: IOException => println("Failed to close the connection")
      }
    }
  }


  def main(args: Array[String]): Unit = {
    // isTableExist("tes2")
    //   createTable("staffe", Array("name", "age", "cf1"))
    //  dropTable("staffe")
    addRowData("staffe", "1001", "cf1", "name", "nick")
    addRowData("staffe", "1002", "cf1", "name", "1002")
    addRowData("staffe", "1003", "cf1", "name", "1003")
    getAllColumnRows("staffe", "cf1", "name")
    /*   val allRows: ListBuffer[String] = getAllRows("staffe")
       allRows.foreach(x => {
         println(x)
       })*/
    // deleteRecord("staffe", "1001", "cf1", "name")
    // delMultiRows("staffe", Array("1002", "1003"))
    close()
  }
}

V. HBase Shell Operations

VI. Mapping Between HBase and Hive

VII. Integrating HBase with Sqoop
