August 10, 2024
I’ve been using DuckDB as a replacement for pandas and Python for data processing tasks. It’s proven to be incredibly convenient.
A friend recently asked about my use of DuckDB in daily work, which inspired me to write a series of articles. This is the first in that series, focusing on how to use DuckDB for initial data processing.
Let’s consider a common scenario: analyzing the transaction information of a specific Ethereum (ETH) address.
Previously, this process involved several steps. The data preparation stage typically involves three essential steps:

- use `requests` to fetch the data
- use `pandas` to preview and clean the data
- load the result into DuckDB for analysis

A sketch of that previous workflow is shown below for contrast.
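This sketch is illustrative only; the address and the cleaning step are hypothetical placeholders, not from the original post:

```python
# Hypothetical sketch of the previous requests + pandas workflow.
import requests
import pandas as pd

# Placeholder address; substitute a real one.
url = (
    'https://eth.blockscout.com/api'
    '?module=account&action=txlist'
    '&address=0x0000000000000000000000000000000000000000'
)
data = requests.get(url).json()['result']          # fetch with requests
df = pd.DataFrame(data)                            # preview with pandas
df['blockNumber'] = df['blockNumber'].astype(int)  # example cleaning step
print(df.head())
```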
In this article, we’ll demonstrate how to streamline this process using the blockscout API, DuckDB UDFs, and DuckDB macros.
We’ll explore two solutions for processing data with DuckDB. First, let’s import the related dependencies:
```python
import duckdb
import requests
from duckdb.typing import VARCHAR, INTEGER, DuckDBPyType
import json
```
## Solution 1: Quick Processing

This approach focuses on quickly retrieving and processing transaction data. It’s ideal for rapid analysis and verification during the development stage.
Fetch the transaction information of the ETH address through the blockscout API; the code is as follows:
```python
def blockscout_api(module: str, action: str, address: str, start_block: int, end_block: int, page: int, offset: int) -> list[str]:
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'
    result = []
    while True:
        url = f'{url_prefix}&address={address}&startblock={start_block}&endblock={end_block}&page={page}&offset={offset}&sort=asc'
        print(f'query page {page}')
        data = requests.get(url).json()
        if data['message'] == 'OK':
            items = data['result']
            # keep each transaction as a JSON string
            result.extend(map(json.dumps, items))
        else:
            break
        # a short page means we've reached the last one
        if len(items) < offset:
            break
        page += 1
    return result
```

Register the custom function with DuckDB.
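The original registration snippet didn’t survive extraction; here is a minimal sketch, mirroring the Solution 2 registration further below and assuming the UDF returns a list of VARCHAR:

```python
# Sketch (assumed; mirrors the Solution 2 registration below):
# register blockscout_api as a DuckDB UDF returning VARCHAR[].
conn = duckdb.connect()
conn.create_function(
    blockscout_api.__name__,
    blockscout_api,
    [VARCHAR, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER],
    DuckDBPyType(list[str]),
)
```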
Define the DuckDB macro. For demonstration purposes the page and offset are fixed here; adjust them to the actual situation when using this for real. Note the `query page 1` and `query page 2` output below.
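The macro definition was also lost in extraction; a sketch following the same pattern as `blockscout_trxs_with_fields` below, with `offset` assumed to be 2 so that the two pages of output are visible:

```python
# Sketch (assumed): a table macro wrapping the UDF.
# offset=2 forces a second page for the three matching transactions.
conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs(address, start_block, end_block) as table
select blockscout_api('account', 'txlist', address, start_block, end_block, 1, 2) as data
""")
```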
Query the transaction information of the ETH address:
conn.execute("""
with raw_transactions as (
select unnest(data) as trx from blockscout_trxs('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), decode_transactions as (
select
trx->'$.blockHash' as block_hash,
(trx->'$.blockNumber')::integer as block_number,
(trx->'$.timeStamp')::integer as timestamp,
to_timestamp(timestamp) as datetime,
trx->'$.hash' as hash,
(trx->'$.transactionIndex')::integer as transaction_index,
trx->'$.from' as 'from',
trx->'$.to' as 'to',
trx->'$.value' as value,
trx->'$.contractAddress' as contract_address,
(trx->'$.gas')::integer as gas,
(trx->'$.gasPrice')::bigint as gas_price,
(trx->'$.gasUsed')::integer as gas_used,
trx->'$.isError' as is_error,
trx->'$.txreceipt_status' as txreceipt_status,
trx->'input' as 'input'
from raw_transactions
)
select
block_number,
datetime,
hash,
'from',
'to',
value,
from decode_transactions
""").df()query page 1
query page 2
| | block_number | datetime | hash | 'from' | 'to' | value |
|---|---|---|---|---|---|---|
| 0 | 20485198 | 2024-08-08 16:55:23+00:00 | "0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc46... | from | to | "500000000000000000" |
| 1 | 20488106 | 2024-08-09 02:38:47+00:00 | "0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff727... | from | to | "500000000000000000" |
| 2 | 20490674 | 2024-08-09 11:14:23+00:00 | "0xcba85af304112c712c978968ff19fb150cdfd18e1f4... | from | to | "200000000000000000" |
## Solution 2: Robust Processing

This solution is more robust and suitable for production environments. It addresses potential issues like API field changes and null values in the returned data.
Declare the required fields and their types.
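The original declaration didn’t survive extraction; a minimal sketch, assuming only the fields used in the query below (extend it with the other transaction fields as needed):

```python
# Sketch (assumed): the struct fields the UDF will return.
# Only the fields referenced in the final query are declared here.
fields = {
    'blockNumber': int,
    'timeStamp': int,
    'hash': str,
    'from': str,
    'to': str,
    'value': str,
}
```

Declaring the schema up front is what gives this solution its stricter field constraints: a field missing from the API response fails fast instead of silently propagating.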
Request the blockscout API and extract the valid fields:
```python
field_keys = fields.keys()

def blockscout_api_with_fields(module: str, action: str, address: str, start_block: int, end_block: int, page: int, offset: int):
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'
    result = []
    while True:
        url = f'{url_prefix}&address={address}&startblock={start_block}&endblock={end_block}&page={page}&offset={offset}&sort=asc'
        print(f'query page {page} -> {url}')
        resp = requests.get(url).json()
        if resp['message'] == 'OK':
            items = resp['result']
            # keep only the declared fields for each transaction
            result.extend([{f: i[f] for f in field_keys} for i in items])
            # a short page means we've reached the last one
            if len(items) < offset:
                break
        else:
            break
        page += 1
    return result
```

Register the custom function with DuckDB. Note the adjusted page and offset: only one page of data is fetched here, so pagination isn’t demonstrated.
```python
conn = duckdb.connect()
conn = conn.create_function(
    blockscout_api_with_fields.__name__,
    blockscout_api_with_fields,
    [VARCHAR, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER],
    DuckDBPyType(list[fields]),
)
conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs_with_fields(address, start_block, end_block) as table
select blockscout_api_with_fields('account', 'txlist', address, start_block, end_block, 1, 5) as data
""")
```

Query the transaction information of the ETH address:
```python
conn.execute("""
with raw_transactions as (
    select unnest(data) as trx from blockscout_trxs_with_fields('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), flatten_transactions as (
    select unnest(trx) from raw_transactions
)
select
    blockNumber as block_number,
    to_timestamp(timeStamp) as datetime,
    hash,
    'from',  -- string literals again; use "from"/"to" for the real columns
    'to',
    value
from flatten_transactions
""").df()
```

```
query page 1 -> https://eth.blockscout.com/api?module=account&action=txlist&address=0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6&startblock=20485198&endblock=20490674&page=1&offset=5&sort=asc
```
| | block_number | datetime | hash | 'from' | 'to' | value |
|---|---|---|---|---|---|---|
| 0 | 20485198 | 2024-08-08 16:55:23+00:00 | 0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc467... | from | to | 500000000000000000 |
| 1 | 20488106 | 2024-08-09 02:38:47+00:00 | 0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff7277... | from | to | 500000000000000000 |
| 2 | 20490674 | 2024-08-09 11:14:23+00:00 | 0xcba85af304112c712c978968ff19fb150cdfd18e1f48... | from | to | 200000000000000000 |
By leveraging DuckDB’s UDFs and macros, we can significantly simplify the data processing workflow. This approach makes data analysis more efficient and results in cleaner, more maintainable code.
For day-to-day use, Solution 1 is recommended for quick analysis and verification. However, for production environments, Solution 2 is preferred due to its stricter field constraints, which help prevent issues during data processing.
In addition, you can use `with recursive` to implement the pagination in pure SQL, but that approach is more complicated and requires much more SQL, so I don’t recommend it. If you’re interested, I may write a follow-up article about it.
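For the curious, here is a rough, untested sketch of the idea (assumed, not from the original post), reusing `blockscout_api_with_fields`: each recursion step fetches the next page, and recursion stops once a page comes back smaller than the offset.

```python
# Sketch (assumed): paginate inside SQL with a recursive CTE.
# Each step fetches one page; a short page ends the recursion.
conn.execute("""
with recursive pages(page, data) as (
    select 1, blockscout_api_with_fields('account', 'txlist',
        '0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674, 1, 5)
    union all
    select page + 1, blockscout_api_with_fields('account', 'txlist',
        '0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674, page + 1, 5)
    from pages
    where len(data) = 5  -- a full page may be followed by another
)
select unnest(data) as trx from pages
""").df()
```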