Elasticsearch原理

动态更新的Lucene索引

Lucene处理方法,新收到的数据写到新文件里
Lucene把每次生成的倒排索引,叫做一个段(segment)。使用一个commit文件记录索引内所有的段,生产的segment的数据来源则是放在内存的buffer

  1. 当前索引有3个segment可用
  2. 新接收的数据进入内存buffer
  3. 内存buffer刷到磁盘,生成一个新的segment,commit文件同步更新
    动态更新的Lucene索引

利用磁盘缓存实现的准实时检索

相对来说,磁盘处理速度很慢,因此在上述第三步中还存在一个中间状态

  1. 内存buffer生成一个新的segment,刷新到文件系统缓存中,Lucene即可检索这个新的segment
  2. 文件系统缓存真正同步到磁盘,commit文件更新
    这一步刷新到文件系统缓存的步骤,在Elasticsearch中默认是1秒间隔,对于大部分应用来说,几乎是实时可搜索了。Elasticsearch也提供了单独的/_refresh接口,用户如果不满意,可以主动调整
    1
    2
    3
    4
    #加大refresh_interval参数
    # curl -XPOST http://localhost:9200/192.168.1.24-weixin_log-2017.10.28/_settings -d '
    { "refresh_interval": "10s" }
    '

如果导入的是历史数据,可以关闭

1
2
3
4
5
6
7
# curl -XPOST http://localhost:9200/192.168.1.24-weixin_log-2017.10.28 -d '
{
"settings": {
"refresh_interval": "-1"
}
}
'

在导入完成后,修改回原来的值或者手动调用一次

1
2
3
4
5
6
7
8
# curl -XPOST http://localhost:9200/192.168.1.24-weixin_log-2017.10.27/_refresh?pretty=true
{
"_shards" : {
"total" : 10,
"successful" : 10,
"failed" : 0
}
}

利用磁盘缓存实现的准实时检索

translog 提供的磁盘同步控制

Elasticsearch在把数据写入到内存buffer时,还记录了一个translog的日志,来保证期间发生主机错误、硬件故障等异常情况时数据不会丢失
如果在这期间发生异常,Elasticsearch会从 commit 位置开始恢复整个translog文件中的记录
translog提供的磁盘同步控制
等到真正把segment刷到磁盘,且commit 文件进行更新的时候,translog文件才清空,这一步叫flush。Elasticsearch也提供了/_flush接口
Elasticsearch默认设置为每30分钟主动执行一次flush,或者当translog文件大于512M时,两者分别可以通过index.translog.flush_threshold_periodindex.translog.flush_threshold_size参数控制,Elasticsearch还可以通过index.translog.flush_threshold_ops参数设置每收到多少条数据后flush一次

默认情况下,Elasticsearch每5秒,或每次请求操作结束前,会强制刷新 translog日志到磁盘。为了保证不丢数据 ,每次index、bulk、delete、update完成时,一定会触发新translog到磁盘,才给请求返回200 OK。这个改变提供数据安全性的同时降低了一点性能
可以在index template里设置如下参数来控制

1
2
3
{
"index.translog.durability": "async"
}

segment merge的影响

从前面看Lucene会写很多“新文件”,但这样会给服务器带来很大的负荷,每个文件都需要文件句柄,内存,CPU等各种资源,磁盘inode也有限
Elasticsearch通过segment merge操作将很多零散的segment做数据归并。这个过程有独立的线程来操作,并不影响新的segment的产生
当归并完成后,较大的segment刷到磁盘,commit文件做出相应变更,删除前面小的segment
segment merge

并归线程配置
segment归并过程中,需要读取segment,归并计算,再写新的segment,最后还要保存到磁盘。此过程非常消耗磁盘I/O和CPU的任务。所以,Elasticsearch提供了对归并线程的限速机制,要确保这个任务不会影响到其他的任务
归并线程的限速配置indices.store.throttle.max_bytes_per_sec在Elasticsearch 5.0默认是10240MB,使用了Lucene CMS(ConcurrentMergeScheduler)的auto throttle机制,正常情况下不需要手动配置。配置参数如下:

