官术网_书友最值得收藏!

Handling data joins

Joins are commonplace in Big Data processing. They occur on the value of a join key and on a data type in the datasets that participate in a join. In this book, we will refrain from explaining the different join semantics such as inner joins, outer joins, and cross joins, and focus on inner join processing using MapReduce and the optimizations involved in it.

In MapReduce, joins can be done in either the Map task or the Reduce task. The former is called a Map-side join and the latter is called a Reduce-side join.

Reduce-side joins

Reduce-side joins are meant for more general purposes and do not impose too many conditions on the datasets that participate in the join. However, the shuffle step is very heavy on resources.

The basic idea involves tagging each record with a data source tag and extracting the join key in the Map tasks. The Reduce task receives all the records with the same join key and does the actual join. If one of the datasets participating in the join is very small, it can be distributed via a side channel such as the DistributedCache to every Reduce task.

For the Reduce-side joins to work, there are the following requirements:

  • There needs to be a way of specifying the InputFormat and Mapper classes for the different datasets participating in the join. The MultipleInputs class is designed for this purpose. For a smaller file, the DistributedCache API can be used. A Map-side join, which will be explained later, shows how to use this side-file distribution channel.
  • Secondary sorting capability needs to be there for optimal Reduce-side joins. The sorting of the join keys will happen, but it is important that the source is also sorted for each matching join key. By secondary sorting, one source occurs after the other, eliminating the need to hold all records for a particular key in the memory.

The following example illustrates Reduce-side joins. The dataset contains world cities and some information about the cities, the country code being one of them. It is available at http://dev.maxmind.com/geoip/legacy/geolite/ in a CSV format. Countries have a two-letter ISO code. The countrycodes.txt file was taken from http://www.spoonfork.org/isocodes.html.

