最近忙着毕设,要做前端,所以看更多的是React的知识,小后端🐶还是要继续学习总结,不然就要没(tou)时(lan)间继续写了。
Semaphore
中文含义是信号量,它是synchronized的升级版。
synchronized 关键字,代表这个方法加锁,相当于不管哪一个线程(例如线程A),运行到这个方法时,都要检查有没有其它线程B(或者C、 D等)正在用这个方法(或者该类的其他同步方法),有的话要等正在使用synchronized方法的线程B(或者C 、D)运行完这个方法后再运行此线程A,没有的话,锁定调用者,然后直接运行。它包括两种用法:synchronized 方法和 synchronized 块。(度娘解释)
Semaphore主要的作用是控制线程的并发数,如果单纯的使用synchronized是不能实现的。
简单看
1 | Semaphore.java |
Semaphore结构
1 | /** |
两个构造方法:
参数permit是许可的意思,代表在同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码,Semaphore发放许可的操作是减法操作。
参数fair,表示内部使用的是FairSync(公平锁)或者NonfairSync(非公平锁),表示每次线程获取锁的机会是否是公平的,具体的可以看底层实现,继承自AbstractQueuedSynchronizer,通过state判断是否是加锁状态,state 为0,表示锁未被获取,不为0,表示已被获取。
如何用
第一种,同步的执行一个任务(与Synchronized相似)
1 | //多个线程里,保持一份信号量 |
这种比较简单,就不深入展示了,要看的是在多个线程下如何控制并发数量。
第二种,控制线程并发数量
首先是执行类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class TestService {
//许可证数量为2
private Semaphore semaphore = new Semaphore(2);
public void testMethod() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() +
"begin Time" + System.currentTimeMillis());
//停顿5秒
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() +
"end Time" + System.currentTimeMillis());
} catch (Exception ex) {
ex.printStackTrace();
} finally {
semaphore.release();
}
}
}
创建一个线程池,分配任务:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public static void main(String[] args) throws IOException {
//创建线程池,自己新建一个ThreadFactory,定义线程名字
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, "当前线程哈希值是:" + r.hashCode());
}
});
TestService testService = new TestService();
//分派任务
for (int i = 0; i < 10; i++){
Thread thread = new Thread(new Runnable() {
public void run() {
testService.testMethod();
}
});
poolExecutor.submit(thread);
}
//关闭线程池,等待池中的线程任务执行完毕
poolExecutor.shutdown();
}
执行结果:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24Connected to the target VM, address: '127.0.0.1:60937', transport: 'socket'
当前线程哈希值是:55909012begin Time1521281771135
当前线程哈希值是:922151033begin Time1521281771135
当前线程哈希值是:55909012end Time1521281776136
当前线程哈希值是:922151033end Time1521281776136
当前线程哈希值是:1915058446begin Time1521281776136
当前线程哈希值是:1387228415begin Time1521281776136
当前线程哈希值是:1915058446end Time1521281781140
当前线程哈希值是:1387228415end Time1521281781140
当前线程哈希值是:748658608begin Time1521281781140
当前线程哈希值是:167185492begin Time1521281781140
当前线程哈希值是:167185492end Time1521281786145
当前线程哈希值是:748658608end Time1521281786145
当前线程哈希值是:1937348256begin Time1521281786145
当前线程哈希值是:1358444045begin Time1521281786145
当前线程哈希值是:1937348256end Time1521281791148
当前线程哈希值是:1358444045end Time1521281791148
当前线程哈希值是:331844619begin Time1521281791148
当前线程哈希值是:64830413begin Time1521281791148
Disconnected from the target VM, address: '127.0.0.1:60937', transport: 'socket'
当前线程哈希值是:331844619end Time1521281796152
当前线程哈希值是:64830413end Time1521281796152
Process finished with exit code 0
可以看到,在设定的线程睡眠5秒内,只有两个线程同时执行acquire()和release()之间的逻辑,通过Semaphore控制了线程的并发数量。
其它方法
- acquire(int) : 一次获取多个许可
- acquireUninterruptibly() : 使等待进入acquire()方法,不允许被终止
- tryAcquire() : 尝试地获得一个许可,如果获取不到就返回false,通常与if判断使用,具有无阻塞的特点
- tryAcquire(long timeout, TimeUnit unit) : 多少时间内获取不到许可就放弃
还有很多方法,诸如availablePermits()/drainPermits()/hasQueuedThreads()/getQueueLength()等,感兴趣的话请怒开IDE查看具体实现吧。
CountDownLatch
CountDownLatch也是一个工具类,可以使线程同步的处理上更加灵活,CountDownLatch也是减法操作。
简单介绍
1 | /* A synchronization aid that allows one or more threads to wait until |
举个🌰:
来了一辆小汽车,要等满5个人才开车,来了1个,不开;再来1个,还是不开,最后5个人到齐了,开车开车。
类结构&构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14//这货也是继承AbstractQueuedSynchronizer
private final Sync sync;
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
参数count表示要等待的数量
方法示范
执行类,等待5秒后执行countDown1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class TestService {
private Semaphore semaphore = new Semaphore(1);
private CountDownLatch countDownLatch;
public TestService(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public void testMethod() {
try {
semaphore.acquire();
System.out.println("当前线程是: " + Thread.currentThread().getName() +
" 时间是: " + System.currentTimeMillis());
//等待5秒
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
countDownLatch.countDown();
semaphore.release();
}
}
}
运行类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public static void main(String[] args) throws IOException, InterruptedException {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, "当前线程哈希值是:" + r.hashCode());
}
});
//设定十个限制
CountDownLatch countDownLatch = new CountDownLatch(10);
TestService testService = new TestService(countDownLatch);
for (int i = 0; i < 10; i++){
Thread thread = new Thread(new Runnable() {
public void run() {
testService.testMethod();
}
});
thread.setName("" + i);
poolExecutor.submit(thread);
}
//关闭线程池,等待池中的线程任务执行完毕
poolExecutor.shutdown();
System.out.println("poolExecutor分发任务结束: " + System.currentTimeMillis());
countDownLatch.await();
System.out.println("CountDown方法结束: " + System.currentTimeMillis());
}
执行日志:1
2
3
4
5
6
7
8
9
10
11
12当前线程是: 当前线程哈希值是:922151033 时间是: 1521287832750
poolExecutor分发任务结束: 1521287832752
当前线程是: 当前线程哈希值是:55909012 时间是: 1521287833755
当前线程是: 当前线程哈希值是:1387228415 时间是: 1521287834759
当前线程是: 当前线程哈希值是:748658608 时间是: 1521287835763
当前线程是: 当前线程哈希值是:167185492 时间是: 1521287836765
当前线程是: 当前线程哈希值是:1937348256 时间是: 1521287837769
当前线程是: 当前线程哈希值是:1358444045 时间是: 1521287838770
当前线程是: 当前线程哈希值是:331844619 时间是: 1521287839774
当前线程是: 当前线程哈希值是:64830413 时间是: 1521287840776
当前线程是: 当前线程哈希值是:653687670 时间是: 1521287841779
CountDown方法结束: 1521287842784
可以看到,在线程池控制1个并发线程,poolExecutor提交任务之后打印日志,但是countDownLatch.await()方法之后的代码,因为count没有减到0,不能执行。
在TestService方法中,每隔一秒执行countDownLatch.countDown()方法,最后十个线程跑完,count减到0,countDownLatch.await()方法之后的代码才可以执行。
方法
- await() : 等待
- countDown() : 计数减一
- await(long timeout, TimeUnit unit) : 在限定时间内进行等待,超过时间返回false
- getCount() : 获取计数count
小结
Semaphore作为信号量,用来控制线程的并发数量,CountDownLatch用来控制线程执行任务的时机也挺不错的。它们两个的理解和使用都比较简单,好了,又填了一个坑,下次继续挖坑和填坑hhh