Are you an LLM? You can read better optimized documentation at /docs/implementation/agent-metadata-parsing.md for this page in Markdown format
Description
Practical guide for parsing ERC-8004 agent metadata
Audience: Backend developers, indexer authors, explorer builders
Overview
This guide provides practical instructions for implementing a robust ERC-8004 agent metadata parser that handles real-world data patterns.
What You'll Learn:
- ✅ Parse 7 URI formats (IPFS, HTTP, Data URI variants)
- ✅ Handle malformed metadata gracefully
- ✅ Extract structured data (endpoints, OASF skills, wallets)
- ✅ Validate against ERC-8004 standard
- ✅ Generate helpful warnings for developers
Quick Start
Minimal Parser (4 Steps)
python
import json
import base64
from urllib.parse import unquote
def parse_metadata(uri: str) -> dict | None:
"""Minimal ERC-8004 metadata parser"""
# Step 1: Try Data URI (base64)
if uri.startswith("data:application/json;base64,"):
encoded = uri.replace("data:application/json;base64,", "")
# Handle edge case: claimed base64 but plain JSON
if encoded.startswith("{"):
return json.loads(encoded)
decoded = base64.b64decode(encoded).decode("utf-8")
return json.loads(decoded)
# Step 2: Try Data URI (plain)
if uri.startswith("data:application/json,"):
data = uri.replace("data:application/json,", "")
data = unquote(data) # Handle URL encoding
return json.loads(data)
# Step 3: Try plain JSON (fallback)
if uri.startswith("{"):
return json.loads(uri)
# Step 4: Fetch from IPFS/HTTP (not shown, use requests/httpx)
# ...
return None1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Usage:
python
metadata = parse_metadata(agent_uri)
if metadata:
name = metadata.get("name")
endpoints = metadata.get("endpoints", [])1
2
3
4
2
3
4
URI Format Parsers
1. Data URI (Base64)
Format: data:application/json;base64,<BASE64>
Implementation:
python
def parse_data_uri_base64(uri: str) -> tuple[dict | None, list[str]]:
"""Parse base64 data URI with error collection"""
warnings = []
if not uri.startswith("data:application/json;base64,"):
return None, warnings
encoded_data = uri.replace("data:application/json;base64,", "")
# Edge case: ChaosChain pattern (claimed base64 but plain JSON)
if encoded_data.startswith("{") or encoded_data.startswith("["):
warnings.append("base64_uri_with_plain_json")
return json.loads(encoded_data), warnings
try:
decoded_bytes = base64.b64decode(encoded_data)
decoded_str = decoded_bytes.decode("utf-8")
metadata = json.loads(decoded_str)
return metadata, warnings
except base64.binascii.Error:
warnings.append("invalid_base64_encoding")
return None, warnings
except UnicodeDecodeError:
warnings.append("base64_decode_utf8_error")
return None, warnings
except json.JSONDecodeError:
warnings.append("base64_invalid_json")
return None, warnings1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Real-World Example:
python
# Standard base64
uri1 = "data:application/json;base64,eyJ0eXBlIjoiaHR0cHM6Ly9laXBzLmV0aGVyZXVtLm9yZy9FSVBTL2VpcC04MDA0I3JlZ2lzdHJhdGlvbi12MSIsIm5hbWUiOiJBZ2VudCJ9"
metadata, warnings = parse_data_uri_base64(uri1)
# Returns: {"type": "https://...", "name": "Agent"}, []
# Edge case: claimed base64 but plain JSON
uri2 = 'data:application/json;base64,{"type":"https://...","name":"Agent"}'
metadata, warnings = parse_data_uri_base64(uri2)
# Returns: {"type": "https://...", "name": "Agent"}, ["base64_uri_with_plain_json"]1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
2. Data URI (Plain)
Format: data:application/json,<JSON>
Implementation:
python
def parse_data_uri_plain(uri: str) -> tuple[dict | None, list[str]]:
"""Parse plain data URI with URL decoding"""
warnings = []
if not uri.startswith("data:application/json,"):
return None, warnings
data = uri.replace("data:application/json,", "")
# Try URL decoding (some URIs are URL-encoded)
try:
decoded = unquote(data)
if decoded != data:
warnings.append("url_decoded_plain_uri")
data = decoded
except Exception:
pass
try:
metadata = json.loads(data)
return metadata, warnings
except json.JSONDecodeError:
warnings.append("plain_uri_invalid_json")
return None, warnings1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Real-World Example:
python
# Standard plain
uri1 = 'data:application/json,{"type":"https://...","name":"Agent"}'
metadata, warnings = parse_data_uri_plain(uri1)
# Returns: {"type": "https://...", "name": "Agent"}, []
# URL-encoded plain
uri2 = 'data:application/json,%7B%22type%22%3A%22https%3A%2F%2F...%22%7D'
metadata, warnings = parse_data_uri_plain(uri2)
# Returns: {"type": "https://..."}, ["url_decoded_plain_uri"]1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
3. IPFS URIs
Format: ipfs://CID or ipfs://CID/path
Implementation:
python
import httpx
IPFS_GATEWAYS = [
"https://ipfs.io/ipfs/{}",
"https://cloudflare-ipfs.com/ipfs/{}",
"https://gateway.pinata.cloud/ipfs/{}",
]
async def fetch_ipfs(uri: str, timeout: int = 5) -> tuple[dict | None, list[str]]:
"""Fetch IPFS content with gateway fallback"""
warnings = []
if not uri.startswith("ipfs://"):
return None, warnings
cid_with_path = uri.replace("ipfs://", "")
async with httpx.AsyncClient(timeout=timeout) as client:
for gateway_template in IPFS_GATEWAYS:
gateway_url = gateway_template.format(cid_with_path)
try:
response = await client.get(gateway_url)
if response.status_code == 200:
metadata = response.json()
return metadata, warnings
except Exception:
continue
warnings.append("ipfs_fetch_failed")
return None, warnings1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Real-World Example:
python
uri = "ipfs://bafkreiaqdaerh5dvqmtjievkfnwfft6psghkxezygncbuvhl2uwyu6scn4"
metadata, warnings = await fetch_ipfs(uri)
# Returns: ({metadata dict}, []) on success, or (None, ["ipfs_fetch_failed"]) when every gateway fails
2
3
2
3
4. HTTP/HTTPS URLs
Implementation:
python
async def fetch_http(uri: str, timeout: int = 5) -> tuple[dict | None, list[str]]:
"""Fetch HTTP(S) content"""
warnings = []
if not (uri.startswith("http://") or uri.startswith("https://")):
return None, warnings
if uri.startswith("http://"):
warnings.append("http_not_https")
async with httpx.AsyncClient(timeout=timeout) as client:
try:
response = await client.get(uri)
if response.status_code == 200:
metadata = response.json()
return metadata, warnings
else:
warnings.append(f"http_status_{response.status_code}")
return None, warnings
except httpx.TimeoutException:
warnings.append("http_timeout")
return None, warnings
except Exception as e:
warnings.append(f"http_error_{type(e).__name__}")
return None, warnings1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Schema Validation
Required Fields Checker
python
REQUIRED_FIELDS = {"type", "name", "description", "image"}


