Related Topics
No topics are associated with this blog
package com.georgefisher.tfidf;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// ============ DRIVER ===============
/**
 * Driver that chains five MapReduce jobs, named TFIDF1 .. TFIDF5, to compute
 * TF-IDF figures for a directory of text documents.
 *
 * <p>Every job writes to a directory named after the job itself; that
 * directory is deleted first if it already exists, and then becomes the input
 * of the next job. The pipeline stops at the first job that fails.
 *
 * <p>Usage: {@code TFIDF_driver [generic Hadoop options] <input dir>}
 */
public class TFIDF_driver extends Configured implements Tool {

    /** Exit code returned when every job succeeded. */
    private static final int EXIT_OK = 0;

    /** Exit code returned when one of the chained jobs failed. */
    private static final int EXIT_JOB_FAILED = 1;

    /** Exit code returned when the input directory argument is missing. */
    private static final int EXIT_USAGE = 2;

    /**
     * Separator property for "key tab value" input, spelled exactly as the
     * original driver spelled it.
     * NOTE(review): recent Hadoop releases list this key as a deprecated alias
     * of "mapreduce.input.keyvaluelinerecordreader.key.value.separator" --
     * confirm against the Hadoop version on the cluster.
     */
    private static final String KEY_VALUE_SEPARATOR_KEY = "key.value.separator.in.input.line";

    /**
     * Command-line entry point: hands the arguments to {@link ToolRunner} and
     * exits the JVM with the status returned by {@link #run(String[])}.
     *
     * @param args generic Hadoop options followed by the input directory
     * @throws Exception if the tool throws
     */
    public static void main(String[] args) throws Exception {
        System.out.println("driver main() args: " + Arrays.toString(args));
        int res = ToolRunner.run(new Configuration(), new TFIDF_driver(), args);
        System.exit(res);
    }

    /**
     * Runs the five jobs in sequence.
     *
     * @param args {@code args[0]} is the directory holding the source documents
     * @return 0 when all jobs succeed, 1 when a job fails, 2 when the input
     *         directory argument is missing
     * @throws Exception if job set-up, file-system access or job submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        System.out.println("driver run() args: " + Arrays.toString(args));

        if (args == null || args.length < 1) {
            System.err.println("usage: TFIDF_driver [generic options] <input dir>");
            return EXIT_USAGE;
        }

        // Use the configuration injected through setConf() (ToolRunner does
        // this) so that generic options such as -D, -fs and -conf are honoured.
        // Fall back to a fresh one only if run() is invoked without it.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }

        // Published before any job is created so that every job's copy of the
        // configuration carries it.
        conf.set("originalInputDir", args[0]);

        // ======================== step 1 ============================
        // input  (byteCount, line)
        // output (word@doc, n)        n = the frequency of word in doc
        Job job1 = createJob(conf, 1, args[0]);

        // Set the output Key, Value types for the Mapper
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);

        // Set the output Key, Value types for the Reducer
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        // Mapper, Combiner, Reducer
        // -------------------------
        job1.setMapperClass(TFIDF_step1.Map.class);
        job1.setReducerClass(TFIDF_step1.Reduce.class);

        // Specify that the Mapper & Reducer are reading text files
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        if (!runJob(conf, job1)) {
            return EXIT_JOB_FAILED;
        }

        // ======================== step 2 =============================
        // input  (word@doc, n)        n = the frequency of word in doc
        // output (word@doc, n;N)      N = total words in doc
        Job job2 = createJob(conf, 2, conf.get("outputDir"));

        // Set the output Key, Value types for the Mapper
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);

        // Set the output Key, Value types for the Reducer
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        // Mapper, Combiner, Reducer
        // -------------------------
        job2.setMapperClass(TFIDF_step2.Map.class);
        job2.setReducerClass(TFIDF_step2.Reduce.class);

        // Specify that the Mapper is reading "key tab value"
        readKeyTabValue(job2);

        // Specify that the Reducer is writing a text file
        job2.setOutputFormatClass(TextOutputFormat.class);

        if (!runJob(conf, job2)) {
            return EXIT_JOB_FAILED;
        }

        // ======================== step 3 =================================
        // input  (word@doc, n;N)      n  = the frequency of word in doc
        //                             N  = total words in doc
        // output (word@doc, n;N;df)   df = the frequency of word in dataset
        Job job3 = createJob(conf, 3, conf.get("outputDir"));

        // Set the output Key, Value types for the Mapper
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);

        // Set the output Key, Value types for the Reducer
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);

        // Mapper, Combiner, Reducer
        // -------------------------
        job3.setMapperClass(TFIDF_step3.Map.class);
        job3.setReducerClass(TFIDF_step3.Reduce.class);

        // Specify that the Mapper is reading "key tab value"
        readKeyTabValue(job3);

        // Specify that the Reducer is writing a text file
        job3.setOutputFormatClass(TextOutputFormat.class);

        if (!runJob(conf, job3)) {
            return EXIT_JOB_FAILED;
        }

        // ======================== step 4 =================================
        // input  (word@doc, n;N;df)   n  = the frequency of word in doc
        //                             N  = total words in doc
        //                             df = the frequency of word in dataset
        // output (word@doc, [tf, idf, tfidf])
        //
        // map-only
        // --------
        Job job4 = createJob(conf, 4, conf.get("outputDir"));

        // Set the output Key, Value types for the Mapper
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(Text.class);
        job4.setNumReduceTasks(0);

        // Mapper, Combiner, Reducer
        // -------------------------
        job4.setMapperClass(TFIDF_step4.Map.class);

        // Specify that the Mapper is reading "key tab value"
        readKeyTabValue(job4);

        if (!runJob(conf, job4)) {
            return EXIT_JOB_FAILED;
        }

        // ======================== step 5 =================================
        // ========= Find the max TF-IDF word in each document =============
        // input  (word@doc, [tf, idf, tfidf])
        //   [tf=2 idf=log(fileCount/df)=log(38/1)=3.6375861597263857 tfidf=tf*idf=7.275172319452771]
        //
        // output (word@doc, max-tfidf)
        Job job5 = createJob(conf, 5, conf.get("outputDir"));

        // Set the output Key, Value types for the Mapper
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);

        // Set the output Key, Value types for the Reducer
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(DoubleWritable.class);

        // Mapper, Combiner, Reducer
        // -------------------------
        job5.setMapperClass(TFIDF_step5.Map.class);
        job5.setReducerClass(TFIDF_step5.Reduce.class);

        // Specify that the Mapper is reading "key tab value"
        readKeyTabValue(job5);

        // Specify that the Reducer is writing a text file
        job5.setOutputFormatClass(TextOutputFormat.class);

        if (!runJob(conf, job5)) {
            return EXIT_JOB_FAILED;
        }

        // =================================================================
        return EXIT_OK;
    }

    /**
     * Creates the job for one pipeline step: records the step's input
     * directory in the shared configuration, then names the job
     * {@code "TFIDF" + step} and binds it to this driver's jar.
     *
     * @param conf     shared driver configuration; "inputDir" is overwritten
     * @param step     1-based step number
     * @param inputDir directory this step reads from
     * @return the new job, still lacking its mapper/reducer/format settings
     * @throws Exception if the job cannot be created
     */
    private static Job createJob(Configuration conf, int step, String inputDir) throws Exception {
        // Set before the job is created so the job's own copy carries it.
        conf.set("inputDir", inputDir);

        Job job = Job.getInstance(conf);
        job.setJobName("TFIDF" + step);

        // The two log prefixes differ only by a space; both are kept exactly
        // as they were so that existing console output does not change.
        String prefix = (step == 1) ? "job: " : "job : ";
        System.out.println(prefix + job.getJobName());

        job.setJarByClass(TFIDF_driver.class);
        return job;
    }

