3. MySQL setup
a. Edit the MySQL configuration file /etc/my.cnf (vim /etc/my.cnf) and add the following under the [mysqld] section:
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
lower_case_table_names=1
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # required when configuring MySQL replication; must not clash with canal's slaveId
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
validate_password=off
#server_id = 1
#log-bin=mysql-bin
#binlog_format = ROW
#log_timestamps=SYSTEM
b. Run the SQL under /export/servers/canal_admin/conf: cat canal_manager.sql, then log in to MySQL, copy all the statements from the file, and execute them directly.
c. Create an account for synchronization:
CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
In data-warehouse modeling, raw business-layer data that has not been processed in any way is called ODS (Operational Data Store) data. At internet companies, the two common kinds of ODS data are business log data (Log) and business DB data (DB). For business DB data, collecting data from relational databases such as MySQL and importing it into Hive is a key step in building the data warehouse. How do we sync MySQL data into Hive accurately and efficiently? The common solution is batch extraction plus Load: connect directly to MySQL, SELECT the table's data into a local file as intermediate storage, and finally Load that file into a Hive table. This approach is simple to implement, but as the business grows its drawbacks become apparent:
Performance bottleneck: as the business scales, the Select from MySQL -> Save to local file -> Load to Hive pipeline takes longer and longer, and can no longer meet the downstream warehouse's production deadlines.
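For concreteness, here is a minimal sketch of that batch pipeline, assuming a MySQL table code_city and a Hive ODS table ods_code_city (the table names and staging path are illustrative, not from the original):

-- MySQL side: dump the table to a local file (the intermediate storage)
SELECT id, city, province, event_time
FROM code_city
INTO OUTFILE '/tmp/code_city.tsv'
FIELDS TERMINATED BY '\t';

-- Hive side: load the staged file, overwriting the day's partition
LOAD DATA LOCAL INPATH '/tmp/code_city.tsv'
OVERWRITE INTO TABLE ods_code_city PARTITION (dt = '20200324');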
// Extract the event-time field from the end of the concatenated string; this is the business time
long eventTime = Long.parseLong(element.split(",")[1]);
Date eventDate = new Date(eventTime);
return new SimpleDateFormat("yyyyMMdd").format(eventDate);
}
}
For example, given yesterday's full data code_city and today's incremental data code_city_delta, a FULL OUTER JOIN can merge the snapshot and the delta into an up-to-date table that serves as tomorrow's snapshot:
INSERT OVERWRITE TABLE code_city
SELECT COALESCE(t2.id, t1.id) AS id,
       COALESCE(t2.city, t1.city) AS city,
       COALESCE(t2.province, t1.province) AS province,
       COALESCE(t2.event_time, t1.event_time) AS event_time
FROM code_city t1
FULL OUTER JOIN (
    SELECT id, city, province, event_time
    FROM (
        -- take the latest state row per id
        SELECT id, city, province, dml_type, event_time,
               row_number() OVER (PARTITION BY id ORDER BY event_time DESC) AS rank
        FROM code_city_delta
        WHERE dt = '20200324' -- partition filter
    ) temp
    WHERE rank = 1
) t2
ON t1.id = t2.id;
# Is binlog enabled?
show variables like 'log_bin';
# Check the binlog format
show global variables like 'binlog_format';
# Show the detailed logging configuration
show global variables like '%log%';
# MySQL data directory
show variables like '%dir%';
# Show the binlog directory
show global variables like "%log_bin%";
# List the binlog files currently in use and their sizes
show binary logs;
# Show the latest binlog file name and position
show master status;
Grant the canal account the privileges of a MySQL slave; if the account already exists, just run the GRANT statements:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
select user_id, status, COUNT(orderNo) num from (
SELECT orderNo, LAST_VALUE(status) status, LAST_VALUE(user_id) user_id from order_table group by orderNo
) t group by user_id, status
;
Consuming the database change log (binlog) is a good way to decouple systems. Many cloud databases expose a CDC (Change Data Capture) facility; for example, Aliyun's Flink service provides a corresponding connector (mysql-cdc). There are also sync tools that parse the binlog and forward the data, such as Aliyun DTS and canal; with a suitable connector, any of them can feed Flink.
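As an illustration, a source table wired to the binlog via the open-source mysql-cdc connector could be declared roughly as follows (hostname, credentials, database, and table names are all placeholders; the job below instead keeps the source abstract as type = 'your-db-type'):

CREATE TABLE order_source (
  order_no STRING,
  user_id STRING,
  biz_status STRING,
  gmt_modified TIMESTAMP(3),
  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'canal',
  'password' = 'canal',
  'database-name' = 'mydb',
  'table-name' = 'my_order'
);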
CREATE FUNCTION statusCount AS 'com.fliaping.flink.udf.StatusCountUdtf' ;
CREATE FUNCTION prevValue AS 'com.fliaping.flink.udf.PrevValueUdaf' ;
-- TT data source for purchase orders
create table tt_source
(
id BIGINT
,order_no VARCHAR
,gmt_modified VARCHAR
,user_id VARCHAR
,biz_status VARCHAR
,event_time AS PROCTIME()
,PRIMARY KEY (`id`)
) with (
type = 'your-db-type'
...
)
;
-- Offline status-count table
create table offline_order_status_num
(
user_id VARCHAR COMMENT 'supplier ID'
,biz_status VARCHAR COMMENT 'business status'
,last_day_num BIGINT COMMENT 'pending count as of yesterday'
,num BIGINT COMMENT 'pending count'
,ds VARCHAR COMMENT 'partition field'
,PRIMARY KEY (user_id, biz_status)
,PERIOD FOR SYSTEM_TIME -- marks this as a temporal (dimension) table
) with (
type = 'your-offline-type'
...
)
;
-- Offline order-status table
create table order_status_history
(
order_no VARCHAR COMMENT 'order number'
,biz_status VARCHAR COMMENT 'business status'
,last_biz_status VARCHAR COMMENT 'previous-day status'
,gmt_modified TIMESTAMP COMMENT 'modification time'
,last_gmt_modified TIMESTAMP COMMENT 'previous-day modification time'
,ds VARCHAR COMMENT 'partition field'
,PRIMARY KEY (order_no)
,PERIOD FOR SYSTEM_TIME -- marks this as a temporal (dimension) table
) with (
type = 'your-offline-type'
...
)
;
-- Filter the incoming data
create VIEW filter_view AS
select order_no
,user_id
,biz_status
,gmt_modified
,event_time
FROM tt_source
where order_no IS NOT NULL
and user_id IS NOT NULL
AND biz_status IS NOT NULL
AND gmt_modified IS NOT NULL
;
-- Fetch the order's previous status from Flink state, and cache the current status
CREATE VIEW get_before_biz_status AS
SELECT order_no
,user_id
,biz_status
,TO_TIMESTAMP (gmt_modified) gmt_modified
,DATE_FORMAT (TO_TIMESTAMP (gmt_modified), 'yyyyMMdd') AS gmt_modified_day
,prevValue(biz_status) OVER (PARTITION BY order_no ORDER BY event_time) prev_biz_status
FROM filter_view
;
-- For orders whose previous status is not in Flink state, look it up in the offline data
CREATE VIEW order_with_last_biz_status AS
SELECT c.*
-- the daily offline data takes time to produce, so use the day-before partition
,IF (
d.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
,d.last_biz_status
,d.biz_status
) prev_biz_status_offline
,IF (
d.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
,d.last_gmt_modified
,d.gmt_modified
) gmt_modified_offline
FROM get_before_biz_status c
LEFT JOIN order_status_history FOR SYSTEM_TIME AS OF PROCTIME () d
ON c.order_no = d.order_no AND c.prev_biz_status IS NULL
;
-- Map each status transition to per-status operation counts via the UDTF. For example, with the transition chain 10 -> 15 -> 16: when 10 arrives, status 10 gets +1; when 15 arrives, status 10 gets -1 and status 15 gets +1 (two output rows); when 16 arrives, status 15 gets -1. This is necessary because TT update events carry no before/after images, so the only way to count statuses is to track the business state machine and work out, on each transition, which status to increment and which to decrement.
CREATE VIEW status_shift_opt AS
select S.order_no
,S.user_id
,S.biz_status
,S.gmt_modified_day
,S.gmt_modified
,T.effect_biz_status
,T.effect_biz_status_opt
from (
-- Merge the previous status from offline data with the one from Flink state to determine the order's previous status
SELECT order_no
,user_id
,biz_status
,gmt_modified_day
,gmt_modified
,IF(
prev_biz_status IS NULL AND prev_biz_status_offline IS NOT NULL
,prev_biz_status_offline
,prev_biz_status
) prev_biz_status
FROM order_with_last_biz_status
WHERE (gmt_modified_offline IS NOT NULL AND gmt_modified_offline < gmt_modified)
or (gmt_modified_offline IS NULL)
) as S
,LATERAL TABLE (statusCount (biz_status,prev_biz_status)) as T (effect_biz_status, effect_biz_status_opt)
;
-- Group the data by day and filter down to the last two days
CREATE VIEW summary_with_day AS
SELECT user_id
,effect_biz_status biz_status
,gmt_modified_day
,SUM(effect_biz_status_opt) num
FROM status_shift_opt
-- realtime data: only count yesterday and today
WHERE effect_biz_status IN (10,15)
GROUP BY user_id
,effect_biz_status,gmt_modified_day
;
-- Group by supplier + status and sum the two days to get the (yesterday + today) count
CREATE VIEW summary_with_user_id_status AS
SELECT user_id
,biz_status
,SUM(num) num
FROM summary_with_day
WHERE gmt_modified_day = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd')
or gmt_modified_day = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
GROUP BY user_id,biz_status
;
-- Add the offline count: dimension-table offline count (as of yesterday 00:00) + realtime count (yesterday + today) = total count
CREATE VIEW plus_offline_data AS
SELECT user_id
,biz_status
,offline_num+num num
,CONCAT(
'offlineNum:'
,offline_num
,',realtimeNum:'
,num
,',sourceType:Flink_STREAM'
) attribute
FROM (
SELECT a.user_id
,a.biz_status
,IF(
b.ds = DATE_FORMAT ( date_sub (CURRENT_TIMESTAMP, 1), 'yyyy-MM-dd', 'yyyyMMdd' )
,IF(b.last_day_num IS NULL,0,b.last_day_num)
,IF(b.num IS NULL,0,b.num)
) offline_num
,a.num
FROM summary_with_user_id_status a
LEFT JOIN offline_order_status_num FOR SYSTEM_TIME AS OF PROCTIME () b
ON a.user_id = b.user_id
and a.biz_status = b.biz_status
) t
;
create table mysql_output
(
`user_id` varchar
,biz_status varchar
,num BIGINT
,attribute varchar
,primary key (biz_status, `user_id`)
) with (
type = 'mysql'
,url = 'jdbc:mysql://xxxxxx'
,tableName = 'order_status_count'
,userName = 'xxx'
,password = 'xxx'
)
;
INSERT INTO mysql_output
SELECT *
FROM plus_offline_data
;
UDF
prevValue function (UDAF: user-defined aggregate function):
package com.fliaping.flink.udf;
import org.apache.flink.table.functions.AggregateFunction;
public class PrevValueUdaf extends AggregateFunction<String, PrevValueUdaf.PrevValueAccum> {
private static final long serialVersionUID = 3733259132660252997L;
@Override
public PrevValueAccum createAccumulator() {
return new PrevValueAccum();
}
@Override
public String getValue(PrevValueAccum prevValueAccum) {
return prevValueAccum.prevValue;
}
public void accumulate(PrevValueAccum accumulator, String iValue) {
// shift: the value seen so far becomes the previous value
accumulator.prevValue = accumulator.currentValue;
accumulator.currentValue = iValue;
}
public void retract(PrevValueAccum accumulator, String iValue) {
// undo an accumulate: roll current back and record the retracted value as previous
accumulator.currentValue = accumulator.prevValue;
accumulator.prevValue = iValue;
}
public void merge(PrevValueAccum accumulator, Iterable<PrevValueAccum> its) {
for (PrevValueAccum it : its) {
accumulator.currentValue = it.currentValue;
accumulator.prevValue = it.prevValue;
}
}
public static class PrevValueAccum {
private String prevValue;
private String currentValue;
}
}
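Used as an OVER aggregate, as in get_before_biz_status above, prevValue(biz_status) OVER (PARTITION BY order_no ORDER BY event_time) therefore returns, for each incoming row, the status of the previous row in the same partition (NULL for the first row).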
statusCount function (UDTF: user-defined table function):
package com.fliaping.flink.udf;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava18.com.google.common.base.Objects;
import org.apache.flink.table.functions.TableFunction;
public class StatusCountUdtf extends TableFunction<Tuple2<String, Integer>>{
private static final long serialVersionUID = 5467096366714935845L;
public void eval(String bizStatus, String prevBizStatus) {
// no transition: nothing to emit
if (Objects.equal(bizStatus, prevBizStatus)) {
return;
}
// the new status gains one order
if (bizStatus != null) {
collect(Tuple2.of(bizStatus, 1));
}
// the previous status loses one order
if (prevBizStatus != null) {
collect(Tuple2.of(prevBizStatus, -1));
}
}
}
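For example, eval("15", "10") emits ("15", +1) and ("10", -1), while eval("10", null) emits only ("10", +1).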
Offline SQL:
-- Compute the counts, covering day-1 and day-2 data
INSERT OVERWRITE TABLE order_status_num PARTITION(ds='${bizdate}')
SELECT IF(a.user_id IS NULL,b.user_id,a.user_id) user_id
,IF(a.biz_status IS NULL,b.biz_status,a.biz_status) biz_status
,b.num last_day_num
,a.num
FROM (
SELECT user_id
,biz_status
,COUNT(order_no) num
FROM my_order
WHERE ds = '${bizdate}'
AND biz_status IN (10,15)
GROUP BY user_id
,biz_status
) a
FULL OUTER JOIN (
SELECT user_id
,biz_status
,COUNT(order_no) num
FROM my_order
WHERE ds = to_char(dateadd(to_date('${bizdate}', 'yyyymmdd'), - 1, 'dd'), 'yyyymmdd')
AND biz_status IN (10,15)
GROUP BY user_id
,biz_status
) b
ON a.user_id = b.user_id
AND a.biz_status = b.biz_status
;
-- Compute order statuses, covering day-1 and day-2 data
INSERT OVERWRITE TABLE order_status_history PARTITION(ds='${bizdate}')
SELECT IF(
a.order_no IS NULL
,b.order_no
,a.order_no
) order_no
,a.biz_status
,b.biz_status last_biz_status
,a.gmt_modified
,b.gmt_modified last_gmt_modified
FROM (
SELECT order_no
,biz_status
,gmt_modified
FROM my_order
WHERE ds = '${bizdate}'
AND biz_status IN (10,15)
) a
FULL OUTER JOIN (
SELECT order_no
,biz_status
,gmt_modified
FROM my_order
WHERE ds = to_char(dateadd(to_date('${bizdate}', 'yyyymmdd'), - 1, 'dd'), 'yyyymmdd')
AND biz_status IN (10,15)
) b
ON a.order_no = b.order_no
;
-- ---------------------------------
-- DIM layer: region dimension table,
-- created as a view in MySQL
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
bp.id AS province_id,
bp.name AS province_name,
br.id AS region_id,
br.region_name AS region_name,
bp.area_code AS area_code
FROM base_region br
JOIN base_province bp ON br.id= bp.region_id
;
-- ---------------------------------
-- DIM layer: SKU dimension table,
-- created as a view in MySQL
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
si.id AS id,
si.sku_name AS sku_name,
si.category3_id AS c3_id,
si.weight AS weight,
si.tm_id AS tm_id,
si.price AS price,
si.spu_id AS spu_id,
c3.name AS c3_name,
c2.id AS c2_id,
c2.name AS c2_name,
c1.id AS c1_id,
c1.name AS c1_name
FROM
(
sku_info si
JOIN base_category3 c3 ON si.category3_id = c3.id
JOIN base_category2 c2 ON c3.category2_id =c2.id
JOIN base_category1 c1 ON c2.category1_id = c1.id
);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
Note: on Aliyun RDS for MySQL, binlog is enabled by default and the account already has the binlog dump privilege, so no extra privileges or binlog settings are needed and this step can be skipped. Otherwise, grant the canal account the privileges of a MySQL slave; if the account already exists, just run the GRANT statements:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
Which setup is going to perform best in terms of search performance? Usually, the setup that has fewer shards per node in total will perform better. The reason for that is that it gives a greater share of the available filesystem cache to each shard, and the filesystem cache is probably Elasticsearch’s number 1 performance factor.
So what is the right number of replicas? If you have a cluster that has num_nodes nodes, num_primaries primary shards in total and if you want to be able to cope with max_failures node failures at once at most, then the right number of replicas for you is max(max_failures, ceil(num_nodes / num_primaries) - 1).
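For example, with num_nodes = 3, num_primaries = 2, and max_failures = 1, this gives max(1, ceil(3 / 2) - 1) = max(1, 1) = 1 replica per primary shard.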