find missing dates with DuckDB

code
duckdb
发布于

2024年7月26日

需求背景

近期业务反馈存在数据缺失的情况, 需要定位出缺失的日期, 以便补充历史数据.

经过调研决定使用 Gantt 图展示所有任务的每日执行.

DuckDB 登场

考虑到任务数据已经存在 Postgres 中, 而且需要可视化, DuckDBPostgres 的插件可以直接获取数据 因此选择使用 DuckDB 配合 Jupyter Notebook 可以快速验证逻辑.

实际操作

DuckDB 操作 Postgres 的部分可以参考 PostgreSQL Extension

省去掉不想关的代码, 代码逻辑分成几个步骤

1. 获取数据

这里使用 mock 数据

代码
import duckdb

conn = duckdb.connect()

mock_data_sql = """
SELECT * FROM (
    VALUES 
      (1, DATE '2024-07-01'),
      (1, DATE '2024-07-02'),
      (1, DATE '2024-07-03'),
      (1, DATE '2024-07-05'),
      (1, DATE '2024-07-06'),
      (2, DATE '2024-07-01'),
      (2, DATE '2024-07-02'),
      (2, DATE '2024-07-03'),
      (3, DATE '2024-07-01'),
      (3, DATE '2024-07-02'),
      (3, DATE '2024-07-03'),
      (3, DATE '2024-07-04'),
      (3, DATE '2024-07-05'),
      (3, DATE '2024-07-06'),
      (3, DATE '2024-07-07'),
      (3, DATE '2024-07-08'),
      (3, DATE '2024-07-09'),
      (4, DATE '2024-07-05'),
      (4, DATE '2024-07-06'),
      (4, DATE '2024-07-07'),
      (4, DATE '2024-07-08'),
      (4, DATE '2024-07-09'),
  ) AS t(source_id, end_date)
"""
df = conn.execute(mock_data_sql).df()
df
source_id end_date
0 1 2024-07-01
1 1 2024-07-02
2 1 2024-07-03
3 1 2024-07-05
4 1 2024-07-06
5 2 2024-07-01
6 2 2024-07-02
7 2 2024-07-03
8 3 2024-07-01
9 3 2024-07-02
10 3 2024-07-03
11 3 2024-07-04
12 3 2024-07-05
13 3 2024-07-06
14 3 2024-07-07
15 3 2024-07-08
16 3 2024-07-09
17 4 2024-07-05
18 4 2024-07-06
19 4 2024-07-07
20 4 2024-07-08
21 4 2024-07-09

2. 对时间进行分组

寻找缺失的日期, 主要难点在于按照时间连续性进行分组, 连续的时间放在同一个分组中, 这样如果一个 source_id 有多个时间段, 那么表示存在时间空缺.

使用窗口函数, 对当前行的时间进行处理, 根据时间差, 获得当前行的分组时间. 代码如下:

group_date_sql = """
 SELECT 
    source_id,
    end_date,
    end_date - INTERVAL (ROW_NUMBER() OVER (PARTITION BY source_id ORDER BY end_date) - 1) DAY AS group_date
  FROM df
  order by source_id, end_date
"""
grouped_date_df = conn.execute(group_date_sql).df()
grouped_date_df
source_id end_date group_date
0 1 2024-07-01 2024-07-01
1 1 2024-07-02 2024-07-01
2 1 2024-07-03 2024-07-01
3 1 2024-07-05 2024-07-02
4 1 2024-07-06 2024-07-02
5 2 2024-07-01 2024-07-01
6 2 2024-07-02 2024-07-01
7 2 2024-07-03 2024-07-01
8 3 2024-07-01 2024-07-01
9 3 2024-07-02 2024-07-01
10 3 2024-07-03 2024-07-01
11 3 2024-07-04 2024-07-01
12 3 2024-07-05 2024-07-01
13 3 2024-07-06 2024-07-01
14 3 2024-07-07 2024-07-01
15 3 2024-07-08 2024-07-01
16 3 2024-07-09 2024-07-01
17 4 2024-07-05 2024-07-05
18 4 2024-07-06 2024-07-05
19 4 2024-07-07 2024-07-05
20 4 2024-07-08 2024-07-05
21 4 2024-07-09 2024-07-05

3. 根据 source_id 和分组时间分组

group_sql = """
SELECT 
    source_id,
    MIN(end_date) AS start_date,
    MAX(end_date) AS end_date,
  FROM grouped_date_df
  GROUP BY source_id, group_date
  ORDER BY source_id, group_date
"""
grouped_df = conn.execute(group_sql).df()
grouped_df
source_id start_date end_date
0 1 2024-07-01 2024-07-03
1 1 2024-07-05 2024-07-06
2 2 2024-07-01 2024-07-03
3 3 2024-07-01 2024-07-09
4 4 2024-07-05 2024-07-09

4. 可视化

代码
import plotly.express as px
fig = px.timeline(grouped_df, x_start='start_date', x_end='end_date', y='source_id')
fig.update_yaxes(autorange="reversed")
fig.show()

5. 完整代码

sql = """
with raw_data as (
  SELECT * FROM (
    VALUES 
      (1, DATE '2024-07-01'),
      (1, DATE '2024-07-02'),
      (1, DATE '2024-07-03'),
      (1, DATE '2024-07-05'),
      (1, DATE '2024-07-06'),
      (2, DATE '2024-07-01'),
      (2, DATE '2024-07-02'),
      (2, DATE '2024-07-03'),
      (3, DATE '2024-07-01'),
      (3, DATE '2024-07-02'),
      (3, DATE '2024-07-03'),
      (3, DATE '2024-07-04'),
      (3, DATE '2024-07-05'),
      (3, DATE '2024-07-06'),
      (3, DATE '2024-07-07'),
      (3, DATE '2024-07-08'),
      (3, DATE '2024-07-09'),
      (4, DATE '2024-07-05'),
      (4, DATE '2024-07-06'),
      (4, DATE '2024-07-07'),
      (4, DATE '2024-07-08'),
      (4, DATE '2024-07-09'),
  ) AS t(source_id, end_date)
), group_date as (
  SELECT 
    source_id,
    end_date,
    end_date - INTERVAL (ROW_NUMBER() OVER (PARTITION BY source_id ORDER BY end_date) - 1) DAY AS group_date
  FROM raw_data
  order by source_id, end_date
), final as (
  SELECT 
    source_id,
    MIN(end_date) AS start_date,
    MAX(end_date) AS end_date,
  FROM grouped_date_df
  GROUP BY source_id, group_date
  ORDER BY source_id, group_date
)
from final
"""
date_df = conn.execute(sql).df()
gap_fig = px.timeline(date_df, x_start='start_date', x_end='end_date', y='source_id')
gap_fig.update_yaxes(autorange="reversed")
gap_fig.show()

总结

本文主要介绍如何使用 DuckDB 查找缺失的日期, 通过窗口函数和分组函数, 可以快速定位出缺失的日期.

PS: 最近工作中使用 DuckDB 的频率越来越高, 究其原因是 DuckDB 轻量, 速度快, 配合 Jupyter Notebook 效率出奇的高.

二次使用