Skip to content

API Reference

Auto-generated from Python docstrings.

Core

src/datafun_streaming/core/types.py.

Shared type aliases used across all datafun_streaming subpackages. Import from here when type-hinting streaming records in any module.

Data Validation

src/datafun_streaming/data_validation/types.py.

Type aliases and dataclasses for validation results.

Import from here whenever you need to type-hint a record or validation result.

ValidationResult dataclass

Result from checking one record against the data contract.

Attributes:

Name Type Description
is_valid bool

True if the record passed all validation checks.

errors ErrorMessages

List of error messages; empty when is_valid is True.

Source code in src/datafun_streaming/data_validation/types.py
@dataclass(frozen=True)
class ValidationResult:
    """Outcome of checking one record against the data contract.

    The dataclass is frozen, so neither field can be reassigned after the
    instance is created.

    Attributes:
        is_valid: True if the record passed all validation checks.
        errors: List of error messages; expected to be empty when
            is_valid is True (the class itself does not enforce this).
    """

    is_valid: bool
    errors: ErrorMessages

src/datafun_streaming/data_validation/validation_utils.py.

Generic field-level validation functions.

Each function checks one thing about one value and returns a list of error strings, empty if valid, one or more messages if not. These functions know nothing about domains, reference data, or business rules. They only check types, formats, and value ranges.

Note

Add functions to this file as validation requirements evolve.

add_validation_errors(*, record, errors)

Return a copy of a record with validation errors attached.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
record DataRecordDict

A dictionary representing one data record.

required
errors ErrorMessages

A list of validation error messages.

required

Returns:

Type Description
DataRecordDict

A copy of the record with a validation_errors field appended.

Source code in src/datafun_streaming/data_validation/validation_utils.py
def add_validation_errors(
    *,
    record: DataRecordDict,
    errors: ErrorMessages,
) -> DataRecordDict:
    """Build a copy of a record that carries its validation errors.

    The input record is not modified. The copy holds the same fields plus a
    validation_errors field whose value is every message joined with " | ".

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        record: A dictionary representing one data record.
        errors: A list of validation error messages.

    Returns:
        A copy of the record with a validation_errors field appended.
    """
    annotated = dict(record)
    annotated.update(validation_errors=" | ".join(errors))
    return annotated

validate_boolean_text(value, *, field_name)

Return errors for an invalid boolean text value.

All boolean values must be represented as "true" or "false" (case-insensitive).

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
value str

The text value to validate.

required
field_name str

The name of the field being validated, for error messages.

required

Returns:

Type Description
list[str]

A list of errors, or an empty list if the value is valid.

Source code in src/datafun_streaming/data_validation/validation_utils.py
def validate_boolean_text(value: str, *, field_name: str) -> list[str]:
    """Check that a text value spells a boolean.

    The only accepted spellings are "true" and "false", compared
    case-insensitively.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        value: The text value to validate.
        field_name: The name of the field being validated, for error messages.

    Returns:
        A list of errors, or an empty list if the value is valid.
    """
    if value.lower() in ("true", "false"):
        return []

    return [f"{field_name} must be true or false: {value}"]

validate_datetime(value)

Return errors for an invalid datetime value.

All datetime values must be in ISO 8601 format.

Parameters:

Name Type Description Default
value str

The text value to validate.

required

Returns:

Type Description
list[str]

A list of errors, or an empty list if the value is valid.

Source code in src/datafun_streaming/data_validation/validation_utils.py
def validate_datetime(value: str) -> list[str]:
    """Check that a text value parses as an ISO 8601 datetime.

    Every "Z" in the value is rewritten as "+00:00" before the text is
    handed to datetime.fromisoformat.

    Arguments:
        value: The text value to validate.

    Returns:
        A list of errors, or an empty list if the value is valid.
    """
    normalized = value.replace("Z", "+00:00")

    try:
        datetime.fromisoformat(normalized)
    except ValueError:
        return [f"Invalid datetime: {value}"]
    else:
        return []

validate_positive_integer(value)

Return errors for an invalid positive integer value.

All positive integer values must be integers greater than or equal to 1.

Parameters:

Name Type Description Default
value str

The text value to validate.

required

Returns:

Type Description
list[str]

A list of errors, or an empty list if the value is valid.

Source code in src/datafun_streaming/data_validation/validation_utils.py
def validate_positive_integer(value: str) -> list[str]:
    """Check that a text value is an integer of at least 1.

    Arguments:
        value: The text value to validate.

    Returns:
        A list of errors, or an empty list if the value is valid.
    """
    try:
        is_positive = int(value) >= 1
    except ValueError:
        return [f"Value must be an integer: {value}"]

    return [] if is_positive else [f"Value must be at least 1: {value}"]

validate_required_fields(*, record, required_fields)

Return errors for missing or blank required fields.

All required fields must be present and not blank.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
record DataRecordDict

A dictionary representing one data record / row.

required
required_fields list[str]

A list of field names that are required.

required

Returns:

Type Description
list[str]

A list of errors, or an empty list if all required fields are present.

Source code in src/datafun_streaming/data_validation/validation_utils.py
def validate_required_fields(
    *,
    record: DataRecordDict,
    required_fields: list[str],
) -> list[str]:
    """Return errors for missing or blank required fields.

    All required fields must be present and not blank. A value is blank
    when it is None or a string that is empty or whitespace-only. Any other
    non-string value counts as present.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        record: A dictionary representing one data record / row.
        required_fields: A list of field names that are required.

    Returns:
        A list of errors, or an empty list if all required fields are present.
    """
    errors: list[str] = []

    for field_name in required_fields:
        if field_name not in record:
            errors.append(f"Missing required field: {field_name}")
            continue

        value = record[field_name]
        # csv.DictReader fills columns missing from a short row with None,
        # so None must be reported as blank rather than crash on .strip().
        is_blank = value is None or (isinstance(value, str) and not value.strip())
        if is_blank:
            errors.append(f"Required field is blank: {field_name}")

    return errors

src/datafun_streaming/data_validation/reference.py.

Reference data validation helpers.

Provides functions for working with lookup tables: building lookup sets from CSV rows and validating reference records.

make_lookup_set(records, key_field)

Create a set of allowed values for a field in a reference table.

Parameters:

Name Type Description Default
records DataRecordDictList

A list of row dictionaries from a reference CSV file.

required
key_field str

The field to use as the key for allowed values.

required

Returns:

Type Description
AllowedValuesSet

A set of allowed values for the specified key field.

Source code in src/datafun_streaming/data_validation/reference.py
def make_lookup_set(records: DataRecordDictList, key_field: str) -> AllowedValuesSet:
    """Create a set of allowed values for a field in a reference table.

    Values are stripped of surrounding whitespace. Records where the field
    is missing, None, or blank after stripping contribute nothing.

    Arguments:
        records: A list of row dictionaries from a reference CSV file.
        key_field: The field to use as the key for allowed values.

    Returns:
        A set of allowed values for the specified key field.
    """
    values: AllowedValuesSet = set()
    for record in records:
        # `or ""` also covers a key that is present but holds None, which
        # csv.DictReader produces for columns missing from a short row.
        value: str = (record.get(key_field) or "").strip()
        if value:
            values.add(value)
    return values

validate_reference_records(*, records, required_fields, label)

Validate reference records and return file-level errors.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
records DataRecordDictList

Reference data records to validate.

required
required_fields list[str]

Field names required in each record.

required
label str

Label for this reference file, used in error messages.

required

Returns:

Type Description
list[str]

A list of errors, or an empty list if all records are valid.

Source code in src/datafun_streaming/data_validation/reference.py
def validate_reference_records(
    *,
    records: DataRecordDictList,
    required_fields: list[str],
    label: str,
) -> list[str]:
    """Check every reference record and collect file-level errors.

    Each record is passed to validate_required_fields. Every error it
    reports is prefixed with the file label and the record's 1-based
    position in the list.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        records: Reference data records to validate.
        required_fields: Field names required in each record.
        label: Label for this reference file, used in error messages.

    Returns:
        A list of errors, or an empty list if all records are valid.
    """
    file_errors: list[str] = []

    for position, record in enumerate(records, start=1):
        record_errors = validate_required_fields(
            record=record,
            required_fields=required_fields,
        )
        file_errors.extend(
            f"{label} record {position}: {error}" for error in record_errors
        )

    return file_errors

src/datafun_streaming/data_validation/errors.py.

Error messages for validation.

reference_validation_failed_message(*, label, error_count)

