Analyzing Blockchain Data with DuckDB: Data Preparation

duckdb
blockchain
analysis
Published: August 10, 2024

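Lately I have been replacing more and more of my pandas + Python data processing with DuckDB, and it has been very comfortable to work with.

A friend wanted to know how I use DuckDB day to day, so I am planning a series of articles. This is the first one, and it covers how to use DuckDB for the up-front data preparation.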

Use case

As an everyday example, suppose we want to analyze the transactions of an ETH address.

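Demo

In the past, this workflow started with requests to fetch the data, continued with pandas to preview and clean it, and only then used DuckDB for the analysis.

The data preparation stage has 3 necessary steps:

  1. Handle pagination of the third-party API
  2. Extract the useful fields from the API response
  3. Clean the data: null values, types, and so on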
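
The three steps above can be sketched in plain Python before bringing DuckDB in. This is only an illustration under stated assumptions: fetch_page is a hypothetical stand-in that serves pages from an in-memory list instead of calling a real API, and the sample rows are made up.

```python
from typing import Optional

# Hypothetical sample data standing in for a third-party API.
SAMPLE = [
    {"hash": "0xaa", "blockNumber": "100", "value": "5", "extra": "ignored"},
    {"hash": "0xbb", "blockNumber": "", "value": "7", "extra": "ignored"},
    {"hash": "0xcc", "blockNumber": "102", "value": "9", "extra": "ignored"},
]


def fetch_page(page: int, offset: int) -> list[dict]:
    """Hypothetical fetcher: returns page `page` (1-based) of size `offset`."""
    start = (page - 1) * offset
    return SAMPLE[start:start + offset]


def to_int(raw: str) -> Optional[int]:
    """Step 3: turn empty strings into None and numeric strings into int."""
    return int(raw) if raw else None


def load_all(offset: int) -> list[dict]:
    rows = []
    page = 1
    while True:
        items = fetch_page(page, offset)  # step 1: pagination
        for item in items:
            rows.append({
                "hash": item["hash"],  # step 2: keep only the useful fields
                "block_number": to_int(item["blockNumber"]),
                "value": to_int(item["value"]),
            })
        if len(items) < offset:  # a short page is the last page
            break
        page += 1
    return rows


rows = load_all(offset=2)
print(rows)
```

With an offset of 2, the loop requests two pages; the second row ends up with block_number set to None because its raw value is an empty string.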

Below, I use the blockscout API [1] to show how a DuckDB UDF [2] and a DuckDB Macro [3] simplify this process.

Import the dependencies

import duckdb
import requests
from duckdb.typing import VARCHAR, INTEGER, DuckDBPyType
import json

Approach 1

Fetch the transactions of an ETH address through the blockscout API [4]:

def blockscout_api(module: str, action: str, address: str, start_block: int, end_block: int, page: int, offset: int) -> list[str]:
    # module and action select the API endpoint
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'

    result = []
    while True:
        url = f'{url_prefix}&address={address}&startblock={start_block}&endblock={end_block}&page={page}&offset={offset}&sort=asc'
        print(f'query page {page}')
        data = requests.get(url).json()
        # Anything other than OK ends the loop
        if data['message'] != 'OK':
            break
        items = data['result']
        # Keep each transaction as a JSON string; fields are extracted later in SQL
        result.extend(map(json.dumps, items))
        # A short page means this was the last one
        if len(items) < offset:
            break
        page += 1
    return result
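
The request URL above is assembled with f-strings, which works for these parameters. As an optional variation (not what the original code does), the query string can be built with urllib.parse.urlencode from the standard library, which also escapes special characters. build_url is a hypothetical helper name used only for this sketch.

```python
from urllib.parse import urlencode

BASE_URL = "https://eth.blockscout.com/api"  # same endpoint as in the function above


def build_url(module: str, action: str, address: str,
              start_block: int, end_block: int, page: int, offset: int) -> str:
    """Hypothetical helper: build the request URL for one page."""
    params = {
        "module": module,
        "action": action,
        "address": address,
        "startblock": start_block,
        "endblock": end_block,
        "page": page,
        "offset": offset,
        "sort": "asc",
    }
    # urlencode keeps the dict order and escapes each value
    return f"{BASE_URL}?{urlencode(params)}"


url = build_url("account", "txlist",
                "0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6",
                20485198, 20490674, 1, 2)
print(url)
```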

Register the function as a DuckDB user-defined function (UDF)

conn = duckdb.connect()
conn = conn.create_function('blockscout_api', blockscout_api)

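Define the DuckDB macro. For this demo, page and offset are fixed (page 1, offset 2); in real use, adjust them to your needs. Note the "query page 1" and "query page 2" lines in the output below.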

conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs(address, start_block, end_block) as table 
    select blockscout_api('account', 'txlist', address, start_block, end_block, 1, 2) as data
""")

Query the transactions of the ETH address

conn.execute("""
with raw_transactions as (
    select unnest(data) as trx from blockscout_trxs('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), decode_transactions as (
select 
    trx->'$.blockHash' as block_hash,
    (trx->'$.blockNumber')::integer as block_number,
    (trx->'$.timeStamp')::integer as timestamp,
    to_timestamp(timestamp) as datetime,
    trx->'$.hash' as hash,
    (trx->'$.transactionIndex')::integer as transaction_index,
    trx->'$.from' as 'from',
    trx->'$.to' as 'to',
    trx->'$.value' as value,
    trx->'$.contractAddress' as contract_address,
    (trx->'$.gas')::integer as gas,
    (trx->'$.gasPrice')::bigint as gas_price,
    (trx->'$.gasUsed')::integer as gas_used,
    trx->'$.isError' as is_error,
    trx->'$.txreceipt_status' as txreceipt_status,
    trx->'input' as 'input'
from raw_transactions
)
select 
  block_number,
  datetime,
  hash,
  'from',
  'to',
  value,
from decode_transactions
""").df()
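query page 1
query page 2
   block_number  datetime                   hash                                                'from'  'to'  value
0  20485198      2024-08-08 16:55:23+00:00  "0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc46...  from    to    "500000000000000000"
1  20488106      2024-08-09 02:38:47+00:00  "0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff727...  from    to    "500000000000000000"
2  20490674      2024-08-09 11:14:23+00:00  "0xcba85af304112c712c978968ff19fb150cdfd18e1f4...  from    to    "200000000000000000"

Note that the 'from' and 'to' columns hold the literal text from and to, not addresses. In the final select, 'from' and 'to' are written in single quotes, which DuckDB reads as string literals; the double-quoted identifiers "from" and "to" are needed to select the actual columns. The hash and value columns keep their surrounding double quotes because the -> operator returns JSON; the ->> operator returns plain text instead.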
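
The value column above is expressed in wei, the smallest unit of ETH (1 ETH = 10^18 wei), and it arrives as a string. Below is a minimal sketch of converting it for display, using Decimal to avoid floating point rounding. wei_to_eth is a hypothetical helper name, not part of any library.

```python
from decimal import Decimal

WEI_PER_ETH = Decimal(10) ** 18


def wei_to_eth(value: str) -> Decimal:
    """Hypothetical helper: convert a wei amount given as a string to ETH."""
    # Strip the JSON double quotes that the -> operator leaves around the text
    return Decimal(value.strip('"')) / WEI_PER_ETH


print(wei_to_eth('"500000000000000000"'))
print(wei_to_eth("200000000000000000"))
```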

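Approach 2

In real use, the fields of a third-party API may change and the returned data may contain null values, so the fields need to be strictly constrained. Here is a demonstration of that as well.

Declare the required fields and their types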

fields = {
    'blockHash': str,
    'blockNumber': int,
    'timeStamp': int,
    'hash': str,
    'transactionIndex': int,
    'from': str,
    'to': str,
    'value': str,
    'contractAddress': str,
    'gas': int,
    'gasPrice': int,
    'gasUsed': int,
    'isError': int,
    'txreceipt_status': int,
    'input': str,
}
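
One way to put such a mapping to work on the Python side is to coerce each raw item before handing it to DuckDB. The sketch below is an assumption about how that cleaning could look, not part of the original code: coerce_item is a hypothetical helper, it treats missing keys and empty strings as None, and it uses a shortened copy of the mapping (demo_fields) so the block runs on its own.

```python
from typing import Any, Optional

# Shortened copy of the mapping above, so this block runs on its own
demo_fields = {"hash": str, "blockNumber": int, "gasPrice": int, "value": str}


def coerce_item(item: dict[str, Any], spec: dict[str, type]) -> dict[str, Optional[Any]]:
    """Hypothetical helper: keep only the declared fields and cast them."""
    cleaned = {}
    for name, typ in spec.items():
        raw = item.get(name)
        # Missing keys and empty strings both become None (NULL in DuckDB)
        cleaned[name] = None if raw in (None, "") else typ(raw)
    return cleaned


raw_item = {"hash": "0xaa", "blockNumber": "20485198", "gasPrice": "", "value": "5", "extra": 1}
cleaned_item = coerce_item(raw_item, demo_fields)
print(cleaned_item)
```

Keeping value as str is a deliberate choice in this sketch: wei amounts can exceed the range of a 64-bit integer, so casting them to int on the DuckDB side is better done explicitly and with a wide enough type.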

Call the blockscout API [5] and extract the required fields

field_keys = fields.keys()

def blockscout_api_with_fields(module: str, action: str, address: str, start_block: int, end_block: int, page: int, offset: int):
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'
    result = []
    while True:
        url = f'{url_prefix}&address={address}&startblock={start_block}&endblock={end_block}&page={page}&offset={offset}&sort=asc'
        print(f'query page {page} -> {url}')
        resp = requests.get(url).json()
        # Anything other than OK ends the loop
        if resp['message'] != 'OK':
            break
        items = resp['result']
        # Keep only the declared fields; a missing key raises KeyError
        result.extend([{f: i[f] for f in field_keys} for i in items])
        # A short page means this was the last one
        if len(items) < offset:
            break
        page += 1
    return result

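Register the DuckDB user-defined function. Note that page and offset are adjusted here (page 1, offset 5) so that only one page of data is fetched and pagination is not exercised.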

conn = duckdb.connect()
conn = conn.create_function(blockscout_api_with_fields.__name__, blockscout_api_with_fields, [VARCHAR, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER], DuckDBPyType(list[fields]))
conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs_with_fields(address, start_block, end_block) as table 
    select blockscout_api_with_fields('account', 'txlist', address, start_block, end_block, 1, 5) as data
""")

Query the transactions of the ETH address

conn.execute("""
with raw_transactions as (
    select unnest(data) as trx from blockscout_trxs_with_fields('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), flatten_transactions as (
  select unnest(trx) from raw_transactions
)
select 
  blockNumber as block_number,
  to_timestamp(timeStamp) as datetime,
  hash,
  'from',
  'to',
  value
from flatten_transactions
""").df()
query page 1 -> https://eth.blockscout.com/api?module=account&action=txlist&address=0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6&startblock=20485198&endblock=20490674&page=1&offset=5&sort=asc
   block_number  datetime                   hash                                                'from'  'to'  value
0  20485198      2024-08-08 16:55:23+00:00  0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc467...  from    to    500000000000000000
1  20488106      2024-08-09 02:38:47+00:00  0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff7277...  from    to    500000000000000000
2  20490674      2024-08-09 11:14:23+00:00  0xcba85af304112c712c978968ff19fb150cdfd18e1f48...  from    to    200000000000000000

Here too, 'from' and 'to' in single quotes are string literals, so these two columns contain the text from and to; use the double-quoted identifiers "from" and "to" to select the address columns.

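Summary

DuckDB UDF [6] and Macro [7] simplify the data preparation process, making it more efficient and the code more concise. For day-to-day work, I recommend approach 1 for quick validation during the analysis phase, and approach 2 for production, where strict field constraints help avoid problems during data processing.

Pagination can also be implemented in SQL using WITH RECURSIVE [8], but that is more complex and needs more SQL, so I do not recommend it. If there is interest, I will write a separate article about it when I have time.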

references