canal-admin1.1.15界面化安装配置canal集群详解

安装准备

虚拟机服务器四台
192.168.216.110 (主机名mysql)
192.168.216.111 (主机名hadoop01)
192.168.216.112 (主机名hadoop02)
192.168.216.113 (主机名hadoop03)

canal.admin-1.1.5.tar.gz
canal.deployer-1.1.5.tar.gz
下载地址 https://github.com/alibaba/canal/releases/tag
页面底端

自行安装

zk集群 (本demo—zk地址192.168.216.111:2181,192.168.216.112:2181,192.168.216.113:2181)
mysql5.7(本demo—mysql地址192.168.216.110:3306)
kafka集群(本demo—rocketMQ地址192.168.216.111:9876,192.168.216.112:9876,192.168.216.113:9876)

安装canal-admin

1.创建文件夹canal_admin,将canal.admin-1.1.5.tar.gz解压到该目录
mkdir canal_admin
tar -zxvf canal.admin-1.1.5.tar.gz -C /tmp/canal_admin

2.修改配置application.yml
cd /tmp/canal_admin/conf
vim application.yml

server:
  port: 8089       
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 192.168.216.110:3306
  database: canal_manager
  username: root
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: 123456

3.mysql操作
a. 编辑 mysql配置文件/etc/my.cnf
vim /etc/my.cnf
在mysqld标签下新增如下代码

# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
lower_case_table_names=1
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
validate_password=off

#server_id = 1
#log-bin=mysql-bin
#binlog_format = ROW
#log_timestamps=SYSTEM

b. 执行/export/servers/canal_admin/conf下的sql语句
cat canal_manager.sql
进入mysql,copy sql文件里的所有sql,直接执行

c.创建一个同步用的数据库
CREATE USER canal IDENTIFIED BY ‘123456’;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
– GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ ;
FLUSH PRIVILEGES;

4.启动canal-admin

cd /tmp/canal_admin/bin
sh startup.sh

5.浏览器输入192.168.216.111:8089,输入用户名admin,密码123456
a.点击集群管理,新建集群(集群名称:canal_cluster ; zkurl:192.168.216.111:192.168.216.112:192.168.216.113:2181)

b.选择编辑集群配置,下拉选择主配置

具体代码如下(注:此时canal.destinations参数不需要配置,后面会讲到何时配置)
#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
canal.admin.manager = 192.168.216.111:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

canal.zkServers = 192.168.216.111:2181,192.168.216.112:2181,192.168.216.113:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = 123456
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		#############
#################################################
canal.destinations = 
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.216.111:9876,192.168.216.112:9876,192.168.216.113:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false

安装canal服务端

1.创建文件夹canal,将canal.deployer-1.1.5.tar.gz解压到该目录
mkdir canal
tar -zxvf canal.deployer-1.1.5.tar.gz -C /tmp/canal

2.将canal_local.properties文件内容覆盖到canal.properties,并编辑canal.properties

# register ip
canal.register.ip =192.168.216.111

# canal admin config
canal.admin.manager = 192.168.216.111:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = cancal_cluster

3.将canal文件夹发送到 另外2台机
scp -r canal root@hadoop02: /tmp/canal
scp -r canal root@hadoop03: /tmp/canal
并修改canal.properties中canal.register.ip参数为各自服务器ip,其余参数保持不变
hadoop02上 canal.register.ip = 192.168.216.112
hadoop03上 canal.register.ip = 192.168.216.113

4.分别启动三台服务器上的canal服务

cd /tmp/canal/bin
sh startup.sh

5.刷新浏览器,点击server管理,发现已自动注入三个实例

基于Canal与Flink实现数据实时增量同步(二)

本文主要从Binlog实时采集和离线处理Binlog还原业务数据两个方面,来介绍如何实现DB数据准确、高效地进入Hive数仓。

背景

在数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为ODS(Operational Data Store)数据。在互联网企业中,常见的ODS数据有业务日志数据(Log)和业务DB数据(DB)两类。对于业务DB数据来说,从MySQL等关系型数据库的业务数据进行采集,然后导入到Hive中,是进行数据仓库生产的重要环节。如何准确、高效地把MySQL数据同步到Hive中?一般常用的解决方案是批量取数并Load:直连MySQL去Select表中的数据,然后存到本地文件作为中间存储,最后把文件Load到Hive表中。这种方案的优点是实现简单,但是随着业务的发展,缺点也逐渐暴露出来:

  • 性能瓶颈:随着业务规模的增长,Select From MySQL -> Save to Localfile -> Load to Hive这种数据流花费的时间越来越长,无法满足下游数仓生产的时间要求。
  • 直接从MySQL中Select大量数据,对MySQL的影响非常大,容易造成慢查询,影响业务线上的正常服务。
  • 由于Hive本身的语法不支持更新、删除等SQL原语(高版本Hive支持,但是需要分桶+ORC存储格式),对于MySQL中发生Update/Delete的数据无法很好地进行支持。

为了彻底解决这些问题,我们逐步转向CDC (Change Data Capture) + Merge的技术方案,即实时Binlog采集 + 离线处理Binlog还原业务数据这样一套解决方案。Binlog是MySQL的二进制日志,记录了MySQL中发生的所有数据变更,MySQL集群自身的主从同步就是基于Binlog做的。

实现思路

首先,采用Flink负责把Kafka上的Binlog数据拉取到HDFS上。

然后,对每张ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的存量数据读取到Hive上,这一过程底层采用直连MySQL去Select数据的方式,可以使用Sqoop进行一次性全量导入。

最后,对每张ODS表,每天基于存量数据和当天增量产生的Binlog做Merge,从而还原出业务数据。

Binlog是流式产生的,通过对Binlog的实时采集,把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对MySQL的访问压力上,都会有明显地改善。Binlog本身记录了数据变更的类型(Insert/Update/Delete),通过一些语义方面的处理,完全能够做到精准的数据还原。

实现方案

Flink处理Kafka的binlog日志

使用kafka source,对读取的数据进行JSON解析,将解析的字段拼接成字符串,符合Hive的schema格式,具体代码如下:

package com.etl.kafka2hdfs;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;

