#pragma once
#include <stdio.h>
#include <tchar.h>
#include <iostream>
using namespace std;
class CCoreByIOCP;
class LanThreadPool;
#include "common.h"
#include "Mapper.h"
#include "CoreByIOCP.h"
#include "LanWorkerThread.h"
#pragma once
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <WinSock2.h>
#include<memory.h>
#include<windows.h>
#include <MSWSock.h>
#include <MSTcpIP.h>
#include <list>
#pragma comment(lib, "ws2_32.lib")
// TCP port handed to CCoreByIOCP::RunCore by main().
#define PORT 10240
// Loopback address; not referenced by the code visible in this listing.
#define SRV_IP "127.0.0.1"
// Size in bytes of each PER_IO_CONTEXT buffer (m_szBuf / m_szBufCache).
#define MAX_BUFFER_LEN 1024
// Multiplier applied to the processor count when sizing the worker pool.
#define WORKER_THREADS_PER_PROCESSOR 2
// Number of AcceptEx requests posted by InitializeListenSocket.
#define MAX_POST_ACCEPT 10
// Completion key value that tells a worker thread to leave its loop.
#define EXIT_CODE NULL
// delete a pointer and null it. Expands to a bare { } block, so always use it
// as a full statement.
#define RELEASE_MEMORY(x) { if(x != NULL) { delete x; x = NULL;} }
// CloseHandle a handle unless it is NULL or INVALID_HANDLE_VALUE, then null it.
#define RELEASE_HANDLE(x) { if(x != NULL && x != INVALID_HANDLE_VALUE) { CloseHandle(x); x = NULL; } }
// closesocket a socket unless it is INVALID_SOCKET, then mark it invalid.
#define RELEASE_SOCKET(x) { if(x !=INVALID_SOCKET) { closesocket(x); x = INVALID_SOCKET; } }
// Notification codes passed as the third argument of the NOTIFYPROC callback.
#define NC_CLIENT_CONNECT 0x0001
#define NC_CLIENT_DISCONNECT 0x0002
#define NC_TRANSMIT 0x0003
#define NC_RECEIVE 0x0004
#define NC_RECEIVE_COMPLETE 0x0005
/// Kind of overlapped operation a PER_IO_CONTEXT is currently carrying.
enum EM_IOType
{
    EM_IOAccept = 0,   ///< AcceptEx request
    EM_IOSend   = 1,   ///< WSASend request
    EM_IORecv   = 2,   ///< WSARecv request
    EM_IOIdle   = 3    ///< no operation assigned
};
typedef EM_IOType IOType;
/// State attached to one overlapped socket operation.
struct PER_IO_CONTEXT
{
    // Kept as the first member: RemoveStaleClient casts a PER_IO_CONTEXT*
    // straight to LPOVERLAPPED.
    OVERLAPPED  m_overLapped;
    IOType      m_IOType;                       // operation currently in flight
    WSABUF      m_wsaBuf;                       // descriptor pointing at m_szBuf
    SOCKET      m_rourceSock;                   // socket this operation belongs to
    SOCKADDR_IN m_resourceAddr;                 // peer address
    char        m_szBuf[MAX_BUFFER_LEN];        // buffer handed to Winsock
    char        m_szBufCache[MAX_BUFFER_LEN];   // bytes accumulated across receives
    DWORD       m_dwBytesSend;                  // running count of bytes sent
    DWORD       m_dwBytesRecv;                  // running count of bytes received
    UINT32      m_desID;                        // destination client id

    /// Puts every field into its idle state.
    void Init()
    {
        // Zeroing the whole address structure also clears sin_zero.
        ZeroMemory(&m_resourceAddr, sizeof(SOCKADDR_IN));
        ZeroMemory(&m_overLapped, sizeof(OVERLAPPED));
        ZeroMemory(m_szBufCache, MAX_BUFFER_LEN);
        ZeroMemory(m_szBuf, MAX_BUFFER_LEN);

        m_wsaBuf.len = MAX_BUFFER_LEN;
        m_wsaBuf.buf = m_szBuf;

        m_rourceSock = INVALID_SOCKET;
        m_IOType = EM_IOIdle;
        m_dwBytesRecv = 0;
        m_dwBytesSend = 0;
        m_desID = 0;
    }

    /// Returns the context to the same state Init() produces.
    void Reset()
    {
        Init();
    }
};
// Size in bytes of one PER_IO_CONTEXT (sizeof(char) is 1, so the product is just the struct size).
const int nMsgSize = sizeof(char) * sizeof(PER_IO_CONTEXT);
// Per-socket record kept in CCoreByIOCP's parameter pools.
struct IOCP_PARAM
{
// Socket this record describes; AllocateIocpParam resets it to INVALID_SOCKET.
SOCKET m_rourceSock;
};
// Payload bytes left in a MAX_BUFFER_LEN message after the two UINT32 id fields of SendToIpInfo.
const int nMessageBufMaxSize = MAX_BUFFER_LEN - 2 * sizeof(UINT32);
/// Message layout copied out of a MAX_BUFFER_LEN context buffer:
/// two ids followed by the payload bytes.
struct ST_SendToIpInfo
{
    UINT32 nSelfID;                     ///< id of the sending client
    UINT32 nSendToID;                   ///< id of the intended recipient
    char   buf[nMessageBufMaxSize];     ///< payload
};
typedef ST_SendToIpInfo SendToIpInfo;
#pragma once
#ifndef __IO_MAPPER__
#define __IO_MAPPER__
#define net_msg
/// Interface for objects that dispatch a completed I/O operation to a handler.
class __declspec(novtable) CIOMessageMap
{
public:
    /// Virtual so that deleting an implementation through this interface is well defined.
    virtual ~CIOMessageMap() {}

    /// Handles one completed operation.
    /// @param clientIO   kind of operation that completed
    /// @param pIOContext context attached to the operation
    /// @param dwSize     number of bytes transferred
    /// @return handler result
    virtual bool ProcessIOMessage(IOType clientIO, PER_IO_CONTEXT* pIOContext, DWORD dwSize) = 0;
};
// Opens the body of a public ProcessIOMessage member; must be closed with END_IO_MSG_MAP().
#define BEGIN_IO_MSG_MAP() \
public: \
bool ProcessIOMessage(IOType clientIO, PER_IO_CONTEXT* pIOContext, DWORD dwSize) \
{ \
bool bRet = false;
// Routes operations of kind `msg` to member `func`, storing its result in bRet.
// Entries are plain `if`s, so every entry is tested even after one has matched.
#define IO_MESSAGE_HANDLE(msg, func) \
if(msg == clientIO) \
bRet = func(pIOContext, dwSize);
// Returns bRet (false when no entry matched) and closes the function body.
#define END_IO_MSG_MAP() \
return bRet; \
}
#endif
#pragma once
#include <Windows.h>
#include <string>
/// Scoped guard for a Win32 CRITICAL_SECTION: entered in the constructor,
/// left in the destructor.
class LanCriticalLock
{
public:
    /// @param cs      critical section to hold for the lifetime of this object
    /// @param strFunc label stored alongside the lock (name of the locking function)
    LanCriticalLock(CRITICAL_SECTION& cs, const std::string& strFunc);
    ~LanCriticalLock();

    // A copy would leave the same critical section a second time on destruction.
    LanCriticalLock(const LanCriticalLock&) = delete;
    LanCriticalLock& operator=(const LanCriticalLock&) = delete;

protected:
    void Lock();
    void Unlock();

private:
    CRITICAL_SECTION* m_pcs;     // guarded critical section (not owned)
    std::string       m_strFunc; // caller-supplied label
};
#include "Global.h"
#include "LanCriticalLock.h"
/// Remembers the critical section and the caller label, then enters the section.
LanCriticalLock::LanCriticalLock(CRITICAL_SECTION& cs, const std::string& strFunc)
    : m_pcs(&cs)
    , m_strFunc(strFunc)
{
    Lock();
}
// Leaves the critical section entered by the constructor.
LanCriticalLock::~LanCriticalLock()
{
Unlock();
}
// Enters the guarded critical section.
void LanCriticalLock::Lock()
{
EnterCriticalSection(m_pcs);
}
// Leaves the guarded critical section.
void LanCriticalLock::Unlock()
{
LeaveCriticalSection(m_pcs);
}
#pragma once
#include<windows.h>
/// Owns a Win32 event handle used to wake a waiting thread.
class LanEvent
{
public:
    LanEvent();
    ~LanEvent();

