The second example – concurrency in a client/server environment

The client/server model is a software architecture in which applications are split into two parts: the server part, which provides resources (data, operations, printers, storage, and so on), and the client part, which uses the resources provided by the server. Traditionally, this architecture was used in the enterprise world, but with the boom of the Internet, it remains a relevant topic. You can see a web application as a client/server application in which the server part is the backend, executed in a web server, and the web browser executes the client part. SOA (short for Service-Oriented Architecture) is another example of client/server architecture, where the exposed web services are the server part and the different clients that consume them are the client part.

In a client/server environment, we usually have one server and a lot of clients that use the services provided by the server, so the performance of the server is a critical aspect when you have to design one of these systems.

In this section, we will implement a simple client/server application. It will search the World Development Indicators of the World Bank, which you can download from here: http://data.worldbank.org/data-catalog/world-development-indicators. This data contains the values of different indicators for all the countries in the world from 1960 to 2014.

The main characteristics of our server will be:

  • The client and the server will connect using sockets
  • The client will send its queries in a string, and the server will respond with the results in another string
  • The server can respond to three different queries:
    • Query: The format of this query is q;codCountry;codIndicator;year where codCountry is the code of the country, codIndicator is the code of the indicator, and year is an optional parameter with the year you want to query. The server will respond with the information in a single string.
    • Report: The format of this query is r;codIndicator where codIndicator is the code of the indicator you want to report. The server will respond with the mean value of that indicator for all countries over the years in a single string.
    • Stop: The format of this query is z;. The server stops its execution when it receives this command.
    • In other cases, the server returns an error message.
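The request formats above can be explored with a small sketch. The class and method names are hypothetical and not part of the book's code, and the country and indicator codes are only sample values. It shows how String.split(";") breaks a request into fields, including two edge cases: trailing empty fields are dropped, and a request that is just ";" yields an empty array.

```java
// Hypothetical helper, not part of the book's code: shows how the
// semicolon-separated protocol strings break into fields.
final class ProtocolSketch {

    // Splits a raw request such as "q;ESP;NY.GDP.PCAP.CD;2010" into its fields.
    // String.split drops trailing empty strings, so "z;" yields a single field.
    static String[] parse(String request) {
        return request.split(";");
    }

    // Maps the first field of the request to the kind of query it represents.
    static String kind(String request) {
        String[] fields = parse(request);
        if (fields.length == 0) {
            return "error"; // for example, the request ";" has no fields at all
        }
        switch (fields[0]) {
            case "q": return "query";
            case "r": return "report";
            case "z": return "stop";
            default:  return "error";
        }
    }
}
```

A server that indexes the first field directly, without the length check, would throw an ArrayIndexOutOfBoundsException on a request such as ";".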

As in the previous example, we will show you how to implement a serial version of this client/server application. Then, we will show you how to implement a concurrent version using an executor. Finally, we will compare the two solutions to view the advantages of the use of concurrency in this case.

Client/server – serial version

The serial version of our server application has three main parts:

  • The DAO (short for Data Access Object) part, responsible for access to the data and obtaining the results of the query
  • The command part, formed by a command per kind of query
  • The server part, which receives the queries, calls the corresponding command, and returns the results to the client

Let's look at each of these parts in detail.

The DAO part

As we mentioned before, the server will search the World Development Indicators of the World Bank. This data is in a CSV file. The DAO component in the application loads the entire file into a List object in memory. It implements one method per query it supports; each method goes over the list looking for the data.

We don't include the code of this class here because it's simple to implement and it's not the main purpose of this book.
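For readers who want something concrete to experiment with, the following is one possible shape for such a component. It is only a sketch under stated assumptions: the class name, the row layout, the return formats, and the add() method used instead of CSV loading are all hypothetical and are not the book's WDIDAO class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a DAO that keeps every data point in a List
// and answers queries with a linear scan. Not the book's WDIDAO class.
final class SketchDao {

    // One data point: country code, indicator code, year, and value.
    static final class Row {
        final String country;
        final String indicator;
        final short year;
        final double value;

        Row(String country, String indicator, short year, double value) {
            this.country = country;
            this.indicator = indicator;
            this.year = year;
            this.value = value;
        }
    }

    private final List<Row> rows = new ArrayList<>();

    // Stands in for loading one CSV record into memory.
    void add(String country, String indicator, short year, double value) {
        rows.add(new Row(country, indicator, year, value));
    }

    // Scans the whole list and returns "year=value;" for every matching row.
    String query(String country, String indicator) {
        StringBuilder sb = new StringBuilder();
        for (Row r : rows) {
            if (r.country.equals(country) && r.indicator.equals(indicator)) {
                sb.append(r.year).append('=').append(r.value).append(';');
            }
        }
        return sb.toString();
    }

    // Mean of one indicator over all countries and years; NaN if there is no data.
    double mean(String indicator) {
        double sum = 0;
        int count = 0;
        for (Row r : rows) {
            if (r.indicator.equals(indicator)) {
                sum += r.value;
                count++;
            }
        }
        return count == 0 ? Double.NaN : sum / count;
    }
}
```

Because every query walks the full list, the cost of a request grows with the size of the data set, which is what makes the server a good candidate for the concurrent version.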

The command part

The command part is an intermediary between the DAO and the server parts. We have implemented an abstract Command class to be the base class of all the commands:

public abstract class Command {

  protected String[] command;

  public Command (String [] command) {
    this.command=command;
  }

  public abstract String execute ();

}

Then, we have implemented a command for each query. The query is implemented in the QueryCommand class. The execute() method is as follows:

  public String execute() {
    WDIDAO dao=WDIDAO.getDAO();

    if (command.length==3) {
      return dao.query(command[1], command[2]);
    } else if (command.length==4) {
      try {
        return dao.query(command[1], command[2], Short.parseShort(command[3]));
      } catch (Exception e) {
        return "ERROR;Bad Command";
      }
    } else {
      return "ERROR;Bad Command";
    }
  }

The report is implemented in ReportCommand. The execute() method is as follows:

  @Override
  public String execute() {

    WDIDAO dao=WDIDAO.getDAO();
    return dao.report(command[1]);
  }

The stop query is implemented in the StopCommand class. Its execute() method is as follows:

  @Override
  public String execute() {
    return "Server stopped";
  }

Finally, the error situations are processed by the ErrorCommand class. Its execute() method is as follows:

  @Override
  public String execute() {
    return "Unknown command: "+command[0];
  }

The server part

Finally, the server part is implemented in the SerialServer class. First of all, it initializes the DAO by calling the getDAO() method, so that the DAO loads all the data:

public class SerialServer {

  public static void main(String[] args) throws IOException {
    WDIDAO dao = WDIDAO.getDAO();
    boolean stopServer = false;
    System.out.println("Initialization completed.");

    try (ServerSocket serverSocket = new ServerSocket(Constants.SERIAL_PORT)) {

After this, we have a loop that will be executed until the server receives a stop query. This loop does the following four steps:

  • Receives a query from a client
  • Parses and splits the elements of the query
  • Calls the corresponding command
  • Returns the results to the client

These four steps are shown in the following code snippet:

  do {
    try (Socket clientSocket = serverSocket.accept();
        PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
        BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))) {
      String line = in.readLine();
      Command command;

      String[] commandData = line.split(";");
      System.out.println("Command: " + commandData[0]);
      switch (commandData[0]) {
      case "q":
        System.out.println("Query");
        command = new QueryCommand(commandData);
        break;
      case "r":
        System.out.println("Report");
        command = new ReportCommand(commandData);
        break;
      case "z":
        System.out.println("Stop");
        command = new StopCommand(commandData);
        stopServer = true;
        break;
      default:
        System.out.println("Error");
        command = new ErrorCommand(commandData);
      }
      String response = command.execute();
      System.out.println(response);
      out.println(response);
    } catch (IOException e) {
      e.printStackTrace();
    }
  } while (!stopServer);
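The client side is not shown in this section. A minimal sketch of a client for this protocol could look like the following, assuming the one-line request and one-line response exchange described earlier. The class name, method name, and parameters are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

// Hypothetical client for the line-based protocol; not part of the book's code.
final class ClientSketch {

    // Opens a connection, sends one request line, and returns the one-line reply.
    static String send(String host, int port, String request) throws IOException {
        try (Socket socket = new Socket(host, port);
             // autoflush=true so println() pushes the request to the server at once
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            out.println(request);
            return in.readLine(); // null if the server closes without answering
        }
    }
}
```

Each call opens a new connection, which matches a server that handles exactly one request per accepted socket.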

Client/server – parallel version

The serial version of the server has a very important limitation. While it is processing one query, it can't attend to other queries. If the server needs a significant amount of time to respond to every request, or to certain requests, the performance of the server will be very low.

We can obtain better performance using concurrency. If the server creates a thread when it receives a request, it can delegate all the processing of the query to that thread and attend to new requests. This approach can also have some problems. If we receive a high number of queries, we can saturate the system by creating too many threads. But if we use an executor with a fixed number of threads, we can control the resources used by our server and still obtain better performance than the serial version.

To convert our serial server to a concurrent one using an executor, we have to modify the server part. The DAO part is the same, and we have changed the names of the classes that implement the command part, but their implementation is almost the same. Only the stop query changes because now it has more responsibilities. Let's see the details of the implementation of the concurrent server part.

The server part

The concurrent server part is implemented in the ConcurrentServer class. We have added two elements not included in the serial server: a cache system, implemented in the ParallelCache class, and a log system, implemented in the Logger class. First of all, it initializes the DAO part by calling the getDAO() method, so that the DAO loads all the data. Then, it creates a ThreadPoolExecutor object using the newFixedThreadPool() method of the Executors class. This method receives the maximum number of worker-threads we want in our server; the executor will never have more than that number of worker-threads. To get the number of worker-threads, we get the number of cores of our system using the availableProcessors() method of the Runtime class:

public class ConcurrentServer {

  private static ThreadPoolExecutor executor;

  private static ParallelCache cache;

  private static ServerSocket serverSocket;

  private static volatile boolean stopped = false;

  public static void main(String[] args) throws IOException, InterruptedException {

    serverSocket=null;
    WDIDAO dao=WDIDAO.getDAO();
    executor=(ThreadPoolExecutor) Executors.newFixedThreadPool (Runtime.getRuntime().availableProcessors());
    cache=new ParallelCache();
    Logger.initializeLog();

    System.out.println("Initialization completed.");

The stopped Boolean variable is declared as volatile because it will be changed from another thread. The volatile keyword ensures that when the stopped variable is set to true by another thread, this change will be visible in the main method. Without the volatile keyword, the change might not be visible due to CPU caching or compiler optimizations. Then, we initialize ServerSocket to listen for the requests:

    serverSocket = new ServerSocket(Constants.CONCURRENT_PORT);

We can't use a try-with-resources statement to manage the server socket. When we receive a stop command, we need to shut down the server, but the server is waiting in the accept() method of the serverSocket object. To force the server to leave that method, we need to explicitly close the server socket (we'll do that in the shutdown() method), so we can't let the try-with-resources statement close the socket for us.
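The effect of closing the socket from another thread can be checked in isolation. The following sketch uses a hypothetical class name and an arbitrary free port, and is not part of the server. It blocks one thread in accept() and then closes the ServerSocket from a different thread, which makes accept() throw an exception and release the blocked thread.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration class; not part of the book's code.
final class AcceptUnblockSketch {

    // Returns true if the thread waiting in accept() was released by close().
    static boolean closeReleasesAccept() throws IOException, InterruptedException {
        ServerSocket serverSocket = new ServerSocket(0); // port 0: any free port
        CountDownLatch released = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                serverSocket.accept();   // blocks: no client ever connects
            } catch (IOException e) {
                released.countDown();    // close() makes accept() throw
            }
        });
        acceptor.start();

        Thread.sleep(200);               // give the acceptor time to block
        serverSocket.close();            // called from a different thread
        return released.await(5, TimeUnit.SECONDS);
    }
}
```

This is why the main loop of the concurrent server has to treat an IOException from accept() as a normal part of the stop sequence.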

After this, we have a loop that will be executed until the server receives a stop query. This loop does three steps, as follows:

  • Receives a query from a client
  • Creates a task to process that query
  • Sends the task to the executor

These three steps are shown in the following code snippet:

  do {
    try {
      Socket clientSocket = serverSocket.accept();
      RequestTask task = new RequestTask(clientSocket);
      executor.execute(task);
    } catch (IOException e) {
      e.printStackTrace();
    }
  } while (!stopped);

Finally, once the server has left the loop, we have to wait for the finalization of the executor using the awaitTermination() method. This method blocks the main thread until the executor has finished executing its pending tasks or the timeout elapses. Then, we shut down the cache system and write a message to indicate the end of the execution of the server, as follows:

  executor.awaitTermination(1, TimeUnit.DAYS);
  System.out.println("Shutting down cache");
  cache.shutdown();
  System.out.println("Cache ok");

  System.out.println("Main server thread ended");

We have added two additional methods: the getExecutor() method, which returns the ThreadPoolExecutor object that is used to execute the concurrent tasks, and the shutdown() method, which finishes the executor of the server in an orderly way. It calls the shutdown() method of the executor and closes ServerSocket:

  public static void shutdown() {
    stopped = true;
    System.out.println("Shutting down the server...");
    System.out.println("Shutting down executor");
    executor.shutdown();
    System.out.println("Executor ok");
    System.out.println("Closing socket");
    try {
      serverSocket.close();
      System.out.println("Socket ok");
    } catch (IOException e) {
      e.printStackTrace();
    }
    System.out.println("Shutting down logger");
    Logger.sendMessage("Shutting down the logger");
    Logger.shutdown();
    System.out.println("Logger ok");
  }
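The interplay between execute(), shutdown(), and awaitTermination() can be seen in a reduced sketch. The class name and the trivial counting task are hypothetical stand-ins for the server and its RequestTask objects. After shutdown(), the executor accepts no new tasks but still runs the ones already submitted, and awaitTermination() waits for them.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demonstration class; not part of the book's code.
final class ExecutorLifecycleSketch {

    // Runs the given number of trivial tasks on a fixed pool and
    // returns how many of them completed before the pool terminated.
    static int runAll(int tasks) throws InterruptedException {
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            executor.execute(completed::incrementAndGet); // stand-in for a request task
        }

        executor.shutdown();                              // no new tasks; pending ones still run
        executor.awaitTermination(1, TimeUnit.MINUTES);   // block until they finish
        return completed.get();
    }
}
```

Calling awaitTermination() without a prior shutdown() would simply wait for the full timeout, because the pool never terminates on its own.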

In the concurrent server, there is an essential part: the RequestTask class, which processes every request of the clients. This class implements the Runnable interface, so it can be executed in an executor in a concurrent way. Its constructor receives the Socket parameter, which will be used to communicate with the client:

public class RequestTask implements Runnable {

  private Socket clientSocket;

  public RequestTask(Socket clientSocket) {
    this.clientSocket = clientSocket;
  }

The run() method does the same things that the serial server does to respond to every request:

  • Receives a query from a client
  • Parses and splits the elements of the query
  • Calls the corresponding command
  • Returns the results to the client

The following is its code snippet:

  public void run() {

    try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
      BufferedReader in = new BufferedReader(new InputStreamReader( clientSocket.getInputStream()));) {

      String line = in.readLine();

      Logger.sendMessage(line);
      ParallelCache cache = ConcurrentServer.getCache();
      String ret = cache.get(line);

      if (ret == null) {
        Command command;

        String[] commandData = line.split(";");
        System.out.println("Command: " + commandData[0]);
        switch (commandData[0]) {
        case "q":
          System.err.println("Query");
          command = new ConcurrentQueryCommand(commandData);
          break;
        case "r":
          System.err.println("Report");
          command = new ConcurrentReportCommand(commandData);
          break;
        case "s":
          System.err.println("Status");
          command = new ConcurrentStatusCommand(commandData);
          break;
        case "z":
          System.err.println("Stop");
          command = new ConcurrentStopCommand(commandData);
          break;
        default:
          System.err.println("Error");
          command = new ConcurrentErrorCommand(commandData);
          break;
        }
        ret = command.execute();
        if (command.isCacheable()) {
          cache.put(line, ret);
        }
      } else {
        Logger.sendMessage("Command "+line+" was found in the cache");
      }

      System.out.println(ret);
      out.println(ret);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      try {
        clientSocket.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }

The command part

In the command part, we have renamed all the classes, as you can see in the previous fragment of code. The implementation is the same except in the ConcurrentStopCommand class. Now, it calls the shutdown() method of the ConcurrentServer class to terminate the execution of the server in an orderly way. This is the execute() method:

  @Override
  public String execute() {
    ConcurrentServer.shutdown();
    return "Server stopped";
  }

Also, the Command class now contains a new isCacheable() Boolean method that returns true if the result of the command should be stored in the cache and false otherwise.
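The updated base class is not listed here. A minimal sketch of how such a flag might be added is shown next, assuming a boolean field that defaults to true and a setCacheable() method for subclasses to override the default. The class names and the default value are assumptions, not the book's code.

```java
// Hypothetical version of the base command class with a cacheable flag.
abstract class CacheableCommandSketch {

    protected final String[] command;
    private boolean cacheable = true; // assumption: results are cached by default

    CacheableCommandSketch(String[] command) {
        this.command = command;
    }

    // Tells the caller whether the result of execute() may be stored in the cache.
    boolean isCacheable() {
        return cacheable;
    }

    // Lets a subclass opt out, typically from its constructor.
    protected void setCacheable(boolean cacheable) {
        this.cacheable = cacheable;
    }

    abstract String execute();
}

// Hypothetical command whose answer changes over time, so it opts out of caching.
final class VolatileAnswerCommand extends CacheableCommandSketch {

    VolatileAnswerCommand(String[] command) {
        super(command);
        setCacheable(false);
    }

    @Override
    String execute() {
        return "Current time: " + System.currentTimeMillis();
    }
}

// Hypothetical command with a stable answer, which keeps the default.
final class StableAnswerCommand extends CacheableCommandSketch {

    StableAnswerCommand(String[] command) {
        super(command);
    }

    @Override
    String execute() {
        return "Echo: " + command[0];
    }
}
```

Commands such as stop, status, and error are natural candidates for opting out, because their results are either side effects or snapshots that should not be replayed.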

Extra components of the concurrent server

We have implemented some extra components in the concurrent server: a new command to return information about the status of the server, a cache system that stores the results of the commands to save time when a request is repeated, and a log system to write error and debug information. The following sections describe each of these components.

The status command

First of all, we have a new possible query. It has the format s; and is processed by the ConcurrentStatusCommand class. It gets the ThreadPoolExecutor used by the server and obtains information about the status of the executor:

public class ConcurrentStatusCommand extends Command {
  public ConcurrentStatusCommand (String[] command) {
    super(command);
    setCacheable(false);
  }
  @Override
  public String execute() {
    StringBuilder sb=new StringBuilder();
    ThreadPoolExecutor executor=ConcurrentServer.getExecutor();

    sb.append("Server Status;");
    sb.append("Active Threads: ");
    sb.append(String.valueOf(executor.getActiveCount()));
    sb.append(";");
    sb.append("Maximum Pool Size: ");
    sb.append(String.valueOf(executor.getMaximumPoolSize()));
    sb.append(";");
    sb.append("Core Pool Size: ");
    sb.append(String.valueOf(executor.getCorePoolSize()));
    sb.append(";");
    sb.append("Pool Size: ");
    sb.append(String.valueOf(executor.getPoolSize()));
    sb.append(";");
    sb.append("Largest Pool Size: ");
    sb.append(String.valueOf(executor.getLargestPoolSize()));
    sb.append(";");
    sb.append("Completed Task Count: ");
    sb.append(String.valueOf(executor.getCompletedTaskCount()));
    sb.append(";");
    sb.append("Task Count: ");
    sb.append(String.valueOf(executor.getTaskCount()));
    sb.append(";");
    sb.append("Queue Size: ");
    sb.append(String.valueOf(executor.getQueue().size()));
    sb.append(";");
    sb.append("Cache Size: ");
    sb.append(String.valueOf (ConcurrentServer.getCache().getItemCount()));
    sb.append(";");
    Logger.sendMessage(sb.toString());
    return sb.toString();
  }
}

The information we obtain from the server is:

  • getActiveCount(): This returns the approximate number of threads that are actively executing our concurrent tasks. There can be more threads in the pool, but they can be idle.
  • getMaximumPoolSize(): This returns the maximum number of worker-threads the executor can have.
  • getCorePoolSize(): This returns the core number of worker-threads the executor will have. This number determines the minimum number of threads the pool will have.
  • getPoolSize(): This returns the current number of threads in the pool.
  • getLargestPoolSize(): This returns the largest number of threads that have been in the pool at the same time during its execution.
  • getCompletedTaskCount(): This returns the approximate number of tasks the executor has completed.
  • getTaskCount(): This returns the approximate number of tasks that have ever been scheduled for execution.
  • getQueue().size(): This returns the number of tasks that are waiting in the queue of tasks.
  • getItemCount(): This method of the ParallelCache class returns the number of elements stored in the cache.

As we have created our executor using the newFixedThreadPool() method of the Executors class, our executor will have the same maximum and core number of worker-threads.
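This property is easy to verify with a small sketch. The class and method names are hypothetical, and the cast to ThreadPoolExecutor follows the same approach the server code uses. Note that the current pool size starts at zero, because the executor creates its worker-threads on demand as tasks arrive.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demonstration class; not part of the book's code.
final class PoolSizeSketch {

    // Returns {core pool size, maximum pool size, current pool size}
    // for a freshly created fixed thread pool.
    static int[] sizes(int threads) {
        ThreadPoolExecutor executor =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        try {
            return new int[] {
                executor.getCorePoolSize(),
                executor.getMaximumPoolSize(),
                executor.getPoolSize() // no task submitted yet, so no thread exists
            };
        } finally {
            executor.shutdown();
        }
    }
}
```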

The cache system

We have included a cache system in our parallel server to avoid repeating data searches that have been made recently. Our cache system has three elements:

  • The CacheItem class: This class represents every element stored in the cache. It has four attributes:
    • The command stored in the cache. We will store the query and report commands in the cache.
    • The response generated by that command.
    • The creation date of the item in the cache.
    • The last time this item was accessed in the cache.
  • The CleanCacheTask class: If we store all the commands in the cache but never delete the elements stored in it, the cache will increase its size indefinitely. To avoid this situation, we can have a task that deletes elements in the cache. We are going to implement this task as a Runnable object executed by a Thread. There are two options:
    • You can set a maximum size for the cache. If the cache has more elements than the maximum size, you can delete the elements that have been accessed least recently.
    • You can delete from the cache the elements that haven't been accessed in a predefined period of time. We are going to use this approach.
  • The ParallelCache class: This class implements the operations to store and retrieve elements in the cache. To store the data in the cache, we have used a ConcurrentHashMap data structure. As the cache will be shared between all the tasks of the server, we have to use a synchronization mechanism to protect access to the cache and avoid data race conditions. We have three options:
    • We can use a non-synchronized data structure (for example, a HashMap) and add the necessary code to synchronize the types of access to this data structure, for example, with a lock. You can also convert a HashMap into a synchronized structure using the synchronizedMap() method of the Collections class.
    • Use a synchronized data structure, for example, Hashtable. In this case, we don't have data race conditions, but the performance is worse than it could be because every access is serialized.
    • Use a concurrent data structure, for example, a ConcurrentHashMap class, which eliminates the possibility of data race conditions and is optimized to work in a highly concurrent environment. This is the option we're going to implement using an object of the ConcurrentHashMap class.
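The CacheItem class itself is not listed in this section. A minimal sketch with the four attributes described above might look as follows. The class name is changed to mark it as hypothetical, and declaring the access date as volatile is an assumption made here so that an update by a request task is visible to the cleaning thread.

```java
import java.util.Date;

// Hypothetical sketch of a cache entry; not the book's CacheItem class.
final class CacheItemSketch {

    private final String command;      // the request string used as the cache key
    private final String response;     // the result generated by that command
    private final Date creationDate;   // when the entry was stored
    private volatile Date accessDate;  // last time the entry was read

    CacheItemSketch(String command, String response) {
        this.command = command;
        this.response = response;
        Date now = new Date();
        this.creationDate = now;
        this.accessDate = now;         // a new entry counts as just accessed
    }

    String getCommand() { return command; }

    String getResponse() { return response; }

    Date getCreationDate() { return creationDate; }

    Date getAccessDate() { return accessDate; }

    void setAccessDate(Date accessDate) { this.accessDate = accessDate; }
}
```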

The code of the CleanCacheTask class is as follows:

public class CleanCacheTask implements Runnable {

  private ParallelCache cache;

  public CleanCacheTask(ParallelCache cache) {
    this.cache = cache;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        TimeUnit.SECONDS.sleep(10);
        cache.cleanCache();
      }
    } catch (InterruptedException e) {
      // Interrupted by ParallelCache.shutdown(): leave the loop and end the thread
    }
  }

}

The class has a ParallelCache object. Every 10 seconds, it executes the cleanCache() method of the ParallelCache instance.

The ParallelCache class has a constructor and five methods. First, the constructor initializes the elements of the cache: it creates the ConcurrentHashMap object and starts a thread that will execute the CleanCacheTask class:

public class ParallelCache {

  private ConcurrentHashMap<String, CacheItem> cache;
  private CleanCacheTask task;
  private Thread thread;
  public static final int MAX_LIVING_TIME_MILLIS = 600_000;

  public ParallelCache() {
    cache=new ConcurrentHashMap<>();
    task=new CleanCacheTask(this);
    thread=new Thread(task);
    thread.start();
  }

Then, there are two methods to store and retrieve an element in the cache. We use the put() method to insert the element in the ConcurrentHashMap and the get() method to retrieve the element from it:

  public void put(String command, String response) {
    CacheItem item = new CacheItem(command, response);
    cache.put(command, item);

  }

  public String get (String command) {
    CacheItem item=cache.get(command);
    if (item==null) {
      return null;
    }
    item.setAccessDate(new Date());
    return item.getResponse();
  }

Then, the method to clean the cache used by the CleanCacheTask class is:

  public void cleanCache() {
    Date revisionDate = new Date();
    Iterator<CacheItem> iterator = cache.values().iterator();

    while (iterator.hasNext()) {
      CacheItem item = iterator.next();
      if (revisionDate.getTime() - item.getAccessDate().getTime() > MAX_LIVING_TIME_MILLIS) {
        iterator.remove();
      }
    }
  }
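The same eviction rule can also be written with the removeIf() method available on collections since Java 8. The following sketch is a simplified variant and not the book's class: it stores only the last access time per key and receives the current time as a parameter, which is an assumption made here so that the behavior can be checked without waiting.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified cache bookkeeping; not the book's ParallelCache.
final class EvictionSketch {

    private final ConcurrentHashMap<String, Long> lastAccessMillis =
            new ConcurrentHashMap<>();
    private final long maxIdleMillis;

    EvictionSketch(long maxIdleMillis) {
        this.maxIdleMillis = maxIdleMillis;
    }

    // Records that the key was stored or read at the given time.
    void touch(String key, long nowMillis) {
        lastAccessMillis.put(key, nowMillis);
    }

    // Removes every entry idle for longer than maxIdleMillis and
    // returns the number of entries that remain.
    int clean(long nowMillis) {
        lastAccessMillis.values().removeIf(last -> nowMillis - last > maxIdleMillis);
        return lastAccessMillis.size();
    }
}
```

Removing through the values view of a ConcurrentHashMap is safe while other threads keep reading and writing the map, which is the same guarantee the iterator-based version relies on.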

Finally, there are two more methods: shutdown(), which shuts down the cache by interrupting the thread that executes the CleanCacheTask class, and getItemCount(), which returns the number of elements stored in the cache:

  public void shutdown() {
    thread.interrupt();
  }

  public int getItemCount() {
    return cache.size();
  }

The log system

In all the examples of this chapter, we write information to the console using the System.out.println() method. When you implement an enterprise application that is going to execute in a production environment, it's a better idea to use a log system to write debug and error information. In Java, log4j is one of the most popular log systems. In this example, we are going to implement our own log system using the producer/consumer concurrency design pattern. The tasks that use our log system will be the producers, and a special task (executed as a thread) that writes the log information into a file will be the consumer. The components of this log system are:

  • LogTask: This class implements the log consumer that, every 10 seconds, reads the log messages stored in the queue and writes them to a file. It will be executed by a Thread object.
  • Logger: This is the main class of our log system. It has a queue where the producer will store the information and the consumer will read it. It also includes the method to add a message into the queue and a method to get all the messages stored in the queue and write them to disk.

To implement the queue, as happens with the cache system, we need a concurrent data structure to avoid any data inconsistency errors. We have two options:

  • Use a blocking data structure, which blocks the thread when the queue is full (in our case, it will never be full) or empty
  • Use a non-blocking data structure, which returns a special value if the queue is full or empty

We have chosen a non-blocking data structure, the ConcurrentLinkedQueue class, which implements the Queue interface. We use the offer() method to insert elements in the queue and the poll() method to get elements from it.
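The non-blocking behavior can be seen in a short sketch. The class and method names are hypothetical. The offer() method adds a message without ever waiting, and poll() returns null instead of blocking when the queue is empty, which is the condition a consumer can use to stop draining.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demonstration class; not the book's Logger.
final class LogQueueSketch {

    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    // Producer side: never blocks, because the queue is unbounded.
    void send(String message) {
        queue.offer(message);
    }

    // Consumer side: removes and returns everything queued so far, in FIFO order.
    List<String> drain() {
        List<String> drained = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) { // null means the queue is empty
            drained.add(message);
        }
        return drained;
    }
}
```

With a blocking queue, the consumer would instead wait inside take() until a message arrives, which suits a consumer that should react immediately rather than wake up periodically.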

The LogTask class code is very simple:

public class LogTask implements Runnable {

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        TimeUnit.SECONDS.sleep(10);
        Logger.writeLogs();
      }
    } catch (InterruptedException e) {
      // Interrupted by Logger.shutdown(): leave the loop and flush the remaining messages
    }
    Logger.writeLogs();
  }
}

The class implements the Runnable interface and, in the run() method, calls the writeLogs() method of the Logger class every 10 seconds.

The Logger class has a static initialization block and four static methods. First of all, the queue used to store the log data is created as a static ConcurrentLinkedQueue field, and the static block creates the thread that will execute the LogTask (the thread is started later, in the initializeLog() method):

public class Logger {

  private static ConcurrentLinkedQueue<String> logQueue = new ConcurrentLinkedQueue<String>();

  private static Thread thread;

  private static final String LOG_FILE = Paths.get("output", "server.log").toString();

  static {
    LogTask task = new LogTask();
    thread = new Thread(task);
  }

Then, there is a sendMessage() method that receives a string as a parameter and stores that message in the queue. To store the message, it uses the offer() method:

  public static void sendMessage(String message) {
    logQueue.offer(new Date()+": "+message);
  }

A critical method of this class is the writeLogs() method. It obtains and deletes all the log messages stored in the queue using the poll() method of the ConcurrentLinkedQueue class and writes them to a file:

  public static void writeLogs() {
    String message;
    Path path = Paths.get(LOG_FILE);
    try (BufferedWriter fileWriter = Files.newBufferedWriter(path,StandardOpenOption.CREATE,
        StandardOpenOption.APPEND)) {
      while ((message = logQueue.poll()) != null) {
        fileWriter.write(new Date()+": "+message);
        fileWriter.newLine();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

Finally, there are two methods: initializeLog(), which truncates the log file if it exists and starts the thread that executes LogTask, and shutdown(), which finishes the log system by interrupting that thread:

  public static void initializeLog() {
    Path path = Paths.get(LOG_FILE);
    if (Files.exists(path)) {
      try (OutputStream out = Files.newOutputStream(path,
          StandardOpenOption.TRUNCATE_EXISTING)) {

      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    thread.start();
  }
  public static void shutdown() {
    thread.interrupt();
  }