SQLAlchemy

つーことで、PyCon APAC2013のCFP没ネタの2つ目。

インストール

特にめんどうなことなくpipでインストールできます。

$ pip install sqlalchemy

モデル定義

テーブル定義とクラス定義をして、それらをマッピングするのが、データマッパーの本来の方法ですが、sqlalchemy.ext.declarative を使うのが圧倒的に楽です。 これを使うとテーブル定義をクラス内で行い、自動でマッピングまで行ってくれます。 declarative_baseでベースクラスを定義して、それを継承してモデルを定義します。:

from sqlalchemy import (
    Column,
    Integer,
    ForeignKey,
)
from sqlalchemy.orm import (
    relationship,
    scoped_session,
    sessionmaker,
)

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
DBSession = scoped_session(sessionmaker())

class BankAccount(Base):
    __tablename__ = 'bankaccount'
    query = DBSession.query_property()
    id = Column(Integer, primary_key=True)
    balance = Column(Integer, default=0)
    bank_id = Column(Integer, ForeignKey('bank.id'))
    bank = relationship('Bank', backref='accounts')

    def deposit(self, amount):
        self.balance += amount

    def withdraw(self, amount):
        self.balance -= amount

class Bank(Base):
    __tablename__ = 'bankaccount'
    query = DBSession.query_property()
    id = Column(Integer, primary_key=True)

データ作成とかクエリとか

新規作成は作ったオブジェクトを sessionにaddします。:

bank = Bank()
DBSession.add(bank)
DBSession.commit()

DBSessionはaddされたオブジェクトを flush メソッドが呼ばれたときに INSERT します。 また、commitメソッドでDBにcommitします。このときflushが必要だったときに自動でflushを呼びます。

既にDBSession管理下のオブジェクトに新規オブジェクトを関連づけても、INSERTされます。:

bank.accounts.append(BankAccount())
DBSession.commit()

モデルを取得する場合は、DBSessionのqueryメソッドを使用します。 クエリには、filterメソッドで条件を追加できます。:

DBSession.query(BankAccount)
DBSession.query(BankAccount).filter(BankAccount.balance>=100)

queryメソッドはクエリを作成するだけで、実際に取得させるには、allやone, firstなどのメソッドを呼ぶか、 for文などでイテレータとして評価させます。:

DBSession.query(BankAccount).all()
DBSession.query(BankAccount).first()
DBSession.query(BankAccount).one()
for account in DBSession.query(BankAccount):
    account.withdraw(199)

また、DBSession.query_propertyで、そのクラスのクエリを生成するプロパティを作成できます。(上記の例でも定義してあります。):

BankAccount.query.filter(BankAccount.balance>=50).all()

プラクティス

テストとかの注意

view内でcommitすると、テスト中でもデータベースが必要になります。 また、テスト後にrollbackしてもテストで作成したデータが消えないため他のテストに影響がでます。

SQLAlchemyは、SessionがUnitOfWorkパターンを用いて更新情報を管理しているので、モデルの状態変更だけを気にするようにしましょう。

commit,rolbackは、wsgiミドルウェアやpyramidのtweenなどの仕組みを活用します。 zope.sqlalchemyのZopeTransactionExtensionを使うと、repoze.tm2やpyramid_tmでトランザクション管理ができます。 SQLAlchemyだけでよい場合は以下のようなwsgiミドルウェアが利用できます。:

class SQLATransactionMiddleware(object):
    def __init__(self, app, dbsession):
        self.app = app
        self.dbsession = dbsession

    @wsgify
    def __call__(self, request):
        try:
            request.get_response(self.app)
            self.dbsession.commit()
        except Exception as e:
            self.dbsession.abort()
            six.reraise()
        finally:
            self.dbsession.remove()

primaryjoinやsecondaryjoinの条件を活用する

relationshipではForeignKeyを自動で結合条件として認識しますが、primaryjoinやsecondaryjoinを使ってさらに条件を追加できます。 たとえば、BlogとBlogEntryのような関係があった場合に、publicフラグがTrueのものだけを必要とする場合を考えてみましょう。:

class Blog(Base):
    __tablename__ = 'blogs'
    id = Column(Integer, primary_key=True)
    title = Column(UnicodeText)

    entries = relationship('BlogEntry', backref="blog")
    public_entries = relationship('BlogEntry', primaryjoin='and_(Blog.id==BlogEntry.blog_id, BlogEntry.published!=None)')


class BlogEntry(Base):
    __tablename__ = 'blog_entries'
    id = Column(Integer, primary_key=True)
    title = Column(UnicodeText)
    published = Column(DateTime)
    blog_id = Column(Integer, ForeignKey('blogs.id'))

entriesにはprimaryjoinがないので、 BlogEntryのblog_idの外部キーを自動で条件に採用します。 public_entriesでは、primaryjoinで条件を指定しています。blog_idの条件およびpublishedが設定されているものが、public_entriesで取得されるようになります。

1テーブル 1クラス という妄想を捨てる

SQLAlchemyはデータマッパーなので、複数テーブル複数クラスなどのマッピングもできます。(Djang○ ORM とは違うのだよ) declarative_baseを使っているとクラス内でテーブル構成を定義するのでついつい1テーブル1クラスという考えになってしまいがちです。 複数テーブルを結合した集計サマリーなどのクエリをviewに書くような原始的な開発はもうやめましょう。

class Customer(Base):
    __tablename__ = 'customers'
    id = Column(Integer, primary_key=True)
    name = Column(UnicodeText)


class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    amount = Column(Integer)
    customer_id = Column(Integer, ForeignKey('customers.id'))
    customer = relationship('Customer', backref="orders")


order_summary = select([func.sum(Order.__table__.c.amount).label('amount'),
                        Order.__table__.c.customer_id]
).group_by(
    Order.__table__.c.customer_id
).alias()


class CustomerOrder(Base):
    __table__ = Customer.__table__.join(order_summary)

CustomerOrder は __tablename__ ではなく __table__を使っています。 このクラスには、customersと、orderを集計した結果を結合したものをマッピングしています。 喜んでください!みなさんが大好きな宣言的な書き方ですよ!

Comments !

blogroll

social