HBase Learning - 01 - Remote Debugging HBase from Eclipse
1. Preparation. After installing HBase, run hbase shell.
create 'table_name', 'column_family_1', 'column_family_2', ..., 'column_family_N'
create 'table_name', 'column_family_name'
Note that create takes column family names, not column names: in HBase, columns can be added dynamically at write time, so a table only needs at least one column family.
create 'test_lcc_person','lcc_liezu'
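Because columns live inside a family rather than in a fixed schema, you can later put to a column that was never mentioned at create time. For illustration only (the 'hobby' column is hypothetical and not part of the data set below):
put 'test_lcc_person','1','lcc_liezu:hobby','reading'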
2. Then insert some test data. Puts that share the same rowkey belong to the same row, so the commands below create 6 rows in total.
put 'table_name', 'rowkey (the equivalent of a relational primary key; must be unique)', 'column_family:column', 'value'
(Note: no trailing colon after the column name. With a trailing colon the shell stores the qualifier as "name:", and the Java code below, which reads "name", would find nothing.)
put 'test_lcc_person','1','lcc_liezu:name','梁川川1'
put 'test_lcc_person','1','lcc_liezu:sex','男'
put 'test_lcc_person','1','lcc_liezu:age','12'
put 'test_lcc_person','2','lcc_liezu:name','梁川川2'
put 'test_lcc_person','2','lcc_liezu:sex','男'
put 'test_lcc_person','2','lcc_liezu:age','12'
put 'test_lcc_person','3','lcc_liezu:name','梁川川3'
put 'test_lcc_person','3','lcc_liezu:sex','男'
put 'test_lcc_person','3','lcc_liezu:age','12'
put 'test_lcc_person','4','lcc_liezu:name','梁川川4'
put 'test_lcc_person','4','lcc_liezu:sex','男'
put 'test_lcc_person','4','lcc_liezu:age','12'
put 'test_lcc_person','5','lcc_liezu:name','梁川川5'
put 'test_lcc_person','5','lcc_liezu:sex','男'
put 'test_lcc_person','5','lcc_liezu:age','12'
put 'test_lcc_person','6','lcc_liezu:name','梁川川6'
put 'test_lcc_person','6','lcc_liezu:sex','男'
put 'test_lcc_person','6','lcc_liezu:age','12'
The data in HBase now looks like this:
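To double-check from the shell, scan dumps every cell of the (small) table and get fetches a single row:
scan 'test_lcc_person'
get 'test_lcc_person','1'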
3. Write the business code. The core logic reads the data from HBase and prints it to the console.
Create a Maven project and add the classes below to it.
The red-underlined parts are the key points.
package com.kensure.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.kensure.mr.mapper.LoadHadoopMapper;
import com.kensure.mr.mapper.LoadHbaseMapper;
import com.kensure.mr.reducer.SaveHadoopReduce;
import com.kensure.mr.reducer.SaveHbaseReduce;
public class CalculateJob {

    // private static final String input_path = "/data/mr/out/test";
    // private static final String input_path = "/data/mr/out/test_j";

    public void run(String[] args) throws IOException, InterruptedException,
            ClassNotFoundException {
        Configuration configuration = HBaseConfiguration.create();
        // point the client at the cluster's ZooKeeper ensemble
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.zookeeper.quorum", "192.168.10.82");

        /**
         * job 1: scan the source HBase table and write text output to HDFS
         */
        Job job_1 = Job.getInstance(configuration, "MyTestJob1");
        job_1.setJarByClass(CalculateJob.class);
        Scan scan = new Scan();
        scan.setCaching(1024);       // fetch rows in larger batches
        scan.setCacheBlocks(false);  // don't pollute the block cache with a full scan
        // alternative source tables used during development: test_lggj_zwf, test_lggj_new_test
        TableMapReduceUtil.initTableMapperJob(args[1], scan,
                LoadHbaseMapper.class, Text.class, Text.class, job_1);
        System.out.println("================1=======source table " + args[1]);
        // ChainMapper/ChainReducer wrap the stages so further mappers could be chained in later
        ChainMapper.addMapper(job_1, LoadHbaseMapper.class, ImmutableBytesWritable.class, Result.class, Text.class, Text.class, configuration);
        ChainReducer.setReducer(job_1, SaveHadoopReduce.class, Text.class, Text.class, Text.class, Text.class, configuration);
        job_1.setOutputFormatClass(TextOutputFormat.class);
        job_1.setMapOutputKeyClass(Text.class);
        job_1.setMapOutputValueClass(Text.class);
        job_1.setOutputKeyClass(Text.class);
        job_1.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job_1, new Path(args[0]));
        System.out.println("================2=======job 1 output path " + args[0]);

        /**
         * job 2: read job 1's text output back in and write results to HBase
         */
        Job job_2 = Job.getInstance(configuration, "MyTestJob2");
        job_2.setJarByClass(CalculateJob.class);
        job_2.setMapperClass(LoadHadoopMapper.class);
        job_2.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job_2, new Path(args[0]));
        job_2.setMapOutputKeyClass(Text.class);
        job_2.setMapOutputValueClass(Text.class);
        // alternative target tables used during development: test_mr_zwf, test_lggj_new_test_res1
        TableMapReduceUtil.initTableReducerJob(args[2], SaveHbaseReduce.class, job_2);
        // run the jobs in sequence: job 2 only starts if job 1 succeeded
        if (job_1.waitForCompletion(true)) {
            job_2.waitForCompletion(true);
        }
        System.out.println("================3=======job 2 finished");
    }

    public static void main(String args[]) throws IOException,
            InterruptedException, ClassNotFoundException {
        // local Hadoop home (must contain bin\winutils.exe, see step 4) and the
        // user name to use when talking to the cluster
        System.setProperty("hadoop.home.dir", "E:\\02-hadoop\\hadoop-2.7.3\\");
        System.setProperty("HADOOP_USER_NAME", "root");
        String[] array = new String[3];
        array[0] = "hdfs://bigdata01.hzjs.co/lcc/y";  // intermediate HDFS output path
        array[1] = "test_lcc_person";                 // source HBase table
        array[2] = "test_lcc_person_savea";           // target HBase table
        System.out.println("================4=======start");
        CalculateJob myChainmapper = new CalculateJob();
        myChainmapper.run(array);
        System.out.println("================5=======done");
    }
}
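Two things the job setup above does not do for you: FileOutputFormat refuses to run if the output path (args[0]) already exists, and TableMapReduceUtil.initTableReducerJob does not create the target table, so the table named in args[2] must already exist with the column family that SaveHbaseReduce writes to ('base'). From the shell:
create 'test_lcc_person_savea','base'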
package com.kensure.mr.common;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class Common {

    /** Returns true when the string is non-null and non-empty. */
    public static boolean isNUll(String str) {
        if (str == null || str.length() <= 0) {
            return false;
        }
        return true;
    }

    /** Returns true when the string parses as "yyyy-MM-dd HH:mm:ss". */
    public static boolean isDate(String str) {
        boolean flag = false;
        // note: this is a SimpleDateFormat pattern, not a regex, so the separator
        // must be a literal space (the original "\\s" never matched anything)
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            dateFormat.parse(str);
            flag = true;
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return flag;
    }
}
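A quick usage sketch of the helpers (the date literal is only an example):
System.out.println(Common.isNUll("abc"));                 // true
System.out.println(Common.isDate("2017-08-01 12:00:00")); // true
System.out.println(Common.isDate("not-a-date"));          // false, after printing a stack trace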
package com.kensure.mr.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LoadHadoopMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("================5=======LoadHadoopMapper==k=" + key + " value=" + value);
        // TextOutputFormat writes "key<TAB>value" lines, so split on the tab
        String[] v = value.toString().split("\t");
        System.out.println("================6=======LoadHadoopMapper==v[0]=" + v[0] + " v[1]=" + v[1]);
        context.write(new Text(v[0]), new Text(v[1]));
    }
}
package com.kensure.mr.mapper;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
public class LoadHbaseMapper extends TableMapper<Text, Text> {

    @Override
    public void map(ImmutableBytesWritable row, Result result, Context context)
            throws IOException, InterruptedException {
        System.out.println("================7=======LoadHbaseMapper==row=" + row);
        String rowkey = Bytes.toString(result.getRow());
        String name = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("name")));
        String sex = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("sex")));
        String age = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("age")));
        System.out.println("================8========rowkey==" + rowkey);
        System.out.println("================8==========" + name + "\t" + sex + "\t" + age);
        // note: this demo mapper only prints; it never calls context.write, so the
        // chained reducer and job 2 receive no records
    }
}
package com.kensure.mr.reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SaveHadoopReduce extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> its, Context context)
            throws IOException, InterruptedException {
        System.out.println("================12=========key=" + key);
        // the loop only drains the values; nothing is emitted in this demo,
        // so job 1's text output (and therefore job 2's input) stays empty
        for (Text text : its) {
        }
        // An earlier pairing step was left commented out:
        // Iterator<Text> it = its.iterator();
        // Map<String, String> maps = new TreeMap<String, String>();
        // while (it.hasNext()) {
        //     String map = it.next().toString();
        //     String[] str = map.toString().split("\\|");
        //     maps.put(str[2], map);
        // }
        //
        // List<String> list = new ArrayList<String>();
        // list.addAll(maps.keySet());
        // if (list.size() >= 2) {
        //     for (int x = 0; x < list.size(); x++) {
        //         for (int y = 0; y < list.size(); y++) {
        //             if (!list.get(x).equalsIgnoreCase(list.get(y))) {
        //                 String map1 = maps.get(list.get(x));
        //                 String map2 = maps.get(list.get(y));
        //                 String newkey = list.get(x) + "-" + list.get(y);
        //                 StringBuffer str = new StringBuffer("");
        //                 str.append("").append(map1).append(",").append(map2).append("");
        //                 context.write(new Text(newkey), new Text(str.toString()));
        //             }
        //         }
        //     }
        // }
    }
}
package com.kensure.mr.reducer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
public class SaveHbaseReduce extends
        TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> its, Context context)
            throws IOException, InterruptedException {
        System.out.println("================12=========key=" + key);
        /**
         * incoming key format: zjhm-zjhm
         */
        Put put = new Put(Bytes.toBytes(key.toString()));
        int sameFh = 0;
        StringBuffer str = new StringBuffer();
        Iterator<Text> it = its.iterator();
        Map<String, Integer> tmp = new HashMap<String, Integer>();
        while (it.hasNext()) {
            Text text = it.next();
            System.out.println("================13=========text=");
            String[] persones = text.toString().split(",");
            /**
             * value format from the previous stage:
             * rowkey|lgbm|zjhm|name|fh|kssj|jssj|xzqh
             */
            String[] person_1 = persones[0].split("\\|");
            String[] person_2 = persones[1].split("\\|");
            System.out.println("================13=========person_1=" + person_1[0]);
            System.out.println("================13=========person_2=" + person_2[0]);
            // same room (fh)
            if (person_1[4].equalsIgnoreCase(person_2[4])) {
                sameFh++;
            }
            System.out.println("================13=========sameFh=" + sameFh);
            // administrative region (xzqh)
            String xzqh = person_1[7];
            System.out.println("================13=========xzqh=" + xzqh);
            if (tmp.containsKey(xzqh)) {
                tmp.put(xzqh, tmp.get(xzqh) + 1);
            } else {
                tmp.put(xzqh, 1);
            }
            // accumulate the detail string
            if (str.length() > 0) {
                str.append(",");
            }
            str.append("[");
            str.append(text.toString());
            str.append("]");
        }
        boolean flag = false;
        System.out.println("================14=========sameFh=" + (sameFh >= 1));
        // same room at least once
        if (sameFh >= 1) {
            flag = true;
            put.add(Bytes.toBytes("base"), Bytes.toBytes("samefh"), Bytes.toBytes("" + sameFh));
        }
        for (String tmp_key : tmp.keySet()) {
            // same administrative region at least 3 times
            if (tmp.get(tmp_key) >= 3) {
                flag = true;
                put.add(Bytes.toBytes("base"), Bytes.toBytes("samexzqh"), Bytes.toBytes(tmp.get(tmp_key) + ""));
            }
        }
        // at least two different administrative regions
        if (tmp.size() >= 2) {
            flag = true;
            put.add(Bytes.toBytes("base"), Bytes.toBytes("unsamexzqh"), Bytes.toBytes(tmp.size() + ""));
        }
        // the original gated this with "if (true)"; "flag" is clearly what was meant
        if (flag) {
            put.add(Bytes.toBytes("base"), Bytes.toBytes("detail"), Bytes.toBytes(str.toString()));
            System.out.println("================15==========" + put);
            context.write(null, put);
        }
        System.out.println("================16==========");
    }
}
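After a successful run, the flag columns and the detail string can be inspected from the shell:
scan 'test_lcc_person_savea'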
4. Download the hadoop.dll and winutils.exe binaries built for Hadoop 2.7.3. Put hadoop.dll into C:\Windows\System32 on your local machine and reboot. Put winutils.exe into the bin directory of your local Hadoop copy (E:\02-hadoop\hadoop-2.7.3\bin\, i.e. under the hadoop.home.dir set in main(); it is a Windows binary, so it belongs on your workstation, not on the Linux servers). Copy the cluster's Hadoop distribution to E:\02-hadoop\hadoop-2.7.3\ locally and copy its configuration files into the project as shown in the screenshot above.
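As the stack trace below shows, Hadoop's Shell class resolves winutils.exe relative to hadoop.home.dir. A small sanity check you could add at the top of main() (just a sketch):
// sketch: confirm winutils.exe sits where Hadoop's Shell class will look for it
java.io.File winutils = new java.io.File(System.getProperty("hadoop.home.dir"), "bin\\winutils.exe");
if (!winutils.isFile()) {
    System.err.println("winutils.exe not found at " + winutils.getAbsolutePath());
}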
Without winutils.exe in place, you get:
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:379)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:394)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:387)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:611)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:273)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:261)
Without hadoop.dll, you get:
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
5. Now run the job directly; the printed output is as follows.