使用 Thrift
ApacheThrift 最初是 Facebook 实现的一种支持多种编程语言、高效的远程服务调用框架,2008 年进入 Apache 开源项目。它采用中间语言(IDL,接口描述语言)定义 RPC 的接口和数据类型,通过一个编译器生成不同语言的代码(支持 C++、Java、Python、Ruby 等多种语言),其数据传输采用二进制格式,相对 XML 和 JSON 而言体积更小,对于高并发、大数据量和多语言的环境更有优势。
我们先安装它:
> wget http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz > tar zxf thrift-0.9.3.tar.gz > cd thrift-0.9.3 # 由于安装 RabbitMQ 的时候安装了 Erlang,可以禁用 Erlang > ./configure--without-erlang > make && sudo make install > cd lib/py > sudo make install
安装 Thrift 的 Python 库的时候提示了包的安装路径,在 server.py 一开始要指定这个包路径:
import sys
sys.path.append('gen-py')
sys.path.append('/usr/lib/python2.7/site-packages')
定义 IDL 文件
我们将把文件托管项目服务化。首先定义.thrift 文件(pastefile.thrift):
struct PasteFile{
1:required i32 id,
2:required string filename,
3:required string filehash,
4:required string filemd5,
5:required string uploadtime,
6:required string mimetype,
7:required i64 size,
8:required string url_s,
9:required string url_i,
10:required list<i32>image_size,
11:required string url_d,
12:required string url_p,
13:required string size_humanize,
14:required string type,
15:required string quoteurl,
}
struct CreatePasteFileRequest{
1:required string filehash,
2:required string filename,
3:required string mimetype,
4:optional i32 width,
5:optional i32 height,
}
exception ImageNotSupported{
1:string message
}
exception UploadImageError{
1:string message
}
exception NotFound{
1:i32 code
}
exception ServiceUnavailable{
1:string message
}
service PasteFileService{
PasteFile get(1:i32 pid)
throws(
1:ServiceUnavailable service_error,
2:NotFound not_found
),
list<string>get_file_info(1:string filename, 2:string mimetype)
PasteFile create(1:CreatePasteFileRequest request)
throws(
1:ServiceUnavailable service_error,
2:ImageNotSupported error,
3:UploadImageError image_error
),
}
解析一下.thrift 文件的语法:
1.struct 关键字表示 Thrift 的结构体,概念上类似于一个 C 结构体,它将相关属性组合在一起。
2.Thrift 要求预先定义好字段和返回值类型,i32、i64、string 等都是 Thrift 内置的类型,当然也可以自定义类型。
3.list 表示有序列表。除此之外还支持 map(Python 中的字典)和 set(无序不重复元素集)。
4.exception 关键字表示 Thrift 的异常。
5.service 是 Thrift 的服务接口(类似于 Python 的方法)。其中包含三个接口:get、get_file_info 与 create,每个接口参数不同,但是需要定义参数的类型和顺序。每行定义的第一个字段表示接口返回的类型,比如 PasteFile get(1:i32 pid) 表示执行 get 接口返回一个 PasteFile 类型的对象。需要注意,不一定返回的对象都是定义的结构体,也可以是内置的类型,比如 get_file_info 接口返回的就是一个字符串的列表。
6.throws 块内列出了可能抛出的异常。
生成 Thrift 代码:
> thrift -r --gen py pastefile.thrift
生成的代码目录结构如下:

服务端实现
先引入 Thrift 相关的模块:
from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from pastefile import PasteFileService
from pastefile.ttypes import PasteFile, UploadImageError, NotFound
基于 PasteFile 实现了 RealPasteFile,需要重载两个方法。第一个是 get_url 方法,原来的用法是:
from flask import request
def get_url(self, subtype, is_symlink=False):
hash_or_link=self.symlink if is_symlink else self.filehash
return 'http://{host}/{subtype}/{hash_or_link}'.format(
subtype=subtype, host=request.host, hash_or_link=hash_or_link)
由于 request 来自 Flask,服务化之后不能使用 request.host,需要在响应之前再填充 host 这个参数,修改之后是这个样子:
def get_url(self, subtype, is_symlink=False):
hash_or_link=self.symlink if is_symlink else self.filehash
return 'http://%s/{subtype}/{hash_or_link}'.format(
subtype=subtype, hash_or_link=hash_or_link)
也就是预先留了一个%s 占位,等待 API 返回之前再拼进去。
另一个要修改的是 create_by_upload_file,之前参数 uploaded_file 是一个上传的文件对象,如果做了服务化则需要把这个对象通过 client 传给 server,再由 server 保存,这相当于增加了复杂度和网络延迟,所以直接保存文件,在 create_by_upload_file 中只判断重复:
@classmethod
def create_by_upload_file(cls, uploaded_file):
rst=uploaded_file
with open(rst.path) as f:
filemd5=get_file_md5(f)
uploaded_file=cls.get_by_md5(filemd5)
if uploaded_file:
os.remove(rst.path)
return uploaded_file
filestat=os.stat(rst.path)
rst.size=filestat.st_size
rst.filemd5=filemd5
return rst
再定义服务处理的类,这个类其实就是接口的封装:
class PasteFileHandler(object):
# 这一步比较绕,使用 Flask 保存文件在 app.py 中执行,没有必要传输到服务端再保
存,需要预先生成文件路径
def get_file_info(self, filename, mimetype):
rst=PasteFileModel(filename, mimetype, 0)
return rst.filehash, rst.path
# 方法的参数类型已经在 pastefile.thrift 中定义了,request 是一个
CreatePasteFileRequest 实例
def create(self, request):
width=request.width
height=request.height
filehash=request.filehash
filename=request.filename
mimetype=request.mimetype
uploaded_file=PasteFileModel.get_path(filehash)
uploaded_file.filename=filename
uploaded_file.mimetype=mimetype
try:
if width and height:
paste_file=RealPasteFile.rsize(uploaded_file, width, height)
else:
paste_file,_=RealPasteFile.create_by_upload_file(
uploaded_file, filehash)
except:
raise UploadImageError()
db.session.add(paste_file)
db.session.commit()
return self.convert_type(paste_file)
def get(self, pid):
paste_file=PasteFileModel.query.filter_by(id=pid).first()
if not paste_file:
raise NotFound() # 如果不使用预先定义的异常类,抛出的异常都是
TApplicationException
return self.convert_type(paste_file)
@classmethod
def convert_type(cls, paste_file):
'''将模型转化为 Thrift 结构体的类型'''
new_paste_file=PasteFile()
for attr in ('id', 'filehash', 'filename', 'filemd5', 'uploadtime',
'mimetype', 'symlink', 'size', 'quoteurl', 'size', 'type',
'url_d', 'url_i', 'url_s', 'url_p'):
val=getattr(paste_file, attr)
if isinstance(val, unicode):
# 因为需要传输字符串,所以对 unicode 要编码
val=val.encode('utf-8')
# Thrift 不支持 Python 的时间格式,需要转换一下,在客户端再转换回来
if isinstance(val, datetime):
val=str(val)
setattr(new_paste_file, attr, val)
return new_paste_file
启动服务的代码如下:
import logging
logging.basicConfig() # 这一步很重要,可以收到 Thrift 发出来的异常日志
handler=PasteFileHandler()
# Processor 用来从连接中读取数据,将处理授权给 handler(自己实现),最后将结果写到
连接上
processor=PasteFileService.Processor(handler)
# 服务端使用 8200 端口,transport 是网络读写抽象层,为到来的连接创建传输对象
transport=TSocket.TServerSocket(port=8200)
tfactory=TTransport.TBufferedTransportFactory()
pfactory=TBinaryProtocol.TBinaryProtocolFactory()
server=TServer.TThreadPoolServer(
processor, transport, tfactory, pfactory)
print 'Starting the server...'
server.serve()
客户端实现
client.py 中同样先引入 Thrift 相关的定义:
from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol
from pastefile import PasteFileService
from pastefile.ttypes import (
PasteFile, CreatePasteFileRequest, UploadImageError,
NotFound)
为了让客户端连接发生在服务器启动之后,而且能重用连接,我们使用了 LocalProxy 包装 client:
from werkzeug.local import LocalProxy
def get_client():
# 同样使用 8200 端口,使用阻塞式 I/O 进行传输,是最常见的模式
transport=TSocket.TSocket('localhost', 8200)
transport=TTransport.TBufferedTransport(transport)
# 封装协议,使用二进制编码格式进行数据传输
protocol=TBinaryProtocol.TBinaryProtocol(transport)
client=PasteFileService.Client(protocol)
transport.open() # 打开连接
return client
client=LocalProxy(get_client)
这样就可以在下面的逻辑中直接使用 client 了。
由于上传逻辑出现在两个视图中,所以抽象一个连接函数,让代码复用:
def create(uploaded_file, width=None, height=None):
filename=uploaded_file.filename.encode('utf-8')
mimetype=uploaded_file.mimetype.encode('utf-8')
filehash, path=client.get_file_info(filename, mimetype)
create_request=CreatePasteFileRequest()
create_request.filename=filename
create_request.mimetype=mimetype
create_request.filehash=filehash
# 接收上传文件,直接保存,没有必要传输到服务端再去保存
uploaded_file.save(path)
if width is not None and height is not None:
create_request.width=width
create_request.height=height
try:
pastefile=client.create(create_request)
except UploadImageError: # 异常是在 PasteFileHandler 的 create 方法中预先定义的
return{'r':1, 'error':'upload fail'}
print isinstance(pastefile, PasteFile) # 只是验证
try : # 事实上没有必要重新 get 一次,因为 create 方法已经返回了 PasteFile 实例,这里
只是演示
paste_file=client.get(pastefile.id)
except NotFound:
return{'r':1, 'error':'not found'}
return{'r':0, 'paste_file':paste_file}
在 app.py 里面引用 create 方法,并且应用于视图:
from client import create
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method=='POST':
uploaded_file=request.files['file']
w=request.form.get('w', None)
h=request.form.get('h', None)
if not uploaded_file:
return abort(400)
# 使用 Thrift 客户端代码请求服务端之后获得创建的文件对象
rs=create(uploaded_file, width=w, height=h)
if rs['r']:
return rs['error']
paste_file=rs['paste_file']
return jsonify({
'url_d':paste_file.url_d%request.host, # 由于之前 get_url 的值中的主机
名使用了%s 占位,这里填充进去
'url_i':paste_file.url_i%request.host,
'url_s':paste_file.url_s%request.host,
'url_p':paste_file.url_p%request.host,
'filename':paste_file.filename,
'size':humanize_bytes(paste_file.size),
'uploadtime':paste_file.uploadtime,
'type':paste_file.type,
'quoteurl':paste_file.quoteurl.replace('%25s', request.host) # quoteurl 已
经是 url 编码后的结果,需要使用替换的方式
})
return render_template('index.html',**locals())
这样就实现了服务化:app.py 专注于视图逻辑;client.py 专注于请求服务;server.py 处理客户端发来的接口请求。实际生产环境中还可以考虑 gRPC 和 Nameko(http://bit.ly/1W3N8qK ),它们可作为实现 RPC 框架的参考。
如果你所在的生产环境只使用 Python 一种语言,不喜欢 Thrift 生成的不符合 Python 编码规范的代码,而且对 Python 实现的 RPC 情有独钟,可以选择饿了么开源的纯 Python 实现的 thriftpy(https://github.com/eleme/thriftpy )。
我们先安装它:
> pip install thriftpy cython
安装 cython 是因为 thriftpy 的二进制协议实现可以使用 cython 加速。接下来实现一个 add 功能的服务。首先看 Thrift 定义(calc.thrift):
service CalcService{
i64 add(1:i64 a, 2:i64 b),
}
看一下服务端的实现(server_with_thriftpy.py):
import os
import logging
import thriftpy
from thriftpy.rpc import make_server
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.transport import TBufferedTransportFactory
HERE=os.path.abspath(os.path.dirname(__file__))
logging.basicConfig()
calc_thrift=thriftpy.load(
os.path.join(HERE, 'calc.thrift'),
module_name='calc_thrift')
class Dispatcher(object):
def add(self, a, b):
return a+b
server=make_server(calc_thrift.CalcService,
Dispatcher(),
'127.0.0.1', 8300,
proto_factory=TBinaryProtocolFactory(),
trans_factory=TBufferedTransportFactory())
print 'serving...'
server.serve()
启动它:
> python chapter10/section2/server_with_thriftpy.py
再看一下客户端的实现(client_with_thriftpy.py):
import os
import thriftpy
from thriftpy.rpc import client_context
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.transport import TBufferedTransportFactory
HERE=os.path.abspath(os.path.dirname(__file__))
calc_thrift=thriftpy.load(
os.path.join(HERE, 'calc.thrift'),
module_name='calc_thrift')
with client_context(calc_thrift.CalcService,
'127.0.0.1', 8300,
proto_factory=TBinaryProtocolFactory(),
trans_factory=TBufferedTransportFactory(),
timeout=None) as calc:
rs=calc.add(1, 2)
print 'Result is:{}'.format(rs)
现在执行这个客户端程序,就能获得两数相加的结果了:
> python client_with_thriftpy.py Result is: 3
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论