AQS AbstractQueuedSynchronizer的同步框架是什么

技术AQS AbstractQueuedSynchronizer的同步框架是什么这篇文章将为大家详细讲解有关AQS AbstractQueuedSynchronizer的同步框架是什么,文章内容质量较高,因此小编分享给大

本文将详细讲解AQS AbstractQueuedSynchronizer的同步框架是什么,文章内容质量较高,所以分享给大家作为参考。希望大家看完这篇文章后对相关知识有一定的了解。

队列抽象队列同步器(以下称为同步器)是构建锁或其他同步组件的基本框架。

它的主要设计思想是使用一个名为state的int类型成员变量来表示同步状态。AQS的大多数方法都是在这个边缘上运行的。然后内置一个FIFO队列,完成资源获取线程的排队。

AQS主要用于继承模式,并且推荐使用静态内部类,这具有隔离用户和实现者关注领域的优势。

AQS的主要接口

访问或修改同步状态

方法名称描述getState()获取当前同步状态。SetState(int newState)设置当前同步状态。Compareandsetstate (int expect,int update)使用CAS设置当前状态,可以保证状态设置的原子性。

可重写的方法

方法名描述了以独占方式获取同步状态的tryAcquire(int arg)。要实现这种方法,需要查询当前状态,判断同步状态是否满足预期状态,然后通过CAS设置同步状态。TreRelease(int arg)以独占方式释放同步状态,等待获得同步状态的线程将有机会获得同步状态。tryAcquireShared(int arg)以共享方式获取同步状态,并返回大于等于0的值,表示获取成功。相反,失败的tryReleaseShared(int arg)共享释放同步状态是isheldexclusive()当前同步器是否被独占模式的线程占用,一般这个方法表示是否被当前线程独占。这些可重写的方法实际上有默认实现,它们直接引发异常,例如:

protected booleantryacquire(intarg){ 0

thrownewUnsupportedOperationException();

}不将这些方法直接定义为抽象方法的好处是,我们只需要根据需要覆盖其中的一些方法。如果我们想要实现一个排他锁,我们只需要覆盖tryAcquire、treRelease和isheldexclusive。

AQS的模板方法

AQS  AbstractQueuedSynchronizer的同步框架是什么

AQS采用模板法的设计模式大致可以分为三类界面:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。.

AQS的队列同步器

同步队列

同步器依靠内部同步队列(一个FIFO双向队列)完成同步状态管理。当当前线程无法获取同步状态时,同步器会将当前线程和等待状态构造成一个Node,并将其添加到同步队列中,同时阻塞当前线程。当同步状态被释放时,它将唤醒第一个节点中的线程,并使其尝试再次获取同步状态。

同步队列中的节点用于保存未能获得同步状态的线程引用、等待状态、前置和后续节点。它是一个静态内部类。

staticfinalclassNode {

/**

*标记表示节点正在共享模式下等待。

*/

staticfinanodeshared=new node();

/**

*标记表示该节点为。

在独占模式下等待
     */
    static final Node EXCLUSIVE = null;
    /**
     * 取消状态
     */
    static final int CANCELLED = 1;
    /**
     * 后继节点处于等待状态
     */
    static final int SIGNAL = -1;
    /**
     * 表示线线程在等待队列中
     */
    static final int CONDITION = -2;
    /**
     * 共享,表示状态要往后面的节点传播
     */
    static final int PROPAGATE = -3;
    /**
     * CANCELLED 值为1:  在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态后将不会发生变化
     * SIGNAL 值为-1: 后继节点的线程处于等待状态,当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
     * CONDITION 值为-2:  节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点会从等待队列中转移到同步队列中,加入到对同步状态的获取中
     * PROPAGATE 值为-3:  表示下一次共享同步状态将会无条件地被传播下去
     * INITIAL 值为0:          节点的初始状态
     */
    volatile int waitStatus;
    /**
     * 前驱结点,当节点加入同步队列时被设置(尾部添加)
     */
    volatile Node prev;
    /**
     * 后继节点
     */
    volatile Node next;
    /**
     * 获取同步状态的线程
     */
    volatile Thread thread;
    /**
     * 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHAEED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个字段
     */
    Node nextWaiter;
    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

节点是构成同步队列(等待队列)的基础,同步器拥有首节点(head) 和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的 基本结构如图:

AQS AbstractQueuedSynchronizer的同步框架是什么

  • 为了保证线程安全所以设置尾部节点需要使用compareAndSetTail(Node expect,Node update)方法。

  • 由于只有获取同步状态的线程才能设置头结点,所以没有竞争可以直接使用setHead(Node update)方法。

独占式同步状态获取与释放

独占式同步状态获取

通过调用同步器的acquire(int arg)方法可以获取同步状态。

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node) 方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该 节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的 唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

