This tutorial shows you how to build a simple Python application with CockroachDB using a PostgreSQL-compatible driver or ORM.

We have tested the Python psycopg2 driver and the SQLAlchemy ORM enough to claim beta-level support, so those are featured here. If you encounter problems, please open an issue with details to help us make progress toward full support.

Before you begin

  1. Install CockroachDB.
  2. Start up a secure or insecure local cluster.
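  3. Follow the instructions that correspond to whether your cluster is secure or insecure. Step 1 applies to both; the steps for a secure cluster come first, followed by the steps for an insecure cluster.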

Step 1. Install the psycopg2 driver

To install the Python psycopg2 driver, run the following command:

$ pip install psycopg2

For other ways to install psycopg2, see the official documentation.

Step 2. Create the maxroach user and bank database (secure cluster)

Start the built-in SQL client:

$ cockroach sql --certs-dir=certs

In the SQL shell, issue the following statements to create the maxroach user and bank database:

> CREATE USER IF NOT EXISTS maxroach;
> CREATE DATABASE bank;

Give the maxroach user the necessary permissions:

> GRANT ALL ON DATABASE bank TO maxroach;

Exit the SQL shell:

> \q

Step 3. Generate a certificate for the maxroach user

Create a certificate and key for the maxroach user by running the following command. The code samples will run as this user.

$ cockroach cert create-client maxroach --certs-dir=certs --ca-key=my-safe-directory/ca.key
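
Before running the sample, it can help to confirm that the certificate files it expects are in place. The following is a minimal sketch, assuming the file names used in the connection code in Step 4 (ca.crt, client.maxroach.crt, and client.maxroach.key in the certs directory). The helper name missing_cert_files is hypothetical and not part of CockroachDB or psycopg2.

```python
import os


def missing_cert_files(certs_dir, user):
    """Return the expected certificate files that are absent from certs_dir.

    Hypothetical helper: the file names follow the paths that the sample
    code passes to psycopg2.connect().
    """
    expected = [
        "ca.crt",
        "client.{}.crt".format(user),
        "client.{}.key".format(user),
    ]
    return [name for name in expected
            if not os.path.isfile(os.path.join(certs_dir, name))]


if __name__ == "__main__":
    missing = missing_cert_files("certs", "maxroach")
    if missing:
        print("Missing files in certs: {}".format(", ".join(missing)))
    else:
        print("All client certificate files for maxroach are present.")
```

Run it from the directory that contains certs. An empty result only means the files exist; it does not validate their contents.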

Step 4. Run the Python code (secure cluster)

Now that you have a database and a user, you'll run the code shown below to:

  • Create an accounts table and insert some rows.
  • Transfer funds between two accounts inside a transaction. To handle transaction retry errors, the code uses an application-level retry loop that, when a retry error occurs, sleeps before trying the funds transfer again. Each subsequent retry error makes it sleep for a longer interval, implementing exponential backoff.
  • Delete the accounts from the table before exiting so that you can re-run the example code.

Copy the code or download it directly.

#!/usr/bin/env python3

import psycopg2
import psycopg2.errorcodes
import time
import logging
import random


def create_accounts(conn):
    with conn.cursor() as cur:
        cur.execute('CREATE TABLE IF NOT EXISTS accounts (id INT PRIMARY KEY, balance INT)')
        cur.execute('UPSERT INTO accounts (id, balance) VALUES (1, 1000), (2, 250)')
        logging.debug("create_accounts(): status message: {}".format(cur.statusmessage))
    conn.commit()


def print_balances(conn):
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts")
        logging.debug("print_balances(): status message: {}".format(cur.statusmessage))
        rows = cur.fetchall()
        conn.commit()
        print("Balances at {}".format(time.asctime()))
        for row in rows:
            print([str(cell) for cell in row])


def delete_accounts(conn):
    with conn.cursor() as cur:
        cur.execute("DELETE FROM bank.accounts")
        logging.debug("delete_accounts(): status message: {}".format(cur.statusmessage))
    conn.commit()


# Wrapper for a transaction.
# This automatically re-calls "op" with the open transaction as an argument
# as long as the database server asks for the transaction to be retried.
def run_transaction(conn, op):
    retries = 0
    max_retries = 3
    with conn:
        while True:
            retries += 1
            # Give up only after max_retries attempts have actually been made.
            if retries > max_retries:
                err_msg = "Transaction did not succeed after {} retries".format(max_retries)
                raise ValueError(err_msg)

            try:
                op(conn)

                # If we reach this point, we were able to commit, so we break
                # from the retry loop.
                break
            except psycopg2.Error as e:
                logging.debug("e.pgcode: {}".format(e.pgcode))
                if e.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
                    # This is a retry error (SQLSTATE 40001), so we roll back
                    # the current transaction and sleep for a bit before
                    # retrying. The sleep time increases for each failed
                    # transaction.
                    conn.rollback()
                    logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
                    sleep_seconds = (2**retries) * 0.1 * (random.random() + 0.5)
                    logging.debug("Sleeping {} seconds".format(sleep_seconds))
                    time.sleep(sleep_seconds)
                    continue
                else:
                    logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
                    raise


# This function is used to test the transaction retry logic.  It can be deleted
# from production code.
def test_retry_loop(conn):
    with conn.cursor() as cur:
        # The first statement in a transaction can be retried transparently on
        # the server, so we need to add a dummy statement so that our
        # force_retry() statement isn't the first one.
        cur.execute('SELECT now()')
        # The function below can only be run by the root user.  Trying to run
        # it as user 'maxroach' will fail with an error.
        cur.execute("SELECT crdb_internal.force_retry('1s'::INTERVAL)")
    logging.debug("test_retry_loop(): status message: {}".format(cur.statusmessage))


def transfer_funds(conn, frm, to, amount):
    with conn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()[0]
        if from_balance < amount:
            err_msg = "Insufficient funds in account {}: have {}, need {}".format(frm, from_balance, amount)
            raise RuntimeError(err_msg)

        # Perform the transfer.
        cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, frm))
        cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to))
    conn.commit()
    logging.debug("transfer_funds(): status message: {}".format(cur.statusmessage))


def main():

    conn = psycopg2.connect(
        database='bank',
        user='maxroach',
        sslmode='require',
        sslrootcert='certs/ca.crt',
        sslkey='certs/client.maxroach.key',
        sslcert='certs/client.maxroach.crt',
        port=26257,
        host='localhost'
    )

    # Uncomment the below to turn on logging to the console.  This was useful
    # when testing transaction retry handling.  It is not necessary for
    # production code.
    # log_level = getattr(logging, 'DEBUG', None)
    # logging.basicConfig(level=log_level)

    create_accounts(conn)

    print_balances(conn)

    amount = 100
    fromId = 1
    toId = 2

    try:
        run_transaction(conn, lambda conn: transfer_funds(conn, fromId, toId, amount))

        # The function below is used to test the transaction retry logic.  It
        # can be deleted from production code.
        # run_transaction(conn, lambda conn: test_retry_loop(conn))
    except ValueError as ve:
        # Below, we print the error and continue on so this example is easy to
        # run (and run, and run...).  In real code you should handle this error
        # and any others thrown by the database interaction.
        logging.debug("run_transaction(conn, op) failed: {}".format(ve))

    print_balances(conn)

    delete_accounts(conn)

    # Close communication with the database.
    conn.close()


if __name__ == '__main__':
    main()

Then run the code:

$ python basic-sample.py

The output should show the account balances before and after the funds transfer:

Balances at Wed Aug  7 12:11:23 2019
['1', '1000']
['2', '250']
Balances at Wed Aug  7 12:11:23 2019
['1', '900']
['2', '350']
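
The secure example passes its connection settings to psycopg2.connect() as keyword arguments, while the insecure example below passes a single connection URL. The sketch below shows how the same secure settings could be written as a URL. It assumes that the SSL settings are accepted as URL query parameters by the underlying libpq library; build_dsn is a hypothetical helper, not a psycopg2 function.

```python
from urllib.parse import urlencode


def build_dsn(user, host, port, database, **options):
    """Assemble a postgresql:// connection URL from its parts.

    Hypothetical helper for illustration; options become query parameters.
    """
    # Keep "/" unescaped so that file paths stay readable in the URL.
    query = urlencode(options, safe="/")
    dsn = "postgresql://{}@{}:{}/{}".format(user, host, port, database)
    return dsn + ("?" + query if query else "")


secure_dsn = build_dsn(
    "maxroach", "localhost", 26257, "bank",
    sslmode="require",
    sslrootcert="certs/ca.crt",
    sslkey="certs/client.maxroach.key",
    sslcert="certs/client.maxroach.crt",
)
print(secure_dsn)
```

The resulting string could then be passed as the only argument to psycopg2.connect(), in place of the keyword arguments in main().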

Step 2. Create the maxroach user and bank database (insecure cluster)

Start the built-in SQL client:

$ cockroach sql --insecure

In the SQL shell, issue the following statements to create the maxroach user and bank database:

> CREATE USER IF NOT EXISTS maxroach;
> CREATE DATABASE bank;

Give the maxroach user the necessary permissions:

> GRANT ALL ON DATABASE bank TO maxroach;

Exit the SQL shell:

> \q

Step 3. Run the Python code (insecure cluster)

Now that you have a database and a user, you'll run the code shown below to:

  • Create an accounts table and insert some rows.
  • Transfer funds between two accounts inside a transaction. To handle transaction retry errors, the code uses an application-level retry loop that, when a retry error occurs, sleeps before trying the funds transfer again. Each subsequent retry error makes it sleep for a longer interval, implementing exponential backoff.
  • Delete the accounts from the table before exiting so that you can re-run the example code.

Copy the code or download it directly.

#!/usr/bin/env python3

import psycopg2
import psycopg2.errorcodes
import time
import logging
import random


def create_accounts(conn):
    with conn.cursor() as cur:
        cur.execute('CREATE TABLE IF NOT EXISTS accounts (id INT PRIMARY KEY, balance INT)')
        cur.execute('UPSERT INTO accounts (id, balance) VALUES (1, 1000), (2, 250)')
        logging.debug("create_accounts(): status message: {}".format(cur.statusmessage))
    conn.commit()


def print_balances(conn):
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts")
        logging.debug("print_balances(): status message: {}".format(cur.statusmessage))
        rows = cur.fetchall()
        conn.commit()
        print("Balances at {}".format(time.asctime()))
        for row in rows:
            print([str(cell) for cell in row])


def delete_accounts(conn):
    with conn.cursor() as cur:
        cur.execute("DELETE FROM bank.accounts")
        logging.debug("delete_accounts(): status message: {}".format(cur.statusmessage))
    conn.commit()


# Wrapper for a transaction.
# This automatically re-calls "op" with the open transaction as an argument
# as long as the database server asks for the transaction to be retried.
def run_transaction(conn, op):
    retries = 0
    max_retries = 3
    with conn:
        while True:
            retries += 1
            # Give up only after max_retries attempts have actually been made.
            if retries > max_retries:
                err_msg = "Transaction did not succeed after {} retries".format(max_retries)
                raise ValueError(err_msg)

            try:
                op(conn)

                # If we reach this point, we were able to commit, so we break
                # from the retry loop.
                break
            except psycopg2.Error as e:
                logging.debug("e.pgcode: {}".format(e.pgcode))
                if e.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
                    # This is a retry error (SQLSTATE 40001), so we roll back
                    # the current transaction and sleep for a bit before
                    # retrying. The sleep time increases for each failed
                    # transaction.
                    conn.rollback()
                    logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
                    sleep_seconds = (2**retries) * 0.1 * (random.random() + 0.5)
                    logging.debug("Sleeping {} seconds".format(sleep_seconds))
                    time.sleep(sleep_seconds)
                    continue
                else:
                    logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
                    raise


# This function is used to test the transaction retry logic.  It can be deleted
# from production code.
def test_retry_loop(conn):
    with conn.cursor() as cur:
        # The first statement in a transaction can be retried transparently on
        # the server, so we need to add a dummy statement so that our
        # force_retry() statement isn't the first one.
        cur.execute('SELECT now()')
        # The function below can only be run by the root user.  Trying to run
        # it as user 'maxroach' will fail with an error.
        cur.execute("SELECT crdb_internal.force_retry('1s'::INTERVAL)")
    logging.debug("test_retry_loop(): status message: {}".format(cur.statusmessage))


def transfer_funds(conn, frm, to, amount):
    with conn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()[0]
        if from_balance < amount:
            err_msg = "Insufficient funds in account {}: have {}, need {}".format(frm, from_balance, amount)
            raise RuntimeError(err_msg)

        # Perform the transfer.
        cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, frm))
        cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to))
    conn.commit()
    logging.debug("transfer_funds(): status message: {}".format(cur.statusmessage))


def main():

    dsn = 'postgresql://maxroach@localhost:26257/bank?sslmode=disable'
    conn = psycopg2.connect(dsn)

    # Uncomment the below to turn on logging to the console.  This was useful
    # when testing transaction retry handling.  It is not necessary for
    # production code.
    # log_level = getattr(logging, 'DEBUG', None)
    # logging.basicConfig(level=log_level)

    create_accounts(conn)

    print_balances(conn)

    amount = 100
    fromId = 1
    toId = 2

    try:
        run_transaction(conn, lambda conn: transfer_funds(conn, fromId, toId, amount))

        # The function below is used to test the transaction retry logic.  It
        # can be deleted from production code.
        # run_transaction(conn, lambda conn: test_retry_loop(conn))
    except ValueError as ve:
        # Below, we print the error and continue on so this example is easy to
        # run (and run, and run...).  In real code you should handle this error
        # and any others thrown by the database interaction.
        logging.debug("run_transaction(conn, op) failed: {}".format(ve))

    print_balances(conn)

    delete_accounts(conn)

    # Close communication with the database.
    conn.close()


if __name__ == '__main__':
    main()

Then run the code:

$ python basic-sample.py

The output should show the account balances before and after the funds transfer:

Balances at Wed Jul 24 15:58:40 2019
['1', '1000']
['2', '250']
Balances at Wed Jul 24 15:58:40 2019
['1', '900']
['2', '350']
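
The retry behavior in run_transaction can be explored without a running cluster. The following is a minimal sketch of the same idea, not the sample code itself: FakeRetryError stands in for psycopg2.Error, and run_with_retries, backoff_bounds, and flaky_transfer are hypothetical names introduced for illustration. The sleep formula mirrors the one in the sample, (2**retries) * 0.1 scaled by a random factor between 0.5 and 1.5.

```python
import random
import time


class FakeRetryError(Exception):
    """Stand-in for psycopg2.Error that carries a SQLSTATE code (hypothetical)."""

    def __init__(self, pgcode):
        super().__init__("simulated error {}".format(pgcode))
        self.pgcode = pgcode


def backoff_bounds(attempt):
    """Return the (min, max) sleep in seconds after the given failed attempt."""
    base = (2 ** attempt) * 0.1
    return (base * 0.5, base * 1.5)


def run_with_retries(op, max_retries=3, sleep=time.sleep):
    """Call op() until it succeeds or max_retries attempts have failed."""
    for attempt in range(1, max_retries + 1):
        try:
            return op()
        except FakeRetryError as e:
            # Only SQLSTATE 40001 (serialization failure) is retried.
            if e.pgcode != "40001":
                raise
            low, high = backoff_bounds(attempt)
            sleep(random.uniform(low, high))
    raise ValueError("Transaction did not succeed after {} retries".format(max_retries))


calls = []


def flaky_transfer():
    """Fail with a retry error twice, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise FakeRetryError("40001")
    return "committed"


# Pass a no-op sleep so that the demonstration finishes immediately.
result = run_with_retries(flaky_transfer, sleep=lambda seconds: None)
print(result, len(calls))
```

Here the simulated transfer succeeds on its third attempt. In the real sample, the rollback before each retry is what returns the connection to a usable state; this sketch leaves that step out because it has no connection.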

What's next?

Read more about using the Python psycopg2 driver.

You might also be interested in using a local cluster to explore the following CockroachDB benefits:
