hadoop常用算法简单实例,hadoop实例
实例一、对以下数据进行排序:先用收入减去支出得到结余,再按结余从大到小排序。数据如下:
SumStep运行之后结果如下:
SortStep运行之后结果为上图根据结余从大到小排序。
代码如下:
public class InfoBean implements WritableComparable<InfoBean>{
/** Account name; the mappers take it from the first input column. */
private String account;
/** Income amount. */
private double income;
/** Expenses amount. */
private double expenses;
/** Balance; {@code set} stores income minus expenses here. */
private double surplus;

/**
 * Fills in every field in one call.
 *
 * @param account  account name
 * @param income   income amount
 * @param expenses expenses amount; the surplus is derived as {@code income - expenses}
 */
public void set(String account, double income, double expenses) {
    final double balance = income - expenses;
    this.surplus = balance;
    this.expenses = expenses;
    this.income = income;
    this.account = account;
}
/**
 * Renders the bean as {@code income<TAB>expenses<TAB>surplus}.
 * The account is not part of the text.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder();
    text.append(this.income).append("\t");
    text.append(this.expenses).append("\t");
    text.append(this.surplus);
    return text.toString();
}
/**
 * Serializes this bean to {@code out}.
 * The fields are written in a fixed order: account (UTF string), then income,
 * expenses and surplus as doubles. {@code readFields} consumes them in the same order.
 *
 * NOTE(review): account is passed straight to writeUTF, so it presumably must be
 * non-null when this is called - confirm against callers.
 *
 * @param out sink receiving the serialized fields
 * @throws IOException if writing to {@code out} fails
 */
public void write(DataOutput out) throws IOException {
out.writeUTF(account);
out.writeDouble(income);
out.writeDouble(expenses);
out.writeDouble(surplus);
}
/**
 * Deserializes this bean from {@code in}, overwriting all four fields.
 * The read order (account, income, expenses, surplus) mirrors the order used by
 * {@code write}; the surplus is read back as stored, not recomputed.
 *
 * @param in source holding a bean previously produced by {@code write}
 * @throws IOException if reading from {@code in} fails
 */
public void readFields(DataInput in) throws IOException {
this.account = in.readUTF();
this.income = in.readDouble();
this.expenses = in.readDouble();
this.surplus = in.readDouble();
}
/**
 * Orders beans by surplus, largest first, which is the ordering the sort job is
 * meant to produce.
 *
 * Ties are broken by income (largest first), then expenses (smallest first), then
 * account name, so two beans compare as equal only when all four fields match.
 * Unlike a comparison that never returns 0, this keeps the ordering consistent
 * (sgn(a.compareTo(b)) == -sgn(b.compareTo(a))) while still keeping distinct
 * accounts in distinct key groups.
 *
 * @param o the bean to compare against
 * @return negative if this bean sorts before {@code o}, positive if after, 0 if equal
 */
public int compareTo(InfoBean o) {
    // Arguments are swapped on purpose: larger surplus must sort first.
    int bySurplus = Double.compare(o.surplus, this.surplus);
    if (bySurplus != 0) {
        return bySurplus;
    }
    int byIncome = Double.compare(o.income, this.income);
    if (byIncome != 0) {
        return byIncome;
    }
    int byExpenses = Double.compare(this.expenses, o.expenses);
    if (byExpenses != 0) {
        return byExpenses;
    }
    // A missing account sorts before any present account.
    if (this.account == null) {
        return o.account == null ? 0 : -1;
    }
    if (o.account == null) {
        return 1;
    }
    return this.account.compareTo(o.account);
}
/** Returns the account name. */
public String getAccount() {
return account;
}
/** Replaces the account name. */
public void setAccount(String account) {
this.account = account;
}
/** Returns the income amount. */
public double getIncome() {
return income;
}
/** Replaces the income amount only; the stored surplus is left untouched. */
public void setIncome(double income) {
this.income = income;
}
/** Returns the expenses amount. */
public double getExpenses() {
return expenses;
}
/** Replaces the expenses amount only; the stored surplus is left untouched. */
public void setExpenses(double expenses) {
this.expenses = expenses;
}
/** Returns the stored surplus. */
public double getSurplus() {
return surplus;
}
/** Replaces the stored surplus directly, without touching income or expenses. */
public void setSurplus(double surplus) {
this.surplus = surplus;
}
}public class SumStep {
/**
 * Configures and runs the summing job.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws Exception if the job cannot be set up or submitted
 */
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: SumStep <input path> <output path>");
        System.exit(2);
    }
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(SumStep.class);

    // Map side: account -> InfoBean
    job.setMapperClass(SumMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(InfoBean.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    // Reduce side: account -> summed InfoBean
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(InfoBean.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Report job failure to the caller instead of always exiting with status 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
 * Parses one tab-separated line (account, income, expenses) and emits the account
 * as key with an {@link InfoBean} carrying the parsed amounts as value.
 */
public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
    // Both output objects are reused across map() calls.
    private InfoBean bean = new InfoBean();
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");
        String account = columns[0];
        bean.set(account, Double.parseDouble(columns[1]), Double.parseDouble(columns[2]));
        k.set(account);
        context.write(k, bean);
    }
}
/**
 * Adds up income and expenses of all records sharing one account key and emits
 * a single bean holding the totals.
 */
public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean> {
    // Output bean, reused across reduce() calls.
    private InfoBean bean = new InfoBean();

    @Override
    protected void reduce(Text key, Iterable<InfoBean> v2s, Context context)
            throws IOException, InterruptedException {
        double totalIncome = 0;
        double totalExpenses = 0;
        for (InfoBean record : v2s) {
            totalIncome += record.getIncome();
            totalExpenses += record.getExpenses();
        }
        // The account is left empty here; the output key already carries it.
        bean.set("", totalIncome, totalExpenses);
        context.write(key, bean);
    }
}
}
SortStep的输入是SumStep的输出,而不是原始源文件;当然也可以将两个job合并到一起执行,此处不再讨论。
public class SortStep {
/**
 * Configures and runs the sorting job.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws IOException            if the job cannot be set up or submitted
 * @throws ClassNotFoundException if a job class cannot be resolved
 * @throws InterruptedException   if waiting for the job is interrupted
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    if (args.length < 2) {
        System.err.println("Usage: SortStep <input path> <output path>");
        System.exit(2);
    }
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(SortStep.class);

    // Map side: the bean itself is the key so that the shuffle sorts by it.
    job.setMapperClass(SortMapper.class);
    job.setMapOutputKeyClass(InfoBean.class);
    job.setMapOutputValueClass(NullWritable.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    // Reduce side: account -> bean, in sorted order.
    job.setReducerClass(SortReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(InfoBean.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Report job failure to the caller instead of always exiting with status 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
 * Rebuilds an {@link InfoBean} from the first three tab-separated columns of a
 * line (account, income, expenses) and emits it as the key with a null value.
 */
public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
    // Output key, reused across map() calls.
    private InfoBean bean = new InfoBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");
        String account = columns[0];
        double income = Double.parseDouble(columns[1]);
        double expenses = Double.parseDouble(columns[2]);
        bean.set(account, income, expenses);
        context.write(bean, NullWritable.get());
    }
}
/**
 * Writes every sorted bean back out as {@code account -> bean}.
 *
 * One record is written per value rather than per key group. If several beans
 * compare as equal they arrive in a single reduce() call; writing only once per
 * call would silently drop all but one of them.
 */
