通过案例看Elasticsearch优化

我瓣用户产品中有多个核心场景都在使用 Elasticsearch (以下简称 ES),主要用在全文搜索、聚合计算等方面。ES 相关的功能开发者主要是我,我最早是 2016 年在 选影视 的功能上开始使用 ES 做结构化搜索,这个工具给运营同学和资深用户提供了按各种复杂条件找电影的,可以说是最好的途径。

这个项目为多个核心产品功能提供原始数据,这几年它历经多次迭代和功能完善,不过最近它出了一点问题,我觉得解决问题的过程很值得写篇文章分享一下,所以就有了此文。

为了让大家能更理解下面的内容,先介绍下「选影视」存储在 ES 里面的文档部分字段:

class Subject(DocType):
    title = Text()
    genres = Text()
    countries = Text()
    standard_tags = Text()
    ...

每个文档的_id是条目 ID,也包含很多条目属性字段 (如上述列出的标题、类型、地区、标签,以及未列出的一些字段),其中标签 (standard_tags) 字段是非常重要的,正是由于这个字段的数据,可以基于 ES 搜索包含某个 (些) 标签条目 ID 列表。

问题

8 月 28 日 (下图中的最高峰那天) 平台同事 @我,说发现这个项目最近服务调用慢且有大量超时,由于项目是一个微服务,所以经常触发服务调用的熔断,希望解决。

这个项目由于最近改动并不大,我已经有一段时间没有特别关注了,赶紧打开对应的 Sentry 页面,通过一个 Issue 页面找到问题原因:

某个 API 请求 ES 很容易造成请求超时 (抛 ConnectionTimeout)

下面是按天统计的事件总数的趋势图:

设置的超时间隔是 2 秒:这已经是 API 请求里面非常宽容的阈值了,事实上在之前的使用中大部分对 ES 的 API 请求在几毫秒到几十毫秒之间,鲜有超时问题。

PS: 这个图下面还会出现多次,会分别展示对应时间点下每天事件的总数范围。

很快和对应开发同事确认,造成问题的接口是给新版本 APP 里面的新功能提供的,我们发现问题时这个超时事件已经达到每天 4-6w+,虽然看起来量倒不算大,但是我依然觉得需要快速解决它:

  • 已经影响了服务质量
  • 功能比较隐蔽,如果不是主动用这个功能是不会触发 API 请求的,我体验了下对应功能确实很容易出现点开页面还没加载出来或者干脆窗口空白的情况,这太影响用户体验了
  • 这个超时量会随着新版本 APP 的装机量不断提高
  • 这些慢查询给 ES 带来压力,也影响了其他正常查询请求

回到问题,这个特别慢的请求是做一个聚合计算,看一下请求的 body:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {
        'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  },
  'query': {
    'bool': {
      'must': [{'ids': {'values': IDS}}]
    }
  }
}

其中 IDS 是条目 ID 列表,这个请求是让 ES 聚合这些条目包含的标签总数。举个例子,ID 为 1 的条目有「剧情 / 犯罪 / 动作」三个标签;ID 为 2 的条目有「喜剧 / 剧情 / 爱情」三个标签,那么 IDS 为[1, 2] 时,返回的内容中「剧情」为 2 (2 个条目都有这个标签),其他的标签都是 1 (只有一个条目包含)

OK,现在事情很明朗了,我们开始解决超时的问题吧。

这个接口的代码我当时一眼看去并没有发现问题,那马上想到的就是缓存和减少调用这 2 条路,但是很遗憾行不通:

  • 缓存。IDS 是用户看过 / 想看电影的条目列表,每个人都不同。用户访问和用户兴趣相关,且频率不高,由于用户量庞大不值得为了这么个小功能就主动缓存并添加一个好的更新缓存的机制,成本太高甚至效果会更差
  • 减少调用。目前的调用已经是按需请求,没有找到明显的可以优化调用量的地方

OK,既然「绕」不了了,就直面吧,我继续尝试其他解决超时问题的方法

优化聚合计算请求

这算是我的个人性格,接下来我第一个想法的思路就是优化这个聚合请求的写法。其实在 5 年前我的那个 《Python 高级编程》 的 PPT 里面就说过:

  1. 在合适的地方用合适的技巧
  2. 不是它不好,而是你没有用好

