主要问题: 客户端 与服务器端进行通信(传送一个字符串)当服务器端检测到epollin事件到来时,创建一个任务,加入到任务队列,相应的线程去处理任务。问题来了,为什么我的客户端与服务器只进行一次字符串的传送,服务器端会添加多个任务(300左右个)正常的话,应该是一个任务,所有的线程去抢这个任务,谁拿到任务了谁去执行,然后剩下的线程进行休眠。。。。。本人学生, 才接触这一块,希望大家帮帮忙。。。。
客户端
# include <iostream>
# include <algorithm>
# include <stdio.h>
# include <string>
# include <cstring>
# include <cstdlib>
# include <unistd.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <netinet/in.h>
# include <arpa/inet.h>
using namespace std ;
# define ERR_EXIT(err) \
do \
{ \
perror(err) ; \
exit(EXIT_FAILURE) ; \
\
}while(0) \
int main ()
{
int res = 0 ;
char buf[1024] ;
memset(buf, 0, sizeof(buf)) ;
int sockfd = socket(PF_INET, SOCK_STREAM, 0) ;
if(sockfd < 0)
ERR_EXIT("socket err") ;
struct sockaddr_in cltaddr ;
memset(&cltaddr, 0, sizeof(cltaddr)) ;
cltaddr.sin_family = AF_INET ;
cltaddr.sin_port = htons(8001) ;
cltaddr.sin_addr.s_addr = inet_addr("127.0.0.1") ;;
res = connect(sockfd, (struct sockaddr *)&cltaddr, sizeof(cltaddr)) ;
if(res < 0)
ERR_EXIT("connect err") ;
cin >> buf ;
cout << buf << endl ;
while(strcmp("exit", buf) != 0)
{
write(sockfd, buf, sizeof(buf)) ;
memset(buf, '\0', sizeof(buf)) ;
cin >> buf ;
}
close(sockfd) ;
return 0 ;
}
线程池
#ifndef _CTHREADPOLL_H_
#define _CTHREADPOLL_H_
# include <unistd.h>
# include <stdlib.h>
# include <fcntl.h>
# include <arpa/inet.h>
# include <sys/stat.h>
# include <signal.h>
# include <sys/types.h>
# include <iostream>
# include <sys/socket.h>
# include <sys/socket.h>
# include <iomanip>
# include <string.h>
# include <string>
# include <errno.h>
# include <vector>
# include <sys/wait.h>
# include <pthread.h>
# include <sys/epoll.h>
# include <algorithm>
using namespace std ;
class CTask
{
public:
CTask(){} ; // 无参构造函数
CTask(string taskName) // 有参构造函数
{
this->m_strTaskName = taskName ;
this->m_ptrData = NULL ;
}
public:
virtual int PerformTask() = 0 ; // 执行任务的接口
void SetData(void *data) // 设置数据
{
this->m_ptrData = data ;
}
protected:
void *m_ptrData ;
string m_strTaskName ;
} ;
class CThreadPoll
{
public:
CThreadPoll(int threadNum = 10) ; // 有默认值的构造函数
public:
int AddTack(CTask *task) ; // 添加任务到任务队列中
int getTaskSize() ; // 获取当前任务队列的数量
int StopAll() ; // 使线程池中的线程退出
protected:
int CreatepThread() ; // 创建线程池中的线程
static void *ThreadCallBack (void * arg) ; // 线程回调函数
private:
pthread_t *m_Thread_id ; // 线程id
int m_iThreadNum ; // 线程池中的线程数量
static bool shutdown ; // 线程退出标志
static pthread_mutex_t m_pthreadMutex ; // poxsi线程锁
static pthread_cond_t m_pthreadCond ; // 条件等待变量
static vector<CTask *> m_vecTaskList ; // 任务列表
} ;
#endif
# include "CThreadpool.h"
/************************************************************************/
bool CThreadPoll::shutdown = false ; // 线程退出标志
vector<CTask *> CThreadPoll::m_vecTaskList ; // 任务列表
pthread_mutex_t CThreadPoll::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER ; // poxsi线程锁
pthread_cond_t CThreadPoll::m_pthreadCond = PTHREAD_COND_INITIALIZER ; // 条件等待变量
int CThreadPoll::CreatepThread()
{
m_Thread_id = new pthread_t[m_iThreadNum] ;
for(int i = 0; i < m_iThreadNum; i ++)
{
pthread_create(&m_Thread_id[i], NULL, ThreadCallBack, NULL);
}
return 0 ;
}
CThreadPoll::CThreadPoll(int threadNum) // 有默认值的构造函数
{
this->m_iThreadNum = (pthread_t)threadNum ;
CreatepThread() ;
}
void *CThreadPoll::ThreadCallBack (void *arg)
{
pthread_t tid = pthread_self() ;
while(1)
{
pthread_mutex_lock(&m_pthreadMutex) ;
cout << "tid is : " << tid << endl ;
cout << "size is :" << m_vecTaskList.size() << endl ;
while(m_vecTaskList.size() == 0 && !shutdown)
{
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex) ;
}
cout << "tid is1111 : " << tid << endl ;
cout << "size is11111 :" << m_vecTaskList.size() << endl ;
if(shutdown)
{
pthread_mutex_unlock(&m_pthreadMutex) ;
pthread_exit(NULL) ;
}
vector<CTask *>::iterator iter = m_vecTaskList.begin() ;
CTask *temp = NULL ;
if(iter != m_vecTaskList.end())
{
temp = *iter ;
cout << "delete ..." << endl ;
m_vecTaskList.erase(iter) ;
}
temp->PerformTask() ;
pthread_mutex_unlock(&m_pthreadMutex) ;
// sleep(2) ;
}
return NULL ;
}
int CThreadPoll::AddTack(CTask *task)
{
pthread_mutex_lock(&m_pthreadMutex) ;
this->m_vecTaskList.push_back(task) ;
pthread_mutex_unlock(&m_pthreadMutex) ;
pthread_cond_signal(&m_pthreadCond) ;
// pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex) ;
return 0 ;
}
int CThreadPoll::getTaskSize()
{
return this->m_vecTaskList.size() ;
}
int CThreadPoll::StopAll()
{
if(shutdown)
{
return -1 ;
}
// 把退出标记设置为true,唤醒所有等待的线程让它们退出
shutdown = true ;
pthread_cond_broadcast(&m_pthreadCond) ;
// 等待所有线程退出, 不然就成了僵尸线程了
for(int i = 0; i < m_iThreadNum; i ++)
{
pthread_join(m_Thread_id[i], NULL) ;
}
//释放掉tid所占用的内存空间
delete [] m_Thread_id ;
//销毁互斥锁,和条件变量
pthread_cond_destroy(&m_pthreadCond) ;
pthread_mutex_destroy(&m_pthreadMutex) ;
return 0 ;
}
测试
# include "CThreadpool.cpp"
# include "mysqlbak.cpp"
# include <unistd.h>
# include <stdlib.h>
# include <fcntl.h>
# include <arpa/inet.h>
# include <sys/stat.h>
# include <signal.h>
# include <sys/types.h>
# include <iostream>
# include <sys/socket.h>
# include <sys/socket.h>
# include <iomanip>
# include <string.h>
# include <string>
# include <errno.h>
# include <vector>
# include <sys/wait.h>
# include <pthread.h>
# include <sys/epoll.h>
# include <algorithm>
using namespace std ;
typedef vector<struct epoll_event> EventList ;
/*typedef struct _Packet
{
int len ;
char buffsize[1024] ;
} PACKET ;*/
struct INFO
{
int uid ;
char name[20] ;
char sex[10] ;
unsigned int age ;
unsigned int score ;
} ;
class WPTask : public CTask
{
public:
WPTask(){} ;
public:
int PerformTask()
{
int ret = 0 ;
char str[1024] ;
struct INFO person ;
int connfd = this->m_Connfd ;
ret = read(connfd, str, sizeof(str)) ;
if(ret == 0)
{
cout << "client close." << endl ;
epoll_ctl(epollfd, EPOLL_CTL_DEL, connfd, &event) ;
return -1 ;
}
cout << str << endl ;
// cout << person.uid << " " << person.name << " " << person.sex <<
// " " << person.age << " " << person.score << endl ;
/* ret = query_db(this->m_Mysql) ;
if(ret == 0)
{
cout << "query err:" << endl ;
} */
return 0 ;
}
void SetConnectFd(int connfd)
{
this->m_Connfd = connfd ;
}
void SetDatabaseFd(MYSQL mysql)
{
this->m_Mysql = mysql ;
}
protected:
MYSQL m_Mysql ;
int m_Connfd ;
public:
int epollfd ;
struct epoll_event event ;
} ;
int main ()
{
MYSQL mysql ;
int res = login_db(mysql) ;
if(res == -1)
cout << "login_db err" << endl ;
signal(SIGPIPE, SIG_IGN) ;
signal(SIGCHLD, SIG_IGN) ;
struct sockaddr_in srvaddr ;
memset(&srvaddr, 0, sizeof(srvaddr)) ;
srvaddr.sin_port = ntohs(8001) ;
srvaddr.sin_family = AF_INET ;
srvaddr.sin_addr.s_addr = inet_addr("127.0.0.1") ;
int idlefd = open("dev/null", O_RDONLY | O_CLOEXEC) ;
int listenfd = socket(PF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP) ;
if(listenfd == -1)
cout << "socket failure." << endl ;
int on = 1 ;
if(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1)
cout << "setsockopt failure." << endl ;
if(bind(listenfd, (struct sockaddr *)&srvaddr, sizeof(srvaddr)) == -1)
cout << "bind failure." << endl ;
if(listen(listenfd, SOMAXCONN) == -1)
cout << "listen failure." << endl ;
CThreadPoll pthread(10) ;
int nready, connfd ;
int nontimeout = -1 ;
EventList events(16) ;
struct epoll_event event ;
event.events = EPOLLIN ;
event.data.fd = listenfd ;
int epollfd = epoll_create1(EPOLL_CLOEXEC) ;
epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) ;
struct sockaddr_in peeraddr ;
socklen_t peerlen = sizeof(peerlen) ;
while(1)
{
nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1) ;
if(nready == -1)
{
if(errno == EINTR)
{
continue ;
}
return -1 ;
}
if(nready == 0)
{
continue ;
}
if((size_t)nready == events.size())
{
events.resize(events.size() * 2) ;
}
for(int i = 0; i < nready; i ++)
{
if(events[i].data.fd == listenfd)
{
if((connfd = accept4(listenfd, (struct sockaddr *)&peeraddr, &peerlen, SOCK_NONBLOCK | SOCK_CLOEXEC)) == -1)
{
if(errno == EMFILE)
{
close(idlefd) ;
idlefd = accept(listenfd, NULL, NULL) ;
close(idlefd) ;
idlefd = open("dev/null", O_RDONLY | O_CLOEXEC) ;
continue ;
}
return -1 ;
}
cout << "ip:" << inet_ntoa(peeraddr.sin_addr) << endl ;
cout << "port:" << ntohl(peeraddr.sin_port) << endl ;
event.events = EPOLLIN ;
event.data.fd = connfd ;
epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &event) ;
}
else if(events[i].events & EPOLLIN)
{
connfd = events[i].data.fd ;
if(connfd == 0)
continue ;
WPTask *task = new WPTask ;
task->epollfd = epollfd ;
task->event = events[i] ;
task->SetDatabaseFd(mysql) ;
task->SetConnectFd(connfd) ;
cout << "renwu1" << endl ;
// sleep(3) ;
pthread.AddTack(task) ;
}
}
}
return 0 ;
}
附上图片:
此图片为客户端与服务器进行一次通信,服务器端建立了N个任务。。。。无语了。。。
如有能很好解决的人 麻烦您有时间的时候加下我的qq:712102032,本人不总玩csdn如能解决,必有重谢。
改成ET貌似可以了,但是。。。我的本意是LT, ET与LT的区别在哪里呢?