public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
    // Output key, reused across writes.
    private Text k = new Text();

    @Override
    protected void reduce(InfoBean bean, Iterable<NullWritable> v2s, Context context)
            throws IOException, InterruptedException {
        // Hadoop refills the key object as the values iterator advances, so
        // `bean` reflects the record belonging to the current value.
        for (NullWritable ignored : v2s) {
            k.set(bean.getAccount());
            context.write(k, bean);
        }
    }
}
}实例二、倒排索引,过程如下:
Map阶段
<0,"hello tom">
....
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
--------------------------------------------------------
combiner阶段
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
--------------------------------------------------------
Reducer阶段
<"hello",{"a.txt->5","b.txt->3"}>
context.write("hello","a.txt->5 b.txt->3");
-------------------------------------------------------
hello "a.txt->5 b.txt->3"
tom "a.txt->2 b.txt->1"
kitty "a.txt->1"
.......
代码如下:
public class InverseIndex {
/**
 * Configures and runs the inverted-index job.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws Exception if the job cannot be set up or submitted
 */
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: InverseIndex <input path> <output path>");
        System.exit(2);
    }
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // Jar that contains the job classes
    job.setJarByClass(InverseIndex.class);
    // Mapper settings
    job.setMapperClass(IndexMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));//words.txt
    // Reducer settings
    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setCombinerClass(IndexCombiner.class);
    // Submit the job and report failure through the exit status
    // instead of always exiting with status 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
 * Splits each line on single spaces and emits {@code "<word>-><file name>"} as key
 * with the text value {@code "1"}, where the file name comes from the input split.
 */
