Java 并发编程Semaphore的应用与源码解析

 2023-09-15 阅读 28 评论 0

摘要:What Semaphore标识信号量,允许指定数量的线程同时访问某个资源 How java实现并发的方式。通过以下两部实现信号量: acquire方法用于获得准入许可(如果没有获得许可,则进行等待,直到有线程释放许可而获得许可为止)release用于释放准

What

Semaphore标识信号量,允许指定数量的线程同时访问某个资源

How

java实现并发的方式。通过以下两部实现信号量:

  • acquire方法用于获得准入许可(如果没有获得许可,则进行等待,直到有线程释放许可而获得许可为止)
  • release用于释放准入许可

应用场景

  1. 实现某种资源池限制,类似于数据库连接池
  2. 对容器施加边界,比如一个集合中最多只能添加5个元素
  3. 资源并发访问数量限制
  4. 当作普通的使用(信号量为1时相当于普通的锁 信号量大于1时共享锁)

Semaphore代码示例

import java.util.concurrent.Semaphore;public class SemaphoreDemo implements Runnable {Semaphore semaphore = new Semaphore(5);public static void main(String[] args) {SemaphoreDemo semaphoreDemo = new SemaphoreDemo();for (int i = 0; i < 10; i++) {Thread thread = new Thread(semaphoreDemo);thread.start();}}@Overridepublic void run() {try {// 获得准入许可(如何没有获得成功,则进行等待,直到有线程释放许可而获得该许可为止)semaphore.acquire();Thread.sleep(1000);System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + ", 执行完毕!");// 释放准入许可semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}
}

示例中,有10条线程对信号量为5的资源进行争用,但每次只有5个线程拿到许可,另外的线程需要等待拿到许可的线程释放许可后才能拿到许可

Semaphore源码解析

java并发编程。关键方法如下:

  1. 构造方法:new Semaphore(5);
  2. 获取许可:semaphore.acquire();
  3. 释放许可:semaphore.release();

接下来我们从这三个方法入手进行源码解析

1. 构造方法:new Semaphore(5)

    public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}

默认构造方法为非公平共享锁,可以通过构造参数fair来选择公平或非公平,类似于ReentantLock

2. 获取许可:semaphore.acquire()

    public void acquire() throws InterruptedException {// 共享式获取AQS的同步状态sync.acquireSharedInterruptibly(1);}

调用的是AQS的acquireSharedInterruptibly方法:

    public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}if (tryAcquireShared(arg) < 0) {doAcquireSharedInterruptibly(arg);}}

其中tryAcquireShared依赖于Sync实现,在Semaphore中有AQS的实现Sync类,方法如下:

        // 尝试获取共享锁protected int tryAcquireShared(int acquires) {for (; ; ) {// 队列中存在等待线程则返回-1if (hasQueuedPredecessors())return -1;int available = getState(); // 可用许可数量int remaining = available - acquires; // 剩余许可数量if (remaining < 0 || compareAndSetState(available, remaining))// 返回可用的余量return remaining;}}

这是FairSync的tryAcquireShared方法,在NonfairSync中,没有hasQueuedPredecessors()判断,其余一样。

在方法中可以看出,最终返回的是剩余的许可数量,有如下几种情况:

  1. 如果剩余许可数量<0,则执行doAcquireSharedInterruptibly方法让线程自旋等待,这里是等待别的线程释放许可后线程被唤醒去尝试获取

        private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 线程进入同步队列final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (; ; ) { // 自旋final Node p = node.predecessor();if (p == head) { // 当前节点的前置节点是AQS的头节点 即自己是AQS同步队列的第一个节点int r = tryAcquireShared(arg); // 再去获取信号量if (r >= 0) {setHeadAndPropagate(node, r); // 退出自旋p.next = null; // help GCfailed = false;return;}}// 判断是否应该挂起该线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {throw new InterruptedException();}}} finally {if (failed) {cancelAcquire(node); // 获取失败 就取消获取}}}
    
  2. 否则就是拿到了许可数量,继续正常执行,不阻塞

3. 释放许可:semaphore.release()

    public void release() {sync.releaseShared(1);}

同样,调用的是AQS的releaseShared方法,看下代码:

    public final boolean releaseShared(int arg) {// 调用AQS实现类的tryReleaseSharedif (tryReleaseShared(arg)) {// 唤醒后续的线程节点doReleaseShared();return true;}return false;}

tryReleaseShared交由子类Sync实现,代码如下:

        protected final boolean tryReleaseShared(int releases) {for (; ; ) {int current = getState(); // 当前信号量许可数int next = current + releases; // 当前信号量许可数+释放的信号量许可数if (next < current)  // overflow 一般不会走进来throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) // CAS更新当前信号量许可数return true;}}

释放许可成功则继续调用AQS的doReleaseShared方法来唤醒后续节点可以来争取许可了

    private void doReleaseShared() {for (; ; ) { // 自旋等待Node h = head;// 有头节点且头节点和尾节点不是同一个if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {// 设置status为0if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {continue; // 循环检查}// 唤醒节点的后续节点unparkSuccessor(h);} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) { //continue; // 失败则继续循环}}if (h == head) {break;}}}

总结

Semaphore使用AQS同步状态来保存信号量的计数器。
acquireSharedInterruptibly会减少计数(获取许可),当计数为非正值的时候阻塞线程,否则不会阻塞线程
releaseShared方法会增加计数(释放许可),在计数不超过信号量限制时会解除线程的阻塞(获取到许可的线程)

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/5/60248.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息