Asynchronous Database Code with Tornado and PostgreSQL

To write asynchronous code, it is not enough to use a framework like Tornado: every library we use to talk to databases, web clients, etc. must support the asynchronous style too. momoko is a wrapper for Psycopg2 that allows PostgreSQL to be used asynchronously with Tornado. Full documentation can be found at http://momoko.61924.nl/en/latest/

Initializing Database connection

We can use momoko.Pool to create a pool of asynchronous PostgreSQL connections (see http://momoko.61924.nl/en/latest/api.html#connections):


    import json

    import momoko
    import psycopg2
    from tornado import httpserver, ioloop, options, web, gen
    from tornado.escape import json_decode

    from daos import UserDAO



    class Application(web.Application):
        """Tornado application that declares the URL routes and owns the database pool.

        The ``db`` attribute holds the ``momoko.Pool``; request handlers read it
        through ``self.application.db``.
        """

        def __init__(self):
            # The trailing slash is optional so that both ``/user`` and
            # ``/user/`` match. The user resource is routed twice: once for the
            # collection and once with a captured numeric id, which Tornado
            # passes to the handler method as its ``id`` argument.
            handlers = [
                (r"/create-table/?", PostgresTableHandler),
                (r"/delete-table/?", PostgresTableHandler),
                (r"/user/?", PostgresUserHandler),
                (r"/user/(\d+)/?", PostgresUserHandler),
            ]
            web.Application.__init__(self, handlers)
            dsn = 'dbname=ds_test user=db_test password=test ' \
                  'host=localhost port=5432'
            self.db = momoko.Pool(dsn=dsn, size=5)


    class PostgresHandler(web.RequestHandler):
        """Base handler that decodes JSON request bodies and exposes the database pool."""

        SUPPORTED_METHODS = ("GET", "POST", "DELETE", "PUT")

        @gen.coroutine
        def prepare(self):
            """Decode a JSON request body into ``self.json_args``.

            Runs only when the request's Content-Type media type is
            ``application/json``; parameters such as ``; charset=utf-8`` are
            ignored. If the body cannot be decoded, the response is finished
            with the text ``invalid request``. ``json_args`` is left unset when
            no JSON body was decoded.
            """
            content_type = self.request.headers.get("Content-Type", "")
            media_type = content_type.split(";", 1)[0].strip().lower()
            if media_type != "application/json":
                return
            try:
                self.json_args = json_decode(self.request.body)
            except (TypeError, ValueError):
                # Malformed JSON (or an undecodable body): answer right away.
                self.finish('invalid request')

        @property
        def db(self):
            """The database object stored on the application (``application.db``)."""
            return self.application.db


A simple DAO class


    from tornado import gen

    import momoko
    import string
    import random

    class UserDAO(object):
        """Data-access object for the ``users_user`` table.

        Every query method is a Tornado coroutine that runs its SQL through
        ``momoko.Op`` on the ``db`` object given to the constructor.
        """

        # Columns that update() is allowed to write. Column names cannot be sent
        # as query parameters, so they are checked against this list before
        # being placed in the SQL text.
        UPDATABLE_COLUMNS = ('username', 'email', 'password')

        def __init__(self, db):
            """Store ``db``, the object whose ``execute`` method runs the queries."""
            self.db = db

        def _get_random_str(self, size=10):
            """Return ``size`` random characters drawn from A-Z and 0-9."""
            alphabet = string.ascii_uppercase + string.digits
            return ''.join(random.choice(alphabet) for _ in range(size))

        @staticmethod
        def _rows_as_dicts(cursor):
            """Return every row of ``cursor`` as a dict keyed by column name.

            The cursor is closed before returning, even if fetching fails.
            """
            try:
                columns = [col[0] for col in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
            finally:
                cursor.close()

        @gen.coroutine
        def get(self, id):
            """Return the rows whose ``id`` matches, as a list of dicts."""
            sql = """
                SELECT id, username, email, password
                FROM users_user
                WHERE id=%s
            """
            cursor = yield momoko.Op(self.db.execute, sql, (id,))
            return self._rows_as_dicts(cursor)

        @gen.coroutine
        def get_list(self):
            """Return every row of ``users_user`` as a list of dicts."""
            sql = """
                SELECT id, username, email, password
                FROM users_user
            """
            cursor = yield momoko.Op(self.db.execute, sql)
            return self._rows_as_dicts(cursor)

        @gen.coroutine
        def create(self):
            """Insert a user with random username, email and password.

            Returns the cursor of the INSERT; the caller is expected to close it.
            """
            sql = """
                INSERT INTO users_user (username, email, password)
                VALUES (%s, %s, %s  )
            """
            username = self._get_random_str()
            email = '{0}@{1}.com'.format(self._get_random_str(),
                                         self._get_random_str())
            password = self._get_random_str()
            cursor = yield momoko.Op(self.db.execute, sql,
                                     (username, email, password))
            return cursor

        @gen.coroutine
        def update(self, id, data=None):
            """Update the columns named in ``data`` for the row with this ``id``.

            ``data`` maps column names to new values. Only the columns listed in
            ``UPDATABLE_COLUMNS`` are accepted, because the names are written
            into the SQL text; the values are sent as query parameters.

            Returns the cursor of the UPDATE.

            Raises:
                ValueError: if ``data`` is empty or names a column that is not
                    in ``UPDATABLE_COLUMNS``.
            """
            data = data or {}
            if not data:
                raise ValueError('no fields to update')
            columns = list(data)
            unknown = [col for col in columns
                       if col not in self.UPDATABLE_COLUMNS]
            if unknown:
                raise ValueError('unknown fields: {0!r}'.format(unknown))

            assignments = ', '.join('{0}=%s'.format(col) for col in columns)
            sql = """
                UPDATE users_user
                SET {0}
                WHERE id=%s
            """.format(assignments)
            # Parameters follow the order of the SET clause, then the id.
            params = [data[col] for col in columns]
            params.append(id)
            cursor = yield momoko.Op(self.db.execute, sql, params)
            return cursor

        @gen.coroutine
        def delete_table(self):
            """Drop the ``users_user`` table and the ``user_id`` sequence if present.

            Returns the cursor of the statement.
            """
            sql = """
                DROP TABLE IF EXISTS users_user;
                DROP SEQUENCE IF EXISTS user_id;
            """
            cursor = yield momoko.Op(self.db.execute, sql)
            return cursor

        @gen.coroutine
        def delete(self, id):
            """Delete the row with this ``id``; returns an empty string."""
            sql = """
                DELETE
                FROM users_user
                WHERE id=%s
            """
            cursor = yield momoko.Op(self.db.execute, sql, (id,))
            cursor.close()
            return ''

        @gen.coroutine
        def create_table(self, callback=None):
            """Create the ``user_id`` sequence and the ``users_user`` table.

            ``callback`` is accepted for interface compatibility and is not used.
            Returns the cursor of the statement.
            """
            # IF NOT EXISTS on the sequence (PostgreSQL 9.5+) matches the table
            # statement, so calling this twice does not fail on the sequence.
            sql = """
                CREATE SEQUENCE IF NOT EXISTS user_id;
                CREATE TABLE IF NOT EXISTS users_user (
                    id integer PRIMARY KEY DEFAULT nextval('user_id') ,
                    username  varchar(80) UNIQUE,
                    email  varchar(80) UNIQUE,
                    password  varchar(80) 
                );
                ALTER SEQUENCE user_id OWNED BY users_user.id;
            """
            cursor = yield momoko.Op(self.db.execute, sql)
            return cursor

Table RequestHandler

This handler has two methods: one to create the table and another to delete it.

    class PostgresTableHandler(PostgresHandler):
        """Creates (POST) or drops (DELETE) the users table through ``UserDAO``."""

        def _close_and_finish(self, cursor):
            """Close ``cursor`` if still open (noting it in the response), then finish."""
            if not cursor.closed:
                self.write('closing cursor')
                cursor.close()
            self.finish()

        @gen.coroutine
        def post(self):
            """Create the table and finish the response."""
            table_cursor = yield UserDAO(self.db).create_table()
            self._close_and_finish(table_cursor)

        @gen.coroutine
        def delete(self):
            """Drop the table and finish the response."""
            table_cursor = yield UserDAO(self.db).delete_table()
            self._close_and_finish(table_cursor)

User RequestHandler

Handles simple CRUD operations using 4 HTTP methods (GET, PUT, POST, DELETE):

    class PostgresUserHandler(PostgresHandler):
        """CRUD endpoints for users, backed by ``UserDAO``.

        ``id`` is the user id captured from the URL, or ``None`` when the
        request addresses the whole collection.
        """

        def _write_json(self, payload):
            """Write ``payload`` to the response as JSON with a JSON Content-Type."""
            self.set_header("Content-Type", "application/json")
            self.write(json.dumps(payload))

        @gen.coroutine
        def get(self, id=None):
            """Return one user when ``id`` is given, otherwise the full list."""
            dao = UserDAO(self.db)
            if id:
                rows = yield dao.get(id)
            else:
                rows = yield dao.get_list()
            self._write_json(rows)
            self.finish()

        @gen.coroutine
        def post(self):
            """Create a user with random data."""
            dao = UserDAO(self.db)
            cursor = yield dao.create()
            if not cursor.closed:
                self.write('closing cursor')
                cursor.close()
            self.finish()

        @gen.coroutine
        def put(self, id=None):
            """Update user ``id`` with the JSON object in the request body.

            Responds ``invalid request`` when the body is missing, is not a JSON
            object, or is empty, and ``invalid user`` when no id was given;
            otherwise responds with the updated user as JSON.
            """
            data = getattr(self, 'json_args', None)
            if not isinstance(data, dict) or not data:
                # Anything but a non-empty JSON object cannot be turned into
                # an UPDATE, so reject it before touching the database.
                self.write('invalid request')
            elif not id:
                self.write('invalid user')
            else:
                dao = UserDAO(self.db)
                yield dao.update(id, data=data)
                rows = yield dao.get(id)
                self._write_json(rows)
            self.finish()

        @gen.coroutine
        def delete(self, id=None):
            """Delete user ``id``; responds ``invalid user`` when no id was given."""
            if id:
                dao = UserDAO(self.db)
                yield dao.delete(id)
                self.write('user deleted')
            else:
                self.write('invalid user')
            self.finish()

Using the Application

Creating the table


    $ curl  -X POST  localhost:8004/create-table

Creating random users

    $ curl  -X POST  localhost:8004/user

Get user list


    $ curl  -X GET  localhost:8004/user
    [{"id": 1, "email": "OUGI14GG3Q@BYQMP4RC1B.com", "password": "ND3ISHYVJA", "username": "W30HNJAYI4"}, {"id": 2, "email": "J2LQPR0Z94@VC0XP3DZV8.com", "password": "CCJVGTB9CF", "username": "3LRZDYYD52"}] 

Get user by id

    $ curl  -X GET  localhost:8004/user/1
    [{"id": 1, "email": "OUGI14GG3Q@BYQMP4RC1B.com", "password": "ND3ISHYVJA", "username": "W30HNJAYI4"}]     

Update user

    $ curl -X PUT -H "Content-Type: application/json"   localhost:8004/user/2 -d '{"username": "userupdated577"}'
    [{"id": 2, "email": "J2LQPR0Z94@VC0XP3DZV8.com", "password": "CCJVGTB9CF", "username": "userupdated577"}] 


Delete user

    $ curl  -X DELETE  localhost:8004/user/1
    user deleted 

Delete table

    $ curl  -X DELETE  localhost:8004/delete-table

Example code can be found at this link