
Hadoop MapReduce Multiple Input Files


In a typical MapReduce program we read only one input file format. What should we do when the job needs to consume files in several different formats?

MapReduce multiple file input

The key API is MultipleInputs.addInputPath, which binds an InputFormat and a Mapper class to a specific input path of the job:

MultipleInputs.addInputPath(job, new Path(args[0]),TextInputFormat.class, CounterMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),TextInputFormat.class, CountertwoMapper.class);

Multiple file input

We use the MultipleInputs class, which supports MapReduce jobs that have multiple input paths, with a different InputFormat and Mapper for each path. To make the concept concrete, take a case where the user wants to read two input files with similar content but different field delimiters. Assume both files have two columns, the first holding "Name" and the second "Age"; File1.txt is tab-separated while File2.txt is comma-separated. We want to combine the data from both files and count how many people share each age. What do we need to do? Just two things:

  • Use two mapper classes, one per input format.
  • Register each mapper class with MultipleInputs in the run/main method (see the driver class below).

Input File 1

File1.txt

Aman  19
Tom   20
Tony  15
John  18
Johnny      19
Hugh  17

Input File 2

File2.txt

James,21
Punk,21
Frank,20

Implementing the Map and Reduce classes

Hadoop multiple input files Driver Class

package com.mcis;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiInputJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // Use the configuration supplied by ToolRunner rather than creating a new one.
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "multiple input files");
        job.setJarByClass(MultiInputJob.class);
        // Bind a Mapper (and InputFormat) to each input path.
        MultipleInputs.addInputPath(job, new Path(args[0]),
                TextInputFormat.class, CounterMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                TextInputFormat.class, CountertwoMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setReducerClass(CounterReducer.class);
        job.setNumReduceTasks(1);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(IntWritable.class);

        return (job.waitForCompletion(true) ? 0 : 1);

    }

    public static void main(String[] args) throws Exception {
        // Propagate the job's exit code to the shell.
        System.exit(ToolRunner.run(new Configuration(), new MultiInputJob(), args));
    }

}
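
To launch the job, package the classes into a jar and pass the two input paths and the output path as arguments. A minimal invocation sketch; the jar name and HDFS paths here are assumptions for illustration:

hadoop jar multi-input.jar com.mcis.MultiInputJob /user/hadoop/File1.txt /user/hadoop/File2.txt /user/hadoop/output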

Hadoop multiple input files Counter Mapper Class

package com.mcis;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

class CounterMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // File1.txt is tab-separated: <name>\t<age>. Emit (age, name).
        String[] line = value.toString().trim().split("\t");
        context.write(new LongWritable(Long.parseLong(line[1])), new Text(line[0]));
    }
}

Hadoop multiple input files Counter Two Mapper Class

package com.mcis;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

class CountertwoMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // File2.txt is comma-separated: <name>,<age>. Emit (age, name).
        String[] line = value.toString().trim().split(",");
        context.write(new LongWritable(Long.parseLong(line[1])), new Text(line[0]));
    }
}
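
Because each mapper only parses one delimiter, its logic is easy to exercise in isolation. Below is a minimal MRUnit-style test sketch; MRUnit (org.apache.hadoop.mrunit) and JUnit are assumed dependencies, and the test class name is made up for illustration:

package com.mcis;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class MapperSketchTest {

    // CounterMapper reads a tab-separated line and emits (age, name).
    @Test
    public void counterMapperEmitsAgeAsKey() throws IOException {
        MapDriver.newMapDriver(new CounterMapper())
                .withInput(new LongWritable(0), new Text("Aman\t19"))
                .withOutput(new LongWritable(19), new Text("Aman"))
                .runTest();
    }

    // CountertwoMapper reads a comma-separated line and emits (age, name).
    @Test
    public void countertwoMapperEmitsAgeAsKey() throws IOException {
        MapDriver.newMapDriver(new CountertwoMapper())
                .withInput(new LongWritable(0), new Text("James,21"))
                .withOutput(new LongWritable(21), new Text("James"))
                .runTest();
    }
}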

Hadoop multiple input files Counter Reducer Class

package com.mcis;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class CounterReducer extends Reducer<LongWritable, Text, LongWritable, IntWritable> {

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // All names with the same age arrive together; count them.
        int count = 0;
        for (Text ignored : values) {
            count++;
        }
        context.write(key, new IntWritable(count));
    }
}
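
The reducer can be sketched the same way: feeding two names under one age key should yield a count of 2, matching the output below. Again, MRUnit and JUnit are assumed dependencies and the class name is illustrative:

package com.mcis;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class ReducerSketchTest {

    // Two names arrive under age 19 (Aman and Johnny), so the reducer emits (19, 2).
    @Test
    public void reducerCountsNamesPerAge() throws IOException {
        ReduceDriver.newReduceDriver(new CounterReducer())
                .withInput(new LongWritable(19),
                        Arrays.asList(new Text("Aman"), new Text("Johnny")))
                .withOutput(new LongWritable(19), new IntWritable(2))
                .runTest();
    }
}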

Output File

Each output line is an age followed by the number of records with that age across both input files; age 19, for example, appears twice (Aman and Johnny), and age 21 twice (James and Punk).

15    1
17    1
18    1
19    2
20    2
21    2

References

  • http://www.hadooptpoint.org/hadoop-multiple-input-files-example-in-mapreduce/#codesyntax_5


Published: January 4, 2018
Category: bigdata
Tags: bigdata, mr