# CAS 无锁优化

CAS(Compare And Swap) 字面意思是比较并且替换, 是一种轻量级的乐观锁.

1.8 之前是 Compare And Swap, 之后是 Compare And Set

# 原理

cas 的方法可以理解为 cas(value, expected, newValue); 此方法中有三个值.

  • value 内存地址
  • expected 期望的值
  • newValue 将要设定的值.

如何保证线程安全?

if V == E
V = NewV
otherwise try again or fail
1
2
3

在修改值前先进行判断, 如果值和期望值一致则修改, 如果不一致(说明有其他线程修改了此值)则重试.

如果在判断和赋值之间被其他线程打断如何处理?

cas 操作是 CPU 的原语支持, 中间不会被打断.

# 实现

由于项目中对某些值加锁的使用频率较高, 所以 Java 就做了一些基于 CAS 设计方式的实现.例如 AtomicBoolean, AtomicInteger, AtomicLong

# AtomicInteger

AtomicInteger 的使用示例

public class AtomicIntegerDemo {

    // 创建 AtomicInteger 对象
    AtomicInteger count = new AtomicInteger(0);
//    Integer count = 0;

    // 定义 m 方法, 用于执行 count 的累加
    void m() {
        for (int i = 0; i < 10000; i++) {
            count.incrementAndGet();
//            count++;
        }
    }

