Each key value is associated with a version vector that maintains a number for each cluster node. In essence, a version vector is a set of counters, one for each node. A version vector for three nodes (blue, green, black) would look something like [blue: 43, green: 54, black: 12]. Each time a node has an internal update, it updates its own counter, so an update in the green node would change the vector to [blue: 43, green: 55, black: 12]. Whenever two nodes communicate, they synchronize their vector stamps, allowing them to detect any simultaneous updates.
Using version vectors in a key value store
A version vector can be used in a key value store as follows. The store keeps a list of versioned values for each key, as there can be multiple values which are concurrent.
class VersionVectorKVStore…

  public class VersionVectorKVStore {
      Map<String, List<VersionedValue>> kv = new HashMap<>();
When a client wants to store a value, it first reads the latest known version for the given key.
It then picks the cluster node on which to store the value, based on the key.
While storing the value, the client passes back the known version.
The request flow is shown in the following diagram.
There are two servers named blue and green. For the key “name”,
blue is the primary server.

In the leader-less replication scheme, the client or a coordinator node picks the node to write the data to, based on the key.
The version vector is updated based on the primary cluster node that the key maps to.
A value with the same version vector is copied to the other cluster nodes for replication.
If the cluster node mapping to the key is not available, the next node is chosen.
The version vector is only incremented for the first cluster node the value is saved to. All the other nodes save the copy of the data.
The code for incrementing the version vector in databases like [voldemort] looks like this:
class ClusterClient…

  public void put(String key, String value, VersionVector existingVersion) {
      List<Integer> allReplicas = findReplicas(key);
      int nodeIndex = 0;
      List<Exception> failures = new ArrayList<>();
      VersionedValue valueWrittenToPrimary = null;
      for (; nodeIndex < allReplicas.size(); nodeIndex++) {
          try {
              ClusterNode node = clusterNodes.get(allReplicas.get(nodeIndex));
              //the node which is the primary holder of the key is responsible for incrementing the version number.
              valueWrittenToPrimary = node.putAsPrimary(key, value, existingVersion);
              break;
          } catch (Exception e) {
              //if there is an exception writing the value to this node, try the next replica.
              failures.add(e);
          }
      }

      if (valueWrittenToPrimary == null) {
          throw new NotEnoughNodesAvailable("No node succeeded in writing the value.", failures);
      }

      //Succeeded in writing to the first available node; copy the same value to the remaining replicas.
      nodeIndex++;
      for (; nodeIndex < allReplicas.size(); nodeIndex++) {
          ClusterNode node = clusterNodes.get(allReplicas.get(nodeIndex));
          node.put(key, valueWrittenToPrimary);
      }
  }
The node acting as a primary is the one which increments the version number.
class ClusterNode…

  public VersionedValue putAsPrimary(String key, String value, VersionVector existingVersion) {
      VersionVector newVersion = existingVersion.increment(nodeId);
      VersionedValue versionedValue = new VersionedValue(value, newVersion);
      put(key, versionedValue);
      return versionedValue;
  }

  public void put(String key, VersionedValue value) {
      versionVectorKvStore.put(key, value);
  }
As can be seen in the put code above, it is possible for different clients to update the same key on different nodes, for instance when a client cannot reach a specific node. This creates a situation where different nodes hold different values, which are considered ‘concurrent’ according to their version vectors.
As shown in the following diagram, both client1 and client2 are trying to write to the key, “name”.
If client1 cannot write to server green, the green server will be missing the value written by client1.
When client2 tries to write, but fails to connect to server blue, it will write on server green.
The version vector for the key “name”, will reflect that the servers, blue and green, have concurrent writes.

Figure 2: Concurrent updates on different replicas
Therefore the version-vector-based storage keeps multiple versions of a key when those versions are considered concurrent.
class VersionVectorKVStore…

  public void put(String key, VersionedValue newValue) {
      List<VersionedValue> existingValues = kv.get(key);
      if (existingValues == null) {
          existingValues = new ArrayList<>();
      }

      rejectIfOldWrite(key, newValue, existingValues);
      List<VersionedValue> newValues = merge(newValue, existingValues);
      kv.put(key, newValues);
  }

  //If the newValue is older than an existing one, reject it.
  private void rejectIfOldWrite(String key, VersionedValue newValue, List<VersionedValue> existingValues) {
      for (VersionedValue existingValue : existingValues) {
          if (existingValue.descendsVersion(newValue)) {
              throw new ObsoleteVersionException("Obsolete version for key '" + key + "': " + newValue.versionVector);
          }
      }
  }

  //Merge the new value with the existing values, removing values with a lower version than the newValue.
  //An old value that is neither before nor after the newValue (i.e. concurrent with it) is preserved.
  private List<VersionedValue> merge(VersionedValue newValue, List<VersionedValue> existingValues) {
      List<VersionedValue> retainedValues = removeOlderVersions(newValue, existingValues);
      retainedValues.add(newValue);
      return retainedValues;
  }

  private List<VersionedValue> removeOlderVersions(VersionedValue newValue, List<VersionedValue> existingValues) {
      List<VersionedValue> retainedValues = existingValues
              .stream()
              .filter(v -> !newValue.descendsVersion(v)) //keep versions which are not dominated by the newValue
              .collect(Collectors.toList());
      return retainedValues;
  }
If concurrent values are detected while reading from multiple nodes, an error is thrown, allowing the client to perform conflict resolution.
Resolving conflicts
If multiple versions are returned from different replicas, version vector comparison allows the latest value to be detected.
class ClusterClient…

  public List<VersionedValue> get(String key) {
      List<Integer> allReplicas = findReplicas(key);
      List<VersionedValue> allValues = new ArrayList<>();
      for (Integer index : allReplicas) {
          ClusterNode clusterNode = clusterNodes.get(index);
          List<VersionedValue> nodeVersions = clusterNode.get(key);
          allValues.addAll(nodeVersions);
      }
      return latestValuesAcrossReplicas(allValues);
  }

  private List<VersionedValue> latestValuesAcrossReplicas(List<VersionedValue> allValues) {
      List<VersionedValue> uniqueValues = removeDuplicates(allValues);
      return retainOnlyLatestValues(uniqueValues);
  }

  private List<VersionedValue> retainOnlyLatestValues(List<VersionedValue> versionedValues) {
      for (int i = 0; i < versionedValues.size(); i++) {
          VersionedValue v1 = versionedValues.get(i);
          versionedValues.removeAll(getPredecessors(v1, versionedValues));
      }
      return versionedValues;
  }

  private List<VersionedValue> getPredecessors(VersionedValue v1, List<VersionedValue> versionedValues) {
      List<VersionedValue> predecessors = new ArrayList<>();
      for (VersionedValue v2 : versionedValues) {
          if (!v1.sameVersion(v2) && v1.descendsVersion(v2)) {
              predecessors.add(v2);
          }
      }
      return predecessors;
  }

  private List<VersionedValue> removeDuplicates(List<VersionedValue> allValues) {
      return allValues.stream().distinct().collect(Collectors.toList());
  }
Conflict resolution based on version vectors alone is not enough when there are concurrent updates; the vectors can detect conflicts but not decide between values. So it's important to allow clients to provide application-specific conflict resolvers. A conflict resolver can be supplied by the client while reading a value.
  public interface ConflictResolver {
      VersionedValue resolve(List<VersionedValue> values);
  }
class ClusterClient…

  public VersionedValue getResolvedValue(String key, ConflictResolver resolver) {
      List<VersionedValue> versionedValues = get(key);
      return resolver.resolve(versionedValues);
  }
For example, [riak] allows applications to provide conflict resolvers, as explained in its documentation.
Last Write Wins (LWW) Conflict Resolution
While version vectors allow concurrent writes across a set of servers to be detected, they do not by themselves help clients figure out which value to choose in case of conflict. The burden of resolution is on the client. Sometimes clients prefer for the key value store to do conflict resolution based on timestamps. While there are known issues with timestamps across servers, the simplicity of this approach makes it a preferred choice for some clients, even with the risk of losing updates because of clock problems. It relies fully on services like NTP being well configured and working across the cluster.
Databases like [riak] and [voldemort]
allow users to select the ‘last write wins’ conflict resolution strategy.
To support LWW conflict resolution, a timestamp is stored with each value when it is written.
class TimestampedVersionedValue…

  class TimestampedVersionedValue {
      String value;
      VersionVector versionVector;
      long timestamp;

      public TimestampedVersionedValue(String value, VersionVector versionVector, long timestamp) {
          this.value = value;
          this.versionVector = versionVector;
          this.timestamp = timestamp;
      }
  }
While reading the value, the client can use the timestamp to pick the latest value.
The version vector is completely ignored in this case.
class ClusterClient…

  public Optional<TimestampedVersionedValue> getWithLWW(List<TimestampedVersionedValue> values) {
      return values.stream().max(Comparator.comparingLong(v -> v.timestamp));
  }
Read repair
While allowing any cluster node to accept write requests improves availability,
it’s important that eventually all of the replicas have the same data.
One common method repairs replicas when the client reads the data. When conflicts are resolved, it is also possible to detect which nodes hold older versions. Those nodes can be sent the latest version as part of handling the client's read request. This is called read repair.
Consider the scenario shown in the following diagram. Two nodes, blue and green, have values for the key “name”. The green node has the latest version, with version vector [blue: 1, green: 1]. When the values are read from both replicas, blue and green, they are compared to find out which node is missing the latest version, and a put request with the latest version is sent to that cluster node.

Figure 3: Read repair
Allowing concurrent updates on the same cluster node
There is a possibility of two clients writing concurrently to the same node. In the default implementation shown above, the second write will be rejected.
The basic implementation with the version number per cluster node is not enough in this case.
Consider the following scenario. With two clients trying to update the same key, the second client will get an exception,
as the version it passes in its put request is stale.

Figure 4: Two clients concurrently updating the same key
A database like [riak] gives clients the flexibility to allow these kinds of concurrent writes, preferring not to return error responses.
Using Client IDs instead of Server IDs
If each cluster client can be given a unique ID, the client ID can be used instead of the server ID, with a version number stored per client ID. Every time a client writes a value, it first reads the existing version, increments the number associated with its client ID, and writes it back to the server.
class ClusterClient…

  private VersionedValue putWithClientId(String clientId, int nodeIndex, String key, String value, VersionVector version) {
      ClusterNode node = clusterNodes.get(nodeIndex);
      VersionVector newVersion = version.increment(clientId);
      VersionedValue versionedValue = new VersionedValue(value, newVersion);
      node.put(key, versionedValue);
      return versionedValue;
  }
Because each client increments its own counter, concurrent writes create sibling values on the servers, but concurrent writes never fail. The scenario described above, which gave an error to the second client, then works as follows:

Figure 5: Two clients concurrently updating the same key
Dotted version vectors
One of the major problems with client-ID-based version vectors is that the size of the version vector grows with the number of clients. Over time this causes cluster nodes to accumulate too many concurrent values for a given key. The problem is called sibling explosion.
To solve this issue while still allowing cluster-node-based version vectors, [riak] uses a variant of the version vector called the dotted version vector.