概述
- 源自与谷歌的MapReduce的论文,发表于2004年12月
- Hadoop MapReduce 是 Google MapReduce的克隆版
- MapReduce的有点 : 海量数据的离线处理&易开发&易运行
- MapReduce的缺点 : 实时流式计算
input --> spliting --> Maping -->
shuffling(将不同的key分到一起) --> Reduceing --> Final result
读取流程
- InputFormat 读取Hdfs里面的数据
- 读取的数据转为Split
- Split转为RecordReader(一行数据)
- Map
- Partition(Shuffle)过程
- Reduce
MapReduce编程模型核心概念
- Split
- 将输入的数据进行拆分,拆分为RecordReader(一行数据)
- InputFormat
- 将RecordReader进行读取,处理
- OutPutFormat
- 将处理了的数据写出到指定地方
- Combiner
- 设置Combiner: job.setComnierClass()
- 在map端做一个聚合,后再将数据传到Reduce,这个操作其实和Reduce的逻辑是一样的
- 算除法的时候,比如平均数的时候,先聚合的话会导致分母出问题,需要慎重考虑
- Partitioner –>按key分发map后的值
使用自定义类型进行MapReduce
自定义Hadoop类型处理类
- 实现Writable接口
- 定义空构造函数
- 复写write、readFields方法
- write: 将各个字段写入进去 out.writeUTF(phone) ….
- readFields: 将字段的值读取出来 in.readUTF()….
- 读取字段的值的顺序必须和写入字段的顺序相同
自定义Mapper
- 定义使用
- 复写map方法,分割数据,进行整理
public class AccessMapper extends
Mapper<LongWritable, Text,Text,Access> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] lines =
value.toString().split("\\s+");
String phone = lines[1];// 取出手机号
long up = Long.parseLong(lines[lines.length-3]);// 取出上行流量
long down = Long.parseLong(lines[lines.length-2]);// 取出下行流量
context.write(
new Text(phone),
new Access(phone,up,down));
}
}
自定义Reducer
- 继承Reducer
- 复写Reduce方法
public class AccessReducer extends
Reducer<Text,Access,Text,Access> {
@Override
protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
long ups = 0;
long downs = 0;
for(Access access : values){
ups += access.getUp();
downs += access.getDown();
}
context.write(key,
new Access(key.toString(),ups,downs));
}
}
//若不想再输出key,只想输出Access的信息
public class AccessReducer extends
Reducer<Text,Access, NullWritable,Access> {
...
context.write(
NullWritable.get(),
new Access(key.toString(),ups,downs));
}
默认shuffle规则
HashPartitioner 是默认的分区方式 规则 : 分发的key的hash值与reduce task的个数取模
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public HashPartitioner() {
}
// numReduceTasks : 指定的Reducer的个数,决定了reduce作业输出文件的个数
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks;
}
}
自定义分区规则
- 继承Partitioner两个范型为map输出的key和value值
- 复写getPartition方法
- Job中设置Partition和分区数量
public class AccessPartitioner extends
Partitioner<Text,Access> {
@Override
public int getPartition(Text phone, Access access, int i) {
// 将15开头的数据,放在第0个
if(phone.toString().startsWith("15")){
return 0;
}else if (phone.toString().startsWith("17")){
return 1;
}else{
return 2;
}
}
}
//JOB
// 设置自定义分区规则
job.setPartitionerClass(AccessPartitioner.class);
// 设置分区个数
job.setNumReduceTasks(3);
启动
public static void main(String args[])throws Exception{
Configuration configuration = new Configuration();
// 设置参数 可在此指定调用远程hdfs文件系统
Job job = Job.getInstance(configuration);
// 设置启动类名称
job.setJarByClass(AccessApp.class);
// 设置自定义map类
job.setMapperClass(AccessMapper.class);
// 设置自定义reduce类
job.setReducerClass(AccessReducer.class);
// 设置map输出的key,输入的key默认为LongWritable(表示行号)
job.setMapOutputKeyClass(Text.class);
// 设置map输出的value, 输入的value默认为那一行数据
job.setMapOutputValueClass(Access.class);
// 设置reduce输出的key的类型
job.setOutputKeyClass(Text.class);
// 设置reduce输出的value的类型
job.setOutputValueClass(Access.class);
// 设置自定义分区规则
job.setPartitionerClass(AccessPartitioner.class);
// 设置分区个数
job.setNumReduceTasks(3);
// 设置输入文件路径
FileInputFormat.setInputPaths(
job,
new Path("access/input")
);
// 输出文件路径
FileOutputFormat.setOutputPath(
job,
new Path("access/output")
);
job.waitForCompletion(true);
}