Return help text when a reference data file fails validation.

Source code in src/datafun_streaming/data_validation/errors.py
def reference_validation_failed_message(*, label: str, error_count: int) -> str:
    """Build the help text shown when a reference data file fails validation.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        label: Label of the reference file; also inserted into the
            data/<label> path named in the checklist.
        error_count: Number of problems found in the file.

    Returns:
        The help message, with leading and trailing whitespace removed.
    """
    help_text = f"""
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
A reference data file failed validation.
File:   {label}
Errors: {error_count} problem(s) found.

The producer cannot run until all reference files are valid.
Fix the reference file before retrying.

CHECK:
1. Open data/{label} and inspect the header row.
2. Confirm all required fields are present and spelled correctly.
3. Confirm no rows have blank values in required fields.
4. See data_contract_case.py for the list of required fields.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
"""
    return help_text.strip()

IO

src/datafun_streaming/io/io_utils.py.

CSV and JSON helpers for streaming examples.

append_csv_row(path, row, fieldnames)

Append one row to a CSV file, writing the header first if needed.

Source code in src/datafun_streaming/io/io_utils.py
def append_csv_row(path: Path, row: dict[str, Any], fieldnames: list[str]) -> None:
    """Append one row to a CSV file, writing the header first if needed.

    The header is written when the file does not exist yet or exists but
    is empty (zero bytes). Missing parent directories are created.

    Arguments:
        path: Path to the CSV file to append to.
        row: The row to write, keyed by field name.
        fieldnames: Column names, in the order they are written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    # An existing zero-byte file (for example one created by touch) still
    # needs its header, so an existence check alone is not enough.
    needs_header = not path.exists() or path.stat().st_size == 0

    with path.open(mode="a", encoding="utf-8", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)

        if needs_header:
            writer.writeheader()

        writer.writerow(row)

format_message_for_log(message)

Format one message dictionary for readable log output.

Source code in src/datafun_streaming/io/io_utils.py
def format_message_for_log(message: dict[str, Any]) -> str:
    """Render one message dictionary as multi-line text for log output.

    The result is an opening brace line, one two-space-indented
    "key: value" line per item, and a closing brace line.

    Arguments:
        message: The message dictionary to format.

    Returns:
        The formatted text, with lines joined by newlines.
    """
    item_lines = [f"  {key}: {value}" for key, value in message.items()]
    return "\n".join(["{", *item_lines, "}"])

read_csv_as_lookup(path, *, key_field, value_field)

Read a CSV file into a key-value lookup dictionary.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
path Path

Path to the CSV file.

required
key_field str

The column to use as the dictionary key.

required
value_field str

The column to use as the dictionary value.

required

Returns:

Type Description
dict[str, Any]

A dict mapping each key_field value to its value_field value.

Example

region_lookup = read_csv_as_lookup(
    REGIONS_CSV, key_field="region_id", value_field="tax_rate_pct"
)
tax_rate = float(region_lookup["US-MO"]) / 100.0

Source code in src/datafun_streaming/io/io_utils.py
def read_csv_as_lookup(
    path: Path,
    *,
    key_field: str,
    value_field: str,
) -> dict[str, Any]:
    """Load a CSV file as a dictionary mapping one column to another.

    Rows are read with read_csv_rows. When the same key appears in more
    than one row, the last row wins.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        path:        Path to the CSV file.
        key_field:   The column to use as the dictionary key.
        value_field: The column to use as the dictionary value.

    Returns:
        A dict mapping each key_field value to its value_field value.

    Example:
        region_lookup = read_csv_as_lookup(
            REGIONS_CSV, key_field="region_id", value_field="tax_rate_pct"
        )
        tax_rate = float(region_lookup["US-MO"]) / 100.0
    """
    lookup: dict[str, Any] = {}

    for row in read_csv_rows(path):
        key = row[key_field]
        lookup[key] = row[value_field]

    return lookup

read_csv_rows(path)

Read a CSV file into a list of string dictionaries.

Source code in src/datafun_streaming/io/io_utils.py
def read_csv_rows(path: Path) -> list[dict[str, str]]:
    """Load every data row of a CSV file as a dictionary.

    Arguments:
        path: Path to the CSV file.

    Returns:
        One dictionary per data row, keyed by the header names.

    Raises:
        FileNotFoundError: If the path does not exist.
        ValueError: If the file has no header row.
    """
    if not path.exists():
        not_found = missing_csv_file_message(path=path.as_posix())
        raise FileNotFoundError(not_found)

    with path.open(mode="r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle)

        # fieldnames is None when there is no first line to use as a header.
        if reader.fieldnames is not None:
            return list(reader)

    msg = f"CSV file has no header row: {path.as_posix()}"
    raise ValueError(msg)

row_from_json(text)

Convert JSON text to a row dictionary.

Source code in src/datafun_streaming/io/io_utils.py
def row_from_json(text: str) -> dict[str, Any]:
    """Parse JSON text that must contain a single JSON object.

    Arguments:
        text: The JSON text to parse.

    Returns:
        The parsed object as a dictionary.

    Raises:
        ValueError: If the text is not valid JSON or its top-level value
            is not an object.
    """
    parsed = json.loads(text)

    if isinstance(parsed, dict):
        return parsed

    msg = "Expected JSON object."
    raise ValueError(msg)

row_to_json(row)

Convert a row dictionary to compact JSON text.

Source code in src/datafun_streaming/io/io_utils.py
def row_to_json(row: dict[str, Any]) -> str:
    """Serialize a row dictionary as compact JSON text.

    Keys are sorted and the separators carry no spaces.

    Arguments:
        row: The row dictionary to serialize.

    Returns:
        The compact JSON text.
    """
    compact_separators = (",", ":")
    return json.dumps(row, separators=compact_separators, sort_keys=True)

src/datafun_streaming/io/errors.py.

missing_csv_field_message(*, field, available_fields)

Return help text for a missing CSV field.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
field str

The name of the missing field.

required
available_fields list[str]

A list of field names that were found in the CSV file.

required

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/io/errors.py
def missing_csv_field_message(*, field: str, available_fields: list[str]) -> str:
    """Build the help text shown when an expected CSV column is absent.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        field: The name of the missing field.
        available_fields: A list of field names that were found in the CSV file.

    Returns:
        A help message with troubleshooting steps.
    """
    available = ", ".join(available_fields)

    help_text = f"""
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
The project read the CSV file,
but an expected column was not present.
Required CSV field missing:
    {field}

Available fields were:
    {available}

CHECK:
1. Open data/sales.csv.
2. Confirm the header row includes: {field}
3. Header names must match exactly.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
"""
    return help_text.strip()

missing_csv_file_message(*, path)

Return help text for a missing CSV file.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
path str

The file path that was not found.

required

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/io/errors.py
def missing_csv_file_message(*, path: str) -> str:
    """Build the help text shown when a required CSV file is absent.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        path: The file path that was not found.

    Returns:
        A help message with troubleshooting steps.
    """
    help_text = f"""
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
This project needs a CSV file to generate messages.
Required CSV file not found:
    {path}