    /**
     * Configures a job to read tab-separated "key tab value" lines.
     *
     * <p>The separator is written to the job's own configuration: a job works
     * on a copy of the configuration it was created from, so setting the
     * property on the shared driver configuration afterwards would not reach it.
     *
     * @param job the job to configure
     */
    private static void readKeyTabValue(Job job) {
        job.getConfiguration().set(KEY_VALUE_SEPARATOR_KEY, "\t");
        job.setInputFormatClass(KeyValueTextInputFormat.class);
    }

    /**
     * Wires up the input and output paths of a fully configured job, removes
     * any stale output directory, runs the job and reports the outcome.
     *
     * <p>The output directory is named after the job and is recorded in the
     * shared configuration as "outputDir" so the next step can use it as input.
     *
     * @param conf shared driver configuration; reads "inputDir", writes "outputDir"
     * @param job  the job to run
     * @return {@code true} if the job succeeded, {@code false} otherwise
     * @throws Exception if file-system access or job submission fails
     */
    private static boolean runJob(Configuration conf, Job job) throws Exception {
        String jobName = job.getJobName();

        // Recorded after the job was created, as before: the job itself does
        // not see this value, only the jobs created later do.
        conf.set("outputDir", jobName);

        FileInputFormat.addInputPath(job, new Path(conf.get("inputDir")));

        // A job refuses to start when its output directory exists, so clear
        // leftovers from a previous run. The FileSystem handle is deliberately
        // not closed here.
        // NOTE(review): FileSystem.get() normally returns a cached, shared
        // instance -- confirm before adding a close().
        Path outputPath = new Path(jobName);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        for (Path inputPath : FileInputFormat.getInputPaths(job)) {
            System.out.println("input path " + inputPath);
        }
        System.out.println("output path " + FileOutputFormat.getOutputPath(job));

        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            System.out.println("job completed: " + jobName);
        } else {
            // Surface the failure instead of feeding missing or partial
            // output into the next step.
            System.err.println("job failed: " + jobName);
        }
        return succeeded;
    }
}
TF-IDF Step1 ->
Originally published: Wednesday, February 04, 2015; most-recently modified: Thursday, June 06, 2019