August 10, 2024
I've been using DuckDB as a replacement for pandas and Python scripts for data processing tasks, and it has proven to be incredibly convenient. A friend recently asked how I use DuckDB in my daily work, which inspired me to write a series of articles. This is the first in that series, focusing on how to use DuckDB for initial data processing.
Let's consider a common scenario: analyzing the transaction information of a specific Ethereum (ETH) address.
Previously, this process involved several tools. The data preparation stage typically involves three essential steps (a rough sketch of this older workflow follows the list):

1. fetch the raw data with requests
2. preview and clean the data with pandas
3. load the cleaned data into DuckDB for analysis
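To make the contrast concrete, here is a rough, hypothetical sketch of that older requests + pandas workflow. It is not code from the original post; the address and block range are simply reused from the query later in this article.

import duckdb
import pandas as pd
import requests

# 1. fetch the raw data with requests
url = ('https://eth.blockscout.com/api?module=account&action=txlist'
       '&address=0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6'
       '&startblock=20485198&endblock=20490674&page=1&offset=100&sort=asc')
raw = requests.get(url).json()['result']

# 2. preview and clean the data with pandas
df = pd.DataFrame(raw)
df['blockNumber'] = df['blockNumber'].astype(int)

# 3. analyze the cleaned data with DuckDB (it can query the DataFrame directly)
duckdb.sql('select blockNumber, hash, value from df limit 5').show()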
In this article, we'll demonstrate how to streamline this process using the blockscout API [1], DuckDB UDFs [2], and DuckDB macros [3].
We'll explore two solutions for processing data with DuckDB. First, let's import the required dependencies:

import duckdb
import requests
from duckdb.typing import VARCHAR, INTEGER, DuckDBPyType
import json
Solution 1 focuses on quickly retrieving and processing transaction data. It's ideal for rapid analysis and verification during the development stage.
Get the transaction information of the ETH address through the blockscout API [4]; the code is as follows:
def blockscout_api(module: str, action: str, address: str, start_block: int, end_block: int, page: int, offset: int) -> list[str]:
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'
    result = []
    while True:
        url = f'{url_prefix}&address={address}&startblock={start_block}&endblock={end_block}&page={page}&offset={offset}&sort=asc'
        print(f'query page {page}')
        data = requests.get(url).json()
        if data['message'] == 'OK':
            items = data['result']
            # serialize each transaction to a JSON string so the UDF can return list[str]
            result.extend(map(json.dumps, items))
        else:
            break
        # fewer items than the page size means this was the last page
        if len(items) < offset:
            break
        page += 1
    return result
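Before wiring the function into DuckDB, you can exercise it directly. This quick check is not part of the original post; the address and block range are the ones used in the query below.

# optional sanity check: call the function as plain Python
txs = blockscout_api('account', 'txlist',
                     '0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6',
                     20485198, 20490674, 1, 100)
print(len(txs))      # number of transactions fetched
print(txs[0][:120])  # first transaction, serialized as a JSON string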
Register the custom function with DuckDB.
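The registration code itself isn't reproduced above. A minimal sketch, mirroring the Solution 2 registration further below and assuming the UDF's JSON strings map to a DuckDB list of VARCHAR, could look like this:

# sketch: register blockscout_api as a UDF returning a list of JSON strings
conn = duckdb.connect()
conn.create_function(
    blockscout_api.__name__,
    blockscout_api,
    [VARCHAR, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER],
    DuckDBPyType(list[str]),
)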
Define the DuckDB macro. For demonstration, the page and offset are limited here; adjust them to your actual situation in real use. Note the query page 1 and query page 2 output below.
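The macro definition isn't shown either. A sketch consistent with the output below (three transactions fetched across two pages) might use an offset of 2; the exact page and offset values in the original aren't known, so treat them as assumptions:

# sketch: a table macro wrapping the UDF; page=1 and offset=2 are assumed values
conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs(address, start_block, end_block) as table
select blockscout_api('account', 'txlist', address, start_block, end_block, 1, 2) as data
""")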
Query the transaction information of the ETH address:
conn.execute("""
with raw_transactions as (
select unnest(data) as trx from blockscout_trxs('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), decode_transactions as (
select
trx->'$.blockHash' as block_hash,
(trx->'$.blockNumber')::integer as block_number,
(trx->'$.timeStamp')::integer as timestamp,
to_timestamp(timestamp) as datetime,
trx->'$.hash' as hash,
(trx->'$.transactionIndex')::integer as transaction_index,
trx->'$.from' as 'from',
trx->'$.to' as 'to',
trx->'$.value' as value,
trx->'$.contractAddress' as contract_address,
(trx->'$.gas')::integer as gas,
(trx->'$.gasPrice')::bigint as gas_price,
(trx->'$.gasUsed')::integer as gas_used,
trx->'$.isError' as is_error,
trx->'$.txreceipt_status' as txreceipt_status,
trx->'input' as 'input'
from raw_transactions
)
select
block_number,
datetime,
hash,
'from',
'to',
value,
from decode_transactions
""").df()
query page 1
query page 2
|   | block_number | datetime | hash | 'from' | 'to' | value |
|---|---|---|---|---|---|---|
| 0 | 20485198 | 2024-08-08 16:55:23+00:00 | "0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc46... | from | to | "500000000000000000" |
| 1 | 20488106 | 2024-08-09 02:38:47+00:00 | "0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff727... | from | to | "500000000000000000" |
| 2 | 20490674 | 2024-08-09 11:14:23+00:00 | "0xcba85af304112c712c978968ff19fb150cdfd18e1f4... | from | to | "200000000000000000" |
Solution 2 is more robust and suitable for production environments. It addresses potential issues such as API field changes and null values in the returned data.
Declare the required fields and types
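The field declaration itself isn't included in the snippet. A minimal sketch covering just the fields used in the final query might look like the following; the field names come from the blockscout txlist response, while the exact set and types in the original post may differ.

# assumed subset of fields and types; the real declaration may include more
fields = {
    'blockNumber': int,
    'timeStamp': int,
    'hash': str,
    'from': str,
    'to': str,
    'value': str,
}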
Request the blockscout API [5] and extract the valid fields:
field_keys = fields.keys()

def blockscout_api_with_fields(module: str, action: str, address: str, start_block: int, end_block: int, page: int, offset: int):
    url_prefix = f'https://eth.blockscout.com/api?module={module}&action={action}'
    result = []
    while True:
        url = f'{url_prefix}&address={address}&startblock={start_block}&endblock={end_block}&page={page}&offset={offset}&sort=asc'
        print(f'query page {page} -> {url}')
        resp = requests.get(url).json()
        if resp['message'] == 'OK':
            items = resp['result']
            # keep only the declared fields for each transaction
            result.extend([{f: i[f] for f in field_keys} for i in items])
            # fewer items than the page size means this was the last page
            if len(items) < offset:
                break
        else:
            break
        page += 1
    return result
Register the custom function with DuckDB and define the macro. Note the adjusted page and offset: only one page of data is fetched, so pagination is not demonstrated here.
conn = duckdb.connect()
conn = conn.create_function(
    blockscout_api_with_fields.__name__,
    blockscout_api_with_fields,
    [VARCHAR, VARCHAR, VARCHAR, INTEGER, INTEGER, INTEGER, INTEGER],
    DuckDBPyType(list[fields]),
)
conn.execute("""
CREATE OR REPLACE MACRO blockscout_trxs_with_fields(address, start_block, end_block) as table
select blockscout_api_with_fields('account', 'txlist', address, start_block, end_block, 1, 5) as data
""")
Query the transaction information of the ETH address:
conn.execute("""
with raw_transactions as (
select unnest(data) as trx from blockscout_trxs_with_fields('0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6', 20485198, 20490674)
), flatten_transactions as (
select unnest(trx) from raw_transactions
)
select
blockNumber as block_number,
to_timestamp(timeStamp) as datetime,
hash,
'from',
'to',
value
from flatten_transactions
""").df()
query page 1 -> https://eth.blockscout.com/api?module=account&action=txlist&address=0x603602E9A2ac7f1E26717C2b2193Fd68f5fafFf6&startblock=20485198&endblock=20490674&page=1&offset=5&sort=asc
|   | block_number | datetime | hash | 'from' | 'to' | value |
|---|---|---|---|---|---|---|
| 0 | 20485198 | 2024-08-08 16:55:23+00:00 | 0x16e9d0643ce6bf9bc59d5e6c756a196af2941cefc467... | from | to | 500000000000000000 |
| 1 | 20488106 | 2024-08-09 02:38:47+00:00 | 0x3f29ab5ba5779df75aee038cb9d529ab7d7e94ff7277... | from | to | 500000000000000000 |
| 2 | 20490674 | 2024-08-09 11:14:23+00:00 | 0xcba85af304112c712c978968ff19fb150cdfd18e1f48... | from | to | 200000000000000000 |
By leveraging DuckDB UDFs [6] and macros [7], we can significantly simplify the data processing workflow. This approach makes data analysis more efficient and results in cleaner, more maintainable code.
For day-to-day use, Solution 1 is recommended for quick analysis and verification. However, for production environments, Solution 2 is preferred due to its stricter field constraints, which help prevent issues during data processing.
Alternatively, you could use WITH RECURSIVE [8] to implement the pagination in pure SQL, but that approach is more complicated and requires writing much more SQL, so I don't recommend it. If you're interested, I'll write a follow-up article about it later.