这一直算是我的技术格言吧,无论是不熟悉的还是熟知的内容,我都会保持敬畏。出了问题会首先考虑是不是我没有用对用好。就拿上面的这个 body 来说,其实做 2 件事:

  1. 限定要查询的 _id 范围 (in IDS)
  2. 聚合查找 standard_tags 字段中的标签数据,返回匹配标签数量最多的 100 个标签和包含的条目数量

那怎么优化呢?首先我试了一下改用Term Query代替IDs QueryIDs Query的写法是我开始用的,之后其他同学有这样的需求就按着我这种写法来了,我怀疑是这种查询语句的问题,改成下面这样的效果:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {                                                                                                 'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  },
  'query': {
    'bool': {
      'must': [{'terms': {'_id': IDS}}]
    }
  }
}

上线后发现修改对超时没有帮助 😢,说明IDs Query的写法没有问题。

优化点 1

这让我一时间没有了头绪,我开始搜索一些「优化 Elasticsearch」相关的技术博文和开源书籍,希望从中找找灵感。然后就在 Elastic 社区找到了一个 query+aggs 查询性能问题 的帖子,其作者也遇到了使用聚合后查询非常慢的问题,@kennywu76 给出了一个方案: 「在每一层 terms aggregation 内部加一个 {“execution_hint”:”map”}」。在评论区 @kennywu76 也给了详细的解释,我认为算是全网最好的解释了,转发一下:

Terms aggregation 默认的计算方式并非直观感觉上的先查询,然后在查询结果上直接做聚合。

ES 假定用户需要聚合的数据集是海量的,如果将查询结果全部读取回来放到内存里计算,内存消耗会非常大。因此 ES 利用了一种叫做 global ordinals 的数据结构来对聚合的字段来做 bucket 分配,这个 ordinals 用有序的数值来代表字段里唯一的一个字符串,因此为每个 ordinals 值分配一个 bucket 就等同于为每个唯一的 term 分配了 bucket。 之后遍历查询结果的时候,可以将结果映射到各个 bucket 里,就可以很快的统计出每个 bucket 里的文档数了。

这种计算方式主要开销在构建 global ordinals 和分配 bucket 上,如果索引包含的原始文档非常多,查询结果包含的文档也很多,那么默认的这种计算方式是内存消耗最小,速度最快的。

如果指定 execution_hint:map 则会更改聚合执行的方式,这种方式不需要构造 global ordinals,而是直接将查询结果拿回来在内存里构造一个 map 来计算,因此在查询结果集很小的情况下会显著的比 global ordinals 快。

要注意的是这中间有一个平衡点,当结果集大到一定程度的时候,map 的内存开销带来的代价可能就抵消了构造 global ordinals 的开销,从而比 global ordinals 更慢,所以需要根据实际情况测试对比一下才能找好平衡点。

对于我们这个场景,IDS 量级比较小所以查询结果集很小,可以改用execution_hint:map这种方式:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {
        'execution_hint': 'map',
        'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  }
}

优化点 2

优化过程里我突然想起了官网对于Filter or Query的优化意见 (详见延伸阅读链接 1),文中说:

过滤查询(Filtering queries)只是简单的检查包含或者排除,这就使得计算起来非常快。考虑到至少有一个过滤查询(filtering query)的结果是 “稀少的”(很少匹配的文档),并且经常使用不评分查询(non-scoring queries),结果会被缓存到内存中以便快速读取,所以有各种各样的手段来优化查询结果。

相反,评分查询(scoring queries)不仅仅要找出 匹配的文档,还要计算每个匹配文档的相关性,计算相关性使得它们比不评分查询费力的多。同时,查询结果并不缓存。

多亏倒排索引(inverted index),一个简单的评分查询在匹配少量文档时可能与一个涵盖百万文档的 filter 表现的一样好,甚至会更好。但是在一般情况下,一个 filter 会比一个评分的 query 性能更优异,并且每次都表现的很稳定。

过滤(filtering)的目标是减少那些需要通过评分查询(scoring queries)进行检查的文档。

