# 线程池

  • Callable 相当于 Runnable + Return
  • Future 存储执行的将来才会产生的结果
  • FutureTask Future + Runnable

# Executor

Executor 接口如下

public interface Executor {

    /**
     * Executes 执行器, 执行的是一个可运行的线程
     * 实现了定义线程和执行线程的分离
     */
    void execute(Runnable command);
}
1
2
3
4
5
6
7
8

# Callable

@FunctionalInterface
public interface Callable<V> {
    /**
     * Callable 和 Runnable 比较类似, 但是 Callable 可以有返回值
     */
    V call() throws Exception;
}
1
2
3
4
5
6
7

简单使用示例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Callable<String> c = new Callable() {
        @Override
        public String call() throws Exception {
            return "Hello Callable";
        }
    };

    ExecutorService service = Executors.newCachedThreadPool();
    Future<String> future = service.submit(c); //异步

    System.out.println(future.get());//阻塞

    service.shutdown();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# FutureTask

简单使用示例

public static void main(String[] args) throws InterruptedException, ExecutionException {
	
	FutureTask<Integer> task = new FutureTask<>(()->{
		TimeUnit.MILLISECONDS.sleep(500);
		return 1000;
	}); //new Callable () { Integer call();}
	
	new Thread(task).start();
	
	System.out.println(task.get()); //阻塞
}
1
2
3
4
5
6
7
8
9
10
11

# CompletableFuture

// TODO 需要了解的重点

# ThreadPoolExecutor

线程池执行器

线程池维护了两个集合, 一个是线程的集合, 另一个是任务的集合.

# 定义方式

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
}
1
2
3
4
5
6
7
8

# 参数含义

# int corePoolSize

核心线程数, 核心线程的数量是不会归还的.

# int maximumPoolSize

最大线程数, 可扩展的最大数量.

# long keepAliveTime

生存时间, 归还给操作系统的时间.

# TimeUnit unit

生存时间的单位, 归还给系统的时间的单位.

# BlockingQueue<Runnable> workQueue

任务队列, 各种 BlockingQueue 都可以使用.

# ThreadFactory threadFactory

线程工厂, 可以使用自己定义的方式去产生线程.

# RejectedExecutionHandler handler

拒绝策略, 线程都被占用, 并且队列都排满了以后的拒绝策略. 实际应用中, 一般使用自己实现的拒绝策略.

JDK 默认提供了 4 种

  • Abort: 抛出异常
  • Discard: 扔掉, 不抛出异常
  • DiscardOldest: 扔掉排队时间最久的
  • CallerRuns: 调用者处理任务(任务提交者)

# 变种线程池

# SingleThreadPool

单线程的线程池

源码

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}
1
2
3
4
5
6

可以看出 SingleThreadExecutor 就是一中 ThreadPoolExecutor

  • 核心线程数为 1,
  • 最大线程数为 1,
  • 任务队列为 LinkedBlockingQueue, 队列长度为 Integer.MAX_VALUE

# CachedThreadPool

缓存线程池

源码

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}
1
2
3
4
5
  • 核心线程数为 0
  • 最大线程数为 Integer.MAX_VALUE
  • 生存时长为 60s
  • 生存的时间单位为 SECONDS
  • 任务队列为 SynchronousQueue

# FixedThreadPool

固定线程池

源码

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}
1
2
3
4
5
  • 核心线程数和最大线程数都需要传入
  • 任务队列为 LinkedBlockingQueue, 队列长度为 Integer.MAX_VALUE

# ScheduledThreadPool

用于执行定时任务的线程池

源码

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize) {
      	// 其 supper 还是 ThreadPoolExecutor
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
  • 核心线程数为传入值

  • 最大线程数为 Integer.MAX_VALUE

  • 线程队列是 DelayedWorkQueue

    • DelayedWorkQueue 是 ScheduledThreadPoolExecutor 的静态内部内

      static class DelayedWorkQueue extends AbstractQueue<Runnable>
              implements BlockingQueue<Runnable> {}
      
      1
      2
    • 可以设定各多长时间执行

在实际使用中用的并不是特别多, 如果有定时任务, 一般使用定时器框架 Quartz, 或者使用 SpringBoot 封装的 Scheduled(cron)

面试题 假如提供一个闹钟服务, 订阅这个服务的人特别多 (10亿人) 应该怎么优化

  1. 做服务器分发
  2. 使用线程池 + 任务队列

# ForkJoinPool

  • 分解汇总任务
  • 用很少的线程可以执行很多的任务 (子任务) TPE 做不到先执行子任务
  • CPU 密集型
  • 适合把大任务切分成小任务的线程池

[ForkJoinPool的使用示例][./ForkJoinPool的使用示例]

image-20201105232039795

# 定义方式

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {}
1
2
3
4
  • parallelism 并行度级别
  • factory 线程创建工厂
  • handler
  • asyncMode

# WorkStealingPool

  • 每个线程都有自己的任务队列
  • 当自己的任务队列执行完成后, 去其他线程的任务队列中去取任务

实现方式

public static ExecutorService newWorkStealingPool() {
  return new ForkJoinPool
    (Runtime.getRuntime().availableProcessors(),
     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
     null, true);
}
1
2
3
4
5
6

# ParallelStream

流式处理, 就是把一个任务拆分成多个小任务来执行的一种实现方式

使用示例

public static void main(String[] args) {
  List<Integer> nums = new ArrayList<>();
  Random r = new Random();
  for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000));

  //System.out.println(nums);

  // 普通的 forEach 循环形式处理
  long start = System.currentTimeMillis();
  nums.forEach(v->isPrime(v));
  long end = System.currentTimeMillis();
  System.out.println(end - start);

  //使用parallel stream api
  start = System.currentTimeMillis();
  nums.parallelStream().forEach(T13_ParallelStreamAPI::isPrime);
  end = System.currentTimeMillis();

  System.out.println(end - start);
}

static boolean isPrime(int num) {
  for(int i=2; i<=num/2; i++) {
    if(num % i == 0) return false;
  }
  return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 并发 VS 并行

并发(Concurrent) 是指任务的提交

并行(Parallel) 是指任务执行

并行是并发的子集.

上次更新时间: 2020/11/6 上午1:10:21