이전 포스팅에서 JPA와 R2DBC를 비교했었는데, 이 둘은 DB Connection 접근 시 Blocking 처리인지 Non-Blocking 인지에 따라서 구분할 수 있다.

서버 프로그래밍에서 기초라고 할 수 있는 Synchronization과, 이와 혼동되기 쉽지만 엄연히 다른 개념인 Blocking에 관해서 같이 얘기해보고자 한다.

이는 Spring 뿐만 아니라 Java Application을 개발하는데 있어서 중요한 기초정보가 되므로 카테고리 또한 Spring이 아닌 Java로 정했다.

Synchronize / Asynchronize

동기와 비동기에서 중심이 되는 대상은 작업(Job)을 수행하는 주체이다. 여러 Thread 혹은 Worker를 운용하면서 작업을 수행하는 주체(객체)들 간의 상호관계를 표현한다고 보면 쉽다.

Synchronize

sync_1.png

Thread 1번에서 Thread 2를 새로 생성하고, Task(Job)을 수행하도록 호출하면 Thread 2는 작업을 시작할 것이다. 이 때 두 ‘작업을 수행하는 주체’ Thread 1과 2가 Sync 상태라면 호출하자마자 Thread 1은 2의 작업상태를 지켜보고(Await) 결과값을 직접 Return받은 후 자기작업을 이어나간다. 이처럼 메인인 Thread 1번이 Thread 2의 작업 여부, 결과값을 계속 신경쓴다는 것이 중점이다.

Asynchronize

Async_1.png

Thread 1번은 Thread 2번에 Task를 호출한 후 작업이 어찌되든 자신의 작업을 이어나간다. Thread 2번에 Return 값을 처리하도록 Callback을 따로 주기도 한다.
중점은, 메인인 Thread 1번이 Thread 2번의 작업완료 결과값과 여부를 신경쓰지 않는다는 것이다.

Blocking / Non-Blocking

동기화에서 중심이 작업을 수행하는 주체였다면 Blocking에서 중심대상은 작업 대상이 주체이다.
다시 풀어서 얘기하자면, ‘다른 작업을 수행하는 대상을 어떻게 상대할 것인가’라고 말할 수 있는데, 이는 두 개념을 각각 그림으로 표현하면 더 쉽게 이해할 수 있다.

Blocking

Blocking은 단어적인 느낌 그대로 다음 작업이 다른 작업을 수행하는 대상으로 인해 ‘막히는’ 현상을 말한다. 메인 Worker에서 호출한 Sub Worker 작업의 종료여부가 다음 작업에 영향을 끼친다면, 또는 소스 로직 상 Sub Worker의 Return으로 스택이 빠져야한다면 이는 Blocking 상태가 된다.
전통적인 서버 요청방식이나 일반적인 절차적 함수 호출이 그 대표적인 예시이다.

Non-Blocking

Non-Blocking은 Blocking과 반대당연히 반대겠지..인 개념으로 Sub Worker에게 다른 Job을 Request 한 후로 메인 Worker에서는 더 이상 영향받지 않고 이후 작업을 이어나가게 된다. Sub Worker의 결과 값이 필요하다면 Future 객체를 통해 필요한 시점에서 전달받거나 다른 객체 디자인을 통해 데이터 흐름을 이어나가면 된다.

Java Code Example

저 두 개념들을 섞어서 직접 테스트를 해보자. 총 4가지 경우의 수가 생기는데, 각각 재밌는 현상이 나온다.

  1. Synchronize + Blocking
  2. Synchronize + Non-Blocking
  3. Asynchronize + Blocking
  4. Asynchronize + Non-Blocking

2번과 3번이 참 비슷하면서도 다른 양상을 보인다. 개인적으로 이 둘을 비교하면서 많은 공부가 되었다.

Sync + Blocking

public class SyncAndBlocking {