/**
*  @Created with IntelliJ IDEA.
*  @author : jmx
*  @Date: 2020/3/27
*  @Time: 12:52
*  
*/
public class HdfsSink {
public static void main(String[] args) throws Exception {
String fieldDelimiter = “,”;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// checkpoint
env.enableCheckpointing(10_000);
//env.setStateBackend((StateBackend) new FsStateBackend(“file:///E://checkpoint”));
env.setStateBackend((StateBackend) new FsStateBackend(“hdfs://kms-1:8020/checkpoint”));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

// source
Properties props = new Properties();
props.setProperty(“bootstrap.servers”, “kms-2:9092,kms-3:9092,kms-4:9092”);
// only required for Kafka 0.8
props.setProperty(“zookeeper.connect”, “kms-2:2181,kms-3:2181,kms-4:2181”);
props.setProperty(“group.id”, “test123”);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
“qfbap_ods.code_city”, new SimpleStringSchema(), props);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);

// transform
SingleOutputStreamOperator<String> cityDS = stream
.filter(new FilterFunction<String>() {
// 过滤掉DDL操作
@Override
public boolean filter(String jsonVal) throws Exception {
JSONObject record = JSON.parseObject(jsonVal, Feature.OrderedField);
return record.getString(“isDdl”).equals(“false”);
}
})
.map(new MapFunction<String, String>() {

@Override
public String map(String value) throws Exception {
StringBuilder fieldsBuilder = new StringBuilder();
// 解析JSON数据
JSONObject record = JSON.parseObject(value, Feature.OrderedField);
// 获取最新的字段值
JSONArray data = record.getJSONArray(“data”);
// 遍历,字段值的JSON数组,只有一个元素
for (int i = 0; i < data.size(); i++) {
// 获取到JSON数组的第i个元素
JSONObject obj = data.getJSONObject(i);
if (obj != null) {
fieldsBuilder.append(record.getLong(“id”)); // 序号id
fieldsBuilder.append(fieldDelimiter); // 字段分隔符
fieldsBuilder.append(record.getLong(“es”)); //业务时间戳
fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(record.getLong(“ts”)); // 日志时间戳
fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(record.getString(“type”)); // 操作类型
for (Map.Entry<String, Object> entry : obj.entrySet()) {

fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(entry.getValue()); // 表字段数据
}

}
}
return fieldsBuilder.toString();
}

});

//cityDS.print();
//stream.print();

// sink
// 以下条件满足其中之一就会滚动生成新的文件
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
.withRolloverInterval(60L * 1000L) //滚动写入新文件的时间,默认60s。根据具体情况调节
.withMaxPartSize(1024 * 1024 * 128L) //设置每个文件的最大大小 ,默认是128M,这里设置为128M
.withInactivityInterval(60L * 1000L) //默认60秒,未写入数据处于不活跃状态超时会滚动新文件
.build();

StreamingFileSink<String> sink = StreamingFileSink
//.forRowFormat(new Path(“file:///E://binlog_db/city”), new SimpleStringEncoder<String>())
.forRowFormat(new Path(“hdfs://kms-1:8020/binlog_db/code_city_delta”), new SimpleStringEncoder<String>())
.withBucketAssigner(new EventTimeBucketAssigner())
.withRollingPolicy(rollingPolicy)
.withBucketCheckInterval(1000) // 桶检查间隔,这里设置1S
.build();

cityDS.addSink(sink);
env.execute();
}
}

对于Flink Sink到HDFS,StreamingFileSink 替代了先前的 BucketingSink,用来将上游数据存储到 HDFS 的不同目录中。它的核心逻辑是分桶,默认的分桶方式是 DateTimeBucketAssigner,即按照处理时间分桶。处理时间指的是消息到达 Flink 程序的时间,这点并不符合我们的需求。因此,我们需要自己编写代码将事件时间从消息体中解析出来,按规则生成分桶的名称,具体代码如下:

package com.etl.kafka2hdfs;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
*  @Created with IntelliJ IDEA.
*  @author : jmx
*  @Date: 2020/3/27
*  @Time: 12:49
*  
*/

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {

@Override
public String getBucketId(String element, Context context) {
String partitionValue;
try {
partitionValue = getPartitionValue(element);
} catch (Exception e) {
partitionValue = “00000000”;
}
return “dt=” + partitionValue;//分区目录名称
}

@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
private String getPartitionValue(String element) throws Exception {

// 取出最后拼接字符串的es字段值,该值为业务时间
long eventTime = Long.parseLong(element.split(“,”)[1]);
Date eventDate = new Date(eventTime);
return new SimpleDateFormat(“yyyyMMdd”).format(eventDate);
}
}

离线还原MySQL数据

经过上述步骤,即可将Binlog日志记录写入到HDFS的对应的分区中,接下来就需要根据增量的数据和存量的数据还原最新的数据。Hive 表保存在 HDFS 上,该文件系统不支持修改,因此我们需要一些额外工作来写入数据变更。常用的方式包括:JOIN、Hive 事务、或改用 HBase、kudu。

如昨日的存量数据code_city,今日增量的数据为code_city_delta,可以通过 FULL OUTER JOIN,将存量和增量数据合并成一张最新的数据表,并作为明天的存量数据:


INSERT OVERWRITE TABLE code_city
SELECT
COALESCE( t2.id, t1.id ) AS id,
COALESCE ( t2.city, t1.city ) AS city,
COALESCE ( t2.province, t1.province ) AS province,
COALESCE ( t2.event_time, t1.event_time ) AS event_time
FROM
code_city t1
FULL OUTER JOIN (
SELECT
id,
city,
province,
event_time
FROM
(– 取最后一条状态数据
SELECT
id,
city,
province,
dml_type,
event_time,
row_number ( ) over ( PARTITION BY id ORDER BY event_time DESC ) AS rank
FROM
code_city_delta
WHERE
dt = ‘20200324’ — 分区数据
) temp
WHERE
rank = 1
) t2 ON t1.id = t2.id;

小结

本文主要从Binlog流式采集和基于Binlog的ODS数据还原两方面,介绍了通过Flink实现实时的ETL,此外还可以将binlog日志写入kudu、HBase等支持事务操作的NoSQL中,这样就可以省去数据表还原的步骤。本文是《基于Canal与Flink实现数据实时增量同步》的第二篇,关于canal解析Binlog日志写入kafka的实现步骤,参见《基于Canal与Flink实现数据实时增量同步一》。

基于Canal与Flink实现数据实时增量同步(一)

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

准备

常见的binlog命令

# 是否启用binlog日志
show variables like 'log_bin';
# 查看binlog类型
show global variables like 'binlog_format';
# 查看详细的日志配置信息
show global variables like '%log%';
# mysql数据存储目录
show variables like '%dir%';
# 查看binlog的目录
show global variables like "%log_bin%";
# 查看当前服务器使用的biglog文件及大小
show binary logs;
# 查看最新一个binlog日志文件名称和Position
show master status;

配置MySQL的binlog

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

部署canal

安装canal

[kms@kms-1 softwares]$ tar -xzvf canal.deployer-1.1.4.tar.gz  -C /opt/modules/canal/
  • 目录结构
drwxr-xr-x 2 root root 4096 Mar  5 14:19 bin
drwxr-xr-x 5 root root 4096 Mar  5 13:54 conf
drwxr-xr-x 2 root root 4096 Mar  5 13:04 lib
drwxrwxrwx 4 root root 4096 Mar  5 14:19 logs

配置修改

  • 修改conf/example/instance.properties,修改内容如下:
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = kms-1.apache.com:3306 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
# mq config,kafka topic名称
canal.mq.topic=test

修改conf/canal.properties,修改内容如下:

# 配置zookeeper地址
canal.zkServers =kms-2:2181,kms-3:2181,kms-4:2181
# 可选项: tcp(默认), kafka, RocketMQ,
canal.serverMode = kafka
# 配置kafka地址
canal.mq.servers = kms-2:9092,kms-3:9092,kms-4:9092

启动canal

sh bin/startup.sh

关闭canal

sh bin/stop.sh

部署Canal Admin(可选)

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

要求

canal-admin的限定依赖:

  • MySQL,用于存储配置和节点等相关数据
  • canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

安装canal-admin

[kms@kms-1 softwares]$ tar -xzvf canal.admin-1.1.4.tar.gz -C /opt/modules/canal-admin/
  • 目录结构
drwxrwxr-x 2 kms kms 4096 Mar 6 11:25 bin
drwxrwxr-x 3 kms kms 4096 Mar 6 11:25 conf
drwxrwxr-x 2 kms kms 4096 Mar 6 11:25 lib
drwxrwxr-x 2 kms kms 4096 Sep 2 2019 logs
  • 配置修改
vi conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8

spring.datasource:
address: kms-1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1

canal:
adminUser: admin
adminPasswd: admin
  • 初始化原数据库
mysql -uroot -p
# 导入初始化SQL
#注:(1)初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化
# (2)canal_manager.sql默认会在conf目录下
> mysql> source /opt/modules/canal-admin/conf/canal_manager.sql
  • 启动canal-admin
sh bin/startup.sh
  • 访问

可以通过 http://kms-1:8089/ 访问,默认密码:admin/123456

  • canal-server端配置

使用canal_local.properties的配置覆盖canal.properties,将下面配置内容配置在canal_local.properties文件里面,就可以了。

# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
  • 启动canal-serve
sh bin/startup.sh local

注意:先启canal-server,然后再启动canal-admin,之后登陆canal-admin就可以添加serve和instance了。

启动kafka控制台消费者测试

bin/kafka-console-consumer.sh –bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092 –topic test –from-beginning

此时MySQL数据表若有变化,会将row类型的log写进Kakfa,具体格式为JSON:

  • insert操作
{
“data”:[
{
“id”:”338″,
“city”:”成都”,
“province”:”四川省”
}
],
“database”:”qfbap_ods”,
“es”:1583394964000,
“id”:2,
“isDdl”:false,
“mysqlType”:{
“id”:”int(11)”,
“city”:”varchar(256)”,
“province”:”varchar(256)”
},
“old”:null,
“pkNames”:[
“id”
],
“sql”:””,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:”code_city”,
“ts”:1583394964361,
“type”:”INSERT”
}
  • update操作
{
“data”:[
{
“id”:”338″,
“city”:”绵阳市”,
“province”:”四川省”
}
],
“database”:”qfbap_ods”,
“es”:1583395177000,
“id”:3,
“isDdl”:false,
“mysqlType”:{
“id”:”int(11)”,
“city”:”varchar(256)”,
“province”:”varchar(256)”
},
“old”:[
{
“city”:”成都”
}
],
“pkNames”:[
“id”
],
“sql”:””,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:”code_city”,
“ts”:1583395177408,
“type”:”UPDATE”
}
  • delete操作
{
“data”:[
{
“id”:”338″,
“city”:”绵阳市”,
“province”:”四川省”
}
],
“database”:”qfbap_ods”,
“es”:1583395333000,
“id”:4,
“isDdl”:false,
“mysqlType”:{
“id”:”int(11)”,
“city”:”varchar(256)”,
“province”:”varchar(256)”
},
“old”:null,
“pkNames”:[
“id”
],
“sql”:””,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:”code_city”,
“ts”:1583395333208,
“type”:”DELETE”
}

JSON日志格式解释

  • data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据
  • database:数据库名称
  • es:事件时间,13位的时间戳
  • id:事件操作的序列号,1,2,3…
  • isDdl:是否是DDL操作
  • mysqlType:字段类型
  • old:旧数据
  • pkNames:主键名称
  • sql:SQL语句
  • sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal
  • table:表名
  • ts:日志时间
  • type:操作类型,比如DELETE,UPDATE,INSERT

利用Flink SQL实时统计单据状态(含历史数据)

背景

需求抽象下,有一种单据,整个生命周期中有很多种状态,现在要实时按照某个维度(例如用户id)统计其中某几个状态的数量,并按照该维度定时推送统计数据。

对于这个需求,解决办法有很多,那么什么情况下不需要Flink来实现?

  1. MySQL存储,单库单表,数据量比较小,直接sql查询
  2. MySQL存储,单库单表,统计维度和状态字段加了索引,且该维度数据比较分散,不会集中在某几个值(按照维度一次查询的数量比较少)
  3. 业务接受统计数据延迟,例如一个小时更新一次,那么可以用小时离线表来统计
  4. 有AnalyticDB(aliyun)、Elastic Search等分析型数据库可用(当然需要考虑成本问题)

其实上面的场景已经覆盖大部分情况,从技术方案选型的角度来讲Flink一定是最后才考虑的方案,为什么这么说呢?

  1. 流式计算还是比较年轻的技术,开发体验还有待提高,尽管有FlinkSQL,但学习成本依然不低,需要转换思维
  2. 调试成本比较高,无法像写代码那样调试计算逻辑,flink的调试通常只能看输入、输出
  3. 需要有合适的数据输入,如果数据不合适的话,可能非常费事(下文会提到)

那么什么情况下可以采用Flink的方案呢?

  1. 真的对实时性有严格要求,并且由于原始数据量级的问题,无法直接查询原始数据(对数据库压力太大,或耗时过久)
  2. 出于学习的目的,且时间充裕
  3. 其他因为直接查询路子走不通的情况(例如,分库分表了、数据量比较大太耗时)
  4. 由于各种原因,申请不到分析型数据库,或者有现成的flink集群
  5. 其他方案成本太高,包括 开发成本、机器资源成本、嘴皮子成本(开玩笑)

基本思路

那么下面我们就来讨论实时要求下使用Flink的方案实现。

对于实时计算架构,业界有比较成熟两个方案,Lambda和Kappa

Lambda架构比较成熟稳定,但是需要维护两套系统的代码,不易于维护

Kappa舍弃了离线数据,通过回溯流数据的方式来解决出问题后修复,但是如果时间跨度比较大的话,修复时间会很久,对于短期数据的统计,用这种方式肯定是更好的方案

离线和实时数据结合

对于我们这个需求:

  1. 首先时间跨度比较长,至少是几个月的
  2. 然后要考虑应用代码的BUG等问题,如果纯用流式计算,如果不进行维护,后续的误差会越来越大,我们希望的是要有一定的自校准能力。
  3. 另外结果的输出希望直接输出一张表,并且是全量数据,不必在应用层进行汇总,减少了一定的维护成本。

结合我们的需求和这两个方案的优缺点,可以发现并不能照搬以上两种方案,需要做一些微创新

那么一个简单的雏形就出现了,但是会有一些问题

  1. 任务每次重启都要先做离线数据的初始化,如果没有初始化,今日无变化的数据就不会落到结果表中。这样除了实时任务外还需要一个初始化任务,每次重启任务就需要有两步,先初始化,然后将实时任务回溯到今日0点然后重跑
  2. 我们知道离线数据通常需要花费比较久的时间,并且很多任务并不是0点就开始跑,有时候任务跑完就已经5、6点了,这期间离线数据用的是前天的,实时数据用的是今天的,这就会出现错误

针对上面两个问题,可以考虑如下优化

  1. 主要是想省个事,这个好办,每天来定时刷一遍离线数据,如果有实时数据,那就以实时为准,没有则用离线。最重要的是可以通过这种方式用离线来对实时数据做一次校准,因为实时数据的效期是一天,如果第二天没有更新,这条数据就会被清理掉,那么就会替换为离线数据,起到了校准的目的。
  2. 这个问题的本质是离线任务执行时间和实时计算的有效数据窗口之间有GAP,导致在离线任务产出之前,离线数据只有前日汇总可用,而实时计算的区间已经切换到今日,如下图所示

那么解决这个问题也比较简单,拉长实时计算的时间区间,给离线任务充足的时间,让离线数据和实时数据区间重叠即可,如下图所示,离线数据采用前日数据,不管昨日数据是否产生,实时数据从昨日0点开始统计,这样离线任务有整整一天的可执行时间,完全是充足的

实时状态计数如何算

如果熟悉Flink撤回机制,很容易想到,这个不就是状态统计吗,两个group by不就搞定实时计数了吗? 如下面的代码所示,当同一个单据号的状态发生变化,就会撤回之前的那条数据,这个user_id下,前一个状态的计数就会减一。

select user_id,status,COUNT(orderNo) num from (
    SELECT orderNo,LAST_VALUE(status) status ,LAST_VALUE(user_id) user_id from order_table group by orderNo
) group by user_id,status
;

这样是很好,但是没有考虑单据生命周期跨越实时计算覆盖范围的情况,假如实时计算近两天的数据,我们要统计的状态是(10,15),这两个状态是连续的,考虑以下情况:

  1. 单据在两天前创建,状态没有到10,实时计算时有了状态为10的数据,此时 状态10计数+1。
  2. 单据在近两天内创建,状态从10到15,由于单据状态为10或15都在实时计算范围内,根据撤回机制,状态10计数-1,状态15计数+1,很完美。
  3. 单据在两天前创建且状态流转到10,在近两天出现了状态15,根据上面的sql,状态15计数+1,但并没有感知之前的状态10,所以加上离线数据的话,状态10的计数就多加了。

简单说只利用实时计算的撤回机制,对于单据跨越实时计算覆盖范围的情况并不能很好处理。当然如果说我把实时计算区间设置大一些,几个月或者一年,超过单据最大生命周期的时间,这当然是可以的,但如果任务重启咋整,源数据并不会保存那么久,所以还是要考虑这个问题。

那状态变更的计数如何做呢?

根据上面描述,如果能拿到当前状态及前一个状态,那么可以对每个状态的计数进行加减。变更消息过来时,将更新后的状态计数加一,将更新前的状态计数减一,这样就能正确计数。但有时候现实会有很多限制条件,比如:状态变更消息没有更新前状态,或者干脆没有状态更新消息,那怎么办呢?改代码加上带更新前状态的消息,如果这种方式并不简单呢?

接入数据库变更日志(binlog)是一种很好的解耦方式,很多云服务提供的数据库都有对应的CDC(Change Data Capture)功能,像阿里云的Flink服务就提供了对应的connector(mysql-cdc);此外还有一些同步工具通过解析binlog将数据同步出来,通过开发对应的connector都可以对接上flink,例如Aliyun-DTScanal等。

但是有些数据库同步工具没有变更前数据,对于数据同步的场景是无所谓的,但是我们这个场景是必须要的,对于这种情况,如果非要走下去,那就要加个中间层,缓存每个单据的前一次状态。有两种方案:

  1. 可以通过Flink的backend state 来存储,不过这个存储是有时效的,但单据的生命周期有可能大于存储时效,这时可以再引入离线数据。
  2. 直接建一个单据前一次状态表,既作为维表又作为结果表,每次一条数过来先当做维表查询上一次状态,然后再把本次状态写入结果表。

然而第2种方案有些重,引入了一张新的外部表,相当于同步了一份单据前一次状态的数据,并且还是要考虑离线数据的问题。而方案1,根据数据分布的特性,由于我们关心的状态在某一时刻,单据并不会很多,离线单据状态可以直接缓存到Flink,所以这里我决定还是用方案1来实现。

实现方案

整体的架构如上图所示,具体代码见文末附录。

方案说明:

  • 单据上次状态:由于数据源没有更新前数据,因此我们只能利用Flink的存储,并加上离线的单据状态进行计算,得到前一次的状态值,这里自定义了聚合函数prevValue,缓存单据前一个状态,如果没有,那就从离线单据中取得,
  • 单据状态操作值转换:这里根据当前和上一次状态转换为+1或-1操作,如果两个状态一致,我们就要忽略掉,有可能是重复数据;利用UDTF,可能一条数据转为两条,上一次状态-1及当前状态+1;如果前一个状态为空,则忽略。

注意点:

  • 这里需要注意的是,从数据库同步到离线的单据,要选择通过更新时间来分区,这种方式会根据更新时间字段来判断某条数据该不该落到今天这个分区,这样会准确些。而普通的每天全量的方式就可能会把今天更新的数据也放到昨天的分区中,而我们实时计算的覆盖范围是识别更新时间的,这时有可能因为离线计算和实时计算有重叠而导致错误。


数据准确性调试

数据的调试相对于业务逻辑的调试更加麻烦些,有如下原因:

  1. 测试数据不太好造,主要是因为场景多,输入输出输出比较复杂
  2. 线上数据可能不如预期的规范,测试时不能覆盖到该场景

我目前也没有什么比较好的办法,就是花时间造数据测试;在线上数据跑起来后要多验证,对于异常数据要把源数据拉下来找出原因。

可改进点

  • 如果离线单据状态数据比较多,不能全部缓存,或者影响性能的话,可以引入其他存储,例如hbase。


参考资料

附录

CREATE FUNCTION statusCount AS 'com.fliaping.flink.udf.StatusCountUdtf' ;

CREATE FUNCTION prevValue AS 'com.fliaping.flink.udf.PrevValueUdaf' ;

-- 采购单的tt数据源
create table xx_source
(
    id BIGINT
    ,gmt_modified VARCHAR
    ,user_id VARCHAR
    ,biz_status VARCHAR
    ,event_time AS PROCTIME()
    ,PRIMARY KEY (`id`)
) with (
    type = 'your-db-type'
    ...
)
;


-- 状态统计离线表
create table offline_order_status_num
(
    user_id VARCHAR COMMENT '供应商ID'
    ,biz_status VARCHAR COMMENT '业务状态'
    ,last_day_num BIGINT COMMENT '昨日待办数量'
    ,num BIGINT COMMENT '待办数量'
    ,ds VARCHAR COMMENT '分区字段'
    ,PRIMARY KEY (user_id, biz_status)
    ,PERIOD FOR SYSTEM_TIME    --定义了维表的标识。
) with (
    type = 'your-offline-type'
    ...
)
;

-- 单据状态离线表
create table order_status_history
(
    order_no VARCHAR COMMENT '供应商ID'
    ,biz_status VARCHAR COMMENT '业务状态'
    ,last_biz_status VARCHAR COMMENT '昨日状态'
    ,gmt_modified TIMESTAMP COMMENT '修改时间'
    ,last_gmt_modified TIMESTAMP COMMENT '昨日修改时间'
    ,ds VARCHAR COMMENT '分区字段'
    ,PRIMARY KEY (order_no)
    ,PERIOD FOR SYSTEM_TIME    --定义了维表的标识。
) with (
    type = 'your-offline-type'
    ...
)
;

-- 进行数据过滤
create VIEW filter_view AS
select  order_no
        ,user_id
        ,biz_status
        ,gmt_modified
        ,event_time
FROM    tt_source
where   order_no IS NOT NULL
and     user_id IS NOT NULL
AND     biz_status IS NOT NULL
AND     gmt_modified IS NOT NULL
;

-- 获取Flink存储中的单据前一个状态,并缓存当前状态
CREATE VIEW get_before_biz_status AS
SELECT  order_no
        ,user_id
        ,biz_status
        ,TO_TIMESTAMP (gmt_modified) gmt_modified
        ,DATE_FORMAT (TO_TIMESTAMP (gmt_modified), 'yyyyMMdd') AS gmt_modified_day
        ,prevValue(biz_status) OVER (PARTITION BY order_no ORDER BY event_time) prev_biz_status
FROM    filter_view
;

-- 对于Flink中没有存储上一次状态的单据,去离线数据中查出来
CREATE VIEW order_with_last_biz_status AS
SELECT  c.*
        -- 考虑每天的离线数据需要一定时间产出,故使用前日离线数据
        ,IF (
            d.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
            ,d.last_biz_status
            ,d.biz_status
        ) prev_biz_status_offline
        ,IF (
            d.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
            ,d.last_gmt_modified
            ,d.gmt_modified
        ) gmt_modified_offline
FROM    get_before_biz_status c
LEFT JOIN order_status_history FOR SYSTEM_TIME AS OF PROCTIME () d
ON      c.order_no = d.order_no AND c.prev_biz_status IS NULL
;

-- 将状态迁移通过udtf映射为不同状态的操作数,例如,状态迁移为10-15-16,那么10来的时候,状态10的操作是+1,15来时,状态10 -1,15 +1,产生两条数据,16来时,15 -1, 这么做的原因是TT数据对于更新操作,没有更新前数据和更新后数据,对于状态计数只能通过加入业务状态机的方式识别状态变更到来时,哪个状态+1,哪个状态-1
CREATE VIEW status_shift_opt AS
select  S.order_no
        ,S.user_id
        ,S.biz_status
        ,S.gmt_modified_day
        ,S.gmt_modified
        ,T.effect_biz_status
        ,T.effect_biz_status_opt
from    (
    -- 将离线和Flink中取到的前一个状态进行合并,确定该单据前一个状态
            SELECT  order_no
                    ,user_id
                    ,biz_status
                    ,gmt_modified_day
                    ,gmt_modified
                    ,IF(
                        prev_biz_status IS NULL AND prev_biz_status_offline IS NOT NULL
                        ,prev_biz_status_offline
                        ,prev_biz_status
                    ) prev_biz_status
            FROM    order_with_last_biz_status
            WHERE   (gmt_modified_offline IS NOT NULL AND gmt_modified_offline < gmt_modified)
            or      (gmt_modified_offline IS NULL)
        ) as S
        ,LATERAL TABLE (statusCount (biz_status,prev_biz_status)) as T (effect_biz_status, effect_biz_status_opt)
;

-- 按照天数据进行分组,并过滤出近两天的数据
CREATE VIEW summary_with_day AS
SELECT  user_id
        ,effect_biz_status biz_status
        ,gmt_modified_day
        ,SUM(effect_biz_status_opt) num
FROM    status_shift_opt
-- 实时数据,只统计昨日和今日的数据
WHERE   effect_biz_status IN (10,15)
GROUP BY user_id
         ,effect_biz_status,gmt_modified_day
;

-- 按照供应商+状态维度进行分组,将近两日数据累加,得到(昨日+今日数量)
CREATE VIEW summary_with_user_id_status AS
SELECT  user_id
        ,biz_status
        ,SUM(num) num
FROM    summary_with_day
WHERE  gmt_modified_day = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd')
or  gmt_modified_day = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
GROUP BY user_id,biz_status
;

-- 加上离线数量,维表离线数量(截止昨日0点) + 实时数量(昨日+今日)= 总数量
CREATE VIEW plus_offline_data AS
SELECT  user_id
        ,biz_status
        ,offline_num+num num
        ,CONCAT(
            'offlineNum:'
            ,offline_num
            ,',realtimeNum:'
            ,num
            ,',sourceType:Flink_STREAM'
        ) attribute
FROM    (
            SELECT  a.user_id
                    ,a.biz_status
                    ,IF(
                        b.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
                        ,IF(b.last_day_num IS NULL,0,b.last_day_num)
                        ,IF(b.num IS NULL,0,b.num)
                    ) offline_num
                    ,a.num
            FROM    summary_with_user_id_status a
            LEFT JOIN offline_order_status_num FOR SYSTEM_TIME AS OF PROCTIME () b
            ON      a.user_id = b.user_id
            and     a.biz_status = b.biz_status
        )
;



create table mysql_output
(
    ,`user_id` varchar
    ,biz_status varchar
    ,num BIGINT
    ,attribute varchar
    ,primary key (biz_status, `user_id`) ) with ( type = 'mysql'
    ,url = 'jdbc:mysql://xxxxxx'
    ,tableName = 'order_status_count'
    ,userName = 'xxx'
    ,password = 'xxx'
)
;

INSERT INTO mysql_output
SELECT  *
FROM    plus_offline_data
;

UDF

prevValue函数(UDAF:UDF – Aggregation Functions):

package com.fliaping.flink.udf;

import org.apache.flink.table.functions.AggregateFunction;

public class PrevValueUdaf extends AggregateFunction<String, PrevValueUdaf.PrevValueAccum> {

    private static final long serialVersionUID = 3733259132660252997L;

    @Override
    public PrevValueAccum createAccumulator() {
        return new PrevValueAccum();
    }

    @Override
    public String getValue(PrevValueAccum prevValueAccum) {
        return prevValueAccum.prevValue;
    }

    public void accumulate(PrevValueAccum accumulator, String iValue) {
        accumulator.prevValue = accumulator.currentValue;
        accumulator.currentValue =iValue;
    }


    public void retract(PrevValueAccum accumulator, String iValue) {
        accumulator.currentValue = accumulator.prevValue;
        accumulator.prevValue = iValue;
    }

    public void merge(PrevValueAccum accumulator, Iterable<PrevValueAccum> its) {
        for (PrevValueAccum it : its) {
            accumulator.currentValue = it.currentValue;
            accumulator.prevValue = it.prevValue;
        }
    }

    public static class PrevValueAccum {
        private String prevValue;
        private String currentValue;
    }
}

statusCount函数(UDTF:UDF – Table Functions):

package com.fliaping.flink.udf;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava18.com.google.common.base.Objects;
import org.apache.flink.table.functions.TableFunction;

public class StatusCountUdtf extends TableFunction<Tuple2<String, Integer>>{

    private static final long serialVersionUID = 5467096366714935845L;

    public void eval(String bizStatus, String prevBizStatus) {

        if(Objects.equal(bizStatus, prevBizStatus)) {
            return;
        }

        if (bizStatus != null) {
            collect(Tuple2.of(bizStatus, 1));
        }

        if (prevBizStatus != null) {
            collect(Tuple2.of(prevBizStatus, -1));
        }

    }
}

离线 SQL代码:

-- 计算统计数量,包含day-1和day-2的数据
INSERT OVERWRITE TABLE order_status_num PARTITION(ds='${bizdate}')
SELECT  IF(a.user IS NULL,b.user_id,a.user_id) user_id
        ,IF(a.biz_status IS NULL,b.biz_status,a.biz_status) biz_status
        ,b.num last_day_num
        ,a.num
FROM    (
            SELECT  user_id
                    ,biz_status
                    ,COUNT(order_no) num
            FROM    my_order
            WHERE   ds = '${bizdate}'
            AND     biz_status IN (10,15)
            GROUP BY user_id
                     ,biz_status
        ) a
FULL OUTER JOIN (
                    SELECT  user_id
                            ,biz_status
                            ,COUNT(order_no) num
                    FROM    my_order
                    WHERE   ds = to_char(dateadd(to_date('${bizdate}', "yyyymmdd"), - 1, 'dd'), 'yyyymmdd')
                    AND     biz_status IN (10,15)
                    GROUP BY user_id
                             ,biz_status
                ) b
ON      a.user_id = b.user_id
AND     a.biz_status = b.biz_status
;

-- 计算单据状态,包含day-1和day-2的数据
INSERT OVERWRITE TABLE order_status_history PARTITION(ds='${bizdate}')
SELECT  IF(
            a.order_no IS NULL
            ,b.order_no
            ,a.order_no
        ) order_no
        ,a.biz_status
        ,b.biz_status last_biz_status
        ,a.gmt_modified
        ,b.gmt_modified last_gmt_modified
FROM    (
            SELECT  order_no
                    ,biz_status
                    ,gmt_modified
            FROM    my_order
            WHERE   ds = '${bizdate}'
            AND     biz_status IN (10,15)
        ) a
FULL OUTER JOIN (
                    SELECT  order_no
                            ,biz_status
                            ,gmt_modified
                    FROM    my_order
                    WHERE   ds = to_char(dateadd(to_date('${bizdate}', "yyyymmdd"), - 1, 'dd'), 'yyyymmdd')
                    AND     biz_status IN (10,15)
                ) b
ON      a.order_no = b.order_no
;

实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

案例简介

本文会以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及太复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成的。

架构设计

具体的架构设计如图所示:首先通过canal解析MySQL的binlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行JOIN,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。

业务数据准备

  • 订单表(order_info)
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `operate_time` datetime DEFAULT NULL COMMENT '操作时间',
  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
  `province_id` int(20) DEFAULT NULL COMMENT '地区',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
  • 订单详情表(order_detail)
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';
  • 商品表(sku_info)
CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
  `price` decimal(10,0) DEFAULT NULL COMMENT '价格',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述',
  `weight` decimal(10,2) DEFAULT NULL COMMENT '重量',
  `tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';

商品一级类目表(base_category1)

CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(10) NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';

商品二级类目表(base_category2)

CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(200) NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';

商品三级类目表(base_category3)

CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(200) NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';

省份表(base_province)

CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT '省名称',
  `region_id` int(20) DEFAULT NULL COMMENT '大区id',
  `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

区域表(base_region)

CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT '大区id',
  `region_name` varchar(20) DEFAULT NULL COMMENT '大区名称',
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

数据处理流程

ODS层数据同步

关于ODS层的数据同步参见我的另一篇文章基于Canal与Flink实现数据实时增量同步(一)。主要使用canal解析MySQL的binlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:

DIM层维表数据准备

本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表商品维表。处理过程如下:

区域维表

首先将mydw.base_provincemydw.base_region这个主题对应的数据抽取到MySQL中,主要使用Flink SQL的Kafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:

-- -------------------------
--   省份
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT ,
  `area_code`STRING
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_province',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   省份
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT ,
    `area_code`STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_province', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   省份
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_province
SELECT *
FROM ods_base_province;

-- -------------------------
--   区域
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   区域
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_region', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   区域
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_region
SELECT *
FROM ods_base_region;

经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province作为维表:

-- ---------------------------------
-- DIM层,区域维表,
-- 在MySQL中创建视图
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br 
     JOIN base_province bp ON br.id= bp.region_id
;

这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:

-- -------------------------
--  一级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
)WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.base_category1',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ;

-- -------------------------
--  一级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category1', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  一级类目表
--   MySQL Sink Load Data
-- ------------------------- 

INSERT INTO base_category1
SELECT *
FROM ods_base_category1;

-- -------------------------
--  二级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
)WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_category2',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ;