    public static void main(String[] args) {
        AtomicIntegerDemo atomicIntegerDemo = new AtomicIntegerDemo();

        // 定义线程集合
        List<Thread> threadList = new ArrayList<>();

        // 循环创建多个线程
        for (int i = 0; i < 5; i++) {
            threadList.add(new Thread(atomicIntegerDemo::m, "t-" + i));
        }
        // 循环开启线程
        threadList.forEach((thread) -> {
            thread.start();
        });

        threadList.forEach((thread) -> {
            try {
                // 让主程序等待每个线程都执行完后再结束
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 输出最后累加的值
        System.out.println(atomicIntegerDemo.count);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# LongAdder

LongAdder 在大并发的情况下比 synchronizeAtomicXXX 都要快, 因为其内部使用了分段锁. 同时使用这种分段锁思想的还有 ConcurrentHashMap.

# Unsafe(JDK 1.8)

AtomicXXX 底层都是调用的 Unsafe 类中的方法.

  • 其可以直接操作内存
    • allocateMemory putXXX freeMemory pageSize
  • 直接升成类的实例
    • allocateInstance
  • 直接操作类或示例的变量
    • objectFieldOffset
    • getInt
    • getObject
  • CAS 相关操作
    • compareAndSwapObject/Int/Long

# ABA 问题

ABA 问题指的是一个线程操作中, 另一个线程把一个值 A 变成了 B, 之后又把这个值变回了 A. 之后在第一个线程检查值的时候会发现值 A 是期望的值.

基础数据类型不会有问题, 但是如果是对象(引用类型), 其对象中的值有可能发生改变, 所以会有问题.

此问题可以通过加版本号或时间戳的方式解决, Java 中 StampedReference 就是用了这种机制.

# 基于 CAS 的锁

ReentrantLock, CounDownLatch, CyclicBarrier, Semaphore, Exchanger, LockSupport 都是基于 CAS 思想实现的

# ReentrantLock

ReentrantLockLock 接口的实现, Lock 接口的定义如下.

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
1
2
3
4
5
6
7
8

# 特点

  1. ReentrantLocksynchronize 都是可重录锁.
  2. ReentrantLock 需要手动解锁.
  3. 可以使用 tryLock(), 尝试获取锁.
  4. 默认是一个非公平锁, 可以在初始化时通过参数设置为公平锁.

但是 ReentrantLock 需要手动解锁.使用示例如下

Lock lock = new ReentrantLock();

void m2() {
  try {
    lock.lock();
    System.out.println("m2 ...");
  } finally {
    lock.unlock();
  }
}
1
2
3
4
5
6
7
8
9
10

# 主要方法

# lock()

lock() 方法的作用是获取锁。如果锁已被其他线程获取,则进行等待。

# tryLock()

tryLock() 方法的作用是尝试获取锁,如果成功,则返回 true;如果失败(即锁已被其他线程获取),则返回 false。也就是说,这个方法无论如何都会立即返回,获取不到锁时不会一直等待。

# tryLock(long time, TimeUnit unit)

tryLock(long time, TimeUnit unit) 方法和 tryLock() 方法是类似的,区别仅在于这个方法在获取不到锁时会等待一定的时间,在时间期限之内如果还获取不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。

# lockInterruptibly()

lockInterruptibly() 方法比较特殊,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。也就使说,当两个线程同时通过 lock.lockInterruptibly() 想获取某个锁时,假若此时线程 A 获取到了锁,而线程 B 只有在等待,那么对线程 B 调用 threadB.interrupt() 方法能够中断线程 B 的等待过程。由于 lockInterruptibly() 的声明中抛出了异常,所以 lock.lockInterruptibly() 必须放在 try 块中或者在调用 lockInterruptibly() 的方法外声明抛出 InterruptedException

注意:当一个线程获取了锁之后,是不会被 interrupt() 方法中断的。因为本身在前面的文章中讲过单独调用 interrupt() 方法不能中断正在运行过程中的线程,只能中断阻塞过程中的线程。因此当通过 lockInterruptibly() 方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。

# unlock()

unlock() 方法的作用是释放锁。

# newCondition()

其返回的结果是一个 Condition, 他就是一个队列, 其本质就是等待队列. 可以用于生产者和消费者中, 只唤醒某一组线程(notify/notifyAll 只能随机唤醒或全部唤醒).

final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;

private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

public void put(T t) {
  try {
    lock.lock();
    while(lists.size() == MAX) { //想想为什么用while而不是用if?
      producer.await();
    }

    lists.add(t);
    ++count;
    consumer.signalAll(); //通知消费者线程进行消费
  } catch (InterruptedException e) {
    e.printStackTrace();
  } finally {
    lock.unlock();
  }
}

public T get() {
  T t = null;
  try {
    lock.lock();
    while(lists.size() == 0) {
      consumer.await();
    }
    t = lists.removeFirst();
    count --;
    producer.signalAll(); //通知生产者进行生产
  } catch (InterruptedException e) {
    e.printStackTrace();
  } finally {
    lock.unlock();
  }
  return t;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

ReentrantLock 是唯一实现了 Lock 接口的类。

# CountDownLatch

倒计时门栓, 可以用于线程同步. 其作用有点类似于 Thread.join() 但原理不一样.

# CyclicBarrier

循环屏障, 人满发车.

# Phaser

移相器, 是分阶段执行的控制器, 在遗传算法中会使用到.

多阶段的栅栏, 每到一处都要等待其他线程到齐.

# ReadWriteLock

读写锁 -- 共享锁和排他锁

# 要点

对于特定的资源,ReadWriteLock 允许多个线程同时对其执行读操作,但是只允许一个线程对其执行写操作。

ReadWriteLock 维护一对相关的锁。一个是读锁;一个是写锁。将读写锁分开,有利于提高并发效率。

ReentrantReadWriteLock 实现了 ReadWriteLock 接口,所以它是一个读写锁。

读-读线程之间不存在互斥关系。

读-写线程、写-写线程之间存在互斥关系。

# 源码

ReadWriteLock 接口定义

public interface ReadWriteLock {
    /**
     * 返回用于读操作的锁
     */
    Lock readLock();

    /**
     * 返回用于写操作的锁
     */
    Lock writeLock();
}
1
2
3
4
5
6
7
8
9
10
11

# 示例

public class ReentrantReadWriteLockDemo {

    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    public static void main(String[] args) {
        final ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo();
        new Thread(() -> demo.get(Thread.currentThread())).start();
        new Thread(() -> demo.get(Thread.currentThread())).start();
    }

    public synchronized void get(Thread thread) {
        rwl.readLock().lock();
        try {
            long start = System.currentTimeMillis();

            while (System.currentTimeMillis() - start <= 1) {
                System.out.println(thread.getName() + "正在进行读操作");
            }
            System.out.println(thread.getName() + "读操作完毕");
        } finally {
            rwl.readLock().unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# Semaphore

# 代码示例

public static void main(String[] args) {
    //Semaphore s = new Semaphore(2);
	  // 可以手动控制是否公平, 默认是不公平的, fair 设置为 true 则是公平锁
    Semaphore s = new Semaphore(2, true);
    //允许一个线程同时执行
    //Semaphore s = new Semaphore(1);

    new Thread(()->{
        try {
            // 信号量减一, 表示获得锁
            s.acquire();

            System.out.println("T1 running...");
            Thread.sleep(200);
            System.out.println("T1 running...");

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 信号状态回归, 表示其他人可以继续得到
            s.release();
        }
    }).start();

    new Thread(()->{
        try {
            s.acquire();

            System.out.println("T2 running...");
            Thread.sleep(200);
            System.out.println("T2 running...");

            s.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 主要用途

Semaphore 主要用于限流. 表示最多允许多少个线程同时运行.

# Exchanger

Exchanger 主要用于线程之间通讯.

  1. Exchanger 只能用于两个线程之间进行交换
  2. 如果第二个线程没有进入, 则第一个线程进行等待.

# 例如

游戏中, 两个用户执行装备的交换.

# LockSupport

# 特性

  1. LockSupport 之前, 我们想要阻塞线程, 需要把阻塞加在某把锁上. 如 wait() 就必须在 synchronized 关键字里使用. 而 LockSupport 不需要加在锁上.
  2. 我们想要唤醒线程的时候, 通过 notify/notifyAll 是不能特定唤醒某个线程的, 而 LockSupport 可以做到.
  3. unpark(thread) 可以先于 park(thread) 之前运行.

# 使用示例

public static void main(String[] args) {
    Thread t = new Thread(()->{
        for (int i = 0; i < 10; i++) {
            System.out.println(i);
            if(i == 5) {
                // 执行线程的阻塞
                LockSupport.park();
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    t.start();

//        LockSupport.unpark(t);

    try {
        TimeUnit.SECONDS.sleep(8);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("after 8 senconds!");

    LockSupport.unpark(t);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
上次更新时间: 2020/11/4 下午7:19:24