canal-admin1.1.15界面化安装配置canal集群详解

安装准备

虚拟机服务器四台
192.168.216.110 (主机名mysql)
192.168.216.111 (主机名hadoop01)
192.168.216.112 (主机名hadoop02)
192.168.216.113 (主机名hadoop03)

canal.admin-1.1.5.tar.gz
canal.deployer-1.1.5.tar.gz
下载地址 https://github.com/alibaba/canal/releases/tag
页面底端

自行安装

zk集群 (本demo—zk地址192.168.216.111:2181,192.168.216.112:2181,192.168.216.113:2181)
mysql5.7(本demo—mysql地址192.168.216.110:3306)
kafka集群(本demo—rocketMQ地址192.168.216.111:9876,192.168.216.112:9876,192.168.216.113:9876)

安装canal-admin

1.创建文件夹canal_admin,将canal.admin-1.1.5.tar.gz解压到该目录
mkdir canal_admin
tar -zxvf canal.admin-1.1.5.tar.gz -C /tmp/canal_admin

2.修改配置application.yml
cd /tmp/canal_admin/conf
vim application.yml

server:
  port: 8089       
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 192.168.216.110:3306
  database: canal_manager
  username: root
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: 123456

3.mysql操作
a. 编辑 mysql配置文件/etc/my.cnf
vim /etc/my.cnf
在mysqld标签下新增如下代码

# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
lower_case_table_names=1
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
validate_password=off

#server_id = 1
#log-bin=mysql-bin
#binlog_format = ROW
#log_timestamps=SYSTEM

b. 执行/export/servers/canal_admin/conf下的sql语句
cat canal_manager.sql
进入mysql,copy sql文件里的所有sql,直接执行

c.创建一个同步用的数据库
CREATE USER canal IDENTIFIED BY ‘123456’;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
– GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ ;
FLUSH PRIVILEGES;

4.启动canal-admin

cd /tmp/canal_admin/bin
sh startup.sh

5.浏览器输入192.168.216.111:8089,输入用户名admin,密码123456
a.点击集群管理,新建集群(集群名称:canal_cluster ; zkurl:192.168.216.111:192.168.216.112:192.168.216.113:2181)

b.选择编辑集群配置,下拉选择主配置

具体代码如下(注:此时canal.destinations参数不需要配置,后面会讲到何时配置)
#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
canal.admin.manager = 192.168.216.111:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

canal.zkServers = 192.168.216.111:2181,192.168.216.112:2181,192.168.216.113:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = 123456
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		#############
#################################################
canal.destinations = 
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.216.111:9876,192.168.216.112:9876,192.168.216.113:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false

安装canal服务端

1.创建文件夹canal,将canal.deployer-1.1.5.tar.gz解压到该目录
mkdir canal
tar -zxvf canal.deployer-1.1.5.tar.gz -C /tmp/canal

2.将canal_local.properties文件内容覆盖到canal.properties,并编辑canal.properties

# register ip
canal.register.ip =192.168.216.111

# canal admin config
canal.admin.manager = 192.168.216.111:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = cancal_cluster

3.将canal文件夹发送到 另外2台机
scp -r canal root@hadoop02: /tmp/canal
scp -r canal root@hadoop03: /tmp/canal
并修改canal.properties中canal.register.ip参数为各自服务器ip,其余参数保持不变
hadoop02上 canal.register.ip = 192.168.216.112
hadoop03上 canal.register.ip = 192.168.216.113

4.分别启动三台服务器上的canal服务

cd /tmp/canal/bin
sh startup.sh

5.刷新浏览器,点击server管理,发现已自动注入三个实例

基于Canal与Flink实现数据实时增量同步(二)

本文主要从Binlog实时采集和离线处理Binlog还原业务数据两个方面,来介绍如何实现DB数据准确、高效地进入Hive数仓。

背景

在数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为ODS(Operational Data Store)数据。在互联网企业中,常见的ODS数据有业务日志数据(Log)和业务DB数据(DB)两类。对于业务DB数据来说,从MySQL等关系型数据库的业务数据进行采集,然后导入到Hive中,是进行数据仓库生产的重要环节。如何准确、高效地把MySQL数据同步到Hive中?一般常用的解决方案是批量取数并Load:直连MySQL去Select表中的数据,然后存到本地文件作为中间存储,最后把文件Load到Hive表中。这种方案的优点是实现简单,但是随着业务的发展,缺点也逐渐暴露出来:

  • 性能瓶颈:随着业务规模的增长,Select From MySQL -> Save to Localfile -> Load to Hive这种数据流花费的时间越来越长,无法满足下游数仓生产的时间要求。
  • 直接从MySQL中Select大量数据,对MySQL的影响非常大,容易造成慢查询,影响业务线上的正常服务。
  • 由于Hive本身的语法不支持更新、删除等SQL原语(高版本Hive支持,但是需要分桶+ORC存储格式),对于MySQL中发生Update/Delete的数据无法很好地进行支持。

为了彻底解决这些问题,我们逐步转向CDC (Change Data Capture) + Merge的技术方案,即实时Binlog采集 + 离线处理Binlog还原业务数据这样一套解决方案。Binlog是MySQL的二进制日志,记录了MySQL中发生的所有数据变更,MySQL集群自身的主从同步就是基于Binlog做的。

实现思路

首先,采用Flink负责把Kafka上的Binlog数据拉取到HDFS上。

然后,对每张ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的存量数据读取到Hive上,这一过程底层采用直连MySQL去Select数据的方式,可以使用Sqoop进行一次性全量导入。

最后,对每张ODS表,每天基于存量数据和当天增量产生的Binlog做Merge,从而还原出业务数据。

Binlog是流式产生的,通过对Binlog的实时采集,把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对MySQL的访问压力上,都会有明显地改善。Binlog本身记录了数据变更的类型(Insert/Update/Delete),通过一些语义方面的处理,完全能够做到精准的数据还原。

实现方案

Flink处理Kafka的binlog日志

使用kafka source,对读取的数据进行JSON解析,将解析的字段拼接成字符串,符合Hive的schema格式,具体代码如下:

package com.etl.kafka2hdfs;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;

