# REST API Crawler
This guide explains how to use the `create_rest_factory_asset` factory function to automatically build queries and scrape REST APIs.
When working with APIs, you often need to write boilerplate code to fetch data, parse it, and load it into your data warehouse. This process can be time-consuming and error-prone.
To simplify this process, we've created tools that allow you to ingest data from APIs with minimal effort:

- **Minimal boilerplate**: Define your endpoints once and let the factory handle the rest
- **Efficient asset creation**: Create a single unified asset for your API data
- **Seamless OSO integration**: Built to fit our data ingestion approach
## Defining Your API Crawler
Below is an example showing how to ingest data from the DefiLlama API, which provides data on various DeFi protocols.
### 1. Create a Configuration Object
The configuration object defines how to interact with the API. Here's a simple example:
```python
from dlt.sources.rest_api.typing import RESTAPIConfig


def mk_defillama_config() -> RESTAPIConfig:
    """
    Create a simple REST API config for fetching DefiLlama data.
    """
    # Define protocols we want to fetch
    protocols = ["aave-v3", "uniswap", "velodrome"]

    return {
        "client": {
            "base_url": "https://api.llama.fi/",
        },
        "resource_defaults": {
            "primary_key": "id",
            "write_disposition": "replace",
        },
        "resources": [
            {
                "name": f"{protocol}_data",
                "endpoint": {
                    "path": f"protocol/{protocol}",
                    "data_selector": "$",
                },
            }
            for protocol in protocols
        ],
    }
```
For the full config spec, see the dlt documentation. Not all fields are covered here, but you can add more as needed.
The configuration has three main parts:

- A `client` object with the base URL and client-level settings
- A `resource_defaults` object containing default settings for all resources
- A list of `resources`, each describing a single endpoint or data source
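Because `mk_defillama_config()` returns a plain dlt `RESTAPIConfig`, you can sanity-check it outside of Dagster with a throwaway dlt pipeline. This is a sketch, not part of the OSO factory; it assumes you have `dlt` with the `duckdb` extra installed locally:

```python
# Sketch: run the config through a local dlt pipeline to verify it works.
# Assumes dlt and duckdb are installed; none of this is OSO-specific code.
import dlt
from dlt.sources.rest_api import rest_api_source

pipeline = dlt.pipeline(
    pipeline_name="defillama_smoke_test",
    destination="duckdb",  # throwaway local destination
    dataset_name="defillama",
)
load_info = pipeline.run(rest_api_source(mk_defillama_config()))
print(load_info)  # summary of tables and row counts loaded
```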
### 2. Use the Factory Function
The `create_rest_factory_asset` factory function takes your configuration and returns a callable factory that creates a Dagster asset with all the specified API resources.
```python
from ..factories.rest import create_rest_factory_asset

# Create the factory with your config
dlt_assets = create_rest_factory_asset(config=mk_defillama_config())

# Create the asset
defillama_assets = dlt_assets(
    key_prefix="defillama",  # Asset key prefix (required)
    name="tvl",  # Asset name (required)
    op_tags={
        "dagster/concurrency_key": "defillama_tvl",
    },
)
```
The factory function requires both:

- A `key_prefix` (string or list of strings) to define the asset path
- A `name` to identify the asset
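For example, passing a list for `key_prefix` nests the asset deeper in the key space. This is a sketch reusing `dlt_assets` from above; the exact key layout the factory produces is an assumption:

```python
# Sketch: key_prefix as a list of strings. We assume the factory joins the
# parts into a nested asset key such as crypto/defillama/tvl.
nested_assets = dlt_assets(
    key_prefix=["crypto", "defillama"],  # hypothetical nested prefix
    name="tvl",
)
```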
Under the hood, this creates a Dagster asset with all your infrastructure-specific details configured. For the full reference of available parameters, check the Dagster documentation.
That's it! These few lines produce a Dagster asset that pulls data from all the specified DefiLlama endpoints. When you run your Dagster job, the data will be ingested into your OSO warehouse.
## Advanced Features
The REST API Crawler supports several advanced features that make it powerful for complex data ingestion scenarios.
### Resource Dependencies and Data Transformations
One of the most powerful features is the ability to create dependencies between resources and transform data during ingestion. Here's an advanced example:
```python
from typing import Dict, Generator, List

import dlt
import requests
from dlt.sources.helpers.requests import Session
from dlt.sources.rest_api.typing import RESTAPIConfig

from ..factories.rest import create_rest_factory_asset


# Custom function to fetch protocols dynamically
@dlt.resource(name="protocols")
def fetch_defillama_protocols() -> Generator[List[Dict[str, str]], None, None]:
    """Fetch all valid defillama protocols."""
    # Get protocols from an external source or database
    try:
        r = requests.get("https://api.llama.fi/protocols", timeout=5)
        r.raise_for_status()
        valid_protocols = [{"name": x["slug"]} for x in r.json() if x.get("slug")]
        yield valid_protocols
    except requests.exceptions.RequestException:
        # Fall back to a hardcoded list if the API call fails
        yield [{"name": slug} for slug in ["aave-v3", "uniswap", "velodrome"]]


# Data transformation function
def add_original_slug(record):
    """Add the original slug to the record for reference."""
    # include_from_parent exposes the parent's field as "_protocols_name"
    record["slug"] = record.get("_protocols_name", "")
    record.pop("_protocols_name", None)  # Remove the original field safely
    return record


# Create a config with resource dependencies
def mk_defillama_config() -> RESTAPIConfig:
    return {
        "client": {
            "base_url": "https://api.llama.fi/",
            "session": Session(timeout=300),
        },
        "resource_defaults": {
            "primary_key": "id",
            "write_disposition": "replace",
            "parallelized": True,
        },
        "resources": [
            # This resource depends on the "protocols" resource
            {
                "name": "slugs",
                "endpoint": {
                    "path": "protocol/{protocol}",
                    "data_selector": "$",
                    "params": {
                        "protocol": {
                            "type": "resolve",  # Create a dependency
                            "resource": "protocols",  # Reference another resource
                            "field": "name",  # Use the 'name' field from protocols
                        },
                    },
                },
                "include_from_parent": ["name"],  # Include parent fields
                "processing_steps": [
                    {"map": add_original_slug},  # Transform each record
                ],  # type: ignore
            },
            # The base resource that provides protocol names
            fetch_defillama_protocols(),
        ],
    }


# Create and register the asset
def build_defillama_assets():
    dlt_assets = create_rest_factory_asset(config=mk_defillama_config())
    assets = dlt_assets(
        key_prefix="defillama",
        name="tvl",
        op_tags={
            "dagster/concurrency_key": "defillama_tvl",
        },
    )
    return assets


defillama_assets = build_defillama_assets()
```
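To see what the `processing_steps` entry does to a single record: `include_from_parent` surfaces the parent's `name` field as `_protocols_name`, and `add_original_slug` renames it to `slug`. A minimal illustration (the `id` and `tvl` fields are made up for the example):

```python
# Illustration: how add_original_slug reshapes one record. The input field
# "_protocols_name" is what include_from_parent injects (parent resource
# "protocols", field "name").
record = {"id": 1, "tvl": 123.0, "_protocols_name": "aave-v3"}
print(add_original_slug(record))
# -> {'id': 1, 'tvl': 123.0, 'slug': 'aave-v3'}
```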
## How to Run and View Results
If you haven't set up your local Dagster environment yet, please follow our quickstart guide.
Once your Dagster instance is running, follow the Dagster Asset Guide to materialize the assets. Our example asset is located under the key path `defillama/tvl`.
## Dynamic Data Sources
You can create a dynamic list of endpoints by using a custom resource function like `fetch_defillama_protocols()` in our example. This lets you:
- Pull from a database or external source to determine which endpoints to query
- Filter or transform the list of endpoints before querying them
- Implement custom logic for endpoint selection
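For instance, to implement the first point above, the custom resource could read slugs from a database instead of a public endpoint. This is a hypothetical sketch; the database file, table, and column names are illustrative:

```python
import sqlite3

import dlt


@dlt.resource(name="protocols")
def fetch_protocols_from_db():
    """Hypothetical: read protocol slugs from a local SQLite table."""
    conn = sqlite3.connect("protocols.db")  # illustrative database file
    try:
        rows = conn.execute("SELECT slug FROM tracked_protocols").fetchall()
    finally:
        conn.close()
    # Yield the same shape the dependent REST resource resolves against
    yield [{"name": slug} for (slug,) in rows]
```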
## Understanding Resource Dependencies
In the advanced example above, we use several important concepts:
- **Resource Dependencies**: The `"type": "resolve"` parameter creates a dependency between resources. In this case, the `slugs` resource depends on the `protocols` resource:

  ```python
  "protocol": {
      "type": "resolve",  # Create a dependency
      "resource": "protocols",  # The resource to depend on
      "field": "name",  # The field to use from that resource
  }
  ```

  This means that for each protocol name returned by the `protocols` resource, the factory will make a separate API call to fetch that protocol's data.

- **Data Transformations**: The `processing_steps` parameter allows you to transform data before it's loaded:

  ```python
  "processing_steps": [
      {"map": add_original_slug},  # Apply a function to each record
      {"filter": lambda r: r.get("tvl", 0) > 1000000},  # Filter records based on a condition
  ]
  ```

- **Dynamic Resource Generation**: The `fetch_defillama_protocols()` function is a custom resource that dynamically generates the list of protocols to fetch, rather than hardcoding them in the configuration.
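Mechanically, `resolve` fills the `{protocol}` placeholder in the path template with the `field` value from each parent item. A plain-Python illustration of the resulting request paths (this mimics the behavior; it is not dlt's implementation):

```python
# Each item yielded by the "protocols" resource produces one request path.
parent_items = [{"name": "aave-v3"}, {"name": "uniswap"}, {"name": "velodrome"}]
paths = [f"protocol/{item['name']}" for item in parent_items]
print(paths)
# -> ['protocol/aave-v3', 'protocol/uniswap', 'protocol/velodrome']
```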
## Current Limitations
While the factory approach is flexible, there are some limitations:
- **Single Asset**: Unlike the previous version, this factory creates a single asset for all resources rather than one asset per resource. This greatly speeds up asset creation, but requires all resources to be materialized together.
## Conclusion
With this REST API Crawler, you can connect OSO to virtually any API with minimal code. The factory eliminates repetitive boilerplate and keeps data ingestion consistent. When you need to modify your data sources, simply update your configuration object.
If this factory doesn't fit your needs, check the GraphQL API Crawler as an alternative.