这段话之前我也看过,但是这次再看发现了一个可优化的点,就是用Filter替代Query,因为我们想要的只是聚合结果,完全不需要做评分查询。请求 body 就改成了这样:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {
        'execution_hint': 'map',
        'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  },
 'query': {
   'bool': {
     'filter': [{'ids': {'values': IDS}}]
    }
  }
}

优化点 3

正是上面的这段对于Filter or Query的优化意见,让我也注意到一个词:「不评分查询」,前面的查询结果其实会返回传入的 IDS 列表的对应文档结果,但是我们根本不需要,所以可以不返回 source:

{
  'aggs': {
    'by_standard_tags': {
      'terms': {
        'execution_hint': 'map',
        'field': 'standard_tags.keyword',
        'size': 100
      }
    }
  },
 'query': {
   'bool': {
     'filter': [{'ids': {'value': IDS}}]
    }
  },
  '_source': False
}

优化结果

基于上面三点优化。上线后超时问题得到了很大缓解:

优化效果比较显著:

  1. 超时数降到了之前的约 1/4
  2. 项目总超时 (还有其他查询引起的超时) 降到了约之前的 1/5
  3. 找了一些 BadCase 对比效果,优化后请求耗时最差降到之前的一半,约 1/3 的请求的时间重回小于 100ms 级别

虽然已经不会触发平台的熔断了,但是超时事件总量依然很大,需要进一步优化。

优化分片数

我继续保持怀疑的态度寻找优化方案,在我的认知里面,像 Elasticsearch 这样成熟的、广受关注和欢迎的项目经过多年的发展,我不相信一个很常见的聚合查询就能引起集群负载的波动,也不相信它能引起这么大量的请求超时,所以我还在怀疑问题出在使用姿势上。由于我不熟悉 Java 语言,在短时间不具备阅读 ES 源码的能力,所以开始希望通过别人的文章中获得灵感。

在看到《Mastering Elasticsearch》中的「选择恰当的分片数量和分片副本数量」(延伸阅读链接 4) 这一节后,我赶紧看了下项目用的这个索引的分片情况 (通过http http://ES_URL/_cat/shards/INDEX_NAME),并和萨 (sa) 确认了一下。我瓣一开始用的 ES 版本比较低 – 2.3,之后升级到当时最新的 6.4,ES 在 6.X 及之前的版本默认索引分片数为 5 (Primary Shards)、副本数为 1 (Replica,当一个节点的主分片丢失,ES 可以把任意一个可用的分片副本推举为主分片),从 ES7.0 开始调整为默认索引分片数为 1、副本数为 1 (详见延伸阅读链接 2 和链接 3)。而我瓣为了数据安全,默认每个索引分片数为 5、副本数为 2,也就是这个索引一共有 15 个分片(总分片数 = 主分片数*(副分片数 + 1))。

ES 默认的分片配置并不适用于所有业务场景,那么分片数应该怎么安排呢?延伸阅读链接 5 是 ES 官方博客中的一篇叫做「Elasticsearch 究竟要设置多少分片数?」的文章,其中有这么 2 段内容:

Elasticsearch 中的数据组织成索引。每一个索引由一个或多个分片组成。每个分片是 Luncene 索引的一个实例,你可以把实例理解成自管理的搜索引擎,用于在 Elasticsearch 集群中对一部分数据进行索引和处理查询。

构建 Elasticsearch 集群的初期如果集群分片设置不合理,可能在项目的中后期就会出现性能问题。

通过分片,ES 把数据放在不同节点上,这样可以存储超过单节点容量的数据。而副本分片数的增加可以提高搜索的吞吐量 (主分片与副本都能处理查询请求,ES 会自动对搜索请求进行负载均衡)。在《Mastering Elasticsearch》里面还提到了路由 (routing) 功能 (延伸阅读链接 6),ES 在写入文档时,文档会通过一个公式路由到一个索引中的一个分片上。默认的选择分片公式如下:

shard_num = hash(_routing) % num_primary_shards

_routing字段的取值默_id字段。如果不指定路由,在查询 / 聚合时就需要由 ES 协调节点搜集到每个分片 (每个主分片或者其副本上查询结果,再将查询的结果进行排序然后返回:在我们这个场景,每次要从 5 个分片上聚合。

如果能够确定文档会被映射到哪个 (些) 分片,可以只在对应的一 (多) 个分片上执行查询命令,不用全局的搜索。我实际的试了下,用 routing 确实快了很多。所以看到这里我第一个感觉是分片多了会带来路由问题,前面说了,「每个分片是自管理的,对一部分数据进行索引和处理查询」,查询和聚合都需要在最后把结果搜集起来。那可不可以就把数据放在一个分片上,这样就没有路由的问题了?另外 ES7.0 默认的分片方案也是朝着这个方面走的,所以我觉得方案调整应该是经过一段时间的实践,考虑到大部分场景下1 Shard + 1 Replica这样的方案是更高的选择。

Ok, 现在优化的目的就是选择合适的主分片数 + 合适的副本分片数

要改善现在面临的问题,考虑本业务的数据量,分片坏掉的概率和数据安全等因素,我直观的感受就是我瓣的索引分片数太多了。但由于索引创建好后,主分片数量不可修改,只可以修改副本分片数量。而要修改主分片数量,只能重建或者建新的索引,代价比较大。

无论是官方还是一些大厂相关文章都没有对于分片数做出完美的公式,默认的不一定是最好的,但什么样的组合还是需要实际测试,所以我尝试了多个分片方案,如下:

Shard(s) Replica(s)
1 1
1 2
2 1
2 2
3 1
3 2
4 1
4 2
5 2
6 1
6 2
9 1
9 2

Note: 这里副本分片数没有大于 2 的方案是所用到的 ES 集群节点规模所限。

为了不影响现有业务,用的都是新索引,这么创建:

curl -XPUT "http://ES_URL/INDEXNAME/" -H 'Content-Type: application/json' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 1
        }
    }
}'

