欢迎投稿

今日深度:

hbase 编程,

hbase 编程,


import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Repository;
import com.sf.bdp.dao.define.MessageCountDao;
import com.sf.bdp.model.entity.MessageCount;
import com.sf.bdp.util.HbaseUtil;


@Repository("messageCountDao")
@PropertySource(value = { "classpath:application.properties" })
public class MessageCountDaoImpl extends AbstractDao<Long, MessageCount>implements MessageCountDao {


@Value("${hbase.zk.hosts}")
private String zk;
@Value("${hbase.zk.port}")
private String port;
@Value("${hbase.messageCount.table}")
private String tableName;
@Autowired
Logger logger;


private MessageCount covertoMessageCount(Result rs) {
MessageCount msgCount = new MessageCount();




for (Cell cell : rs.rawCells()) {


long rowKey = Bytes.toLong(rs.getRow());
msgCount.setTimestamp(rowKey);
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)).trim();


byte[] value = CellUtil.cloneValue(cell);


if ("f1".equals(family)) {
switch (qualifier) {
case "mc":
System.out.println("value:" + Bytes.toLong(value));
;
break;
default:
break;
}


}
}


return msgCount;
}





@Override
public List<MessageCount> find(String month, String day, String hour) {


Date currentDate = new Date();
int year = currentDate.getMonth() + 1 < Integer.valueOf(month) ? currentDate.getYear() + 1899
: currentDate.getYear() + 1900;


String maxHour = "23";
String minHour = "00";


if (null != hour) {
maxHour = hour;
minHour = hour;
}


String beginTime = year + "-" + month + "-" + day + " " + minHour + ":00:00";
String endTime = year + "-" + month + "-" + day + " " + maxHour + ":59:59";


SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long beginRowkey = 0l;
long endRowkey = 0l;
try {
beginRowkey = sdf.parse(beginTime).getTime();
endRowkey = sdf.parse(endTime).getTime();
} catch (ParseException e1) {
e1.printStackTrace();


}


System.out.println("beginRowkey:" + sdf.format(new Date(beginRowkey)));
System.out.println("endRowkey:" + sdf.format(new Date(endRowkey)));


List<MessageCount> list = new ArrayList<>();


try {
Connection connection = HbaseUtil.getConnection(zk, port);
Table table = connection.getTable(TableName.valueOf(tableName));


Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(beginRowkey));
scan.setStopRow(Bytes.toBytes(endRowkey));


ResultScanner rsc = table.getScanner(scan);


for (Result rs : rsc) {
list.add(covertoMessageCount(rs));


}


} catch (Exception e) {
e.printStackTrace();
logger.debug(e);
}


return list;


}


/**
* 在date时间的基础上,在timeType类型上提前beforeTime

* @param date
*            时间,基准时间
* @param timeType
*            时间类型,取值和Calendar定义的时间类型的值一样,比如分钟是12即Calendar.MINUTE的值
* @param beforeTime
*            时间量,比如前五分钟,beforTime的值就是5
*/
@Override
public List<MessageCount> findBefore(Date date, int timeType, int beforeTime) {


Calendar c = Calendar.getInstance();
c.setTime(date);
long endRowkey = c.getTimeInMillis();


int typeTime = c.get(timeType);
c.set(timeType, typeTime - beforeTime);
long beginRowkey = c.getTimeInMillis();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("beginRowkey:" + sdf.format(new Date(beginRowkey)));
System.out.println("endRowkey:" + sdf.format(new Date(endRowkey)));


List<MessageCount> list = new ArrayList<>();


try {
Connection connection = HbaseUtil.getConnection(zk, port);
Table table = connection.getTable(TableName.valueOf(tableName));


Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(beginRowkey));
scan.setStopRow(Bytes.toBytes(endRowkey));


ResultScanner rsc = table.getScanner(scan);


for (Result rs : rsc) {
list.add(covertoMessageCount(rs));


}


} catch (Exception e) {
e.printStackTrace();


}


return list;
}


}


public class HbaseUtil {


public static Connection getConnection(String zk, String port) throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zk);
conf.set("hbase.zookeeper.property.clientPort", port);
return ConnectionFactory.createConnection(conf);
}


}


application.properties


jdbc.driverClassName = com.mysql.jdbc.Driver


jdbc.url = jdbc:mysql://xxx.xxx.xxx.xxx:3306/database


jdbc.username = username
jdbc.password = password
hibernate.dialect = org.hibernate.dialect.MySQLDialect
hibernate.show_sql = true
hibernate.format_sql = true




hbase.zk.hosts = host1,host2,host3


hbase.zk.port = 2181
hbase.message.table = tablename
hbase.messageCount.table = tablename2


其他功能的实现

