concurrent并发多线程

读写锁

子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着在回到主线程循环100次,如此循环50次,请写出程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class concurrent {

public static void main(String[] args) {

final Business business= new Business();
new Thread(new Runnable() {

@Override
public void run() {
for (int j = 1; j <= 50; j++) {
business.sub(j);
}
}
}).start();
for (int j = 1; j <= 50; j++) {
business.main(j);
}

}

}

class Business {
private volatile boolean bshouldSub = true;//子线程和主线程通信信号

public synchronized void sub(int j) {
if (!bshouldSub) {
try {
this.wait();
} catch (InterruptedException e) {

e.printStackTrace();
}
}
for (int i = 1; i <= 10; i++) {
System.out.println("子线程内" + i
+ ",第" + j+"个子线程");

}
bshouldSub = false;//运行结束,设置值为FALSE 让主程序运行
this.notify();//唤醒等待的程序

}

public synchronized void main(int j) {
if (bshouldSub) {//如果bshouldsub=true ,等待 让子程序运行
try {
this.wait();
} catch (InterruptedException e) {

e.printStackTrace();
}
}
for (int i = 1; i <= 100; i++) {
System.out.println("内循环 " + i
+ ",第" + j+"个主线程");
}
bshouldSub = true;//让子程序运行
this.notify();//唤醒等待的一个程序
}

不可变对象加锁,锁的不是同一个

i++的例子里,如果是Integer,synchronized锁的对象不是同一个

Lock实现业务级别加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class AccountWithLock {
int balance;
private Lock lock;

public AccountWithLock() {
balance = 0;
lock = new ReentrantLock();
}

public void lockAccount() {
lock.lock();
}

public void unLockAccount() {
lock.unlock();
}

public void login() {
}

public void logout() {
}

public void add() {
balance += 800;
System.out.println("After add balance is:" + balance);
}

public void minus() {
balance -= 800;
System.out.println("After minus balance is:" + balance);
}
}

class AddThreadWithLock extends Thread {
String person;

AccountWithLock acc;

public AddThreadWithLock(String person, AccountWithLock acc) {
this.person = person;
this.acc = acc;
}

public void run() {
acc.lockAccount();
acc.login();
System.out.println(person + " login ");
for (int i = 0; i < 3; i++) {
System.out.println(person + " add money," + i + " cnt");
acc.add();
}
System.out.println(person + " logout ");
acc.logout();
acc.unLockAccount();
}
}

class MinusThreadWithLock extends Thread {
AccountWithLock acc;
String person;

public MinusThreadWithLock(String person, AccountWithLock acc) {
this.person = person;
this.acc = acc;
}

public void run() {
acc.lockAccount();
System.out.println(person + " login ");
for (int i = 0; i < 3; i++) {
System.out.println(person + " minus money," + i + " cnt");
acc.minus();
}
System.out.println(person + " logout ");
acc.logout();
acc.unLockAccount();
}
}

public class LockDemo {
public static void main(String[] args) {
AccountWithLock acc = new AccountWithLock();
Thread add = new AddThreadWithLock("Tom", acc);
Thread minus = new MinusThreadWithLock("Peter", acc);
add.start();
minus.start();
}
}

wait notify 实现库存==1 的生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class ProductData {
//表示被哪个生产者线程生产出来的编号
private int number;
// 标志位,true表示已经消费
private boolean flag = true;

public synchronized void product(int number) {
if (!flag) {
try {
// 未消费等待
wait();
} catch (InterruptedException e) {
}
}
this.number = number;
// 标记已经生产
flag = false;
// 通知消费者已经生产,可以消费
notify();
}

public synchronized int consume() {
if (flag) {
try {
// 未生产等待
wait();
} catch (InterruptedException e) {
// 省略报异常的语句
// …
}
}
// 标记已经消费
flag = true;
// 通知需要生产
notify();
return this.number;
}
}

// 生产者线程
class Producer extends Thread {
private ProductData s;

Producer(ProductData s) {
this.s = s;
}

public void run() {
for (int i = 0; i <= 5; i++) {
s.product(i);
System.out.println("P[" + i + "] Product.");
}
}
}

// 消费者线程
class Consumer extends Thread {
private ProductData s;

Consumer(ProductData s) {
this.s = s;
}

public void run() {
int i;
do {
i = s.consume();
System.out.println("P[" + i + "] Consume.");
} while (i != 9);
}
}

public class ProducerConsumer {
public static void main(String argv[]) {
ProductData s = new ProductData();
new Producer(s).start();
new Consumer(s).start();
}
}

Condition实现库存>1的生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class Store {

private Lock lock;
private Condition notFull;
private Condition notEmpty;

private int maxSize;
private LinkedList<String> storage;

public Store(int maxSize) {
lock=new ReentrantLock();
notFull=lock.newCondition();
notEmpty=lock.newCondition();

this.maxSize = maxSize;
storage = new LinkedList<String>();
}

// 生产方法
public void product() {
lock.lock();
try {
//如果仓库满了
while (storage.size() == maxSize ){
System.out.println(Thread.currentThread().getName()+": wait ");;
//阻塞生产线程
notFull.await();
}
storage.add("Java Book");
System.out.println(Thread.currentThread().getName()+": put:"+storage.size());
Thread.sleep(1000);
//唤醒消费线程
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}

// 消费方法
public void consume() {
lock.lock();
try {
//如果仓库空了
while (storage.size() ==0 ){
System.out.println(Thread.currentThread().getName()+": wait");;
notEmpty.await();//阻塞消费线程
}
//取出消费
System.out.println(storage.poll());
System.out.println(Thread.currentThread().getName()+": left:"+storage.size());
Thread.sleep(1000);
notFull.signalAll();//唤醒生产线程
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}

吸烟者问题

cosproyan.jpg

线程安全的单例模式

懒汉模式 延迟加载double check

volatile并且防止重排序.当volatile的共享变量发生写操作,会写回系统内存,并使其他cpu的缓存失效重新读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class LazySingletonDoubleCheck {

// 用volatile可以禁止2,3重排序 用缓存一致性协议
public volatile static LazySingletonDoubleCheck instance = null;

private LazySingletonDoubleCheck() {
}

public static LazySingletonDoubleCheck getInstance(){
if(instance == null){
synchronized (LazySingletonDoubleCheck.class){
if(instance == null){
instance = new LazySingletonDoubleCheck();
// new其实包括3步
// 1 分配内存
// 2 初始化对象
// 3 将那块内存空间 赋值给instance
// 对于单线程 2,3互换不会改变执行结果,所以多线程里可以重排序
// 多线程 第一个线程先3 instance有了内存空间不是null 但还没初始化
// 第二个线程访问并返回

}
}
}
return instance;
}
}

2.静态内部类

1
2
3
4
5
6
7
8
9
10
11
12
public class StaticInnerClassSingle {
private StaticInnerClassSingle(){
}
// jvm会加初始化索 同步多个线程对一个class的初始化
// 类初始化 哪个线程先拿到innerclass的初始化锁
private static class InnerClass{
private static StaticInnerClassSingle instance = new StaticInnerClassSingle();
}
public static StaticInnerClassSingle getInstance(){
return InnerClass.instance;
}
}

饿汉式

1
2
3
4
5
6
7
8
9
10
public class HungrySingleton {
private final static HungrySingleton instance;
static{
instance = new HungrySingleton();
}
private HungrySingleton(){}
public static HungrySingleton getInstance(){
return instance;
}
}

序列化之后的单例 读取后还是原来的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class HungrySingleton implements Serializable{
private final static HungrySingleton instance;
static{
instance = new HungrySingleton();
}
private HungrySingleton(){}
public static HungrySingleton getInstance(){
return instance;
}
// 添加这个,ObjectInputStream的readObject会通过反射调用
private Object readResolve(){
return instance;
}
}

Promise

例子:FTPClientUtil
客户端尽早调用
注意异常处理。

Future<FTPClientUtil> Promise = FTPClientUtil.newInstance("ftpServer","ftpUserName" , "password");

FTPClientUtil 用静态方法调用私有构造函数,并交给执行线程,返回一个凭证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class FTPClientUtil {
// 实现一个线程池
private volatile static ThreadPoolExecutor threadPoolExecutor;
static{
threadPoolExecutor = new ThreadPoolExecutor(1,
Runtime.getRuntime().availableProcessors() * 2,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
},new ThreadPoolExecutor.CallerRunsPolicy());
}


private final FtpClient ftp = FtpClient.create();

private FTPClientUtil(){}
//Promisor.compute
public static Future<FTPClientUtil> newInstance(final String ftpServer,final String userName,final String password){
Callable<FTPClientUtil> callable = new Callable<FTPClientUtil>() {
@Override
public FTPClientUtil call() throws Exception {
FTPClientUtil self = new FTPClientUtil();
self.init(ftpServer,userName,password);
return self;
}
};
// task 相等于promise
final FutureTask<FTPClientUtil> task = new FutureTask<FTPClientUtil>(callable);

// executor
threadPoolExecutor.execute(task);
return task;
}
}

客户端尽晚调用Promise.get()

synchronized

两个用法:对象锁、类锁

对象锁:

方法锁(默认对象为this) 、同步代码块锁(自己指定锁对象)

类锁:修饰静态方法 或 指定锁为Class对象


java每个对象有内部锁。并且该锁有一个内部条件

1
2
while (accounts[from] < amount)wait();
notifyAll();

  • final 匿名内部类中只能使用final?

java 线程状态

线程停止

下面能让线程停止执行的有
正确答案: A B D 你的答案: B C D (错误)
A.sleep();
B.stop();
C.notify();
D.yield();

线程终止

下面那些情况可以终止当前线程的运行?B
A.当一个优先级高的线程进入就绪状态时
B.抛出一个异常时
C.当该线程调用sleep()方法时
D.当创建一个新线程时

Atomic和synchronized

下面关于Atomic(比方说AtomicLong)和synchronized关键字的说法哪些是对的?
正确答案: A B 你的答案: A B (正确)
A.两者都可以用于线程同步
B.synchronized关键字用于同步一段代码,而atomic用于同步某个状态
C.两者都可以用于同步一段代码
D.多个Atomic在代码中嵌套使用时可能会产生死锁

需要多少资源

为了解决进程间的同步和互斥问题,通常采用一种称为信号量机制的方法。若系统中有7个进程共享若干个资源R,每个进程都需要6个资源R,那么使系统不发生死锁的资源R的最少数目是()
k(n-1)+1 = 36

https://blog.csdn.net/javazejian/article/details/72828483

多线程活跃性问题:死锁 饥饿 活锁

活锁:主动释放资源,资源不断在线程中跳动,没有一个线程可以拿到资源正常执行。

并发级别:阻塞、无饥饿、无锁、无等待

阻塞:synchronized 和重入锁
无障碍Obstruction-Free:都可以修改临界区,如果数据竞争就回滚。通过“一致性标记”实现。
无饥饿:公平优先级。
无锁:有一个线程限步内完成操作,离开临界区。线程无穷循环尝试修改共享变量。总有一个线程可以胜出。
无等待:所有线程有限步,不会饥饿。RCU(read-copy-update)。读线程无等待。写副本,合适时机写回。

Gustafon 如果可被并行的代码足够多,那么加速比就能随CPU数量线性增长

F是只能串行的比例
Gustafon = n-F(n-1)

32系统的long(64位)读写线程有干扰

happen-before.jpg

ThreadLocal

可以看成是 Map<Thread,T> 特定于该线程的值

Cache伪共享(False Sharing):多线程读同一cache line的不同变量,变量无关却要线程同步。

X86 cpu的cache line长64字节如果有一对象有成员变量long a,b,c(共24字节)
则可能加载在一个cache line中。
当 CPU1:线程1和cpu2:线程2 都从内存中读取这个对象放入自己的cache line1和2。
当线程1写a则2上的cache line变成Invalid,当2要读b需要重新从内存中读。
本来无关的两个线程,并行变成串行。

  • 解决方法:
    将a,b分到不同的cache line 采用@Contended

https://www.jianshu.com/p/7f89650367b8

redis 10w OPS

Disruptor

https://tech.meituan.com/disruptor.html
http://ifeve.com/locks-are-bad/

qucik start

1.工厂Event类,用于创建Event类实例对象

1
2
3
4
5
6
7
8
9
10
11
12
//走IO 要序列化接口
public class OrderEvent {
private long value;
}
public class OrederEventFactory implements EventFactory<OrderEvent> {

@Override
public OrderEvent newInstance() {
// 返回空的数据对象
return new OrderEvent();
}
}

2.监听事件类,处理数据(Event类)

1
2
3
4
5
6
7
8
// 处理数据 就是消费者
public class OrderEventHandler implements EventHandler<OrderEvent> {

@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消费者"+event.getValue());
}
}

3.实例化Disruptor,配置,编写核心组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Main {
public static void main(String[] args) {
OrederEventFactory factory = new OrederEventFactory();
int ringBufferSize = 1024 * 1024;
// 处理器核数
int p = Runtime.getRuntime().availableProcessors();

// 线程池 固定大小的,超过线程数量会放到LinkedBlockingQueue<Runnable>()等待
ExecutorService execotr = Executors.newFixedThreadPool(p);
/**1 实例化Distruptor对象
* Event工厂,容器长度,线程池(一般自己实现 实现RejectedExecutionHandler) ,单生产者,等待策略
*/
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
factory, ringBufferSize,execotr, ProducerType.SINGLE, new BlockingWaitStrategy()
);
// 2.添加消费者类的监听
disruptor.handleEventsWith(new OrderEventHandler());

// 3启动
disruptor.start();

//4.实际存储数据的容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();

OrderEventProducer producer = new OrderEventProducer(ringBuffer);

ByteBuffer bb = ByteBuffer.allocate(8);
for (long i = 0; i <100 ; i++) {
bb.putLong(0, i);
producer.sendData(bb);
}


disruptor.shutdown();
execotr.shutdown();
}
}

4.生产者,向Disruptor容器投递数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class OrderEventProducer {
RingBuffer<OrderEvent> ringBuffer;

public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

public void sendData(ByteBuffer data){
// 获取下一个可用的序号
long sequence = ringBuffer.next();

try {
// 空对象
OrderEvent event = ringBuffer.get(sequence);
event.setValue(data.getLong(0));
}finally {
// 发布出去 消费者会监听到
ringBuffer.publish(sequence);
}
}

}

运行:输出 消费者消费了100条消息

RingBuffer

1首尾相接的环,
2有序号 指向数组中下一个可用元素,加数据递增

数组长度有限,生产者可能会追上消费者.

基于数组,有sequencer(序号)和策略(WaitStrategy)

Disruptor

有RingBuffer,消费者线程池、消费者集合ConsumerRepository等引用。

Sequence

可以看成是一个AtomicLong 用于标识进度。可以消除CPU伪共享。
事件处理过程是沿着序号逐个递增处理,所以原子性 可以多线程并发访问。
协调RingBuffer/Producer/Consumer的处理进度。

Sequencer 接口

Distruptor的核心
有两个实现类SingleProducerSequencer,MultiProducerSequencer
实现生产者消费者之间传递数据的并发算法。

Sequence Barrier

用于保持对RingBufferMain Published Sequence(生产者)和消费者之间的平衡。
判断消费者的处理类。

WaitStrategy 接口 消费者如何等待的策略

waitstrategy1.jpg
waitstrategy2.jpg

主要策略
BlockingWaitStrategy 阻塞
SleepingWaitStrategy 休眠 异步日志
YieldingWaitStrategy 线程切换

EventProcessor 主要事件循环

最重要的实现类BatchEventProcessor 回调EventHandler接口的实现对象

EventHandler 由用户实现

WorkProcessor 多消费者

保证一个sequence只能被一个消费者消费

多生产多消费者的Disruptor图解

生产者共用一个Sequence
每个消费者有sequence
每个消费者通过Barrier和生产者的Sequence协调
distruptor.jpg


核心链路

PV和QPS估计

每天300w PV 80%会在24小时的20%的时间里
$3000 000*0.8)/(86400*0.2*)=139(QPS)$
如果一台机器QPS是58,则需要139/58=3台机器

