Motan中线程池

/ JDK / 没有评论 / 35浏览

介绍motan中业务线程池。

源码

/**
 * Thread pool that grows to its maximum thread count before it starts queueing tasks.
 *
 * <pre>
 * The code and the idea mainly come from Tomcat's
 * 		org.apache.catalina.core.StandardThreadExecutor
 *
 * java.util.concurrent.ThreadPoolExecutor execute strategy:
 * 		offer to the queue first; once the queue is full, add threads up to maxThreads;
 * 		reject when maxThreads has already been reached.
 * 		Better suited to CPU-bound work (the runnable stays inside the JVM:
 * 		memory copy, computation, ...).
 *
 * StandardThreadExecutor execute strategy:
 * 		add threads up to maxThreads first, then offer to the queue; reject when full.
 * 		Better suited to business logic that depends on remote resources.
 * </pre>
 *
 * @author maijunsheng
 * @version created 2013-6-20
 */
public class StandardThreadExecutor extends ThreadPoolExecutor {

	public static final int DEFAULT_MIN_THREADS = 20;
	public static final int DEFAULT_MAX_THREADS = 200;
	public static final int DEFAULT_MAX_IDLE_TIME = 60 * 1000; // 1 minute, in milliseconds

	/** Number of tasks accepted by {@link #execute(Runnable)} that have not finished yet. */
	protected AtomicInteger submittedTasksCount;
	/** Upper bound for {@link #submittedTasksCount}: queueCapacity + maxThreads. */
	private int maxSubmittedTaskCount;

	/** Creates a pool with the default core and maximum thread counts. */
	public StandardThreadExecutor() {
		this(DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS);
	}

	/** Creates a pool whose queue capacity equals {@code maxThreads}. */
	public StandardThreadExecutor(int coreThread, int maxThreads) {
		this(coreThread, maxThreads, maxThreads);
	}

	/** Creates a pool with a custom keep-alive time; the queue capacity equals {@code maxThreads}. */
	public StandardThreadExecutor(int coreThread, int maxThreads, long keepAliveTime, TimeUnit unit) {
		this(coreThread, maxThreads, keepAliveTime, unit, maxThreads);
	}

	/** Creates a pool with the given queue capacity and the default thread factory. */
	public StandardThreadExecutor(int coreThreads, int maxThreads, int queueCapacity) {
		this(coreThreads, maxThreads, queueCapacity, Executors.defaultThreadFactory());
	}

	/** Creates a pool with the default idle time ({@link #DEFAULT_MAX_IDLE_TIME} milliseconds). */
	public StandardThreadExecutor(int coreThreads, int maxThreads, int queueCapacity, ThreadFactory threadFactory) {
		this(coreThreads, maxThreads, DEFAULT_MAX_IDLE_TIME, TimeUnit.MILLISECONDS, queueCapacity, threadFactory);
	}

	/** Creates a pool with the default thread factory. */
	public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit, int queueCapacity) {
		this(coreThreads, maxThreads, keepAliveTime, unit, queueCapacity, Executors.defaultThreadFactory());
	}

	/** Creates a pool that rejects overflowing tasks with {@link ThreadPoolExecutor.AbortPolicy}. */
	public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit,
			int queueCapacity, ThreadFactory threadFactory) {
		this(coreThreads, maxThreads, keepAliveTime, unit, queueCapacity, threadFactory, new AbortPolicy());
	}

	/**
	 * Full constructor.
	 *
	 * @param coreThreads   core pool size
	 * @param maxThreads    maximum pool size
	 * @param keepAliveTime idle time after which surplus threads are released
	 * @param unit          unit of {@code keepAliveTime}
	 * @param queueCapacity number of tasks allowed to wait in addition to {@code maxThreads}
	 * @param threadFactory factory used to create worker threads
	 * @param handler       handler invoked for tasks that cannot be accepted
	 */
	public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit,
			int queueCapacity, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
		super(coreThreads, maxThreads, keepAliveTime, unit, new ExecutorQueue(), threadFactory, handler);
		// The queue consults this pool's size and task counter in offer().
		((ExecutorQueue) getQueue()).setStandardThreadExecutor(this);

		submittedTasksCount = new AtomicInteger(0);

		// Limit on concurrently submitted tasks: queue buffer size + maximum thread count.
		maxSubmittedTaskCount = queueCapacity + maxThreads;
	}

	/**
	 * Submits a task, rejecting it when more than {@link #getMaxSubmittedTaskCount()} tasks
	 * are already in flight.
	 *
	 * @param command the task to run
	 * @throws RejectedExecutionException if the configured handler throws it, or if the pool
	 *         is shut down when the task is forced onto the queue
	 */
	@Override
	public void execute(Runnable command) {
		int count = submittedTasksCount.incrementAndGet();

		// Over the concurrent task limit: reject.
		// The underlying LinkedTransferQueue is unbounded, so the limit is enforced here.
		if (count > maxSubmittedTaskCount) {
			submittedTasksCount.decrementAndGet();
			getRejectedExecutionHandler().rejectedExecution(command, this);
			// Stop here: a handler that returns normally (e.g. DiscardPolicy) must not let the
			// rejected task fall through to super.execute(), which would run it anyway and
			// decrement the counter a second time in afterExecute().
			return;
		}

		try {
			super.execute(command);
		} catch (RejectedExecutionException rx) {
			// there could have been contention around the queue
			boolean queued;
			try {
				queued = ((ExecutorQueue) getQueue()).force(command);
			} catch (RejectedExecutionException shutDown) {
				// force() throws when the pool is shut down; undo the increment before propagating.
				submittedTasksCount.decrementAndGet();
				throw shutDown;
			}
			if (!queued) {
				submittedTasksCount.decrementAndGet();

				getRejectedExecutionHandler().rejectedExecution(command, this);
			}
		}
	}

	/** @return the number of tasks currently counted as submitted and unfinished */
	public int getSubmittedTasksCount() {
		return this.submittedTasksCount.get();
	}

	/** @return the maximum number of tasks allowed in flight (queue capacity + max threads) */
	public int getMaxSubmittedTaskCount() {
		return maxSubmittedTaskCount;
	}

	/** Releases the slot taken in {@link #execute(Runnable)} once a worker has finished the task. */
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		submittedTasksCount.decrementAndGet();
	}
}