def validate_required_fields(metadata: dict) -> list[str]:
    """Report required top-level fields that are absent or invalid.

    Returns one ``missing_required_<field>`` code per absent field (in
    alphabetical order), followed by ``invalid_type_field`` when ``type`` is
    not the expected registration identifier (which includes it being absent).
    """
    absent = sorted(REQUIRED_FIELDS.difference(metadata))
    findings = [f"missing_required_{key}" for key in absent]

    # Validate type field value
    if metadata.get("type") != "https://eips.ethereum.org/EIPS/eip-8004#registration-v1":
        findings.append("invalid_type_field")

    return findings
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Endpoint Validation
python
def validate_endpoints(metadata: dict) -> list[str]:
    """Validate the ``endpoints`` array structure and its entries.

    Structural problems (singular ``endpoint`` typo, missing, non-array or
    empty ``endpoints``) produce a single warning and stop validation.
    Otherwise each entry is checked: entries that are not JSON objects get
    ``endpoint_not_object_at_<i>``, and entries named ``MCP``, ``A2A``,
    ``OASF`` or ``agentWallet`` are passed to their dedicated validator.
    Entries with any other name are accepted without checks.

    Args:
        metadata: Parsed agent metadata object.

    Returns:
        Warning codes in the order they were found.
    """
    warnings: list[str] = []

    # Check for common typo
    if "endpoint" in metadata and "endpoints" not in metadata:
        warnings.append("typo_endpoint_singular")
        return warnings

    endpoints = metadata.get("endpoints")

    if endpoints is None:
        warnings.append("missing_endpoints")
        return warnings

    if not isinstance(endpoints, list):
        warnings.append("endpoints_not_array")
        return warnings

    if not endpoints:
        warnings.append("empty_endpoints")
        return warnings

    # Validate each endpoint
    for i, endpoint in enumerate(endpoints):
        if not isinstance(endpoint, dict):
            # Malformed entry (string, null, ...): report it instead of
            # crashing on ``.get``.
            warnings.append(f"endpoint_not_object_at_{i}")
            continue

        name = endpoint.get("name")

        # Kept as if/elif so each validator name is looked up only when an
        # endpoint of that kind is actually present.
        if name == "MCP":
            warnings.extend(validate_mcp_endpoint(endpoint, i))
        elif name == "A2A":
            warnings.extend(validate_a2a_endpoint(endpoint, i))
        elif name == "OASF":
            warnings.extend(validate_oasf_endpoint(endpoint, i))
        elif name == "agentWallet":
            warnings.extend(validate_wallet_endpoint(endpoint, i))

    return warnings
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
MCP Endpoint Validation
python
def validate_mcp_endpoint(endpoint: dict, index: int) -> list[str]:
    """Check an MCP endpoint entry for its ``endpoint`` and ``version`` keys.

    Args:
        endpoint: The endpoint entry to inspect.
        index: Position of the entry, used as the suffix of each warning code.

    Returns:
        Warning codes for a missing or empty ``endpoint`` and for a missing
        ``version`` or one other than ``"2025-06-18"``.
    """
    issues = []

    if "endpoint" in endpoint:
        if not endpoint["endpoint"]:
            issues.append(f"mcp_empty_endpoint_at_{index}")
    else:
        issues.append(f"mcp_missing_endpoint_at_{index}")

    if "version" in endpoint:
        if endpoint["version"] != "2025-06-18":
            issues.append(f"mcp_nonstandard_version_at_{index}")
    else:
        issues.append(f"mcp_missing_version_at_{index}")

    return issues
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
A2A Endpoint Validation
python
def validate_a2a_endpoint(endpoint: dict, index: int) -> list[str]:
    """Validate an A2A endpoint entry.

    Args:
        endpoint: The endpoint entry to inspect.
        index: Position of the entry, used as the suffix of each warning code.

    Returns:
        Warning codes for: a missing or empty ``endpoint``; a missing
        ``version``; the ``"0.30"`` spelling of the version (accepted, but
        flagged as a typo); any other version than ``"0.3.0"``; and an
        ``endpoint`` that does not contain ``.well-known/agent-card.json``.
    """
    warnings: list[str] = []

    if "endpoint" not in endpoint:
        warnings.append(f"a2a_missing_endpoint_at_{index}")
    elif not endpoint["endpoint"]:
        warnings.append(f"a2a_empty_endpoint_at_{index}")

    if "version" not in endpoint:
        warnings.append(f"a2a_missing_version_at_{index}")
    else:
        version = endpoint["version"]
        if version == "0.30":
            # Common typo for "0.3.0": accept it, but tell the developer.
            warnings.append(f"a2a_version_typo_at_{index}")
        elif version != "0.3.0":
            warnings.append(f"a2a_nonstandard_version_at_{index}")

    # Check for .well-known path. A non-empty value that is not a string
    # cannot contain the path, so it is flagged instead of raising TypeError.
    endpoint_url = endpoint.get("endpoint", "")
    if endpoint_url and (
        not isinstance(endpoint_url, str)
        or ".well-known/agent-card.json" not in endpoint_url
    ):
        warnings.append(f"a2a_missing_well_known_at_{index}")

    return warnings
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CAIP Format Validation
python
import re
# namespace ":" numeric chain id ":" 20-byte hex address. Shared by both
# validators below, which accept exactly the same shape.
_CAIP_EVM_ADDRESS_RE = re.compile(r'^[a-z][-a-z0-9]{0,31}:[0-9]+:0x[a-fA-F0-9]{40}$')