CHECK:
1. Confirm you are running the command from the project root folder.
2. Confirm the data folder exists.
3. Confirm data/sales.csv exists.
4. If the file was deleted, restore it from the repository.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
"""
    return help_text.strip()

Kafka

src/datafun_streaming/kafka/kafka_settings.py.

Kafka settings for producer and consumer examples.

KafkaSettings dataclass

Kafka settings for producer and consumer examples.

Source code in src/datafun_streaming/kafka/kafka_settings.py
@dataclass(frozen=True)
class KafkaSettings:
    """Immutable Kafka settings shared by the producer and consumer examples.

    Each field defaults to the matching module-level DEFAULT_* constant.
    Use from_env() to build an instance from environment variables.
    """

    bootstrap_servers: str = DEFAULT_BOOTSTRAP_SERVERS
    broker_address_family: str = DEFAULT_BROKER_ADDRESS_FAMILY
    topic: str = DEFAULT_TOPIC
    group_id: str = DEFAULT_GROUP_ID
    auto_offset_reset: str = DEFAULT_AUTO_OFFSET_RESET
    clear_topic_on_start: bool = DEFAULT_CLEAR_TOPIC_ON_START

    @classmethod
    def from_env(cls) -> Self:
        """Create Kafka settings from environment variables.

        load_dotenv() runs first. Each string setting falls back to its
        DEFAULT_* constant when the KAFKA_* variable is unset; the boolean
        flag is read through _read_bool_env with its DEFAULT_* constant.
        """
        load_dotenv()

        servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_BOOTSTRAP_SERVERS)
        address_family = os.getenv(
            "KAFKA_BROKER_ADDRESS_FAMILY",
            DEFAULT_BROKER_ADDRESS_FAMILY,
        )
        topic = os.getenv("KAFKA_TOPIC", DEFAULT_TOPIC)
        group_id = os.getenv("KAFKA_GROUP_ID", DEFAULT_GROUP_ID)
        offset_reset = os.getenv("KAFKA_AUTO_OFFSET_RESET", DEFAULT_AUTO_OFFSET_RESET)
        clear_topic = _read_bool_env(
            "KAFKA_CLEAR_TOPIC_ON_START",
            DEFAULT_CLEAR_TOPIC_ON_START,
        )

        return cls(
            bootstrap_servers=servers,
            broker_address_family=address_family,
            topic=topic,
            group_id=group_id,
            auto_offset_reset=offset_reset,
            clear_topic_on_start=clear_topic,
        )

    def producer_config(self) -> dict[str, str]:
        """Return a confluent-kafka producer configuration dict.

        The dict holds the broker connection settings, a fixed log level,
        and three timeouts of 5000 ms each.
        """
        connection = {
            "bootstrap.servers": self.bootstrap_servers,
            "broker.address.family": self.broker_address_family,
        }
        producer_options = {
            "log_level": "3",
            "message.timeout.ms": "5000",
            "request.timeout.ms": "5000",
            "socket.timeout.ms": "5000",
        }
        return {**connection, **producer_options}

    def consumer_config(self) -> dict[str, str]:
        """Return a confluent-kafka consumer configuration dict.

        The dict holds the broker connection settings, the consumer group
        id, the offset reset policy, and a fixed log level.
        """
        connection = {
            "bootstrap.servers": self.bootstrap_servers,
            "broker.address.family": self.broker_address_family,
        }
        consumer_options = {
            "group.id": self.group_id,
            "auto.offset.reset": self.auto_offset_reset,
            "log_level": "3",
        }
        return {**connection, **consumer_options}

consumer_config()

Return a confluent-kafka consumer configuration dict.

Source code in src/datafun_streaming/kafka/kafka_settings.py
def consumer_config(self) -> dict[str, str]:
    """Return a confluent-kafka consumer configuration dict.

    The dict holds the broker connection settings, the consumer group
    id, the offset reset policy, and a fixed log level.
    """
    connection = {
        "bootstrap.servers": self.bootstrap_servers,
        "broker.address.family": self.broker_address_family,
    }
    consumer_options = {
        "group.id": self.group_id,
        "auto.offset.reset": self.auto_offset_reset,
        "log_level": "3",
    }
    return {**connection, **consumer_options}

from_env() classmethod

Create Kafka settings from environment variables.

Source code in src/datafun_streaming/kafka/kafka_settings.py
@classmethod
def from_env(cls) -> Self:
    """Create Kafka settings from environment variables.

    load_dotenv() runs first. Each string setting falls back to its
    DEFAULT_* constant when the KAFKA_* variable is unset; the boolean
    flag is read through _read_bool_env with its DEFAULT_* constant.
    """
    load_dotenv()

    servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_BOOTSTRAP_SERVERS)
    address_family = os.getenv(
        "KAFKA_BROKER_ADDRESS_FAMILY",
        DEFAULT_BROKER_ADDRESS_FAMILY,
    )
    topic = os.getenv("KAFKA_TOPIC", DEFAULT_TOPIC)
    group_id = os.getenv("KAFKA_GROUP_ID", DEFAULT_GROUP_ID)
    offset_reset = os.getenv("KAFKA_AUTO_OFFSET_RESET", DEFAULT_AUTO_OFFSET_RESET)
    clear_topic = _read_bool_env(
        "KAFKA_CLEAR_TOPIC_ON_START",
        DEFAULT_CLEAR_TOPIC_ON_START,
    )

    return cls(
        bootstrap_servers=servers,
        broker_address_family=address_family,
        topic=topic,
        group_id=group_id,
        auto_offset_reset=offset_reset,
        clear_topic_on_start=clear_topic,
    )

producer_config()

Return a confluent-kafka producer configuration dict.

Source code in src/datafun_streaming/kafka/kafka_settings.py
def producer_config(self) -> dict[str, str]:
    """Return a confluent-kafka producer configuration dict.

    The dict holds the broker connection settings, a fixed log level,
    and three timeouts of 5000 ms each.
    """
    connection = {
        "bootstrap.servers": self.bootstrap_servers,
        "broker.address.family": self.broker_address_family,
    }
    producer_options = {
        "log_level": "3",
        "message.timeout.ms": "5000",
        "request.timeout.ms": "5000",
        "socket.timeout.ms": "5000",
    }
    return {**connection, **producer_options}

src/datafun_streaming/kafka/kafka_producer_utils.py.

Kafka producer helpers for streaming examples.

create_producer(settings)

Create a Kafka producer.

Parameters:

Name Type Description Default
settings KafkaSettings

KafkaSettings object with producer configuration.

required

Returns:

Type Description
Producer

A confluent_kafka.Producer instance.

Source code in src/datafun_streaming/kafka/kafka_producer_utils.py
def create_producer(settings: KafkaSettings) -> Producer:
    """Build a Kafka producer from the given settings.

    The producer is configured with settings.producer_config() and logs
    through the "rdkafka.producer" logger.

    Arguments:
        settings: KafkaSettings object with producer configuration.

    Returns:
        A confluent_kafka.Producer instance.
    """
    config = settings.producer_config()
    rdkafka_logger = logging.getLogger("rdkafka.producer")
    return Producer(config, logger=rdkafka_logger)

prepare_producer_topic(settings)

Prepare the Kafka topic before producing messages.

If settings.clear_topic_on_start is true, delete and recreate the topic so the producer starts with an empty topic.

If settings.clear_topic_on_start is false, keep an existing topic.

If the topic does not exist, create it.

Source code in src/datafun_streaming/kafka/kafka_producer_utils.py
def prepare_producer_topic(settings: KafkaSettings) -> None:
    """Get the Kafka topic ready before any messages are produced.

    An existing topic is kept as-is when settings.clear_topic_on_start is
    false. When it is true, the existing topic is deleted so the producer
    starts with an empty one. In either case the topic is created if it
    does not exist once that step is done.

    Arguments:
        settings: KafkaSettings instance naming the topic and the
            clear_topic_on_start flag.
    """
    admin = create_admin_client(settings)
    topic = settings.topic
    already_present = topic_exists(admin, topic)

    # Keep an existing topic untouched unless a clean start was requested.
    if already_present and not settings.clear_topic_on_start:
        return

    if already_present:
        delete_topic(admin, topic)

    # Re-check so the topic is only created when it is really absent.
    if not topic_exists(admin, topic):
        create_topic(admin, topic)

produce_kafka_message(*, producer, topic, key, message)

Produce one dictionary message to Kafka as JSON.

This function sends one message and waits for delivery before returning. That makes producer examples reliable and easy to reason about.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
producer Producer

A confluent_kafka.Producer instance.

required
topic str

The Kafka topic to produce to.

required
key str

The Kafka message key.

required
message dict[str, Any]

The message dictionary to produce.

required

Raises:

Type Description
RuntimeError

If Kafka reports a delivery failure.

Source code in src/datafun_streaming/kafka/kafka_producer_utils.py
def produce_kafka_message(
    *,
    producer: Producer,
    topic: str,
    key: str,
    message: dict[str, Any],
) -> None:
    """Send one dictionary message to Kafka as JSON and wait for delivery.

    The key and the JSON-encoded message are sent as UTF-8 bytes. The
    function then flushes the producer with a 10 second timeout, so it
    returns only after the delivery outcome is known or the timeout
    expires.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        producer: A confluent_kafka.Producer instance.
        topic: The Kafka topic to produce to.
        key: The Kafka message key.
        message: The message dictionary to produce.

    Raises:
        RuntimeError: If messages are still undelivered after the flush
            timeout, or if Kafka reports a delivery failure.
    """
    failures: list[str] = []

    def on_delivery(error: Any, _delivered_message: Any) -> None:
        """Collect the error text when Kafka reports a failed delivery."""
        if error is None:
            return
        failures.append(str(error))

    encoded_key = key.encode("utf-8")
    encoded_value = row_to_json(message).encode("utf-8")

    producer.produce(
        topic=topic,
        key=encoded_key,
        value=encoded_value,
        callback=on_delivery,
    )

    undelivered = producer.flush(timeout=10)

    # A timeout takes precedence over any individual delivery errors.
    if undelivered > 0:
        detail = f"{undelivered} Kafka message(s) were not delivered before timeout."
    elif failures:
        detail = "; ".join(failures)
    else:
        return

    msg = kafka_delivery_failed_message(detail=detail)
    raise RuntimeError(msg)

src/datafun_streaming/kafka/kafka_consumer_utils.py.

Consumer helpers for Kafka messages.

consume_kafka_message(*, consumer, timeout_seconds)

Consume one Kafka message and return it as a row dictionary.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
consumer Any

A confluent_kafka.Consumer instance.

required
timeout_seconds float

How long to wait for a message before giving up.

required

Returns:

Type Description
dict[str, Any] | None

A dictionary representing the message, or None if no message was received.

Source code in src/datafun_streaming/kafka/kafka_consumer_utils.py
def consume_kafka_message(
    *,
    consumer: Any,
    timeout_seconds: float,
) -> dict[str, Any] | None:
    """Poll for one Kafka message and return it as a row dictionary.

    The message value is decoded as UTF-8 JSON. Three metadata fields are
    added to the row: _kafka_key (empty string when the message has no
    key), _kafka_partition, and _kafka_offset.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        consumer: A confluent_kafka.Consumer instance.
        timeout_seconds: How long to wait for a message before giving up.

    Returns:
        A dictionary representing the message, or None if no message was
        received or the message has no value.

    Raises:
        RuntimeError: If the polled message carries an error.
    """
    message = consumer.poll(timeout_seconds)
    if message is None:
        return None

    if message.error():
        msg = kafka_consume_failed_message(detail=str(message.error()))
        raise RuntimeError(msg)

    payload = message.value()
    if payload is None:
        return None

    row = row_from_json(payload.decode("utf-8"))

    key_bytes = message.key()
    row.update(
        _kafka_key=key_bytes.decode("utf-8") if key_bytes else "",
        _kafka_partition=message.partition(),
        _kafka_offset=message.offset(),
    )
    return row

create_consumer(settings)

Create a Kafka consumer.

Source code in src/datafun_streaming/kafka/kafka_consumer_utils.py
def create_consumer(settings: KafkaSettings) -> Consumer:
    """Build a Kafka consumer from the given settings.

    The consumer is configured with settings.consumer_config() and logs
    through the "rdkafka.consumer" logger.

    Arguments:
        settings: KafkaSettings object with consumer configuration.

    Returns:
        A confluent_kafka.Consumer instance.
    """
    config = settings.consumer_config()
    rdkafka_logger = logging.getLogger("rdkafka.consumer")
    return Consumer(config, logger=rdkafka_logger)

create_consumer_from_beginning(settings)

Create a Kafka consumer subscribed to the topic from the beginning.

This is useful for learning examples where each run should read all available messages from the topic.

Parameters:

Name Type Description Default
settings KafkaSettings

KafkaSettings instance with the topic and consumer config.

required

Returns:

Type Description
Any

A confluent_kafka.Consumer instance subscribed to the topic from the beginning.

Source code in src/datafun_streaming/kafka/kafka_consumer_utils.py
def create_consumer_from_beginning(settings: KafkaSettings) -> Any:
    """Create a Kafka consumer that reads its topic from the beginning.

    This is useful for learning examples where each run should read all
    available messages from the topic. Whenever partitions are assigned,
    each one is re-assigned with its offset set to OFFSET_BEGINNING.

    Arguments:
        settings: KafkaSettings instance with the topic and consumer config.

    Returns:
        A confluent_kafka.Consumer instance subscribed to the topic from the beginning.
    """

    def rewind_to_beginning(assigned_consumer: Any, partitions: Any) -> Any:
        """Re-assign every partition starting at OFFSET_BEGINNING."""
        rewound = [
            TopicPartition(partition.topic, partition.partition, OFFSET_BEGINNING)
            for partition in partitions
        ]
        return assigned_consumer.assign(rewound)

    consumer = create_consumer(settings)
    consumer.subscribe([settings.topic], on_assign=rewind_to_beginning)
    return consumer

verify_consumer_topic_ready(settings)

Verify the Kafka topic exists and has messages.

Parameters:

Name Type Description Default
settings KafkaSettings

KafkaSettings instance with the topic and consumer config.

required

Returns:

Type Description
None

None

Raises:

Type Description
RuntimeError

If the topic does not exist or has no messages.

Source code in src/datafun_streaming/kafka/kafka_consumer_utils.py
def verify_consumer_topic_ready(settings: KafkaSettings) -> None:
    """Confirm the Kafka topic exists and already holds messages.

    Arguments:
        settings: KafkaSettings instance with the topic and consumer config.

    Returns:
        None

    Raises:
        RuntimeError: If the topic does not exist or has no messages.
    """
    admin = create_admin_client(settings)
    topic = settings.topic

    if not topic_exists(admin, topic):
        not_found = kafka_topic_not_found_message(
            topic=topic,
            bootstrap_servers=settings.bootstrap_servers,
        )
        raise RuntimeError(not_found)

    # The count is only requested once the topic is known to exist.
    if get_topic_message_count(admin, topic, settings) == 0:
        empty_topic = kafka_topic_empty_message(topic=topic)
        raise RuntimeError(empty_topic)

src/datafun_streaming/kafka/kafka_admin_utils.py.

Kafka topic management helpers for streaming examples.

create_admin_client(settings)

Create a Kafka AdminClient.

Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
def create_admin_client(settings: KafkaSettings) -> AdminClient:
    """Create a Kafka AdminClient.

    The client receives the same broker address and address family as the
    producer and consumer configurations built by KafkaSettings, so topic
    management connects to the broker the same way they do.

    Arguments:
        settings: KafkaSettings instance with the broker connection settings.

    Returns:
        A confluent_kafka AdminClient instance.
    """
    return AdminClient(
        {
            "bootstrap.servers": settings.bootstrap_servers,
            # Matches producer_config()/consumer_config(); without it the
            # admin client could resolve the broker host to a different
            # address family than the clients use.
            "broker.address.family": settings.broker_address_family,
        }
    )

create_topic(admin, topic, *, num_partitions=1, replication_factor=1)

Create a Kafka topic if it does not already exist.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
admin AdminClient

An AdminClient instance.

required
topic str

The topic name to create.

required
num_partitions int

Number of partitions (default 1 for local dev).

1
replication_factor int

Replication factor (default 1 for local dev).

1

Raises:

Type Description
RuntimeError

If topic creation fails.

Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
def create_topic(
    admin: AdminClient,
    topic: str,
    *,
    num_partitions: int = 1,
    replication_factor: int = 1,
) -> None:
    """Create a Kafka topic unless one with that name is already present.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        admin: An AdminClient instance.
        topic: The topic name to create.
        num_partitions: Number of partitions (default 1 for local dev).
        replication_factor: Replication factor (default 1 for local dev).

    Raises:
        RuntimeError: If topic creation fails.
    """
    # Nothing to do when the topic is already there.
    if topic_exists(admin, topic):
        return

    topic_spec = NewTopic(
        topic,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
    )
    pending = admin.create_topics([topic_spec])

    # Each future resolves (or raises) once the broker has answered.
    for name, outcome in pending.items():
        try:
            outcome.result()
        except KafkaException as error:
            raise RuntimeError(
                f"Failed to create topic {name!r}.\n"
                f"Kafka reported: {error}\n\n"
                "Check that Kafka is running and that you have permission to create topics."
            ) from error

delete_topic(admin, topic)

Delete a Kafka topic if it exists.

Deleting a topic removes all its messages. Run the producer again after deleting to repopulate the topic.

Parameters:

Name Type Description Default
admin AdminClient

An AdminClient instance.

required
topic str

The topic name to delete.

required

Raises:

Type Description
RuntimeError

If topic deletion fails.

Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
def delete_topic(admin: AdminClient, topic: str) -> None:
    """Remove a Kafka topic when it is present; do nothing otherwise.

    Deleting a topic removes all its messages. Run the producer again
    after deleting to repopulate the topic.

    Arguments:
        admin: An AdminClient instance.
        topic: The topic name to delete.

    Raises:
        RuntimeError: If topic deletion fails.
    """
    # Nothing to do when the topic is absent.
    if not topic_exists(admin, topic):
        return

    pending = admin.delete_topics([topic])

    # Each future resolves (or raises) once the broker has answered.
    for name, outcome in pending.items():
        try:
            outcome.result()
        except KafkaException as error:
            raise RuntimeError(
                f"Failed to delete topic {name!r}.\n"
                f"Kafka reported: {error}\n\n"
                "Check that Kafka is running and that you have permission to delete topics."
            ) from error

get_topic_message_count(admin, topic, settings)

Return the total number of messages available in a topic.

Sums the difference between the high and low watermark offsets across all partitions. This reflects the messages currently available in the topic, not the number of unread messages for a specific consumer group.

Parameters:

Name Type Description Default
admin AdminClient

An AdminClient instance.

required
topic str

The topic name to inspect.

required
settings KafkaSettings

KafkaSettings instance containing configuration.

required

Returns:

Type Description
int

Total message count across all partitions, or 0 if topic is empty.

Raises:

Type Description
RuntimeError

If topic metadata cannot be retrieved.

Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
def get_topic_message_count(
    admin: AdminClient, topic: str, settings: KafkaSettings
) -> int:
    """Return the number of messages currently available in a topic.

    For every partition, subtracts the low watermark offset from the
    high watermark offset and sums the differences. The result is the
    span of offsets currently held by the topic; it is not the number
    of unread messages for a specific consumer group.

    Arguments:
        admin: An AdminClient instance.
        topic: The topic name to inspect.
        settings: KafkaSettings instance containing configuration.

    Returns:
        Total message count across all partitions, or 0 if the topic is
        missing from the returned metadata or holds no messages.

    Raises:
        RuntimeError: If topic metadata cannot be retrieved, or if the
            watermark offsets of a partition cannot be read before the
            timeout.
    """
    try:
        metadata = admin.list_topics(topic=topic, timeout=5)
    except KafkaException as error:
        msg = kafka_admin_failed_message(
            operation="list_topics",
            topic=topic,
            detail=str(error),
        )
        raise RuntimeError(msg) from error

    topic_metadata = metadata.topics.get(topic)
    if topic_metadata is None:
        return 0

    # A short-lived consumer is used only to read watermark offsets;
    # auto-commit is disabled so it never records progress.
    bootstrap_servers = settings.bootstrap_servers
    temp_consumer = Consumer(
        {
            "bootstrap.servers": bootstrap_servers,
            "broker.address.family": settings.broker_address_family,
            "group.id": "_offset_inspector",
            "enable.auto.commit": "false",
        }
    )

    total = 0
    try:
        for partition_id in topic_metadata.partitions:
            tp = TopicPartition(topic, partition_id)
            offsets = temp_consumer.get_watermark_offsets(tp, timeout=5)
            if offsets is None:
                # The client returns None instead of a (low, high) tuple
                # when the request times out; unpacking it would raise a
                # confusing TypeError, so report it explicitly.
                msg = kafka_admin_failed_message(
                    operation="get_watermark_offsets",
                    topic=topic,
                    detail=(
                        "Timed out reading watermark offsets "
                        f"for partition {partition_id}."
                    ),
                )
                raise RuntimeError(msg)
            low, high = offsets
            # Clamp at zero so an unexpected offset pair never subtracts.
            total += max(0, high - low)
    finally:
        # Always release the temporary consumer, even on failure.
        temp_consumer.close()

    return total

list_topics(admin)

Return a sorted list of topic names currently in Kafka.

Retries several times to allow rdkafka's broker handshake to complete. On Windows, the handshake can take up to ~4 seconds after the AdminClient is created, which causes an immediate call to fail with a transport error.

Parameters:

Name Type Description Default
admin AdminClient

An AdminClient instance.

required

Returns:

Type Description
list[str]

A sorted list of topic name strings.

Raises:

Type Description
RuntimeError

If Kafka is unreachable after all retries.

Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
def list_topics(admin: AdminClient) -> list[str]:
    """Fetch the names of all topics in Kafka, sorted alphabetically.

    Retries several times to allow rdkafka's broker handshake to complete.
    On Windows, the handshake can take up to ~4 seconds after the AdminClient
    is created, which causes an immediate call to fail with a transport error.

    Arguments:
        admin: An AdminClient instance.

    Returns:
        A sorted list of topic name strings.

    Raises:
        RuntimeError: If Kafka is unreachable after all retries.
    """
    failure: Exception | None = None

    # Count down the attempts that remain after the current one, so the
    # delay is skipped once there is nothing left to retry.
    for attempts_left in reversed(range(ADMIN_READY_RETRIES)):
        try:
            topic_names = admin.list_topics(timeout=5).topics.keys()
        except KafkaException as error:
            failure = error
            if attempts_left:
                time.sleep(ADMIN_READY_DELAY_SECONDS)
        else:
            return sorted(topic_names)

    detail = (
        f"Kafka did not respond after {ADMIN_READY_RETRIES} attempts.\n"
        f"    Last error: {failure}"
    )
    raise RuntimeError(
        kafka_admin_failed_message(
            operation="list_topics",
            topic="(all)",
            detail=detail,
        )
    ) from failure

topic_exists(admin, topic)

Return True if the topic already exists in Kafka.

Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
def topic_exists(admin: AdminClient, topic: str) -> bool:
    """Report whether Kafka currently lists a topic with this name."""
    known_topics = list_topics(admin)
    return topic in known_topics

src/datafun_streaming/kafka/kafka_connection_utils.py.

Kafka connection helpers for streaming examples.

verify_kafka_connection(settings)

Verify that the Kafka bootstrap server is reachable.

Source code in src/datafun_streaming/kafka/kafka_connection_utils.py
def verify_kafka_connection(settings: KafkaSettings) -> None:
    """Verify that the Kafka bootstrap server is reachable.

    Only the first entry of the comma-separated bootstrap server list is
    checked, by opening and immediately closing a TCP connection to it.
    IPv6 literals written in bracket form, such as ``[::1]:9092``, are
    accepted.

    Arguments:
        settings: KafkaSettings instance providing bootstrap_servers.

    Returns:
        None

    Raises:
        ConnectionError: If the entry has no port, the port is not an
            integer, or the TCP connection cannot be opened (5-second
            timeout).
    """
    bootstrap_server = settings.bootstrap_servers.split(",")[0].strip()

    if ":" not in bootstrap_server:
        msg = (
            "KAFKA_BOOTSTRAP_SERVERS must include host and port, "
            f"but got {bootstrap_server!r}."
        )
        raise ConnectionError(msg)

    # Split on the last colon so a host that itself contains colons
    # (an IPv6 literal) keeps its address intact.
    host, port_text = bootstrap_server.rsplit(":", 1)

    # The socket layer expects a bare IPv6 address, so drop the square
    # brackets used in host:port notation.
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]

    try:
        port = int(port_text)
    except ValueError as error:
        msg = f"KAFKA_BOOTSTRAP_SERVERS has an invalid port. Got {bootstrap_server!r}."
        raise ConnectionError(msg) from error

    try:
        # The connection is closed as soon as the with-block exits.
        with socket.create_connection((host, port), timeout=5):
            return
    except OSError as error:
        msg = kafka_not_reachable_message(
            bootstrap_servers=settings.bootstrap_servers,
        )
        raise ConnectionError(msg) from error

src/datafun_streaming/kafka/errors.py.

Error messages for Kafka.

kafka_admin_failed_message(*, operation, topic, detail)

Return help text for a failed Kafka admin operation.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
operation str

The admin operation that failed.

required
topic str

The Kafka topic involved in the operation.

required
detail str

Additional details about the failure.

required

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/kafka/errors.py
def kafka_admin_failed_message(*, operation: str, topic: str, detail: str) -> str:
    """Build the troubleshooting text shown when a Kafka admin call fails.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        operation: The admin operation that failed.
        topic: The Kafka topic involved in the operation.
        detail: Additional details about the failure.

    Returns:
        A help message with troubleshooting steps.
    """
    banner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    lines = [
        banner,
        "A Kafka admin operation failed.",
        f"Operation: {operation}",
        f"Topic:     {topic}",
        "Details:",
        f"    {detail}",
        "",
        "CHECK:",
        "1. Confirm Kafka is running. Follow ref_START_KAFKA.md.",
        f"2. Confirm you have permission to {operation} topics.",
        "3. Try the operation manually from the CLI:",
        "   cd ~/kafka",
        "   bin/kafka-topics.sh --list --bootstrap-server localhost:9092",
        banner,
    ]
    return "\n".join(lines)

kafka_consume_failed_message(*, detail)

Return help text for a Kafka consume failure.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
detail str

Additional details about the failure.

required

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/kafka/errors.py
def kafka_consume_failed_message(*, detail: str) -> str:
    """Build the troubleshooting text shown when consuming a message fails.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        detail: Additional details about the failure.

    Returns:
        A help message with troubleshooting steps.
    """
    banner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    lines = [
        banner,
        "The consumer ran, but Kafka did not return a usable message.",
        "Kafka reported an error while consuming a message.",
        "Details:",
        f"    {detail}",
        "",
        "CHECK:",
        "1. Confirm Kafka is running.",
        "2. Confirm the topic exists. Follow MANAGE_TOPIC.md.",
        "3. Run the producer again if the topic has no messages.",
        "4. If you already consumed these messages,",
        "   set a different KAFKA_GROUP_ID in .env.",
        banner,
    ]
    return "\n".join(lines)

kafka_delivery_failed_message(*, detail)

Return help text for a Kafka delivery failure.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
detail str

Additional details about the failure.

required

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/kafka/errors.py
def kafka_delivery_failed_message(*, detail: str) -> str:
    """Build the troubleshooting text shown when Kafka rejects a message.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        detail: Additional details about the failure.

    Returns:
        A help message with troubleshooting steps.
    """
    banner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    lines = [
        banner,
        "The message was generated, but Kafka did not accept it.",
        "Kafka did not confirm message delivery.",
        "Details:",
        f"    {detail}",
        "",
        "CHECK:",
        "1. Confirm Kafka is running.",
        "2. Confirm the topic exists.",
        "3. Confirm the broker is reachable at localhost:9092.",
        "4. Try MANAGE_TOPIC.md to verify Kafka independently of Python.",
        banner,
    ]
    return "\n".join(lines)

kafka_no_messages_message()

Return help text when no Kafka messages are consumed.

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/kafka/errors.py
def kafka_no_messages_message() -> str:
    """Build the troubleshooting text shown when nothing was consumed.

    Returns:
        A help message with troubleshooting steps.
    """
    banner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    lines = [
        banner,
        "Kafka may be reachable, but no unread messages",
        "were available for this consumer.",
        "No message received before timeout.",
        "",
        "CHECK:",
        "1. Confirm Kafka is running.",
        "2. Confirm the topic exists. Follow MANAGE_TOPIC.md.",
        "3. Run the producer in another project terminal.",
        "4. If this consumer group already read the messages,",
        "   set a different KAFKA_GROUP_ID in .env.",
        banner,
    ]
    return "\n".join(lines)

kafka_not_reachable_message(*, bootstrap_servers)

Return help text for a Kafka connection failure.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
bootstrap_servers str

The Kafka bootstrap servers.

required

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/kafka/errors.py
def kafka_not_reachable_message(*, bootstrap_servers: str) -> str:
    """Build the troubleshooting text shown when Kafka cannot be reached.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        bootstrap_servers: The Kafka bootstrap servers.

    Returns:
        A help message with troubleshooting steps.
    """
    banner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    lines = [
        banner,
        "Python code is running,",
        "but Kafka is not available.",
        "Kafka is not reachable.",
        "The program tried to connect to:",
        f"    KAFKA_BOOTSTRAP_SERVERS = {bootstrap_servers}",
        "",
        "CHECK:",
        "1. Start Kafka first. Follow START_KAFKA.md.",
        "2. Verify Kafka is running. In a terminal, run:",
        "   cd ~/kafka",
        "   bin/kafka-topics.sh --list --bootstrap-server localhost:9092",
        "3. Verify the topic exists. Follow MANAGE_TOPIC.md.",
        banner,
    ]
    return "\n".join(lines)

kafka_topic_empty_message(*, topic)

Return help text when a Kafka topic exists but has no messages.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
topic str

The Kafka topic.

required

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/kafka/errors.py
def kafka_topic_empty_message(*, topic: str) -> str:
    """Build the troubleshooting text shown when a topic holds no messages.

    All arguments after the asterisk must be passed as keyword arguments.

    Arguments:
        topic: The Kafka topic.

    Returns:
        A help message with troubleshooting steps.
    """
    banner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    lines = [
        banner,
        "The topic exists but contains no messages.",
        "Topic is empty:",
        f"    KAFKA_TOPIC = {topic}",
        "",
        "CHECK:",
        "1. Run the producer first to send messages to this topic.",
        "2. If you already ran the producer, confirm it completed successfully.",
        "3. If messages were consumed by another consumer group,",
        "   run the producer again to repopulate the topic.",
        banner,
    ]
    return "\n".join(lines)

kafka_topic_not_found_message(*, topic, bootstrap_servers)

Return help text when a required Kafka topic does not exist.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
topic str

The Kafka topic.

required
bootstrap_servers str

The Kafka bootstrap servers.

required

Returns:

Type Description
str

A help message with troubleshooting steps.

Source code in src/datafun_streaming/kafka/errors.py
def kafka_topic_not_found_message(*, topic: str, bootstrap_servers: str) -> str:
    """Return help text when a required Kafka topic does not exist.

    All arguments after the asterisk must be passed as keyword arguments.

    The suggested CLI commands target the same bootstrap servers the
    program used, so following them creates and lists the topic on the
    broker that was actually checked.

    Arguments:
        topic: The Kafka topic.
        bootstrap_servers: The Kafka bootstrap servers.

    Returns:
        A help message with troubleshooting steps.
    """
    return f"""
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
The topic does not exist in Kafka.
Topic not found:
    KAFKA_TOPIC = {topic}
    KAFKA_BOOTSTRAP_SERVERS = {bootstrap_servers}

