Atomic control

When we interact with Hazelcast's distributed collections, we set and retrieve data in a consistent and atomic way: as soon as we modify an entry, the new value is visible to the other nodes, irrespective of their processing state. This means that we have to take care when developing applications, as data may change while we are in the middle of an operation. However, it is this lockless default that significantly increases an application's throughput and scalability, especially under load. Two of the collections that we have already looked at also provide specific atomic operations that are defined by java.util.concurrent interfaces.

As we've previously seen, the distributed map collection provided by Hazelcast is defined by its own IMap interface. This extends ConcurrentMap, which provides us with additional atomic operations such as putIfAbsent(key, value) and replace(key, oldValue, newValue). These operations guard against concurrent modification to some extent: each one takes effect only if the entry is still in the expected state, and its return value tells us whether that was the case, so the application can detect a conflicting change and handle it appropriately.

The following code shows this behavior:

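import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap; // Hazelcast 3.x package; 4.x and later use com.hazelcast.map.IMap

public class AtomicMapExample {
  public static void main(String[] args) {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();

    IMap<String, String> capitals = hz.getMap("capitals");

    // On a fresh cluster there is no entry for "GB", so the value is stored.
    capitals.putIfAbsent("GB", "Winchester");
    System.err.println("Capital of GB (until 1066): " + capitals.get("GB"));

    // The key is already present, so nothing is stored and the existing value is returned.
    String actualCapital = capitals.putIfAbsent("GB", "London");
    System.err.println("Failed to put as already present: " + capitals.get("GB") + " = " + actualCapital);

    // The expected old value does not match, so the entry is left unchanged.
    boolean r1 = capitals.replace("GB", "Southampton", "London");
    System.err.println("Failed to replace as incorrect old value: " + capitals.get("GB") + "; [" + r1 + "]");

    // The expected old value matches, so the entry is replaced.
    boolean r2 = capitals.replace("GB", "Winchester", "London");
    System.err.println("Capital of GB (since 1066): " + capitals.get("GB") + "; [" + r2 + "]");
  }
}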
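
One common way for an application to react to a failed replace is to re-read the entry and try again. The following is a minimal sketch of that retry loop, not something taken from Hazelcast itself. To keep it runnable without a cluster, it uses a plain ConcurrentHashMap as a stand-in; because IMap extends ConcurrentMap, the same increment method should accept a map obtained from hz.getMap(...). The class name, method name, and the "hits" key are made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AtomicRetryExample {

  // Increments the counter stored under key and returns the new value.
  // Retries whenever another thread changed the entry between our read and our write.
  public static int increment(ConcurrentMap<String, Integer> map, String key) {
    while (true) {
      Integer current = map.get(key);
      if (current == null) {
        // putIfAbsent returns null only if our value was the one stored.
        if (map.putIfAbsent(key, 1) == null) {
          return 1;
        }
      } else if (map.replace(key, current, current + 1)) {
        // replace succeeded, so the entry still held the value we read.
        return current + 1;
      }
      // Another writer got there first; loop and read the fresh value.
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Stand-in for a distributed map (assumption: local map instead of hz.getMap).
    ConcurrentMap<String, Integer> counters = new ConcurrentHashMap<>();

    Runnable task = () -> {
      for (int i = 0; i < 1000; i++) {
        increment(counters, "hits");
      }
    };
    Thread a = new Thread(task);
    Thread b = new Thread(task);
    a.start();
    b.start();
    a.join();
    b.join();

    // No update is lost, so this prints "hits = 2000".
    System.err.println("hits = " + counters.get("hits"));
  }
}
```

No lock is taken at any point; a writer that loses the race simply repeats its read and its conditional write.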

Another collection that we've worked with so far is the distributed queue. Like its map counterpart, it is also specialized, this time by the IQueue interface, which extends BlockingQueue. The additional methods allow us to control how an application reacts when pushing to and popping from the FIFO queue. To push onto the queue, we can use the add(item), offer(item), or put(item) method, depending on whether we wish the call to throw an exception, return false, or block when the queue has no room. For retrieval, poll() returns immediately, giving null if the queue is empty, whereas take() blocks until an item is available. However, blocking indefinitely can be problematic. The offer(item) and poll() methods can optionally take a timeout, allowing the application to fail more gracefully if the attempted operation cannot be completed within the specified time, as follows:

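import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue; // Hazelcast 3.x package; 4.x and later use com.hazelcast.collection.IQueue

public class QueueExample {
  public static void main(String[] args) throws Exception {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();
    IQueue<String> queue = hz.getQueue("queue");
    Random rand = new Random();

    while (true) {
      // add() throws an exception if the item cannot be queued.
      queue.add(new Date() + " - " + hz.getCluster().getLocalMember() + " says hello");

      Thread.sleep(rand.nextInt(2000));

      // Wait up to five seconds for an item; null means the timeout expired.
      String msg = queue.poll(5, TimeUnit.SECONDS);
      if (msg != null) {
        System.err.println(msg);
      }
    }
  }
}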
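
To make the differences between these methods easier to see side by side, here is a small sketch that exercises each of them against a queue with room for a single item. It assumes a local ArrayBlockingQueue as a stand-in, so it runs without a cluster; since IQueue extends BlockingQueue, the method contracts shown are the ones described above, but the class name and log messages are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueSemantics {

  // Runs each push and pop variant once and records what happened.
  public static List<String> demo() {
    // Capacity of 1 so that the "queue is full" cases are easy to trigger.
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    List<String> log = new ArrayList<>();
    try {
      // poll() never waits: an empty queue yields null.
      log.add("poll on empty: " + queue.poll());

      // put() would block if the queue were full; here there is room.
      queue.put("first");

      // offer() reports failure through its return value.
      log.add("offer on full: " + queue.offer("second"));

      // The timed offer() waits for room, then gives up and returns false.
      log.add("timed offer on full: " + queue.offer("second", 100, TimeUnit.MILLISECONDS));

      // add() reports failure by throwing.
      try {
        queue.add("second");
      } catch (IllegalStateException e) {
        log.add("add on full: IllegalStateException");
      }

      // take() would block on an empty queue; here an item is waiting.
      log.add("take: " + queue.take());

      // The timed poll() waits for an item, then gives up and returns null.
      log.add("timed poll on empty: " + queue.poll(100, TimeUnit.MILLISECONDS));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      log.add("interrupted");
    }
    return log;
  }

  public static void main(String[] args) {
    for (String line : demo()) {
      System.err.println(line);
    }
  }
}
```

The timed variants are usually the safer choice in a long-running application, because a stalled producer or consumer then shows up as a false or null result that the caller can act on rather than as a thread that never returns.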