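August 10, 2024

Lately I have been using DuckDB instead of pandas + Python for more and more of my data processing, and it has been very pleasant to work with. Some friends asked how I use DuckDB day to day, so I plan to write a series of posts. This first one shows how I use DuckDB for the early data-preparation stage.

As an everyday example, suppose we want to analyze the transactions of a particular ETH address. In the past, I would first fetch the data with requests, then preview and clean it with pandas, and only then analyze it with DuckDB. So the data-preparation stage involved 3 necessary steps.

Below, I show how to simplify this process using the blockscout API [1] together with DuckDB UDFs [2] and DuckDB macros [3].

Import the dependencies:

```python
import json

import duckdb
import requests
from duckdb.typing import VARCHAR, INTEGER, DuckDBPyType
```

Fetch the transactions of an ETH address through the blockscout API [4]: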
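```python
def blockscout_api(module: str, action: str, address: str, start_block: int,
                   end_block: int, page: int, offset: int) -> list[str]:
    """Fetch every page of a blockscout account API call; each item is returned as a JSON string."""
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'
    result = []
    while True:
        url = (f'{url_prefix}&address={address}&startblock={start_block}'
               f'&endblock={end_block}&page={page}&offset={offset}&sort=asc')
        print(f'query page {page}')
        data = requests.get(url, timeout=30).json()
        if data['message'] == 'OK':
            items = data['result']
            # serialize each transaction dict so DuckDB receives a list of VARCHARs
            result.extend(map(json.dumps, items))
        else:
            # any non-OK message (an empty result or an error) ends the loop
            break
        # a page shorter than `offset` is the last page
        if len(items) < offset:
            break
        page += 1
    return result
```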
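The loop keeps requesting the next page until a page comes back shorter than `offset`, and that stopping rule can be checked offline. The sketch below is a minimal version of it. `paginate`, `fake_fetch` and the three-item data set are hypothetical names used only for illustration, and the in-memory function stands in for the HTTP request.

```python
from typing import Callable


def paginate(fetch_page: Callable[[int, int], list], page: int, offset: int) -> list:
    """Collect items page by page; a page shorter than `offset` ends the loop."""
    result = []
    while True:
        items = fetch_page(page, offset)
        result.extend(items)
        if len(items) < offset:
            break
        page += 1
    return result


# Hypothetical data source standing in for the blockscout API: 3 transactions.
TRANSACTIONS = ['tx1', 'tx2', 'tx3']
requested_pages = []


def fake_fetch(page: int, offset: int) -> list:
    requested_pages.append(page)
    start = (page - 1) * offset
    return TRANSACTIONS[start:start + offset]


all_items = paginate(fake_fetch, page=1, offset=2)
print(requested_pages)  # [1, 2]
print(all_items)        # ['tx1', 'tx2', 'tx3']
```

If the total number of items is an exact multiple of `offset`, one extra request returns an empty page before the loop stops.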
Register the DuckDB user-defined function (UDF):
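Define a DuckDB macro. For the demo, page and offset are deliberately limited here; in real use, adjust them to your needs. Note the `query page 1` and `query page 2` lines in the output below: the UDF fetched two pages.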
Query the transactions of the ETH address:
```python
conn.execute("""
with raw_transactions as (
    -- one row per transaction; each element is a JSON string
    select unnest(data) as trx
    from blockscout_trxs('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), decode_transactions as (
    select
        trx->'$.blockHash' as block_hash,
        (trx->'$.blockNumber')::integer as block_number,
        (trx->'$.timeStamp')::integer as timestamp,
        to_timestamp(timestamp) as datetime,
        trx->'$.hash' as hash,
        (trx->'$.transactionIndex')::integer as transaction_index,
        trx->'$.from' as 'from',
        trx->'$.to' as 'to',
        trx->'$.value' as value,
        trx->'$.contractAddress' as contract_address,
        (trx->'$.gas')::integer as gas,
        (trx->'$.gasPrice')::bigint as gas_price,
        (trx->'$.gasUsed')::integer as gas_used,
        trx->'$.isError' as is_error,
        trx->'$.txreceipt_status' as txreceipt_status,
        trx->'$.input' as 'input'
    from raw_transactions
)
select
    block_number,
    datetime,
    hash,
    -- 'from'/'to' in single quotes are string literals; "from"/"to" selects the columns
    'from',
    'to',
    value
from decode_transactions
""").df()
```
```
query page 1
query page 2
```

| | block_number | datetime | hash | 'from' | 'to' | value |
|---|---|---|---|---|---|---|
| 0 | 20485198 | 2024-08-08 16:55:23+00:00 | "0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc46... | from | to | "500000000000000000" |
| 1 | 20488106 | 2024-08-09 02:38:47+00:00 | "0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff727... | from | to | "500000000000000000" |
| 2 | 20490674 | 2024-08-09 11:14:23+00:00 | "0xcba85af304112c712c978968ff19fb150cdfd18e1f4... | from | to | "200000000000000000" |
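In practice, a third-party API may change its fields, and the returned data may contain null values. It therefore pays to constrain the fields strictly, declaring exactly which fields you need and their types. This is demonstrated below as well.

Declare the required fields and their types: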
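Here is a minimal sketch of such a declaration. It assumes `fields` maps blockscout field names to Python types. Only the names used in the query further down are included (`blockNumber`, `timeStamp`, `hash`, `from`, `to`, `value`), and the chosen types are assumptions. The helper `project` is hypothetical: it shows in plain Python what "keep only the declared fields" means for one raw API item, including a missing key becoming `None`.

```python
from typing import Any

# Declared schema: blockscout field name -> Python type (types are assumptions).
fields: dict[str, type] = {
    'blockNumber': int,
    'timeStamp': int,
    'hash': str,
    'from': str,
    'to': str,
    'value': str,  # wei amounts can exceed 64-bit integers, so keep them as text
}


def project(item: dict[str, Any]) -> dict[str, Any]:
    """Keep only the declared fields; coerce present values, map missing/empty ones to None."""
    out = {}
    for name, typ in fields.items():
        raw = item.get(name)
        out[name] = typ(raw) if raw not in (None, '') else None
    return out


# Hypothetical raw API item: 'to' is missing and 'gas' is not declared.
raw_item = {'blockNumber': '20485198', 'timeStamp': '1723136123', 'hash': '0xabc',
            'from': '0x111', 'value': '500000000000000000', 'gas': '21000'}
print(project(raw_item))
```

The registration below passes `DuckDBPyType(list[fields])` as the UDF's return type. With a dict like this one, DuckDB reads it as a list of `STRUCT`s with one member per declared field.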
Request the blockscout API [5] and keep only the declared fields:
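```python
# `fields` is the field-name -> type mapping declared in the previous step
field_keys = fields.keys()


def blockscout_api_with_fields(module: str, action: str, address: str, start_block: int,
                               end_block: int, page: int, offset: int) -> list[dict]:
    """Like blockscout_api, but keeps only the declared fields of each transaction."""
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'
    result = []
    while True:
        url = (f'{url_prefix}&address={address}&startblock={start_block}'
               f'&endblock={end_block}&page={page}&offset={offset}&sort=asc')
        print(f'query page {page} -> {url}')
        resp = requests.get(url, timeout=30).json()
        if resp['message'] == 'OK':
            items = resp['result']
            # project each item onto the declared fields; a missing key becomes None (NULL)
            result.extend([{f: i.get(f) for f in field_keys} for i in items])
            # a page shorter than `offset` is the last page
            if len(items) < offset:
                break
        else:
            break
        page += 1
    return result
```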
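Register the DuckDB UDF and macro. Note that page and offset are adjusted here so that only one page of data is fetched; pagination is not demonstrated this time.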
```python
conn = duckdb.connect()
# create_function returns the connection; the return type is a LIST of STRUCTs built from `fields`
conn = conn.create_function(
    blockscout_api_with_fields.__name__,
    blockscout_api_with_fields,
    [VARCHAR, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER],
    DuckDBPyType(list[fields]),
)
# table macro: page 1 with offset 5, so only a single page is fetched
conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs_with_fields(address, start_block, end_block) AS TABLE
SELECT blockscout_api_with_fields('account', 'txlist', address, start_block, end_block, 1, 5) AS data
""")
```
Query the transactions of the ETH address:
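```python
conn.execute("""
with raw_transactions as (
    -- one row per transaction; trx is a STRUCT with the declared fields
    select unnest(data) as trx
    from blockscout_trxs_with_fields('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), flatten_transactions as (
    -- unnest on a STRUCT expands its members into separate columns
    select unnest(trx) from raw_transactions
)
select
    blockNumber as block_number,
    to_timestamp(timeStamp) as datetime,
    hash,
    -- 'from'/'to' in single quotes are string literals; "from"/"to" selects the fields
    'from',
    'to',
    value
from flatten_transactions
""").df()
```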
```
query page 1 -> https://eth.blockscout.com/api?module=account&action=txlist&address=0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6&startblock=20485198&endblock=20490674&page=1&offset=5&sort=asc
```

| | block_number | datetime | hash | 'from' | 'to' | value |
|---|---|---|---|---|---|---|
| 0 | 20485198 | 2024-08-08 16:55:23+00:00 | 0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc467... | from | to | 500000000000000000 |
| 1 | 20488106 | 2024-08-09 02:38:47+00:00 | 0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff7277... | from | to | 500000000000000000 |
| 2 | 20490674 | 2024-08-09 11:14:23+00:00 | 0xcba85af304112c712c978968ff19fb150cdfd18e1f48... | from | to | 200000000000000000 |
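DuckDB UDFs [6] and macros [7] simplify data preparation. Fetching, decoding and querying the data all happen from a single SQL query, and the code becomes more concise. Day to day, I recommend approach 1 (raw JSON strings decoded in SQL) for quick validation during analysis. For production, I recommend approach 2 (declared fields and types), because its strict field constraints help avoid problems during data processing.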
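Pagination can also be implemented in SQL with `WITH RECURSIVE` [8]. However, that approach is more complex and needs considerably more SQL, so I don't recommend it. If there is interest, I may cover it in a later post.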