1
2
3
4
5
6
# curl -XPUT http://localhost:9200/_cluster/settings -d '
{
"persistent": {
"indices.store.throttle.max_bytes_per_sec": "100mb"
}
}'

归并线程数目,Elasticsearch也通过计算公式控制:Math.min(3,Runtime.getRuntime().availableProcessors()/2)。即服务器CPU核数的一半大于3时,启用3个归并线程,否则启动跟CPU核数的一半相等的线程数。如果确定磁盘I/O不够,可以降低index.merge.schedule.max_thread_count配置修改

归并策略主要有以下几条:

  • index.merge.policy.floor_segment 默认2MB,小于这个值得segment优先被归并
  • index.merge.policy.max_merge_at_once 默认一次归并10个segment
  • index.merge.policy.max_merge_at_once_explicit 默认forcemerge时一次最多归并30个segment
  • index.merge.policy.max_merged_segment 默认是5G,大于这个值得segment,不用参与归并,optimize除外

forcemerge接口
segment最大为5G,也会存在很多的segment,但因归并任务太消耗资源,所以一般不选择加大index.merge.policy.max_merged_segment配置,而在较低负荷时间段,通过forcemerge接口,强行归并segment

1
2
# curl -XPOST http://localhost:9200/192.168.1.24-weixin_log-2017.10.28/_forcemerge?max_num_segments=1
{"_shards":{"total":10,"successful":10,"failed":0}}

forcemerge线程消耗服务器资源比普通的归并线程还大,所以不要在写入数据的热索引执行这个操作,一般索引都是按天分割的

routing 和 replica的读写过程

在Elasticsearch分布层面上,当一个Elasticsearch节点收到一条数据的请求时,通过以下方式来确认数据应该存储到哪个分片的

  1. 路由计算
    Elasticsearch没有额外的依赖,对任意一条数据计算其对应分片的方式如下
    1
    shard = hash(routing) % number_of_primary_shards

每个数据都有一个routing参数,默认情况下,使用其_id值。将其_id值哈希计算后,对索引的主分片数取余,就是数据实际应该存储到的分片ID。
由于取余这种计算方式,完全依赖分母,所以不能修改索引的主分片数。一旦主分片数不一样,所有数据的存储位置计算结果都会改变,索引数据将完全不可读。

副本一致性
数据副本是分布式系统的一个标配。数据流程如下:
数据写入流程

  1. 客户端请求发送给node1节点,图中node1为master节点
  2. node1用数据的id_取余计算得到应该将数据存储到share0上,通过cluster state信息发现share0的主分片已经分配到node3上,node1将请求数据给node3
  3. node3完成请求数据的索引过程,存入主分片0,然后并行转发数据给分配由shard0的副本分片的node1和node2。当收到任一节点汇报副本分片数据写入成功,node3即返回给初始的接收节点node1,宣布数据写入成功。node1返回成功响应给客户端
    下面几个参数可以控制这个过程:
  • wait_for_active_shards: 两个副本只要有一个成功,就返回给客户端,默认值计算如下
    1
    int((primary + number_of_replicas)/2 ) +1

该参数通过index.write.wait_for_active_shards在索引级别设置,也可以根据需要单个写入请求上作为参数使用,设置成为1仅写完主分片就返回;设置为all,表示等所有副本分片都写入完成才返回,还可以设置为介于1和number_of_replicas +1之间的值

  • timeout: 如果集群出现异常,有些分片不可用,Elasticsearch会等待1分钟确认分片是否正常,可以用?timeout=30s参数来缩短这个等待的时间
    副本配置和分片配置不一样,可以随时调整。有些较大的索引,甚至可以在做optimize前,先把副本全部取消,等optimize完成后,在重新开启副本,节约单个segment的重复归并消耗
    1
    2
    3
    4
    5
    # curl -XPUT http://localhost:9200/192.168.1.24-weixin_log-2017.10.28/_settings -d '
    {
    "index": { "number_of_replicas": 0 }
    }
    '