import java.io.IOException;  
import java.util.ArrayList;  
import java.util.List;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.hbase.Cell;  
import org.apache.hadoop.hbase.CellUtil;  
import org.apache.hadoop.hbase.HBaseConfiguration;  
import org.apache.hadoop.hbase.HColumnDescriptor;  
import org.apache.hadoop.hbase.HTableDescriptor;  
import org.apache.hadoop.hbase.MasterNotRunningException;  
import org.apache.hadoop.hbase.TableName;  
import org.apache.hadoop.hbase.ZooKeeperConnectionException;  
import org.apache.hadoop.hbase.client.Admin;  
import org.apache.hadoop.hbase.client.Connection;    
import org.apache.hadoop.hbase.client.Delete;  
import org.apache.hadoop.hbase.client.Get;  
import org.apache.hadoop.hbase.client.Put;  
import org.apache.hadoop.hbase.client.Result;  
import org.apache.hadoop.hbase.client.ResultScanner;  
import org.apache.hadoop.hbase.client.Scan;  
import org.apache.hadoop.hbase.client.Table;  
import org.apache.hadoop.hbase.util.Bytes;


import com.sf.bdp.util.HbaseUtil;  
  
public class HBaseDemo {  
  
    public static Configuration conf;  
  
    static {  
        conf = HBaseConfiguration.create();  
       // conf.addResource("hbase-site.xml");  
    }  
  
    /** 
     * 创建表 
     *  
     * @param tablename 表名 
     * @param columnFamily 列族 
     * @throws MasterNotRunningException 
     * @throws ZooKeeperConnectionException 
     * @throws IOException 
     */  
    public static void createTable(String tablename, String columnFamily)  
            throws MasterNotRunningException, IOException, ZooKeeperConnectionException {  
        //Connection conn = ConnectionFactory.createConnection(conf);  
        Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181");
        Admin admin = conn.getAdmin();  
        try {  
            if (admin.tableExists(TableName.valueOf(tablename))) {  
                System.out.println(tablename + " already exists");  
            } else {  
                TableName tableName = TableName.valueOf(tablename);  
                HTableDescriptor tableDesc = new HTableDescriptor(tableName);  
                tableDesc.addFamily(new HColumnDescriptor(columnFamily));  
                admin.createTable(tableDesc);  
                System.out.println(tablename + " created succeed");  
            }  
        } finally {  
            admin.close();  
            conn.close();  
        }  
    }  
      
