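Enterprise Flink SQL Real-Time Data Warehouse
A Complete Guide to Production Deployment
V3.1 | Flink On YARN Application + Paimon unified stream and batch | 2026-06-08
About this document
Positioning: a complete, step-by-step guide to a production-grade real-time data warehouse that can be reproduced from scratch and deployed as written.
- Core architecture: Flink On YARN Application Mode + Paimon FileSystem Catalog + four-layer warehouse
- Use cases: real-time data warehousing, user behavior analysis, real-time UV/PV statistics, real-time business reports
- Version baseline: JDK1.8 / Hadoop3.3.6 / ZK3.7.1 / Kafka3.2.3 / Flink1.18.0 / Paimon0.8.2 / MySQL8.0 / ClickHouse22.8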
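Version compatibility: Flink CDC 2.4.0 + Paimon 0.8.2 + Flink 1.18.0; ClickHouse JDBC Connector 1.18.0 + ClickHouse 22.8.x. The Flink CDC project's published compatibility table lists the 2.4.x line against Flink 1.13 through 1.17 and adds 1.18 with 3.0.x, so confirm this combination in a test environment before going live.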
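1. Enterprise Architecture in Detail
1.1 Architecture Data Flow
1.2 Mandatory Production Rules
- Deployment mode: Session mode is prohibited in production; all jobs run in YARN Application mode
- Metadata management: Paimon keeps its metadata in the file-system directory tree, so no Hive Metastore is needed; metadata is persisted on HDFS
- Data consistency: RocksDB state backend + incremental Checkpoints + Savepoints, providing Exactly-Once state consistency
- Layering: strict ODS/DWD/DWS/ADS layers, each with a single responsibility, with traceable and reusable data
- Delivery: jobs are delivered as plain SQL files under version control; ad-hoc interactive runs in production are prohibited
1.3 Overall Architecture Topology
2. Base Server Environment Setup in Detail
2.1 Cluster Node Plan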
| Node IP | Hostname | Components |
|---|---|---|
| 192.168.73.205 | hadoop102 | Hadoop NN, ZK, Kafka, Flink JM |
| 192.168.73.206 | hadoop103 | Hadoop DN, ZK, Kafka, Flink TM |
| 192.168.73.207 | hadoop104 | Hadoop DN, ZK, Kafka, Flink TM |
| 192.168.130.145 | - | MySQL8.0, ClickHouse22.8 |
Resource baseline: JM 4G / TM 8G x 4 slots / parallelism 4; a single job sustains roughly 50,000 records/s of CDC throughput.
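The baseline can be turned into a quick sizing calculation. The sketch below is illustrative only: the function names `tm_count` and `job_memory_gb` are hypothetical helpers, and it assumes every job gets its own JobManager and TaskManagers, as is the case in Application mode.

```shell
#!/usr/bin/env bash
# tm_count PARALLELISM SLOTS_PER_TM: TaskManagers needed, i.e. ceil(p / slots).
tm_count() {
  local parallelism=$1 slots_per_tm=$2
  echo $(( (parallelism + slots_per_tm - 1) / slots_per_tm ))
}

# job_memory_gb PARALLELISM SLOTS_PER_TM JM_GB TM_GB:
# container memory for one job = one JobManager + N TaskManagers.
job_memory_gb() {
  local parallelism=$1 slots_per_tm=$2 jm_gb=$3 tm_gb=$4
  local tms
  tms=$(tm_count "$parallelism" "$slots_per_tm")
  echo $(( jm_gb + tms * tm_gb ))
}

# Baseline from this section: parallelism 4, 4 slots per TM, JM 4G, TM 8G.
echo "TaskManagers per job: $(tm_count 4 4)"
echo "Memory per job (GB): $(job_memory_gb 4 4 4 8)"
```

With these numbers each job needs one TaskManager, so the four layer jobs together would request about 4 x 12 GB from YARN under the stated assumption.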
2.2 Base Tuning on All Nodes
Note: every step in 2.2.1 through 2.2.5 must be run on each of the three nodes hadoop102, hadoop103, and hadoop104.
2.2.1 Disable the Firewall and SELinux (all nodes)
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config
2.2.2 Hostname and hosts Mapping (all nodes)
hostnamectl set-hostname hadoop102 # run on each node with its own hostname
cat >> /etc/hosts << EOF
192.168.73.205 hadoop102
192.168.73.206 hadoop103
192.168.73.207 hadoop104
EOF
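Re-running the `cat >> /etc/hosts` step appends the same three lines again. A minimal sketch of an idempotent alternative follows; `add_host_entry` is a hypothetical helper name, and the demonstration writes to a scratch file rather than /etc/hosts.

```shell
#!/usr/bin/env bash
# add_host_entry FILE IP NAME: append "IP NAME" only if NAME is not mapped yet.
add_host_entry() {
  local file=$1 ip=$2 name=$3
  # Match NAME as a whole whitespace-delimited field on a non-comment line.
  if ! grep -Eq "^[^#]*[[:space:]]${name}([[:space:]]|\$)" "$file"; then
    printf '%s %s\n' "$ip" "$name" >> "$file"
  fi
}

# Demonstration against a scratch file (on real nodes, pass /etc/hosts as root).
hosts_file=$(mktemp)
for run in 1 2; do   # the second pass must not add duplicates
  add_host_entry "$hosts_file" 192.168.73.205 hadoop102
  add_host_entry "$hosts_file" 192.168.73.206 hadoop103
  add_host_entry "$hosts_file" 192.168.73.207 hadoop104
done
cat "$hosts_file"
```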
2.2.3 Create a Dedicated Runtime User (all nodes)
useradd flink
passwd flink
usermod -aG wheel flink
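# /opt/software - installation packages (tar.gz/rpm)
# /opt/bigdata - installation directory for big data components (unpacked programs)
# /data/hadoop - Hadoop NameNode/DataNode metadata and data blocks
# /data/zookeeper - ZooKeeper snapshots and transaction logs
# /data/kafka - Kafka message logs (segment files)
# /data/flink - local storage for Flink RocksDB state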
mkdir -p /opt/software /opt/bigdata /data/hadoop /data/zookeeper /data/kafka /data/flink
chown -R flink:flink /opt/software /opt/bigdata /data/
2.2.4 Configure Passwordless SSH (all nodes)
su - flink
ssh-keygen
ssh-copy-id flink@hadoop102
ssh-copy-id flink@hadoop103
ssh-copy-id flink@hadoop104
2.2.5 Install JDK1.8 (all nodes)
yum remove -y 'java-*-openjdk*' # the OpenJDK packages are named java-<version>-openjdk*
tar -zxvf /opt/software/jdk-8u381-linux-x64.tar.gz -C /opt/bigdata/
cat >> /etc/profile << EOF
export JAVA_HOME=/opt/bigdata/jdk1.8.0_381
export JRE_HOME=\$JAVA_HOME/jre
export CLASSPATH=.:\$JAVA_HOME/lib:\$JRE_HOME/lib
export PATH=\$JAVA_HOME/bin:\$PATH
EOF
source /etc/profile
java -version
3. Big Data Component Cluster Setup in Detail
3.1 Hadoop3.3.6 Cluster Setup
tar -zxvf /opt/software/hadoop-3.3.6.tar.gz -C /opt/bigdata/
chown -R flink:flink /opt/bigdata/hadoop-3.3.6
core-site.xml
<configuration>
<property><name>fs.defaultFS</name><value>hdfs://hadoop102:9000</value></property>
<property><name>hadoop.tmp.dir</name><value>/data/hadoop/tmp</value></property>
<property><name>hadoop.proxyuser.flink.hosts</name><value>*</value></property>
<property><name>hadoop.proxyuser.flink.groups</name><value>*</value></property>
</configuration>
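core-site.xml parameters
| Parameter | Description |
|---|---|
| fs.defaultFS | HDFS NameNode address; every component reaches HDFS through it |
| hadoop.tmp.dir | Hadoop temporary data directory. Set it explicitly: the default lives under /tmp, which the OS may clear on reboot |
| hadoop.proxyuser.* | Proxy-user settings that let the flink user impersonate other users when it submits work on their behalf |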
hdfs-site.xml
<configuration>
<property><name>dfs.replication</name><value>2</value></property>
<property><name>dfs.namenode.secondary.http-address</name><value>hadoop103:50090</value></property>
</configuration>
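hdfs-site.xml parameters
| Parameter | Description |
|---|---|
| dfs.replication | Number of block replicas. Set to 2 on this 3-node cluster; 3 is recommended for 5 or more nodes |
| dfs.namenode.secondary.http-address | Secondary NameNode HTTP address; the Secondary NameNode periodically merges the FsImage and EditLog |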
yarn-site.xml
<configuration>
<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
<property><name>yarn.resourcemanager.hostname</name><value>hadoop102</value></property>
<property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>
<property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property>
</configuration>
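yarn-site.xml parameters
| Parameter | Description |
|---|---|
| yarn.nodemanager.aux-services | NodeManager auxiliary service; mapreduce_shuffle is the shuffle channel the MapReduce framework needs |
| yarn.resourcemanager.hostname | ResourceManager hostname, the cluster's single resource-scheduling center |
| yarn.nodemanager.vmem-check-enabled | false keeps YARN from killing Flink TM containers that exceed the virtual-memory limit |
| yarn.nodemanager.pmem-check-enabled | false disables the physical-memory check for the same reason, avoiding unexpected container kills |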
workers: hadoop102 / hadoop103 / hadoop104
scp -r /opt/bigdata/hadoop-3.3.6 flink@hadoop103:/opt/bigdata/
scp -r /opt/bigdata/hadoop-3.3.6 flink@hadoop104:/opt/bigdata/
hdfs namenode -format # first run only!
start-dfs.sh
start-yarn.sh
jps
3.2 ZooKeeper3.7.1 Cluster Setup
tar -zxvf /opt/software/apache-zookeeper-3.7.1-bin.tar.gz -C /opt/bigdata/
cp /opt/bigdata/apache-zookeeper-3.7.1-bin/conf/zoo_sample.cfg /opt/bigdata/apache-zookeeper-3.7.1-bin/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=hadoop102:2888:3888
server.2=hadoop103:2888:3888
server.3=hadoop104:2888:3888
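zoo.cfg parameters
| Parameter | Description |
|---|---|
| tickTime | Basic time unit in milliseconds; all ZK timeouts are multiples of it |
| initLimit | Number of ticks a Follower may take to connect to and sync with the Leader initially. 10 x 2s = 20 seconds |
| syncLimit | Number of ticks allowed between a Follower's request and the Leader's response. 5 x 2s = 10 seconds |
| dataDir | Storage path for ZK snapshots and, unless dataLogDir is set, transaction logs |
| clientPort | Client port; Flink HA and Kafka both connect through it |
| server.N | Ensemble member list. 2888 is the port Followers use to connect to the Leader for data sync; 3888 is the leader-election port |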
echo 1 > /data/zookeeper/myid # hadoop102
echo 2 > /data/zookeeper/myid # hadoop103
echo 3 > /data/zookeeper/myid # hadoop104
scp -r /opt/bigdata/apache-zookeeper-3.7.1-bin flink@hadoop103:/opt/bigdata/
scp -r /opt/bigdata/apache-zookeeper-3.7.1-bin flink@hadoop104:/opt/bigdata/
zkServer.sh start # on every node
zkServer.sh status
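Each node's myid has to match its `server.N` line in zoo.cfg, which is easy to get wrong when typing the three `echo` commands by hand. The sketch below derives the ID from the config instead; `myid_for_host` is a hypothetical helper, and the demonstration uses a scratch copy of the server list.

```shell
#!/usr/bin/env bash
# myid_for_host CFG HOST: print N from the "server.N=HOST:..." line in CFG.
myid_for_host() {
  local cfg=$1 host=$2
  sed -n "s/^server\.\([0-9][0-9]*\)=${host}:.*/\1/p" "$cfg"
}

# Scratch config holding the same server list as zoo.cfg above.
cfg=$(mktemp)
cat > "$cfg" << 'EOF'
dataDir=/data/zookeeper
server.1=hadoop102:2888:3888
server.2=hadoop103:2888:3888
server.3=hadoop104:2888:3888
EOF

myid_for_host "$cfg" hadoop103
# Assumed usage on a real node:
#   myid_for_host /opt/bigdata/apache-zookeeper-3.7.1-bin/conf/zoo.cfg "$(hostname)" > /data/zookeeper/myid
```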
3.3 Kafka3.2.3 Cluster Setup
tar -zxvf /opt/software/kafka_2.12-3.2.3.tgz -C /opt/bigdata/
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.73.205:9092
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
num.partitions=3
default.replication.factor=2
log.dirs=/data/kafka/logs
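Kafka server.properties parameters
| Parameter | Description |
|---|---|
| broker.id | Unique ID of each Broker in the cluster; the three nodes must use different values |
| listeners | Listen address; 0.0.0.0 binds all network interfaces |
| advertised.listeners | Address advertised to clients; it must be reachable from the clients and therefore differs per node |
| zookeeper.connect | ZK ensemble address; Kafka stores its metadata in ZK |
| num.partitions | Default partition count for new topics |
| default.replication.factor | Default replica count. With 2, committed data survives the loss of a single node |
| log.dirs | Storage path for Kafka message logs; a dedicated disk is recommended |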
kafka-server-start.sh -daemon /opt/bigdata/kafka_2.12-3.2.3/config/server.properties
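The server.properties shown above carries the values for hadoop102 only; `broker.id` and `advertised.listeners` change on the other two nodes. A small sketch that prints the per-node lines (the helper name `broker_overrides` is hypothetical, and the 0-based IDs simply continue the `broker.id=0` used above):

```shell
#!/usr/bin/env bash
# broker_overrides INDEX IP: print the node-specific lines of server.properties.
broker_overrides() {
  local index=$1 ip=$2
  printf 'broker.id=%s\n' "$index"
  printf 'advertised.listeners=PLAINTEXT://%s:9092\n' "$ip"
}

# Node IPs from the cluster plan in 2.1, in broker order.
ips=(192.168.73.205 192.168.73.206 192.168.73.207)
for i in "${!ips[@]}"; do
  echo "# node $((i + 1))"
  broker_overrides "$i" "${ips[$i]}"
done
```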
3.4 MySQL8.0 Setup
yum remove -y mariadb*
yum install -y mysql-server
systemctl start mysqld
systemctl enable mysqld
grep 'temporary password' /var/log/mysqld.log
mysql -uroot -p
ALTER USER 'root'@'localhost' IDENTIFIED BY 'YourStrongPassword@2026';
CREATE USER 'flink'@'192.168.73.%' IDENTIFIED BY 'YourStrongPassword@2026';
GRANT SELECT, INSERT, UPDATE, DELETE ON biz_db.* TO 'flink'@'192.168.73.%';
GRANT SELECT, INSERT, UPDATE, DELETE ON ads_db.* TO 'flink'@'192.168.73.%';
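-- The mysql-cdc connector also needs these global privileges to read the binlog
GRANT SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'192.168.73.%';
FLUSH PRIVILEGES;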
CREATE DATABASE biz_db;
CREATE DATABASE ads_db;
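Security warning: the passwords shown are examples only; production must use strong passwords of at least 16 characters. MySQL must have the binlog enabled (binlog_format=ROW), otherwise CDC cannot work.
3.5 ClickHouse22.8 Offline Deployment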
# Download the RPM packages in advance on a machine with network access
wget https://github.com/ClickHouse/ClickHouse/releases/download/v22.8.20.22-lts/clickhouse-server-22.8.20.22-1.el8.x86_64.rpm
wget https://github.com/ClickHouse/ClickHouse/releases/download/v22.8.20.22-lts/clickhouse-client-22.8.20.22-1.el8.x86_64.rpm
wget https://github.com/ClickHouse/ClickHouse/releases/download/v22.8.20.22-lts/clickhouse-common-static-22.8.20.22-1.el8.x86_64.rpm
# After copying them to the production server, run
yum localinstall -y /opt/software/clickhouse*.rpm
systemctl start clickhouse-server
systemctl enable clickhouse-server
clickhouse-client -q "CREATE DATABASE IF NOT EXISTS dws_db;"
4. Flink1.18.0 Production Deployment
tar -zxvf /opt/software/flink-1.18.0-bin-scala_2.12.tgz -C /opt/bigdata/
cat >> /etc/profile << EOF
export FLINK_HOME=/opt/bigdata/flink-1.18.0
export HADOOP_CONF_DIR=/opt/bigdata/hadoop-3.3.6/etc/hadoop
export YARN_CONF_DIR=/opt/bigdata/hadoop-3.3.6/etc/hadoop
export PATH=\$FLINK_HOME/bin:\$PATH
EOF
source /etc/profile
4.2 Production flink-conf.yaml
jobmanager.memory.process.size: 4g
taskmanager.memory.process.size: 8g
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
state.backend: rocksdb
state.backend.rocksdb.localdir: /data/flink/rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
execution.checkpointing.interval: 30000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600000
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.backend.incremental: true
high-availability: zookeeper
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.storageDir: hdfs:///flink/ha
high-availability.cluster-id: /flink_prod_cluster
yarn.application-attempts: 3
yarn.maximum-failed-containers: 10
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
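Flink core configuration parameters
| Parameter | Value | Description |
|---|---|---|
| jobmanager.memory.process.size | 4g | Total JM process memory. The JM manages the job graph and coordinates Checkpoints |
| taskmanager.memory.process.size | 8g | Total TM process memory. TMs run operator logic and hold state |
| taskmanager.numberOfTaskSlots | 4 | Slots per TM; each slot runs one parallel pipeline of operator subtasks |
| parallelism.default | 4 | Default parallelism. Slots are fully used when it equals the total slot count |
| state.backend | rocksdb | RocksDB supports incremental Checkpoints and state larger than memory; the standard choice for large-state production jobs |
| state.backend.incremental | true | Incremental Checkpoints upload only changed files, which greatly shortens Checkpoint time |
| execution.checkpointing.interval | 30000 | Checkpoint trigger interval in milliseconds; 30s is a common production value |
| execution.checkpointing.mode | EXACTLY_ONCE | Exactly-once semantics: each record affects state exactly once, even after recovery |
| execution.checkpointing.externalized-checkpoint-retention | RETAIN_ON_CANCELLATION | Required in production; keeps the Checkpoint after the job is cancelled |
| high-availability | zookeeper | ZK handles JM leader election, giving automatic failover when the JM fails |
| yarn.application-attempts | 3 | Number of times YARN restarts the Application after a failure |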
hdfs dfs -mkdir -p /flink/checkpoints /flink/savepoints /flink/ha /flink/jars /flink/paimon_warehouse
hdfs dfs -chown -R flink:flink /flink
hdfs dfs -chmod -R 755 /flink
hdfs dfs -put /opt/software/*.jar /flink/jars/
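After distributing flink-conf.yaml it helps to confirm that the deployed file really carries the baseline values. The following is a minimal sketch for flat `key: value` lines only; `conf_get` and `check_conf` are hypothetical helpers, and the scratch file stands in for the real configuration.

```shell
#!/usr/bin/env bash
# conf_get FILE KEY: print the value of the first flat "KEY: value" line.
conf_get() {
  local file=$1 key=$2
  awk -v k="$key" 'index($0, k ": ") == 1 { print substr($0, length(k) + 3); exit }' "$file"
}

# check_conf FILE KEY EXPECTED: report whether the deployed value matches.
check_conf() {
  local file=$1 key=$2 expected=$3 actual
  actual=$(conf_get "$file" "$key")
  if [ "$actual" = "$expected" ]; then
    echo "OK $key=$actual"
  else
    echo "MISMATCH $key: expected $expected, got '${actual}'"
  fi
}

# Scratch file standing in for $FLINK_HOME/conf/flink-conf.yaml.
conf=$(mktemp)
cat > "$conf" << 'EOF'
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
execution.checkpointing.interval: 30000
EOF

check_conf "$conf" state.backend rocksdb
check_conf "$conf" state.checkpoints.dir hdfs:///flink/checkpoints
check_conf "$conf" state.backend.incremental true   # missing here, so reported
```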
5. Paimon Catalog Metadata Initialization (run once)
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs:///flink/paimon_warehouse',
'hadoop.conf.dir' = '/opt/bigdata/hadoop-3.3.6/etc/hadoop',
'table-default.file.format' = 'parquet'
);
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS ods;
CREATE DATABASE IF NOT EXISTS dwd;
CREATE DATABASE IF NOT EXISTS dws;
CREATE DATABASE IF NOT EXISTS ads;
Paimon Catalog parameters
| Parameter | Description |
|---|---|
| type | Catalog type, always paimon. Paimon manages its own metadata, so no Hive Metastore is needed |
| warehouse | Warehouse root path. All Paimon table data is stored under this HDFS directory |
| hadoop.conf.dir | Hadoop configuration directory from which Paimon reads the HDFS connection settings |
| table-default.file.format | Default file format applied to new tables (catalog-level defaults use the table-default. prefix). Parquet is columnar, compresses well, and is fast to query |
sql-client.sh embedded -f /opt/bigdata/flink-1.18.0/sql/catalog_init.sql
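6. Complete Production SQL for the Four-Layer Real-Time Warehouse
Delivery convention: files are named catalog_init.sql / ods_user_behavior.sql / dwd_user_behavior.sql / dws_behavior_1min.sql / ads_daily_report.sql. A catalog registered with CREATE CATALOG exists only in the session that created it unless a catalog store is configured, so each job file should repeat the CREATE CATALOG statement before USE CATALOG; the databases and tables themselves persist in the HDFS warehouse.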
6.1 ODS Layer: MySQL CDC Full + Incremental Sync
USE CATALOG paimon_catalog;
USE ods;
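-- The Paimon catalog stores only Paimon tables, so tables of other connectors are declared TEMPORARY
CREATE TEMPORARY TABLE IF NOT EXISTS ods_user_behavior (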
user_id STRING, item_id STRING, behavior STRING,
operate_time TIMESTAMP(3), dt STRING, hr STRING,
PRIMARY KEY (user_id, operate_time) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', 'hostname' = '192.168.130.145', 'port' = '3306',
'username' = 'flink', 'password' = 'YourStrongPassword@2026',
'database-name' = 'biz_db', 'table-name' = 'user_behavior',
'server-id' = '5001-5004', 'scan.startup.mode' = 'initial',
'debezium.snapshot.locking.mode' = 'none'
);
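-- No primary key: Paimon treats this as an append table; no 'connector' option is needed inside the Paimon catalog
CREATE TABLE IF NOT EXISTS ods_user_behavior_paimon (
user_id STRING, item_id STRING, behavior STRING,
operate_time TIMESTAMP(3), dt STRING, hr STRING
) WITH (
'bucket' = '4', 'bucket-key' = 'user_id', 'file.compression' = 'zstd'
);
INSERT INTO ods_user_behavior_paimon
SELECT user_id, item_id, behavior, operate_time,
DATE_FORMAT(operate_time, 'yyyy-MM-dd') AS dt,
DATE_FORMAT(operate_time, 'HH') AS hr
FROM ods_user_behavior;
ODS layer core parameters
| Parameter | Description |
|---|---|
| server-id | Server ID, or ID range, that the CDC reader registers with MySQL. With parallelism 4 the range must hold at least 4 IDs, and the ranges of different jobs must not overlap |
| scan.startup.mode | initial: take a full snapshot first, then switch to the incremental binlog |
| debezium.snapshot.locking.mode | none: take no table locks during the snapshot, so business writes are not blocked |
| bucket | Number of Paimon buckets; 1 to 2 times the parallelism is recommended |
| bucket-key | Column used to distribute rows across buckets; an append table with a fixed bucket count needs one |
| file.compression | zstd gives a high compression ratio with query performance close to snappy; recommended for production |
| (no primary key) | Makes the ODS table an append table: rows are not deduplicated by key, so the raw data is kept in full. This assumes the source behaves as an insert-only log |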
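Because every CDC job needs its own block of server IDs, it is worth computing the ranges rather than picking them by eye. The sketch below assumes one ID per source subtask; `server_id_range` and `ranges_overlap` are hypothetical helper names.

```shell
#!/usr/bin/env bash
# server_id_range START PARALLELISM: "lo-hi" range with one ID per source subtask.
server_id_range() {
  local start=$1 parallelism=$2
  echo "${start}-$(( start + parallelism - 1 ))"
}

# ranges_overlap A B: succeed (exit 0) if the two "lo-hi" ranges intersect.
ranges_overlap() {
  local a_lo=${1%-*} a_hi=${1#*-} b_lo=${2%-*} b_hi=${2#*-}
  [ "$a_lo" -le "$b_hi" ] && [ "$b_lo" -le "$a_hi" ]
}

ods=$(server_id_range 5001 4)     # range for the ODS job at parallelism 4
other=$(server_id_range 5005 4)   # range for a second, hypothetical CDC job
echo "ods job: $ods, next job: $other"
if ranges_overlap "$ods" "$other"; then echo "overlap"; else echo "disjoint"; fi
```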
6.2 DWD Layer: Detail Cleansing and Filtering
USE CATALOG paimon_catalog;
USE dwd;
CREATE TABLE IF NOT EXISTS dwd_user_behavior (
user_id STRING, item_id STRING, behavior STRING,
operate_time TIMESTAMP(3), dt STRING, hr STRING,
PRIMARY KEY (user_id, operate_time) NOT ENFORCED
) WITH (
'bucket' = '4', 'file.compression' = 'zstd'
);
INSERT INTO dwd_user_behavior
SELECT * FROM paimon_catalog.ods.ods_user_behavior_paimon
WHERE user_id IS NOT NULL AND item_id IS NOT NULL
AND behavior IN ('click','cart','fav','buy');
DWD write behavior:
Because the DWD table declares a primary key, Paimon upserts by key and so deduplicates, while the key-less ODS append table does not. That difference is what separates the responsibilities of the two layers.
6.3 DWS Layer: Minute-Level Window Aggregation
USE CATALOG paimon_catalog;
USE dws;
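-- ClickHouse is an external system, so the sink is declared TEMPORARY; MINUTE is a reserved word and needs backticks
CREATE TEMPORARY TABLE IF NOT EXISTS dws_behavior_1min (
dt STRING, hr STRING, `minute` STRING, behavior STRING,
uv BIGINT, pv BIGINT, window_end TIMESTAMP(3),
PRIMARY KEY (dt, hr, `minute`, behavior) NOT ENFORCED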
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://192.168.130.145:8123/dws_db',
'table-name' = 'behavior_1min',
'username' = 'default', 'password' = '',
'sink.buffer-flush.interval' = '1s'
);
INSERT INTO dws_behavior_1min
SELECT dt, hr,
DATE_FORMAT(TUMBLE_END(operate_time, INTERVAL '1' MINUTE), 'mm') AS `minute`,
behavior, COUNT(DISTINCT user_id) AS uv, COUNT(item_id) AS pv,
TUMBLE_END(operate_time, INTERVAL '1' MINUTE) AS window_end
FROM paimon_catalog.dwd.dwd_user_behavior
GROUP BY TUMBLE(operate_time, INTERVAL '1' MINUTE), dt, hr, behavior;
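DWS layer core parameters
| Parameter | Description |
|---|---|
| sink.buffer-flush.interval | Flush interval of the ClickHouse write buffer; 1s balances latency against throughput |
| TUMBLE(operate_time, INTERVAL '1' MINUTE) | 1-minute tumbling window; each record belongs to exactly one window. operate_time must be an event-time attribute with a watermark |
| COUNT(DISTINCT user_id) | Exact distinct count for UV, not an approximate sketch; its state grows with the number of distinct users per window |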
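The claim that each record falls into exactly one tumbling window follows from simple integer arithmetic on the timestamp. A minimal illustration, using epoch seconds and a hypothetical helper `window_bounds` (this mirrors the idea, not Flink's internal code):

```shell
#!/usr/bin/env bash
# window_bounds EPOCH_SECONDS SIZE_SECONDS: print "start end" of the tumbling
# window containing the timestamp (start inclusive, end exclusive).
window_bounds() {
  local ts=$1 size=$2
  local start=$(( ts - ts % size ))
  echo "$start $(( start + size ))"
}

# Timestamps 120..179 share one 60-second window; 180 opens the next one.
for ts in 120 125 179 180; do
  echo "ts=$ts -> $(window_bounds "$ts" 60)"
done
```

The exclusive end is the value `TUMBLE_END` reports, which is why the `minute` column in the query above carries the minute in which the window closes.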
ClickHouse table DDL (must be run beforehand):
CREATE TABLE IF NOT EXISTS dws_db.behavior_1min (
dt String, hr String, minute String, behavior String,
uv UInt64, pv UInt64, window_end DateTime
) ENGINE = MergeTree()
ORDER BY (dt, hr, minute, behavior)
PARTITION BY dt
TTL window_end + INTERVAL 90 DAY;
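ClickHouse table parameters
| Parameter | Description |
|---|---|
| MergeTree() | Supports partitioning, sorting, and TTL, with high write throughput |
| ORDER BY | Sorting key; it also defines the sparse primary index used to skip data at query time |
| PARTITION BY dt | One partition per day, convenient for date-range queries and TTL expiry |
| TTL window_end + INTERVAL 90 DAY | Rows are deleted automatically after 90 days |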
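Watermark note: the MySQL CDC connector does not generate watermarks on its own; a watermark has to be declared in the DDL of the table whose time column drives the window. To tolerate late data, add this clause to the DWD table definition:
WATERMARK FOR operate_time AS operate_time - INTERVAL '5' SECOND
6.4 ADS Layer: Daily Business Report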
USE CATALOG paimon_catalog;
USE ads;
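-- ClickHouse is an external system, so the table is declared TEMPORARY; MINUTE is a reserved word and needs backticks
CREATE TEMPORARY TABLE IF NOT EXISTS dws_source (
dt STRING, hr STRING, `minute` STRING, behavior STRING,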
uv BIGINT, pv BIGINT, window_end TIMESTAMP(3)
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://192.168.130.145:8123/dws_db',
'table-name' = 'behavior_1min',
'username' = 'default', 'password' = ''
);
-- The JDBC sink is not a Paimon table either, so it is declared TEMPORARY
CREATE TEMPORARY TABLE IF NOT EXISTS ads_daily_report (
stat_date STRING, click_uv BIGINT, buy_uv BIGINT, total_pv BIGINT,
update_time TIMESTAMP(3),
PRIMARY KEY (stat_date) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.130.145:3306/ads_db?useSSL=false&serverTimezone=Asia/Shanghai',
'table-name' = 'daily_report',
'username' = 'flink', 'password' = 'YourStrongPassword@2026',
'driver' = 'com.mysql.cj.jdbc.Driver'
);
INSERT INTO ads_daily_report
SELECT dt AS stat_date,
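-- Caution: summing minute-level uv counts a user once per active minute, so click_uv and buy_uv are upper bounds on daily distinct users
SUM(CASE WHEN behavior = 'click' THEN uv ELSE 0 END) AS click_uv,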
SUM(CASE WHEN behavior = 'buy' THEN uv ELSE 0 END) AS buy_uv,
SUM(pv) AS total_pv, CURRENT_TIMESTAMP AS update_time
FROM dws_source GROUP BY dt;
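The report sums minute-level `uv` values, which is not the same as counting distinct users over the day. The toy data below (made-up events, hypothetical helper names) shows the gap between the two calculations:

```shell
#!/usr/bin/env bash
# Input lines: "minute user_id". Compare two ways of computing a daily UV.
events=$(cat << 'EOF'
10:01 u1
10:01 u2
10:02 u1
10:03 u1
EOF
)

# Sum of per-minute distinct users (what SUM(uv) over minute rows yields).
sum_of_minute_uv() {
  sort -u | awk '{ per_minute[$1]++ } END { for (m in per_minute) total += per_minute[m]; print total + 0 }'
}

# Distinct users across the whole day.
daily_distinct_uv() {
  awk '{ print $2 }' | sort -u | wc -l | tr -d ' '
}

echo "sum of minute UV: $(echo "$events" | sum_of_minute_uv)"
echo "daily distinct:   $(echo "$events" | daily_distinct_uv)"
```

User u1 is active in three different minutes and is therefore counted three times by the sum. If the report needs true daily distinct users, the distinct count has to be computed from detail rows rather than from the minute aggregates.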
7. Standardized Production Job Submission Script
#!/bin/bash
FLINK_HOME=/opt/bigdata/flink-1.18.0
HDFS_JAR_PATH=hdfs:///flink/jars
CHECKPOINT_BASE=hdfs:///flink/checkpoints
SQL_FILE=$1; JOB_NAME=$2
if [ -z "$SQL_FILE" ] || [ -z "$JOB_NAME" ]; then echo "Usage: $0 <sql_file> <job_name>"; exit 1; fi
LOG_FILE=${FLINK_HOME}/logs/${JOB_NAME}_$(date +%Y%m%d_%H%M%S).log
mkdir -p ${FLINK_HOME}/logs
nohup ${FLINK_HOME}/bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=4g -Dtaskmanager.memory.process.size=8g \
-Dparallelism.default=4 -Dstate.checkpoints.dir=${CHECKPOINT_BASE}/${JOB_NAME} \
-Dexecution.checkpointing.interval=30000 --allow-non-restored-state \
-j ${HDFS_JAR_PATH}/flink-sql-connector-kafka_2.12-1.18.0.jar \
-j ${HDFS_JAR_PATH}/flink-sql-connector-mysql-cdc-2.4.0.jar \
-j ${HDFS_JAR_PATH}/flink-sql-connector-clickhouse-1.18.0.jar \
-j ${HDFS_JAR_PATH}/paimon-flink-1.18-0.8.2.jar \
-j ${HDFS_JAR_PATH}/mysql-connector-java-8.0.33.jar \
-f ${FLINK_HOME}/sql/${SQL_FILE} > ${LOG_FILE} 2>&1 &
PID=$!; sleep 3
if kill -0 $PID 2>/dev/null; then echo "[${JOB_NAME}] submitted"; else echo "[${JOB_NAME}] may have failed, check the log"; tail -20 ${LOG_FILE}; exit 1; fi
chmod +x /opt/bigdata/flink-1.18.0/shell/submit_job.sh
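Key parameters of the submit script
| Parameter | Description |
|---|---|
| --allow-non-restored-state | When restoring with -s, lets the job start even if the Savepoint or Checkpoint holds state for operators that no longer exist in the job |
| -j xxx.jar | Adds an extra dependency JAR; connectors used by a SQL job must be supplied explicitly |
| -f xxx.sql | Path of the SQL file. Multiple -j and -f are documented as sql-client.sh options; confirm that the run-application entry point of your Flink 1.18 distribution accepts them, or wrap the SQL file in a job JAR |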
/opt/bigdata/flink-1.18.0/shell/submit_job.sh ods_user_behavior.sql ods_job
/opt/bigdata/flink-1.18.0/shell/submit_job.sh dwd_user_behavior.sql dwd_job
/opt/bigdata/flink-1.18.0/shell/submit_job.sh dws_behavior_1min.sql dws_job
/opt/bigdata/flink-1.18.0/shell/submit_job.sh ads_daily_report.sql ads_job
8. Enterprise Operations Manual
yarn application -list | grep Flink
tail -f /opt/bigdata/flink-1.18.0/logs/ods_job_*.log
yarn application -kill <application ID>
flink savepoint <JobID> hdfs:///flink/savepoints/ -yid <application ID>
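8.3 Automatic Cleanup of Expired Checkpoints
# Checkpoints only; never clean up Savepoints automatically!
# hdfs dfs -find has no -mtime or -delete, so filter the -ls output by date and remove job-ID directories untouched for 7 days
0 2 * * * hdfs dfs -ls '/flink/checkpoints/*' | awk -v cutoff="$(date -d '7 days ago' +\%F)" 'NF >= 8 && $1 ~ /^d/ && $6 < cutoff {print $8}' | xargs -r -n 20 hdfs dfs -rm -r -skipTrash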
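`hdfs dfs -find` supports name matching but has no `-mtime` or `-delete` predicate, so age-based cleanup has to work from the dates printed by `hdfs dfs -ls`. The sketch below isolates that filtering step so it can be checked offline; `older_than` is a hypothetical helper and the listing is made-up sample data in the `-ls` output format.

```shell
#!/usr/bin/env bash
# older_than CUTOFF_DATE: read `hdfs dfs -ls` output on stdin and print the
# paths of directories whose modification date (column 6) is before CUTOFF_DATE.
older_than() {
  awk -v cutoff="$1" 'NF >= 8 && $1 ~ /^d/ && $6 < cutoff { print $8 }'
}

# Sample listing; the job-ID directory names are invented for illustration.
listing='Found 2 items
drwxr-xr-x   - flink flink          0 2026-05-20 02:11 /flink/checkpoints/ods_job/aaa111
drwxr-xr-x   - flink flink          0 2026-06-07 23:59 /flink/checkpoints/ods_job/bbb222'

echo "$listing" | older_than 2026-06-01

# Assumed real usage (GNU date):
#   hdfs dfs -ls /flink/checkpoints/ods_job | older_than "$(date -d '7 days ago' +%F)" \
#     | xargs -r -n 20 hdfs dfs -rm -r -skipTrash
```

Filtering at the job-ID directory level, rather than on individual files, avoids deleting old files under `shared/` that a running job's incremental Checkpoints may still reference.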
Important: Savepoints must never be deleted automatically. Isolate them in one directory per job: hdfs:///flink/savepoints/{job_name}/.
8.4 Key Monitoring Metrics
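| Metric | Alert threshold | Meaning |
|---|---|---|
| checkpoint duration | > 5min | State too large or slow HDFS writes |
| checkpoint failed | > 0 | Investigate immediately |
| checkpoint size | Keeps growing | State leak |
| busyTimeMsPerSecond | > 800 ms/s (80%) | Backpressure risk |
| records-lag-max | Keeps growing | Scale out |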
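The numeric thresholds in the table translate directly into simple checks. This is only a sketch: the function names are hypothetical, and fetching the metric values (for example from the Prometheus reporter) is left out.

```shell
#!/usr/bin/env bash
# check_busy MS_PER_SECOND: busyTimeMsPerSecond ranges from 0 to 1000,
# so the 80% threshold corresponds to 800 ms per second.
check_busy() {
  if [ "$1" -gt 800 ]; then echo "ALERT"; else echo "OK"; fi
}

# check_checkpoint_duration MS: alert above 5 minutes (300000 ms).
check_checkpoint_duration() {
  if [ "$1" -gt 300000 ]; then echo "ALERT"; else echo "OK"; fi
}

# check_checkpoint_failures COUNT: any failed Checkpoint raises an alert.
check_checkpoint_failures() {
  if [ "$1" -gt 0 ]; then echo "ALERT"; else echo "OK"; fi
}

echo "busy 850 ms/s: $(check_busy 850)"
echo "busy 400 ms/s: $(check_busy 400)"
echo "checkpoint 45000 ms: $(check_checkpoint_duration 45000)"
echo "failed checkpoints 1: $(check_checkpoint_failures 1)"
```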
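9. Production Rules and Pitfalls
Hard rules for going live
- Flink Session clusters are prohibited; Application mode is mandatory
- The Paimon Catalog is globally unique and initialized only once
- Every job must enable Checkpoints; large-state jobs enable incremental snapshots
- Take a Savepoint manually before every job update
- Scripts and SQL live under Git version control
- Run everything as the flink user; root is prohibited
- No hard-coded passwords
- Database users follow the principle of least privilege
9.1 Paimon Compaction Tuning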
table.exec.async-compact: true
# 'num-sorted-run.compaction-trigger' = '3'
# 'compaction.min.file-num' = '3'
# 'compaction.max.file-num' = '10'
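Paimon Compaction parameters
| Parameter | Description |
|---|---|
| table.exec.async-compact | Asynchronous Compaction that merges small files in the background; always on in production. Verify this key against the Paimon 0.8 configuration reference before relying on it |
| num-sorted-run.compaction-trigger | Number of sorted runs that triggers a Compaction |
| compaction.min.file-num / compaction.max.file-num | Range for the number of files merged in a single Compaction |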
9.2 Version Compatibility Matrix
| Component | Version | Notes |
|---|---|---|
| Flink | 1.18.0 | scala_2.12 |
| Flink CDC | 2.4.0 | mysql-cdc connector |
| Paimon | 0.8.2 | paimon-flink-1.18 |
| ClickHouse Connector | 1.18.0 | jdbc:clickhouse:// URL prefix |
| Hadoop | 3.3.6 | Underlying platform |
| MySQL | 8.0 | binlog_format=ROW |