public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Output objects, reused across map() calls.
    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        // Name of the file this split belongs to.
        String name = ((FileSplit) context.getInputSplit()).getPath().getName();
        // The emitted value is the same for every word.
        v.set("1");
        for (int i = 0; i < words.length; i++) {
            k.set(words[i] + "->" + name);
            context.write(k, v);
        }
    }
}
/**
 * Combines the mapper output {@code <"word->file", "1">} into
 * {@code <"word", "file->count">}.
 *
 * The combiner changes both the key and the value format, and the framework may
 * run a combiner more than once over the same data (for example when merging
 * spills). A second pass therefore receives records that are already in the
 * {@code <"word", "file->count">} shape; those are merged per file and re-emitted
 * instead of failing on the missing "->" in the key.
 *
 * Limitation carried over from the key format: a word that itself contains "->"
 * cannot be told apart from the separator.
 */
public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
    private static final String SEP = "->";
    // Output objects, reused across reduce() calls.
    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String[] fields = key.toString().split(SEP);
        if (fields.length < 2) {
            // No file part in the key: this record was already combined once.
            recombine(key, values, context);
            return;
        }
        long sum = 0;
        for (Text t : values) {
            sum += Long.parseLong(t.toString());
        }
        k.set(fields[0]);
        v.set(fields[1] + SEP + sum);
        context.write(k, v);
    }

    /** Merges already-combined {@code "file->count"} values per file and re-emits them under the same key. */
    private void recombine(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        java.util.Map<String, Long> perFile = new java.util.LinkedHashMap<String, Long>();
        for (Text t : values) {
            String token = t.toString();
            int sep = token.lastIndexOf(SEP);
            long count = sep < 0 ? -1 : parseCount(token.substring(sep + SEP.length()));
            if (count < 0) {
                // Not in "file->count" form: pass it through untouched.
                context.write(key, t);
                continue;
            }
            String file = token.substring(0, sep);
            Long previous = perFile.get(file);
            perFile.put(file, previous == null ? count : previous + count);
        }
        for (java.util.Map.Entry<String, Long> entry : perFile.entrySet()) {
            v.set(entry.getKey() + SEP + entry.getValue());
            context.write(key, v);
        }
    }

    /** Returns the non-negative number in {@code s}, or -1 if {@code s} is not a plain run of 1 to 18 digits. */
    private static long parseCount(String s) {
        if (s.isEmpty() || s.length() > 18) {
            return -1;
        }
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c < '0' || c > '9') {
                return -1;
            }
        }
        return Long.parseLong(s);
    }
}
/**
 * Builds the final index line for one word: all {@code "file->count"} entries
 * joined by spaces (each entry is followed by one space, as before).
 *
 * Entries for the same file are summed first. The same file can show up more than
 * once for a word when it was processed in several pieces (several map tasks or
 * spills), and plain concatenation would then list it repeatedly with partial
 * counts. Values that are not in {@code "file->count"} form are appended unchanged
 * after the merged entries.
 */
public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    private static final String SEP = "->";
    // Output value, reused across reduce() calls.
    private Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Insertion-ordered so files appear in first-seen order.
        java.util.Map<String, Long> perFile = new java.util.LinkedHashMap<String, Long>();
        StringBuilder unparsed = new StringBuilder();
        for (Text t : values) {
            String token = t.toString();
            int sep = token.lastIndexOf(SEP);
            long count = sep < 0 ? -1 : parseCount(token.substring(sep + SEP.length()));
            if (count < 0) {
                unparsed.append(token).append(' ');
                continue;
            }
            String file = token.substring(0, sep);
            Long previous = perFile.get(file);
            perFile.put(file, previous == null ? count : previous + count);
        }
        StringBuilder joined = new StringBuilder();
        for (java.util.Map.Entry<String, Long> entry : perFile.entrySet()) {
            joined.append(entry.getKey()).append(SEP).append(entry.getValue()).append(' ');
        }
        joined.append(unparsed);
        v.set(joined.toString());
        context.write(key, v);
    }

    /** Returns the non-negative number in {@code s}, or -1 if {@code s} is not a plain run of 1 to 18 digits. */
    private static long parseCount(String s) {
        if (s.isEmpty() || s.length() > 18) {
            return -1;
        }
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c < '0' || c > '9') {
                return -1;
            }
        }
        return Long.parseLong(s);
    }
}
}实例三、使用Partitioner使相同或者相似的数据传递到相同的reduce:
数据格式如下,分别代表手机号,上行流量,下行流量:
代码如下:
/**
 * Writable value holding a phone number together with its upstream, downstream
 * and total traffic figures.
 */