在数据上我使用 elasticsearch-dump 把旧索引的中数据灌到新索引,另外一方面在新索引的因业务逻辑上订阅 Kafka 消息同步对索引数据的修改。为了让写入更快,我还在灌数据过程中关闭了副本和索引刷新:

curl -XPUT "http://ES_URL/INDEXNAME/_settings" -H 'Content-Type: application/json' -d '{ "index" : { "refresh_interval" : "-1", "number_of_replicas": 0 } }'

优化结果

上述分片方案的尝试并不是按表格顺序来做的,而且一开始我的理解是分片太多,所以最初的主要目的是要减少分片数。我一开始是在当前的主分片方案上尝试5 Shards + 1 Replica,也就是减少副本分片数:

就是图中 9 月 2 日这天,超时降得很明显,这给我带来了非常大的信心,说明我的方向是对的。当时由于数据的问题,后来迁回了原来的索引一直到 9 月 4 日。

刚才提到,官方从 7.0 开始改为默认1 Shard + 1 Replica这样的方案,所以接下来我新建了一个1 Shard + 2 Replicas的新索引,9 月 4 日上线,当时效果也非常好。

但是通过上图可以看到当天超时量又涨起来了,其实这是我犯的一个错误,早上测试1 Shard + 2 Replica观察了一段时间效果非常好,我认为还可以继续降分片数以提高「路由效率」,所以调整成了1 Shard + 1 Replica,我当时觉得毫无疑问效果会更好,然后下午就请假了… 结果过了 2 个小时发现超时涨的非常厉害,就回滚了。

不过到这里,可以感受到官方默认的方案对于我们这个例子是不可取的:如果当时选择1 Shard + 1 Replica运行满一天,我相信超时量将远高于之前 8 月 28 日最高峰的超时量。我觉得造成这个问题是由于分片数太少了,2 个分片扛不住这样的吞吐量。

在接下来的一段时间里面在准备好数据后。我分别尝试了上述提到的主分片小于 5 的各种组合,结果非常反直觉:

超时情况在 1 Shard + 2 Replicas 和 5 Shard + 1 Replica 这 2 个方案下表现是最好的,其他的方案的效果都很差。

为什么说反直觉呢?我本来认为:

  • 考虑请求 ES 集群带来的压力,在一定分片数范围内增加主分片能提高吞吐量,由于路由效率超过一定阈值应该会起反作用
  • 副本分片数多的副作用只是硬盘空间的「浪费」,但是能对查询效率有帮助,所以可以在一定范围内增加副本

