欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:Dubbo線程池

柚子快報邀請碼778899分享:Dubbo線程池

http://yzkb.51969.com/

前言

? ? Dubbo使用Netty作為網(wǎng)絡(luò)調(diào)用框架,Netty是一個Reactor模型的框架,線程模型分為boss線程池和worker線程池,boss線程池負(fù)責(zé)監(jiān)聽、分配事件,worker線程池負(fù)責(zé)處理事件,簡單說就是boss線程池負(fù)責(zé)hold請求,并分發(fā)到worker池,worker線程池負(fù)責(zé)處理具體事件。

? ? dubbo在原本的netty中的線程(boss線程和worker)做了一些修改,將其定義為io線程,而后由實(shí)現(xiàn)了一套用于處理業(yè)務(wù)的業(yè)務(wù)線程池,這就和上一篇介紹的Dubbo協(xié)議下的服務(wù)端線程模型產(chǎn)生了關(guān)聯(lián),dubbo的io線程監(jiān)聽請求,業(yè)務(wù)處理由dubbo自定義的線程池處理,這里將請求分發(fā)到具體的業(yè)務(wù)線程池就是由Dispatcher實(shí)現(xiàn)的,默認(rèn)是AllDispatcher,上一篇已經(jīng)簡單介紹了Dubbo協(xié)議的線程池的分發(fā)模型,這篇文章就介紹下Dubbo究竟自定義了哪幾種線程池的實(shí)現(xiàn),并且都是怎么實(shí)現(xiàn)的。

注:Apache Dubbo版本為3.0.7

Dubbo線程池接口ThreadPool

? ? Dubbo自定義的線程池的核心接口是org.apache.dubbo.common.threadpool.ThreadPool,并且提供了四種實(shí)現(xiàn)分別是CachedThreadPool、FixedThreadPool、LimitedThreadPool、EagerThreadPool,ThreadPool接口是SPI的,如果不指定線程池的具體實(shí)現(xiàn)默認(rèn)是fixed,在項(xiàng)目中配置如下:配置線程池類型是fixed,線程數(shù)為100,線程模型是all

xml

復(fù)制代碼

ThreadPool代碼如下,接下來分別簡單介紹一下四種線程池的具體實(shí)現(xiàn)

java

復(fù)制代碼

@SPI(value = "fixed", scope = ExtensionScope.FRAMEWORK)

public interface ThreadPool {

/**

* Thread pool

*

* @param url URL contains thread parameter

* @return thread pool */

@Adaptive({THREADPOOL_KEY})

Executor getExecutor(URL url);

}

CachedThreadPool緩存線程池

? ? 該線程池是緩存類型的,當(dāng)空閑到一定時間時會將線程刪掉,使用時再創(chuàng)建,具體dubbo的實(shí)現(xiàn)如下,代碼實(shí)現(xiàn)很簡單,就是使用JUC的ThreadPoolExecutor創(chuàng)建了一個緩存類型的線程池,將maximumPoolSize設(shè)置成Integer.MAX_VALUE,keepAliveTime設(shè)置成60000毫秒,隊(duì)列大小設(shè)置成0,當(dāng)超過任務(wù)數(shù)超過corePoolSize就會直接創(chuàng)建worker線程,當(dāng)線程空閑60s后就會被銷毀。

public class CachedThreadPool implements ThreadPool {

@Override

public Executor getExecutor(URL url) {

String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));

int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);

int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);

int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);

int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,

queues == 0 ? new SynchronousQueue() :

(queues < 0 ? new LinkedBlockingQueue()

: new LinkedBlockingQueue(queues)),

new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));

}

FixedThreadPool固定線程數(shù)的線程池

? ? 該線程池是固定線程數(shù)的線程池實(shí)現(xiàn),具體實(shí)現(xiàn)也是使用JUC的ThreadPoolExecutor創(chuàng)建了一個固定線程數(shù)的線程池,通過url中配置的threads,將corePoolSize和maximumPoolSize都設(shè)置成threads的數(shù)量,并且keepAliveTime設(shè)置成0。

public class FixedThreadPool implements ThreadPool {

@Override

public Executor getExecutor(URL url) {

String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));

int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);

int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);

return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,

queues == 0 ? new SynchronousQueue() :

(queues < 0 ? new LinkedBlockingQueue()

: new LinkedBlockingQueue(queues)),

new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));

}

}

LimitedThreadPool可伸縮線程池

? ? 雖然叫可伸縮線程池,但是實(shí)際上只能伸不能縮,官網(wǎng)上說是為了突然大量的流量引起性能問題,具體實(shí)現(xiàn)就是將keepAliveTime設(shè)置成無限大,這樣當(dāng)隊(duì)列滿了后就會創(chuàng)建線程達(dá)到maximumPoolSize,新創(chuàng)建的這些線程因?yàn)閗eepAliveTime設(shè)置成無限大所以也不會銷毀了。

public class LimitedThreadPool implements ThreadPool {

@Override

public Executor getExecutor(URL url) {

String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));

int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);

int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);

int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);

return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,

queues == 0 ? new SynchronousQueue() :

(queues < 0 ? new LinkedBlockingQueue()

: new LinkedBlockingQueue(queues)),

