线程的生命周期是指从线程创建、运行到消亡的整个过程。线程的状态可以表明线程处于生命周期具体那一阶段。线程的状态包括:创建、就绪、运行、阻塞、死亡五个状态。
线程状态间的转化,引用一张网上的图来说明:
Java多线程基本概念
Java多线程的编程主要通过几个基础类来完成的:Thread、Runnable、Object对象的wait()、notify()、notifyAll()方法、synchronized关键字以及Java 5.0后引入的一些多线程工具包。
不管是扩展的Thread类还是实现Runnable接口的实现类来实现多线程编程,最终都是通过Thread对象的API来控制线程的。
在5.0后引入了两个特殊的接口:Callable和Future接口,这两个接口结合使用可实现带返回值的多线程编程。Future接口表示异步计算的结果,它接供检查计算是否完成,未完成等待计算完成,并取得计算结果的机制,执行Callable任务后,就可以取得一个Future的对象,在Future对象上调用get方法就可以得到任务计算的结果。
Callable与Runnable的区别:
* Callable规定的方法是call(),而Runnable规定的方法是run().
* Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。
* call()方法可抛出异常,而run()方法是不能抛出异常的。
* 运行Callable任务可拿到一个Future对象,通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。
使用Callable的基本编程方式如下:
class foo { ExecutorService executor = ... void doAction(final String[] parameters) throws InterruptedException { Future<String> future = executor.submit(new Callable<String>() { public String call() { // do something here. return result; //返回结果 }}); ….. } } |
前面提到线程最终是用Thread对象来控制的,在Thread对象中我们可以设置线程的优先级。优先级的高低反映线程的重要或紧急程度。线程调度是在优先级基础上的“先到先服务”。
在Thread类中,还有几个特殊的方法:yield()方法和join方法。
yield方法是把CPU的控制权或运行机会让给同优先级的其他线程,但原线程仍处于可运行的状态,它只是让同优先级的线程有可执行的机会。这点与sleep方法不同,sleep方法强制当前运行的线程暂停运行,在其苏醒或睡眠时间到期前,不能返回可运行状态。这样使用sleep方法可以使用当前线程减慢,同时允许较低优先级的线程获得运行机会。
Join方法的作用是让主线程等待子线程终止。即调用join方法后的代码需要在子线程运行结束后才能被执行。
如下:
ChildThread t1 = …;
ParentThread t = …;
class ParentThread extends Thread {
ChildThread t1;
….
public void run() {
…..
t1.start(); // t1.start()必须在t1.joint()被调用前被调用,使用t1线程运行起来。
…..
t1.join(); //如果线程被生成了,但还未被起动,调用它的join()方法是没有作用的。
…… //线程t1结束后,才能运行此处(t1.join() 代码后)的代码。
}
}
特别注意的是:在调用线程的join方法时,该线程必须是已被运行起的线程,即已经调用了该线程的start()方法。
许多文章已提到Thread类的interrupt方法不会中断一个正在运行的线程,但会让线程退出阻塞状态。 当线程在调用Object类的wait(),或Thread类的join()、sleep()方法受阻时抛出异常,退出阻塞状态同时提供了一个应用程序处理线程阻塞中断的机会。它的本质是轮询中断变量标志,这种方式并不是一种抢占式中断。
JDK同时废弃了Thread类的几个方法:stop()、susupend()、resume()方法。在应用程序中可以通过设置变量标志来控制或停止、结束线程。
任务调度框架
JDK提供了一些任务调度框架来执行任务,这样就不需要直接操作Thread类了。
Timer/TimerTask任务调度是JDK中最早引进的任务调度框架。其中 Timer 负责设定 TimerTask 的起始与间隔执行时间。使用者只需要创建一个 TimerTask 的继承类,实现其中 run 方法来定义工作任务,然后将其传给 Timer 执行。
Timer 的设计核心是一个 TaskList 和一个 TaskThread。Timer 将接收到的任务丢到自己的 TaskList 中,TaskList 按照 Task 的最初执行时间进行排序。TimerThread 在创建 Timer 时会启动成为一个守护线程。这个线程会轮询所有任务,找到一个最近要执行的任务,然后休眠,当到达最近要执行任务的开始时间点,TimerThread 被唤醒并执行该任务。之后 TimerThread 更新最近一个要执行的任务,继续休眠。
Timer有两种执行任务的模式,最常用的是schedule,shedule也可以以两种方式执行任务:1.在某个时间(Data);2.在某个固定的时间之后(int delay).
其中值得注意的方法:
1.调用TimerTask的cancel()方法,将退出该任务执行。
2.调用Timer的cancel()方法,将退出所有的任务执行。
说明:Timer的scheduleAtFixedRate模式:在该模式下,Timer会尽量让任务在一个固定的频率下运行。也就是说:运行场景比如是1秒钟后MyTask 执行一次,因为系统繁忙之后的2.5秒后MyTask 才得以执行第二次,此时Timer会记下这个延迟,并尝试在下一个任务的时候弥补这个延迟。那么,在接下来的1.5秒后,MyTask 将执行的三次。"以固定的频率而不是固定的延迟时间去执行一个任务”。
JDK5.0后引入了新的Executor任务调度框架,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。
Executor任务调度框架主要由三个接口和其相应的具体类组成:
• Executor接口:执行Runnable任务的
• ExecutorService接口:继承了Executor的方法,并提供了执行Callable任务和中止任务执行的服务
• ScheduledExecutorService接口:在ExecutorService的基础上,提供了按时间安排执行任务的功能
• Executors工具类:提供得到Executor接口的具体对象的一些静态方法
一般我们可以通过Executors工具类得到Executor接口的具体对象,如下:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); //创建调度执行器服务类
ExecutorService pool = Executors.newFixedThreadPool(poolSize); //创建执行器服务类
pool.submit(new Callable() {….});
ScheduledExecutorService 中两种最常用的调度方法 ScheduleAtFixedRate 和ScheduleWithFixedDelay。
ScheduleAtFixedRate模式下每次执行时间为上一次任务开始起向后推一个时间间隔,是基于固定时间间隔进行任务调度,ScheduleWithFixedDelay模式下每次执行时间为上一次任务结束起向后推一个时间间隔,是基于不固定时间间隔进行任务调度。
Timer 和 ScheduledExecutor 都提供基于开始时间与重复间隔的任务调度,虽然我们可以借助Calendar类来实现一些更加复杂的调度功能,但实现上使用开源的Quartz将更简单。
线程池
线程池为线程生命周期开销问题和资源不足问题提供了解决方案(由于减少了线程创建和销毁的开销)。一个线程池通常包括四个基本组成部分:线程池管理器、工作线程、任务接口、任务队列。
线程池的使用可以带服务程度性能搞高的好处,但也存在一些风险。常见的多线程编程的风险比如:死锁、资源不足、并发错误等在线程池中也可能存在。线程池的另一个比较严重的风险是线程泄漏,它可能由几个原因造成:
原因1: 对于一个工作线程数目固定的线程池,如果工作线程在运行任务时抛出 了异常,而这些异常或错误没有被捕获并处理,那么这个工作线程就会异常终止并且没有返回到池中,使得线程池永久失去了一个工作线程。当所有的工作线程都异常终止时,线程池也就最终为空,不再有可用的工作线程来处理新的任务了。
原因2:工作线程在执行一个任务时被阻塞,比如等待输入的数据,但是由于某些原因用户一直没提供输入数据,导致这个工作线程一直被阻塞。这样这个工作线程实际上也不执行任何任务了。如果线程池中所有的工作线程都进入了这样的阻塞状态,那么线程池就无法处理新来的任务了。
对于使用了线程池的程序来说,可采用打印Thread Dump来排查线程泄漏(建议每个线程都要有自己的名称),但对没有使用线程池的程序来说,还需要跟踪线程数的增长(原因在于,一个工作线程被阻塞后,其后不断有新增线程被阻塞,表象上就是线程数在不断增长),当线程增长到一程度时,由于线程的切换以及线程本身使用的资源还可能导致应用性能下降,甚至出现OutofMemenry(内存泄漏)等问题。
调整池的大小
线程池的最佳大小取决于可用处理器的数目以及工作队列中的任务的性质。若在一个具有 N 个处理器的系统上只有一个工作队列,其中全部是计算性质的任务,在线程池具有 N 或 N+1 个线程时一般会获得最大的 CPU 利用率。
对于那些可能需要等待 I/O 完成的任务(例如,从套接字读取 HTTP 请求的任务),需要让池的大小超过可用处理器的数目,因为并不是所有线程都一直在工作。通过使用概要分析,您可以估计某个典型请求的等待时间(WT)与服务时间(ST)之间的比例。如果我们将这一比例称之为 WT/ST,那么对于一个具有 N 个处理器的系统,需要设置大约 N*(1+WT/ST) 个线程来保持处理器得到充分利用。
JDK中提供了ThreadPoolExecutor的线程池,它是一种以工作队列为基础的线程池的实现,这样,我们无须再编写自己的线程池了。
ThreadPoolExecutor线程池配置参数:
A.核心和最大的线程池大小:通过把corePoolSize和maximumPoolSize设置为相同的值,可以建立一个大小固定的线程池了。
B.根据需要构造:在默认情况下,只有在新事务要求的时候,ThreadPoolExecutor才开始建立和启动核心的线程,但是可以使用prestartCoreThread或prestartAllCoreThreads动态地重载它。
C.保持活动的时间:如果线程池中当前线程的数量超过了corePoolSize,那么这些超过的线程的空闲时间大于keepAliveTime的时候,它们就会被终止。
D.排队:排队遵循下面的规则:
* 如果正在运行的线程数量少于corePoolSize,Executor总会添加新线程而不会排队。
* 如果corePoolSize或更多数量的线程在运行,Executor总会对请求进行排队而不会添加新线程。
* 如果某个请求不能参与排队,就会建立新线程,除非线程数量超过了maximumPoolSize(在超过的情况下,该事务会被拒绝)。
E.Hook方法:这个类提供了beforeExecute()和afterExecute() hook方法,它们分别在每个事务执行之前和之后被调用。为了使用它们,你必须建立这个类的子类(因为这些方法是受保护的)。
建议使用Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程)来创建线程池,它们已经能满足大多数的使用场景。
同步
Java比较简单的同步机制是使用关键字synchronized和Object对象方法wait/notify。同步的机制是在每一个对象上拥有一个监视器(monitor),同时只允许一个线程持有监视器,并且拥有监视器的线程才允许进行对对象的访问,那些没有获得监视器的线程必须等待直到持有监视器的线程释放监视器。
关键字synchronized的使用有两种方式:synchronized块和synchronized方法。
使用synchronized块时应该注意到,线程所持有的对象锁应当是共享且唯一的。这里需要注意两种错误:
一、将synchronized关键字放在Thread类的run方法上,如:
public class foo extends Thread {
…..public synchronized void run() {
….
}
}放在run方法前的synchronized关键字实际上是不能起到同步的作用。我们知道对于一个成员方法加synchronized关键字,这实际上是以这个成员方法所在的对象本身作为对象锁,这里的使用相当于每一个Thread实例对象用监视器关联其自身。当创建多个Thread实例对象时,就会有多个不同的实例对象锁,这些对象锁并不是共享且唯一的。
但是,如果我们将上述的run方法改为:
public void run() {
synchronized(foo.class) {
…..
}
}
这样,就创建了一个共享且唯一的对象锁。因为我们知道在JVM中,所有被加载的类都有唯一的类对象,不管我们创建某个类的多少个实例,但是它们的类实例仍然是一个。上述修改实际上是将监视器关联到了foo类的类实例上了。
注意:如果采用method级别的同步,那么对象锁为method所在的对象,如果方法是静态方法,对象锁是method所在的类实例 (唯一);
二、对一个可改变的String上使用同步块。
我们知道对象一般通过new在堆中创建,当我们用String s=new String("Hello World"); 时,实际上用将常量池中的对象“Hello World” 复制到堆中,再把堆地址交给引用变量s(Java确保一个字符串常量只有一个拷贝,“Hello World”是一个字符串常量,它们在编译期就被确定了,故放在常用量池中,而new String() 创建的字符串不是常量,不能在编译期就确定,所以new String()创建的字符串不放入常量池中,它们拥有自己的地址空间;这里使用String s = new String(“Hello World”);创建了两个对象)。
如果对s进行修改:s = “New World”;”New World”仍是常量池中的对象,现在把引用变量s指向了字符串"New World",即用"New World"的引用地址把"Hello World"的引用地址覆盖了。所以在修改String变量时,实际是改变了变量引用的内存地址,所以在使用用String变量作同步块时,如果String变量发生变化,就意味着同步块中的对象锁已经发生了变化。
关于String更详细的内容,可以网上搜索关键字 Java String 对象剖析
Synchronized关键字提供了对每个对象相关的隐式监视器锁定的访问,但同时也强制所有锁定的获得和释放均要在一个块结构中,多个锁定必须以相反的顺序进行释放。Synchronized关键字无法处理“hand-over-hand”或“chain locking”:先获取节点 A 的锁定,然后再获取节点 B 的锁定,然后释放 A 并获取 C,然后释放 B 并获取 D,依此类推。
故JDK5.0又提供了一种新的机制来处理更复杂的同步问题:Lock/Condition。
Lock 接口实现允许锁定在不同的作用范围内获取和释放,并允许以任何顺序获取和释放多个锁定。常见的方式如下:
Lock lk = new ReentrantLock(); // ReentrantLock重入锁是Lock的具体类
lk.lock(); //取得锁定
try {
// do something 对共享资源进行操作
} finally {
lk.unlock(); //消掉锁定,锁本身是不会自动解锁的
}
Lock 实现提供了使用 synchronized 方法和语句所没有的其他功能,包括:
1)一个非块结构的获取锁定尝试 (tryLock());
2)一个获取可中断锁定的尝试 (lockInterruptibly()) ;
3)一个获取超时失效锁定的尝试 (tryLock(long, TimeUnit))。
4)unlock():取消锁定,需要注意的是Lock不会自动取消,编程时必须手动解锁。
Lock 实例只是一个普通的对象,它本身可以在 synchronized 语句中作为目标使用,但建议不要混合使用Lock和synchronized。
Condition(条件变量) 替代了 Object 监视器方法的使用,可以更精细控制线程等待与唤醒。(Lock 替代了 synchronized 方法和语句的使用)
Condition(条件变量)的实例化是通过一个Lock对象上调用newCondition()方法获得的,这样,条件就和一个锁对象进行了绑定。Java中的条件变量只能和锁配合使用,来控制并发程序访问竞争资源的安全。
经典的Producer和Consumer问题,在Java 5.0以前是由Object类的wait(), notify()和notifyAll()等方法来实现,在5.0后这些功能可以通过Lock/Condition接口来实现了。
注意:使用新的Lock或ReentrantLock,最佳的实践是结合try/finally块来使用:在try块之前使用lock方法,而在finally中使用unlock方法。
Volatile变量
锁具有两种主要特性:互斥(mutual exclusion) 和可见性(visibility)。Java 语言中的 volatile 变量可以被看作是一种 “程度较轻的 synchronized”, Volatile 变量具有 synchronized 的可见性(visibility),但是不具备原子特性或互斥(mutual exclusion)。
要使 volatile 变量提供理想的线程安全,必须同时满足下面两个条件:
* 对变量的写操作不依赖于当前值。
* 该变量没有包含在具有其他变量的不变式中。
第一个条件的限制使 volatile 变量不能用作线程安全计数器。虽然增量操作(x++)看上去类似一个单独操作,实际上它是一个由读取-修改-写入操作序列组成的组合操作,必须以原子方式执行,而 volatile 不能提供必须的原子特性。定义为 volatile 类型不能够充分实现类的线程安全;从而仍然需要使用同步。
关于Volatile变量的更多内容,可参考IBM网站的《Java 理论与实践: 正确使用 Volatile 变量》
ThreadLocal
ThreadLocal源于一种多线程技术:Thread Local Storage(线程本地存储技术)。
ThreadLocal和其它的同步机制都是为了解决多线程中的对同一变量的访问冲突,在普通的同步机制中,是通过对象加锁来实现多个线程对同一变量的安全访问的。ThreadLocal则为每一个线程维护一个和该线程绑定的变量的副本,从而隔离了多个线程的数据。所以可以说ThreadLocal是一种利用空间来换取时间的多线程编程解决方案。
线程本地存储与同步机制的区别在于:同步是为了解决多个线程对共享资源的并发访问,实现了多个线程之间的通信;而ThreadLocal则隔离多个线程的数据共享。
所以, ThreadLocal 并不解决共享对象的多线程访问问题。通常,通过ThreadLocal.get() 得到的的对象是该线程自己使用的对象,其他线程是不需要访问的,也访问不到的。而各个线程中访问的是不同的对象。
ThreadLocal实现上主要通过内部ThreadLocalMap来实现的,使用好ThreadLocal的关键在使用ThreadLocal类的set()或get()方法时应分清这两个方法是对那一个活动线程中的ThreadLocalMap进行操作。如果ThreadLocal.set()放入的数据本身是多个线程共享的,那么线程的ThreadLocal.get()取得的还是这个共享数据本身,仍然有并发访问的问题。ThreadLocal的正确使用方法是:将ThreadLocal以内部类的形式进行继承,并覆盖原来的initialValue()方法,在这里产生可供线程拥有的本地变量值。
通过看ThreadLocal的实现代码有助于理解ThreadLocal,ThreadLocal的类似实现代码如下:
public class ThreadLocal<T> { private final int threadLocalHashCode = nextHashCode(); private static int nextHashCode = 0; private static final int HASH_INCREMENT = 0x61c88647; private static synchronized int nextHashCode() { int h = nextHashCode; nextHashCode = h + HASH_INCREMENT; return h; }
public ThreadLocal() { }
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) return (T)map.get(this);
// Maps are constructed lazily. if the map for this thread doesn't exist, create it, with this ThreadLocal and its // initial value as its only entry. T value = initialValue(); createMap(t, value); return value; }
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }
ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } ....... static class ThreadLocalMap { //定制的Hash Map ........ } } |
参考:用J2SE1.5来实现多任务的Java应用程序
IBM Java多线程与并发编程专题 http://www.ibm.com/developerworks/cn/java/j-concurrent/
同步装置
JDK5.0后提供了一个新的多线程并发控制的装置/工具,它允许一组线程互相进行协调运行。
先引用IBM网站的一个表格:
Semaphore | 一个经典的并发工具(计数信号量)。通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。 |
CountDownLatch | 同步辅助类,在完成一组正在其他线程中执行的操作之前,允许一个或多个线程一直等待。用于在保持给定数目的信号、事件或条件前阻塞执行。 |
CyclicBarrier | 同步辅助类,是一个可重置的多路同步点,在某些并行编程风格中很有用。 |
Exchanger | 允许两个线程在 collection 点交换对象,它在多流水线设计中是有用的。 |
1.信号量
Semaphore 信号量通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。Semaphore可以看成是个通行证,线程访问资源池中的资源时必须先取得通行证。如果线程没有取得通行证就会被阻塞进入等待状态。
通常在取得通行证前会调用acquire()阻塞其他的申请者,然后再获取该许可。最后调用 release() 释放一个通行证,从而也可能释放了一个正在阻塞的申请者。
有两个主要的方法:
* public void acquire();
* public void release();
Semaphore提供的通行证数量和资源池的大小一致。
注意的是:信号量仅仅是对池资源进行监控,但不保证线程的安全,因此,在使用时候,应该自己控制线程的安全访问池资源。
网上拷贝的一个示例:
public class Pool {
ArrayList pool = null;
Semaphore pass = null;
public Pool(int size){
//初始化资源池
pool = new ArrayList();
for(int i=0; i
pool.add("Resource "+i);
}
//Semaphore的大小和资源池的大小一致
pass = new Semaphore(size);
}
public String get() throws InterruptedException{
//获取通行证,只有得到通行证后才能得到资源
pass.acquire();
return getResource();
}
public void put(String resource){
//归还通行证,并归还资源
pass.release();
releaseResource(resource);
}
private synchronized String getResource() {
String result = pool.get(0);
pool.remove(0);
System.out.println("Give out "+result);
return result;
}
private synchronized void releaseResource(String resource) {
System.out.println("return "+resource);
pool.add(resource);
}
}
2.同步计数器
同步计数器CountDownLatch的实现是通过一个计数器来控制线程对资源的访问,当计数器不为零时处于等待中。它可用于一个简单的开/关锁存器或入口,也可用于分解计算一个复杂问题。它主要有两个方法:
public void countDown()
递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。如果当前计数等于零,则不发生任何操作。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回 true 值。
引用一个例子:
class Driver2 {
void main() throws InterruptedException {
int N = 10;
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = Executors.newSingleThreadExecutor();
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // 等待所有任务结束(即计数器为0)
………………………..
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown(); //计数器递减
} catch (Exception ex) {
} // return;
}
void doWork(int num) {
System.out.println(num);
}
}
}
3.障碍器
障碍器CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。
CyclicBarrier与CountDownLatchu有些相似,但也存在一些区别:
CountDownLatchu是等待一组线程执行完以后,再执行等待的线程,等待线程可以是一个或几个线程;通过计数器来通信。CyclicBarrier是等待多个子线程都执行到某一点后,再继续执行CyclicBarrier中的任务。
有一个主要方法:
public int await();
引用一个网上的例子:两个线程分别在一个数组里放一个数,当这两个线程都结束后,主线程算出数组里的数的和
public class Test {
public static void main(String[] args) {
//创建障碍器,并设置MainTask为所有定数量的线程都达到障碍点时候所要执行的任务(Runnable)
CyclicBarrier cb = new CyclicBarrier(7, new MainTask());
new SubTask("A", cb).start();
new SubTask("B", cb).start();
new SubTask("C", cb).start();
new SubTask("D", cb).start();
new SubTask("E", cb).start();
new SubTask("F", cb).start();
new SubTask("G", cb).start();
}
}
/**
* 主任务
*/
class MainTask implements Runnable {
public void run() {
System.out.println(">>>>主任务执行了!<<<<");
}
}
/**
* 子任务
*/
class SubTask extends Thread {
private String name;
private CyclicBarrier cb;
SubTask(String name, CyclicBarrier cb) {
this.name = name;
this.cb = cb;
}
public void run() {
System.out.println("[子任务" + name + "]开始执行了!");
for (int i = 0; i < 999999; i++) ; //模拟耗时的任务
System.out.println("[子任务" + name + "]开始执行完成了,并通知障碍器已经完成!");
try {
//通知障碍器已经完成
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
执行结果:
[子任务E]开始执行了!
[子任务E]开始执行完成了,并通知障碍器已经完成!
[子任务F]开始执行了!
[子任务G]开始执行了!
[子任务F]开始执行完成了,并通知障碍器已经完成!
[子任务G]开始执行完成了,并通知障碍器已经完成!
[子任务C]开始执行了!
[子任务B]开始执行了!
[子任务C]开始执行完成了,并通知障碍器已经完成!
[子任务D]开始执行了!
[子任务A]开始执行了!
[子任务D]开始执行完成了,并通知障碍器已经完成!
[子任务B]开始执行完成了,并通知障碍器已经完成!
[子任务A]开始执行完成了,并通知障碍器已经完成!
>>>>主任务执行了!<<<<
4.连接器
连接器Exchanger类,可以用来完成线程间的数据交换。当两个线程通过Exchanger交换了对象,这个交换对于两个线程来说都是安全的。Exchanger仅可以在两个线程之间交换数据。
Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
引个网上的例子:
class FillAndEmpty {
//初始化一个Exchanger,并规定可交换的信息类型是DataBuffer
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = new DataBufferByte(10, 10); //初始化一个empty Buffer
DataBuffer initialFullBuffer = new DataBufferByte(10, 10); //初始化一个full Buffer
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer); //往empty Buffer添加数据
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer); //和full Buffer进行交换
}
} catch (InterruptedException ex) {
//...handle...
ex.printStackTrace();
}
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer); //从full Buffer中取数据
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer); //和empty Buffer进行交换
}
} catch (InterruptedException ex) {
//...handle...
ex.printStackTrace();
}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
public void addToBuffer(DataBuffer data) {
….
}
public void takeFromBuffer() {
….
}
}
当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行
原子变量与无阻塞算法
关于原子变量,可以参考IBM网站文章:
Java理论与实践:流行的原子 https://www.ibm.com/developerworks/cn/java/j-jtp11234/
原子变量是基于现代处理器(CPU)的硬件支持把两步操作合为一步,避免了不必要的锁定,提高了程序的运行效率。它们实现了CAS(compare-and-set)和 “check-and-act”动作。
Java中的常见的原子变量类(automic variable classes)主要有AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……。
这些原子量级的变量主要提供两个方法:
* compareAndSet(expectedValue, newValue): 比较当前的值是否等于expectedValue,若等于把当前值改成newValue,并返回true。若不等,返回false。
* getAndSet(newValue): 把当前值改为newValue,并返回改变前的值。
Java中的原子变量更多地是被用在“非阻塞算法”或“lock-free算法”中。关于非阻塞算法可以参考IBM网站文章:Java 理论与实践: 非阻塞算法简介 http://www.ibm.com/developerworks/cn/java/j-jtp04186/
阻塞队列
java阻塞队列应用于生产者消费者模式、消息传递、并行任务执行和相关并发设计的大多数常见使用上下文。
BlockingQueue是一种特殊的Queue,若BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态直到BlocingkQueue进了新货才会被唤醒。同样,如果BlockingQueue是满的任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有新的空间才会被唤醒继续操作。
ConcurrentLinkedQueue | 一个基于链接节点的高效的、可伸缩的、线程安全的非阻塞 FIFO 队列, 此实现采用了有效的“无等待 (wait-free)”算法 |
LinkedBlockingQueue | 一个基于已链接节点的、范围任意的 blocking queue |
ArrayBlockingQueue | 一个由数组支持的有界阻塞队列。这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。 |
SynchronousQueue | 一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。 |
PriorityBlockingQueue | 一个无界阻塞队列,类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。 |
DelayQueue | 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。 |
LinkedBlockingDeque | 一个基于已链接节点的、任选范围的阻塞双端队列。是BlockingDeque接口的一个基本实现,以支持 FIFO 和 LIFO(基于堆栈)操作。 |
并发集合
在Java的聚集框架里可以调用Collections.synchronizeCollection(aCollection)将普通聚集改变成同步聚集,使之可用于多线程的环境下。 但同步聚集在一个时刻只允许一个线程访问它,其它想同时访问它的线程会被阻断,导致程序运行效率不高。Java 5.0里提供了几个共点聚集类,它们把以前需要几步才能完成的操作合成一个原子量级的操作,这样就可让多个线程同时对聚集进行操作,避免了锁定,从而提高了程序的运行效率。JDK 1.5提供下面一些集合实现,它们是被设计为用于多线程环境的:
ConcurrentHashMap | 为检索和更新(update)可调整的预期的并发性提供了完整的线程安全的(thread-safe)并发性支持 |
CopyOnWriteArrayList | 一组线程安全的变量集合 |
CopyOnWriteArraySet | 一个线程安全的数组列表(ArrayList)变量 |
ConcurrentSkipListMap | 对应TreeMap |
ConcurrentSkipListSet |
|
在修改原始的数组或集合之前,它们中的每一个都会把下层的数组或集合复制一份。其结果是,读取的速度很快,而更新的速度很慢。
并发集合类为Iterator(迭代子)提供快照式的数据(即使下层数据发生了改变,在Iterator中也不会反映出来)。