官术网_书友最值得收藏!

Transactionally rolling on

Now that we have looked at the simple atomic approach that we can take when dealing with the concurrency of consumption and changes to the persisted data, let's address the following question: What happens if this approach is just too simple for a use case? Well, now that we have the ability to lock both globally across the cluster and on inpidual data items, we can prevent unexpected changes to our supporting data in the middle of an operation. However, what do we do if, partway through the operation, we need to stop and undo the changes that we made?

Luckily, drawing inspiration from the traditional roots, Hazelcast provides us with transactional capabilities via the REPEATABLE_READ transaction isolation (the only transactional mode that is currently supported). Once you enter a transaction, Hazelcast will automatically acquire the appropriate key locks for each entry that it interacted with. Any changes that we write will be buffered locally until the transaction is complete. If the transaction was successful and committed, all the locally buffered changes will be flushed out to the wider cluster, and the locks will be released. If the transaction was rolled back, we will simply release the locks without flushing out the local changes.

In order to provide an application with transactional capabilities, we have to create a TransactionContext instance ourselves to track and buffer the changes. This can be additionally tuned, if required, via a passed-in TransactionOptions, which allows us to tweak the timeout and durability of a transaction. However, for our purposes, the default configuration will suit us just fine. One notable difference from the earlier map examples is that rather than obtaining an IMap reference as we did before, Hazelcast provides us with a TransactionalMap type that we must retrieve from the newly created context rather than from the main instance. This is an API design decision that helps us identify whether the interactions with these objects support transactions; due to performance-related reasons and limitations, not all data collections of Hazelcast support the transactions.

Let's look at the following example:

public class TransactionExample {
  public static void main(String[] args) throws Exception {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();


    TransactionContext tx = hz.newTransactionContext();
    tx.beginTransaction();

    TransactionalMap<String, String> testMap = tx.getMap("test");

    try {
      System.err.println(testMap.get("foo"));
      Thread.sleep(30000);

      System.err.println(testMap.get("foo"));
      testMap.put("foo", "bar");
      Thread.sleep(30000);

      tx.commitTransaction();
      System.err.println("Committed!");
    }
    catch (Exception e) {
      tx.rollbackTransaction();
      System.err.println("Rolled Back!");
    }
  }
}

If we fire up the ConsoleApp from earlier and attempt to manipulate the test map during the transaction, interacting with keys other than foo will succeed as normal. However, following the update to the entry from within the transaction, reading and writing to foo will cause the console to block until the application completes its transaction. So, from the TransactionExample application, we will consistently see the following:

Members [2] {
 Member [127.0.0.1]:5701 this
 Member [127.0.0.1]:5702
}

null
null

Even if we attempt to try disrupt that transaction from the separate console:

Members [2] {
 Member [127.0.0.1]:5701
 Member [127.0.0.1]:5702 this
}

hazelcast[test] > m.put other wibble
null

hazelcast[test] > m.get other
wibble

hazelcast[test] > m.get foo
null

hazelcast[test] > m.put foo chew
<blocked until transaction completes>
bar

We can see the process of what is going on under the hood in the following diagram:

Differences when queuing

Unlike the storage collections where the transactional nature is when writing, hence its ability to be buffered locally before flushing on commit, queues are transactional on reads. This is as if we were to take an item from the queue and then roll back. The item would need to be returned to the queue so that it could be redelivered. However, what if the node died during the transaction? It wouldn't be able to return the item back to the queue. To avoid this situation rather than buffering locally, the input values are copied to the owner member in the cluster so that the queue is buffered remotely. In this way, the node will disappear and the cluster is in a position to restore the rolled back item to the queue.

Enterprising onwards

If you are going to use Hazelcast within an enterprise J2EE container, you can also integrate this transaction support as a standard resource adapter. While the details will vary depending on the container that you are using, it would be best to follow any relevant documentation that you have for your specific case. The required hazelcast-jca-rar-3.5.rar file can be found in the lib/ directory of the previously downloaded archive.

Collectively counting up

Another piece of functionality that we may have lost in the migrating functionality that is away from the traditional data source, is the ability to generate a sequence number or an autogenerated identifier. One primary issue with the original mechanism is the single point of failure in the previous data source. Hazelcast fortunately provides us with a distributed alternative in the form of IdGenerator.

This instance provides us with a cluster-wide unique identifier generator. We can request the identifier generator to issue a new unique identifier. We have to be aware that the internal counter state is only persisted during the life span of the cluster. In case all the nodes are lost, the counter will restart at zero, unless we use the init() method to manually configure a different starting point. Let's consider the following IdGeneratorExample code:

public class IdGeneratorExample {
  public static void main(String[] args) throws Exception {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();

    IdGenerator idGen = hz.getIdGenerator("newId");

    while (true) {
      Long id = idGen.newId();
      System.err.println("New Id: " + id);
      Thread.sleep(1000);
    }
  }
}

If we run the preceding code multiple times, we will see that the generated values are unique and they count up within their own group of identifiers, as follows:

Members [1] {
 Member [127.0.0.1]:5701 this
}

New Id: 1
New Id: 2
New Id: 3


Members [2] {
 Member [127.0.0.1]:5701
 Member [127.0.0.1]:5702 this
}

New Id: 1000001
New Id: 1000002
New Id: 1000003

As you can probably tell from the output, groups of 1 million are allocated to each node to start with. Once this pool of identifiers is exhausted, a new pool of 1 million is allocated. This process is repeated as required, with possible values starting from zero. The largest value that can be issued is Long.MAX_VALUE. However, because multiple nodes in the cluster might provide identifiers concurrently, the application must cope with the nonconsecutive numbers that are being provided.

主站蜘蛛池模板: 定日县| 友谊县| 仲巴县| 沛县| 华容县| 平利县| 台北县| 吴忠市| 正定县| 清苑县| 浑源县| 清苑县| 双辽市| 崇明县| 靖安县| 静宁县| 田东县| 贵港市| 磐安县| 西和县| 武鸣县| 连江县| 武强县| 盐源县| 平舆县| 麻阳| 新乡县| 大安市| 红原县| 农安县| 普兰店市| 湄潭县| 赣榆县| 洛隆县| 神池县| 邳州市| 保康县| 陕西省| 马公市| 丹东市| 鄂伦春自治旗|