/**
*  @Created with IntelliJ IDEA.
*  @author : jmx
*  @Date: 2020/3/27
*  @Time: 12:52
*  
*/
public class HdfsSink {
public static void main(String[] args) throws Exception {
String fieldDelimiter = “,”;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// checkpoint
env.enableCheckpointing(10_000);
//env.setStateBackend((StateBackend) new FsStateBackend(“file:///E://checkpoint”));
env.setStateBackend((StateBackend) new FsStateBackend(“hdfs://kms-1:8020/checkpoint”));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

// source
Properties props = new Properties();
props.setProperty(“bootstrap.servers”, “kms-2:9092,kms-3:9092,kms-4:9092”);
// only required for Kafka 0.8
props.setProperty(“zookeeper.connect”, “kms-2:2181,kms-3:2181,kms-4:2181”);
props.setProperty(“group.id”, “test123”);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
“qfbap_ods.code_city”, new SimpleStringSchema(), props);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);

// transform
SingleOutputStreamOperator<String> cityDS = stream
.filter(new FilterFunction<String>() {
// 过滤掉DDL操作
@Override
public boolean filter(String jsonVal) throws Exception {
JSONObject record = JSON.parseObject(jsonVal, Feature.OrderedField);
return record.getString(“isDdl”).equals(“false”);
}
})
.map(new MapFunction<String, String>() {

@Override
public String map(String value) throws Exception {
StringBuilder fieldsBuilder = new StringBuilder();
// 解析JSON数据
JSONObject record = JSON.parseObject(value, Feature.OrderedField);
// 获取最新的字段值
JSONArray data = record.getJSONArray(“data”);
// 遍历,字段值的JSON数组,只有一个元素
for (int i = 0; i < data.size(); i++) {
// 获取到JSON数组的第i个元素
JSONObject obj = data.getJSONObject(i);
if (obj != null) {
fieldsBuilder.append(record.getLong(“id”)); // 序号id
fieldsBuilder.append(fieldDelimiter); // 字段分隔符
fieldsBuilder.append(record.getLong(“es”)); //业务时间戳
fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(record.getLong(“ts”)); // 日志时间戳
fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(record.getString(“type”)); // 操作类型
for (Map.Entry<String, Object> entry : obj.entrySet()) {

fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(entry.getValue()); // 表字段数据
}

}
}
return fieldsBuilder.toString();
}

});

//cityDS.print();
//stream.print();

// sink
// 以下条件满足其中之一就会滚动生成新的文件
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
.withRolloverInterval(60L * 1000L) //滚动写入新文件的时间,默认60s。根据具体情况调节
.withMaxPartSize(1024 * 1024 * 128L) //设置每个文件的最大大小 ,默认是128M,这里设置为128M
.withInactivityInterval(60L * 1000L) //默认60秒,未写入数据处于不活跃状态超时会滚动新文件
.build();

StreamingFileSink<String> sink = StreamingFileSink
//.forRowFormat(new Path(“file:///E://binlog_db/city”), new SimpleStringEncoder<String>())
.forRowFormat(new Path(“hdfs://kms-1:8020/binlog_db/code_city_delta”), new SimpleStringEncoder<String>())
.withBucketAssigner(new EventTimeBucketAssigner())
.withRollingPolicy(rollingPolicy)
.withBucketCheckInterval(1000) // 桶检查间隔,这里设置1S
.build();

cityDS.addSink(sink);
env.execute();
}
}

对于Flink Sink到HDFS,StreamingFileSink 替代了先前的 BucketingSink,用来将上游数据存储到 HDFS 的不同目录中。它的核心逻辑是分桶,默认的分桶方式是 DateTimeBucketAssigner,即按照处理时间分桶。处理时间指的是消息到达 Flink 程序的时间,这点并不符合我们的需求。因此,我们需要自己编写代码将事件时间从消息体中解析出来,按规则生成分桶的名称,具体代码如下:

package com.etl.kafka2hdfs;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
*  @Created with IntelliJ IDEA.
*  @author : jmx
*  @Date: 2020/3/27
*  @Time: 12:49
*  
*/

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {

@Override
public String getBucketId(String element, Context context) {
String partitionValue;
try {
partitionValue = getPartitionValue(element);
} catch (Exception e) {
partitionValue = “00000000”;
}
return “dt=” + partitionValue;//分区目录名称
}

@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
private String getPartitionValue(String element) throws Exception {

// 取出最后拼接字符串的es字段值,该值为业务时间
long eventTime = Long.parseLong(element.split(“,”)[1]);
Date eventDate = new Date(eventTime);
return new SimpleDateFormat(“yyyyMMdd”).format(eventDate);
}
}

离线还原MySQL数据

经过上述步骤,即可将Binlog日志记录写入到HDFS的对应的分区中,接下来就需要根据增量的数据和存量的数据还原最新的数据。Hive 表保存在 HDFS 上,该文件系统不支持修改,因此我们需要一些额外工作来写入数据变更。常用的方式包括:JOIN、Hive 事务、或改用 HBase、kudu。

如昨日的存量数据code_city,今日增量的数据为code_city_delta,可以通过 FULL OUTER JOIN,将存量和增量数据合并成一张最新的数据表,并作为明天的存量数据:


INSERT OVERWRITE TABLE code_city
SELECT
COALESCE( t2.id, t1.id ) AS id,
COALESCE ( t2.city, t1.city ) AS city,
COALESCE ( t2.province, t1.province ) AS province,
COALESCE ( t2.event_time, t1.event_time ) AS event_time
FROM
code_city t1
FULL OUTER JOIN (
SELECT
id,
city,
province,
event_time
FROM
(– 取最后一条状态数据
SELECT
id,
city,
province,
dml_type,
event_time,
row_number ( ) over ( PARTITION BY id ORDER BY event_time DESC ) AS rank
FROM
code_city_delta
WHERE
dt = ‘20200324’ — 分区数据
) temp
WHERE
rank = 1
) t2 ON t1.id = t2.id;

小结

本文主要从Binlog流式采集和基于Binlog的ODS数据还原两方面,介绍了通过Flink实现实时的ETL,此外还可以将binlog日志写入kudu、HBase等支持事务操作的NoSQL中,这样就可以省去数据表还原的步骤。本文是《基于Canal与Flink实现数据实时增量同步》的第二篇,关于canal解析Binlog日志写入kafka的实现步骤,参见《基于Canal与Flink实现数据实时增量同步一》。

基于Canal与Flink实现数据实时增量同步(一)

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

准备

常见的binlog命令

# 是否启用binlog日志
show variables like 'log_bin';
# 查看binlog类型
show global variables like 'binlog_format';
# 查看详细的日志配置信息
show global variables like '%log%';
# mysql数据存储目录
show variables like '%dir%';
# 查看binlog的目录
show global variables like "%log_bin%";
# 查看当前服务器使用的biglog文件及大小
show binary logs;
# 查看最新一个binlog日志文件名称和Position
show master status;

配置MySQL的binlog

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

部署canal

安装canal

[kms@kms-1 softwares]$ tar -xzvf canal.deployer-1.1.4.tar.gz  -C /opt/modules/canal/
  • 目录结构
drwxr-xr-x 2 root root 4096 Mar  5 14:19 bin
drwxr-xr-x 5 root root 4096 Mar  5 13:54 conf
drwxr-xr-x 2 root root 4096 Mar  5 13:04 lib
drwxrwxrwx 4 root root 4096 Mar  5 14:19 logs

配置修改

  • 修改conf/example/instance.properties,修改内容如下:
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = kms-1.apache.com:3306 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
# mq config,kafka topic名称
canal.mq.topic=test

修改conf/canal.properties,修改内容如下:

# 配置zookeeper地址
canal.zkServers =kms-2:2181,kms-3:2181,kms-4:2181
# 可选项: tcp(默认), kafka, RocketMQ,
canal.serverMode = kafka
# 配置kafka地址
canal.mq.servers = kms-2:9092,kms-3:9092,kms-4:9092

启动canal

sh bin/startup.sh

关闭canal

sh bin/stop.sh

部署Canal Admin(可选)

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

要求

canal-admin的限定依赖:

  • MySQL,用于存储配置和节点等相关数据
  • canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

安装canal-admin

[kms@kms-1 softwares]$ tar -xzvf canal.admin-1.1.4.tar.gz -C /opt/modules/canal-admin/
  • 目录结构
drwxrwxr-x 2 kms kms 4096 Mar 6 11:25 bin
drwxrwxr-x 3 kms kms 4096 Mar 6 11:25 conf
drwxrwxr-x 2 kms kms 4096 Mar 6 11:25 lib
drwxrwxr-x 2 kms kms 4096 Sep 2 2019 logs
  • 配置修改
vi conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8

spring.datasource:
address: kms-1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1

canal:
adminUser: admin
adminPasswd: admin
  • 初始化原数据库
mysql -uroot -p
# 导入初始化SQL
#注:(1)初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化
# (2)canal_manager.sql默认会在conf目录下
> mysql> source /opt/modules/canal-admin/conf/canal_manager.sql
  • 启动canal-admin
sh bin/startup.sh
  • 访问

可以通过 http://kms-1:8089/ 访问,默认密码:admin/123456

  • canal-server端配置

使用canal_local.properties的配置覆盖canal.properties,将下面配置内容配置在canal_local.properties文件里面,就可以了。

# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
  • 启动canal-serve
sh bin/startup.sh local

注意:先启canal-server,然后再启动canal-admin,之后登陆canal-admin就可以添加serve和instance了。

启动kafka控制台消费者测试

bin/kafka-console-consumer.sh –bootstrap-server kms-2:9092,kms-3:9092,kms-4:9092 –topic test –from-beginning

此时MySQL数据表若有变化,会将row类型的log写进Kakfa,具体格式为JSON:

  • insert操作
{
“data”:[
{
“id”:”338″,
“city”:”成都”,
“province”:”四川省”
}
],
“database”:”qfbap_ods”,
“es”:1583394964000,
“id”:2,
“isDdl”:false,
“mysqlType”:{
“id”:”int(11)”,
“city”:”varchar(256)”,
“province”:”varchar(256)”
},
“old”:null,
“pkNames”:[
“id”
],
“sql”:””,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:”code_city”,
“ts”:1583394964361,
“type”:”INSERT”
}
  • update操作
{
“data”:[
{
“id”:”338″,
“city”:”绵阳市”,
“province”:”四川省”
}
],
“database”:”qfbap_ods”,
“es”:1583395177000,
“id”:3,
“isDdl”:false,
“mysqlType”:{
“id”:”int(11)”,
“city”:”varchar(256)”,
“province”:”varchar(256)”
},
“old”:[
{
“city”:”成都”
}
],
“pkNames”:[
“id”
],
“sql”:””,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:”code_city”,
“ts”:1583395177408,
“type”:”UPDATE”
}
  • delete操作
{
“data”:[
{
“id”:”338″,
“city”:”绵阳市”,
“province”:”四川省”
}
],
“database”:”qfbap_ods”,
“es”:1583395333000,
“id”:4,
“isDdl”:false,
“mysqlType”:{
“id”:”int(11)”,
“city”:”varchar(256)”,
“province”:”varchar(256)”
},
“old”:null,
“pkNames”:[
“id”
],
“sql”:””,
“sqlType”:{
“id”:4,
“city”:12,
“province”:12
},
“table”:”code_city”,
“ts”:1583395333208,
“type”:”DELETE”
}

JSON日志格式解释

  • data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据
  • database:数据库名称
  • es:事件时间,13位的时间戳
  • id:事件操作的序列号,1,2,3…
  • isDdl:是否是DDL操作
  • mysqlType:字段类型
  • old:旧数据
  • pkNames:主键名称
  • sql:SQL语句
  • sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal
  • table:表名
  • ts:日志时间
  • type:操作类型,比如DELETE,UPDATE,INSERT

利用Flink SQL实时统计单据状态(含历史数据)

背景

需求抽象下,有一种单据,整个生命周期中有很多种状态,现在要实时按照某个维度(例如用户id)统计其中某几个状态的数量,并按照该维度定时推送统计数据。

对于这个需求,解决办法有很多,那么什么情况下不需要Flink来实现?

  1. MySQL存储,单库单表,数据量比较小,直接sql查询
  2. MySQL存储,单库单表,统计维度和状态字段加了索引,且该维度数据比较分散,不会集中在某几个值(按照维度一次查询的数量比较少)
  3. 业务接受统计数据延迟,例如一个小时更新一次,那么可以用小时离线表来统计
  4. 有AnalyticDB(aliyun)、Elastic Search等分析型数据库可用(当然需要考虑成本问题)

其实上面的场景已经覆盖大部分情况,从技术方案选型的角度来讲Flink一定是最后才考虑的方案,为什么这么说呢?

  1. 流式计算还是比较年轻的技术,开发体验还有待提高,尽管有FlinkSQL,但学习成本依然不低,需要转换思维
  2. 调试成本比较高,无法像写代码那样调试计算逻辑,flink的调试通常只能看输入、输出
  3. 需要有合适的数据输入,如果数据不合适的话,可能非常费事(下文会提到)

那么什么情况下可以采用Flink的方案呢?

  1. 真的对实时性有严格要求,并且由于原始数据量级的问题,无法直接查询原始数据(对数据库压力太大,或耗时过久)
  2. 出于学习的目的,且时间充裕
  3. 其他因为直接查询路子走不通的情况(例如,分库分表了、数据量比较大太耗时)
  4. 由于各种原因,申请不到分析型数据库,或者有现成的flink集群
  5. 其他方案成本太高,包括 开发成本、机器资源成本、嘴皮子成本(开玩笑)

基本思路

那么下面我们就来讨论实时要求下使用Flink的方案实现。

对于实时计算架构,业界有比较成熟两个方案,Lambda和Kappa

Lambda架构比较成熟稳定,但是需要维护两套系统的代码,不易于维护

Kappa舍弃了离线数据,通过回溯流数据的方式来解决出问题后修复,但是如果时间跨度比较大的话,修复时间会很久,对于短期数据的统计,用这种方式肯定是更好的方案

离线和实时数据结合

对于我们这个需求:

  1. 首先时间跨度比较长,至少是几个月的
  2. 然后要考虑应用代码的BUG等问题,如果纯用流式计算,如果不进行维护,后续的误差会越来越大,我们希望的是要有一定的自校准能力。
  3. 另外结果的输出希望直接输出一张表,并且是全量数据,不必在应用层进行汇总,减少了一定的维护成本。

结合我们的需求和这两个方案的优缺点,可以发现并不能照搬以上两种方案,需要做一些微创新

那么一个简单的雏形就出现了,但是会有一些问题

  1. 任务每次重启都要先做离线数据的初始化,如果没有初始化,今日无变化的数据就不会落到结果表中。这样除了实时任务外还需要一个初始化任务,每次重启任务就需要有两步,先初始化,然后将实时任务回溯到今日0点然后重跑
  2. 我们知道离线数据通常需要花费比较久的时间,并且很多任务并不是0点就开始跑,有时候任务跑完就已经5、6点了,这期间离线数据用的是前天的,实时数据用的是今天的,这就会出现错误

针对上面两个问题,可以考虑如下优化

  1. 主要是想省个事,这个好办,每天来定时刷一遍离线数据,如果有实时数据,那就以实时为准,没有则用离线。最重要的是可以通过这种方式用离线来对实时数据做一次校准,因为实时数据的效期是一天,如果第二天没有更新,这条数据就会被清理掉,那么就会替换为离线数据,起到了校准的目的。
  2. 这个问题的本质是离线任务执行时间和实时计算的有效数据窗口之间有GAP,导致在离线任务产出之前,离线数据只有前日汇总可用,而实时计算的区间已经切换到今日,如下图所示

那么解决这个问题也比较简单,拉长实时计算的时间区间,给离线任务充足的时间,让离线数据和实时数据区间重叠即可,如下图所示,离线数据采用前日数据,不管昨日数据是否产生,实时数据从昨日0点开始统计,这样离线任务有整整一天的可执行时间,完全是充足的

实时状态计数如何算

如果熟悉Flink撤回机制,很容易想到,这个不就是状态统计吗,两个group by不就搞定实时计数了吗? 如下面的代码所示,当同一个单据号的状态发生变化,就会撤回之前的那条数据,这个user_id下,前一个状态的计数就会减一。

select user_id,status,COUNT(orderNo) num from (
    SELECT orderNo,LAST_VALUE(status) status ,LAST_VALUE(user_id) user_id from order_table group by orderNo
) group by user_id,status
;

这样是很好,但是没有考虑单据生命周期跨越实时计算覆盖范围的情况,假如实时计算近两天的数据,我们要统计的状态是(10,15),这两个状态是连续的,考虑以下情况:

  1. 单据在两天前创建,状态没有到10,实时计算时有了状态为10的数据,此时 状态10计数+1。
  2. 单据在近两天内创建,状态从10到15,由于单据状态为10或15都在实时计算范围内,根据撤回机制,状态10计数-1,状态15计数+1,很完美。
  3. 单据在两天前创建且状态流转到10,在近两天出现了状态15,根据上面的sql,状态15计数+1,但并没有感知之前的状态10,所以加上离线数据的话,状态10的计数就多加了。

简单说只利用实时计算的撤回机制,对于单据跨越实时计算覆盖范围的情况并不能很好处理。当然如果说我把实时计算区间设置大一些,几个月或者一年,超过单据最大生命周期的时间,这当然是可以的,但如果任务重启咋整,源数据并不会保存那么久,所以还是要考虑这个问题。

那状态变更的计数如何做呢?

根据上面描述,如果能拿到当前状态及前一个状态,那么可以对每个状态的计数进行加减。变更消息过来时,将更新后的状态计数加一,将更新前的状态计数减一,这样就能正确计数。但有时候现实会有很多限制条件,比如:状态变更消息没有更新前状态,或者干脆没有状态更新消息,那怎么办呢?改代码加上带更新前状态的消息,如果这种方式并不简单呢?

接入数据库变更日志(binlog)是一种很好的解耦方式,很多云服务提供的数据库都有对应的CDC(Change Data Capture)功能,像阿里云的Flink服务就提供了对应的connector(mysql-cdc);此外还有一些同步工具通过解析binlog将数据同步出来,通过开发对应的connector都可以对接上flink,例如Aliyun-DTScanal等。

但是有些数据库同步工具没有变更前数据,对于数据同步的场景是无所谓的,但是我们这个场景是必须要的,对于这种情况,如果非要走下去,那就要加个中间层,缓存每个单据的前一次状态。有两种方案:

  1. 可以通过Flink的backend state 来存储,不过这个存储是有时效的,但单据的生命周期有可能大于存储时效,这时可以再引入离线数据。
  2. 直接建一个单据前一次状态表,既作为维表又作为结果表,每次一条数过来先当做维表查询上一次状态,然后再把本次状态写入结果表。

然而第2种方案有些重,引入了一张新的外部表,相当于同步了一份单据前一次状态的数据,并且还是要考虑离线数据的问题。而方案1,根据数据分布的特性,由于我们关心的状态在某一时刻,单据并不会很多,离线单据状态可以直接缓存到Flink,所以这里我决定还是用方案1来实现。

实现方案

整体的架构如上图所示,具体代码见文末附录。

方案说明:

  • 单据上次状态:由于数据源没有更新前数据,因此我们只能利用Flink的存储,并加上离线的单据状态进行计算,得到前一次的状态值,这里自定义了聚合函数prevValue,缓存单据前一个状态,如果没有,那就从离线单据中取得,
  • 单据状态操作值转换:这里根据当前和上一次状态转换为+1或-1操作,如果两个状态一致,我们就要忽略掉,有可能是重复数据;利用UDTF,可能一条数据转为两条,上一次状态-1及当前状态+1;如果前一个状态为空,则忽略。

注意点:

  • 这里需要注意的是,从数据库同步到离线的单据,要选择通过更新时间来分区,这种方式会根据更新时间字段来判断某条数据该不该落到今天这个分区,这样会准确些。而普通的每天全量的方式就可能会把今天更新的数据也放到昨天的分区中,而我们实时计算的覆盖范围是识别更新时间的,这时有可能因为离线计算和实时计算有重叠而导致错误。


数据准确性调试

数据的调试相对于业务逻辑的调试更加麻烦些,有如下原因:

  1. 测试数据不太好造,主要是因为场景多,输入输出输出比较复杂
  2. 线上数据可能不如预期的规范,测试时不能覆盖到该场景

我目前也没有什么比较好的办法,就是花时间造数据测试;在线上数据跑起来后要多验证,对于异常数据要把源数据拉下来找出原因。

可改进点

  • 如果离线单据状态数据比较多,不能全部缓存,或者影响性能的话,可以引入其他存储,例如hbase。


参考资料

附录

CREATE FUNCTION statusCount AS 'com.fliaping.flink.udf.StatusCountUdtf' ;

CREATE FUNCTION prevValue AS 'com.fliaping.flink.udf.PrevValueUdaf' ;

-- 采购单的tt数据源
create table xx_source
(
    id BIGINT
    ,gmt_modified VARCHAR
    ,user_id VARCHAR
    ,biz_status VARCHAR
    ,event_time AS PROCTIME()
    ,PRIMARY KEY (`id`)
) with (
    type = 'your-db-type'
    ...
)
;


-- 状态统计离线表
create table offline_order_status_num
(
    user_id VARCHAR COMMENT '供应商ID'
    ,biz_status VARCHAR COMMENT '业务状态'
    ,last_day_num BIGINT COMMENT '昨日待办数量'
    ,num BIGINT COMMENT '待办数量'
    ,ds VARCHAR COMMENT '分区字段'
    ,PRIMARY KEY (user_id, biz_status)
    ,PERIOD FOR SYSTEM_TIME    --定义了维表的标识。
) with (
    type = 'your-offline-type'
    ...
)
;

-- 单据状态离线表
create table order_status_history
(
    order_no VARCHAR COMMENT '供应商ID'
    ,biz_status VARCHAR COMMENT '业务状态'
    ,last_biz_status VARCHAR COMMENT '昨日状态'
    ,gmt_modified TIMESTAMP COMMENT '修改时间'
    ,last_gmt_modified TIMESTAMP COMMENT '昨日修改时间'
    ,ds VARCHAR COMMENT '分区字段'
    ,PRIMARY KEY (order_no)
    ,PERIOD FOR SYSTEM_TIME    --定义了维表的标识。
) with (
    type = 'your-offline-type'
    ...
)
;

-- 进行数据过滤
create VIEW filter_view AS
select  order_no
        ,user_id
        ,biz_status
        ,gmt_modified
        ,event_time
FROM    tt_source
where   order_no IS NOT NULL
and     user_id IS NOT NULL
AND     biz_status IS NOT NULL
AND     gmt_modified IS NOT NULL
;

-- 获取Flink存储中的单据前一个状态,并缓存当前状态
CREATE VIEW get_before_biz_status AS
SELECT  order_no
        ,user_id
        ,biz_status
        ,TO_TIMESTAMP (gmt_modified) gmt_modified
        ,DATE_FORMAT (TO_TIMESTAMP (gmt_modified), 'yyyyMMdd') AS gmt_modified_day
        ,prevValue(biz_status) OVER (PARTITION BY order_no ORDER BY event_time) prev_biz_status
FROM    filter_view
;

-- 对于Flink中没有存储上一次状态的单据,去离线数据中查出来
CREATE VIEW order_with_last_biz_status AS
SELECT  c.*
        -- 考虑每天的离线数据需要一定时间产出,故使用前日离线数据
        ,IF (
            d.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
            ,d.last_biz_status
            ,d.biz_status
        ) prev_biz_status_offline
        ,IF (
            d.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
            ,d.last_gmt_modified
            ,d.gmt_modified
        ) gmt_modified_offline
FROM    get_before_biz_status c
LEFT JOIN order_status_history FOR SYSTEM_TIME AS OF PROCTIME () d
ON      c.order_no = d.order_no AND c.prev_biz_status IS NULL
;

-- 将状态迁移通过udtf映射为不同状态的操作数,例如,状态迁移为10-15-16,那么10来的时候,状态10的操作是+1,15来时,状态10 -1,15 +1,产生两条数据,16来时,15 -1, 这么做的原因是TT数据对于更新操作,没有更新前数据和更新后数据,对于状态计数只能通过加入业务状态机的方式识别状态变更到来时,哪个状态+1,哪个状态-1
CREATE VIEW status_shift_opt AS
select  S.order_no
        ,S.user_id
        ,S.biz_status
        ,S.gmt_modified_day
        ,S.gmt_modified
        ,T.effect_biz_status
        ,T.effect_biz_status_opt
from    (
    -- 将离线和Flink中取到的前一个状态进行合并,确定该单据前一个状态
            SELECT  order_no
                    ,user_id
                    ,biz_status
                    ,gmt_modified_day
                    ,gmt_modified
                    ,IF(
                        prev_biz_status IS NULL AND prev_biz_status_offline IS NOT NULL
                        ,prev_biz_status_offline
                        ,prev_biz_status
                    ) prev_biz_status
            FROM    order_with_last_biz_status
            WHERE   (gmt_modified_offline IS NOT NULL AND gmt_modified_offline < gmt_modified)
            or      (gmt_modified_offline IS NULL)
        ) as S
        ,LATERAL TABLE (statusCount (biz_status,prev_biz_status)) as T (effect_biz_status, effect_biz_status_opt)
;

-- 按照天数据进行分组,并过滤出近两天的数据
CREATE VIEW summary_with_day AS
SELECT  user_id
        ,effect_biz_status biz_status
        ,gmt_modified_day
        ,SUM(effect_biz_status_opt) num
FROM    status_shift_opt
-- 实时数据,只统计昨日和今日的数据
WHERE   effect_biz_status IN (10,15)
GROUP BY user_id
         ,effect_biz_status,gmt_modified_day
;

-- 按照供应商+状态维度进行分组,将近两日数据累加,得到(昨日+今日数量)
CREATE VIEW summary_with_user_id_status AS
SELECT  user_id
        ,biz_status
        ,SUM(num) num
FROM    summary_with_day
WHERE  gmt_modified_day = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd')
or  gmt_modified_day = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
GROUP BY user_id,biz_status
;

-- 加上离线数量,维表离线数量(截止昨日0点) + 实时数量(昨日+今日)= 总数量
CREATE VIEW plus_offline_data AS
SELECT  user_id
        ,biz_status
        ,offline_num+num num
        ,CONCAT(
            'offlineNum:'
            ,offline_num
            ,',realtimeNum:'
            ,num
            ,',sourceType:Flink_STREAM'
        ) attribute
FROM    (
            SELECT  a.user_id
                    ,a.biz_status
                    ,IF(
                        b.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
                        ,IF(b.last_day_num IS NULL,0,b.last_day_num)
                        ,IF(b.num IS NULL,0,b.num)
                    ) offline_num
                    ,a.num
            FROM    summary_with_user_id_status a
            LEFT JOIN offline_order_status_num FOR SYSTEM_TIME AS OF PROCTIME () b
            ON      a.user_id = b.user_id
            and     a.biz_status = b.biz_status
        )
;



create table mysql_output
(
    ,`user_id` varchar
    ,biz_status varchar
    ,num BIGINT
    ,attribute varchar
    ,primary key (biz_status, `user_id`) ) with ( type = 'mysql'
    ,url = 'jdbc:mysql://xxxxxx'
    ,tableName = 'order_status_count'
    ,userName = 'xxx'
    ,password = 'xxx'
)
;

INSERT INTO mysql_output
SELECT  *
FROM    plus_offline_data
;

UDF

prevValue函数(UDAF:UDF – Aggregation Functions):

package com.fliaping.flink.udf;

import org.apache.flink.table.functions.AggregateFunction;

public class PrevValueUdaf extends AggregateFunction<String, PrevValueUdaf.PrevValueAccum> {

    private static final long serialVersionUID = 3733259132660252997L;

    @Override
    public PrevValueAccum createAccumulator() {
        return new PrevValueAccum();
    }

    @Override
    public String getValue(PrevValueAccum prevValueAccum) {
        return prevValueAccum.prevValue;
    }

    public void accumulate(PrevValueAccum accumulator, String iValue) {
        accumulator.prevValue = accumulator.currentValue;
        accumulator.currentValue =iValue;
    }


    public void retract(PrevValueAccum accumulator, String iValue) {
        accumulator.currentValue = accumulator.prevValue;
        accumulator.prevValue = iValue;
    }

    public void merge(PrevValueAccum accumulator, Iterable<PrevValueAccum> its) {
        for (PrevValueAccum it : its) {
            accumulator.currentValue = it.currentValue;
            accumulator.prevValue = it.prevValue;
        }
    }

    public static class PrevValueAccum {
        private String prevValue;
        private String currentValue;
    }
}

statusCount函数(UDTF:UDF – Table Functions):

package com.fliaping.flink.udf;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava18.com.google.common.base.Objects;
import org.apache.flink.table.functions.TableFunction;

public class StatusCountUdtf extends TableFunction<Tuple2<String, Integer>>{

    private static final long serialVersionUID = 5467096366714935845L;

    public void eval(String bizStatus, String prevBizStatus) {

        if(Objects.equal(bizStatus, prevBizStatus)) {
            return;
        }

        if (bizStatus != null) {
            collect(Tuple2.of(bizStatus, 1));
        }

        if (prevBizStatus != null) {
            collect(Tuple2.of(prevBizStatus, -1));
        }

    }
}

离线 SQL代码:

-- 计算统计数量,包含day-1和day-2的数据
INSERT OVERWRITE TABLE order_status_num PARTITION(ds='${bizdate}')
SELECT  IF(a.user IS NULL,b.user_id,a.user_id) user_id
        ,IF(a.biz_status IS NULL,b.biz_status,a.biz_status) biz_status
        ,b.num last_day_num
        ,a.num
FROM    (
            SELECT  user_id
                    ,biz_status
                    ,COUNT(order_no) num
            FROM    my_order
            WHERE   ds = '${bizdate}'
            AND     biz_status IN (10,15)
            GROUP BY user_id
                     ,biz_status
        ) a
FULL OUTER JOIN (
                    SELECT  user_id
                            ,biz_status
                            ,COUNT(order_no) num
                    FROM    my_order
                    WHERE   ds = to_char(dateadd(to_date('${bizdate}', "yyyymmdd"), - 1, 'dd'), 'yyyymmdd')
                    AND     biz_status IN (10,15)
                    GROUP BY user_id
                             ,biz_status
                ) b
ON      a.user_id = b.user_id
AND     a.biz_status = b.biz_status
;

-- 计算单据状态,包含day-1和day-2的数据
INSERT OVERWRITE TABLE order_status_history PARTITION(ds='${bizdate}')
SELECT  IF(
            a.order_no IS NULL
            ,b.order_no
            ,a.order_no
        ) order_no
        ,a.biz_status
        ,b.biz_status last_biz_status
        ,a.gmt_modified
        ,b.gmt_modified last_gmt_modified
FROM    (
            SELECT  order_no
                    ,biz_status
                    ,gmt_modified
            FROM    my_order
            WHERE   ds = '${bizdate}'
            AND     biz_status IN (10,15)
        ) a
FULL OUTER JOIN (
                    SELECT  order_no
                            ,biz_status
                            ,gmt_modified
                    FROM    my_order
                    WHERE   ds = to_char(dateadd(to_date('${bizdate}', "yyyymmdd"), - 1, 'dd'), 'yyyymmdd')
                    AND     biz_status IN (10,15)
                ) b
ON      a.order_no = b.order_no
;

实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

案例简介

本文会以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及太复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成的。

架构设计

具体的架构设计如图所示:首先通过canal解析MySQL的binlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行JOIN,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。

业务数据准备

  • 订单表(order_info)
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `operate_time` datetime DEFAULT NULL COMMENT '操作时间',
  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
  `province_id` int(20) DEFAULT NULL COMMENT '地区',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
  • 订单详情表(order_detail)
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';
  • 商品表(sku_info)
CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
  `price` decimal(10,0) DEFAULT NULL COMMENT '价格',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述',
  `weight` decimal(10,2) DEFAULT NULL COMMENT '重量',
  `tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';

商品一级类目表(base_category1)

CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(10) NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';

商品二级类目表(base_category2)

CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(200) NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';

商品三级类目表(base_category3)

CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(200) NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';

省份表(base_province)

CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT '省名称',
  `region_id` int(20) DEFAULT NULL COMMENT '大区id',
  `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

区域表(base_region)

CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT '大区id',
  `region_name` varchar(20) DEFAULT NULL COMMENT '大区名称',
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

数据处理流程

ODS层数据同步

关于ODS层的数据同步参见我的另一篇文章基于Canal与Flink实现数据实时增量同步(一)。主要使用canal解析MySQL的binlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:

DIM层维表数据准备

本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表商品维表。处理过程如下:

区域维表

首先将mydw.base_provincemydw.base_region这个主题对应的数据抽取到MySQL中,主要使用Flink SQL的Kafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:

-- -------------------------
--   省份
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT ,
  `area_code`STRING
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_province',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   省份
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT ,
    `area_code`STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_province', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   省份
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_province
SELECT *
FROM ods_base_province;

-- -------------------------
--   区域
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   区域
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_region', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   区域
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_region
SELECT *
FROM ods_base_region;

经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province作为维表:

-- ---------------------------------
-- DIM层,区域维表,
-- 在MySQL中创建视图
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br 
     JOIN base_province bp ON br.id= bp.region_id
;

这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:

-- -------------------------
--  一级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
)WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.base_category1',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ;

-- -------------------------
--  一级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category1', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  一级类目表
--   MySQL Sink Load Data
-- ------------------------- 

INSERT INTO base_category1
SELECT *
FROM ods_base_category1;

-- -------------------------
--  二级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
)WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_category2',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ;