shard 的 allocate 控制

Elasticsearch由以下策略来决定某个shard分配在哪个节点,是由Elasticsearch自行决定的

  • 新索引的生成
  • 索引的删除
  • 新增副本分片
  • 节点增减引发的数据平衡

Elasticsearch控制这部分逻辑的参数如下:

  • cluster.routing.allocation.enable 参数用来控制允许分配哪种分片。默认是all。可选项包含primariesnew_primariesnone则彻底拒绝分片
  • cluster.routing.allocation.allow_rebalance 参数用来控制什么时候允许数据平衡。默认是indices_all_active,即要求所有分片都正常启用以后,才可以进行数据平衡操作,否则在集群重启阶段,会浪费很多流量
  • cluster.routing.allocation.cluster_concurrent_rebalance 参数用来控制集群内同时运行的数据均衡任务个数。默认是2个,如果有节点增减,且集群负载压力不高的情况下,可以适当加大
  • cluster.routing.allocation.node_initial_primaries_recoveries 参数用来控制节点重启时,允许同时恢复几个主分片。默认是4个,如果节点是多磁盘,且I/O压力不大,可以适当增加
  • cluster.routing.allocation.node_concurrent_recoveries 参数用来控制节点除了主分片重启恢复以外其他情况下,允许同时运行的数据恢复任务。默认是2个。所以,节点重启时,可以看到主分片通过本地恢复迅速完成,副分片通过网络复制的恢复却很慢,并发线程本身也减少了一半。在Elasticsearch 1.6后,冷索引的副本分片也可以本地恢复,可以适当加大
  • indices.recovery.concurrent_streams 参数用来控制节点从网络复制恢复副本分片时的数据流个数。默认是3个,可以配合上一条配置一起加大
  • indices.recovery.max_bytes_per_sec 参数用来控制节点恢复时的速率。默认是 40MB

运维人员较常见的策略有2种:

  • 磁盘限额,为了保护节点数据安全。Elasticsearch会定时(cluster.info.update.interval默认为30秒)检查一下各节点的数据目录磁盘使用情况。在达到cluster.routing.allocation.disk.watermark.low(默认85%)的时候,新索引分片就不会再分配到这个节点上了。在达到cluster.routing.allocation.disk.watermark.high(默认90%)的时候就会触发该节点现存分片的数据平衡,把数据挪到其他节点上去。这两个值可以写成百分比或者具体字节数。可以适当修改参数配置:

    1
    2
    3
    4
    5
    6
    7
    # curl -XPUT http://localhost:9200/_cluster/settings -d '{
    "transient": {
    "cluster.routing.allocation.disk.watermark.low": "85%",
    "cluster.routing.allocation.disk.watermark.high": "10gb",
    "cluster.info.update.interval": "1m"
    }
    }'
  • 热索引分片不均。默认情况下,Elasticsearch集群的数据均衡策略是以各节点的分片总数(indices_all_active)作为基准的。可以提高均衡搜索压力。一般压力集群中在新索引的数据写入方面,正常运行时,没有问题的。但当集群扩容时,新加入集群的节点,分片总数远低于其他节点,如果有新的索引创建,Elasticsearch的默认策略会导致新索引的所有主分片几乎全部分配到这个新节点上。整个集群的写入压力全部在这个节点上,可以会导致这个节点出现异常,集群出现异常。
    在ELK环境中,需要预先计算好索引的分片数,配置好单节点的分片限额。比如:一个5节点的集群,索引主分片10个,副本1个,则平均下来每个节点应该有4个分片,则应配置为:

    1
    2
    3
    # curl -XPUT http://localhost:9200/192.168.1.24-weixin_log-2017.10.28/_settings -d '{
    "index": { "routing.allocation.total_shards_per_node": "5" }
    }'

