欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:運(yùn)維 【Linux】進(jìn)程池實(shí)例

柚子快報(bào)邀請(qǐng)碼778899分享:運(yùn)維 【Linux】進(jìn)程池實(shí)例

http://yzkb.51969.com/

這篇博客講解了進(jìn)程池的創(chuàng)建過(guò)程,并在最后附上了完整代碼。

現(xiàn)在有一個(gè)父進(jìn)程,然后提前創(chuàng)建出一批子進(jìn)程,未來(lái)如果父進(jìn)程master有一些任務(wù)要交給子進(jìn)程去運(yùn)行,而不用像shell,需要執(zhí)行命令才回去創(chuàng)建進(jìn)程,創(chuàng)建進(jìn)程本身也是有成本的。父進(jìn)程要把任務(wù)派發(fā)給一些子進(jìn)程,注定了要進(jìn)行通信,我們可以提前給每一個(gè)進(jìn)程創(chuàng)建管道,由父進(jìn)程持有寫端,子進(jìn)程持有讀端。我們有了管道這種技術(shù),就可以讓父進(jìn)程通過(guò)管道將任務(wù)傳遞給子進(jìn)程,想讓哪個(gè)進(jìn)程執(zhí)行任務(wù),就給哪個(gè)管道寫入任務(wù),我們把提前創(chuàng)建的這批進(jìn)程叫做進(jìn)程池, 這種預(yù)先創(chuàng)建的進(jìn)程就可以大大減少未來(lái)執(zhí)行任務(wù)時(shí)創(chuàng)建進(jìn)程的成本。master只負(fù)責(zé)往管道寫任務(wù),子進(jìn)程只會(huì)等待任務(wù)的到來(lái),一旦來(lái)了就會(huì)處理。如果父進(jìn)程不往管道里寫任務(wù),管道里沒(méi)數(shù)據(jù),管道讀寫端的文件描述符也沒(méi)關(guān),子進(jìn)程會(huì)阻塞等待,等待任務(wù)的道理!!master向哪一個(gè)管道寫入,就是喚醒哪一個(gè)子進(jìn)程來(lái)處理任務(wù)!

這樣就通過(guò)管道實(shí)現(xiàn)了進(jìn)程的協(xié)同,可以由父進(jìn)程定向喚醒一個(gè)或多個(gè)進(jìn)程。我們?cè)诮o子進(jìn)程分配任務(wù)時(shí),不能讓一個(gè)特別忙而是讓它們均衡一些,父進(jìn)程在進(jìn)行任務(wù)劃分時(shí)要做到劃分的負(fù)載均衡!

我們站在父進(jìn)程的角度,創(chuàng)建一個(gè)信道類Channel,

//master

class Channel

{

private:

int _wfd;

pid_t _subprocessid;

std::string _name;

};

其中,_wfd是管道的寫入端,_subprocessid是對(duì)應(yīng)子進(jìn)程的id,_name表示管道的名字。

并通過(guò)vector來(lái)管理管道:

std::vector channels;

有一個(gè)提示需要交代一下,在C++中,我們的形參命名規(guī)范:

const &:輸入型參數(shù)

& :輸入輸出型參數(shù)

*? :輸出型參數(shù)

我們接下來(lái)就是創(chuàng)建信道和子進(jìn)程,并把它們打印出來(lái),看看我們的代碼框架有沒(méi)有問(wèn)題:

#include

#include

#include

#include

#include

void work(int rfd)

{

while(true)

{

sleep(1);

}

}

//master

class Channel

{

public:

Channel(int wfd,pid_t id,const std::string& name)

:_wfd(wfd)

,_subprocessid(id)

,_name(name)

{

}

int GetWfd() {return _wfd ;}

pid_t GetProcessId(){return _subprocessid;}

std::string GetName(){return _name;}

~Channel()

{

}

private:

int _wfd;

pid_t _subprocessid;

std::string _name;

};

void CreateChannelAndSub(int num, std::vector* channels)

{

for(int i = 0; i

{

//1.創(chuàng)建管道

int pipefd[2] = {0};

int n = pipe(pipefd);

if(n < 0) exit(1);

//2.創(chuàng)建子進(jìn)程

pid_t id = fork();

if(id == 0)

{

//child

close(pipefd[1]);

work(pipefd[0]);

close(pipefd[0]);

exit(0);

}

//father

close(pipefd[0]);

//a.子進(jìn)程的pid b.父進(jìn)程關(guān)心的管道的w端

//3.構(gòu)造一個(gè)通道的名字

std::string channel_name = "Channel-" + std::to_string(i);

channels->push_back(Channel(pipefd[1],id,channel_name));

}

}

// ./processpool 5

int main(int argc,char* argv[])

{

if(argc != 2)

{

std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;

return 1;

}

std::vector channels;

int num = std::stoi(argv[1]);

//1.創(chuàng)建信道和子進(jìn)程

CreateChannelAndSub(num, &channels);

//for test

for(auto& channel : channels)

{

std::cout<<"==============================="<

std::cout<

std::cout<

std::cout<

}

sleep(100);

return 0;

}

通過(guò)運(yùn)行程序,我們給的命令行參數(shù)是10,創(chuàng)建10個(gè)子進(jìn)程,然后打開(kāi)進(jìn)程監(jiān)測(cè)窗口:

可以看到我們創(chuàng)建管道和子進(jìn)程成功,非常好!

我們搭建好框架后,接下來(lái)就要通過(guò)channel控制子進(jìn)程、回收管道和子進(jìn)程。

父進(jìn)程需要給子進(jìn)程發(fā)任務(wù),那任務(wù)是什么呢?父進(jìn)程是沒(méi)有辦法把函數(shù)發(fā)送到管道里的,而任務(wù)其實(shí)就是讓子進(jìn)程執(zhí)行某段代碼,而父子進(jìn)程數(shù)據(jù)可以寫時(shí)拷貝,但是代碼是共享的,所以,我們要構(gòu)建任務(wù),我們可以由父進(jìn)程預(yù)先規(guī)定一些任務(wù),這些任務(wù)本質(zhì)就是一張表(函數(shù)指針數(shù)組),保存了各種方法的地址,未來(lái)我們就可以往管道里寫固定長(zhǎng)度的4字節(jié)的數(shù)組下標(biāo)(任務(wù)碼),所以,我們現(xiàn)在要轉(zhuǎn)過(guò)頭構(gòu)建一批任務(wù),為了方便,我們創(chuàng)建Task.hpp文件,.hpp允許聲明和實(shí)現(xiàn)寫在一個(gè)頭文件里,.hpp文件的缺點(diǎn)是無(wú)法形成庫(kù),只能開(kāi)源式地給別人,一般在開(kāi)源項(xiàng)目里會(huì)用到:

#pragma once

#include

#include

#include

#include

#include

#define TaskNum 3

typedef void (*task_t)(); // task_t 函數(shù)指針類型

void Print()

{

std::cout << "I am print task" << std::endl;

}

void DownLoad()

{

std::cout<< "I am a download task" << std::endl;

}

void Flush()

{

std::cout<< "I am a flush task" << std::endl;

}

task_t tasks[TaskNum];

void LoadTask()

{

srand(time(nullptr) ^ getpid() ^ 1642);

tasks[0] = Print;

tasks[1] = DownLoad;

tasks[2] = Flush;

}

void ExcuteTask(int number)

{

if(number < 0 || number > 2) return;

tasks[number]();

}

int SelectTask()

{

return rand() % TaskNum;

}

在我們的代碼中,按順序給各個(gè)子進(jìn)程發(fā)送任務(wù),這種叫輪詢方案,以下是我們的具體實(shí)現(xiàn)代碼:

#include

#include

#include

#include

#include

#include

#include "Task.hpp"

void work(int rfd)

{

while(true)

{

int command = 0;

int n = read(rfd,&command,sizeof(command));

if(n == sizeof(int))

{

std::cout << "pid is : " << getpid() << " handler task" << std::endl;

ExcuteTask(command);

}

else if(n == 0)

{

std::cout << "sub process : " << getpid() << "quit" << std::endl;

break;

}

}

}

//master

class Channel

{

public:

Channel(int wfd,pid_t id,const std::string& name)

:_wfd(wfd)

,_subprocessid(id)

,_name(name)

{

}

int GetWfd() {return _wfd ;}

pid_t GetProcessId(){return _subprocessid;}

std::string GetName(){return _name;}

void CloseChannel()

{

close(_wfd);

}

void Wait()

{

pid_t rid = waitpid(_subprocessid, nullptr, 0);

if(rid > 0)

{

std::cout << "wait " << rid << " success" << std::endl;

}

}

~Channel()

{

}

private:

int _wfd;

pid_t _subprocessid;

std::string _name;

};

void CreateChannelAndSub(int num, std::vector* channels)

{

for(int i = 0; i

{

//1.創(chuàng)建管道

int pipefd[2] = {0};

int n = pipe(pipefd);

if(n < 0) exit(1);

//2.創(chuàng)建子進(jìn)程

pid_t id = fork();

if(id == 0)

{

//child

close(pipefd[1]);

work(pipefd[0]);

close(pipefd[0]);

exit(0);

}

//father

close(pipefd[0]);

//a.子進(jìn)程的pid b.父進(jìn)程關(guān)心的管道的w端

//3.構(gòu)造一個(gè)通道的名字

std::string channel_name = "Channel-" + std::to_string(i);

channels->push_back(Channel(pipefd[1],id,channel_name));

}

}

int NextChannel(int channelnum)

{

static int next = 0;

int channel = next;

next++;

next %= channelnum;

return channel;

}

void SendTaskCommand(Channel& channel, int taskcommand)

{

write(channel.GetWfd(),&taskcommand,sizeof(taskcommand));

}

void ctrlProcessOnce(std::vector& channels)

{

sleep(1);

//a. 選擇一個(gè)任務(wù)

int taskcommand = SelectTask();

//b. 選擇一個(gè)信道和進(jìn)程

int channel_index = NextChannel(channels.size());

//c. 發(fā)送任務(wù)

SendTaskCommand(channels[channel_index],taskcommand);

std::cout << std::endl;

std::cout << "taskcommand: " << taskcommand << " channel: " \

<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;

}

void ctrlProcess(std::vector& channels, int times = -1)

{

if(times > 0)

{

while(times--)

{

ctrlProcessOnce(channels);

}

}

else

{

while(true)

{

ctrlProcessOnce(channels);

}

}

}

void CleanUpChannel(std::vector& channels)

{

for(auto& channel : channels)

{

channel.CloseChannel();

}

for(auto& channel : channels)

{

channel.Wait();

}

}

// ./processpool 5

int main(int argc,char* argv[])

{

if(argc != 2)

{

std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;

return 1;

}

std::vector channels;

int num = std::stoi(argv[1]);

LoadTask();

//1.創(chuàng)建信道和子進(jìn)程

CreateChannelAndSub(num, &channels);

//2.通過(guò)channel控制子進(jìn)程

ctrlProcess(channels,num);

//3.回收管道和子進(jìn)程 a.關(guān)閉所有的寫端 b.回收子進(jìn)程

CleanUpChannel(channels);

sleep(5);

return 0;

}

其中,上面的代碼有兩個(gè)小細(xì)節(jié)處理:

1.在創(chuàng)建子進(jìn)程時(shí), ,這樣就可以讓子進(jìn)程不關(guān)心管道讀端,只需要從標(biāo)準(zhǔn)輸入讀就行。

這樣就可以將管道的邏輯和子進(jìn)程執(zhí)行任務(wù)的邏輯解耦。

2.子進(jìn)程要執(zhí)行的word本身就是一個(gè)任務(wù),可以作為CreateChannelAndSub的參數(shù)傳入,在然后回調(diào)work。其中task_t task叫做回調(diào)函數(shù),未來(lái)所有子進(jìn)程都回去調(diào)用傳入的task。這樣之后,進(jìn)程池本身的代碼和任務(wù)本身兩個(gè)文件就徹底解耦了?。ò褀ork函數(shù)放到Task.hpp )

?可是現(xiàn)在我們的代碼還存在一個(gè)BUG,我們來(lái)看,

里面寫了兩個(gè)循環(huán),不可以放到一個(gè)循環(huán)里嗎?

?我們發(fā)現(xiàn),隨著管道的創(chuàng)建,越來(lái)越多的寫端指向第一個(gè)管道,如果創(chuàng)建了10個(gè)子進(jìn)程,那就有10個(gè)寫端指向第一個(gè)管道。所以,如果兩個(gè)循環(huán)寫到一起,就會(huì)從頭向后關(guān)管道的文件描述符,第一個(gè)關(guān)完后,還是有九個(gè)文件描述符指向第一個(gè)管道,管道中對(duì)文件描述符有引用計(jì)數(shù),此時(shí),這個(gè)管道并沒(méi)有向我們預(yù)期的那樣退出,寫端沒(méi)有關(guān)完,讀端什么都讀不到,讀端依舊阻塞,子進(jìn)程不退出,進(jìn)程就阻塞了。

那為什么寫成兩個(gè)循環(huán)就可以呢?因?yàn)楫?dāng)關(guān)掉最后一個(gè)管道時(shí),最后一個(gè)子進(jìn)程指向上一個(gè)管道的寫端就被釋放了,類似于遞歸,從下往上就關(guān)掉了。