-- -------------------------
--  二级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category2', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  二级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;

-- -------------------------
-- 三级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
)WITH(
'connector' = 'kafka',
 'topic' = 'mydw.base_category3',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--  三级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category3', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--  三级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;

-- -------------------------
--   商品表
--   Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.sku_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   商品表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
   PRIMARY KEY (tm_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'sku_info', -- MySQL中的待插入数据的表
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   商品
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;

经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQL的dim库中创建一张视图:dim_sku_info,用作后续使用的维表。

-- ---------------------------------
-- DIM层,商品维表,
-- 在MySQL中创建视图
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c3.id AS c1_id,
  c3.name AS c1_name
FROM
(
  sku_info si 
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id =c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);

至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。

DWD层数据处理

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:

-- -------------------------
--   订单详情
--   Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH(
 'connector' = 'kafka',
 'topic' = 'mydw.order_detail',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- -------------------------
--   订单信息
--   Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0) ,
  `operate_time` TIMESTAMP(0) ,
  `expire_time` TIMESTAMP(0) ,
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH(
'connector' = 'kafka',
 'topic' = 'mydw.order_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
) ; 

-- ---------------------------------
-- DWD层,支付订单明细表dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD层,已支付订单明细表
-- 向dwd_paid_order_detail装载数据
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info
    WHERE order_status = '2' -- 已支付
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od 
    ON oi.id = od.order_id;

ADS层数据

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

  • ads_province_index

首先在MySQL中创建对应的ADS目标表:ads_province_index

CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt) 
) ;