下面分析一下相关工作。首先是节点的构造以及加入同步队列:

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// 先假设没有竞争,快速的尝试一下将当前节点设置成尾节点,如果失败再调用enq方法,自旋
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}
private Node enq(final Node node) {
	for (;;) {
		Node t = tail;
		if (t == null) { // Must initialize
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}

上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线 程安全添加。

在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循 环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线 程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变 得“串行化”了。

节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自 省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这 个自旋过程中(并会阻塞节点的线程),代码如下:

final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		// 以自旋的方式来获取同步状态
		for (;;) {
			final Node p = node.predecessor();
			// 获取同步状态
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
            // 检查并设置同步状态,然后挂起当前线程
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL)
		return true;
	if (ws > 0) {
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}
private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}

在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,原因有两个,如下:

  • 头节点是成功获取到同状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。

  • 维护同步队列的FIFO原则。

节点自旋获取同步状态结构图:

AQS AbstractQueuedSynchronizer的同步框架是什么

独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如图:

AQS AbstractQueuedSynchronizer的同步框架是什么

独占式同步状态超时获取

独占式超时获取同步态起始就是在独占式获取同步状态的自旋方法体加上了时间判断,每次循环都会判断是否超少,代码如下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
		throws InterruptedException {
	if (nanosTimeout <= 0L)
		return false;
	final long deadline = System.nanoTime() + nanosTimeout;
	final Node node = addWaiter(Node.EXCLUSIVE);
	boolean failed = true;
	try {
		// 在非常短的超时等待无法做到十分精确,所以同步器会直接进入无条件的快速自旋
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return true;
			}
			// 检查是否超时
			nanosTimeout = deadline - System.nanoTime();
			if (nanosTimeout <= 0L)
				return false;
			if (shouldParkAfterFailedAcquire(p, node) &&
				nanosTimeout > spinForTimeoutThreshold)
				LockSupport.parkNanos(this, nanosTimeout);
			if (Thread.interrupted())
				throw new InterruptedException();
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

在非常短的超时等待无法做到十分精确,所以同步器会直接进入无条件的快速自旋。

独占式同步状态释放

当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能 够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释 放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。该方法代码如下:

public final boolean release(int arg) {
	if (tryRelease(arg)) {
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}
private void unparkSuccessor(Node node) {
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);
	Node s = node.next;
	if (s == null || s.waitStatus > 0) {
		s = null;
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	// 唤醒头结点的后继节点
	if (s != null)
		LockSupport.unpark(s.thread);
}

独占锁示例

package com.xiaolyuh;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * 互斥锁
 *
 * @author yuhao.wang3
 * @since 2019/7/10 17:21
 */
public class MutexLock implements Lock {
    // 使用静态内部类的方式来自定义同步器,隔离使用者和实现者
    static class Sync extends AbstractQueuedSynchronizer {
        // 我们定义状态标志位是1时表示获取到了锁,为0时表示没有获取到锁
        @Override
        protected boolean tryAcquire(int arg) {
            // 获取锁有竞争所以需要使用CAS原子操作
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            // 只有获取到锁的线程才会解锁,所以这里没有竞争,直接使用setState方法在来改变同步状态
            setState(0);
            setExclusiveOwnerThread(null);
            return true;
        }
        @Override
        protected boolean isHeldExclusively() {
            // 如果货物到锁,当前线程独占
            return getState() == 1;
        }
        // 返回一个Condition,每个condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }
    // 仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.tryRelease(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(0);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
    public static void main(String[] args) {
        MutexLock lock = new MutexLock();
        final User user = new User();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    user.setAge(user.getAge() + 1);
                    System.out.println(user.getAge());
                } finally {
                    lock.unlock();
                }
            }).start();
        }
    }
}

