Fetching Data with ukpyn¶
This tutorial covers advanced data retrieval techniques:
- Understanding the OpenDataSoft Query Language (ODSQL)
- Filtering records with WHERE clauses
- Sorting and ordering results
- Pagination for large datasets
- Field selection and projection
- Working with facets and refinements
Prerequisites: Complete 01-getting-started.ipynb first.
Note: These tutorials require additional dependencies. Install them with
pip install "ukpyn[all]" — see Tutorial 01 for full setup instructions.
# Sanity check from Tutorial 01: confirm an API key is configured before
# running any of the queries below (check_api_key reports the key's status).
import ukpyn
ukpyn.check_api_key()
print("API key configured!")
1. Understanding ODSQL¶
OpenDataSoft Query Language (ODSQL) is a SQL-like language for filtering and querying data.
Key Operators¶
| Operator | Description | Example |
|---|---|---|
| `=` | Equal | `field = 'value'` |
| `!=` | Not equal | `field != 'value'` |
| `>`, `>=` | Greater than | `count > 100` |
| `<`, `<=` | Less than | `count <= 50` |
| `IN` | In list | `status IN ('active', 'pending')` |
| `LIKE` | Pattern match | `name LIKE '%London%'` |
| `AND`, `OR` | Logical | `a = 1 AND b = 2` |
| `NOT` | Negation | `NOT status = 'deleted'` |
Date Functions¶
| Function | Description | Example |
|---|---|---|
| `date(field)` | Extract date | `date(timestamp) = '2024-01-01'` |
| `year(field)` | Extract year | `year(timestamp) = 2024` |
| `month(field)` | Extract month | `month(timestamp) >= 6` |
2. Filtering Records with WHERE Clauses¶
The where parameter accepts ODSQL expressions to filter records.
# First, let's find a dataset to work with
from ukpyn import UKPNClient
async with UKPNClient() as client:
# List available datasets
datasets = await client.list_datasets(limit=20)
print(f"Available datasets ({datasets.total_count} total):\n")
for item in datasets.datasets:
ds = item.dataset
title = ""
if ds.metas and ds.metas.default:
title = ds.metas.default.get("title", ds.dataset_id)
print(f"- {ds.dataset_id}")
if title and title != ds.dataset_id:
print(f" Title: {title}")
# Let's examine a dataset's fields to understand what we can filter on
# Replace with a dataset ID from the list above
DATASET_ID = "ukpn-smart-meter-installation-volumes" # Example - adjust as needed
async with UKPNClient() as client:
try:
dataset = await client.get_dataset(DATASET_ID)
print(f"Dataset: {dataset.dataset_id}")
print("\nAvailable fields for filtering:")
print("-" * 50)
if dataset.fields:
for field in dataset.fields:
print(f" {field.name:<30} ({field.type})")
if field.description:
print(f" └─ {field.description[:60]}...")
except Exception as e:
print(f"Error: {e}")
print("\nTip: Replace DATASET_ID with a valid dataset from the list above.")
# Example: Basic filtering with WHERE
# Adjust the field names based on your chosen dataset
async with UKPNClient() as client:
try:
# Filter with a simple condition
# Example: filter by a text field
records = await client.get_records(
DATASET_ID,
limit=5,
# where="local_authority = 'Surrey'", # Uncomment and adjust
)
print(f"Found {records.total_count} records")
print(f"\nFirst {len(records.records)} records:")
for i, record in enumerate(records.records, 1):
print(f"\n[{i}] ID: {record.id}")
if record.fields:
for key, value in list(record.fields.items())[:5]:
print(f" {key}: {value}")
except Exception as e:
print(f"Error: {e}")
# Example: Combining multiple conditions with AND/OR
TABLE_3A_ID = "ltds-table-3a-load-data-observed-transposed"
async with UKPNClient() as client:
try:
# Multiple conditions
# Example: combining filters
records = await client.get_records(
TABLE_3A_ID,
limit=5,
where="year = '25-26' AND season = 'Winter'", # Adjust to your dataset
)
print(f"Filtered results: {records.total_count} records")
except Exception as e:
print(f"Error: {e}")
print("\nTip: Adjust the WHERE clause to match your dataset's fields.")
3. Sorting and Ordering Results¶
Use order_by to sort records:
- `field_name` — Ascending order (A-Z, 0-9)
- `-field_name` — Descending order (Z-A, 9-0)
# Sorting examples
CONSTRAINTS_DATA_ID = "ukpn-constraints-real-time-meter-readings"
async with UKPNClient() as client:
try:
# Sort by record timestamp (newest first)
records = await client.get_records(
CONSTRAINTS_DATA_ID,
limit=5,
# order_by="-timestamp", # Descending (newest first)
)
print("Records sorted by timestamp (newest first):")
print("-" * 50)
print(f"Total records: {records.total_count}")
print(records.records[0].fields if records.records else "No records found")
for record in records.records:
# Access timestamp from fields dict (it's a data field, not metadata)
timestamp = record.fields.get("timestamp") if record.fields else "N/A"
constraint_id = (
record.fields.get("constraint_id") if record.fields else "N/A"
)
print(f" {constraint_id}: {timestamp}")
except Exception as e:
print(f"Error: {e}")
# Sort by a specific field
# Adjust the field name to match your dataset
POLES_DATASET_ID = "ukpn-132kv-poles-towers"
async with UKPNClient() as client:
try:
# Sort ascending
records_asc = await client.get_records(
POLES_DATASET_ID,
limit=3,
# order_by="field_name", # Ascending
)
# Sort descending
records_desc = await client.get_records(
POLES_DATASET_ID,
limit=3,
# order_by="-field_name", # Descending
)
print("Ascending order:")
for r in records_asc.records:
print(f" {r.id}")
print("\nDescending order:")
for r in records_desc.records:
print(f" {r.id}")
except Exception as e:
print(f"Error: {e}")
4. Pagination for Large Datasets¶
Large datasets require pagination. Use limit and offset to page through results.
Page 1: offset=0, limit=10 -> records 1-10
Page 2: offset=10, limit=10 -> records 11-20
Page 3: offset=20, limit=10 -> records 21-30
# Basic pagination example
async with UKPNClient() as client:
page_size = 10
# Get first page
page1 = await client.get_records(
DATASET_ID,
limit=page_size,
offset=0,
)
total_records = page1.total_count
total_pages = (total_records + page_size - 1) // page_size
print(f"Dataset: {DATASET_ID}")
print(f"Total records: {total_records}")
print(f"Page size: {page_size}")
print(f"Total pages: {total_pages}")
print(f"\nPage 1 has {len(page1.records)} records")
# Iterate through multiple pages
async with UKPNClient() as client:
page_size = 10
max_pages = 3 # Limit for this example
all_records = []
for page_num in range(max_pages):
offset = page_num * page_size
response = await client.get_records(
DATASET_ID,
limit=page_size,
offset=offset,
)
records = response.records
all_records.extend(records)
print(
f"Page {page_num + 1}: Retrieved {len(records)} records (offset={offset})"
)
# Stop if we've retrieved all records
if len(records) < page_size:
break
print(f"\nTotal records collected: {len(all_records)}")
# Helper function for paginated retrieval
async def fetch_all_records(
    client: "UKPNClient",
    dataset_id: str,
    page_size: int = 100,
    max_records: int = 1000,
    where: "str | None" = None,
) -> list:
    """Fetch multiple pages of records from a dataset.

    Pages through the dataset with limit/offset requests until either the
    dataset is exhausted or ``max_records`` records have been collected.

    Args:
        client: UKPNClient instance used for the API calls.
        dataset_id: Dataset to query.
        page_size: Records per page (max 100).
        max_records: Maximum total records to fetch.
        where: Optional ODSQL filter expression.

    Returns:
        List of all fetched records; may be shorter than ``max_records``
        when the dataset (or the filtered subset) is smaller.
    """
    all_records = []
    offset = 0
    while len(all_records) < max_records:
        # Never request more than we still need to reach max_records.
        response = await client.get_records(
            dataset_id,
            limit=min(page_size, max_records - len(all_records)),
            offset=offset,
            where=where,
        )
        records = response.records
        if not records:
            break
        all_records.extend(records)
        offset += len(records)
        # Stop on a short page or once the server-reported total is reached.
        if len(records) < page_size or len(all_records) >= response.total_count:
            break
    return all_records
# Example usage
async with UKPNClient() as client:
records = await fetch_all_records(
client,
DATASET_ID,
page_size=50,
max_records=200,
)
print(f"Fetched {len(records)} records total")
5. Field Selection and Projection¶
Use select to retrieve only specific fields. This reduces response size and improves performance.
# Select specific fields only
async with UKPNClient() as client:
# First, get all fields to see what's available
dataset = await client.get_dataset(DATASET_ID)
if dataset.fields:
field_names = [f.name for f in dataset.fields[:5]] # First 5 fields
print(f"Selecting fields: {field_names}")
# Query with field selection
records = await client.get_records(
DATASET_ID,
limit=3,
select=", ".join(field_names), # Comma-separated field names
)
print("\nRecords (with selected fields only):")
for record in records.records:
print(f"\n {record.id}:")
if record.fields:
for key, value in record.fields.items():
print(f" {key}: {value}")
# Compare response sizes with and without field selection
import json
async with UKPNClient() as client:
# Fetch without selection (all fields)
full_records = await client.get_records(DATASET_ID, limit=10)
# Fetch with selection (fewer fields)
dataset = await client.get_dataset(DATASET_ID)
if dataset.fields and len(dataset.fields) >= 2:
two_fields = f"{dataset.fields[0].name}, {dataset.fields[1].name}"
selected_records = await client.get_records(
DATASET_ID,
limit=10,
select=two_fields,
)
# Estimate size difference
full_size = sum(
len(json.dumps(r.fields)) for r in full_records.records if r.fields
)
selected_size = sum(
len(json.dumps(r.fields)) for r in selected_records.records if r.fields
)
print(f"All fields: ~{full_size} bytes")
print(f"Selected fields ({two_fields}): ~{selected_size} bytes")
print(f"Reduction: {100 - (selected_size / full_size * 100):.1f}%")
6. Working with Facets and Refinements¶
Facets allow you to filter by categorical values. Use refine and exclude parameters.
# Refine results by a facet value
async with UKPNClient() as client:
try:
# Refine by a specific value
# Adjust the facet name and value for your dataset
records = await client.get_records(
DATASET_ID,
limit=5,
# refine={"region": "London"}, # Uncomment and adjust
)
print(f"Found {records.total_count} records")
except Exception as e:
print(f"Error: {e}")
# Exclude specific facet values
async with UKPNClient() as client:
try:
# Exclude records with specific value
records = await client.get_records(
DATASET_ID,
limit=5,
# exclude={"status": "inactive"}, # Uncomment and adjust
)
print(f"Found {records.total_count} records (excluding specified values)")
except Exception as e:
print(f"Error: {e}")
7. Combining Techniques¶
You can combine filtering, sorting, selection, and pagination for powerful queries.
# Combined query example
async with UKPNClient() as client:
try:
# Get dataset fields
dataset = await client.get_dataset(DATASET_ID)
if dataset.fields:
# Select first 3 fields
fields_to_select = [f.name for f in dataset.fields[:3]]
# Combined query
records = await client.get_records(
DATASET_ID,
limit=10,
offset=0,
select=", ".join(fields_to_select),
order_by="-record_timestamp",
# where="year >= 2023", # Uncomment and adjust
)
print(f"Query: SELECT {', '.join(fields_to_select)}")
print(" ORDER BY -record_timestamp")
print(" LIMIT 10 OFFSET 0")
print(
f"\nResults: {records.total_count} total, showing {len(records.records)}"
)
print("-" * 60)
for record in records.records[:5]:
print(f"\n{record.id}:")
if record.fields:
for key, value in record.fields.items():
print(f" {key}: {value}")
except Exception as e:
print(f"Error: {e}")
Summary¶
You've learned how to:
- Use ODSQL for filtering with `where` clauses
- Sort results with `order_by` (ascending/descending)
- Paginate through large datasets with `limit` and `offset`
- Select specific fields with `select` to improve performance
- Use facet refinement with `refine` and `exclude`
Next Steps¶
- Check out 03-analysis-patterns.ipynb for data analysis workflows with pandas
- Explore the ODSQL documentation for advanced queries
- Try building your own queries on the UK Power Networks datasets!