Thursday, September 21, 2023

Lease


A cluster node can ask for a lease for a limited period of time, after which it expires.
The node can renew the lease before it expires if it wants to extend the access.
Implement the lease mechanism with Consistent Core to provide fault tolerance
and consistency. Have a ‘time to live’ value associated with each lease.
Cluster nodes can create keys in a Consistent Core with a lease attached to them.
The leases are replicated with the Leader and Followers to provide fault tolerance.
It’s the responsibility of the node which owns the lease to periodically refresh it.
HeartBeat is used by the clients to refresh the time
to live value in the consistent core.
The leases are created on all the nodes in the Consistent Core,
but only the leader tracks the lease timeouts.
The timeouts are not tracked on the followers in the Consistent Core.
This is done because we need the leader to decide when the lease expires using its
own monotonic clock, and then let the followers know when the lease expires.
This makes sure that, like any other decision in the Consistent Core,
nodes also reach consensus about lease expiration.
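The split described above can be captured in a small tracker abstraction: the leader runs expiry checks, while followers keep replicated lease state but never act on timeouts. The following is a hedged sketch; the interface name, method set, and `FollowerTrackerSketch` class are illustrative assumptions, not the article's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

interface LeaseTracker {
    void start();   // leader: schedule expiry checks; follower: no-op
    void stop();
    void addLease(String name, long ttlNanos);
}

class FollowerTrackerSketch implements LeaseTracker {
    private final Map<String, Long> leases = new ConcurrentHashMap<>();

    @Override public void start() { /* followers do not track timeouts */ }
    @Override public void stop() { }
    @Override public void addLease(String name, long ttlNanos) {
        // Lease state is still stored, so a follower can take over on leader failover.
        leases.put(name, ttlNanos);
    }

    int leaseCount() { return leases.size(); }
}

public class TrackerDemo {
    public static void main(String[] args) {
        LeaseTracker tracker = new FollowerTrackerSketch();
        tracker.start();
        tracker.addLease("server1Lease", 5_000_000_000L);
        System.out.println(((FollowerTrackerSketch) tracker).leaseCount()); // prints 1
    }
}
```

The point of the no-op `start` is that a follower promoted to leader can construct a leader-side tracker over the same replicated lease map and begin expiring leases immediately.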

When a node from a consistent core becomes a leader, it starts tracking leases.

class ReplicatedKVStore…

  public void onBecomingLeader() {
      leaseTracker = new LeaderLeaseTracker(this, new SystemClock(), server);
      leaseTracker.start();
  }

The leader starts a scheduled task to periodically check for lease expiration.

class LeaderLeaseTracker…

  private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
  private ScheduledFuture<?> scheduledTask;
  @Override
  public void start() {
      scheduledTask = executor.scheduleWithFixedDelay(this::checkAndExpireLeases,
              leaseCheckingInterval,
              leaseCheckingInterval,
              TimeUnit.MILLISECONDS);

  }

  @Override
  public void checkAndExpireLeases() {
      remove(expiredLeases());
  }

  private void remove(Stream<String> expiredLeases) {
      expiredLeases.forEach((leaseId)->{
          //remove it from this server so that it doesn't trigger again
          expireLease(leaseId);
          //submit a request so that followers know about expired leases
          submitExpireLeaseRequest(leaseId);
      });
  }

  private Stream<String> expiredLeases() {
      long now = clock.nanoTime(); //use the same monotonic clock used when creating leases
      Map<String, Lease> leases = kvStore.getLeases();
      return leases.keySet().stream()
              .filter(leaseId -> {
                  Lease lease = leases.get(leaseId);
                  return lease.getExpiresAt() < now;
              });
  }

Followers start a no-op lease tracker.

class ReplicatedKVStore…

  public void onCandidateOrFollower() {
      if (leaseTracker != null) {
          leaseTracker.stop();
      }
      leaseTracker = new FollowerLeaseTracker(this, leases);
  }

The lease is represented as follows:

public class Lease implements Logging {
    String name;
    long ttl;
    //Time at which this lease expires
    long expiresAt;

    //The keys from kv store attached with this lease
    List<String> attachedKeys = new ArrayList<>();

    public Lease(String name, long ttl, long now) {
        this.name = name;
        this.ttl = ttl;
        this.expiresAt = now + ttl;
    }

    public String getName() {
        return name;
    }

    public long getTtl() {
        return ttl;
    }

    public long getExpiresAt() {
        return expiresAt;
    }

    public void refresh(long now) {
        expiresAt = now + ttl;
        getLogger().info("Refreshing lease " + name + " Expiration time is " + expiresAt);
    }

    public void attachKey(String key) {
        attachedKeys.add(key);
    }

    public List<String> getAttachedKeys() {
        return attachedKeys;
    }
}

When a node wants to create a lease, it connects with the leader of the Consistent Core and sends a request to create a lease.
The register lease request is replicated and handled similar to other requests in Consistent Core.
The request is complete only when the High-Water Mark reaches the log index of the request entry in the replicated log.

class ReplicatedKVStore…

  private ConcurrentHashMap<String, Lease> leases = new ConcurrentHashMap<>();
  @Override
  public CompletableFuture<Response> registerLease(String name, long ttl) {
      if (leaseExists(name)) {
          return CompletableFuture
                  .completedFuture(
                          Response.error(Errors.DUPLICATE_LEASE_ERROR,
                              "Lease with name " + name + " already exists"));
      }
      return server.propose(new RegisterLeaseCommand(name, ttl));
  }

  private boolean leaseExists(String name) {
      return leases.containsKey(name);
  }

An important thing to note is where to check for duplicate lease registration.
Checking before proposing the request is not enough, as there can be multiple in-flight requests.
So the server also checks for duplicates when the lease is registered after successful replication.

class LeaderLeaseTracker…

  private Map<String, Lease> leases;
  @Override
  public void addLease(String name, long ttl) throws DuplicateLeaseException {
      if (leases.get(name) != null) {
          throw new DuplicateLeaseException(name);
      }
      Lease lease = new Lease(name, ttl, clock.nanoTime());
      leases.put(name, lease);
  }

Figure 1: Register Lease

The node responsible for the lease connects to the leader and refreshes the lease before it expires.
As discussed in HeartBeat, it needs to consider the network
round trip time to decide on the ‘time to live’ value, and
send refresh requests before the lease expires. The node can send refresh requests
multiple times within the ‘time to live’ interval,
to ensure that the lease is refreshed in case of any issues.
But the node also needs to make sure that too many refresh requests are not sent.
It’s reasonable to send a request after about half of the lease time is elapsed.
This results in up to two refresh requests within the lease time.
The client node tracks the time with its own monotonic clock.
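The refresh schedule above can be sketched as a small client-side helper: refresh after roughly half the ttl, giving up to two attempts per lease interval. This is a minimal sketch; `LeaseRefresher` and the `Runnable` standing in for the network call to the leader are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class LeaseRefresher {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Schedules refreshAction every ttl/2 nanoseconds on the client's own clock.
    ScheduledFuture<?> scheduleRefresh(long ttlNanos, Runnable refreshAction) {
        long halfTtl = ttlNanos / 2;
        return scheduler.scheduleWithFixedDelay(
                refreshAction, halfTtl, halfTtl, TimeUnit.NANOSECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}

public class RefresherDemo {
    public static void main(String[] args) throws InterruptedException {
        LeaseRefresher refresher = new LeaseRefresher();
        CountDownLatch refreshed = new CountDownLatch(2);
        // A ttl of 200ms means a refresh attempt roughly every 100ms.
        refresher.scheduleRefresh(TimeUnit.MILLISECONDS.toNanos(200), refreshed::countDown);
        boolean ok = refreshed.await(2, TimeUnit.SECONDS);
        refresher.shutdown();
        System.out.println(ok ? "refreshed" : "timed out");
    }
}
```

In practice the `Runnable` would issue the refresh request to the leader of the Consistent Core and handle failures by retrying before the ttl elapses.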

class LeaderLeaseTracker…

  @Override
  public void refreshLease(String name) {
      Lease lease = leases.get(name);
      if (lease == null) {
          return; //the lease may have expired already
      }
      lease.refresh(clock.nanoTime());
  }

Refresh requests are sent only to the leader of the Consistent Core,
because only the leader is responsible for deciding when the lease expires.

Figure 2: Refresh Lease

When the lease expires, it is removed from the leader. It’s also critical for this information to be committed to the Consistent Core.
So the leader sends a request to expire the lease, which is handled like other requests in Consistent Core.
Once the High-Water Mark reaches the proposed expire lease request, it’s removed from all the followers.

class LeaderLeaseTracker…

  public void expireLease(String name) {
      getLogger().info("Expiring lease " + name);
      Lease removedLease = leases.remove(name);
      removeAttachedKeys(removedLease);
  }

Figure 3: Expire Lease

Attaching the lease to keys in the key-value store

A cluster needs to know if one of its nodes fails.
It can do that by having the node take a lease from the Consistent Core,
and then attach it to a self-identifying key that it stores within the Consistent Core.
If the cluster node is running, it should renew the lease at regular intervals.
Should the lease expire, the associated keys are removed.
When the key is removed, an event indicating the node failure is sent to interested cluster
nodes, as discussed in the State Watch pattern.

The cluster node using the Consistent Core creates a lease by making a network call,
as follows:

consistentCoreClient.registerLease("server1Lease", TimeUnit.SECONDS.toNanos(5));

It can then attach this lease to the self-identifying key it stores in the Consistent Core.

consistentCoreClient.setValue("/servers/1", "{address:192.168.199.10, port:8000}", "server1Lease");

When the Consistent Core receives the message to save the key in its key-value store,
it also attaches the key to the specified lease.

class ReplicatedKVStore…

  private ConcurrentHashMap<String, Lease> leases = new ConcurrentHashMap<>();

class ReplicatedKVStore…

  private Response applySetValueCommand(Long walEntryId, SetValueCommand setValueCommand) {
      getLogger().info("Setting key value " + setValueCommand);
      if (setValueCommand.hasLease()) {
          Lease lease = leases.get(setValueCommand.getAttachedLease());
          if (lease == null) {
              //The lease to attach is not available with the Consistent Core
              return Response.error(Errors.NO_LEASE_ERROR,
                      "No lease exists with name "
                              + setValueCommand.getAttachedLease(), 0);
          }

          lease.attachKey(setValueCommand.getKey());
      }
      kv.put(setValueCommand.getKey(), new StoredValue(setValueCommand.getValue(), walEntryId));
      return Response.success(walEntryId);
  }

Once the lease expires, the Consistent Core also removes the attached keys from its key-value store.

class LeaderLeaseTracker…

  public void expireLease(String name) {
      getLogger().info("Expiring lease " + name);
      Lease removedLease = leases.remove(name);
      removeAttachedKeys(removedLease);
  }
private void removeAttachedKeys(Lease removedLease) {
    if (removedLease == null) {
        return;
    }
    List<String> attachedKeys = removedLease.getAttachedKeys();
    for (String attachedKey : attachedKeys) {
        getLogger().trace("Removing " + attachedKey + " with lease " + removedLease);
        kvStore.remove(attachedKey);
    }
}
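To tie the steps together, here is a runnable, simplified end-to-end sketch: register a lease, attach a key, and verify the key is removed once the lease expires. Time is passed in explicitly so the demo is deterministic; the class and method names are illustrative assumptions, not the article's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class LeaseExpiryDemo {
    static class Lease {
        final long ttl;
        long expiresAt;
        final List<String> attachedKeys = new ArrayList<>();
        Lease(long ttl, long now) { this.ttl = ttl; this.expiresAt = now + ttl; }
        void refresh(long now) { expiresAt = now + ttl; }
    }

    final Map<String, String> kv = new HashMap<>();
    final Map<String, Lease> leases = new HashMap<>();

    void registerLease(String name, long ttl, long now) {
        leases.put(name, new Lease(ttl, now));
    }

    void setValue(String key, String value, String leaseName) {
        leases.get(leaseName).attachedKeys.add(key); // attach the key to the lease
        kv.put(key, value);
    }

    // Mirrors checkAndExpireLeases plus removeAttachedKeys on the leader.
    void checkAndExpire(long now) {
        Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator();
        while (it.hasNext()) {
            Lease lease = it.next().getValue();
            if (lease.getClass() != null && lease.expiresAt < now) {
                lease.attachedKeys.forEach(kv::remove);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        LeaseExpiryDemo core = new LeaseExpiryDemo();
        core.registerLease("server1Lease", 100, 0);
        core.setValue("/servers/1", "{address:192.168.199.10, port:8000}", "server1Lease");

        core.checkAndExpire(50);   // before expiry: key survives
        System.out.println(core.kv.containsKey("/servers/1")); // prints true

        core.checkAndExpire(150);  // after expiry: key and lease are removed
        System.out.println(core.kv.containsKey("/servers/1")); // prints false
    }
}
```

In the real system the expiry step additionally goes through consensus, so followers remove the same keys only after the expire-lease request is committed.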