向MySQL的ADS层目标装载数据:

-- Flink SQL Cli操作
-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
--      2.每天每个省份的订单金额
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_province_index', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- DIM层,区域维表,
-- 创建区域维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);

-- ---------------------------------
-- 向ads_province_index装载数据
-- 维表JOIN
-- ---------------------------------

INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp 
  ON dp.province_id = pc.province_id;

当提交任务之后:观察Flink WEB UI:

查看ADS层的ads_province_index表数据:

  • ads_sku_index

首先在MySQL中创建对应的ADS目标表:ads_sku_index

CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100) ,
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);

向MySQL的ADS层目标装载数据:

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);

-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
 order_sku_num BIGINT,
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
   sum(sku_num) order_sku_num,
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;

-- ---------------------------------
-- tmp_sku_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM层,商品维表,
-- 创建商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向ads_sku_index装载数据
-- 维表JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc 
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id
  ;

当提交任务之后:观察Flink WEB UI:

查看ADS层的ads_sku_index表数据:

FineBI结果展示

其他注意点

Flink1.11.0存在的bug

当在代码中使用Flink1.11.0版本时,如果将一个change-log的数据源insert到一个upsert sink时,会报如下异常:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. 
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])

该bug目前已被修复,修复可以在Flink1.11.1中使用。

