柚子快報(bào)邀請(qǐng)碼778899分享:運(yùn)維 【Linux】進(jìn)程池實(shí)例
柚子快報(bào)邀請(qǐng)碼778899分享:運(yùn)維 【Linux】進(jìn)程池實(shí)例
這篇博客講解了進(jìn)程池的創(chuàng)建過(guò)程,并在最后附上了完整代碼。
現(xiàn)在有一個(gè)父進(jìn)程,然后提前創(chuàng)建出一批子進(jìn)程,未來(lái)如果父進(jìn)程master有一些任務(wù)要交給子進(jìn)程去運(yùn)行,而不用像shell,需要執(zhí)行命令才回去創(chuàng)建進(jìn)程,創(chuàng)建進(jìn)程本身也是有成本的。父進(jìn)程要把任務(wù)派發(fā)給一些子進(jìn)程,注定了要進(jìn)行通信,我們可以提前給每一個(gè)進(jìn)程創(chuàng)建管道,由父進(jìn)程持有寫端,子進(jìn)程持有讀端。我們有了管道這種技術(shù),就可以讓父進(jìn)程通過(guò)管道將任務(wù)傳遞給子進(jìn)程,想讓哪個(gè)進(jìn)程執(zhí)行任務(wù),就給哪個(gè)管道寫入任務(wù),我們把提前創(chuàng)建的這批進(jìn)程叫做進(jìn)程池, 這種預(yù)先創(chuàng)建的進(jìn)程就可以大大減少未來(lái)執(zhí)行任務(wù)時(shí)創(chuàng)建進(jìn)程的成本。master只負(fù)責(zé)往管道寫任務(wù),子進(jìn)程只會(huì)等待任務(wù)的到來(lái),一旦來(lái)了就會(huì)處理。如果父進(jìn)程不往管道里寫任務(wù),管道里沒(méi)數(shù)據(jù),管道讀寫端的文件描述符也沒(méi)關(guān),子進(jìn)程會(huì)阻塞等待,等待任務(wù)的道理!!master向哪一個(gè)管道寫入,就是喚醒哪一個(gè)子進(jìn)程來(lái)處理任務(wù)!
這樣就通過(guò)管道實(shí)現(xiàn)了進(jìn)程的協(xié)同,可以由父進(jìn)程定向喚醒一個(gè)或多個(gè)進(jìn)程。我們?cè)诮o子進(jìn)程分配任務(wù)時(shí),不能讓一個(gè)特別忙而是讓它們均衡一些,父進(jìn)程在進(jìn)行任務(wù)劃分時(shí)要做到劃分的負(fù)載均衡!
我們站在父進(jìn)程的角度,創(chuàng)建一個(gè)信道類Channel,
//master
class Channel
{
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
其中,_wfd是管道的寫入端,_subprocessid是對(duì)應(yīng)子進(jìn)程的id,_name表示管道的名字。
并通過(guò)vector來(lái)管理管道:
std::vector
有一個(gè)提示需要交代一下,在C++中,我們的形參命名規(guī)范:
const &:輸入型參數(shù)
& :輸入輸出型參數(shù)
*? :輸出型參數(shù)
我們接下來(lái)就是創(chuàng)建信道和子進(jìn)程,并把它們打印出來(lái),看看我們的代碼框架有沒(méi)有問(wèn)題:
#include
#include
#include
#include
#include
void work(int rfd)
{
while(true)
{
sleep(1);
}
}
//master
class Channel
{
public:
Channel(int wfd,pid_t id,const std::string& name)
:_wfd(wfd)
,_subprocessid(id)
,_name(name)
{
}
int GetWfd() {return _wfd ;}
pid_t GetProcessId(){return _subprocessid;}
std::string GetName(){return _name;}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
void CreateChannelAndSub(int num, std::vector
{
for(int i = 0; i { //1.創(chuàng)建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if(n < 0) exit(1); //2.創(chuàng)建子進(jìn)程 pid_t id = fork(); if(id == 0) { //child close(pipefd[1]); work(pipefd[0]); close(pipefd[0]); exit(0); } //father close(pipefd[0]); //a.子進(jìn)程的pid b.父進(jìn)程關(guān)心的管道的w端 //3.構(gòu)造一個(gè)通道的名字 std::string channel_name = "Channel-" + std::to_string(i); channels->push_back(Channel(pipefd[1],id,channel_name)); } } // ./processpool 5 int main(int argc,char* argv[]) { if(argc != 2) { std::cerr << "Usage: " << argv[0] << " processnum" << std::endl; return 1; } std::vector int num = std::stoi(argv[1]); //1.創(chuàng)建信道和子進(jìn)程 CreateChannelAndSub(num, &channels); //for test for(auto& channel : channels) { std::cout<<"==============================="< std::cout< std::cout< std::cout< } sleep(100); return 0; } 通過(guò)運(yùn)行程序,我們給的命令行參數(shù)是10,創(chuàng)建10個(gè)子進(jìn)程,然后打開(kāi)進(jìn)程監(jiān)測(cè)窗口: 可以看到我們創(chuàng)建管道和子進(jìn)程成功,非常好! 我們搭建好框架后,接下來(lái)就要通過(guò)channel控制子進(jìn)程、回收管道和子進(jìn)程。 父進(jìn)程需要給子進(jìn)程發(fā)任務(wù),那任務(wù)是什么呢?父進(jìn)程是沒(méi)有辦法把函數(shù)發(fā)送到管道里的,而任務(wù)其實(shí)就是讓子進(jìn)程執(zhí)行某段代碼,而父子進(jìn)程數(shù)據(jù)可以寫時(shí)拷貝,但是代碼是共享的,所以,我們要構(gòu)建任務(wù),我們可以由父進(jìn)程預(yù)先規(guī)定一些任務(wù),這些任務(wù)本質(zhì)就是一張表(函數(shù)指針數(shù)組),保存了各種方法的地址,未來(lái)我們就可以往管道里寫固定長(zhǎng)度的4字節(jié)的數(shù)組下標(biāo)(任務(wù)碼),所以,我們現(xiàn)在要轉(zhuǎn)過(guò)頭構(gòu)建一批任務(wù),為了方便,我們創(chuàng)建Task.hpp文件,.hpp允許聲明和實(shí)現(xiàn)寫在一個(gè)頭文件里,.hpp文件的缺點(diǎn)是無(wú)法形成庫(kù),只能開(kāi)源式地給別人,一般在開(kāi)源項(xiàng)目里會(huì)用到: #pragma once #include #include #include #include #include #define TaskNum 3 typedef void (*task_t)(); // task_t 函數(shù)指針類型 void Print() { std::cout << "I am print task" << std::endl; } void DownLoad() { std::cout<< "I am a download task" << std::endl; } void Flush() { std::cout<< "I am a flush task" << std::endl; } task_t tasks[TaskNum]; void LoadTask() { srand(time(nullptr) ^ getpid() ^ 1642); tasks[0] = Print; tasks[1] = DownLoad; tasks[2] = Flush; } void ExcuteTask(int number) { if(number < 0 || number > 2) return; tasks[number](); } int SelectTask() { return rand() % TaskNum; } 在我們的代碼中,按順序給各個(gè)子進(jìn)程發(fā)送任務(wù),這種叫輪詢方案,以下是我們的具體實(shí)現(xiàn)代碼: #include #include #include #include #include #include #include "Task.hpp" void work(int rfd) { while(true) { int command = 0; int n = read(rfd,&command,sizeof(command)); if(n == sizeof(int)) { std::cout << "pid is : " << getpid() << " handler task" << std::endl; ExcuteTask(command); } else if(n == 0) { std::cout << "sub process : " << getpid() << "quit" << std::endl; break; } } } //master class Channel { public: Channel(int wfd,pid_t id,const std::string& name) :_wfd(wfd) ,_subprocessid(id) ,_name(name) { } int GetWfd() {return _wfd ;} pid_t GetProcessId(){return _subprocessid;} std::string GetName(){return _name;} void CloseChannel() { close(_wfd); } void Wait() { pid_t rid = waitpid(_subprocessid, nullptr, 0); if(rid > 0) { std::cout << "wait " << rid << " success" << std::endl; } } ~Channel() { } private: int _wfd; pid_t _subprocessid; std::string _name; }; void CreateChannelAndSub(int num, std::vector { for(int i = 0; i { //1.創(chuàng)建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if(n < 0) exit(1); //2.創(chuàng)建子進(jìn)程 pid_t id = fork(); if(id == 0) { //child close(pipefd[1]); work(pipefd[0]); close(pipefd[0]); exit(0); } //father close(pipefd[0]); //a.子進(jìn)程的pid b.父進(jìn)程關(guān)心的管道的w端 //3.構(gòu)造一個(gè)通道的名字 std::string channel_name = "Channel-" + std::to_string(i); channels->push_back(Channel(pipefd[1],id,channel_name)); } } int NextChannel(int channelnum) { static int next = 0; int channel = next; next++; next %= channelnum; return channel; } void SendTaskCommand(Channel& channel, int taskcommand) { write(channel.GetWfd(),&taskcommand,sizeof(taskcommand)); } void ctrlProcessOnce(std::vector { sleep(1); //a. 選擇一個(gè)任務(wù) int taskcommand = SelectTask(); //b. 選擇一個(gè)信道和進(jìn)程 int channel_index = NextChannel(channels.size()); //c. 發(fā)送任務(wù) SendTaskCommand(channels[channel_index],taskcommand); std::cout << std::endl; std::cout << "taskcommand: " << taskcommand << " channel: " \ << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl; } void ctrlProcess(std::vector { if(times > 0) { while(times--) { ctrlProcessOnce(channels); } } else { while(true) { ctrlProcessOnce(channels); } } } void CleanUpChannel(std::vector { for(auto& channel : channels) { channel.CloseChannel(); } for(auto& channel : channels) { channel.Wait(); } } // ./processpool 5 int main(int argc,char* argv[]) { if(argc != 2) { std::cerr << "Usage: " << argv[0] << " processnum" << std::endl; return 1; } std::vector int num = std::stoi(argv[1]); LoadTask(); //1.創(chuàng)建信道和子進(jìn)程 CreateChannelAndSub(num, &channels); //2.通過(guò)channel控制子進(jìn)程 ctrlProcess(channels,num); //3.回收管道和子進(jìn)程 a.關(guān)閉所有的寫端 b.回收子進(jìn)程 CleanUpChannel(channels); sleep(5); return 0; } 其中,上面的代碼有兩個(gè)小細(xì)節(jié)處理: 1.在創(chuàng)建子進(jìn)程時(shí), ,這樣就可以讓子進(jìn)程不關(guān)心管道讀端,只需要從標(biāo)準(zhǔn)輸入讀就行。 這樣就可以將管道的邏輯和子進(jìn)程執(zhí)行任務(wù)的邏輯解耦。 2.子進(jìn)程要執(zhí)行的word本身就是一個(gè)任務(wù),可以作為CreateChannelAndSub的參數(shù)傳入,在然后回調(diào)work。其中task_t task叫做回調(diào)函數(shù),未來(lái)所有子進(jìn)程都回去調(diào)用傳入的task。這樣之后,進(jìn)程池本身的代碼和任務(wù)本身兩個(gè)文件就徹底解耦了?。ò褀ork函數(shù)放到Task.hpp ) ?可是現(xiàn)在我們的代碼還存在一個(gè)BUG,我們來(lái)看, 里面寫了兩個(gè)循環(huán),不可以放到一個(gè)循環(huán)里嗎? ?我們發(fā)現(xiàn),隨著管道的創(chuàng)建,越來(lái)越多的寫端指向第一個(gè)管道,如果創(chuàng)建了10個(gè)子進(jìn)程,那就有10個(gè)寫端指向第一個(gè)管道。所以,如果兩個(gè)循環(huán)寫到一起,就會(huì)從頭向后關(guān)管道的文件描述符,第一個(gè)關(guān)完后,還是有九個(gè)文件描述符指向第一個(gè)管道,管道中對(duì)文件描述符有引用計(jì)數(shù),此時(shí),這個(gè)管道并沒(méi)有向我們預(yù)期的那樣退出,寫端沒(méi)有關(guān)完,讀端什么都讀不到,讀端依舊阻塞,子進(jìn)程不退出,進(jìn)程就阻塞了。 那為什么寫成兩個(gè)循環(huán)就可以呢?因?yàn)楫?dāng)關(guān)掉最后一個(gè)管道時(shí),最后一個(gè)子進(jìn)程指向上一個(gè)管道的寫端就被釋放了,類似于遞歸,從下往上就關(guān)掉了。 我們現(xiàn)在意識(shí)到了這個(gè)問(wèn)題,那我們就可以倒著先關(guān)閉最下面的管道, void CleanUpChannel(std::vector { int num = channels.size()-1; while(num >= 0) { channels[num].CloseChannel(); channels[num--].Wait(); } } 這樣做是沒(méi)有問(wèn)題的,但是我們并沒(méi)有從源頭上解決這個(gè)bug,我們就不應(yīng)該讓這種情況發(fā)生,我們的想法是這樣的,如果是第二次及之后創(chuàng)建子進(jìn)程時(shí),*channels數(shù)組一定不為空,里面一定包含寫端,那就把里面的每一個(gè)管道的寫端關(guān)閉一下就好: 完整的進(jìn)程池代碼附上: Makefile processpool:ProcessPool.cc g++ -o $@ $^ -std=c++11 .PHONY:clean clean: rm -f processpool Task.hpp #pragma once #include #include #include #include #include #define TaskNum 3 typedef void (*task_t)(); // task_t 函數(shù)指針類型 void Print() { std::cout << "I am print task" << std::endl; } void DownLoad() { std::cout<< "I am a download task" << std::endl; } void Flush() { std::cout<< "I am a flush task" << std::endl; } task_t tasks[TaskNum]; void LoadTask() { srand(time(nullptr) ^ getpid() ^ 1642); tasks[0] = Print; tasks[1] = DownLoad; tasks[2] = Flush; } void ExcuteTask(int number) { if(number < 0 || number > 2) return; tasks[number](); } int SelectTask() { return rand() % TaskNum; } void work() { while(true) { int command = 0; int n = read(0,&command,sizeof(command)); if(n == sizeof(int)) { std::cout << "pid is : " << getpid() << " handler task" << std::endl; ExcuteTask(command); } else if(n == 0) { std::cout << "sub process : " << getpid() << "quit" << std::endl; break; } } } ProcessPool.cc #include #include #include #include #include #include #include "Task.hpp" // void work(int rfd) // { // while(true) // { // int command = 0; // int n = read(rfd,&command,sizeof(command)); // if(n == sizeof(int)) // { // std::cout << "pid is : " << getpid() << " handler task" << std::endl; // ExcuteTask(command); // } // else if(n == 0) // { // std::cout << "sub process : " << getpid() << "quit" << std::endl; // break; // } // } // } // master class Channel { public: Channel(int wfd, pid_t id, const std::string &name) : _wfd(wfd), _subprocessid(id), _name(name) { } int GetWfd() { return _wfd; } pid_t GetProcessId() { return _subprocessid; } std::string GetName() { return _name; } void CloseChannel() { close(_wfd); } void Wait() { pid_t rid = waitpid(_subprocessid, nullptr, 0); if (rid > 0) { std::cout << "wait " << rid << " success" << std::endl; } } ~Channel() { } private: int _wfd; pid_t _subprocessid; std::string _name; }; void CreateChannelAndSub(int num, std::vector { for (int i = 0; i < num; i++) { // 1.創(chuàng)建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) exit(1); // 2.創(chuàng)建子進(jìn)程 pid_t id = fork(); if (id == 0) { // 第二次及之后創(chuàng)建管道 if (!channels->empty()) { for (auto &channel : *channels) { channel.CloseChannel(); } } // child close(pipefd[1]); dup2(pipefd[0], 0); // 將管道的讀端,重定向到標(biāo)準(zhǔn)輸入 task(); close(pipefd[0]); exit(0); } // father close(pipefd[0]); // a.子進(jìn)程的pid b.父進(jìn)程關(guān)心的管道的w端 // 3.構(gòu)造一個(gè)通道的名字 std::string channel_name = "Channel-" + std::to_string(i); channels->push_back(Channel(pipefd[1], id, channel_name)); } } int NextChannel(int channelnum) { static int next = 0; int channel = next; next++; next %= channelnum; return channel; } void SendTaskCommand(Channel &channel, int taskcommand) { write(channel.GetWfd(), &taskcommand, sizeof(taskcommand)); } void ctrlProcessOnce(std::vector { sleep(1); // a. 選擇一個(gè)任務(wù) int taskcommand = SelectTask(); // b. 選擇一個(gè)信道和進(jìn)程 int channel_index = NextChannel(channels.size()); // c. 發(fā)送任務(wù) SendTaskCommand(channels[channel_index], taskcommand); std::cout << std::endl; std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl; } void ctrlProcess(std::vector { if (times > 0) { while (times--) { ctrlProcessOnce(channels); } } else { while (true) { ctrlProcessOnce(channels); } } } void CleanUpChannel(std::vector { int num = channels.size() - 1; while (num >= 0) { channels[num].CloseChannel(); channels[num--].Wait(); } // for(auto& channel : channels) // { // channel.CloseChannel(); // } // for(auto& channel : channels) // { // channel.Wait(); // } } // ./processpool 5 int main(int argc, char *argv[]) { if (argc != 2) { std::cerr << "Usage: " << argv[0] << " processnum" << std::endl; return 1; } std::vector int num = std::stoi(argv[1]); LoadTask(); // 1.創(chuàng)建信道和子進(jìn)程 CreateChannelAndSub(num, &channels, work); // 2.通過(guò)channel控制子進(jìn)程 ctrlProcess(channels, num); // 3.回收管道和子進(jìn)程 a.關(guān)閉所有的寫端 b.回收子進(jìn)程 CleanUpChannel(channels); sleep(5); return 0; } 柚子快報(bào)邀請(qǐng)碼778899分享:運(yùn)維 【Linux】進(jìn)程池實(shí)例 精彩內(nèi)容
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。