我們現(xiàn)在意識(shí)到了這個(gè)問(wèn)題,那我們就可以倒著先關(guān)閉最下面的管道,

void CleanUpChannel(std::vector& channels)

{

int num = channels.size()-1;

while(num >= 0)

{

channels[num].CloseChannel();

channels[num--].Wait();

}

}

這樣做是沒(méi)有問(wèn)題的,但是我們并沒(méi)有從源頭上解決這個(gè)bug,我們就不應(yīng)該讓這種情況發(fā)生,我們的想法是這樣的,如果是第二次及之后創(chuàng)建子進(jìn)程時(shí),*channels數(shù)組一定不為空,里面一定包含寫端,那就把里面的每一個(gè)管道的寫端關(guān)閉一下就好:

完整的進(jìn)程池代碼附上:

Makefile

processpool:ProcessPool.cc

g++ -o $@ $^ -std=c++11

.PHONY:clean

clean:

rm -f processpool

Task.hpp

#pragma once

#include

#include

#include

#include

#include

#define TaskNum 3

typedef void (*task_t)(); // task_t 函數(shù)指針類型

void Print()

{

std::cout << "I am print task" << std::endl;

}

void DownLoad()

{

std::cout<< "I am a download task" << std::endl;

}

void Flush()

{

std::cout<< "I am a flush task" << std::endl;

}

