
HBase Coprocessors


Coprocessors use the Observer pattern to listen for operations. They come in three types:

1) MasterObserver    //listens for DDL operations such as table creation

2) WALObserver    //listens for write-ahead log (WAL) operations

3) RegionObserver    //listens for operations inside a region; the most widely used

How to use a RegionObserver:

1. extend BaseRegionObserver

2. override the hook methods you need

There are many hook methods covering many operations; see the API documentation for the full list. A minimal skeleton is sketched below.
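For reference, here is a minimal RegionObserver skeleton, assuming the HBase 1.x API (BaseRegionObserver) used by the example that follows; the class name MyRegionObserver and the empty hook bodies are only illustrative. pre* hooks run before the operation reaches the region, post* hooks after it has been applied:

package observer;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// illustrative skeleton: override only the hooks you need
public class MyRegionObserver extends BaseRegionObserver {

    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
        // runs before a Get is served by the region
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // runs after a Put has been written to the region
    }
}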

1. Example: Weibo follow/fan relationships. We listen for inserts and deletes on the follow table: when user a follows user b in the follow table, the fan table must also record a as a fan of b.

We use the post* hooks: an insert into the follow table triggers a corresponding insert into the fan table, and a delete on the follow table triggers the matching delete in the same way.

package observer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;

public class Weibo extends BaseRegionObserver {

    private static final String GUANZHU_TABLE_NAME = "weibo:guanzhu";
    private static final String FENSI_TABLE_NAME = "weibo:fensi";

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {

        // get the coprocessor environment
        RegionCoprocessorEnvironment env = e.getEnvironment();

        // only react to puts on the follow table
        if (env.getRegionInfo().getTable().getNameAsString().equals(GUANZHU_TABLE_NAME)) {

            // the put carries a single value, i.e. one cell, so the first cell of the first family is enough
            List<Cell> cells = put.getFamilyCellMap().firstEntry().getValue();
            Cell cell = cells.get(0);

            // rowkey of the cell = the follower (comma-separated fields)
            String a = Bytes.toString(CellUtil.cloneRow(cell));
            String[] arr = a.split(",");

            // value of the cell = the user being followed
            String b = Bytes.toString(CellUtil.cloneValue(cell));

            // build the fan-table rowkey from the followed user plus a timestamp
            String newb = b + System.currentTimeMillis();
            // alternatively, reuse the second field of the original rowkey:
            // String newb = b + "," + arr[1];

            // open the fan table through the coprocessor environment
            Table table = env.getTable(TableName.valueOf(FENSI_TABLE_NAME));

            // build a new put with the relation reversed and write it to the fan table
            Put newPut = new Put(Bytes.toBytes(newb));
            newPut.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(a));
            table.put(newPut);

            table.close();
        }
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        super.postDelete(e, delete, edit, durability);

        // debug log written to the local filesystem of the region server
        FileOutputStream fos = new FileOutputStream("/home/centos/test.log", true);

        // take the first cell of the delete
        NavigableMap<byte[], List<Cell>> map = delete.getFamilyCellMap();
        List<Cell> list = map.firstEntry().getValue();
        Cell cell = list.get(0);
        byte[] value = CellUtil.cloneValue(cell);
        fos.write(("deleted value:" + Bytes.toString(value) + "\n").getBytes());

        // scan the fan table and remove the matching reversed relation
        Table table = e.getEnvironment().getTable(TableName.valueOf(FENSI_TABLE_NAME));
        Scan sc = new Scan();
        ResultScanner scanner = table.getScanner(sc);
        for (Result result : scanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell1 : cells) {
                byte[] row1 = CellUtil.cloneRow(cell1);
                byte[] value1 = CellUtil.cloneValue(cell1);
                String fensiRow = Bytes.toString(row1);
                fos.write(("fan row:" + fensiRow + "\n").getBytes());
                fos.write(("fan value:" + Bytes.toString(value1) + "\n").getBytes());
                if (fensiRow.contains(Bytes.toString(value)) && Bytes.toString(value1).equals(Bytes.toString(value))) {
                    Delete delete1 = new Delete(row1);
                    table.delete(delete1);
                }
            }
        }

        scanner.close();
        table.close();
        fos.close();
    }
}

3. How to register a coprocessor

1. Registration in code

        Table-level registration

           1) specify the coprocessor when the table is created (a sketch follows this list)

           2) for an existing table, add the coprocessor via modifyTable (the ModifyCopro class further below)
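A minimal sketch of option 1), attaching the coprocessor while creating the table. The class name CreateTableWithCopro is made up for illustration; the table name, column families, jar path, and coprocessor class are reused from the ModifyCopro example below, and the same HBase 1.x API is assumed:

package calllog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTableWithCopro {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();

        // describe the new table and attach the coprocessor before creating it
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("ns1:calllog"));
        htd.addFamily(new HColumnDescriptor("normel"));
        htd.addFamily(new HColumnDescriptor("profess"));
        htd.addCoprocessor("calllog.CalllogCopro", new Path("/Calllog.jar"), 0, null);

        admin.createTable(htd);

        admin.close();
        conn.close();
    }
}

Because the coprocessor is part of the descriptor at creation time, no disable/modify/enable cycle is needed.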

package calllog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;

public class ModifyCopro {

    /***
     * Remove the coprocessor in code: rebuild the table descriptor
     * without calling addCoprocessor and apply it with modifyTable.
     * @throws Exception
     */
    @Test
    public void testDel() throws Exception {
        // initialize the HBase configuration
        Configuration conf = HBaseConfiguration.create();

        // create a connection through the connection factory
        Connection conn = ConnectionFactory.createConnection(conf);

        // obtain the HBase admin
        Admin admin = conn.getAdmin();

        TableName table = TableName.valueOf("ns1:calllog");

        admin.disableTable(table);

        // modifyTable replaces the whole descriptor, so the existing
        // column families must be declared again or they will be dropped
        HTableDescriptor htd = new HTableDescriptor(table);
        htd.addFamily(new HColumnDescriptor("normel"));
        htd.addFamily(new HColumnDescriptor("profess"));

        admin.modifyTable(table, htd);

        admin.enableTable(table);

        admin.close();
        conn.close();
    }

    @Test
    public void testAdd2() throws Exception {
        // initialize the HBase configuration
        Configuration conf = HBaseConfiguration.create();

        // create a connection through the connection factory
        Connection conn = ConnectionFactory.createConnection(conf);

        // obtain the HBase admin
        Admin admin = conn.getAdmin();

        TableName table = TableName.valueOf("calllog");

        //admin.disableTable(table);

        HTableDescriptor htd = new HTableDescriptor(table);

        // attach the coprocessor: class name, jar location on HDFS, priority, arguments
        htd.addCoprocessor("calllog.CalllogCopro", new Path("/Calllog.jar"), 0, null);

        htd.addFamily(new HColumnDescriptor("normel"));
        htd.addFamily(new HColumnDescriptor("profess"));

        admin.modifyTable(table, htd);

        //admin.enableTable(table);

        admin.close();
        conn.close();
    }

    @Test
    public void addCopro() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();

        TableName tableName = TableName.valueOf("ns1:calllog");
        admin.disableTable(tableName);

        // rebuild the descriptor with the coprocessor and the column families
        HTableDescriptor td = new HTableDescriptor(tableName);
        td.addCoprocessor("calllog.CalllogCopro", new Path("/Calllog.jar"), 0, null);
        td.addFamily(new HColumnDescriptor("normel"));
        td.addFamily(new HColumnDescriptor("profess"));

        admin.modifyTable(tableName, td);
        admin.enableTable(tableName);

        admin.close();
        connection.close();
    }
}

 

 

    2. Registration from the HBase shell

       Table-level registration

           1) Register

              Disable the table

                  disable 'test:t1'

              Alter the table and add the coprocessor (the value is pipe-separated: jar path on HDFS | observer class | priority | arguments)

                  alter 'test:t1', 'coprocessor'=>'hdfs://mycluster/myhbase.jar|com.oldboy.hbase.observer.MyObserver|3|'

              Enable the table

                  enable 'test:t1'

           2) Remove

              Disable the table

                  disable 'test:t1'

              Alter the table and unset the coprocessor attribute

                  alter 'test:t1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'

              Enable the table

                  enable 'test:t1'

             

 

    3. Registration via hbase-site.xml

       Global registration: the coprocessor takes effect on every table.

           1. Write the observer and build the jar

           2. Place the jar under hbase/lib and distribute it to every node

           3. Edit hbase-site.xml, add the property below, and distribute it to every node

           <property>
               <name>hbase.coprocessor.region.classes</name>
               <value>fully qualified class name of the Observer</value>
            </property>
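For example, to register the Weibo observer from section 1 globally (assuming its jar has already been placed under hbase/lib on every node), the property would be:

           <property>
               <name>hbase.coprocessor.region.classes</name>
               <value>observer.Weibo</value>
            </property>

Several observer classes can be listed in the value, separated by commas.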

 
