희디비
[Java] 생산자 소비자 문제 본문
생산자 소비자 문제
생산 하는 스레드 / 소비 하는 스레드를 나누어 생각해 보겠습니다.
생산 스레드는 파일에 데이터를 읽는 역할을 하고
소비 스레드는 읽은 데이터를 처리 하는 역할을 합니다.
이렇게 처리 하려면 어딘가 데이터를 보관하는 자료 구조가 필요 합니다.
자료 구조를 Queue 로 만들고 이름을 버퍼 라고 하고 버퍼의 크기를 2개로 설정 하겠습니다.
Bounded Queue는 여러 스레드에서 접근 할수 있기 때문에 synchronized가 걸려 있습니다.
생산자 스레드만 먼저 실행 해보겠습니다.
생산자 1번 스레드가 락을 획득해 버퍼에 데이터를 넣고 락을 반납 하였습니다.
2,3 번 스레드는 락을 획득 하지 못해 락 대기 집합에서 blocked 상태로 있습니다.
2번 스레드가 락을 획득해 버퍼에 데이터를 채우고 락을 반납 하고 종료 합니다.
3번 스레드가 락을 획득 했습니다. 이제 버퍼에 데이터를 넣으려고 합니다.
하지만 버퍼가 가득 차서 넣지 못합니다. 이러한 문제를 한정된 버퍼 문제 라고 합니다.
어떻게 해결 할 수 있을까요?
기다리다 보면 소비자 스레드가 데이터를 소비 하지않을까요?
"소비자 스레드가 데이터를 소비 해줄거야" 라고 기다리며 1초 마다 깨어나 버퍼를 체크해 보겠습니다.
while (queue.size() == max){
log("[put] 큐가 가득 참 생산자 대기 ");
sleep(1000);
}
queue.offer(data);
소비자 스레드가 1초마다 깨어나 버퍼를 확인 하고 있습니다.
그런데 .....? 큐에 데이터가 계속 없다고 뜹니다.
뭐지?... 왜 데이터 소비가 안되고 있을까?
소비자 스레드가 데이터를 처리 해줄거라고 생각 했는데 처리가 되지 않았습니다.
왜 그럴까요?
이유는 생산자 3번 스레드가 락을 계속 가지고 있어서 다른 스레드가 접근 하지 못한 것 입니다.
임계영역에 들어 가기 위해선 락이 필요 한데 3번 스레드가 락을 들고 잠에 들었다 깨어서 체크 하고 있는데
다른 스레드는 락이 없어서 임계영역에 들어 갈 수 없었습니다.
이 문제는 어떻게 할까요?
해답은 Object 클래스에 있습니다. 자바는 멀티 스테드를 염두한 언어기 때문입니다.
Object 클래스는 락과 관련된 두 메서드를 제공 합니다.
- Object.wait() : 락을 가진 상태에서 호출 가능 하며, 락을 반납하고 스레드 대기열에 waiting 상태로 대기 한다.
- Object.notify() : 스레드 대기열의 스레드를 하나 깨운다.
만약 버퍼가 가득 찼다면 wait를 호출 해서 대기 집합에 들어가고
업무를 완료하면 대기 집합 스레드를 깨우면 되겠네요!
@Override
public synchronized void put(String data) {
while (queue.size() == max){
log("[put] 큐가 가득 참 생산자 대기 ");
try {
wait(); //Runnable -> Waitting 락 반납
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
notify(); //대기 스레드 Wait -> Blocked
}
3번 스레드는 버퍼가 가득차서 스레드 대기 집합에서 waiting 상태로 대기 중입니다.
이제 소비자 스레드를 실행해 보겠습니다.
소비자 1번 스레드가 임계영역에 들어와 데이터를 처리 했습니다.
2,3번 스레드는 락을 획득 하지 못해 락 대기집합에서 blocked 상태로 있습니다.
소비자 스레드 1번이 업무를 완료 했습니다.
이때, 락을 반납 하기 전에 notify()를 통해 스레드 대기 집합의 스레드 하나를 깨웁니다.
생산자 스레드가 깨어 났습니다. 스레드를 깨우면 blocked 상태가 됩니다. ( synchronized 에서만 )
이후 락 대기 집합에서 락을 얻는 것을 시도 하고 임계영역에 들어 옵니다.
다른 스레드들이 먼저 실행 되는 것이 맞지만
편의상 3번 생산자 스레드가 락을 얻었다고 하겠습니다. 생산자 3번 스레드는 데이터를 버퍼에 넣고 종료 됩니다.
2번, 3번 소비자 스레드의 업무가 수행 되어 버퍼를 모두 비우고 정상적으로 종료 되었습니다.
잘 동작된 것 처럼 보이지만 사실 synchronized는 비효율적인 부분이 있습니다.
스레드가 업무를 처리 한후 항상 notify()를 호출 하는점 때문입니다.
그림에서는 2번 생산자 스레드가 작업을 완료 한 후 1번 스레드를 깨웁니다.
생산자 1번 스레드는 락을 얻어 다시 임계영역에 들어 왔습니다.
그런데 버퍼가 가득차 다시 스레드 대기 집합으로 가게 됩니다. 이러한 행동은 불필요한 행동 입니다.
소비자 스레드는 생산자만 깨우고 생산자는 소비자 스레드만 깨울순 없을까요?
문제의 해결방안인 ReentrantLock을 설명 하기 전에 synchronized에 대해 정리 해보겠습니다.
synchronized는 2가지 비효율 적인 문제가 있습니다.
- 락 대기 집합에서 Blocked 상태 존재 -> 깨울수 없습니다.
- 스레드 대기 집합을 비효율적으로 깨우는 문제
- 스레드 대기 집합 순서가 ( 1, 2, 3, 4 ) 라면 ( 4, 2, 1, 3 ) 으로 깨울 수 있습니다.
ReentrantLock은 두 문제를 해결 했습니다.
ReentrantLock은 자바에서 제공 하는 락, 스레드 대기 집합을 사용 하지 않고 구현 하였습니다.
- 스레드 대기 집합을 Condition 이라 표현 하며 생산자, 소비자 대기 집합으로 나눌 수 있습니다.
- 공정 모드를 제공 하며 new ReentrantLock(true) 대기 순서를 보장 합니다.
- Object의 wait(), notify() 를 대신 하는 await(), signal() 를 제공 합니다.
- 락 대기 집합에서 Blocked 상태가 아닌 Waiting 상태로 기다리므로 interrupt, unpark 깨울 수 있습니다.
이전 코드를 ReentrantLock으로 바꿔 보겠습니다.
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
private final Queue<String> queue = new ArrayDeque<>();
public final int max;
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try{
while (queue.size() == max){
log("[put] 큐가 가득 참 생산자 대기 ");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
consumerCond.signal();
}finally {
lock.unlock();
}
}
@Override
public synchronized String take() {
lock.lock();
try{
while(queue.isEmpty()){
log("[take] 큐에 데이터가 없음 대기");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
producerCond.signal();
return data;
}finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
ReentrantLock를 사용하여 synchronized의 단점도 해결하고 생산자 소비자 문제도 해결 했습니다.
그런데 매번 이렇게 소비자 생산자 문제를 해결 하기 위해 Reentrantlock을 사용 하여 버퍼를 구현 하고 해야 할까요?
자바에선 BlockingQueue 구현체를 제공 합니다. 내부에는 Reentrantlock이 있습니다.
위에서 만든 버퍼 처럼 동작 하는게 BlockingQueue 였던거죠!
위 글을 김영한 선생님 실전 자바 고급 1편을 보고 요약 한 것입니다.
자세한 내용이 궁금 하시다면 강의를 추천 드려요!
(여담 + 그림은 제가 그렸습니다. 영한님이 왜 그림 그리는게 쉽지 않다고 하셨는지 알겠습니다. ㅠㅠ
영한님 그림은 저 보다 4배 정도 많습니다. 강의 많이 들어 주세요! )
'Java' 카테고리의 다른 글
[Java] enum 왜 쓰는 걸까? (0) | 2024.11.27 |
---|---|
[Java] LockSupprot, ReentrantLock (1) | 2024.11.19 |
[Java] 메모리 가시성, 임계 영역 (4) | 2024.11.15 |
[Java] Join, Interrupt, Yield (0) | 2024.11.12 |
[Java] 스레드의 생성과 생명주기 (0) | 2024.11.10 |