在《eBay 的 Elasticsearch 性能调优实践》(延伸阅读链接 7) 中有「搜索性能和副本数之间的关系」和「搜索性能和分片数量之间的关系」的 2 张图表,支持了我的自觉:

  • 分片数增加的过程中,开始时搜索吞吐量增大 (响应时间减少),但随着分片数量的增加,搜索吞吐量减小 (响应时间增加)
  • 搜索吞吐量几乎与副本数量成线性关系

现在的测试结果和预想对不上,尤其是5 Shard + 1 Replica5 Shard + 2 Replicas(最初的方案) 怎么效果差这么多?

我继续搜索,找到官方对副本数的建议和问题解释 (延伸阅读链接 7):

Which setup is going to perform best in terms of search performance? Usually, the setup that has fewer shards per node in total will perform better. The reason for that is that it gives a greater share of the available filesystem cache to each shard, and the filesystem cache is probably Elasticsearch’s number 1 performance factor.

So what is the right number of replicas? If you have a cluster that has num_nodes nodes, num_primaries primary shards in total and if you want to be able to cope with max_failures node failures at once at most, then the right number of replicas for you is max(max_failures, ceil(num_nodes / num_primaries) – 1).

也就是说,当我能接受的max_failures为 1、num_nodes为 3:

In : from math import ceil

In : max(1, ceil(3 / 5) - 1)  # num_primaries = 5
Out: 1 # 5 Shard + 1 Replica

In : max(1, ceil(3 / 1) - 1)
Out: 2 # 1 Shard + 2 Replicas  # num_primaries = 1

In : max(1, ceil(3 / 6) - 1)  # num_primaries = 6
Out: 1

In : max(1, ceil(3 / 9) - 1)  # num_primaries = 9
Out: 1

大家可以看图示,从 9 月 4 日到 9 月 19 日每日的超时量大部分在 100 – 300,也有几天达到了 4000+,较之前的超时量也可说降到了之前的 1%。每天几百的量级已经很少了:

那么是否可以继续优化呢?我找萨跑了下Slow Log想分析这些慢的请求 body 的特点,结果发现这些请求并没有什 么特殊性。我又想如果选「大于 5 个主分片」这种反直觉的方案,也就是让分片数变的更多会怎么样呢?所以,我试了6 Shard9 Shards,可以看最近几天:

结论是大于 5 分片的全部方案效果都不错。最好的是9 Shards + 1 Replica,每天超时数小于 30 之间

这些超时都发生下凌晨各服务定期任务对这个服务产生大量大量引起的,不会影响用户体验。找调用方确认了下,有重试机制,所以到现在,我们的优化任务告一个段落了。

后记

还是那句话:

不是它不好,而是你没有用好

通过这个带着问题做优化的案例,让我对 ES 有了更深入的了解。我获得的经验是:

  • 默认的分片方案不一定合适,具体的分片方案应该根据业务场景具体实验
  • 如官网所说「副本可能有助于提高吞吐量,但并不总是如此」,主要是由于节点数少儿带来的文件系统缓存性能问题,所以副本数不是越多越好,还是尽量按照官方推荐的来
  • 主分片数在一定范围内越多越好。我之前只觉得路由效率的问题,但是另外一个角度,分片多那么每个分片上的数据就变少了,查询和聚合要更快。

这次优化是从开发者有权限的地方去找优化思路的,没有考虑服务器资源和 ES 配置方面的优化方向,相信也会有收获。

另外本来还准备了「 使用 preference 优化缓存利用率 」、「 6.0 新增的 Index Sorting 」、「 直接路由 」等多个思路来优化。之后有时间我准备调低现在超时的阈值,相信到时候都能用上。

延伸阅读

  1. https://www.elastic.co/guide/cn/elasticsearch/guide/current/_queries_and_filters.html
  2. https://www.elastic.co/guide/en/elasticsearch/reference/6.2/_basic_concepts.html
  3. https://www.elastic.co/guide/en/elasticsearch/reference/7.3/indices-create-index.html
  4. https://doc.yonyoucloud.com/doc/mastering-elasticsearch/chapter-4/41_README.html
  5. https://www.elastic.co/blog/how-many-shards-should-i-have-in-my-elasticsearch-cluster
  6. https://doc.yonyoucloud.com/doc/mastering-elasticsearch/chapter-4/42_README.html
  7. https://www.infoq.cn/article/elasticsearch-performance-tuning-practice-at-ebay
  8. https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-search-speed.html#_replicas_might_help_with_throughput_but_not_always

