Atomic control

When interacting with Hazelcast's distributed collections, we set and retrieve data in a consistent and atomic way: when we modify an entry, the change is immediately visible to the other nodes, irrespective of their processing state. This means that we have to be careful when developing applications, as data may change underneath us while we are performing an operation. However, it is this default lockless nature that significantly increases an application's throughput and scalability, especially under load. Two of the collections that we have already looked at also provide specific atomic capabilities, inherited from the java.util.concurrent interfaces.

As we've previously seen, the distributed map collection provided by Hazelcast is defined by its own IMap interface. This extends ConcurrentMap, which provides us with additional atomic operations such as putIfAbsent(key, value) and replace(key, oldValue, newValue). These operations guard against concurrent modification to some extent: each one reports whether it took effect, so the application can detect that an entry has changed and handle that change appropriately.

You can see how one can use this behavior in the following code:

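// Package names below are those of Hazelcast 3.x; from 4.0 onwards IMap is com.hazelcast.map.IMap.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class AtomicMapExample {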
  public static void main(String[] args) {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();

    IMap<String, String> capitals = hz.getMap("capitals");

    capitals.putIfAbsent("GB", "Winchester");
    System.err.println("Capital of GB (until 1066): " + capitals.get("GB"));

    String actualCapital = capitals.putIfAbsent("GB", "London");
    System.err.println("Failed to put as already present: " + capitals.get("GB") + " = " + actualCapital);

    boolean r1 = capitals.replace("GB", "Southampton", "London");
    System.err.println("Failed to replace as incorrect old value: " + capitals.get("GB") + "; [" + r1 + "]");

    boolean r2 = capitals.replace("GB", "Winchester", "London");
    System.err.println("Capital of GB (since 1066): " + capitals.get("GB") + "; [" + r2 + "]");
  }
}
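
Because these operations report whether they took effect, they can be combined into a read, modify, and retry loop that copes with data changing during an operation. The following is a minimal sketch of that idea rather than anything prescribed by Hazelcast. The class name OptimisticCounter and the increment helper are hypothetical, and a local ConcurrentHashMap stands in for the distributed map so that the example runs without a cluster. Since IMap extends ConcurrentMap, the same helper should accept a map obtained from hz.getMap(...).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class OptimisticCounter {

  // Hypothetical helper: increments the counter stored under key without locking.
  public static int increment(ConcurrentMap<String, Integer> map, String key) {
    while (true) {
      Integer current = map.get(key);
      if (current == null) {
        // No entry yet: putIfAbsent returns null only if our value was stored.
        if (map.putIfAbsent(key, 1) == null) {
          return 1;
        }
      } else {
        int next = current + 1;
        // replace succeeds only if the entry still holds the value we read.
        if (map.replace(key, current, next)) {
          return next;
        }
      }
      // Another writer changed the entry first; loop round and re-read it.
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Assumption: a local map stands in for hz.getMap("hits").
    ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();

    Runnable worker = () -> {
      for (int i = 0; i < 1000; i++) {
        increment(hits, "page");
      }
    };

    Thread t1 = new Thread(worker);
    Thread t2 = new Thread(worker);
    t1.start();
    t2.start();
    t1.join();
    t2.join();

    // Every increment is applied exactly once, so this prints 2000.
    System.err.println("Total hits: " + hits.get("page"));
  }
}
```

A plain get followed by put would lose updates whenever two writers interleave. The retry loop trades an occasional repeated attempt for never holding a lock.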

Another collection that we've worked with so far is the distributed queue. Like its map counterpart, it is specialized by a Hazelcast interface, this time IQueue, which extends BlockingQueue. The additional methods allow us to control how an application reacts when pushing onto and popping from the FIFO queue. To push onto the queue, we can use add(item), offer(item), or put(item), depending on whether we wish a full queue to throw an exception, return false, or block until space is available. For retrieval, poll() returns immediately, giving null if the queue is empty, whereas take() blocks until an item is available. However, blocking indefinitely can be problematic. The offer(item) and poll() methods can optionally take a timeout, allowing the application to fail more gracefully if the attempted operation cannot be completed within the specified time, as follows:

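// Package names below are those of Hazelcast 3.x; from 4.0 onwards IQueue is com.hazelcast.collection.IQueue.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class QueueExample {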
  public static void main(String[] args) throws Exception {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();
    IQueue<String> queue = hz.getQueue("queue");
    Random rand = new Random();

    while (true) {
      queue.add(new Date() + " - " + hz.getCluster().getLocalMember() + " says hello");

      Thread.sleep(rand.nextInt(2000));
      String msg = queue.poll(5, TimeUnit.SECONDS);
      if (msg != null) {
        System.err.println(msg);
      }
    }
  }
}
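
To compare the different failure modes side by side, the following sketch exercises each method against a queue that is either full or empty. It is an illustration under stated assumptions, not Hazelcast-specific code. The class name BlockingQueueModes and its describe method are hypothetical, and a local ArrayBlockingQueue with a capacity of one stands in for a bounded IQueue so that the example runs without a cluster. The behavior shown is that of the java.util.concurrent.BlockingQueue contract, which IQueue extends.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueModes {

  // Hypothetical helper: runs each operation once and records what happened.
  public static String describe() throws InterruptedException {
    // Assumption: a local queue of capacity 1 stands in for a bounded IQueue.
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    StringBuilder log = new StringBuilder();

    // Empty queue: poll() returns null immediately rather than waiting.
    log.append("poll on empty: ").append(queue.poll()).append('\n');

    // Space available: offer() stores the item and returns true.
    log.append("offer: ").append(queue.offer("first")).append('\n');

    // Full queue: offer() returns false immediately.
    log.append("offer when full: ").append(queue.offer("second")).append('\n');

    // Full queue: the timed offer waits up to 100 ms, then returns false.
    log.append("timed offer when full: ")
       .append(queue.offer("second", 100, TimeUnit.MILLISECONDS)).append('\n');

    // Full queue: add() throws instead of returning a status.
    try {
      queue.add("second");
    } catch (IllegalStateException e) {
      log.append("add when full: IllegalStateException\n");
    }

    // Item present: take() returns it without blocking.
    log.append("take: ").append(queue.take()).append('\n');

    // Empty queue: the timed poll waits up to 100 ms, then returns null.
    log.append("timed poll on empty: ")
       .append(queue.poll(100, TimeUnit.MILLISECONDS)).append('\n');

    return log.toString();
  }

  public static void main(String[] args) throws InterruptedException {
    System.err.print(describe());
  }
}
```

The sketch never calls put(item) on the full queue or take() on the empty one, because either call would block the thread indefinitely. That is exactly the situation the timed variants are meant to avoid.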