
Developing the hello world example

Before starting development, you should have Eclipse and Maven installed on your machine. The sample topology explained here covers how to create a basic Storm project, including a spout and a bolt, and how to build and execute it.

Create a Maven project with com.stormadvance as the groupId and storm-example as the artifactId.
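If you prefer to generate the project skeleton from the command line instead of from Eclipse, the standard Maven quickstart archetype can be used. This is just one possible way to create the project, assuming Maven is already on your PATH:

$> mvn archetype:generate -DgroupId=com.stormadvance -DartifactId=storm-example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false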

Add the following Maven dependencies to the pom.xml file:

<dependency> 
  <groupId>org.apache.storm</groupId> 
  <artifactId>storm-core</artifactId> 
  <version>1.0.2</version> 
  <scope>provided</scope> 
</dependency> 
Make sure the scope of the Storm dependency is provided; otherwise, you will not be able to deploy the topology on the Storm cluster.

Add the following Maven build plugins in the pom.xml file:

<build> 
  <plugins> 
    <plugin> 
      <artifactId>maven-assembly-plugin</artifactId> 
      <version>2.2.1</version> 
      <configuration> 
        <descriptorRefs> 
          <descriptorRef>jar-with-dependencies</descriptorRef> 
        </descriptorRefs> 
        <archive> 
          <manifest> 
            <mainClass /> 
          </manifest> 
        </archive> 
      </configuration> 
      <executions> 
        <execution> 
          <id>make-assembly</id> 
          <phase>package</phase> 
          <goals> 
            <goal>single</goal> 
          </goals> 
        </execution> 
      </executions> 
    </plugin> 
  </plugins> 
</build> 

Write your first sample spout by creating a SampleSpout class in the com.stormadvance.storm_example package. The SampleSpout class extends the serializable BaseRichSpout class. This spout does not connect to an external source to fetch data; instead, it randomly generates data and emits a continuous stream of records. The following is the source code of the SampleSpout class with an explanation:

public class SampleSpout extends BaseRichSpout { 
  private static final long serialVersionUID = 1L; 
 
  private static final Map<Integer, String> map = new HashMap<Integer, String>(); 
  static { 
    map.put(0, "google"); 
    map.put(1, "facebook"); 
    map.put(2, "twitter"); 
    map.put(3, "youtube"); 
    map.put(4, "linkedin"); 
  } 
  private SpoutOutputCollector spoutOutputCollector; 
 
  public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) { 
    // Open the spout 
    this.spoutOutputCollector = spoutOutputCollector; 
  } 
 
  public void nextTuple() { 
    // The Storm cluster repeatedly calls this method to emit a continuous 
    // stream of tuples. 
    final Random rand = new Random(); 
    // generate the random number from 0 to 4. 
    int randomNumber = rand.nextInt(5); 
    spoutOutputCollector.emit(new Values(map.get(randomNumber))); 
    try{ 
      Thread.sleep(5000); 
    }catch(Exception e) { 
      System.out.println("Failed to sleep the thread"); 
    } 
  } 
 
  public void declareOutputFields(OutputFieldsDeclarer declarer) { 
 
  // emit the tuple with field "site" 
  declarer.declare(new Fields("site")); 
  } 
} 

Write your first sample bolt by creating a SampleBolt class within the same package. The SampleBolt class extends the serializable BaseBasicBolt class. This bolt consumes the tuples emitted by the SampleSpout spout and prints the value of the field site on the console. The following is the source code of the SampleBolt class with an explanation:

public class SampleBolt extends BaseBasicBolt { 
  private static final long serialVersionUID = 1L; 
 
  public void execute(Tuple input, BasicOutputCollector collector) { 
    // fetch the field "site" from the input tuple. 
    String test = input.getStringByField("site"); 
    // print the value of field "site" on console. 
    System.out.println("######### Name of input site is : " + test); 
  } 
 
  public void declareOutputFields(OutputFieldsDeclarer declarer) { 
  } 
} 

Create a main SampleStormTopology class within the same package. This class creates an instance of the spout and the bolt and chains them together using a TopologyBuilder class. It uses org.apache.storm.LocalCluster to simulate the Storm cluster. The LocalCluster mode is used for debugging/testing the topology on a developer machine before deploying it on the Storm cluster. The following is the implementation of the main class:

public class SampleStormTopology { 
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 
    // create an instance of TopologyBuilder class 
    TopologyBuilder builder = new TopologyBuilder(); 
    // set the spout class 
    builder.setSpout("SampleSpout", new SampleSpout(), 2); 
    // set the bolt class 
    builder.setBolt("SampleBolt", new SampleBolt(), 4).shuffleGrouping("SampleSpout"); 
    Config conf = new Config(); 
    conf.setDebug(true); 
    // create an instance of LocalCluster class for 
    // executing topology in local mode. 
    LocalCluster cluster = new LocalCluster(); 
    // SampleStormTopology is the name of submitted topology 
    cluster.submitTopology("SampleStormTopology", conf, builder.createTopology()); 
    try { 
      Thread.sleep(100000); 
    } catch (Exception exception) { 
      System.out.println("Thread interrupted exception : " + exception); 
    } 
    // kill the SampleStormTopology 
    cluster.killTopology("SampleStormTopology"); 
    // shutdown the storm test cluster 
    cluster.shutdown(); 
  } 
} 

Go to your project's home directory and run the following commands to execute the topology in local mode:

$> cd $STORM_EXAMPLE_HOME 
$> mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.stormadvance.storm_example.SampleStormTopology 
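If the topology starts correctly, the SampleBolt instances should print the randomly selected site names on the console, interleaved with Storm's debug logging (because conf.setDebug(true) is set). The exact values depend on the random numbers generated, but the lines should look similar to the following:

######### Name of input site is : google 
######### Name of input site is : twitter 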

Now create a new topology class for deploying the topology on an actual Storm cluster. Create a main SampleStormClusterTopology class within the same package. This class also creates an instance of the spout and the bolt and chains them together using a TopologyBuilder class:

public class SampleStormClusterTopology { 
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 
    // create an instance of TopologyBuilder class 
    TopologyBuilder builder = new TopologyBuilder(); 
    // set the spout class 
    builder.setSpout("SampleSpout", new SampleSpout(), 2); 
    // set the bolt class 
    builder.setBolt("SampleBolt", new SampleBolt(), 4).shuffleGrouping("SampleSpout"); 
    Config conf = new Config(); 
    conf.setNumWorkers(3); 
    // This statement submits the topology to the remote cluster. 
    // args[0] = name of topology 
    try { 
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 
    } catch (AlreadyAliveException alreadyAliveException) { 
      System.out.println(alreadyAliveException); 
    } catch (InvalidTopologyException invalidTopologyException) { 
      System.out.println(invalidTopologyException); 
    } catch (AuthorizationException e) { 
      // the user is not authorized to submit the topology 
      e.printStackTrace(); 
    } 
  } 
} 

Build your Maven project by running the following command in the project's home directory:

mvn clean install  

The output of the preceding command is as follows:

    [INFO] ------------------------------------------------------------------------ 
    [INFO] BUILD SUCCESS 
    [INFO] ------------------------------------------------------------------------ 
    [INFO] Total time: 58.326s 
    [INFO] Finished at: 
    [INFO] Final Memory: 14M/116M 
    [INFO] ------------------------------------------------------------------------ 
  

We can deploy the topology to the cluster using the following Storm client command:

bin/storm jar jarName.jar [TopologyMainClass] [Args] 

The preceding command runs TopologyMainClass with the supplied arguments. The main function of TopologyMainClass is to define the topology and submit it to the Nimbus machine. The storm jar part takes care of connecting to the Nimbus machine and uploading the JAR.
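Note that mvn clean install places the assembled JAR under the project's target directory. If you built the project on a machine other than the Nimbus node, copy the JAR over first; a minimal sketch using scp is shown below, where nimbus-host is only a placeholder for your actual Nimbus hostname:

$> scp target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar user@nimbus-host:~/ 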

Log in on a Storm Nimbus machine and execute the following commands:

$> cd $STORM_HOME
$> bin/storm jar ~/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.stormadvance.storm_example.SampleStormClusterTopology storm_example  

In the preceding command, ~/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar is the path of the SampleStormClusterTopology JAR that we are deploying on the Storm cluster.

The following information is displayed:

702  [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8367952358273199959:-5050558042400210383
793  [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
856  [main] INFO  o.a.s.StormSubmitter - Uploading topology jar /home/USER/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar to assigned location: /tmp/storm-data/nimbus/inbox/stormjar-d3007821-f87d-48af-8364-cff7abf8652d.jar
867  [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /tmp/storm-data/nimbus/inbox/stormjar-d3007821-f87d-48af-8364-cff7abf8652d.jar
868  [main] INFO  o.a.s.StormSubmitter - Submitting topology storm_example in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8367952358273199959:-5050558042400210383","topology.workers":3}
 1007 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: storm_example  

Run the jps command to see the number of running JVM processes as follows:

jps   

The preceding command's output is:

26827 worker 
26530 supervisor 
26824 worker 
26468 nimbus 
26822 worker  

In the preceding output, each worker is a JVM process launched for the SampleStormClusterTopology topology.
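Once the topology is running, you can also verify and stop it from the Storm client. For example, the following standard Storm CLI commands list the running topologies and kill the storm_example topology:

$> bin/storm list 
$> bin/storm kill storm_example 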
