스레드 풀 설정과 튜닝 시 사용할 수 있는 고급 옵션을 살펴보고 흔히 발생할 수 있는 난관을 헤쳐 나갈 수 있는 방법에 대해서 알아본다.
8.1 작업과 실행 정책 간의 보이지 않는 연결 관계
● Executor 프레임워크를 씀으로써 작업 생성과 실행이 분리된다.
● 일정한 조건을 갖춘 Executor 프레임워크 실행 정책이 필요한 작업에는 다음과 같은 것들이 있다.
- 의존성이 있는 작업 : 활동성 문제가 발생할 수 있음
- 스레드 한정 기법을 사용하는 작업 : 단일 스레드로 동작해야 하는 정책이 있어야 함
- 응답 시간이 민감한 작업 : 스레드 풀 크기에 따라 응답 시간이 달리질 수 있음
- ThreadLocal 을 사용하는 작업 : 스레드 풀 사용 시 값이 재사용될 수 있음
● 스레드 풀은 동일하고 서로 독립적인 다수의 작업 실행 시 가장 효과적
● 실행 시간이 오래 걸리는 작업과 금방 끝나는 작업을 섞어 실행하면 작업 실행이 방해된다.
(풀의 크기가 굉장히 크지 않는 한..)
● 다른 작업에 의존성이 있는 작업 실행 시는 스레드 풀 크기를 충분히 크게 잡아야 한다.
작업이 큐에 대기하거나 등록되지 못하는 상황이 발생하지 않도록
8.1.1 스레드 부족 데드락
● 작업 간에 의존성이 존재할 경우 데드락이 발생할 수 있다.
● 스레드 풀의 크기가 커도 발생할 수 있는 문제임
A, B 스레드가 동시에 시작되었으나 서로 상대가 끝나기를 기다리는 경우 문제가 됌
● 싱글 스레드 풀 사용 시 데드락이 걸리는 예제
8.1.2 오래 실행되는 작업
● 데드락은 발생하지 않는다 해도 오랫동안 종료되지 않는 스레드가 있으면 전체 작업 성능에 영향을 줄 수 있다.
● JDK의 대부분의 블로킹 메소드는 시간 제한이 없는 것과 있는 것이 같이 존재한다.
예) BlockingQueue.put, CountDownLatch.await
● 모든 스레드가 대기 상태에 빠질 수 있기 때문에 시간 제한이 있는 메소드를 사용해서 시간을 초과하면 제대로 실행되지 못했다고 기록하고 일단 종료한 뒤 큐의 맨 뒤에 다시 추가하는 등의 대책을 세울 수 있다.
8.2 스레드 풀 크기 조절
● 스레드 풀 크기를 하드코딩 하기보다는 설정파일이나 Runtime.availableProcessors 메소드로 동적으로 지정되도록 하는게 좋다.
● 특별한 스레드 풀 크기 결정 공식은 없지만, 너무 크거나 적은 극단적인 크기만 아니면 된다.
● 크기 산정 시 컴퓨터 환경, 자원, 작업 동작 방식을 확인해야 한다.
CPU, 메모리, I/O 작업이 많은가?, JDBC 커넥션 개수 등
● CPU를 많이 사용하는 작업은 CPU 개수 + 1 개가 최적의 성능을 발휘함
(CPU 개수에 맞추되 스레드가 멈추는 경우를 대비해 1개를 더 추가)
● I/O 작업, 블로킹 작업이 많으면 스레드가 대기 상태에 들어갈 가능성이 크므로 풀의 크기를 훨씬 크게 잡아야 함
● 원하는 CPU 활용도를 유지할 수 있는 스레드 풀 크기 구하는 공식
스레드 풀 크기 = CPU개수 * 원하는 CPU활용도(0 ~ 1) * (1 + 작업 시간/대기 시간)
- 작업 시간 : 스레드 실행 시 실제 작업되는 시간
- 대기 시간 : 스레드 실행 시 대기하고 있는 시간
예) CPU 개수 : 4개, 원하는 CPU 활용도 : 50%, 작업 시간/대기 시간 : 9/1
4 * 0.5 * (1 + 9) = 20개
● 데이터베이스 연결과 같은 자원 사용에 대한 풀 크기 구하는 법은 다음과 같이 한다.
전체 자원의 개수 / 각 작업에서 필요한 자원의 개수 = 스레드 풀 크기
(간단한 계산 방법이다.)
8.3 ThreadPoolExecutor 설정
8.3.1 스레드 생성과 제거
● Executors 클래스의 newFixedThreadPool 같은 팩토리 메소드 말고 생성자를 직접 호출하여 스레드 풀의 옵션을 조절하여 생성할 수도 있다.
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) { ... }
● 생성자 인자값 설명
int corePoolSize : 코어 스레드 개수, 계속 유지되는 개수
int maximunPoolSize : 최대 스레드 개수
long keepAliveTime : 스레드 미사용 시 제거 대기 시간
TimeUnit unit : keepAliveTime 값 시간 단위
BlockingQueue<Runnable> workQueue : 요청된 작업들이 저장될 큐
ThreadFactory threadFactory : 풀 내 스레드 생성 시 사용될 스레드 팩토리
RejectedExecutionHandler handler : 작업 요청 거부 시 처리할 핸들러
8.3.2 큐에 쌓인 작업 관리
● ThreadPoolExecutor 생성 시 요청 작업을 쌓아둘 큐로 BlockingQueue를 지정하는데, 세 가지 전략을 취할 수 있음
1. 큐 크기에 제한을 두지 않는 방법
newFixedThreadPool, newSingThreadExecutor 사용 시 크기 제한없는 LinkedBlockingQueue가 사용됨
2. 큐 크기를 제한하는 방법
LinkedBlockingQueue의 크기를 제한하여 큐로 사용, 자원 관리 및 안정적인 동작을 위해 사용
3. 작업을 스레드에게 직접 넘겨주는 방법
스레드 수 제한이 거의 없는 상태는 큐에 쌓는 절차를 생략, newCachedThreadPool 사용 시 SynchronousQueue가 사용됨
8.3.3 집중 대응 정책
● 크기가 제한된 큐에 작업이 가득 차면 포화 정책이 동작한다.
● 디폴트 정책은 AbortPolicy이며, 포화 정책은 setRejectedExecutionHandler로 변경할 수 있다.
AbortPolicy : 작업 추가되지 않고 RejectedExecutionException을 던지게 된다.
● 몇 가지 기본 정책들은 ThreadPoolExecutor 클래스에 구현되어 있어 가져가 쓰면 된다.
● 그 밖의 기본 정책
- DiscardPolicy : 추가하려했던 작업을 아무 반응 없이 제거
- DiscardOldestPolicy : 큐에서 가장 오래된 항목 제거 후 추가
- CallerRunsPolicy : 작업을 등록하려 한 스레드에서 작업을 실행하게 함. 따라서 그 시간 동안 스레드 풀의 스레드는 큐의 작업을 처리할 시간을 가지게 된다.
8.3.4 스레드 팩토리
● 스레드 풀에서 스레드 생성 시 ThreadFactory.newThread 로 생성되게 되어있음
- public interface ThreadFactory{
- Thread newThread(Runnable r);
- }
● 스레드 풀 생성 시 지정할 수 있으며, 따로 지정하지 않으면 기본적으로 비데몬, 디폴트로 스레드 생성함.
● UncaughtExceptionHandler 지정, 스레드 네이밍 지정, 커스터마이징한 스레드를 생성해서 쓰려고 할 때 사용하면 됌
예제) 스레드 팩토리에서 MyAppThread 를 생성하여 스레드의 이름, 현재 실행중인 스레드의 개수 등을 확인할 수 있다.
- public class MyAppThread extends Thread {
- public static final String DEFAULT_NAME = "MyAppThread";
- private static volatile boolean debugLifecycle = false;
- private static final AtomicInteger created = new AtomicInteger();
- private static final AtomicInteger alive = new AtomicInteger();
- private static final Logger log = Logger.getAnonymousLogger();
- public MyAppThread(Runnable r){ this(r, DEFAULT_NAME);}
- public MyAppThread(Runnable runnable, String name){
- super(runnable, name + "=" + created.incrementAndGet());
- setUncaughtExceptionHandler(
- new Thread.UncaughtExceptionHandler(){
- public void uncaughtException(Thread t, Throwable e){
- log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
- }
- }
- );
- }
- public void run() {
- boolean debug = debugLifecycle;
- if ( debug) log.log(Level.FINE, "Created " + getName());
- try {
- alive.incrementAndGet();
- super.run();
- } finally {
- alive.decrementAndGet();
- if (debug) log.log(Level.FINE, "Exiting " + getName());
- }
- }
- public static int getThreadsCreated() { return created.get(); }
- public static int getThreadsAlive() { return alive.get(); }
- public static boolean getDebug() { return debugLifecycle; }
- public static void setDebug(boolean b) {debugLifecycle = b; }
- }
8.3.5 ThreadPoolExecutor 생성 이후 설정 변경
Executors 로 생성한 ThreadPool 은 관련된 set 메소드를 사용해서 내부의 설정된 값
(코어 스레드 갯수, 최대 스레드 갯수, 스레드 유지 시간, 스레드 팩토리, 작업 거부 처리 정책)을 변경할 수 있다.
Executors 에는 unconfigurableExecutorService 를 사용해서 설정을 변경할 수 없는 ThreadPool을 생성할 수도 있다.
8.4 ThreadPoolExecutor 상속
● ThreadPoolExecutor는 상속받아 기능을 추가할 수 있다.
● beforeExecute, afterExecute, terminated 등의 hook 메소드가 제공된다.
● 로깅 등에 활용할 수 있으며 afterExecute는 정상 종료/예외 시도 항상 호출된다.
● 처리 순서
1. beforeExecute
2. 작업
3. afterExecute
4. terminated
예제 8.9 : 스레드 풀에 통계 확인 기능 추가
- package com.may.java.concurrent.executor;
- import lombok.extern.slf4j.Slf4j;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicLong;
- @Slf4j
- public class TimingThreadPool extends ThreadPoolExecutor {
- private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
- private final AtomicLong numTasks = new AtomicLong();
- private final AtomicLong totalTime = new AtomicLong();
- public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- }
- protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t, r);
- log.info(String.format("[beforeExecute] Thread %s: start %s", t, r));
- startTime.set(System.nanoTime());
- }
- protected void afterExecute(Runnable r, Throwable t) {
- try {
- long endTime = System.nanoTime();
- long taskTime = endTime - startTime.get();
- numTasks.incrementAndGet();
- totalTime.addAndGet(taskTime);
- log.info(String.format("[afterExecute] Thread %s: end %s, time=%dns", t, r, taskTime));
- } finally {
- super.afterExecute(r, t);
- }
- }
- protected void terminated() {
- try {
- log.info(String.format("[terminated] Terminated: avg time=%dns", totalTime.get() / numTasks.get()));
- } finally {
- super.terminated();
- }
- }
- public static void main(String[] args) {
- TimingThreadPool timingThreadPool = new TimingThreadPool(4, 6, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
- timingThreadPool.submit(new Callable<String>() {
- @Override
- public String call() throws InterruptedException {
- Thread.sleep(2000);
- return "OK";
- }
- });
- timingThreadPool.shutdown();
- }
- }
8.5 재귀 함수 병렬화
특정 작업을 여러 번 실행하는 반복문이 있을 때 반복 작업이 서로 독립적이라면 병렬화해서 성능을 향상시킬 수 있다.
특히 작업을 개별적인 작업들로 분리해 실행하느라 발생하는 오버헤드가 부담되지 않을 정도의 작업이어야 더 효과를 볼 수 있다.
- void processSequentially(List<Element> elements) {
- for (Element e : elements)
- process(e);
- }
- void processInParallel(Executor exec, List<Element> elements) {
- for (final Element e : elements)
- exec.execute(new Runnable() {
- public void run() { process(e); }
- });
- }
ExecutorService.invokeAll 메소드를 사용하면 한 묶음의 작업을 한꺼번에 등록하고 그 작업들이 모두 종료될 때까지 대기하도록 할 수 있다.
- public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
- for(Node<T> n : nodes){
- results.add(n.compute());
- sequentialRecursive( n.getChildren(), results);
- }
- }
- public<T> void paralleRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results){
- for(final Node<T> n : nodes){
- exec.execute(new Runnable(){
- public void run(){
- results.add(n.compute());
- }
- });
- parallelRecursive(exec, n.getChildren(), results);
- }
- }
- public<T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException{
- ExecutorService exec = Executors.newCachedThreadPool();
- Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
- parallelRecursive(exec, nodes, resultQueue);
- exec.shutdown();
- exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
- return resultQueue;
- }
8.5.1 예제 : 퍼즐 프레임웍
'개발 > 병렬 프로그래밍' 카테고리의 다른 글
자바 병렬 프로그래밍 - 11장 성능, 확장성 (0) | 2016.09.04 |
---|---|
자바 병렬 프로그래밍 - 10장 활동성 최대로 높이기 (0) | 2016.09.01 |
자바 병렬 프로그래밍 - 7장 중단 및 종료 (0) | 2016.08.29 |
자바 병렬 프로그래밍 - 6장 작업실행 (0) | 2016.08.16 |
자바 병렬 프로그래밍 - 5장 구성 단위 (0) | 2016.08.12 |