-- -------------------------
--  二级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category2', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  二级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;

-- -------------------------
-- 三级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
)WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_category3',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--  三级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category3', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  三级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;

-- -------------------------
--   商品表
--   Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.sku_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   商品表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
   PRIMARY KEY (tm_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'sku_info', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   商品
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;

经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQL的dim库中创建一张视图:dim_sku_info,用作后续使用的维表。

-- ---------------------------------
-- DIM层,商品维表,
-- 在MySQL中创建视图
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c3.id AS c1_id,
  c3.name AS c1_name
FROM
(
  sku_info si 
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id =c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);

至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。

DWD层数据处理

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:

-- -------------------------
--   订单详情
--   Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.order_detail',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   订单信息
--   Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0) ,
  `operate_time` TIMESTAMP(0) ,
  `expire_time` TIMESTAMP(0) ,
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.order_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- ---------------------------------
-- DWD层,支付订单明细表dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD层,已支付订单明细表
-- 向dwd_paid_order_detail装载数据
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info
    WHERE order_status = '2' -- 已支付
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od 
    ON oi.id = od.order_id;

ADS层数据

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

  • ads_province_index

首先在MySQL中创建对应的ADS目标表:ads_province_index

CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt) 
) ;

向MySQL的ADS层目标装载数据:

-- Flink SQL Cli操作
-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
--      2.每天每个省份的订单金额
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_province_index', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- DIM层,区域维表,
-- 创建区域维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);