In this example and in subsequent examples of joins, the ISO code for the country is the join key. This key is used to get the country name and the total population of that country calculated by summing up the population of its individual cities. The join can be done by the following steps:

  1. A custom Writable data type needs to be implemented to have the dataset tag information within the key. The following code shows the implementation of such a composite key:
    package MasteringHadoop;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class CompositeJoinKeyWritable implements WritableComparable<CompositeJoinKeyWritable> {
      
      private Text key = new Text();
      private IntWritable source = new IntWritable();
      
      public CompositeJoinKeyWritable(){
        
      }
      
      public CompositeJoinKeyWritable(String key, int source){
        
        this.key.set(key);
        this.source.set(source);
        
      }
      
      public IntWritable getSource(){
        return this.source;
      }
      
      public Text getKey(){
        return this.key;
      }
      
      public void setSource(int source){
        this.source.set(source);
      }
      
      public void setKey(String key){
        this.key.set(key);
        
      }
      
      @Override
      public void write(DataOutput dataOutput) throws IOException {
        
        this.key.write(dataOutput);
        this.source.write(dataOutput);
      }
      
      @Override
      public void readFields(DataInput dataInput) throws IOException {
        
        this.key.readFields(dataInput);
        this.source.readFields(dataInput);
        
      }
      
      
      @Override
      public int compareTo(CompositeJoinKeyWritable o) {
        
        int result = this.key.compareTo(o.key);
        
        if(result == 0){
          return this.source.compareTo(o.source);
        }
        
        
        return result;
      }
      
      @Override
      public boolean equals(Object obj){
        
        if(obj instanceof CompositeJoinKeyWritable){
          
          CompositeJoinKeyWritable joinKeyWritable = (CompositeJoinKeyWritable)obj;
          
          return (key.equals(joinKeyWritable.key) && source.equals(joinKeyWritable.source));
        }
        
        return false;
        
      }
    }
  2. A custom Partitioner class needs to be implemented. Partitioner must partition the data based on the natural join key only; in this case, it is the ISO country code. This ensures that all the cities with the same country code are processed by the same Reduce task. The following code gives an implementation of a custom Partitioner class:
    public static class CompositeJoinKeyPartitioner extends Partitioner<CompositeJoinKeyWritable, Text>{
    
    
            @Override
    
            public int getPartition(CompositeJoinKeyWritable key, Text value, int i) {
    
                return (key.getKey().hashCode() % i);
    
            }
        }
  3. A custom grouping comparator needs to be written. Again, like the partitioner, the grouping has to be done on the natural key alone. The following code shows the grouping comparator for the composite key:
    public static class CompositeJoinKeyComparator extends WritableComparator{
    
            protected CompositeJoinKeyComparator(){
                 super(CompositeJoinKeyWritable.class, true);
    
            }
    
            @Override
            public int compare(Object a, Object b) {
    
                CompositeJoinKeyWritable compositeKey1 = (CompositeJoinKeyWritable) a;
                CompositeJoinKeyWritable compositeKey2 = (CompositeJoinKeyWritable) b;
    
                return compositeKey1.getKey().compareTo(compositeKey2.getKey());
    
            }
        }
  4. The Mapper classes have to be written for each kind of input datasets. In the following example, two Mapper classes are present: one for the city dataset and the other for the country dataset. The country dataset has a number less than the city dataset. This is done for efficiency. When a secondary sort is done on the dataset keys, the country dataset record appears before the city records at the Reducer:
    public static class MasteringHadoopReduceSideJoinCountryMap extends Mapper<LongWritable, Text, CompositeJoinKeyWritable, Text>{
    
            private static short COUNTRY_CODE_INDEX = 0;
            private static short COUNTRY_NAME_INDEX = 1;
    
            private static CompositeJoinKeyWritable joinKeyWritable = new CompositeJoinKeyWritable("", 1);
            private static Text recordValue = new Text("");
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String[] tokens = value.toString().split(",", -1);
    
                if(tokens != null){
                    joinKeyWritable.setKey(tokens[COUNTRY_CODE_INDEX]);
                    recordValue.set(tokens[COUNTRY_NAME_INDEX]);
                    context.write(joinKeyWritable, recordValue);
                }
    
    
            }
        }
    
        public static class MasteringHadoopReduceSideJoinCityMap extends Mapper<LongWritable, Text, CompositeJoinKeyWritable, Text>{
    
            private static short COUNTRY_CODE_INDEX = 0;
    
    
            private static CompositeJoinKeyWritable joinKeyWritable = new CompositeJoinKeyWritable("", 2);
            private static Text record = new Text("");
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String[] tokens = value.toString().split(",", -1);
    
                if(tokens != null){
                    joinKeyWritable.setKey(tokens[COUNTRY_CODE_INDEX]);
                    record.set(value.toString());
                    context.write(joinKeyWritable, record);
                }
    
    
            }
        }
  5. The Reducer class takes advantage of secondary sorting to emit the joined records. The first value of the iterator is the country record. The name of the country is stored away and the population is calculated based on other records.
    public static class MasteringHadoopReduceSideJoinReduce extends
                Reducer<CompositeJoinKeyWritable, Text, Text, LongWritable>{
    
            private static LongWritable populationValue = new LongWritable(0);
            private static Text countryValue = new Text("");
            private static short POPULATION_INDEX = 4;
    
            @Override
            protected void reduce(CompositeJoinKeyWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    
                long populationTotal = 0;
                boolean firstRecord = true;
                String country = null;
                for(Text record : values){
    
                    String[] tokens = record.toString().split(",", -1);
                    if(firstRecord){
                        firstRecord = false;
                        if(tokens.length > 1)
                            break;
                        else
                          country = tokens[0];
                    }
                    else{
                        String populationString = tokens[POPULATION_INDEX];
    
                        if(populationString != null && populationString.isEmpty() == false){
                            populationTotal += Long.parseLong(populationString);
                        }
    
                    }
                }
    
                if(country != null){
                    populationValue.set(populationTotal);
                    countryValue.set(country);
                    context.write(countryValue, populationValue);
    
                }
    
    
            }
        }
  6. The driver program specifies all the custom data types that are required to do the Reduce-side join:
    public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException{
    
            GenericOptionsParser parser = new GenericOptionsParser(args);
            Configuration config = parser.getConfiguration();
            String[] remainingArgs = parser.getRemainingArgs();
    
            Job job = Job.getInstance(config, "MasteringHadoop-ReduceSideJoin");
    
            job.setMapOutputKeyClass(CompositeJoinKeyWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            job.setReducerClass(MasteringHadoopReduceSideJoinReduce.class);
            job.setPartitionerClass(CompositeJoinKeyPartitioner.class);
            job.setGroupingComparatorClass(CompositeJoinKeyComparator.class);
            job.setNumReduceTasks(3);
    
    
            MultipleInputs.addInputPath(job, new Path(remainingArgs[0]), TextInputFormat.class, MasteringHadoopReduceSideJoinCountryMap.class);
            MultipleInputs.addInputPath(job, new Path(remainingArgs[1]), TextInputFormat.class, MasteringHadoopReduceSideJoinCityMap.class);
    
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(remainingArgs[2]));
    
            job.waitForCompletion(true);
    
        }