    // The handle is closed in the destructor; a copy would close it twice.
    LanEvent(const LanEvent&) = delete;
    LanEvent& operator=(const LanEvent&) = delete;

    /// Waits for the event and then resets it.
    void Wait();
    /// Sets the event.
    void Signal();

private:
    HANDLE m_hEvent; // event handle, NULL when creation failed
};
#include "Global.h"
#include "LanEvent.h"
/// Creates an unnamed, manual-reset, initially non-signalled event and reports
/// a creation failure on stdout.
LanEvent::LanEvent()
    : m_hEvent(::CreateEvent(NULL, TRUE, FALSE, NULL))
{
    if (m_hEvent)
    {
        return;
    }
    cout << "创建信号量失败!!!" << endl;
}
/// Closes the event handle if one was created.
LanEvent::~LanEvent()
{
    if (!m_hEvent)
    {
        return;
    }
    ::CloseHandle(m_hEvent);
    m_hEvent = NULL;
}
// Blocks without a timeout on the event, then resets it. The explicit reset is
// needed because the constructor creates the event as manual-reset.
void LanEvent::Wait()
{
::WaitForSingleObject(m_hEvent, INFINITE);
::ResetEvent(m_hEvent);
}
// Sets the event; does nothing when the handle was never created.
void LanEvent::Signal()
{
if (m_hEvent)
::SetEvent(m_hEvent);
}
#pragma once
#include "Global.h"
#include <mutex>
#include <vector>
#include "LanWorkerThread.h"
// Signature of a task handed to the pool: no arguments, no result.
typedef void(*Task)(VOID);
// Process-wide pool of LanWorkerThread objects, reached through
// GetLanThreadPoolInstance().
class LanThreadPool
{
public:
~LanThreadPool();
// Validates the requested bounds and, if they are sane, calls Initialize().
void SetThreadPool(int nMinThreadCount, int nMaxThreadCount);
// Clamps the bounds, discards the current workers and creates nMinThreadCount new ones.
void Initialize(int nMinThreadCount, int nMaxThreadCount);
// Returns dwNumberOfProcessors as reported by GetSystemInfo.
int GetNumberOfProcessors();
// Creates up to nCount workers (never exceeding the configured maximum) and gives each the task.
bool CreateWorkerThread(int nCount, Task task);
// Hands the task to a worker whose run state is false, or to a newly created worker.
void ExecTask(Task task);
// Calls Run() on every worker.
void Run()
{
for (int i = 0; i < m_vWorkerThread.size(); i++)
{
m_vWorkerThread[i]->Run();
}
}
// Calls Stop() on every worker and resumes its thread.
void StopTask();
// Returns the shared instance, creating it if it does not exist.
static LanThreadPool* GetLanThreadPoolInstance();
// Deletes the shared instance (the name's spelling is kept for existing callers).
void FreeMommery();
// Passes the IOCP core to every worker.
void SetIOCP(CCoreByIOCP* coreByIOCP);
int GetCurrentThreadCount() { return (int)m_vWorkerThread.size(); }
private:
// Construction and copying are private so only the class itself can create instances.
LanThreadPool();
LanThreadPool(const LanThreadPool&);
LanThreadPool& operator=(const LanThreadPool& threadPool);
static LanThreadPool* m_lanThreadPool;
static std::mutex* m_mtxForInstance;
private:
// Workers owned by the pool; deleted in the destructor and in Initialize().
std::vector<LanWorkerThread*> m_vWorkerThread;
int m_minThreadCount;
int m_maxThreadCount;
};
#include "Global.h"
#include "LanThreadPool.h"
// The shared pool instance is created during static initialization.
LanThreadPool* LanThreadPool::m_lanThreadPool = new LanThreadPool();
// Mutex locked by GetLanThreadPoolInstance() when it has to create the instance.
std::mutex* LanThreadPool::m_mtxForInstance = new std::mutex();
/// Starts with no workers, a minimum of zero and a maximum of twice the
/// processor count.
LanThreadPool::LanThreadPool()
    : m_vWorkerThread()
    , m_minThreadCount(0)
    , m_maxThreadCount(GetNumberOfProcessors() * 2)
{
}
// Private copy constructor; copies nothing from the source object.
LanThreadPool::LanThreadPool(const LanThreadPool&)
{
}
/// Private assignment operator; copies nothing and returns *this.
LanThreadPool& LanThreadPool::operator=(const LanThreadPool& threadPool)
{
    (void)threadPool;   // nothing is copied from the right-hand side
    return *this;
}
/// Deletes every worker owned by the pool.
///
/// The static m_mtxForInstance is deliberately left alone: it belongs to the
/// class, not to this instance, and GetLanThreadPoolInstance() locks it.
/// Deleting it here made the next GetLanThreadPoolInstance() call after
/// FreeMommery() dereference a null mutex.
LanThreadPool::~LanThreadPool()
{
    for (LanWorkerThread* pWorker : m_vWorkerThread)
    {
        // ~LanWorkerThread waits for the worker's thread to finish.
        delete pWorker;
    }
    m_vWorkerThread.clear();
}
/// Rebuilds the pool with the given bounds, or reports an error on stdout when
/// a bound is negative or the minimum exceeds the maximum.
void LanThreadPool::SetThreadPool(int nMinThreadCount, int nMaxThreadCount)
{
    const bool bBoundsOk = (nMinThreadCount >= 0)
                        && (nMaxThreadCount >= 0)
                        && (nMinThreadCount <= nMaxThreadCount);
    if (bBoundsOk)
    {
        Initialize(nMinThreadCount, nMaxThreadCount);
        return;
    }
    cout << "线程数设置的最大或最小值错误!!" << endl;
}
void LanThreadPool::Initialize(int nMinThreadCount, int nMaxThreadCount)
{
int nNumberOfProcessors = GetNumberOfProcessors();
nNumberOfProcessors = nNumberOfProcessors * 2;
if (nMinThreadCount > nNumberOfProcessors)
{
nMinThreadCount = nNumberOfProcessors;
}
if (nMaxThreadCount > nNumberOfProcessors)
{
nMaxThreadCount = nNumberOfProcessors;
}
m_minThreadCount = nMinThreadCount;
m_maxThreadCount = nMaxThreadCount;
if (m_vWorkerThread.size() > 0)
{
for (auto iter = m_vWorkerThread.begin(); iter != m_vWorkerThread.end(); )
{
(*iter)->Stop();
ResumeThread((*iter)->GetHandle());
delete *iter;
iter = m_vWorkerThread.erase(iter);
}
}
m_vWorkerThread.clear();
CreateWorkerThread(nMinThreadCount, NULL);
}
int LanThreadPool::GetNumberOfProcessors()
{
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
return sysInfo.dwNumberOfProcessors;
}
bool LanThreadPool::CreateWorkerThread(int nCount, Task task)
{
for (int i = 0; i < nCount; i++)
{
if (m_vWorkerThread.size() + 1 <= m_maxThreadCount)
{
LanWorkerThread* newLanWorkerThread = new LanWorkerThread();
if (!newLanWorkerThread)
return false;
m_vWorkerThread.push_back(newLanWorkerThread);
newLanWorkerThread->SetTask(task);
}
}
return true;
}
/// Hands `task` to the first worker whose run state is false; if there is none,
/// tries to create a new worker for it and starts that worker.
///
/// CreateWorkerThread() reports success even when the pool is at its maximum
/// and nothing was created. The previous code then resumed whatever worker
/// happened to be last (or indexed an empty vector). Now only a worker that was
/// really added is started, and a task that cannot be placed is reported.
///
/// @param task function to run; a null task is ignored
void LanThreadPool::ExecTask(Task task)
{
    if (!task)
    {
        return;
    }

    bool bHandled = false;
    while (!bHandled)
    {
        // Prefer a worker that is not marked as running.
        for (size_t i = 0; i < m_vWorkerThread.size(); ++i)
        {
            if (!m_vWorkerThread[i]->GetRunState())
            {
                m_vWorkerThread[i]->SetTask(task);
                return;
            }
        }

        const size_t nWorkersBefore = m_vWorkerThread.size();
        bHandled = CreateWorkerThread(1, task);
        if (!bHandled)
        {
            continue;   // creation failed outright; retry as before
        }

        if (m_vWorkerThread.size() > nWorkersBefore)
        {
            m_vWorkerThread.back()->Run();
        }
        else
        {
            cout << "LanThreadPool::ExecTask: pool is at its maximum size, task was not scheduled" << endl;
        }
    }
}
void LanThreadPool::StopTask()
{
cout << "当前运行的线程数量: " << m_vWorkerThread.size() << endl;
for (int i = 0; i < m_vWorkerThread.size(); i++)
{
m_vWorkerThread[i]->Stop();
ResumeThread(m_vWorkerThread[i]->GetHandle());
}
}
/// Returns the shared pool, creating it if it does not currently exist
/// (for example after FreeMommery()).
///
/// Uses a function-local mutex instead of m_mtxForInstance: that pointer can be
/// NULL once a pool instance has been destroyed, which made the old code
/// dereference a null mutex. The check is done entirely under the lock, and the
/// lock is released by RAII even if construction throws.
LanThreadPool* LanThreadPool::GetLanThreadPoolInstance()
{
    static std::mutex s_instanceGuard;
    std::lock_guard<std::mutex> lock(s_instanceGuard);
    if (m_lanThreadPool == NULL)
    {
        m_lanThreadPool = new LanThreadPool();
    }
    return m_lanThreadPool;
}
void LanThreadPool::FreeMommery()
{
RELEASE_MEMORY(m_lanThreadPool);
}
/// Gives every worker the IOCP core it should serve; a NULL core is rejected
/// with a message on stdout.
void LanThreadPool::SetIOCP(CCoreByIOCP* coreByIOCP)
{
    if (coreByIOCP != NULL)
    {
        for (LanWorkerThread* pWorker : m_vWorkerThread)
        {
            pWorker->SetCoreIOCP(coreByIOCP);
        }
        return;
    }
    cout << "IOCP对象为NULL!!!" << endl;
}
#pragma once
#include "Global.h"
#include "LanEvent.h"
#include <atomic>
// Signature of a task handed to a worker: no arguments, no result.
typedef void(*Task)(VOID);
// One pool thread. The thread is created suspended by the constructor and runs
// DoExec() once resumed.
class LanWorkerThread
{
public:
LanWorkerThread();
~LanWorkerThread();
// Resumes the thread, provided an IOCP core has been set.
void Run();
// Stores the IOCP core whose completion port DoExec() will serve.
void SetCoreIOCP(CCoreByIOCP* coreIOCP);
// Sets the stop flag and signals the worker's event.
void Stop();
// Thread entry point; pThis is the LanWorkerThread to run.
static unsigned __stdcall ThreadExec(void* pThis);
// Completion-port loop executed on the worker thread.
void DoExec();
// Stores a non-null task, marks the worker as running and signals its event.
void SetTask(Task task);
bool GetRunState()
{
return m_bRunState;
}
HANDLE GetHandle() { return m_hdlThread; }
private:
// Set by Stop(). NOTE(review): no visible code reads this flag — confirm it is still needed.
bool m_bIsStop;
// Set to true by SetTask(); nothing in the visible code sets it back to false.
std::atomic<bool> m_bRunState;
HANDLE m_hdlThread;
Task m_task;
LanEvent m_eventCondition;
CCoreByIOCP* m_coreIOCP;
};
#include "Global.h"
#include "LanWorkerThread.h"
#include <process.h>
/// Initializes the worker's state and creates its thread in the suspended
/// state; a creation failure is reported on stdout.
LanWorkerThread::LanWorkerThread()
    : m_bIsStop(false)
    , m_bRunState(false)
    , m_hdlThread(NULL)
    , m_task(NULL)
    , m_eventCondition()
    , m_coreIOCP(NULL)
{
    const uintptr_t hRaw = _beginthreadex(NULL, 0, &LanWorkerThread::ThreadExec, this, CREATE_SUSPENDED, NULL);
    m_hdlThread = (HANDLE)hRaw;
    if (m_hdlThread != 0)
    {
        return;
    }
    cout << "创建线程失败!!!" << endl;
}
/// Waits (without timeout) for the thread to finish, logs its id and closes
/// the handle. Does nothing when the thread was never created.
LanWorkerThread::~LanWorkerThread()
{
    if (m_hdlThread == 0)
    {
        return;
    }
    WaitForSingleObject(m_hdlThread, INFINITE);
    cout << "线程ID:" << GetThreadId(m_hdlThread) << "退出!!!" << endl;
    CloseHandle(m_hdlThread);
}
/// Stores the IOCP core; a NULL argument leaves the current value untouched.
void LanWorkerThread::SetCoreIOCP(CCoreByIOCP* coreIOCP)
{
    if (coreIOCP == NULL)
    {
        return;
    }
    m_coreIOCP = coreIOCP;
}
/// Resumes the worker thread. Refuses (with a message) when no IOCP core has
/// been set or the thread handle is missing.
void LanWorkerThread::Run()
{
    if (m_coreIOCP == NULL)
    {
        cout << "IOCP对象为NULL!!!!!" << endl;
        return;
    }
    if (!m_hdlThread)
    {
        cout << "线程HANDLE错误!!!";
        return;
    }
    ResumeThread(m_hdlThread);
}
// Records the stop request and signals the worker's event.
// NOTE(review): DoExec() does not check m_bIsStop or wait on the event in the
// visible code, so this alone does not appear to end the loop — confirm.
void LanWorkerThread::Stop()
{
m_bIsStop = true;
m_eventCondition.Signal();
}
/// Thread entry point passed to _beginthreadex.
/// @param pThis the LanWorkerThread whose DoExec() should run
/// @return 0 when pThis is NULL, 1 after DoExec() returns
unsigned __stdcall LanWorkerThread::ThreadExec(void* pThis)
{
    LanWorkerThread* const pSelf = static_cast<LanWorkerThread*>(pThis);
    if (pSelf == NULL)
    {
        return 0;
    }
    pSelf->DoExec();
    return 1;
}
// Worker loop: dequeues completion packets from the core's completion port and
// dispatches them to CCoreByIOCP::ProcessIOMessage until an exit packet or an
// unexpected error ends the loop.
void LanWorkerThread::DoExec()
{
// Without a core there is no completion port to serve.
if (m_coreIOCP == NULL)
{
cout << "IOCP对象为NULL!!!!!" << endl;
return;
}
OVERLAPPED* pOverlapped = NULL;
DWORD dwIoSize = 0;
BOOL bRet = FALSE;
DWORD dwErr = 0;
IOCP_PARAM* pIocpParam = NULL;
PER_IO_CONTEXT* pIoContext = NULL;
HANDLE hdIOCP = m_coreIOCP->GetIOCPPort();
while (1)
{
// Block until a packet is available; the completion key is received into pIocpParam.
bRet = GetQueuedCompletionStatus(
hdIOCP,
&dwIoSize,
(PULONG_PTR)&pIocpParam,
&pOverlapped,
INFINITE);
// StopCore posts packets whose key is EXIT_CODE to ask workers to leave.
if (EXIT_CODE == pIocpParam)
break;
// Recover the context that embeds the OVERLAPPED used for the operation.
pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_overLapped);
if (!bRet)
{
dwErr = GetLastError();
// NOTE(review): the wait above uses INFINITE, so this timeout branch looks
// unreachable — confirm.
// NOTE(review): the visible AssociateSocketWithCompletionPort callers pass the
// socket value as the completion key, so dereferencing the key as an
// IOCP_PARAM* here looks unsafe — confirm before relying on this path.
if (WAIT_TIMEOUT == dwErr)
{
if (-1 == send(pIocpParam->m_rourceSock, "", 0, 0))
{
m_coreIOCP->MoveToFreeParamPool(pIocpParam);
m_coreIOCP->RemoveStaleClient(pIoContext, FALSE);
}
continue;
}
// The connection was dropped: recycle its records and keep serving.
if (ERROR_NETNAME_DELETED == dwErr)
{
m_coreIOCP->MoveToFreeParamPool(pIocpParam);
m_coreIOCP->RemoveStaleClient(pIoContext, FALSE);
continue;
}
// Any other failure ends this worker.
break;
}
// A successful completion with zero bytes is handled like a dropped connection.
if (bRet && 0 == dwIoSize)
{
m_coreIOCP->MoveToFreeParamPool(pIocpParam);
m_coreIOCP->RemoveStaleClient(pIoContext, FALSE);
continue;
}
if (bRet && NULL != pIoContext && NULL != pIocpParam)
{
// Exceptions thrown by a handler are swallowed so the loop keeps running.
try
{
m_coreIOCP->ProcessIOMessage(pIoContext->m_IOType, pIoContext, dwIoSize);
}
catch (...) { }
}
}
cout << "线程ID:" << GetThreadId(m_hdlThread) << "执行完毕!!" << endl;
}
/// Stores a non-null task, flags the worker as running and signals its event.
/// A null task is ignored.
void LanWorkerThread::SetTask(Task task)
{
    if (!task)
    {
        return;
    }
    m_task = task;
    m_bRunState = true;
    m_eventCondition.Signal();
}
#pragma once
#define _CRT_SECURE_NO_WARNINGS
#include "Global.h"
#include "LanCriticalLock.h"
#include "LanEvent.h"
#include <map>
// Callback invoked by the core: (user pointer, I/O context, NC_* notification code).
typedef void (CALLBACK* NOTIFYPROC)(LPVOID, PER_IO_CONTEXT*, UINT);
typedef std::list<PER_IO_CONTEXT*> IOContextList;
typedef std::list<IOCP_PARAM*> IocpParamList;
// IOCP-based TCP server core: owns the listen socket, the completion port and
// the pools of I/O contexts, and dispatches completed operations to handlers.
class CCoreByIOCP
{
public:
explicit CCoreByIOCP();
~CCoreByIOCP();
// Starts the server on nPort; pNotifyProc receives the NC_* notifications.
bool RunCore(NOTIFYPROC pNotifyProc, const UINT& nPort);
// Asks the workers to exit and releases all resources.
void StopCore();
HANDLE GetIOCPPort() { return m_hIOCompletionPort; }
// Move a record from the in-use list to the free list (no-op if it is not in use).
VOID MoveToFreeParamPool(IOCP_PARAM* pIocpParam);
VOID MoveToFreePool(PER_IO_CONTEXT* pIoContext);
// Returns a record from the free list, or a newly allocated one, and tracks it as in use.
IOCP_PARAM* AllocateIocpParam();
// Closes the client's socket, notifies NC_CLIENT_DISCONNECT and recycles the context.
VOID RemoveStaleClient(PER_IO_CONTEXT* pIoContext, BOOL bGraceful);
VOID ReleaseResource();
// Posts a WSASend; bIsSendToSource selects the context's own socket over the m_desID client.
bool PostSend(PER_IO_CONTEXT* pIoContext, bool bIsSendToSource = true);
PER_IO_CONTEXT* AllocateClientIOContext();
// Generates ProcessIOMessage(), which routes a completion to the handler for its IOType.
BEGIN_IO_MSG_MAP()
IO_MESSAGE_HANDLE(EM_IORecv, OnClientReading)
IO_MESSAGE_HANDLE(EM_IOSend, OnClientWriting)
IO_MESSAGE_HANDLE(EM_IOAccept, OnClientAccept)
END_IO_MSG_MAP()
bool OnClientAccept(PER_IO_CONTEXT* pIOContext, DWORD dwSize = 0);
bool OnClientReading(PER_IO_CONTEXT* pIOContext, DWORD dwSize = 0);
bool OnClientWriting(PER_IO_CONTEXT* pIOContext, DWORD dwSize = 0);
protected:
bool InitNetEnvironment();
bool InitializeIOCP();
bool InitializeListenSocket();
bool PostAcceptEx(PER_IO_CONTEXT* pAcceptIoContext);
bool PostRecv(PER_IO_CONTEXT* pIoContext, bool bIsNeedInit = false);
bool OnAccept(PER_IO_CONTEXT* pIoContext);
bool AssociateSocketWithCompletionPort(SOCKET socket, DWORD dwCompletionKey);
// Socket <-> client id bookkeeping backed by m_mapClient.
void AddCLientInfo(SOCKET socket, UINT32 id);
void RomoveClientInfo(SOCKET socket);
void GetClientSOCKET(UINT32 id, SOCKET& socket);
void GetClientID(UINT32& id, SOCKET socket);
private:
NOTIFYPROC m_pNotifyProc;
// Guards the context and parameter lists; initialized in RunCore().
CRITICAL_SECTION m_cs;
HANDLE m_hShutDownEvent;
HANDLE m_hIOCompletionPort;
LanThreadPool* m_pWorkThreads;
PER_IO_CONTEXT* m_pListenContext;
SOCKET m_socListen;
UINT m_nPort;
// Contexts with an AcceptEx request posted.
IOContextList m_listAcceptExSock;
// Client contexts in use / available for reuse.
IOContextList m_listIoContext;
IOContextList m_listFreeIoContext;
// Parameter records in use / available for reuse.
IocpParamList m_listIocpParam;
IocpParamList m_listFreeIocpParam;
IOCP_PARAM* m_pListenIocpParam;
// Keep-alive time in milliseconds passed to SIO_KEEPALIVE_VALS.
int m_nKeepLiveTime;
LPFN_ACCEPTEX m_lpfnAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;
// Connected clients: socket -> client id.
std::map<SOCKET, UINT32> m_mapClient;
// True once StopCore() has run, so the destructor does not repeat it.
bool m_bIsExecStopCore;
};
/// Puts every scalar member into a defined state and allocates the listen
/// context and listen parameter record.
///
/// m_socListen, m_pNotifyProc and m_lpfnGetAcceptExSockAddrs used to be left
/// uninitialized. The destructor calls StopCore(), which tests m_socListen, so
/// destroying an object that never ran RunCore() acted on an indeterminate
/// socket value.
CCoreByIOCP::CCoreByIOCP()
{
    m_pNotifyProc = NULL;
    m_hShutDownEvent = NULL;
    m_hIOCompletionPort = NULL;
    m_pWorkThreads = NULL;
    m_socListen = INVALID_SOCKET;
    m_nPort = 0;
    m_lpfnAcceptEx = NULL;
    m_lpfnGetAcceptExSockAddrs = NULL;

    m_pListenContext = new PER_IO_CONTEXT();
    m_pListenContext->Init();               // socket = INVALID_SOCKET, buffers wired up

    m_pListenIocpParam = new IOCP_PARAM;
    m_pListenIocpParam->m_rourceSock = INVALID_SOCKET;

    m_nKeepLiveTime = 1000 * 60 * 3;        // three minutes, in milliseconds
    m_mapClient.clear();
    m_bIsExecStopCore = false;
}
/// Stops the core unless StopCore() has already been executed.
CCoreByIOCP::~CCoreByIOCP()
{
    if (m_bIsExecStopCore)
    {
        return;
    }
    StopCore();
}
/// Starts the server: stores the callback and port, initializes the critical
/// section, then creates the shutdown event, Winsock, the completion port and
/// the listen socket in that order, stopping at the first failure.
/// @return true when every step succeeded
bool CCoreByIOCP::RunCore(NOTIFYPROC pNotifyProc, const UINT& nPort)
{
    m_nPort = nPort;
    m_pNotifyProc = pNotifyProc;
    m_bIsExecStopCore = false;
    InitializeCriticalSection(&m_cs);

    m_hShutDownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

    // && short-circuits, so a failed step prevents all later ones from running.
    const bool bStarted = (NULL != m_hShutDownEvent)
                       && InitNetEnvironment()
                       && InitializeIOCP()
                       && InitializeListenSocket();
    if (!bStarted)
    {
        cout << "启动服务器失败!!!!!!!!" << endl;
    }
    return bStarted;
}
/// Marks the core as stopped and, if a listen socket exists, signals shutdown,
/// posts one exit packet per worker thread and releases all resources.
void CCoreByIOCP::StopCore()
{
    m_bIsExecStopCore = true;
    if (m_socListen == INVALID_SOCKET)
    {
        return;
    }

    SetEvent(m_hShutDownEvent);
    if (m_pWorkThreads)
    {
        int nPosted = 0;
        while (nPosted < m_pWorkThreads->GetCurrentThreadCount())
        {
            // A packet whose key is EXIT_CODE makes one worker leave its loop.
            PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
            ++nPosted;
        }
    }
    ReleaseResource();
}
bool CCoreByIOCP::InitNetEnvironment()
{
WSADATA wsaData;
if (0 != WSAStartup(MAKEWORD(2, 2), &wsaData))
return false;
return true;
}
bool CCoreByIOCP::InitializeIOCP()
{
SYSTEM_INFO systemInfo;
m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (NULL == m_hIOCompletionPort)
return false;
GetSystemInfo(&systemInfo);
int m_nThreadCnt = WORKER_THREADS_PER_PROCESSOR * systemInfo.dwNumberOfProcessors;
m_pWorkThreads = LanThreadPool::GetLanThreadPoolInstance();
m_pWorkThreads->SetThreadPool(m_nThreadCnt, m_nThreadCnt);
m_pWorkThreads->SetIOCP(this);
m_pWorkThreads->Run();
return true;
}
/// Creates the listen socket, binds it to m_nPort on all interfaces, resolves
/// the AcceptEx / GetAcceptExSockaddrs extension functions and posts
/// MAX_POST_ACCEPT accept requests.
///
/// Fix: when posting an accept request fails, the context just allocated (and
/// any socket created for it) is now released. It had not yet been added to
/// m_listAcceptExSock, so ReleaseResource() could not free it.
///
/// @return true when the socket is listening and all accept requests are posted
bool CCoreByIOCP::InitializeListenSocket()
{
    GUID GuidAcceptEx = WSAID_ACCEPTEX;
    GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;

    m_socListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (INVALID_SOCKET == m_socListen)
        return false;

    // The socket value itself is used as the completion key.
    m_pListenIocpParam->m_rourceSock = m_socListen;
    if (!AssociateSocketWithCompletionPort(m_socListen, (DWORD)(m_pListenIocpParam->m_rourceSock)))
    {
        RELEASE_SOCKET(m_socListen);
        return false;
    }

    SOCKADDR_IN servAddr;
    ZeroMemory(&servAddr, sizeof(SOCKADDR_IN));
    servAddr.sin_family = AF_INET;
    servAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    servAddr.sin_port = htons(m_nPort);
    if (SOCKET_ERROR == ::bind(m_socListen, (struct sockaddr*)&servAddr, sizeof(servAddr)))
    {
        RELEASE_SOCKET(m_socListen);
        return false;
    }
    if (SOCKET_ERROR == listen(m_socListen, SOMAXCONN))
    {
        RELEASE_SOCKET(m_socListen);
        return false;
    }

    // Resolve the extension function pointers at run time.
    DWORD dwBytes = 0;
    if (SOCKET_ERROR == WSAIoctl(
        m_socListen,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &GuidAcceptEx, sizeof(GuidAcceptEx),
        &m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx),
        &dwBytes, NULL, NULL))
    {
        this->ReleaseResource();
        return false;
    }
    if (SOCKET_ERROR == WSAIoctl(
        m_socListen,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &GuidGetAcceptExSockAddrs, sizeof(GuidGetAcceptExSockAddrs),
        &m_lpfnGetAcceptExSockAddrs, sizeof(m_lpfnGetAcceptExSockAddrs),
        &dwBytes, NULL, NULL))
    {
        this->ReleaseResource();
        return false;
    }

    for (int i = 0; i < MAX_POST_ACCEPT; i++)
    {
        PER_IO_CONTEXT* pAcceptIoContext = new PER_IO_CONTEXT();
        pAcceptIoContext->Init();
        if (FALSE == PostAcceptEx(pAcceptIoContext))
        {
            this->RemoveStaleClient(pAcceptIoContext, TRUE);
            // Not tracked in any list yet, so release it here.
            RELEASE_SOCKET(pAcceptIoContext->m_rourceSock);
            RELEASE_MEMORY(pAcceptIoContext);
            this->ReleaseResource();
            return false;
        }
        m_listAcceptExSock.push_back(pAcceptIoContext);
    }
    return true;
}
/// Moves a parameter record from the in-use list to the free list.
/// Records that are not currently in use are left alone.
VOID CCoreByIOCP::MoveToFreeParamPool(IOCP_PARAM* pIocpParam)
{
    LanCriticalLock cs(m_cs, "MoveToFreeParamPool");
    const bool bInUse =
        find(m_listIocpParam.begin(), m_listIocpParam.end(), pIocpParam) != m_listIocpParam.end();
    if (!bInUse)
    {
        return;
    }
    m_listFreeIocpParam.push_back(pIocpParam);
    m_listIocpParam.remove(pIocpParam);
}
/// Moves an I/O context from the in-use list to the free list.
/// Contexts that are not currently in use are left alone.
VOID CCoreByIOCP::MoveToFreePool(PER_IO_CONTEXT* pIoContext)
{
    LanCriticalLock cs(m_cs, "MoveToFreePool");
    const bool bInUse =
        find(m_listIoContext.begin(), m_listIoContext.end(), pIoContext) != m_listIoContext.end();
    if (!bInUse)
    {
        return;
    }
    m_listFreeIoContext.push_back(pIoContext);
    m_listIoContext.remove(pIoContext);
}
/// Hands out a parameter record, reusing one from the free list when possible,
/// registers it as in use and resets its socket to INVALID_SOCKET.
IOCP_PARAM* CCoreByIOCP::AllocateIocpParam()
{
    LanCriticalLock cs(m_cs, "AllocateIocpParam");

    IOCP_PARAM* pParam = NULL;
    if (!m_listFreeIocpParam.empty())
    {
        pParam = m_listFreeIocpParam.front();
        m_listFreeIocpParam.remove(pParam);
    }
    else
    {
        pParam = new IOCP_PARAM;
    }

    m_listIocpParam.push_back(pParam);
    if (pParam != NULL)
    {
        pParam->m_rourceSock = INVALID_SOCKET;
    }
    return pParam;
}
/// Drops a client: unregisters its socket from the client map and, if the
/// context is in use, cancels pending I/O, closes the socket, waits for the
/// overlapped operation to finish, notifies NC_CLIENT_DISCONNECT and recycles
/// the context.
///
/// Fixes: a NULL context is ignored instead of being dereferenced, and the
/// notification is skipped when no callback is installed (RunCore() accepts a
/// NULL callback).
///
/// @param pIoContext context of the client to drop (may be NULL)
/// @param bGraceful  FALSE enables SO_LINGER with a zero timeout before closing
VOID CCoreByIOCP::RemoveStaleClient(PER_IO_CONTEXT* pIoContext, BOOL bGraceful)
{
    if (pIoContext == NULL)
    {
        return;
    }

    LanCriticalLock cs(m_cs, "RemoveStaleClient");
    if (!bGraceful)
    {
        LINGER linger;
        linger.l_onoff = 1;
        linger.l_linger = 0;
        setsockopt(pIoContext->m_rourceSock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));
    }
    RomoveClientInfo(pIoContext->m_rourceSock);

    std::list<PER_IO_CONTEXT*>::iterator iter =
        find(m_listIoContext.begin(), m_listIoContext.end(), pIoContext);
    if (iter == m_listIoContext.end())
    {
        return;   // not a tracked client context; nothing more to release
    }

    CancelIo((HANDLE)pIoContext->m_rourceSock);
    RELEASE_SOCKET(pIoContext->m_rourceSock);
    // Wait until the kernel is done with the OVERLAPPED before recycling it.
    while (!HasOverlappedIoCompleted(&pIoContext->m_overLapped))
        Sleep(0);
    if (m_pNotifyProc != NULL)
    {
        m_pNotifyProc(NULL, pIoContext, NC_CLIENT_DISCONNECT);
    }
    MoveToFreePool(pIoContext);
}
/// Releases everything the core owns: worker pool, handles, sockets, contexts
/// and parameter records.
///
/// Fixes over the previous version:
///  - Workers are sent an exit packet and joined before the critical section
///    and completion port are destroyed. Deleting the pool waits for every
///    worker thread, and on the InitializeListenSocket() failure path nobody
///    had told them to exit, so the wait could never finish. Packets already
///    posted by StopCore() make the extra ones harmless.
///  - m_listAcceptExSock is cleared after its contexts are deleted (it used to
///    keep dangling pointers) and the sockets of pending accepts are closed.
///  - The in-use and free IOCP_PARAM lists and m_pListenContext are freed.
VOID CCoreByIOCP::ReleaseResource()
{
    if (m_pWorkThreads)
    {
        if (m_hIOCompletionPort != NULL)
        {
            const int nWorkers = m_pWorkThreads->GetCurrentThreadCount();
            for (int i = 0; i < nWorkers; ++i)
            {
                PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (ULONG_PTR)EXIT_CODE, NULL);
            }
        }
        // Deletes the shared pool, which joins every worker thread.
        m_pWorkThreads->FreeMommery();
        m_pWorkThreads = NULL;
    }

    // No worker can be inside the critical section any more.
    DeleteCriticalSection(&m_cs);
    RELEASE_HANDLE(m_hShutDownEvent);
    RELEASE_HANDLE(m_hIOCompletionPort);
    RELEASE_SOCKET(m_socListen);
    RELEASE_MEMORY(m_pListenIocpParam);
    RELEASE_MEMORY(m_pListenContext);

    IOContextList::iterator iter;
    for (iter = m_listFreeIoContext.begin(); iter != m_listFreeIoContext.end(); ++iter)
    {
        RELEASE_MEMORY(*iter);
    }
    m_listFreeIoContext.clear();

    for (iter = m_listIoContext.begin(); iter != m_listIoContext.end(); ++iter)
    {
        RELEASE_SOCKET((*iter)->m_rourceSock);
        RELEASE_MEMORY(*iter);
    }
    m_listIoContext.clear();

    for (iter = m_listAcceptExSock.begin(); iter != m_listAcceptExSock.end(); ++iter)
    {
        RELEASE_SOCKET((*iter)->m_rourceSock);
        RELEASE_MEMORY(*iter);
    }
    m_listAcceptExSock.clear();

    IocpParamList::iterator paramIter;
    for (paramIter = m_listIocpParam.begin(); paramIter != m_listIocpParam.end(); ++paramIter)
    {
        RELEASE_MEMORY(*paramIter);
    }
    m_listIocpParam.clear();
    for (paramIter = m_listFreeIocpParam.begin(); paramIter != m_listFreeIocpParam.end(); ++paramIter)
    {
        RELEASE_MEMORY(*paramIter);
    }
    m_listFreeIocpParam.clear();

    m_mapClient.clear();
}
/// Handles a completed AcceptEx: creates a client context for the accepted
/// socket, copies the first received bytes into it, registers the client,
/// enables keep-alive, posts the first receive and re-posts the accept.
///
/// Fixes over the previous version:
///  - On association failure the new context and parameter record are returned
///    to their pools instead of being deleted while still listed as in use,
///    which left dangling pointers for ReleaseResource() to delete again.
///  - SO_KEEPALIVE is set with the full size of the option value, not sizeof(char).
///  - A NULL remote address from GetAcceptExSockaddrs is no longer dereferenced.
///  - memcpy_s is given the real destination capacity.
///
/// @param pIoContext the accept context whose AcceptEx completed; its
///                   m_dwBytesRecv holds the number of bytes received with the accept
/// @return result of re-posting the accept, or false on an earlier failure
bool CCoreByIOCP::OnAccept(PER_IO_CONTEXT* pIoContext)
{
    SOCKADDR_IN* RemoteSockAddr = NULL;
    SOCKADDR_IN* LocalSockAddr = NULL;
    int nLocalLen = sizeof(SOCKADDR_IN);
    int nRemoteLen = sizeof(SOCKADDR_IN);
    const int nAddrSlot = sizeof(SOCKADDR_IN) + 16;   // per-address space reserved by AcceptEx

    this->m_lpfnGetAcceptExSockAddrs(
        pIoContext->m_wsaBuf.buf,
        pIoContext->m_wsaBuf.len - (nAddrSlot * 2),
        nAddrSlot, nAddrSlot,
        (sockaddr**)&LocalSockAddr, &nLocalLen,
        (sockaddr**)&RemoteSockAddr, &nRemoteLen);

    PER_IO_CONTEXT* pNewIoContext = AllocateClientIOContext();
    pNewIoContext->m_rourceSock = pIoContext->m_rourceSock;
    if (RemoteSockAddr != NULL)
    {
        pNewIoContext->m_resourceAddr = *RemoteSockAddr;
    }
    memcpy_s(pNewIoContext->m_szBufCache, sizeof(pNewIoContext->m_szBufCache),
             pIoContext->m_szBuf, pIoContext->m_dwBytesRecv);
    pNewIoContext->m_dwBytesRecv = pIoContext->m_dwBytesRecv;

    // The accept context is passed because it still holds the first message.
    m_pNotifyProc(NULL, pIoContext, NC_CLIENT_CONNECT);

    IOCP_PARAM* pIocpParam = AllocateIocpParam();
    pIocpParam->m_rourceSock = pNewIoContext->m_rourceSock;
    if (!AssociateSocketWithCompletionPort(pNewIoContext->m_rourceSock, (DWORD)pIocpParam->m_rourceSock))
    {
        // NOTE(review): closing the listen socket here shuts the whole server down
        // because one client could not be associated — kept as in the original,
        // but confirm this is intended.
        RELEASE_SOCKET(m_socListen);
        RELEASE_SOCKET(pNewIoContext->m_rourceSock);
        // The accept context referred to the same socket, which is now closed.
        pIoContext->m_rourceSock = INVALID_SOCKET;
        MoveToFreePool(pNewIoContext);
        MoveToFreeParamPool(pIocpParam);
        return false;
    }

    // The first message carries the client's own id.
    SendToIpInfo ipInfo;
    ZeroMemory(&ipInfo, MAX_BUFFER_LEN);
    memcpy_s(&ipInfo, MAX_BUFFER_LEN, pIoContext->m_szBuf, MAX_BUFFER_LEN);
    AddCLientInfo(pNewIoContext->m_rourceSock, ipInfo.nSelfID);

    unsigned long chOpt = 1;
    if (SOCKET_ERROR == setsockopt(pNewIoContext->m_rourceSock, SOL_SOCKET, SO_KEEPALIVE,
                                   (char*)&chOpt, sizeof(chOpt)))
    {
        MoveToFreeParamPool(pIocpParam);
        RemoveStaleClient(pNewIoContext, TRUE);
        return false;
    }

    tcp_keepalive klive;
    klive.onoff = 1;
    klive.keepalivetime = m_nKeepLiveTime;
    klive.keepaliveinterval = 1000 * 10;
    DWORD dwBytesReturned = 0;
    WSAIoctl(
        pNewIoContext->m_rourceSock,
        SIO_KEEPALIVE_VALS,
        &klive,
        sizeof(tcp_keepalive),
        NULL,
        0,
        &dwBytesReturned,
        0,
        NULL);

    if (!PostRecv(pNewIoContext))
    {
        // PostRecv already dropped the client; only the parameter record is left.
        MoveToFreeParamPool(pIocpParam);
    }

    LanCriticalLock cs(m_cs, "OnAccept");
    pIoContext->Reset();
    return PostAcceptEx(pIoContext);
}
/// Completion handler for EM_IOAccept: records the byte count on the context
/// and forwards to OnAccept(). Any exception is swallowed and reported as false.
bool CCoreByIOCP::OnClientAccept(PER_IO_CONTEXT* pIOContext, DWORD dwSize )
{
    try
    {
        pIOContext->m_dwBytesRecv = dwSize;
        return OnAccept(pIOContext);
    }
    catch (...)
    {
        return false;
    }
}
/// Completion handler for EM_IORecv: appends the received bytes to the
/// context's message cache, notifies NC_RECEIVE, and once MAX_BUFFER_LEN bytes
/// have accumulated notifies NC_RECEIVE_COMPLETE and starts a fresh message.
///
/// Fix: the append is clamped to the space left in m_szBufCache. Every receive
/// is posted with a MAX_BUFFER_LEN buffer, so a completion can deliver more
/// bytes than the partially filled cache can hold; the unclamped memcpy wrote
/// past the cache. Bytes that do not fit are dropped and reported.
///
/// @param pIOContext context whose receive completed
/// @param dwSize     number of bytes delivered into m_szBuf
/// @return result of posting the next receive; false if an exception occurred
bool CCoreByIOCP::OnClientReading(PER_IO_CONTEXT* pIOContext, DWORD dwSize )
{
    LanCriticalLock cs(m_cs, "OnClientReading");
    bool bRet = false;
    try
    {
        const DWORD dwCached = pIOContext->m_dwBytesRecv;
        const DWORD dwRoom = (dwCached < MAX_BUFFER_LEN) ? (MAX_BUFFER_LEN - dwCached) : 0;
        const DWORD dwCopy = (dwSize < dwRoom) ? dwSize : dwRoom;
        if (dwCopy < dwSize)
        {
            cout << "OnClientReading: dropped " << (dwSize - dwCopy)
                 << " byte(s) that did not fit in the message cache" << endl;
        }
        if (dwCopy > 0)
        {
            memcpy(pIOContext->m_szBufCache + dwCached, pIOContext->m_szBuf, dwCopy);
        }
        pIOContext->m_dwBytesRecv = dwCached + dwCopy;

        m_pNotifyProc(NULL, pIOContext, NC_RECEIVE);

        if (pIOContext->m_dwBytesRecv >= MAX_BUFFER_LEN)
        {
            cout << "完整接收数据!!!!!!!!!" << endl;
            m_pNotifyProc(NULL, pIOContext, NC_RECEIVE_COMPLETE);
            bRet = PostRecv(pIOContext, true);    // reset buffers for the next message
        }
        else
        {
            bRet = PostRecv(pIOContext, false);   // keep accumulating
        }
    }
    catch (...) { }
    return bRet;
}
/// Creates a socket for the next incoming connection and posts an AcceptEx
/// request for it on the listen socket.
///
/// Fix: when AcceptEx fails for a reason other than WSA_IO_PENDING, the socket
/// created here is closed instead of being leaked.
///
/// @param pAcceptIoContext context that carries the request; its m_rourceSock
///                         receives the new socket
/// @return true when the request was posted (completed or pending)
bool CCoreByIOCP::PostAcceptEx(PER_IO_CONTEXT * pAcceptIoContext)
{
    pAcceptIoContext->m_IOType = EM_IOAccept;
    pAcceptIoContext->m_rourceSock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (INVALID_SOCKET == pAcceptIoContext->m_rourceSock)
        return false;

    // The tail of the buffer is reserved for the local and remote addresses.
    const DWORD dwAddrSlot = sizeof(SOCKADDR_IN) + 16;
    const BOOL bAccepted = m_lpfnAcceptEx(
        m_socListen,
        pAcceptIoContext->m_rourceSock,
        pAcceptIoContext->m_wsaBuf.buf,
        pAcceptIoContext->m_wsaBuf.len - (dwAddrSlot * 2),
        dwAddrSlot,
        dwAddrSlot,
        &(pAcceptIoContext->m_dwBytesRecv),
        &(pAcceptIoContext->m_overLapped));

    // Read the error before closesocket can overwrite it.
    if (FALSE == bAccepted && WSA_IO_PENDING != WSAGetLastError())
    {
        RELEASE_SOCKET(pAcceptIoContext->m_rourceSock);
        return false;
    }
    return true;
}
/// Posts an overlapped WSARecv on the context's socket.
/// @param bIsNeedInit when true, the OVERLAPPED, both buffers and the received
///                    byte count are cleared first
/// @return false (after dropping the client) when the receive could not be posted
bool CCoreByIOCP::PostRecv(PER_IO_CONTEXT* pIoContext, bool bIsNeedInit)
{
    if (bIsNeedInit)
    {
        ZeroMemory(&pIoContext->m_overLapped, sizeof(OVERLAPPED));
        ZeroMemory(pIoContext->m_szBuf, MAX_BUFFER_LEN);
        ZeroMemory(pIoContext->m_szBufCache, MAX_BUFFER_LEN);
        pIoContext->m_dwBytesRecv = 0;
    }
    pIoContext->m_IOType = EM_IORecv;

    DWORD dwReceivedNow;
    ULONG ulRecvFlags = MSG_PARTIAL;
    const UINT nResult = WSARecv(
        pIoContext->m_rourceSock,
        &(pIoContext->m_wsaBuf),
        1,
        &dwReceivedNow,
        &ulRecvFlags,
        &(pIoContext->m_overLapped),
        NULL);

    // A pending operation is the normal outcome for overlapped I/O.
    const bool bHardFailure = (SOCKET_ERROR == nResult) && (WSA_IO_PENDING != WSAGetLastError());
    if (!bHardFailure)
    {
        return true;
    }
    RemoveStaleClient(pIoContext, FALSE);
    return false;
}
/// Completion handler for EM_IOSend: adds the bytes just sent to the running
/// total and either posts another send (total still below MAX_BUFFER_LEN) or
/// notifies NC_TRANSMIT and recycles the context.
/// @return the PostSend result while sending continues; false otherwise
bool CCoreByIOCP::OnClientWriting(PER_IO_CONTEXT* pIOContext, DWORD dwSize )
{
    try
    {
        pIOContext->m_dwBytesSend += dwSize;
        if (pIOContext->m_dwBytesSend < MAX_BUFFER_LEN)
        {
            return PostSend(pIOContext);
        }
        m_pNotifyProc(NULL, pIOContext, NC_TRANSMIT);
        pIOContext->m_dwBytesSend = 0;
        MoveToFreePool(pIOContext);
    }
    catch (...)
    {
        // Failures are reported through the false return below.
    }
    return false;
}
/// Prefixes the context's buffer with the first four characters of the sender's
/// decimal client id and posts an overlapped WSASend of MAX_BUFFER_LEN bytes.
///
/// Fix: the byte count reported by WSASend is now received in a local variable.
/// It used to be written into m_wsaBuf.len, i.e. into the very buffer
/// descriptor being passed to the call.
///
/// @param pIoContext      context holding the payload in m_szBuf
/// @param bIsSendToSource true sends on the context's own socket; false sends
///                        to the client whose id is m_desID
/// @return false when the destination is unknown or the send could not be posted
bool CCoreByIOCP::PostSend(PER_IO_CONTEXT* pIoContext, bool bIsSendToSource)
{
    char buf[MAX_BUFFER_LEN]{ 0 };
    char cResourceID[12]{ 0 };
    UINT32 ID = 0;
    GetClientID(ID, pIoContext->m_rourceSock);
    itoa(ID, cResourceID, 10);

    // New layout: 4 id characters followed by the first MAX_BUFFER_LEN - 4 payload bytes.
    memcpy(buf, cResourceID, 4);
    memcpy(buf + 4, pIoContext->m_szBuf, MAX_BUFFER_LEN - 4);
    memcpy(pIoContext->m_szBuf, buf, MAX_BUFFER_LEN);

    pIoContext->m_wsaBuf.buf = pIoContext->m_szBuf;
    pIoContext->m_wsaBuf.len = MAX_BUFFER_LEN;
    pIoContext->m_IOType = EM_IOSend;

    SOCKET socket = INVALID_SOCKET;
    if (bIsSendToSource)
    {
        socket = pIoContext->m_rourceSock;
    }
    else
    {
        GetClientSOCKET(pIoContext->m_desID, socket);
        if (socket == INVALID_SOCKET)
        {
            return false;
        }
    }

    const ULONG ulFlags = MSG_PARTIAL;
    DWORD dwBytesSent = 0;
    const INT nRet = WSASend(
        socket,
        &pIoContext->m_wsaBuf,
        1,
        &dwBytesSent,
        ulFlags,
        &(pIoContext->m_overLapped),
        NULL);
    if (SOCKET_ERROR == nRet && WSA_IO_PENDING != WSAGetLastError())
    {
        RemoveStaleClient(pIoContext, FALSE);
        return false;
    }
    return true;
}
/// Binds a socket to the core's completion port under the given completion key.
/// @return true when CreateIoCompletionPort returned the existing port handle
bool CCoreByIOCP::AssociateSocketWithCompletionPort(SOCKET socket, DWORD dwCompletionKey)
{
    const HANDLE hBoundPort =
        CreateIoCompletionPort((HANDLE)socket, m_hIOCompletionPort, dwCompletionKey, 0);
    const bool bAssociated = (hBoundPort == m_hIOCompletionPort);
    return bAssociated;
}
// Logs and records the client id for a socket, replacing any previous entry.
void CCoreByIOCP::AddCLientInfo(SOCKET socket, UINT32 id)
{
cout << "增加客户端信息: " << socket << ":" << id << endl;
m_mapClient[socket] = id;
}
// Forgets the client id recorded for a socket; unknown sockets are ignored.
void CCoreByIOCP::RomoveClientInfo(SOCKET socket)
{
m_mapClient.erase(socket);
}
/// Looks up the socket registered for a client id.
/// @param id     client id to search for
/// @param socket receives the first matching socket in map order, or
///               INVALID_SOCKET when no client has that id
void CCoreByIOCP::GetClientSOCKET(UINT32 id, SOCKET & socket)
{
    socket = INVALID_SOCKET;
    for (const std::pair<const SOCKET, UINT32>& entry : m_mapClient)
    {
        if (entry.second != id)
        {
            continue;
        }
        socket = entry.first;
        return;
    }
}
/// Looks up the client id registered for a socket.
///
/// The socket is the map's key, so the lookup uses map::find instead of
/// walking every entry.
///
/// @param id     receives the client id, or 0 when the socket is not registered
/// @param socket socket to look up
void CCoreByIOCP::GetClientID(UINT32& id, SOCKET socket)
{
    const std::map<SOCKET, UINT32>::const_iterator iter = m_mapClient.find(socket);
    id = (iter != m_mapClient.end()) ? iter->second : 0;
}
/// Hands out an I/O context, reusing one from the free list when possible,
/// registers it as in use and initializes it.
PER_IO_CONTEXT* CCoreByIOCP::AllocateClientIOContext()
{
    LanCriticalLock cs(m_cs, "AllocateSocketContext");

    PER_IO_CONTEXT* pContext = NULL;
    if (m_listFreeIoContext.empty())
    {
        pContext = new PER_IO_CONTEXT();
    }
    else
    {
        pContext = m_listFreeIoContext.front();
        m_listFreeIoContext.remove(pContext);
    }

    m_listIoContext.push_back(pContext);
    if (pContext != NULL)
    {
        pContext->Init();
    }
    return pContext;
}
#include "Global.h"
#include "CoreByIOCP.h"
#include <WS2tcpip.h>
#include <stdlib.h>
#include<string.h>
// The server instance; created in main() and used by NotifyProc to post sends.
CCoreByIOCP* pIOCP;
void CALLBACK NotifyProc(LPVOID lparam, PER_IO_CONTEXT* pIoContext, UINT uFlag)
{
switch (uFlag)
{
case NC_CLIENT_CONNECT:
{
SendToIpInfo ipInfo;
memset(&ipInfo, 0, MAX_BUFFER_LEN);
memcpy(&ipInfo, pIoContext->m_szBuf, MAX_BUFFER_LEN);
cout << "[" << inet_ntoa(pIoContext->m_resourceAddr.sin_addr) << "-" << htons(pIoContext->m_resourceAddr.sin_port)
<< "]->连接,第一条数据【" << ipInfo.nSelfID << ":" << ipInfo.nSendToID << ":" << ipInfo.buf << "】" << endl;
break;
}
case NC_CLIENT_DISCONNECT:
{
cout << "[" << inet_ntoa(pIoContext->m_resourceAddr.sin_addr) << "-" << htons(pIoContext->m_resourceAddr.sin_port)
<< "]->断开连接" << endl;
break;
}
case NC_TRANSMIT:
{
cout << "[" << inet_ntoa(pIoContext->m_resourceAddr.sin_addr) << "-" << htons(pIoContext->m_resourceAddr.sin_port)
<< "]->数据发送成功【" << pIoContext->m_szBuf << "】" << endl;
break;
}
case NC_RECEIVE:
{
SendToIpInfo ipInfo;
ZeroMemory(&ipInfo, MAX_BUFFER_LEN);
memcpy(&ipInfo, pIoContext->m_szBuf, MAX_BUFFER_LEN);
cout << "接收到[" << inet_ntoa(pIoContext->m_resourceAddr.sin_addr) << "-" << htons(pIoContext->m_resourceAddr.sin_port)
<< "]->【" << ipInfo.nSelfID << ":" << ipInfo.nSendToID << ":" << ipInfo.buf << "】" << endl;
break;
}
case NC_RECEIVE_COMPLETE:
{
SendToIpInfo ipInfo;
ZeroMemory(&ipInfo, MAX_BUFFER_LEN);
memcpy(&ipInfo, pIoContext->m_szBufCache, MAX_BUFFER_LEN);
cout << "服务器已完整接收到您发的信息[" << inet_ntoa(pIoContext->m_resourceAddr.sin_addr) << "-" << htons(pIoContext->m_resourceAddr.sin_port)
<< "]->【" << ipInfo.nSelfID << ":" << ipInfo.nSendToID << ":" << ipInfo.buf << "】" << endl;
if (ipInfo.nSendToID <= 0)
{
break;
}
PER_IO_CONTEXT* pSendIoContext = pIOCP->AllocateClientIOContext();
pSendIoContext->m_rourceSock = pIoContext->m_rourceSock;
pSendIoContext->m_resourceAddr = pIoContext->m_resourceAddr;
pSendIoContext->m_desID = ipInfo.nSendToID;
memcpy(pSendIoContext->m_szBuf, ipInfo.buf, strlen(ipInfo.buf));
pSendIoContext->m_dwBytesSend = 0;
if (pIOCP->PostSend(pSendIoContext, false))
{
char szSend[] = "目标地址已成功接收!!!";
ZeroMemory(pSendIoContext->m_szBuf, MAX_BUFFER_LEN);
memcpy(pSendIoContext->m_szBuf, szSend, strlen(szSend));
pSendIoContext->m_dwBytesSend = static_cast<WORD>(strlen(szSend));
pIOCP->PostSend(pSendIoContext, true);
}
else
{
char szSend[] = "接收者ID不存在或已下线!!!";
ZeroMemory(pSendIoContext->m_szBuf, MAX_BUFFER_LEN);
memcpy(pSendIoContext->m_szBuf, szSend, strlen(szSend));
pSendIoContext->m_dwBytesSend = static_cast<WORD>(strlen(szSend));
pIOCP->PostSend(pSendIoContext, true);
}
break;
}
default:
break;
}
return;
}
int main()
{
pIOCP = new CCoreByIOCP;
pIOCP->RunCore(NotifyProc, PORT);
char szIn[32];
while (1)
{
printf("输入 quit 退出IOCP服务器: \n");
memset(szIn, 0, sizeof(szIn));
scanf_s("%s", szIn, 32);
if (strcmp(szIn, "quit") == 0)
{
pIOCP->StopCore();
break;
}
}
system("pause");
return 0;
}