-- ---------------------------------
-- 向ads_province_index装载数据
-- 维表JOIN
-- ---------------------------------

INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp 
  ON dp.province_id = pc.province_id;

当提交任务之后:观察Flink WEB UI:

查看ADS层的ads_province_index表数据:

  • ads_sku_index

首先在MySQL中创建对应的ADS目标表:ads_sku_index

CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100) ,
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);

向MySQL的ADS层目标装载数据:

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);

-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
 order_sku_num BIGINT,
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
   sum(sku_num) order_sku_num,
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;

-- ---------------------------------
-- tmp_sku_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM层,商品维表,
-- 创建商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向ads_sku_index装载数据
-- 维表JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc 
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id
  ;

当提交任务之后:观察Flink WEB UI:

查看ADS层的ads_sku_index表数据:

FineBI结果展示

其他注意点

Flink1.11.0存在的bug

当在代码中使用Flink1.11.0版本时,如果将一个change-log的数据源insert到一个upsert sink时,会报如下异常:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. 
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])

该bug目前已被修复,修复可以在Flink1.11.1中使用。

docker快速搭建alibaba-canal


需求来源

 跟踪表对比前后修改数据变化

方案

  1. AOP、拦截器优点: 实现简单缺点:
    1. 代码入侵性强 每加入一个模块就需要处理切面添加切入点,模块配置也需要增加。
    2. 项目依赖麻烦,档案模块需要依赖对应模块api包进行序列化实现对比,业务模块需要实现查询接口。
    3. 模块适配插拔式实现困难
    4. sql解析困难复杂容易出错(拦截mapper层解析update语句)
  2. 日志采集方案优点:
    1. 只需要处理数据变化对比,可以直接集成三方插件如es查询,无代码入侵。后续扩展强,无论任何任何模块需要加入类似功能只需要中间件配置,添加log监听即可。
    缺点:
    1. 需要部署其他服务,增加运维成本,依赖于三方服务可能存在数据丢失

