This tutorial shows you how to build a simple Python application with CockroachDB and the psycopg2 driver.

We have tested the psycopg2 driver enough to claim beta-level support. If you encounter problems, please open an issue with details to help us make progress toward full support.

Before you begin

  1. Install CockroachDB.
  2. Start up a secure or insecure local cluster.
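  3. Follow the instructions that match your cluster. Step 1 applies to both cluster types. After that, Steps 2 through 4 for a secure cluster come first, followed by Steps 2 and 3 for an insecure cluster.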

Step 1. Install the psycopg2 driver

To install the Python psycopg2 driver, run the following command:

$ pip install psycopg2

For other ways to install psycopg2, see the official documentation.
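
To confirm that the installation worked, a short check along these lines can tell you whether Python can locate the driver. This is a minimal sketch that uses only the standard library; the helper name driver_installed is hypothetical and is not part of psycopg2.

```python
import importlib.util


def driver_installed(module_name="psycopg2"):
    """Return True if Python can locate the named top-level module."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    if driver_installed():
        print("psycopg2 is installed")
    else:
        print("psycopg2 was not found; re-run the pip command above")
```

The check only looks the module up; it does not open a database connection.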

Step 2. Create the maxroach user and bank database (secure cluster)

Start the built-in SQL shell:

$ cockroach sql --certs-dir=certs

In the SQL shell, issue the following statements to create the maxroach user and bank database:

> CREATE USER IF NOT EXISTS maxroach;
> CREATE DATABASE bank;

Give the maxroach user the necessary permissions:

> GRANT ALL ON DATABASE bank TO maxroach;

Exit the SQL shell:

> \q

Step 3. Generate a certificate for the maxroach user (secure cluster)

Create a certificate and key for the maxroach user by running the following command. The code samples will run as this user.

$ cockroach cert create-client maxroach --certs-dir=certs --ca-key=my-safe-directory/ca.key
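
Before running the sample, it can help to confirm that the certificate files it expects are in place. The sketch below checks for the three file names that the secure connection settings in Step 4 refer to (ca.crt, client.maxroach.crt, and client.maxroach.key). The helper name missing_cert_files is hypothetical, and the sketch assumes you run it from the directory that contains certs.

```python
import os


def missing_cert_files(certs_dir="certs", user="maxroach"):
    """List the certificate files the example expects but cannot find."""
    expected = [
        "ca.crt",
        "client.{}.crt".format(user),
        "client.{}.key".format(user),
    ]
    return [name for name in expected
            if not os.path.isfile(os.path.join(certs_dir, name))]


if __name__ == "__main__":
    missing = missing_cert_files()
    if missing:
        print("Missing files: {}".format(missing))
    else:
        print("All certificate files found")
```

The check covers file presence only; it does not validate the certificates themselves.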

Step 4. Run the Python code (secure cluster)

Now that you have a database and a user, you'll run the code shown below to:

  • Create an accounts table and insert some rows.
  • Transfer funds between two accounts inside a transaction. To handle transaction retry errors, the code wraps the transfer in an application-level retry loop that sleeps before trying the transfer again. Each further retry error lengthens the sleep interval, which implements exponential backoff.
  • Delete the accounts from the table before exiting so that you can re-run the example code.
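
To make the exponential backoff concrete, the sketch below isolates the delay calculation that the sample code uses: the delay doubles with each attempt and is multiplied by a random factor between 0.5 and 1.5 so that competing clients do not retry in lockstep. The function name backoff_seconds and its base and rng parameters are illustrative assumptions; the sample code computes the delay inline.

```python
import random


def backoff_seconds(attempt, base=0.1, rng=random.random):
    """Delay before the next try: doubles per attempt, with 0.5x to 1.5x jitter."""
    return (2 ** attempt) * base * (rng() + 0.5)


if __name__ == "__main__":
    for attempt in range(1, 4):
        delay = backoff_seconds(attempt)
        print("attempt {}: sleep about {:.2f}s".format(attempt, delay))
```

With the default base of 0.1, the first delay falls between roughly 0.1 and 0.3 seconds, the second between 0.2 and 0.6, and so on.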

Copy the code or download it directly.

#!/usr/bin/env python3

import psycopg2
import psycopg2.errorcodes
import time
import logging
import random


def create_accounts(conn):
    with conn.cursor() as cur:
        cur.execute('CREATE TABLE IF NOT EXISTS accounts (id INT PRIMARY KEY, balance INT)')
        cur.execute('UPSERT INTO accounts (id, balance) VALUES (1, 1000), (2, 250)')
        logging.debug("create_accounts(): status message: {}".format(cur.statusmessage))
    conn.commit()


def print_balances(conn):
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts")
        logging.debug("print_balances(): status message: {}".format(cur.statusmessage))
        rows = cur.fetchall()
        conn.commit()
        print("Balances at {}".format(time.asctime()))
        for row in rows:
            print([str(cell) for cell in row])


def delete_accounts(conn):
    with conn.cursor() as cur:
        cur.execute("DELETE FROM bank.accounts")
        logging.debug("delete_accounts(): status message: {}".format(cur.statusmessage))
    conn.commit()


# Wrapper for a transaction.
# This automatically re-calls "op" with the open transaction as an argument
# as long as the database server asks for the transaction to be retried.
def run_transaction(conn, op):
    retries = 0
    max_retries = 3
    with conn:
        while True:
            retries += 1
            if retries > max_retries:
                err_msg = "Transaction did not succeed after {} attempts".format(max_retries)
                raise ValueError(err_msg)

            try:
                op(conn)

                # If we reach this point, we were able to commit, so we break
                # from the retry loop.
                break
            except psycopg2.Error as e:
                logging.debug("e.pgcode: {}".format(e.pgcode))
                if e.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
                    # This is a retry error (SQLSTATE 40001), so we roll back
                    # the current transaction and sleep for a bit before
                    # retrying. The sleep time increases for each failed
                    # transaction.
                    conn.rollback()
                    logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
                    sleep_seconds = (2**retries) * 0.1 * (random.random() + 0.5)
                    logging.debug("Sleeping {} seconds".format(sleep_seconds))
                    time.sleep(sleep_seconds)
                    continue
                else:
                    logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
                    raise


# This function is used to test the transaction retry logic.  It can be deleted
# from production code.
def test_retry_loop(conn):
    with conn.cursor() as cur:
        # The first statement in a transaction can be retried transparently on
        # the server, so we need to add a dummy statement so that our
        # force_retry() statement isn't the first one.
        cur.execute('SELECT now()')
        cur.execute("SELECT crdb_internal.force_retry('1s'::INTERVAL)")
    logging.debug("test_retry_loop(): status message: {}".format(cur.statusmessage))


def transfer_funds(conn, frm, to, amount):
    with conn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()[0]
        if from_balance < amount:
            err_msg = "Insufficient funds in account {}: have {}, need {}".format(frm, from_balance, amount)
            raise RuntimeError(err_msg)

        # Perform the transfer.
        cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, frm))
        cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to))
    conn.commit()
    logging.debug("transfer_funds(): status message: {}".format(cur.statusmessage))


def main():

    conn = psycopg2.connect(
        database='bank',
        user='maxroach',
        sslmode='require',
        sslrootcert='certs/ca.crt',
        sslkey='certs/client.maxroach.key',
        sslcert='certs/client.maxroach.crt',
        port=26257,
        host='localhost'
    )

    # Uncomment the below to turn on logging to the console.  This was useful
    # when testing transaction retry handling.  It is not necessary for
    # production code.
    # log_level = getattr(logging, 'DEBUG', None)
    # logging.basicConfig(level=log_level)

    create_accounts(conn)

    print_balances(conn)

    amount = 100
    fromId = 1
    toId = 2

    try:
        run_transaction(conn, lambda conn: transfer_funds(conn, fromId, toId, amount))

        # The function below is used to test the transaction retry logic.  It
        # can be deleted from production code.
        # run_transaction(conn, lambda conn: test_retry_loop(conn))
    except ValueError as ve:
        # Below, we print the error and continue on so this example is easy to
        # run (and run, and run...).  In real code you should handle this error
        # and any others thrown by the database interaction.
        logging.debug("run_transaction(conn, op) failed: {}".format(ve))
        pass

    print_balances(conn)

    delete_accounts(conn)

    # Close communication with the database.
    conn.close()


if __name__ == '__main__':
    main()

Then run the code:

$ python basic-sample.py

The output should show the account balances before and after the funds transfer:

Balances at Wed Aug  7 12:11:23 2019
['1', '1000']
['2', '250']
Balances at Wed Aug  7 12:11:23 2019
['1', '900']
['2', '350']
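
The balances in this output follow from the check and the arithmetic in transfer_funds. The sketch below mirrors that logic on a plain dictionary instead of a database table, so it runs without a cluster. The function name simulate_transfer and the dictionary of balances are illustrative assumptions and are not part of the sample code.

```python
def simulate_transfer(balances, frm, to, amount):
    """Return new balances after moving `amount` from `frm` to `to`."""
    if balances[frm] < amount:
        raise RuntimeError(
            "Insufficient funds in account {}: have {}, need {}".format(
                frm, balances[frm], amount))
    # Work on a copy so the caller's dictionary is left unchanged.
    updated = dict(balances)
    updated[frm] -= amount
    updated[to] += amount
    return updated


if __name__ == "__main__":
    before = {1: 1000, 2: 250}
    after = simulate_transfer(before, 1, 2, 100)
    print(after)  # {1: 900, 2: 350}
```

Unlike the real transfer, this in-memory version has no transaction, so it says nothing about concurrency or retries.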

Step 2. Create the maxroach user and bank database (insecure cluster)

Start the built-in SQL shell:

$ cockroach sql --insecure

In the SQL shell, issue the following statements to create the maxroach user and bank database:

> CREATE USER IF NOT EXISTS maxroach;
> CREATE DATABASE bank;

Give the maxroach user the necessary permissions:

> GRANT ALL ON DATABASE bank TO maxroach;

Exit the SQL shell:

> \q

Step 3. Run the Python code (insecure cluster)

Now that you have a database and a user, you'll run the code shown below to:

  • Create an accounts table and insert some rows.
  • Transfer funds between two accounts inside a transaction. To handle transaction retry errors, the code wraps the transfer in an application-level retry loop that sleeps before trying the transfer again. Each further retry error lengthens the sleep interval, which implements exponential backoff.
  • Delete the accounts from the table before exiting so that you can re-run the example code.
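
The shape of the retry loop can be tried out without a database. In the sketch below, a hypothetical RetryableError exception stands in for a driver error that carries SQLSTATE 40001, and run_with_retries is an illustrative name rather than a function from the sample code or from psycopg2. The sleep function is passed in as a parameter so that the delays can be observed or shortened.

```python
import time


class RetryableError(Exception):
    """Hypothetical stand-in for a database error with SQLSTATE 40001."""


def run_with_retries(op, max_retries=3, base=0.1, sleep=time.sleep):
    """Call op() until it succeeds, sleeping longer after each retry error."""
    for attempt in range(1, max_retries + 1):
        try:
            return op()
        except RetryableError:
            if attempt == max_retries:
                break
            # Double the delay after every failed attempt.
            sleep((2 ** attempt) * base)
    raise ValueError(
        "Operation did not succeed after {} attempts".format(max_retries))


if __name__ == "__main__":
    state = {"calls": 0}

    def flaky():
        # Fail twice, then succeed, to exercise the retry path.
        state["calls"] += 1
        if state["calls"] < 3:
            raise RetryableError()
        return "ok"

    print(run_with_retries(flaky, base=0.01))
```

Errors other than RetryableError propagate immediately, which matches the intent of retrying only when the server asks for it.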

Copy the code or download it directly.

#!/usr/bin/env python3

import psycopg2
import psycopg2.errorcodes
import time
import logging
import random


def create_accounts(conn):
    with conn.cursor() as cur:
        cur.execute('CREATE TABLE IF NOT EXISTS accounts (id INT PRIMARY KEY, balance INT)')
        cur.execute('UPSERT INTO accounts (id, balance) VALUES (1, 1000), (2, 250)')
        logging.debug("create_accounts(): status message: {}".format(cur.statusmessage))
    conn.commit()


def print_balances(conn):
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts")
        logging.debug("print_balances(): status message: {}".format(cur.statusmessage))
        rows = cur.fetchall()
        conn.commit()
        print("Balances at {}".format(time.asctime()))
        for row in rows:
            print([str(cell) for cell in row])


def delete_accounts(conn):
    with conn.cursor() as cur:
        cur.execute("DELETE FROM bank.accounts")
        logging.debug("delete_accounts(): status message: {}".format(cur.statusmessage))
    conn.commit()


# Wrapper for a transaction.
# This automatically re-calls "op" with the open transaction as an argument
# as long as the database server asks for the transaction to be retried.
def run_transaction(conn, op):
    retries = 0
    max_retries = 3
    with conn:
        while True:
            retries += 1
            if retries > max_retries:
                err_msg = "Transaction did not succeed after {} attempts".format(max_retries)
                raise ValueError(err_msg)

            try:
                op(conn)

                # If we reach this point, we were able to commit, so we break
                # from the retry loop.
                break
            except psycopg2.Error as e:
                logging.debug("e.pgcode: {}".format(e.pgcode))
                if e.pgcode == psycopg2.errorcodes.SERIALIZATION_FAILURE:
                    # This is a retry error (SQLSTATE 40001), so we roll back
                    # the current transaction and sleep for a bit before
                    # retrying. The sleep time increases for each failed
                    # transaction.
                    conn.rollback()
                    logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
                    sleep_seconds = (2**retries) * 0.1 * (random.random() + 0.5)
                    logging.debug("Sleeping {} seconds".format(sleep_seconds))
                    time.sleep(sleep_seconds)
                    continue
                else:
                    logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
                    raise


# This function is used to test the transaction retry logic.  It can be deleted
# from production code.
def test_retry_loop(conn):
    with conn.cursor() as cur:
        # The first statement in a transaction can be retried transparently on
        # the server, so we need to add a dummy statement so that our
        # force_retry() statement isn't the first one.
        cur.execute('SELECT now()')
        cur.execute("SELECT crdb_internal.force_retry('1s'::INTERVAL)")
    logging.debug("test_retry_loop(): status message: {}".format(cur.statusmessage))


def transfer_funds(conn, frm, to, amount):
    with conn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()[0]
        if from_balance < amount:
            err_msg = "Insufficient funds in account {}: have {}, need {}".format(frm, from_balance, amount)
            raise RuntimeError(err_msg)

        # Perform the transfer.
        cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, frm))
        cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to))
    conn.commit()
    logging.debug("transfer_funds(): status message: {}".format(cur.statusmessage))


def main():

    dsn = 'postgresql://maxroach@localhost:26257/bank?sslmode=disable'
    conn = psycopg2.connect(dsn)

    # Uncomment the below to turn on logging to the console.  This was useful
    # when testing transaction retry handling.  It is not necessary for
    # production code.
    # log_level = getattr(logging, 'DEBUG', None)
    # logging.basicConfig(level=log_level)

    create_accounts(conn)

    print_balances(conn)

    amount = 100
    fromId = 1
    toId = 2

    try:
        run_transaction(conn, lambda conn: transfer_funds(conn, fromId, toId, amount))

        # The function below is used to test the transaction retry logic.  It
        # can be deleted from production code.
        # run_transaction(conn, lambda conn: test_retry_loop(conn))
    except ValueError as ve:
        # Below, we print the error and continue on so this example is easy to
        # run (and run, and run...).  In real code you should handle this error
        # and any others thrown by the database interaction.
        logging.debug("run_transaction(conn, op) failed: {}".format(ve))
        pass

    print_balances(conn)

    delete_accounts(conn)

    # Close communication with the database.
    conn.close()


if __name__ == '__main__':
    main()

Then run the code:

$ python basic-sample.py

The output should show the account balances before and after the funds transfer:

Balances at Wed Jul 24 15:58:40 2019
['1', '1000']
['2', '250']
Balances at Wed Jul 24 15:58:40 2019
['1', '900']
['2', '350']
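
The insecure example connects with a connection URL, while the secure example passes the same kind of settings as keyword arguments. As a rough illustration of how the two forms relate, the sketch below assembles a URL from its parts. The helper name build_dsn is hypothetical. The insecure URL it produces matches the one in the code above; the secure URL assumes that the SSL settings are accepted as query parameters, which you should verify against your driver version before relying on it.

```python
from urllib.parse import urlencode


def build_dsn(user, host, port, database, **options):
    """Assemble a postgresql:// connection URL from its parts."""
    dsn = "postgresql://{}@{}:{}/{}".format(user, host, port, database)
    if options:
        # Keep "/" readable in file paths such as certs/ca.crt.
        dsn += "?" + urlencode(options, safe="/")
    return dsn


if __name__ == "__main__":
    print(build_dsn("maxroach", "localhost", 26257, "bank", sslmode="disable"))
    print(build_dsn(
        "maxroach", "localhost", 26257, "bank",
        sslmode="require",
        sslrootcert="certs/ca.crt",
        sslkey="certs/client.maxroach.key",
        sslcert="certs/client.maxroach.crt",
    ))
```

Either form can be passed to psycopg2.connect; the URL form is convenient when the settings come from a single environment variable or configuration value.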

What's next?

Read more about using the Python psycopg2 driver.

You might also be interested in the following pages:


