Analysis Patterns with ukpyn¶
This tutorial demonstrates common data analysis workflows using ukpyn and pandas.
What you'll learn:
- Loading UKPN data into pandas DataFrames
- Data cleaning and preparation
- Exploratory data analysis (EDA)
- Time series analysis
- Aggregations and grouping
- Visualization basics
Prerequisites:
- Complete 01-getting-started.ipynb and 02-fetching-data.ipynb
- Install pandas: `pip install pandas`
- Optional: `pip install matplotlib` for visualizations
- These tutorials require additional dependencies. Install them with
  `pip install "ukpyn[all]"` — see Tutorial 01 for full setup instructions.
In [ ]:
Copied!
# Required imports and environment checks for this tutorial.
from io import BytesIO

import ukpyn

# pandas is required by every analysis cell below.
try:
    import pandas as pd

    print(f"pandas version: {pd.__version__}")
except ImportError:
    print("pandas not installed. Run: pip install pandas")

# matplotlib is optional; HAS_MATPLOTLIB gates the visualization section.
try:
    import matplotlib.pyplot as plt

    print("matplotlib available")
    HAS_MATPLOTLIB = True
except ImportError:
    print("matplotlib not installed (optional). Run: pip install matplotlib")
    HAS_MATPLOTLIB = False

# Fail fast if no API key is configured (see Tutorial 01 for setup).
ukpyn.check_api_key()
print("API key configured!")
# Required imports
from io import BytesIO
import ukpyn
# Check for required packages
try:
import pandas as pd
print(f"pandas version: {pd.__version__}")
except ImportError:
print("pandas not installed. Run: pip install pandas")
try:
import matplotlib.pyplot as plt
print("matplotlib available")
HAS_MATPLOTLIB = True
except ImportError:
print("matplotlib not installed (optional). Run: pip install matplotlib")
HAS_MATPLOTLIB = False
ukpyn.check_api_key()
print("API key configured!")
1. Loading UKPN Data into Pandas DataFrames¶
There are two main approaches:
- Export to CSV and load with pandas (recommended for large datasets)
- Convert records directly to DataFrame (for smaller datasets or when filtering)
In [ ]:
Copied!
# Choose a dataset to work with
# Replace with a valid dataset ID from your exploration
from ukpyn import UKPNClient
DATASET_ID = "ukpn-constraints-real-time-meter-readings" # Example
# Method 1: Export to CSV and load into pandas (recommended)
async with UKPNClient() as client:
try:
# Export data as CSV
csv_data = await client.export_data(
DATASET_ID,
format="csv",
limit=500, # Limit for this example
)
# Load into pandas DataFrame
# Note: OpenDataSoft CSV uses semicolon separator
df = pd.read_csv(BytesIO(csv_data), sep=";")
print(f"Loaded {len(df)} rows, {len(df.columns)} columns")
print(f"\nColumns: {list(df.columns)}")
except Exception as e:
print(f"Error: {e}")
print("\nTip: Replace DATASET_ID with a valid dataset.")
df = pd.DataFrame() # Empty fallback
# Choose a dataset to work with
# Replace with a valid dataset ID from your exploration
from ukpyn import UKPNClient
DATASET_ID = "ukpn-constraints-real-time-meter-readings" # Example
# Method 1: Export to CSV and load into pandas (recommended)
async with UKPNClient() as client:
try:
# Export data as CSV
csv_data = await client.export_data(
DATASET_ID,
format="csv",
limit=500, # Limit for this example
)
# Load into pandas DataFrame
# Note: OpenDataSoft CSV uses semicolon separator
df = pd.read_csv(BytesIO(csv_data), sep=";")
print(f"Loaded {len(df)} rows, {len(df.columns)} columns")
print(f"\nColumns: {list(df.columns)}")
except Exception as e:
print(f"Error: {e}")
print("\nTip: Replace DATASET_ID with a valid dataset.")
df = pd.DataFrame() # Empty fallback
In [ ]:
Copied!
# Method 2: Convert API records to DataFrame
# Better for filtered queries or when you need pagination control
async with UKPNClient() as client:
try:
records_response = await client.get_records(
DATASET_ID,
limit=100,
)
# Convert records to list of dicts
data = []
for record in records_response.records:
if record.fields:
row = {"_id": record.id, **record.fields}
if record.record_timestamp:
row["_timestamp"] = record.record_timestamp
data.append(row)
# Create DataFrame
df_records = pd.DataFrame(data)
print(f"Converted {len(df_records)} records to DataFrame")
print(f"\nShape: {df_records.shape}")
display(df_records.head())
except Exception as e:
print(f"Error: {e}")
# Method 2: Convert API records to DataFrame
# Better for filtered queries or when you need pagination control
async with UKPNClient() as client:
try:
records_response = await client.get_records(
DATASET_ID,
limit=100,
)
# Convert records to list of dicts
data = []
for record in records_response.records:
if record.fields:
row = {"_id": record.id, **record.fields}
if record.record_timestamp:
row["_timestamp"] = record.record_timestamp
data.append(row)
# Create DataFrame
df_records = pd.DataFrame(data)
print(f"Converted {len(df_records)} records to DataFrame")
print(f"\nShape: {df_records.shape}")
display(df_records.head())
except Exception as e:
print(f"Error: {e}")
In [ ]:
Copied!
# Helper function: Records to DataFrame
def records_to_dataframe(records: list) -> pd.DataFrame:
    """
    Convert ukpyn Record objects to a pandas DataFrame.

    Each record becomes one row: its ``id`` under ``_id``, its
    ``record_timestamp`` (when present) under ``_timestamp``, and its
    ``fields`` mapping spread into ordinary columns.

    Args:
        records: List of Record objects from ukpyn

    Returns:
        pandas DataFrame (empty when ``records`` is empty)
    """
    rows = [
        {
            "_id": record.id,
            **({"_timestamp": record.record_timestamp} if record.record_timestamp else {}),
            **(record.fields or {}),
        }
        for record in records
    ]
    return pd.DataFrame(rows)
# Example usage
# Fetches a small page of records and builds a DataFrame from them.
# NOTE(review): this rebinds df, replacing the CSV-loaded frame from earlier cells.
async with UKPNClient() as client:
response = await client.get_records(DATASET_ID, limit=50)
df = records_to_dataframe(response.records)
print(f"Created DataFrame: {df.shape}")
# Helper function: Records to DataFrame
# NOTE(review): rendered duplicate of the helper defined in the cell above.
def records_to_dataframe(records: list) -> pd.DataFrame:
"""
Convert ukpyn Record objects to a pandas DataFrame.
Args:
records: List of Record objects from ukpyn
Returns:
pandas DataFrame
"""
data = []
for record in records:
# _id is always present; _timestamp and the record's fields are optional per record.
row = {"_id": record.id}
if record.record_timestamp:
row["_timestamp"] = record.record_timestamp
if record.fields:
row.update(record.fields)
data.append(row)
return pd.DataFrame(data)
# Example usage
async with UKPNClient() as client:
response = await client.get_records(DATASET_ID, limit=50)
df = records_to_dataframe(response.records)
print(f"Created DataFrame: {df.shape}")
2. Data Cleaning and Preparation¶
Common data cleaning tasks for UKPN data.
In [ ]:
Copied!
# Load fresh data for cleaning examples
async with UKPNClient() as client:
csv_data = await client.export_data(DATASET_ID, format="csv", limit=500)
df = pd.read_csv(BytesIO(csv_data), sep=";")
# Display basic info
print("DataFrame Info:")
print(f"Shape: {df.shape}")
print("\nColumn types:")
print(df.dtypes)
# Load fresh data for cleaning examples
async with UKPNClient() as client:
csv_data = await client.export_data(DATASET_ID, format="csv", limit=500)
df = pd.read_csv(BytesIO(csv_data), sep=";")
# Display basic info
print("DataFrame Info:")
print(f"Shape: {df.shape}")
print("\nColumn types:")
print(df.dtypes)
In [ ]:
Copied!
# Check for missing values
missing = df.isnull().sum()
missing_pct = (missing / len(df) * 100).round(2)
missing_info = pd.DataFrame({"Missing": missing, "Percent": missing_pct})
print("Missing Values:")
print(missing_info[missing_info["Missing"] > 0])
if missing.sum() == 0:
print("No missing values found!")
# Check for missing values
missing = df.isnull().sum()
missing_pct = (missing / len(df) * 100).round(2)
missing_info = pd.DataFrame({"Missing": missing, "Percent": missing_pct})
print("Missing Values:")
print(missing_info[missing_info["Missing"] > 0])
if missing.sum() == 0:
print("No missing values found!")
In [ ]:
Copied!
# Convert date columns to datetime
# Identify potential date columns
date_columns = [
col
for col in df.columns
if any(
keyword in col.lower()
for keyword in ["date", "time", "timestamp", "year", "month"]
)
]
print(f"Potential date columns: {date_columns}")
# Convert to datetime (adjust column names as needed)
for col in date_columns:
if col in df.columns:
try:
df[col] = pd.to_datetime(df[col], errors="coerce")
print(f"Converted {col} to datetime")
except Exception as e:
print(f"Could not convert {col}: {e}")
# Convert date columns to datetime
# Identify potential date columns
date_columns = [
col
for col in df.columns
if any(
keyword in col.lower()
for keyword in ["date", "time", "timestamp", "year", "month"]
)
]
print(f"Potential date columns: {date_columns}")
# Convert to datetime (adjust column names as needed)
for col in date_columns:
if col in df.columns:
try:
df[col] = pd.to_datetime(df[col], errors="coerce")
print(f"Converted {col} to datetime")
except Exception as e:
print(f"Could not convert {col}: {e}")
In [ ]:
Copied!
# Convert numeric columns
# Identify potential numeric columns stored as strings
for col in df.select_dtypes(include=["object"]).columns:
# Try to convert to numeric
numeric_col = pd.to_numeric(df[col], errors="coerce")
# If most values converted successfully, use numeric
if numeric_col.notna().sum() > len(df) * 0.8:
df[col] = numeric_col
print(f"Converted {col} to numeric")
print("\nUpdated column types:")
print(df.dtypes)
# Convert numeric columns
# Identify potential numeric columns stored as strings
for col in df.select_dtypes(include=["object"]).columns:
# Try to convert to numeric
numeric_col = pd.to_numeric(df[col], errors="coerce")
# If most values converted successfully, use numeric
if numeric_col.notna().sum() > len(df) * 0.8:
df[col] = numeric_col
print(f"Converted {col} to numeric")
print("\nUpdated column types:")
print(df.dtypes)
In [ ]:
Copied!
# Clean column names (optional)
# Make column names lowercase and replace spaces with underscores
df_clean = df.copy()
# regex=False treats each pattern as literal text. Without it, pandas < 2.0
# defaulted to regex=True, so "." matched EVERY character and the replace
# mangled all column names instead of replacing literal dots.
df_clean.columns = (
    df_clean.columns.str.lower()
    .str.replace(" ", "_", regex=False)
    .str.replace(".", "_", regex=False)
    .str.replace("-", "_", regex=False)
)
print("Cleaned column names:")
print(list(df_clean.columns))
# Clean column names (optional)
# Make column names lowercase and replace spaces with underscores
df_clean = df.copy()
# regex=False treats each pattern as literal text. Without it, pandas < 2.0
# defaulted to regex=True, so "." matched EVERY character and the replace
# mangled all column names instead of replacing literal dots.
df_clean.columns = (
    df_clean.columns.str.lower()
    .str.replace(" ", "_", regex=False)
    .str.replace(".", "_", regex=False)
    .str.replace("-", "_", regex=False)
)
print("Cleaned column names:")
print(list(df_clean.columns))
3. Exploratory Data Analysis (EDA)¶
Basic exploration techniques for understanding UKPN datasets.
In [ ]:
Copied!
# Basic statistics
print("Numeric Summary Statistics:")
display(df.describe())
# Basic statistics
print("Numeric Summary Statistics:")
display(df.describe())
In [ ]:
Copied!
# Categorical column analysis
categorical_cols = df.select_dtypes(include=["object"]).columns
print("Categorical Columns:")
for col in categorical_cols[:5]: # First 5 categorical columns
unique_count = df[col].nunique()
print(f"\n{col}:")
print(f" Unique values: {unique_count}")
if unique_count <= 10:
print(" Value counts:")
print(df[col].value_counts().head())
# Categorical column analysis
categorical_cols = df.select_dtypes(include=["object"]).columns
print("Categorical Columns:")
for col in categorical_cols[:5]: # First 5 categorical columns
unique_count = df[col].nunique()
print(f"\n{col}:")
print(f" Unique values: {unique_count}")
if unique_count <= 10:
print(" Value counts:")
print(df[col].value_counts().head())
In [ ]:
Copied!
# Sample rows
print("Random sample of 5 rows:")
display(df.sample(min(5, len(df))))
# Sample rows
print("Random sample of 5 rows:")
display(df.sample(min(5, len(df))))
In [ ]:
Copied!
# Quick EDA function
def quick_eda(df: pd.DataFrame) -> None:
    """
    Perform quick exploratory data analysis on a DataFrame.

    Prints shape, dtype counts, missing-value counts, memory usage, and a
    numeric summary. Purely informational; returns None.
    """
    banner = "=" * 60
    print(banner)
    print("QUICK EDA REPORT")
    print(banner)
    print(f"\n1. SHAPE: {df.shape[0]} rows x {df.shape[1]} columns")
    print("\n2. COLUMN TYPES:")
    print(df.dtypes.value_counts())
    print("\n3. MISSING VALUES:")
    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        print(null_counts[null_counts > 0])
    else:
        print(" No missing values")
    print(f"\n4. MEMORY USAGE: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
    numeric_cols = df.select_dtypes(include=["number"]).columns
    if len(numeric_cols) > 0:
        print("\n5. NUMERIC SUMMARY:")
        print(df[numeric_cols].describe().round(2))
# Run quick EDA
quick_eda(df)
# Quick EDA function
# NOTE(review): rendered duplicate of the quick_eda cell above.
def quick_eda(df: pd.DataFrame) -> None:
"""
Perform quick exploratory data analysis on a DataFrame.
"""
# Report sections: 1 shape, 2 dtypes, 3 missing values, 4 memory, 5 numeric summary.
print("=" * 60)
print("QUICK EDA REPORT")
print("=" * 60)
print(f"\n1. SHAPE: {df.shape[0]} rows x {df.shape[1]} columns")
print("\n2. COLUMN TYPES:")
print(df.dtypes.value_counts())
print("\n3. MISSING VALUES:")
missing = df.isnull().sum()
if missing.sum() > 0:
print(missing[missing > 0])
else:
print(" No missing values")
print(f"\n4. MEMORY USAGE: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(numeric_cols) > 0:
print("\n5. NUMERIC SUMMARY:")
print(df[numeric_cols].describe().round(2))
# Run quick EDA
quick_eda(df)
4. Time Series Analysis¶
Working with time-based UKPN data.
In [ ]:
Copied!
# Identify datetime columns
datetime_cols = [col for col in df.columns if "datetime" in str(df[col].dtype)]
print(f"Datetime columns: {list(datetime_cols)}")
# If no datetime columns found, look for date-like strings
if len(datetime_cols) == 0:
print("\nNo datetime columns. Looking for date-like columns...")
for col in df.columns:
if df[col].dtype == "object":
sample = df[col].dropna().iloc[0] if len(df[col].dropna()) > 0 else ""
print(f" {col}: {sample}")
# Identify datetime columns
datetime_cols = [col for col in df.columns if "datetime" in str(df[col].dtype)]
print(f"Datetime columns: {list(datetime_cols)}")
# If no datetime columns found, look for date-like strings
if len(datetime_cols) == 0:
print("\nNo datetime columns. Looking for date-like columns...")
for col in df.columns:
if df[col].dtype == "object":
sample = df[col].dropna().iloc[0] if len(df[col].dropna()) > 0 else ""
print(f" {col}: {sample}")
In [ ]:
Copied!
# Time series aggregation example
# Adjust column names based on your dataset
# Example: If you have a date column and a numeric value column
# DATE_COL = "date" # Replace with actual column name
# VALUE_COL = "value" # Replace with actual column name
# Find potential columns
datetime_cols = [col for col in df.columns if "datetime" in str(df[col].dtype)]
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(datetime_cols) > 0 and len(numeric_cols) > 0:
# Pick the first datetime and first numeric column as a demonstration pair.
DATE_COL = datetime_cols[0]
VALUE_COL = numeric_cols[0]
print(f"Aggregating {VALUE_COL} by {DATE_COL}")
# Create a copy and set date as index
ts_df = df[[DATE_COL, VALUE_COL]].copy()
ts_df = ts_df.set_index(DATE_COL)
# Resample by month and calculate mean
# NOTE(review): the "ME" (month-end) alias requires pandas >= 2.2; on older
# pandas use "M" instead — confirm the pinned pandas version.
monthly = ts_df.resample("ME").agg({VALUE_COL: ["mean", "sum", "count"]})
print("\nMonthly aggregation:")
display(monthly.head(10))
else:
print("No suitable datetime + numeric columns found for time series analysis.")
print(f"Datetime columns: {list(datetime_cols)}")
print(f"Numeric columns: {list(numeric_cols)}")
# Time series aggregation example
# Adjust column names based on your dataset
# Example: If you have a date column and a numeric value column
# DATE_COL = "date" # Replace with actual column name
# VALUE_COL = "value" # Replace with actual column name
# Find potential columns
datetime_cols = [col for col in df.columns if "datetime" in str(df[col].dtype)]
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(datetime_cols) > 0 and len(numeric_cols) > 0:
DATE_COL = datetime_cols[0]
VALUE_COL = numeric_cols[0]
print(f"Aggregating {VALUE_COL} by {DATE_COL}")
# Create a copy and set date as index
ts_df = df[[DATE_COL, VALUE_COL]].copy()
ts_df = ts_df.set_index(DATE_COL)
# Resample by month and calculate mean
monthly = ts_df.resample("ME").agg({VALUE_COL: ["mean", "sum", "count"]})
print("\nMonthly aggregation:")
display(monthly.head(10))
else:
print("No suitable datetime + numeric columns found for time series analysis.")
print(f"Datetime columns: {list(datetime_cols)}")
print(f"Numeric columns: {list(numeric_cols)}")
In [ ]:
Copied!
# Extract date components for analysis
# Useful when you don't have a proper datetime column
# Look for year/month columns
year_cols = [c for c in df.columns if "year" in c.lower()]
month_cols = [c for c in df.columns if "month" in c.lower()]
print(f"Year columns: {year_cols}")
print(f"Month columns: {month_cols}")
# If we have year column, analyze by year
if year_cols:
year_col = year_cols[0]
print(f"\nDistribution by {year_col}:")
print(df[year_col].value_counts().sort_index())
# Extract date components for analysis
# Useful when you don't have a proper datetime column
# Look for year/month columns
year_cols = [c for c in df.columns if "year" in c.lower()]
month_cols = [c for c in df.columns if "month" in c.lower()]
print(f"Year columns: {year_cols}")
print(f"Month columns: {month_cols}")
# If we have year column, analyze by year
if year_cols:
year_col = year_cols[0]
print(f"\nDistribution by {year_col}:")
print(df[year_col].value_counts().sort_index())
5. Aggregations and Grouping¶
Common grouping and aggregation patterns.
In [ ]:
Copied!
# Group by categorical column
categorical_cols = df.select_dtypes(include=["object"]).columns
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(categorical_cols) > 0 and len(numeric_cols) > 0:
cat_col = categorical_cols[0]
num_col = numeric_cols[0]
print(f"Grouping by {cat_col}, aggregating {num_col}")
grouped = (
df.groupby(cat_col)[num_col]
.agg(["count", "mean", "sum", "min", "max"])
.round(2)
)
display(grouped.head(10))
else:
print("Need both categorical and numeric columns for this example.")
# Group by categorical column
categorical_cols = df.select_dtypes(include=["object"]).columns
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(categorical_cols) > 0 and len(numeric_cols) > 0:
cat_col = categorical_cols[0]
num_col = numeric_cols[0]
print(f"Grouping by {cat_col}, aggregating {num_col}")
grouped = (
df.groupby(cat_col)[num_col]
.agg(["count", "mean", "sum", "min", "max"])
.round(2)
)
display(grouped.head(10))
else:
print("Need both categorical and numeric columns for this example.")
In [ ]:
Copied!
# Multi-level grouping
# Group by multiple columns
if len(categorical_cols) >= 2 and len(numeric_cols) > 0:
cat1, cat2 = categorical_cols[0], categorical_cols[1]
num_col = numeric_cols[0]
print(f"Grouping by {cat1} and {cat2}")
multi_grouped = df.groupby([cat1, cat2])[num_col].agg(["count", "mean"]).round(2)
# Show top 10 groups
display(multi_grouped.head(10))
else:
print("Need at least 2 categorical columns for multi-level grouping.")
# Multi-level grouping
# Group by multiple columns
if len(categorical_cols) >= 2 and len(numeric_cols) > 0:
cat1, cat2 = categorical_cols[0], categorical_cols[1]
num_col = numeric_cols[0]
print(f"Grouping by {cat1} and {cat2}")
multi_grouped = df.groupby([cat1, cat2])[num_col].agg(["count", "mean"]).round(2)
# Show top 10 groups
display(multi_grouped.head(10))
else:
print("Need at least 2 categorical columns for multi-level grouping.")
In [ ]:
Copied!
# Pivot table example
if len(categorical_cols) >= 2 and len(numeric_cols) > 0:
try:
pivot = pd.pivot_table(
df,
values=numeric_cols[0],
index=categorical_cols[0],
columns=categorical_cols[1],
aggfunc="mean",
fill_value=0,
).round(2)
print(f"Pivot table: {categorical_cols[0]} x {categorical_cols[1]}")
display(pivot.head(10))
except Exception as e:
print(f"Could not create pivot table: {e}")
else:
print("Insufficient columns for pivot table.")
# Pivot table example
if len(categorical_cols) >= 2 and len(numeric_cols) > 0:
try:
pivot = pd.pivot_table(
df,
values=numeric_cols[0],
index=categorical_cols[0],
columns=categorical_cols[1],
aggfunc="mean",
fill_value=0,
).round(2)
print(f"Pivot table: {categorical_cols[0]} x {categorical_cols[1]}")
display(pivot.head(10))
except Exception as e:
print(f"Could not create pivot table: {e}")
else:
print("Insufficient columns for pivot table.")
6. Visualization Basics¶
Simple visualizations using matplotlib (optional dependency).
In [ ]:
Copied!
if HAS_MATPLOTLIB:
# Bar chart of categorical distribution
categorical_cols = df.select_dtypes(include=["object"]).columns
if len(categorical_cols) > 0:
col = categorical_cols[0]
# Get top 10 categories
top_10 = df[col].value_counts().head(10)
fig, ax = plt.subplots(figsize=(10, 6))
top_10.plot(kind="bar", ax=ax)
ax.set_title(f"Top 10 {col} values")
ax.set_xlabel(col)
ax.set_ylabel("Count")
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
plt.show()
else:
print("matplotlib not available. Install with: pip install matplotlib")
if HAS_MATPLOTLIB:
# Bar chart of categorical distribution
categorical_cols = df.select_dtypes(include=["object"]).columns
if len(categorical_cols) > 0:
col = categorical_cols[0]
# Get top 10 categories
top_10 = df[col].value_counts().head(10)
fig, ax = plt.subplots(figsize=(10, 6))
top_10.plot(kind="bar", ax=ax)
ax.set_title(f"Top 10 {col} values")
ax.set_xlabel(col)
ax.set_ylabel("Count")
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
plt.show()
else:
print("matplotlib not available. Install with: pip install matplotlib")
In [ ]:
Copied!
if HAS_MATPLOTLIB:
# Histogram of numeric distribution
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(numeric_cols) > 0:
col = numeric_cols[0]
fig, ax = plt.subplots(figsize=(10, 6))
df[col].hist(bins=30, ax=ax, edgecolor="black")
ax.set_title(f"Distribution of {col}")
ax.set_xlabel(col)
ax.set_ylabel("Frequency")
plt.tight_layout()
plt.show()
else:
print("matplotlib not available.")
if HAS_MATPLOTLIB:
# Histogram of numeric distribution
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(numeric_cols) > 0:
col = numeric_cols[0]
fig, ax = plt.subplots(figsize=(10, 6))
df[col].hist(bins=30, ax=ax, edgecolor="black")
ax.set_title(f"Distribution of {col}")
ax.set_xlabel(col)
ax.set_ylabel("Frequency")
plt.tight_layout()
plt.show()
else:
print("matplotlib not available.")
In [ ]:
Copied!
if HAS_MATPLOTLIB:
# Time series line plot
datetime_cols = [col for col in df.columns if "datetime" in str(df[col].dtype)]
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(datetime_cols) > 0 and len(numeric_cols) > 0:
date_col = datetime_cols[0]
value_col = numeric_cols[0]
# Sort by date
plot_df = df[[date_col, value_col]].sort_values(date_col)
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(
plot_df[date_col], plot_df[value_col], marker=".", linestyle="-", alpha=0.7
)
ax.set_title(f"{value_col} over time")
ax.set_xlabel("Date")
ax.set_ylabel(value_col)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
else:
print("No datetime column found for time series plot.")
else:
print("matplotlib not available.")
if HAS_MATPLOTLIB:
# Time series line plot
datetime_cols = [col for col in df.columns if "datetime" in str(df[col].dtype)]
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(datetime_cols) > 0 and len(numeric_cols) > 0:
date_col = datetime_cols[0]
value_col = numeric_cols[0]
# Sort by date
plot_df = df[[date_col, value_col]].sort_values(date_col)
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(
plot_df[date_col], plot_df[value_col], marker=".", linestyle="-", alpha=0.7
)
ax.set_title(f"{value_col} over time")
ax.set_xlabel("Date")
ax.set_ylabel(value_col)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
else:
print("No datetime column found for time series plot.")
else:
print("matplotlib not available.")
7. Complete Analysis Workflow Example¶
Putting it all together with a real dataset.
In [ ]:
Copied!
# Complete end-to-end workflow: metadata -> load -> quality -> summaries.
# NOTE(review): indentation was lost in this rendered copy; see the notebook
# source for the runnable cell.
async def analyze_dataset(dataset_id: str, limit: int = 1000):
"""
Complete analysis workflow for a UKPN dataset.
Args:
dataset_id: The UKPN dataset to analyze
limit: Maximum records to fetch
Returns:
The loaded pandas DataFrame, for further analysis by the caller.
"""
print(f"Analyzing dataset: {dataset_id}")
print("=" * 60)
async with UKPNClient() as client:
# Step 1: Get dataset metadata
print("\n1. DATASET METADATA")
print("-" * 40)
dataset = await client.get_dataset(dataset_id)
# Fall back to "Unknown" when the dataset has no default metadata block.
title = "Unknown"
if dataset.metas and dataset.metas.default:
title = dataset.metas.default.get("title", dataset_id)
print(f"Title: {title}")
print(f"Has records: {dataset.has_records}")
print(f"Fields: {len(dataset.fields) if dataset.fields else 0}")
# Step 2: Load data
print("\n2. LOADING DATA")
print("-" * 40)
csv_data = await client.export_data(
dataset_id,
format="csv",
limit=limit,
)
# OpenDataSoft CSV exports use a semicolon separator.
df = pd.read_csv(BytesIO(csv_data), sep=";")
print(f"Loaded {len(df)} rows x {len(df.columns)} columns")
# Step 3: Quick EDA
print("\n3. DATA QUALITY")
print("-" * 40)
missing = df.isnull().sum().sum()
print(f"Total missing values: {missing}")
print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
# Step 4: Numeric summary
print("\n4. NUMERIC SUMMARY")
print("-" * 40)
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(numeric_cols) > 0:
print(df[numeric_cols[:3]].describe().round(2)) # First 3 numeric cols
else:
print("No numeric columns found.")
# Step 5: Categorical summary
print("\n5. CATEGORICAL SUMMARY")
print("-" * 40)
cat_cols = df.select_dtypes(include=["object"]).columns
for col in cat_cols[:3]: # First 3 categorical cols
print(f"\n{col}: {df[col].nunique()} unique values")
print(df[col].value_counts().head(3))
print("\n" + "=" * 60)
print("Analysis complete!")
return df
# Run the analysis
# Wrapped so a bad DATASET_ID or network failure doesn't abort the notebook.
try:
df_analyzed = await analyze_dataset(DATASET_ID, limit=500)
except Exception as e:
print(f"Analysis failed: {e}")
async def analyze_dataset(dataset_id: str, limit: int = 1000):
"""
Complete analysis workflow for a UKPN dataset.
Args:
dataset_id: The UKPN dataset to analyze
limit: Maximum records to fetch
"""
print(f"Analyzing dataset: {dataset_id}")
print("=" * 60)
async with UKPNClient() as client:
# Step 1: Get dataset metadata
print("\n1. DATASET METADATA")
print("-" * 40)
dataset = await client.get_dataset(dataset_id)
title = "Unknown"
if dataset.metas and dataset.metas.default:
title = dataset.metas.default.get("title", dataset_id)
print(f"Title: {title}")
print(f"Has records: {dataset.has_records}")
print(f"Fields: {len(dataset.fields) if dataset.fields else 0}")
# Step 2: Load data
print("\n2. LOADING DATA")
print("-" * 40)
csv_data = await client.export_data(
dataset_id,
format="csv",
limit=limit,
)
df = pd.read_csv(BytesIO(csv_data), sep=";")
print(f"Loaded {len(df)} rows x {len(df.columns)} columns")
# Step 3: Quick EDA
print("\n3. DATA QUALITY")
print("-" * 40)
missing = df.isnull().sum().sum()
print(f"Total missing values: {missing}")
print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
# Step 4: Numeric summary
print("\n4. NUMERIC SUMMARY")
print("-" * 40)
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(numeric_cols) > 0:
print(df[numeric_cols[:3]].describe().round(2)) # First 3 numeric cols
else:
print("No numeric columns found.")
# Step 5: Categorical summary
print("\n5. CATEGORICAL SUMMARY")
print("-" * 40)
cat_cols = df.select_dtypes(include=["object"]).columns
for col in cat_cols[:3]: # First 3 categorical cols
print(f"\n{col}: {df[col].nunique()} unique values")
print(df[col].value_counts().head(3))
print("\n" + "=" * 60)
print("Analysis complete!")
return df
# Run the analysis
try:
df_analyzed = await analyze_dataset(DATASET_ID, limit=500)
except Exception as e:
print(f"Analysis failed: {e}")
Summary¶
You've learned how to:
- Load data into pandas using CSV export or record conversion
- Clean data by handling missing values, converting types, and standardizing column names
- Explore data with summary statistics and categorical analysis
- Analyze time series with resampling and date component extraction
- Aggregate data using groupby, pivot tables, and multi-level grouping
- Visualize data with bar charts, histograms, and time series plots
Tips for Working with UKPN Data¶
- Start small: Use `limit` to work with a sample before loading full datasets
- Check field types: UKPN data may have dates as strings that need conversion
- Use CSV export: More efficient than paginating through API records
- Save processed data: Cache cleaned DataFrames locally to avoid repeated API calls
Next Steps¶
- Explore specific UKPN datasets relevant to your use case
- Check out the examples for community contributions
- Consider advanced analysis with scikit-learn for machine learning applications