canal部署

环境

这里使用的是mac os 搭配docker部署,几乎和linux环境差不多

canal + rocketMQ + ES 实现

基础同步技术介绍

canal

准备

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

快速启动参考:QuickStart

其中主要注意配置匹配数据库、表规则配置,可以选择匹配部分表做记录。 也可以通过配置不同的topic消费做消费实现不同的业务。参考文档:配置 主要关注Topic配置匹配规则

RoketMQ 搭建

docker 使用 foxiswho/rocketmq 镜像 参考博客

docker run -d -p 10911:10911 -p 10909:10909\
 --name rmqbroker --link rmqserver:namesrv\
 -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt"\
 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\
 foxiswho/rocketmq:broker-4.5.1

使用上面docker命令拉取镜像,快速搭建

搭建canal

docker搭建官方文档

下载docker启动文件

linux命令

wget  https://github.com/alibaba/canal/blob/master/docker/run.sh

mac 命令

curl -0 https://github.com/alibaba/canal/blob/master/docker/run.sh

配置canal配置项

参考配置

主要注意配置

#  按需修改成自己的数据库信息
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# mq config 这个配置使用动态topic就可以不配置,选其一
canal.mq.topic=example
# 针对库名或者表名发送动态topic,参考配置
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
sh run.sh -e canal.auto.scan=false \
		  -e canal.destinations=test \
		  -e canal.instance.master.address=172.17.0.7:3306  \
		  -e canal.instance.dbUsername=admin  \
		  -e canal.instance.dbPassword=123456  \
		  -e canal.instance.defaultDatabaseName=mytest  \
		  -e canal.instance.connectionCharset=UTF-8 \
		  -e canal.instance.tsdb.enable=true \
		  -e canal.instance.gtidon=false  \
		  -e canal.mq.dynamicTopic=mytest:mytest.sg_pop_base_info  \
		  -e canal.mq.partition=0 \
		  -e canal.serverMode=rocketmq \
		  -e canal.mq.servers=172.17.0.10:9876 \
		  -e canal.mq.retries=0 \
		  -e canal.mq.batchSize=16384 \
		  -e canal.mq.maxRequestSize=1048576 \
		  -e canal.mq.lingerMs=200 \
		  -e canal.mq.bufferMemory=33554432 \
		  -e canal.mq.canalBatchSize=50  \
		  -e canal.mq.canalGetTimeout=100 \
		  -e canal.mq.flatMessage=true \
		  -e canal.mq.compressionType=none \
		  -e canal.mq.acks=all \
		  -e canal.mq.transaction=false 

