package com.georgefisher.tfidf;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
//MapReduce TF-IDF
// step 5   find the word in each document with the highest TF-IDF
// ------   =====================================================
// map      in  (word@doc, [tf, idf, tfidf])
//          out (doc, word;tfidf)
// reduce   in  (doc, [word;tfidf, word;tfidf, ...])
//          out (word@doc, max_tfidf)
public class TFIDF_step5 {
    // ============ MAPPER ===============
    public static class Map
    extends Mapper<Text, Text, Text, Text> {
        private static Pattern regex;
        public void setup(Context context) throws PatternSyntaxException {
            regex = Pattern.compile("tfidf=tf\\*idf=([-]?\\d+\\.\\d+)", Pattern.MULTILINE);
        private static Text doc        = new Text();
        private static Text word_tfidf = new Text();
        // map      in  (word@doc, [tf, idf, tfidf])
        //          out (doc, word;tfidf)
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] word_doc = key.toString().split("@");
            String word       = word_doc[0];
            String inputValue = value.toString();
            String tfidf = null;
            Matcher regexMatcher = regex.matcher(inputValue);
            if (regexMatcher.find()) {
                tfidf =;
            context.write(doc,  word_tfidf);
    // ============ REDUCER ===============
    public static class Reduce
    extends Reducer<Text, Text, Text, DoubleWritable> {
        private static Text           word_doc  = new Text();
        private static DoubleWritable max_tfidf = new DoubleWritable();
        // reduce   in  (doc, [word;tfidf, word;tfidf, ...])
        //          out (word@doc, max_tfidf)
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String doc = key.toString();
            Double max = Double.MIN_VALUE;
            for (Text value: values) {
                String[] word_tfidf = value.toString().split(";");
                String word  = word_tfidf[0];
                Double tfidf = Double.parseDouble(word_tfidf[1]);
                if (tfidf > max) {
                    max = tfidf;
            context.write(word_doc,  max_tfidf);
// Originally published: Wednesday, February 04, 2015; most-recently modified: Thursday, June 06, 2019