这里配置的是5而不是4,需要预防有机器故障,分片发生迁移失败的情况

Elasticsearch中有一系列参数互相影响,最终联合决定分片分配:

  • cluster.routing.allocation.balance.shard 节点上分配分片的权重,默认值为 0.45。 数值越大越倾向于在节点层面均衡分片
  • cluster.routing.allocation.balance.index 每个索引往单个节点上分配分片的权重,默认值为0.55。 数值越大越倾向于在索引层面均衡分片
  • cluster.routing.allocation.balance.threshold 大于阈值则触发均衡操作。默认值为1。 Elasticsearch计算方法是:
    1
    (indexBalance (node.numShards(index) - avgShardsPerNode(index)) + shardBalance (node.numShards() - avgShardsPerNode)) <=> weightthreshold

所以,也可以采取加大cluster.routing.allocation.balance.index的措施,甚至设置cluster.routing.allocation.balance.shard 为0来尽量采用索引内的节点均衡

reroute 接口

上述都是从策略层面控制分片的选择,在必要的时候可以通过Elasticsearch的reroute接口,手动完成对分片的分配选择的控制
reroute接口支持三种指令:allocatemovecancel,常用的是allocatemove

  • allocate指令:因为负载过高等原因,有时候个别分片长期处于UNASSIGNED状态,可以手动分配分片到指定节点上。默认情况下只允许手动分配副本分片,所以如果是主分片故障,需要单独加一个allow_primary选项
    1
    2
    3
    4
    5
    6
    7
    # curl -XPOST http://localhost:9200/_cluster/reroute -d '{
    "commands": [{
    "allocate": {
    "index": "192.168.1.24-weixin_log-2017.10.28", "shard": 61, "node": "192.168.1.42", "allow_primary": true
    }
    }]
    }'

注:如果是历史数据的话,需要提前确认哪个节点上保留有这个分片的实际目录,且目录大小最大。然后手动分片到这个节点上,以此减少数据丢失。

  • move指令: 因负载过高,磁盘利用率过高,服务器下线,更换磁盘等原因,可能会需要从节点上移走部分分片
    1
    2
    3
    4
    5
    6
    7
    # curl -XPOST http://localhost:9200/_cluster/reroute -d '{
    "command": [{
    "move": {
    "index": "192.168.1.24-weixin_log-2017.10.28", "shard": 0, "from_node": "192.168.1.43", "to_node": "192.168.1.42"
    }
    }]
    }'

如果自己手动reroute失败,Elasticsearch返回的响应中会带上失败的原因。从Elasticsearch 5.0开始,新增了一个allocation explain接口,用来解释指定分片的具体失败原因

1
2
3
4
5
# curl -XGET 'http://localhost:9200/_cluster/allocation/explain' -d '{
"index": "192.168.1.24-weixin_log-2017.10.28",
"shard": 0,
"primary": false
}'

得到的响应如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
{
"shard": {
"index": "192.168.1.24-weixin_log-2017.10.28",
"index_uuid": "KnW0-zELRs6PK8410r38ZA",
"id": 0,
"primary": false
},
"assigned": false,
"shard_state_fetch_pending": false,
"unassigned_info": {
"reason": "INDEX_CREATED",
"at": "2017-10-28T20:04:23.620Z"
},
"allocation_delay_ms": 0,
"remaining_delay_ms": 0,
"nodes": {
"V-Spi0AyRZ6ZvKbaI3691w": {
"node_name": "H5dfFeA",
"node_attributes": {
"bar": "baz"
},
"store": {
"shard_copy": "NONE"
},
"final_decision": "NO",
"final_explanation": "the shard cannot be assigned because one or more allocation decider returns a 'NO' decision",
"weight": 0.06666675,
"decisions": [{
"decider": "filter",
"decision": "NO",
"explanation": "node does not match index include filters [foo:\"bar\"]"
}]
},
"Qc6VL8c5RWaw1qXZ0Rg57g": {
...
}
}
}

