MapReduce program to find distinct values

In this recipe, we are going to learn how to write a MapReduce program that finds the distinct values in a given set of data.

Getting ready

To perform this recipe, you should have a running Hadoop cluster and an IDE such as Eclipse.

How to do it

Data often contains duplicate values. In SQL, the DISTINCT keyword returns only the distinct values. In this recipe, we are going to take a look at how to get distinct values using a MapReduce program.

Let's consider a use case where we have some user data with two columns: userId and username. Let's assume that the data contains duplicate records and that, for our processing needs, we only need the distinct user IDs. Here is some sample data, in which the columns are separated by '|':

1|Tanmay
2|Ramesh
3|Ram
1|Tanmay
2|Ramesh
6|Rahul
6|Rahul
4|Sneha
4|Sneha

The idea here is to rely on the framework's default shuffle behavior, in which all records with the same key are sent to the same reducer. In the mapper, we make userId the key and emit it. The reducer is then called once for each key, so writing each key once removes the duplicates.

Let's look at the mapper code:

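public static class DistinctUserMapper extends Mapper<Object, Text, Text, NullWritable> {
    // Reused across map() calls to avoid allocating a new Text for every record.
    private final Text userId = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like "userId|username"; "[|]" splits on a literal '|'.
        String[] fields = value.toString().split("[|]");
        userId.set(fields[0]);
        // Emit only the user ID; NullWritable is an empty placeholder value.
        context.write(userId, NullWritable.get());
    }
}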

Because we only want distinct user IDs, we emit each user ID as the key and a NullWritable as the value.
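
The mapper above assumes every line has a user ID before the first '|'. As a minimal sketch of how the parsing step could guard against blank or malformed lines, the following plain-Java helper can be tried without a cluster. The class name UserIdParser and the method extractUserId are hypothetical and not part of the recipe's code.

```java
import java.util.Optional;

public class UserIdParser {

    // Hypothetical helper: returns the user ID of a "userId|username" record,
    // or an empty Optional when the line is blank or the user ID is missing.
    static Optional<String> extractUserId(String line) {
        if (line == null || line.trim().isEmpty()) {
            return Optional.empty();
        }
        // "[|]" matches a literal '|' (a bare "|" would be regex alternation).
        String[] fields = line.split("[|]");
        if (fields.length == 0 || fields[0].trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(fields[0].trim());
    }

    public static void main(String[] args) {
        System.out.println(extractUserId("1|Tanmay").isPresent());
        System.out.println(extractUserId("|Tanmay").isPresent());
    }
}
```

In a real mapper, a record for which this helper returns an empty result could simply be skipped instead of being written to the context.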

Now, let's look at the reducer code:

public static class DistinctUserReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // reduce() is called once per distinct key, so writing the key once drops duplicates.
        context.write(key, NullWritable.get());
    }
}

Here, we emit each user ID exactly once and ignore the values. This step removes the duplicates: the framework groups the mapper output by key and calls reduce() once per distinct key, so only one record per key is written.
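
To see why grouping by key is enough to remove duplicates, here is a small single-process sketch in plain Java, with no Hadoop dependency. It is only an illustration of the idea, not how the framework is implemented: a sorted map stands in for the shuffle, and the class name DistinctSimulation and the method distinctUserIds are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DistinctSimulation {

    // Simulates map -> shuffle -> reduce for the distinct-user-ID job.
    static List<String> distinctUserIds(List<String> records) {
        // "Map" and "shuffle": extract the key and group equal keys together.
        // A TreeMap keeps the keys sorted, as reducer input is sorted by key.
        Map<String, Integer> grouped = new TreeMap<>();
        for (String record : records) {
            String userId = record.split("[|]")[0];
            grouped.merge(userId, 1, Integer::sum);
        }
        // "Reduce": one output per distinct key, however often it occurred.
        return new ArrayList<>(grouped.keySet());
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "1|Tanmay", "2|Ramesh", "3|Ram", "1|Tanmay", "2|Ramesh",
                "6|Rahul", "6|Rahul", "4|Sneha", "4|Sneha");
        System.out.println(distinctUserIds(sample)); // prints [1, 2, 3, 4, 6]
    }
}
```

The keys here are compared as strings, so with multi-digit IDs the order would be lexicographic rather than numeric.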

The driver code remains simple, as shown here:

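package com.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctValues {

    // DistinctUserMapper and DistinctUserReducer (shown above) are nested inside this class.

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: DistinctValues <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Distinct User Id finder");
        job.setJarByClass(DistinctValues.class);
        job.setMapperClass(DistinctUserMapper.class);
        job.setReducerClass(DistinctUserReducer.class);
        // Mapper and reducer both emit <Text, NullWritable> pairs.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}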

Now, when we run the job and print the reducer's output file, we see the following:

hadoop jar distinct.jar com.demo.DistinctValues /users /distinct_user_ids
hadoop fs -cat /distinct_user_ids/part-r-00000
1
2
3
4
6

How it works...

When the mapper emits keys and values, the output is shuffled across the nodes in the cluster. The partitioner decides which reducer each key is sent to. Every mapper applies the same partitioning logic, so all records with the same key reach the same reducer, where they are grouped together. In the preceding code, we rely on this default behavior to find the distinct user IDs.
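
The partitioning step described above can be sketched in plain Java. Hadoop's default HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; the sketch below applies the same formula to String keys. This is an approximation for illustration only: the real job hashes Text keys, whose hash codes differ from String's, and the class name PartitionSketch and the method partitionFor are hypothetical.

```java
public class PartitionSketch {

    // Mirrors the formula used by Hadoop's default HashPartitioner.
    // Masking with Integer.MAX_VALUE clears the sign bit so the result is never negative.
    static int partitionFor(String key, int numReducers) {
        if (numReducers <= 0) {
            throw new IllegalArgumentException("numReducers must be positive");
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 3; // assumed reducer count for this illustration
        String[] userIds = {"1", "2", "3", "1", "2", "6", "6", "4", "4"};
        for (String userId : userIds) {
            // Equal keys always map to the same partition, whichever mapper emits them.
            System.out.println(userId + " -> reducer " + partitionFor(userId, numReducers));
        }
    }
}
```

Because the partition depends only on the key and the number of reducers, duplicate user IDs emitted by different mappers still meet in a single reduce() call.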