def _is_caip_evm_address(value) -> bool:
    """Return True if ``value`` is a string of the form ``ns:chainId:0x<40 hex>``."""
    # fullmatch (not match): with match(), "$" also succeeds just before a
    # trailing newline, so "...\n" would be accepted.
    return isinstance(value, str) and _CAIP_EVM_ADDRESS_RE.fullmatch(value) is not None


def validate_caip2(value: str) -> bool:
    """Validate a ``namespace:chainId:reference`` identifier.

    NOTE: despite the name, this checks the three-part form
    (e.g. ``eip155:1:0x...``, chain identifier plus a 0x contract address),
    not a bare two-part CAIP-2 chain id such as ``eip155:1``, which is
    rejected.

    Returns:
        ``True`` when ``value`` matches; ``False`` otherwise, including for
        non-string input.
    """
    return _is_caip_evm_address(value)


def validate_caip10(value: str) -> bool:
    """Validate CAIP-10 format: ``namespace:chainId:accountAddress``.

    Only numeric chain ids and 40-hex-digit ``0x`` addresses are accepted.

    Returns:
        ``True`` when ``value`` matches; ``False`` otherwise, including for
        non-string input.
    """
    return _is_caip_evm_address(value)


def validate_wallet_endpoint(endpoint: dict, index: int) -> list[str]:
    """Validate an agentWallet endpoint entry.

    Args:
        endpoint: The endpoint entry; its ``endpoint`` value must be a
            CAIP-10 account string as accepted by ``validate_caip10``.
        index: Position of the entry, used as the suffix of the warning code.

    Returns:
        ``["wallet_invalid_format_at_<index>"]`` when the value is missing or
        malformed, otherwise an empty list.
    """
    warnings: list[str] = []
    wallet_address = endpoint.get("endpoint", "")
    if not validate_caip10(wallet_address):
        warnings.append(f"wallet_invalid_format_at_{index}")
    return warnings
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Examples:
python
validate_caip2("eip155:1:0x8004a6090Cd10A7288092483047B097295Fb8847") # True
validate_caip2("eip155:0x8004...") # False (missing chainId)
validate_caip10("eip155:11155111:0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb7") # True1
2
3
4
2
3
4
Field Extraction
Extract Structured Data
python
def extract_agent_data(metadata: dict) -> dict:
"""Extract structured data for database storage"""
# Basic fields
data = {
"name": metadata.get("name"),
"description": metadata.get("description"),
"image_url": metadata.get("image"),
"active": metadata.get("active", False),
"x402_support": metadata.get("x402support", False),
"updated_at": metadata.get("updatedAt"),
}
# Extract endpoints
endpoints = metadata.get("endpoints", [])
data["mcp_server"] = extract_endpoint_value(endpoints, "MCP")
data["a2a_endpoint"] = extract_endpoint_value(endpoints, "A2A")
data["agent_wallet"] = extract_endpoint_value(endpoints, "agentWallet")
data["ens"] = extract_endpoint_value(endpoints, "ENS")
data["did"] = extract_endpoint_value(endpoints, "DID")
# Extract OASF data
oasf_endpoint = next((e for e in endpoints if e.get("name") == "OASF"), None)
if oasf_endpoint:
data["oasf_skills"] = oasf_endpoint.get("skills", [])
data["oasf_domains"] = oasf_endpoint.get("domains", [])
# Store full metadata
data["metadata_json"] = metadata
return data
def extract_endpoint_value(endpoints: list, name: str) -> str | None:
"""Extract endpoint URL by name"""
endpoint = next((e for e in endpoints if e.get("name") == name), None)
return endpoint.get("endpoint") if endpoint else None1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Usage:
python
metadata = parse_metadata(uri)
agent_data = extract_agent_data(metadata)
# Store in database
await db.execute(
"""
INSERT INTO agents (name, description, image_url, mcp_server, ...)
VALUES (:name, :description, :image_url, :mcp_server, ...)
""",
agent_data
)1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
Complete Parser Example
python
class AgentMetadataParser:
"""Complete ERC-8004 agent metadata parser"""
def __init__(self):
self.errors = []
self.warnings = []
async def parse(self, uri: str) -> dict | None:
"""Parse agent metadata from URI"""
self.errors = []
self.warnings = []
# Try data URI formats first (fastest)
metadata = self._try_data_uris(uri)
# Fetch from remote if needed
if metadata is None:
metadata = await self._try_remote_fetch(uri)
# Validate if parsed successfully
if metadata:
self._validate(metadata)
return metadata
def _try_data_uris(self, uri: str) -> dict | None:
"""Try all data URI formats"""
# Base64
metadata, warnings = parse_data_uri_base64(uri)
self.warnings.extend(warnings)
if metadata:
return metadata
# Plain
metadata, warnings = parse_data_uri_plain(uri)
self.warnings.extend(warnings)
if metadata:
return metadata
# Plain JSON fallback
if uri.startswith("{"):
try:
metadata = json.loads(uri)
self.warnings.append("plain_json_without_uri_scheme")
return metadata
except json.JSONDecodeError:
pass
return None
async def _try_remote_fetch(self, uri: str) -> dict | None:
"""Fetch from IPFS/HTTP/Arweave"""
# IPFS
if uri.startswith("ipfs://"):
metadata, warnings = await fetch_ipfs(uri)
self.warnings.extend(warnings)
return metadata
# HTTP(S)
if uri.startswith("http://") or uri.startswith("https://"):
metadata, warnings = await fetch_http(uri)
self.warnings.extend(warnings)
return metadata
# Arweave
if uri.startswith("ar://"):
# Similar to IPFS
pass
self.errors.append("unsupported_uri_format")
return None
def _validate(self, metadata: dict):
"""Validate metadata and collect warnings"""
self.warnings.extend(validate_required_fields(metadata))
self.warnings.extend(validate_endpoints(metadata))
# ... other validations
def get_status(self) -> str:
"""Get parse status"""
if self.errors:
return "error"
elif self.warnings:
return "warning"
else:
return "success"1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
Usage:
python
parser = AgentMetadataParser()
metadata = await parser.parse(agent_uri)
if metadata:
agent_data = extract_agent_data(metadata)
agent_data["parse_status"] = parser.get_status()
agent_data["parse_warnings"] = parser.warnings
# Store in database
else:
print(f"Parse failed: {parser.errors}")1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
Testing
Unit Tests
python
import pytest
@pytest.mark.asyncio
async def test_parse_base64_data_uri():
    """A base64 data URI decodes to the metadata object it embeds."""
    expected_type = "https://eips.ethereum.org/EIPS/eip-8004#registration-v1"
    encoded_uri = (
        "data:application/json;base64,"
        "eyJ0eXBlIjoiaHR0cHM6Ly9laXBzLmV0aGVyZXVtLm9yZy9FSVBTL2VpcC04MDA0I3JlZ2lzdHJhdGlvbi12MSIsIm5hbWUiOiJUZXN0In0="
    )

    agent_parser = AgentMetadataParser()
    result = await agent_parser.parse(encoded_uri)

    assert result is not None
    assert result["type"] == expected_type
    assert result["name"] == "Test"
    # The payload omits description and image, so warnings are expected.
    assert agent_parser.get_status() == "warning"
