数仓分层设计架构:ODS-DWD-DWS-ADS 与实时 / T+1 双链路落地实践
1. 背景
数仓分层的核心价值,是把数据从“原始采集”逐步加工成“可复用、可分析、可服务业务”的数据资产。
经典数仓分层一般包括:
| 层级 | 名称 | 核心职责 |
|---|---|---|
| ODS | Operational Data Store,原始数据层 | 接入并保留原始业务数据 |
| DWD | Data Warehouse Detail,明细数据层 | 清洗、规范化、事实明细建模 |
| DWS | Data Warehouse Summary,汇总数据层 | 面向主题沉淀公共指标 |
| ADS | Application Data Service,应用数据层 | 面向报表、接口、看板提供数据服务 |
在实际生产中,数仓通常不会只有一条链路,而是同时存在两类数据链路:
- 实时数据链路:本文重点以用户下单风控为例,面向实时特征计算、风控 API 查询和在线决策。
- T+1 批量链路:面向日报、经营分析、财务对账、历史回补等场景。
本文结合 RisingWave、Flink CDC、StarRocks、MySQL、Elasticsearch、DolphinScheduler,介绍一套更贴近实战的数仓分层方案。
2. 总体架构
┌─────────────────────────┐
│ MySQL / PostgreSQL / MQ │
└────────────┬────────────┘
│
Flink CDC
│
┌───────────────────┴───────────────────┐
│ │
v v
实时风控链路 T+1 批量链路
RisingWave StarRocks ODS
│ │
v DolphinScheduler
MySQL 风控特征库 │
│ v
v 批量 SQL / 脚本
通用风控 API │
│ v
v StarRocks DWD/DWS/ADS
风控系统 / 规则引擎
可以简单理解为:
- 实时链路解决“下单当下风险如何判断”。
- T+1 链路解决“昨天最终结果是什么”。
- RisingWave 负责实时 SQL 建模和实时物化视图。
- MySQL 负责承接风控 API 点查所需的实时特征快照。
- StarRocks 负责高性能 OLAP 查询、风控复盘和 T+1 应用层数据服务。
- DolphinScheduler 负责任务编排、依赖调度、失败重试和补数。
3. ODS 层:原始数据层
3.1 职责
ODS 层负责接入业务系统的原始数据,尽量保留业务库原貌。
ODS 层主要职责:
- 接入业务库 CDC 数据、日志数据、消息数据。
- 保留原始字段和业务主键。
- 保留 CDC 变更类型,例如 insert、update、delete。
- 增加采集时间、同步时间、数据来源等技术字段。
- 支持数据回放、问题追溯和历史重算。
3.2 示例
订单原始表进入 ODS 后,可以保留如下结构:
CREATE TABLE ods_trade_order (
order_id BIGINT,
user_id BIGINT,
product_id BIGINT,
order_amount DECIMAL(18,2),
order_status VARCHAR(32),
created_at DATETIME,
updated_at DATETIME,
op_type VARCHAR(16),
sync_time DATETIME
);
3.3 最佳实践
ODS 层不建议写复杂业务逻辑,只做必要的标准化处理:
- 统一字段类型。
- 统一时间格式。
- 增加数据来源标识。
- 保留原始业务主键。
- 保留变更操作类型。
- 不提前过滤可能后续会使用的数据。
这样后续如果业务口径变化,可以从 ODS 重新加工。
4. DWD 层:明细数据层
4.1 职责
DWD 层是在 ODS 基础上完成清洗、关联和事实建模后的明细数据层。
主要职责:
- 过滤无效数据。
- 统一字段命名。
- 统一业务状态枚举。
- 统一金额、时间、地区等口径。
- 关联用户、商品、门店、组织等维度。
- 构建业务事实明细表。
DWD 层仍然保持明细粒度,不建议过早聚合。
4.2 示例
订单明细事实表:
CREATE TABLE dwd_trade_order_detail (
order_id BIGINT,
user_id BIGINT,
product_id BIGINT,
category_id BIGINT,
order_amount DECIMAL(18,2),
pay_amount DECIMAL(18,2),
order_status VARCHAR(32),
order_time DATETIME,
pay_time DATETIME,
province VARCHAR(64),
city VARCHAR(64)
);
4.3 设计重点
DWD 层要重点解决“数据能不能被复用”的问题。
例如订单状态在业务库中可能是数字枚举:
0 = 待支付
1 = 已支付
2 = 已取消
3 = 已完成
在 DWD 层可以统一转换为清晰的业务状态:
CREATED
PAID
CANCELLED
FINISHED
这样 DWS 和 ADS 层在计算指标时就不用重复理解源系统枚举。
5. DWS 层:汇总数据层
5.1 职责
DWS 层面向业务主题沉淀公共指标,是指标复用的核心层。
常见主题包括:
- 订单主题。
- 用户主题。
- 商品主题。
- 支付主题。
- 流量主题。
- 门店主题。
DWS 层不是面向某一个具体报表,而是面向可复用的分析主题。
5.2 示例
城市每日订单汇总:
CREATE TABLE dws_trade_city_day (
stat_date DATE,
province VARCHAR(64),
city VARCHAR(64),
order_count BIGINT,
pay_user_count BIGINT,
gmv DECIMAL(18,2),
pay_amount DECIMAL(18,2)
);
5.3 指标口径
DWS 层要重点保证指标口径统一,例如:
- GMV 是否包含取消订单。
- 支付金额是否扣除退款。
- 用户数按下单用户还是支付用户计算。
- 统计时间按下单时间、支付时间还是完成时间计算。
- 是否包含测试订单、内部订单、异常订单。
如果 DWS 层没有统一口径,ADS 层和报表层很容易出现“同一个指标,不同报表结果不一致”的问题。
6. ADS 层:应用数据层
6.1 职责
ADS 层直接面向业务应用、BI 报表、实时大屏和 API 服务。
主要职责:
- 面向具体业务场景组织数据。
- 提供高性能查询。
- 封装复杂指标。
- 降低前端、BI 和业务系统的使用成本。
ADS 层通常不追求通用性,而是围绕具体应用场景做裁剪和组织。
6.2 示例
销售看板 ADS 表:
CREATE TABLE ads_sales_dashboard_day (
stat_date DATE,
total_gmv DECIMAL(18,2),
total_pay_amount DECIMAL(18,2),
total_order_count BIGINT,
total_pay_user_count BIGINT,
top_city VARCHAR(64),
top_category VARCHAR(64),
updated_at DATETIME
);
ADS 层可以根据具体查询场景设计:
- 宽表。
- 预聚合表。
- 排行榜表。
- 趋势表。
- 明细检索表。
- 看板汇总表。
7. 实战一:用户下单风控实时链路
7.1 业务场景
这里用“用户下单风控”作为实时链路示例。
典型业务流程如下:
- 订单、用户、设备、地址、支付、行为等业务表通过 CDC 实时同步。
- RisingWave 基于 CDC 数据持续计算用户、设备、地址等风控指标。
- 风控指标结果写入 MySQL 风控特征库。
- 用户在业务系统提交订单时,业务系统携带当前订单上下文调用通用风控 API。
- 风控 API 查询 MySQL 中的实时特征,并结合当前订单上下文返回决策结果。
- 风控系统结合规则引擎或模型结果,判断订单是否放行、拦截、人工审核或二次验证。
- 订单落库后再通过 CDC 更新后续实时特征。
链路设计为:
业务库订单表 / 用户表 / 设备表 / 地址表 / 支付表
│
Flink CDC
│
RisingWave
│
实时风控指标计算
│
MySQL 风控特征库
│
通用风控 API
│
风控系统 / 规则引擎 / 模型服务
用户提交订单
│
业务系统携带当前订单上下文调用风控 API
│
风控系统返回决策结果
这个链路的核心是把用户行为特征稳定、低延迟地提供给风控决策系统。
7.2 风控实时指标示例
下单风控常见指标包括:
- 用户近 5 分钟下单次数。
- 用户近 1 小时下单金额。
- 用户近 24 小时取消订单次数。
- 同一设备近 10 分钟下单用户数。
- 同一收货地址近 1 小时下单次数。
- 同一手机号近 24 小时绑定账号数。
- 用户历史拒付、退款、投诉次数。
- 当前订单金额是否显著高于用户历史均值。
- 当前下单城市是否与常用城市偏离。
这些指标适合在 RisingWave 中用物化视图持续维护,然后把最新结果同步到 MySQL。
需要注意:下面的 SQL 主要用于表达指标口径。生产环境中要优先使用事件时间窗口、滑动窗口或固定刷新策略,避免把所有逻辑都写成依赖 NOW() 的大范围扫描。
7.3 实时链路分层设计
ODS:实时原始数据接入
通过 Flink CDC 采集业务库 Binlog。
MySQL Binlog -> Flink CDC -> RisingWave Source
ODS 层保留原始业务数据和 CDC 变更信息。风控场景中,建议至少同步以下表:
- 订单表。
- 用户表。
- 支付流水表。
- 设备绑定表。
- 登录行为表。
- 地址簿表。
- 黑名单、灰名单、白名单表。
DWD:实时明细清洗
在 RisingWave 中使用 SQL 或物化视图完成实时清洗,形成订单风控明细宽表。
CREATE MATERIALIZED VIEW dwd_risk_order_detail AS
SELECT
o.order_id,
o.user_id,
o.device_id,
o.receiver_phone,
o.receiver_address_id,
o.product_id,
o.order_amount,
o.order_status,
o.created_at AS order_time,
u.register_time,
u.user_level,
u.province,
u.city
FROM ods_trade_order o
LEFT JOIN ods_user u ON o.user_id = u.user_id
WHERE o.order_status IN ('CREATED', 'WAIT_PAY', 'PAID');
DWS:实时风控指标聚合
例如统计用户、设备、地址维度的实时下单特征:
CREATE MATERIALIZED VIEW dws_risk_user_order_feature AS
SELECT
user_id,
COUNT(*) FILTER (
WHERE order_time >= NOW() - INTERVAL '5 minutes'
) AS order_cnt_5m,
COUNT(*) FILTER (
WHERE order_time >= NOW() - INTERVAL '1 hour'
) AS order_cnt_1h,
SUM(order_amount) FILTER (
WHERE order_time >= NOW() - INTERVAL '1 hour'
) AS order_amount_1h,
MAX(order_time) AS last_order_time
FROM dwd_risk_order_detail
GROUP BY user_id;
CREATE MATERIALIZED VIEW dws_risk_device_order_feature AS
SELECT
device_id,
COUNT(*) FILTER (
WHERE order_time >= NOW() - INTERVAL '10 minutes'
) AS order_cnt_10m,
COUNT(DISTINCT user_id) FILTER (
WHERE order_time >= NOW() - INTERVAL '10 minutes'
) AS user_cnt_10m,
MAX(order_time) AS last_order_time
FROM dwd_risk_order_detail
WHERE device_id IS NOT NULL
GROUP BY device_id;
CREATE MATERIALIZED VIEW dws_risk_address_order_feature AS
SELECT
receiver_address_id,
COUNT(*) FILTER (
WHERE order_time >= NOW() - INTERVAL '1 hour'
) AS order_cnt_1h,
COUNT(DISTINCT user_id) FILTER (
WHERE order_time >= NOW() - INTERVAL '1 hour'
) AS user_cnt_1h,
SUM(order_amount) FILTER (
WHERE order_time >= NOW() - INTERVAL '1 hour'
) AS order_amount_1h
FROM dwd_risk_order_detail
WHERE receiver_address_id IS NOT NULL
GROUP BY receiver_address_id;
ADS:风控特征服务表
ADS 层把多个维度的实时指标合并成一张风控特征快照表。
CREATE MATERIALIZED VIEW ads_risk_order_feature_snapshot AS
SELECT
o.order_id,
o.user_id,
o.device_id,
o.receiver_address_id,
o.order_amount,
o.order_time,
uf.order_cnt_5m AS user_order_cnt_5m,
uf.order_cnt_1h AS user_order_cnt_1h,
uf.order_amount_1h AS user_order_amount_1h,
df.order_cnt_10m AS device_order_cnt_10m,
df.user_cnt_10m AS device_user_cnt_10m,
af.order_cnt_1h AS address_order_cnt_1h,
af.user_cnt_1h AS address_user_cnt_1h,
af.order_amount_1h AS address_order_amount_1h,
NOW() AS feature_updated_at
FROM dwd_risk_order_detail o
LEFT JOIN dws_risk_user_order_feature uf ON o.user_id = uf.user_id
LEFT JOIN dws_risk_device_order_feature df ON o.device_id = df.device_id
LEFT JOIN dws_risk_address_order_feature af ON o.receiver_address_id = af.receiver_address_id;
这张表的结果可以 sink 到 MySQL,由风控 API 按 order_id 或 user_id + order_id 查询。
8. RisingWave 到 MySQL:风控特征库
8.1 适用场景
RisingWave 到 MySQL 在这个场景中不是为了做分析查询,而是作为在线风控特征库。
链路示例:
RisingWave 风控物化视图 -> MySQL 风控特征表 -> 通用风控 API -> 风控系统
MySQL 表可以按订单维度保存风控特征快照:
CREATE TABLE risk_order_feature_snapshot (
order_id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
device_id VARCHAR(128),
receiver_address_id BIGINT,
order_amount DECIMAL(18,2),
user_order_cnt_5m BIGINT,
user_order_cnt_1h BIGINT,
user_order_amount_1h DECIMAL(18,2),
device_order_cnt_10m BIGINT,
device_user_cnt_10m BIGINT,
address_order_cnt_1h BIGINT,
address_user_cnt_1h BIGINT,
address_order_amount_1h DECIMAL(18,2),
feature_updated_at DATETIME,
created_at DATETIME,
updated_at DATETIME,
KEY idx_user_id (user_id),
KEY idx_feature_updated_at (feature_updated_at)
);
如果风控发生在订单落库前,此时可能还没有正式 order_id。可以先使用业务系统生成的 risk_request_id 或预生成订单号作为查询和审计主键,等订单正式落库后再补充 order_id。
风控 API 查询示例:
GET /risk/features/order?orderId=10000001
返回示例:
{
"orderId": 10000001,
"userId": 8888,
"userOrderCnt5m": 3,
"userOrderCnt1h": 8,
"userOrderAmount1h": 1699.00,
"deviceOrderCnt10m": 12,
"deviceUserCnt10m": 5,
"addressOrderCnt1h": 6,
"addressUserCnt1h": 4,
"featureUpdatedAt": "2026-04-24 10:11:12"
}
8.2 这条链路的问题
这条链路可以落地,但需要注意几个关键问题。
第一,如果风控要在“订单提交前”拦截,仅依赖订单表 CDC 是不够的。CDC 通常发生在业务库提交之后,属于异步链路。也就是说,等订单 CDC 进入 RisingWave 再写入 MySQL,已经晚于用户提交订单的同步决策时刻。
第二,风控决策是强在线链路,MySQL 读写压力要严格控制。如果每次下单都触发大量特征更新,并且风控 API 又高并发读取同一张表,MySQL 容易成为瓶颈。
第三,CDC -> RisingWave -> MySQL 存在端到端延迟。用户刚下单时,订单 CDC 事件不一定已经完成计算并写入 MySQL。如果风控系统立即按订单号查询,可能查不到最新特征。
第四,风控需要的是“决策时刻的特征快照”。如果 API 查询到的是几秒后更新过的特征,可能和订单提交时刻不一致,影响规则解释和事后审计。
第五,MySQL 适合做点查,不适合承载复杂特征查询。复杂聚合、历史窗口计算、宽范围扫描都应该放在 RisingWave 或离线链路中完成。
8.3 优化方案
建议把这条链路拆成“同步决策链路”和“异步特征更新链路”。
同步决策链路:
用户提交订单
│
业务系统携带当前订单上下文调用风控 API
│
风控 API 查询 MySQL 中的用户 / 设备 / 地址历史实时特征
│
规则引擎 / 模型服务
│
返回放行 / 拦截 / 人审 / 二次验证
│
业务系统根据决策结果决定是否创建或推进订单
异步特征更新链路:
业务库 CDC
│
Flink CDC
│
RisingWave 实时计算
│
MySQL 风控特征快照表
│
通用风控 API
│
风控系统
也就是说,当前这笔订单的金额、商品、地址、设备、IP 等上下文应该由业务系统直接传给风控 API;RisingWave + MySQL 主要提供“截至当前时刻之前”的历史实时特征。订单 CDC 进入 RisingWave 后,再更新后续订单可使用的特征。
同时补充以下工程约束:
- MySQL 只存风控 API 必须点查的结果,不存大宽表历史明细。
- 风控 API 必须返回
feature_updated_at,风控系统根据特征新鲜度决定是否降级。 - API 查询不到订单特征时,要有兜底策略,例如查询用户维度特征、使用默认规则、进入人工审核或走低风险放行策略。
- 风控特征表按
order_id、user_id、device_id建索引,但主查询路径要控制在点查或小范围查询。 - 特征快照要保留一段时间,便于风控审计和规则回放。
- 对高频更新特征做聚合降频,例如按秒级或 5 秒级刷新,而不是每条事件都立即写 MySQL。
- 对热点用户、热点设备可以增加缓存层,但缓存必须设置较短 TTL,并携带特征更新时间。
如果风控系统对延迟要求极高,例如要求 50ms 内完成决策,则不建议只依赖 MySQL 查询。可以把关键特征同步到 Redis 或专用在线特征库,MySQL 作为审计和兜底存储。
9. Elasticsearch 与 StarRocks 在风控链路中的位置
9.1 Elasticsearch
Elasticsearch 可以作为风控辅助检索系统,但不建议作为主风控决策特征库。
适合放入 ES 的数据:
- 风控事件日志。
- 拦截原因。
- 用户风险画像。
- 订单审核记录。
- 设备、地址、手机号关联明细。
典型用途:
- 风控运营后台检索。
- 人工审核查询。
- 风险事件排查。
- 关联关系分析入口。
不建议让 ES 承担实时决策中的核心聚合计算。
9.2 StarRocks
StarRocks 更适合风控分析和复盘,不适合放在下单同步决策链路中。
适合放入 StarRocks 的数据:
- 风控命中明细。
- 规则命中结果。
- 模型评分结果。
- 人工审核结果。
- 订单最终履约、退款、拒付结果。
典型用途:
- 风控策略效果分析。
- 规则命中率分析。
- 拦截准确率分析。
- 黑产设备、地址、手机号复盘。
- T+1 风控报表。
推荐链路:
RisingWave / MySQL 风控结果 -> StarRocks -> 风控分析看板
10. 风控实时链路最佳实践
10.1 区分同步决策链路和异步分析链路
下单风控属于同步决策链路,必须控制延迟和依赖数量。
建议:
- 同步链路只读取必要特征。
- 复杂分析放到异步链路。
- 风控 API 要设置超时时间。
- API 超时后必须有明确降级策略。
10.2 特征必须有新鲜度
风控 API 返回特征时必须携带更新时间:
{
"orderId": 10000001,
"userOrderCnt5m": 3,
"featureUpdatedAt": "2026-04-24 10:11:12"
}
风控系统可以根据特征新鲜度判断是否可用:
- 5 秒内:正常使用。
- 5 到 30 秒:谨慎使用,叠加保守规则。
- 超过 30 秒:认为特征过期,触发降级策略。
10.3 保留决策快照
风控判断不能只看当前最新特征,还要保留当时决策使用的特征快照。
建议每次风控决策落库:
- order_id。
- user_id。
- rule_version。
- model_version。
- feature_snapshot。
- risk_score。
- decision_result。
- decision_reason。
- decision_time。
这样后续才能解释为什么某个订单被拦截或放行。
10.4 避免 MySQL 承担大规模历史计算
MySQL 在这条链路中只适合点查和小范围查询。
不建议在 MySQL 中做:
- 近 30 天大窗口聚合。
- 多表复杂 Join。
- 高并发模糊检索。
- 大范围分页查询。
这些计算应该前置到 RisingWave、StarRocks 或离线 T+1 链路。
10.5 建立端到端监控
风控实时链路至少要监控:
- CDC 延迟。
- RisingWave 计算延迟。
- Sink 到 MySQL 的写入延迟。
- MySQL 表更新时间。
- 风控 API P95 / P99 延迟。
- API 查询空结果比例。
- 特征过期比例。
- 风控决策降级比例。
对于风控系统来说,链路是否“稳定新鲜”比单次是否“查得到”更重要。
11. 实时链路与 T+1 链路的口径关系
风控实时链路和 T+1 链路也需要对账。
实时链路提供的是下单当下的风险判断依据,T+1 链路提供的是次日复盘和策略优化依据。
例如:
- 实时链路判断某订单是否疑似风险。
- T+1 链路统计这些订单后续是否退款、拒付、投诉或人工审核确认风险。
- 策略团队基于 T+1 结果调整实时规则和模型。
因此风控数据也建议形成闭环:
实时特征 -> 实时决策 -> 决策结果 -> 履约结果 -> T+1 复盘 -> 策略优化
这个闭环比单纯“实时算几个指标”更重要。
12. 实战二:T+1 批量数据链路
12.1 业务场景
T+1 链路主要服务对准确性、完整性要求更高的场景,例如:
- 经营日报。
- 财务报表。
- 管理驾驶舱。
- 历史趋势分析。
- 数据对账。
- 指标重算和补数。
本文中的 T+1 链路设计为:
业务库
│
Flink CDC
│
StarRocks ODS
│
DolphinScheduler
│
批量 SQL / 脚本
│
StarRocks DWD / DWS / ADS
这里 Flink CDC 负责把业务库数据实时或准实时同步到 StarRocks 的 ODS 层,DolphinScheduler 负责编排每天的批量加工任务。
13. T+1 链路分层设计
13.1 ODS:Flink CDC 同步到 StarRocks
业务库通过 Flink CDC 同步到 StarRocks ODS 表。
MySQL Binlog -> Flink CDC -> StarRocks ODS
ODS 表尽量贴近源表结构。
CREATE TABLE ods_trade_order (
order_id BIGINT,
user_id BIGINT,
product_id BIGINT,
order_amount DECIMAL(18,2),
order_status VARCHAR(32),
created_at DATETIME,
updated_at DATETIME,
deleted_flag TINYINT,
sync_time DATETIME
)
PRIMARY KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 32;
对于 CDC 同步,StarRocks 中建议根据业务主键设计 Primary Key 表,方便处理更新和删除。
13.2 DWD:批量清洗明细层
每天凌晨由 DolphinScheduler 调度 SQL 脚本,把前一天数据加工成 DWD 明细表。
INSERT OVERWRITE dwd_trade_order_detail PARTITION(p20260423)
SELECT
o.order_id,
o.user_id,
o.product_id,
p.category_id,
o.order_amount,
CASE
WHEN o.order_status = 'PAID' THEN o.order_amount
ELSE 0
END AS pay_amount,
o.order_status,
o.created_at AS order_time,
u.province,
u.city
FROM ods_trade_order o
LEFT JOIN ods_user u ON o.user_id = u.user_id
LEFT JOIN ods_product p ON o.product_id = p.product_id
WHERE o.created_at >= '2026-04-23 00:00:00'
AND o.created_at < '2026-04-24 00:00:00'
AND o.deleted_flag = 0;
DWD 层重点保证明细数据干净、稳定、可追溯。
13.3 DWS:批量汇总公共指标
DWS 层按主题汇总公共指标。
INSERT OVERWRITE dws_trade_city_day PARTITION(p20260423)
SELECT
DATE(order_time) AS stat_date,
province,
city,
COUNT(*) AS order_count,
COUNT(DISTINCT user_id) AS pay_user_count,
SUM(order_amount) AS gmv,
SUM(pay_amount) AS pay_amount
FROM dwd_trade_order_detail
WHERE order_time >= '2026-04-23 00:00:00'
AND order_time < '2026-04-24 00:00:00'
GROUP BY
DATE(order_time),
province,
city;
DWS 层产出的数据可以被多个 ADS 场景复用。
13.4 ADS:面向报表和应用
ADS 层根据具体业务看板组织数据。
INSERT OVERWRITE ads_sales_daily_report PARTITION(p20260423)
SELECT
stat_date,
SUM(gmv) AS total_gmv,
SUM(pay_amount) AS total_pay_amount,
SUM(order_count) AS total_order_count,
SUM(pay_user_count) AS total_pay_user_count
FROM dws_trade_city_day
WHERE stat_date = '2026-04-23'
GROUP BY stat_date;
ADS 层面向消费端设计,核心目标是查询简单、性能稳定、口径清晰。
14. DolphinScheduler 调度设计
T+1 链路可以拆成多个任务节点:
1. ods_check
2. dwd_trade_order_detail
3. dws_trade_city_day
4. ads_sales_daily_report
5. data_quality_check
6. report_notify
任务依赖关系:
ods_check
│
v
dwd_trade_order_detail
│
v
dws_trade_city_day
│
v
ads_sales_daily_report
│
v
data_quality_check
│
v
report_notify
14.1 调度建议
建议:
- 每天凌晨业务低峰期执行。
- 先检查 ODS 数据是否同步完成。
- DWD、DWS、ADS 分层拆任务。
- 每层任务支持重跑。
- 每个任务使用业务日期参数。
- 失败后告警到企业微信、飞书或邮件。
- 核心任务设置超时时间和失败重试次数。
日期参数示例:
biz_date = ${system.biz.date}
start_time = ${biz_date} 00:00:00
end_time = ${biz_date + 1} 00:00:00
14.2 任务拆分原则
不要把所有 SQL 都写在一个大脚本里。
推荐按以下粒度拆分:
- 一个业务主题一个工作流。
- 一个数仓层级一个或多个任务。
- 一个核心产出表一个任务。
- 数据质量校验单独成任务。
- 通知和下游触发单独成任务。
这样方便失败定位、任务重跑和血缘分析。
15. 批量脚本最佳实践
15.1 所有脚本必须可重跑
T+1 任务一定要支持幂等重跑。
推荐方式:
INSERT OVERWRITE table_name PARTITION(partition_name)
SELECT ...
或者:
DELETE FROM table_name WHERE stat_date = '${biz_date}';
INSERT INTO table_name
SELECT ...
不建议直接无条件 INSERT INTO,否则重跑会产生重复数据。
15.2 按业务日期分区
StarRocks 中建议按日期分区:
PARTITION BY date_trunc('day', stat_date)
好处:
- 查询裁剪分区。
- 重跑某一天数据成本低。
- 方便生命周期管理。
- 方便历史补数。
- 降低全表扫描风险。
15.3 ODS 到 DWD 使用明确口径
T+1 报表通常更关注“截至昨日结束时的最终状态”,因此需要明确使用哪种统计口径:
- 创建时间口径。
- 支付时间口径。
- 完成时间口径。
- 更新时间口径。
- 快照日期口径。
不同口径不能混用。
例如订单 GMV 如果按支付时间统计,就不应该在同一个指标中混入创建时间过滤。
15.4 建立数据质量校验
每个核心任务完成后建议做数据质量检查:
- 总行数是否异常。
- 金额是否为负。
- 主键是否重复。
- 核心字段是否为空。
- 与昨日环比是否波动过大。
- ODS 与 DWD 行数是否大幅不一致。
- DWS 与 ADS 指标是否能对齐。
示例:
SELECT COUNT(*)
FROM dwd_trade_order_detail
WHERE stat_date = '${biz_date}';
SELECT COUNT(*)
FROM dwd_trade_order_detail
WHERE stat_date = '${biz_date}'
AND order_id IS NULL;
质量校验不通过时,不应继续产出 ADS 报表。
16. 实时链路与 T+1 链路的关系
实时链路和 T+1 链路不是互相替代,而是互相补充。
| 对比项 | 实时链路 | T+1 链路 |
|---|---|---|
| 目标 | 低延迟 | 高准确性 |
| 典型场景 | 下单风控、实时特征、在线决策 | 日报、财务、经营分析、风控复盘 |
| 计算方式 | 流式计算 | 批量计算 |
| 核心组件 | RisingWave、Flink CDC | DolphinScheduler、StarRocks |
| 数据修正 | 持续更新 | 批量重算 |
| 查询服务 | MySQL 风控特征库、通用风控 API | StarRocks |
| 关注点 | 延迟、特征新鲜度、API 稳定性、降级策略 | 幂等、补数、质量校验、策略复盘 |
一个成熟的数据平台通常会采用类似 Lambda 架构的思想:
实时链路:提供秒级实时特征和在线决策依据。
T+1 链路:提供最终确认后的权威数据和策略复盘依据。
在业务展示上可以明确区分:
- 今日风控实时特征:来自实时链路。
- 昨日及历史数据:来自 T+1 链路。
- 风控策略效果、财务和结算类指标:优先使用 T+1 链路。
17. StarRocks 建模建议
17.1 ODS 层
ODS 层建议使用 Primary Key 表承接 CDC。
适合场景:
- 需要处理 update/delete。
- 按主键查询。
- 保留最新业务状态。
示例:
PRIMARY KEY(order_id)
DISTRIBUTED BY HASH(order_id)
17.2 DWD 层
DWD 层可以使用 Duplicate Key 或 Primary Key,取决于是否需要更新。
建议:
- 明细追加型事实表使用 Duplicate Key。
- 状态会变化的事实表使用 Primary Key。
- 大表按日期分区。
- 常用过滤字段进入排序键或分桶设计。
17.3 DWS / ADS 层
DWS 和 ADS 层可以根据查询方式选择:
- Aggregate Key:适合固定聚合指标。
- Duplicate Key:适合灵活查询。
- Primary Key:适合频繁更新的结果表。
常见优化:
- 按日期分区。
- 按高频过滤字段分桶。
- 排序键贴近查询条件。
- 避免 ADS 表过度宽泛。
- 热数据保留更细粒度,冷数据做汇总归档。
18. 行业内最佳实践
18.1 指标口径统一管理
建议建立指标字典,统一维护:
- 指标名称。
- 指标英文名。
- 计算逻辑。
- 时间口径。
- 过滤条件。
- 数据来源。
- 负责人。
- 是否实时指标。
- 是否财务确认指标。
否则不同报表会出现“同一个 GMV,多个结果”的问题。
18.2 分层命名规范
推荐命名方式:
ods_业务域_表名
dwd_业务域_事实表
dws_业务域_主题_周期
ads_应用场景_报表名
例如:
ods_trade_order
dwd_trade_order_detail
dws_trade_city_day
ads_dashboard_sales_day
18.3 数据任务必须可观测
生产环境中要监控:
- 任务是否成功。
- 数据是否产出。
- 数据量是否异常。
- 延迟是否异常。
- 下游是否消费成功。
- 查询是否变慢。
- 数据质量是否达标。
实时链路看延迟,批量链路看产出和质量。
18.4 建立补数机制
数据平台一定会遇到补数场景,例如:
- 源系统修复历史数据。
- CDC 中断。
- 业务口径调整。
- 脚本逻辑修复。
- 历史报表重算。
因此所有 T+1 脚本都应该支持传入业务日期:
run.sh 2026-04-23
不要把日期写死在脚本中。
18.5 实时决策与离线结果要对账
实时链路产出的风控决策结果,到了次日应该和 T+1 链路中的履约、退款、拒付、投诉、人工审核结果做对账。
例如:
实时链路在 2026-04-23 拦截的高风险订单
vs
T+1 链路中这些订单的实际风险确认结果
如果差异超过阈值,需要告警并分析原因。
常见原因包括:
- CDC 或 Sink 延迟。
- 特征计算窗口不合理。
- 规则版本和模型版本不一致。
- 风控 API 查询到过期特征。
- 履约、退款、拒付结果晚到。
- CDC 丢失。
- 实时特征口径和 T+1 复盘口径不同。
- 用户、设备、地址等维表更新不一致。
19. 总结
ODS-DWD-DWS-ADS 分层解决的是数据资产组织问题,而 RisingWave、Flink CDC、StarRocks、DolphinScheduler 解决的是工程落地问题。
一套实用的数据架构可以这样划分:
实时链路:
Flink CDC -> RisingWave -> MySQL 风控特征库 -> 通用风控 API -> 风控系统
T+1 链路:
Flink CDC -> StarRocks ODS -> DolphinScheduler -> StarRocks DWD/DWS/ADS
实时链路关注低延迟、特征新鲜度和 API 稳定性,适合用户下单风控这类在线决策场景。
T+1 链路关注准确性和可重跑,适合日报、经营分析、财务统计、风控复盘和历史补数。
最终,实时链路提供“当前这笔订单风险如何判断”,T+1 链路提供“这批决策最终效果如何”。两条链路结合,才能同时满足业务对实时性、准确性和可追溯性的要求。
20. 参考资料
- RisingWave 官方文档:https://docs.risingwave.com/
- RisingWave 官网:https://www.risingwave.com/
- Apache Flink CDC 官方文档:https://flink.apache.org/documentation/flink-cdc-stable/
- Apache Flink CDC Stable Docs:https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/
- StarRocks 官方文档:https://docs.starrocks.io/docs/introduction/
- StarRocks 官网:https://www.starrocks.io/
- Apache DolphinScheduler 官网:https://dolphinscheduler.apache.org/
- Apache DolphinScheduler GitHub:https://github.com/apache/dolphinscheduler