Loading... # synchronized与AbstractQueuedSynchronizer ## synchronized synchronized是排它锁,是jvm内置的加锁方式,本质上是创建一个对象,在对象之上添加标识,来确定是否有其他线程获得了锁。 ### synchronized实现方式 在jdk1.5之前,synchronized每次都需要向操作系统申请重量级锁,效率很低。从jdk1.5开始,引入了锁升级的概念。 ### 锁升级 1. 无锁 - synchronized方法创建之后,如果没有任何线程调用过,此时是无锁状态。 2. 偏向锁 - 如果有线程调用,记录下线程id,再次有线程调用时,检查id,若依然是该id,表示同一个线程调用,直接放行。 - 如果检查id不是当前线程id,根据id,判断线程是否存活,如果不存活,锁对象将被设置为无锁状态,新线程可以争抢偏向锁。 - 如果线程存活,则判断线程是否占用锁,如果线程已经释放了锁资源,那么,锁对象将被设置为无锁状态,新线程可以争抢偏向锁,否则,暂停此线程,撤销偏向锁,升级为轻量级锁。 3. 轻量级锁 - 通过CAS尝试获取锁,修改markword,修改成功则表示得到锁,修改失败,则升级为重量级锁。 4. 重量级锁 - 尝试争抢锁,如果获取失败,则进入自适应自旋,当前线程包装成 ObjectWaiter 对象CAS自旋加入到 _cxq 这个单向链表之中,如果push失败再尝试获取一次锁,如果还获取不到,就阻塞当前线程。 synchronized只能升级,不能降级。 ## AbstractQueuedSynchronizer(AQS) AQS的本质就是CAS加volatile,AQS中维护一个state变量,通过volatile保证线程间可见,然后通过CAS尝试修改该值,并且用CAS向等待队列中插入新的等待线程。 ### CAS(CompareAndSwap) CompareAndSwap,比较并置换,它提供了compareAndSet(Object,expect,update)方法。第一个参数为执行CAS的对象,第二个参数为期望的值,第三个参数是修改之后的值。比如说,i++操作,我期望当前值是2,如果是2,我就给他改成3,如果不是2,表示有其它线程修改过。如果此时的值是3,那么,再次将期望设置成3,要改成的值设置成4,如此循环,直到修改成功为止。因此,CAS是通过自旋锁来实现的。 ### ReentrantLock ReentrantLock是AQS提供的排它锁。 #### ReentrantLock与synchronized的区别 1. ReentrantLock通过自旋锁实现,synchronized有锁升级。 2. synchronized由jvm自动完成加锁解锁,ReentrantLock需要手动完成加锁与解锁。 3. synchronized在其他线程未释放锁时,只能等待,而ReentrantLock提供了一系列更加灵活的操作。 - tryLock(),尝试锁定,不管锁定与否,方法都将继续执行,可以根据tryLock的返回值来判定是否锁定。 - lockInterruptibly(),可以对interrupt()方法做出响应,在异常处理中执行逻辑,比如打断该线程。 - new ReentrantLock(true),传一个布尔值,true表示公平锁,默认是false,表示非公平锁。 #### ReentrantLock.lock()原理图 ![ReentrantLock.lock().png][1] #### ReentrantLock线程进入等待队列 ``` private Node addWaiter(Node mode) { Node node = new Node(mode); for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } } ``` 可以看出compareAndSetTail这个CAS操作,上来先创建一个oldTail变量,指向tail,如果oldTail不是null,将新节点的前一个节点设置为oldTail,然后通过compareAndSetTail(oldTail, node)方法,期望是oldTail,如果是就将tail设置为新节点,如果不是表示有其他线程修改过,重新循环,直到成功修改为止,返回新的tail。 ``` final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } ``` 向等待队列添加时,先获取自己的前一个节点,看是不是head节点,如果是,就直接尝试获得锁,因为head有可能已经执行完,释放了锁。如果成功获得,直接将自己设置成head然后去执行。当线程获得锁失败时,就将自己park住,等待前一个节点来唤醒自己。 #### Condition ReentrantLock可以创建多个不同的Condition作为不同的等待队列。普通的同步方法中的,notifyAll()方法会唤醒被该对象锁住的所有线程,而Condition可以让线程去不同的等待队列中等待,唤醒时也可以分队列唤醒。 ***用Condition实现生产消费队列*** ``` public class Test { private LinkedList<Object> list = new LinkedList<>(); private ReentrantLock lock = new ReentrantLock(); //生产者等待队列 private Condition producer = lock.newCondition(); //消费者等待队列 private Condition consumer = lock.newCondition(); private int count = 0; private static final int MAX = 100; public void put(Object o) { lock.lock(); try { while (count >= MAX) {//必须用while不能用if,保证被唤醒时重新检查长度。 producer.await();//list满时,线程进入生产者的Condition中等待。 } list.add(o); count++; consumer.signalAll();//唤醒全部的消费者Condition中的线程 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public Object get() { Object o = null; lock.lock(); try { while (count <= 0) { consumer.await();//list空时,线程进入消费者Condition中等待 } o = list.removeFirst(); count--; producer.signalAll();//唤醒所有生产者Condition中的线程 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return o; } public int getCount() { lock.lock(); try { return count; } finally { lock.unlock(); } } public static void main(String[] args) { Test test = new Test(); for (int i = 0; i < 2; i++) { new Thread(() -> { while (true) { test.put(new Object()); System.out.println(test.getCount()); } }).start(); } for (int i = 0; i < 10; i++) { new Thread(() -> { while (true) { test.get(); } }).start(); } } } ``` 相比于notifAll(),当我取出list中的元素,需要唤醒生产者再次添加的时候,我不用去唤醒正在等待的消费者线程,导致他们再次进入等待状态,造成不必要的开销,我只需要针对性的唤醒所有生产者线程就可以了。 ### CountDownLatch(门闩) ```CountDownLatch latch = new CountDownLatch(2)```创建CountDownLatch时需要传入一个int类型的数值,它对应AQS的state变量,当state为零时,放行。 ```latch.await()```在调用await()方法的地方阻塞住。 ```latch.countDown()```每次调用,state值减1。减为0时,释放阻塞,继续执行。 ### CyclicBarrier(循环栅栏) ```CyclicBarrier barrier = new CyclicBarrier(20, Runnable);```我们最常用的是这种构造方式,传入一个int值和一个线程,int值表示多少个线程到达可以推倒栅栏,线程为栅栏推倒后执行的操作。 ```barrier.await();```在调用该方法的地点阻塞住,直到达到数量的线程都调用了await()方法,则推倒栅栏,放行,放行之后,栅栏竖起,再次等待满足数量的线程到达。 ### Phaser Phaser可以让线程分阶段执行某些操作。 创建一个类,从Phaser继承,重写onAdvance方法,参数phase表示执行的阶段,每次触发arriveAndAwaitAdvance()时会执行,初始为0每次触发加1,registeredParties表示当前有多少线程注册。 ``` class MyPhaser extends Phaser { @Override protected boolean onAdvance(int phase, int registeredParties) { switch(phase) { case 0: System.out.println(1); return false; case 1: System.out.println(2); return false; case 2: System.out.println(3); return false; case 3: System.out.println(4); return true; default: return true; } } } ``` ``` static MyPhaser phaser = new MyPhaser (); public void m1() { phaser.arriveAndAwaitAdvance(); } public static void main(String[] args) { phaser.bulkRegister(7); m1(); } ``` phaser.bulkRegister(7)类似CyclicBarrier,表示有7个线程注册,phaser.arriveAndAwaitAdvance()会在调用地点阻塞,直到所有注册线程都到达时,解除阻塞执行,并触发一次onAdvance()方法,返回false表示还有后续步骤,步数加1,返回true表示所有步骤执行完成不会再触发onAdvance()方法。后续也可以通过phaser.arriveAndDeregister()方法来解除某线程的注册,也可通过phaser.register()方法注册新的线程进来。 ### ReadWriteLock 读写锁,其中读锁是共享锁,写锁是排它锁,适用于多线程,大量读取,少量修改的场景。读取时会加读锁,禁止写入,但是允许其他读线程操作,写入时加写锁,不允许任何其他线程的操作。 ``` ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); Lock readLock = readWriteLock.readLock(); Lock writeLock = readWriteLock.writeLock(); ``` ### Semaphore(信号量) Semaphore允许多个线程同时进行操作。 ``` Semaphore s = new Semaphore(2, true); ``` 构造方法第一个参数表示允许多少个线程同时执行,第二个参数表示是否是公平锁,默认是false,表示非公平。 ```s.acquire()```方法会尝试获取锁,当没有达到设置的最大线程数时,会获取成功,否则阻塞。 ```s.release()```方法释放锁。 Semaphore常用于限流操作。 ### Exchanger Exchanger用于两个线程之间的数据交换。 ``` Exchanger<String> exchanger = new Exchanger<>(); ``` Exchanger会维护一个长度为2的数组,当某一个线程调用```String s = exchanger.exchange("abc")```方法时,会将abc字符串放入数组的0号位置,并且线程阻塞,等待另一个线程调用```String s = exchanger.exchange("def")```方法,将def字符串放入数组的1号位置,并交换两个线程在数组中的引用,两个线程中s的值交换完成,程解除阻塞,继续运行。 ### LockSupport ```LockSupport.park()```方法会阻塞当前线程。 ```LockSupport.unpark(t);```参数是一个线程实例,指定唤醒某个被park()的线程。 实际上底层维护这一个int值,初始为0,当调用park()时,先判断是否大于0,如果大于0,就设置成0,线程不阻塞,当等于0时,线程进入阻塞状态。调用unpark()时,就直接将值设置为1,并解除线程阻塞状态。也就是说,如果先调用unpark(),再调用park()线程也不会阻塞,而是继续执行。 #### 与wait/notify的区别 1. wait/notify必须在synchronized同步代码块中执行,而LockSupport则不需要。 2. notify是随机叫醒一个线程,而LockSupport可以指定唤醒某一个线程。 [1]: https://www.princelei.club/usr/uploads/2020/06/2227669098.jpg Last modification:April 21st, 2021 at 06:23 pm © 允许规范转载