@pytest.mark.asyncio
async def test_parse_chaoschain_edge_case():
    """ChaosChain pattern: the URI claims base64 but carries plain JSON."""
    mislabeled_uri = 'data:application/json;base64,{"type":"https://eips.ethereum.org/EIPS/eip-8004#registration-v1","name":"Bob"}'

    agent_parser = AgentMetadataParser()
    result = await agent_parser.parse(mislabeled_uri)

    assert result is not None
    assert result["name"] == "Bob"
    # The mismatch between the declared and actual encoding is reported.
    assert "base64_uri_with_plain_json" in agent_parser.warnings
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Performance Optimization
Caching
python
from functools import lru_cache
import hashlib
@lru_cache(maxsize=1000)
def parse_data_uri_cached(uri: str) -> tuple[dict | None, tuple]:
    """Cache parsed data URIs (the URI string fully determines the result).

    Tries the base64 form first and the plain form second.

    Returns:
        ``(metadata, warnings)`` with ``warnings`` as a tuple containing the
        codes from every attempt that ran.

    NOTE: the returned ``metadata`` dict is the cached object itself and is
    shared by every caller asking for the same URI. Treat it as read-only,
    or copy it before modifying.
    """
    metadata, warnings = parse_data_uri_base64(uri)
    if metadata is None:
        # Keep the warnings from the failed base64 attempt instead of
        # overwriting them with the plain-URI result.
        metadata, plain_warnings = parse_data_uri_plain(uri)
        warnings = [*warnings, *plain_warnings]
    return metadata, tuple(warnings)
