package com.georgefisher.tfidf;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// ============ DRIVER ===============
public class TFIDF_driver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.out.println("driver main() args: " + Arrays.toString(args));
        int res = ToolRunner.run(new Configuration(), new TFIDF_driver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        System.out.println("driver run() args: " + Arrays.toString(args));
        // Use the Configuration that ToolRunner has already populated, so
        // generic options such as -D key=value are not silently discarded
        Configuration conf = getConf();
        int step = 1;

        // ======================== step 1 ============================
        // input  (byteCount, line)
        // output (word@doc, n)   n = the frequency of word in doc
        // (a hypothetical sketch of TFIDF_step1 appears after the driver)
        Job job1 = Job.getInstance(conf);
        job1.setJobName("TFIDF" + step);
        System.out.println("job: " + job1.getJobName());
        job1.setJarByClass(TFIDF_driver.class);

        // Set the output Key, Value types for the Mapper
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);

        // Set the output Key, Value types for the Reducer
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        // Mapper, Reducer
        // ---------------
        job1.setMapperClass(TFIDF_step1.Map.class);
        job1.setReducerClass(TFIDF_step1.Reduce.class);

        // Specify that the Mapper & Reducer are reading and writing text files
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        // Specify Mapper input and Reducer output paths; the original input
        // dir is remembered so a later step can derive the document count
        conf.set("inputDir", args[0]);
        conf.set("originalInputDir", args[0]);
        conf.set("outputDir", job1.getJobName());
        FileInputFormat.addInputPath(job1, new Path(conf.get("inputDir")));
        FileSystem fs1 = FileSystem.get(conf);
        if (fs1.exists(new Path(conf.get("outputDir"))))
            fs1.delete(new Path(conf.get("outputDir")), true);
        FileOutputFormat.setOutputPath(job1, new Path(conf.get("outputDir")));

        for (Path inputPath : FileInputFormat.getInputPaths(job1))
            System.out.println("input path " + inputPath.toString());
        System.out.println("output path " +
                FileOutputFormat.getOutputPath(job1).toString());

        // Fail fast: every later step depends on this one's output
        if (!job1.waitForCompletion(true))
            return 1;
        System.out.println("job completed: " + job1.getJobName());
        // ======================== step 2 =============================
        // input  (word@doc, n)    n = the frequency of word in doc
        // output (word@doc, n;N)  N = total words in doc
        // (a hypothetical sketch of TFIDF_step2 appears after the driver)
        conf.set("inputDir", conf.get("outputDir"));
        step++;
        Job job2 = Job.getInstance(conf);
        job2.setJobName("TFIDF" + step);
        System.out.println("job: " + job2.getJobName());
        job2.setJarByClass(TFIDF_driver.class);

        // Set the output Key, Value types for the Mapper
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);

        // Set the output Key, Value types for the Reducer
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        // Mapper, Reducer
        // ---------------
        job2.setMapperClass(TFIDF_step2.Map.class);
        job2.setReducerClass(TFIDF_step2.Reduce.class);

        // Specify that the Mapper is reading "key tab value". The separator
        // must be set on the job's own configuration: Job.getInstance() took
        // a copy of conf, so later changes to conf never reach the job
        // (tab happens to be KeyValueTextInputFormat's default separator)
        job2.getConfiguration().set("key.value.separator.in.input.line", "\t");
        job2.setInputFormatClass(KeyValueTextInputFormat.class);

        // Specify that the Reducer is writing a text file
        job2.setOutputFormatClass(TextOutputFormat.class);

        // Specify Mapper input and Reducer output paths
        conf.set("outputDir", job2.getJobName());
        FileInputFormat.addInputPath(job2, new Path(conf.get("inputDir")));
        FileSystem fs2 = FileSystem.get(conf);
        if (fs2.exists(new Path(conf.get("outputDir"))))
            fs2.delete(new Path(conf.get("outputDir")), true);
        FileOutputFormat.setOutputPath(job2, new Path(conf.get("outputDir")));

        for (Path inputPath : FileInputFormat.getInputPaths(job2))
            System.out.println("input path " + inputPath.toString());
        System.out.println("output path " +
                FileOutputFormat.getOutputPath(job2).toString());

        if (!job2.waitForCompletion(true))
            return 1;
        System.out.println("job completed: " + job2.getJobName());
        // ======================== step 3 =================================
        // input  (word@doc, n;N)     n = the frequency of word in doc
        //                            N = total words in doc
        // output (word@doc, n;N;df)  df = the number of docs containing word
        // (a hypothetical sketch of TFIDF_step3 appears after the driver)
        conf.set("inputDir", conf.get("outputDir"));
        step++;
        Job job3 = Job.getInstance(conf);
        job3.setJobName("TFIDF" + step);
        System.out.println("job: " + job3.getJobName());
        job3.setJarByClass(TFIDF_driver.class);

        // Set the output Key, Value types for the Mapper
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);

        // Set the output Key, Value types for the Reducer
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);

        // Mapper, Reducer
        // ---------------
        job3.setMapperClass(TFIDF_step3.Map.class);
        job3.setReducerClass(TFIDF_step3.Reduce.class);

        // Specify that the Mapper is reading "key tab value" (set on the
        // job's own configuration, as in step 2)
        job3.getConfiguration().set("key.value.separator.in.input.line", "\t");
        job3.setInputFormatClass(KeyValueTextInputFormat.class);

        // Specify that the Reducer is writing a text file
        job3.setOutputFormatClass(TextOutputFormat.class);

        // Specify Mapper input and Reducer output paths
        conf.set("outputDir", job3.getJobName());
        FileInputFormat.addInputPath(job3, new Path(conf.get("inputDir")));
        FileSystem fs3 = FileSystem.get(conf);
        if (fs3.exists(new Path(conf.get("outputDir"))))
            fs3.delete(new Path(conf.get("outputDir")), true);
        FileOutputFormat.setOutputPath(job3, new Path(conf.get("outputDir")));

        for (Path inputPath : FileInputFormat.getInputPaths(job3))
            System.out.println("input path " + inputPath.toString());
        System.out.println("output path " +
                FileOutputFormat.getOutputPath(job3).toString());

        if (!job3.waitForCompletion(true))
            return 1;
        System.out.println("job completed: " + job3.getJobName());
        // ======================== step 4 =================================
        // input  (word@doc, n;N;df)  n  = the frequency of word in doc
        //                            N  = total words in doc
        //                            df = the number of docs containing word
        // output (word@doc, [tf, idf, tfidf])
        //
        // map-only
        // --------
        // (a hypothetical sketch of TFIDF_step4 appears after the driver)
        conf.set("inputDir", conf.get("outputDir"));
        step++;
        Job job4 = Job.getInstance(conf);
        job4.setJobName("TFIDF" + step);
        System.out.println("job: " + job4.getJobName());
        job4.setJarByClass(TFIDF_driver.class);

        // Set the output Key, Value types for the Mapper;
        // with zero reduce tasks the Mapper writes the final output
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(Text.class);
        job4.setNumReduceTasks(0);

        // Mapper only
        // -----------
        job4.setMapperClass(TFIDF_step4.Map.class);

        // Specify that the Mapper is reading "key tab value" (set on the
        // job's own configuration, as in step 2)
        job4.getConfiguration().set("key.value.separator.in.input.line", "\t");
        job4.setInputFormatClass(KeyValueTextInputFormat.class);

        // Specify Mapper input and output paths
        conf.set("outputDir", job4.getJobName());
        FileInputFormat.addInputPath(job4, new Path(conf.get("inputDir")));
        FileSystem fs4 = FileSystem.get(conf);
        if (fs4.exists(new Path(conf.get("outputDir"))))
            fs4.delete(new Path(conf.get("outputDir")), true);
        FileOutputFormat.setOutputPath(job4, new Path(conf.get("outputDir")));

        for (Path inputPath : FileInputFormat.getInputPaths(job4))
            System.out.println("input path " + inputPath.toString());
        System.out.println("output path " +
                FileOutputFormat.getOutputPath(job4).toString());

        if (!job4.waitForCompletion(true))
            return 1;
        System.out.println("job completed: " + job4.getJobName());
        // ======================== step 5 =================================
        // ========= Find the max TF-IDF word in each document =============
        // input  (word@doc, [tf, idf, tfidf]), e.g. for fileCount=38, df=1:
        //        [tf=2 idf=log(fileCount/df)=log(38/1)=3.6375861597263857
        //         tfidf=tf*idf=7.275172319452771]
        // output (word@doc, max-tfidf)
        // (a hypothetical sketch of TFIDF_step5 appears after the driver)
        conf.set("inputDir", conf.get("outputDir"));
        step++;
        Job job5 = Job.getInstance(conf);
        job5.setJobName("TFIDF" + step);
        System.out.println("job: " + job5.getJobName());
        job5.setJarByClass(TFIDF_driver.class);

        // Set the output Key, Value types for the Mapper
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);

        // Set the output Key, Value types for the Reducer
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(DoubleWritable.class);

        // Mapper, Reducer
        // ---------------
        job5.setMapperClass(TFIDF_step5.Map.class);
        job5.setReducerClass(TFIDF_step5.Reduce.class);

        // Specify that the Mapper is reading "key tab value" (set on the
        // job's own configuration, as in step 2)
        job5.getConfiguration().set("key.value.separator.in.input.line", "\t");
        job5.setInputFormatClass(KeyValueTextInputFormat.class);

        // Specify that the Reducer is writing a text file
        job5.setOutputFormatClass(TextOutputFormat.class);

        // Specify Mapper input and Reducer output paths
        conf.set("outputDir", job5.getJobName());
        FileInputFormat.addInputPath(job5, new Path(conf.get("inputDir")));
        FileSystem fs5 = FileSystem.get(conf);
        if (fs5.exists(new Path(conf.get("outputDir"))))
            fs5.delete(new Path(conf.get("outputDir")), true);
        FileOutputFormat.setOutputPath(job5, new Path(conf.get("outputDir")));

        for (Path inputPath : FileInputFormat.getInputPaths(job5))
            System.out.println("input path " + inputPath.toString());
        System.out.println("output path " +
                FileOutputFormat.getOutputPath(job5).toString());

        if (!job5.waitForCompletion(true))
            return 1;
        System.out.println("job completed: " + job5.getJobName());
        // =================================================================
        return 0;
    }
}
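The five TFIDF_step* classes the driver wires up live in their own source files and are not reproduced in this post. The sketches below are hypothetical reconstructions built only from the input/output contracts in the driver's comments; tokenization, field separators, and parsing details are my assumptions, not the original code. Step 1 counts how often each word occurs in each document:

package com.georgefisher.tfidf;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical sketch: (byteCount, line) -> (word@doc, n)
public class TFIDF_step1 {

    // Mapper: emit (word@doc, 1) for every word occurrence
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text wordAtDoc = new Text();

        @Override
        public void map(LongWritable byteCount, Text line, Context context)
                throws IOException, InterruptedException {
            // The document name is the name of the file this split came from
            String doc = ((FileSplit) context.getInputSplit()).getPath().getName();
            // Assumed tokenization: lower-case, split on anything non-alphabetic
            for (String word : line.toString().toLowerCase().split("[^a-z]+")) {
                if (word.isEmpty()) continue;
                wordAtDoc.set(word + "@" + doc);
                context.write(wordAtDoc, ONE);
            }
        }
    }

    // Reducer: sum the 1s into n, the frequency of word in doc
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text wordAtDoc, Iterable<IntWritable> ones, Context context)
                throws IOException, InterruptedException {
            int n = 0;
            for (IntWritable one : ones)
                n += one.get();
            context.write(wordAtDoc, new IntWritable(n));
        }
    }
}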
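Step 2 needs each document's total word count N next to every per-word count n, so a plausible implementation rotates the key: the mapper re-keys by document, and the reducer buffers that document's words, sums them into N, and re-emits one (word@doc, n;N) record per word:

package com.georgefisher.tfidf;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical sketch: (word@doc, n) -> (word@doc, n;N)
public class TFIDF_step2 {

    // Mapper: re-key by document so one reducer sees all of a doc's words
    public static class Map extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text wordAtDoc, Text n, Context context)
                throws IOException, InterruptedException {
            String[] parts = wordAtDoc.toString().split("@");
            // emit (doc, word;n)
            context.write(new Text(parts[1]), new Text(parts[0] + ";" + n));
        }
    }

    // Reducer: total the doc's counts into N, then re-emit per word.
    // Buffers one document's vocabulary in memory; fine for modest docs.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text doc, Iterable<Text> wordCounts, Context context)
                throws IOException, InterruptedException {
            HashMap<String, Integer> counts = new HashMap<String, Integer>();
            int N = 0;
            for (Text wordCount : wordCounts) {
                String[] parts = wordCount.toString().split(";");
                int n = Integer.parseInt(parts[1]);
                counts.put(parts[0], n);
                N += n;
            }
            for (java.util.Map.Entry<String, Integer> e : counts.entrySet())
                context.write(new Text(e.getKey() + "@" + doc.toString()),
                        new Text(e.getValue() + ";" + N));
        }
    }
}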
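Step 3 follows the same rotate-and-aggregate pattern, this time grouping by word so the reducer can count how many documents contain it:

package com.georgefisher.tfidf;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical sketch: (word@doc, n;N) -> (word@doc, n;N;df)
public class TFIDF_step3 {

    // Mapper: re-key by word so one reducer sees every doc containing it
    public static class Map extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text wordAtDoc, Text nN, Context context)
                throws IOException, InterruptedException {
            String[] parts = wordAtDoc.toString().split("@");
            // emit (word, doc;n;N)
            context.write(new Text(parts[0]), new Text(parts[1] + ";" + nN));
        }
    }

    // Reducer: df = the number of documents the word appears in
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text word, Iterable<Text> docCounts, Context context)
                throws IOException, InterruptedException {
            HashMap<String, String> nNByDoc = new HashMap<String, String>();
            for (Text docCount : docCounts) {
                // value layout: doc;n;N -> key doc, value "n;N"
                String[] parts = docCount.toString().split(";", 2);
                nNByDoc.put(parts[0], parts[1]);
            }
            int df = nNByDoc.size();
            for (java.util.Map.Entry<String, String> e : nNByDoc.entrySet())
                context.write(new Text(word.toString() + "@" + e.getKey()),
                        new Text(e.getValue() + ";" + df));
        }
    }
}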
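Step 4 has everything it needs in each record except fileCount, the total number of documents; that is presumably why the driver stashed "originalInputDir" in the configuration. One plausible approach, assumed here, is to count the files in that directory during mapper setup. The natural log matches the worked example in the step-5 comment (log(38/1) = 3.6375861597263857):

package com.georgefisher.tfidf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical sketch: (word@doc, n;N;df) -> (word@doc, [tf, idf, tfidf]); map-only
public class TFIDF_step4 {

    public static class Map extends Mapper<Text, Text, Text, Text> {
        private long fileCount;

        @Override
        public void setup(Context context)
                throws IOException, InterruptedException {
            // fileCount = number of documents in the original input directory,
            // which the driver stored in the configuration (an assumption)
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            fileCount = fs.listStatus(new Path(conf.get("originalInputDir"))).length;
        }

        @Override
        public void map(Text wordAtDoc, Text nNdf, Context context)
                throws IOException, InterruptedException {
            String[] parts = nNdf.toString().split(";");
            int n  = Integer.parseInt(parts[0]);
            int df = Integer.parseInt(parts[2]);
            // tf = raw count of word in doc; idf uses the natural log,
            // consistent with the worked example above
            double tf    = n;
            double idf   = Math.log((double) fileCount / df);
            double tfidf = tf * idf;
            context.write(wordAtDoc,
                    new Text("[tf=" + n + " idf=" + idf + " tfidf=" + tfidf + "]"));
        }
    }
}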
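Step 5 rotates the key back to the document and keeps only the word with the largest tf-idf; the parsing below assumes step 4 wrote a plain number after the final "=" sign, as in the sketch above:

package com.georgefisher.tfidf;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical sketch: (word@doc, [tf, idf, tfidf]) -> (word@doc, max-tfidf)
public class TFIDF_step5 {

    // Mapper: re-key by document, carrying (word, tfidf) in the value
    public static class Map extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text wordAtDoc, Text stats, Context context)
                throws IOException, InterruptedException {
            String[] parts = wordAtDoc.toString().split("@");
            // pull the number after the last '=' out of "[tf=... idf=... tfidf=...]"
            String s = stats.toString();
            String tfidf = s.substring(s.lastIndexOf('=') + 1).replace("]", "").trim();
            context.write(new Text(parts[1]), new Text(parts[0] + ";" + tfidf));
        }
    }

    // Reducer: keep the word with the highest tf-idf in each document
    public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        public void reduce(Text doc, Iterable<Text> wordScores, Context context)
                throws IOException, InterruptedException {
            String maxWord = null;
            double maxTfidf = Double.NEGATIVE_INFINITY;
            for (Text wordScore : wordScores) {
                String[] parts = wordScore.toString().split(";");
                double tfidf = Double.parseDouble(parts[1]);
                if (tfidf > maxTfidf) {
                    maxTfidf = tfidf;
                    maxWord = parts[0];
                }
            }
            context.write(new Text(maxWord + "@" + doc.toString()),
                    new DoubleWritable(maxTfidf));
        }
    }
}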
Originally published: Wednesday, February 04, 2015; most-recently modified: Thursday, June 06, 2019