本人是新手,刚开始研究 ACE。网上有很多基于 Proactor 的基础服务器示例,但这些示例都是 Echo 服务器。请问怎样才能让服务器把某些消息群发给所有客户端,或者单独发给某一个客户端?
以下是代码:
#include <iostream>
#include <ace/ACE.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/Asynch_Acceptor.h>
#include <ace/Asynch_IO.h>
#include <ace/Proactor.h>
#include <ace/SOCK_Stream.h>
#include <ace/Event_Handler.h>
#include <ace/Log_Priority.h>
#include <ace/Log_Msg.h>
#include <ace/Time_Value.h>
#include <ace/Reactor.h>
#include <ace/Select_Reactor.h>
#include <ace/Message_Block.h>
#include <ace/Message_Queue.h>
#include <ace/OS.h>
#include <iostream>
// Global two-second time value (2 s, 0 us). Nothing in the visible part of
// this file reads it. NOTE(review): confirm whether it is still needed.
ACE_Time_Value time_out(2, 0);
/**
 * Per-connection handler created by ACE_Asynch_Acceptor for each accepted
 * socket.
 *
 * Behaviour: every chunk of data received is printed to stdout, the next
 * asynchronous read is started, and a fixed acknowledgement string is sent
 * back to the peer.
 *
 * Lifetime: the object deletes itself (`delete this`) when the connection
 * cannot be set up, when a read completes with an error or with zero bytes,
 * or when a new read cannot be started. The destructor closes the socket.
 *
 * Only one read is outstanding at a time; every read lands in the member
 * `buffer`.
 */
class HA_Proactive_Service : public ACE_Service_Handler
{
public:
    /// Closes the connection's socket if a handle has been attached.
    ~HA_Proactive_Service()
    {
        if (this->handle() != ACE_INVALID_HANDLE)
        {
            ACE_OS::closesocket(this->handle());
        }
    }

    /**
     * Called once the connection has been accepted.
     *
     * @param h              Handle of the accepted socket.
     * @param message_block  Data that arrived together with the accept, if
     *                       any. When it is non-empty it is replayed through
     *                       handle_read_stream() as if it had just been read.
     *
     * On any failure the handler logs the problem and deletes itself, so the
     * socket is closed instead of being leaked.
     */
    virtual void open(ACE_HANDLE h, ACE_Message_Block &message_block)
    {
        this->handle(h);
        if (this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0)
        {
            ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("HA_Proactive_Service open")));
            delete this;
            return;
        }

        if (message_block.length() == 0)
        {
            // Nothing arrived with the accept: just start the first read.
            if (!this->post_read())
            {
                ACE_ERROR((LM_ERROR, ACE_TEXT("%N:%l:open init read failed!\n")));
                delete this;
            }
            return;
        }

        // Data arrived with the accept. Build a fake read result around a
        // duplicate of the block so the normal read-completion path handles it.
        ACE_Message_Block *mb = message_block.duplicate();
        if (mb == 0)
        {
            ACE_ERROR((LM_ERROR, ACE_TEXT("%N:%l:open duplicate failed!\n")));
            delete this;
            return;
        }
        ACE_Asynch_Read_Stream_Result_Impl *result =
            ACE_Proactor::instance()->create_asynch_read_stream_result(
                this->proxy(), this->handle_, *mb, mb->length(), 0);
        if (result == 0)
        {
            ACE_ERROR((LM_ERROR, ACE_TEXT("%N:%l:open create result failed!\n")));
            mb->release();
            delete this;
            return;
        }
        // Move wr_ptr back by the amount already received, immediately before
        // reporting that same amount as transferred through complete().
        mb->wr_ptr(mb->wr_ptr() - message_block.length());
        // The callback may delete this handler; only the local `result` is
        // touched afterwards.
        result->complete(message_block.length(), 1, 0);
        delete result;
    }

    /**
     * Read-completion callback.
     *
     * On error or zero bytes transferred the block is released and the handler
     * deletes itself. Otherwise the received bytes are printed, the next read
     * is started, and the acknowledgement is queued for sending.
     */
    virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
    {
        ACE_Message_Block &mb = result.message_block();
        if (!result.success() || result.bytes_transferred() == 0)
        {
            mb.release();
            delete this;
            return;
        }

        // Print exactly the bytes held by the block. A length-bounded format
        // is used so no terminating NUL has to be appended, which is not
        // possible when the read filled the buffer completely.
        ACE_OS::printf("rev:\t%.*s\n", static_cast<int>(mb.length()), mb.rd_ptr());
        mb.release();

        if (!this->post_read())
        {
            // No read is pending any more, so no further read completion
            // would ever arrive to clean this handler up.
            ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("HA_Proactive_Service read")));
            delete this;
            return;
        }

        this->send_ack();
    }

    /**
     * Write-completion callback.
     *
     * Logs a failed write, continues a partial write with the unsent
     * remainder, and otherwise logs the message that was sent. The block is
     * released once no further write is pending on it.
     */
    virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
    {
        ACE_Message_Block &mb = result.message_block();
        if (!result.success())
        {
            ACE_ERROR((LM_ERROR,
                       ACE_TEXT("HA_Proactive_Service write failed, error %d\n"),
                       static_cast<int>(result.error())));
            mb.release();
            return;
        }

        if (mb.length() > 0 && result.bytes_transferred() > 0)
        {
            // Partial write: send the remainder before logging/releasing.
            if (m_writer.write(mb, mb.length()) == 0)
            {
                return;
            }
            ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("HA_Proactive_Service write")));
            mb.release();
            return;
        }

        // Rewind to the start of the buffer so the whole message can be
        // logged; send_ack() filled the block from its base and included the
        // terminating NUL, so "%s" is safe here.
        mb.rd_ptr(mb.base());
        ACE_DEBUG((LM_DEBUG, "发送的消息是: %s\n", mb.rd_ptr()));
        mb.release();
    }