task_t tasks[TaskNum];

void LoadTask()

{

srand(time(nullptr) ^ getpid() ^ 1642);

tasks[0] = Print;

tasks[1] = DownLoad;

tasks[2] = Flush;

}

void ExcuteTask(int number)

{

if(number < 0 || number > 2) return;

tasks[number]();

}

int SelectTask()

{

return rand() % TaskNum;

}

void work()

{

while(true)

{

int command = 0;

int n = read(0,&command,sizeof(command));

if(n == sizeof(int))

{

std::cout << "pid is : " << getpid() << " handler task" << std::endl;

ExcuteTask(command);

}

else if(n == 0)

{

std::cout << "sub process : " << getpid() << "quit" << std::endl;

break;

}

}

}

ProcessPool.cc

#include

#include

#include

#include

#include

#include

#include "Task.hpp"

// void work(int rfd)

// {

// while(true)

// {

// int command = 0;

// int n = read(rfd,&command,sizeof(command));

// if(n == sizeof(int))

// {

// std::cout << "pid is : " << getpid() << " handler task" << std::endl;

// ExcuteTask(command);

// }

// else if(n == 0)

// {

// std::cout << "sub process : " << getpid() << "quit" << std::endl;

// break;

// }

// }

// }

// master

class Channel

{

public:

Channel(int wfd, pid_t id, const std::string &name)

: _wfd(wfd), _subprocessid(id), _name(name)

{

}

int GetWfd() { return _wfd; }

pid_t GetProcessId() { return _subprocessid; }

std::string GetName() { return _name; }

void CloseChannel()

{

close(_wfd);

}

void Wait()

{

pid_t rid = waitpid(_subprocessid, nullptr, 0);

if (rid > 0)

{

std::cout << "wait " << rid << " success" << std::endl;

}

}

~Channel()

{

}

private:

int _wfd;

pid_t _subprocessid;

std::string _name;

};

