API Reference
Auto-generated from Python docstrings.
Core
src/datafun_streaming/core/types.py.
Shared type aliases used across all datafun_streaming subpackages. Import from here when type-hinting streaming records in any module.
Data Validation
src/datafun_streaming/data_validation/types.py.
Type aliases and dataclasses for validation results.
Import from here whenever you need to type-hint a record or validation result.
ValidationResult (dataclass)
Result from checking one record against the data contract.
Attributes:
| Name | Type | Description |
|---|---|---|
| is_valid | bool | True if the record passed all validation checks. |
| errors | ErrorMessages | List of error messages; empty when is_valid is True. |
Source code in src/datafun_streaming/data_validation/types.py
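A minimal sketch of how such a result type might be shaped, assuming `ErrorMessages` is an alias for `list[str]` (the alias definition is not shown on this page). `ValidationResultSketch` and `result_from_errors` are hypothetical names for illustration, not part of the library.

```python
from dataclasses import dataclass, field

# Assumption: ErrorMessages is a plain list of strings.
ErrorMessages = list[str]


@dataclass
class ValidationResultSketch:
    """Illustrative stand-in for ValidationResult, not the library class."""

    is_valid: bool
    errors: ErrorMessages = field(default_factory=list)


def result_from_errors(errors: ErrorMessages) -> ValidationResultSketch:
    """Hypothetical helper: a record is valid exactly when it has no errors."""
    return ValidationResultSketch(is_valid=not errors, errors=list(errors))
```

Deriving `is_valid` from the error list keeps the two attributes from drifting apart.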
src/datafun_streaming/data_validation/validation_utils.py.
Generic field-level validation functions.
Each function checks one thing about one value and returns a list of error strings: empty if the value is valid, one or more messages if not. These functions know nothing about domains, reference data, or business rules. They only check types, formats, and value ranges.
Note: Add functions to this file as validation requirements evolve.
add_validation_errors(*, record, errors)
Return a copy of a record with validation errors attached.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| record | DataRecordDict | A dictionary representing one data record. | required |
| errors | ErrorMessages | A list of validation error messages. | required |
Returns:
| Type | Description |
|---|---|
| DataRecordDict | A copy of the record with a validation_errors field appended. |
Source code in src/datafun_streaming/data_validation/validation_utils.py
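The copy-then-annotate behavior described above can be sketched as follows. This is an illustration, not the library's code: `add_validation_errors_sketch` is a hypothetical name, and storing the errors as a list (rather than, say, a joined string) is an assumption.

```python
from typing import Any


def add_validation_errors_sketch(
    *, record: dict[str, Any], errors: list[str]
) -> dict[str, Any]:
    """Illustrative stand-in: copy the record and attach its errors."""
    annotated = dict(record)  # shallow copy, so the caller's dict is untouched
    # Assumption: the errors are stored as a list under "validation_errors".
    annotated["validation_errors"] = list(errors)
    return annotated
```

Returning a copy means a rejected record can be written to an error file while the original row is still available unchanged.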
validate_boolean_text(value, *, field_name)
Return errors for an invalid boolean text value.
All boolean values must be represented as "true" or "false" (case-insensitive).
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | str | The text value to validate. | required |
| field_name | str | The name of the field being validated, for error messages. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of errors, or an empty list if the value is valid. |
Source code in src/datafun_streaming/data_validation/validation_utils.py
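A minimal sketch of the case-insensitive check, under stated assumptions: the function name is hypothetical, the error wording is invented for illustration, and tolerating surrounding whitespace is a guess (the real function may be stricter).

```python
def validate_boolean_text_sketch(value: str, *, field_name: str) -> list[str]:
    """Illustrative stand-in: accept "true" or "false" in any letter case."""
    # Assumption: surrounding whitespace is tolerated.
    if value.strip().lower() in {"true", "false"}:
        return []
    return [f"{field_name}: expected 'true' or 'false', got {value!r}"]
```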
validate_datetime(value)
Return errors for an invalid datetime value.
All datetime values must be in ISO 8601 format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | str | The text value to validate. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of errors, or an empty list if the value is valid. |
Source code in src/datafun_streaming/data_validation/validation_utils.py
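One way such a check could be written is with the standard library's `datetime.fromisoformat`. This is a sketch with a hypothetical function name and error wording; it is not known whether the library parses dates this way. Note that before Python 3.11, `fromisoformat` accepts only a subset of ISO 8601 (for example, it rejects a trailing `Z`).

```python
from datetime import datetime


def validate_datetime_sketch(value: str) -> list[str]:
    """Illustrative stand-in: report values that do not parse as ISO 8601."""
    try:
        datetime.fromisoformat(value)
    except ValueError:
        return [f"invalid ISO 8601 datetime: {value!r}"]
    return []
```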
validate_positive_integer(value)
Return errors for an invalid positive integer value.
All positive integer values must be integers greater than or equal to 1.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | str | The text value to validate. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of errors, or an empty list if the value is valid. |
Source code in src/datafun_streaming/data_validation/validation_utils.py
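The rule above (an integer, and at least 1) can be sketched in two steps: parse, then range-check. The function name and messages are hypothetical, and relying on `int()` for parsing is an assumption about what counts as integer text.

```python
def validate_positive_integer_sketch(value: str) -> list[str]:
    """Illustrative stand-in: the text must parse as an integer >= 1."""
    try:
        number = int(value)
    except ValueError:
        return [f"not an integer: {value!r}"]
    if number < 1:
        return [f"must be >= 1, got {number}"]
    return []
```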
validate_required_fields(*, record, required_fields)
Return errors for missing or blank required fields.
All required fields must be present and not blank.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| record | DataRecordDict | A dictionary representing one data record / row. | required |
| required_fields | list[str] | A list of field names that are required. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of errors, or an empty list if all required fields are present. |
Source code in src/datafun_streaming/data_validation/validation_utils.py
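A minimal sketch of the presence-and-blank check. The function name and message text are hypothetical, and treating `None`, the empty string, and whitespace-only text as blank is an assumption.

```python
from typing import Any


def validate_required_fields_sketch(
    *, record: dict[str, Any], required_fields: list[str]
) -> list[str]:
    """Illustrative stand-in: one error per missing or blank required field."""
    errors: list[str] = []
    for name in required_fields:
        value = record.get(name)
        # Assumption: None, "", and whitespace-only text all count as blank.
        if value is None or str(value).strip() == "":
            errors.append(f"{name}: required field is missing or blank")
    return errors
```

Collecting every problem instead of stopping at the first one lets a single pass report all the fields a record needs fixed.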
src/datafun_streaming/data_validation/reference.py.
Reference data validation helpers.
Provides functions for working with lookup tables: building lookup sets from CSV rows and validating reference records.
make_lookup_set(records, key_field)
Create a set of allowed values for a field in a reference table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| records | DataRecordDictList | A list of row dictionaries from a reference CSV file. | required |
| key_field | str | The field to use as the key for allowed values. | required |
Returns:
| Type | Description |
|---|---|
| AllowedValuesSet | A set of allowed values for the specified key field. |
Source code in src/datafun_streaming/data_validation/reference.py
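Building the lookup set amounts to collecting one column into a `set`, which makes later membership checks fast. The sketch below uses a hypothetical function name and assumes rows lacking the key field are skipped; the library may raise instead.

```python
def make_lookup_set_sketch(records: list[dict[str, str]], key_field: str) -> set[str]:
    """Illustrative stand-in: collect one column of a reference table into a set."""
    # Assumption: rows that lack the key field are skipped rather than raising.
    return {row[key_field] for row in records if key_field in row}


# Hypothetical reference rows, as a CSV reader might return them.
regions = [{"region_id": "US-MO", "name": "Missouri"}, {"region_id": "US-KS", "name": "Kansas"}]
allowed_regions = make_lookup_set_sketch(regions, "region_id")
is_known = "US-MO" in allowed_regions  # constant-time membership check
```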
validate_reference_records(*, records, required_fields, label)
Validate reference records and return file-level errors.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| records | DataRecordDictList | Reference data records to validate. | required |
| required_fields | list[str] | Field names required in each record. | required |
| label | str | Label for this reference file, used in error messages. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of errors, or an empty list if all records are valid. |
Source code in src/datafun_streaming/data_validation/reference.py
src/datafun_streaming/data_validation/errors.py.
Error messages for validation.
reference_validation_failed_message(*, label, error_count)
Return help text when a reference data file fails validation.
Source code in src/datafun_streaming/data_validation/errors.py
IO
src/datafun_streaming/io/io_utils.py.
CSV and JSON helpers for streaming examples.
append_csv_row(path, row, fieldnames)
Append one row to a CSV file, writing the header first if needed.
Source code in src/datafun_streaming/io/io_utils.py
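A sketch of the append-with-header pattern using the standard `csv` module. The function name is hypothetical, and reading "if needed" as "the file is missing or empty" is an assumption.

```python
import csv
from pathlib import Path


def append_csv_row_sketch(path: Path, row: dict[str, str], fieldnames: list[str]) -> None:
    """Illustrative stand-in: append a row, adding the header to a new file."""
    # Assumption: the header is needed when the file is missing or empty.
    needs_header = not path.exists() or path.stat().st_size == 0
    with path.open("a", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerow(row)
```

Opening in append mode on every call keeps the file valid after each message, which suits a consumer that may be stopped at any time.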
format_message_for_log(message)
Format one message dictionary for readable log output.
Source code in src/datafun_streaming/io/io_utils.py
read_csv_as_lookup(path, *, key_field, value_field)
Read a CSV file into a key-value lookup dictionary.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path | Path to the CSV file. | required |
| key_field | str | The column to use as the dictionary key. | required |
| value_field | str | The column to use as the dictionary value. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | A dict mapping each key_field value to its value_field value. |
Example

```python
region_lookup = read_csv_as_lookup(
    REGIONS_CSV, key_field="region_id", value_field="tax_rate_pct"
)
tax_rate = float(region_lookup["US-MO"]) / 100.0
```
Source code in src/datafun_streaming/io/io_utils.py
read_csv_rows(path)
Read a CSV file into a list of string dictionaries.
Source code in src/datafun_streaming/io/io_utils.py
row_from_json(text)
Convert JSON text to a row dictionary.
Source code in src/datafun_streaming/io/io_utils.py
src/datafun_streaming/io/errors.py.
missing_csv_field_message(*, field, available_fields)
Return help text for a missing CSV field.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| field | str | The name of the missing field. | required |
| available_fields | list[str] | A list of field names that were found in the CSV file. | required |
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/io/errors.py
missing_csv_file_message(*, path)
Return help text for a missing CSV file.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The file path that was not found. | required |
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/io/errors.py
Kafka
src/datafun_streaming/kafka/kafka_settings.py.
Kafka settings for producer and consumer examples.
KafkaSettings (dataclass)
Kafka settings for producer and consumer examples.
Source code in src/datafun_streaming/kafka/kafka_settings.py
consumer_config()
Return a confluent-kafka consumer configuration dict.
Source code in src/datafun_streaming/kafka/kafka_settings.py
from_env() (classmethod)
Create Kafka settings from environment variables.
Source code in src/datafun_streaming/kafka/kafka_settings.py
producer_config()
Return a confluent-kafka producer configuration dict.
Source code in src/datafun_streaming/kafka/kafka_settings.py
src/datafun_streaming/kafka/kafka_producer_utils.py.
Kafka producer helpers for streaming examples.
create_producer(settings)
Create a Kafka producer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | KafkaSettings | KafkaSettings object with producer configuration. | required |
Returns:
| Type | Description |
|---|---|
| Producer | A confluent_kafka.Producer instance. |
Source code in src/datafun_streaming/kafka/kafka_producer_utils.py
prepare_producer_topic(settings)
Prepare the Kafka topic before producing messages.
If settings.clear_topic_on_start is true, delete and recreate the topic so the producer starts with an empty topic.
If settings.clear_topic_on_start is false, keep an existing topic.
If the topic does not exist, create it.
Source code in src/datafun_streaming/kafka/kafka_producer_utils.py
produce_kafka_message(*, producer, topic, key, message)
Produce one dictionary message to Kafka as JSON.
This function sends one message and waits for delivery before returning. That makes producer examples reliable and easy to reason about.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| producer | Producer | A confluent_kafka.Producer instance. | required |
| topic | str | The Kafka topic to produce to. | required |
| key | str | The Kafka message key. | required |
| message | dict[str, Any] | The message dictionary to produce. | required |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If Kafka reports a delivery failure. |
Source code in src/datafun_streaming/kafka/kafka_producer_utils.py
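The send-then-wait pattern can be sketched with the confluent-kafka producer's `produce(...)` and `flush()` methods: register a delivery callback, flush so the callback runs, then raise if it recorded an error. `produce_and_wait` is a hypothetical name, and whether the library records failures this way is an assumption. The producer is typed as `Any` so the sketch can be exercised with a stand-in object when no broker is available.

```python
import json
from typing import Any


def produce_and_wait(*, producer: Any, topic: str, key: str, message: dict[str, Any]) -> None:
    """Illustrative stand-in: send one JSON message and block until delivery is reported."""
    failures: list[str] = []

    def on_delivery(err: Any, _msg: Any) -> None:
        # confluent-kafka passes err=None when the message was delivered.
        if err is not None:
            failures.append(str(err))

    producer.produce(
        topic,
        key=key.encode("utf-8"),
        value=json.dumps(message).encode("utf-8"),
        on_delivery=on_delivery,
    )
    producer.flush()  # blocks while queued messages are sent and callbacks run
    if failures:
        raise RuntimeError(f"Kafka delivery failed: {failures[0]}")
```

Flushing after every message trades throughput for predictability, which matches the stated goal of examples that are easy to reason about.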
src/datafun_streaming/kafka/kafka_consumer_utils.py.
Consumer helpers for Kafka messages.
consume_kafka_message(*, consumer, timeout_seconds)
Consume one Kafka message and return it as a row dictionary.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| consumer | Any | A confluent_kafka.Consumer instance. | required |
| timeout_seconds | float | How long to wait for a message before giving up. | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | A dictionary representing the message, or None if no message was received. |
Source code in src/datafun_streaming/kafka/kafka_consumer_utils.py
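A hedged sketch of a poll-and-decode step, using the confluent-kafka calls `Consumer.poll(timeout)`, `Message.error()`, `Message.value()`, and `Message.offset()`. `consume_one` is a hypothetical name. Two things here are assumptions: that a message-level error is raised as `RuntimeError`, and that the offset is attached under `_kafka_offset` (suggested only by the default `x_field` of `update_chart` on this page).

```python
import json
from typing import Any, Optional


def consume_one(*, consumer: Any, timeout_seconds: float) -> Optional[dict[str, Any]]:
    """Illustrative stand-in: poll once and decode the JSON payload."""
    message = consumer.poll(timeout_seconds)
    if message is None:
        return None  # nothing arrived within the timeout
    if message.error() is not None:
        # Assumption: errors are surfaced as RuntimeError.
        raise RuntimeError(f"Kafka consume failed: {message.error()}")
    row = json.loads(message.value().decode("utf-8"))
    # Assumption: the Kafka offset is added to the row under this key.
    row["_kafka_offset"] = message.offset()
    return row
```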
create_consumer(settings)
create_consumer_from_beginning(settings)
Create a Kafka consumer subscribed to the topic from the beginning.
This is useful for learning examples where each run should read all available messages from the topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | KafkaSettings | KafkaSettings instance with the topic and consumer config. | required |
Returns:
| Type | Description |
|---|---|
| Any | A confluent_kafka.Consumer instance subscribed to the topic from the beginning. |
Source code in src/datafun_streaming/kafka/kafka_consumer_utils.py
verify_consumer_topic_ready(settings)
Verify the Kafka topic exists and has messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | KafkaSettings | KafkaSettings instance with the topic and consumer config. | required |
Returns:
| Type | Description |
|---|---|
| None | None |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the topic does not exist or has no messages. |
Source code in src/datafun_streaming/kafka/kafka_consumer_utils.py
src/datafun_streaming/kafka/kafka_admin_utils.py.
Kafka topic management helpers for streaming examples.
create_admin_client(settings)
create_topic(admin, topic, *, num_partitions=1, replication_factor=1)
Create a Kafka topic if it does not already exist.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| admin | AdminClient | An AdminClient instance. | required |
| topic | str | The topic name to create. | required |
| num_partitions | int | Number of partitions (default 1 for local dev). | 1 |
| replication_factor | int | Replication factor (default 1 for local dev). | 1 |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If topic creation fails. |
Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
delete_topic(admin, topic)
Delete a Kafka topic if it exists.
Deleting a topic removes all its messages. Run the producer again after deleting to repopulate the topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| admin | AdminClient | An AdminClient instance. | required |
| topic | str | The topic name to delete. | required |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If topic deletion fails. |
Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
get_topic_message_count(admin, topic, settings)
Return the total number of messages available in a topic.
Sums the high-water offset across all partitions. This reflects the total messages ever produced to the topic, not the number of unread messages for a specific consumer group.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| admin | AdminClient | An AdminClient instance. | required |
| topic | str | The topic name to inspect. | required |
| settings | KafkaSettings | KafkaSettings instance containing configuration. | required |
Returns:
| Type | Description |
|---|---|
| int | Total message count across all partitions, or 0 if topic is empty. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If topic metadata cannot be retrieved. |
Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
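The arithmetic behind the count can be sketched on plain data. The input below maps each partition id to a `(low, high)` watermark pair, the shape that confluent-kafka's `Consumer.get_watermark_offsets` returns for one partition; how the library actually gathers these pairs is not shown on this page, and `count_from_watermarks` is a hypothetical name.

```python
def count_from_watermarks(watermarks: dict[int, tuple[int, int]]) -> int:
    """Illustrative stand-in: sum the high-water offset of every partition."""
    return sum(high for _low, high in watermarks.values())


# Hypothetical topic with two partitions holding 5 and 7 messages.
total_messages = count_from_watermarks({0: (0, 5), 1: (0, 7)})
```

The high-water offset is the offset the next produced message will receive, so it counts messages written to the partition regardless of what any consumer group has already read.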
list_topics(admin)
Return a sorted list of topic names currently in Kafka.
Retries several times to allow rdkafka's broker handshake to complete. On Windows, the handshake can take up to ~4 seconds after the AdminClient is created, which causes an immediate call to fail with a transport error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| admin | AdminClient | An AdminClient instance. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A sorted list of topic name strings. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If Kafka is unreachable after all retries. |
Source code in src/datafun_streaming/kafka/kafka_admin_utils.py
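The retry behavior described above can be sketched as a small loop around any callable that fetches topic names. `call_with_retries`, the attempt count, and the delay are all hypothetical; the library's actual values and the exception type it catches are not shown on this page.

```python
import time
from collections.abc import Callable
from typing import Optional


def call_with_retries(
    fetch: Callable[[], list[str]], *, attempts: int = 5, delay_seconds: float = 1.0
) -> list[str]:
    """Illustrative stand-in: retry a fetch until it succeeds or attempts run out."""
    last_error: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            return sorted(fetch())
        except Exception as exc:  # a real helper would catch the Kafka error type
            last_error = exc
            if attempt < attempts - 1:
                time.sleep(delay_seconds)  # give the broker handshake time to finish
    raise RuntimeError(f"Kafka unreachable after {attempts} attempts") from last_error
```

Chaining the last error with `from` keeps the original transport failure visible in the traceback.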
topic_exists(admin, topic)
src/datafun_streaming/kafka/kafka_connection_utils.py.
Kafka connection helpers for streaming examples.
verify_kafka_connection(settings)
Verify that the Kafka bootstrap server is reachable.
Source code in src/datafun_streaming/kafka/kafka_connection_utils.py
src/datafun_streaming/kafka/errors.py.
Error messages for Kafka.
kafka_admin_failed_message(*, operation, topic, detail)
Return help text for a failed Kafka admin operation.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| operation | str | The admin operation that failed. | required |
| topic | str | The Kafka topic involved in the operation. | required |
| detail | str | Additional details about the failure. | required |
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/kafka/errors.py
kafka_consume_failed_message(*, detail)
Return help text for a Kafka consume failure.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| detail | str | Additional details about the failure. | required |
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/kafka/errors.py
kafka_delivery_failed_message(*, detail)
Return help text for a Kafka delivery failure.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| detail | str | Additional details about the failure. | required |
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/kafka/errors.py
kafka_no_messages_message()
Return help text when no Kafka messages are consumed.
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/kafka/errors.py
kafka_not_reachable_message(*, bootstrap_servers)
Return help text for a Kafka connection failure.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bootstrap_servers | str | The Kafka bootstrap servers. | required |
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/kafka/errors.py
kafka_topic_empty_message(*, topic)
Return help text when a Kafka topic exists but has no messages.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The Kafka topic. | required |
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/kafka/errors.py
kafka_topic_not_found_message(*, topic, bootstrap_servers)
Return help text when a required Kafka topic does not exist.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The Kafka topic. | required |
| bootstrap_servers | str | The Kafka bootstrap servers. | required |
Returns:
| Type | Description |
|---|---|
| str | A help message with troubleshooting steps. |
Source code in src/datafun_streaming/kafka/errors.py
Stats
src/datafun_streaming/stats/stats_utils.py.
Running statistics for streaming data.
Provides a RunningStats class that tracks count, sum, mean, min, and max for a stream of numeric values without storing the full history.
This is domain-agnostic: it works on any numeric field from any message. Pass it a value on each message and read the current statistics at any time.
Author: Denise Case Date: 2026-05
RunningStats (dataclass)
Accumulates running statistics for a stream of numeric values.
Updates incrementally (one value at a time) without storing history. Safe to use inside a message processing loop.
The smallest and largest values are exposed as the minimum and maximum attributes. They are not named min and max, because those names would shadow Python's built-in functions.
Attributes:
| Name | Type | Description |
|---|---|---|
| count | int | Number of values received so far. |
| total | float | Running sum of all values. |
| mean | float | Running mean of all values. |
| minimum | float | Minimum value seen so far. |
| maximum | float | Maximum value seen so far. |
Example

```python
stats = RunningStats()
for message in messages:
    stats.update(message["total"])
    print(f"count={stats.count} mean={stats.mean:.2f}")
```
Source code in src/datafun_streaming/stats/stats_utils.py
is_empty (property)
Return True if no values have been received yet.
reset()
Reset all statistics to their initial state.
Use this to start a new window or clear accumulated state.
Returns:
| Type | Description |
|---|---|
| None | None. |
Source code in src/datafun_streaming/stats/stats_utils.py
summary()
Return a formatted summary string for logging.
Returns:
| Type | Description |
|---|---|
| str | A single-line string with all current statistics. |
Source code in src/datafun_streaming/stats/stats_utils.py
update(value)
Update statistics with one new value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | float | The new numeric value to include. | required |
Returns:
| Type | Description |
|---|---|
| None | None. |
Source code in src/datafun_streaming/stats/stats_utils.py
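The incremental bookkeeping can be sketched as a small dataclass: each update touches only five numbers, so memory use stays constant no matter how long the stream runs. `RunningStatsSketch` is a hypothetical stand-in, and starting `minimum` and `maximum` at positive and negative infinity is an assumption about the initial state.

```python
from dataclasses import dataclass


@dataclass
class RunningStatsSketch:
    """Illustrative stand-in for RunningStats, not the library class."""

    count: int = 0
    total: float = 0.0
    mean: float = 0.0
    minimum: float = float("inf")  # assumption: sentinel until the first value
    maximum: float = float("-inf")  # assumption: sentinel until the first value

    @property
    def is_empty(self) -> bool:
        return self.count == 0

    def update(self, value: float) -> None:
        """Fold one value into the statistics without keeping it."""
        self.count += 1
        self.total += value
        self.mean = self.total / self.count
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    def reset(self) -> None:
        """Return every field to its initial state."""
        self.count = 0
        self.total = 0.0
        self.mean = 0.0
        self.minimum = float("inf")
        self.maximum = float("-inf")
```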
Storage
src/datafun_streaming/storage/duckdb_utils.py.
DuckDB utilities for streaming data.
Provides functions to initialize, write to, query, and close a DuckDB database from a streaming consumer.
This is domain-agnostic: it works with any table name and any row dict. Tables are created automatically from the first row received. Schema is inferred from Python value types.
Author: Denise Case Date: 2026-05
close_db(conn)
Close a DuckDB connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| conn | DuckDBPyConnection | An open DuckDB connection. | required |
Returns:
| Type | Description |
|---|---|
| None | None. |
connect_to_database(database_file_path)
Connect to the DuckDB database file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| database_file_path | Path | Path to the DuckDB database file. | required |
Returns:
| Type | Description |
|---|---|
| DuckDBPyConnection | An open DuckDB connection. |
Source code in src/datafun_streaming/storage/duckdb_utils.py
init_db(path)
Open or create a DuckDB database at the given path.
If the file already exists it is opened and reused. If it does not exist it is created.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path | File path for the DuckDB database (e.g. data/output/sales.duckdb). | required |
Returns:
| Type | Description |
|---|---|
| DuckDBPyConnection | An open DuckDB connection. |
Source code in src/datafun_streaming/storage/duckdb_utils.py
query_db(conn, sql, params=None)
Execute a SQL query and return results as a list of dicts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| conn | DuckDBPyConnection | Open DuckDB connection. | required |
| sql | str | SQL query string. Use ? for parameter placeholders. | required |
| params | list[Any] \| None | Optional list of parameter values for placeholders. | None |
Returns:
| Type | Description |
|---|---|
| list[dict[str, Any]] | A list of row dicts. Empty list if no rows matched. |
Example

```python
rows = query_db(conn, "SELECT * FROM sales WHERE region_id = ?", ["US-MO"])
```
Source code in src/datafun_streaming/storage/duckdb_utils.py
safe_table_name(table_name, allowed)
Return table_name after confirming it is in the allowed list.
SQL identifiers (table names, column names) cannot use parameterized query placeholders. This allowlist check prevents accidental injection if a caller passes an unexpected string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | The table name to validate. | required |
| allowed | frozenset[str] | A frozenset of allowed table names. | required |
Returns:
| Type | Description |
|---|---|
| str | The validated table name, unchanged. |
Raises:
| Type | Description |
|---|---|
| ValueError | If table_name is not in the allowed list. |
Source code in src/datafun_streaming/storage/duckdb_utils.py
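The allowlist check can be sketched in a few lines. `safe_table_name_sketch`, the table names, and the error wording are hypothetical; the point is that the table name is validated before it is formatted into SQL, while values still go through `?` placeholders.

```python
def safe_table_name_sketch(table_name: str, allowed: frozenset[str]) -> str:
    """Illustrative stand-in: return the name only if it is on the allowlist."""
    if table_name not in allowed:
        raise ValueError(f"table {table_name!r} is not in the allowed list: {sorted(allowed)}")
    return table_name


ALLOWED_TABLES = frozenset({"sales", "regions"})  # hypothetical table names
table = safe_table_name_sketch("sales", ALLOWED_TABLES)
# The identifier is interpolated only after validation; the value uses a placeholder.
sql = f"SELECT * FROM {table} WHERE region_id = ?"
```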
upsert_row(conn, *, table, row, primary_key, allowed_tables)
Insert or replace one row in a DuckDB table.
Creates the table on the first call if it does not exist. On subsequent calls with the same primary key value, the existing row is replaced with the new values.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| conn | DuckDBPyConnection | Open DuckDB connection. | required |
| table | str | Table name to write to. | required |
| row | dict[str, Any] | The row to insert or replace. | required |
| primary_key | str | The field name that uniquely identifies each row. | required |
| allowed_tables | frozenset[str] | A frozenset of allowed table names. | required |
Returns:
| Type | Description |
|---|---|
| None | None. |
Source code in src/datafun_streaming/storage/duckdb_utils.py
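To show the create-on-first-call and replace-on-same-key behavior without requiring DuckDB, the sketch below swaps in Python's built-in `sqlite3` module. It is a stand-in technique, not the library's implementation: the function name is hypothetical, the SQL is SQLite's, and the columns are left untyped rather than inferred from Python value types.

```python
import sqlite3
from typing import Any


def upsert_row_sketch(
    conn: sqlite3.Connection,
    *,
    table: str,
    row: dict[str, Any],
    primary_key: str,
    allowed_tables: frozenset[str],
) -> None:
    """Illustrative stand-in using SQLite: insert a row or replace the one with the same key."""
    if table not in allowed_tables:
        raise ValueError(f"table {table!r} is not allowed")
    columns = list(row)
    # Identifiers are quoted and the table is allowlisted; values use ? placeholders.
    column_defs = ", ".join(
        f'"{name}" PRIMARY KEY' if name == primary_key else f'"{name}"' for name in columns
    )
    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({column_defs})')
    column_list = ", ".join(f'"{name}"' for name in columns)
    placeholders = ", ".join("?" for _ in columns)
    conn.execute(
        f'INSERT OR REPLACE INTO "{table}" ({column_list}) VALUES ({placeholders})',
        [row[name] for name in columns],
    )
```

Because the write is keyed on the primary key, replaying the same message leaves one row rather than a duplicate, which makes a consumer safe to rerun.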
Visualization
src/datafun_streaming/visualization/chart_utils.py.
Chart utilities for streaming data.
Provides functions to create, update, and save a line chart that accumulates data points as messages are consumed.
Uses Plotly to generate an interactive HTML chart. The chart is updated in memory as each message arrives and saved to disk at the end of the consume loop (Section C4).
This is domain-agnostic: pass any numeric field and any label. The chart does not know what it is charting.
Author: Denise Case Date: 2026-05
StreamingChart (dataclass)
Holds chart state for a single line series.
Updated incrementally as messages arrive. Rendered to HTML when save_chart() is called.
Attributes:
| Name | Type | Description |
|---|---|---|
| title | str | Chart title shown at the top. |
| x_label | str | Label for the x-axis. |
| y_label | str | Label for the y-axis. |
| x_values | list[int \| float \| str] | Accumulated x-axis values (e.g. message count). |
| y_values | list[float] | Accumulated y-axis values (e.g. running total). |
Source code in src/datafun_streaming/visualization/chart_utils.py
is_empty (property)
Return True if no data points have been added yet.
init_chart(*, title, x_label, y_label)
Create a new empty StreamingChart.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| title | str | Chart title. | required |
| x_label | str | Label for the x-axis. | required |
| y_label | str | Label for the y-axis. | required |
Returns:
| Type | Description |
|---|---|
| StreamingChart | An empty StreamingChart ready to receive data points. |
Source code in src/datafun_streaming/visualization/chart_utils.py
save_chart(chart, path)
Render the chart to an interactive HTML file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chart | StreamingChart | The StreamingChart to render. | required |
| path | Path | Output file path. Must end in .html. | required |
Returns:
| Type | Description |
|---|---|
| None | None. |
Source code in src/datafun_streaming/visualization/chart_utils.py
update_chart(chart, row, *, x_field='_kafka_offset', y_field='total')
Add one data point to the chart from a message row.
All arguments after the asterisk must be passed as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chart | StreamingChart | The StreamingChart to update. | required |
| row | dict[str, Any] | The enriched message row. | required |
| x_field | str | The row field to use as the x-axis value. Defaults to _kafka_offset (message sequence number). | '_kafka_offset' |
| y_field | str | The row field to use as the y-axis value. Defaults to total (post-tax total price). | 'total' |
Returns:
| Type | Description |
|---|---|
| None | None. |
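The accumulate-then-render design can be sketched without Plotly: the chart object only collects points, and rendering is a separate step. `StreamingChartSketch` and `update_chart_sketch` are hypothetical stand-ins, and converting the y value with `float()` is an assumption about how text values are handled.

```python
from dataclasses import dataclass, field
from typing import Any, Union


@dataclass
class StreamingChartSketch:
    """Illustrative stand-in for StreamingChart: labels plus accumulated points."""

    title: str
    x_label: str
    y_label: str
    x_values: list[Union[int, float, str]] = field(default_factory=list)
    y_values: list[float] = field(default_factory=list)

    @property
    def is_empty(self) -> bool:
        return not self.x_values


def update_chart_sketch(
    chart: StreamingChartSketch,
    row: dict[str, Any],
    *,
    x_field: str = "_kafka_offset",
    y_field: str = "total",
) -> None:
    """Append one (x, y) point taken from the row."""
    chart.x_values.append(row[x_field])
    # Assumption: y values may arrive as text and are converted to float.
    chart.y_values.append(float(row[y_field]))
```

Keeping the per-message step to two list appends means the consume loop is not slowed by rendering; the HTML is produced once at the end.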