欢迎投稿

今日深度:

HBase工具类

HBase工具类


数据库连接类:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

/**
 * Holder of a single, lazily opened HBase {@link Connection} shared by the whole process.
 *
 * <p>The configuration is built once with {@link HBaseConfiguration#create()} and the
 * ZooKeeper quorum is then set explicitly. Opening and closing the connection are
 * synchronized on the singleton so concurrent callers cannot open two connections.
 */
public class HBaseConn {
    // Declared before INSTANCE so they are initialized before the constructor runs.
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(HBaseConn.class.getName());
    private static final String ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_QUORUM_HOSTS = "node02,node03,node04";

    private static final HBaseConn INSTANCE = new HBaseConn();

    private final Configuration configuration;
    private Connection connection;

    private HBaseConn() {
        configuration = HBaseConfiguration.create();
        configuration.set(ZK_QUORUM_KEY, ZK_QUORUM_HOSTS);
    }

    /**
     * Returns the shared connection, opening a new one if none exists or the previous one
     * has been closed.
     *
     * @return an open connection, never {@code null}
     * @throws IOException if the connection cannot be created
     */
    private synchronized Connection getConnection() throws IOException {
        if (connection == null || connection.isClosed()) {
            connection = ConnectionFactory.createConnection(configuration);
        }
        return connection;
    }

    /**
     * Closes the shared connection if one is held and forgets it, so the next request
     * opens a fresh one. A failure while closing is logged, not thrown.
     */
    private synchronized void close() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            LOG.log(java.util.logging.Level.WARNING, "Failed to close HBase connection", e);
        } finally {
            connection = null;
        }
    }

    /**
     * Returns the shared HBase connection.
     *
     * @return an open connection, never {@code null}
     * @throws java.io.UncheckedIOException if the connection cannot be created; the original
     *         {@link IOException} is preserved as the cause
     */
    public static Connection getHBaseConn() {
        try {
            return INSTANCE.getConnection();
        } catch (IOException e) {
            // This method has no checked exception in its signature, so wrap instead of
            // returning null and letting the caller fail later with a bare NullPointerException.
            throw new java.io.UncheckedIOException("Failed to open HBase connection", e);
        }
    }

    /**
     * Returns a {@link Table} handle for the given table, obtained from the shared connection.
     * The caller is responsible for closing the returned handle.
     *
     * @param tableName name of the table
     * @return a table handle
     * @throws IOException if the connection cannot be created or the handle cannot be obtained
     */
    public static Table getTable(String tableName) throws IOException {
        return INSTANCE.getConnection().getTable(TableName.valueOf(tableName));
    }

    /**
     * Closes the shared connection, if any. Safe to call when no connection is open.
     */
    public static void closeConn() {
        INSTANCE.close();
    }
}



表操作类:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Static convenience wrappers around common HBase table and row operations, all going
 * through the shared connection provided by {@code HBaseConn}.
 *
 * <p>Methods returning {@code boolean} report {@code false} when an {@link IOException}
 * occurs; methods returning a {@link Result} or {@link ResultScanner} report {@code null}
 * in that case. The exception is logged either way.
 */
public class HBaseUtil {
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(HBaseUtil.class.getName());

    /** Scanner caching value applied to every scan created by this class. */
    private static final int SCAN_CACHING = 1000;

