Overview

In fast-moving industries, monitoring inventory levels in real time is essential to keeping operations running smoothly. Many factors affect the supply chain: customer preferences shift, raw materials may suddenly become hard to obtain, and unforeseen circumstances can delay shipments.

Having a live view of stock levels allows companies to respond immediately to changes in demand and supply chain disruptions. With data streaming in continuously, businesses can adjust forecasts based on current sales trends. If delays occur, notifications can be sent to customers promptly, improving transparency and customer satisfaction.

In this tutorial, you will learn how to utilize inventory and sales data to prevent stock shortages and forecast sales demand.

Prerequisites

  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment. For detailed instructions, see Download PostgreSQL.
  • Install and run RisingWave. For detailed instructions on how to quickly get started, see the Quick start guide.
  • Ensure that a Python environment is set up and install the psycopg2 library.

Step 1: Set up the data source tables

Once RisingWave is installed and running, run the two SQL statements below to create the tables. You will insert data into these tables to simulate live data streams.

  1. The table inventory tracks the current stock levels of each product at each warehouse.

    CREATE TABLE inventory (
        warehouse_id INT,
        product_id INT,
        timestamp TIMESTAMP,
        stock_level INT,
        reorder_point INT,
        location VARCHAR
    );
    
  2. The table sales describes the details of each transaction, such as the quantity purchased and the warehouse from which the item was sourced.

    CREATE TABLE sales (
        sale_id INT,
        warehouse_id INT,
        product_id INT,
        quantity_sold INT,
        timestamp TIMESTAMP
    );
    

Step 2: Run the data generator

To keep this demo simple, a Python script is used to generate and insert data into the tables created above.

Clone the awesome-stream-processing repository.

git clone https://github.com/risingwavelabs/awesome-stream-processing.git

Navigate to the warehouse_inventory_mgmt folder.

cd awesome-stream-processing/02-simple-demos/logistics/warehouse_inventory_mgmt

Run the data_generator.py file. This Python script uses the psycopg2 library to connect to RisingWave and insert synthetic data into the inventory and sales tables.

If you are not running RisingWave locally or using default credentials, update the connection parameters accordingly:

default_params = {
    "dbname": "dev",
    "user": "root",
    "password": "",
    "host": "localhost",
    "port": "4566"
}
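
For orientation, the following is a minimal sketch of how such a generator could work. It is not the repository's data_generator.py: the helper names (make_inventory_row, make_sale_row, insert_row, run), the value ranges, and the one-second pause are assumptions chosen to resemble the sample output later in this tutorial. Only the table columns and the connection parameters come from the steps above.

```python
import random
import time
from datetime import datetime

# Parameterized INSERT statements matching the tables from Step 1.
INSERT_INVENTORY = (
    "INSERT INTO inventory "
    "(warehouse_id, product_id, timestamp, stock_level, reorder_point, location) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
)
INSERT_SALE = (
    "INSERT INTO sales "
    "(sale_id, warehouse_id, product_id, quantity_sold, timestamp) "
    "VALUES (%s, %s, %s, %s, %s)"
)


def make_inventory_row(rng, now):
    """Build one synthetic inventory reading (value ranges are assumptions)."""
    warehouse_id = rng.randint(1, 5)
    product_id = rng.randint(1, 10)
    stock_level = rng.randint(50, 500)
    return (warehouse_id, product_id, now, stock_level, 100, f"Warehouse {warehouse_id}")


def make_sale_row(rng, sale_id, now):
    """Build one synthetic sale (value ranges are assumptions)."""
    return (sale_id, rng.randint(1, 5), rng.randint(1, 10), rng.randint(1, 10), now)


def insert_row(conn, statement, row):
    """Execute one parameterized INSERT on an open DB-API connection."""
    with conn.cursor() as cur:
        cur.execute(statement, row)


def run(params, pause_seconds=1.0):
    """Insert rows until interrupted with Ctrl+C. Requires a running RisingWave."""
    import psycopg2  # imported here so the helpers above work without it

    conn = psycopg2.connect(**params)
    conn.autocommit = True  # send each INSERT immediately
    rng = random.Random()
    sale_id = 0
    try:
        while True:
            now = datetime.now()
            insert_row(conn, INSERT_INVENTORY, make_inventory_row(rng, now))
            sale_id += 1
            insert_row(conn, INSERT_SALE, make_sale_row(rng, sale_id, now))
            time.sleep(pause_seconds)
    except KeyboardInterrupt:
        pass  # Ctrl+C ends the loop
    finally:
        conn.close()
```

Under these assumptions, calling run(default_params) with the dictionary shown above would start inserting one inventory row and one sales row per second.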

Step 3: Create materialized views

In this demo, you will create three materialized views to manage inventory levels.

A materialized view stores the results of a view expression in the RisingWave database. RisingWave computes these results incrementally, updating them whenever new events arrive, so the view never needs to be refreshed manually. Querying a materialized view returns the most up-to-date results.
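
As a conceptual illustration only (this is not how RisingWave is implemented internally), the sketch below shows the idea behind incremental maintenance: each arriving event updates a stored per-key total, so a read returns the current result without rescanning past events. The class name IncrementalSum and its methods are hypothetical.

```python
from collections import defaultdict


class IncrementalSum:
    """Keeps a per-key running total that is updated as each event arrives."""

    def __init__(self):
        self._totals = defaultdict(int)

    def on_event(self, key, quantity):
        # Apply only the change carried by the new event.
        self._totals[key] += quantity

    def query(self, key):
        # Reads return the stored result; no past events are rescanned.
        return self._totals.get(key, 0)


view = IncrementalSum()
events = [((2, 3), 10), ((2, 3), 17), ((3, 1), 27)]
for key, quantity in events:
    view.on_event(key, quantity)

print(view.query((2, 3)))  # 27
```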

Monitor inventory status

The inventory_status materialized view indicates whether or not a product needs to be restocked.

CREATE MATERIALIZED VIEW inventory_status AS
SELECT
    warehouse_id,
    product_id,
    stock_level,
    reorder_point,
    location,
    CASE
        WHEN stock_level <= reorder_point THEN 'Reorder Needed'
        ELSE 'Stock Sufficient'
    END AS reorder_status,
    timestamp AS last_update
FROM
    inventory;

You can query from inventory_status to see the results.

SELECT * FROM inventory_status LIMIT 5;
 warehouse_id | product_id | stock_level | reorder_point |  location   |  reorder_status  |        last_update         
--------------+------------+-------------+---------------+-------------+------------------+----------------------------
            1 |          1 |          64 |           100 | Warehouse 1 | Reorder Needed   | 2024-11-18 14:32:35.808553
            2 |          3 |         137 |           100 | Warehouse 2 | Stock Sufficient | 2024-11-18 14:32:51.023410
            5 |         10 |         493 |           100 | Warehouse 5 | Stock Sufficient | 2024-11-18 14:32:40.933411
            4 |          7 |          68 |           100 | Warehouse 4 | Reorder Needed   | 2024-11-18 14:32:35.827922
            1 |          2 |         416 |           100 | Warehouse 1 | Stock Sufficient | 2024-11-18 14:32:45.952925
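
The CASE expression reduces to a single comparison. The Python function below mirrors it outside the database to make the boundary explicit: a product whose stock_level equals its reorder_point is already flagged for reordering. The function name is an illustrative assumption; the first two inputs are taken from the sample output above.

```python
def reorder_status(stock_level: int, reorder_point: int) -> str:
    """Mirror the CASE expression in the inventory_status view."""
    if stock_level <= reorder_point:
        return "Reorder Needed"
    return "Stock Sufficient"


print(reorder_status(64, 100))   # Reorder Needed
print(reorder_status(137, 100))  # Stock Sufficient
print(reorder_status(100, 100))  # Reorder Needed (boundary case)
```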

Aggregate recent sales

The recent_sales materialized view calculates the quantity of each product sold from each warehouse within the past week. Understanding recent sales trends helps you forecast demand.

A temporal filter, timestamp > NOW() - INTERVAL '7 days', is used to retrieve sales made within the past week. To learn more about temporal filters, see Temporal filters.

CREATE MATERIALIZED VIEW recent_sales AS
SELECT
    warehouse_id,
    product_id,
    SUM(quantity_sold) AS total_quantity_sold,
    MAX(timestamp) AS last_sale
FROM
    sales
WHERE
    timestamp > NOW() - INTERVAL '7 days'
GROUP BY
    warehouse_id, product_id;

You can query from recent_sales to see the results.

SELECT * FROM recent_sales;
 warehouse_id | product_id | total_quantity_sold |         last_sale          
--------------+------------+---------------------+----------------------------
            2 |          3 |                  27 | 2024-11-18 14:33:06.225306
            2 |          8 |                  42 | 2024-11-18 14:33:21.414487
            3 |          1 |                  27 | 2024-11-18 14:33:21.413932
            3 |         10 |                  19 | 2024-11-18 14:33:01.171326
            4 |          1 |                  17 | 2024-11-18 14:33:21.409274
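
To make the window semantics concrete, the sketch below applies the same filter and grouping to an in-memory list. It only illustrates which rows qualify at one given instant; in RisingWave the comparison against NOW() is evaluated continuously as time advances. The function name, the fixed now value, and the sample rows are assumptions made for the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def recent_sales(sales, now, window=timedelta(days=7)):
    """Sum quantity_sold and track the latest sale per (warehouse_id, product_id)
    for rows whose timestamp is strictly newer than now - window."""
    cutoff = now - window
    totals = defaultdict(int)
    last_sale = {}
    for warehouse_id, product_id, quantity_sold, timestamp in sales:
        if timestamp > cutoff:  # same strict comparison as the WHERE clause
            key = (warehouse_id, product_id)
            totals[key] += quantity_sold
            last_sale[key] = max(last_sale.get(key, timestamp), timestamp)
    return {key: (totals[key], last_sale[key]) for key in totals}


now = datetime(2024, 11, 18, 15, 0, 0)  # fixed instant standing in for NOW()
sales = [
    (2, 3, 10, datetime(2024, 11, 18, 14, 33, 1)),
    (2, 3, 17, datetime(2024, 11, 18, 14, 33, 6)),
    (2, 3, 99, datetime(2024, 11, 10, 9, 0, 0)),  # older than 7 days: excluded
    (3, 1, 27, datetime(2024, 11, 18, 14, 33, 21)),
]
result = recent_sales(sales, now)
print(result[(2, 3)][0])  # 27
```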

Forecast demand

The demand_forecast materialized view predicts how long the current stock of each product will last based on recent sales trends.

A simple model is used to forecast demand: the stock_level found in inventory_status is divided by the total_quantity_sold in recent_sales.

RisingWave supports creating materialized views on top of materialized views. When the source materialized view updates, the child materialized view will update accordingly as well.

CREATE MATERIALIZED VIEW demand_forecast AS
SELECT
    i.warehouse_id,
    i.product_id,
    i.stock_level,
    r.total_quantity_sold AS weekly_sales,
    CASE
        WHEN r.total_quantity_sold = 0 THEN 0
        ELSE ROUND(i.stock_level / r.total_quantity_sold, 2)
    END AS stock_days_remaining
FROM
    inventory_status AS i
LEFT JOIN
    recent_sales AS r
ON
    i.warehouse_id = r.warehouse_id AND i.product_id = r.product_id;

You can query from demand_forecast to see the results.

SELECT * FROM demand_forecast LIMIT 5;
 warehouse_id | product_id | stock_level | weekly_sales | stock_days_remaining 
--------------+------------+-------------+--------------+----------------------
            2 |          4 |         191 |           28 |                    6
            1 |          7 |         157 |           21 |                    7
            4 |          6 |         171 |           67 |                    2
            3 |          6 |         221 |           86 |                    2
            5 |          4 |          92 |           58 |                    1
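
The arithmetic behind the view can be checked outside the database. The sketch below mirrors the CASE expression, including integer division (both operands are integer types, which is consistent with the whole numbers in the sample output above) and the NULL that the LEFT JOIN produces for products with no recent sales. Because total_quantity_sold spans a seven-day window, the quotient counts seven-day periods of stock at the recent sales rate. The second helper, days_of_stock, is a hypothetical variation that is not part of the tutorial's view; it converts the weekly total to an average daily rate first.

```python
from typing import Optional


def stock_periods_remaining(stock_level: int, total_quantity_sold: Optional[int]) -> Optional[int]:
    """Mirror the CASE expression in demand_forecast (non-negative inputs)."""
    if total_quantity_sold is None:  # no matching row in recent_sales
        return None
    if total_quantity_sold == 0:
        return 0
    return stock_level // total_quantity_sold  # integer division, as in the view


def days_of_stock(stock_level: int, total_quantity_sold: int, window_days: int = 7) -> float:
    """Hypothetical variation: divide stock by the average daily sales rate."""
    daily_rate = total_quantity_sold / window_days
    return round(stock_level / daily_rate, 2)


print(stock_periods_remaining(191, 28))  # 6
print(stock_periods_remaining(92, 58))   # 1
print(days_of_stock(191, 28))            # 47.75
```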

When finished, press Ctrl+C in the terminal running data_generator.py to stop the script and close its psycopg2 connection to RisingWave.

Summary

In this tutorial, you learned:

  • How to use temporal filters to retrieve data within a specific time range.
  • How to create materialized views based on materialized views.