欢迎投稿

今日深度:

Hbase-Observer,

Hbase-Observer,


  • HBase的协处理器涵盖了两种类似关系型数据库中的应用场景:存储过程和触发器,所以协处理器也分为两种:用来实现存储过程功能的终端 程序EndPoint和用来实现触发器功能的观察者Observers

Observer

  • 在hbase2.x的时候,按照之前的继承BaseRegionObserver 是不起作用的,经过我的测试,这个类好像是被移除了,我使用的版本是2.1.1
  • 新的实现可以查看接口Coprocessor来查看,我们来看一下

    /**
     * Base interface for the 4 coprocessors - MasterCoprocessor, RegionCoprocessor,
     * RegionServerCoprocessor, and WALCoprocessor.
     * Do NOT implement this interface directly. Unless an implementation implements one (or more) of
     * the above mentioned 4 coprocessors, it'll fail to be loaded by any coprocessor host.
     *
     * Example:
     * Building a coprocessor to observe Master operations.
     * <pre>
     * class MyMasterCoprocessor implements MasterCoprocessor {
     *   &#64;Override
     *   public Optional&lt;MasterObserver> getMasterObserver() {
     *     return new MyMasterObserver();
     *   }
     * }
     * class MyMasterObserver implements MasterObserver {
     *   ....
     * }
     * </pre>
     * Building a Service which can be loaded by both Master and RegionServer
     * <pre>
     * class MyCoprocessorService implements MasterCoprocessor, RegionServerCoprocessor {
     *   &#64;Override
     *   public Optional&lt;Service> getServices() {
     *     return new ...;
     *   }
     * }
     */
  • 这是Coprocessor接口的介绍,非常清楚的介绍了协处理器的用法,那么我就简单的翻译一下下,自己的英语很渣的

    有四个接口的父类是coprocessors,他们分别是MasterCoprocessor, RegionCoprocessor, RegionServerCoprocessor, WALCoprocessor.
    不要去直接实现coprocessors接口,除非你的实现类实现了上述4个Observer
    然后下面的就是一些具体的实现代码
  • 经过之前的介绍,我们知道了Observer的作用就类似一个触发器,当指定操作也就是你实现的方法对应的操作被执行的时候,你定义的Observer逻辑就会去在HBase上去跑,以实现一些控制
  • 当然上面的coprocessors的实现不知四个接口,还有一些其他实现类,比如AccessController,这个提供数据访问和管理的基本授权检查等操作,还有一些自己可以去查看一下
  • 多说无益,直接上 需求

     t1表中有f1列族
     向f1中插入name,然后自动计算出name的hash作为此行的password列,不可单独添加password
     可删除name,顺便一起删除password,不可单独删除password
  • maven

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.7</version>
        </dependency>
    </dependencies>
  • 直接上 代码

    public class MyRegionCoprocessor implements RegionCoprocessor {
        @Override
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.ofNullable(new MyRegionObserver());
        }
        private static class MyRegionObserver implements RegionObserver {
            private static final byte[] FAMILY = Bytes.toBytes("f1");
            private static final byte[] COL_NAME = Bytes.toBytes("name");
            private static final byte[] COL_PASSWORD = Bytes.toBytes("password");
            @Override
            public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {
                //如果插入的是password列,那么就报错
                List<Cell> passwords = put.get(FAMILY, COL_PASSWORD);
                if (passwords.size() != 0 || !passwords.isEmpty()){
                    throw new IllegalArgumentIOException("Password insertion is not allowed !!");
                }
                //如果添加的是name,那么就增加相对应的name的hash的password列
                List<Cell> names = put.get(FAMILY, COL_NAME);
                names.forEach(name -> {
                    int hash = Objects.hash(name);
                    put.addColumn(FAMILY,COL_PASSWORD,Bytes.toBytes(hash));
                });
            }
            @Override
            public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException {
                //阻止删除密码,不可单独删除密码
                List<Cell> passwords = delete.get(FAMILY, COL_PASSWORD);
                if (passwords.size() != 0 || !passwords.isEmpty()){
                    throw new IllegalArgumentIOException("Password insertion is not delete !!");
                }
                //到这就代表不是密码了,获取name,然后删除对应的password
                List<Cell> names = delete.get(FAMILY, COL_NAME);
                names.forEach(name -> {
                    delete.addColumns(FAMILY,COL_PASSWORD);
                });
            }
        }
    }
  • 如果你对上面的Optional类感到陌生,你就记住他是用来判断值是否是空的就可以,如果想进一步了解,你可以参考我的专辑Java8学习,希望能帮助到你
  • 上面的代码逻辑还是比较清晰的,在外部类的重写的方法getRegionObserver中返回了我们自己实现的Observer类,当然这个方法是不用必须重写的,当我们需要实现自己的Observer的时候必须重写getRegionObserver,而当我们实现自己的EndPoint的时候,就需要重写getEndpointObserver方法了,还有一个在RegionCoprocessor中的方法是getBulkLoadObserver,从名字我们可以看到是批加载用的
  • 当我们实现好了自己的Observer的时候,我们就需要部署到HBase集群中去看看效果,在网上我并没有找到类似本地测试Observer的办法,这可真是麻烦.
  • 开始部署,当然打成jar,我就不说了,打完后,部署到HBase的方式有两种,一种是通过配置文件的方式去部署,一种是通过动态的方式去部署,前者称为静态部署,后面即动态部署, 静态部署 我将简单介绍一下,如下,而自己使用的是动态部署的方法

    在hbase-site.xml中配置静态部署
      <property>
          <name>hbase.coprocessor.region.classes</name>   #这是配置region的协处理器,对应的就是:hbase.coprocessor.master.classes
          <value>你的全类名</value>   #如果想同时配置多个协处理器,可以用逗号分隔多个协处理器的类名
      </property>
    相关的配置
      hbase.coprocessor.enabled:是否启用协处理器机制,默认true,即开启
      hbase.coprocessor.user.enabled:即是否允许用户动态配置,如果为false,就只能在xml中配置了
      hbase.coprocessor.wal.classes:即监控wal的
      hbase.coprocessor.abortonerror:即如果个别协处理器启动失败整个hbase是否启动,默认是如果有的协处理器启动失败,那么hbase就不起了
    需要注意的是
      1.配置顺序决定了执行顺序
      2. 所有协处理器是以系统级优先级加载的 (优先级一会儿会提到)
      3. 重启集群起作用
  • 动态配置

    • 首先你如果使用我上面的代码测试,那么请create 't1','f1'
    • 然后将你的jar上传到集群,然后上传到hdfs,如果你使用我的代码测试,请直接将jar上传到hdfs:///hbase.jar
    • 然后disable 't1'
    • 然后使用特定命令在表上配置协处理器:alter 't1','coprocessor'=>'hdfs:///hbase.jar|qidai.MyRegionCoprocessor|1001|''
    • 再然后enabled 't1'
    • 然后desc 't1',你就会看到
    hbase(main):086:0> desc 't1'
    Table t1 is ENABLED
    #注意这里代表已经配置上去了,如果没有配置上的话,就是 : 紧接着COLUMN FAMILIES DESCRIPTION 字眼,并没有下面的说明
    t1, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://hbase.jar|qidai.MyRegionCoprocessor|1001|'}        
    COLUMN FAMILIES DESCRIPTION                                                       
    • 到了这一步其实就可以了,然后在t1表中尽情测试需求就好了,我测试的时候就很简单的测试了一下,如果你使用我的代码测试,如果有问题,请及时指正哈!!!如果你发现你的动态配置貌似没起作用,那么请重启一下对应的RegionServer,我出现了这个问题,估计是我的笔记本内存上限了,没反应过来??????
  • 到这就该 说说优先级 了,如上如果配置了处理器,多出来的那一行我们分析一下

    alter 't1','coprocessor'=>'hdfs:///hbase.jar|qidai.MyRegionCoprocessor|1001|'
    alter '表名','coprocessor'=>'hdfs地址|你的全类名|优先级|传入给你实现类的参数'
    
    这里的优先级是0是最高的,以整数来表示的,越小值越先被执行
  • 好了到这之后,大概的一些问题已经解决了,那么怎么将Observer给 卸载 呢 ?

    • 静态部署的话,那么删除对应的xml内容,然后重启HBase即可
    • 动态部署的话,我们可以先disable,然后使用alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'来进行卸载,在最后面的coprocessor$1需要自己改一下,看看自己的处理器名是啥就写啥,然后在enable就行了
  • 剩余的EndPoint的使用这篇暂时不介绍, 不过迟早会补上的 , 因为现在只对Avro了解一些, 而Protobuf 不太清楚, 恩..今天这篇涉及的只是Observer一些最基本的操作,是并没有涉及什么原理之类的, 所以在了解其原理之前这也是必备的知识, 如果本片文章对你有些许帮助那就点个赞吧哈哈

www.htsjk.Com true http://www.htsjk.com/hbase/11163.html NewsArticle Hbase-Observer, HBase的协处理器涵盖了两种类似关系型数据库中的应用场景:存储过程和触发器,所以协处理器也分为两种:用来实现存储过程功能的终端 程序EndPoint和用来实现触发器功能的...
评论暂时关闭