    /**
     * Creates a table with the given column families, each limited to one version.
     *
     * @param tableName table name
     * @param cfs       column family names
     * @return {@code true} if the table was created; {@code false} if it already exists
     *         or the creation failed
     */
    public static boolean createTable(String tableName, String[] cfs) {
        TableName name = TableName.valueOf(tableName);
        // The HBaseAdmin cast is kept from the original code; only TableName-based
        // overloads are called on it.
        try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {
            if (admin.tableExists(name)) {
                return false;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(name);
            for (String cf : cfs) {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
                columnDescriptor.setMaxVersions(1);
                tableDescriptor.addFamily(columnDescriptor);
            }
            admin.createTable(tableDescriptor);
            return true;
        } catch (IOException e) {
            logFailure("createTable", tableName, e);
            return false;
        }
    }

    /**
     * Disables (if currently enabled) and then deletes a table.
     *
     * @param tableName table name
     * @return {@code true} if the table was deleted; {@code false} if it does not exist
     *         or the deletion failed
     */
    public static boolean deleteTable(String tableName) {
        TableName name = TableName.valueOf(tableName);
        try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {
            if (!admin.tableExists(name)) {
                return false;
            }
            // Only disable when needed so an already-disabled table can still be deleted.
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
            return true;
        } catch (IOException e) {
            logFailure("deleteTable", tableName, e);
            return false;
        }
    }

    /**
     * Writes a single cell.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param cfName    column family name
     * @param qualifier column qualifier
     * @param data      cell value
     * @return {@code true} if the write succeeded, {@code false} otherwise
     */
    public static boolean putRow(String tableName, String rowKey, String cfName, String qualifier, String data) {
        try (Table table = HBaseConn.getTable(tableName)) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier), Bytes.toBytes(data));
            table.put(put);
            return true;
        } catch (IOException e) {
            logFailure("putRow", tableName, e);
            return false;
        }
    }

    /**
     * Writes a batch of puts.
     *
     * @param tableName table name
     * @param puts      puts to apply
     * @return {@code true} if the batch write succeeded, {@code false} otherwise
     */
    public static boolean putRows(String tableName, List<Put> puts) {
        try (Table table = HBaseConn.getTable(tableName)) {
            table.put(puts);
            return true;
        } catch (IOException e) {
            logFailure("putRows", tableName, e);
            return false;
        }
    }

    /**
     * Reads a single row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return the query result, or {@code null} if the read failed
     */
    public static Result getRow(String tableName, String rowKey) {
        return fetch(tableName, new Get(Bytes.toBytes(rowKey)));
    }

    /**
     * Reads a single row with a filter list applied to the get.
     *
     * @param tableName  table name
     * @param rowKey     row key
     * @param filterList filters to set on the get
     * @return the query result, or {@code null} if the read failed
     */
    public static Result getRow(String tableName, String rowKey, FilterList filterList) {
        Get get = new Get(Bytes.toBytes(rowKey));
        get.setFilter(filterList);
        return fetch(tableName, get);
    }

    /**
     * Opens a scanner over the whole table. The caller must close the returned scanner.
     *
     * @param tableName table name
     * @return the scanner, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName) {
        return openScanner(tableName, new Scan());
    }

    /**
     * Opens a scanner over a row-key range. The caller must close the returned scanner.
     *
     * @param tableName   table name
     * @param startRowKey value passed to {@code Scan.setStartRow}
     * @param endRowKey   value passed to {@code Scan.setStopRow}
     * @return the scanner, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey) {
        return openScanner(tableName, rangeScan(startRowKey, endRowKey));
    }

    /**
     * Opens a filtered scanner over a row-key range. The caller must close the returned scanner.
     *
     * @param tableName   table name
     * @param startRowKey value passed to {@code Scan.setStartRow}
     * @param endRowKey   value passed to {@code Scan.setStopRow}
     * @param filterList  filters to set on the scan
     * @return the scanner, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey, FilterList filterList) {
        Scan scan = rangeScan(startRowKey, endRowKey);
        scan.setFilter(filterList);
        return openScanner(tableName, scan);
    }

    /**
     * Deletes a whole row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    public static boolean deleteRow(String tableName, String rowKey) {
        return applyDelete("deleteRow", tableName, new Delete(Bytes.toBytes(rowKey)));
    }

    /**
     * Removes a column family from a table.
     *
     * @param tableName table name
     * @param cfName    column family name
     * @return {@code true} if the column family was removed, {@code false} otherwise
     */
    public static boolean deleteColumnFamily(String tableName, String cfName) {
        try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {
            admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(cfName));
            return true;
        } catch (IOException e) {
            logFailure("deleteColumnFamily", tableName, e);
            return false;
        }
    }

    /**
     * Deletes one column of one row, using {@code Delete.addColumn}.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param cfName    column family name
     * @param qualifier column qualifier
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    public static boolean deleteQualifier(String tableName, String rowKey, String cfName, String qualifier) {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier));
        return applyDelete("deleteQualifier", tableName, delete);
    }

    /** Executes a get against the table and closes the table handle afterwards. */
    private static Result fetch(String tableName, Get get) {
        try (Table table = HBaseConn.getTable(tableName)) {
            return table.get(get);
        } catch (IOException e) {
            logFailure("getRow", tableName, e);
            return null;
        }
    }

    /** Executes a delete against the table and closes the table handle afterwards. */
    private static boolean applyDelete(String action, String tableName, Delete delete) {
        try (Table table = HBaseConn.getTable(tableName)) {
            table.delete(delete);
            return true;
        } catch (IOException e) {
            logFailure(action, tableName, e);
            return false;
        }
    }

    /** Builds a scan bounded by the given start and stop row keys. */
    private static Scan rangeScan(String startRowKey, String endRowKey) {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(startRowKey));
        scan.setStopRow(Bytes.toBytes(endRowKey));
        return scan;
    }

    /** Applies the shared caching setting and opens a scanner for the given scan. */
    private static ResultScanner openScanner(String tableName, Scan scan) {
        scan.setCaching(SCAN_CACHING);
        try {
            // NOTE(review): the Table handle is deliberately left open, as in the original
            // code, because the returned scanner is still in use by the caller. Confirm
            // against the HBase client version in use whether it can be closed here.
            Table table = HBaseConn.getTable(tableName);
            return table.getScanner(scan);
        } catch (IOException e) {
            logFailure("getScanner", tableName, e);
            return null;
        }
    }

    /** Logs a failed operation together with its cause. */
    private static void logFailure(String action, String tableName, IOException e) {
        LOG.log(java.util.logging.Level.SEVERE, action + " failed for table " + tableName, e);
    }
}















测试连接:

import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.junit.Test;

import java.io.IOException;

/**
 * Tests for {@code HBaseConn}: opening and closing the shared connection and
 * obtaining a table handle.
 */
public class HBaseConnTest {

    /**
     * Obtains the shared connection, closes it through {@code HBaseConn.closeConn()},
     * and checks the connection's closed state before and after.
     */
    @Test
    public void getConnTest() {
        Connection conn = HBaseConn.getHBaseConn();
        if (conn == null) {
            throw new AssertionError("HBaseConn.getHBaseConn() returned null");
        }
        System.out.println(conn.isClosed());
        if (conn.isClosed()) {
            throw new AssertionError("connection should be open right after it is obtained");
        }

        HBaseConn.closeConn();
        System.out.println(conn.isClosed());
        if (!conn.isClosed()) {
            throw new AssertionError("connection should be closed after closeConn()");
        }
    }

    /**
     * Obtains a handle for {@code table2} and prints its name. The handle is always
     * closed, and an I/O failure fails the test instead of being swallowed.
     */
    @Test
    public void getTableTest() {
        try (Table table = HBaseConn.getTable("table2")) {
            System.out.println(table.getName().getNameAsString());
        } catch (IOException e) {
            throw new AssertionError("Could not obtain or close table handle", e);
        }
    }
}


表操作测试:


import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

/**
 * Manual tests for {@code HBaseUtil}: table creation, row lookup, range scan,
 * row deletion and table deletion.
 */
public class HBaseUtilTest {

    /**
     * Creates the test table with a single {@code fileInfo} column family.
     *
     * <p>Other tables can be created the same way, for example:
     * <pre>
     * HBaseUtil.createTable("transaction_record_list", new String[]{"fileInfo"}); // transaction records
     * HBaseUtil.createTable("customer_in_list", new String[]{"fileInfo"});        // customer deposits
     * HBaseUtil.createTable("result_analysis", new String[]{"fileInfo"});         // analysis results
     * </pre>
     */
    @Test
    public void createTable() {
        HBaseUtil.createTable("test_list", new String[]{"fileInfo"});
    }

    /**
     * Placeholder for inserting sample data; currently performs no writes.
     *
     * <p>Example of populating one row:
     * <pre>
     * HBaseUtil.putRow("customer_in_list", "rowkey11", "fileInfo", "serial_num", "2");
     * HBaseUtil.putRow("customer_in_list", "rowkey11", "fileInfo", "customer_num", "test00004");
     * HBaseUtil.putRow("customer_in_list", "rowkey11", "fileInfo", "currency_in", "JPY");
     * HBaseUtil.putRow("customer_in_list", "rowkey11", "fileInfo", "amount_in", "9823");
     * HBaseUtil.putRow("customer_in_list", "rowkey11", "fileInfo", "date_in", "20180614");
     * HBaseUtil.putRow("customer_in_list", "rowkey11", "fileInfo", "time_in", "103007");
     * </pre>
     */
    @Test
    public void addFileDetails() {
    }

    /**
     * Looks up one row and prints its key and its {@code fileInfo:serial_num} value.
     */
    @Test
    public void getFileDetails() {
        Result result = HBaseUtil.getRow("transaction_record_list", "rowkey1");
        if (result != null) {
            System.out.println("rowKey=" + Bytes.toString(result.getRow()));
            System.out.println("transaction_record_list=" + Bytes
                    .toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("serial_num"))));
        }
    }

    /**
     * Scans a row-key range and prints the key and {@code fileInfo:serial_num} value of
     * every returned row. The scanner is closed even if iteration throws.
     */
    @Test
    public void ScanFileDetails() {
        // try-with-resources skips close() when the resource is null.
        try (ResultScanner scanner = HBaseUtil.getScanner("transaction_record_list", "rowkey1", "rowkey1")) {
            if (scanner != null) {
                scanner.forEach(result -> {
                    System.out.println("rowKey=" + Bytes.toString(result.getRow()));
                    System.out.println("transaction_record_list=" + Bytes
                            .toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("serial_num"))));
                });
            }
        }
    }

    /**
     * Deletes one row by its row key.
     */
    @Test
    public void deleteRow() {
        HBaseUtil.deleteRow("FileTable", "rowkey1");
    }

    /**
     * Deletes the test table and prints a success message only when the call reports success.
     */
    @Test
    public void deleteTable() {
        if (HBaseUtil.deleteTable("test_list")) {
            System.out.println("删除成功");
        }
    }
}

转载附上链接:    https://blog.csdn.net/Godlike77/article/details/80913252

www.htsjk.Com true http://www.htsjk.com/hbase/42505.html NewsArticle HBase工具类, 数据库连接类: import org.apache.hadoop.conf.Configuration ; import org.apache.hadoop.hbase.HBaseConfiguration ; import org.apache.hadoop.hbase.TableName ; import org.apache.hadoop.hbase.client.Connection ; import o...
相关文章
    暂无相关文章
评论暂时关闭