Skip to content

基本使用

开发一个python的服务器端和客户端调用grpc服务器端

普通rpc协议传输方式

grpc:安装相关的包:

  • pip install grpcio
  • pip install grpcio-tools
  • pip install protobuf

编写.proto代码

test.proto代码如下:

protobuf
syntax = "proto3";   // 使用哪种protobuf
package test;   // 给包起一个名称

// 服务器中需要的服务,服务框架 demo:服务名 hellojlc:服务对应的函数名  函数对应的hellojlcReq和hellojlcReply
service demo {
    rpc hellojlc(hellojlcReq) returns (hellojlcReply){}
}

// 定义数据结构   hellojlcReq和hellojlcReply对应需要传的参数和参数类型
message hellojlcReq {
    string name = 1;
    int32 age = 2;
}

消息定义中的每个字段都有一个唯一的编号。这些字段编号用于以二进制格式标识您的字段,一旦消息类型被使用,就不应该被更改。Tag的取值范围最小是1,最大是229229,但是19000~19999 是 protobuf 预留的,用户不能使用。使用频率高的变量最好设置为1~15,这样可以减少编码后的数据大小,但由于编号一旦指定不能修改,所以为了以后扩展,也记得为未来保留一些 1~15 的编号。

map方式定义数据类型的说明:

protobuf
message hellojlcReq {
    string name = 1;
    int32 age = 2;
message hellojlcReqNumberValue {
string name = 1;
    	    int32 age = 2;
Bool is_active = 3;
}
map<string,hellojlcReqNumberValue > number = 4;
}

对应客户端的输入方式{“a”:pb2.hellojlcReqNumberValue {name : jlc age 23 : is_active :true}}

protobuf
message hellojlcReply {
  string result = 1;
}

编译.proto文件

.proto文件中定义了相关的数据结构,这些数据结构是面向开发者和业务程序的,并不面向存储和传输。当需要把这些数据进行存储或传输时,就需要将这些结构数据进行序列化、反序列化以及读写。ProtoBuf 提供相应的接口代码,可以通过 protoc 这个编译器来生成相应的接口代码,把.proto文件转换成.py文件,在Terminal中输入:

python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. test.proto

编译完生成_pb2.py文件和_pb2_grpc.py文件:

  • _pb2.py文件

    每一个message对应的信息存储,requestresponse消息体在这里被定义extension

    包括message中数据类型及其默认值的信息

  • _pb2_grpc.py文件

用来存储每一个服务的server与客户端以及注册server的工具

客户端名为:server_name + Stub

服务器名为:server_name + Servicer

注册服务函数为:add服务端名to_server

相关代码介绍:

py
class demoStub(object):   # 客户端
    def __init__(self, channel):
        self.hellojlc = channel.unary_unary(   # unary_unary普通的传输类型
                '/test.demo/hellojlc',
                request_serializer=test__pb2.hellojlcReq.SerializeToString,    # 序列化
                response_deserializer=test__pb2.hellojlcReply.FromString,    # 反序列化
                )
class demoServicer(object):  # 服务端
    def hellojlc(self, request, context):
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)   # 定义了一些code码和状态码
        context.set_details('Method not implemented!')     # 定义了一些错误信息
        raise NotImplementedError('Method not implemented!')
def add_demoServicer_to_server(servicer, server):  # 服务注册函数
    rpc_method_handlers = {    # rpc方法的控制器
            'hellojlc': grpc.unary_unary_rpc_method_handler(
                    servicer.hellojlc,
                    request_deserializer=test__pb2.hellojlcReq.FromString,
                    response_serializer=test__pb2.hellojlcReply.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'test.demo', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))

编写服务端service.py文件

py
import time
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
from concurrent import futures    # 创建线程数量

class demo(pb2_grpc.demoServicer):   # 编写服务
    def hellojlc(self, request, context):   # 编写大类里面的服务函数
        name = request.name    # 获取参数
        age = request.age
        result = f'my name is {name}, i am {age} years old'   # 返回参数
        return pb2.hellojlcReply(result=result)
def run():    # 编写启动服务
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)   # 定义最大线程数量
    )
    pb2_grpc.add_demoServicer_to_server(demo(),grpc_server)  # 注册服务,把demo()注册到grpc_server
    grpc_server.add_insecure_port('127.0.0.1:5000')   # 绑定ip和端口号
    print('server will start at 127.0.0.1:5000')
    grpc_server.start()    # 启动服务

    try:     # 一直连续启动服务
        while 1:
            time.sleep(3600)
    except KeyboardInterrupt:
        grpc_server.stop(0)   # 安全退出
if __name__ == '__main__':
    run()

编写客户端client.py文件

py
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
def run():
    conn = grpc.insecure_channel('127.0.0.1:5000')   # 定义频道
    client = pb2_grpc.demoStub(channel=conn)    # 生成客户端,指定了conn频道
    resposne = client.hellojlc(pb2.hellojlcReq(      # 传输数据
        name='jlc',
        age=23
    ))
    print(resposne.result)
if __name__ == '__main__':
    run()

单向服务器端发送给客户端流传输

编写.proto代码

test.proto代码如下:

protobuf
syntax = "proto3";
package test;
service demo {
    rpc TestClientRecvStream(TestClientRecvStreamRequest) returns(stream TestClientRecvStreamResponse){}  // 单向服务器端发送给客户端流传输

}
message TestClientRecvStreamRequest {
    string data = 1;
}
message TestClientRecvStreamResponse {
    string result = 1;
}

编写服务端service.py文件

py
import time
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
from concurrent import futures

