6. thrift教程
thrift官网:进入官网->Tutorial->tutorial.thrift
6.1 天梯游戏的匹配分析
游戏服务器 python3实现 AC Terminal
游戏服务器–>匹配系统: add_user 添加一名玩家
游戏服务器–>匹配系统:sremove_user 删除一名用户
match-client
匹配系统 cpp实现 AC Terminal
match-server
save-client
数据存储服务器 Myserver 9090端口
匹配系统–>数据存储服务器:save_data 保存匹配信息
三个节点(功能)是完全独立的,既可以在同一个服务器上,也可以在不同服务器上。
每一个节点就是一个进程,每个进程可以使用不同的语言来实现。
创建两个文件夹表示game节点(game)和匹配服务节点(match_system),也可以放在不同的服务器上,而数据存储节点的服务端已经实现好了,只要调用服务接口实现的函数即可。
thrift步骤
- 定义接口
- server
- client
6.2 知识点
6.2.1 线程
#include <thread>
// 开线程的库
一个程序运行是一个进程,一个进程中至少有一个线程。
线程是进程当中的一条执行流程。线程之间可以并发运行且共享相同的地址空间。
如果只有一个线程,则第二个任务必须等到第一个任务结束后才能进行,
如果使用多线程则在主线程执行任务的同时可以执行其他任务,而不需要等待。
6.2.2 互斥锁
#include <mutex>
// 锁 pv操作 将消费队列放在两个锁之间,保证同时只有一个进程写这个队列每个对象都对应于一个可称为” 互斥锁” 的标记,保证在任一时刻,只能有一个线程访问该对象。
加锁的目的就是保证共享资源在任意时间里,只有一个线程访问,这样就可以避免多线程导致共享数据错乱的问题。
锁一般使用信号量来实现的。
通常信号量表示资源的数量为S。
互斥量就是同一时间只能够分给一个线程,即S=1。
两个原子操作的系统调用函数来控制信号量分别为一个P操作(上锁),一个V操作(解锁)。 必须成对出现
P操作的主要动作是:
①S减1;
②若S减1后仍大于或等于0,则进程继续执行;
③若S减1后小于0,则该进程/线程进入阻塞等待,表明 P 操作可能会阻塞;V操作的主要动作是:
①S加1;
②若相加后结果大于0,则进程继续执行;
③若相加后结果小于或等于0,唤醒一个等待中的进程/线程,表明 V 操作不会阻塞。
6.2.3 生产者消费者模型
- 生产者在生成数据后,放在一个缓冲区中; (终端输入用户信息)
- 消费者从缓冲区取出数据处理; (取出用户信息匹配)
- 任何时刻,只能有一个生产者或消费者可以访问缓冲区;
生产者消费者模型的引入:在多线程开发中
生产者>消费者:生产过多的数据可能会导致存储不足
生产者<消费者:消费者经常处于等待状态
达到生产者和消费者生产数据和消费数据之间的平衡:需要一个缓冲区用来存储生产者生产的数据
生产者和消费者需要同步
当缓冲区满的时候,生产者会进入休眠状态,当下次消费者开始消耗缓冲区的数据时,生产者才会被唤醒,开始往缓冲区中添加数据;
当缓冲区空的时候,消费者也会进入休眠状态,直到生产者往缓冲区中添加数据时才会被唤醒。
6.2.4 消息队列
在生产者消费者模型中缓冲区的实现就是由队列来实现,当生产者生产数据后将信息入队,消费者获取信息后信息出队。
消息队列提供了异步通信协议
消息的发送者和接收者不需要同时与消息队列交互,消息会保存在队列中,直到接收者使用它
6.2.5 条件变量
#include <condition_variable>
//条件变量:对锁进行封装
互斥锁是线程间互斥的机制,条件变量则是同步机制。
互斥锁一个明显的缺点是他只有两种状态:锁定和非锁定。
而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,和互斥锁一起使用,以免出现竞态条件。
当条件不满足时,线程往往解开相应的互斥锁并阻塞线程然后等待条件发生变化。一旦其他的某个线程改变了条件变量,将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。
6.3 创建作业 & 测试作业的正确性
homework 6 create 可以重新创建所有lesson_6的作业
homework 6 test 可以评测lesson_6的所有作业
6.4 作业
本次作业为复现课上最后一个版本的内容,课程视频地址
注意:本次作业的2个题目采用整体评测,即如果两个作业同时正确,则得100分;否则如果至少有一个作业错误,则得0分。
创建好作业后,先进入文件夹/home/acs/homework/lesson_6/
,当前目录的文件结构如下:
`-- thrift_lesson
|-- game
| `-- src
|-- match_system
| `-- src
|-- readme.md
`-- thrift
|-- match.thrift
`-- save.thrift
(0) 进入thrift_lesson/match_system/src/
目录,用cpp实现课上的match-server
和save-client
逻辑。
接口文件在thrift_lesson/thrift/
中。
实现后启动server,监听端口9090。
(1) 进入thrift_lesson/game/src/
目录,用python3实现课上的match-client
逻辑。
文件名和输入格式与课上内容相同。
6.4.1 version 1.0
homework 6 create
cd homework/lesson_6/thrift_lesson/
git init
git add .
git commit -m "version 1.0"
# gitLab新建项目
# 上传
git remote add origin git@git.acwing.com:Lsf/thrift_lesson.git
git push -u origin master
gitLab新建项目:参考git教程5.5 homework_5
6.4.2 version 2.0
6.4.2.1 服务端 match-server
match.thrift
单纯写lesson_6作业,create已经有,建议直接自己写熟练一下
namespace cpp match_service
struct User {
1: i32 id, // i32表示int
2: string name,
3: i32 score
}
service Match {
/**
* user: 添加的用户信息
* info: 附加信息
* 在匹配池中添加一个名用户
*/
i32 add_user(1: User user, 2: string info),
/**
* user: 删除的用户信息
* info: 附加信息
* 从匹配池中删除一名用户
*/
i32 remove_user(1: User user, 2: string info),
}
thrift --gen <language> <Thrift filename>
`-- gen-cpp
|-- Match.cpp
|-- Match.h
|-- Match_server.skeleton.cpp # cpp实现接口的一个样例
|-- match_types.cpp
`-- match_types.h
cd match_system/src
thrift -r --gen cpp ../../thrift/match.thrift
mv gen-cpp/ match_server
mv match_server/Match_server.skeleton.cpp main.cpp
# 修改该main.cpp
vim main.cpp
i
4:修改为 #include "match_server/Match.h"
+34:return 0; # 34行add_user函数位置加上return 0;先定义通过
+48:return 0;
[Esc]
gg=G
:wq
g++ -c main.cpp match_server/*.cpp # 编译
# g++ *.o -o main # 链接
g++ *.o -o main -lthrift # 链接+thrift动态链接库
./main # 运行
# crtl + c停止
git add .
git restore --staged *.o # 将.o文件从暂存区里移除,不上传 (可执行文件和编译文件不上传)
git restore --staged main
git commit -m "add match server"
git push
6.4.2.2 客户端 match-client
thrift -r --gen py tutorial.thrift
`-- gen-py
|-- __init__.py
`-- match
|-- Match-remote # 接口样例
|-- Match.py
|-- __init__.py
|-- constants.py
`-- ttypes.py
cd ../../game/src
thrift -r --gen py ../../thrift/match.thrift
mv gen-py/ match_client
cd match_client/match
rm Match-remote
cd ../../
vim client.py
python3 client.py # 服务端./main要运行
git add .
git restore --staged *.pyc
# git restore --staged *.swp # 打开vim产生的
git commit -m "add match client"
git push
client.py
from match_client.match import Match
from match_client.match.ttypes import User
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
def main():
# Make socket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client to use the protocol encoder
client = Match.Client(protocol)
# Connect!
transport.open()
user = User(1, 'Lsf', 1800)
client.add_user(user,"");
# Close!
transport.close()
if __name__ == "__main__":
main()
完善client.py
from match_client.match import Match
from match_client.match.ttypes import User
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from sys import stdin
def operate(op, user_id, username, score):
# Make socket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client to use the protocol encoder
client = Match.Client(protocol)
# Connect!
transport.open()
user = User(user_id, username, score)
if op == "add":
client.add_user(user,"")
elif op == "remove":
client.remove_user(user,"")
# Close!
transport.close()
def main():
for line in stdin:
op, user_id, username,score = line.split(' ')
operate(op, int(user_id), username, int(score))
if __name__ == "__main__":
main()
执行命令
git add client.py
git commit -m "finish match client"
git push
6.4.2.3 匹配单开线程
main.cpp
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <iostream>
#include <thread> // 开线程的库
#include <mutex> // 锁 pv操作 将消费队列放在两个锁之间,保证同时只有一个进程写这个队列
#include <condition_variable> //条件变量:对锁进行封装
#include <queue>
#include <vector>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace std;
struct Task{
User user;
string type;
};
// 自定义一个消费队列
struct MessageQueue{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
// 自定义玩家池
class Pool{
public:
void save_result(int a, int b){
printf("Match Result: %d %d\n", a, b);
}
void match(){
while(users.size() > 1){
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());
save_result(a.id, b.id);
}
}
void add(User user){
users.push_back(user);
}
void remove(User user){
for (uint32_t i = 0; i < users.size(); i++){
if (users[i].id == user.id){
users.erase(users.begin() + i);
break;
}
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
/**
* user: 添加的用户信息
* info: 附加信息
* 在匹配池中添加一个名用户
*
* @param user
* @param info
*/
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m); //加锁,不用显式解锁
message_queue.q.push({user, "add"});
message_queue.cv.notify_all(); // 唤醒,也可notify_one
return 0;
}
/**
* user: 删除的用户信息
* info: 附加信息
* 从匹配池中删除一名用户
*
* @param user
* @param info
*/
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all();
return 0;
}
};
/*
* 消费者函数
*/
void consume_task(){
while(true){
unique_lock<mutex> lck(message_queue.m);
// 刚开始队列为空,陷入死循环,占用CPU:应该把线程卡住,直到新玩家添加才继续执(使用条件变量)
if (message_queue.q.empty()){
message_queue.cv.wait(lck); // 卡到其他地方将条件变量唤醒
}
else{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 解锁
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
thread matching_thread(consume_task);
server.serve();
return 0;
}
执行命令
# 修改main.cpp后
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread # 链接+thrift动态链接库+线程动态链接库
./main
git add main.cpp
git commit -m "match-server version 2.0"
git push
6.4.3 version 3.0
6.4.3.1 save-client
执行命令
cd ../../thrift
homework 4 getinfo
md5sum
[Enter]
Crtl+D
cd ../match_system/src/
thrift -r --gen cpp ../../thrift/save.thrift
mv gen-cpp save_client
cd save_client
rm Save_server.skeleton.cpp
cd ../
vim main.cpp
# 案例的main函数放在save_result函数里
localhost --> myserver地址
# game/src/client.py --> localhost-->127.0.0.1本地地址
g++ -c save_client/*.cpp
g++ *.o -o main -lthrift -pthread
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread
./main
# myserver
cd homework/lesson_6
cat result.txt
save.thrift
namespace cpp save_service
service Save {
/**
* username: myserver的名称
* password: myserver的密码的md5sum的前8位
* 用户名密码验证成功会返回0,验证失败会返回1
* 验证成功后,结果会被保存到myserver:homework/lesson_6/result.txt中
*/
i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id)
}
main.cpp
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h" //自己的头文件
#include <thrift/protocol/TBinaryProtocol.h> // 系统头文件
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <iostream>
#include <thread> // 开线程的库
#include <mutex> // 锁 pv操作 将消费队列放在两个锁之间,保证同时只有一个进程写这个队列
#include <condition_variable> //条件变量:对锁进行封装
#include <queue>
#include <vector>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service; // match.thrift里的命名空间
using namespace ::save_service; // save.thrift里的命名空间
using namespace std;
struct Task{
User user;
string type;
};
// 自定义一个消费队列
struct MessageQueue{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
// 自定义玩家池
class Pool{
public:
void save_result(int a, int b){
printf("Match Result: %d %d\n", a, b);
std::shared_ptr<TTransport> socket(new TSocket("123.57.67.128", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
client.save_data("acs_10062","de80a9d3",a,b); // save_data格式
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
}
void match(){
while(users.size() > 1){
auto a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());
save_result(a.id, b.id);
}
}
void add(User user){
users.push_back(user);
}
void remove(User user){
for (uint32_t i = 0; i < users.size(); i++){
if (users[i].id == user.id){
users.erase(users.begin() + i);
break;
}
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
/**
* user: 添加的用户信息
* info: 附加信息
* 在匹配池中添加一个名用户
*
* @param user
* @param info
*/
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
unique_lock<mutex> lck(message_queue.m); //加锁,不用显式解锁
message_queue.q.push({user, "add"});
message_queue.cv.notify_all(); // 唤醒,也可notify_one
return 0;
}
/**
* user: 删除的用户信息
* info: 附加信息
* 从匹配池中删除一名用户
*
* @param user
* @param info
*/
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all();
return 0;
}
};
/*
* 消费者函数
*/
void consume_task(){
while(true){
unique_lock<mutex> lck(message_queue.m);
// 刚开始队列为空,陷入死循环,占用CPU:应该把线程卡住,直到新玩家添加才继续执(使用条件变量)
if (message_queue.q.empty()){
message_queue.cv.wait(lck); // 卡到其他地方将条件变量唤醒
}
else{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 解锁
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
thread matching_thread(consume_task);
server.serve();
return 0;
}
上传gitLab
git add main.cpp
git add ../../thrift/sace.thrift
git add save_client/
git add ../../game/src/client.py
git commit -m "implement save-client"
git push
6.4.3.2 升级匹配系统
main.cpp
/*
完整函数查看仓库历史 https://git.acwing.com/Lsf/thrift_lesson/-/commit/a1dd3ae9699d3b823b2f59428f8fd86ac6c61f8c
*/
#include <unistd.h> // 提供sleep
// 自定义玩家池
class Pool{
public:
void match(){
while(users.size() > 1){
sort(users.begin(), users.end(), [&](User& a, User b){
return a.score < b.score;
}); // 实现按score排序
bool flag = true; // 防止while死循环
for (uint32_t i = 1; i < users.size(); i++){
auto a = users[i - 1], b = users[i];
if (b.score - a.score <= 50){
users.erase(users.begin() + i - 1, users.begin() + i + 1);
save_result(a.id, b.id);
flag = false;
break;
}
}
if(flag) break;
}
}
}pool;
/*
* 消费者函数
*/
void consume_task(){
while(true){
unique_lock<mutex> lck(message_queue.m);
// 刚开始队列为空,陷入死循环,占用CPU:应该把线程卡住,直到新玩家添加才继续执(使用条件变量)
if (message_queue.q.empty()){
// message_queue.cv.wait(lck); // 卡到其他地方将条件变量唤醒
lck.unlock(); // 实现每一秒匹配一次
pool.match();
sleep(1);
}
else{
auto task = message_queue.q.front();
message_queue.q.pop();
lck.unlock(); // 解锁
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
执行命令
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread
git add main.cpp
git commit -m "match version 3.0"
git push
6.4.4 version 4.0
vim替换命令
:1,$s/Calculator/Match/g
main.cpp
/*
完整函数查看仓库历史 https://git.acwing.com/Lsf/thrift_lesson/-/commit/38e3c5874ecca6b15b3e43aba82bece1dacaf36a
*/
执行命令
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread
git add main.cpp
git commit -m "match version 4.0"
git push
6.4.5 version 5.0
随时间不断扩大匹配范围
main.cpp
/*
完整函数查看仓库历史 https://git.acwing.com/Lsf/thrift_lesson/-/commit/570a48ba204805be61eab66180ea678d2528b2dc
*/
执行命令
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread
git add main.cpp
git commit -m "match version 5.0"
git push
测试作业
cd homework/lesson_6/thrift_lesson/match_system/src/
./main
cd homework/lesson_6/thrift_lesson/game/src/
homework 6 test
[----------------------------------------]
homework_0 is Right!
homework_1 is Right!
score: 100/100