Use PostgreSQL Database

In the previous article, we built our own database. In this article, let's use it to store stock data. First, we will create some tables with the following SQL script:

$ cat create_stock_tables.sql
CREATE TABLE stocks_symbol (
    symbol TEXT PRIMARY KEY,
    last_date DATE
);

CREATE TABLE stocks_daily (
    symbol TEXT,
    date DATE,
    open REAL NOT NULL,
    close REAL NOT NULL,
    high REAL NOT NULL,
    low REAL NOT NULL,
    volume REAL NOT NULL,
    PRIMARY KEY (symbol, date),
    FOREIGN KEY (symbol)
        REFERENCES stocks_symbol (symbol)
            ON DELETE CASCADE
);
The table stocks_symbol stores the list of stock symbols. Its last_date field records the last date for which we have queried data, and serves as the starting date when we update the tables with new data. The table stocks_daily stores the daily stock information.

To create the tables, run the script either from the command line:

$ psql -d mydb -a -f create_stock_tables.sql
or from inside psql:
mydb=# \i create_stock_tables.sql
Let's check what was created:
mydb=# \dt+
                                       List of relations
 Schema |     Name      | Type  |   Owner   | Persistence | Access method | Size  | Description 
--------+---------------+-------+-----------+-------------+---------------+-------+-------------
 public | stocks_daily  | table | codespace | permanent   | heap          | 16 kB | 
 public | stocks_symbol | table | codespace | permanent   | heap          | 16 kB | 
(2 rows)

mydb=# \di+
                                                  List of relations
 Schema |        Name        | Type  |   Owner   |     Table     | Persistence | Access method | Size  | Description 
--------+--------------------+-------+-----------+---------------+-------------+---------------+-------+-------------
 public | stocks_daily_pkey  | index | codespace | stocks_daily  | permanent   | btree         | 16 kB | 
 public | stocks_symbol_pkey | index | codespace | stocks_symbol | permanent   | btree         | 16 kB | 
(2 rows)
Two tables were created, along with two indexes that back their primary keys.

We will use a Python script to fill the tables. Python has various database drivers for PostgreSQL; we will use psycopg2. The psycopg2 database adapter is implemented in C as a wrapper around the PostgreSQL client library libpq. It is probably the most popular and widely used PostgreSQL driver for Python. If you do not have it installed, you can install it with:

$ pip install psycopg2

Here is the Python script:

from psycopg2 import connect, Error
from urllib.request import urlopen
import csv
import sys

def retrieve_symbols(conn):
    """
    Function to retrieve symbol list from database table stocks_symbol
    """
    with conn:
        with conn.cursor() as cur:
            cur.execute("SELECT symbol from stocks_symbol")
            return [field[0] for field in cur.fetchall()]

def load_stock_data(symbol, conn):
    """
    Function to load the stock data retrieved from Yahoo Finance API to database
    """
    with conn:
        with conn.cursor() as cur:
            # Check if the symbol already exists
            # Pass the symbol as a query parameter so that psycopg2 quotes it safely
            cur.execute("""SELECT last_date, EXTRACT(EPOCH FROM last_date::TIMESTAMP WITHOUT TIME ZONE AT TIME ZONE 'America/New_York')::INTEGER,
                                  EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)::INTEGER FROM stocks_symbol WHERE symbol = %s""", (symbol,))
            row = cur.fetchone()
            if row:
                # If the symbol exists, query the data starting from last_date to current time and delete the record at last_date for update
                url = f'https://query1.finance.yahoo.com/v7/finance/download/{symbol}?period1={row[1]}&period2={row[2]}&interval=1d'
                cur.execute("DELETE FROM stocks_daily WHERE symbol = %s AND date = %s", (symbol, row[0]))
            else:
                # If the symbol does not exist, insert the symbol and query data within 1 year
                cur.execute("INSERT INTO stocks_symbol (symbol) VALUES (%s)", (symbol,))
                url = f'https://query1.finance.yahoo.com/v7/finance/download/{symbol}?range=1y&interval=1d'
            with urlopen(url) as response:
                line = response.readline()
                if line:
                    # The first line is header. Note: urlopen returns bytestream, need to convert to string before passing to csv reader
                    row = next(csv.reader([line.decode('utf-8')]))
                    header = {row[i] : i for i in range(len(row))}
                    # Check if all required fields exist
                    for field in ('Date', 'Open', 'Close', 'High', 'Low', 'Volume'):
                        if field not in header:
                            raise Exception(f"Missing required field {field} in stock data.")
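                    # Prepared statements live until the session ends, so drop any left over from a previous symbol
                    cur.execute("DEALLOCATE ALL")
                    cur.execute("""PREPARE insert_data (TEXT, DATE, REAL, REAL, REAL, REAL, REAL) AS INSERT INTO stocks_daily
                                   (symbol, date, open, close, high, low, volume) VALUES ($1, $2, $3, $4, $5, $6, $7)""")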
                    while True:
                        line = response.readline()
                        if not line:
                            break
                        row = next(csv.reader([line.decode('utf-8')]))
                        # Values are passed as parameters; PostgreSQL casts them to the types declared in PREPARE
                        cur.execute("EXECUTE insert_data (%s, %s, %s, %s, %s, %s, %s)",
                                    (symbol, row[header['Date']], row[header['Open']], row[header['Close']],
                                     row[header['High']], row[header['Low']], row[header['Volume']]))
                    if row[header['Date']] == 'Date':
                        # If there is no data, rollback any changes
                        conn.rollback()
                    else:
                        # Use the last record to update last_date
                        cur.execute("UPDATE stocks_symbol SET last_date = %s WHERE symbol = %s", (row[header['Date']], symbol))
                        conn.commit()

def main():
    """
    Main function to load stock data into PostgreSQL database
    """
    config = {"host": "localhost", "dbname": "mydb", "user": "codespace", "port": "5432"}

    conn = connect(**config)
    # Get symbol list from command line arguments if existing or from database
    symbols = sys.argv[1:]
    if len(symbols) == 0:
        symbols = retrieve_symbols(conn)

    for symbol in symbols:
        try:
            load_stock_data(symbol, conn)
        except (Exception, Error) as error:
            print(f"Error: Load data for symbol {symbol} failed -", error)

    conn.close()

if __name__ == "__main__":
    main()

The script uses the Yahoo Finance API to retrieve stock data in CSV format. For a newly added symbol it retrieves one year of stock data; for an existing symbol it updates the data up to the current time. We manage database transactions with the with statement: if an exception is raised inside the with block, the whole transaction is rolled back. On success, however, different versions of psycopg2 behave differently. Newer versions commit the transaction, while old versions (before 2.5) do nothing. To handle both cases, we call commit() explicitly once records have been inserted. Note that the "with conn" statement does not close the connection (this behavior changed in psycopg3), so we can reuse the same connection across multiple with statements.
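
The commit-or-rollback behavior of "with conn" can be tried out without a PostgreSQL server. The sketch below uses Python's built-in sqlite3 module purely as a stand-in, because its connection object follows the same convention as psycopg2 2.5 and later: the block commits on success, rolls back on an exception, and leaves the connection open. The in-memory table and the simulated failure are assumptions made for illustration and are not part of the script above.

```python
import sqlite3

# Stand-in database: sqlite3 is used here only because it needs no server.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks_symbol (symbol TEXT PRIMARY KEY, last_date DATE)")

# The block exits normally, so the INSERT is committed.
with conn:
    conn.execute("INSERT INTO stocks_symbol (symbol) VALUES (?)", ("GOOG",))

# The block raises, so the INSERT is rolled back.
try:
    with conn:
        conn.execute("INSERT INTO stocks_symbol (symbol) VALUES (?)", ("MSFT",))
        raise RuntimeError("simulated download failure")
except RuntimeError:
    pass

# The connection is still open after both blocks and can be reused.
symbols = [row[0] for row in conn.execute("SELECT symbol FROM stocks_symbol")]
print(symbols)  # ['GOOG']
conn.close()
```

Only the symbol inserted in the successful block survives, which is the same outcome load_stock_data relies on when a download fails halfway through.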

We use a prepared statement for the daily data insertion. This saves the cost of parsing the SQL statement for every row when many records are inserted (e.g., when inserting one year of data). There is room for further performance improvement. Note that we call execute() to insert rows into the stocks_daily table one at a time, which costs one network round trip to the database server per row. To reduce the number of round trips, we could read multiple lines of CSV data and insert them in one call with execute_batch(). The fastest way is to use copy_from(), but it needs a file-like object as the data source (psycopg3 replaces it with a new method, copy). For our example, performance is not a real concern, since we only retrieve one year of data at the beginning and most of the time fetch just a few records during updates.
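
As a rough sketch of those two alternatives, the helpers below parse the CSV lines into tuples and then hand them either to execute_batch() or to copy_from() through an in-memory buffer. The helper names (parse_rows, insert_batched, copy_rows) are hypothetical, and the code assumes the CSV header contains the same fields the script checks for. It has not been measured against the row-by-row version.

```python
import csv
import io

COLUMNS = ("symbol", "date", "open", "close", "high", "low", "volume")
FIELDS = ("Date", "Open", "Close", "High", "Low", "Volume")

def parse_rows(symbol, lines):
    """Turn decoded CSV lines (header first) into tuples in COLUMNS order."""
    for record in csv.DictReader(lines):
        yield (symbol, *(record[field] for field in FIELDS))

def insert_batched(cur, rows, page_size=100):
    """Insert rows with execute_batch, which sends page_size statements per round trip."""
    from psycopg2.extras import execute_batch  # imported lazily; requires psycopg2
    sql = """INSERT INTO stocks_daily (symbol, date, open, close, high, low, volume)
             VALUES (%s, %s, %s, %s, %s, %s, %s)"""
    execute_batch(cur, sql, rows, page_size=page_size)

def copy_rows(cur, rows):
    """Load rows with COPY by wrapping them in a file-like, tab-separated buffer."""
    buf = io.StringIO()
    for row in rows:
        buf.write("\t".join(row) + "\n")
    buf.seek(0)
    cur.copy_from(buf, "stocks_daily", columns=COLUMNS)
```

Inside load_stock_data, either helper could take the place of the row-by-row loop, for example insert_batched(cur, parse_rows(symbol, decoded_lines)), where decoded_lines stands for the response lines decoded to strings.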

Let's use the script to get the stock data of Google:

$ python ./load_stock_data.py GOOG
$ psql -d mydb
mydb=# SELECT * FROM stocks_daily WHERE symbol = 'GOOG';
 symbol |    date    |  open   |  close  |  high   |   low   |    volume     
--------+------------+---------+---------+---------+---------+---------------
 GOOG   | 2022-12-05 |  99.815 |   99.87 |  101.75 |  99.355 |   1.99555e+07
 GOOG   | 2022-12-06 |   99.67 |   97.31 |  100.21 |   96.76 |   2.08776e+07
 GOOG   | 2022-12-07 |   96.77 |   95.15 |   97.31 |  95.025 |   2.66479e+07
...
 GOOG   | 2023-12-01 |  133.32 |  133.32 |   133.5 | 132.152 |   2.42584e+07
 GOOG   | 2023-12-04 | 131.294 |  130.63 |  131.45 |   129.4 |   2.40836e+07
 GOOG   | 2023-12-05 |  130.37 |     132 |  132.15 |  129.73 |  2.874445e+06
(252 rows)
We have retrieved one year of Google stock data plus the current day's data, 252 rows in total. Since we are running the script during trading hours, we can run it once more to update the data:
$ python ./load_stock_data.py
mydb=# SELECT * FROM stocks_daily WHERE symbol = 'GOOG' ORDER BY date DESC LIMIT 3;
 symbol |    date    |  open   | close  |   high   |   low   |    volume    
--------+------------+---------+--------+----------+---------+--------------
 GOOG   | 2023-12-05 |  130.37 | 131.93 | 132.3499 |  129.73 | 3.706532e+06
 GOOG   | 2023-12-04 | 131.294 | 130.63 |   131.45 |   129.4 |  2.40836e+07
 GOOG   | 2023-12-01 |  133.32 | 133.32 |    133.5 | 132.152 |  2.42584e+07
(3 rows)
We can see that the data for the current day has changed, while the other two days remain the same.

libpq can also be used directly in a C program. Let's see how to write a C program with libpq to retrieve the stock data from the PostgreSQL database:


#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

// Macros to check the status of the last result.
// They rely on the local variables conn and res being in scope.
#define PQ_CHECK_RESULT_OK(status) \
    if (PQresultStatus(res) != status) { \
        fprintf(stderr, "Error in %s line %d: %s", __FILE__, __LINE__, PQerrorMessage(conn)); \
        PQclear(res); \
        PQfinish(conn); \
        exit(1); \
    }
#define PQ_CHECK_COMMAND_OK PQ_CHECK_RESULT_OK(PGRES_COMMAND_OK)
#define PQ_CHECK_TUPLES_OK PQ_CHECK_RESULT_OK(PGRES_TUPLES_OK)

int main(int argc, char *argv[])
{
    const char *conninfo = "host=localhost dbname=mydb user=codespace port=5432";

    if (argc < 2) {
        printf("Usage: %s symbol\n", argv[0]);
        exit(0);
    }

    // Connect to database
    PGconn *conn = PQconnectdb(conninfo);
    if (PQstatus(conn) != CONNECTION_OK) {
        fprintf(stderr, "Error in %s line %d: %s", __FILE__, __LINE__, PQerrorMessage(conn));
        PQfinish(conn);
        exit(1);
    }

    // A cursor needs to run inside a transaction
    PGresult *res = PQexec(conn, "BEGIN");
    PQ_CHECK_COMMAND_OK
    PQclear(res);
    
    // Declare server-side cursor
    res = PQexecParams(conn, "DECLARE cur CURSOR FOR SELECT * FROM stocks_daily WHERE symbol = $1",
                       1, NULL, (const char *const []){argv[1]}, NULL, NULL, 0);
    PQ_CHECK_COMMAND_OK
    PQclear(res);
    
    int nFields = -1;
    while (1) {
        // Use cursor to retrieve 100 rows each time
        res = PQexec(conn, "FETCH 100 IN cur");
        PQ_CHECK_TUPLES_OK
        if (nFields < 0) {
            nFields = PQnfields(res);
            // Print field names
            for (int i = 0; i < nFields; ++ i) {
                printf("%-12s", PQfname(res, i));
            }
            printf("\n");
        }
        int nRows = PQntuples(res);
        if (nRows == 0) {
            // Release the empty result before leaving the loop
            PQclear(res);
            break;
        }
        // Print field values
        for (int j = 0; j < nRows; ++ j) {
            for (int i = 0; i < nFields; ++ i) {
                printf("%-12s", PQgetvalue(res, j, i));
            }
            printf("\n");
        }
        PQclear(res);
    }

    res = PQexec(conn, "CLOSE cur");
    PQ_CHECK_COMMAND_OK
    PQclear(res);
    
    res = PQexec(conn, "END");
    PQ_CHECK_COMMAND_OK
    PQclear(res);

    PQfinish(conn);
    return 0;
}
The program retrieves the stock data for a symbol from stocks_daily and prints it. It uses some C99 features (such as the compound literal passed to PQexecParams) and must be compiled with a compiler that supports the C99 standard.

In the program we use a server-side cursor. This differs from the cursor in the Python script, where conn.cursor() returns a client-side cursor. A client-side cursor transfers the entire result set to the client when the query is executed. If there are millions of rows, this can exhaust the application's memory. A server-side cursor keeps the result set on the server and lets the client fetch it in pieces, 100 rows at a time in this program. psycopg2 supports server-side cursors too: passing a name to the cursor constructor, as in conn.cursor('cur'), returns a server-side cursor.
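
For comparison, here is a minimal sketch of the same retrieval with a psycopg2 server-side cursor. The function name iter_daily_rows and the ORDER BY clause are assumptions added for illustration. The named cursor is opened inside "with conn" because, like the C version, it needs a transaction.

```python
def iter_daily_rows(conn, symbol, batch_size=100):
    """Yield stocks_daily rows for a symbol, fetching batch_size rows per round trip."""
    with conn:
        # Passing a name makes psycopg2 declare a server-side cursor.
        with conn.cursor(name="cur") as cur:
            cur.execute("SELECT * FROM stocks_daily WHERE symbol = %s ORDER BY date", (symbol,))
            while True:
                rows = cur.fetchmany(batch_size)
                if not rows:
                    break
                yield from rows

# Usage, assuming conn is an open psycopg2 connection:
#     for row in iter_daily_rows(conn, "GOOG"):
#         print(row)
```

Each fetchmany() call plays the role of the FETCH 100 IN cur statement in the C program, so only one batch of rows is held in client memory at a time.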

To compile and build the C program, we need to have libpq installed. If it is not on your system, you can install it with:

$ sudo apt install libpq5
$ sudo apt install libpq-dev
To locate the header and library files, we can use the pg_config tool:
$ pg_config --includedir
/home/codespace/.asdf/installs/postgres/16.1/include
$ pg_config --libdir
/home/codespace/.asdf/installs/postgres/16.1/lib
Then compile and run the program:
$ cc -o retrieve_stock_data -I /home/codespace/.asdf/installs/postgres/16.1/include retrieve_stock_data.c -L /home/codespace/.asdf/installs/postgres/16.1/lib -lpq
$ ./retrieve_stock_data GOOG
symbol      date        open        close       high        low         volume      
GOOG        2022-12-05  99.815      99.87       101.75      99.355      1.99555e+07 
GOOG        2022-12-06  99.67       97.31       100.21      96.76       2.08776e+07 
GOOG        2022-12-07  96.77       95.15       97.31       95.025      2.66479e+07
...
GOOG        2023-12-01  133.32      133.32      133.5       132.152     2.42584e+07 
GOOG        2023-12-04  131.294     130.63      131.45      129.4       2.40836e+07 
GOOG        2023-12-05  130.37      131.93      132.3499    129.73      3.706532e+06

This concludes our series of articles about PostgreSQL. In the series, we covered how to install PostgreSQL in a codespace, build a PostgreSQL database, and access the database from Python and C programs.