CHECK:
1. Create the topic first. Follow ref_MANAGE_TOPIC.md.
   cd ~/kafka
   bin/kafka-topics.sh --create --topic {topic} --bootstrap-server {bootstrap_servers} --partitions 1 --replication-factor 1
2. Confirm the topic was created:
   bin/kafka-topics.sh --list --bootstrap-server {bootstrap_servers}
3. Confirm KAFKA_TOPIC in .env matches the topic you created.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
""".strip()

Stats

stats/stats_utils.py.

Running statistics for streaming data.

Provides a RunningStats class that tracks count, sum, mean, min, and max for a stream of numeric values without storing the full history.

This is domain-agnostic: it works on any numeric field from any message. Pass it a value on each message and read the current statistics at any time.

Author: Denise Case Date: 2026-05

RunningStats dataclass

Accumulates running statistics for a stream of numeric values.

Updates incrementally (one value at a time) without storing history. Safe to use inside a message processing loop.

The attributes are named minimum and maximum, not min and max, so that they do not clash with Python's built-in functions. Read the smallest and largest values through the minimum and maximum attributes.

Attributes:

Name Type Description
count int

Number of values received so far.

total float

Running sum of all values.

mean float

Running mean of all values.

minimum float

Minimum value seen so far.

maximum float

Maximum value seen so far.

Example

stats = RunningStats()
for message in messages:
    stats.update(message["total"])
    print(f"count={stats.count}  mean={stats.mean:.2f}")

Source code in src/datafun_streaming/stats/stats_utils.py
@dataclass
class RunningStats:
    """Incremental count, sum, mean, minimum, and maximum for a numeric stream.

    Each value is folded into the totals as it arrives; no history is kept.
    Safe to use inside a message processing loop.

    The smallest and largest values are exposed as ``minimum`` and
    ``maximum`` rather than ``min`` and ``max``, so the attribute names
    do not clash with the built-in functions.

    Attributes:
        count: Number of values received so far.
        total: Running sum of all values.
        mean: Running mean of all values.
        minimum: Minimum value seen so far.
        maximum: Maximum value seen so far.

    Example:
        stats = RunningStats()
        for message in messages:
            stats.update(message["total"])
            print(f"count={stats.count}  mean={stats.mean:.2f}")
    """

    count: int = 0
    total: float = 0.0
    mean: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def update(self, value: float) -> None:
        """Fold one new value into the running statistics.

        Arguments:
            value: The new numeric value to include.

        Returns:
            None.
        """
        self.count += 1
        self.total += value
        self.mean = self.total / self.count
        # The current extreme is passed first so it is kept on ties.
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    def reset(self) -> None:
        """Put every statistic back to its starting value.

        Use this to start a new window or clear accumulated state.

        Returns:
            None.
        """
        self.count, self.total, self.mean = 0, 0.0, 0.0
        self.minimum, self.maximum = float("inf"), float("-inf")

    @property
    def is_empty(self) -> bool:
        """Return True if no values have been received yet."""
        return not self.count

    def summary(self) -> str:
        """Return a formatted summary string for logging.

        Returns:
            A single-line string with all current statistics.
        """
        if self.is_empty:
            return "RunningStats: no values received yet."
        parts = [
            f"count={self.count}",
            f"total={self.total:,.2f}",
            f"mean={self.mean:,.2f}",
            f"minimum={self.minimum:,.2f}",
            f"maximum={self.maximum:,.2f}",
        ]
        # Fields are separated by two spaces.
        return "  ".join(parts)

is_empty property

Return True if no values have been received yet.

reset()

Reset all statistics to their initial state.

Use this to start a new window or clear accumulated state.

Returns:

Type Description
None

None.

Source code in src/datafun_streaming/stats/stats_utils.py
def reset(self) -> None:
    """Put every statistic back to its starting value.

    Use this to start a new window or clear accumulated state.

    Returns:
        None.
    """
    self.count, self.total, self.mean = 0, 0.0, 0.0
    self.minimum, self.maximum = float("inf"), float("-inf")

summary()

Return a formatted summary string for logging.

Returns:

Type Description
str

A single-line string with all current statistics.

Source code in src/datafun_streaming/stats/stats_utils.py
def summary(self) -> str:
    """Format the current statistics as one line suitable for logging.

    Returns:
        A single-line string with all current statistics.
    """
    if self.is_empty:
        return "RunningStats: no values received yet."
    parts = [
        f"count={self.count}",
        f"total={self.total:,.2f}",
        f"mean={self.mean:,.2f}",
        f"minimum={self.minimum:,.2f}",
        f"maximum={self.maximum:,.2f}",
    ]
    # Fields are separated by two spaces.
    return "  ".join(parts)

update(value)

Update statistics with one new value.

Parameters:

Name Type Description Default
value float

The new numeric value to include.

required

Returns:

Type Description
None

None.

Source code in src/datafun_streaming/stats/stats_utils.py
def update(self, value: float) -> None:
    """Fold one new value into the running statistics.

    Arguments:
        value: The new numeric value to include.

    Returns:
        None.
    """
    self.count += 1
    self.total += value
    self.mean = self.total / self.count
    # The current extreme is passed first so it is kept on ties.
    self.minimum = min(self.minimum, value)
    self.maximum = max(self.maximum, value)

Storage

src/datafun_streaming/storage/duckdb_utils.py.

DuckDB utilities for streaming data.

Provides functions to initialize, write to, query, and close a DuckDB database from a streaming consumer.

This is domain-agnostic: it works with any table name and any row dict. Tables are created automatically from the first row received. Schema is inferred from Python value types.

Author: Denise Case Date: 2026-05

close_db(conn)

Close a DuckDB connection.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

An open DuckDB connection.

required

Returns:

Type Description
None

None.

Source code in src/datafun_streaming/storage/duckdb_utils.py
def close_db(conn: duckdb.DuckDBPyConnection) -> None:
    """Shut down a DuckDB connection and log that it was closed.

    Arguments:
        conn: An open DuckDB connection.

    Returns:
        None.
    """
    conn.close()
    # Logged only after close() has returned without raising.
    LOG.debug("DuckDB connection closed.")

connect_to_database(database_file_path)

Connect to the DuckDB database file.

Parameters:

Name Type Description Default
database_file_path Path

Path to the DuckDB database file.

required

Returns:

Type Description
DuckDBPyConnection

An open DuckDB connection.

Source code in src/datafun_streaming/storage/duckdb_utils.py
def connect_to_database(database_file_path: Path) -> duckdb.DuckDBPyConnection:
    """Open a connection to the DuckDB database file at the given path.

    Arguments:
        database_file_path: Path to the DuckDB database file.

    Returns:
        An open DuckDB connection.
    """
    # The path is handed to DuckDB as a plain string.
    database_location = str(database_file_path)
    return duckdb.connect(database_location)

init_db(path)

Open or create a DuckDB database at the given path.

If the file already exists it is opened and reused. If it does not exist it is created.

Parameters:

Name Type Description Default
path Path

File path for the DuckDB database (e.g. data/output/sales.duckdb).

required

Returns:

Type Description
DuckDBPyConnection

An open DuckDB connection.

Source code in src/datafun_streaming/storage/duckdb_utils.py
def init_db(path: Path) -> duckdb.DuckDBPyConnection:
    """Open the DuckDB database at the given path, creating it if needed.

    If the file already exists it is opened and reused.
    If it does not exist it is created.

    Arguments:
        path: File path for the DuckDB database (e.g. data/output/sales.duckdb).

    Returns:
        An open DuckDB connection.
    """
    # Make sure the containing folder exists before connecting.
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)

    connection = duckdb.connect(str(path))
    LOG.debug(f"DuckDB opened: {path}")
    return connection

query_db(conn, sql, params=None)

Execute a SQL query and return results as a list of dicts.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

Open DuckDB connection.

required
sql str

SQL query string. Use ? for parameter placeholders.

required
params list[Any] | None

Optional list of parameter values for placeholders.

None

Returns:

Type Description
list[dict[str, Any]]

A list of row dicts. Empty list if no rows matched.

Example

rows = query_db(conn, "SELECT * FROM sales WHERE region_id = ?", ["US-MO"])

Source code in src/datafun_streaming/storage/duckdb_utils.py
def query_db(
    conn: duckdb.DuckDBPyConnection,
    sql: str,
    params: list[Any] | None = None,
) -> list[dict[str, Any]]:
    """Run a SQL query and hand back each result row as a dict.

    Arguments:
        conn:   Open DuckDB connection.
        sql:    SQL query string. Use ? for parameter placeholders.
        params: Optional list of parameter values for placeholders.

    Returns:
        A list of row dicts. Empty list if no rows matched.

    Example:
        rows = query_db(conn, "SELECT * FROM sales WHERE region_id = ?", ["US-MO"])
    """
    # A missing (or empty) params value is sent as an empty list.
    bound_values = params or []
    cursor = conn.execute(sql, bound_values)

    # The first item of each description entry is the column name.
    column_names = [column[0] for column in cursor.description]
    fetched = cursor.fetchall()
    return [dict(zip(column_names, values, strict=False)) for values in fetched]

safe_table_name(table_name, allowed)

Return table_name after confirming it is in the allowed list.

SQL identifiers (table names, column names) cannot use parameterized query placeholders. This allowlist check prevents accidental injection if a caller passes an unexpected string.

Parameters:

Name Type Description Default
table_name str

The table name to validate.

required
allowed frozenset[str]

A frozenset of allowed table names.

required

Returns:

Type Description
str

The validated table name, unchanged.

Raises:

Type Description
ValueError

If table_name is not in the allowed list.

Source code in src/datafun_streaming/storage/duckdb_utils.py
def safe_table_name(table_name: str, allowed: frozenset[str]) -> str:
    """Check a table name against an allowlist and return it unchanged.

    SQL identifiers (table names, column names) cannot use parameterized
    query placeholders. This allowlist check prevents accidental injection
    if a caller passes an unexpected string.

    Arguments:
        table_name: The table name to validate.
        allowed: A frozenset of allowed table names.

    Returns:
        The validated table name, unchanged.

    Raises:
        ValueError: If table_name is not in the allowed list.
    """
    if table_name in allowed:
        return table_name

    # Sort the allowlist so the error text is stable between runs.
    permitted = sorted(allowed)
    raise ValueError(
        f"Table name {table_name!r} is not in the allowed list. "
        f"Allowed: {permitted}"
    )

upsert_row(conn, *, table, row, primary_key, allowed_tables)

Insert or replace one row in a DuckDB table.

All arguments after the asterisk must be passed as keyword arguments.

Creates the table on the first call if it does not exist. On subsequent calls with the same primary key value, the existing row is replaced with the new values.

Parameters:

Name Type Description Default
conn DuckDBPyConnection

Open DuckDB connection.

required
table str

Table name to write to.

required
row dict[str, Any]

The row to insert or replace.

required
primary_key str

The field name that uniquely identifies each row.

required
allowed_tables frozenset[str]

A frozenset of allowed table names.

required

Returns:

Type Description
None

None.

Source code in src/datafun_streaming/storage/duckdb_utils.py
def upsert_row(
    conn: duckdb.DuckDBPyConnection,
    *,
    table: str,
    row: dict[str, Any],
    primary_key: str,
    allowed_tables: frozenset[str],  # ← caller provides this
) -> None:
    """Insert or replace one row in a DuckDB table.

    All arguments after the asterisk must be passed as keyword arguments.

    Creates the table on the first call if it does not exist.
    On subsequent calls with the same primary key value,
    the existing row is replaced with the new values.

    The replacement is a DELETE of the matching row followed by an
    INSERT of the new one.

    Arguments:
        conn:        Open DuckDB connection.
        table:       Table name to write to.
        row:         The row to insert or replace. Every key is used as a
                     column name and must be a plain identifier.
        primary_key: The field name that uniquely identifies each row.
        allowed_tables: A frozenset of allowed table names.

    Returns:
        None.

    Raises:
        ValueError: If table is not in allowed_tables, primary_key is not
            a key of row, or any key of row is not a plain identifier.
    """
    # Validate before use: table name must be in the allowlist,
    # primary key must be a field in the row.
    safe = safe_table_name(table, allowed_tables)
    safe_pk = safe_table_name(primary_key, frozenset(row.keys()))

    # Column names are spliced into the SQL text below and cannot be
    # bound as parameters, so reject anything that is not a plain
    # identifier before any statement is built or executed.
    for column in row:
        is_plain_identifier = (
            isinstance(column, str)
            and column != ""
            and (column[0].isalpha() or column[0] == "_")
            and all(ch.isalnum() or ch in "_$" for ch in column)
        )
        if not is_plain_identifier:
            raise ValueError(
                f"Column name {column!r} is not a plain SQL identifier. "
                "Use letters, digits, underscores, or '$', "
                "starting with a letter or underscore."
            )

    # Pass the validated names so _ensure_table never receives raw input.
    _ensure_table(conn, safe, row, safe_pk)

    # Remove any existing row with this key so the insert replaces it.
    pk_value = row[safe_pk]
    conn.execute(
        f"DELETE FROM {safe} WHERE {safe_pk} = ?",  # noqa: S608 - identifiers validated via safe_table_name allowlist
        [pk_value],
    )

    # Row values are always bound as parameters; only identifiers are inlined.
    cols = ", ".join(row.keys())
    placeholders = ", ".join(["?"] * len(row))
    conn.execute(
        f"INSERT INTO {safe} ({cols}) VALUES ({placeholders})",  # noqa: S608 - identifiers validated above
        list(row.values()),
    )
    LOG.debug(f"Upserted row into {safe} with primary key {safe_pk}={pk_value}")

Visualization

src/datafun_streaming/visualization/chart_utils.py.

Chart utilities for streaming data.

Provides functions to create, update, and save a line chart that accumulates data points as messages are consumed.

Uses Plotly to generate an interactive HTML chart. The chart is updated in memory as each message arrives and saved to disk at the end of the consume loop (Section C4).

This is domain-agnostic: pass any numeric field and any label. The chart does not know what it is charting.

Author: Denise Case Date: 2026-05

StreamingChart dataclass

Holds chart state for a single line series.

Updated incrementally as messages arrive. Rendered to HTML when save_chart() is called.

Attributes:

Name Type Description
title str

Chart title shown at the top.

x_label str

Label for the x-axis.

y_label str

Label for the y-axis.

x_values list[int | float | str]

Accumulated x-axis values (e.g. message count).

y_values list[float]

Accumulated y-axis values (e.g. running total).

Source code in src/datafun_streaming/visualization/chart_utils.py
@dataclass
class StreamingChart:
    """Holds chart state for a single line series.

    Updated incrementally as messages arrive.
    Rendered to HTML when save_chart() is called.

    Attributes:
        title:    Chart title shown at the top.
        x_label:  Label for the x-axis.
        y_label:  Label for the y-axis.
        x_values: Accumulated x-axis values (e.g. message count).
        y_values: Accumulated y-axis values (e.g. running total).
    """

    title: str
    x_label: str
    y_label: str
    x_values: list[int | float | str] = field(default_factory=list)
    y_values: list[float] = field(default_factory=list)

    @property
    def is_empty(self) -> bool:
        """Return True if no data points have been added yet."""
        return len(self.x_values) == 0

is_empty property

Return True if no data points have been added yet.

init_chart(*, title, x_label, y_label)

Create a new empty StreamingChart.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
title str

Chart title.

required
x_label str

Label for the x-axis.

required
y_label str

Label for the y-axis.

required

Returns:

Type Description
StreamingChart

An empty StreamingChart ready to receive data points.

Source code in src/datafun_streaming/visualization/chart_utils.py
def init_chart(
    *,
    title: str,
    x_label: str,
    y_label: str,
) -> StreamingChart:
    """Build a StreamingChart that holds no data points yet.

    Every argument is keyword-only (note the leading asterisk).

    Arguments:
        title:   Text displayed as the chart heading.
        x_label: Caption for the horizontal axis.
        y_label: Caption for the vertical axis.

    Returns:
        A new StreamingChart whose value lists start out empty.
    """
    # Only the labels are supplied; the value lists use the dataclass defaults.
    empty_chart = StreamingChart(
        title=title,
        x_label=x_label,
        y_label=y_label,
    )
    return empty_chart

save_chart(chart, path)

Render the chart to an interactive HTML file.

Parameters:

Name Type Description Default
chart StreamingChart

The StreamingChart to render.

required
path Path

Output file path. Must end in .html.

required

Returns:

Type Description
None

None.

Source code in src/datafun_streaming/visualization/chart_utils.py
def save_chart(chart: StreamingChart, path: Path) -> None:
    """Write the chart to disk as an interactive HTML page.

    An empty chart is skipped: no figure is built and no file is written.
    Missing parent directories of the output path are created.

    Arguments:
        chart: The StreamingChart whose accumulated points are plotted.
        path:  Destination file. Expected to end in .html (not checked here).

    Returns:
        None.
    """
    # Guard clause: nothing to plot, so leave the filesystem untouched.
    if chart.is_empty:
        return

    # One line-with-markers series; the legend entry reuses the y-axis label.
    series = go.Scatter(
        x=chart.x_values,
        y=chart.y_values,
        mode="lines+markers",
        name=chart.y_label,
        line={"width": 2},
        marker={"size": 4},
    )
    layout_options = {
        "title": chart.title,
        "xaxis_title": chart.x_label,
        "yaxis_title": chart.y_label,
        "hovermode": "x unified",
        "template": "plotly_white",
    }

    figure = go.Figure()
    figure.add_trace(series)
    figure.update_layout(**layout_options)

    # Create the output directory first so write_html has somewhere to write.
    output_dir = path.parent
    output_dir.mkdir(parents=True, exist_ok=True)
    figure.write_html(str(path))

update_chart(chart, row, *, x_field='_kafka_offset', y_field='total')

Add one data point to the chart from a message row.

All arguments after the asterisk must be passed as keyword arguments.

Parameters:

Name Type Description Default
chart StreamingChart

The StreamingChart to update.

required
row dict[str, Any]

The enriched message row.

required
x_field str

The row field to use as the x-axis value. Defaults to _kafka_offset (message sequence number).

'_kafka_offset'
y_field str

The row field to use as the y-axis value. Defaults to total (post-tax total price).

'total'

Returns:

Type Description
None

None.

Source code in src/datafun_streaming/visualization/chart_utils.py
def update_chart(
    chart: StreamingChart,
    row: dict[str, Any],
    *,
    x_field: str = "_kafka_offset",
    y_field: str = "total",
) -> None:
    """Add one data point to the chart from a message row.

    All arguments after the asterisk must be passed as keyword arguments.

    The y value is converted to float before the chart is modified, so a
    row with an unusable y value leaves the chart exactly as it was and
    x_values / y_values always stay the same length.

    Arguments:
        chart:   The StreamingChart to update (mutated in place).
        row:     The message row to read the point from.
        x_field: The row field to use as the x-axis value.
                 Defaults to _kafka_offset. When the field is missing,
                 the current number of points is used instead.
        y_field: The row field to use as the y-axis value.
                 Defaults to total. When the field is missing, 0.0 is used.

    Returns:
        None.

    Raises:
        TypeError: If the y value is of a type float() cannot accept
            (for example None).
        ValueError: If the y value is a string that is not a valid number.
    """
    # Fall back to the running point count so a row without an x field
    # still gets a distinct, increasing x position.
    x_value = row.get(x_field, len(chart.x_values))

    # Convert first: if float() raises, neither list has been touched yet,
    # so the two series cannot drift out of step.
    y_value = float(row.get(y_field, 0.0))

    chart.x_values.append(x_value)
    chart.y_values.append(y_value)