    /** 
     * 向表中插入一条新数据 
     *  
     * @param tableName 表名 
     * @param row 行键key 
     * @param columnFamily 列族 
     * @param column 列名 
     * @param data 要插入的数据 
     * @throws IOException 
     */  
    public static void putData(String tableName, String row, String columnFamily, String column, String data)  
            throws IOException {  
      /*  Connection conn = ConnectionFactory.createConnection(conf);  */
    Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181");
        Table table = conn.getTable(TableName.valueOf(tableName));  
        try {  
            Put put = new Put(Bytes.toBytes(row));  
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));  
            table.put(put);  
//          System.out.println("put '" + row + "','" + columnFamily + ":" + column + "','" + data + "'");  
        } finally {  
            table.close();  
            conn.close();  
        }  
    }  
      
    /** 
     * add a column family to an existing table 
     *  
     * @param tableName table name  
     * @param columnFamily column family 
     * @throws IOException 
     */  
    public static void putFamily(String tableName, String columnFamily) throws IOException {  
    Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181");
        Admin admin = conn.getAdmin();  
        try {  
            if (!admin.tableExists(TableName.valueOf(tableName))) {  
                System.out.println(tableName + " not exists");  
            } else {  
                admin.disableTable(TableName.valueOf(tableName));  
                  
                HColumnDescriptor cf1 = new HColumnDescriptor(columnFamily);  
                admin.addColumn(TableName.valueOf(tableName), cf1);  
                  
                admin.enableTable(TableName.valueOf(tableName));  
                System.out.println(TableName.valueOf(tableName) + ", " + columnFamily + " put succeed");  
            }  
        } finally {  
            admin.close();  
            conn.close();  
        }  
    }  
      
    /** 
     * 根据key读取一条数据 
     *  
     * @param tableName 表名 
     * @param row 行键key 
     * @param columnFamily 列族 
     * @param column 列名 
     * @throws IOException 
     */  
    public static void getData(String tableName, String row, String columnFamily, String column) throws IOException{  
    Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181");
        Table table = conn.getTable(TableName.valueOf(tableName));  
        try{  
            Get get = new Get(Bytes.toBytes(row));  
            Result result = table.get(get);  
            byte[] rb = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column));  
            String value = new String(rb, "UTF-8");  
            System.out.println(value);  
        } finally {  
            table.close();  
            conn.close();  
        }  
    }  
  
    /** 
     * get all data of a table 
     *  
     * @param tableName table name 
     * @throws IOException 
     */  
    public static void scanAll(String tableName) throws IOException {  
    Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181");
        Table table = conn.getTable(TableName.valueOf(tableName));  
        try {  
            Scan scan = new Scan();  
            ResultScanner resultScanner = table.getScanner(scan);  
            for(Result result : resultScanner){  
                List<Cell> cells = result.listCells();  
                for(Cell cell : cells){  
                    String row = new String(result.getRow(), "UTF-8");  
                    String family = new String(CellUtil.cloneFamily(cell), "UTF-8");  
                    String qualifier = new String(CellUtil.cloneQualifier(cell), "UTF-8");  
                    String value = new String(CellUtil.cloneValue(cell), "UTF-8");  
                    System.out.println("[row:"+row+"],[family:"+family+"],[qualifier:"+qualifier+"],[value:"+value+"]");  
                }  
            }  
        } finally {  
            table.close();  
            conn.close();  
        }  
    }  
      
    /** 
     * delete a data by row key 
     *  
     * @param tableName table name 
     * @param rowKey row key 
     * @throws IOException 
     */  
    public static void delData(String tableName, String rowKey) throws IOException {  
    Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181"); 
        Table table = conn.getTable(TableName.valueOf(tableName));  
        try {  
            List<Delete> list = new ArrayList<Delete>();  
            Delete del = new Delete(rowKey.getBytes());  
            list.add(del);  
            table.delete(list);  
            System.out.println("delete record " + rowKey + " ok");  
        } finally {  
            table.close();  
            conn.close();  
        }  
    }  
      
    /** 
     * delete a column's value of a row 
     *  
     * @param tableName table name 
     * @param rowKey row key 
     * @param familyName family name 
     * @param columnName column name 
     * @throws IOException 
     */  
    public static void deleteColumn(String tableName, String rowKey, String familyName, String columnName) throws IOException {  
    Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181");
        Table table = conn.getTable(TableName.valueOf(tableName));  
        try{  
            Delete del = new Delete(Bytes.toBytes(rowKey));  
            del.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));  
            List<Delete> list = new ArrayList<Delete>(1);  
            list.add(del);  
            table.delete(list);  
            System.out.println("[table:"+tableName+"],row:"+rowKey+"],[family:"+familyName+"],[qualifier:"+columnName+"]");  
        } finally {  
            table.close();  
            conn.close();  
        }  
    }  
  
    /** 
     * delete a columnFamily's all columns value of a row  
     *  
     * @param tableName table name 
     * @param rowKey row key 
     * @param familyName family name 
     * @throws IOException 
     */  
    public static void deleteFamily(String tableName, String rowKey, String familyName) throws IOException {  
    Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181");
        Table table = conn.getTable(TableName.valueOf(tableName));  
        try{  
            Delete del = new Delete(Bytes.toBytes(rowKey));  
            del.addFamily(Bytes.toBytes(familyName));  
            List<Delete> list = new ArrayList<Delete>(1);  
            list.add(del);  
            table.delete(list);  
            System.out.println("[table:"+tableName+"],row:"+rowKey+"],[family:"+familyName+"]");  
        } finally {  
            table.close();  
            conn.close();  
        }  
    }  
  
    /** 
     * delete a table 
     *  
     * @param tableName table name 
     * @throws IOException 
     * @throws MasterNotRunningException 
     * @throws ZooKeeperConnectionException 
     */  
    public static void deleteTable(String tableName) throws IOException, MasterNotRunningException, ZooKeeperConnectionException {  
    Connection conn = HbaseUtil.getConnection("centos1,centos2,centos3", "2181");
        Admin admin = conn.getAdmin();  
        try {  
            admin.disableTable(TableName.valueOf(tableName));  
            admin.deleteTable(TableName.valueOf(tableName));  
            System.out.println("delete table " + tableName + " ok");  
        } finally {  
            admin.close();  
            conn.close();  
        }  
    }  
      
    public static void main(String[] args) {  
        System.err.println("start...");  
//      String tableName = "student";  
        String tableName = "test";  
        try{  
//          createTable(tableName, "stu");  
//          createTable(tableName + "2", "stu");  
            createTable(tableName, "f1");  
            putData(tableName, "row_1", "f1", "no", "0001");  
            putData(tableName, "row_2", "f1", "rec_date", "2016-06-03");  
            putData(tableName, "row_3", "f1", "rec_time", "09:49:00");  
            putData(tableName, "row_4", "f1", "power", "154.24");  
            System.out.println("put1");
            scanAll(tableName); 
          putData(tableName, "row_1", "f1", "stu_id", "001");  
          putData(tableName, "row_2", "f1", "stu_id", "002");  
          putData(tableName, "row_5", "f1", "stu_id", "003");  
          getData(tableName, "row_5", "f1", "stu_id");  
          delData(tableName, "row_1");  
          System.out.println("put2");
          scanAll(tableName);  
          deleteTable(tableName + "2");  
          System.out.println("add family score");
          putFamily(tableName, "score");  
          putData(tableName, "row_5", "score", "chinese", "90");  
          putData(tableName, "row_6", "score", "math", "91");  
          scanAll(tableName);
          System.out.println("deleteColumn row4 score chinese");
          deleteColumn(tableName, "row_4", "score", "chinese"); 
          scanAll(tableName);
          System.out.println("deleteFamily row5 score ");
          deleteFamily(tableName, "row_5", "score");  
            scanAll(tableName);  
        } catch(Exception ex){  
            ex.printStackTrace();  
        }  
        System.err.println("end...");  
    }  


}  

www.htsjk.Com true http://www.htsjk.com/hbase/40111.html NewsArticle hbase 编程, import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import o...
相关文章
    暂无相关文章
评论暂时关闭