Linux-Thift学习笔记
一、Thrift概述
1.1 Thrift基本概念
Thrift是一个RPC(远程过程调用协议Remote Procedure Call Protocol)软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在C++、Java、Go、Python、PHP、Ruby、Erlang、Perl、Haskell、C#、Cocoa、JavaScript、Node.js、Smalltalk、OCaml这些编程语言间无缝结合的、高效的服务。Thrift允许定义一个简单的定义文件中的数据类型和服务接口,以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。
1.2 Thrift IDL
Thrift采用接口定义语言IDL(Interface Definition Language)来定义通用的服务接口,然后通过Thrift提供的编译器,可以将服务接口编译成不同语言编写的代码,通过这个方式来实现跨语言的功能。
- 通过命令调用Thrift提供的编译器将服务接口编译成不同语言编写的代码。
- 这些代码又分为服务端和客户端,将所在不同进程(或服务器)的功能连接起来。
thrift -r --gen <language> <Thrift filename>
1.3 如何创建Thrift服务?
- 定义服务接口(存放接口的文件夹就是Thrift文件)。
- 作为服务端的服务,需要生成server。
- 作为请求端的服务,需要生成client。
1.4 实例讲解
如上图所示,这个游戏的功能可能运行在一个或多个服务器(进程)上,而Thrift就是将不同服务器不同语言的功能连接起来。图中的三个节点(功能)是完全独立的,既可以在同一个服务器上,也可以在不同服务器上。每一个节点就是一个进程,每个进程可以使用不同的语言来实现。
游戏节点到匹配节点需要实现一条有向边(可以包含多个函数),因此游戏节点需要实现match_client
端,表示可以调用匹配服务器的函数;匹配系统需要实现match_server
端,表示可以让游戏节点的client端调用自身的函数。同时匹配系统还需实现save_client
端(假设数据存储服务器已实现save_server
端)。
二、Thrift教程
首先创建一个游戏系统文件夹game
、匹配系统文件夹match_system
、保存各种接口的文件夹thrift
。
2.1 match_server端框架
在thrift
文件夹中创建一个文件:match.thrift
,内容如下:
namespace cpp match_service
struct User /**定义结构体存储用户信息*/
{
1: i32 id, /**i32表示int*/
2: string name,
3: i32 score /**按照分值匹配*/
}
service Match
{
i32 add_user(1: User user, 2: string info),
i32 remove_user(1: User user, 2: string info),
}
前往Thrift官网,点击Tutorial,再点击C++,即可看到如何通过这个接口生成一个C++版本的服务器。命令如下:
thrift -r --gen cpp tutorial.thrift
在match_system
文件夹中创建一个文件夹src
,表示源文件。在src
文件夹中输入以下命令:
thrift -r --gen cpp ../../thrift/match.thrift
执行后会发现该目录下生成了一个gen-cpp
的文件夹,为了后续方便操作,将文件夹改个名:
mv gen-cpp match_server
将自动实现好的文件移出来:
mv match_server/Match_server.skeleton.cpp main.cpp
由于该文件里的函数还没有进行逻辑实现,因此先在每个函数中加上return 0;
后编译一遍,文件内容如下(可以使用gg=G
进行格式化):
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h" // 注意已经将该文件移出来了,因此头文件路径要改
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
return 0;
}
};
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}
接下来进行编译链接,链接的时候需要用到Thrift的动态链接库,需要加上-lthrift
:
g++ -c main.cpp match_server/*.cpp
g++ *.o -o main -lthrift
这时输入./main
即可运行程序,但是此时什么内容都没有。Thrift只是将接口实现好了,具体的业务逻辑没有实现。我们可以先将文件上传至Git,上传的时候注意一般不将.o
文件和可执行文件上传;
git add .
git restore --staged *.o
git restore --staged main
git status
git commit -m "add match_server"
git push
2.2 match_client端框架与实现
首先同样在game
文件夹中创建src
文件夹,进入src
文件夹后我们需要生成Python代码:
thrift -r --gen py ../../thrift/match.thrift
生成后该目录下有个文件夹gen-py
,也就是生成了Python的服务器端,同样将其改个名:
mv gen-py match_client
创建文件client.py
,将官网中Python客户端的代码(前四行是为了将当前路径加入到Python的环境变量中,可以删掉)复制过来,并进行简单的修改:
from match_client.match import Match
from match_client.match.ttypes import User
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
def main():
# Make socket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client to use the protocol encoder
client = Match.Client(protocol)
# Connect!
transport.open()
user = User(1, 'yyj', 1500)
client.add_user(user, "")
# Close!
transport.close()
if __name__ == "__main__":
main()
然后我们将match_system/src
中的main
执行后,再执行game/src
中的client.py
:
python3 client.py
可以看到main
程序那边输出:add_user。说明我们的match_client
端和match_server
端已经初步实现了,此时更新一下Git,注意.pyc
文件也最好不要上传:
git add .
git restore --staged *.pyc
git status
git commit -m "add match_client"
git push
接着我们进行优化,从控制台输入用户信息,并指定是添加还是删除用户,修改后的client.py
代码如下:
from match_client.match import Match
from match_client.match.ttypes import User
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from sys import stdin
def operate(op, user_id, username, score):
# Make socket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client to use the protocol encoder
client = Match.Client(protocol)
# Connect!
transport.open()
user = User(user_id, username, score)
if op == "add":
client.add_user(user, "")
elif op == "remove":
client.remove_user(user, "")
# Close!
transport.close()
def main():
for line in stdin:
op, user_id, username, score = line.split(' ')
operate(op, int(user_id), username, int(score))
if __name__ == "__main__":
main()
这样我们的match_client
端就算是完成了。
2.3 match_server端2.0实现
由于server端一方面需要读入或者移出用户,另一方面还要不断地去匹配,因此需要有一个线程去不断添加用户进来,一个线程去进行匹配,匹配完后再将信息传给一个服务器,且这两个操作是完全独立的,有可能长时间没有用户添加进来,但是匹配系统能够匹配两个已经匹配了很久的人。因此在这里需要用到并行技术,C++多线程需要使用到<thread>
头文件。
多线程相关知识点:
- IP和端口:如果把IP地址比作一间房子,端口就是出入这间房子的门。真正的房子只有几个门,但是一个IP地址的端口可以有$65536$($2^{16}$)个之多!端口是通过端口号来标记的,端口号只有整数,范围是从$0$到$65535$($2^{16}-1$)。同一个端口只能由一个进程来监听。所以我们一旦启动了一个服务,那么这个服务就不能在被另一个进程启动了。服务器的端口号要与客户端的端口号相同。
<thread>
库:C++中有一个thread
的库,可以用来开线程。通过定义一个变量将函数名作为参数,就能开一个线程了。首先定义线程的操作。
并行中经典的生产者和消费者模型。生产者、消费者是两个线程。本样例中的生产者:add_user()
、remove_user()
;消费者:匹配用户的功能。
生产者和消费者之间需要一个媒介。这个媒介可以有很多种方法。比如:消费队列。很多语言都有自己实现的消费队列,也可以自己实现消费队列。实现消费队列,就需要用到一些锁(mutex)。并行编程的基本概念:锁。- 互斥锁:在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为“互斥锁”的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。
锁有两个操作:一个$P$操作(上锁),一个$V$操作(解锁)。
定义互斥锁:mutex m
。锁一般使用信号量来实现的,mutex其实就是一个信号量(它特殊也叫互斥量)。互斥量就是同一时间能够分给一个人,即$S=1$。信号量$S=10$表示可以将信号量分给$10$个人来用。
$P$操作的主要动作是:
(1)$S$减$1$;
(2)若$S$减$1$后仍大于或等于$0$,则进程继续执行;
(3)若$S$减$1$后小于$0$,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
$V$操作的主要动作是:
(1)$S$加$1$;
(2)若相加后结果大于$0$,则进程继续执行;
(3)若相加后结果小于或等于$0$,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。
对于$P$和$V$都是原子操作,就是在执行$P$和$V$操作时,不会被插队。从而实现对共享变量操作的原子性。
特殊:$S=1$表示互斥量,表示同一时间,信号量只能分配给一个线程。
多线程为啥要用锁?因为多线程可能共享一个内存空间,导致出现重复读取并修改的现象。
我们将程序功能修改为傻瓜式匹配,只要匹配池中的玩家数大于等于$2$,那么就将前两名玩家进行匹配,修改后的match_server
端main.cpp
代码如下:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
printf("Match result: %d %d\n", a, b);
}
void match()
{
while (users.size() > 1)
{
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());
save_result(a.id, b.id);
}
}
void add(User user)
{
users.push_back(user);
}
void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();
return 0;
}
};
void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了
// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}
由于使用了线程库,因此编译的时候需要加上参数-pthread
:
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread
测试效果如下图所示:
2.4 save_client端实现
假设接口已经实现,在thrift
文件夹中创建文件save.thrift
,内容如下:
namespace cpp save_service
service Save {
/**
* username: myserver的名称
* password: myserver的密码的md5sum的前8位
* 用户名密码验证成功会返回0,验证失败会返回1
* 验证成功后,结果会被保存到myserver:homework/lesson_6/result.txt中
*/
i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id)
}
在match_system/src
文件夹中输入以下指令生成接口的C++实现,并重命名,然后需要将自动生成的服务端代码删去:
thrift -r --gen cpp ../../thrift/save.thrift
mv gen-cpp save_client
cd save_client
rm Save_server.skeleton.cpp
接下来我们将Thrift官网C++教程中的Client端代码抄下来并相应进行修改,修改后的main.cpp
内容如下:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service; // 注意加上save_client端的命名空间
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
printf("Match result: %d %d\n", a, b);
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
int res = client.save_data("acs_2077", "4503f06d", a, b);
if (!res) puts("Success");
else puts("Failed");
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}
void match()
{
while (users.size() > 1)
{
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());
save_result(a.id, b.id);
}
}
void add(User user)
{
users.push_back(user);
}
void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();
return 0;
}
};
void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了
// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}
2.5 match_server端3.0实现
通过修改匹配函数match()
实现将分差小于等于$50$的玩家进行匹配,修改后的代码如下:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <unistd.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service; // 注意加上save_client端的命名空间
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
printf("Match result: %d %d\n", a, b);
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
int res = client.save_data("acs_2077", "4503f06d", a, b);
if (!res) puts("Success");
else puts("Failed");
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}
void match()
{
while (users.size() > 1)
{
sort(users.begin(), users.end(), [&](User &a, User &b){
return a.score < b.score;
});
bool flag = true; // 防止玩家之间分数差距都很大导致死循环
for (uint32_t i = 1; i < users.size(); i++)
{
auto a = users[i - 1], b = users[i];
if (b.score - a.score <= 50)
{
save_result(a.id, b.id);
users.erase(users.begin() + i - 1, users.begin() + i + 1);
flag = false;
break;
}
}
if (flag) break;
}
}
void add(User user)
{
users.push_back(user);
}
void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();
return 0;
}
};
void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
// message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
lck.unlock();
pool.match();
sleep(1); // 每秒匹配一次
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了
// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}
2.6 match_server端4.0实现
通过Thrift官网C++教程下的Server端代码可以将match_server
改为多线程,修改后的main.cpp
代码如下:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <thrift/TToString.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <unistd.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service; // 注意加上save_client端的命名空间
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
printf("Match result: %d %d\n", a, b);
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
int res = client.save_data("acs_2077", "4503f06d", a, b);
if (!res) puts("Success");
else puts("Failed");
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}
void match()
{
while (users.size() > 1)
{
sort(users.begin(), users.end(), [&](User &a, User &b){
return a.score < b.score;
});
bool flag = true; // 防止玩家之间分数差距都很大导致死循环
for (uint32_t i = 1; i < users.size(); i++)
{
auto a = users[i - 1], b = users[i];
if (b.score - a.score <= 50)
{
save_result(a.id, b.id);
users.erase(users.begin() + i - 1, users.begin() + i + 1);
flag = false;
break;
}
}
if (flag) break;
}
}
void add(User user)
{
users.push_back(user);
}
void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
break;
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();
return 0;
}
};
class MatchCloneFactory : virtual public MatchIfFactory {
public:
~MatchCloneFactory() override = default;
MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
{
std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
/*cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";*/
return new MatchHandler;
}
void releaseHandler(MatchIf* handler) override {
delete handler;
}
};
void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
// message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
lck.unlock();
pool.match();
sleep(1); // 每秒匹配一次
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了
// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
TThreadedServer server(
std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
std::make_shared<TServerSocket>(9090), //port
std::make_shared<TBufferedTransportFactory>(),
std::make_shared<TBinaryProtocolFactory>());
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}
2.7 match_server端5.0实现
通过对匹配机制的修改,实现玩家每等待一秒钟,匹配的分数区间扩大$50$分,修改后的main.cpp
代码如下:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <thrift/TToString.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <unistd.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service; // 注意加上save_client端的命名空间
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool // 玩家匹配池
{
public:
void save_result(int a, int b)
{
printf("Match result: %d %d\n", a, b);
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
int res = client.save_data("acs_2077", "4503f06d", a, b);
if (!res) puts("Success");
else puts("Failed");
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}
bool check_match(uint32_t i, uint32_t j)
{
auto a = users[i], b = users[j];
int dt = abs(a.score - b.score);
int a_max_dif = wt[i] * 50, b_max_dif = wt[j] * 50;
return dt <= a_max_dif && dt <= b_max_dif;
}
void match()
{
for (uint32_t i = 0; i < wt.size(); i++)
wt[i]++; // 表示等待秒数+1
while (users.size() > 1)
{
bool flag = true; // 防止玩家之间分数差距都很大导致死循环
for (uint32_t i = 0; i < users.size(); i++)
{
for (uint32_t j = i + 1; j < users.size(); j++)
if (check_match(i, j))
{
auto a = users[i], b = users[j];
users.erase(users.begin() + j); // 先删后面的再删前面的
users.erase(users.begin() + i);
wt.erase(wt.begin() + j);
wt.erase(wt.begin() + i);
save_result(a.id, b.id);
flag = false;
break;
}
if (!flag) break;
}
if (flag) break;
}
}
void add(User user)
{
users.push_back(user);
wt.push_back(0);
}
void remove(User user)
{
for (uint32_t i = 0; i < users.size(); i++)
if (users[i].id == user.id)
{
users.erase(users.begin() + i);
wt.erase(wt.begin() + i);
break;
}
}
private:
vector<User> users;
vector<int> wt; // 表示玩家的waiting time
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m); // 当lck变量消失即函数结束后锁会自动释放
message_queue.q.push({ user, "add" });
message_queue.cv.notify_all(); // 唤醒条件变量
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({ user, "remove" });
message_queue.cv.notify_all();
return 0;
}
};
class MatchCloneFactory : virtual public MatchIfFactory {
public:
~MatchCloneFactory() override = default;
MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
{
std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
/*cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";*/
return new MatchHandler;
}
void releaseHandler(MatchIf* handler) override {
delete handler;
}
};
void consume_task() // 消费者模型
{
while (true)
{
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()) // 如果消息队列为空则应该先阻塞,而不能一直循环
{
// message_queue.cv.wait(lck); // 先将锁释放,然后卡住,直到在其他地方将这个条件变量唤醒
lck.unlock();
pool.match();
sleep(1); // 每秒匹配一次
}
else
{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 尽早解锁,若等处理完task再解锁就等待时间太长了
// do task
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
}
}
}
int main(int argc, char **argv) {
TThreadedServer server(
std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
std::make_shared<TServerSocket>(9090), //port
std::make_shared<TBufferedTransportFactory>(),
std::make_shared<TBinaryProtocolFactory>());
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}