class demo(pb2_grpc.demoServicer):
    def TestClientRecvStream(self, request, context):  # 客户端请求服务器端是一个普通的方式
        index = 0
        # 监听客户端是不是一个活跃的状态,只有客户端一直活跃,服务器才会一直发数据
        while context.is_active():   
            data = request.data
if data == 'close':    # 收到close数据,服务器端停止运行
               print('data is close, request will cancel')
               context.cancek()
            time.sleep(1)
            index += 1
result = 'send %d %s' % (index,data)
print(result)
            yield pb2.TestClientRecvStreamResponse(    # 服务器端要给客户端返回一种流的形式
                result = result
            )
def run():
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)
    )
    pb2_grpc.add_demoServicer_to_server(demo(),grpc_server)
    grpc_server.add_insecure_port('127.0.0.1:5000')
    print('server will start at 127.0.0.1:5000')
    grpc_server.start()

    try:
        while 1:
            time.sleep(3600)
    except KeyboardInterrupt:
        grpc_server.stop(0)

if __name__ == '__main__':
    run()

编写客户端client.py文件

py
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc

def run():
    conn = grpc.insecure_channel('127.0.0.1:5000')
    client = pb2_grpc.demoStub(channel=conn)
    response = client.TestClientRecvStream(pb2.TestClientRecvStreamRequest(   #客户端不停接收流的形式
        data = 'jlc'
    ))
    for item in response:
        print(item.result)

if __name__ == '__main__':
    run()

客户端在不停的收到服务器端的数据流


单向客户端发送给服务器端流传输

服务器端不停接收信息,当达到一定状态时,断开这一次的请求,并且服务器端返回给客户端一个结果

编写.proto代码

test.proto代码如下:

protobuf
syntax = "proto3";
package test;
service demo {
    rpc TestClientSendStream(stream TestClientSendStreamRequest) returns(TestClientSendStreamResponse){}  // 单向客户端发送给服务器端流传输

}
message TestClientSendStreamRequest {
    string data = 1;
}
message TestClientSendStreamResponse {
    string result = 1;
}

编写服务端service.py文件

py
import time
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
from concurrent import futures

class demo(pb2_grpc.demoServicer):
    def TestClientSendStream(self, request_iterator, context): # request_iterator表示迭代器
        index = 0
        for request in request_iterator:
            print(request.data, ':',index)
            if index == 10:         # index为10,断开连接
                break
            index += 1
        return pb2.TestClientSendStreamResponse(result='ok') # 断开连接后,给客户端返回一个ok

def run():
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)
    )
    pb2_grpc.add_demoServicer_to_server(demo(),grpc_server)
    grpc_server.add_insecure_port('127.0.0.1:5000')
    print('server will start at 127.0.0.1:5000')
    grpc_server.start()

    try:
        while 1:
            time.sleep(3600)
    except KeyboardInterrupt:
        grpc_server.stop(0)

if __name__ == '__main__':
    run()

编写客户端client.py文件

py
import time
import random
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc

def test():
    index = 0
    while 1:
        time.sleep(1)
        data = str(random.random())
        if index == 5:   # index为5,断开连接,客户端不向服务端主动发送数据
            break
        print(index)
        index += 1
        yield pb2.TestClientSendStreamRequest(data=data)

def run():
    conn = grpc.insecure_channel('127.0.0.1:5000')
    client = pb2_grpc.demoStub(channel=conn)
    # 单向客户端发送给服务器端流传输方式
    response = client.TestClientSendStream(test())
    print(response.result)

if __name__ == '__main__':
    run()

双向流传输

编写.proto代码

test.proto代码如下:

protobuf
syntax = "proto3";
package test;
service demo {
        rpc TestTwoWayStream(stream TestTwoWayStreamRequest) returns(stream TestTwoWayStreamResponse) {}  // 双向流传输
}
message TestTwoWayStreamRequest {
    string data = 1;
}
message TestTwoWayStreamResponse {
    string result = 1;
}

编写服务端service.py文件

py
import time
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc
from concurrent import futures

class demo(pb2_grpc.demoServicer):
        def TestTwoWayStream(self, request_iterator, context):
        for request in request_iterator:   # 从客户端获取数据的方式
            data = request.data
            yield pb2.TestTwoWayStreamResponse(result='service send client %s' % data)

def run():
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)
    )
    pb2_grpc.add_demoServicer_to_server(demo(),grpc_server)
    grpc_server.add_insecure_port('127.0.0.1:5000')
    print('server will start at 127.0.0.1:5000')
    grpc_server.start()

    try:
        while 1:
            time.sleep(3600)
    except KeyboardInterrupt:
        grpc_server.stop(0)

if __name__ == '__main__':
    run()

编写客户端client.py文件

py
import time
import random
import grpc
import test_pb2 as pb2
import test_pb2_grpc as pb2_grpc

def test():
    index = 0
    while 1:
        time.sleep(1)
        data = str(random.random())
        if index == 5:   # index为5,断开连接,客户端不向服务端主动发送数据
            break
        print(index)
        index += 1
        yield pb2.TestClientSendStreamRequest(data=data)

def run():
    conn = grpc.insecure_channel('127.0.0.1:5000')
    client = pb2_grpc.demoStub(channel=conn)
    # 双向流的方式传输
    response = client.TestTwoWayStream(test())    # 客户端以流的形式不停的发信息给服务器
    for res in response:
        print(res.result)

if __name__ == '__main__':
    run()

双向流,客户端一直发送信息,同时一直会受到服务端反馈的流信息

Released under the MIT License.