new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));

}

}

EagerThreadPool

? ? Eager單詞是渴望的,熱切地的意思,這個線程池所實(shí)現(xiàn)的邏輯是,當(dāng)任務(wù)數(shù)超過corePoolSize但小于maximumPoolSize時不是將新任務(wù)放到隊(duì)列中,而是優(yōu)先創(chuàng)建新的worker線程,當(dāng)線程數(shù)已經(jīng)達(dá)到maximumPoolSize,接下來新的任務(wù)才會放到阻塞隊(duì)列中,阻塞隊(duì)列滿了會拋出RejectedExecutionException。

? ? EagerThreadPool線程池就不是通過JUC的ThreadPoolExecutor實(shí)現(xiàn)的了,而是繼承ThreadPoolExecutor自己實(shí)現(xiàn)一些邏輯,下面一步一步看。

EagerThreadPool

? ? Dubbo自己實(shí)現(xiàn)了阻塞隊(duì)列TaskQueue和線程池EagerThreadPoolExecutor,從EagerThreadPool的代碼中看不到該類型線程池的核心邏輯,核心邏輯是在TaskQueue代碼中,這里跳過直接看TaskQueue代碼。

public class EagerThreadPool implements ThreadPool {

@Override

public Executor getExecutor(URL url) {

String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));

int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);

int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);

int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);

int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

// init queue and executor

TaskQueue taskQueue = new TaskQueue(queues <= 0 ? 1 : queues);

EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,

threads,

alive,

TimeUnit.MILLISECONDS,

taskQueue,

new NamedInternalThreadFactory(name, true),

new AbortPolicyWithReport(name, url));

taskQueue.setExecutor(executor);

return executor;

}

}

TaskQueue

? ? Dubbo的EagerThreadPool是通過TaskQueue的offer方法實(shí)現(xiàn)的,邏輯就是當(dāng)提交到線程池任務(wù)時,如果任務(wù)數(shù)大于corePoolSize,會將任務(wù)offer到TaskQueue中,這時如果活躍的線程數(shù)大于等于線程池大小,并且當(dāng)前線程數(shù)小于maximumPoolSize時就會偽裝成放入到隊(duì)列失敗,這時線程池就會創(chuàng)建線程,從而實(shí)現(xiàn)超過corePoolSize不超過maximumPoolSize時創(chuàng)建worker線程而不是將任務(wù)放入到隊(duì)列中。

public class TaskQueue extends LinkedBlockingQueue {

private static final long serialVersionUID = -2635853580887179627L;

private EagerThreadPoolExecutor executor;

public TaskQueue(int capacity) {

super(capacity);

}

public void setExecutor(EagerThreadPoolExecutor exec) {

executor = exec;

}

@Override

public boolean offer(Runnable runnable) {

if (executor == null) {

throw new RejectedExecutionException("The task queue does not have executor!");

}

int currentPoolThreadSize = executor.getPoolSize();

// have free worker. put task into queue to let the worker deal with task.

if (executor.getActiveCount() < currentPoolThreadSize) {

return super.offer(runnable);

}

// 偽裝放入隊(duì)列失敗,讓線程池創(chuàng)建線程

if (currentPoolThreadSize < executor.getMaximumPoolSize()) {

return false;

}

// currentPoolThreadSize >= max

return super.offer(runnable);

}

/**

* retry offer task

*

* @param o task

* @return offer success or not

* @throws RejectedExecutionException if executor is terminated.

*/

public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {

if (executor.isShutdown()) {

throw new RejectedExecutionException("Executor is shutdown!");

}

return super.offer(o, timeout, unit);

}

}

EagerThreadPoolExecutor

? ? 當(dāng)任務(wù)數(shù)大于maximumPoolSize時,線程池會拋出RejectedExecutionException,EagerThreadPoolExecutor捕獲這個異常,并且調(diào)用TaskQueue的retryOffer方法嘗試放入隊(duì)列,這樣就實(shí)現(xiàn)了當(dāng)線程數(shù)已經(jīng)達(dá)到maximumPoolSize,接下來新的任務(wù)才會放到阻塞隊(duì)列中,阻塞隊(duì)列滿了會拋出RejectedExecutionException,代碼如下:

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

public EagerThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit, TaskQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

}

@Override

public void execute(Runnable command) {

if (command == null) {

throw new NullPointerException();

}

try {

super.execute(command);

} catch (RejectedExecutionException rx) {

// 重新嘗試將任務(wù)放到隊(duì)列中.

final TaskQueue queue = (TaskQueue) super.getQueue();

try {

if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {

throw new RejectedExecutionException("Queue capacity is full.", rx);

}

} catch (InterruptedException x) {

throw new RejectedExecutionException(x);

}

}

}

}

總結(jié)

? ? Dubbo實(shí)現(xiàn)了自定義線程池,其核心接口是ThreadPool,該接口是SPI的默認(rèn)的實(shí)現(xiàn)是fixed,Dubbo提供了四種實(shí)現(xiàn),分別是CachedThreadPool、FixedThreadPool、LimitedThreadPool、EagerThreadPool。

柚子快報邀請碼778899分享:Dubbo線程池

http://yzkb.51969.com/

參考文章

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18901982.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