网络通信第五课多线程异步服务器

场景
,,,,,,本例子支持多线程异步处理消息,针对每一个链接请求,创建线程处理稍后的指令,CSimpleSession: SessionThreadFunc是线程函数,async_read_some函数设置接收数据的回调函数ContinueRead,一般情况下,read_some函数未必能够完整的读取客户端发送的数据包,当然必须要指定明确的结束标志,双方必须规定好等接收完毕的时候,必须等待线程返回,因此在析构函数调用m_thread→加入函数,等线程函数正常返回之后,关闭连接,如果没有等待线程返回,就直接关闭连接,会导致async_read_some函数抛出异常,目前暂时没有什么头绪

服务。h

的ifndef QPIDPUSHMESSAGESERVICE_H
# define QPIDPUSHMESSAGESERVICE_H

# include & lt; iostream>
# include & lt; vector>
# include & lt; fstream>
# include & lt;提高/asio.hpp>
# include & lt;提高/线/thread.hpp>
# include & lt;提高/bind.hpp>
# include & lt;提高/shared_ptr.hpp>
# include & lt;增强/功能/function0.hpp>
# include & lt;提高/enable_shared_from_this.hpp>
# include & lt;提高/线/mutex.hpp>

名称空间qpid
{
类CSimpleSession:公众提高::enable_shared_from_this
{
公共:
CSimpleSession (boost:: asio: io_service, io_service): m_socket (io_service)
{
m_bRunning=true;
PrepareForNextRecv ();
}
~ CSimpleSession ()
{
m_bRunning=false;
m_thread→加入();
m_socket.close ();
}

空白StartThread ()
{
静态boost:: asio:: ip:: tcp:: no_delay选项(真正);
m_socket.set_option(选项);
m_thread。重置(新增加:线程(boost::绑定(及CSimpleSession:: SessionThreadFunc,))),
}

空白SessionThreadFunc ()
{
, (m_bRunning)
{
如果(m_bStartSetCallBackRead)
{
m_socket.async_read_some (boost:: asio:缓冲区(m_szRecvBuffer),
boost::绑定(及CSimpleSession:: ContinueRead, shared_from_this (),
boost:: asio::占位符::错误,
boost:: asio::占位符::bytes_transferred));
m_bStartSetCallBackRead=false;
}
boost:: this_thread:: sleep_for (boost::空间:毫秒(300)。
}
m_bRunning=false;
}

boost:: asio: ip: tcp:套接字,GetSocket ()
{
返回m_socket;
}

bool GetCurThreadRunningStatus ()
{
返回m_bRunning;
}

空白PrepareForNextRecv ()
{
memset (0 x00 m_szRecvBuffer, 10240年),
m_strMatch=啊啊?br/> m_bStartSetCallBackRead=true;
}私人:

空白ContinueRead (const提高::系统::error_code和错误,std:: size_t bytes_transferred)
{
如果(错误)
{
m_bRunning=false;
返回;
}

m_strMatch=,m_szRecvBuffer;
int nIndexOfContentLength=m_strMatch.find(“内容长度:“0);
int indexOfEnd=m_strMatch.find (“\ r \ n \ r \ n", 0),
如果nIndexOfContentLength ==1)
{
m_bRunning=false;
返回;
}
std:: cout & lt; & lt;m_strMatch & lt; & lt;std:: endl;
std:: string strContextLen=m_strMatch。substr (nIndexOfContentLength + 15, indexOfEnd nIndexOfContentLength - 15),
int nContextLen=atoi (strContextLen.c_str ());
如果nContextLen & lt;m_strMatch.length ())
{
//
处理m_bRunning=false;
返回;
}

m_socket.async_read_some (boost:: asio:缓冲区((m_szRecvBuffer)),
boost::绑定(及CSimpleSession:: ContinueRead, shared_from_this (),
boost:: asio::占位符::错误,
boost:: asio::占位符::bytes_transferred));
}

私人:boost:: asio::
ip: tcp:套接字m_socket;
char m_szRecvBuffer [10240],
std:: string m_strMatch;
bool m_bStartSetCallBackRead;
bool m_bRunning; boost:: shared_ptr
boost:: thread>m_thread;
}

typedef boost:: shared_ptrCPtrSession;

类CSimpleServer
{公共:

CSimpleServer (boost:: asio: io_service, io_service, boost:: asio:: ip:: tcp::端点和端点)
: m_ioService (io_service) m_acceptor (io_service、端点)
{
CPtrSession newSession(新CSimpleSession (io_service));
m_vecSession.push_back (newSession);
m_acceptor.async_accept (newSession→GetSocket (),
boost::绑定(及CSimpleServer:: HandleAccept,
,
newSession,
boost:: asio::占位符::错误));
}

空白HandleAccept (CPtrSession newSession, const提高::系统::error_code和错误)
{
如果(错误)返回;

//如果开始函数进行了阻塞,只有处理完当前的连接,才会进行下一步处理连接
newSession→StartThread ();
ClearHasEndConnection ();
CPtrSession createNewSession(新CSimpleSession (m_ioService));

//当前保存了会话连接,直到连接被释放,而不是由于

m_vecSession.push_back (createNewSession);
m_acceptor.async_accept (createNewSession→GetSocket (),

网络通信第五课多线程异步服务器