# Queue

# 接口

image-20201115122941603

# Queue 接口的主要方法

public interface Queue<E> extends Collection<E> {
    /**
     * 将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
     */
    boolean add(E e);

    /**
     * 将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
     */
    boolean offer(E e);

    /**
     * 移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
     */
    E remove();

    /**
     * 移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
     */
    E poll();

    /**
     * 检索但不删除此队列的头。这个方法与{@link #peek peek}的不同之处仅在于它引发异常
     */
    E element();

    /**
     * 获取队首元素,若成功,则返回队首元素;否则返回null
     */
    E peek();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# BlockingQueue

BlockingQueue 是继承自 Queue 的接口, 其接口中的方法如下.

public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 
     */
    boolean add(E e);

    /**
     * 
     */
    boolean offer(E e);

    /**
     * 一定要放到队列, 如果队列满了就阻塞
     */
    void put(E e) throws InterruptedException;

    /**
     * 
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 一定要获取, 如果队列为空就阻塞
     */
    E take() throws InterruptedException;

    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

BlockingQueue 阻塞的特点主要体现在 put() 方法和 take(), 队列满了 put() 方法就阻塞, 队列空了 take() 方法就阻塞.

其天生实现了生产者和消费者的模式.

# put()

我一定要往队列中放, 如果队列中满了, 我就阻塞住.

# take()

我一定要往外取, 如果队列中满了, 我就阻塞主.

# 相关实现

# PriorityQueue

具有优先级的队列

其底层实现是通过二叉树实现的.

放入无序的元素, 再取出的时候就会自动排好顺序

# ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改

适用于高并发读写操作,理论上有最高的吞吐量,无界,不保证数据访问实时一致性,Iterator不抛出并发修改异常,采用CAS机制实现无锁访问。

# DelayQueue

其本质上是一个 PriorityQueue

可以按照某种排序规则对队列中的元素进行排序.

其主要用于按时间进行任务调度.

# SynchronousQueue

他并不是用来装东西的

使用示例

public static void main(String[] args) throws InterruptedException {
	BlockingQueue<String> strs = new SynchronousQueue<>();

	new Thread(()->{
		try {
			System.out.println(strs.take());
			System.out.println(strs.take());
			System.out.println(strs.take());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}).start();

	//阻塞等待消费者消费
	strs.put("aaa");
	strs.put("bbb");
	// 这是是不能使用 add 方法的, 因为 SynchronousQueue 的队列长度为 0 
	strs.add("aaa");
	System.out.println(strs.size());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  • 如果没有其他线程 take(), SynchronousQueue 是无法执行 put() 的, 会一直处于等待状态
  • 其不能直接调用 add() 方法, 因为其队列容量为 0
  • 其思想像线程中的 Exchanger
  • 执行的是两个线程之间的交换, 一定要递到对方手里.

# TransferQueue

TransferQueue 是有容量的, 其最大的特点是提供了 transfer() 方法

# transfer()

此方法的特点是, 线程把元素放入队列后, 线程会等待其他线程把元素取走.

使用示例

public static void main(String[] args) throws InterruptedException {
	LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
	
  // 需要先定义消费者, 否则 transfer() 方法会一直处于等待状态
	new Thread(() -> {
		try {
			System.out.println(strs.take());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}).start();
	
	strs.transfer("aaa");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
上次更新时间: 2020/11/17 上午12:35:40