public class DataBean implements Writable {
    private String telNo;
    private long upPayLoad;
    private long downPayLoad;
    private long totalPayLoad;

    /** No-argument constructor; leaves every field at its default value. */
    public DataBean() {
    }

    /**
     * Creates a bean and derives the total as upstream plus downstream.
     *
     * @param telNo       phone number
     * @param upPayLoad   upstream traffic
     * @param downPayLoad downstream traffic
     */
    public DataBean(String telNo, long upPayLoad, long downPayLoad) {
        this.totalPayLoad = upPayLoad + downPayLoad;
        this.downPayLoad = downPayLoad;
        this.upPayLoad = upPayLoad;
        this.telNo = telNo;
    }

    /** Renders {@code up<TAB>down<TAB>total}; the phone number is not included. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(this.upPayLoad).append("\t");
        text.append(this.downPayLoad).append("\t");
        text.append(this.totalPayLoad);
        return text.toString();
    }

    /**
     * Serializes the bean. Both the field types and the field order must match
     * what {@link #readFields(DataInput)} reads back.
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.telNo);
        out.writeLong(this.upPayLoad);
        out.writeLong(this.downPayLoad);
        out.writeLong(this.totalPayLoad);
    }

    /** Deserializes the bean, reading the fields in the order they were written. */
    public void readFields(DataInput in) throws IOException {
        telNo = in.readUTF();
        upPayLoad = in.readLong();
        downPayLoad = in.readLong();
        totalPayLoad = in.readLong();
    }

    /** Returns the phone number. */
    public String getTelNo() {
        return this.telNo;
    }

    /** Replaces the phone number. */
    public void setTelNo(String telNo) {
        this.telNo = telNo;
    }

    /** Returns the upstream traffic. */
    public long getUpPayLoad() {
        return this.upPayLoad;
    }

    /** Replaces the upstream traffic only; the stored total is not recomputed. */
    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    /** Returns the downstream traffic. */
    public long getDownPayLoad() {
        return this.downPayLoad;
    }

    /** Replaces the downstream traffic only; the stored total is not recomputed. */
    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    /** Returns the stored total traffic. */
    public long getTotalPayLoad() {
        return this.totalPayLoad;
    }