void CreateChannelAndSub(int num, std::vector *channels, task_t task)

{

for (int i = 0; i < num; i++)

{

// 1.創(chuàng)建管道

int pipefd[2] = {0};

int n = pipe(pipefd);

if (n < 0)

exit(1);

// 2.創(chuàng)建子進(jìn)程

pid_t id = fork();

if (id == 0)

{

// 第二次及之后創(chuàng)建管道

if (!channels->empty())

{

for (auto &channel : *channels)

{

channel.CloseChannel();

}

}

// child

close(pipefd[1]);

dup2(pipefd[0], 0); // 將管道的讀端,重定向到標(biāo)準(zhǔn)輸入

task();

close(pipefd[0]);

exit(0);

}

// father

close(pipefd[0]);

// a.子進(jìn)程的pid b.父進(jìn)程關(guān)心的管道的w端

// 3.構(gòu)造一個(gè)通道的名字

std::string channel_name = "Channel-" + std::to_string(i);

channels->push_back(Channel(pipefd[1], id, channel_name));

}

}

int NextChannel(int channelnum)

{

static int next = 0;

int channel = next;

next++;

next %= channelnum;

return channel;

}

void SendTaskCommand(Channel &channel, int taskcommand)

{

write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));

}

void ctrlProcessOnce(std::vector &channels)

{

sleep(1);

// a. 選擇一個(gè)任務(wù)

int taskcommand = SelectTask();

// b. 選擇一個(gè)信道和進(jìn)程

int channel_index = NextChannel(channels.size());

// c. 發(fā)送任務(wù)

SendTaskCommand(channels[channel_index], taskcommand);

std::cout << std::endl;

std::cout << "taskcommand: " << taskcommand << " channel: "

<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;

}

void ctrlProcess(std::vector &channels, int times = -1)

{

if (times > 0)

{

while (times--)

{

ctrlProcessOnce(channels);

}

}

else

{

while (true)

{

ctrlProcessOnce(channels);

}

}

}

void CleanUpChannel(std::vector &channels)

{

int num = channels.size() - 1;

while (num >= 0)

{

channels[num].CloseChannel();

channels[num--].Wait();

}

// for(auto& channel : channels)

// {

// channel.CloseChannel();

// }

// for(auto& channel : channels)

// {

// channel.Wait();

// }

}

// ./processpool 5

int main(int argc, char *argv[])

{

if (argc != 2)

{

std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;

return 1;

}

std::vector channels;

int num = std::stoi(argv[1]);

LoadTask();

// 1.創(chuàng)建信道和子進(jìn)程

CreateChannelAndSub(num, &channels, work);

// 2.通過(guò)channel控制子進(jìn)程

ctrlProcess(channels, num);

// 3.回收管道和子進(jìn)程 a.關(guān)閉所有的寫端 b.回收子進(jìn)程

CleanUpChannel(channels);

sleep(5);

return 0;

}

柚子快報(bào)邀請(qǐng)碼778899分享:運(yùn)維 【Linux】進(jìn)程池實(shí)例

http://yzkb.51969.com/

精彩內(nèi)容

評(píng)論可見(jiàn),查看隱藏內(nèi)容
大家都在看:

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19463067.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