mysql、mq如果使用的容器,注意添加link配置 或者 通过命令docker inspect [容器名] 查看IPAddress配置到配置中

docker查看启动成功

#运行 docker logs 容器名称
docker logs canal-server

# 如下表示成功
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Generating SSH1 RSA host key:                              [  OK  ]
Starting sshd:                                             [  OK  ]
Starting crond:                                            [  OK  ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

有兴趣的可以看一下run.sh这个文件,搞过服务器的肯定看得懂,就是各种变量然后组装docker 的run命令

查看MQ消息

修改监控表数据即可查看mq中消息,如果修改了数据没有消息,查看canal-server容器中日志报错

本地mq客户端查看消息http://localhost:8180/#/message

实现技术通过canal获取日志拿到对比数据放入MQ,MQ消息数据如下:

新增消息

{
    "data": [
        {
            "id": "111",
            "name": "ad ",
            "create_date": "2020-04-15 14:31:03",
            "update_date": "2020-04-15 14:31:08",
            "create_user": "asdf ",
            "update_user": "sadf ",
            "is_deleted": "0"
        }
    ],
    "database": "grid_comm_center",
    "es": 1586932274000,
    "id": 66,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1586932275053,
    "type": "INSERT"
}

修改消息

{
    "data": [
        {
            "id": "111",
            "name": "ad asdf ",
            "create_date": "2020-04-15 14:31:03",
            "update_date": "2020-04-15 14:31:54",
            "create_user": "asdf adf ",
            "update_user": "sadf asdf",
            "is_deleted": "0"
        }
    ],
    "database": "grid_comm_center",
    "es": 1586932314000,
    "id": 67,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": [
        {
            "name": "ad ",
            "update_date": "2020-04-15 14:31:08",
            "create_user": "asdf ",
            "update_user": "sadf "
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1586932314983,
    "type": "UPDATE"
}

删除消息

{
    "data": [
        {
            "id": "1111",
            "name": "123123",
            "create_date": "2020-03-17 17:48:23",
            "update_date": "2020-03-17 17:48:26",
            "create_user": "你好",
            "update_user": "你好",
            "is_deleted": "1"
        }
    ],
    "database": "grid_comm_center",
    "es": 1587366810000,
    "id": 1049,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1587366810548,
    "type": "DELETE"
}

如上消息可以清楚拿到数据库、表详细变更,通过拿到变更数据解析插入数据库即可。

官方中还可以通过ClientAdapter 适配器入库到RDB、HBase、ES,这里根据需求做部署操作吧。目前这个里自己做档案模块进行消费写入档案数据,预计搜索可以放入es可以加快查询效率。

查看这些消息,然后做消费即可做自己想做的事情,至此已经完成了canal的基础搭建

消费消息写入ElasticSearch

参考博客

maven配置

<elasticsearch.version>7.2.0</elasticsearch.version>
<!-- Java High Level REST Client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>

Spring boot封装工具类

mq使用的是公司封装的包,所以这里省略mq相关代码。直接集成mq消费相关消费做自己业务处理即可。

《从0到1学习Flink》—— Apache Flink 介绍

前言

Flink 是一种流式计算框架,为什么我会接触到 Flink 呢?因为我目前在负责的是监控平台的告警部分,负责采集到的监控数据会直接往 kafka 里塞,然后告警这边需要从 kafka topic 里面实时读取到监控数据,并将读取到的监控数据做一些 聚合/转换/计算 等操作,然后将计算后的结果与告警规则的阈值进行比较,然后做出相应的告警措施(钉钉群、邮件、短信、电话等)。画了个简单的图如下:

监控告警

目前告警这块的架构是这样的结构,刚进公司那会的时候,架构是所有的监控数据直接存在 ElasticSearch 中,然后我们告警是去 ElasticSearch 中搜索我们监控指标需要的数据,幸好 ElasticSearch 的搜索能力够强大。但是你有没有发现一个问题,就是所有的监控数据从采集、采集后的数据做一些 计算/转换/聚合、再通过 Kafka 消息队列、再存进 ElasticSearch 中,再而去 ElasticSearch 中查找我们的监控数据,然后做出告警策略。整个流程对监控来说看起来很按照常理,但是对于告警来说,如果中间某个环节出了问题,比如 Kafka 消息队列延迟、监控数据存到 ElasticSearch 中写入时间较长、你的查询姿势写的不对等原因,这都将导致告警从 ElasticSearch 查到的数据是有延迟的。也许是 30 秒、一分钟、或者更长,这样对于告警来说这无疑将导致告警的消息没有任何的意义。

为什么这么说呢?为什么需要监控告警平台呢?无非就是希望我们能够尽早的发现问题,把问题给告警出来,这样开发和运维人员才能够及时的处理解决好线上的问题,以免给公司造成巨大的损失。

更何况现在还有更多的公司在做那种提前预警呢!这种又该如何做呢?需要用大数据和机器学习的技术去分析周期性的历史数据,然后根据这些数据可以整理出来某些监控指标的一些周期性(一天/七天/一月/一季度/一年)走势图,这样就大概可以绘图出来。然后根据这个走势图,可以将当前时间点的监控指标的数据使用量和走势图进行对比,在快要达到我们告警规则的阈值时,这时就可以提前告一个预警出来,让运维提前知道预警,然后提前查找问题,这样就能够提早发现问题所在,避免损失,将损失降到最小!当然,这种也是我打算做的,应该可以学到不少东西的。

于是乎,我现在就在接触流式计算框架 Flink,类似的还有常用的 Spark 等。

自己也接触了 Flink 一段时间了,这块中文资料目前书籍是只有一本很薄的,英文书籍也是三本不超过。

我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以关注我的公众号:zhisheng,然后回复关键字:Flink 即可无条件获取到。

另外这里也推荐一些博客可以看看:

1、官网:[https://flink.apache.org/]()

2、GitHub: [https://github.com/apache/flink]()

3、[https://blog.csdn.net/column/…]()

4、[https://blog.csdn.net/lmalds/…]()

5、[http://wuchong.me/]()

6、[https://blog.csdn.net/liguohu…]()

下面的介绍可能也有不少参考以上所有的资料,感谢他们!在介绍 Flink 前,我们先看看 数据集类型 和 数据运算模型 的种类。

数据集类型有哪些呢:

  • 无穷数据集:无穷的持续集成的数据集合
  • 有界数据集:有限不会改变的数据集合

那么那些常见的无穷数据集有哪些呢?

  • 用户与客户端的实时交互数据
  • 应用实时产生的日志
  • 金融市场的实时交易记录

数据运算模型有哪些呢:

  • 流式:只要数据一直在产生,计算就持续地进行
  • 批处理:在预先定义的时间内运行计算,当完成时释放计算机资源

Flink 它可以处理有界的数据集、也可以处理无界的数据集、它可以流式的处理数据、也可以批量的处理数据。

Flink 是什么 ?

flink-01

flink-02

flink-03

上面三张图转自 云邪 成都站 《Flink 技术介绍与未来展望》,侵删。

从下至上,Flink 整体结构

flink-stack-frontpage

从下至上:

1、部署:Flink 支持本地运行、能在独立集群或者在被 YARN 或 Mesos 管理的集群上运行, 也能部署在云上。

2、运行:Flink 的核心是分布式流式数据引擎,意味着数据以一次一个事件的形式被处理。

3、API:DataStream、DataSet、Table、SQL API。

4、扩展库:Flink 还包括用于复杂事件处理,机器学习,图形处理和 Apache Storm 兼容性的专用代码库。

Flink 数据流编程模型

抽象级别

Flink 提供了不同的抽象级别以开发流式或批处理应用。

2018-10-14_09-34-17

  • 最底层提供了有状态流。它将通过 过程函数(Process Function)嵌入到 DataStream API 中。它允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
  • DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
  • Table API 是以  为中心的声明式 DSL,其中表可能会动态变化(在表达流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁(代码量更少)。

你可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。

  • Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

Flink 程序与数据流结构

2018-10-14_09-51-09

Flink 应用程序结构就是如上图所示:

1、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。

2、Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。

3、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

为什么选择 Flink?

Flink 是一个开源的分布式流式处理框架:

①提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。

②它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。

③大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。

更早的时候,我们讨论了数据集类型(有界 vs 无穷)和运算模型(批处理 vs 流式)的匹配。Flink 的流式计算模型启用了很多功能特性,如状态管理,处理无序数据,灵活的视窗,这些功能对于得出无穷数据集的精确结果是很重要的。

  • Flink 保证状态化计算强一致性。”状态化“意味着应用可以维护随着时间推移已经产生的数据聚合或者,并且 Filnk 的检查点机制在一次失败的事件中一个应用状态的强一致性。

exactly_once_state

  • Flink 支持流式计算和带有事件时间语义的视窗。事件时间机制使得那些事件无序到达甚至延迟到达的数据流能够计算出精确的结果。

out_of_order_stream

  • 除了提供数据驱动的视窗外,Flink 还支持基于时间,计数,session 等的灵活视窗。视窗能够用灵活的触发条件定制化从而达到对复杂的流传输模式的支持。Flink 的视窗使得模拟真实的创建数据的环境成为可能。

windows

  • Flink 的容错能力是轻量级的,允许系统保持高并发,同时在相同时间内提供强一致性保证。Flink 以零数据丢失的方式从故障中恢复,但没有考虑可靠性和延迟之间的折衷。

distributed_snapshots

  • Flink 能满足高并发和低延迟(计算大量数据很快)。下图显示了 Apache Flink 与 Apache Storm 在完成流数据清洗的分布式任务的性能对比。

streaming_performance

  • Flink 保存点提供了一个状态化的版本机制,使得能以无丢失状态和最短停机时间的方式更新应用或者回退历史数据。

savepoints

  • Flink 被设计成能用上千个点在大规模集群上运行。除了支持独立集群部署外,Flink 还支持 YARN 和Mesos 方式部署。
  • Flink 的程序内在是并行和分布式的,数据流可以被分区成 stream partitions,operators 被划分为operator subtasks; 这些 subtasks 在不同的机器或容器中分不同的线程独立运行;operator subtasks 的数量在具体的 operator 就是并行计算数,程序不同的 operator 阶段可能有不同的并行数;如下图所示,source operator 的并行数为 2,但最后的 sink operator 为1;

parallel_dataflows

  • 自己的内存管理Flink 在 JVM 中提供了自己的内存管理,使其独立于 Java 的默认垃圾收集器。 它通过使用散列,索引,缓存和排序有效地进行内存管理。
  • 丰富的库Flink 拥有丰富的库来进行机器学习,图形处理,关系数据处理等。 由于其架构,很容易执行复杂的事件处理和警报。

分布式运行

flink 作业提交架构流程可见下图:

1、Program Code:我们编写的 Flink 应用程序代码

2、Job Client:Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点。 Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户

3、Job Manager:主进程(也称为作业管理器)协调和管理程序的执行。 它的主要职责包括安排任务,管理checkpoint ,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容灾,高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是 standby; Job Manager 包含 Actor system、Scheduler、Check pointing 三个重要的组件

4、Task Manager:从 Job Manager 处接收需要部署的 Task。Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。 任务执行的并行性由每个 Task Manager 上可用的任务槽决定。 每个任务代表分配给任务槽的一组资源。 例如,如果 Task Manager 有四个插槽,那么它将为每个插槽分配 25% 的内存。 可以在任务槽中运行一个或多个线程。 同一插槽中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。

最后

本文主要讲了我接触到 Flink 的缘由,然后从数据集类型和数据运算模型开始讲起,接着介绍了下 Flink 是什么、Flink 的整体架构、提供的 API、Flink 的优点所在以及 Flink 的分布式作业运行的方式。水文一篇,希望你能够对 Flink 稍微有一点概念了。

Apache Flink

Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用JavaScala编写的分布式流数据流引擎。[2][3]Flink以数据并行管道方式执行任意流数据程序[4],Flink的流水线运行时系统可以执行批处理和流处理程序。[5][6]此外,Flink的运行时本身也支持迭代算法的执行。[7]

Flink提供高吞吐量、低延迟的流数据引擎[8]以及对事件-时间处理和状态管理的支持。Flink应用程序在发生机器故障时具有容错能力,并且支持exactly-once语义。[9]程序可以用Java、Scala[10]Python[11]SQL[12]等语言编写,并自动编译和优化[13]到在集群或云环境中运行的数据流程序。[14]

Flink并不提供自己的数据存储系统,但为Amazon KinesisApache KafkaAlluxioHDFSApache CassandraElasticsearch等系统提供了数据源和接收器。