Skip to content

API Reference

This page is auto-generated from Python docstrings.

streaming.producer_case

src/streaming/producer_case.py - Local producer example.

Reads sales from data/sales.csv and writes records to a local simulated topic file one message at a time.

Start with main() at the bottom. Work up to see how it all fits together.

Many functions are standard helpers and should not need project-specific modifications.

Author: Denise Case Date: 2026-05

Terminal command to run this file from the root project folder:

uv run python -m streaming.producer_case
OBS

Don't edit this file - it should remain a working example. Copy it, rename it producer_yourname.py, and modify your copy.

generate_messages

generate_messages(count: int) -> Generator[dict[str, str]]

Generate a stream of sales from the input CSV file.

Parameters:

Name Type Description Default
count int

How many sales to generate.

required

Yields:

Type Description
Generator[dict[str, str]]

One sale row dictionary at a time.

Source code in src/streaming/producer_case.py
def generate_messages(count: int) -> Generator[dict[str, str]]:
    """Generate a stream of sales from the input CSV file.

    Arguments:
        count: How many sales to generate.

    Yields:
        One sale row dictionary at a time.
    """
    # Call the function read_csv_rows() to read
    # all rows from the SALES_CSV file into a list of dictionaries.
    # A dictionary is a set of key-value pairs,
    # where the keys are the column names from the CSV file
    # Note keys and values are all strings when read from CSV.
    # To use values, we will need to convert them from strings to the appropriate types.
    sales_rows: list[dict[str, str]] = read_csv_rows(SALES_CSV)

    # Use a generator to yield one sale row at a time, up to the specified count.
    # In Python, [start:stop] slicing syntax is used to get a subset of a list.
    # Since we don't provide a start index, it defaults to 0 (the beginning of the list).
    # So this will start at the first row and yield up to 'count' rows.
    yield from sales_rows[:count]

get_message_key

get_message_key(message: dict[str, Any]) -> str

Return the message key for a sale record.

Module 01 does not use Kafka yet, but this function prepares the same shape used later when the key is sent to Kafka.

Source code in src/streaming/producer_case.py
def get_message_key(message: dict[str, Any]) -> str:
    """Return the message key for a sale record.

    Module 01 does not use Kafka yet, but this function prepares the same
    shape used later when the key is sent to Kafka.
    """
    try:
        return str(message["region_id"])
    except KeyError as error:
        msg = missing_csv_field_message(
            field="region_id",
            available_fields=list(message.keys()),
        )
        raise KeyError(msg) from error

load_settings

load_settings() -> None

Load local producer settings from .env and log them.

Source code in src/streaming/producer_case.py
def load_settings() -> None:
    """Load local producer settings from .env and log them."""
    LOG.info("Loading settings from .env...")
    LOG.info(f"KAFKA_TOPIC                       = {TOPIC_NAME}")
    LOG.info(f"KAFKA_CLEAR_TOPIC_ON_START        = {CLEAR_TOPIC_ON_START}")
    LOG.info(f"PRODUCER_MESSAGE_COUNT            = {MESSAGE_COUNT}")
    LOG.info(f"PRODUCER_MESSAGE_INTERVAL_SECONDS = {MESSAGE_INTERVAL_SECONDS}")

log_paths

log_paths() -> None

Log run header and all paths.

Source code in src/streaming/producer_case.py
def log_paths() -> None:
    """Log run header and all paths."""
    log_header(LOG, "P01")
    LOG.info("========================")
    LOG.info("START producer main()")
    LOG.info("========================")
    log_path(LOG, "ROOT_DIR", ROOT_DIR)
    log_path(LOG, "DATA_DIR", DATA_DIR)
    log_path(LOG, "SALES_CSV", SALES_CSV)
    log_path(LOG, "TOPIC_CSV", TOPIC_CSV)

log_summary

log_summary(sent_count: int) -> None

Log final summary statistics.

Source code in src/streaming/producer_case.py
def log_summary(sent_count: int) -> None:
    """Log final summary statistics."""
    LOG.info("Summary:")
    LOG.info(f"Sent {sent_count} message(s).")
    log_path(LOG, "WROTE TOPIC_CSV", TOPIC_CSV)
    LOG.info("========================")
    LOG.info("Producer executed successfully!")
    LOG.info("========================")

main

main() -> None

Main entry point for the local producer.

Source code in src/streaming/producer_case.py
def main() -> None:
    """Main entry point for the local producer."""
    log_paths()

    LOG.info("========================")
    LOG.info("SECTION A. Acquire")
    LOG.info("========================")

    load_settings()
    verify_source()
    prepare_topic_file()

    LOG.info("========================")
    LOG.info("SECTION P. Produce Messages")
    LOG.info("========================")

    sent_count: int = send_messages()

    LOG.info("========================")
    LOG.info("SECTION E. Exit")
    LOG.info("========================")

    log_summary(sent_count)

prepare_topic_file

prepare_topic_file() -> None

Prepare the local simulated topic file.

If KAFKA_CLEAR_TOPIC_ON_START is true, delete the existing topic file so the producer starts with a clean local topic.