这是一段很长的JSON字符,会把集群里所有节点都列上来,挨个解释为什么不能分配到这个节点上

节点下线(迁移)

集群中个别节点出现故障预警,也是在Elasticsearch运维工作中常见的情况。如果已经稳定运行了一段时间的集群,每个节点上都保存有数量不少的分片,这时通过 reroute接口手动转移就太麻烦了,可以采取另一种方式:

1
2
3
4
5
# curl -XPUT http://localhost:9200/_cluster/settings -d '{
"transient": {
"cluster.routing.allocation.exclude._ip": "192.168.1.43"
}
}'

Elasticsearch就会把这个IP节点上的所有分片,都自动转移到其他的节点上。等转移完成后,这个节点就可以毫无影响的下线了
_ip类似的参数还有_host_name等。这类参数不仅仅是cluster级别,也可以是index级别

冷热数据的读写分离

实施步骤如下:

  1. N台机器做热数据的存储,上面只放当天的数据。这N台数据节点上面的elasticsearch.yml中配置node.tag: host
  2. 之前的数据放在另外的M台机器上,这M台机器冷数据节点中配置node.tag: stale
  3. 模版中控制对新建索引添加hot标签:

    1
    2
    3
    4
    5
    6
    7
    {
    "order": 0,
    "template": "*",
    "settings": {
    "index.routing.allocation.require.tag": "hot"
    }
    }
  4. 每天计划任务更新索引的配置,将tag更变为stale,索引会自动迁移到M台冷数据节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # curl -XPUT http://localhost:9200/indexname/_settings -d '
    {
    "index": {
    "routing": {
    "allocation": {
    "require": {
    "tag": "stale"
    }
    }
    }
    }
    }'

这样,写操作就在N台热数据节点上,大范围的独操作集中在M台冷数据节点上,避免了堵塞影响
参考: https://www.elastic.co/guide/en/elasticsearch/reference/master/shard-allocation-filtering.html

Elasticsearch自动发现的配置

Elasticsearch是P2P类型(gossip协议)的分布式系统,除了集群状态管理以外,其他所有的请求都可以发送到集群内任意一台节点上,这个节点可以自己找到需求转发给哪些节点,并且直接跟这个节点通讯。
在Elasticsearch 5.0以后自动发现方式为单播(unicast)方式,原来的组播(multicast)被彻底删除。
单播方式的配置里,提供几台节点的地址里(和可选端口),Elasticsearch将其视作 gossip router角色,借以完成集群的发现。由于这只是Elasticsearch内很小的一个功能,所以,gossip router角色并不需要单独配置,每个Elasticsearch节点都可以担任。所以采用单播方式的集群,各个节点都配置相同的几个节点列表作为router即可。
此外,考虑到节点有时候因为高负载,慢GC等原因,可能会有偶尔没有及时相应的ping包,一般建议稍微加大Fault Detection的超时时间。同样基于安全考虑做的变更,还有监听的主机名,现在默认只监听本地lo网卡上的信号。所以正式环境上需要修改配置为监听具体的网卡:

1
2
3
4
5
6
network.host: "192.168.1.41"
discovery.zen.minimum_master_nodes:3
discovery.zen.ping.timeout: 100s
discovery.zen.fd.ping_timeout: 100s
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicase.hosts: ["192.168.1.41","192.168.1.42","192.168.1.43"]

上面的fdfault detection的缩写

  • discovery.zen.ping.timeout 参数仅在加入或者选举master主节点的时候才起作用
  • discovery.zen.fd.ping_timeout 参数在稳定运行的集群中,master检测所有节点,以及节点检测master是否正常时,长期有用

既然是长期使用,自然还有运行间隔和重试配置,可以根据实际情况调整:

1
2
discovery.zen.fd.ping_intervel: 10s
discovery.zen.fd.ping_retries: 10


本文出自”Jack Wang Blog”:http://www.yfshare.vip/2017/11/02/Elasticsearch原理/