    /** Replaces the stored total traffic directly. */
    public void setTotalPayLoad(long totalPayLoad) {
        this.totalPayLoad = totalPayLoad;
    }
}
public class DataCount {
/**
 * Configures and runs the traffic-count job with a custom partitioner.
 *
 * @param args {@code args[0]} input path, {@code args[1]} output path,
 *             {@code args[2]} number of reduce tasks
 * @throws Exception if the job cannot be set up or submitted
 */
public static void main(String[] args) throws Exception {
    if (args.length < 3) {
        System.err.println("Usage: DataCount <input path> <output path> <number of reduce tasks>");
        System.exit(2);
    }
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(DataCount.class);

    job.setMapperClass(DCMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DataBean.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    job.setReducerClass(DCReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DataBean.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Replace the default partitioner with the provider-prefix partitioner.
    job.setPartitionerClass(ServiceProviderPartitioner.class);
    // The number of reduce tasks has to be set explicitly for the partitioner to matter.
    job.setNumReduceTasks(Integer.parseInt(args[2]));

    // Report job failure to the caller instead of always exiting with status 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
 * Parses one tab-separated line (phone number, upstream, downstream) and emits
 * the phone number as key with a {@link DataBean} built from the three columns.
 */
public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Column 0: phone number, column 1: upstream traffic, column 2: downstream traffic.
        String[] columns = value.toString().split("\t");
        String telNo = columns[0];
        long up = Long.parseLong(columns[1]);
        long down = Long.parseLong(columns[2]);
        // A fresh key and bean are created for every record.
        context.write(new Text(telNo), new DataBean(telNo, up, down));
    }
}
/**
 * Sums upstream and downstream traffic of all records for one phone number and
 * emits a single bean holding the totals.
 */
public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {
    @Override
    protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
            throws IOException, InterruptedException {
        long totalUp = 0;
        long totalDown = 0;
        for (DataBean record : v2s) {
            totalUp += record.getUpPayLoad();
            totalDown += record.getDownPayLoad();
        }
        // The phone number is left empty in the bean; the output key carries it.
        context.write(key, new DataBean("", totalUp, totalDown));
    }
}
/**
 * Routes records to reduce tasks by the first three characters of the phone
 * number: "139" -> 1, "138" -> 2, "159" -> 3, anything else -> 0.
 *
 * The result is folded into the range {@code [0, number)} so that running with
 * fewer than four reduce tasks yields a valid partition instead of an
 * out-of-range one. With four or more reduce tasks the mapping is unchanged.
 */
public static class ServiceProviderPartitioner extends Partitioner<Text, DataBean> {
    private static final int PREFIX_LENGTH = 3;
    private static Map<String, Integer> providerMap = new HashMap<String, Integer>();
    static {
        providerMap.put("139", 1);
        providerMap.put("138", 2);
        providerMap.put("159", 3);
    }

    /**
     * @param key    phone number
     * @param value  unused
     * @param number total number of partitions (reduce tasks)
     * @return partition index in {@code [0, number)}
     */
    @Override
    public int getPartition(Text key, DataBean value, int number) {
        String telNo = key.toString();
        // Keys shorter than the prefix cannot match any provider; use the default partition.
        Integer p = telNo.length() < PREFIX_LENGTH
                ? null
                : providerMap.get(telNo.substring(0, PREFIX_LENGTH));
        int partition = (p == null) ? 0 : p.intValue();
        return partition % number;
    }
}
}实例四、实现以下简单算法,其中mr程序涉及到了reduce分组等概念:
#当第一列相同时,求出第二列的最小值
3 3
3 2
3 1
2 2
2 1
1 1
----------结果---------
3 1
2 1
1 1public class GroupApp {
/** HDFS location of the input data. */
static final String INPUT_PATH = "hdfs://xxx:9000/input";
/** HDFS location the job writes to; removed before each run if it already exists. */
static final String OUT_PATH = "hdfs://xxx:9000/out";

/**
 * Configures and runs the grouping job: for each distinct first column, output
 * the minimum of the second column.
 *
 * @param args unused; paths come from {@link #INPUT_PATH} and {@link #OUT_PATH}
 * @throws Exception if the file system cannot be reached or the job cannot be submitted
 */
public static void main(String[] args) throws Exception {
    final Configuration configuration = new Configuration();
    final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
    // The job refuses to start if the output directory exists, so clear it first.
    final Path outPath = new Path(OUT_PATH);
    if (fileSystem.exists(outPath)) {
        fileSystem.delete(outPath, true);
    }
    final Job job = Job.getInstance(configuration, GroupApp.class.getSimpleName());
    // Jar that contains the job classes (same as the other drivers in this article)
    job.setJarByClass(GroupApp.class);
    // 1.1 Input path
    FileInputFormat.setInputPaths(job, INPUT_PATH);
    // Class used to parse the input files
    job.setInputFormatClass(TextInputFormat.class);
    // 1.2 Custom mapper
    job.setMapperClass(MyMapper.class);
    // Types of the map output <k2,v2>
    job.setMapOutputKeyClass(NewK2.class);
    job.setMapOutputValueClass(LongWritable.class);
    // 1.3 Partitioning
    job.setPartitionerClass(HashPartitioner.class);
    job.setNumReduceTasks(1);
    // 1.4 Sorting and grouping: group keys by their first column only
    job.setGroupingComparatorClass(MyGroupingComparator.class);
    // 1.5 (optional) combiner - not used here
    // 2.2 Custom reducer
    job.setReducerClass(MyReducer.class);
    // Types of the final output <k3,v3>
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);
    // 2.3 Output path
    FileOutputFormat.setOutputPath(job, outPath);
    // Class used to format the output files
    job.setOutputFormatClass(TextOutputFormat.class);
    // Submit the job and report failure through the exit status
    // instead of always exiting with status 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>. throws java.io.IOException ,InterruptedException {
final String[] splited = value.toString().split("\t");
final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
context.write(k2, v2);
};
}
/**
 * For one key group, scans all values for the smallest one and writes
 * {@code <first column of the key, minimum value>}.
 */
