
Developing the hello world example

Before starting development, you should have Eclipse and Maven installed on your machine. The sample topology explained here covers how to create a basic Storm project, including a spout and a bolt, and how to build and execute it.

Create a Maven project by using com.stormadvance as groupId and storm-example as artifactId.
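If you prefer the command line to the Eclipse project wizard, one way to generate the project skeleton is the Maven quickstart archetype (the archetype chosen here is only a suggestion; any basic Java project layout works):

$> mvn archetype:generate -DgroupId=com.stormadvance -DartifactId=storm-example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false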

Add the following Maven dependencies to the pom.xml file:

<dependency> 
  <groupId>org.apache.storm</groupId> 
  <artifactId>storm-core</artifactId> 
  <version>1.0.2</version> 
  <scope>provided</scope> 
</dependency> 
Make sure the scope of the Storm dependency is provided; otherwise, you will not be able to deploy the topology on the Storm cluster, because the Storm JARs are already present on the cluster's classpath and must not be bundled into the topology JAR.

Add the following Maven build plugin to the pom.xml file; it packages the topology classes and their dependencies into a single JAR:

<build> 
  <plugins> 
    <plugin> 
      <artifactId>maven-assembly-plugin</artifactId> 
      <version>2.2.1</version> 
      <configuration> 
        <descriptorRefs> 
          <descriptorRef>jar-with-dependencies</descriptorRef> 
        </descriptorRefs> 
        <archive> 
          <manifest> 
            <mainClass /> 
          </manifest> 
        </archive> 
      </configuration> 
      <executions> 
        <execution> 
          <id>make-assembly</id> 
          <phase>package</phase> 
          <goals> 
            <goal>single</goal> 
          </goals> 
        </execution> 
      </executions> 
    </plugin> 
  </plugins> 
</build> 

Write your first sample spout by creating a SampleSpout class in the com.stormadvance.storm_example package. The SampleSpout class extends the serializable BaseRichSpout class. This spout does not connect to an external source to fetch data; instead, it randomly generates data and emits a continuous stream of records. The following is the source code of the SampleSpout class with an explanation:

package com.stormadvance.storm_example; 
 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Random; 
 
import org.apache.storm.spout.SpoutOutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.base.BaseRichSpout; 
import org.apache.storm.tuple.Fields; 
import org.apache.storm.tuple.Values; 
 
public class SampleSpout extends BaseRichSpout { 
  private static final long serialVersionUID = 1L; 
 
  private static final Map<Integer, String> map = new HashMap<Integer, String>(); 
  static { 
    map.put(0, "google"); 
    map.put(1, "facebook"); 
    map.put(2, "twitter"); 
    map.put(3, "youtube"); 
    map.put(4, "linkedin"); 
  } 
  private SpoutOutputCollector spoutOutputCollector; 
 
  public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) { 
    // Open the spout 
    this.spoutOutputCollector = spoutOutputCollector; 
  } 
 
  public void nextTuple() { 
    // Storm cluster repeatedly calls this method to emit a continuous 
    // stream of tuples. 
    final Random rand = new Random(); 
    // generate a random number between 0 and 4. 
    int randomNumber = rand.nextInt(5); 
    spoutOutputCollector.emit(new Values(map.get(randomNumber))); 
    try{ 
      Thread.sleep(5000); 
    }catch(Exception e) { 
      System.out.println("Failed to sleep the thread"); 
    } 
  } 
 
  public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    // emit the tuple with field "site" 
    declarer.declare(new Fields("site")); 
  } 
} 

Write your first sample bolt by creating a SampleBolt class within the same package. The SampleBolt class extends the serializable BaseBasicBolt class. This bolt consumes the tuples emitted by the SampleSpout spout and prints the value of the site field on the console. The following is the source code of the SampleBolt class with an explanation:

package com.stormadvance.storm_example; 
 
import org.apache.storm.topology.BasicOutputCollector; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.base.BaseBasicBolt; 
import org.apache.storm.tuple.Tuple; 
 
public class SampleBolt extends BaseBasicBolt { 
  private static final long serialVersionUID = 1L; 
 
  public void execute(Tuple input, BasicOutputCollector collector) { 
    // fetch the field "site" from the input tuple. 
    String test = input.getStringByField("site"); 
    // print the value of field "site" on console. 
    System.out.println("######### Name of input site is : " + test); 
  } 
 
  public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    // this bolt does not emit any tuples, so no output fields are declared 
  } 
} 

Create a main SampleStormTopology class within the same package. This class creates an instance of the spout and the bolt, and chains them together using a TopologyBuilder class. It uses org.apache.storm.LocalCluster to simulate a Storm cluster. LocalCluster mode is used for debugging/testing the topology on a developer machine before deploying it on a Storm cluster. The following is the implementation of the main class:

package com.stormadvance.storm_example; 
 
import org.apache.storm.Config; 
import org.apache.storm.LocalCluster; 
import org.apache.storm.generated.AlreadyAliveException; 
import org.apache.storm.generated.InvalidTopologyException; 
import org.apache.storm.topology.TopologyBuilder; 
 
public class SampleStormTopology { 
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 
    // create an instance of TopologyBuilder class 
    TopologyBuilder builder = new TopologyBuilder(); 
    // set the spout class 
    builder.setSpout("SampleSpout", new SampleSpout(), 2); 
    // set the bolt class 
    builder.setBolt("SampleBolt", new SampleBolt(), 4).shuffleGrouping("SampleSpout"); 
    Config conf = new Config(); 
    conf.setDebug(true); 
    // create an instance of LocalCluster class for 
    // executing topology in local mode. 
    LocalCluster cluster = new LocalCluster(); 
    // SampleStormTopology is the name of submitted topology 
    cluster.submitTopology("SampleStormTopology", conf, builder.createTopology()); 
    try { 
      Thread.sleep(100000); 
    } catch (Exception exception) { 
      System.out.println("Thread interrupted exception : " + exception); 
    } 
    // kill the SampleStormTopology 
    cluster.killTopology("SampleStormTopology"); 
    // shutdown the storm test cluster 
    cluster.shutdown(); 
  } 
} 

Go to your project's home directory and run the following commands to execute the topology in local mode:

$> cd $STORM_EXAMPLE_HOME 
$> mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.stormadvance.storm_example.SampleStormTopology 
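If the topology starts correctly, the bolt's System.out.println output appears among Storm's log messages. The lines below are only an illustrative sample; the site names are picked at random by the spout, so your output will differ:

######### Name of input site is : facebook 
######### Name of input site is : google 
######### Name of input site is : twitter 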

Now create a new topology class for deploying the topology on an actual Storm cluster. Create a main SampleStormClusterTopology class within the same package. This class also creates an instance of the spout and the bolt, and chains them together using a TopologyBuilder class:

package com.stormadvance.storm_example; 
 
import org.apache.storm.Config; 
import org.apache.storm.StormSubmitter; 
import org.apache.storm.generated.AlreadyAliveException; 
import org.apache.storm.generated.AuthorizationException; 
import org.apache.storm.generated.InvalidTopologyException; 
import org.apache.storm.topology.TopologyBuilder; 
 
public class SampleStormClusterTopology { 
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 
    // create an instance of TopologyBuilder class 
    TopologyBuilder builder = new TopologyBuilder(); 
    // set the spout class 
    builder.setSpout("SampleSpout", new SampleSpout(), 2); 
    // set the bolt class 
    builder.setBolt("SampleBolt", new SampleBolt(), 4).shuffleGrouping("SampleSpout"); 
    Config conf = new Config(); 
    conf.setNumWorkers(3); 
    // This statement submits the topology to the remote cluster. 
    // args[0] is the name of the topology. 
    try { 
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 
    } catch (AlreadyAliveException alreadyAliveException) { 
      System.out.println(alreadyAliveException); 
    } catch (InvalidTopologyException invalidTopologyException) { 
      System.out.println(invalidTopologyException); 
    } catch (AuthorizationException authorizationException) { 
      System.out.println(authorizationException); 
    } 
  } 
} 

Build your Maven project by running the following command in the project's home directory:

mvn clean install  

The output of the preceding command is as follows:

    [INFO] ------------------------------------------------------------------------ 
    [INFO] BUILD SUCCESS 
    [INFO] ------------------------------------------------------------------------ 
    [INFO] Total time: 58.326s 
    [INFO] Finished at: 
    [INFO] Final Memory: 14M/116M 
    [INFO] ------------------------------------------------------------------------ 
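Since the maven-assembly-plugin is bound to the package phase, the build also produces a JAR bundling all the dependencies. You can verify this by listing the target directory; the exact names depend on the artifactId and version in your pom.xml, and here we assume the storm_example-0.0.1-SNAPSHOT naming used in the deployment commands later in this section:

$> ls target/ 
storm_example-0.0.1-SNAPSHOT.jar 
storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar 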
  

We can deploy the topology to the cluster using the following Storm client command:

bin/storm jar jarName.jar [TopologyMainClass] [Args] 

The preceding command runs TopologyMainClass with the given arguments. The main function of TopologyMainClass defines the topology and submits it to the Nimbus machine. The storm jar part takes care of connecting to the Nimbus machine and uploading the JAR.

Log in to the Storm Nimbus machine and execute the following commands:

$> cd $STORM_HOME
$> bin/storm jar ~/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.stormadvance.storm_example.SampleStormClusterTopology storm_example  

In the preceding command, ~/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar is the path of the SampleStormClusterTopology JAR that we are deploying on the Storm cluster.

The following information is displayed:

702  [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8367952358273199959:-5050558042400210383
793  [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
856  [main] INFO  o.a.s.StormSubmitter - Uploading topology jar /home/USER/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar to assigned location: /tmp/storm-data/nimbus/inbox/stormjar-d3007821-f87d-48af-8364-cff7abf8652d.jar
867  [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /tmp/storm-data/nimbus/inbox/stormjar-d3007821-f87d-48af-8364-cff7abf8652d.jar
868  [main] INFO  o.a.s.StormSubmitter - Submitting topology storm_example in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8367952358273199959:-5050558042400210383","topology.workers":3}
 1007 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: storm_example  

Run the jps command to see the number of running JVM processes as follows:

jps   

The preceding command's output is:

26827 worker 
26530 supervisor 
26824 worker 
26468 nimbus 
26822 worker  

In the preceding output, the worker processes are the JVMs launched by the supervisor to run the SampleStormClusterTopology topology.
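Once you are done, you can stop the topology from the Nimbus machine with the Storm client's kill command, passing the topology name that was used at submission time (storm_example in our case):

$> bin/storm kill storm_example 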