总结: 在获取同步状态时,同步器会维护一个同步队列,获取同步状态失败的线程都会被加入到同步队列中并在队列中进行自旋;移出队列 (或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步 器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取与释放

共享式同步状态获取

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。

AQS AbstractQueuedSynchronizer的同步框架是什么

左半部分,共享式访问资源时,其他共享式的访问均被允许,而独占式访问被 阻塞,右半部分是独占式访问资源时,同一时刻其他访问均被阻塞。

通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态,该方法代码如下:

public final void acquireShared(int arg) {
	if (tryAcquireShared(arg) < 0)
		doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			final Node p = node.predecessor();
			if (p == head) {
				int r = tryAcquireShared(arg);
				if (r >= 0) {
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					if (interrupted)
						selfInterrupt();
					failed = false;
					return;
				}
			}
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。

共享式同步状态释放

与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以 释放同步状态,代码如下:

public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
		doReleaseShared();
		return true;
	}
	return false;
}

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线 程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg) 方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。

共享锁示例

package com.xiaolyuh;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * 共享锁,可以允许多个线程同时获得锁
 *
 * @author yuhao.wang3
 * @since 2019/7/12 10:00
 */
public class SharedLock implements Lock {
    private static class Sync extends AbstractQueuedSynchronizer {
        public Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }
        @Override
        protected int tryAcquireShared(int arg) {
            int count = getState();
            if (count > 0 && compareAndSetState(count, count - arg)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return count;
            }
            return -1;
        }
        @Override
        protected boolean tryReleaseShared(int arg) {
            for (; ; ) {
                // 通过循环和CAS来保证安全的释放锁
                int count = getState();
                if (compareAndSetState(count, count + arg)) {
                    setExclusiveOwnerThread(null);
                    return true;
                }
            }
        }
        @Override
        protected boolean isHeldExclusively() {
            return getState() <= 0;
        }
        public Condition newCondition() {
            return new ConditionObject();
        }
    }
    private Sync sync;
    /**
     * @param count 能同时获取到锁的线程数
     */
    public SharedLock(int count) {
        this.sync = new Sync(count);
    }
    @Override
    public void lock() {
        sync.acquireShared(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        try {
            return sync.tryAcquireSharedNanos(1, 100L);
        } catch (InterruptedException e) {
            return false;
        }
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.releaseShared(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
    public static void main(String[] args) {
        final Lock lock = new SharedLock(5);
        // 启动10个线程
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(1000);
                } catch (Exception e) {
                } finally {
                    lock.unlock();
                }
            }).start();
        }
        // 每隔1秒换行
        for (int i = 0; i < 20; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println();
        }
    }
}

关于AQS AbstractQueuedSynchronizer的同步框架是什么就分享到这里了,希望

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/37103.html

(0)

相关推荐

  • 如何在11.2.0.4部署面向Linux的分布式电源

    技术如何进行 11.2.0.4 DG for linux 部署如何进行 11.2.0.4 DG for linux 部署,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能

    攻略 2021年12月24日
  • 化妆品牌子大全,日本有哪些好用的化妆品牌

    技术化妆品牌子大全,日本有哪些好用的化妆品牌日本女生从初中开始很多就开始化妆了化妆品牌子大全,所以整体国家的化妆品还是发展的很不错的,毕竟需求大嘛,而且从贵妇到大众品牌一应俱全,很多大集团都会出不同价位的品牌,以满足不同

    生活 2021年11月1日
  • relocating对Elasticsearch集群的影响是什么

    技术relocating对Elasticsearch集群的影响是什么本篇内容主要讲解“relocating对Elasticsearch集群的影响是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面

    攻略 2021年10月25日
  • 退烧按摩手法图解法,小儿发热推拿方法有哪些

    技术退烧按摩手法图解法,小儿发热推拿方法有哪些小儿发烧推拿手法有什么小儿发热是指小儿体温超过正常范围,可见于多种急、慢性疾病过程中退烧按摩手法图解法。根据小儿发热病因可将其分为外感发热、肺胃实热和阴虚内热,临床可采用按摩

    生活 2021年10月22日
  • 如何连接Flex数据库行

    技术如何连接Flex数据库行这篇文章主要介绍如何连接Flex数据库行,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!Flex应用程序并不直接与一个Flex数据库进行连接,所以你需要使用某种类型的服务

    攻略 2021年12月9日
  • Angular和SAP C4C的事件处理队列分析

    技术Angular和SAP C4C的事件处理队列分析本篇内容主要讲解“Angular和SAP C4C的事件处理队列分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Angul

    攻略 2021年11月10日