
Handling data joins

Joins are commonplace in Big Data processing. A join matches records from the participating datasets on the value of a shared join key. In this book, we will refrain from explaining the different join semantics, such as inner joins, outer joins, and cross joins, and focus on inner join processing using MapReduce and the optimizations involved in it.

In MapReduce, joins can be done in either the Map task or the Reduce task. The former is called a Map-side join and the latter is called a Reduce-side join.

Reduce-side joins

Reduce-side joins are general purpose and impose few conditions on the datasets that participate in the join. However, they rely on the shuffle step, which is heavy on network and disk resources.

The basic idea is that each Map task tags every record with a data source tag and extracts the join key. The Reduce task then receives all the records with the same join key and performs the actual join. If one of the datasets participating in the join is very small, it can instead be distributed via a side channel such as the DistributedCache to every task.

For Reduce-side joins to work, the following requirements must be met:

  • There needs to be a way of specifying the InputFormat and Mapper classes for the different datasets participating in the join. The MultipleInputs class is designed for this purpose. For a smaller file, the DistributedCache API can be used instead; the Map-side join example later in this section shows how to use this side-file distribution channel.
  • Secondary sorting is needed for optimal Reduce-side joins. The join keys are sorted in any case, but within each join key the records must also be sorted by their data source. With this secondary sort, all records from one source arrive before those from the other, eliminating the need to hold all the records for a particular key in memory.

The following example illustrates a Reduce-side join. The city dataset contains world cities and some information about each city, the country code being one of the fields. It is available in CSV format at http://dev.maxmind.com/geoip/legacy/geolite/; the country code is the first column and the population the fifth, which is why the code below uses indices 0 and 4. Countries are identified by a two-letter ISO code. The countrycodes.txt file, which maps each ISO code to a country name, was taken from http://www.spoonfork.org/isocodes.html.

In this example, and in the subsequent join examples, the ISO country code is the join key. It is used to look up the country name and to compute the total population of a country by summing the populations of its individual cities. The join is done in the following steps:

  1. A custom Writable data type needs to be implemented so that the dataset tag travels with the join key. The following code shows the implementation of such a composite key:
    package MasteringHadoop;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Composite key: the natural join key (the ISO country code) plus a
    // source tag identifying which dataset the record came from.
    public class CompositeJoinKeyWritable implements WritableComparable<CompositeJoinKeyWritable> {

        private Text key = new Text();
        private IntWritable source = new IntWritable();

        public CompositeJoinKeyWritable() {
        }

        public CompositeJoinKeyWritable(String key, int source) {
            this.key.set(key);
            this.source.set(source);
        }

        public IntWritable getSource() {
            return this.source;
        }

        public Text getKey() {
            return this.key;
        }

        public void setSource(int source) {
            this.source.set(source);
        }

        public void setKey(String key) {
            this.key.set(key);
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            this.key.write(dataOutput);
            this.source.write(dataOutput);
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.key.readFields(dataInput);
            this.source.readFields(dataInput);
        }

        @Override
        public int compareTo(CompositeJoinKeyWritable o) {
            // Sort on the natural key first; break ties on the source tag.
            // This secondary sort makes records from source 1 (countries)
            // arrive at the Reducer before records from source 2 (cities).
            int result = this.key.compareTo(o.key);
            if (result == 0) {
                return this.source.compareTo(o.source);
            }
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof CompositeJoinKeyWritable) {
                CompositeJoinKeyWritable other = (CompositeJoinKeyWritable) obj;
                return key.equals(other.key) && source.equals(other.source);
            }
            return false;
        }

        @Override
        public int hashCode() {
            // equals() is overridden, so hashCode() must be kept consistent.
            return key.hashCode() * 31 + source.hashCode();
        }
    }
  2. A custom Partitioner class needs to be implemented. It must partition the data on the natural join key alone, in this case the ISO country code, so that all cities with the same country code are processed by the same Reduce task. The following code gives an implementation of such a Partitioner:
    public static class CompositeJoinKeyPartitioner extends Partitioner<CompositeJoinKeyWritable, Text> {

        @Override
        public int getPartition(CompositeJoinKeyWritable key, Text value, int numPartitions) {
            // Partition on the natural key only. The hash is masked so that
            // a negative hashCode() cannot yield a negative partition number.
            return (key.getKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
  3. A custom grouping comparator needs to be written. Like the partitioner, it groups on the natural key alone, so that the country record and all the city records for a country are presented to a single reduce() call. The following code shows the grouping comparator for the composite key:
    public static class CompositeJoinKeyComparator extends WritableComparator {

        protected CompositeJoinKeyComparator() {
            // true: let WritableComparator instantiate keys for comparison.
            super(CompositeJoinKeyWritable.class, true);
        }

        @Override
        public int compare(Object a, Object b) {
            CompositeJoinKeyWritable compositeKey1 = (CompositeJoinKeyWritable) a;
            CompositeJoinKeyWritable compositeKey2 = (CompositeJoinKeyWritable) b;

            // Group on the natural key only; the source tag is ignored here.
            return compositeKey1.getKey().compareTo(compositeKey2.getKey());
        }
    }
  4. A Mapper class has to be written for each kind of input dataset. In the following example, there are two Mapper classes: one for the city dataset and one for the country dataset. The country dataset is given a smaller source tag (1) than the city dataset (2) for efficiency: after the secondary sort, the single country record appears before the city records at the Reducer:
    public static class MasteringHadoopReduceSideJoinCountryMap extends Mapper<LongWritable, Text, CompositeJoinKeyWritable, Text> {

        private static short COUNTRY_CODE_INDEX = 0;
        private static short COUNTRY_NAME_INDEX = 1;

        // Source tag 1: country records sort before city records (tag 2).
        private static CompositeJoinKeyWritable joinKeyWritable = new CompositeJoinKeyWritable("", 1);
        private static Text recordValue = new Text("");

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] tokens = value.toString().split(",", -1);

            // split() never returns null; guard on the field count instead.
            if (tokens.length > COUNTRY_NAME_INDEX) {
                joinKeyWritable.setKey(tokens[COUNTRY_CODE_INDEX]);
                recordValue.set(tokens[COUNTRY_NAME_INDEX]);
                context.write(joinKeyWritable, recordValue);
            }
        }
    }

    public static class MasteringHadoopReduceSideJoinCityMap extends Mapper<LongWritable, Text, CompositeJoinKeyWritable, Text> {

        private static short COUNTRY_CODE_INDEX = 0;

        // Source tag 2: city records follow the country record for each key.
        private static CompositeJoinKeyWritable joinKeyWritable = new CompositeJoinKeyWritable("", 2);
        private static Text record = new Text("");

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String cityRecord = value.toString();
            String[] tokens = cityRecord.split(",", -1);

            // split() never returns null; skip empty lines instead.
            if (!cityRecord.isEmpty()) {
                joinKeyWritable.setKey(tokens[COUNTRY_CODE_INDEX]);
                record.set(cityRecord);
                context.write(joinKeyWritable, record);
            }
        }
    }
  5. The Reducer class takes advantage of the secondary sort to emit joined records. The first value of the iterator is the country record; its name is stored away, and the population total is accumulated from the city records that follow:
    public static class MasteringHadoopReduceSideJoinReduce extends
            Reducer<CompositeJoinKeyWritable, Text, Text, LongWritable> {

        private static LongWritable populationValue = new LongWritable(0);
        private static Text countryValue = new Text("");
        private static short POPULATION_INDEX = 4;

        @Override
        protected void reduce(CompositeJoinKeyWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            long populationTotal = 0;
            boolean firstRecord = true;
            String country = null;

            for (Text record : values) {
                String[] tokens = record.toString().split(",", -1);

                if (firstRecord) {
                    firstRecord = false;
                    // Thanks to the secondary sort, the first value should be
                    // the single-column country record. A multi-column first
                    // record means no matching country exists; skip this key.
                    if (tokens.length > 1)
                        break;
                    else
                        country = tokens[0];
                } else {
                    String populationString = tokens[POPULATION_INDEX];

                    if (populationString != null && !populationString.isEmpty()) {
                        populationTotal += Long.parseLong(populationString);
                    }
                }
            }

            if (country != null) {
                populationValue.set(populationTotal);
                countryValue.set(country);
                context.write(countryValue, populationValue);
            }
        }
    }
  6. The driver program specifies all the custom classes that are required to do the Reduce-side join:
    public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException {

        GenericOptionsParser parser = new GenericOptionsParser(args);
        Configuration config = parser.getConfiguration();
        String[] remainingArgs = parser.getRemainingArgs();

        Job job = Job.getInstance(config, "MasteringHadoop-ReduceSideJoin");

        job.setMapOutputKeyClass(CompositeJoinKeyWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setReducerClass(MasteringHadoopReduceSideJoinReduce.class);
        job.setPartitionerClass(CompositeJoinKeyPartitioner.class);
        job.setGroupingComparatorClass(CompositeJoinKeyComparator.class);
        job.setNumReduceTasks(3);

        // Each dataset gets its own input path and Mapper class.
        MultipleInputs.addInputPath(job, new Path(remainingArgs[0]), TextInputFormat.class, MasteringHadoopReduceSideJoinCountryMap.class);
        MultipleInputs.addInputPath(job, new Path(remainingArgs[1]), TextInputFormat.class, MasteringHadoopReduceSideJoinCityMap.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(remainingArgs[2]));

        job.waitForCompletion(true);
    }
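
For reference, a run of this job might look like the following; the JAR name and the fully qualified driver class are hypothetical, and the three arguments are the country file, the city file, and the output directory:

    hadoop jar mastering-hadoop.jar MasteringHadoop.ReduceSideJoinDriver \
        /input/countrycodes.txt /input/worldcities.csv /output/reduce-side-join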

Map-side joins

Map-side joins, by contrast, require one of the following two conditions to be satisfied by the datasets they join:

  • In addition to the presence of join keys, all inputs must be sorted by the join keys. The input datasets must have the same number of partitions, and all records with the same key must reside in the same partition. Map-side joins are particularly attractive when operating on the outputs of other MapReduce jobs, in which case these conditions are satisfied automatically. The CompositeInputFormat class can be used to run Map-side joins on such datasets, with the inputs and the join type specified using job properties, as sketched after this list.
  • If one of the datasets is small enough, side file distribution channels such as the DistributedCache can be used to do a Map-side join.
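
To make the first bullet concrete, here is a minimal, non-authoritative sketch of how a CompositeInputFormat join could be configured; the input paths are assumptions, and both datasets are assumed to be sorted, identically partitioned key-value text files:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;

// A minimal sketch: configure an inner Map-side join over two sorted,
// identically partitioned key-value text datasets (paths are assumed).
Configuration conf = new Configuration();
conf.set("mapreduce.join.expr", CompositeInputFormat.compose(
        "inner", KeyValueTextInputFormat.class,
        new Path("/input/sorted-countries"), new Path("/input/sorted-cities")));

Job job = Job.getInstance(conf, "MapSideCompositeJoin");
job.setInputFormatClass(CompositeInputFormat.class);
// The Mapper for such a job receives a Text key and a TupleWritable value
// holding the matching records from each input.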

In the following example, the countries file is distributed to all the nodes. During Map task setup, it is loaded into memory into a TreeMap data structure. The setup() method of the Mapper class is overridden to load the smaller dataset into memory:

package MasteringHadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.TreeMap;

public class MasteringHadoopMapSideJoin {

    public static class MasteringHadoopMapSideJoinMap extends Mapper<LongWritable, Text, Text, LongWritable> {

        private static short COUNTRY_CODE_INDEX = 0;
        private static short COUNTRY_NAME_INDEX = 1;
        private static short POPULATION_INDEX = 4;

        private TreeMap<String, String> countryCodesTreeMap = new TreeMap<String, String>();
        private Text countryKey = new Text("");
        private LongWritable populationValue = new LongWritable(0);

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            // Locate the side file among the distributed cache files. The
            // URI path may be fully qualified, so match on the file name.
            URI[] localFiles = context.getCacheFiles();

            String path = null;
            for (URI uri : localFiles) {
                if (uri.getPath().trim().endsWith("countrycodes.txt")) {
                    path = uri.getPath();
                    break;
                }
            }

            if (path != null) {
                getCountryCodes(path, context);
            }
        }

The private getCountryCodes() method, given as follows, reads the side file from the DistributedCache. Each line is parsed and stored in the TreeMap instance. This method is a part of the Mapper class as well:

        private void getCountryCodes(String path, Context context) throws IOException {

            Configuration configuration = context.getConfiguration();
            FileSystem fileSystem = FileSystem.get(configuration);
            FSDataInputStream in = fileSystem.open(new Path(path));
            Text line = new Text("");
            LineReader lineReader = new LineReader(in, configuration);

            // readLine() returns the number of bytes read; zero signals the
            // end of the stream.
            int offset = 0;
            do {
                offset = lineReader.readLine(line);

                if (offset > 0) {
                    String[] tokens = line.toString().split(",", -1);
                    countryCodesTreeMap.put(tokens[COUNTRY_CODE_INDEX], tokens[COUNTRY_NAME_INDEX]);
                }
            } while (offset != 0);

            lineReader.close();
        }

The overridden map() method of the Mapper is where the join takes place. Each record's country code is checked against the TreeMap data structure for a match. If a match exists, a joined record is emitted:

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String cityRecord = value.toString();
            String[] tokens = cityRecord.split(",", -1);

            String country = tokens[COUNTRY_CODE_INDEX];
            String populationString = tokens[POPULATION_INDEX];

            if (country != null && !country.isEmpty()) {

                if (populationString != null && !populationString.isEmpty()) {

                    long population = Long.parseLong(populationString);

                    // The join: look up the country name for this code and
                    // fall back to the raw code if no entry exists.
                    String countryName = countryCodesTreeMap.get(country);
                    if (countryName == null) countryName = country;

                    countryKey.set(countryName);
                    populationValue.set(population);
                    context.write(countryKey, populationValue);
                }
            }
        }
    }

The Reduce task is simple: it reduces on the join key and calculates the total population of each country. The code is given as follows:

    public static class MasteringHadoopMapSideJoinReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

        private static LongWritable populationValue = new LongWritable(0);

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

            long populationTotal = 0;

            for (LongWritable population : values) {
                populationTotal += population.get();
            }

            populationValue.set(populationTotal);
            context.write(key, populationValue);
        }
    }
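
The excerpt ends here without the driver for this job. As a minimal, hedged sketch, a driver along the following lines could register the side file with the distributed cache; the argument order and job name are assumptions, and the final brace closes the MasteringHadoopMapSideJoin class:

    public static void main(String[] args) throws Exception {

        GenericOptionsParser parser = new GenericOptionsParser(args);
        Configuration config = parser.getConfiguration();
        String[] remainingArgs = parser.getRemainingArgs();

        Job job = Job.getInstance(config, "MasteringHadoop-MapSideJoin");
        job.setJarByClass(MasteringHadoopMapSideJoin.class);

        // Ship countrycodes.txt to every node; setup() finds it by name.
        job.addCacheFile(new URI(remainingArgs[0]));

        job.setMapperClass(MasteringHadoopMapSideJoinMap.class);
        job.setReducerClass(MasteringHadoopMapSideJoinReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        TextInputFormat.addInputPath(job, new Path(remainingArgs[1]));
        TextOutputFormat.setOutputPath(job, new Path(remainingArgs[2]));

        job.waitForCompletion(true);
    }
}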