本章会用两种方式来实现 RPC：一种是基于 Python 标准库的原生实现，另一种是基于 gRPC 框架的实现。
一、基础实现
1.1、Server
from multiprocessing.connection import Listener
from threading import Thread
from remote_call import RPCHandler
def rpc_server(handler, address, authkey):
    """Accept authenticated connections forever, one worker thread per client.

    Args:
        handler: Object providing ``handle_connection(connection)``; it is
            invoked in a new daemon thread for every accepted client.
        address: Address handed to ``multiprocessing.connection.Listener``.
        authkey: Shared secret handed to the listener as ``authkey``.

    The loop has no exit condition; the function only ends when an
    exception (e.g. ``KeyboardInterrupt``) propagates out of it.
    """
    # Local import keeps this function self-contained.
    from multiprocessing import AuthenticationError

    # The context manager closes the listening socket when the loop is left
    # through an exception, instead of leaking it.
    with Listener(address, authkey=authkey) as sock:
        while True:
            try:
                client = sock.accept()
            except AuthenticationError:
                # A peer that presents the wrong authkey must not bring the
                # whole server down; drop it and keep accepting.
                continue
            # Daemon threads do not keep the process alive on shutdown.
            t = Thread(target=handler.handle_connection, args=(client,),
                       daemon=True)
            t.start()
# Some remote functions
def add(x, y):
    """Return the result of ``x + y``."""
    total = x + y
    return total
def sub(x, y):
    """Return the result of ``x - y``."""
    difference = x - y
    return difference
# Register with a handler: build the dispatcher and expose the two
# functions defined above to remote callers
handler = RPCHandler()
handler.register_function(add)
handler.register_function(sub)
# Run the server (this call blocks; rpc_server loops forever)
rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')
1.2、Handler
class RPCHandler:
    """Dispatch pickled RPC requests from a connection to registered functions."""

    def __init__(self):
        # Maps each exported function's __name__ to the callable itself.
        self._functions = {}

    def register_function(self, func):
        """Expose ``func`` to remote callers under ``func.__name__``."""
        self._functions[func.__name__] = func

    def handle_connection(self, connection):
        """Serve requests on ``connection`` until the peer closes it.

        Each request is a pickled ``(func_name, args, kwargs)`` tuple. The
        reply is the pickled return value, or the pickled exception if the
        lookup or the call raised. The connection is closed on exit.
        """
        # Local import: this class uses pickle but nothing above it imports
        # the module.
        import pickle

        try:
            while True:
                # Receive a message.
                # SECURITY: pickle.loads can execute arbitrary code; only
                # expose this handler to peers that are fully trusted.
                func_name, args, kwargs = pickle.loads(connection.recv())
                # Run the RPC and send a response
                try:
                    r = self._functions[func_name](*args, **kwargs)
                    connection.send(pickle.dumps(r))
                except Exception as e:
                    # Ship the failure (including KeyError for an unknown
                    # function name) back to the caller instead of dying.
                    connection.send(pickle.dumps(e))
        except EOFError:
            # recv() raises EOFError once the peer has closed its end;
            # that is the normal way for a session to finish.
            pass
        finally:
            # Release the connection whichever way the loop was left.
            connection.close()
# 1.3、Proxy
import pickle
class RPCProxy:
    """Client-side stand-in whose attribute accesses become remote calls."""

    def __init__(self, connection):
        # Object with send()/recv() used to exchange pickled messages.
        self._connection = connection

    def __getattr__(self, name):
        """Return a callable that invokes the remote function ``name``.

        Only reached when normal attribute lookup fails. Dunder names are
        rejected: protocol probes such as ``__setstate__`` (used by ``copy``
        and ``pickle``) are not remote functions, and answering them with an
        RPC stub breaks those modules.
        """
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)

        def do_rpc(*args, **kwargs):
            self._connection.send(pickle.dumps((name, args, kwargs)))
            # NOTE: unpickling the reply trusts the server completely.
            result = pickle.loads(self._connection.recv())
            # The reply is either the return value or the exception
            # instance that the call produced; re-raise the latter locally.
            if isinstance(result, Exception):
                raise result
            return result
        return do_rpc
# 1.4、Client
from multiprocessing.connection import Client
from chapter11.rpc_proxy import RPCProxy
# Connect to the RPC server and wrap the connection in a proxy object.
c = Client(('localhost', 17000), authkey=b'peekaboo')
proxy = RPCProxy(c)
# Each attribute call on the proxy is executed by the server.
add_result = proxy.add(3, 5)
print(f'add(3, 5) = {add_result}')
sub_result = proxy.sub(5, 12)
print(f'sub(5, 12) = {sub_result}')
proxy.sub([1, 2], 4)
1.5、JSON实现
1.5.1、Server
import json
class RPCHandler:
    """Dispatch JSON-encoded RPC requests to registered functions."""

    def __init__(self):
        # Maps each exported function's __name__ to the callable itself.
        self._functions = {}

    def register_function(self, func):
        """Expose ``func`` to remote callers under ``func.__name__``."""
        self._functions[func.__name__] = func

    def handle_connection(self, connection):
        """Serve requests on ``connection`` until the peer closes it.

        Each request is a JSON array ``[func_name, args, kwargs]``. The
        reply is the JSON-encoded return value. If the lookup, the call or
        the encoding of the result raises, the reply is the JSON-encoded
        ``str()`` of the exception, so on the wire a failure looks like an
        ordinary string result. The connection is closed on exit.
        """
        try:
            while True:
                # Receive a message
                func_name, args, kwargs = json.loads(connection.recv())
                # Run the RPC and send a response
                try:
                    r = self._functions[func_name](*args, **kwargs)
                    connection.send(json.dumps(r))
                except Exception as e:
                    connection.send(json.dumps(str(e)))
        except EOFError:
            # recv() raises EOFError once the peer has closed its end;
            # that is the normal way for a session to finish.
            pass
        finally:
            # Release the connection whichever way the loop was left.
            connection.close()
# 1.5.2、Client
import json
class RPCProxy:
    """Client-side stand-in that forwards attribute calls as JSON requests."""

    def __init__(self, connection):
        # Object with send()/recv() used to exchange JSON messages.
        self._connection = connection

    def __getattr__(self, name):
        """Return a callable that invokes the remote function ``name``.

        Only reached when normal attribute lookup fails. Dunder names are
        rejected: protocol probes such as ``__setstate__`` (used by ``copy``
        and ``pickle``) are not remote functions, and answering them with an
        RPC stub breaks those modules.
        """
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)

        def do_rpc(*args, **kwargs):
            # The tuple is encoded as the JSON array [name, args, kwargs].
            self._connection.send(json.dumps((name, args, kwargs)))
            result = json.loads(self._connection.recv())
            return result
        return do_rpc
# 1.6、xml
from xmlrpc.server import SimpleXMLRPCServer
class KeyValueServer:
    """In-memory key/value store whose operations are published over XML-RPC."""

    # Names of the instance methods registered with the XML-RPC server.
    _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']

    def __init__(self, address):
        """Create the backing dict and an XML-RPC server bound to ``address``."""
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        for method_name in self._rpc_methods_:
            bound_method = getattr(self, method_name)
            self._serv.register_function(bound_method)

    def get(self, name):
        """Return the value stored under ``name`` (KeyError if absent)."""
        store = self._data
        return store[name]

    def set(self, name, value):
        """Store ``value`` under ``name``, replacing any previous value."""
        store = self._data
        store[name] = value

    def delete(self, name):
        """Remove ``name`` from the store (KeyError if absent)."""
        store = self._data
        del store[name]

    def exists(self, name):
        """Return True when ``name`` is a key in the store."""
        present = name in self._data
        return present

    def keys(self):
        """Return the stored keys as a new list."""
        return [*self._data]

    def serve_forever(self):
        """Hand control to the underlying XML-RPC server's request loop."""
        server = self._serv
        server.serve_forever()
if __name__ == '__main__':
    # An empty host string is passed as the bind address, together with
    # port 15000.
    kvserv = KeyValueServer(('', 15000))
    kvserv.serve_forever()

# Client-side import; it was run together with the previous statement.
from xmlrpc.client import ServerProxy
# SimpleXMLRPCServer serves plain HTTP (no TLS), so the proxy URL must use
# the http:// scheme; https:// would fail during the TLS handshake.
s = ServerProxy('http://localhost:15000', allow_none=True)
s.set('foo','bar')
s.set('spam', [1, 2, 3])
s.keys()
s.get('foo')
s.get('spam')
s.delete('spam')
s.exists('spam')
二、Grpc框架
2.1、安装
sudo python3 -m pip install grpcio
sudo python3 -m pip install grpcio-tools
python -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/product_info.proto
2.2、Server
from concurrent import futures
import logging
import uuid
import grpc
import time
import product_info_pb2
import product_info_pb2_grpc
class ProductInfoServicer(product_info_pb2_grpc.ProductInfoServicer):
    """In-memory implementation of the ProductInfo service."""

    def __init__(self):
        # Maps product id (str) to the request message stored by addProduct.
        self.productMap = {}

    def addProduct(self, request, context):
        """Store ``request`` under a freshly generated id and return that id.

        The id is a ``uuid.uuid1()`` rendered as a string; it is written
        into ``request.id`` and returned wrapped in a ``ProductID`` message.
        """
        product_id = str(uuid.uuid1())
        request.id = product_id
        print("addProduct:request", request)
        self.productMap[product_id] = request
        response = product_info_pb2.ProductID(value = product_id)
        print("addProduct:response", response)
        return response

    def getProduct(self, request, context):
        """Return the message previously stored under ``request.value``.

        An unknown id terminates the RPC with status NOT_FOUND rather than
        letting a bare KeyError reach the client as an UNKNOWN error.
        """
        print("getProduct:request", request)
        product_id = str(request.value)
        if product_id not in self.productMap:
            # abort() raises, so nothing below runs for an unknown id.
            context.abort(grpc.StatusCode.NOT_FOUND,
                          f"product {product_id!r} not found")
        response = self.productMap[product_id]
        print("getProduct:response", response)
        return response
# create a gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# use the generated function `add_ProductInfoServicer_to_server`
# to add the defined class to the server
product_info_pb2_grpc.add_ProductInfoServicer_to_server(
        ProductInfoServicer(), server)
# listen on port 50051
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()
# since server.start() will not block,
# a sleep-loop is added to keep alive
try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    # stop immediately (grace period of 0) on Ctrl-C
    server.stop(0)
# 2.3、Client
import grpc
import product_info_pb2
import product_info_pb2_grpc
import time;
def run():
    """Add one product through the ProductInfo service, then fetch it back.

    Connects to ``localhost:50051`` over an insecure channel, prints the id
    returned by ``addProduct`` and the message returned by ``getProduct``.
    """
    # open a gRPC channel; the context manager closes it on exit so the
    # connection is not leaked
    with grpc.insecure_channel('localhost:50051') as channel:
        # create a stub (client)
        stub = product_info_pb2_grpc.ProductInfoStub(channel)
        response = stub.addProduct(product_info_pb2.Product(name = "Apple iPhone 11", description = "Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode.", price = 699.0 ))
        print("add product: response", response)
        # look the product up again using the id the server just returned
        productInfo = stub.getProduct(product_info_pb2.ProductID(value = response.value))
        print("get product: response", productInfo)
run()