static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
    protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, Context context)
            throws java.io.IOException, InterruptedException {
        long smallest = Long.MAX_VALUE;
        for (LongWritable candidate : v2s) {
            smallest = Math.min(smallest, candidate.get());
        }
        final LongWritable outKey = new LongWritable(k2.first);
        final LongWritable outValue = new LongWritable(smallest);
        context.write(outKey, outValue);
    }
}
/**
 * Composite map-output key made of both input columns.
 *
 * Why this class exists: the original v2 cannot take part in sorting, so the
 * original k2 and v2 are wrapped together into one new key.
 */
static class NewK2 implements WritableComparable<NewK2> {
    Long first;
    Long second;

    /** No-argument constructor; fields stay null until {@link #readFields} fills them. */
    public NewK2() {
    }

    public NewK2(long first, long second) {
        this.first = first;
        this.second = second;
    }

    /** Reads {@code first} then {@code second}, 8 bytes each, matching {@link #write}. */
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    /** Writes {@code first} then {@code second} as two longs. */
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    /**
     * Sort order of the keys: ascending by the first column, and for equal first
     * columns ascending by the second column.
     *
     * Values are compared directly rather than by casting their difference to
     * int: the cast can overflow and flip the sign, or turn a non-zero
     * difference into 0.
     */
    public int compareTo(NewK2 o) {
        final int byFirst = this.first.compareTo(o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        return this.second.compareTo(o.second);
    }

    // NOTE(review): the hash includes `second`, so with more than one reduce task a
    // hash-based partitioner would presumably spread keys sharing `first` across
    // reducers; the driver in this file uses a single reduce task.
    @Override
    public int hashCode() {
        return this.first.hashCode() + this.second.hashCode();
    }

    /**
     * Two keys are equal when both columns hold equal values. The boxed fields
     * are compared by value; {@code ==} would compare object identity.
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof NewK2)) {
            return false;
        }
        NewK2 oK2 = (NewK2) obj;
        return sameValue(this.first, oK2.first) && sameValue(this.second, oK2.second);
    }

    /** Null-safe value comparison of two boxed longs. */
    private static boolean sameValue(Long a, Long b) {
        return a == null ? b == null : a.equals(b);
    }
}
/**
 * Grouping comparator that treats keys with the same first column as one group.
 *
 * Why this class exists: the business rule groups by the first column, but the
 * sort order of NewK2 also involves the second column, so a separate grouping
 * comparator is required.
 */
static class MyGroupingComparator implements RawComparator<NewK2> {
    /**
     * Compares two deserialized keys by their first column only.
     * The values are compared directly; casting the long difference to int
     * could overflow and report unequal keys as equal.
     */
    public int compare(NewK2 o1, NewK2 o2) {
        return o1.first.compareTo(o2.first);
    }