/**
 * Work queue for {@link StandardThreadExecutor}.
 *
 * <pre>
 * 		Per the original author, LinkedTransferQueue gives noticeably higher performance
 * 		than LinkedBlockingQueue.
 * 		1) Its drawback is that it has no capacity limit, so the enclosing executor
 * 		   has to enforce one.
 * </pre>
 *
 * @author maijunsheng
 *
 */
class ExecutorQueue extends LinkedTransferQueue<Runnable> {
	private static final long serialVersionUID = -265236426751004839L;
	/** Owning pool; must be set through {@link #setStandardThreadExecutor} before the queue is used. */
	StandardThreadExecutor threadPoolExecutor;

	public ExecutorQueue() {
		super();
	}

	/** Binds this queue to the pool whose size and task counter drive {@link #offer(Runnable)}. */
	public void setStandardThreadExecutor(StandardThreadExecutor threadPoolExecutor) {
		this.threadPoolExecutor = threadPoolExecutor;
	}

	/**
	 * Unconditionally enqueues a task that the pool rejected. Note: code taken from Tomcat.
	 *
	 * @param o the task to enqueue
	 * @return the result of the underlying {@code offer}
	 * @throws RejectedExecutionException if the owning pool is shut down
	 */
	public boolean force(Runnable o) {
		if (threadPoolExecutor.isShutdown()) {
			throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
		}
		// forces the item onto the queue, to be used if the task is rejected
		return super.offer(o);
	}

	/**
	 * Decides whether a task is queued or a new thread should be created.
	 * Note: Tomcat's code with some small changes.
	 *
	 * @param o the task offered by the pool
	 * @return {@code false} to make the pool try to add a thread, otherwise the result of enqueueing
	 */
	@Override
	public boolean offer(Runnable o) {
		int poolSize = threadPoolExecutor.getPoolSize();

		// we are maxed out on threads, simply queue the object
		if (poolSize == threadPoolExecutor.getMaximumPoolSize()) {
			return super.offer(o);
		}
		// we have idle threads, just add it to the queue
		// note that we don't use getActiveCount(), see BZ 49730
		if (threadPoolExecutor.getSubmittedTasksCount() <= poolSize) {
			return super.offer(o);
		}
		// if we have less threads than maximum force creation of a new
		// thread
		// This is the key point: while the thread count is below the maximum, return false so
		// that the pool's work-queue offer fails and the pool creates a new thread instead.
		if (poolSize < threadPoolExecutor.getMaximumPoolSize()) {
			return false;
		}
		// if we reached here, we need to add it to the queue
		return super.offer(o);
	}
}

两种实现方案总结

JDK线程池提交任务三步曲

主要看java.util.concurrent.ThreadPoolExecutor#execute方法。

上面的注释:

1. If fewer than corePoolSize threads are running, try to
start a new thread with the given command as its first
task.  The call to addWorker atomically checks runState and
workerCount, and so prevents false alarms that would add
threads when it shouldn't, by returning false.

2. If a task can be successfully queued, then we still need
to double-check whether we should have added a thread
(because existing ones died since last checking) or that
the pool shut down since entry into this method. So we
recheck state and if necessary roll back the enqueuing if
stopped, or start a new thread if there are none.

3. If we cannot queue task, then we try to add a new
thread.  If it fails, we know we are shut down or saturated
and so reject the task.
  1. 如果当前线程池中的线程数小于corePoolSize,则创建新线程来执行该任务
  2. 否则尝试把任务添加到任务队列;入队成功后会再次检查线程池状态,必要时回滚入队或补充线程
  3. 如果无法入队,则尝试创建新线程;若创建失败(线程数已达maximumPoolSize或线程池已关闭),则拒绝该任务

motan定义的线程池中,为了让线程数优先于任务队列扩充,自定义了ExecutorQueue。当已提交的任务数超过当前线程数、且线程数尚未达到最大值时,offer方法返回false,于是第二步入队失败,进入第三步创建新线程;其余情况下任务正常入队。