
Customizing Storm spouts

You explored the WordCount topology provided by the Storm-starter project in the previous chapters. Now it's time to move on to the next step, the do-it-yourself journey with Storm, so let's write our own spouts that read from various sources.

Creating FileSpout

Here we will create our own spout that reads events, or tuples, from a file source and emits them into the topology. It replaces the RandomSentenceSpout we used in the WordCount topology in the previous chapter.

To start, copy the project we created in Chapter 2, Getting Started with Your First Topology, into a new project. Within the Storm-starter project, copy RandomSentenceSpout into a new class called FileSpout.

Now change FileSpout so that it reads sentences from a file, as shown in the following code:

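import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
// The Storm classes (BaseRichSpout, SpoutOutputCollector, TopologyContext,
// OutputFieldsDeclarer, Fields, Values, Utils) live under backtype.storm in
// Storm 0.9.x and under org.apache.storm from 1.0 onward; import them to
// match your version.

public class FileSpout extends BaseRichSpout {
  //declaration section
  SpoutOutputCollector _collector;
  DataInputStream in;
  BufferedReader br;
  Queue<String> qe;

  //constructor
  public FileSpout() {
    qe = new LinkedList<String>();
  }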

  // the messageId builder method; the "#@#" delimiter is added by queueIt,
  // so it must not be repeated here or split("#@#") yields an empty msg[1]
  private String getMsgId(int i) {
    return "MsgId" + i;
  }

  //Called by readFile: reads each line, appends the delimiter "#@#" and a
  //messageId to it, and adds the result to the linked list
    private void queueIt() {
      int msgId = 0;
      String strLine;
      try {
          while ((strLine = br.readLine()) != null) {
              qe.add(strLine + "#@#" + getMsgId(msgId));
              msgId++;
          }
      } catch (IOException e) {
          e.printStackTrace();
      } catch (Exception e) {
          e.printStackTrace();
      }
    }

  //function to read line from file at specified location 
  private void readFile() {
        try {
          FileInputStream fstream = new FileInputStream("/home/mylog");
          in = new DataInputStream(fstream);
          br = new BufferedReader(new InputStreamReader(in));
          queueIt();
          System.out.println("FileSpout file reading done");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

  //open is called when the spout is initialized; it calls readFile,
  //which reads the file and adds the events to the linked list
  //to be fed to the spout as tuples
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      _collector = collector;
      readFile();
    }

  //Storm calls this method in a loop; each call sleeps for 100 ms and then
  //polls the list for a message, which is emitted into the topology as the
  //next tuple. When the queue has no events left, the file is read
  //again by calling readFile
    @Override
    public void nextTuple() {
      Utils.sleep(100);
      String fullMsg = (String) qe.poll();
      if (fullMsg != null) {
          String[] msg = fullMsg.split("#@#");
          _collector.emit(new Values(msg[0]));
          System.out.println("nextTuple done " + msg[1]);
      } else {
          readFile();
      }
    }

  @Override
  public void ack(Object id) {}

  @Override
  public void fail(Object id) {}

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
  }
}
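The queueing scheme FileSpout relies on, tagging each line with a message ID behind a delimiter and splitting the two apart again before emitting, can be tried outside Storm. The following is a minimal sketch under a few assumptions: the names LineQueueSketch, queueLines, and splitEntry are hypothetical, the input comes from an in-memory string rather than /home/mylog, and the entry is split at the last delimiter so that a line which itself contains "#@#" is still handled.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical stand-alone sketch of the FileSpout queueing logic, without Storm.
class LineQueueSketch {
    static final String DELIMITER = "#@#";

    // Reads every line, appends DELIMITER plus a message ID, and queues the result.
    static Queue<String> queueLines(BufferedReader reader) throws IOException {
        Queue<String> queue = new LinkedList<>();
        int msgId = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            queue.add(line + DELIMITER + "MsgId" + msgId);
            msgId++;
        }
        return queue;
    }

    // Splits a queued entry at the last DELIMITER into {sentence, messageId}.
    static String[] splitEntry(String entry) {
        int idx = entry.lastIndexOf(DELIMITER);
        return new String[] {
            entry.substring(0, idx),
            entry.substring(idx + DELIMITER.length())
        };
    }

    public static void main(String[] args) throws IOException {
        BufferedReader reader = new BufferedReader(
            new StringReader("the cow jumped\nan apple a day"));
        Queue<String> queue = queueLines(reader);
        String entry;
        // Drain the queue the way nextTuple does, one entry per iteration.
        while ((entry = queue.poll()) != null) {
            String[] parts = splitEntry(entry);
            System.out.println(parts[1] + " -> " + parts[0]);
        }
    }
}
```

In the spout, the sentence part is what gets emitted as the tuple, while the message ID is only used for logging.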

Tip

Downloading the example code

You can download the example code files for all the Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Tweaking WordCount topology to use FileSpout

Now we need to fit FileSpout into our WordCount topology and execute it. To do this, change one line of code in the WordCount topology so that TopologyBuilder instantiates FileSpout instead of RandomSentenceSpout, as shown here:

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  //builder.setSpout("spout", new RandomSentenceSpout(), 5);
  builder.setSpout("spout", new FileSpout(), 1);

This one-line change instantiates the new spout, which reads from the file /home/mylog (create this file before you execute the program). Here is a screenshot of the output for your reference:

[Screenshot: Tweaking WordCount topology to use FileSpout]
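Because FileSpout expects /home/mylog to exist before the topology starts, it can help to generate a small input file first. The following is a minimal sketch, not part of the Storm-starter project: the class SampleLogWriter, its writeSample method, and the sample sentences are all hypothetical, and writing to /home/mylog assumes your user has permission to create that file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that writes one sentence per line to the spout's input file.
class SampleLogWriter {

    // Creates (or overwrites) the target file with the given sentences.
    static Path writeSample(Path target, List<String> sentences) throws IOException {
        return Files.write(target, sentences, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the path hard-coded in FileSpout; pass another path as the first argument.
        Path target = Paths.get(args.length > 0 ? args[0] : "/home/mylog");
        List<String> sentences = Arrays.asList(
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago");
        writeSample(target, sentences);
        System.out.println("Wrote " + sentences.size() + " lines to " + target);
    }
}
```

Run it once before submitting the topology; FileSpout then reads these lines and replays them every time its queue runs empty.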

The SocketSpout class

As a next step toward understanding spouts better, let's create a SocketSpout class. Assuming that you are comfortable writing a socket server or producer, I will walk you through creating a custom SocketSpout class that consumes socket output in the Storm topology:

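// Needs java.io.IOException, java.io.InputStream, java.net.ServerSocket,
// java.net.Socket, java.util.Arrays, and java.util.Map, plus the Storm classes
public class SocketSpout extends BaseRichSpout {
  // Instance fields: Storm serializes the spout to ship it to the workers,
  // and static fields are not serialized
  private SpoutOutputCollector _collector;
  // The server socket is created in open(), so it is excluded from serialization
  private transient ServerSocket myserverSocket;
  private final int myport;

  public SocketSpout(int port) {
    myport = port;
  }

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    try {
      myserverSocket = new ServerSocket(myport);
    } catch (IOException e) {
      throw new RuntimeException("Cannot listen on port " + myport, e);
    }
  }

  @Override
  public void nextTuple() {
    // accept() blocks until a producer connects
    try (Socket myclientSocket = myserverSocket.accept()) {
      InputStream incomingIS = myclientSocket.getInputStream();
      byte[] b = new byte[8196];
      int len = incomingIS.read(b);
      if (len > 0) {
        // Emit only the bytes that were actually read
        _collector.emit(new Values(Arrays.copyOf(b, len)));
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  // Required by BaseRichSpout; the field name "word" is an assumption
  // borrowed from FileSpout
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}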
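To exercise SocketSpout you need something on the other end of the socket. Since the spout accepts one connection per nextTuple call and reads from it once, a matching producer can open a connection, write a single message, and close it. The following is a minimal sketch of such a producer; the class name SentenceProducer, the send method, the default port 9999, and the sample sentence are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical producer that feeds one message per connection to a listening spout.
class SentenceProducer {

    // Connects to host:port, writes the sentence as UTF-8 bytes, and closes the connection.
    static void send(String host, int port, String sentence) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(sentence.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Host and port are assumptions; use the port passed to the SocketSpout constructor.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        send(host, port, "the cow jumped over the moon");
    }
}
```

Keeping each message well under the spout's 8196-byte buffer makes it likely that a single read picks up the whole message, although TCP does not guarantee this.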