Abdelghani Roussi's Blog
Managing concurrent writes to database using Leader Election with Spring Boot and Zookeeper

Introduction

In a distributed system, it is common to have multiple instances of an application running at the same time. This can lead to issues when multiple instances try to write to the same resource concurrently. To avoid these issues, we can use a leader election algorithm to ensure that only one instance is allowed to write to the resource at a time.

In this article, we will learn how to manage concurrent writes to a database using Leader Election with Spring Boot and Zookeeper.

What is Leader Election?

Leader Election is a distributed algorithm that is used to select a single leader from a group of nodes. The leader is responsible for coordinating the actions of the other nodes in the group. Leader Election is commonly used in distributed systems to ensure that only one node is responsible for performing certain tasks, such as writing to a database.

What is ZooKeeper?

ZooKeeper is a distributed coordination service used to manage and synchronize the configuration and state of distributed systems. It offers a centralized service for maintaining configuration information, naming, distributed synchronization, and group services, which makes such systems simpler and more reliable to build.

Different solutions for managing concurrent writes

There are several ways to manage concurrent writes in a distributed system:

  • Pessimistic Locking: In this approach, we use locks to prevent multiple instances from writing to the resource at the same time. This can lead to contention and performance issues.
  • Optimistic Locking: In this approach, we use version numbers or timestamps to detect conflicts and resolve them. This can lead to write conflicts and requires retry logic.
  • Leader Election: In this approach, we use a leader election algorithm to select a single leader from a group of nodes. The leader is responsible for coordinating the actions of the other nodes and ensuring that only one node writes to the resource at a time.

Many database providers implement one of these strategies. Couchbase, for example, uses CAS (compare-and-swap), which is a form of optimistic locking. The CAS is a value representing the current state of an item: each time the item is modified, its CAS changes, and it is returned as part of the document's metadata whenever the document is accessed.
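To make the CAS idea concrete, here is a minimal in-memory sketch of the read, modify, write-if-unchanged cycle. It is not the Couchbase SDK: `CasStore`, `Versioned`, and `updateWithRetry` are hypothetical names invented for this illustration, and the CAS is modeled as a simple counter.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

/** In-memory stand-in for a store that exposes a CAS value (hypothetical, not a real SDK). */
final class CasStore {

    /** Immutable snapshot: the stored value plus the CAS token identifying this state. */
    static final class Versioned {
        final String value;
        final long cas;

        Versioned(String value, long cas) {
            this.value = value;
            this.cas = cas;
        }
    }

    private final AtomicReference<Versioned> current =
            new AtomicReference<>(new Versioned("", 0L));

    Versioned get() {
        return current.get();
    }

    /** Writes newValue only if the caller's CAS still matches the stored one. */
    boolean replace(String newValue, long expectedCas) {
        Versioned seen = current.get();
        if (seen.cas != expectedCas) {
            return false; // the item was modified after the caller read it
        }
        // Atomic swap: fails if another thread replaced the snapshot in the meantime.
        return current.compareAndSet(seen, new Versioned(newValue, seen.cas + 1));
    }

    /** Read-modify-write loop that retries until the write is accepted; returns the attempt count. */
    static int updateWithRetry(CasStore store, UnaryOperator<String> change) {
        int attempts = 0;
        while (true) {
            attempts++;
            Versioned read = store.get();
            if (store.replace(change.apply(read.value), read.cas)) {
                return attempts;
            }
        }
    }

    public static void main(String[] args) {
        CasStore store = new CasStore();
        Versioned stale = store.get();                    // instance A reads the item
        updateWithRetry(store, v -> v + "B");             // instance B writes first
        boolean accepted = store.replace("A", stale.cas); // A still holds the old CAS
        System.out.println("stale write accepted: " + accepted);
        System.out.println("stored value: " + store.get().value);
    }
}
```

The retry loop is the part that optimistic locking pushes onto the application: every writer has to be prepared for its write to be rejected and to start again from a fresh read.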

The drawback of this kind of solution is that it is implemented at the database level and leaks into the application level: the day you change the database type, you will be forced to adapt your application as well. Also, in the case of a scheduled task, all instances will still run at the same time, wasting I/O and bandwidth.

In this article we will focus on the Leader Election strategy using Zookeeper.

Enough definitions, let's move to the code

In our example we will use ZooKeeper, which provides a leader election mechanism, together with Docker and Spring Integration. Make sure you have Java and Docker installed on your machine, and let's start.

What will we do?
  1. We will run 3 ZooKeeper nodes using Docker images (a ZooKeeper cluster is called an ensemble, and 3 servers is the minimum recommended size for one).
  2. We will use spring-integration-zookeeper to connect our Spring Boot application to ZooKeeper.
  3. We will define a scheduled task to be executed by the Spring scheduler.
  4. We will run 3 instances of our application on 3 different ports and then watch the logs.

Step 1 : Run ZooKeeper ensemble

To run the ZooKeeper ensemble, create a docker-compose.yml file with the following content:

docker-compose.yml
version: '3.1'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

This file defines a ZooKeeper ensemble of 3 nodes whose client ports are published on host ports 2181, 2182, and 2183. The ZOO_MY_ID environment variable sets the ID of each node, and the ZOO_SERVERS environment variable lists all the servers in the ensemble.
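The reason 3 is the usual minimum comes down to majority arithmetic: an ensemble stays available only while a strict majority of its servers can talk to each other. The sketch below just works through that arithmetic; `EnsembleQuorum` and its method names are made up for this illustration and are not part of any ZooKeeper API.

```java
/** Majority-quorum arithmetic for an ensemble of n voting servers (illustrative helper). */
final class EnsembleQuorum {

    /** Smallest number of servers that forms a strict majority. */
    static int majority(int servers) {
        return servers / 2 + 1;
    }

    /** Number of servers that can fail while a majority is still reachable. */
    static int toleratedFailures(int servers) {
        return servers - majority(servers);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("servers=%d majority=%d toleratedFailures=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

With 1 or 2 servers no failure can be tolerated, 3 servers tolerate one failure, and a fourth server adds nothing over 3, which is why odd ensemble sizes are the usual choice.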

To start the ZooKeeper ensemble, run the following command:

docker-compose up

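Step 2 : Connect to the ZooKeeper ensemble and create a scheduled service

Add the following dependency to your pom.xml file. The aspect and logging code later in this step also assume that Spring AOP (for example through spring-boot-starter-aop) and Lombok are on the classpath: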

pom.xml
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zookeeper</artifactId>
    <version>5.1.7.RELEASE</version>
</dependency>

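Create a ZookeeperConfiguration class to configure the connection to the ZooKeeper ensemble:

ZookeeperConfiguration.java

```java
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean;
import org.springframework.integration.zookeeper.config.LeaderInitiatorFactoryBean;

@Configuration
public class ZookeeperConfiguration {

    // Connection string of the ZooKeeper node the client connects to
    public static final String ZOOKEEPER_CLIENT_OP = "localhost:2181";
    // Path suffix under which the election data is stored
    public static final String ZOOKEEPER_LEADER_STORAGE = "/my-app/leader";
    // Role name shared by every candidate taking part in this election
    public static final String ZOOKEEPER_LEADER_ROLE = "my-app-leadership";

    // Path prefix, read from the project.basedirectory property
    @Value("${project.basedirectory}")
    private String baseDir;

    /**
     * Configures Curator, which handles the complexity of managing
     * connections to the ZooKeeper cluster and retrying operations.
     *
     * @return the factory bean that creates the CuratorFramework client
     */
    @Bean
    public CuratorFrameworkFactoryBean curatorFramework() {
        return new CuratorFrameworkFactoryBean(ZOOKEEPER_CLIENT_OP);
    }

    /**
     * Registers this instance as a candidate for the leader role.
     */
    @Bean
    public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
        String zookeeperLeaderDataLocation = String.format("%s%s", baseDir, ZOOKEEPER_LEADER_STORAGE);
        System.out.println(zookeeperLeaderDataLocation);
        return new LeaderInitiatorFactoryBean().setClient(client)
                .setPath(zookeeperLeaderDataLocation)
                .setRole(ZOOKEEPER_LEADER_ROLE);
    }
}
```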

To make things more interesting, we'll create an aspect around the execution of methods annotated with a custom annotation (which we will also create). That way we can simply annotate the methods that could run concurrently, and the aspect checks whether the current service instance is the leader before letting the call through:

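Leader.java
package com.example.leaderelection.aspect;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a method that would otherwise be executed at the same time by many instances.
 * Only the leader executes methods annotated with {@link Leader}.
 *
 * @author Abdelghani ROUSSI
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Leader {
}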

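And then our aspect: LeaderElectionAspect.java

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.integration.zookeeper.leader.LeaderInitiator;
import org.springframework.stereotype.Component;

/**
 * Aspect that watches for methods annotated with @Leader
 *
 * @author Abdelghani ROUSSI
 */
@Slf4j
@Aspect
@Component
public class LeaderElectionAspect {

    private final LeaderInitiator leaderInitiator;

    public LeaderElectionAspect(LeaderInitiator leaderInitiator) {
        this.leaderInitiator = leaderInitiator;
    }

    /**
     * Executes a method annotated with {@link Leader} only if the current node is the leader.
     *
     * @param joinPoint the intercepted method call
     * @return the result of the call, or null when this node is not the leader
     * @throws Throwable whatever the intercepted method throws
     */
    @Around(value = "@annotation(com.example.leaderelection.aspect.Leader)")
    public Object aroundLeaderAnnotation(ProceedingJoinPoint joinPoint) throws Throwable {
        if (this.leaderInitiator.getContext().isLeader()) {
            log.info("=====  I'm the leader I'll execute the Scheduled tasks =====");
            return joinPoint.proceed();
        }
        // Not the leader: skip the call. Fine for void methods such as scheduled tasks.
        return null;
    }
}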

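Last in this step, we will create the scheduled method and annotate it with @Leader: ScheduledService.java

```java
import com.example.leaderelection.aspect.Leader;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ScheduledService {
    /**
     * Runs every 30 seconds (at seconds 0 and 30 of each minute).
     */
    @Leader
    @Scheduled(cron = "0/30 * * * * *")
    public void runBatch() {
        log.info("===== START BATCH ====== {}");
    }
}
```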

Don't forget to add @EnableScheduling to a configuration class to enable scheduling, and @EnableAspectJAutoProxy to enable the aspect.
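Stripped of Spring and AspectJ, the check that the aspect applies to @Leader methods is just a guard around a task. The following framework-free sketch shows that logic on its own so it can be run and tested in isolation. `LeaderGuard` is a hypothetical name, and the `BooleanSupplier` stands in for the `leaderInitiator.getContext().isLeader()` call used in the aspect.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Wraps a task so that it only runs when the supplied leadership check returns true. */
final class LeaderGuard implements Runnable {

    private final BooleanSupplier isLeader; // stand-in for the real leadership check
    private final Runnable task;

    LeaderGuard(BooleanSupplier isLeader, Runnable task) {
        this.isLeader = isLeader;
        this.task = task;
    }

    @Override
    public void run() {
        if (isLeader.getAsBoolean()) {
            task.run();
        }
        // Non-leaders do nothing; they will check again on the next trigger.
    }

    public static void main(String[] args) {
        AtomicInteger executions = new AtomicInteger();
        Runnable batch = executions::incrementAndGet;

        // Three "instances" receive the same trigger; only the second one is the leader.
        new LeaderGuard(() -> false, batch).run();
        new LeaderGuard(() -> true, batch).run();
        new LeaderGuard(() -> false, batch).run();

        System.out.println("batch executions: " + executions.get());
    }
}
```

Because the leadership check is evaluated on every call rather than once at startup, an instance that gains or loses leadership changes its behavior on the next trigger without being restarted.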

Step 3 : Run 3 instances of the application

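To run 3 instances of the application with Docker, create a second Compose file (or add these services to the one from Step 1) with the following content:

version: '3.1'

services:
  app1:
    build: .
    ports:
      - 8080:8080
    environment:
      SERVER_PORT: 8080

  app2:
    build: .
    ports:
      - 8081:8081
    environment:
      SERVER_PORT: 8081

  app3:
    build: .
    ports:
      - 8082:8082
    environment:
      SERVER_PORT: 8082

This file defines 3 instances of the application, reachable on host ports 8080, 8081, and 8082. Each container listens on the port given by SERVER_PORT, so the container side of each port mapping has to match it. Note that inside a container, localhost:2181 refers to the container itself, so for this setup the connection string in ZookeeperConfiguration needs to point at the ZooKeeper services instead (for example zoo1:2181), with both sets of containers on the same Docker network.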

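Alternatively, you can run the instances directly with Maven, each in its own terminal:

./mvnw spring-boot:run -Dspring-boot.run.arguments=--server.port=8001
./mvnw spring-boot:run -Dspring-boot.run.arguments=--server.port=8002
./mvnw spring-boot:run -Dspring-boot.run.arguments=--server.port=8003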

You should then see something like this in one of your terminals:

[eaderSelector-0] o.s.integration.leader.DefaultCandidate  : DefaultCandidate{role=my-app-leadership, id=bd74b45e-3011-4416-bba0-ec5c88432e64} has been granted leadership; context: CuratorContext{role=my-app-leadership, id=bd74b45e-3011-4416-bba0-ec5c88432e64, isLeader=true}
[   scheduling-1] c.e.l.aspect.LeaderElectionAspect        : =====  I'm the leader I'll execute the Scheduled tasks =====
[   scheduling-1] c.e.l.LeaderElectionApplication          : ===== START BATCH ====== {}

This means that this node has been elected leader and will own the execution of the scheduled task. The other instances stay in standby mode: they keep watching the leader, and if it dies, one of them is elected leader and takes over the task.
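The standby and takeover behavior can be pictured with a toy model in which the longest-waiting live candidate is the leader. This is a deliberately simplified, single-process sketch and not how Curator or ZooKeeper implement the election internally. `FailoverSimulation` and the instance names are invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Toy in-memory election: the candidate that has been waiting longest is the leader. */
final class FailoverSimulation {

    private final Deque<String> candidates = new ArrayDeque<>();

    /** A new instance starts and queues up behind the existing candidates. */
    void join(String instanceId) {
        candidates.addLast(instanceId);
    }

    /** An instance stops or dies; if it was the leader, the next candidate takes over. */
    void leave(String instanceId) {
        candidates.remove(instanceId);
    }

    /** The current leader, or empty when no candidate is alive. */
    Optional<String> leader() {
        return Optional.ofNullable(candidates.peekFirst());
    }

    boolean isLeader(String instanceId) {
        return leader().map(instanceId::equals).orElse(false);
    }

    public static void main(String[] args) {
        FailoverSimulation election = new FailoverSimulation();
        election.join("app1");
        election.join("app2");
        election.join("app3");
        System.out.println("leader: " + election.leader().orElse("none"));

        election.leave("app1"); // the leader dies
        System.out.println("leader after failure: " + election.leader().orElse("none"));
    }
}
```

In the real setup the ZooKeeper ensemble plays the role of the shared queue, and it detects a dead leader through its lost session rather than through an explicit `leave` call.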

To wrap up: as we have seen, leader election is a simple and efficient way to handle concurrent task execution in a distributed, cloud-native application. Using it gives us:

  • Data consistency, since only one instance writes at a time.
  • Independence from the data storage provider, since it is done at the service layer.
  • Resource savings, by reducing the chance of the same work being done by multiple service instances.
  • A simple, effective mechanism that can be customised to add business rules before and after triggering.
  • A building block that can be introduced as auto-configuration in any microservice, by just adding the dependency and annotating the desired methods with @Leader.

Conclusion

In this article, we learned how to manage concurrent writes to a database using Leader Election with Spring Boot and Zookeeper. We created a ZooKeeper ensemble with 3 nodes, connected our Spring Boot application to the ensemble, and created a scheduled task that is executed by the leader node. We also created an Aspect to check if the current node is the leader before executing the scheduled task.

Leader Election is a powerful technique that can be used to ensure that only one node writes to a resource at a time. It can help to prevent data corruption and improve the performance of distributed systems. I hope this article has been helpful in understanding how to use Leader Election with Spring Boot and Zookeeper.