docker快速搭建alibaba-canal


需求来源

 跟踪表对比前后修改数据变化

方案

  1. AOP、拦截器优点: 实现简单缺点:
    1. 代码入侵性强 每加入一个模块就需要处理切面添加切入点,模块配置也需要增加。
    2. 项目依赖麻烦,档案模块需要依赖对应模块api包进行序列化实现对比,业务模块需要实现查询接口。
    3. 模块适配插拔式实现困难
    4. sql解析困难复杂容易出错(拦截mapper层解析update语句)
  2. 日志采集方案优点:
    1. 只需要处理数据变化对比,可以直接集成三方插件如es查询,无代码入侵。后续扩展强,无论任何任何模块需要加入类似功能只需要中间件配置,添加log监听即可。
    缺点:
    1. 需要部署其他服务,增加运维成本,依赖于三方服务可能存在数据丢失

canal部署

环境

这里使用的是mac os 搭配docker部署,几乎和linux环境差不多

canal + rocketMQ + ES 实现

基础同步技术介绍

canal

准备

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

快速启动参考:QuickStart

其中主要注意配置匹配数据库、表规则配置,可以选择匹配部分表做记录。 也可以通过配置不同的topic消费做消费实现不同的业务。参考文档:配置 主要关注Topic配置匹配规则

RoketMQ 搭建

docker 使用 foxiswho/rocketmq 镜像 参考博客

docker run -d -p 10911:10911 -p 10909:10909\
 --name rmqbroker --link rmqserver:namesrv\
 -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt"\
 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\
 foxiswho/rocketmq:broker-4.5.1

使用上面docker命令拉取镜像,快速搭建

搭建canal

docker搭建官方文档

下载docker启动文件

linux命令

wget  https://github.com/alibaba/canal/blob/master/docker/run.sh

mac 命令

curl -0 https://github.com/alibaba/canal/blob/master/docker/run.sh

配置canal配置项

参考配置

主要注意配置

#  按需修改成自己的数据库信息
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# mq config 这个配置使用动态topic就可以不配置,选其一
canal.mq.topic=example
# 针对库名或者表名发送动态topic,参考配置
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
sh run.sh -e canal.auto.scan=false \
		  -e canal.destinations=test \
		  -e canal.instance.master.address=172.17.0.7:3306  \
		  -e canal.instance.dbUsername=admin  \
		  -e canal.instance.dbPassword=123456  \
		  -e canal.instance.defaultDatabaseName=mytest  \
		  -e canal.instance.connectionCharset=UTF-8 \
		  -e canal.instance.tsdb.enable=true \
		  -e canal.instance.gtidon=false  \
		  -e canal.mq.dynamicTopic=mytest:mytest.sg_pop_base_info  \
		  -e canal.mq.partition=0 \
		  -e canal.serverMode=rocketmq \
		  -e canal.mq.servers=172.17.0.10:9876 \
		  -e canal.mq.retries=0 \
		  -e canal.mq.batchSize=16384 \
		  -e canal.mq.maxRequestSize=1048576 \
		  -e canal.mq.lingerMs=200 \
		  -e canal.mq.bufferMemory=33554432 \
		  -e canal.mq.canalBatchSize=50  \
		  -e canal.mq.canalGetTimeout=100 \
		  -e canal.mq.flatMessage=true \
		  -e canal.mq.compressionType=none \
		  -e canal.mq.acks=all \
		  -e canal.mq.transaction=false 

mysql、mq如果使用的容器,注意添加link配置 或者 通过命令docker inspect [容器名] 查看IPAddress配置到配置中

docker查看启动成功

#运行 docker logs 容器名称
docker logs canal-server

# 如下表示成功
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Generating SSH1 RSA host key:                              [  OK  ]
Starting sshd:                                             [  OK  ]
Starting crond:                                            [  OK  ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

有兴趣的可以看一下run.sh这个文件,搞过服务器的肯定看得懂,就是各种变量然后组装docker 的run命令

查看MQ消息

修改监控表数据即可查看mq中消息,如果修改了数据没有消息,查看canal-server容器中日志报错

本地mq客户端查看消息http://localhost:8180/#/message

实现技术通过canal获取日志拿到对比数据放入MQ,MQ消息数据如下:

新增消息

{
    "data": [
        {
            "id": "111",
            "name": "ad ",
            "create_date": "2020-04-15 14:31:03",
            "update_date": "2020-04-15 14:31:08",
            "create_user": "asdf ",
            "update_user": "sadf ",
            "is_deleted": "0"
        }
    ],
    "database": "grid_comm_center",
    "es": 1586932274000,
    "id": 66,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1586932275053,
    "type": "INSERT"
}

修改消息

{
    "data": [
        {
            "id": "111",
            "name": "ad asdf ",
            "create_date": "2020-04-15 14:31:03",
            "update_date": "2020-04-15 14:31:54",
            "create_user": "asdf adf ",
            "update_user": "sadf asdf",
            "is_deleted": "0"
        }
    ],
    "database": "grid_comm_center",
    "es": 1586932314000,
    "id": 67,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": [
        {
            "name": "ad ",
            "update_date": "2020-04-15 14:31:08",
            "create_user": "asdf ",
            "update_user": "sadf "
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1586932314983,
    "type": "UPDATE"
}

删除消息

{
    "data": [
        {
            "id": "1111",
            "name": "123123",
            "create_date": "2020-03-17 17:48:23",
            "update_date": "2020-03-17 17:48:26",
            "create_user": "你好",
            "update_user": "你好",
            "is_deleted": "1"
        }
    ],
    "database": "grid_comm_center",
    "es": 1587366810000,
    "id": 1049,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(64)",
        "name": "varchar(200)",
        "create_date": "timestamp",
        "update_date": "timestamp",
        "create_user": "varchar(32)",
        "update_user": "varchar(32)",
        "is_deleted": "tinyint(2) unsigned"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "name": 12,
        "create_date": 93,
        "update_date": 93,
        "create_user": 12,
        "update_user": 12,
        "is_deleted": -6
    },
    "table": "demo_id",
    "ts": 1587366810548,
    "type": "DELETE"
}

如上消息可以清楚拿到数据库、表详细变更,通过拿到变更数据解析插入数据库即可。

官方中还可以通过ClientAdapter 适配器入库到RDB、HBase、ES,这里根据需求做部署操作吧。目前这个里自己做档案模块进行消费写入档案数据,预计搜索可以放入es可以加快查询效率。

查看这些消息,然后做消费即可做自己想做的事情,至此已经完成了canal的基础搭建

消费消息写入ElasticSearch

参考博客

maven配置

<elasticsearch.version>7.2.0</elasticsearch.version>
<!-- Java High Level REST Client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>

Spring boot封装工具类

mq使用的是公司封装的包,所以这里省略mq相关代码。直接集成mq消费相关消费做自己业务处理即可。

通过案例看Elasticsearch优化

我瓣用户产品中有多个核心场景都在使用 Elasticsearch (以下简称 ES),主要用在全文搜索、聚合计算等方面。ES 相关的功能开发者主要是我,我最早是 2016 年在 选影视 的功能上开始使用 ES 做结构化搜索,这个工具给运营同学和资深用户提供了按各种复杂条件找电影的,可以说是最好的途径。

这个项目为多个核心产品功能提供原始数据,这几年它历经多次迭代和功能完善,不过最近它出了一点问题,我觉得解决问题的过程很值得写篇文章分享一下,所以就有了此文。

为了让大家能更理解下面的内容,先介绍下「选影视」存储在 ES 里面的文档部分字段:

class Subject(DocType):
    title = Text()
    genres = Text()
    countries = Text()
    standard_tags = Text()
    ...

每个文档的_id是条目 ID,也包含很多条目属性字段 (如上述列出的标题、类型、地区、标签,以及未列出的一些字段),其中标签 (standard_tags) 字段是非常重要的,正是由于这个字段的数据,可以基于 ES 搜索包含某个 (些) 标签条目 ID 列表。

问题

8 月 28 日 (下图中的最高峰那天) 平台同事 @我,说发现这个项目最近服务调用慢且有大量超时,由于项目是一个微服务,所以经常触发服务调用的熔断,希望解决。

这个项目由于最近改动并不大,我已经有一段时间没有特别关注了,赶紧打开对应的 Sentry 页面,通过一个 Issue 页面找到问题原因:

某个 API 请求 ES 很容易造成请求超时 (抛 ConnectionTimeout)

下面是按天统计的事件总数的趋势图:

设置的超时间隔是 2 秒:这已经是 API 请求里面非常宽容的阈值了,事实上在之前的使用中大部分对 ES 的 API 请求在几毫秒到几十毫秒之间,鲜有超时问题。

PS: 这个图下面还会出现多次,会分别展示对应时间点下每天事件的总数范围。

很快和对应开发同事确认,造成问题的接口是给新版本 APP 里面的新功能提供的,我们发现问题时这个超时事件已经达到每天 4-6w+,虽然看起来量倒不算大,但是我依然觉得需要快速解决它:

  • 已经影响了服务质量
  • 功能比较隐蔽,如果不是主动用这个功能是不会触发 API 请求的,我体验了下对应功能确实很容易出现点开页面还没加载出来或者干脆窗口空白的情况,这太影响用户体验了
  • 这个超时量会随着新版本 APP 的装机量不断提高
  • 这些慢查询给 ES 带来压力,也影响了其他正常查询请求

回到问题,这个特别慢的请求是做一个聚合计算,看一下请求的 body:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {
        'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  },
  'query': {
    'bool': {
      'must': [{'ids': {'values': IDS}}]
    }
  }
}

其中 IDS 是条目 ID 列表,这个请求是让 ES 聚合这些条目包含的标签总数。举个例子,ID 为 1 的条目有「剧情 / 犯罪 / 动作」三个标签;ID 为 2 的条目有「喜剧 / 剧情 / 爱情」三个标签,那么 IDS 为[1, 2] 时,返回的内容中「剧情」为 2 (2 个条目都有这个标签),其他的标签都是 1 (只有一个条目包含)

OK,现在事情很明朗了,我们开始解决超时的问题吧。

这个接口的代码我当时一眼看去并没有发现问题,那马上想到的就是缓存和减少调用这 2 条路,但是很遗憾行不通:

  • 缓存。IDS 是用户看过 / 想看电影的条目列表,每个人都不同。用户访问和用户兴趣相关,且频率不高,由于用户量庞大不值得为了这么个小功能就主动缓存并添加一个好的更新缓存的机制,成本太高甚至效果会更差
  • 减少调用。目前的调用已经是按需请求,没有找到明显的可以优化调用量的地方

OK,既然「绕」不了了,就直面吧,我继续尝试其他解决超时问题的方法

优化聚合计算请求

这算是我的个人性格,接下来我第一个想法的思路就是优化这个聚合请求的写法。其实在 5 年前我的那个 《Python 高级编程》 的 PPT 里面就说过:

  1. 在合适的地方用合适的技巧
  2. 不是它不好,而是你没有用好

这一直算是我的技术格言吧,无论是不熟悉的还是熟知的内容,我都会保持敬畏。出了问题会首先考虑是不是我没有用对用好。就拿上面的这个 body 来说,其实做 2 件事:

  1. 限定要查询的 _id 范围 (in IDS)
  2. 聚合查找 standard_tags 字段中的标签数据,返回匹配标签数量最多的 100 个标签和包含的条目数量

那怎么优化呢?首先我试了一下改用Term Query代替IDs QueryIDs Query的写法是我开始用的,之后其他同学有这样的需求就按着我这种写法来了,我怀疑是这种查询语句的问题,改成下面这样的效果:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {                                                                                                 'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  },
  'query': {
    'bool': {
      'must': [{'terms': {'_id': IDS}}]
    }
  }
}

上线后发现修改对超时没有帮助 😢,说明IDs Query的写法没有问题。

优化点 1

这让我一时间没有了头绪,我开始搜索一些「优化 Elasticsearch」相关的技术博文和开源书籍,希望从中找找灵感。然后就在 Elastic 社区找到了一个 query+aggs 查询性能问题 的帖子,其作者也遇到了使用聚合后查询非常慢的问题,@kennywu76 给出了一个方案: 「在每一层 terms aggregation 内部加一个 {“execution_hint”:”map”}」。在评论区 @kennywu76 也给了详细的解释,我认为算是全网最好的解释了,转发一下:

Terms aggregation 默认的计算方式并非直观感觉上的先查询,然后在查询结果上直接做聚合。

ES 假定用户需要聚合的数据集是海量的,如果将查询结果全部读取回来放到内存里计算,内存消耗会非常大。因此 ES 利用了一种叫做 global ordinals 的数据结构来对聚合的字段来做 bucket 分配,这个 ordinals 用有序的数值来代表字段里唯一的一个字符串,因此为每个 ordinals 值分配一个 bucket 就等同于为每个唯一的 term 分配了 bucket。 之后遍历查询结果的时候,可以将结果映射到各个 bucket 里,就可以很快的统计出每个 bucket 里的文档数了。

这种计算方式主要开销在构建 global ordinals 和分配 bucket 上,如果索引包含的原始文档非常多,查询结果包含的文档也很多,那么默认的这种计算方式是内存消耗最小,速度最快的。

如果指定 execution_hint:map 则会更改聚合执行的方式,这种方式不需要构造 global ordinals,而是直接将查询结果拿回来在内存里构造一个 map 来计算,因此在查询结果集很小的情况下会显著的比 global ordinals 快。

要注意的是这中间有一个平衡点,当结果集大到一定程度的时候,map 的内存开销带来的代价可能就抵消了构造 global ordinals 的开销,从而比 global ordinals 更慢,所以需要根据实际情况测试对比一下才能找好平衡点。

对于我们这个场景,IDS 量级比较小所以查询结果集很小,可以改用execution_hint:map这种方式:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {
        'execution_hint': 'map',
        'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  }
}

优化点 2

优化过程里我突然想起了官网对于Filter or Query的优化意见 (详见延伸阅读链接 1),文中说:

过滤查询(Filtering queries)只是简单的检查包含或者排除,这就使得计算起来非常快。考虑到至少有一个过滤查询(filtering query)的结果是 “稀少的”(很少匹配的文档),并且经常使用不评分查询(non-scoring queries),结果会被缓存到内存中以便快速读取,所以有各种各样的手段来优化查询结果。

相反,评分查询(scoring queries)不仅仅要找出 匹配的文档,还要计算每个匹配文档的相关性,计算相关性使得它们比不评分查询费力的多。同时,查询结果并不缓存。

多亏倒排索引(inverted index),一个简单的评分查询在匹配少量文档时可能与一个涵盖百万文档的 filter 表现的一样好,甚至会更好。但是在一般情况下,一个 filter 会比一个评分的 query 性能更优异,并且每次都表现的很稳定。

过滤(filtering)的目标是减少那些需要通过评分查询(scoring queries)进行检查的文档。

这段话之前我也看过,但是这次再看发现了一个可优化的点,就是用Filter替代Query,因为我们想要的只是聚合结果,完全不需要做评分查询。请求 body 就改成了这样:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {
        'execution_hint': 'map',
        'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  },
 'query': {
   'bool': {
     'filter': [{'ids': {'values': IDS}}]
    }
  }
}

优化点 3

正是上面的这段对于Filter or Query的优化意见,让我也注意到一个词:「不评分查询」,前面的查询结果其实会返回传入的 IDS 列表的对应文档结果,但是我们根本不需要,所以可以不返回 source:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {
        'execution_hint': 'map',
        'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  },
 'query': {
   'bool': {
     'filter': [{'ids': {'value': IDS}}]
    }
  },
  '_source': False
}

优化结果

基于上面三点优化。上线后超时问题得到了很大缓解:

优化效果比较显著:

  1. 超时数降到了之前的约 1/4
  2. 项目总超时 (还有其他查询引起的超时) 降到了约之前的 1/5
  3. 找了一些 BadCase 对比效果,优化后请求耗时最差降到之前的一半,约 1/3 的请求的时间重回小于 100ms 级别

虽然已经不会触发平台的熔断了,但是超时事件总量依然很大,需要进一步优化。

优化分片数

我继续保持怀疑的态度寻找优化方案,在我的认知里面,像 Elasticsearch 这样成熟的、广受关注和欢迎的项目经过多年的发展,我不相信一个很常见的聚合查询就能引起集群负载的波动,也不相信它能引起这么大量的请求超时,所以我还在怀疑问题出在使用姿势上。由于我不熟悉 Java 语言,在短时间不具备阅读 ES 源码的能力,所以开始希望通过别人的文章中获得灵感。

在看到《Mastering Elasticsearch》中的「选择恰当的分片数量和分片副本数量」(延伸阅读链接 4) 这一节后,我赶紧看了下项目用的这个索引的分片情况 (通过http http://ES_URL/_cat/shards/INDEX_NAME),并和萨 (sa) 确认了一下。我瓣一开始用的 ES 版本比较低 – 2.3,之后升级到当时最新的 6.4,ES 在 6.X 及之前的版本默认索引分片数为 5 (Primary Shards)、副本数为 1 (Replica,当一个节点的主分片丢失,ES 可以把任意一个可用的分片副本推举为主分片),从 ES7.0 开始调整为默认索引分片数为 1、副本数为 1 (详见延伸阅读链接 2 和链接 3)。而我瓣为了数据安全,默认每个索引分片数为 5、副本数为 2,也就是这个索引一共有 15 个分片(总分片数 = 主分片数*(副分片数 + 1))。

ES 默认的分片配置并不适用于所有业务场景,那么分片数应该怎么安排呢?延伸阅读链接 5 是 ES 官方博客中的一篇叫做「Elasticsearch 究竟要设置多少分片数?」的文章,其中有这么 2 段内容:

Elasticsearch 中的数据组织成索引。每一个索引由一个或多个分片组成。每个分片是 Luncene 索引的一个实例,你可以把实例理解成自管理的搜索引擎,用于在 Elasticsearch 集群中对一部分数据进行索引和处理查询。

构建 Elasticsearch 集群的初期如果集群分片设置不合理,可能在项目的中后期就会出现性能问题。

通过分片,ES 把数据放在不同节点上,这样可以存储超过单节点容量的数据。而副本分片数的增加可以提高搜索的吞吐量 (主分片与副本都能处理查询请求,ES 会自动对搜索请求进行负载均衡)。在《Mastering Elasticsearch》里面还提到了路由 (routing) 功能 (延伸阅读链接 6),ES 在写入文档时,文档会通过一个公式路由到一个索引中的一个分片上。默认的选择分片公式如下:

shard_num = hash(_routing) % num_primary_shards

_routing字段的取值默_id字段。如果不指定路由,在查询 / 聚合时就需要由 ES 协调节点搜集到每个分片 (每个主分片或者其副本上查询结果,再将查询的结果进行排序然后返回:在我们这个场景,每次要从 5 个分片上聚合。

如果能够确定文档会被映射到哪个 (些) 分片,可以只在对应的一 (多) 个分片上执行查询命令,不用全局的搜索。我实际的试了下,用 routing 确实快了很多。所以看到这里我第一个感觉是分片多了会带来路由问题,前面说了,「每个分片是自管理的,对一部分数据进行索引和处理查询」,查询和聚合都需要在最后把结果搜集起来。那可不可以就把数据放在一个分片上,这样就没有路由的问题了?另外 ES7.0 默认的分片方案也是朝着这个方面走的,所以我觉得方案调整应该是经过一段时间的实践,考虑到大部分场景下1 Shard + 1 Replica这样的方案是更高的选择。

Ok, 现在优化的目的就是选择合适的主分片数 + 合适的副本分片数

要改善现在面临的问题,考虑本业务的数据量,分片坏掉的概率和数据安全等因素,我直观的感受就是我瓣的索引分片数太多了。但由于索引创建好后,主分片数量不可修改,只可以修改副本分片数量。而要修改主分片数量,只能重建或者建新的索引,代价比较大。

无论是官方还是一些大厂相关文章都没有对于分片数做出完美的公式,默认的不一定是最好的,但什么样的组合还是需要实际测试,所以我尝试了多个分片方案,如下:

Shard(s) Replica(s)
1 1
1 2
2 1
2 2
3 1
3 2
4 1
4 2
5 2
6 1
6 2
9 1
9 2

Note: 这里副本分片数没有大于 2 的方案是所用到的 ES 集群节点规模所限。

为了不影响现有业务,用的都是新索引,这么创建:

curl -XPUT "http://ES_URL/INDEXNAME/" -H 'Content-Type: application/json' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 1
        }
    }
}'

