
Atomic control

When interacting with Hazelcast's distributed collections, we set and retrieve data in a consistent and atomic way: once we modify an entry, the new value is immediately available on the other nodes, irrespective of their processing state. This means that we have to be careful when developing applications, as data may change while we are performing an operation. However, it is this lockless default that significantly increases the application's throughput and scalability, especially under load. Two of the collections that we have already looked at also implement specific atomic capabilities defined by the java.util.concurrent interfaces.

As we've previously seen, the distributed map collection provided by Hazelcast is defined by its own IMap interface. This extends ConcurrentMap, which provides us with additional atomic operations such as putIfAbsent(key, value) and replace(key, oldValue, newValue). These operations guard against concurrent modification to some extent: each one succeeds only if the entry is still in the state we expect, and its return value tells us whether a change has occurred, so that we can handle it appropriately within the application layer.

The following code shows how this behavior can be used:

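// Package names as in Hazelcast 3.x; from 4.0 onwards IMap lives in com.hazelcast.map.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class AtomicMapExample {
  public static void main(String[] args) {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();

    IMap<String, String> capitals = hz.getMap("capitals");

    // No entry for "GB" yet, so this put succeeds.
    capitals.putIfAbsent("GB", "Winchester");
    System.err.println("Capital of GB (until 1066): " + capitals.get("GB"));

    // An entry already exists, so nothing is written and the existing value is returned.
    String actualCapital = capitals.putIfAbsent("GB", "London");
    System.err.println("Failed to put as already present: " + capitals.get("GB") + " = " + actualCapital);

    // The expected old value does not match the stored one, so the replace is rejected.
    boolean r1 = capitals.replace("GB", "Southampton", "London");
    System.err.println("Failed to replace as incorrect old value: " + capitals.get("GB") + "; [" + r1 + "]");

    // The expected old value matches, so the replace goes through.
    boolean r2 = capitals.replace("GB", "Winchester", "London");
    System.err.println("Capital of GB (since 1066): " + capitals.get("GB") + "; [" + r2 + "]");
  }
}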
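
Because replace(key, oldValue, newValue) reports whether it succeeded, these operations can be combined into a read-modify-write loop that simply retries when another writer gets in first. The following is a minimal sketch of that idea rather than anything Hazelcast-specific: the CasRetryExample class and its increment method are hypothetical names, and a plain ConcurrentHashMap stands in for the distributed map so that the code runs without a cluster. As IMap extends ConcurrentMap, the same method should accept an IMap as well.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CasRetryExample {

  // Increments the counter stored under key without taking a lock.
  // If another thread changes the entry between our read and our write,
  // the atomic operation fails and we loop to read the fresh value.
  public static int increment(ConcurrentMap<String, Integer> map, String key) {
    while (true) {
      Integer current = map.get(key);
      if (current == null) {
        // putIfAbsent returns null only when our value was actually stored.
        if (map.putIfAbsent(key, 1) == null) {
          return 1;
        }
      } else {
        int next = current + 1;
        // replace succeeds only if the entry still holds the value we read.
        if (map.replace(key, current, next)) {
          return next;
        }
      }
      // Lost the race to another writer: go round again.
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Stand-in for a distributed map (assumption: any ConcurrentMap will do).
    ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();

    Runnable worker = () -> {
      for (int i = 0; i < 1000; i++) {
        increment(hits, "GB");
      }
    };

    Thread t1 = new Thread(worker);
    Thread t2 = new Thread(worker);
    t1.start();
    t2.start();
    t1.join();
    t2.join();

    // No update is lost, so both workers' increments are counted.
    System.out.println(hits.get("GB")); // prints 2000
  }
}
```

A plain get followed by put would lose updates under the same load, which is exactly the hazard of data changing while an operation is in progress.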

Another collection that we've worked with so far is the distributed queue. Like its map counterpart, it is also specialized, this time by the IQueue interface, which extends BlockingQueue and its concurrency features. These additional methods allow us to control how an application reacts when pushing onto and popping from the FIFO queue. To push onto the queue, we can use add(item), offer(item), or put(item), depending on whether we want a full queue to throw an exception, return false, or block until space is available. For retrieval, poll() returns immediately, giving null if the queue is empty, whereas take() blocks until an item becomes available. However, blocking indefinitely can be problematic. The offer(item) and poll() methods can optionally take a timeout, allowing the application to fail more gracefully if the attempted operation cannot be completed within the specified time, as follows:

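// Package names as in Hazelcast 3.x; from 4.0 onwards IQueue lives in com.hazelcast.collection.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class QueueExample {
  public static void main(String[] args) throws Exception {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();
    IQueue<String> queue = hz.getQueue("queue");
    Random rand = new Random();

    while (true) {
      // Push a greeting that identifies this node.
      queue.add(new Date() + " - " + hz.getCluster().getLocalMember() + " says hello");

      Thread.sleep(rand.nextInt(2000));

      // Wait up to five seconds for a message; null means the wait timed out.
      String msg = queue.poll(5, TimeUnit.SECONDS);
      if (msg != null) {
        System.err.println(msg);
      }
    }
  }
}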
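
The difference between the push and retrieval methods only shows up when the queue is full or empty. To make this visible without starting a cluster, the sketch below uses a java.util.concurrent.ArrayBlockingQueue with a capacity of one in place of an IQueue. The BoundedQueueExample class, its method names, and the tiny capacity are all assumptions chosen for illustration; what is being shown is the BlockingQueue contract that IQueue extends.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueExample {

  // Fills a one-slot queue, then tries each non-blocking way of pushing onto it.
  public static String describeFullQueue() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    StringBuilder result = new StringBuilder();
    try {
      queue.put("first"); // the queue is now full

      try {
        queue.add("second");
        result.append("add=ok");
      } catch (IllegalStateException e) {
        // add() signals a full queue by throwing.
        result.append("add=exception");
      }

      // offer() signals a full queue by returning false straight away.
      result.append(", offer=").append(queue.offer("second"));

      // The timed offer() waits for space, then gives up and returns false.
      result.append(", timedOffer=")
            .append(queue.offer("second", 100, TimeUnit.MILLISECONDS));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return result.toString();
  }

  // Tries each non-blocking way of retrieving from an empty queue.
  public static String describeEmptyQueue() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    StringBuilder result = new StringBuilder();
    try {
      // poll() returns null immediately when nothing is queued.
      result.append("poll=").append(queue.poll());

      // The timed poll() waits for an item, then gives up and returns null.
      result.append(", timedPoll=")
            .append(queue.poll(100, TimeUnit.MILLISECONDS));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return result.toString();
  }

  public static void main(String[] args) {
    System.out.println(describeFullQueue());  // add=exception, offer=false, timedOffer=false
    System.out.println(describeEmptyQueue()); // poll=null, timedPoll=null
  }
}
```

The put(item) and take() methods are left out on purpose: on a full or empty queue they would block until another thread made progress, which is the indefinite wait that the timed variants are there to avoid.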