柚子快報邀請碼778899分享:網(wǎng)絡協(xié)議 UDP簡單聊天室創(chuàng)建
柚子快報邀請碼778899分享:網(wǎng)絡協(xié)議 UDP簡單聊天室創(chuàng)建
目錄
一.? ?服務端模塊實現(xiàn)
二.? ?處理聊天消息模塊實現(xiàn)
三.? ?調(diào)用服務端模塊實現(xiàn)
四.? ?客戶端模塊實現(xiàn)
五.? ?效果展示
本文介紹了如何用UDP創(chuàng)建一個簡單的聊天室。
一.? ?服務端模塊實現(xiàn)
服務端仍然沿用我們前面的思想(高內(nèi)聚低耦合),因此我們用一下上一篇UDP英譯漢網(wǎng)絡詞典的服務端實現(xiàn)(點此查看)。
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "Log.hpp"
#include"InetAddr.hpp"
#include"Dict.hpp"
using namespace std;
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
USAGE_ERROR
};
const static int defaultfd = -1;
using func_t=function
class UdpServer
{
public:
UdpServer(uint16_t port,func_t func)
: _sockfd(defaultfd), _port(port), _func(func)
,_isrunning(false)
{}
void InitServer()
{
// 1.創(chuàng)建udp socket 套接字...必須要做的
_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket error,%s,%d\n", strerror(errno), errno);
exit(SOCKET_ERROR);
}
LOG(INFO, "socket create success,sockfd: %d\n", _sockfd);
// 2.1 填充sockaddr_in結(jié)構(gòu)
struct sockaddr_in local; // struct sockaddr_in 系統(tǒng)提供的數(shù)據(jù)類型,local是變量,用戶棧上開辟空間
bzero(&local, sizeof(local)); // 清空
local.sin_family = AF_INET;
local.sin_port = htons(_port); // port要經(jīng)過網(wǎng)絡傳輸給對面,即port先到網(wǎng)絡,所以要將_port,從主機序列轉(zhuǎn)化為網(wǎng)絡序列
local.sin_addr.s_addr=INADDR_ANY;//htonl(INADDR_ANY)
// 2.2 bind sockfd和網(wǎng)絡信息(IP(?)+Port)
int n = bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
if(n<0)
{
LOG(FATAL, "bind error,%s,%d\n", strerror(errno), errno);
exit(BIND_ERROR);
}
LOG(INFO, "socket bind success\n");
}
void Start()//所有的服務器,本質(zhì)解決的是輸入輸出的問題!不想讓網(wǎng)絡通信模塊和業(yè)務模塊進行強耦合
{}
~UdpServer()
{
}
private:
int _sockfd;
uint16_t _port; // 服務器所用的端口號
bool _isrunning;
//給服務器設定回調(diào),用來讓上層進行注冊業(yè)務的處理方法
func_t _func;
};
首先明確的是,初始化函數(shù)InitServer是不變的,我們再來看Start函數(shù),也是大差不差,只需改動一捏捏,我們也可以仿照以前的思路讓上層去實現(xiàn)這個聊天的功能,那么我們就知道了,這次的服務端也需要一個回調(diào)函數(shù),讓上層進行業(yè)務處理。我們稍作修改。
using handler_message_t=......
我們先定義出來處理業(yè)務的函數(shù)類型,參數(shù)部分留待下面解析。
那么我們的TcpServer類的類成員就變成了:
class UdpServer
{
private:
int _sockfd;
//string _ip;//不是必須的
uint16_t _port; // 服務器所用的端口號
bool _isrunning;
//給服務器設定回調(diào),用來讓上層進行注冊業(yè)務的處理方法
handler_message_t _handler_message;
};
?由此來編寫構(gòu)造函數(shù),以及Start函數(shù)就顯得水到渠成了。
const static int defaultfd = -1;
using handler_message_t=......
class UdpServer
{
public:
UdpServer(uint16_t port,handler_message_t handler_message)
: _sockfd(defaultfd), _port(port), _handler_message(handler_message)
,_isrunning(false)
{}
void Start()//所有的服務器,本質(zhì)解決的是輸入輸出的問題!不想讓網(wǎng)絡通信模塊和業(yè)務模塊進行強耦合
{
//一直運行,直到管理者不想運行了,服務器都是死循環(huán)
_isrunning=true;
while(true)
{
char message[1024];
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
//1.我們要讓server先收數(shù)據(jù)
ssize_t n=recvfrom(_sockfd,message,sizeof(message)-1,0,(struct sockaddr*)&peer,&len);
if(n>0)
{
message[n]=0;
InetAddr addr(peer);
LOG(DEBUG,"get message from [%s:%d]: %s\n",addr.Ip().c_str(),addr.Port(),message);
_handler_message(_sockfd,message,addr);
}
}
_isrunning=false;
}
~UdpServer()
{}
private:
int _sockfd;
//string _ip;//不是必須的
uint16_t _port; // 服務器所用的端口號
bool _isrunning;
//給服務器設定回調(diào),用來讓上層進行注冊業(yè)務的處理方法
handler_message_t _handler_message;
};
那好我們下面就具體來看看該如何處理業(yè)務,以補充服務端的處理方法。
二.? ?處理聊天消息模塊實現(xiàn)
大家不用猜也知道該怎么辦了吧。沒錯,仍然封裝成一個類。
來看看基本框架如何寫。
class MessageRoute
{
public:
MessageRoute()
{
pthread_mutex_init(&_mutex,nullptr);
}
~MessageRoute()
{
pthread_mutex_destroy(&_mutex);
}
private:
vector
pthread_mutex_t _mutex;
};
我們的成員有兩位,首先我們想想平時我的微信、QQ,聊天的話肯定不止一個人聊天,我不聊天但是別人的消息仍然能顯示到我的屏幕。所以定義一個vector結(jié)構(gòu)的數(shù)組用來裝聊天成員。再定義一個鎖來保護臨界資源,更加安全。
第一次看的朋友,可能不知道vector里面裝的InetAddr是什么,其實是我們封裝的一個類。
class InetAddr
{
private:
void GetAddress(string* ip,uint16_t* port)
{
*port=ntohs(_addr.sin_port);
*ip=inet_ntoa(_addr.sin_addr);
}
public:
InetAddr(const struct sockaddr_in &addr)
:_addr(addr)
{
GetAddress(&_ip,&_port);
}
string Ip()
{
return _ip;
}
bool operator==(const InetAddr& addr)
{
//if(_ip==addr._ip) 任何時刻只允許一個用戶
if(_ip==addr._ip && _port==addr._port)//方便測試
{
return true;
}
return false;
}
struct sockaddr_in Addr()
{
return _addr;
}
uint16_t Port()
{
return _port;
}
~InetAddr()
{}
private:
struct sockaddr_in _addr;
string _ip;
uint16_t _port;
};
這樣封裝更便于我們的操作。
當有新用戶進入聊天室進行聊天的時候,我們應該將其插入到用戶數(shù)組中,而當由用戶退出的時候,我們同樣應該及時的將其從數(shù)組中刪除。
bool IsExists(const InetAddr& addr)
{
for(auto a:_online_user)
{
if(a==addr) return true;
}
return false;
}
void AddUser(const InetAddr& addr)
{
LockGuard lockguard(&_mutex);
if(IsExists(addr)) return;
_online_user.push_back(addr);
}
void DelUser(const InetAddr& addr)
{
LockGuard lockguard(&_mutex);
for(auto iter=_online_user.begin();iter!=_online_user.end();iter++)
{
if(*iter==addr)
{
_online_user.erase(iter);
break;
}
}
}
這里出現(xiàn)了一個新東西----LockGuard,這是我們按照RAII(點此查看)的思路封裝的鎖。
#ifndef __lock_GUARD_HPP__
#define __lock_GUARD_HPP__
#include
#include
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex)
:_mutex(mutex)
{
pthread_mutex_lock(_mutex);//構(gòu)造加鎖
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t* _mutex;
};
#endif
那么正式來說該如何處理消息呢?
void RouteHelper(int sockfd,string message,InetAddr who)
{
LockGuard lockguard(&_mutex);
//2.進行消息轉(zhuǎn)發(fā)
for(auto user:_online_user)
{
string send_message="\n["+who.Ip()+":"+to_string(who.Port())+"]#"+message+"\n";
struct sockaddr_in clientaddr=user.Addr();
::sendto(sockfd,send_message.c_str(),send_message.size(),0,(struct sockaddr*)&clientaddr,sizeof(clientaddr));
}
}
void Route(int sockfd,string message,InetAddr who)
{
//1.1 我們?nèi)蝿眨河脩羰状伟l(fā)消息,還要將自己,插入到在線用戶中
AddUser(who);
//1.2 如果客戶端要退出
if(message=="Q" || message=="QUIT")
{
DelUser(who);
}
//2.構(gòu)建任務對象,入隊列,讓線程池進行轉(zhuǎn)發(fā)
task_t t=bind(&MessageRoute::RouteHelper,this,sockfd,message,who);
ThreadPool
}
我們來說說邏輯,處理方法就是將發(fā)來的消息通過線程池進行轉(zhuǎn)發(fā)。
#pragma once
//單例模式的線程池
#include
#include
#include
#include
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"
using namespace std;
using namespace ThreadModule;
const static int gdefaultthreadnum=3;
template
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond,&_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeAll()
{
pthread_cond_broadcast(&_cond);
}
//私有的
ThreadPool(int threadnum=gdefaultthreadnum)
:_threadnum(threadnum)
,_waitnum(0)
,_isrunning(false)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
LOG(INFO,"ThreadPool Construct()");
}
void Start()
{
for(auto& thread:_threads)
{
thread.Start();
}
}
void HandlerTask(string name)//類的成員方法,也可以成為另一個類的回調(diào)方法,方便我們繼續(xù)類級別的互相調(diào)用
{
LOG(INFO,"%s is running\n",name.c_str());
while(true)
{
//1.保證隊列安全
LockQueue();
//2.隊列中不一定有數(shù)據(jù)
while(_task_queue.empty() && _isrunning)
{
_waitnum++;
ThreadSleep();
_waitnum--;
}
//2.1 如果線程池已經(jīng)退出了 && 任務隊列是空的
if(_task_queue.empty() && !_isrunning)
{
UnLockQueue();
break;
}
//2.2 如果線程池不退出 && 任務隊列不是空的
//2.3 如果線程池已經(jīng)退出 && 任務隊列不是空的 --- 處理完所有的任務,然后再退出
//3.一定有任務,處理任務
T t=_task_queue.front();
_task_queue.pop();
UnLockQueue();
LOG(DEBUG,"%s get a task",name.c_str());
//4.處理任務,這個任務屬于線程獨占的任務,所以不能放在加鎖和解鎖之間
t();
//LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString());
}
}
void InitThreadPool()
{
//指向構(gòu)建出所有的線程,并不自動
for(int num=0;num<_threadnum;num++)
{
string name="thread-"+to_string(num+1);
_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);
LOG(INFO,"init thread %s done\n",name.c_str());
}
_isrunning=true;
}
//復制拷貝禁用
ThreadPool
ThreadPool(const ThreadPool
public:
static ThreadPool
{
//如果是多線程獲取線程池對象,下面的代碼就有問題,所以要加鎖
//雙判斷的方式,可以有效減少獲取單例的加鎖成本,而且保證線程安全
if(_instance==nullptr)//只有第一次會創(chuàng)建對象,后續(xù)都是獲取,這樣就不用每次都申請鎖
{//保證第二次之后,所有線程,不用再加鎖,直接返回_instance單例對象
LockGuard lockguard(&_lock);
if (_instance == nullptr)
{
_instance = new ThreadPool
_instance->InitThreadPool();
_instance->Start();
LOG(DEBUG, "創(chuàng)建線程池單例\n");
return _instance;
}
}
LOG(DEBUG, "獲取線程池單例\n");
return _instance;
}
bool Enqueue(const T& t)
{
bool ret=false;
LockQueue();
if(_isrunning)
{
_task_queue.push(t);
if(_waitnum>0)
{
ThreadWakeup();
}
LOG(DEBUG,"enqueue task success\n");
ret=true;
}
UnLockQueue();
return ret;
}
void Stop()
{
LockQueue();
_isrunning=false;
ThreadWakeAll();
UnLockQueue();
}
void Wait()
{
for(auto& thread:_threads)
{
thread.Join();
LOG(INFO,"%s is quit",thread.name().c_str());
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
vector
queue
pthread_mutex_t _mutex;
pthread_cond_t _cond;
int _waitnum;
bool _isrunning;
//添加單例模式--懶漢
static ThreadPool
static pthread_mutex_t _lock;//保護單例的鎖
};
template
ThreadPool
template
pthread_mutex_t ThreadPool
我們可以知道,Route函數(shù)就是我們之前在服務器說的上層處理函數(shù)。那么handler_message_t類型的上層處理函數(shù)的參數(shù)就很明確了。
using handler_message_t=function
那么調(diào)用服務端的主函數(shù)如何寫就很明確了。
此處我們還封裝了原生線程庫,命名文件為Thread.hpp。
//封裝原生線程庫
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include
#include
#include
#include
#include
using namespace std;
namespace ThreadModule
{
using func_t=function
class Thread
{
public:
void Excute()
{
_func(_threadname);
}
public:
Thread(func_t func,const string& name="none-name")
:_func(func)
,_threadname(name)
,_stop(true)
{}
static void* threadroutine(void* args)//類成員函數(shù),形參是有this指針的!
{
Thread *self=static_cast
self->Excute();
return nullptr;
}
bool Start()
{
int n=pthread_create(&_tid,nullptr,threadroutine,this);
if(!n)
{
_stop=false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
string name()
{
return _threadname;
}
void Stop()
{
_stop=true;
}
~Thread(){}
private:
pthread_t _tid;
string _threadname;
func_t _func;
bool _stop;
};
}
#endif
三.? ?調(diào)用服務端模塊實現(xiàn)
我們只需將服務端中處理業(yè)務函數(shù)初始化為處理業(yè)務模塊中的Route函數(shù),然后依次調(diào)用InitServer函數(shù)、Start函數(shù)即可。
#include
#include
#include"UdpServer.hpp"
#include"Log.hpp"
#include"MessageRoute.hpp"
using namespace std;
void Usage(string proc)
{
cout<<"Usage:\n\t"< } // ./udpserver ip int main(int argc,char *argv[]) { if(argc!=2) { Usage(argv[0]); exit(USAGE_ERROR); } EnableScreen(); //string ip=argv[1]; //定義消息轉(zhuǎn)發(fā)模塊 MessageRoute route; //網(wǎng)絡模塊 uint16_t port=stoi(argv[1]); unique_ptr bind(&MessageRoute::Route,&route,placeholders::_1,\ placeholders::_2,placeholders::_3));//C++14 usvr->InitServer(); usvr->Start(); return 0; } MessageRoute.hpp文件即我們的處理聊天消息模塊。 ?四.? ?客戶端模塊實現(xiàn) 此處雖說大體還是發(fā)送消息,并接收服務器發(fā)送回來的消息。 但是與眾不同的是:此處發(fā)送消息和接收服務器發(fā)送回來的消息應該是兩個不同的線程。因為要做到不發(fā)消息的時候還是能接收到消息。 #include #include #include #include #include #include #include #include #include #include"Thread.hpp" #include"Comm.hpp" using namespace std; using namespace ThreadModule; void recvmessage(int sockfd,string name) { //version 1 int fd=OpenDev("/dev/pts/0",O_WRONLY); while(true) { struct sockaddr_in peer; socklen_t len=sizeof(peer); char buffer[1024]; ssize_t n=recvfrom(sockfd,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&peer,&len); if(n>0) { buffer[n]=0; write(fd,buffer,strlen(buffer)); } } //version 2 // while(true) // { // struct sockaddr_in peer; // socklen_t len=sizeof(peer); // char buffer[1024]; // ssize_t n=recvfrom(sockfd,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&peer,&len); // if(n>0) // { // buffer[n]=0; // fprintf(stderr,"%s | %s\n",name.c_str(),buffer); // //此時運行指令變?yōu)?/udpclient + ip + port + 2>/dev/pts/2 // } // } } void sendmessage(int sockfd,struct sockaddr_in& server,string name) { string message; while(true) { printf("%s | Enter# ",name.c_str()); fflush(stdout); getline(cin,message); sendto(sockfd,message.c_str(),message.size(),0,(struct sockaddr*)&server,sizeof(server)); } } void Usage(string proc) { cout<<"Usage:\n\t"< } int InitClient(string& serverip,uint16_t serverport,struct sockaddr_in *server) { //1.創(chuàng)建socket int sockfd=socket(AF_INET,SOCK_DGRAM,0); if(sockfd<0) { cerr<<"socket error"< return -1; } //2.client一定要bind,client也有自己的ip和port,但是不建議顯示(和server一樣用bind函數(shù))bind //a.那如何bind呢?當udp client首次發(fā)送數(shù)據(jù)的時候,os會自動隨機的給client進行bind--為什么?要bind,必然要和port關(guān)聯(lián)!防止client port沖突 //b.什么時候bind?首次發(fā)送數(shù)據(jù)的時候 //構(gòu)建目標主機的socket信息 memset(server,0,sizeof(struct sockaddr_in)); server->sin_family=AF_INET; server->sin_port=htons(serverport); server->sin_addr.s_addr=inet_addr(serverip.c_str()); return sockfd; } // ./udpclient serverip serverport int main(int argc,char *argv[]) { if(argc!=3) { Usage(argv[0]); exit(1); } string serverip=argv[1]; uint16_t serverport=stoi(argv[2]); struct sockaddr_in serveraddr; int sockfd=InitClient(serverip,serverport,&serveraddr); if(sockfd==-1) return 1; func_t r=bind(&recvmessage,sockfd,placeholders::_1); func_t s=bind(&sendmessage,sockfd,serveraddr,placeholders::_1); //創(chuàng)建兩個線程,分別用來接收消息和發(fā)消息,使其兩個互不受影響 Thread Recver(r,"recver");//recver在前面,還是sender在前面,都行 Thread Sender(s,"sender"); Sender.Start(); Recver.Start(); Recver.Join(); Sender.Join(); return 0; } 同樣用的是自己封裝的線程。 值得注意的是這里接收消息模塊有兩個版本。此處的終端文件(/dev/pts)可以根據(jù)自己實際情況修改。 五.? ?效果展示 分別來看看兩個版本都是怎么樣的吧。 version 1: ?version 2: 總結(jié): 好了,到這里今天的知識就講完了,大家有錯誤一點要在評論指出,我怕我一人擱這瞎bb,沒人告訴我錯誤就寄了。 祝大家越來越好,不用關(guān)注我(瘋狂暗示) 柚子快報邀請碼778899分享:網(wǎng)絡協(xié)議 UDP簡單聊天室創(chuàng)建 相關(guān)閱讀
本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。