在数据上我使用 elasticsearch-dump 把旧索引的中数据灌到新索引,另外一方面在新索引的因业务逻辑上订阅 Kafka 消息同步对索引数据的修改。为了让写入更快,我还在灌数据过程中关闭了副本和索引刷新:

curl -XPUT "http://ES_URL/INDEXNAME/_settings" -H 'Content-Type: application/json' -d '{ "index" : { "refresh_interval" : "-1", "number_of_replicas": 0 } }'

优化结果

上述分片方案的尝试并不是按表格顺序来做的,而且一开始我的理解是分片太多,所以最初的主要目的是要减少分片数。我一开始是在当前的主分片方案上尝试5 Shards + 1 Replica,也就是减少副本分片数:

就是图中 9 月 2 日这天,超时降得很明显,这给我带来了非常大的信心,说明我的方向是对的。当时由于数据的问题,后来迁回了原来的索引一直到 9 月 4 日。

刚才提到,官方从 7.0 开始改为默认1 Shard + 1 Replica这样的方案,所以接下来我新建了一个1 Shard + 2 Replicas的新索引,9 月 4 日上线,当时效果也非常好。

但是通过上图可以看到当天超时量又涨起来了,其实这是我犯的一个错误,早上测试1 Shard + 2 Replica观察了一段时间效果非常好,我认为还可以继续降分片数以提高「路由效率」,所以调整成了1 Shard + 1 Replica,我当时觉得毫无疑问效果会更好,然后下午就请假了… 结果过了 2 个小时发现超时涨的非常厉害,就回滚了。

不过到这里,可以感受到官方默认的方案对于我们这个例子是不可取的:如果当时选择1 Shard + 1 Replica运行满一天,我相信超时量将远高于之前 8 月 28 日最高峰的超时量。我觉得造成这个问题是由于分片数太少了,2 个分片扛不住这样的吞吐量。

在接下来的一段时间里面在准备好数据后。我分别尝试了上述提到的主分片小于 5 的各种组合,结果非常反直觉:

超时情况在 1 Shard + 2 Replicas 和 5 Shard + 1 Replica 这 2 个方案下表现是最好的,其他的方案的效果都很差。

为什么说反直觉呢?我本来认为:

  • 考虑请求 ES 集群带来的压力,在一定分片数范围内增加主分片能提高吞吐量,由于路由效率超过一定阈值应该会起反作用
  • 副本分片数多的副作用只是硬盘空间的「浪费」,但是能对查询效率有帮助,所以可以在一定范围内增加副本

在《eBay 的 Elasticsearch 性能调优实践》(延伸阅读链接 7) 中有「搜索性能和副本数之间的关系」和「搜索性能和分片数量之间的关系」的 2 张图表,支持了我的自觉:

  • 分片数增加的过程中,开始时搜索吞吐量增大 (响应时间减少),但随着分片数量的增加,搜索吞吐量减小 (响应时间增加)
  • 搜索吞吐量几乎与副本数量成线性关系

现在的测试结果和预想对不上,尤其是5 Shard + 1 Replica5 Shard + 2 Replicas(最初的方案) 怎么效果差这么多?

我继续搜索,找到官方对副本数的建议和问题解释 (延伸阅读链接 7):

Which setup is going to perform best in terms of search performance? Usually, the setup that has fewer shards per node in total will perform better. The reason for that is that it gives a greater share of the available filesystem cache to each shard, and the filesystem cache is probably Elasticsearch’s number 1 performance factor.

So what is the right number of replicas? If you have a cluster that has num_nodes nodes, num_primaries primary shards in total and if you want to be able to cope with max_failures node failures at once at most, then the right number of replicas for you is max(max_failures, ceil(num_nodes / num_primaries) – 1).

也就是说,当我能接受的max_failures为 1、num_nodes为 3:

In : from math import ceil

In : max(1, ceil(3 / 5) - 1)  # num_primaries = 5
Out: 1 # 5 Shard + 1 Replica

In : max(1, ceil(3 / 1) - 1)
Out: 2 # 1 Shard + 2 Replicas  # num_primaries = 1

In : max(1, ceil(3 / 6) - 1)  # num_primaries = 6
Out: 1

In : max(1, ceil(3 / 9) - 1)  # num_primaries = 9
Out: 1

大家可以看图示,从 9 月 4 日到 9 月 19 日每日的超时量大部分在 100 – 300,也有几天达到了 4000+,较之前的超时量也可说降到了之前的 1%。每天几百的量级已经很少了:

那么是否可以继续优化呢?我找萨跑了下Slow Log想分析这些慢的请求 body 的特点,结果发现这些请求并没有什 么特殊性。我又想如果选「大于 5 个主分片」这种反直觉的方案,也就是让分片数变的更多会怎么样呢?所以,我试了6 Shard9 Shards,可以看最近几天:

结论是大于 5 分片的全部方案效果都不错。最好的是9 Shards + 1 Replica,每天超时数小于 30 之间

这些超时都发生下凌晨各服务定期任务对这个服务产生大量大量引起的,不会影响用户体验。找调用方确认了下,有重试机制,所以到现在,我们的优化任务告一个段落了。

后记

还是那句话:

不是它不好,而是你没有用好

通过这个带着问题做优化的案例,让我对 ES 有了更深入的了解。我获得的经验是:

  • 默认的分片方案不一定合适,具体的分片方案应该根据业务场景具体实验
  • 如官网所说「副本可能有助于提高吞吐量,但并不总是如此」,主要是由于节点数少儿带来的文件系统缓存性能问题,所以副本数不是越多越好,还是尽量按照官方推荐的来
  • 主分片数在一定范围内越多越好。我之前只觉得路由效率的问题,但是另外一个角度,分片多那么每个分片上的数据就变少了,查询和聚合要更快。

这次优化是从开发者有权限的地方去找优化思路的,没有考虑服务器资源和 ES 配置方面的优化方向,相信也会有收获。

另外本来还准备了「 使用 preference 优化缓存利用率 」、「 6.0 新增的 Index Sorting 」、「 直接路由 」等多个思路来优化。之后有时间我准备调低现在超时的阈值,相信到时候都能用上。

延伸阅读

  1. https://www.elastic.co/guide/cn/elasticsearch/guide/current/_queries_and_filters.html
  2. https://www.elastic.co/guide/en/elasticsearch/reference/6.2/_basic_concepts.html
  3. https://www.elastic.co/guide/en/elasticsearch/reference/7.3/indices-create-index.html
  4. https://doc.yonyoucloud.com/doc/mastering-elasticsearch/chapter-4/41_README.html
  5. https://www.elastic.co/blog/how-many-shards-should-i-have-in-my-elasticsearch-cluster
  6. https://doc.yonyoucloud.com/doc/mastering-elasticsearch/chapter-4/42_README.html
  7. https://www.infoq.cn/article/elasticsearch-performance-tuning-practice-at-ebay
  8. https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-search-speed.html#_replicas_might_help_with_throughput_but_not_always

Elasticsearch 使用copy_to组合字段进行查询

copy_to 介绍

copy_to 参数允许将多个字段的值复制到另一个字段中,然后对该字段进行查询。例如,first_name和last_name字段可以复制到full_name字段,如下所示:

PUT /my_index

{ “mappings”: { “properties”: { “first_name”:{ “type”: “text”, “copy_to”: “full_name” }, “last_name”:{ “type”: “text”, “copy_to”: “full_name” }, “full_name”:{ “type”: “text” } } } }

 

PUT my_index/_doc/1

{ “first_name”: “John”, “last_name”: “Smith” }

 

 

GET /my_index/_search

{ “query”: { “match”: { “full_name”:{ “query”: “John Smith”, “operator”: “and” } } } }

 

 

 

  • first_name 和 last_name 的值被复制到 full_name 字段
  • first_name 和 last_name 仍然可以被作为查询条件进行查询