并发模型

1.进程&线程Apache C10K问题
2.异步非阻塞 Nginx Libevent Nodejs 回调复杂度高
3.协程Golang Erlang Lua

TreeMap是非线程安全的。
跳表:
数据大时性能高于红黑树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private transient volatile HeadIndex<K,V> head;
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}

static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;}

static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;}

ConcurrentMap的实现类

ConcurrentSkipListMap跳表 redis的实现方法

1
ConcurrentSkipListMap<Dish.Type, Double> collect = menu.stream().collect(Collectors.groupingByConcurrent(Dish::getType, ConcurrentSkipListMap::new, Collectors.averagingInt(Dish::getCalories)));

ConcurrentHashMap 读不加锁

HashMap的迭代器用modCount 不允许读的时候修改。
HashMap的node
static class Node<K,V> implements Map.Entry<K,V>
HashTable的entry
private static class Entry<K,V> implements Map.Entry<K,V>
ConcurrentHashMap 的entry
value是可以改的
value可能是null

1
2
3
4
5
static final class MapEntry<K,V> implements Map.Entry<K,V> {
final K key; // non-null
V val; // non-null
final ConcurrentHashMap<K,V> map;
}

V是volatile 线程同步

1
2
3
4
5
6
7
8
/*这个类不会被暴露出去当用户可变的 Map.Entry 
*但是可以用于只读遍历
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;}

线程池ThreadPoolExecutor

1
private final BlockingQueue<Runnable> workQueue;

工作队列(阻塞队列)

保护性暂时挂起模式

线程池中线程数量过多,会竞争浪费事件再上下文切换。
线程池大小与处理器利用率之比:
$N_{threads}=N_{CPU}处理器核数*U_{CPU}期望的cpu利用率(0-1)*(1+W/C等待时间与计算时间的比率)$

Stream在背后引入Fork/join框架

Future接口

目标:实现并发,充分利用cpu的核,最大化程序吞吐量,避免因为等待远程服务返回/数据库查询。阻塞线程。
对计算结果的建模,返回一个运算结果的引用给调用方。

1
2
3
4
5
6
7
8
9
10
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
//异步操作
return doSomeLongComputation();
}
});
//异步操作运行同时也能执行
doSomeThingElse();

Fork/Join

工作窃取算法

发布对象:可以得到成员变量引用
对象溢出:内部类

AQS 两个node的队列

多个CPU缓存一致性 MESI缓存一致性!!!

mesi.jpg

4种数据状态,4种状态转换的cpu操作。
M(Modified)被修改:只缓存在该CPU的缓存中,被修改,与主存不一致。写回主存
Exlusive独享:缓存行只在该CPU的缓存中,未被修改,与主存一致。其它CPU读取内存时变成S状态。被修改则变成M。
Share共享:该缓存行被多个CPU缓存,且与主存相同。当一个CPU修改时其它CPU的变成I。
Invaild无效。

local read读本地缓存中数据local write写本地缓存
remote read读取内存数据 remote write写回主存


NUMA架构:内存分割,被CPU私有化:一致性协议MESIF:目录表

message pack

msgpack


并发模拟

  1. PostMan-runner选中测试的接口iteration(并发多少次)delay(每次延迟多少)
  2. 安装apahce服务器bin下的ab -n 1000 -c 50 http://localhost:8080/test
    本次测试请求总数1000次,同时并发数50
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 并发量
Concurrency Level: 50
整个测试所用的时间
Time taken for tests: 0.667 seconds
完成的请求数
Complete requests: 1000
失败请求数
Failed requests: 0
所有响应数据长度总和,包括http头信息和正文数据长度,不包括http请求信息的长度
Total transferred: 136000 bytes
所有正文数据长度
HTML transferred: 4000 bytes
吞吐率(与并发数相关)=Complete requests:/Time taken for tests
Requests per second: 1498.52 [#/sec] (mean)
用户平均请求等待时间
Time per request: 33.366 [ms] (mean)
服务器平均请求等待时间
Time per request: 0.667 [ms] (mean, across all concurrent requests)
单位时间从服务器获取的数据长度=Total transferred/Time taken for tests
Transfer rate: 199.02 [Kbytes/sec] received
  1. JMeter
    1. 添加线程组File-Test Plan-Add-Threads- Thread Group
      用户数:50
      虚拟用户增长时长(Ramp-Up Period): 1
      Loop Count循环次数:一个虚拟用户做多少次测试 20(共1000次)
    2. 添加实例请求 add Sanper HttpRequest
    3. 添加监听器 图形结果、查看结果树
    4. Option 打开Logviewer
      展开代码

      Throughput吞吐量
      jmeter.jpg
      viewtree.jpg

用代码并发模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ExecutorService executorService = Executors.newCachedThreadPool();
//同时的并发数
final Semaphore semaphore = new Semaphore(threadTotal);
//请求完之后统计结果 传入请求总数
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {//超过了并发数add会被阻塞
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}//闭锁 每执行完一次-1
countDownLatch.countDown();
});
}//保证闭锁到0再执行
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}

通常与线程池一起使用

同步器Semaphore信号量 阻塞线程 控制同一时间请求并发量

  • 适合控制并发数
  • Semaphore(int count)创建count个许可的信号量

每个线程:

1
2
3
//public void run
semaphore.acquire();//获取1/num个许可证
semaphore.release();//释放许可

Semphore(2)则
A,B,C三个线程,A执行完后C才能开始执行。


CountDownLatch()计数栓:必须发生指定数量的事件后才可以继续使用
阻塞线程,直到满足某种条件线程再继续执行,计数值(count)实际上就是闭锁需要等待的线程数量

  • 适合保证线程执行完再做其它处理
    countdown.jpg
  • 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
1
2
3
void run(){}
//主线程必须在启动其他线程后立即调用CountDownLatch.await()方法
CountDownLatch.await();}//等待锁存器
  • 线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。
1
2
3
4
5
//倒计时为0执行
main{
new CountDownLatch(3);
CountDownLatch.countDown();//触发事件
}

面试题
解释一下CountDownLatch概念?
CountDownLatch 和CyclicBarrier的不同之处?
给出一些CountDownLatch使用的例子?
CountDownLatch 类中主要的方法?


汇编 jne有条件跳转 jmp无条件跳转
进程-详细-设置相关性:分配到指定cpu执行,开的线程只在指定的执行
java会把线程直接映射到操作系统

threadstate.jpg

javac xxx.java->.class
javap -c -v xxx 查看虚拟机字节码

Condition

1
2
3
4
5
6
7
private Condition sufficientFunds;
if (accounts[from] < amount)
//将该线程放到等待集
sufficientFunds.await();
try{////最后。账户发生变化,重新检查余额
sufficientFunds.signalAll();
}

阻塞队列

CAS(compareAndSwap)

AtomicReference<V>模板,可以封装任何
对数据加上时间戳解决ABA过程状态敏感问题(充值10,20,花费10
if (money.compareAndSet(m, m + 20, timestap, timestap + 1))
Pair<V> current = pair;
第i个元素在数组中的偏移量

1
2
3
private static long byteOffset(int i) {
return ((long) i << shift) + base;//左移2,00乘4
}

  1. shift = 31 - Integer.numberOfLeadingZeros(scale); 29个前导0->shift=2
    前导零:数字转换成二进制数后前面0的个数
  2. 数组当中每个元素有多宽:int scale = unsafe.arrayIndexScale(int[].class); 4
  3. private static final int base = unsafe.arrayBaseOffset(int[].class); int的话4个byte

静态工厂方法

casPair使用cas的方式更新

1. .start开启新线程调用run .run不开启新线程

Java中Thread类中的start()方法和run()方法有什么区别?
A.start()方法创建一个线程,并内部调用run()

两种创建方法1.传入一个runnable对象 2.覆盖run
Thread:

1
2
3
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}

1
2
3
4
5
6
@Override
public void run() {
if (target != null) {
target.run();
}
}

2. stop不建议使用,会释放所有的锁(monitor)

  1. 实例方法.interrupt()在run()中处理

    1
    2
    3
    4
    5
    6
    public void run(){
    while(true){
    if(Thread.currentTread().isInterrupted()){break;}
    }
    Thread.yeild();
    }
  2. Thread.sleep(2000)异常处理 sleep会释放cpu时间片,不释放监视器所有权。让给其它线程
    在外部对这个sleep的线程中断会抛出异常
    .isInterrupted方法可以清除中断状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
     while(true){
    if(Thread.currentTread().isInterrupted()){break;}
    }
    try{
    Thread.sleep(2000);
    }catch(InterruptedException e){
    //设置中断状态,抛出异常后会清除中断标记位
    Thread.currentTread.interrupt();
    }
    Thread.yeild();
    }
  3. 用自定义标记抛出异常

3. suspend & resume 已弃用

If the target thread holds a lock on the monitor protecting a critical system resource when it is suspended, no thread can access this resource until the target thread is resumed.
If the thread that would [resume] the target thread attempts to lock thismonitor prior to calling resume, deadlock results. Such deadlocks typically manifest themselves as “frozen” processes.

线程2resume线程1发生在线程1suspend之前,当线程1suspend之后没办法resume,导致线程1资源冻结。

测试代码点击显/隐内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package learnThr;

public class learnThread {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");

public static class ChangeObjectThread extends Thread{
public ChangeObjectThread(String name){
super.setName(name);
}
@Override
public void run(){
//加锁
synchronized (u){
Thread.currentThread().suspend();
}
}
}
public static void main(String[] args)throws InterruptedException{
t1.start();
Thread.sleep(100);
t2.start();
t1.resume();
t2.resume();
t1.join();
t2.join();
}
}

suspend.jpg

yield & join

A.join(0)等待A结束后执行 用wait实现
join实现

As a thread terminates the this.notifyAll method is invoked.
It is recommended that applications not use wait, notify, ornotifyAllon Thread instances.不要在Thread实例上使用,会影响系统API

wait该线程释放监视器所有权。 在同步方法中使用
虚拟机实现notifyAll,在Object上。结束时会唤醒所有等待线程。

1
2
3
4
5
if (millis == 0) {
while (isAlive()) {
wait(0);
}
}

守护线程t.setDaemon(true);

虚拟机不会管守护线程是否存在,直接退出。

优先级

1
2
high.setPriority(Thread.MAX_PRIORITY);
low.setPriority(Thread.MIN_PRIORITY);

wait & notify

-Object.wait() 线程等待在当前对象上

The current thread must own this object’s monitor.
releases ownership of this monitor and waits until another thread notifies

-Object.notify()通知等待在这个线程上的对象 随机唤醒一个

Wakes up a single threadthat is waiting on this object’s monitor.

1
2
synchronized (object) { object.wait();}
synchronized (object) { object.notify();}

同步

  1. synchronized独占加锁

java.util.concurrent 并发工具包

  1. CyclicBarrier(int num)等待多个线程到达预定点
  2. 执行器
  3. 并发集合
  4. Frok/Join框架:并行
  5. atomic包:不需要锁即可完成并发环境变量使用的原子性操作
  6. locks包