Analyzing Blockchain Data with DuckDB: Data Preparation

Tags: duckdb, blockchain, analysis

Published: August 10, 2024

I’ve been using DuckDB as a replacement for pandas in my Python data processing tasks. It’s proven to be incredibly convenient.

A friend recently asked how I use DuckDB in my daily work, which inspired me to write a series of articles. This is the first in that series, focusing on how to use DuckDB for initial data processing.

Use Case: Analyzing ETH Address Transactions

Let’s consider a common scenario: analyzing the transaction information of a specific Ethereum (ETH) address.

The Traditional Process vs DuckDB

Previously, this process involved several steps:

  1. Fetching data using requests
  2. Using pandas to preview and clean the data
  3. Finally, analyzing the data using DuckDB

The data preparation stage typically involves three essential steps:

  1. Handling pagination for third-party APIs
  2. Extracting valid fields from the returned data
  3. Cleaning the data, including null values and data types

In this article, we’ll demonstrate how to streamline this process using the blockscout API, DuckDB UDFs, and DuckDB macros.
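
Before we get to the use case, here is a minimal sketch of the underlying pattern: register a plain Python function as a DuckDB UDF, then wrap it in a macro so it can be called like a built-in. The greet function and greet_loudly macro are illustrative names, not part of the actual workflow.

import duckdb

def greet(name: str) -> str:
    # A trivial Python function; the type hints let DuckDB infer the UDF signature.
    return f'hello, {name}'

conn = duckdb.connect()
conn.create_function('greet', greet)  # register as a scalar UDF
conn.execute("CREATE OR REPLACE MACRO greet_loudly(n) AS upper(greet(n))")
print(conn.execute("SELECT greet_loudly('duckdb')").fetchone())  # ('HELLO, DUCKDB',)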

Demonstration

We’ll explore two solutions for processing data with DuckDB. First, let’s import the required dependencies:

import duckdb
import requests
from duckdb.typing import VARCHAR, INTEGER, DuckDBPyType
import json

Solution 1: Exploratory Implementation

This approach focuses on quickly retrieving and processing transaction data. It’s ideal for rapid analysis and verification during the development stage.

Fetch the transaction information for the ETH address through the blockscout API. The code is as follows:

def blockscout_api(module: str, action: str, address: str, start_block: int, end_block: int, page: int, offset: int) -> list[str]:
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'

    result = []
    while True:
        url = f'{url_prefix}&address={address}&startblock={start_block}&endblock={end_block}&page={page}&offset={offset}&sort=asc'
        print(f'query page {page}')
        data = requests.get(url).json()
        if data['message'] == 'OK':
            items = data['result']
            # Keep each transaction as a JSON string; DuckDB decodes it later.
            result.extend(map(json.dumps, items))
        else:
            break
        # A partial page means we have reached the last page.
        if len(items) < offset:
            break
        page += 1
    return result
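
As a quick sanity check, the function can also be called directly before handing it to DuckDB. This is a sketch that hits the live API; the arguments mirror the macro defined below.

trxs = blockscout_api('account', 'txlist',
                      '0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6',
                      20485198, 20490674, page=1, offset=2)
print(len(trxs))      # number of transactions fetched
print(trxs[0][:80])   # each element is a JSON string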

Register the function as a DuckDB UDF:

conn = duckdb.connect()
conn = conn.create_function('blockscout_api', blockscout_api)

Define a DuckDB macro. For demonstration purposes, the starting page and page size are fixed here (page 1, offset 2); adjust them to your actual needs. Note the query page 1 and query page 2 output below.

conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs(address, start_block, end_block) as table 
    select blockscout_api('account', 'txlist', address, start_block, end_block, 1, 2) as data
""")

Query the transaction information of the ETH address:

conn.execute("""
with raw_transactions as (
    select unnest(data) as trx from blockscout_trxs('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), decode_transactions as (
select 
    trx->'$.blockHash' as block_hash,
    (trx->'$.blockNumber')::integer as block_number,
    (trx->'$.timeStamp')::integer as timestamp,
    to_timestamp(timestamp) as datetime,
    trx->'$.hash' as hash,
    (trx->'$.transactionIndex')::integer as transaction_index,
    trx->'$.from' as 'from',
    trx->'$.to' as 'to',
    trx->'$.value' as value,
    trx->'$.contractAddress' as contract_address,
    (trx->'$.gas')::integer as gas,
    (trx->'$.gasPrice')::bigint as gas_price,
    (trx->'$.gasUsed')::integer as gas_used,
    trx->'$.isError' as is_error,
    trx->'$.txreceipt_status' as txreceipt_status,
    trx->'input' as 'input'
from raw_transactions
)
select 
  block_number,
  datetime,
  hash,
  'from',
  'to',
  value,
from decode_transactions
""").df()
query page 1
query page 2
  block_number                   datetime                                                hash   from     to                 value
0     20485198  2024-08-08 16:55:23+00:00  "0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc46...  "0x…"  "0x…"  "500000000000000000"
1     20488106  2024-08-09 02:38:47+00:00  "0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff727...  "0x…"  "0x…"  "500000000000000000"
2     20490674  2024-08-09 11:14:23+00:00  "0xcba85af304112c712c978968ff19fb150cdfd18e1f4...  "0x…"  "0x…"  "200000000000000000"
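
The value column is a string denominated in wei. To read it as ETH, cast to HUGEINT and divide by 1e18; the value_eth alias below is my own naming, shown on a literal for brevity.

conn.execute("""
select '500000000000000000'::hugeint / 1e18 as value_eth
""").fetchall()  # [(0.5,)]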

Solution 2: Advanced Implementation with Field Constraints

This solution is more robust and suitable for production environments. It addresses potential issues like API field changes and null values in the returned data.

Declare the required fields and their types:

fields = {
    'blockHash': str,
    'blockNumber': int,
    'timeStamp': int,
    'hash': str,
    'transactionIndex': int,
    'from': str,
    'to': str,
    'value': str,
    'contractAddress': str,
    'gas': int,
    'gasPrice': int,
    'gasUsed': int,
    'isError': int,
    'txreceipt_status': int,
    'input': str,
}
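
Since the registered UDF's return type is built from this dict, it can help to inspect the DuckDB type it produces. Printing it should yield a list-of-STRUCT type along the lines of STRUCT(blockHash VARCHAR, blockNumber BIGINT, ...)[], though the exact rendering depends on the DuckDB version.

print(DuckDBPyType(list[fields]))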

Request the blockscout API and extract the declared fields:

field_keys = fields.keys()

def blockscout_api_with_fields(module: str, action: str, address: str, start_block: int, end_block: int, page: int, offset: int):
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'
    result = []
    while True:
        url = f'{url_prefix}&address={address}&startblock={start_block}&endblock={end_block}&page={page}&offset={offset}&sort=asc'
        print(f'query page {page} -> {url}')
        resp = requests.get(url).json()
        if resp['message'] == 'OK':
            items = resp['result']
            # Keep only the declared fields, dropping anything else the API returns.
            result.extend([{f: i[f] for f in field_keys} for i in items])
            # A partial page means we have reached the last page.
            if len(items) < offset:
                break
        else:
            break
        page += 1
    return result
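
One caveat: if the API ever omits a declared field, the dict comprehension above raises a KeyError. A defensive variant, my own suggestion rather than part of the original workflow, substitutes None, which DuckDB surfaces as NULL:

def extract_fields(items: list[dict]) -> list[dict]:
    # Hypothetical helper: missing keys become None (NULL in DuckDB).
    return [{f: i.get(f) for f in field_keys} for i in items]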

Register the function as a DuckDB UDF, this time with explicit argument and return types. Note the adjusted page and offset: only one page of data is fetched here, so pagination is not demonstrated.

conn = duckdb.connect()
conn = conn.create_function(blockscout_api_with_fields.__name__, blockscout_api_with_fields, [VARCHAR, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER], DuckDBPyType(list[fields]))
conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs_with_fields(address, start_block, end_block) as table 
    select blockscout_api_with_fields('account', 'txlist', address, start_block, end_block, 1, 5) as data
""")

Query the transaction information of the ETH address:

conn.execute("""
with raw_transactions as (
    select unnest(data) as trx from blockscout_trxs_with_fields('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), flatten_transactions as (
  select unnest(trx) from raw_transactions
)
select 
  blockNumber as block_number,
  to_timestamp(timeStamp) as datetime,
  hash,
  "from",
  "to",
  value
from flatten_transactions
""").df()
query page 1 -> https://eth.blockscout.com/api?module=account&action=txlist&address=0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6&startblock=20485198&endblock=20490674&page=1&offset=5&sort=asc
  block_number                   datetime                                               hash from   to               value
0     20485198  2024-08-08 16:55:23+00:00  0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc467...  0x…  0x…  500000000000000000
1     20488106  2024-08-09 02:38:47+00:00  0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff7277...  0x…  0x…  500000000000000000
2     20490674  2024-08-09 11:14:23+00:00  0xcba85af304112c712c978968ff19fb150cdfd18e1f48...  0x…  0x…  200000000000000000
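
From here, regular SQL analysis applies directly. As one example (a sketch reusing the macro above; the aliases are mine), total gas spent per day:

conn.execute("""
with raw_transactions as (
    select unnest(data) as trx
    from blockscout_trxs_with_fields('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), flatten_transactions as (
    select unnest(trx) from raw_transactions
)
select
    date_trunc('day', to_timestamp(timeStamp)) as day,
    count(*) as n_trxs,
    sum(gasUsed * gasPrice) / 1e18 as gas_spent_eth
from flatten_transactions
group by 1
order by 1
""").df()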

Summary

By leveraging DuckDB UDFs and macros, we can significantly simplify the data processing workflow. This approach makes data analysis more efficient and results in cleaner, more maintainable code.

For day-to-day use, Solution 1 is recommended for quick analysis and verification. However, for production environments, Solution 2 is preferred due to its stricter field constraints, which help prevent issues during data processing.

In addition, pagination can also be implemented with WITH RECURSIVE, but that approach is more complicated and requires noticeably more SQL, so I don't recommend it. If there is interest, I'll cover it in a later article.
