API Reference¶
This page is auto-generated from Python docstrings.
streaming.producer_case ¶
src/streaming/producer_case.py - Local producer example.
Reads sales from data/sales.csv and writes records to a local simulated topic file one message at a time.
Start with main() at the bottom. Work up to see how it all fits together.
Many functions are standard helpers and should not need project-specific modifications.
Author: Denise Case Date: 2026-05
Terminal command to run this file from the root project folder:
uv run python -m streaming.producer_case
OBS
Don't edit this file - it should remain a working example. Copy it, rename it producer_yourname.py, and modify your copy.
generate_messages ¶
Generate a stream of sales from the input CSV file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
count
|
int
|
How many sales to generate. |
required |
Yields:
| Type | Description |
|---|---|
Generator[dict[str, str]]
|
One sale row dictionary at a time. |
Source code in src/streaming/producer_case.py
get_message_key ¶
Return the message key for a sale record.
Module 01 does not use Kafka yet, but this function prepares the same shape used later when the key is sent to Kafka.
Source code in src/streaming/producer_case.py
load_settings ¶
Load local producer settings from .env and log them.
Source code in src/streaming/producer_case.py
log_paths ¶
Log run header and all paths.
Source code in src/streaming/producer_case.py
log_summary ¶
Log final summary statistics.
Source code in src/streaming/producer_case.py
main ¶
Main entry point for the local producer.
Source code in src/streaming/producer_case.py
prepare_topic_file ¶
Prepare the local simulated topic file.
If KAFKA_CLEAR_TOPIC_ON_START is true, delete the existing topic file so the producer starts with a clean local topic.
If KAFKA_CLEAR_TOPIC_ON_START is false, keep existing messages and append.
Source code in src/streaming/producer_case.py
send_local_message ¶
Write one message to the local simulated topic file.
send_messages ¶
Generate and write local messages one at a time.
Source code in src/streaming/producer_case.py
verify_source ¶
Verify the local source file exists.
Raises:
| Type | Description |
|---|---|
SystemExit
|
If the source file does not exist. |
Source code in src/streaming/producer_case.py
streaming.consumer_case ¶
src/streaming/consumer_case.py - Local consumer example.
Reads messages from a local simulated topic file and writes consumed records to a local output file one message at a time.
Start with main() at the bottom. Work up to see how it all fits together.
Many functions are standard helpers and should not need project-specific modifications.
Author: Denise Case Date: 2026-05
Terminal command to run this file from the root project folder:
uv run python -m streaming.consumer_case
OBS
Don't edit this file - it should remain a working example. Copy it, rename it consumer_yourname.py, and modify your copy.
consume_messages ¶
Consume and process local messages from the simulated topic file.
Waits for new rows until MAX_MESSAGES is reached or no new message arrives within TIMEOUT_SECONDS.
Returns:
| Type | Description |
|---|---|
int
|
The number of consumed messages. |
Source code in src/streaming/consumer_case.py
initialize_output ¶
Initialize output directory and clear consumed CSV from prior runs.
Source code in src/streaming/consumer_case.py
load_settings ¶
Load local consumer settings from .env and log them.
Source code in src/streaming/consumer_case.py
log_paths ¶
Log run header and all paths.
Source code in src/streaming/consumer_case.py
log_summary ¶
Log final summary statistics.
Source code in src/streaming/consumer_case.py
main ¶
Main entry point for the local consumer.
Source code in src/streaming/consumer_case.py
process_message ¶
Process one local message.
Module 01 does not validate, enrich, chart, store, or use Kafka yet. It simply returns the raw message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
row
|
dict[str, Any]
|
A local message row. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
The same row. |
Source code in src/streaming/consumer_case.py
save_artifacts ¶
verify_topic_file ¶
Wait for the local simulated topic file to exist.
Raises:
| Type | Description |
|---|---|
SystemExit
|
If the topic file does not appear before timeout. |