2011-12-22 12 views
12

Piszę programy hadoop, i naprawdę nie chcę grać z przestarzałymi klasami. wszędzie w internecie i nie jestem w stanie znaleźć programy z zaktualizowanymHadoop JobConf jest przestarzałe, potrzebny jest zaktualizowany przykład

org.apache.hadoop.conf.Configuration

klasy insted

org.apache.hadoop.mapred .JobConf

klasa.

public static void main(String[] args) throws Exception { 
    JobConf conf = new JobConf(Test.class); 
    conf.setJobName("TESST"); 

    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(IntWritable.class); 

    conf.setMapperClass(Map.class); 
    conf.setCombinerClass(Reduce.class); 
    conf.setReducerClass(Reduce.class); 

    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

    JobClient.runJob(conf); 
    } 

Tak wygląda mój główny(). Może sprawić, że ktoś dostarczy mi zaktualizowaną funkcję.

+0

Możliwy duplikat [Run Hadoop pracy bez użycia JobConf] (http://stackoverflow.com/questions/2115292/run-hadoop-job-without-using-jobconf) – chess007

+0

Nie, jego similar.But í na przykład przy pomocy klasy Configuration, która jest substytucją klasy jobconf. – CodeBanger

Odpowiedz

18

Oto klasyczny przykład WordCount. Zauważysz ton innych importów, które mogą nie być konieczne, czytając kod, który będziesz wymyślał, który jest który.

Co jest innego? Używam interfejsu narzędzia i GenericOptionParser do parsowania polecenia job a.k.a: hadoop jar ...

W programie odwzorowującym zauważysz uruchomienie programu. Możesz się tego pozbyć, zwykle jest wywoływana domyślnie po podaniu kodu dla metody Map. Umieściłem go tam, aby dać ci informację, że możesz dalej kontrolować etap mapowania. To wszystko za pomocą nowego interfejsu API. Mam nadzieję, że okaże się to przydatne. Wszelkie inne pytania, daj mi znać!

import java.io.IOException; 
import java.util.*; 

import org.apache.commons.io.FileUtils; 
import org.apache.hadoop.conf.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 

import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.util.GenericOptionsParser; 

public class Inception extends Configured implements Tool{ 

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { 
    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     String line = value.toString(); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     while (tokenizer.hasMoreTokens()) { 
      word.set(tokenizer.nextToken()); 
      context.write(word, one); 
     } 
    } 

    public void run (Context context) throws IOException, InterruptedException { 
     setup(context); 
     while (context.nextKeyValue()) { 
       map(context.getCurrentKey(), context.getCurrentValue(), context); 
      } 
     cleanup(context); 
    } 
} 

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { 

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
     throws IOException, InterruptedException { 
     int sum = 0; 
     for (IntWritable val : values) { 
      sum += val.get(); 
     } 
     context.write(key, new IntWritable(sum)); 
    } 
} 

public int run(String[] args) throws Exception { 

    Job job = Job.getInstance(new Configuration()); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    job.setMapperClass(Map.class); 
    job.setReducerClass(Reduce.class); 

    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.setJarByClass(WordCount.class); 

    job.submit(); 
    return 0; 
    } 

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    ToolRunner.run(new WordCount(), otherArgs); 
} 
} 
+0

thnx dla kolesia szybkiego reagowania. Spróbuję, a następnie zaakceptuję twoją odpowiedź. – CodeBanger

+0

Mogłem zapomnieć o nawiasach zamykających. Wszelkie błędy kompilacji, które pojawiają się podczas robienia rzeczy, powinny być proste do rozwiązania! – inquire

+0

Tak ... Rozwiąże to i odpowie. ;-) – CodeBanger

1

wziąć również klasyczne WordCount jako przykład:

org.apache.hadoop.mapred.JobConf jest stary, w nowej wersji używamy Configuration i Job osiągnąć.

Użyj org.apache.hadoop.mapreduce.lib.* (nowy interfejs API) zamiast org.apache.hadoop.mapred.TextInputFormat (jest stary).

Mapper i Reducer to nic nowego, zobacz funkcję main, zawiera ona stosunkowo ogólne konfiguracje, możesz je zmienić zgodnie ze swoimi specyficznymi wymaganiami.

import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { 
    private Text outputKey; 
    private IntWritable outputVal; 

    @Override 
    public void setup(Context context) { 
    outputKey = new Text(); 
    outputVal = new IntWritable(1); 
    } 

    @Override 
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
    StringTokenizer stk = new StringTokenizer(value.toString()); 
    while(stk.hasMoreTokens()) { 
     outputKey.set(stk.nextToken()); 
     context.write(outputKey, outputVal); 
    } 
    } 
} 

class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
    private IntWritable result; 

    @Override 
    public void setup(Context context) { 
    result = new IntWritable(); 
    } 

    @Override 
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 
    int sum = 0; 
    for(IntWritable val: values) { 
     sum += val.get(); 
    } 
    result.set(sum); 
    context.write(key, result); 
    } 
} 

public class WordCount { 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); 
    if(args.length != 2) { 
     System.err.println("Usage: <in> <out>"); 
     System.exit(2); 
    } 
    Job job = Job.getInstance(conf, "Word Count"); 

    // set jar 
    job.setJarByClass(WordCount.class); 

    // set Mapper, Combiner, Reducer 
    job.setMapperClass(TokenizerMapper.class); 
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 

    /* Optional, set customer defined Partioner: 
    * job.setPartitionerClass(MyPartioner.class); 
    */ 

    // set output key 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(IntWritable.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    // set input and output path 
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    // by default, Hadoop use TextInputFormat and TextOutputFormat 
    // any customer defined input and output class must implement InputFormat/OutputFormat interface 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
}