    public static void main(String... args) {
        SyncAndBlocking sb = new SyncAndBlocking();
        try {
            sb.process();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void process() throws Exception {
        FutureTask<String> a = new FutureTask<>(new ThreadSyncA());
        Thread thread_a = new Thread(a);
        thread_a.start();

        System.out.println(a.get());
    }
}

//----------------- Worker A --------------------
public class ThreadSyncA implements Callable<String> {

    @Override
    public String call() throws Exception {

        for(int i = 0; i < 10; i++) {
            System.out.println("Thread A는 " + i + "번째 작업을 실행 중입니다.");

            if(i == 4) {
                //call ThreadB
                System.out.println("==== Thread B 호출 ====");
                FutureTask<String> b = new FutureTask<String>(new ThreadSyncB());
                Thread thread_b = new Thread(b);
                thread_b.start();

                System.out.println(b.get());
            }
        }

        return "Thread A is Done.";
    }
}
//----------------- Worker B --------------------
import java.util.concurrent.Callable;

public class ThreadSyncB implements Callable<String> {

    @Override
    public String call() throws Exception {

        for(int i = 0; i < 10; i++) {
            System.out.println("Thread B는 " + i + "번째 작업을 실행 중입니다.");
        }

        return "Thread B is Done.";
    }
}

Thread를 가지고 개발하라고 할 때 보통 필자는 이런 식으로 로직을 구성할 것이다.
0 ~9까지 출력하는 작업을 수행하다가 중간에 0 ~ 9까지 다시 세고 나머지를 출력해야한다는… 억지스러운 전개이지만 이를 Thread 2개로 수행해야한다고 하자.

Thread A에서 Loop문을 통해 0에서 9까지 출력하도록 하고 중간이 되는 i == 4 시점에서 Thread B를 새로 호출하여 작업을 수행하도록 해보자.

이에 대한 결과는 아래와 같이 나온다.

Thread A는 0번째 작업을 실행 중입니다.
Thread A는 1번째 작업을 실행 중입니다.
Thread A는 2번째 작업을 실행 중입니다.
Thread A는 3번째 작업을 실행 중입니다.
Thread A는 4번째 작업을 실행 중입니다.
==== Thread B 호출 ====
Thread B는 0번째 작업을 실행 중입니다.
Thread B는 1번째 작업을 실행 중입니다.
Thread B는 2번째 작업을 실행 중입니다.
Thread B는 3번째 작업을 실행 중입니다.
Thread B는 4번째 작업을 실행 중입니다.
Thread B는 5번째 작업을 실행 중입니다.
Thread B는 6번째 작업을 실행 중입니다.
Thread B는 7번째 작업을 실행 중입니다.
Thread B는 8번째 작업을 실행 중입니다.
Thread B는 9번째 작업을 실행 중입니다.
Thread B is Done.
Thread A는 5번째 작업을 실행 중입니다.
Thread A는 6번째 작업을 실행 중입니다.
Thread A는 7번째 작업을 실행 중입니다.
Thread A는 8번째 작업을 실행 중입니다.
Thread A는 9번째 작업을 실행 중입니다.
Thread A is Done.

예상했던 대로 Thread A는 Thread B가 호출된 시점부터 작업을 중단하고 B가 끝나기만을 기다린다.
위 예시코드의 경우 한 작업이 수행되는 동안 다른 작업은 수행되지 않아야 한다. (동기적) 서로 다른 두 개 이상의 작업이 인과 관계를 가지고 있어서 한 작업의 종료여부가 다른 작업의 종료에 영향을 주어야 할 때 사용하면 된다.
다만 동시성이 많고 처리량이 많은 비즈니스 로직에서는 독이 될 뿐이다.

Sync + Non-Blocking

public class SyncAndNonBlocking {

    public static void main(String[] args) {
        SyncAndNonBlockWorkerA workerA = new SyncAndNonBlockWorkerA();
        SyncAndNonBlockWorkerB workerB = new SyncAndNonBlockWorkerB();

        workerA.doMyJob();
    }
}

//----------------- Worker A --------------------

public class SyncAndNonBlockWorkerA {
    private final String name = "Worker A";
    private final int jobCnt = 5;

    Consumer<String> myJob = (msg) -> {
        for(int i = 0; i < jobCnt; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "는 " + msg + " " + i + "번째 작업 수행...");
        }
        System.out.println(name + " 작업완료 : " + msg);
    };

    private final Consumer<String> requireJob = (msg) -> {
        System.out.println(msg);
        for(int i = 0; i < 10; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B는 " + i + "번째 작업 수행...");
        }
        System.out.println("Require Job for B 작업완료");
    };

    public Consumer<String> getRequireJob() {
        return this.requireJob;
    }

    public void isWorkerB_Finished(CompletableFuture<Void> wokerJoin) {
        while(!wokerJoin.isDone()) {
            try {
                Thread.sleep(50);
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Polling - Worker B는 아직 작업 중... ");
        }
        System.out.println("Worker B 작업 완료.");
    }

    public void doMyJob() {
        myJob.accept("First");
        CompletableFuture<Void> callWorkerB = (new AsyncAndNonBlockWorkerB()).TakeAnddoMyWork(this.requireJob);
        myJob.accept("Second");
        // B가 끝났음을 감지해야 Second 시작
        this.isWorkerB_Finished(callWorkerB);
        myJob.accept("Third");
    }
}

//----------------- Worker B --------------------
public class SyncAndNonBlockWorkerB {
    public CompletableFuture<Void> TakeAnddoMyWork(Consumer<String> requestJob) {
        return CompletableFuture.runAsync(() -> {
            requestJob.accept("I'm worker B. Stared now");
        });
    }
}

Worker A는 First 0번째 작업 수행...
Worker A는 First 1번째 작업 수행...
Worker A는 First 2번째 작업 수행...
Worker A는 First 3번째 작업 수행...
Worker A는 First 4번째 작업 수행...
Worker A 작업완료 : First
I'm worker B. Stared now
Worker A는 Second 0번째 작업 수행...
B는 0번째 작업 수행...
Worker A는 Second 1번째 작업 수행...
B는 1번째 작업 수행...
Worker A는 Second 2번째 작업 수행...
B는 2번째 작업 수행...
B는 3번째 작업 수행...
Worker A는 Second 3번째 작업 수행...
B는 4번째 작업 수행...
Worker A는 Second 4번째 작업 수행...
Worker A 작업완료 : Second
Polling - Worker B는 아직 작업 중... 
B는 5번째 작업 수행...
Polling - Worker B는 아직 작업 중... 
Polling - Worker B는 아직 작업 중... 
B는 6번째 작업 수행...
Polling - Worker B는 아직 작업 중... 
Polling - Worker B는 아직 작업 중... 
B는 7번째 작업 수행...
Polling - Worker B는 아직 작업 중... 
Polling - Worker B는 아직 작업 중... 
B는 8번째 작업 수행...
Polling - Worker B는 아직 작업 중... 
B는 9번째 작업 수행...
Require Job for B 작업완료
Polling - Worker B는 아직 작업 중... 
Worker B 작업 완료.
Worker A는 Third 0번째 작업 수행...
Worker A는 Third 1번째 작업 수행...
Worker A는 Third 2번째 작업 수행...
Worker A는 Third 3번째 작업 수행...
Worker A는 Third 4번째 작업 수행...
Worker A 작업완료 : Third

Process finished with exit code 0

이번 작업은 Worker A에서 총 3가지 작업을 진행한다. First는 독자진행, Second는 시작 전 Worker B에게 작업을 주어야 하고 Third는 Worker B가 작업이 완료되었음을 확인한 후에야 진행이 가능하다.
디버깅 결과를 보면 Worker B를 호출한 후 계속 Polling을 하면서 종료여부를 확인한다. 이는 Sync + Blocking과의 가장 중요한 차이점이라고 볼 수 있다. Worker B를 호출한 시점부터 Await을 한다면 Blocking, 호출 후 자기 작업을 계속 진행하다가 완료여부를 기다린다면 Non-Blocking 이다.
간단하고 눈에 보이도록 while loop로 Polling 했지만 Observer 등 다양한 패턴으로 구현 가능하다.

Async + Blocking

public class AsyncAndblocking {

    public static void main(String[] args) {
        AsyncAndBlockWorkerA workerA = new AsyncAndBlockWorkerA();
        AsyncAndBlockWorkerB workerB = new AsyncAndBlockWorkerB();

        CompletableFuture<Void> joinB = workerB.doMyWorkAndCallbackToA(workerA.getCallBackToA());
        CompletableFuture<Void> joinA = workerA.doMyJob();

        // Async로 진행시킨 workerB를 Blocking.
        joinA.join();
        joinB.join();
    }
}

//----------------- Worker A --------------------
public class AsyncAndBlockWorkerA {
    private boolean WorkerBStatus = false;
    private final String name = "Worker A";
    private final int jobCnt = 10;

    Consumer<String> myJob = (msg) -> {
        for(int i = 0; i < jobCnt; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "는 " + msg + " " + i + "번째 작업 수행...");
        }
        System.out.println(name + " 작업완료 : " + msg);
    };

    // polling
    private void waitWorkBFinished() {
        while (!WorkerBStatus) {
            System.out.println("A: 나는 B가 끝나기를 기다리고 있습니다.");

            try {
                Thread.sleep(120);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public CompletableFuture<Void> doMyJob() {
        return CompletableFuture.runAsync(() -> {
            myJob.accept("First");
            this.waitWorkBFinished();
            myJob.accept("Second");
        });
    }

    Consumer<Void> callBackToA = (Void) -> {
        WorkerBStatus = true;
        System.out.println("Signal!! Worker B sent Callback To A for Finish First Job");
    };

    public Consumer<Void> getCallBackToA() {
        return this.callBackToA;
    }
}
//----------------- Worker B --------------------
public class AsyncAndBlockWorkerB {
    private final String name = "Worker B";
    private final int jobCnt = 20;

    Consumer<String> myJob = (msg) -> {
        for(int i = 0; i < jobCnt; i++) {
            System.out.println(name + "는 " + msg + " " + i + "번째 작업 수행...");

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(name + " 작업완료 : " + msg);
    };

    public CompletableFuture<Void> doMyWorkAndCallbackToA(Consumer<Void> callback) {
        return CompletableFuture.runAsync(() -> {
            myJob.accept("First");
            callback.accept(null);
            myJob.accept("Second");
        });
    }
}

Worker A는 First 0번째 작업 수행...
Worker B는 First 0번째 작업 수행...
Worker A는 First 1번째 작업 수행...
Worker A는 First 2번째 작업 수행...
Worker A는 First 3번째 작업 수행...
Worker A는 First 4번째 작업 수행...
Worker A는 First 5번째 작업 수행...
Worker A는 First 6번째 작업 수행...
Worker A는 First 7번째 작업 수행...
Worker A는 First 8번째 작업 수행...
Worker A는 First 9번째 작업 수행...
Worker A 작업완료 : First
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 1번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 2번째 작업 수행...
Worker B는 First 3번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 4번째 작업 수행...
Worker B는 First 5번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 6번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 7번째 작업 수행...
Worker B는 First 8번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 9번째 작업 수행...
Worker B는 First 10번째 작업 수행...
Worker B는 First 11번째 작업 수행...
Worker B는 First 12번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 13번째 작업 수행...
Worker B는 First 14번째 작업 수행...
Worker B는 First 15번째 작업 수행...
Worker B는 First 16번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 17번째 작업 수행...
Worker B는 First 18번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B는 First 19번째 작업 수행...
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
A: 나는 B가 끝나기를 기다리고 있습니다.
Worker B 작업완료 : First
A: 나는 B가 끝나기를 기다리고 있습니다.
Signal!! Worker B sent Callback To A for Finish First Job
Worker A는 Secode 0번째 작업 수행...
Worker B는 Secode 0번째 작업 수행...
Worker A는 Secode 1번째 작업 수행...
Worker A는 Secode 2번째 작업 수행...
Worker A는 Secode 3번째 작업 수행...
Worker A는 Secode 4번째 작업 수행...
Worker A는 Secode 5번째 작업 수행...
Worker A는 Secode 6번째 작업 수행...
Worker A는 Secode 7번째 작업 수행...
Worker A는 Secode 8번째 작업 수행...
Worker A는 Secode 9번째 작업 수행...
Worker A 작업완료 : Secode
Worker B는 Secode 1번째 작업 수행...
Worker B는 Secode 2번째 작업 수행...
Worker B는 Secode 3번째 작업 수행...
Worker B는 Secode 4번째 작업 수행...
Worker B는 Secode 5번째 작업 수행...
Worker B는 Secode 6번째 작업 수행...
Worker B는 Secode 7번째 작업 수행...
Worker B는 Secode 8번째 작업 수행...
Worker B는 Secode 9번째 작업 수행...
Worker B는 Secode 10번째 작업 수행...
Worker B는 Secode 11번째 작업 수행...
Worker B는 Secode 12번째 작업 수행...
Worker B는 Secode 13번째 작업 수행...
Worker B는 Secode 14번째 작업 수행...
Worker B는 Secode 15번째 작업 수행...
Worker B는 Secode 16번째 작업 수행...
Worker B는 Secode 17번째 작업 수행...
Worker B는 Secode 18번째 작업 수행...
Worker B는 Secode 19번째 작업 수행...
Worker B 작업완료 : Secode

Process finished with exit code 0

비동기적이면서 Blocking인 작업은, 언뜻 보면 Sync + Non-Blocking과 비슷하다고 느낄 수 있다. 왜냐하면, A가 B의 작업을 기다리고 있다는 점은 동일하기 때문이다.

그러나 이 둘의 가장 큰 차이점은, A가 직접 Return 받느냐, B가 Callback을 통해 Signal을 넣느냐이다.
위 Sync + Non-Blocking으로 다시 올라가보자. return 으로 받는 Completable 객체의 결과값이 While loop를 탈출하는 조건값으로 설정되어있다. 이는 A가 B의 결과값을 직접 판단한다는 것이지 않은가?
Async + Blocking을 보자. B는 시작 전 A로부터 종료되면 Signal을 보내달라는 Callback을 받으면서 작업이 시작된다. 작업이 완료된 후 Callback을 실행하면서 A에게 끝났다는 신호를 주게된다.

A는 B가 주는 신호를 기다릴 뿐 직접 return 값을 판단하지 않는다. 그렇기 때문에 디버깅에서도 보이듯, B가 끝났음과는 상관없이 Signal을 받아야 Await logging을 끝낸다.

Async + Non-Blocking

public class AsyncAndNonBlocking {

    public static void main(String[] args) {
        AsyncAndNonBlockWorkerA workerA = new AsyncAndNonBlockWorkerA();
        CompletableFuture<Void> joinA = workerA.doMyJob();

        joinA.join();
    }
}

//----------------- Worker A --------------------

public class AsyncAndNonBlockWorkerA {
    private final String name = "Worker A";
    private final int jobCnt = 10;

    Consumer<String> myJob = (msg) -> {
        for(int i = 0; i < jobCnt; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "는 " + msg + " " + i + "번째 작업 수행...");
        }
        System.out.println(name + " 작업완료 : " + msg);
    };

    Consumer<String> requireJob = (msg) -> {
        System.out.println(msg);
        for(int i = 0; i < jobCnt; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B는 " + i + "번째 작업 수행...");
        }
        System.out.println("Require Job for B 작업완료");
    };

    public CompletableFuture<Void> doMyJob() {
        return CompletableFuture.runAsync(() -> {
            myJob.accept("First");
            System.out.println("Worker b 호출!!");
            CompletableFuture<Void> callWorkerB = (new AsyncAndNonBlockWorkerB()).TakeAnddoMyWork(this.requireJob);
            myJob.accept("Second");
        });
    }
}
//----------------- Worker B --------------------
public class AsyncAndNonBlockWorkerB {
    private final String name = "Worker B";
    private final int jobCnt = 10;

    public CompletableFuture<Void> TakeAnddoMyWork(Consumer<String> requestJob) {
        return CompletableFuture.runAsync(() -> {
            requestJob.accept("I'm worker B. Stared now");
        });
    }
}

Worker A는 First 0번째 작업 수행...
Worker A는 First 1번째 작업 수행...
Worker A는 First 2번째 작업 수행...
Worker A는 First 3번째 작업 수행...
Worker A는 First 4번째 작업 수행...
Worker A는 First 5번째 작업 수행...
Worker A는 First 6번째 작업 수행...
Worker A는 First 7번째 작업 수행...
Worker A는 First 8번째 작업 수행...
Worker A는 First 9번째 작업 수행...
Worker A 작업완료 : First
Worker b 호출!!
I'm worker B. Stared now
Worker A는 Second 0번째 작업 수행...
B는 0번째 작업 수행...
B는 1번째 작업 수행...
Worker A는 Second 1번째 작업 수행...
B는 2번째 작업 수행...
Worker A는 Second 2번째 작업 수행...
B는 3번째 작업 수행...
Worker A는 Second 3번째 작업 수행...
B는 4번째 작업 수행...
Worker A는 Second 4번째 작업 수행...
B는 5번째 작업 수행...
Worker A는 Second 5번째 작업 수행...
B는 6번째 작업 수행...
Worker A는 Second 6번째 작업 수행...
B는 7번째 작업 수행...
Worker A는 Second 7번째 작업 수행...
B는 8번째 작업 수행...
Worker A는 Second 8번째 작업 수행...
B는 9번째 작업 수행...
Require Job for B 작업완료
Worker A는 Second 9번째 작업 수행...
Worker A 작업완료 : Second

Process finished with exit code 0

처음에는 두 Worker를 독립적으로 실행시키려다가 Worker A에서 중간에 Worker B를 호출하는 방향으로 로직을 구성하는 것이 더 재밌을 것 같아서 이렇게 구성해봤다.
호출된 후로 A와 B는 서로 신경쓰지 않고 작업을 수행하다가 자기의 일이 완료되면 그대로 return, 끝을 내버린다.

4가지 경우 중에 뭐가 좋고 뭐가 안좋다… 라는 절대적인 기준은 없다는걸 프로그래머들은 모두 공감할 것이다. 이 4가지 조합을 가장 적합한 로직에 사용하는 것이 초급 프로그래머를 벗어나는 첫 번째 과제가 아닐까….? 라는 개인적 생각을 해본다.

Written on May 14, 2022