////// @file Worker.h/// @brief 用户接口类/// @author guozhiming/// @date 2007-05-16///#ifndef __WORKER__#define __WORKER__#include "ThreadPool.h"/// @brief 抽象类class G_Worker{    public:        /// @brief 构造函数        G_Worker(unsigned int num);        /// @brief 析构函数        ~G_Worker();        /// @brief 服务器帮定端口        ///        /// @param nPort 帮定端口        ///        /// @return true表示成功 , false表示失败        bool Bind(unsigned int nPort);        /// @brief 存虚函数子类继承并实现逻辑        ///        /// @param pStr 客户端发送的字符串        virtual void recvMessage(void *pStr , int nSocket) = 0;        /// @brief 发送数据到客户端        ///        /// @param pStr 数据        /// @param nSocket 发送到客户端的套接字        //        /// @return        int sendMessage(int nSocket , const void *pStr);    protected:                                                private:        G_ThreadPool *g_threadPool;};#endif

#include "Worker.h"#include "Log.h"G_Worker::G_Worker(unsigned int num){    g_threadPool = new G_ThreadPool(num , this);  //开num个线程}G_Worker::~G_Worker(){    if(g_threadPool)    {        delete g_threadPool;        g_threadPool = NULL;    }}bool G_Worker::Bind(unsigned int nPort){    return g_threadPool->Bind(nPort);//整个线程池,绑定一个端口。。}int G_Worker::sendMessage(int nSocket , const void *pStr){    return g_threadPool->sendMessage(nSocket , pStr);//这是tcp还是udp发送呢。}

////// @file ThreadPool.h/// @brief 线程池的实现 , 是个管理线程 , 负责调用每个线程之间的互相调用的关系/// @author guozhiming/// @date 2007-05-16///#ifndef __G_THREADPOOL__#define __G_THREADPOOL__#include "ListenThread.h"#include "SendMessThread.h"#include "Queue.h"#include "RecvMessThread.h"#include "Worker.h"class G_ListenThread;class G_SendMessThread;class G_RecvMessThread;class G_Worker;class G_ThreadPool : public G_Thread{    public:        /// @brief 构造函数        G_ThreadPool(unsigned int num , G_Worker *g_work);        /// @brief 析构函数        ~G_ThreadPool();        /// @brief 服务器帮定端口        ///        /// @param nPort 帮定端口        ///        /// @return true表示成功 , false表示失败        bool Bind(unsigned int nPort);        /// @brief 主线程        void Run();        /// @brief 填加socket到队列中        ///        /// @param nSocket 套接口        ///        /// @return true 表示成功 , false 表示失败        bool pushSocket(unsigned int nSocket);        /// @brief 从队列中取套接字        ///        /// @param nSocket 取出的套接字存放在nSocket中        ///        /// @return true 表示成功 , false 表示失败        bool popSocket(int &nSocket);        /// @brief 从G_Data->G_RecvMessThread->G_ThreadPool->G_Worker 回掉        ///        /// @param pStr 客户发的字符串        /// @param nSocket 接受客户连接的套接字        void recvMessage(void *pStr , int nSocket);        /// @brief 发送数据 从testPool->G_Worker->G_ThreadPool->G_SendMessThread->G_Data        ///        /// @param pStr 数据        /// @param nSocket 套接口        /// @return        //        int sendMessage(int nSocket , const void *pStr);    private:        G_Worker *g_worker;        /// @brief 监听线程        G_ListenThread *g_listenThread;        /// @brief 发送消息线程        G_SendMessThread *g_sendMessThread;        /// @brief 存放socket队列        G_Queue
g_sockQueue; /// @brief 存放空闲工作线程队列 G_Queue
g_idleRecvMessThreadQueue; /// @brief 存放忙碌工作线程队列 G_Queue
g_busyRecvMessThreadQueue; /// @brief 每个RecvMessThread线程中最大用户数 static const int maxCounter = 2000; /// @brief 如果线程不够用新增加的线程 static const int addTaskThread = 2;};#endif