其他注意点

  • copy_to 复制的是字段值,而不是分词产生的词语
  • _source 字段集合不会显示复制后的字段值
  • copy_to 可以复制到多个字段,例如:“copy_to”: [ “field_1”, “field_2” ]
  • 不能使用中间字段进行复制,例如:field_1 复制到 field_2,field_2复制到field_3,这样是无效的。应该直接使用多字段复制,让field_1复制到 [ “field_2”, “field_3” ]

ElasticSearch聚合 多字段

多字段聚合
目前针对 ElasticSearch聚合 并且是多多字段
通常情况,terms聚合都是仅针对于一个字段的聚合。因为该聚合是需要把词条放入一个哈希表中,如果多个字段就会造成n^2的内存消耗。

不过,对于多字段,ElasticSearch也提供了下面两种方式:

1 使用脚本合并字段
2 使用copy_to方法,合并两个字段,创建出一个新的字段,对新字段执行单个字段的聚合。

 

collect模式

 

对于子聚合的计算,有两种方式:

 

  • depth_first 直接进行子聚合的计算
  • breadth_first 先计算出当前聚合的结果,针对这个结果在对子聚合进行计算。

 

默认情况下ES会使用深度优先,不过可以手动设置成广度优先,比如:

 

{
    "aggs" : {
        "actors" : {
             "terms" : {
                 "field" : "actors",
                 "size" : 10,
                 "collect_mode" : "breadth_first"
             },
            "aggs" : {
                "costars" : {
                     "terms" : {
                         "field" : "actors",
                         "size" : 5
                     }
                 }
            }
         }
    }
}

《从0到1学习Flink》—— Apache Flink 介绍

前言

Flink 是一种流式计算框架,为什么我会接触到 Flink 呢?因为我目前在负责的是监控平台的告警部分,负责采集到的监控数据会直接往 kafka 里塞,然后告警这边需要从 kafka topic 里面实时读取到监控数据,并将读取到的监控数据做一些 聚合/转换/计算 等操作,然后将计算后的结果与告警规则的阈值进行比较,然后做出相应的告警措施(钉钉群、邮件、短信、电话等)。画了个简单的图如下:

监控告警

目前告警这块的架构是这样的结构,刚进公司那会的时候,架构是所有的监控数据直接存在 ElasticSearch 中,然后我们告警是去 ElasticSearch 中搜索我们监控指标需要的数据,幸好 ElasticSearch 的搜索能力够强大。但是你有没有发现一个问题,就是所有的监控数据从采集、采集后的数据做一些 计算/转换/聚合、再通过 Kafka 消息队列、再存进 ElasticSearch 中,再而去 ElasticSearch 中查找我们的监控数据,然后做出告警策略。整个流程对监控来说看起来很按照常理,但是对于告警来说,如果中间某个环节出了问题,比如 Kafka 消息队列延迟、监控数据存到 ElasticSearch 中写入时间较长、你的查询姿势写的不对等原因,这都将导致告警从 ElasticSearch 查到的数据是有延迟的。也许是 30 秒、一分钟、或者更长,这样对于告警来说这无疑将导致告警的消息没有任何的意义。

为什么这么说呢?为什么需要监控告警平台呢?无非就是希望我们能够尽早的发现问题,把问题给告警出来,这样开发和运维人员才能够及时的处理解决好线上的问题,以免给公司造成巨大的损失。

更何况现在还有更多的公司在做那种提前预警呢!这种又该如何做呢?需要用大数据和机器学习的技术去分析周期性的历史数据,然后根据这些数据可以整理出来某些监控指标的一些周期性(一天/七天/一月/一季度/一年)走势图,这样就大概可以绘图出来。然后根据这个走势图,可以将当前时间点的监控指标的数据使用量和走势图进行对比,在快要达到我们告警规则的阈值时,这时就可以提前告一个预警出来,让运维提前知道预警,然后提前查找问题,这样就能够提早发现问题所在,避免损失,将损失降到最小!当然,这种也是我打算做的,应该可以学到不少东西的。

于是乎,我现在就在接触流式计算框架 Flink,类似的还有常用的 Spark 等。

自己也接触了 Flink 一段时间了,这块中文资料目前书籍是只有一本很薄的,英文书籍也是三本不超过。

我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以关注我的公众号:zhisheng,然后回复关键字:Flink 即可无条件获取到。

另外这里也推荐一些博客可以看看:

1、官网:[https://flink.apache.org/]()

2、GitHub: [https://github.com/apache/flink]()

3、[https://blog.csdn.net/column/…]()

4、[https://blog.csdn.net/lmalds/…]()

5、[http://wuchong.me/]()

6、[https://blog.csdn.net/liguohu…]()

下面的介绍可能也有不少参考以上所有的资料,感谢他们!在介绍 Flink 前,我们先看看 数据集类型 和 数据运算模型 的种类。

数据集类型有哪些呢:

  • 无穷数据集:无穷的持续集成的数据集合
  • 有界数据集:有限不会改变的数据集合

那么那些常见的无穷数据集有哪些呢?

  • 用户与客户端的实时交互数据
  • 应用实时产生的日志
  • 金融市场的实时交易记录

数据运算模型有哪些呢:

  • 流式:只要数据一直在产生,计算就持续地进行
  • 批处理:在预先定义的时间内运行计算,当完成时释放计算机资源

Flink 它可以处理有界的数据集、也可以处理无界的数据集、它可以流式的处理数据、也可以批量的处理数据。

Flink 是什么 ?

flink-01

flink-02

flink-03

上面三张图转自 云邪 成都站 《Flink 技术介绍与未来展望》,侵删。

从下至上,Flink 整体结构

flink-stack-frontpage

从下至上:

1、部署:Flink 支持本地运行、能在独立集群或者在被 YARN 或 Mesos 管理的集群上运行, 也能部署在云上。

2、运行:Flink 的核心是分布式流式数据引擎,意味着数据以一次一个事件的形式被处理。

3、API:DataStream、DataSet、Table、SQL API。

4、扩展库:Flink 还包括用于复杂事件处理,机器学习,图形处理和 Apache Storm 兼容性的专用代码库。

Flink 数据流编程模型

抽象级别

Flink 提供了不同的抽象级别以开发流式或批处理应用。

2018-10-14_09-34-17

  • 最底层提供了有状态流。它将通过 过程函数(Process Function)嵌入到 DataStream API 中。它允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
  • DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
  • Table API 是以  为中心的声明式 DSL,其中表可能会动态变化(在表达流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁(代码量更少)。

你可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。

  • Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

Flink 程序与数据流结构

2018-10-14_09-51-09

Flink 应用程序结构就是如上图所示:

1、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。

2、Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。

3、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

为什么选择 Flink?

Flink 是一个开源的分布式流式处理框架:

①提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。

②它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。

③大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。

更早的时候,我们讨论了数据集类型(有界 vs 无穷)和运算模型(批处理 vs 流式)的匹配。Flink 的流式计算模型启用了很多功能特性,如状态管理,处理无序数据,灵活的视窗,这些功能对于得出无穷数据集的精确结果是很重要的。

  • Flink 保证状态化计算强一致性。”状态化“意味着应用可以维护随着时间推移已经产生的数据聚合或者,并且 Filnk 的检查点机制在一次失败的事件中一个应用状态的强一致性。

exactly_once_state

  • Flink 支持流式计算和带有事件时间语义的视窗。事件时间机制使得那些事件无序到达甚至延迟到达的数据流能够计算出精确的结果。

out_of_order_stream

  • 除了提供数据驱动的视窗外,Flink 还支持基于时间,计数,session 等的灵活视窗。视窗能够用灵活的触发条件定制化从而达到对复杂的流传输模式的支持。Flink 的视窗使得模拟真实的创建数据的环境成为可能。

windows

  • Flink 的容错能力是轻量级的,允许系统保持高并发,同时在相同时间内提供强一致性保证。Flink 以零数据丢失的方式从故障中恢复,但没有考虑可靠性和延迟之间的折衷。

distributed_snapshots

  • Flink 能满足高并发和低延迟(计算大量数据很快)。下图显示了 Apache Flink 与 Apache Storm 在完成流数据清洗的分布式任务的性能对比。

streaming_performance

  • Flink 保存点提供了一个状态化的版本机制,使得能以无丢失状态和最短停机时间的方式更新应用或者回退历史数据。

savepoints

  • Flink 被设计成能用上千个点在大规模集群上运行。除了支持独立集群部署外,Flink 还支持 YARN 和Mesos 方式部署。
  • Flink 的程序内在是并行和分布式的,数据流可以被分区成 stream partitions,operators 被划分为operator subtasks; 这些 subtasks 在不同的机器或容器中分不同的线程独立运行;operator subtasks 的数量在具体的 operator 就是并行计算数,程序不同的 operator 阶段可能有不同的并行数;如下图所示,source operator 的并行数为 2,但最后的 sink operator 为1;

parallel_dataflows

  • 自己的内存管理Flink 在 JVM 中提供了自己的内存管理,使其独立于 Java 的默认垃圾收集器。 它通过使用散列,索引,缓存和排序有效地进行内存管理。
  • 丰富的库Flink 拥有丰富的库来进行机器学习,图形处理,关系数据处理等。 由于其架构,很容易执行复杂的事件处理和警报。

分布式运行

flink 作业提交架构流程可见下图:

1、Program Code:我们编写的 Flink 应用程序代码

2、Job Client:Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点。 Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户

3、Job Manager:主进程(也称为作业管理器)协调和管理程序的执行。 它的主要职责包括安排任务,管理checkpoint ,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容灾,高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是 standby; Job Manager 包含 Actor system、Scheduler、Check pointing 三个重要的组件

4、Task Manager:从 Job Manager 处接收需要部署的 Task。Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。 任务执行的并行性由每个 Task Manager 上可用的任务槽决定。 每个任务代表分配给任务槽的一组资源。 例如,如果 Task Manager 有四个插槽,那么它将为每个插槽分配 25% 的内存。 可以在任务槽中运行一个或多个线程。 同一插槽中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。

最后

本文主要讲了我接触到 Flink 的缘由,然后从数据集类型和数据运算模型开始讲起,接着介绍了下 Flink 是什么、Flink 的整体架构、提供的 API、Flink 的优点所在以及 Flink 的分布式作业运行的方式。水文一篇,希望你能够对 Flink 稍微有一点概念了。