Map-side joins

Map-side joins, on the contrary, require either of two conditions satisfied in the datasets they join. These conditions are as follows:

  • In addition to the presence of join keys, all inputs must be sorted using the join keys. The input datasets must have the same number of partitions. All records with the same key must reside in the same partition. Map-side joins are particularly attractive when operated on outputs of other MapReduce jobs. Such conditions are automatically satisfied in these cases. The CompositeInputFormat class can be used to run Map-side joins on such datasets. The configurations for inputs and join types can be specified using properties.
  • If one of the datasets is small enough, side file distribution channels such as the DistributedCache can be used to do a Map-side join.

In the following example, the countries file is distributed across all nodes. During Map task setup, it is loaded into the memory onto a TreeMap data structure. The setup() method of the Mapper class is overridden to load the smaller data set in memory:

package MasteringHadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.TreeMap;

public class MasteringHadoopMapSideJoin {

    public static class MasteringHadoopMapSideJoinMap extends Mapper<LongWritable, Text, Text, LongWritable> {

        private static short COUNTRY_CODE_INDEX = 0;
        private static short COUNTRY_NAME_INDEX = 1;
        private static short POPULATION_INDEX = 4;


        private TreeMap<String, String> countryCodesTreeMap = new TreeMap<String, String>();
        private Text countryKey = new Text("");
        private LongWritable populationValue = new LongWritable(0);


        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            URI[] localFiles = context.getCacheFiles();

            String path = null;
            for(URI uri : localFiles){
                path = uri.getPath();
                if(path.trim().equals("countrycodes.txt")){
                     break;
                }

            }

            if(path != null){
               getCountryCodes(path, context);
            }

        }

The getCountryCodes() private method, given as follows, is used to read the side file from the DistributedCache. Each line is processed and stored in the TreeMap instance. This method is a part of the Mapper class as well:

        private void getCountryCodes(String path, Context context) throws IOException{

            Configuration configuration = context.getConfiguration();
            FileSystem fileSystem = FileSystem.get(configuration);
            FSDataInputStream in = fileSystem.open(new Path(path));
            Text line = new Text("");
            LineReader lineReader = new LineReader(in, configuration);

            int offset = 0;
            do{
                offset = lineReader.readLine(line);

                if(offset > 0){
                    String[] tokens = line.toString().split(",", -1);
                    countryCodesTreeMap.put(tokens[COUNTRY_CODE_INDEX], tokens[COUNTRY_NAME_INDEX]);
                }

            }while(offset != 0);

        }

The map override method of the Mapper is where the join takes place. Each key is checked against the TreeMap data structure for a match. If a match exists, a joined record is emitted:

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String cityRecord = value.toString();
            String[] tokens = cityRecord.split(",", -1);

            String country = tokens[COUNTRY_CODE_INDEX];
            String populationString = tokens[POPULATION_INDEX];

            if(country != null && country.isEmpty() == false){

                if(populationString != null && populationString.isEmpty() == false){

                    long population = Long.parseLong(populationString);
                    String countryName = countryCodesTreeMap.get(country);

                    if(countryName == null) countryName = country;

                    countryKey.set(countryName);
                    populationValue.set(population);
                    context.write(countryKey, populationValue);

                }

            }
       }
    }

The Reduce task is a simple task that reduces on the join key and calculates the total population in a country. The code is given as follows:

    public static class MasteringHadoopMapSideJoinReduce extends Reducer<Text, LongWritable, Text, LongWritable>{

        private static LongWritable populationValue = new LongWritable(0);
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

             long populationTotal = 0;

             for(LongWritable population : values){
                populationTotal += population.get();
             }
            populationValue.set(populationTotal);
            context.write(key, populationValue);
        }
    }
主站蜘蛛池模板: 东山县| 榆树市| 岗巴县| 吐鲁番市| 常宁市| 威远县| 东至县| 陈巴尔虎旗| 和田市| 锡林浩特市| 金门县| 涟源市| 靖远县| 噶尔县| 泾阳县| 神木县| 泸西县| 双峰县| 邵武市| 昌平区| 商洛市| 南开区| 博罗县| 温州市| 周至县| 东明县| 章丘市| 巩义市| 宝兴县| 上高县| 遵义县| 襄汾县| 延川县| 大庆市| 苍溪县| 乌兰察布市| 陵水| 郸城县| 镇赉县| 双牌县| 屏东市|