
Building a Stock Market API with FastAPI and Python

Introduction

This is a backend API for a stock market application, built using Python and FastAPI. It provides endpoints for accessing stock data, managing user accounts, and handling watchlists and portfolios.

Features

  • Stock Data: Retrieve real-time and historical stock information from an external API (Polygon.io).
  • User Authentication: Secure user registration and login using JWT (JSON Web Tokens).
  • Watchlists: (To be implemented) Manage user watchlists for tracking favorite stocks.
  • Portfolios: (To be implemented) Track user stock portfolios and performance.
  • News: (To be implemented) Fetch relevant financial news.

Technologies Used

  • Python: Programming language.
  • FastAPI: Web framework for building APIs.
  • Uvicorn: ASGI server for running the API.
  • SQLAlchemy: ORM for database interactions.
  • PostgreSQL: Database system.
  • Pydantic: Data validation and serialization.
  • python-jose: JWT encoding and decoding.
  • passlib: Password hashing.
  • requests: HTTP client for external API requests.
  • python-dotenv: For managing environment variables.

GitHub

https://github.com/tanujgyan/stock-m-backend/tree/master

I. Project Setup

To begin, we need to set up our development environment and install the necessary dependencies.

Create a backend directory/folder for your project

Go to the terminal and paste the following commands.

mkdir stock-market-backend
cd stock-market-backend

This will create a new directory called stock-market-backend, which will serve as your root directory, and the terminal window will now point to the root directory.

Create a Virtual Environment

This isolates our project's dependencies from the system-wide Python installation. Open a terminal or command prompt and paste the following lines.

python3 -m venv venv
source venv/bin/activate  # macOS/Linux
venv\Scripts\activate      # Windows

Install Dependencies

We'll use FastAPI for our API framework, Uvicorn as the ASGI server, SQLAlchemy for database interaction, Pydantic for data validation, and other supporting libraries.

pip install fastapi uvicorn sqlalchemy psycopg2-binary python-dotenv requests python-jose passlib

Once we have all the dependencies installed and the environment set up, we can use an IDE to start the development process. I will be using PyCharm Community Edition for this project, but you can use any IDE you prefer. I am also using Black to format the code.

Create the .env file

Create a .env file in the root of your backend directory and add two keys to it.

  • FINANCIAL_API_KEY=
  • DATABASE_URL=

We will set up our database using Docker and get the FINANCIAL_API_KEY from Polygon in the upcoming steps, so for now we will leave both values blank.

Create the main file and app directory

Open a terminal and cd to your root directory (if not there already) and paste the following commands.

mkdir app
touch app/main.py

This will create a new directory called app and will create a file inside it called main.py.

Basic FastAPI Setup in main.py

Open main.py in your editor and add the following code.

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Stock Market Backend API"}

This creates a minimal FastAPI app, which we will run in the next step to confirm our setup is working as expected.

Run the API

Go to your terminal window and paste the following code.

uvicorn app.main:app --reload

We are using uvicorn to start the server. If everything goes well, you will see an output in your terminal indicating that the server is now running.

You can then open your web browser and go to http://127.0.0.1:8000 to see the {"message": "Stock Market Backend API"} response.

When I did this for the first time, I got an error saying:

  • Command 'python' not found, did you mean?
  • Command 'python3' from deb python3
  • Command 'python' from deb python-is-python3

It means my system only provides the python3 command rather than python, so I had to adjust the command used to create the virtual environment.

python3 -m venv venv

Notice how I am using python3 instead of python.

After this, you can run the uvicorn app.main:app --reload command again to start the server and continue the steps from there. If you did not get this error, you will already have a running server at this point.

II. Postgres Database setup using Docker

Before implementing the steps, you need to ensure you have Docker installed on your machine. Run the following command on any terminal window to check.

docker -v

When I run this, I get a response like this. It indicates I have Docker installed.

Docker version 27.4.1, build b9d17ea

If you don't have Docker installed, you can go to the official Docker website and install Docker for your Operating system.

You can install PostgreSQL locally also, but I like to use Docker to run PostgreSQL as it eliminates the need for a local installation.

The first step is to create a docker-compose.yml file. Open Notepad or any other text editor and paste the following lines.

version: '3.8' # or your desired Compose file version
services:
  db:
    image: postgres:latest # or a specific version like postgres:15
    restart: always
    environment:
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
      POSTGRES_DB: stockmarket
    ports:
      - "5432:5432" # Expose the port (optional, for external access)
    volumes:
      - postgres_data:/var/lib/postgresql/data # Persist data

volumes:
  postgres_data:

This is a YAML file that contains instructions for creating a PostgreSQL database container. Let us take a look at some of the keys here.

  • image: postgres:latest: Uses the latest PostgreSQL image from Docker Hub. You can specify a version if needed (e.g., postgres:15).
  • restart: always: Ensures that the container restarts if it stops.
  • environment: Sets environment variables for the PostgreSQL container.
    • POSTGRES_USER: The username for the database.
    • POSTGRES_PASSWORD: The password for the database.
    • POSTGRES_DB: The name of the database to create.
  • ports: - "5432:5432": (Optional) Exposes the PostgreSQL port (5432) to your host machine, allowing you to connect to it from outside Docker. If you only need this container to be accessed by other containers in the same docker compose network, then you can remove this line.
  • volumes: - postgres_data:/var/lib/postgresql/data: Creates a named volume to persist the database data. This ensures that your data is not lost when the container is stopped or removed.
  • volumes: postgres_data:: Defines the named volume.

Save this file in a directory of your choice.

Run Docker compose: Open a terminal and navigate to the directory where you have saved the YAML file created above, and run the following command.

docker-compose up -d

The -d flag runs the container in detached (background) mode.

To check if the container is running correctly, run the following command.

docker ps --format "table {{.Names}} | {{.RunningFor}} | {{.Ports}} | {{.Status}}"

When I run this, the output indicates that the container was created 11 days ago, has been up for 23 minutes, and is listening on port 5432.


III. Update .env file

Go to the official Polygon website https://polygon.io/ and create an account. You can sign up with GitHub or Facebook for convenience, then create an API key. The free tier is fairly generous with the number of requests you get, and they have various paid plans as well that you can choose based on your needs.

Go to the .env file and update the key.

FINANCIAL_API_KEY=<your_polygon_key_without_quotes>
DATABASE_URL=postgresql://myuser:mypassword@localhost:5432/stockmarket
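The DATABASE_URL follows the standard postgresql://user:password@host:port/database shape, matching the values from the Docker Compose file. As a quick sanity check, you can pull it apart with Python's standard library (a throwaway sketch, not part of the app):

```python
from urllib.parse import urlsplit

# the same URL we put in .env, matching the compose file's settings
url = "postgresql://myuser:mypassword@localhost:5432/stockmarket"
parts = urlsplit(url)

print(parts.username)           # myuser
print(parts.password)           # mypassword
print(parts.hostname)           # localhost
print(parts.port)               # 5432
print(parts.path.lstrip("/"))   # stockmarket
```

If any of these pieces don't match your compose file, SQLAlchemy will fail to connect later, so this is a cheap check before wiring up the database.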

IV. Create FastAPI endpoints

Install the requests library.

pip install requests

Inside your app directory, create a new directory called endpoints. As the name suggests, we will be storing all the endpoints here.

Create a new .py file called stocks.py and add the following code.

import os
from datetime import datetime, timedelta

import requests
from dotenv import load_dotenv
from fastapi import APIRouter, HTTPException
from fastapi.params import Depends
from sqlalchemy.orm import Session

from app.database import database, crud, models
from ..DTO.schemas import StockData, HistoricalData

load_dotenv() # this line loads the environment variable from .env file
router = APIRouter()
POLYGON_API_KEY = os.getenv("FINANCIAL_API_KEY") #gets your financial api key from .env file and save it in a variable
BASE_URL = "https://api.polygon.io"

# This calls the endpoint on polygon API to get details for a particular ticker. An example symbol is AAPL
@router.get("/stocks/{symbol}")
async def get_stock_data(symbol: str):
    url = f"{BASE_URL}/v3/reference/tickers/{symbol}?apiKey={POLYGON_API_KEY}"
    response = requests.get(url)

    if response.status_code == 200:
        return response.json()
    else:
        raise HTTPException(status_code=response.status_code, detail="Failed to fetch stock data")

To make this API call, you can use any free tool like Postman or Insomnia. I will be using Insomnia for the purpose of this article. You can download it from its official website.

Update the main.py file to include the router information. We have added some extra routers that we will define later in the article.

from fastapi import FastAPI

from app.api.endpoints import stocks, watchlist, portfolio, news

app = FastAPI()

app.include_router(stocks.router)
app.include_router(watchlist.router)
app.include_router(portfolio.router)
app.include_router(news.router)

@app.get("/")
async def root():
    return {"message": "Stock Market Backend API"}

Go to the terminal -> root directory and run the following command to start your FastAPI.

uvicorn app.main:app --reload

Create a new GET request in Insomnia with the URL http://127.0.0.1:8000/stocks/AAPL and click Send. If everything goes as expected, you will get a 200 status code and a JSON response like this. It contains a lot of information, but we won't be using all of it; we will update the code later to keep only the key-value pairs we need.

{
  "request_id": "589094d6bd913062cbb3908aa9bc1972",
  "results": {
    "ticker": "AAPL",
    "name": "Apple Inc.",
    "market": "stocks",
    "locale": "us",
    "primary_exchange": "XNAS",
    "type": "CS",
    "active": true,
    "currency_name": "usd",
    "cik": "0000320193",
    "composite_figi": "BBG000B9XRY4",
    "share_class_figi": "BBG001S5N8V8",
    "market_cap": 2725905366580,
    "phone_number": "(408) 996-1010",
    "address": {
      "address1": "ONE APPLE PARK WAY",
      "city": "CUPERTINO",
      "state": "CA",
      "postal_code": "95014"
    },
    "description": "Apple is among the largest companies in the world, with a broad portfolio of hardware and software products targeted at consumers and businesses. Apple's iPhone makes up a majority of the firm sales, and Apple's other products like Mac, iPad, and Watch are designed around the iPhone as the focal point of an expansive software ecosystem. Apple has progressively worked to add new applications, like streaming video, subscription bundles, and augmented reality. The firm designs its own software and semiconductors while working with subcontractors like Foxconn and TSMC to build its products and chips. Slightly less than half of Apple's sales come directly through its flagship stores, with a majority of sales coming indirectly through partnerships and distribution.",
    "sic_code": "3571",
    "sic_description": "ELECTRONIC COMPUTERS",
    "ticker_root": "AAPL",
    "homepage_url": "https://www.apple.com",
    "total_employees": 164000,
    "list_date": "1980-12-12",
    "branding": {
      "logo_url": "https://api.polygon.io/v1/reference/company-branding/YXBwbGUuY29t/images/2025-04-04_logo.svg",
      "icon_url": "https://api.polygon.io/v1/reference/company-branding/YXBwbGUuY29t/images/2025-04-04_icon.png"
    },
    "share_class_shares_outstanding": 15022070000,
    "weighted_shares_outstanding": 15022073000,
    "round_lot": 100
  },
  "status": "OK"
}

Let's add another endpoint to get historical data. Update the stocks.py file to add the following endpoint.

@router.get("/stocks/{symbol}/historical")
async def get_historical_data(symbol: str, date_from: str, date_to: str):
    url = f"{BASE_URL}/v2/aggs/ticker/{symbol}/range/1/day/{date_from}/{date_to}?adjusted=true&apiKey={POLYGON_API_KEY}"
    response = requests.get(url)

    if response.status_code == 200:
        return response.json()
    else:
        raise HTTPException(status_code=response.status_code, detail="Failed to fetch historical data")

You can read the various parameters we have passed and what each symbol in the response means by going to the Polygon docs. They have very extensive and informative documentation, and you can play with different query params directly on their website.

Let us add a third endpoint that allows us to search for any string in the ticker name or symbol. Update stocks.py to add the following endpoint.

@router.get("/stocks/search/{query}")
async def search_stocks(query: str):
    url = f"{BASE_URL}/v3/reference/tickers?search={query}&apiKey={POLYGON_API_KEY}"
    response = requests.get(url)

    if response.status_code == 200:
        return response.json()
    else:
        raise HTTPException(
            status_code=response.status_code, detail="Failed to fetch search results"
        )

V. Create Models

Now we have our basic endpoints working and returning responses in JSON format. We will now focus on models that map the JSON results to our own classes so we can validate and serialize the data. We will be using Pydantic for this purpose.

Install Pydantic

pip install pydantic
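Before wiring Pydantic into the app, here is a standalone sketch of the two behaviors we rely on: unknown keys in the input are dropped by default, and values that can't be coerced to the declared type raise a ValidationError. TickerInfo is a hypothetical, trimmed-down stand-in for the StockData model we define next:

```python
from pydantic import BaseModel, ValidationError

class TickerInfo(BaseModel):  # hypothetical, trimmed-down stand-in for StockData
    ticker: str
    active: bool

# extra keys in the input are ignored by default, so the huge Polygon
# payload collapses down to just the fields we declared
info = TickerInfo(ticker="AAPL", active=True, market_cap=123)
print(info.ticker)                  # AAPL
print(hasattr(info, "market_cap"))  # False

# a value that can't be coerced to bool fails validation
try:
    TickerInfo(ticker="AAPL", active="not-a-bool")
except ValidationError:
    print("validation failed")
```

This dropping of undeclared keys is exactly what will shrink our API responses once we attach these models to the endpoints.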

Inside the api directory, create a new directory called DTO, and inside it, add a new file called schemas.py. Copy and paste the following code into the file.

from datetime import datetime
from typing import Optional, List

from pydantic import BaseModel

from app.api.DTO.marketenums import MarketEnum

class StockData(BaseModel):
    ticker: str
    name: str
    market: MarketEnum
    locale: str
    primary_exchange: str
    type: str
    active: bool
    currency_name: str
    cik: Optional[str]
    composite_figi: Optional[str]
    share_class_figi: Optional[str]
    last_updated_utc: datetime

class HistoricalData(BaseModel):
    ticker: str
    queryCount: int
    resultsCount: int
    adjusted: bool
    results: List[dict]
    status: str
    request_id: str
    count: int

class StockSearch(BaseModel):
    results: List[StockData]
    count: int
    status: str
    request_id: str

If you notice on line 12, we are using an enum to define the allowed values for the market key. Let's create the enum file in the api -> DTO directory and name it marketenums.py. Add the following code to the file.

from enum import Enum

class MarketEnum(str, Enum):
     STOCKS = "stocks"
     CRYPTO = "crypto"
     FX = "fx"
     OTC = "otc"
     INDICES = "indices"
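Since MarketEnum subclasses str, its members compare equal to plain strings, which is what lets Pydantic validate the raw "market" value from the API against it. A small sketch of that behavior (trimmed to two members):

```python
from enum import Enum

class MarketEnum(str, Enum):  # same shape as the article's enum, trimmed
    STOCKS = "stocks"
    CRYPTO = "crypto"

print(MarketEnum("stocks") is MarketEnum.STOCKS)  # lookup by value
print(MarketEnum.STOCKS == "stocks")              # str subclass compares equal

try:
    MarketEnum("bonds")
except ValueError:
    print("unknown market rejected")
```

Any response whose market value is outside the enum will therefore fail validation instead of silently passing through.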

Let's update the stocks.py endpoints to use the base models we just created. Update the stocks.py file.

import os
from datetime import datetime, timedelta

import requests
from dotenv import load_dotenv
from fastapi import APIRouter, HTTPException
from fastapi.params import Depends
from sqlalchemy.orm import Session

from app.database import database, crud, models
from ..DTO.schemas import StockData, HistoricalData, StockSearch

load_dotenv() # this line loads the environment variable from .env file
router = APIRouter()
POLYGON_API_KEY = os.getenv("FINANCIAL_API_KEY") #gets your financial api key from .env file and save it in a variable
BASE_URL = "https://api.polygon.io"

# This calls the endpoint on polygon API to get details for a particular ticker. An example symbol is AAPL
@router.get("/stocks/{symbol}", response_model=StockData)
async def get_stock_data(symbol: str):
    url = f"{BASE_URL}/v3/reference/tickers/{symbol}?apiKey={POLYGON_API_KEY}"
    response = requests.get(url)

    if response.status_code == 200:
        return response.json()
    else:
        raise HTTPException(status_code=response.status_code, detail="Failed to fetch stock data")

@router.get("/stocks/{symbol}/historical", response_model=HistoricalData)
async def get_historical_data(symbol: str, date_from: str, date_to: str):
    url = f"{BASE_URL}/v2/aggs/ticker/{symbol}/range/1/day/{date_from}/{date_to}?adjusted=true&apiKey={POLYGON_API_KEY}"
    response = requests.get(url)

    if response.status_code == 200:
        return response.json()
    else:
        raise HTTPException(status_code=response.status_code, detail="Failed to fetch historical data")

@router.get("/stocks/search/{query}", response_model=StockSearch)
async def search_stocks(query: str):
    url = f"{BASE_URL}/v3/reference/tickers?search={query}&apiKey={POLYGON_API_KEY}"
    response = requests.get(url)

    if response.status_code == 200:
        return response.json()
    else:
        raise HTTPException(
            status_code=response.status_code, detail="Failed to fetch search results"
        )

Note: your imports may differ from mine, as I may have additional imports depending on my actual progress at the time of writing this article. I will try to keep it as accurate as possible. For the actual code, please refer to the GitHub repo.

Restart the Uvicorn server to apply the changes.

Try calling the endpoints from Insomnia again to make sure everything is still working. You should also see a smaller response than before, because the response is now mapped to our base models and only those keys are returned.

VI. Create Database

Start by creating a new directory inside the app directory called database, and add a new file to it called models.py. Paste the following code in models.py.

from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime, Boolean
from sqlalchemy.orm import relationship

from .database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)

    watchlists = relationship("Watchlist", back_populates="user")
    portfolios = relationship("Portfolio", back_populates="user")

class Stock(Base):
    __tablename__ = "stocks"

    id = Column(Integer, primary_key=True, index=True)
    ticker = Column(String, unique=True, index=True)
    name = Column(String)
    market = Column(String)
    locale = Column(String)
    primary_exchange = Column(String)
    type = Column(String)
    currency_name = Column(String)
    cik = Column(String)
    composite_figi = Column(String)
    share_class_figi = Column(String)
    last_updated_utc = Column(DateTime)
    active = Column(Boolean)

    news = relationship("News", back_populates="stock")
    historical_data = relationship("HistoricalStockData", back_populates="stock")

class HistoricalStockData(Base):
    __tablename__ = "historical_stock_data"

    id = Column(Integer, primary_key=True, index=True)
    symbol = Column(String)
    timestamp = Column(DateTime)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Integer)
    stock_id = Column(Integer, ForeignKey("stocks.id"))

    stock = relationship("Stock", back_populates="historical_data")

class Watchlist(Base):
    __tablename__ = "watchlists"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    stock_id = Column(Integer, ForeignKey("stocks.id"))

    user = relationship("User", back_populates="watchlists")
    stock = relationship("Stock")

class Portfolio(Base):
    __tablename__ = "portfolios"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    stock_id = Column(Integer, ForeignKey("stocks.id"))
    quantity = Column(Integer)
    purchase_price = Column(Float)
    purchase_date = Column(DateTime)

    user = relationship("User", back_populates="portfolios")
    stock = relationship("Stock")

class News(Base):
    __tablename__ = "news"

    id = Column(Integer, primary_key=True, index=True)
    stock_id = Column(Integer, ForeignKey("stocks.id"))
    title = Column(String)
    url = Column(String)
    published_at = Column(DateTime)

    stock = relationship("Stock")

The Stock and HistoricalStockData classes should have the same columns as in schemas.py. We also have some extra tables that we will use later.

Notice that we have used relationship and back_populates in Stock and HistoricalStockData. This establishes a bidirectional relationship between the two tables.
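Here is a self-contained sketch of that bidirectional behavior, using an in-memory SQLite database and hypothetical, trimmed-down Stock and Bar models rather than the app's real ones:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()

class Stock(Base):
    __tablename__ = "stocks"
    id = Column(Integer, primary_key=True)
    ticker = Column(String)
    historical_data = relationship("Bar", back_populates="stock")

class Bar(Base):  # hypothetical stand-in for HistoricalStockData
    __tablename__ = "bars"
    id = Column(Integer, primary_key=True)
    close = Column(Integer)
    stock_id = Column(Integer, ForeignKey("stocks.id"))
    stock = relationship("Stock", back_populates="historical_data")

engine = create_engine("sqlite://")  # throwaway in-memory database
Base.metadata.create_all(engine)
db = sessionmaker(bind=engine)()

s = Stock(ticker="AAPL")
s.historical_data.append(Bar(close=100))  # child attached via the parent side
db.add(s)
db.commit()

bar = db.query(Bar).first()
print(bar.stock.ticker)  # navigate back from child to parent
```

Appending to the parent's collection is enough for SQLAlchemy to populate the foreign key, and the child's stock attribute gives you the reverse navigation for free.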

Update main.py to include create_all, which will create the database tables when we restart the server.

from fastapi import FastAPI

from app.api.endpoints import stocks, watchlist, portfolio, news, auth
from app.database import models, database

models.Base.metadata.create_all(bind=database.engine)

app = FastAPI()

app.include_router(stocks.router)
app.include_router(watchlist.router)
app.include_router(portfolio.router)
app.include_router(news.router)

@app.get("/")
async def root():
    return {"message": "Stock Market Backend API"}

If you make changes to the models, the easiest approach is to delete all the tables and recreate them. This strategy works well for small databases while you are in the development phase.
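That drop-and-recreate cycle boils down to two metadata calls. A throwaway sketch against an in-memory SQLite database (the Demo table here is hypothetical, not one of the app's models):

```python
from sqlalchemy import Column, Integer, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Demo(Base):  # hypothetical throwaway table
    __tablename__ = "demo"
    id = Column(Integer, primary_key=True)

engine = create_engine("sqlite://")

Base.metadata.create_all(bind=engine)           # create every mapped table
created = inspect(engine).get_table_names()
print(created)    # ['demo']

Base.metadata.drop_all(bind=engine)             # wipe them all again
remaining = inspect(engine).get_table_names()
print(remaining)  # []
```

In our app the equivalent would be calling drop_all followed by create_all on models.Base against database.engine; for production databases you would reach for a migration tool instead.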

Create a new file called database.py inside app -> database and paste the following code.

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv
import os

load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")

engine = create_engine(DATABASE_URL)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

We will use this file to get a session and handle opening and closing database connections. Notice that we read the database URL from the .env file.
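FastAPI treats get_db as a generator dependency: it runs the function up to the yield to hand the session to the endpoint, then resumes the generator after the response so the finally block closes the session. A stdlib-only sketch of that lifecycle, with a hypothetical FakeSession standing in for SessionLocal:

```python
# stdlib-only sketch; FakeSession is a hypothetical stand-in for SessionLocal
class FakeSession:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def get_db():
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()  # always runs, even if the endpoint raised

gen = get_db()
db = next(gen)    # FastAPI "enters" the dependency and hands db to the endpoint
gen.close()       # after the response, FastAPI winds the generator down
print(db.closed)  # True: the finally block fired
```

This is why endpoints can simply declare db: Session = Depends(database.get_db) and never worry about leaking connections.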

Create a new file called crud.py inside app->database and paste the following code. Make sure your imports are correct, as they can differ based on your directory structure.

from datetime import datetime, timezone

from sqlalchemy.orm import Session

from app.api.DTO.schemas import UserCreate
from app.database import models

def create_stock(db: Session, stock_data: dict) -> models.Stock:
    stock_data_filtered = {
        key: value
        for key, value in stock_data.items()
        if key in models.Stock.__table__.columns
    } # create a new object stock_data_filtered that parses through stock_data input and stores all the columns that are there in DB
    stock_data_filtered["last_updated_utc"] = datetime.now(timezone.utc) # update last updated date time
    db_stock = models.Stock(**stock_data_filtered) #create an object that can be saved in db
    db.add(db_stock)
    db.commit()
    db.refresh(db_stock)
    return db_stock

#get stocks from db based on symbol name
def get_stock_by_ticker(db: Session, ticker: str) -> models.Stock:
    return db.query(models.Stock).filter(models.Stock.ticker == ticker).first()

def create_historical_data(
    db: Session, historical_data: dict
) -> models.HistoricalStockData:
    historical_data_filtered = {
        key: value
        for key, value in historical_data.items()
        if key in models.HistoricalStockData.__table__.columns
    }
    db_historical_data = models.HistoricalStockData(**historical_data_filtered)
    db.add(db_historical_data)
    db.commit()
    db.refresh(db_historical_data)
    return db_historical_data

def get_historical_data_by_stock_and_date(
    db: Session, stock_id: int, timestamp: datetime
) -> models.HistoricalStockData:
    return (
        db.query(models.HistoricalStockData)
        .filter(
            models.HistoricalStockData.stock_id == stock_id,
            models.HistoricalStockData.timestamp == timestamp,
        )
        .first()
    )
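The key-filtering pattern used in create_stock and create_historical_data can be shown in isolation. The column set and raw payload below are hypothetical, stand-ins for the model's real columns and Polygon's real response:

```python
# hypothetical column set and payload, mirroring create_stock's filtering
allowed_columns = {"ticker", "name", "market"}
raw_response = {
    "ticker": "AAPL",
    "name": "Apple Inc.",
    "market": "stocks",
    "branding": {"logo_url": "https://example.com/logo.svg"},  # not a column, dropped
}

filtered = {k: v for k, v in raw_response.items() if k in allowed_columns}
print(sorted(filtered))  # ['market', 'name', 'ticker']
```

Filtering first means models.Stock(**stock_data_filtered) never receives a keyword argument the model doesn't know about, which would otherwise raise a TypeError.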

VII. Update endpoints to save/get data from the database

Calling the Polygon API for every request is not feasible, as you will hit your rate limits quickly. The best way to handle this is to cache data in your PostgreSQL DB. This means your application won't always show the latest data, so later we can introduce paid plans for real-time data; for free-tier users, we can set the expectation that the data shown is not the latest and is delayed by x minutes.

Open stocks.py and update the "/stocks/{symbol}" endpoint to include database logic. I have added comments to explain what it does. In a nutshell, when this endpoint is called, we first look in the DB for the symbol. If the DB doesn't contain it, we fetch it from Polygon, save it in the DB, and return the data.

@router.get("/stocks/{symbol}", response_model=StockData)
async def get_stock_data(symbol: str, db: Session = Depends(database.get_db)):
    db_stock = crud.get_stock_by_ticker(db, symbol) # try getting data from database first
    if db_stock:
        return db_stock
    else: # try getting data from Polygon if its not there in DB
        url = f"{BASE_URL}/v3/reference/tickers/{symbol}?apiKey={POLYGON_API_KEY}"
        response = requests.get(url)

        if response.status_code == 200 and response.json()["results"]:
            stock_data = response.json()["results"]
            db_stock = crud.create_stock(db, stock_data) # save data to db
            return db_stock
        else:
            raise HTTPException(
                status_code=response.status_code, detail="Failed to fetch stock data"
            )

Update the get_historical_data endpoint. In a nutshell, this method first checks whether it can construct the results from the data in the database and returns them if it can (lines 26 to 63). If it doesn't find all the data, it fetches it from Polygon, saves the missing dates in the DB, and returns the result.

#this endpoint will return every day price between the given date range
@router.get("/stocks/{symbol}/historical", response_model=HistoricalData)
async def get_historical_data(
    symbol: str,
    date_from: str,
    date_to: str,
    multiplier: int, #this is not used 
    timespan: str, #this is not used
    db: Session = Depends(database.get_db),
):
#check if the stock exists
    try:
        stock = await get_stock_data(symbol, db)
    except HTTPException as e:
        if e.status_code == 404:
            raise HTTPException(status_code=404, detail="Stock not found")
        else:
            raise

    date_from_obj = datetime.strptime(date_from, "%Y-%m-%d")
    date_to_obj = datetime.strptime(date_to, "%Y-%m-%d")
    results = []
    current_date = date_from_obj
    all_data_exists = True
    # try constructing the object from the data we have in database
    while current_date <= date_to_obj:
        db_data = (
            db.query(models.HistoricalStockData)
            .filter(
                models.HistoricalStockData.symbol == symbol,
                models.HistoricalStockData.timestamp == current_date,
            )
            .first()
        )

        if not db_data:
            all_data_exists = False
            break  # Exit the loop if data is missing
        else:
            results.append(
                {
                    "o": db_data.open,
                    "h": db_data.high,
                    "l": db_data.low,
                    "c": db_data.close,
                    "v": db_data.volume,
                    "t": int(db_data.timestamp.timestamp() * 1000),
                }
            )
        current_date += timedelta(days=1)

    if all_data_exists:
        return {
            "ticker": symbol,
            "queryCount": len(results),
            "resultsCount": len(results),
            "adjusted": True,
            "results": results,
            "status": "OK",
            "request_id": "db_fetch",
            "count": len(results),
        }

    else:
        # Fetch missing data from Polygon.io
        url = f"{BASE_URL}/v2/aggs/ticker/{symbol}/range/1/day/{date_from}/{date_to}?adjusted=true&apiKey={POLYGON_API_KEY}"
        response = requests.get(url)

        if response.status_code == 200 and response.json()["results"]:
            historical_data = response.json()
            for result in historical_data["results"]:
                timestamp = datetime.fromtimestamp(result["t"] / 1000)
                #save missing data
                if (
                    not db.query(models.HistoricalStockData)
                    .filter(
                        models.HistoricalStockData.symbol == symbol,
                        models.HistoricalStockData.timestamp == timestamp,
                    )
                    .first()
                ):
                    result["symbol"] = symbol
                    result["timestamp"] = timestamp
                    result["open"] = result["o"]
                    result["high"] = result["h"]
                    result["low"] = result["l"]
                    result["close"] = result["c"]
                    result["volume"] = result["v"]
                    crud.create_historical_data(db, result)
            return historical_data
        else:
            raise HTTPException(
                status_code=response.status_code,
                detail="Failed to fetch historical data",
            )
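The day-by-day walk over the requested date range can be isolated into a small helper. This is a sketch of the same loop, not code from the repo, and daily_range is a hypothetical name:

```python
from datetime import datetime, timedelta

def daily_range(date_from: str, date_to: str):
    """Yield one datetime per calendar day, inclusive of both endpoints."""
    current = datetime.strptime(date_from, "%Y-%m-%d")
    end = datetime.strptime(date_to, "%Y-%m-%d")
    while current <= end:
        yield current
        current += timedelta(days=1)

days = list(daily_range("2024-01-01", "2024-01-05"))
print(len(days))  # 5: both endpoints are included
```

One caveat with this approach: it expects a cached row for every calendar day, including weekends and holidays when markets produce no bars, so in practice the all_data_exists check will often fall through to Polygon.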

VIII. Add Authentication using JWT

If we need personalized endpoints like portfolio or watchlist, we need user accounts, as this data will be different for different users. We need to build an auth mechanism to ensure users' data is safe and cannot be accessed by unauthorized users.

Open the terminal and paste the following command.

pip install python-jose passlib[bcrypt]

This will install two packages.

  • Passlib is a library for password hashing.
  • Python-jose is a library for working with JWTs.

We already have a User model in models.py with all the necessary properties, so we can move to the next step: creating a new file to hold all the authentication helper methods. Create a new folder called auth inside app/api. Inside the auth folder, create a new file called authhelper.py.

Go to your .env file and add a new key called SECRET_KEY. You can generate a random GUID or any string as value for this key.

Paste the following code in the authhelper.py file.

import os
from datetime import datetime, timedelta, timezone

from jose import jwt
from dotenv import load_dotenv
from fastapi import HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.database import database, crud

load_dotenv()

SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 #set expiration time to 30 minutes

# --- Password Hashing ---
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  # Endpoint for obtaining token

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(
            minutes=ACCESS_TOKEN_EXPIRE_MINUTES
        )
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(
    token: str = Depends(oauth2_scheme), db: Session = Depends(database.get_db)
):
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = crud.get_user_by_username(db, username=username)
    if user is None:
        raise credentials_exception
    return user

Let's try to understand the various classes and methods in this code.

  • Password Hashing: passlib's CryptContext is used to hash and verify passwords securely.
  • create_access_token method is used to create JSON web tokens with an expiration time.
  • get_current_user decodes the JWT, verifies its signature and expiration, and retrieves the corresponding user from the database.
  • OAuth2PasswordBearer is used to define how the access token is obtained (in our case, through the "token" endpoint).
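To demystify what create_access_token ultimately produces, here is a minimal, illustrative sketch of HS256 JWT signing using only the standard library. This is for understanding only (keep using python-jose in real code), and the secret and claims below are made up for the example:

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # JWTs use unpadded base64url encoding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        b64url(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + b64url(json.dumps(payload, separators=(",", ":")).encode())
    )
    # the signature is an HMAC-SHA256 over "header.payload" using SECRET_KEY
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)

token = sign_hs256({"sub": "tanujgyan", "exp": 1744798413}, "dummy-secret")
print(token.split(".")[0])  # eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
```

This is why anyone holding SECRET_KEY can mint valid tokens, and why the key must never be committed to source control.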

Let's update app/database/crud.py to include user-related methods. Paste the following code. Note that crud.py also needs the UserCreate schema imported at the top of the file (for example, from app.api.DTO.schemas import UserCreate), alongside its existing Session and models imports.

def get_user_by_username(db: Session, username: str):
    return db.query(models.User).filter(models.User.username == username).first()

def create_user(db: Session, user: UserCreate, hashed_password: str):
    db_user = models.User(
        username=user.username, email=user.email, hashed_password=hashed_password
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

We have added two methods: one to retrieve a user by username, and another to create a new user and save their username, email, and hashed password.
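To make the contract of these two functions concrete without standing up a database, here is an illustrative sketch that swaps SQLAlchemy for an in-memory dict and passlib's bcrypt for a stdlib PBKDF2 hash. The email and password below are made-up placeholders:

```python
import hashlib
import os

users: dict[str, dict] = {}  # stands in for the users table

def get_user_by_username(username: str):
    # mirrors db.query(models.User).filter(...).first(): match or None
    return users.get(username)

def create_user(username: str, email: str, hashed_password: str) -> dict:
    user = {
        "id": len(users) + 1,
        "username": username,
        "email": email,
        "hashed_password": hashed_password,
    }
    users[username] = user  # add + commit collapsed into one step
    return user

# hash the plaintext password before it ever touches "storage"
salt = os.urandom(16)
hashed = hashlib.pbkdf2_hmac("sha256", b"password", salt, 100_000).hex()

create_user("tanujgyan", "user@example.com", hashed)
print(get_user_by_username("tanujgyan")["id"])   # 1
print(get_user_by_username("no-such-user"))      # None
```

The important property is the same in both versions: only the hash is stored, never the plaintext password.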

Let's update the app/api/DTO/schemas.py file. (Note: orm_mode is the Pydantic v1 setting; if you are on Pydantic v2, it has been renamed to from_attributes.)

class Token(BaseModel):
    access_token: str
    token_type: str

class UserBase(BaseModel):
    username: str
    email: str

class UserCreate(UserBase):
    password: str

class User(UserBase):
    id: int

    class Config:
        orm_mode = True

Next, let's create a file inside the endpoints folder that wires the authentication helper methods up as API endpoints. Go to app/api/endpoints and create a new file called authendpoints.py. Paste the following code inside the file.

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from app.database import database, crud, models
from ..DTO import schemas
from ..DTO.schemas import UserCreate, Token
from ..auth import authhelper as auth  # the helper module we created earlier

router = APIRouter()

@router.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(database.get_db),
):
    user = crud.get_user_by_username(db, username=form_data.username)
    if not user or not auth.verify_password(form_data.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Incorrect username or password")

    access_token_expires = auth.timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = auth.create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    return {
        "access_token": access_token,
        "token_type": "bearer"
    }

@router.post("/users/", response_model=schemas.User)
async def create_user(
    user: UserCreate,
    db: Session = Depends(database.get_db)
):
    db_user = crud.get_user_by_username(db, username=user.username)
    if db_user:
        raise HTTPException(status_code=400, detail="Username already registered")

    hashed_password = auth.get_password_hash(user.password)
    db_user = crud.create_user(db, user=user, hashed_password=hashed_password)
    return db_user

@router.get("/users/me/", response_model=schemas.User)
async def read_users_me(
    current_user: models.User = Depends(auth.get_current_user)
):
    return current_user

  • login_for_access_token reads the username and password from form_data, checks that the user exists, verifies the submitted password against the stored hash, and, if everything matches, creates a token and returns it.
  • create_user expects three properties - username, password, and email - checks whether the username is already taken, and creates a new user.
  • read_users_me returns the current user after validating the JWT.

Let's try testing these endpoints using Insomnia.

Make a POST call to http://127.0.0.1:8000/auth/users/ with the Body as JSON.

{
    "username": "tanujgyan",
    "password": "password",
    "email": "[email protected]"
}

It should create a new user for you.

Make the same call again, and you should get an error response.

{
    "detail": "Username already registered"
}

Make a POST call to http://127.0.0.1:8000/auth/token and pass the username and password as Form Data.

  • username: tanujgyan
  • password: password

You should get a response back with the token, for example:

{
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0YW51amd5YW4iLCJleHAiOjE3NDQ3OTg0MTN9.eS7Gj4n6Qp9D2xS5owwdKAM-Byg3eWtEGdQweiGstDs",
    "token_type": "bearer"
}
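As an aside, the payload segment of a JWT is just base64url-encoded JSON: anyone can read the claims without SECRET_KEY (the key is only needed to verify or forge the signature), which is why you should never put sensitive data in the payload. Decoding the example token above:

```python
import base64
import json

token = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
    "eyJzdWIiOiJ0YW51amd5YW4iLCJleHAiOjE3NDQ3OTg0MTN9."
    "eS7Gj4n6Qp9D2xS5owwdKAM-Byg3eWtEGdQweiGstDs"
)

def decode_segment(segment: str) -> dict:
    segment += "=" * (-len(segment) % 4)  # restore the stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(segment))

header, payload = [decode_segment(s) for s in token.split(".")[:2]]
print(header)   # {'alg': 'HS256', 'typ': 'JWT'}
print(payload)  # {'sub': 'tanujgyan', 'exp': 1744798413}
```

The exp claim is a Unix timestamp; jwt.decode in get_current_user rejects the token automatically once that time has passed.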

Make a GET call to http://127.0.0.1:8000/auth/users/me/ and, in the Auth tab, choose Bearer Token and paste the token obtained in the previous call (without quotes). You should get a response like this.

{
    "username": "tanujgyan",
    "email": "[email protected]",
    "id": 1
}

Summary

In this article, we learned the following things.

  • How to create a new DB using Postgres and Docker
  • How to create models in Python and build endpoints using FastAPI
  • How to generate bearer tokens for authorization

In upcoming articles, we will see how we can use these tokens to build more personalized features, such as News and Portfolio pages.

Thanks for reading, please feel free to comment or ask any questions you have.