ORM: Peewee 实践
Peewee是Python的RDBMS ORM库。与node的mongoose之于MongoDB一样,在数据库上面封装一层来抽象数据库操作。在编写业务逻辑代码时,利用Peewee可以采取OO的风格,避免编写复杂的SQL语句代码来提高写代码的效率。
Peewee与其他ORM相比,有着更简洁轻量的功能模块,在5000多行代码内完成基本功能,这包括了数据库的连接和操作、映射模型、类型定义、查询操作符等。在学习成本方面,比功能强大的sqlalchemy要简单易用得多。在兼容方面,peewee直接调用pysqlite2、psycopg2、MySQLdb库来操作SQLite、Postgresql、MySQL这三类数据库(支持扩展更多的数据库类型),并且还实现了一层连接池。
在我们的项目中,本地数据库只需存储处理结果并提供给API和二级缓存(一级缓存使用memcache)实用,使用Peewee这种小锤子就可以满足。下面,我先介绍Peewee的使用流程,再将其应用在项目上。
基本概念
使用方法还是和其他ORM一致,首先时定义数据模型Model。Model等相关概念与RDB概念的对应关系如下表。数据模型除了定义表的schema外还可以定义模型的方法。我们在模型内部嵌套一个meta类来指定Mode连接的数据库、约束条件等信息。
| ORM | RDB |
|:--------------:|:-----------------------:|
| Model class | Database table |
| Field instance | Column on a table |
| Model instance | Row in a database table |
连接数据库后,如果Model的表没有创建的话,还需要先使用db.create_tables([Model])创建表。
Peewee提供多种方法来操作数据库的CRUD。而Model的方法有继承自Peewee Model类的方法,这些方法时Peewee预定义的常用操作,比如.insert_many()、.get_or_create()等常用方法。也可以自定义与业务自身相关的方法。在下一节中我将介绍与项目相关的自定义方法。
实践
这里我用的例子是项目中Bottle框架来与Peewee组合完成基本的API。在获得相关资源的HTTP请求时可以将数据处理交给Peewee的Model来完成。
首先我们创建两个表来存储应用数据。一个是Schema存储数据仓库中的表名和字段名。另一个是MetaColumn,存储数据字段的组合。首先我们要定义一个Model基类,其除了继承Model的通用方法并且拥有自定义的方法:.models_to_array()
方法是将查询结果转换为数组。另一个.models_to_object_array()
方法是将结果转换为对象数组。
# *-./models/base.py -*
from peewee import *
from common.db import db_client
import logging
class BaseModel(Model):
@staticmethod
def models_to_array(models, field):
result = []
for model in models:
# hurry code need reduce the object
result.append(model._data[field])
logging.info('query %d record.' % len(result))
return result
@staticmethod
def models_to_object_array(models, fields):
result = []
for model in models:
result.append({ field: model._data[field] for field in fields})
logging.info('query %d record.' % len(result))
return result
class Meta:
database = db_client # should be set by user
这样,接下来的两个表就可以直接继承并调用这两个方法,而不必重复定义。
Schema用来储存平台的存储数据表的信息,起结构比较简单,包括表名和字段名。
# *-./models/schema.py -*
from peewee import *
from base import BaseModel
class Schema(BaseModel):
columns = CharField()
category = CharField()
class Meta:
db_table = 'schemas'
这样,直接调用Schema.select().where(Schema.category == 'alerts').get()
就能获得alerts表的结构信息。
而MetaColumn类处理的事情相对复杂,既需要完成数据导入,也需要对数据进行查询。
数据导入分为初始导入和增量导入,初始导入就是无脑的分批次调用.insert_many()
。而增量导入对于已存在数据则不进行处理(‘IGNORE’)。
而查询则是通过自定义的.query_by_option()
方法。.query_by_option()
可以通过option参数选择查询的字段,而对于条件的选择是通过category、customer、env、componen、item这五个字段来确定,再调用私用函数.__generate_clauses()
可以将条件字段转换为满足Peewee查询的参数格式(其实就是Field instance)。
# *-./models/meta_column.py -*
from peewee import *
from base import BaseModel
from common.db import db_client
import logging
import operator
class MetaColumn(BaseModel):
category = CharField(null=True)
customer = CharField(null=True)
env = CharField(null=True)
component = CharField(null=True)
item = CharField(null=False)
@staticmethod
def query_by_option(option='', category='', customer='', env='', component='', item=''):
if option.strip() != '':
clauses = MetaColumn.__generate_clauses(category, customer, env, component, item)
meta_columns = MetaColumn.select(getattr(MetaColumn, option)) \
.distinct() \
.where(clauses) \
.order_by(getattr(MetaColumn, option))
return BaseModel.models_to_array(meta_columns, option)
else:
return []
@staticmethod
def __generate_clauses(category='', customer='', env='', component='', item=''):
clauses = []
if category.strip() != '':
clauses.append((MetaColumn.category == category))
if customer.strip() != '':
clauses.append((MetaColumn.customer == customer))
if env.strip() != '':
clauses.append((MetaColumn.env == env))
if component.strip() != '':
clauses.append((MetaColumn.component == component))
if item.strip() != '':
clauses.append((MetaColumn.item == item))
if len(clauses) == 0:
return None
else:
return reduce(operator.and_, clauses)
@staticmethod
def insert_one_by_one(data):
for one_line in data:
obj_inserted = {
'category': one_line[0],
'customer': one_line[1],
'env': one_line[2],
'component': one_line[3],
'item': one_line[4]
}
try:
return MetaColumn.insert(**obj_inserted).on_conflict('IGNORE').execute() # return model instance
except:
logging.error('save one meta_column record failed.')
@staticmethod
def insert_from_array(data, bulk_size=256):
"""insert from a 2-d array of strings"""
with db_client.transaction():
for idx in range(0, len(data), bulk_size):
MetaColumn.insert_many(map(lambda i: {
'category': i[0],
'customer': i[1],
'env': i[2],
'component': i[3],
'item': i[4],
}, data[idx:idx + bulk_size])).execute()
@staticmethod
def check_empty():
nums = MetaColumn.select().count()
return nums == 0
class Meta:
db_table = 'meta_columns'
indexes = (
# create a unique on category/customer/env/component/item
(('category', 'customer', 'env', 'component', 'item'), True),
)
这样在Bottle server上响应请求的函数内调用对应表的Model的方法就可以对数据进行查询了。具体内容如下。
# *-./server.py -*
@app.route('/options/<option_type>', method=['GET'])
@check_params(option_type=str, category=str, customer=int, env=str, component=str, item =str)
def options(option_type='', category='', customer='', env='', component='', item =''):
try:
result = MetaColumn.query_by_option(option_type, category, customer, env, component, item)
return json.dumps(result)
except Exception, e:
return error(500, e)
迁移
迁移是在工程上的概念。当我们处在开发阶段需要常规性的修改数据表的结构时,必须要保证原始数据库能够平滑地迁移到最新的结构上。这里采用的方法是按日期编写数据库迁移代码,迁移代码放在项目的./migrations
目录下,以时间为前缀+_migration命名文件,比如./migrations/20160802_migration.py
。
# *-./migrations/20160802_migration.py -*
import sys
import os
import datetime
from playhouse.migrate import *
PROJECT_ROOT = os.path.join(os.path.dirname(__file__), '..')
sys.path.append(PROJECT_ROOT) # fix directory issue
from common.db import connect_db_client
from common.config import load_config
PROJECT_ROOT = os.path.dirname(__file__) + '/../'
os.chdir(PROJECT_ROOT)
def main():
conf = load_config()
db_client = connect_db_client(conf['db']['type'], os.path.join(PROJECT_ROOT, conf['db']['url']))
migrator = SqliteMigrator(db_client)
table_name = 'user_logs'
col_name = 'created_on'
# check column name exist
col_exist = False
cursor = db_client.execute_sql('PRAGMA table_info(user_logs);')
for row in cursor.fetchall():
if col_name == row[1]:
col_exist = True
if col_exist:
print('Column exist.')
else:
# modify table schema
created_on_field = DateTimeField(default=datetime.datetime.now)
try:
with db_client.transaction():
migrate(
migrator.add_column(table_name, col_name, created_on_field),
)
except Exception, e:
raise e
else:
print('Add Column to table.')
db_client.close()
return 0
if __name__ == "__main__":
main()
这个脚本向user_logs表添加created_on字段。处于安全的考虑,首先检查字段是否存在。如果存在则不需要在添加字段。当然,这样只是修改数据库的结构,还需要修改原来的Model文件。这么做无疑很麻烦,但是Peewee的反射 (能够从已有数据库上反射Model的成员变量)功能比较薄弱。
我在install_local_db.py
中创建了一个数据库表结构安装器对象。DbSchemasInstaller主要完成两个功能。第一个是创建表结构,第二个依次运行migration下的文件来更新表结构。
# *-./install_local_db.py -*
class DbSchemasInstaller:
def __init__(self, schemas, migrations_dir='./migrations', error_file='./error_file'):
self.schemas = schemas
self.migrations_dir = migrations_dir
self.error_file = error_file
def create_tables(self):
try:
db.create_tables(self.schemas,
safe=True) # use safe=True to pass table already exists error
except Exception, e: # other error occur
logging.fatal(e)
else:
logging.info('finish create tables')
def migrate_one_by_one(self):
files = self.__get_files_by_date()
for f in files:
p = subprocess.Popen([sys.executable, f], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = p.communicate()
# try:
if p.returncode != 0:
logging.error("migrate %s failed %d %s %s" % (f, p.returncode, output, error))
# write error_file
self.__write_error_file(f)
return # TODO considering many migration situation
else:
logging.info("migrate %s step output: %s" % (f[-21:-3], "".join(output.strip().split('\n'))))
# finally:
# p.terminate()
# p.wait()
logging.info('finish migrations')
def __get_files_by_date(self):
error_pyfile_name = self.__read_error_file()
files = glob.glob(self.migrations_dir + "/*.py")
files.sort() # sort by first name date string
if error_pyfile_name is None:
return files
else:
os.system('rm ' + self.error_file)
return files[files.index(error_pyfile_name):]
def __read_error_file(self):
if os.path.isfile(self.error_file):
try:
f = open(self.error_file, 'r')
error_file = f.readline().strip()
except Exception, e:
logging.error('read file failed.')
f.close()
return None
else:
f.close()
return error_file
else:
return None
def __write_error_file(self, pyfile):
try:
with open(self.error_file, 'w') as f:
f.write(pyfile)
except:
logging.error('write file failed.')
f.close()
在.migrate_one_by_one()
首先读取迁移文件,然后再起一个进程来修改数据库结构。当迁移过程中出现错误时,将错误输出并记录错误文件。这样在修复问题后,可以接着执行这个错误的文件的更新内容。