If KAFKA_CLEAR_TOPIC_ON_START is false, keep existing messages and append.

Source code in src/streaming/producer_case.py
def prepare_topic_file() -> None:
    """Prepare the local simulated topic file.

    If KAFKA_CLEAR_TOPIC_ON_START is true, delete the existing topic file
    so the producer starts with a clean local topic.

    If KAFKA_CLEAR_TOPIC_ON_START is false, keep existing messages and append.
    """
    LOG.info("Preparing local simulated topic file...")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    if TOPIC_CSV.exists() and CLEAR_TOPIC_ON_START:
        TOPIC_CSV.unlink()
        LOG.info(f"Deleted existing topic file: {TOPIC_CSV.name}")

    if TOPIC_CSV.exists():
        LOG.info(f"Using existing topic file: {TOPIC_CSV.name}")
    else:
        LOG.info(f"Topic file will be created: {TOPIC_CSV.name}")

send_local_message

send_local_message(message: dict[str, Any]) -> None

Write one message to the local simulated topic file.

Source code in src/streaming/producer_case.py
def send_local_message(message: dict[str, Any]) -> None:
    """Write one message to the local simulated topic file."""
    append_csv_row(
        path=TOPIC_CSV,
        row=message,
        fieldnames=list(message.keys()),
    )

send_messages

send_messages() -> int

Generate and write local messages one at a time.

Source code in src/streaming/producer_case.py
def send_messages() -> int:
    """Generate and write local messages one at a time."""
    LOG.info("Sending messages...")
    LOG.info(f"Sending up to {MESSAGE_COUNT} local message(s).")
    LOG.info(f"Writing to simulated topic file: {TOPIC_CSV.name}")
    LOG.info("Watch each sale arrive. Press CTRL+C to stop early.\n")

    sent_count: int = 0

    try:
        for message in generate_messages(MESSAGE_COUNT):
            LOG.info(format_message_for_log(message))

            key = get_message_key(message)
            LOG.info(f"  Sending local message with key={key}")

            send_local_message(message)

            sent_count += 1
            LOG.info(f"  MESSAGE SENT  sent={sent_count}")
            time.sleep(MESSAGE_INTERVAL_SECONDS)

    except (FileNotFoundError, KeyError, RuntimeError, ValueError) as error:
        LOG.error(str(error))
        LOG.error("Producer stopped before completing all messages.")
        raise SystemExit(1) from error

    return sent_count

verify_source

verify_source() -> None

Verify the local source file exists.

Raises:

Type Description
SystemExit

If the source file does not exist.

Source code in src/streaming/producer_case.py
def verify_source() -> None:
    """Verify the local source file exists.

    Raises:
        SystemExit: If the source file does not exist.
    """
    LOG.info("Verifying local source data...")

    if not SALES_CSV.exists():
        LOG.error(f"Source file not found: {SALES_CSV}")
        raise SystemExit(1)

    LOG.info(f"Source file found: {SALES_CSV.name}")

streaming.consumer_case

src/streaming/consumer_case.py - Local consumer example.

Reads messages from a local simulated topic file and writes consumed records to a local output file one message at a time.

Start with main() at the bottom. Work up to see how it all fits together.

Many functions are standard helpers and should not need project-specific modifications.

Author: Denise Case Date: 2026-05

Terminal command to run this file from the root project folder:

uv run python -m streaming.consumer_case
OBS

Don't edit this file - it should remain a working example. Copy it, rename it consumer_yourname.py, and modify your copy.

consume_messages

consume_messages() -> int

Consume and process local messages from the simulated topic file.

Waits for new rows until MAX_MESSAGES is reached or no new message arrives within TIMEOUT_SECONDS.

Returns:

Type Description
int

The number of consumed messages.

Source code in src/streaming/consumer_case.py
def consume_messages() -> int:
    """Consume and process local messages from the simulated topic file.

    Waits for new rows until MAX_MESSAGES is reached or no new message arrives
    within TIMEOUT_SECONDS.

    Returns:
        The number of consumed messages.
    """
    LOG.info("Consuming local messages...")
    LOG.info(f"Waiting for up to {MAX_MESSAGES} message(s).")
    LOG.info(f"Stopping after {TIMEOUT_SECONDS}s with no new message.\n")

    consumed_count = 0
    last_message_time = time.monotonic()

    while consumed_count < MAX_MESSAGES:
        rows = read_csv_rows(TOPIC_CSV)
        new_rows = rows[consumed_count:]

        if not new_rows:
            elapsed = time.monotonic() - last_message_time

            if elapsed >= TIMEOUT_SECONDS:
                LOG.info(f"No new message received within {TIMEOUT_SECONDS}s timeout.")
                LOG.info("Producer finished or paused. Stopping consumer.")
                break

            time.sleep(POLL_INTERVAL_SECONDS)
            continue

        for row in new_rows:
            LOG.info(row)

            processed = process_message(row)

            append_csv_row(
                path=OUTPUT_CSV,
                row=processed,
                fieldnames=list(processed.keys()),
            )

            consumed_count += 1
            last_message_time = time.monotonic()

            LOG.info("MESSAGE CONSUMED")
            LOG.info(f"consumed={consumed_count}")

            if consumed_count >= MAX_MESSAGES:
                break

    return consumed_count

