基本使用
开发一个python
的服务器端和客户端调用grpc
服务器端
普通rpc
协议传输方式
grpc
:安装相关的包:
pip install grpcio
pip install grpcio-tools
pip install protobuf
编写.proto
代码
test.proto
代码如下:
syntax = "proto3"; // 使用哪种protobuf
package test; // 给包起一个名称
// 服务器中需要的服务,服务框架 demo:服务名 hellojlc:服务对应的函数名 函数对应的hellojlcReq和hellojlcReply
service demo {
rpc hellojlc(hellojlcReq) returns (hellojlcReply){}
}
// 定义数据结构 hellojlcReq和hellojlcReply对应需要传的参数和参数类型
message hellojlcReq {
string name = 1;
int32 age = 2;
}
消息定义中的每个字段都有一个唯一的编号。这些字段编号用于以二进制格式标识您的字段,一旦消息类型被使用,就不应该被更改。Tag的取值范围最小是1,最大是229229,但是19000~19999 是 protobuf
预留的,用户不能使用。使用频率高的变量最好设置为1~15,这样可以减少编码后的数据大小,但由于编号一旦指定不能修改,所以为了以后扩展,也记得为未来保留一些 1~15 的编号。
map
方式定义数据类型的说明:
message hellojlcReq {
string name = 1;
int32 age = 2;
message hellojlcReqNumberValue {
string name = 1;
int32 age = 2;
Bool is_active = 3;
}
map<string,hellojlcReqNumberValue > number = 4;
}
对应客户端的输入方式{“a”:pb2.hellojlcReqNumberValue {name : jlc age 23 : is_active :true}}
message hellojlcReply {
string result = 1;
}
编译.proto
文件
在.proto
文件中定义了相关的数据结构,这些数据结构是面向开发者和业务程序的,并不面向存储和传输。当需要把这些数据进行存储或传输时,就需要将这些结构数据进行序列化、反序列化以及读写。ProtoBuf
提供相应的接口代码,可以通过 protoc
这个编译器来生成相应的接口代码,把.proto
文件转换成.py
文件,在Terminal
中输入:
python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. test.proto
编译完生成_pb2.py
文件和_pb2_grpc.py
文件:
_pb2.py
文件每一个
message
对应的信息存储,request
和response
消息体在这里被定义extension
包括
message
中数据类型及其默认值的信息_pb2_grpc.py
文件
用来存储每一个服务的server
与客户端以及注册server
的工具
客户端名为:server_name + Stub
服务器名为:server_name + Servicer
注册服务函数为:add
服务端名to_server
相关代码介绍:
class demoStub(object): # 客户端
def __init__(self, channel):
self.hellojlc = channel.unary_unary( # unary_unary普通的传输类型
'/test.demo/hellojlc',
request_serializer=test__pb2.hellojlcReq.SerializeToString, # 序列化
response_deserializer=test__pb2.hellojlcReply.FromString, # 反序列化
)
class demoServicer(object): # 服务端
def hellojlc(self, request, context):
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # 定义了一些code码和状态码
context.set_details('Method not implemented!') # 定义了一些错误信息
raise NotImplementedError('Method not implemented!')
def add_demoServicer_to_server(servicer, server): # 服务注册函数
rpc_method_handlers = { # rpc方法的控制器
'hellojlc': grpc.unary_unary_rpc_method_handler(
servicer.hellojlc,
request_deserializer=test__pb2.hellojlcReq.FromString,
response_serializer=test__pb2.hellojlcReply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'test.demo', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
编写服务端service.py
文件
import time
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
from concurrent import futures # 创建线程数量
class demo(pb2_grpc.demoServicer): # 编写服务
def hellojlc(self, request, context): # 编写大类里面的服务函数
name = request.name # 获取参数
age = request.age
result = f'my name is {name}, i am {age} years old' # 返回参数
return pb2.hellojlcReply(result=result)
def run(): # 编写启动服务
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4) # 定义最大线程数量
)
pb2_grpc.add_demoServicer_to_server(demo(),grpc_server) # 注册服务,把demo()注册到grpc_server
grpc_server.add_insecure_port('127.0.0.1:5000') # 绑定ip和端口号
print('server will start at 127.0.0.1:5000')
grpc_server.start() # 启动服务
try: # 一直连续启动服务
while 1:
time.sleep(3600)
except KeyboardInterrupt:
grpc_server.stop(0) # 安全退出
if __name__ == '__main__':
run()
编写客户端client.py
文件
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
def run():
conn = grpc.insecure_channel('127.0.0.1:5000') # 定义频道
client = pb2_grpc.demoStub(channel=conn) # 生成客户端,指定了conn频道
resposne = client.hellojlc(pb2.hellojlcReq( # 传输数据
name='jlc',
age=23
))
print(resposne.result)
if __name__ == '__main__':
run()
单向服务器端发送给客户端流传输
编写.proto
代码
test.proto
代码如下:
syntax = "proto3";
package test;
service demo {
rpc TestClientRecvStream(TestClientRecvStreamRequest) returns(stream TestClientRecvStreamResponse){} // 单向服务器端发送给客户端流传输
}
message TestClientRecvStreamRequest {
string data = 1;
}
message TestClientRecvStreamResponse {
string result = 1;
}
编写服务端service.py
文件
import time
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
from concurrent import futures
class demo(pb2_grpc.demoServicer):
def TestClientRecvStream(self, request, context): # 客户端请求服务器端是一个普通的方式
index = 0
# 监听客户端是不是一个活跃的状态,只有客户端一直活跃,服务器才会一直发数据
while context.is_active():
data = request.data
if data == 'close': # 收到close数据,服务器端停止运行
print('data is close, request will cancel')
context.cancek()
time.sleep(1)
index += 1
result = 'send %d %s' % (index,data)
print(result)
yield pb2.TestClientRecvStreamResponse( # 服务器端要给客户端返回一种流的形式
result = result
)
def run():
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4)
)
pb2_grpc.add_demoServicer_to_server(demo(),grpc_server)
grpc_server.add_insecure_port('127.0.0.1:5000')
print('server will start at 127.0.0.1:5000')
grpc_server.start()
try:
while 1:
time.sleep(3600)
except KeyboardInterrupt:
grpc_server.stop(0)
if __name__ == '__main__':
run()
编写客户端client.py
文件
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
def run():
conn = grpc.insecure_channel('127.0.0.1:5000')
client = pb2_grpc.demoStub(channel=conn)
response = client.TestClientRecvStream(pb2.TestClientRecvStreamRequest( #客户端不停接收流的形式
data = 'jlc'
))
for item in response:
print(item.result)
if __name__ == '__main__':
run()
客户端在不停的收到服务器端的数据流
单向客户端发送给服务器端流传输
服务器端不停接收信息,当达到一定状态时,断开这一次的请求,并且服务器端返回给客户端一个结果
编写.proto
代码
test.proto
代码如下:
syntax = "proto3";
package test;
service demo {
rpc TestClientSendStream(stream TestClientSendStreamRequest) returns(TestClientSendStreamResponse){} // 单向客户端发送给服务器端流传输
}
message TestClientSendStreamRequest {
string data = 1;
}
message TestClientSendStreamResponse {
string result = 1;
}
编写服务端service.py
文件
import time
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
from concurrent import futures
class demo(pb2_grpc.demoServicer):
def TestClientSendStream(self, request_iterator, context): # request_iterator表示迭代器
index = 0
for request in request_iterator:
print(request.data, ':',index)
if index == 10: # index为10,断开连接
break
index += 1
return pb2.TestClientSendStreamResponse(result='ok') # 断开连接后,给客户端返回一个ok
def run():
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4)
)
pb2_grpc.add_demoServicer_to_server(demo(),grpc_server)
grpc_server.add_insecure_port('127.0.0.1:5000')
print('server will start at 127.0.0.1:5000')
grpc_server.start()
try:
while 1:
time.sleep(3600)
except KeyboardInterrupt:
grpc_server.stop(0)
if __name__ == '__main__':
run()
编写客户端client.py
文件
import time
import random
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
def test():
index = 0
while 1:
time.sleep(1)
data = str(random.random())
if index == 5: # index为5,断开连接,客户端不向服务端主动发送数据
break
print(index)
index += 1
yield pb2.TestClientSendStreamRequest(data=data)
def run():
conn = grpc.insecure_channel('127.0.0.1:5000')
client = pb2_grpc.demoStub(channel=conn)
# 单向客户端发送给服务器端流传输方式
response = client.TestClientSendStream(test())
print(response.result)
if __name__ == '__main__':
run()
双向流传输
编写.proto
代码
test.proto
代码如下:
syntax = "proto3";
package test;
service demo {
rpc TestTwoWayStream(stream TestTwoWayStreamRequest) returns(stream TestTwoWayStreamResponse) {} // 双向流传输
}
message TestTwoWayStreamRequest {
string data = 1;
}
message TestTwoWayStreamResponse {
string result = 1;
}
编写服务端service.py
文件
import time
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
from concurrent import futures
class demo(pb2_grpc.demoServicer):
def TestTwoWayStream(self, request_iterator, context):
for request in request_iterator: # 从客户端获取数据的方式
data = request.data
yield pb2.TestTwoWayStreamResponse(result='service send client %s' % data)
def run():
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4)
)
pb2_grpc.add_demoServicer_to_server(demo(),grpc_server)
grpc_server.add_insecure_port('127.0.0.1:5000')
print('server will start at 127.0.0.1:5000')
grpc_server.start()
try:
while 1:
time.sleep(3600)
except KeyboardInterrupt:
grpc_server.stop(0)
if __name__ == '__main__':
run()
编写客户端client.py
文件
import time
import random
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
def test():
index = 0
while 1:
time.sleep(1)
data = str(random.random())
if index == 5: # index为5,断开连接,客户端不向服务端主动发送数据
break
print(index)
index += 1
yield pb2.TestClientSendStreamRequest(data=data)
def run():
conn = grpc.insecure_channel('127.0.0.1:5000')
client = pb2_grpc.demoStub(channel=conn)
# 双向流的方式传输
response = client.TestTwoWayStream(test()) # 客户端以流的形式不停的发信息给服务器
for res in response:
print(res.result)
if __name__ == '__main__':
run()
双向流,客户端一直发送信息,同时一直会受到服务端反馈的流信息