- Mastering Concurrency Programming with Java 8
- Javier Fernández González
- 2021-07-16 12:55:02
The second example – concurrency in a client/server environment
The client/server model is a software architecture in which applications are split into two parts: the server part, which provides resources (data, operations, printer, storage, and so on), and the client part, which uses the resources provided by the server. Traditionally, this architecture was used in the enterprise world, and with the boom of the Internet it remains a relevant topic. You can see a web application as a client/server application where the server part is the backend of the application, which is executed in a web server, and the web browser executes the client part of the application. SOA (short for Service-Oriented Architecture) is another example of client/server architecture, where the exposed web services are the server part and the different clients that consume them are the client part.
In a client/server environment, we usually have one server and a lot of clients that use the services provided by the server, so the performance of the server is a critical aspect when you have to design one of these systems.
In this section, we will implement a simple client/server application. It will search the World Development Indicators data of the World Bank, which you can download from http://data.worldbank.org/data-catalog/world-development-indicators. This data contains the values of different indicators for all the countries in the world from 1960 to 2014.
The main characteristics of our server will be:
- The client and the server will connect using sockets
- The client will send its queries in a string, and the server will respond with the results in another string
- The server can respond to three different queries:
  - Query: The format of this query is q;codCountry;codIndicator;year, where codCountry is the code of the country, codIndicator is the code of the indicator, and year is an optional parameter with the year you want to query. The server will respond with the information in a single string.
  - Report: The format of this query is r;codIndicator, where codIndicator is the code of the indicator you want to report. The server will respond with the mean value of that indicator for all countries over the years in a single string.
  - Stop: The format of this query is z; (the letter z followed by a semicolon). The server stops its execution when it receives this command.
  - In other cases, the server returns an error message.
As in the previous example, we will show you how to implement a serial version of this client/server application. Then, we will show you how to implement a concurrent version using an executor. Finally, we will compare the two solutions to view the advantages of the use of concurrency in this case.
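As a rough illustration of the request format described above, the following sketch classifies a request line by its first field and checks the number of fields. The class name RequestFormat, its describe() method, and the sample codes USA and NY.GDP.MKTP.CD are assumptions made for this example; they are not part of the application's code.

```java
// Hypothetical helper, not part of the book's code: classifies a request line
// according to the q;, r;, and z; formats described in the text.
final class RequestFormat {

    private RequestFormat() { }

    static String describe(String line) {
        // split(";") drops trailing empty fields, so "z;" yields ["z"]
        String[] fields = line.split(";");
        if (fields.length == 0) {
            return "error"; // for example, a line that contains only ";"
        }
        switch (fields[0]) {
            case "q":
                // q;codCountry;codIndicator with an optional fourth field (year)
                return (fields.length == 3 || fields.length == 4) ? "query" : "error";
            case "r":
                // r;codIndicator
                return fields.length == 2 ? "report" : "error";
            case "z":
                return "stop";
            default:
                return "error";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("q;USA;NY.GDP.MKTP.CD;2010"));
        System.out.println(describe("r;NY.GDP.MKTP.CD"));
        System.out.println(describe("z;"));
    }
}
```

The server shown later performs the same split on the semicolon character and leaves the field-count checks to each command.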
Client/server – serial version
The serial version of our server application has three main parts:
- The DAO (short for Data Access Object) part, responsible for access to the data and obtaining the results of the query
- The command part, formed by a command per kind of query
- The server part, which receives the queries, calls the corresponding command, and returns the results to the client
Let's see in detail each of these parts.
The DAO part
As we mentioned before, the server will search the World Development Indicators data of the World Bank. This data is in a CSV file. The DAO component in the application loads the entire file into a List object in memory. It implements one method per query type, and each method goes over the list looking for the requested data.
We don't include the code of this class here because it's simple to implement and it's not the main purpose of this book.
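For readers who want a starting point, here is a minimal sketch of the idea: rows kept in a List and scanned linearly by one method per query type. Everything in it is an assumption made for illustration (the SketchDao and Row names, the one-row-per-observation layout, the response format, and the sample data); the real WDIDAO class and the layout of the CSV file may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory DAO; names and record layout are assumptions.
final class SketchDao {

    // One row per (country, indicator, year) observation
    static final class Row {
        final String codCountry;
        final String codIndicator;
        final short year;
        final double value;

        Row(String codCountry, String codIndicator, short year, double value) {
            this.codCountry = codCountry;
            this.codIndicator = codIndicator;
            this.year = year;
            this.value = value;
        }
    }

    private final List<Row> rows;

    SketchDao(List<Row> rows) {
        this.rows = new ArrayList<>(rows); // defensive copy of the loaded data
    }

    // Linear scan: "year=value" pairs for one country and one indicator
    String query(String codCountry, String codIndicator) {
        StringBuilder sb = new StringBuilder();
        for (Row row : rows) {
            if (row.codCountry.equals(codCountry) && row.codIndicator.equals(codIndicator)) {
                if (sb.length() > 0) {
                    sb.append(';');
                }
                sb.append(row.year).append('=').append(row.value);
            }
        }
        return sb.toString();
    }

    // Linear scan: mean value of one indicator over all countries and years
    double mean(String codIndicator) {
        double sum = 0;
        int count = 0;
        for (Row row : rows) {
            if (row.codIndicator.equals(codIndicator)) {
                sum += row.value;
                count++;
            }
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    // Made-up sample data, used only to exercise the sketch
    static SketchDao sample() {
        return new SketchDao(Arrays.asList(
                new Row("AAA", "IND.1", (short) 2000, 1.0),
                new Row("AAA", "IND.1", (short) 2001, 3.0),
                new Row("BBB", "IND.1", (short) 2000, 5.0)));
    }

    public static void main(String[] args) {
        SketchDao dao = sample();
        System.out.println(dao.query("AAA", "IND.1"));
        System.out.println(dao.mean("IND.1"));
    }
}
```

Because every query scans the whole list, the cost of a request grows with the size of the data, which is one reason why the time spent by the server per request matters in the rest of this section.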
The command part
The command part is an intermediary between the DAO and the server parts. We have implemented an abstract Command class to be the base class of all the commands:
    public abstract class Command {
        // Elements of the request, split on the ';' character
        protected String[] command;

        public Command(String[] command) {
            this.command = command;
        }

        public abstract String execute();
    }
Then, we have implemented a command for each query. The query is implemented in the QueryCommand class. The execute() method is as follows:
    @Override
    public String execute() {
        WDIDAO dao = WDIDAO.getDAO();
        if (command.length == 3) {
            // q;codCountry;codIndicator
            return dao.query(command[1], command[2]);
        } else if (command.length == 4) {
            // q;codCountry;codIndicator;year
            try {
                return dao.query(command[1], command[2], Short.parseShort(command[3]));
            } catch (Exception e) {
                return "ERROR;Bad Command";
            }
        } else {
            return "ERROR;Bad Command";
        }
    }
The report is implemented in the ReportCommand class. The execute() method is as follows:
    @Override
    public String execute() {
        WDIDAO dao = WDIDAO.getDAO();
        return dao.report(command[1]);
    }
The stop query is implemented in the StopCommand class. Its execute() method is as follows:
    @Override
    public String execute() {
        return "Server stopped";
    }
Finally, the error situations are processed by the ErrorCommand class. Its execute() method is as follows:
    @Override
    public String execute() {
        return "Unknown command: " + command[0];
    }
The server part
Finally, the server part is implemented in the SerialServer class. First of all, it initializes the DAO by calling the getDAO() method, so that the DAO loads all the data:
    public class SerialServer {
        public static void main(String[] args) throws IOException {
            WDIDAO dao = WDIDAO.getDAO();
            boolean stopServer = false;
            System.out.println("Initialization completed.");
            try (ServerSocket serverSocket = new ServerSocket(Constants.SERIAL_PORT)) {
After this, we have a loop that will be executed until the server receives a stop query. This loop performs the following four steps:
- Receives a query from a client
- Parses and splits the elements of the query
- Calls the corresponding command
- Returns the results to the client
These four steps are shown in the following code snippet:
    do {
        try (Socket clientSocket = serverSocket.accept();
             PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(clientSocket.getInputStream()))) {
            // Step 1: receive the query
            String line = in.readLine();
            // Step 2: split the query into its elements
            Command command;
            String[] commandData = line.split(";");
            System.out.println("Command: " + commandData[0]);
            // Step 3: select and call the corresponding command
            switch (commandData[0]) {
                case "q":
                    System.out.println("Query");
                    command = new QueryCommand(commandData);
                    break;
                case "r":
                    System.out.println("Report");
                    command = new ReportCommand(commandData);
                    break;
                case "z":
                    System.out.println("Stop");
                    command = new StopCommand(commandData);
                    stopServer = true;
                    break;
                default:
                    System.out.println("Error");
                    command = new ErrorCommand(commandData);
            }
            String response = command.execute();
            System.out.println(response);
            // Step 4: return the result to the client
            out.println(response);
        } catch (IOException e) {
            e.printStackTrace();
        }
    } while (!stopServer);
Client/server – parallel version
The serial version of the server has a very important limitation: while it is processing one query, it can't attend to other queries. If the server needs a significant amount of time to respond to every request, or to certain requests, the performance of the server will be very low.
We can obtain better performance using concurrency. If the server creates a thread when it receives a request, it can delegate all the processing of the query to that thread and go back to attending to new requests. This approach has a problem of its own: if we receive a high number of queries, we can saturate the system by creating too many threads. If we use an executor with a fixed number of threads instead, we can control the resources used by our server and still obtain better performance than the serial version.
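The effect of a fixed number of threads can be observed with a small experiment. The following sketch, which is not part of the server, submits more tasks than there are worker threads and records the highest number of tasks seen running at the same time. The BoundedPoolDemo class, its maxConcurrent() method, and the 20-millisecond sleep that stands in for a slow query are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: a fixed thread pool bounds the number of concurrent tasks.
final class BoundedPoolDemo {

    // Runs `tasks` short tasks on a pool of `poolSize` threads and returns the
    // highest number of tasks observed running simultaneously.
    static int maxConcurrent(int poolSize, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    TimeUnit.MILLISECONDS.sleep(20); // stands in for a slow query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        executor.shutdown(); // accept no new tasks; queued tasks still run
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency: " + maxConcurrent(4, 40));
    }
}
```

However many tasks are submitted, the reported peak cannot exceed the pool size, because the extra tasks wait in the executor's queue until a worker thread is free.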
To convert our serial server to a concurrent one using an executor, we have to modify the server part. The DAO part is the same, and we have changed the names of the classes that implement the command part, but their implementation is almost the same. Only the stop query changes because now it has more responsibilities. Let's see the details of the implementation of the concurrent server part.
The server part
The concurrent server part is implemented in the ConcurrentServer class. We have added two elements not included in the serial server: a cache system, implemented in the ParallelCache class, and a log system, implemented in the Logger class. First of all, it initializes the DAO part by calling the getDAO() method, so that the DAO loads all the data. Then, it creates a ThreadPoolExecutor object using the newFixedThreadPool() method of the Executors class. This method receives the maximum number of worker threads we want in our server; the executor will never have more than that number of worker threads. To decide the number of worker threads, we get the number of cores of our system using the availableProcessors() method of the Runtime class:
    public class ConcurrentServer {
        private static ThreadPoolExecutor executor;
        private static ParallelCache cache;
        private static ServerSocket serverSocket;
        private static volatile boolean stopped = false;

        public static void main(String[] args) {
            serverSocket = null;
            WDIDAO dao = WDIDAO.getDAO();
            // One worker thread per available processor
            executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(
                    Runtime.getRuntime().availableProcessors());
            cache = new ParallelCache();
            Logger.initializeLog();
            System.out.println("Initialization completed.");
The stopped Boolean variable is declared as volatile because it will be changed from another thread. The volatile keyword ensures that when the stopped variable is set to true by another thread, this change will be visible in the main method. Without the volatile keyword, the change might not become visible because of CPU caching or compiler optimizations. Then, we initialize ServerSocket to listen for the requests:
    serverSocket = new ServerSocket(Constants.CONCURRENT_PORT);
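The role of the volatile flag can be shown in isolation. In the following sketch, one thread loops until another thread sets the flag, in the same way that the main loop of the server runs until another thread sets stopped. The VolatileFlagDemo class, the busy-wait loop, and the timings are assumptions made for the example.

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch: a volatile flag written by one thread and read by another.
final class VolatileFlagDemo {

    // Without volatile, the worker loop below might never observe the update
    private static volatile boolean stopped = false;

    // Starts a worker that loops until `stopped` is set. Returns true if the
    // worker ended within the timeout after the flag was set.
    static boolean workerStopsWhenFlagIsSet() {
        stopped = false;
        Thread worker = new Thread(() -> {
            while (!stopped) {
                Thread.yield(); // busy-wait, standing in for the server loop
            }
        });
        worker.setDaemon(true); // never keeps the JVM alive
        worker.start();
        try {
            TimeUnit.MILLISECONDS.sleep(50);
            stopped = true;     // written by this thread...
            worker.join(2000);  // ...and read by the worker thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + workerStopsWhenFlagIsSet());
    }
}
```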
We can't use a try-with-resources statement to manage the server socket. When we receive a stop command, we need to shut down the server, but the server is waiting in the accept() method of the serverSocket object. To force the server to leave that method, we need to explicitly close the server socket (we'll do that in the shutdown() method), so we can't let the try-with-resources statement close the socket for us.
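The behavior this paragraph relies on is that closing a ServerSocket from one thread makes a call to accept() that is blocked in another thread fail with a SocketException, which is a kind of IOException. The following sketch shows it in isolation. The CloseUnblocksAcceptDemo class, the use of port 0 (which asks the system for any free port), and the timings are assumptions made for the example.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: close() in one thread ends a blocked accept() in another.
final class CloseUnblocksAcceptDemo {

    // Returns true if accept() ended with an IOException after the socket was closed
    static boolean acceptEndsWhenSocketIsClosed() {
        AtomicBoolean ended = new AtomicBoolean(false);
        try {
            ServerSocket serverSocket = new ServerSocket(0); // port 0: any free port
            Thread acceptor = new Thread(() -> {
                try {
                    serverSocket.accept(); // blocks: no client ever connects
                } catch (IOException e) {
                    // Reached when another thread closes the server socket
                    ended.set(true);
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();
            TimeUnit.MILLISECONDS.sleep(200); // give the thread time to block
            serverSocket.close();
            acceptor.join(2000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ended.get();
    }

    public static void main(String[] args) {
        System.out.println("accept() ended: " + acceptEndsWhenSocketIsClosed());
    }
}
```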
After this, we have a loop that will be executed until the server receives a stop query. This loop does three steps, as follows:
- Receives a query from a client
- Creates a task to process that query
- Sends the task to the executor
These three steps are shown in the following code snippet:
    do {
        try {
            Socket clientSocket = serverSocket.accept();
            RequestTask task = new RequestTask(clientSocket);
            executor.execute(task);
        } catch (IOException e) {
            // Also reached when the server socket is closed while accept() is waiting
            e.printStackTrace();
        }
    } while (!stopped);
Finally, once the server has finished its execution (leaving the loop), we have to wait for the finalization of the executor using the awaitTermination() method. This method will block the main thread until the executor has finished executing its tasks or the given timeout elapses. Then, we shut down the cache system and write a message to indicate the end of the execution of the server, as follows:
    executor.awaitTermination(1, TimeUnit.DAYS);
    System.out.println("Shutting down cache");
    cache.shutdown();
    System.out.println("Cache ok");
    System.out.println("Main server thread ended");
We have added two additional methods: the getExecutor() method, which returns the ThreadPoolExecutor object that is used to execute the concurrent tasks, and the shutdown() method, which is used to finish the executor of the server in an orderly way. It calls the shutdown() method of the executor and closes the ServerSocket:
    public static void shutdown() {
        stopped = true;
        System.out.println("Shutting down the server...");
        System.out.println("Shutting down executor");
        executor.shutdown();
        System.out.println("Executor ok");
        System.out.println("Closing socket");
        try {
            // Forces the main thread to leave the accept() method
            serverSocket.close();
            System.out.println("Socket ok");
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Shutting down logger");
        Logger.sendMessage("Shutting down the logger");
        Logger.shutdown();
        System.out.println("Logger ok");
    }
In the concurrent server, there is an essential part: the RequestTask class, which processes every client request. This class implements the Runnable interface, so it can be executed in an executor in a concurrent way. Its constructor receives the Socket that will be used to communicate with the client:
    public class RequestTask implements Runnable {
        private Socket clientSocket;

        public RequestTask(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }
The run() method does the same things that the serial server does to respond to every request:
- Receives a query from a client
- Parses and splits the elements of the query
- Calls the corresponding command
- Returns the results to the client
The following is its code snippet:
    @Override
    public void run() {
        try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(clientSocket.getInputStream()))) {
            String line = in.readLine();
            Logger.sendMessage(line);
            // Look for the response in the cache before executing the command
            ParallelCache cache = ConcurrentServer.getCache();
            String ret = cache.get(line);
            if (ret == null) {
                Command command;
                String[] commandData = line.split(";");
                System.out.println("Command: " + commandData[0]);
                switch (commandData[0]) {
                    case "q":
                        System.err.println("Query");
                        command = new ConcurrentQueryCommand(commandData);
                        break;
                    case "r":
                        System.err.println("Report");
                        command = new ConcurrentReportCommand(commandData);
                        break;
                    case "s":
                        System.err.println("Status");
                        command = new ConcurrentStatusCommand(commandData);
                        break;
                    case "z":
                        System.err.println("Stop");
                        command = new ConcurrentStopCommand(commandData);
                        break;
                    default:
                        System.err.println("Error");
                        command = new ConcurrentErrorCommand(commandData);
                        break;
                }
                ret = command.execute();
                if (command.isCacheable()) {
                    cache.put(line, ret);
                }
            } else {
                Logger.sendMessage("Command " + line + " was found in the cache");
            }
            System.out.println(ret);
            out.println(ret);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
The command part
In the command part, we have renamed all the classes, as you can see in the previous fragment of code. The implementation is the same except in the ConcurrentStopCommand class. Now, it calls the shutdown() method of the ConcurrentServer class to terminate the execution of the server in an orderly way. This is the execute() method:
    @Override
    public String execute() {
        ConcurrentServer.shutdown();
        return "Server stopped";
    }
Also, the Command class now contains a new isCacheable() Boolean method that returns true if the result of the command should be stored in the cache and false otherwise.
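The modified Command class is not listed in the text. One possible shape for it is sketched below. The isCacheable() and setCacheable() method names come from the code of this section, while the class names used here, the private field, and its default value of true are assumptions.

```java
// Sketch only: the field and its default value are assumptions.
abstract class CacheableCommand {

    protected final String[] command;
    private boolean cacheable = true; // assumed default: results are cached

    protected CacheableCommand(String[] command) {
        this.command = command;
    }

    public abstract String execute();

    public boolean isCacheable() {
        return cacheable;
    }

    protected void setCacheable(boolean cacheable) {
        this.cacheable = cacheable;
    }
}

// A command whose result changes on every call opts out of the cache
final class UncachedCommand extends CacheableCommand {

    UncachedCommand(String[] command) {
        super(command);
        setCacheable(false);
    }

    @Override
    public String execute() {
        return "status";
    }

    public static void main(String[] args) {
        CacheableCommand command = new UncachedCommand(new String[] {"s"});
        System.out.println("Cacheable: " + command.isCacheable());
    }
}
```

With this design, the task that processes a request only has to ask the command whether its result can be stored, without knowing which kind of command it is.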
Extra components of the concurrent server
We have implemented some extra components in the concurrent server: a new command to return information about the status of the server, a cache system that stores the results of the commands and saves time when a request is repeated, and a log system to write error and debug information. The following sections describe each of these components.
The status command
First of all, we have a new possible query. It has the format s; and is processed by the ConcurrentStatusCommand class. It gets the ThreadPoolExecutor used by the server and obtains information about the status of the executor:
    public class ConcurrentStatusCommand extends Command {
        public ConcurrentStatusCommand(String[] command) {
            super(command);
            // The status changes between calls, so it is never cached
            setCacheable(false);
        }

        @Override
        public String execute() {
            StringBuilder sb = new StringBuilder();
            ThreadPoolExecutor executor = ConcurrentServer.getExecutor();
            sb.append("Server Status;");
            sb.append("Active Threads: ").append(executor.getActiveCount()).append(";");
            sb.append("Maximum Pool Size: ").append(executor.getMaximumPoolSize()).append(";");
            sb.append("Core Pool Size: ").append(executor.getCorePoolSize()).append(";");
            sb.append("Pool Size: ").append(executor.getPoolSize()).append(";");
            sb.append("Largest Pool Size: ").append(executor.getLargestPoolSize()).append(";");
            sb.append("Completed Task Count: ").append(executor.getCompletedTaskCount()).append(";");
            sb.append("Task Count: ").append(executor.getTaskCount()).append(";");
            sb.append("Queue Size: ").append(executor.getQueue().size()).append(";");
            sb.append("Cache Size: ").append(ConcurrentServer.getCache().getItemCount()).append(";");
            Logger.sendMessage(sb.toString());
            return sb.toString();
        }
    }
The information we obtain from the server is:
- getActiveCount(): This returns the approximate number of threads that are actively executing our concurrent tasks. There can be more threads in the pool, but they can be idle.
- getMaximumPoolSize(): This returns the maximum number of worker threads the executor can have.
- getCorePoolSize(): This returns the core number of worker threads the executor will have. This is the number of threads the pool keeps alive, once they have been created, even when they are idle.
- getPoolSize(): This returns the current number of threads in the pool.
- getLargestPoolSize(): This returns the largest number of threads that have been in the pool at the same time during its execution.
- getCompletedTaskCount(): This returns the approximate number of tasks the executor has finished executing.
- getTaskCount(): This returns the approximate number of tasks that have ever been scheduled for execution.
- getQueue().size(): This returns the number of tasks that are waiting in the queue of tasks.
As we have created our executor using the newFixedThreadPool() method of the Executors class, our executor will have the same maximum and core number of worker threads.
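This can be checked with a few lines of code. The following sketch creates a fixed thread pool and reads both values. The FixedPoolSizesDemo class is an assumption made for the example, and the cast to ThreadPoolExecutor is the same one used by the server in this section; it relies on the implementation of the Executors class rather than on its declared return type.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative sketch: core and maximum pool sizes of a fixed thread pool.
final class FixedPoolSizesDemo {

    // Returns {corePoolSize, maximumPoolSize} of a newly created fixed pool
    static int[] coreAndMax(int nThreads) {
        ThreadPoolExecutor executor =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        try {
            return new int[] {executor.getCorePoolSize(), executor.getMaximumPoolSize()};
        } finally {
            executor.shutdown(); // release the pool; no tasks were submitted
        }
    }

    public static void main(String[] args) {
        int[] sizes = coreAndMax(4);
        System.out.println("Core: " + sizes[0] + ", maximum: " + sizes[1]);
    }
}
```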
The cache system
We have included a cache system in our parallel server to avoid repeating data searches that have been made recently. Our cache system has three elements:
- The CacheItem class: This class represents every element stored in the cache. It has four attributes:
  - The command stored in the cache. We will store the query and report commands in the cache.
  - The response generated by that command.
  - The creation date of the item in the cache.
  - The last time this item was accessed in the cache.
- The CleanCacheTask class: If we store all the commands in the cache but never delete the elements stored in it, the cache will increase its size indefinitely. To avoid this situation, we can have a task that deletes elements in the cache. We are going to implement this task as a Runnable object executed by its own Thread object. There are two options:
  - You can have a maximum size for the cache. If the cache has more elements than the maximum size, you can delete the elements that have been accessed least recently.
  - You can delete from the cache the elements that haven't been accessed in a predefined period of time. We are going to use this approach.
- The ParallelCache class: This class implements the operations to store and retrieve elements in the cache. To store the data in the cache, we have used a ConcurrentHashMap data structure. As the cache will be shared between all the tasks of the server, we have to use a synchronization mechanism to protect the access to the cache and avoid data race conditions. We have three options:
  - We can use a non-synchronized data structure (for example, a HashMap) and add the necessary code to synchronize the types of access to this data structure, for example, with a lock. You can also convert a HashMap into a synchronized structure using the synchronizedMap() method of the Collections class.
  - Use a synchronized data structure, for example, Hashtable. In this case, we don't have data race conditions, but the performance could be better, because every operation locks the whole table.
  - Use a concurrent data structure, for example, the ConcurrentHashMap class, which eliminates the possibility of data race conditions and is optimized to work in a highly concurrent environment. This is the option we're going to implement using an object of the ConcurrentHashMap class.
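To give an idea of what the third option provides, the following sketch lets several threads update the same key of a ConcurrentHashMap at the same time, without any explicit lock. The ConcurrentMapDemo class, the hit counter, and the sample key are assumptions made for the example; the cache of the server stores responses, not counters.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: concurrent updates to a ConcurrentHashMap without locks.
final class ConcurrentMapDemo {

    // Each thread records `hitsPerThread` hits for the same key; returns the total
    static int countHits(int threads, int hitsPerThread) {
        final String key = "q;USA;NY.GDP.MKTP.CD"; // sample request line
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.execute(() -> {
                for (int i = 0; i < hitsPerThread; i++) {
                    // merge() performs the read-modify-write atomically
                    hits.merge(key, 1, Integer::sum);
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return hits.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        System.out.println("Total hits: " + countHits(4, 1000));
    }
}
```

With a plain HashMap and no synchronization, the same experiment could lose updates or corrupt the internal structure of the map.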
The code of the CleanCacheTask class is as follows:
    public class CleanCacheTask implements Runnable {
        private ParallelCache cache;

        public CleanCacheTask(ParallelCache cache) {
            this.cache = cache;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    TimeUnit.SECONDS.sleep(10);
                    cache.cleanCache();
                }
            } catch (InterruptedException e) {
                // Interrupted by the shutdown() method of the cache: end the task
            }
        }
    }
The class has a ParallelCache object. Every 10 seconds, it executes the cleanCache() method of the ParallelCache instance.
The ParallelCache class has five different methods. First, the constructor of the class, which initializes the elements of the cache. It creates the ConcurrentHashMap object and starts a thread that will execute the CleanCacheTask class:
    public class ParallelCache {
        private ConcurrentHashMap<String, CacheItem> cache;
        private CleanCacheTask task;
        private Thread thread;
        // 10 minutes
        public static int MAX_LIVING_TIME_MILLIS = 600_000;

        public ParallelCache() {
            cache = new ConcurrentHashMap<>();
            task = new CleanCacheTask(this);
            thread = new Thread(task);
            thread.start();
        }
Then, there are two methods to store and retrieve an element in the cache. We use the put() method to insert the element in the ConcurrentHashMap and the get() method to retrieve the element from it:
    public void put(String command, String response) {
        CacheItem item = new CacheItem(command, response);
        cache.put(command, item);
    }

    public String get(String command) {
        CacheItem item = cache.get(command);
        if (item == null) {
            return null;
        }
        // Refresh the access date so that the item is not cleaned yet
        item.setAccessDate(new Date());
        return item.getResponse();
    }
Then, the method to clean the cache used by the CleanCacheTask class is:
    public void cleanCache() {
        Date revisionDate = new Date();
        Iterator<CacheItem> iterator = cache.values().iterator();
        while (iterator.hasNext()) {
            CacheItem item = iterator.next();
            // Delete the items that have not been accessed recently
            if (revisionDate.getTime() - item.getAccessDate().getTime()
                    > MAX_LIVING_TIME_MILLIS) {
                iterator.remove();
            }
        }
    }
Finally, the method to shut down the cache, which interrupts the thread executing the CleanCacheTask class, and the method that returns the number of elements stored in the cache are:
    public void shutdown() {
        thread.interrupt();
    }

    public int getItemCount() {
        return cache.size();
    }
The log system
In all the examples of this chapter, we write information in the console using the System.out.println() method. When you implement an enterprise application that is going to execute in a production environment, it's a better idea to use a log system to write debug and error information. In Java, log4j is one of the most popular log systems. In this example, we are going to implement our own log system implementing the producer/consumer concurrency design pattern. The tasks that use our log system will be the producers, and a special task (executed as a thread) that writes the log information into a file will be the consumer. The components of this log system are:
- LogTask: This class implements the log consumer, which every 10 seconds reads the log messages stored in the queue and writes them to a file. It will be executed by a Thread object.
- Logger: This is the main class of our log system. It has a queue where the producers will store the information and the consumer will read it. It also includes the method to add a message into the queue and a method to get all the messages stored in the queue and write them to disk.
To implement the queue, as happens with the cache system, we need a concurrent data structure to avoid any data inconsistency errors. We have two options:
- Use a blocking data structure, which blocks the thread when the queue is full (in our case, it will never be full) or empty
- Use a non-blocking data structure, which returns a special value if the queue is full or empty
We have chosen a non-blocking data structure, the ConcurrentLinkedQueue class, which implements the Queue interface. We use the offer() method to insert elements in the queue and the poll() method to get elements from it.
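The following sketch shows the non-blocking behavior of these two methods: offer() adds an element without waiting, and poll() returns null instead of blocking when the queue is empty, which gives a simple loop to drain the queue. The NonBlockingQueueDemo class and its drain() method are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch: draining a ConcurrentLinkedQueue with poll().
final class NonBlockingQueueDemo {

    // Removes and returns all the messages that are currently in the queue
    static List<String> drain(ConcurrentLinkedQueue<String> queue) {
        List<String> drained = new ArrayList<>();
        String message;
        // poll() returns null when the queue is empty; it never blocks
        while ((message = queue.poll()) != null) {
            drained.add(message);
        }
        return drained;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("first message");
        queue.offer("second message");
        System.out.println(drain(queue));
        System.out.println(queue.poll()); // the queue is empty now
    }
}
```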
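The LogTask class code is very simple:
    public class LogTask implements Runnable {
        @Override
        public void run() {
            try {
                // Loop until the thread is interrupted
                while (!Thread.currentThread().isInterrupted()) {
                    TimeUnit.SECONDS.sleep(10);
                    Logger.writeLogs();
                }
            } catch (InterruptedException e) {
                // Interrupted by the shutdown() method of the Logger class
            }
            // Write the messages that are still in the queue
            Logger.writeLogs();
        }
    }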
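The class implements the Runnable interface and, in the run() method, calls the writeLogs() method of the Logger class every 10 seconds. When the thread is interrupted, it calls writeLogs() one last time so that the pending messages are not lost.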
The Logger class has five different static methods. First of all, there is a static block of code that creates the thread that will execute the LogTask (the thread is started later, in the initializeLog() method). The class also creates the ConcurrentLinkedQueue object used to store the log data:
    public class Logger {
        private static ConcurrentLinkedQueue<String> logQueue =
                new ConcurrentLinkedQueue<String>();
        private static Thread thread;
        private static final String LOG_FILE =
                Paths.get("output", "server.log").toString();

        static {
            LogTask task = new LogTask();
            thread = new Thread(task);
        }
Then, there is a sendMessage() method that receives a string as a parameter and stores that message in the queue. To store the message, it uses the offer() method:
    public static void sendMessage(String message) {
        logQueue.offer(new Date() + ": " + message);
    }
A critical method of this class is the writeLogs() method. It obtains and deletes all the log messages stored in the queue using the poll() method of the ConcurrentLinkedQueue class and writes them to a file:
    public static void writeLogs() {
        String message;
        Path path = Paths.get(LOG_FILE);
        try (BufferedWriter fileWriter = Files.newBufferedWriter(path,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            // poll() returns null when the queue is empty
            while ((message = logQueue.poll()) != null) {
                fileWriter.write(new Date() + ": " + message);
                fileWriter.newLine();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
Finally, there are two methods: one to truncate the log file and start the log thread, and another to shut down the log system, which interrupts the thread that is executing LogTask:
    public static void initializeLog() {
        Path path = Paths.get(LOG_FILE);
        if (Files.exists(path)) {
            // Opening the file with TRUNCATE_EXISTING empties it
            try (OutputStream out = Files.newOutputStream(path,
                    StandardOpenOption.TRUNCATE_EXISTING)) {
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        thread.start();
    }

    public static void shutdown() {
        thread.interrupt();
    }