    /**
     * Compares two serialized keys by their first 8 bytes, i.e. the serialized
     * first column. This is a plain byte comparison, so only "equal / not equal"
     * is meaningful here, which is all grouping needs.
     *
     * @param arg0 byte array holding the first key
     * @param arg1 start position of the first key inside {@code arg0}
     * @param arg2 length of the first key in bytes (not used; only 8 bytes are compared)
     *
     * @param arg3 byte array holding the second key
     * @param arg4 start position of the second key inside {@code arg3}
     * @param arg5 length of the second key in bytes (not used; only 8 bytes are compared)
     */
    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
            int arg4, int arg5) {
        return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8);
    }
}
}
实例五:利用MapReduce求解海量数据文件中的最大值。这里利用了Mapper类中的cleanup()函数:每个map任务会在自己所有的map()调用完成之后才执行一次cleanup(),因此可以在cleanup()中输出该任务的局部最大值。
public class TopKApp {
/** HDFS location of the input data. */
static final String INPUT_PATH = "hdfs://xxx:9000/input";
/** HDFS location the job writes to; removed before each run if it already exists. */
static final String OUT_PATH = "hdfs://xxx:9000/out";

/**
 * Configures and runs the maximum-value job.
 *
 * @param args unused; paths come from {@link #INPUT_PATH} and {@link #OUT_PATH}
 * @throws Exception if the file system cannot be reached or the job cannot be submitted
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
    final Path outPath = new Path(OUT_PATH);
    // The job refuses to start if the output directory exists, so clear it first.
    if (fileSystem.exists(outPath)) {
        fileSystem.delete(outPath, true);
    }
    // Name the job after this class (it was named after WordCountApp by mistake).
    final Job job = Job.getInstance(conf, TopKApp.class.getSimpleName());
    // Jar that contains the job classes (same as the other drivers in this article)
    job.setJarByClass(TopKApp.class);
    FileInputFormat.setInputPaths(job, INPUT_PATH);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(NullWritable.class);
    FileOutputFormat.setOutputPath(job, outPath);
    // Report job failure to the caller instead of always exiting with status 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
long max = Long.MIN_VALUE;
protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
final long temp = Long.parseLong(v1.toString());
if(temp>max){
max = temp;
}
};
protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,LongWritable, NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
context.write(new LongWritable(max), NullWritable.get());
};
}
static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
long max = Long.MIN_VALUE;
protected void reduce(LongWritable k2, java.lang.Iterable<NullWritable> arg1, org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context arg2) throws java.io.IOException ,InterruptedException {
final long temp = k2.get();
if(temp>max){
max = temp;
}
};
protected void cleanup(org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
context.write(new LongWritable(max), NullWritable.get());
};
}
}实例六、计数器:
public class WordCountApp {
/** HDFS location of the input data. */
static final String INPUT_PATH = "hdfs://xxx:9000/hello";
/** HDFS location the job writes to; removed before each run if it already exists. */
static final String OUT_PATH = "hdfs://xxx:9000/out";

/**
 * Configures and runs the word-count job that also maintains a custom counter.
 *
 * @param args unused; paths come from {@link #INPUT_PATH} and {@link #OUT_PATH}
 * @throws Exception if the file system cannot be reached or the job cannot be submitted
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
    final Path outPath = new Path(OUT_PATH);
    // The job refuses to start if the output directory exists, so clear it first.
    if (fileSystem.exists(outPath)) {
        fileSystem.delete(outPath, true);
    }
    final Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
    // Jar that contains the job classes (same as the other drivers in this article)
    job.setJarByClass(WordCountApp.class);
    // 1.1 Where the input files are
    FileInputFormat.setInputPaths(job, INPUT_PATH);
    // How input files are parsed: every line becomes one key/value pair
    //job.setInputFormatClass(TextInputFormat.class);
    // 1.2 Custom mapper
    job.setMapperClass(MyMapper.class);
    // Map output <k,v> types. They can be omitted when <k3,v3> has the same types as <k2,v2>
    //job.setMapOutputKeyClass(Text.class);
    //job.setMapOutputValueClass(LongWritable.class);
    // 1.3 Partitioning
    //job.setPartitionerClass(HashPartitioner.class);
    // Run a single reduce task
    //job.setNumReduceTasks(1);
    // 1.4 Sorting and grouping - defaults are used
    // 1.5 Combiner - not used
    // 2.2 Custom reducer
    job.setReducerClass(MyReducer.class);
    // Reduce output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // 2.3 Where the output goes
    FileOutputFormat.setOutputPath(job, outPath);
    // Class used to format the output files
    //job.setOutputFormatClass(TextOutputFormat.class);
    // Submit the job and report failure through the exit status
    // instead of always exiting with status 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
 * KEYIN    (k1): offset of the line
 * VALUEIN  (v1): text of the line
 * KEYOUT   (k2): a word found in the line
 * VALUEOUT (v2): occurrence count of that word, always the constant 1
 */
static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    protected void map(LongWritable k1, Text v1, Context context)
            throws java.io.IOException, InterruptedException {
        final String line = v1.toString();
        // The counter is looked up for every line, whether or not it is incremented.
        final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
        final boolean mentionsHello = line.contains("hello");
        if (mentionsHello) {
            // Count lines that contain the sensitive word.
            helloCounter.increment(1L);
        }
        final String[] words = line.split("\t");
        for (int i = 0; i < words.length; i++) {
            context.write(new Text(words[i]), new LongWritable(1));
        }
    }
}
/**
 * KEYIN    (k2): a word found in some line
 * VALUEIN  (v2): occurrence counts emitted for that word
 * KEYOUT   (k3): a distinct word of the text
 * VALUEOUT (v3): total number of occurrences of that word
 */
static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx)
            throws java.io.IOException, InterruptedException {
        long total = 0L;
        for (LongWritable partial : v2s) {
            total = total + partial.get();
        }
        final LongWritable result = new LongWritable(total);
        ctx.write(k2, result);
    }
}
}
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时,文章内容如有侵犯您的权益,请联系QQ:970679559,我们会尽快处理。