Elasticsearch 使用copy_to组合字段进行查询

copy_to 介绍

copy_to 参数允许将多个字段的值复制到另一个字段中,然后对该字段进行查询。例如,first_name和last_name字段可以复制到full_name字段,如下所示:

PUT /my_index

{ “mappings”: { “properties”: { “first_name”:{ “type”: “text”, “copy_to”: “full_name” }, “last_name”:{ “type”: “text”, “copy_to”: “full_name” }, “full_name”:{ “type”: “text” } } } }

 

PUT my_index/_doc/1

{ “first_name”: “John”, “last_name”: “Smith” }

 

 

GET /my_index/_search

{ “query”: { “match”: { “full_name”:{ “query”: “John Smith”, “operator”: “and” } } } }

 

 

 

  • first_name 和 last_name 的值被复制到 full_name 字段
  • first_name 和 last_name 仍然可以被作为查询条件进行查询

其他注意点

  • copy_to 复制的是字段值,而不是分词产生的词语
  • _source 字段集合不会显示复制后的字段值
  • copy_to 可以复制到多个字段,例如:“copy_to”: [ “field_1”, “field_2” ]
  • 不能使用中间字段进行复制,例如:field_1 复制到 field_2,field_2复制到field_3,这样是无效的。应该直接使用多字段复制,让field_1复制到 [ “field_2”, “field_3” ]

ElasticSearch聚合 多字段

多字段聚合
目前针对 ElasticSearch聚合 并且是多多字段
通常情况,terms聚合都是仅针对于一个字段的聚合。因为该聚合是需要把词条放入一个哈希表中,如果多个字段就会造成n^2的内存消耗。

不过,对于多字段,ElasticSearch也提供了下面两种方式:

1 使用脚本合并字段
2 使用copy_to方法,合并两个字段,创建出一个新的字段,对新字段执行单个字段的聚合。

 

collect模式

 

对于子聚合的计算,有两种方式:

 

  • depth_first 直接进行子聚合的计算
  • breadth_first 先计算出当前聚合的结果,针对这个结果在对子聚合进行计算。

 

默认情况下ES会使用深度优先,不过可以手动设置成广度优先,比如:

 

{
    "aggs" : {
        "actors" : {
             "terms" : {
                 "field" : "actors",
                 "size" : 10,
                 "collect_mode" : "breadth_first"
             },
            "aggs" : {
                "costars" : {
                     "terms" : {
                         "field" : "actors",
                         "size" : 5
                     }
                 }
            }
         }
    }
}

《从0到1学习Flink》—— Apache Flink 介绍

前言

Flink 是一种流式计算框架,为什么我会接触到 Flink 呢?因为我目前在负责的是监控平台的告警部分,负责采集到的监控数据会直接往 kafka 里塞,然后告警这边需要从 kafka topic 里面实时读取到监控数据,并将读取到的监控数据做一些 聚合/转换/计算 等操作,然后将计算后的结果与告警规则的阈值进行比较,然后做出相应的告警措施(钉钉群、邮件、短信、电话等)。画了个简单的图如下:

监控告警

目前告警这块的架构是这样的结构,刚进公司那会的时候,架构是所有的监控数据直接存在 ElasticSearch 中,然后我们告警是去 ElasticSearch 中搜索我们监控指标需要的数据,幸好 ElasticSearch 的搜索能力够强大。但是你有没有发现一个问题,就是所有的监控数据从采集、采集后的数据做一些 计算/转换/聚合、再通过 Kafka 消息队列、再存进 ElasticSearch 中,再而去 ElasticSearch 中查找我们的监控数据,然后做出告警策略。整个流程对监控来说看起来很按照常理,但是对于告警来说,如果中间某个环节出了问题,比如 Kafka 消息队列延迟、监控数据存到 ElasticSearch 中写入时间较长、你的查询姿势写的不对等原因,这都将导致告警从 ElasticSearch 查到的数据是有延迟的。也许是 30 秒、一分钟、或者更长,这样对于告警来说这无疑将导致告警的消息没有任何的意义。

