API Reference

Auto-generated code documentation.

analytics_project

data_prep

Data prep pipeline.

File: src/analytics_project/data_prep.py.

main

main() -> None

Process raw data.

Source code in src/analytics_project/data_prep.py
def main() -> None:
    """Process raw data."""
    logger.info("Starting data preparation...")

    # Build explicit paths for each file under data/raw
    customer_path: pathlib.Path = RAW_DATA_DIR.joinpath("customers_data.csv")
    product_path: pathlib.Path = RAW_DATA_DIR.joinpath("products_data.csv")
    sales_path: pathlib.Path = RAW_DATA_DIR.joinpath("sales_data.csv")

    # Read them all into a df we can pass in to the DataScrubber
    df_customers: pd.DataFrame = read_and_log(customer_path)

    if not df_customers.empty:
        scrubber = DataScrubber(df_customers)
        scrubber.remove_duplicate_records()
        df_customers = scrubber.handle_missing_data(fill_value="N/A")
        logger.info("Cleaned customers data.")

    # Can you extend this to read and clean the other files?
    # First, name a variable for what is returned by read_and_log
    # Then, create a DataScrubber instance for that df
    # Finally, call the methods to clean the data
    read_and_log(product_path)
    read_and_log(sales_path)

    # At the very end, log that we're done.
    logger.info("Data preparation complete.")

read_and_log

read_and_log(path: Path) -> pd.DataFrame

Read a CSV at the given path into a DataFrame, with friendly logging.

We know reading a csv file can fail (the file might not exist, it could be corrupted), so we put the statement in a try block. It could fail due to a FileNotFoundError or other exceptions. If it succeeds, we log the shape of the DataFrame. If it fails, we log an error and return an empty DataFrame.

Source code in src/analytics_project/data_prep.py
def read_and_log(path: pathlib.Path) -> pd.DataFrame:
    """Read a CSV at the given path into a DataFrame, with friendly logging.

    We know reading a csv file can fail
    (the file might not exist, it could be corrupted),
    so we put the statement in a try block.
    It could fail due to a FileNotFoundError or other exceptions.
    If it succeeds, we log the shape of the DataFrame.
    If it fails, we log an error and return an empty DataFrame.
    """
    try:
        # Typically, we log the start of a file read operation
        logger.info(f"Reading raw data from {path}.")
        df = pd.read_csv(path)
        # Typically, we log the successful completion of a file read operation
        logger.info(
            f"{path.name}: loaded DataFrame with shape {df.shape[0]} rows x {df.shape[1]} cols"
        )
        return df
    except FileNotFoundError:
        logger.error(f"File not found: {path}")
        return pd.DataFrame()
    except Exception as e:
        logger.error(f"Error reading {path}: {e}")
        return pd.DataFrame()

data_preparation

prepare_customers

scripts/data_preparation/prepare_customers.py.

This script reads customer data from the data/raw folder, cleans the data, and writes the cleaned version to the data/prepared folder.

Tasks:
- Remove duplicates
- Handle missing values
- Remove outliers
- Ensure consistent formatting

handle_missing_values
handle_missing_values(df: DataFrame) -> pd.DataFrame

Handle missing values by filling or dropping.

This logic is specific to the actual data and business rules.

Parameters:

    df (DataFrame): Input DataFrame. [required]

Returns:

    pd.DataFrame: DataFrame with missing values handled.

Source code in src/analytics_project/data_preparation/prepare_customers.py
def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """Handle missing values by filling or dropping.

    This logic is specific to the actual data and business rules.

    Args:
        df (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: DataFrame with missing values handled.
    """
    logger.info(f"FUNCTION START: handle_missing_values with dataframe shape={df.shape}")

    # Log missing values count before handling
    missing_before = df.isna().sum().sum()
    logger.info(f"Total missing values before handling: {missing_before}")

    # TODO: Fill or drop missing values based on business rules
    # Example:
    # df['CustomerName'].fillna('Unknown', inplace=True)
    # df.dropna(subset=['CustomerID'], inplace=True)

    # Log missing values count after handling
    missing_after = df.isna().sum().sum()
    logger.info(f"Total missing values after handling: {missing_after}")
    logger.info(f"{len(df)} records remaining after handling missing values.")
    return df
main
main() -> None

Process customer data from raw to prepared format.

Source code in src/analytics_project/data_preparation/prepare_customers.py
def main() -> None:
    """Process customer data from raw to prepared format."""
    logger.info("==================================")
    logger.info("STARTING prepare_customers_data.py")
    logger.info("==================================")

    logger.info(f"Root         : {PROJECT_ROOT}")
    logger.info(f"data/raw     : {RAW_DATA_DIR}")
    logger.info(f"data/prepared: {PREPARED_DATA_DIR}")
    logger.info(f"scripts      : {SCRIPTS_DIR}")

    input_file = "customers_data.csv"
    output_file = "customers_prepared.csv"

    # Read raw data
    df = read_raw_data(input_file)

    # Record original shape
    original_shape = df.shape

    # Log initial dataframe information
    logger.info(f"Initial dataframe columns: {', '.join(df.columns.tolist())}")
    logger.info(f"Initial dataframe shape: {df.shape}")

    # Clean column names
    original_columns = df.columns.tolist()
    df.columns = df.columns.str.strip()

    # Log if any column names changed
    changed_columns = [
        f"{old} -> {new}"
        for old, new in zip(original_columns, df.columns, strict=False)
        if old != new
    ]
    if changed_columns:
        logger.info(f"Cleaned column names: {', '.join(changed_columns)}")

    # Remove duplicates
    df = remove_duplicates(df)

    # Handle missing values
    df = handle_missing_values(df)

    # Remove outliers
    df = remove_outliers(df)

    # Save prepared data
    save_prepared_data(df, output_file)

    logger.info("==================================")
    logger.info(f"Original shape: {df.shape}")
    logger.info(f"Cleaned shape:  {original_shape}")
    logger.info("==================================")
    logger.info("FINISHED prepare_customers_data.py")
    logger.info("==================================")
read_raw_data
read_raw_data(file_name: str) -> pd.DataFrame

Read raw data from CSV.

Source code in src/analytics_project/data_preparation/prepare_customers.py
def read_raw_data(file_name: str) -> pd.DataFrame:
    """Read raw data from CSV."""
    file_path: pathlib.Path = RAW_DATA_DIR.joinpath(file_name)
    try:
        logger.info(f"READING: {file_path}.")
        return pd.read_csv(file_path)
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        return pd.DataFrame()  # Return an empty DataFrame if the file is not found
    except Exception as e:
        logger.error(f"Error reading {file_path}: {e}")
        return pd.DataFrame()  # Return an empty DataFrame if any other error occurs
remove_duplicates
remove_duplicates(df: DataFrame) -> pd.DataFrame

Remove duplicate rows from the DataFrame.

How do you decide if a row is duplicated? Which do you keep? Which do you delete?

Parameters:

    df (DataFrame): Input DataFrame. [required]

Returns:

    pd.DataFrame: DataFrame with duplicates removed.

Source code in src/analytics_project/data_preparation/prepare_customers.py
def remove_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    """Remove duplicate rows from the DataFrame.

    How do you decide if a row is duplicated?
    Which do you keep? Which do you delete?

    Args:
        df (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: DataFrame with duplicates removed.
    """
    logger.info(f"FUNCTION START: remove_duplicates with dataframe shape={df.shape}")

    # Let's delegate this to the DataScrubber class
    # First, create an instance of the DataScrubber class
    # by passing in the dataframe as an argument.
    df_scrubber = DataScrubber(df)

    # Now, call the method on our instance to remove duplicates.
    # This method will return a new dataframe with duplicates removed.
    df_deduped = df_scrubber.remove_duplicate_records()

    logger.info(f"Original dataframe shape: {df.shape}")
    logger.info(f"Deduped  dataframe shape: {df_deduped.shape}")
    return df_deduped
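The docstring asks how to decide which rows count as duplicates and which copy to keep. remove_duplicate_records() drops a row only when every column matches; a hedged sketch of a key-based alternative, where the 'CustomerID' column name is hypothetical:

# Keep the first occurrence per customer key instead of requiring full-row equality.
df_deduped = df.drop_duplicates(subset=["CustomerID"], keep="first")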
remove_outliers
remove_outliers(df: DataFrame) -> pd.DataFrame

Remove outliers based on thresholds.

This logic is very specific to the actual data and business rules.

Parameters:

    df (DataFrame): Input DataFrame. [required]

Returns:

    pd.DataFrame: DataFrame with outliers removed.

Source code in src/analytics_project/data_preparation/prepare_customers.py
def remove_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """Remove outliers based on thresholds.

    This logic is very specific to the actual data and business rules.

    Args:
        df (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: DataFrame with outliers removed.
    """
    logger.info(f"FUNCTION START: remove_outliers with dataframe shape={df.shape}")
    initial_count = len(df)

    # TODO: Define numeric columns and apply rules for outlier removal
    # Example:
    # df = df[(df['Age'] > 18) & (df['Age'] < 100)]

    removed_count = initial_count - len(df)
    logger.info(f"Removed {removed_count} outlier rows")
    logger.info(f"{len(df)} records remaining after removing outliers.")
    return df
save_prepared_data
save_prepared_data(df: DataFrame, file_name: str) -> None

Save cleaned data to CSV.

Parameters:

    df (DataFrame): Cleaned DataFrame. [required]
    file_name (str): Name of the output file. [required]

Source code in src/analytics_project/data_preparation/prepare_customers.py
def save_prepared_data(df: pd.DataFrame, file_name: str) -> None:
    """Save cleaned data to CSV.

    Args:
        df (pd.DataFrame): Cleaned DataFrame.
        file_name (str): Name of the output file.
    """
    logger.info(
        f"FUNCTION START: save_prepared_data with file_name={file_name}, dataframe shape={df.shape}"
    )
    file_path = PREPARED_DATA_DIR.joinpath(file_name)
    df.to_csv(file_path, index=False)
    logger.info(f"Data saved to {file_path}")

prepare_products

scripts/data_preparation/prepare_products.py.

This script reads data from the data/raw folder, cleans the data, and writes the cleaned version to the data/prepared folder.

Tasks:
- Remove duplicates
- Handle missing values
- Remove outliers
- Ensure consistent formatting

handle_missing_values
handle_missing_values(df: DataFrame) -> pd.DataFrame

Handle missing values by filling or dropping.

This logic is specific to the actual data and business rules.

Parameters:

    df (DataFrame): Input DataFrame. [required]

Returns:

    pd.DataFrame: DataFrame with missing values handled.

Source code in src/analytics_project/data_preparation/prepare_products.py
def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """Handle missing values by filling or dropping.

    This logic is specific to the actual data and business rules.

    Args:
        df (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: DataFrame with missing values handled.
    """
    logger.info(f"FUNCTION START: handle_missing_values with dataframe shape={df.shape}")

    # Log missing values by column before handling
    # NA means missing or "not a number" - ask your AI for details
    missing_by_col = df.isna().sum()
    logger.info(f"Missing values by column before handling:\n{missing_by_col}")

    # TODO: OPTIONAL - We can implement appropriate missing value handling
    # specific to our data.
    # For example: Different strategies may be needed for different columns
    # USE YOUR COLUMN NAMES - these are just examples
    # df['product_name'].fillna('Unknown Product', inplace=True)
    # df['description'].fillna('', inplace=True)
    # df['price'].fillna(df['price'].median(), inplace=True)
    # df['category'].fillna(df['category'].mode()[0], inplace=True)
    # df.dropna(subset=['product_code'], inplace=True)  # Remove rows without product code

    # Log missing values by column after handling
    missing_after = df.isna().sum()
    logger.info(f"Missing values by column after handling:\n{missing_after}")
    logger.info(f"{len(df)} records remaining after handling missing values.")
    return df
main
main() -> None

Process product data from raw to prepared format.

Source code in src/analytics_project/data_preparation/prepare_products.py
def main() -> None:
    """Process product data from raw to prepared format."""
    logger.info("==================================")
    logger.info("STARTING prepare_products_data.py")
    logger.info("==================================")

    logger.info(f"Root         : {PROJECT_ROOT}")
    logger.info(f"data/raw     : {RAW_DATA_DIR}")
    logger.info(f"data/prepared: {PREPARED_DATA_DIR}")
    logger.info(f"scripts      : {SCRIPTS_DIR}")

    input_file = "products_data.csv"
    output_file = "products_prepared.csv"

    # Read raw data
    df = read_raw_data(input_file)

    # Record original shape
    original_shape = df.shape

    # Log initial dataframe information
    logger.info(f"Initial dataframe columns: {', '.join(df.columns.tolist())}")
    logger.info(f"Initial dataframe shape: {df.shape}")

    # Clean column names
    original_columns = df.columns.tolist()
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

    # Log if any column names changed
    changed_columns = [
        f"{old} -> {new}"
        for old, new in zip(original_columns, df.columns, strict=True)
        if old != new
    ]
    if changed_columns:
        logger.info(f"Cleaned column names: {', '.join(changed_columns)}")

    # Remove duplicates
    df = remove_duplicates(df)

    # Handle missing values
    df = handle_missing_values(df)

    # TODO: Remove outliers
    df = remove_outliers(df)

    # TODO: Validate data
    df = validate_data(df)

    # TODO: Standardize formats
    df = standardize_formats(df)

    # Save prepared data
    save_prepared_data(df, output_file)

    logger.info("==================================")
    logger.info(f"Original shape: {df.shape}")
    logger.info(f"Cleaned shape:  {original_shape}")
    logger.info("==================================")
    logger.info("FINISHED prepare_products_data.py")
    logger.info("==================================")
read_raw_data
read_raw_data(file_name: str) -> pd.DataFrame

Read raw data from CSV.

Parameters:

    file_name (str): Name of the CSV file to read. [required]

Returns:

    pd.DataFrame: Loaded DataFrame.

Source code in src/analytics_project/data_preparation/prepare_products.py
def read_raw_data(file_name: str) -> pd.DataFrame:
    """Read raw data from CSV.

    Args:
        file_name (str): Name of the CSV file to read.

    Returns:
        pd.DataFrame: Loaded DataFrame.
    """
    logger.info(f"FUNCTION START: read_raw_data with file_name={file_name}")
    file_path = RAW_DATA_DIR.joinpath(file_name)
    logger.info(f"Reading data from {file_path}")
    df = pd.read_csv(file_path)
    logger.info(f"Loaded dataframe with {len(df)} rows and {len(df.columns)} columns")

    # TODO: OPTIONAL Add data profiling here to understand the dataset
    # Suggestion: Log the datatypes of each column and the number of unique values
    # Example:
    # logger.info(f"Column datatypes: \n{df.dtypes}")
    # logger.info(f"Number of unique values: \n{df.nunique()}")

    return df
remove_duplicates
remove_duplicates(df: DataFrame) -> pd.DataFrame

Remove duplicate rows from the DataFrame.

Parameters:

    df (DataFrame): Input DataFrame. [required]

Returns:

    pd.DataFrame: DataFrame with duplicates removed.

Source code in src/analytics_project/data_preparation/prepare_products.py
def remove_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    """Remove duplicate rows from the DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: DataFrame with duplicates removed.
    """
    logger.info(f"FUNCTION START: remove_duplicates with dataframe shape={df.shape}")
    initial_count = len(df)

    # TODO: Consider which columns should be used to identify duplicates
    # Example: For products, SKU or product code is typically unique
    # So we could do something like this:
    # df = df.drop_duplicates(subset=['product_code'])
    df = df.drop_duplicates()

    removed_count = initial_count - len(df)
    logger.info(f"Removed {removed_count} duplicate rows")
    logger.info(f"{len(df)} records remaining after removing duplicates.")
    return df
remove_outliers
remove_outliers(df: DataFrame) -> pd.DataFrame

Remove outliers based on thresholds.

This logic is very specific to the actual data and business rules.

Parameters:

    df (DataFrame): Input DataFrame. [required]

Returns:

    pd.DataFrame: DataFrame with outliers removed.

Source code in src/analytics_project/data_preparation/prepare_products.py
def remove_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """Remove outliers based on thresholds.

    This logic is very specific to the actual data and business rules.

    Args:
        df (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: DataFrame with outliers removed.
    """
    logger.info(f"FUNCTION START: remove_outliers with dataframe shape={df.shape}")
    initial_count = len(df)

    # TODO: Identify numeric columns that might have outliers.
    # Recommended - just use ranges based on reasonable data
    # People should not be 22 feet tall, etc.
    # OPTIONAL ADVANCED: Use IQR method to identify outliers in numeric columns
    # Example:
    # for col in ['price', 'weight', 'length', 'width', 'height']:
    #     if col in df.columns and df[col].dtype in ['int64', 'float64']:
    #         Q1 = df[col].quantile(0.25)
    #         Q3 = df[col].quantile(0.75)
    #         IQR = Q3 - Q1
    #         lower_bound = Q1 - 1.5 * IQR
    #         upper_bound = Q3 + 1.5 * IQR
    #         df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
    #         logger.info(f"Applied outlier removal to {col}: bounds [{lower_bound}, {upper_bound}]")

    removed_count = initial_count - len(df)
    logger.info(f"Removed {removed_count} outlier rows")
    logger.info(f"{len(df)} records remaining after removing outliers.")
    return df
save_prepared_data
save_prepared_data(df: DataFrame, file_name: str) -> None

Save cleaned data to CSV.

Parameters:

    df (DataFrame): Cleaned DataFrame. [required]
    file_name (str): Name of the output file. [required]

Source code in src/analytics_project/data_preparation/prepare_products.py
def save_prepared_data(df: pd.DataFrame, file_name: str) -> None:
    """Save cleaned data to CSV.

    Args:
        df (pd.DataFrame): Cleaned DataFrame.
        file_name (str): Name of the output file.
    """
    logger.info(
        f"FUNCTION START: save_prepared_data with file_name={file_name}, dataframe shape={df.shape}"
    )
    file_path = PREPARED_DATA_DIR.joinpath(file_name)
    df.to_csv(file_path, index=False)
    logger.info(f"Data saved to {file_path}")
standardize_formats
standardize_formats(df: DataFrame) -> pd.DataFrame

Standardize the formatting of various columns.

Parameters:

    df (DataFrame): Input DataFrame. [required]

Returns:

    pd.DataFrame: DataFrame with standardized formatting.

Source code in src/analytics_project/data_preparation/prepare_products.py
def standardize_formats(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize the formatting of various columns.

    Args:
        df (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: DataFrame with standardized formatting.
    """
    logger.info(f"FUNCTION START: standardize_formats with dataframe shape={df.shape}")

    # TODO: OPTIONAL ADVANCED Implement standardization for product data
    # Suggestion: Consider standardizing text fields, units, and categorical variables
    # Examples (update based on your column names and types):
    # df['product_name'] = df['product_name'].str.title()  # Title case for product names
    # df['category'] = df['category'].str.lower()  # Lowercase for categories
    # df['price'] = df['price'].round(2)  # Round prices to 2 decimal places
    # df['weight_unit'] = df['weight_unit'].str.upper()  # Uppercase units

    logger.info("Completed standardizing formats")
    return df
validate_data
validate_data(df: DataFrame) -> pd.DataFrame

Validate data against business rules.

Parameters:

    df (DataFrame): Input DataFrame. [required]

Returns:

    pd.DataFrame: Validated DataFrame.

Source code in src/analytics_project/data_preparation/prepare_products.py
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """Validate data against business rules.

    Args:
        df (pd.DataFrame): Input DataFrame.

    Returns:
        pd.DataFrame: Validated DataFrame.
    """
    logger.info(f"FUNCTION START: validate_data with dataframe shape={df.shape}")

    # TODO: Implement data validation rules specific to products
    # Suggestion: Check for valid values in critical fields
    # Example:
    # invalid_prices = df[df['price'] < 0].shape[0]
    # logger.info(f"Found {invalid_prices} products with negative prices")
    # df = df[df['price'] >= 0]

    logger.info("Data validation complete")
    return df

prepare_sales

scripts/data_preparation/prepare_sales.py.

This script reads data from the data/raw folder, cleans the data, and writes the cleaned version to the data/prepared folder.

Tasks:
- Remove duplicates
- Handle missing values
- Remove outliers
- Ensure consistent formatting

main
main() -> None

Process sales data from raw to prepared format.

Source code in src/analytics_project/data_preparation/prepare_sales.py
def main() -> None:
    """Process sales data from raw to prepared format."""
    logger.info("==================================")
    logger.info("STARTING prepare_sales_data.py")
    logger.info("==================================")

    logger.info(f"Root         : {PROJECT_ROOT}")
    logger.info(f"data/raw     : {RAW_DATA_DIR}")
    logger.info(f"data/prepared: {PREPARED_DATA_DIR}")
    logger.info(f"scripts      : {SCRIPTS_DIR}")

    input_file = "sales_data.csv"
    # TODO: Uncomment after implementing sales data preparation
    # output_file = "sales_prepared.csv"

    # Read raw data
    df = read_raw_data(input_file)

    # Record original shape
    original_shape = df.shape

    # Log initial dataframe information
    logger.info(f"Initial dataframe columns: {', '.join(df.columns.tolist())}")
    logger.info(f"Initial dataframe shape: {df.shape}")

    # Clean column names
    original_columns = df.columns.tolist()
    df.columns = df.columns.str.strip()

    # Log if any column names changed
    changed_columns = [
        f"{old} -> {new}"
        for old, new in zip(original_columns, df.columns, strict=True)
        if old != new
    ]
    if changed_columns:
        logger.info(f"Cleaned column names: {', '.join(changed_columns)}")

    # TODO: Remove duplicates

    # TODO: Handle missing values

    # TODO: Remove outliers

    # TODO: Save prepared data

    logger.info("==================================")
    logger.info(f"Original shape: {df.shape}")
    logger.info(f"Cleaned shape:  {original_shape}")
    logger.info("==================================")
    logger.info("FINISHED prepare_sales_data.py")
    logger.info("==================================")
read_raw_data
read_raw_data(file_name: str) -> pd.DataFrame

Read raw data from CSV.

Parameters:

    file_name (str): Name of the CSV file to read. [required]

Returns:

    pd.DataFrame: Loaded DataFrame.

Source code in src/analytics_project/data_preparation/prepare_sales.py
def read_raw_data(file_name: str) -> pd.DataFrame:
    """Read raw data from CSV.

    Args:
        file_name (str): Name of the CSV file to read.

    Returns:
        pd.DataFrame: Loaded DataFrame.
    """
    logger.info(f"FUNCTION START: read_raw_data with file_name={file_name}")
    file_path = RAW_DATA_DIR.joinpath(file_name)
    logger.info(f"Reading data from {file_path}")
    df = pd.read_csv(file_path)
    logger.info(f"Loaded dataframe with {len(df)} rows and {len(df.columns)} columns")

    # TODO: OPTIONAL Add data profiling here to understand the dataset
    # Suggestion: Log the datatypes of each column and the number of unique values
    # Example:
    # logger.info(f"Column datatypes: \n{df.dtypes}")
    # logger.info(f"Number of unique values: \n{df.nunique()}")

    return df

data_scrubber

data_scrubber.py.

Reusable utility class for performing common data cleaning and preparation tasks on a pandas DataFrame.

This class provides methods for:
- Checking data consistency
- Removing duplicates
- Handling missing values
- Filtering outliers
- Renaming and reordering columns
- Formatting strings
- Parsing date fields

Use this class to perform similar cleaning operations across multiple files. You are not required to use this class, but it shows how we can organize reusable data cleaning logic - or you can use the logic examples in your own code.

Example

from .data_scrubber import DataScrubber
scrubber = DataScrubber(df)
scrubber.remove_duplicate_records()
df = scrubber.handle_missing_data(fill_value="N/A")

DataScrubber

A utility class for performing common data cleaning and preparation tasks on pandas DataFrames.

This class provides methods for checking data consistency, removing duplicates, handling missing values, filtering outliers, renaming and reordering columns, formatting strings, and parsing date fields.

Attributes:

    df (pd.DataFrame): The DataFrame to be scrubbed and cleaned.

Methods:

    check_data_consistency_before_cleaning() -> dict
        Check data consistency before cleaning by calculating counts of null and duplicate entries.
    check_data_consistency_after_cleaning() -> dict
        Check data consistency after cleaning to ensure there are no null or duplicate entries.
    remove_duplicate_records() -> pd.DataFrame
        Remove duplicate rows from the DataFrame.
    handle_missing_data(drop: bool = False, fill_value=None) -> pd.DataFrame
        Handle missing data in the DataFrame by dropping or filling values.

Source code in src/analytics_project/data_scrubber.py
class DataScrubber:
    """A utility class for performing common data cleaning and preparation tasks on pandas DataFrames.

    This class provides methods for checking data consistency, removing duplicates,
    handling missing values, filtering outliers, renaming and reordering columns,
    formatting strings, and parsing date fields.

    Attributes
    ----------
    df : pd.DataFrame
        The DataFrame to be scrubbed and cleaned.

    Methods
    -------
    check_data_consistency_before_cleaning() -> dict
        Check data consistency before cleaning by calculating counts of null and duplicate entries.
    check_data_consistency_after_cleaning() -> dict
        Check data consistency after cleaning to ensure there are no null or duplicate entries.
    remove_duplicate_records() -> pd.DataFrame
        Remove duplicate rows from the DataFrame.
    handle_missing_data(drop: bool = False, fill_value = None) -> pd.DataFrame
        Handle missing data in the DataFrame by dropping or filling values.
    """

    def __init__(self, df: pd.DataFrame):
        """Initialize the DataScrubber with a DataFrame.

        Args:
            df (pd.DataFrame): The DataFrame to be scrubbed.
        """
        self.df = df

    def check_data_consistency_before_cleaning(self) -> dict[str, pd.Series | int]:
        """Check data consistency before cleaning by calculating counts of null and duplicate entries.

        Returns:
            dict: Dictionary with counts of null values and duplicate rows.
        """
        null_counts = self.df.isnull().sum()
        duplicate_count = self.df.duplicated().sum()
        return {"null_counts": null_counts, "duplicate_count": duplicate_count}

    def check_data_consistency_after_cleaning(self) -> dict[str, pd.Series | int]:
        """Check data consistency after cleaning to ensure there are no null or duplicate entries.

        Returns:
            dict: Dictionary with counts of null values and duplicate rows, expected to be zero for each.
        """
        null_counts = self.df.isnull().sum()
        duplicate_count = self.df.duplicated().sum()
        assert null_counts.sum() == 0, "Data still contains null values after cleaning."
        assert duplicate_count == 0, "Data still contains duplicate records after cleaning."
        return {"null_counts": null_counts, "duplicate_count": duplicate_count}

    def convert_column_to_new_data_type(self, column: str, new_type: type) -> pd.DataFrame:
        """Convert a specified column to a new data type.

        Args:
            column: Name of the column to convert.
            new_type: The target data type (e.g., 'int', 'float', 'str').

        Returns:
            pd.DataFrame: Updated DataFrame with the column type converted.

        Raises:
            ValueError: If the specified column not found in the DataFrame.
        """
        try:
            self.df[column] = self.df[column].astype(new_type)
            return self.df
        except KeyError as exc:
            raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc

    def drop_columns(self, columns: list[str]) -> pd.DataFrame:
        """Drop specified columns from the DataFrame.

        Parameters
        ----------
        columns : list[str]
            List of column names to drop.

        Returns
        -------
            pd.DataFrame: Updated DataFrame with specified columns removed.

        Raises
        ------
            ValueError: If a specified column is not found in the DataFrame.
        """
        for column in columns:
            if column not in self.df.columns:
                raise ValueError(f"Column name '{column}' not found in the DataFrame.")
        self.df = self.df.drop(columns=columns)
        return self.df

    def filter_column_outliers(
        self, column: str, lower_bound: float | int, upper_bound: float | int
    ) -> pd.DataFrame:
        """Filter outliers in a specified column based on lower and upper bounds.

        Args:
            column: Name of the column to filter for outliers.
            lower_bound: Lower threshold for outlier filtering.
            upper_bound: Upper threshold for outlier filtering.

        Returns:
            pd.DataFrame: Updated DataFrame with outliers filtered out.

        Raises:
            ValueError: If the specified column not found in the DataFrame.
        """
        try:
            self.df = self.df[(self.df[column] >= lower_bound) & (self.df[column] <= upper_bound)]
            return self.df
        except KeyError as exc:
            raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc

    def format_column_strings_to_lower_and_trim(self, column: str) -> pd.DataFrame:
        """Format strings in a specified column by converting to lowercase and trimming whitespace.

        Parameters
        ----------
        column : str
            Name of the column to format.

        Returns
        -------
            pd.DataFrame: Updated DataFrame with formatted string column.

        Raises
        ------
            ValueError: If the specified column not found in the DataFrame.
        """
        try:
            self.df[column] = self.df[column].str.lower().str.strip()
            return self.df
        except KeyError as exc:
            raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc

    def format_column_strings_to_upper_and_trim(self, column: str) -> pd.DataFrame:
        """Format strings in a specified column by converting to uppercase and trimming whitespace.

        Parameters
        ----------
        column : str
            Name of the column to format.

        Returns
        -------
            pd.DataFrame: Updated DataFrame with formatted string column.

        Raises
        ------
            ValueError: If the specified column not found in the DataFrame.
        """
        try:
            # TODO: Fix the following logic to call str.upper() and str.strip() on the given column
            # HINT: See previous function for an example
            self.df[column] = self.df[column]
            return self.df
        except KeyError as exc:
            raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc

    def handle_missing_data(
        self, drop: bool = False, fill_value: None | float | int | str = None
    ) -> pd.DataFrame:
        """Handle missing data in the DataFrame.

        Parameters
        ----------
        drop : bool, optional
            If True, drop rows with missing values. Default is False.
        fill_value : None | float | int | str, optional
            Value to fill in for missing entries if drop is False.

        Returns
        -------
            pd.DataFrame: Updated DataFrame with missing data handled.
        """
        if drop:
            self.df = self.df.dropna()
        elif fill_value is not None:
            self.df = self.df.fillna(fill_value)
        return self.df

    def inspect_data(self) -> tuple[str, str]:
        """Inspect the data by providing DataFrame information and summary statistics.

        Returns:
            tuple: (info_str, describe_str), where `info_str` is a string representation of DataFrame.info()
                   and `describe_str` is a string representation of DataFrame.describe().
        """
        buffer = io.StringIO()
        self.df.info(buf=buffer)
        info_str = buffer.getvalue()  # Retrieve the string content of the buffer

        # Capture the describe output as a string
        describe_str = (
            self.df.describe().to_string()
        )  # Convert DataFrame.describe() output to a string
        return info_str, describe_str

    def parse_dates_to_add_standard_datetime(self, column: str) -> pd.DataFrame:
        """Parse a specified column as datetime format and add it as a new column named 'StandardDateTime'.

        Parameters
        ----------
        column : str
            Name of the column to parse as datetime.

        Returns
        -------
            pd.DataFrame: Updated DataFrame with a new 'StandardDateTime' column containing parsed datetime values.

        Raises
        ------
            ValueError: If the specified column not found in the DataFrame.
        """
        try:
            self.df["StandardDateTime"] = pd.to_datetime(self.df[column])
            return self.df
        except KeyError as exc:
            raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc

    def remove_duplicate_records(self) -> pd.DataFrame:
        """Remove duplicate rows from the DataFrame.

        Returns:
            pd.DataFrame: Updated DataFrame with duplicates removed.

        """
        self.df = self.df.drop_duplicates()
        return self.df

    def rename_columns(self, column_mapping: dict[str, str]) -> pd.DataFrame:
        """Rename columns in the DataFrame based on a provided mapping.

        Parameters
        ----------
        column_mapping : dict[str, str]
            Dictionary where keys are old column names and values are new names.

        Returns
        -------
            pd.DataFrame: Updated DataFrame with renamed columns.

        Raises
        ------
            ValueError: If a specified column is not found in the DataFrame.
        """
        for old_name, _new_name in column_mapping.items():
            if old_name not in self.df.columns:
                raise ValueError(f"Column '{old_name}' not found in the DataFrame.")

        self.df = self.df.rename(columns=column_mapping)
        return self.df

    def reorder_columns(self, columns: list[str]) -> pd.DataFrame:
        """Reorder columns in the DataFrame based on the specified order.

        Parameters
        ----------
        columns : list[str]
            List of column names in the desired order.

        Returns
        -------
            pd.DataFrame: Updated DataFrame with reordered columns.

        Raises
        ------
            ValueError: If a specified column is not found in the DataFrame.
        """
        for column in columns:
            if column not in self.df.columns:
                raise ValueError(f"Column name '{column}' not found in the DataFrame.")
        self.df = self.df[columns]
        return self.df
__init__
__init__(df: DataFrame)

Initialize the DataScrubber with a DataFrame.

Parameters:

    df (DataFrame): The DataFrame to be scrubbed. [required]

Source code in src/analytics_project/data_scrubber.py
def __init__(self, df: pd.DataFrame):
    """Initialize the DataScrubber with a DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame to be scrubbed.
    """
    self.df = df
check_data_consistency_after_cleaning
check_data_consistency_after_cleaning() -> dict[
    str, pd.Series | int
]

Check data consistency after cleaning to ensure there are no null or duplicate entries.

Returns:

    dict[str, Series | int]: Dictionary with counts of null values and duplicate rows, expected to be zero for each.

Source code in src/analytics_project/data_scrubber.py
def check_data_consistency_after_cleaning(self) -> dict[str, pd.Series | int]:
    """Check data consistency after cleaning to ensure there are no null or duplicate entries.

    Returns:
        dict: Dictionary with counts of null values and duplicate rows, expected to be zero for each.
    """
    null_counts = self.df.isnull().sum()
    duplicate_count = self.df.duplicated().sum()
    assert null_counts.sum() == 0, "Data still contains null values after cleaning."
    assert duplicate_count == 0, "Data still contains duplicate records after cleaning."
    return {"null_counts": null_counts, "duplicate_count": duplicate_count}
check_data_consistency_before_cleaning
check_data_consistency_before_cleaning() -> dict[
    str, pd.Series | int
]

Check data consistency before cleaning by calculating counts of null and duplicate entries.

Returns:

    dict[str, Series | int]: Dictionary with counts of null values and duplicate rows.

Source code in src/analytics_project/data_scrubber.py
def check_data_consistency_before_cleaning(self) -> dict[str, pd.Series | int]:
    """Check data consistency before cleaning by calculating counts of null and duplicate entries.

    Returns:
        dict: Dictionary with counts of null values and duplicate rows.
    """
    null_counts = self.df.isnull().sum()
    duplicate_count = self.df.duplicated().sum()
    return {"null_counts": null_counts, "duplicate_count": duplicate_count}
convert_column_to_new_data_type
convert_column_to_new_data_type(
    column: str, new_type: type
) -> pd.DataFrame

Convert a specified column to a new data type.

Parameters:

    column (str): Name of the column to convert. [required]
    new_type (type): The target data type (e.g., 'int', 'float', 'str'). [required]

Returns:

    pd.DataFrame: Updated DataFrame with the column type converted.

Raises:

    ValueError: If the specified column is not found in the DataFrame.

Source code in src/analytics_project/data_scrubber.py
def convert_column_to_new_data_type(self, column: str, new_type: type) -> pd.DataFrame:
    """Convert a specified column to a new data type.

    Args:
        column: Name of the column to convert.
        new_type: The target data type (e.g., 'int', 'float', 'str').

    Returns:
        pd.DataFrame: Updated DataFrame with the column type converted.

    Raises:
        ValueError: If the specified column not found in the DataFrame.
    """
    try:
        self.df[column] = self.df[column].astype(new_type)
        return self.df
    except KeyError as exc:
        raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc
drop_columns
drop_columns(columns: list[str]) -> pd.DataFrame

Drop specified columns from the DataFrame.

Parameters:

    columns (list[str]): List of column names to drop.

Returns:

    pd.DataFrame: Updated DataFrame with specified columns removed.

Raises:

    ValueError: If a specified column is not found in the DataFrame.

Source code in src/analytics_project/data_scrubber.py
def drop_columns(self, columns: list[str]) -> pd.DataFrame:
    """Drop specified columns from the DataFrame.

    Parameters
    ----------
    columns : list[str]
        List of column names to drop.

    Returns
    -------
        pd.DataFrame: Updated DataFrame with specified columns removed.

    Raises
    ------
        ValueError: If a specified column is not found in the DataFrame.
    """
    for column in columns:
        if column not in self.df.columns:
            raise ValueError(f"Column name '{column}' not found in the DataFrame.")
    self.df = self.df.drop(columns=columns)
    return self.df
filter_column_outliers
filter_column_outliers(
    column: str,
    lower_bound: float | int,
    upper_bound: float | int,
) -> pd.DataFrame

Filter outliers in a specified column based on lower and upper bounds.

Parameters:

    column (str): Name of the column to filter for outliers. [required]
    lower_bound (float | int): Lower threshold for outlier filtering. [required]
    upper_bound (float | int): Upper threshold for outlier filtering. [required]

Returns:

    pd.DataFrame: Updated DataFrame with outliers filtered out.

Raises:

    ValueError: If the specified column is not found in the DataFrame.

Source code in src/analytics_project/data_scrubber.py
def filter_column_outliers(
    self, column: str, lower_bound: float | int, upper_bound: float | int
) -> pd.DataFrame:
    """Filter outliers in a specified column based on lower and upper bounds.

    Args:
        column: Name of the column to filter for outliers.
        lower_bound: Lower threshold for outlier filtering.
        upper_bound: Upper threshold for outlier filtering.

    Returns:
        pd.DataFrame: Updated DataFrame with outliers filtered out.

    Raises:
        ValueError: If the specified column not found in the DataFrame.
    """
    try:
        self.df = self.df[(self.df[column] >= lower_bound) & (self.df[column] <= upper_bound)]
        return self.df
    except KeyError as exc:
        raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc
format_column_strings_to_lower_and_trim
format_column_strings_to_lower_and_trim(
    column: str,
) -> pd.DataFrame

Format strings in a specified column by converting to lowercase and trimming whitespace.

Parameters:

    column (str): Name of the column to format.

Returns:

    pd.DataFrame: Updated DataFrame with formatted string column.

Raises:

    ValueError: If the specified column is not found in the DataFrame.

Source code in src/analytics_project/data_scrubber.py
def format_column_strings_to_lower_and_trim(self, column: str) -> pd.DataFrame:
    """Format strings in a specified column by converting to lowercase and trimming whitespace.

    Parameters
    ----------
    column : str
        Name of the column to format.

    Returns
    -------
        pd.DataFrame: Updated DataFrame with formatted string column.

    Raises
    ------
        ValueError: If the specified column not found in the DataFrame.
    """
    try:
        self.df[column] = self.df[column].str.lower().str.strip()
        return self.df
    except KeyError as exc:
        raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc
format_column_strings_to_upper_and_trim
format_column_strings_to_upper_and_trim(
    column: str,
) -> pd.DataFrame

Format strings in a specified column by converting to uppercase and trimming whitespace.

Parameters:

    column (str): Name of the column to format.

Returns:

    pd.DataFrame: Updated DataFrame with formatted string column.

Raises:

    ValueError: If the specified column is not found in the DataFrame.

Source code in src/analytics_project/data_scrubber.py
def format_column_strings_to_upper_and_trim(self, column: str) -> pd.DataFrame:
    """Format strings in a specified column by converting to uppercase and trimming whitespace.

    Parameters
    ----------
    column : str
        Name of the column to format.

    Returns
    -------
        pd.DataFrame: Updated DataFrame with formatted string column.

    Raises
    ------
        ValueError: If the specified column not found in the DataFrame.
    """
    try:
        # TODO: Fix the following logic to call str.upper() and str.strip() on the given column
        # HINT: See previous function for an example
        self.df[column] = self.df[column]
        return self.df
    except KeyError as exc:
        raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc
handle_missing_data
handle_missing_data(
    drop: bool = False,
    fill_value: None | float | int | str = None,
) -> pd.DataFrame

Handle missing data in the DataFrame.

Parameters:

    drop (bool, optional): If True, drop rows with missing values. Default is False.
    fill_value (None | float | int | str, optional): Value to fill in for missing entries if drop is False.

Returns:

    pd.DataFrame: Updated DataFrame with missing data handled.

Source code in src/analytics_project/data_scrubber.py
def handle_missing_data(
    self, drop: bool = False, fill_value: None | float | int | str = None
) -> pd.DataFrame:
    """Handle missing data in the DataFrame.

    Parameters
    ----------
    drop : bool, optional
        If True, drop rows with missing values. Default is False.
    fill_value : None | float | int | str, optional
        Value to fill in for missing entries if drop is False.

    Returns
    -------
        pd.DataFrame: Updated DataFrame with missing data handled.
    """
    if drop:
        self.df = self.df.dropna()
    elif fill_value is not None:
        self.df = self.df.fillna(fill_value)
    return self.df
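A short usage sketch of the two modes (note that drop takes precedence when both arguments are given; df here is any DataFrame you have loaded):

# Fill every missing cell with a sentinel value...
df_filled = DataScrubber(df).handle_missing_data(fill_value="N/A")

# ...or drop rows that contain missing values instead.
df_dropped = DataScrubber(df).handle_missing_data(drop=True)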
inspect_data
inspect_data() -> tuple[str, str]

Inspect the data by providing DataFrame information and summary statistics.

Returns:

    tuple[str, str]: (info_str, describe_str), where info_str is a string representation of DataFrame.info() and describe_str is a string representation of DataFrame.describe().

Source code in src/analytics_project/data_scrubber.py
def inspect_data(self) -> tuple[str, str]:
    """Inspect the data by providing DataFrame information and summary statistics.

    Returns:
        tuple: (info_str, describe_str), where `info_str` is a string representation of DataFrame.info()
               and `describe_str` is a string representation of DataFrame.describe().
    """
    buffer = io.StringIO()
    self.df.info(buf=buffer)
    info_str = buffer.getvalue()  # Retrieve the string content of the buffer

    # Capture the describe output as a string
    describe_str = (
        self.df.describe().to_string()
    )  # Convert DataFrame.describe() output to a string
    return info_str, describe_str
parse_dates_to_add_standard_datetime
parse_dates_to_add_standard_datetime(
    column: str,
) -> pd.DataFrame

Parse a specified column as datetime format and add it as a new column named 'StandardDateTime'.

Parameters:

    column (str): Name of the column to parse as datetime.

Returns:

    pd.DataFrame: Updated DataFrame with a new 'StandardDateTime' column containing parsed datetime values.

Raises:

    ValueError: If the specified column is not found in the DataFrame.

Source code in src/analytics_project/data_scrubber.py
def parse_dates_to_add_standard_datetime(self, column: str) -> pd.DataFrame:
    """Parse a specified column as datetime format and add it as a new column named 'StandardDateTime'.

    Parameters
    ----------
    column : str
        Name of the column to parse as datetime.

    Returns
    -------
        pd.DataFrame: Updated DataFrame with a new 'StandardDateTime' column containing parsed datetime values.

    Raises
    ------
        ValueError: If the specified column not found in the DataFrame.
    """
    try:
        self.df["StandardDateTime"] = pd.to_datetime(self.df[column])
        return self.df
    except KeyError as exc:
        raise ValueError(f"Column name '{column}' not found in the DataFrame.") from exc
remove_duplicate_records
remove_duplicate_records() -> pd.DataFrame

Remove duplicate rows from the DataFrame.

Returns:

    pd.DataFrame: Updated DataFrame with duplicates removed.

Source code in src/analytics_project/data_scrubber.py
def remove_duplicate_records(self) -> pd.DataFrame:
    """Remove duplicate rows from the DataFrame.

    Returns:
        pd.DataFrame: Updated DataFrame with duplicates removed.

    """
    self.df = self.df.drop_duplicates()
    return self.df
rename_columns
rename_columns(
    column_mapping: dict[str, str],
) -> pd.DataFrame

Rename columns in the DataFrame based on a provided mapping.

Parameters:

    column_mapping (dict[str, str]): Dictionary where keys are old column names and values are new names.

Returns:

    pd.DataFrame: Updated DataFrame with renamed columns.

Raises:

    ValueError: If a specified column is not found in the DataFrame.

Source code in src/analytics_project/data_scrubber.py
def rename_columns(self, column_mapping: dict[str, str]) -> pd.DataFrame:
    """Rename columns in the DataFrame based on a provided mapping.

    Parameters
    ----------
    column_mapping : dict[str, str]
        Dictionary where keys are old column names and values are new names.

    Returns
    -------
        pd.DataFrame: Updated DataFrame with renamed columns.

    Raises
    ------
        ValueError: If a specified column is not found in the DataFrame.
    """
    for old_name, _new_name in column_mapping.items():
        if old_name not in self.df.columns:
            raise ValueError(f"Column '{old_name}' not found in the DataFrame.")

    self.df = self.df.rename(columns=column_mapping)
    return self.df
reorder_columns
reorder_columns(columns: list[str]) -> pd.DataFrame

Reorder columns in the DataFrame based on the specified order.

Parameters:

    columns (list[str]): List of column names in the desired order.

Returns:

    pd.DataFrame: Updated DataFrame with reordered columns.

Raises:

    ValueError: If a specified column is not found in the DataFrame.

Source code in src/analytics_project/data_scrubber.py
def reorder_columns(self, columns: list[str]) -> pd.DataFrame:
    """Reorder columns in the DataFrame based on the specified order.

    Parameters
    ----------
    columns : list[str]
        List of column names in the desired order.

    Returns
    -------
        pd.DataFrame: Updated DataFrame with reordered columns.

    Raises
    ------
        ValueError: If a specified column is not found in the DataFrame.
    """
    for column in columns:
        if column not in self.df.columns:
            raise ValueError(f"Column name '{column}' not found in the DataFrame.")
    self.df = self.df[columns]
    return self.df

dw

etl_to_dw

ETL script to load prepared data into the data warehouse (SQLite database).

File: src/analytics_project/dw/etl_to_dw.py

This file assumes the following structure (yours may vary):

project_root/
├─ data/
│  ├─ raw/
│  ├─ prepared/
│  └─ warehouse/
└─ src/
   └─ analytics_project/
      ├─ data_preparation/
      ├─ dw/
      ├─ analytics/
      └─ utils_logger.py

By switching to a modern src/ layout and using __init__.py files, we no longer need any sys.path modifications.

Remember to put __init__.py files (empty is fine) in each folder to make them packages.
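With the package installed (for example via an editable install, pip install -e .), imports then resolve without path manipulation; the exact names below are illustrative of this project's layout, and the logger import assumes utils_logger exposes a logger object:

from analytics_project.data_scrubber import DataScrubber
from analytics_project.utils_logger import logger  # assumption about utils_logger's contents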

NOTE on column names: This example uses inconsistent naming conventions for column names in the cleaned data. A good business intelligence project would standardize these during data preparation. Your names should be more standard after cleaning and pre-processing the data.

Database names generally follow snake_case conventions for SQL compatibility. "snake_case" = all lowercase with underscores between words.

create_schema
create_schema(cursor: Cursor) -> None

Create tables in the data warehouse if they don't exist.

Source code in src/analytics_project/dw/etl_to_dw.py (lines 75-105)
def create_schema(cursor: sqlite3.Cursor) -> None:
    """Create tables in the data warehouse if they don't exist."""
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS customer (
            customer_id INTEGER PRIMARY KEY,
            name TEXT,
            region TEXT,
            join_date TEXT
        )
    """)

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS product (
            product_id INTEGER PRIMARY KEY,
            product_name TEXT,
            category TEXT,
            unit_price REAL
        )
    """)

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS sale (
            sale_id INTEGER PRIMARY KEY,
            customer_id INTEGER,
            product_id INTEGER,
            sale_amount REAL,
            sale_date TEXT,
            FOREIGN KEY (customer_id) REFERENCES customer (customer_id),
            FOREIGN KEY (product_id) REFERENCES product (product_id)
        )
    """)
delete_existing_records
delete_existing_records(cursor: Cursor) -> None

Delete all existing records from the customer, product, and sale tables.

Source code in src/analytics_project/dw/etl_to_dw.py (lines 108-112)
def delete_existing_records(cursor: sqlite3.Cursor) -> None:
    """Delete all existing records from the customer, product, and sale tables."""
    cursor.execute("DELETE FROM customer")
    cursor.execute("DELETE FROM product")
    cursor.execute("DELETE FROM sale")
insert_customers
insert_customers(
    customers_df: DataFrame, cursor: Cursor
) -> None

Insert customer data into the customer table.

Source code in src/analytics_project/dw/etl_to_dw.py (lines 115-118)
def insert_customers(customers_df: pd.DataFrame, cursor: sqlite3.Cursor) -> None:
    """Insert customer data into the customer table."""
    logger.info(f"Inserting {len(customers_df)} customer rows.")
    customers_df.to_sql("customer", cursor.connection, if_exists="append", index=False)
insert_products
insert_products(
    products_df: DataFrame, cursor: Cursor
) -> None

Insert product data into the product table.

Source code in src/analytics_project/dw/etl_to_dw.py (lines 121-124)
def insert_products(products_df: pd.DataFrame, cursor: sqlite3.Cursor) -> None:
    """Insert product data into the product table."""
    logger.info(f"Inserting {len(products_df)} product rows.")
    products_df.to_sql("product", cursor.connection, if_exists="append", index=False)
insert_sales
insert_sales(sales_df: DataFrame, cursor: Cursor) -> None

Insert sales data into the sale table.

Source code in src/analytics_project/dw/etl_to_dw.py (lines 127-130)
def insert_sales(sales_df: pd.DataFrame, cursor: sqlite3.Cursor) -> None:
    """Insert sales data into the sales table."""
    logger.info(f"Inserting {len(sales_df)} sale rows.")
    sales_df.to_sql("sale", cursor.connection, if_exists="append", index=False)
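
The insert helpers expect DataFrame columns that already match the table definitions, because to_sql appends the frame as-is through the cursor's connection. A minimal, hypothetical example for the customer table (import path assumed from this page):

import sqlite3

import pandas as pd

from analytics_project.dw.etl_to_dw import create_schema, insert_customers  # import path assumed

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
create_schema(cursor)

customers_df = pd.DataFrame(
    {"customer_id": [1], "name": ["Ann"], "region": ["East"], "join_date": ["2024-01-01"]}
)
insert_customers(customers_df, cursor)  # columns must already match the customer table
conn.commit()
print(cursor.execute("SELECT COUNT(*) FROM customer").fetchone())  # (1,)
conn.close()
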
load_data_to_db
load_data_to_db() -> None

Load clean data into the data warehouse.

Source code in src/analytics_project/dw/etl_to_dw.py (lines 133-205)
def load_data_to_db() -> None:
    """Load clean data into the data warehouse."""
    logger.info("Starting ETL: loading clean data into the warehouse.")

    # Make sure the warehouse directory exists
    WAREHOUSE_DIR.mkdir(parents=True, exist_ok=True)

    # If an old database exists, remove and recreate with the latest table definitions.
    if DB_PATH.exists():
        logger.info(f"Removing existing warehouse database at: {DB_PATH}")
        DB_PATH.unlink()

    # Initialize a connection variable
    # before the try block so we can close it in finally
    conn: sqlite3.Connection | None = None

    try:
        # Connect to SQLite. Create the file if it doesn't exist
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        # Create schema and clear existing records
        create_schema(cursor)
        delete_existing_records(cursor)

        # Load prepared data using pandas
        customers_df = pd.read_csv(CLEAN_DATA_DIR.joinpath("customers_prepared.csv"))
        products_df = pd.read_csv(CLEAN_DATA_DIR.joinpath("products_prepared.csv"))
        # TODO: Uncomment after implementing sales data preparation
        # sales_df = pd.read_csv(CLEAN_DATA_DIR.joinpath("sales_prepared.csv"))

        # Rename clean columns to match database schema if necessary
        # Clean column name : Database column name
        customers_df = customers_df.rename(
            columns={
                "CustomerID": "customer_id",
                "Name": "name",
                "Region": "region",
                "JoinDate": "join_date",
            }
        )
        logger.info(f"Customer columns (cleaned): {list(customers_df.columns)}")

        # Rename clean columns to match database schema if necessary
        # Clean column name : Database column name
        products_df = products_df.rename(
            columns={
                "productid": "product_id",
                "productname": "product_name",
                "category": "category",
                "unitprice": "unit_price",
            }
        )
        logger.info(f"Product columns (cleaned):  {list(products_df.columns)}")

        # TODO: Rename sales_df columns to match database schema if necessary

        # Insert data into the database for all tables

        insert_customers(customers_df, cursor)

        insert_products(products_df, cursor)

        # TODO: Uncomment after implementing sales data preparation
        # insert_sales(sales_df, cursor)

        conn.commit()
        logger.info("ETL finished successfully. Data loaded into the warehouse.")
    finally:
        # Regardless of success or failure, close the DB connection if it exists
        if conn is not None:
            logger.info("Closing database connection.")
            conn.close()
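
After running the ETL, a quick sanity check on the warehouse contents can be helpful. A hedged sketch, assuming DB_PATH is the module-level path referenced in the code above:

import sqlite3

from analytics_project.dw.etl_to_dw import DB_PATH, load_data_to_db  # names assumed from this page

load_data_to_db()

conn = sqlite3.connect(DB_PATH)
for table in ("customer", "product"):
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    print(f"{table}: {count} rows")
conn.close()
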

utils_logger

Provide centralized logging for professional analytics projects.

This module configures project-wide logging to track events, debug issues, and maintain audit trails during data analysis workflows.

Module Information

  • Filename: utils_logger.py
  • Module: utils_logger
  • Location: src/analytics_project/

Key Concepts

  • Centralized logging configuration
  • Log levels (DEBUG, INFO, WARNING, ERROR)
  • File-based log persistence
  • Colorized console output with Loguru

Professional Applications

  • Production debugging and troubleshooting
  • Audit trails for regulatory compliance
  • Performance monitoring and optimization
  • Error tracking in data pipelines

get_log_file_path

get_log_file_path() -> pathlib.Path

Return the path to the active log file, or default path if not initialized.

Source code in src/analytics_project/utils_logger.py (lines 48-53)
def get_log_file_path() -> pathlib.Path:
    """Return the path to the active log file, or default path if not initialized."""
    if _log_file_path is not None:
        return _log_file_path
    # Fallback: predictable location even before init_logger() runs
    return project_root / "project.log"

init_logger

init_logger(
    level: str = 'INFO',
    *,
    log_dir: str | Path = project_root,
    log_file_name: str = 'project.log',
) -> pathlib.Path

Initialize the logger and return the log file path.

Ensures the log folder exists and configures logging to write to a file.

Parameters:

    level (str): Logging level (e.g., "INFO", "DEBUG"). Default: 'INFO'.
    log_dir (str | Path): Directory where the log file will be written. Default: project_root.
    log_file_name (str): File name for the log file. Default: 'project.log'.

Returns:

    pathlib.Path: The resolved path to the log file.

Source code in src/analytics_project/utils_logger.py (lines 56-111)
def init_logger(
    level: str = "INFO",
    *,
    log_dir: str | pathlib.Path = project_root,
    log_file_name: str = "project.log",
) -> pathlib.Path:
    """Initialize the logger and return the log file path.

    Ensures the log folder exists and configures logging to write to a file.

    Args:
        level (str): Logging level (e.g., "INFO", "DEBUG").
        log_dir: Directory where the log file will be written.
        log_file_name: File name for the log file.

    Returns:
        pathlib.Path: The resolved path to the log file.
    """
    global _is_configured, _log_file_path  # declare both module-level names assigned below
    if _is_configured:
        # If already configured once for this process
        return pathlib.Path(log_dir) / log_file_name

    # print a visual separator before logs
    print("-----------------------")

    # Resolve and ensure log folder exists
    log_folder = pathlib.Path(log_dir).expanduser().resolve()
    log_folder.mkdir(parents=True, exist_ok=True)

    # Build log file path
    log_file = log_folder / log_file_name

    try:
        fmt = "{time:YYYY-MM-DD HH:mm}:{level:<7} AT {file}:{line}: {message}"
        # Remove any existing Loguru handlers to avoid duplicate output
        logger.remove()
        logger.add(sys.stderr, level=level, format=fmt)
        logger.add(
            log_file,
            level=level,
            enqueue=True,
            backtrace=True,
            diagnose=False,
            rotation="10 MB",
            retention="7 days",
            encoding="utf-8",
            format=fmt,
        )
        logger.info(f"Logging to file: {log_file.resolve()}")
        _is_configured = True
        _log_file_path = log_file  # cache for retrieval
    except Exception as e:
        logger.error(f"Error configuring logger to write to file: {e}")

    return log_file
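
A typical call site, using illustrative values for the level and log directory; the package import path is assumed from this page, and logger is Loguru's logger, which the functions above configure:

import pathlib

from loguru import logger

from analytics_project.utils_logger import init_logger  # import path assumed from this page

log_file: pathlib.Path = init_logger(level="DEBUG", log_dir="logs", log_file_name="demo.log")
logger.debug("Logger configured; this message goes to the console and the file.")
print(f"Logs are written to {log_file}")

Calling init_logger again in the same process is effectively a no-op apart from returning the path, because _is_configured guards against re-adding handlers.
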

log_example

log_example() -> None

Demonstrate logging behavior with example messages.

Source code in src/analytics_project/utils_logger.py (lines 114-118)
def log_example() -> None:
    """Demonstrate logging behavior with example messages."""
    logger.info("This is an example info message.")
    logger.warning("This is an example warning message.")
    logger.error("This is an example error message.")

main

main() -> None

Execute logger setup and demonstrate its usage.

Source code in src/analytics_project/utils_logger.py (lines 121-125)
def main() -> None:
    """Execute logger setup and demonstrate its usage."""
    log_file = init_logger()
    log_example()
    logger.info(f"View the log output at {log_file}")