private:
    /**
     * Starts one asynchronous read into the member `buffer`.
     *
     * @return true if the read was started; false on allocation failure or if
     *         the read could not be initiated (the block is released here).
     */
    bool post_read()
    {
        ACE_Message_Block *mb = 0;
        // The block only wraps `buffer`; releasing it does not free `buffer`.
        ACE_NEW_NORETURN(mb, ACE_Message_Block(buffer, sizeof(buffer)));
        if (mb == 0)
        {
            return false;
        }
        if (m_reader.read(*mb, mb->space()) != 0)
        {
            mb->release();
            return false;
        }
        return true;
    }

    /**
     * Queues the fixed acknowledgement for sending to the peer.
     *
     * The terminating NUL of the string is part of the payload; the
     * write-completion callback relies on it when it logs the message.
     * Failures are logged and the block is released; the connection itself
     * is left open.
     */
    void send_ack()
    {
        static const char ack[] = "服务器收到数据\n";
        ACE_Message_Block *mb = 0;
        ACE_NEW_NORETURN(mb, ACE_Message_Block(256));
        if (mb == 0)
        {
            ACE_ERROR((LM_ERROR, ACE_TEXT("%N:%l:ack allocation failed!\n")));
            return;
        }
        if (mb->copy(ack, sizeof(ack)) != 0 || m_writer.write(*mb, mb->length()) != 0)
        {
            ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"), ACE_TEXT("HA_Proactive_Service write")));
            mb->release();
        }
    }

    ACE_Asynch_Read_Stream m_reader;   ///< Issues asynchronous reads on the socket.
    ACE_Asynch_Write_Stream m_writer;  ///< Issues asynchronous writes on the socket.
    char buffer[1024];                 ///< Storage for the single outstanding read.
};
int main(int argc, char *argv[])
{
int port = 2452;
ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor;
if (acceptor.open(ACE_INET_Addr(port, "127.0.0.2")) == -1)
{
return -1;
}
ACE_Proactor::instance()->proactor_run_event_loop();
return 0;
}