为什么这么说呢?为什么需要监控告警平台呢?无非就是希望我们能够尽早的发现问题,把问题给告警出来,这样开发和运维人员才能够及时的处理解决好线上的问题,以免给公司造成巨大的损失。

更何况现在还有更多的公司在做那种提前预警呢!这种又该如何做呢?需要用大数据和机器学习的技术去分析周期性的历史数据,然后根据这些数据可以整理出来某些监控指标的一些周期性(一天/七天/一月/一季度/一年)走势图,这样就大概可以绘图出来。然后根据这个走势图,可以将当前时间点的监控指标的数据使用量和走势图进行对比,在快要达到我们告警规则的阈值时,这时就可以提前告一个预警出来,让运维提前知道预警,然后提前查找问题,这样就能够提早发现问题所在,避免损失,将损失降到最小!当然,这种也是我打算做的,应该可以学到不少东西的。

于是乎,我现在就在接触流式计算框架 Flink,类似的还有常用的 Spark 等。

自己也接触了 Flink 一段时间了,这块中文资料目前书籍是只有一本很薄的,英文书籍也是三本不超过。

我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以关注我的公众号:zhisheng,然后回复关键字:Flink 即可无条件获取到。

另外这里也推荐一些博客可以看看:

1、官网:[https://flink.apache.org/]()

2、GitHub: [https://github.com/apache/flink]()

3、[https://blog.csdn.net/column/…]()

4、[https://blog.csdn.net/lmalds/…]()

5、[http://wuchong.me/]()

6、[https://blog.csdn.net/liguohu…]()

下面的介绍可能也有不少参考以上所有的资料,感谢他们!在介绍 Flink 前,我们先看看 数据集类型 和 数据运算模型 的种类。

数据集类型有哪些呢:

  • 无穷数据集:无穷的持续集成的数据集合
  • 有界数据集:有限不会改变的数据集合

那么那些常见的无穷数据集有哪些呢?

  • 用户与客户端的实时交互数据
  • 应用实时产生的日志
  • 金融市场的实时交易记录

数据运算模型有哪些呢:

  • 流式:只要数据一直在产生,计算就持续地进行
  • 批处理:在预先定义的时间内运行计算,当完成时释放计算机资源

Flink 它可以处理有界的数据集、也可以处理无界的数据集、它可以流式的处理数据、也可以批量的处理数据。

Flink 是什么 ?

flink-01

flink-02

flink-03

上面三张图转自 云邪 成都站 《Flink 技术介绍与未来展望》,侵删。

从下至上,Flink 整体结构

flink-stack-frontpage

从下至上:

1、部署:Flink 支持本地运行、能在独立集群或者在被 YARN 或 Mesos 管理的集群上运行, 也能部署在云上。

2、运行:Flink 的核心是分布式流式数据引擎,意味着数据以一次一个事件的形式被处理。

3、API:DataStream、DataSet、Table、SQL API。

4、扩展库:Flink 还包括用于复杂事件处理,机器学习,图形处理和 Apache Storm 兼容性的专用代码库。

Flink 数据流编程模型

抽象级别

Flink 提供了不同的抽象级别以开发流式或批处理应用。

2018-10-14_09-34-17

  • 最底层提供了有状态流。它将通过 过程函数(Process Function)嵌入到 DataStream API 中。它允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
  • DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
  • Table API 是以  为中心的声明式 DSL,其中表可能会动态变化(在表达流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁(代码量更少)。

你可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。

  • Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

Flink 程序与数据流结构

2018-10-14_09-51-09

Flink 应用程序结构就是如上图所示:

1、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。

2、Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。

3、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

为什么选择 Flink?

Flink 是一个开源的分布式流式处理框架:

①提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。

②它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。

③大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。

更早的时候,我们讨论了数据集类型(有界 vs 无穷)和运算模型(批处理 vs 流式)的匹配。Flink 的流式计算模型启用了很多功能特性,如状态管理,处理无序数据,灵活的视窗,这些功能对于得出无穷数据集的精确结果是很重要的。

  • Flink 保证状态化计算强一致性。”状态化“意味着应用可以维护随着时间推移已经产生的数据聚合或者,并且 Filnk 的检查点机制在一次失败的事件中一个应用状态的强一致性。

exactly_once_state

  • Flink 支持流式计算和带有事件时间语义的视窗。事件时间机制使得那些事件无序到达甚至延迟到达的数据流能够计算出精确的结果。

out_of_order_stream

  • 除了提供数据驱动的视窗外,Flink 还支持基于时间,计数,session 等的灵活视窗。视窗能够用灵活的触发条件定制化从而达到对复杂的流传输模式的支持。Flink 的视窗使得模拟真实的创建数据的环境成为可能。

windows

  • Flink 的容错能力是轻量级的,允许系统保持高并发,同时在相同时间内提供强一致性保证。Flink 以零数据丢失的方式从故障中恢复,但没有考虑可靠性和延迟之间的折衷。

distributed_snapshots

  • Flink 能满足高并发和低延迟(计算大量数据很快)。下图显示了 Apache Flink 与 Apache Storm 在完成流数据清洗的分布式任务的性能对比。

streaming_performance

  • Flink 保存点提供了一个状态化的版本机制,使得能以无丢失状态和最短停机时间的方式更新应用或者回退历史数据。

savepoints

  • Flink 被设计成能用上千个点在大规模集群上运行。除了支持独立集群部署外,Flink 还支持 YARN 和Mesos 方式部署。
  • Flink 的程序内在是并行和分布式的,数据流可以被分区成 stream partitions,operators 被划分为operator subtasks; 这些 subtasks 在不同的机器或容器中分不同的线程独立运行;operator subtasks 的数量在具体的 operator 就是并行计算数,程序不同的 operator 阶段可能有不同的并行数;如下图所示,source operator 的并行数为 2,但最后的 sink operator 为1;

parallel_dataflows

  • 自己的内存管理Flink 在 JVM 中提供了自己的内存管理,使其独立于 Java 的默认垃圾收集器。 它通过使用散列,索引,缓存和排序有效地进行内存管理。
  • 丰富的库Flink 拥有丰富的库来进行机器学习,图形处理,关系数据处理等。 由于其架构,很容易执行复杂的事件处理和警报。

分布式运行

flink 作业提交架构流程可见下图:

1、Program Code:我们编写的 Flink 应用程序代码

2、Job Client:Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点。 Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户

3、Job Manager:主进程(也称为作业管理器)协调和管理程序的执行。 它的主要职责包括安排任务,管理checkpoint ,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容灾,高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是 standby; Job Manager 包含 Actor system、Scheduler、Check pointing 三个重要的组件

4、Task Manager:从 Job Manager 处接收需要部署的 Task。Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。 任务执行的并行性由每个 Task Manager 上可用的任务槽决定。 每个任务代表分配给任务槽的一组资源。 例如,如果 Task Manager 有四个插槽,那么它将为每个插槽分配 25% 的内存。 可以在任务槽中运行一个或多个线程。 同一插槽中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。

最后

本文主要讲了我接触到 Flink 的缘由,然后从数据集类型和数据运算模型开始讲起,接着介绍了下 Flink 是什么、Flink 的整体架构、提供的 API、Flink 的优点所在以及 Flink 的分布式作业运行的方式。水文一篇,希望你能够对 Flink 稍微有一点概念了。

Apache Flink

Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用JavaScala编写的分布式流数据流引擎。[2][3]Flink以数据并行管道方式执行任意流数据程序[4],Flink的流水线运行时系统可以执行批处理和流处理程序。[5][6]此外,Flink的运行时本身也支持迭代算法的执行。[7]

Flink提供高吞吐量、低延迟的流数据引擎[8]以及对事件-时间处理和状态管理的支持。Flink应用程序在发生机器故障时具有容错能力,并且支持exactly-once语义。[9]程序可以用Java、Scala[10]Python[11]SQL[12]等语言编写,并自动编译和优化[13]到在集群或云环境中运行的数据流程序。[14]

Flink并不提供自己的数据存储系统,但为Amazon KinesisApache KafkaAlluxioHDFSApache CassandraElasticsearch等系统提供了数据源和接收器。