async def fetch_ipfs_cached(uri: str) -> dict | None:
    """Fetch IPFS metadata through a Redis cache.

    The cache key is ``ipfs:`` plus the SHA-256 hex digest of the URI. A
    truthy cached value is decoded and returned. Otherwise the content is
    fetched, and a truthy result is stored with a 3600-second expiry before
    being returned. Warnings from the fetch are discarded.
    """
    digest = hashlib.sha256(uri.encode()).hexdigest()
    cache_key = f"ipfs:{digest}"

    # Try cache
    hit = await redis.get(cache_key)
    if hit:
        return json.loads(hit)

    # Fetch and cache
    metadata, _ignored_warnings = await fetch_ipfs(uri)
    if metadata:
        serialized = json.dumps(metadata)
        await redis.setex(cache_key, 3600, serialized)

    return metadata
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Common Issues
Issue 1: Empty Endpoints Array
Pattern:
json
{
"endpoints": []
}1
2
3
2
3
Fix: Warn but don't fail
python
if len(endpoints) == 0:
warnings.append("empty_endpoints")
# Agent not reachable, but metadata is valid1
2
3
2
3
Issue 2: Null agentId in Registrations
Pattern:
json
{
"registrations": [{ "agentId": null, "agentRegistry": "eip155:..." }]
}1
2
3
2
3
Context: Common during registration flow (agentId assigned after)
Fix: Warn but store
python
if registration.get("agentId") is None:
warnings.append("registration_null_agent_id")
# Still store, will be updated later1
2
3
2
3
Issue 3: Version Number Variations
Pattern:
json
{ "name": "A2A", "version": "0.30" } // Should be "0.3.0"1
Fix: Accept both, prefer standard
python
if endpoint["version"] in ["0.3.0", "0.30"]:
# Accept both
if endpoint["version"] == "0.30":
warnings.append("a2a_version_typo")1
2
3
4
2
3
4