柚子快報邀請碼778899分享:Dubbo線程池
柚子快報邀請碼778899分享:Dubbo線程池
前言
? ? Dubbo使用Netty作為網(wǎng)絡(luò)調(diào)用框架,Netty是一個Reactor模型的框架,線程模型分為boss線程池和worker線程池,boss線程池負(fù)責(zé)監(jiān)聽、分配事件,worker線程池負(fù)責(zé)處理事件,簡單說就是boss線程池負(fù)責(zé)hold請求,并分發(fā)到worker池,worker線程池負(fù)責(zé)處理具體事件。
? ? dubbo在原本的netty中的線程(boss線程和worker)做了一些修改,將其定義為io線程,而后由實(shí)現(xiàn)了一套用于處理業(yè)務(wù)的業(yè)務(wù)線程池,這就和上一篇介紹的Dubbo協(xié)議下的服務(wù)端線程模型產(chǎn)生了關(guān)聯(lián),dubbo的io線程監(jiān)聽請求,業(yè)務(wù)處理由dubbo自定義的線程池處理,這里將請求分發(fā)到具體的業(yè)務(wù)線程池就是由Dispatcher實(shí)現(xiàn)的,默認(rèn)是AllDispatcher,上一篇已經(jīng)簡單介紹了Dubbo協(xié)議的線程池的分發(fā)模型,這篇文章就介紹下Dubbo究竟自定義了哪幾種線程池的實(shí)現(xiàn),并且都是怎么實(shí)現(xiàn)的。
注:Apache Dubbo版本為3.0.7
Dubbo線程池接口ThreadPool
? ? Dubbo自定義的線程池的核心接口是org.apache.dubbo.common.threadpool.ThreadPool,并且提供了四種實(shí)現(xiàn)分別是CachedThreadPool、FixedThreadPool、LimitedThreadPool、EagerThreadPool,ThreadPool接口是SPI的,如果不指定線程池的具體實(shí)現(xiàn)默認(rèn)是fixed,在項(xiàng)目中配置如下:配置線程池類型是fixed,線程數(shù)為100,線程模型是all
xml
復(fù)制代碼
ThreadPool代碼如下,接下來分別簡單介紹一下四種線程池的具體實(shí)現(xiàn)
java
復(fù)制代碼
@SPI(value = "fixed", scope = ExtensionScope.FRAMEWORK)
public interface ThreadPool {
/**
* Thread pool
*
* @param url URL contains thread parameter
* @return thread pool */
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url);
}
CachedThreadPool緩存線程池
? ? 該線程池是緩存類型的,當(dāng)空閑到一定時間時會將線程刪掉,使用時再創(chuàng)建,具體dubbo的實(shí)現(xiàn)如下,代碼實(shí)現(xiàn)很簡單,就是使用JUC的ThreadPoolExecutor創(chuàng)建了一個緩存類型的線程池,將maximumPoolSize設(shè)置成Integer.MAX_VALUE,keepAliveTime設(shè)置成60000毫秒,隊(duì)列大小設(shè)置成0,當(dāng)超過任務(wù)數(shù)超過corePoolSize就會直接創(chuàng)建worker線程,當(dāng)線程空閑60s后就會被銷毀。
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue
(queues < 0 ? new LinkedBlockingQueue
: new LinkedBlockingQueue
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
FixedThreadPool固定線程數(shù)的線程池
? ? 該線程池是固定線程數(shù)的線程池實(shí)現(xiàn),具體實(shí)現(xiàn)也是使用JUC的ThreadPoolExecutor創(chuàng)建了一個固定線程數(shù)的線程池,通過url中配置的threads,將corePoolSize和maximumPoolSize都設(shè)置成threads的數(shù)量,并且keepAliveTime設(shè)置成0。
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue
(queues < 0 ? new LinkedBlockingQueue
: new LinkedBlockingQueue
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
LimitedThreadPool可伸縮線程池
? ? 雖然叫可伸縮線程池,但是實(shí)際上只能伸不能縮,官網(wǎng)上說是為了突然大量的流量引起性能問題,具體實(shí)現(xiàn)就是將keepAliveTime設(shè)置成無限大,這樣當(dāng)隊(duì)列滿了后就會創(chuàng)建線程達(dá)到maximumPoolSize,新創(chuàng)建的這些線程因?yàn)閗eepAliveTime設(shè)置成無限大所以也不會銷毀了。
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue
(queues < 0 ? new LinkedBlockingQueue
: new LinkedBlockingQueue
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
EagerThreadPool
? ? Eager單詞是渴望的,熱切地的意思,這個線程池所實(shí)現(xiàn)的邏輯是,當(dāng)任務(wù)數(shù)超過corePoolSize但小于maximumPoolSize時不是將新任務(wù)放到隊(duì)列中,而是優(yōu)先創(chuàng)建新的worker線程,當(dāng)線程數(shù)已經(jīng)達(dá)到maximumPoolSize,接下來新的任務(wù)才會放到阻塞隊(duì)列中,阻塞隊(duì)列滿了會拋出RejectedExecutionException。
? ? EagerThreadPool線程池就不是通過JUC的ThreadPoolExecutor實(shí)現(xiàn)的了,而是繼承ThreadPoolExecutor自己實(shí)現(xiàn)一些邏輯,下面一步一步看。
EagerThreadPool
? ? Dubbo自己實(shí)現(xiàn)了阻塞隊(duì)列TaskQueue和線程池EagerThreadPoolExecutor,從EagerThreadPool的代碼中看不到該類型線程池的核心邏輯,核心邏輯是在TaskQueue代碼中,這里跳過直接看TaskQueue代碼。
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
// init queue and executor
TaskQueue
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
TaskQueue
? ? Dubbo的EagerThreadPool是通過TaskQueue的offer方法實(shí)現(xiàn)的,邏輯就是當(dāng)提交到線程池任務(wù)時,如果任務(wù)數(shù)大于corePoolSize,會將任務(wù)offer到TaskQueue中,這時如果活躍的線程數(shù)大于等于線程池大小,并且當(dāng)前線程數(shù)小于maximumPoolSize時就會偽裝成放入到隊(duì)列失敗,這時線程池就會創(chuàng)建線程,從而實(shí)現(xiàn)超過corePoolSize不超過maximumPoolSize時創(chuàng)建worker線程而不是將任務(wù)放入到隊(duì)列中。
public class TaskQueue
private static final long serialVersionUID = -2635853580887179627L;
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (executor.getActiveCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// 偽裝放入隊(duì)列失敗,讓線程池創(chuàng)建線程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
EagerThreadPoolExecutor
? ? 當(dāng)任務(wù)數(shù)大于maximumPoolSize時,線程池會拋出RejectedExecutionException,EagerThreadPoolExecutor捕獲這個異常,并且調(diào)用TaskQueue的retryOffer方法嘗試放入隊(duì)列,這樣就實(shí)現(xiàn)了當(dāng)線程數(shù)已經(jīng)達(dá)到maximumPoolSize,接下來新的任務(wù)才會放到阻塞隊(duì)列中,阻塞隊(duì)列滿了會拋出RejectedExecutionException,代碼如下:
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// 重新嘗試將任務(wù)放到隊(duì)列中.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
throw new RejectedExecutionException(x);
}
}
}
}
總結(jié)
? ? Dubbo實(shí)現(xiàn)了自定義線程池,其核心接口是ThreadPool,該接口是SPI的默認(rèn)的實(shí)現(xiàn)是fixed,Dubbo提供了四種實(shí)現(xiàn),分別是CachedThreadPool、FixedThreadPool、LimitedThreadPool、EagerThreadPool。
柚子快報邀請碼778899分享:Dubbo線程池
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。