#include "ThreadPool.h"#include "Log.h"G_ThreadPool::G_ThreadPool(unsigned int num , G_Worker *g_work) : g_worker(g_work){    g_listenThread = new G_ListenThread(this);    g_listenThread->Start();  ///启动监听线程    g_sendMessThread = new G_SendMessThread();    g_sendMessThread->Start();   ///发送消息线程    for(int i=0; i
Start(); } Start(); ///线程池自己启动}G_ThreadPool::~G_ThreadPool(){ if(g_listenThread) { delete g_listenThread; g_listenThread = NULL; } if(g_sendMessThread) { delete g_sendMessThread; g_sendMessThread = NULL; } g_sockQueue.clear(); g_idleRecvMessThreadQueue.clear(); g_busyRecvMessThreadQueue.clear();}bool G_ThreadPool::Bind(unsigned int nPort){ return g_listenThread->Bind(nPort);}int G_ThreadPool::sendMessage(int nSocket , const void *pStr){ g_sendMessThread->sendMessage(nSocket , pStr);}void G_ThreadPool::Run(){ int nSocket; G_RecvMessThread *g_recvMessThread; while(1) { pause(); ///等待ListenThread 发信号,这种事情,用epoll的epoll_wait就好了嘛 while(popSocket(nSocket)) ///必须把存放socket队列中的套接口全部取出 { g_sendMessThread->addEpoll(nSocket);//这里也是用epoll的啊 while(1)//各种循环 { ///从空闲队列中获得对首TaskThread if(g_idleRecvMessThreadQueue.getFront(g_recvMessThread)) { ///如果TaskThread线程中客户大于maxCounter , 从空闲队列中pop并放到忙碌队列中 if(g_recvMessThread->getCounter() >= maxCounter) //接受线程里面,不止一个mess?还要count? { if(g_idleRecvMessThreadQueue.pop(g_recvMessThread)) { g_busyRecvMessThreadQueue.push(g_recvMessThread); } else { ///表示空闲队列中再没有TaskThread可以用 , 创建addTaskThread个TaskThread线程 , 并且把busy队列中的TaskThread放到idle队列中这样可以防止busy队列中的用户数减少但是他还在busy队列中 for(int i=0; i
Start(); } while(g_busyRecvMessThreadQueue.pop(g_recvMessThread))//感觉这么麻烦。。。 { g_idleRecvMessThreadQueue.push(g_recvMessThread); } } } else { /// 填加到TaskThread 线程中 g_recvMessThread->addSocket(nSocket); g_recvMessThread->continues(); /// 唤醒TaskThread 线程 break; } } else { /// 空闲队列中没有任何线程 , 应该没有这种情况 debug_output("idleRecvMessThreadQueue is not g_recvMessThread\n"); } } } }}bool G_ThreadPool::pushSocket(unsigned int nSocket){ return g_sockQueue.push(nSocket);}bool G_ThreadPool::popSocket(int &nSocket){ return g_sockQueue.pop(nSocket);}void G_ThreadPool::recvMessage(void *pStr , int nSocket){ g_worker->recvMessage(pStr , nSocket);}

////// @file TaskThread.h/// @brief 任务类 , 接受client发的消息进行处理/// @author guozhiming/// @date 2007-05-17///#ifndef __TASKTHREAD__#define __TASKTHREAD__#include "def.h"#include "Thread.h"#include "ThreadPool.h"#include "Queue.h"#include "Data.h"class G_ThreadPool;class G_Data;class G_RecvMessThread : public G_Thread{    public:        /// @brief 构造函数        G_RecvMessThread(G_ThreadPool *pool);        /// @brief 析构函数        ~G_RecvMessThread();        /// @brief 主线程运行        void Run();        /// @brief 填加套接字        ///        /// @param nSocket 套接字        void addSocket(int nSocket);        /// @brief 获得连接的客户端数目        ///        /// @return 数目        unsigned int getCounter();        /// @brief      往队列中存放数据 ,,哪个队列?下面定义了一个queue        ///        /// @param pStr  数据        ///        /// @return true 成功 , false 失败        bool pushData(std::string pStr);    private:                        /// @brief 设置套接口非阻塞模式        ///        /// @param sockfd 套接口        ///        /// @return true 成功 , false 失败        bool setNonBlock(int sockfd);        /// @brief epoll_create 返回文件描述符        int epfd;        struct epoll_event events[100];   //才100个。。封装了epoll啊。在recv里封了,难道send里,也封了一个        /// @brief 记录接受客户端数目        unsigned int counter;        /// @brief 线程池对象        G_ThreadPool *g_threadPool;        /// @brief 存放数据的队列        G_Queue
g_dataBufferQueue; G_Data *g_data;};#endif

#include "RecvMessThread.h"#include "Log.h"G_RecvMessThread::G_RecvMessThread(G_ThreadPool *pool) : g_threadPool(pool){    counter = 0;    epfd = epoll_create(256); //最多同时监视256个    g_data = new G_Data(this); //这个数据结构要看看,有啥稀奇}G_RecvMessThread::~G_RecvMessThread(){    close(epfd);}unsigned int G_RecvMessThread::getCounter(){    return counter;}bool G_RecvMessThread::setNonBlock(int sockfd){    int opts = fcntl(sockfd , F_GETFL);    if(-1 == opts)    {        debug_output("%s\n" , "fcntl F_GETFL is faild");        return false;    }    opts = opts | O_NONBLOCK;    if(fcntl(sockfd , F_SETFL , opts) < 0)    {        debug_output("%s\n" , "fcntl F_SETFL is faild");        return false;    }    return true;}void G_RecvMessThread::addSocket(int nSocket){    struct epoll_event ev;    bzero(&ev , sizeof(ev));    setNonBlock(nSocket);    ev.data.fd = nSocket;    ev.events = EPOLLIN | EPOLLET;    epoll_ctl(epfd , EPOLL_CTL_ADD , nSocket , &ev);    counter++;}bool G_RecvMessThread::pushData(std::string pStr){    return g_dataBufferQueue.push(pStr);}void G_RecvMessThread::Run(){    pause();    /// 暂停线程  //都用这招。。。    int nfds , sock;    struct epoll_event ev;    bool nRet;    char line[1024]; //一次最多1024个。。感觉这个模型蛮奇怪。。    while(1)    {        nfds = epoll_wait(epfd,events,100,50);        for(int i=0; i
< 0) continue; if(!(nRet=g_data->recvData(sock)))//这是最底层的收数据的啊 { debug_output("client is quit\n"); ev.data.fd= sock; epoll_ctl(epfd , EPOLL_CTL_DEL , sock , &ev);//收到了直接关掉了 close(sock); events[i].data.fd = -1; counter --; } else { std::string pBuffer; while(g_dataBufferQueue.size()) { g_dataBufferQueue.pop(pBuffer); g_threadPool->recvMessage((void*)pBuffer.c_str() , sock); //这又是一个收数据的。 } } usleep(100);//还要休眠。。这么坑爹。。。 } } }}