读写锁
子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着在回到主线程循环100次,如此循环50次,请写出程序。
1 | public class concurrent { |
不可变对象加锁,锁的不是同一个
i++的例子里,如果是Integer,synchronized锁的对象不是同一个
Lock实现业务级别加锁
1 | class AccountWithLock { |
wait notify 实现库存==1 的生产者消费者
1 | class ProductData { |
Condition实现库存>1的生产者消费者
1 | class Store { |
吸烟者问题
线程安全的单例模式
懒汉模式 延迟加载double check
volatile
并且防止重排序.当volatile的共享变量发生写操作,会写回系统内存,并使其他cpu的缓存失效重新读。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class LazySingletonDoubleCheck {
// 用volatile可以禁止2,3重排序 用缓存一致性协议
public volatile static LazySingletonDoubleCheck instance = null;
private LazySingletonDoubleCheck() {
}
public static LazySingletonDoubleCheck getInstance(){
if(instance == null){
synchronized (LazySingletonDoubleCheck.class){
if(instance == null){
instance = new LazySingletonDoubleCheck();
// new其实包括3步
// 1 分配内存
// 2 初始化对象
// 3 将那块内存空间 赋值给instance
// 对于单线程 2,3互换不会改变执行结果,所以多线程里可以重排序
// 多线程 第一个线程先3 instance有了内存空间不是null 但还没初始化
// 第二个线程访问并返回
}
}
}
return instance;
}
}
2.静态内部类1
2
3
4
5
6
7
8
9
10
11
12public class StaticInnerClassSingle {
private StaticInnerClassSingle(){
}
// jvm会加初始化索 同步多个线程对一个class的初始化
// 类初始化 哪个线程先拿到innerclass的初始化锁
private static class InnerClass{
private static StaticInnerClassSingle instance = new StaticInnerClassSingle();
}
public static StaticInnerClassSingle getInstance(){
return InnerClass.instance;
}
}
饿汉式
1 | public class HungrySingleton { |
序列化之后的单例 读取后还是原来的对象
1 | public class HungrySingleton implements Serializable{ |
Promise
例子:FTPClientUtil
客户端尽早调用
注意异常处理。
Future<FTPClientUtil> Promise = FTPClientUtil.newInstance("ftpServer","ftpUserName" , "password");
FTPClientUtil 用静态方法调用私有构造函数,并交给执行线程,返回一个凭证1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class FTPClientUtil {
// 实现一个线程池
private volatile static ThreadPoolExecutor threadPoolExecutor;
static{
threadPoolExecutor = new ThreadPoolExecutor(1,
Runtime.getRuntime().availableProcessors() * 2,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
},new ThreadPoolExecutor.CallerRunsPolicy());
}
private final FtpClient ftp = FtpClient.create();
private FTPClientUtil(){}
//Promisor.compute
public static Future<FTPClientUtil> newInstance(final String ftpServer,final String userName,final String password){
Callable<FTPClientUtil> callable = new Callable<FTPClientUtil>() {
public FTPClientUtil call() throws Exception {
FTPClientUtil self = new FTPClientUtil();
self.init(ftpServer,userName,password);
return self;
}
};
// task 相等于promise
final FutureTask<FTPClientUtil> task = new FutureTask<FTPClientUtil>(callable);
// executor
threadPoolExecutor.execute(task);
return task;
}
}
客户端尽晚调用Promise.get()
synchronized
两个用法:对象锁、类锁
对象锁:
方法锁(默认对象为this) 、同步代码块锁(自己指定锁对象)
类锁:修饰静态方法 或 指定锁为Class对象
java每个对象有内部锁。并且该锁有一个内部条件1
2while (accounts[from] < amount)wait();
notifyAll();
- final 匿名内部类中只能使用final?
java 线程状态
线程停止
下面能让线程停止执行的有
正确答案: A B D 你的答案: B C D (错误)
A.sleep();
B.stop();
C.notify();
D.yield();
线程终止
下面那些情况可以终止当前线程的运行?B
A.当一个优先级高的线程进入就绪状态时
B.抛出一个异常时
C.当该线程调用sleep()方法时
D.当创建一个新线程时
Atomic和synchronized
下面关于Atomic(比方说AtomicLong)和synchronized关键字的说法哪些是对的?
正确答案: A B 你的答案: A B (正确)
A.两者都可以用于线程同步
B.synchronized关键字用于同步一段代码,而atomic用于同步某个状态
C.两者都可以用于同步一段代码
D.多个Atomic在代码中嵌套使用时可能会产生死锁
需要多少资源
为了解决进程间的同步和互斥问题,通常采用一种称为信号量机制的方法。若系统中有7个进程共享若干个资源R,每个进程都需要6个资源R,那么使系统不发生死锁的资源R的最少数目是()
k(n-1)+1 = 36
https://blog.csdn.net/javazejian/article/details/72828483
多线程活跃性问题:死锁 饥饿 活锁
活锁:主动释放资源,资源不断在线程中跳动,没有一个线程可以拿到资源正常执行。
并发级别:阻塞、无饥饿、无锁、无等待
阻塞:synchronized
和重入锁
无障碍Obstruction-Free:都可以修改临界区,如果数据竞争就回滚。通过“一致性标记”实现。
无饥饿:公平优先级。
无锁:有一个线程限步内完成操作,离开临界区。线程无穷循环尝试修改共享变量。总有一个线程可以胜出。
无等待:所有线程有限步,不会饥饿。RCU(read-copy-update)。读线程无等待。写副本,合适时机写回。
Gustafon 如果可被并行的代码足够多,那么加速比就能随CPU数量线性增长
F是只能串行的比例
Gustafon = n-F(n-1)
32系统的long(64位)读写线程有干扰
ThreadLocal
可以看成是 Map<Thread,T> 特定于该线程的值
Cache伪共享(False Sharing):多线程读同一cache line的不同变量,变量无关却要线程同步。
X86 cpu的cache line长64字节如果有一对象有成员变量long a,b,c(共24字节)
则可能加载在一个cache line中。
当 CPU1:线程1和cpu2:线程2 都从内存中读取这个对象放入自己的cache line1和2。
当线程1写a则2上的cache line变成Invalid,当2要读b需要重新从内存中读。
本来无关的两个线程,并行变成串行。
- 解决方法:
将a,b分到不同的cache line 采用@Contended
https://www.jianshu.com/p/7f89650367b8
redis 10w OPS
Disruptor
https://tech.meituan.com/disruptor.html
http://ifeve.com/locks-are-bad/
qucik start
1.工厂Event类,用于创建Event类实例对象1
2
3
4
5
6
7
8
9
10
11
12//走IO 要序列化接口
public class OrderEvent {
private long value;
}
public class OrederEventFactory implements EventFactory<OrderEvent> {
public OrderEvent newInstance() {
// 返回空的数据对象
return new OrderEvent();
}
}
2.监听事件类,处理数据(Event类)1
2
3
4
5
6
7
8// 处理数据 就是消费者
public class OrderEventHandler implements EventHandler<OrderEvent> {
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消费者"+event.getValue());
}
}
3.实例化Disruptor,配置,编写核心组件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class Main {
public static void main(String[] args) {
OrederEventFactory factory = new OrederEventFactory();
int ringBufferSize = 1024 * 1024;
// 处理器核数
int p = Runtime.getRuntime().availableProcessors();
// 线程池 固定大小的,超过线程数量会放到LinkedBlockingQueue<Runnable>()等待
ExecutorService execotr = Executors.newFixedThreadPool(p);
/**1 实例化Distruptor对象
* Event工厂,容器长度,线程池(一般自己实现 实现RejectedExecutionHandler) ,单生产者,等待策略
*/
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
factory, ringBufferSize,execotr, ProducerType.SINGLE, new BlockingWaitStrategy()
);
// 2.添加消费者类的监听
disruptor.handleEventsWith(new OrderEventHandler());
// 3启动
disruptor.start();
//4.实际存储数据的容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long i = 0; i <100 ; i++) {
bb.putLong(0, i);
producer.sendData(bb);
}
disruptor.shutdown();
execotr.shutdown();
}
}
4.生产者,向Disruptor容器投递数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class OrderEventProducer {
RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data){
// 获取下一个可用的序号
long sequence = ringBuffer.next();
try {
// 空对象
OrderEvent event = ringBuffer.get(sequence);
event.setValue(data.getLong(0));
}finally {
// 发布出去 消费者会监听到
ringBuffer.publish(sequence);
}
}
}
运行:输出 消费者消费了100条消息
RingBuffer
1首尾相接的环,
2有序号 指向数组中下一个可用元素,加数据递增
数组长度有限,生产者可能会追上消费者.
基于数组,有sequencer(序号)和策略(WaitStrategy)
Disruptor
有RingBuffer,消费者线程池、消费者集合ConsumerRepository
等引用。
Sequence
可以看成是一个AtomicLong
用于标识进度。可以消除CPU伪共享。
事件处理过程是沿着序号逐个递增处理,所以原子性 可以多线程并发访问。
协调RingBuffer/Producer/Consumer
的处理进度。
Sequencer 接口
Distruptor的核心
有两个实现类SingleProducerSequencer
,MultiProducerSequencer
实现生产者消费者之间传递数据的并发算法。
Sequence Barrier
用于保持对RingBuffer
的Main Published Sequence
(生产者)和消费者之间的平衡。
判断消费者的处理类。
WaitStrategy 接口 消费者如何等待的策略
主要策略BlockingWaitStrategy
阻塞SleepingWaitStrategy
休眠 异步日志YieldingWaitStrategy
线程切换
EventProcessor 主要事件循环
最重要的实现类BatchEventProcessor
回调EventHandler
接口的实现对象
EventHandler 由用户实现
WorkProcessor 多消费者
保证一个sequence只能被一个消费者消费
多生产多消费者的Disruptor图解
生产者共用一个Sequence
每个消费者有sequence
每个消费者通过Barrier
和生产者的Sequence
协调
核心链路
PV和QPS估计
每天300w PV 80%会在24小时的20%的时间里
$3000 000*0.8)/(86400*0.2*)=139(QPS)$
如果一台机器QPS是58,则需要139/58=3台机器
并发模型
1.进程&线程Apache C10K问题
2.异步非阻塞 Nginx Libevent Nodejs 回调复杂度高
3.协程Golang Erlang Lua
TreeMap是非线程安全的。
跳表:
数据大时性能高于红黑树1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private transient volatile HeadIndex<K,V> head;
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;}
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;}
ConcurrentMap的实现类
ConcurrentSkipListMap
跳表 redis的实现方法1
ConcurrentSkipListMap<Dish.Type, Double> collect = menu.stream().collect(Collectors.groupingByConcurrent(Dish::getType, ConcurrentSkipListMap::new, Collectors.averagingInt(Dish::getCalories)));
ConcurrentHashMap 读不加锁
HashMap的迭代器用modCount 不允许读的时候修改。
HashMap的nodestatic class Node<K,V> implements Map.Entry<K,V>
HashTable的entryprivate static class Entry<K,V> implements Map.Entry<K,V>
ConcurrentHashMap 的entry
value是可以改的
value可能是null1
2
3
4
5static final class MapEntry<K,V> implements Map.Entry<K,V> {
final K key; // non-null
V val; // non-null
final ConcurrentHashMap<K,V> map;
}
V是volatile 线程同步1
2
3
4
5
6
7
8/*这个类不会被暴露出去当用户可变的 Map.Entry
*但是可以用于只读遍历
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;}
线程池ThreadPoolExecutor
1 | private final BlockingQueue<Runnable> workQueue; |
工作队列(阻塞队列)
保护性暂时挂起模式
线程池中线程数量过多,会竞争浪费事件再上下文切换。
线程池大小与处理器利用率之比:
$N_{threads}=N_{CPU}处理器核数*U_{CPU}期望的cpu利用率(0-1)*(1+W/C等待时间与计算时间的比率)$
Stream在背后引入Fork/join框架
Future接口
目标:实现并发,充分利用cpu的核,最大化程序吞吐量,避免因为等待远程服务返回/数据库查询。阻塞线程。
对计算结果的建模,返回一个运算结果的引用给调用方。1
2
3
4
5
6
7
8
9
10ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() throws Exception {
//异步操作
return doSomeLongComputation();
}
});
//异步操作运行同时也能执行
doSomeThingElse();
Fork/Join
工作窃取算法
发布对象:可以得到成员变量引用
对象溢出:内部类
AQS 两个node的队列
多个CPU缓存一致性 MESI缓存一致性!!!
4种数据状态,4种状态转换的cpu操作。
M(Modified)被修改:只缓存在该CPU的缓存中,被修改,与主存不一致。写回主存
Exlusive独享:缓存行只在该CPU的缓存中,未被修改,与主存一致。其它CPU读取内存时变成S状态。被修改则变成M。
Share共享:该缓存行被多个CPU缓存,且与主存相同。当一个CPU修改时其它CPU的变成I。
Invaild无效。
local read
读本地缓存中数据local write
写本地缓存remote read
读取内存数据 remote write
写回主存
NUMA架构:内存分割,被CPU私有化:一致性协议MESIF:目录表
message pack
并发模拟
- PostMan-runner选中测试的接口iteration(并发多少次)delay(每次延迟多少)
- 安装apahce服务器bin下的
ab -n 1000 -c 50 http://localhost:8080/test
本次测试请求总数1000次,同时并发数50
1 | 并发量 |
- JMeter
- 添加线程组File-Test Plan-Add-Threads- Thread Group
用户数:50
虚拟用户增长时长(Ramp-Up Period): 1
Loop Count循环次数:一个虚拟用户做多少次测试 20(共1000次) - 添加实例请求 add Sanper HttpRequest
- 添加监听器 图形结果、查看结果树
- Option 打开Logviewer
Throughput吞吐量
- 添加线程组File-Test Plan-Add-Threads- Thread Group
用代码并发模拟
1 | ExecutorService executorService = Executors.newCachedThreadPool(); |
通常与线程池一起使用
同步器Semaphore信号量 阻塞线程 控制同一时间请求并发量
- 适合控制并发数
- Semaphore(int count)创建count个许可的信号量
每个线程:1
2
3//public void run
semaphore.acquire();//获取1/num个许可证
semaphore.release();//释放许可
Semphore(2)则
A,B,C三个线程,A执行完后C才能开始执行。
CountDownLatch()计数栓:必须发生指定数量的事件后才可以继续使用
阻塞线程,直到满足某种条件线程再继续执行,计数值(count)实际上就是闭锁需要等待的线程数量
- 适合保证线程执行完再做其它处理
- 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
1 | void run(){} |
- 线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。
1 | //倒计时为0执行 |
面试题
解释一下CountDownLatch概念?
CountDownLatch 和CyclicBarrier的不同之处?
给出一些CountDownLatch使用的例子?
CountDownLatch 类中主要的方法?
汇编 jne有条件跳转 jmp无条件跳转
进程-详细-设置相关性:分配到指定cpu执行,开的线程只在指定的执行
java会把线程直接映射到操作系统
javac xxx.java
->.classjavap -c -v xxx
查看虚拟机字节码
Condition
1 | private Condition sufficientFunds; |
阻塞队列
CAS(compareAndSwap)
AtomicReference<V>
模板,可以封装任何
对数据加上时间戳解决ABA过程状态敏感问题(充值10,20,花费10if (money.compareAndSet(m, m + 20, timestap, timestap + 1))
Pair<V> current = pair;
第i个元素在数组中的偏移量1
2
3private static long byteOffset(int i) {
return ((long) i << shift) + base;//左移2,00乘4
}
- shift = 31 -
Integer.numberOfLeadingZeros(scale);
29个前导0->shift=2
前导零:数字转换成二进制数后前面0的个数 - 数组当中每个元素有多宽:int scale =
unsafe.arrayIndexScale(int[].class);
4 - private static final int
base = unsafe.arrayBaseOffset(int[].class);
int的话4个byte
静态工厂方法
casPair
使用cas的方式更新
1. .start
开启新线程调用run .run
不开启新线程
Java中Thread类中的start()方法和run()方法有什么区别?
A.start()方法创建一个线程,并内部调用run()
两种创建方法1.传入一个runnable对象 2.覆盖run
Thread:1
2
3public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
1 |
|
2. stop不建议使用,会释放所有的锁(monitor)
实例方法
.interrupt()
在run()中处理1
2
3
4
5
6public void run(){
while(true){
if(Thread.currentTread().isInterrupted()){break;}
}
Thread.yeild();
}用
Thread.sleep(2000)
异常处理 sleep会释放cpu时间片,不释放监视器所有权。让给其它线程
在外部对这个sleep的线程中断会抛出异常.isInterrupted
方法可以清除中断状态1
2
3
4
5
6
7
8
9
10
11while(true){
if(Thread.currentTread().isInterrupted()){break;}
}
try{
Thread.sleep(2000);
}catch(InterruptedException e){
//设置中断状态,抛出异常后会清除中断标记位
Thread.currentTread.interrupt();
}
Thread.yeild();
}用自定义标记抛出异常
3. suspend & resume 已弃用
If the target thread holds a lock on the monitor protecting a critical system resource when it is suspended, no thread can access this resource until the target thread is resumed.
If the thread that would [resume] the target thread attempts to lock thismonitor prior to calling resume, deadlock results. Such deadlocks typically manifest themselves as “frozen” processes.
线程2resume线程1发生在线程1suspend之前,当线程1suspend之后没办法resume,导致线程1资源冻结。
1 | package learnThr; |
yield & join
A.join(0)
等待A结束后执行 用wait实现
join实现
As a thread terminates the
this.notifyAll
method is invoked.
It is recommended that applications not usewait
,notify
, ornotifyAll
onThread
instances.不要在Thread实例上使用,会影响系统API
wait该线程释放监视器所有权。 在同步方法中使用
虚拟机实现notifyAll,在Object上。结束时会唤醒所有等待线程。1
2
3
4
5if (millis == 0) {
while (isAlive()) {
wait(0);
}
}
守护线程t.setDaemon(true);
虚拟机不会管守护线程是否存在,直接退出。
优先级
1 | high.setPriority(Thread.MAX_PRIORITY); |
wait & notify
-Object.wait()
线程等待在当前对象上
The current thread must own this object’s monitor.
releases ownership of this monitor and waits until another thread notifies
-Object.notify()
通知等待在这个线程上的对象 随机唤醒一个
Wakes up
a single thread
that is waiting on this object’s monitor.
1 | synchronized (object) { object.wait();} |
同步
- synchronized独占加锁
java.util.concurrent 并发工具包
- CyclicBarrier(int num)等待多个线程到达预定点
- 执行器
- 并发集合
- Frok/Join框架:并行
- atomic包:不需要锁即可完成并发环境变量使用的原子性操作
- locks包