initialize_output

initialize_output() -> None

Initialize output directory and clear consumed CSV from prior runs.

Source code in src/streaming/consumer_case.py
def initialize_output() -> None:
    """Initialize output directory and clear consumed CSV from prior runs."""
    LOG.info("Initializing output...")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    if OUTPUT_CSV.exists():
        OUTPUT_CSV.unlink()

    LOG.info(f"Output CSV cleared: {OUTPUT_CSV.name}")

load_settings

load_settings() -> None

Load local consumer settings from .env and log them.

Source code in src/streaming/consumer_case.py
def load_settings() -> None:
    """Load local consumer settings from .env and log them."""
    LOG.info("Loading settings from .env...")
    LOG.info(f"KAFKA_TOPIC                    = {TOPIC_NAME}")
    LOG.info(f"CONSUMER_MAX_MESSAGES          = {MAX_MESSAGES}")
    LOG.info(f"CONSUMER_POLL_INTERVAL_SECONDS = {POLL_INTERVAL_SECONDS}")
    LOG.info(f"CONSUMER_TIMEOUT_SECONDS       = {TIMEOUT_SECONDS}")

log_paths

log_paths() -> None

Log run header and all paths.

Source code in src/streaming/consumer_case.py
def log_paths() -> None:
    """Log run header and all paths."""
    log_header(LOG, "C01")
    LOG.info("========================")
    LOG.info("START consumer main()")
    LOG.info("========================")
    log_path(LOG, "ROOT_DIR", ROOT_DIR)
    log_path(LOG, "DATA_DIR", DATA_DIR)
    log_path(LOG, "TOPIC_CSV", TOPIC_CSV)
    log_path(LOG, "OUTPUT_CSV", OUTPUT_CSV)

log_summary

log_summary(consumed_count: int) -> None

Log final summary statistics.

Source code in src/streaming/consumer_case.py
def log_summary(consumed_count: int) -> None:
    """Log final summary statistics."""
    LOG.info("Summary:")
    LOG.info(f"Consumed {consumed_count} message(s).")
    log_path(LOG, "OUTPUT_CSV", OUTPUT_CSV)
    LOG.info("========================")
    LOG.info("Consumer executed successfully!")
    LOG.info("========================")

main

main() -> None

Main entry point for the local consumer.

Source code in src/streaming/consumer_case.py
def main() -> None:
    """Main entry point for the local consumer."""
    log_paths()

    LOG.info("========================")
    LOG.info("SECTION A. Acquire")
    LOG.info("========================")

    load_settings()
    verify_topic_file()

    LOG.info("========================")
    LOG.info("SECTION C. Consume and Process Messages")
    LOG.info("========================")

    initialize_output()
    consumed_count = consume_messages()
    save_artifacts()

    LOG.info("========================")
    LOG.info("SECTION E. Exit")
    LOG.info("========================")

    log_summary(consumed_count)

process_message

process_message(row: dict[str, Any]) -> dict[str, Any]

Process one local message.

Module 01 does not validate, enrich, chart, store, or use Kafka yet. It simply returns the raw message.

Parameters:

Name Type Description Default
row dict[str, Any]

A local message row.

required

Returns:

Type Description
dict[str, Any]

The same row.

Source code in src/streaming/consumer_case.py
def process_message(row: dict[str, Any]) -> dict[str, Any]:
    """Process one local message.

    Module 01 does not validate, enrich, chart, store, or use Kafka yet.
    It simply returns the raw message.

    Arguments:
        row: A local message row.

    Returns:
        The same row.
    """
    LOG.info("Processing raw local message.")
    return row

save_artifacts

save_artifacts() -> None

Log output artifacts.

Source code in src/streaming/consumer_case.py
def save_artifacts() -> None:
    """Log output artifacts."""
    LOG.info("Saving artifacts...")
    log_path(LOG, "WROTE OUTPUT_CSV", OUTPUT_CSV)

verify_topic_file

verify_topic_file() -> None

Wait for the local simulated topic file to exist.

Raises:

Type Description
SystemExit

If the topic file does not appear before timeout.

Source code in src/streaming/consumer_case.py
def verify_topic_file() -> None:
    """Wait for the local simulated topic file to exist.

    Raises:
        SystemExit: If the topic file does not appear before timeout.
    """
    LOG.info("Verifying local simulated topic file...")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    start_time = time.monotonic()

    while not TOPIC_CSV.exists():
        elapsed = time.monotonic() - start_time

        if elapsed >= TIMEOUT_SECONDS:
            LOG.error(f"Topic file not found: {TOPIC_CSV}")
            LOG.error(
                "Run the local producer first, or run producer and consumer together."
            )
            raise SystemExit(1)

        LOG.info("Topic file not found yet. Waiting...")
        time.sleep(POLL_INTERVAL_SECONDS)

    LOG.info(f"Topic file found: {TOPIC_CSV.name}")