NoSQL
ElasticSearch 简介
概述
Elasticsearch是一个建立在全文搜索引擎库Apache Lucene 基础上的实时的分布式搜索和分析引擎,它可以帮助我们用很快的速度去处理大规模数据,可以用于全文检索、结构化检索、推荐、分析以及统计聚合等多种场景。
数据模型
Lucene
Lucene提供了最基本的索引和查询的功能,是一个单机的搜索库,并且没有主键概念和更新逻辑
Lucene/Elasticsearch数据模型
@startmindmap
*[#orange] 基本数据类型
**[#lightgreen]: Index 索引
由很多的Document组成;
**[#lightgreen]: Document 文档
由很多的Field组成,是Index和Search的最小单位;
**[#lightgreen]: Field 字段
由很多的Term组成,包括Field Name和Field Value;
**[#lightgreen]: Term 词元
由很多的字节组成,可以分词;
*[#orange] 索引类型
**[#lightblue]: Invert Index 倒排索引
索引的Key是Term,Value是DocID的链表
通过Term可以查询到拥有该Term的文档;
***[#cyan] 存储类型
****[#FFBBCC] DOCS 只存储DocID
****[#FFBBCC]: DOCS_AND_FREQS
存储DocID和词频TermFreq;
****[#FFBBCC]: DOCS_AND_FREQS_AND_POSITIONS
存储DocID、词频TermFreq和位置;
****[#FFBBCC]: DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS
存储DocID、词频TermFreq、位置和偏移;
**[#lightblue]: DocValues 正排索引
Key 是DocID和FieldName,Value是FieldValue
列式存储,通过DocID可以快速读取到该Doc的特定字段的值
一般用于sort,agg等需要高频读取Doc字段值的场景;
**[#lightblue]: Store 字段原始内容存储
Key是Doc ID,Value是FiledName和FiledValue
同一文档的多个Field的Store会存储在一起;
@endmindmap
Elasticsearch对Lucene的扩展
Elasticsearch通过增加_id、_version、_source、_routing和_seq_no等多个系统字段,实现了分布式搜索和部分字段更新等Lucene缺失的功能
System Field | 含义 | Lucene Index | Lucene DocValues | Lucene Store |
---|---|---|---|---|
_uid | 主键 | Y | Y | |
_version | 版本 | Y | ||
_source | 原始值 | Y | ||
_seq_no | 序号 | Y | Y | |
_primary_term | 主编号 | Y | ||
_routing | 路由 | Y | Y | |
_field_names | 字段名 | Y |
1 _id
_id是一个用户级别的虚拟字段,Lucene不会存储该字段的值。表示Doc的主键,在写入的时候,可以指定该Doc的ID值,如果不指定,则系统自动生成一个唯一的UUID值。
Lucene中没有主键索引,要保证系统中同一个Doc不会重复,Elasticsearch引入了_id字段来实现主键。每次写入的时候都会先查询id,如果有,则说明已经有相同Doc存在了。
2 _uid
Index#Doc, Store
_uid的格式是:type + ‘#’ + id。同Index下值是唯一的
3 _version
DcoValues
每个Doc都会有一个Version,该Version可以由用户指定,也可以由系统自动生成。如果是系统自动生成,那么每次Version都是递增1。
Elasticsearch通过使用version来保证对文档的变更能以正确的顺序执行,避免乱序造成的数据丢失:
- 首次写入Doc的时候,会为Doc分配一个初始的Version:V0,该值根据VersionType不同而不同。
- 再次写入Doc的时候,如果Request中没有指定Version,则会先加锁,然后去读取该Doc的最大版本V1,然后将V1+1后的新版本写入Lucene中。
- 再次写入Doc的时候,如果Request中指定了Version:V1,则继续会先加锁,然后去读该Doc的最大版本V2,判断V1==V2,如果不相等,则发生版本冲突。否则版本吻合,继续写入Lucene。
- 当做部分更新的时候,会先通过GetRequest读取当前id的完整Doc和V1,接着和当前Request中的Doc合并为一个完整Doc。然后执行一些逻辑后,加锁,再次读取该Doc的最大版本号V2,判断V1==V2,如果不相等,则在刚才执行其他逻辑时被其他线程更改了当前文档,需要报错后重试。如果相等,则期间没有其他线程修改当前文档,继续写入Lucene中。这个过程就是一个典型的read-then-update事务。
4 _source
Store
存储原始文档,也可以通过过滤设置只存储特定Field
_source字段的主要目的是通过doc_id读取该文档的原始内容,所以只需要存储Store即可
Elasticsearch中使用_source字段可以实现以下功能:
- Update:部分更新时,需要从读取文档保存在_source字段中的原文,然后和请求中的部分字段合并为一个完整文档。如果没有_source,则不能完成部分字段的Update操作。
- Rebuild:最新的版本中新增了rebuild接口,可以通过Rebuild API完成索引重建,过程中不需要从其他系统导入全量数据,而是从当前文档的_source中读取。如果没有_source,则不能使用Rebuild API。
- Script:不管是Index还是Search的Script,都可能用到存储在Store中的原始内容,如果禁用了_source,则这部分功能不再可用。 Summary:摘要信息也是来源于_source字段。
5 _seq_no
Index#DOCS_AND_FREQS_AND_POSITIONS, Analyzer, DocValues
严格递增的顺序号,每个文档一个,Shard级别严格递增,保证后写入的Doc的_seq_no大于先写入的Doc的_seq_no。任何类型的写操作,包括index、create、update和Delete,都会生成一个_seq_no。
每个文档在使用Lucene的document操作接口之前,会获取到一个_seq_no,这个_seq_no会以系统保留Field的名义存储到Lucene中,文档写入Lucene成功后,会标记该seq_no为完成状态,这时候会使用当前seq_no更新local_checkpoint。
checkpoint分为local_checkpoint和global_checkpoint,主要是用于保证有序性,以及减少Shard恢复时数据拷贝的数据拷贝量。
Elasticsearch中_seq_no的作用有两个,一是通过doc_id查询到该文档的seq_no,二是通过seq_no范围查找相关文档,所以也就需要存储为Index和DocValues(或者Store)。由于是在冲突检测时才需要读取文档的_seq_no,而且此时只需要读取_seq_no,不需要其他字段,这时候存储为列式存储的DocValues比Store在性能上更好一些。
6 _primary_term
DocValues
每当Primary Shard发生重新分配时,比如重启,Primary选举等,_primary_term会递增1。_primary_term主要是用来恢复数据时处理当多个文档的_seq_no一样时的冲突,避免Primary Shard上的写入被覆盖。
Elasticsearch中_primary_term只需要通过doc_id读取到即可,所以只需要保存为DocValues就可以了.
7 _routing
Index#Doc, Store
在mapping中,或者Request中可以指定按某个字段路由。默认是按照_Id值路由。
Elasticsearch中文档级别的_routing主要有两个目的,一是可以查询到使用某种_routing的文档有哪些,当发生_routing变化时,可以对历史_routing的文档重新读取再Index,这个需要倒排Index。另一个是查询到文档后,在Response里面展示该文档使用的_routing规则,这里需要存储为Store。
8 _field_names
Index#Doc
该字段会索引某个Field的名称,用来判断某个Doc中是否存在某个Field,用于exists或者missing请求。
Elasticsearch中_field_names的目的是查询哪些Doc的这个Field是否存在,所以只需要倒排Index即可。
ElasticSearch 查询
查询流程
Lucene的读
TopDocs search(Query query, int n)
返回最满足Query的N个结果Document doc(int docID)
通过doc id查询Doc内容int count(Query query)
通过Query获取到命中数
Search类实时(Near Real Time)请求:同时查询内存和磁盘上的Segment,最后将结果合并后返回。所有的搜索系统一般都是两阶段查询,第一阶段查询到匹配的DocID,第二阶段再查询DocID对应的完整文档,在Elasticsearch中称为query_then_fetch
。而一阶段查询query_and_fetch
,适用于查询一个Shard的请求。根据TF(Term Frequency)和DF(Document Frequency)来算分的查询使用三阶段查询,即先收集所有Shard中的TF和DF值,然后将这些值带入请求中,再次执行query_then_fetch,这样算分的时候TF和DF就是准确的,称作DFS_query_then_fetch
,另有DFS_query_and_fetch
。另一种选择是用BM25代替TF/DF模型
Get类实时(Real Time)请求:先查询内存中的TransLog,没找到则再查询磁盘上的TransLog,还没有则再查询磁盘上的Segment。这种查询顺序可以保证查询到最新版本的Doc
Client Node
Get Remove Cluster Shard 判断是否需要跨集群访问,如果需要,则获取到要访问的Shard列表
Get Search Shard Iterator 获取当前Cluster中要访问的Shard,和上一步中的Remove Cluster Shard合并,构建出最终要访问的完整Shard列表。 这一步中,会在Primary Node和多个Replica Node中选择出一个要访问的Shard
For Every Shard:Perform 遍历每个Shard,对每个Shard执行后面逻辑
Send Request To Query Shard 将查询阶段请求发送给相应的Shard
Merge Docs 上一步将请求发送给多个Shard后,这一步就是异步等待返回结果,然后对结果合并。这里的合并策略是维护一个Top N大小的优先级队列,每当收到一个shard的返回,就把结果放入优先级队列做一次排序,直到所有的Shard都返回。
Send Request To Fetch Shard 选出Top N个Doc ID后发送Fetch Shard给这些Doc ID所在的Shard,最后会返回Top N的Doc的内容。
Query Phase
Create Search Context 创建Search Context,之后Search过程中的所有中间状态都会存在Context中
Parse Query 解析Query的Source,将结果存入Search Context
Get From Cache 判断请求是否允许被Cache,如果允许,则检查Cache中是否已经有结果,如果有则直接读取Cache,如果没有则继续执行后续步骤,执行完后,再将结果加入Cache
Add Collectors Collector主要目标是收集查询结果,实现排序,自定义结果集过滤和收集等。这一步会增加多个Collectors,多个Collector组成一个List
Lucene::search 调用Lucene中IndexSearch的search接口,执行真正的搜索逻辑。每个Shard中会有多个Segment,每个Segment对应一个LeafReaderContext,这里会遍历每个Segment,到每个Segment中去Search结果,然后计算分数
Rescore 根据Request中是否包含rescore配置决定是否进行二阶段排序
Suggest::execute 如果有推荐请求,则在这里执行推荐请求。
Aggregation::execute 如果含有聚合统计请求,则在这里执行。Elasticsearch中的aggregate的处理逻辑也类似于Search,通过多个Collector来实现。在Client Node中也需要对aggregation做合并
Fetch Phase
通过DocID获取到用户需要的完整Doc内容。这些内容包括了DocValues,Store,Source,Script和Highlight等
ElasticSearch 写入
写入流程
写操作在搜索系统和NoSQL数据库中的对比
- 实时性:
- 搜索系统的Index一般都是NRT(Near Real Time),近实时的,比如Elasticsearch中,Index的实时性是由refresh控制的,默认是1s,最快可到100ms,那么也就意味着Index doc成功后,需要等待一秒钟后才可以被搜索到。
- NoSQL数据库的Write基本都是RT(Real Time),实时的,写入成功后,立即是可见的。Elasticsearch中的Index请求也能保证是实时的,因为Get请求会直接读内存中尚未Flush到存储介质的TransLog。
- 可靠性:
- 搜索系统对可靠性要求都不高,一般数据的可靠性通过将原始数据存储在另一个存储系统来保证,当搜索系统的数据发生丢失时,再从其他存储系统导一份数据过来重新rebuild就可以了。在Elasticsearch中,通过设置TransLog的Flush频率可以控制可靠性,要么是按请求,每次请求都Flush;要么是按时间,每隔一段时间Flush一次。一般为了性能考虑,会设置为每隔5秒或者1分钟Flush一次,Flush间隔时间越长,可靠性就会越低。
- NoSQL数据库作为一款数据库,必须要有很高的可靠性,数据可靠性是生命底线,决不能有闪失。如果把Elasticsearch当做NoSQL数据库,此时需要设置TransLog的Flush策略为每个请求都要Flush,这样才能保证当前Shard写入成功后,数据能尽量持久化下来。
写操作的关键点
- 可靠性:或者是持久性,数据写入系统成功后,数据不会被回滚或丢失。
- 一致性:数据写入成功后,再次查询时必须能保证读取到最新版本的数据,不能读取到旧数据。
- 原子性:一个写入或者更新操作,要么完全成功,要么完全失败,不允许出现中间状态。
- 隔离性:多个写入操作相互不影响。
- 实时性:写入后是否可以立即被查询到。
- 性能:写入性能,吞吐量到底怎么样。
Elasticsearch的写
Replica 副本
Elasticsearch采用多Shard方式,通过配置routing规则将数据分成多个数据子集,每个数据子集提供独立的索引和搜索功能。当写入文档的时候,根据routing规则,将文档发送给特定Shard中建立索引。
Elasticsearch整体架构上采用了一主多副的方式:
@startuml
!theme mars
skinparam component {
BackgroundColor gold
ArrowColor #FF6655
}
frame "DataNode" as primary {
node [Primary] as p1
}
frame "DataNode" as replica1 {
node [Replica1] as r1
p1 --> r1
}
frame "DataNode" as replica2 {
node [Replica2] as r2
p1 --> r2
}
@enduml
每个Index由多个Shard组成,每个Shard有一个主节点和多个副本节点,副本个数可配。但每次写入的时候,写入请求会先根据_routing规则选择发给哪个Shard,Index Request中可以设置使用哪个Filed的值作为路由参数,如果没有设置,则使用Mapping中的配置,如果mapping中也没有配置,则使用_id作为路由参数,然后通过_routing的Hash值选择出Shard(在OperationRouting类中),最后从集群的Meta中找出出该Shard的Primary节点。
请求接着会发送给Primary Shard,在Primary Shard上执行成功后,再从Primary Shard上将请求同时发送给多个Replica Shard,请求在多个Replica Shard上执行成功并返回给Primary Shard后,写入请求执行成功,返回结果给客户端。
这种模式下,写入操作的延时就等于latency = Latency(Primary Write) + Max(Replicas Write)。只要有副本在,写入延时最小也是两次单Shard的写入时延总和,写入效率会较低,但是这样的好处也很明显,避免写入后,单机或磁盘故障导致数据丢失,在数据重要性和性能方面,一般都是优先选择数据,除非一些允许丢数据的特殊场景。
Elasticsearch里为了减少磁盘IO保证读写性能,一般是每隔一段时间(比如5分钟)才会把Lucene的Segment写入磁盘持久化,对于写入内存,但还未Flush到磁盘的Lucene数据,如果发生机器宕机或者掉电,那么内存中的数据也会丢失,这时候Elasticsearch使用TransLog来保证数据不丢失
TransLog
对于每一个Shard,写入请求到达该Shard后,先写Lucene文件,创建好索引,此时索引还在内存里面,接着去写TransLog,写完TransLog后,刷新TransLog数据到磁盘上,写磁盘成功后,请求返回给用户。
在写Lucene内存后,需要通过Refresh把内存的对象转成完整的Segment后,然后再次reopen后才能被搜索,一般这个时间设置为1秒钟,导致写入Elasticsearch的文档,最快要1秒钟才可被从搜索到,所以Elasticsearch在搜索方面是NRT(Near Real Time)近实时的系统。而如果写完之后实时使用GetById查询,则可以直接从TransLog中查询到,这时候就成了RT(Real Time)实时系统。
每隔一段比较长的时间,比如30分钟后,Lucene会把内存中生成的新Segment刷新到磁盘上,这时会清空掉旧的TransLog。
Update流程
@startuml
!theme mars
start
:Update请求;
:GetDocById,找到相应的文档Doc;
if (内存TransLog上找到该文档) then (yes)
elseif (磁盘TransLog上找到该文档) then (yes)
elseif (磁盘Lucene Segment上找到该文档) then (yes)
else (nothing)
:文档未找到;
stop
endif
:找到完整Doc后,记录版本号为V1;
:将版本V1的全量Doc和请求中的部分字段Doc合并为一个完整的Doc
后续操作相当于Index请求+Delete请求,即先增后删;
:加锁;
:从versionMap中读取该id的最大版本号V2;
if (检查版本是否冲突V1==V2) then (冲突)
:回退到开始的“Update doc”阶段,重新执行;
endif
:将Version + 1得到V3,再将Doc加入到Lucene中去;
:写入Lucene成功后,将当前V3更新到versionMap中;
:释放锁;
end
@enduml
Client Node
Ingest Pipeline 在这一步可以对原始文档做一些处理,比如HTML解析,自定义的处理,具体处理逻辑可以通过插件来实现。在Elasticsearch中,由于Ingest Pipeline会比较耗费CPU等资源,可以设置专门的Ingest Node,专门用来处理Ingest Pipeline逻辑。如果当前Node不能执行Ingest Pipeline,则会将请求发给另一台可以执行Ingest Pipeline的Node
Auto Create Index 判断当前Index是否存在,如果不存在,则需要自动创建Index,这里需要和Master交互。也可以通过配置关闭自动创建Index的功能
Set Routing 设置路由条件,如果Request中指定了路由条件,则直接使用Request中的Routing,否则使用Mapping中配置的,如果Mapping中无配置,则使用默认的_id字段值。在这一步中,如果没有指定id字段,则会自动生成一个唯一的_id字段,目前使用的是UUID
Construct BulkShardRequest 由于Bulk Request中会包括多个(Index/Update/Delete)请求,这些请求根据routing可能会落在多个Shard上执行,这一步会按Shard挑拣Single Write Request,同一个Shard中的请求聚集在一起,构建BulkShardRequest,每个BulkShardRequest对应一个Shard
Send Request To Primary 这一步会将每一个BulkShardRequest请求发送给相应Shard的Primary Node
Primary Node
PrimaryOperationTransportHandler.messageReceived
1. Index or Update or Delete
循环执行每个Single Write Request,对于每个Request,根据操作类型(CREATE/INDEX/UPDATE/DELETE)选择不同的处理逻辑。其中,Create/Index是直接新增Doc,Delete是直接根据_id删除Doc
2. Translate Update To Index or Delete
这一步是Update操作的特有步骤,在这里,会将Update请求转换为Index或者Delete请求。首先,会通过GetRequest查询到已经存在的同_id Doc(如果有)的完整字段和值(依赖_source字段),然后和请求中的Doc合并。同时,这里会获取到读到的Doc版本号,记做V1
3. Parse Doc
这里会解析Doc中各个字段。生成ParsedDocument对象,同时会生成uid Term
4. Update Mapping
Elasticsearch中有个自动更新Mapping的功能,就在这一步生效。会先挑选出Mapping中未包含的新Field,然后判断是否运行自动更新Mapping,如果允许,则更新Mapping
5. Get Sequence Id and Version
由于当前是Primary Shard,则会从SequenceNumber Service获取一个sequenceID和Version。SequenceID在Shard级别每次递增1,SequenceID在写入Doc成功后,会用来初始化LocalCheckpoint。Version则是根据当前Doc的最大Version递增1
6. Add Doc To Lucene
这一步开始的时候会给特定_uid加锁,然后判断该_uid对应的Version是否等于之前Translate Update To Index步骤里获取到的Version,如果不相等,则说明刚才读取Doc后,该Doc发生了变化,出现了版本冲突,这时候会抛出一个VersionConflict的异常,该异常会在Primary Node最开始处捕获,重新从“Translate Update To Index or Delete”开始执行
如果Version相等,则继续执行,如果已经存在同id的Doc,则会调用Lucene的UpdateDocument(uid, doc)接口,先根据uid删除Doc,然后再Index新Doc。如果是首次写入,则直接调用Lucene的AddDocument接口完成Doc的Index,AddDocument也是通过UpdateDocument实现
这里如何保证Delete-Then-Add的原子性,怎么避免中间状态时被Refresh?答案是在开始Delete之前,会加一个Refresh Lock,禁止被Refresh,只有等Add完后释放了Refresh Lock后才能被Refresh,这样就保证了Delete-Then-Add的原子性
7. Write Translog
写完Lucene的Segment后,会以keyvalue的形式写TransLog,Key是_id,Value是Doc内容。当查询的时候,如果请求是GetDocByID,则可以直接根据_id从TransLog中读取到,满足NoSQL场景下的实时性要去。这一步的最后,会标记当前SequenceID已经成功执行,接着会更新当前Shard的LocalCheckPoint
8. Renew Bulk Request
这里会重新构造Bulk Request
9. Flush Translog
这里会根据TransLog的策略,选择不同的执行方式,要么是立即Flush到磁盘,要么是等到以后再Flush。Flush的频率越高,可靠性越高,对写入性能影响越大
10. Send Requests To Replicas
这里会将刚才构造的新的Bulk Request并行发送给多个Replica,然后等待Replica的返回,这里需要等待所有Replica返回后(可能有成功,也有可能失败),Primary Node才会返回用户。如果某个Replica失败了,则Primary会给Master发送一个Remove Shard请求,要求Master将该Replica Shard从可用节点中移除。同时会将SequenceID,PrimaryTerm,GlobalCheckPoint等传递给Replica
发送给Replica的请求中,Action Name等于原始ActionName + [R],这里的R表示Replica。通过这个[R]的不同,可以找到处理Replica请求的Handler
11. Receive Response From Replicas
Replica中请求都处理完后,会更新Primary Node的LocalCheckPoint
Replica Node
ReplicaOperationTransportHandler.messageReceived
1. Index or Delete
2. Parse Doc
3. Update Mapping
4. Get Sequence Id and Version
Primary Node中会生成Sequence ID和Version,然后放入ReplicaRequest中,这里只需要从Request中获取到就行
5. Add Doc To Lucene
6. Write Translog
7. Flush Translog
写操作的关键点
- 可靠性:由于Lucene的设计中不考虑可靠性,在Elasticsearch中通过Replica和TransLog两套机制保证数据的可靠性。
- 一致性:Lucene中的Flush锁只保证Update接口里面Delete和Add中间不会Flush,但是Add完成后仍然有可能立即发生Flush,导致Segment可读。这样就没法保证Primary和所有其他Replica可以同一时间Flush,就会出现查询不稳定的情况,这里只能实现最终一致性。
- 原子性:Add和Delete都是直接调用Lucene的接口,是原子的。当部分更新时,使用Version和锁保证更新是原子的。
- 隔离性:仍然采用Version和局部锁来保证更新的是特定版本的数据。
- 实时性:使用定期Refresh Segment到内存,并且Reopen Segment方式保证搜索可以在较短时间(比如1秒)内被搜索到。通过将未刷新到磁盘数据记入TransLog,保证对未提交数据可以通过ID实时访问到。
- 性能:一是不需要所有Replica都返回后才能返回给用户,只需要返回特定数目的就行;二是生成的Segment先在内存中提供服务,等一段时间后才刷新到磁盘,Segment在内存这段时间的可靠性由TransLog保证;三是TransLog可以配置为周期性的Flush,但这个会给可靠性带来伤害;四是每个线程持有一个Segment,多线程时相互不影响,相互独立,性能更好;五是系统的写入流程对版本依赖较重,读取频率较高,因此采用了versionMap,减少热点数据的多次磁盘IO开销。
Elasticsearch 查询性能
Lucene查询原理
Lucene的数据结构
FST
保存term字典,可以在FST上实现单Term、Term范围、Term前缀和通配符查询等
对字符串范围/前缀/通配符查询,Lucene会从FST中获取到符合条件的所有Term,然后就可以根据这些Term再查找倒排链,找到符合条件的doc。FST相关的字符串查询要比倒排链查询慢很多(以上10倍)
倒排链
保存了每个term对应的docId的列表,采用skipList的结构保存,用于快速跳跃
对单个词条进行查询,Lucene会读取该词条的倒排链,倒排链中是一个有序的docId列表。单个倒排链扫描的性能在每秒千万级
BKD-Tree
BKD-Tree是一种保存多维空间点的数据结构,用于数值类型(包括空间点)的快速查找
对数字类型进行范围查找,Lucene会通过BKD-Tree找到符合条件的docId集合,但这个集合中的docId并非有序的。和其他条件做交集的时候,需要构建有序的docID数组或BitSet
DocValues
基于docId的列式存储,由于列式存储的特点,可以有效提升排序聚合的性能
IndexSorting
IndexSorting是一种预排序,即数据预先按照某种方式进行排序,它是Index的一个设置,这里数据是在每个Segment内有序。一个Segment中的每个文档,都会被分配一个docID,docID从0开始,顺序分配。在没有IndexSorting时,docID是按照文档写入的顺序进行分配的,在设置了IndexSorting之后,docID的顺序就与IndexSorting的顺序一致
优化查询性能
提前中断
查询的Sort顺序与IndexSorting的顺序相同,并且不需要获取符合条件的记录总数(TotalHits)时,这时候可以提前中断查询
提高数据压缩率
不管行存(Store)与列存(DocValues)的存储方式,应用IndexSorting后,相邻数据的相似度就会越高,也就越利于压缩
降低写入性能
实现原理
Flush时保证Segment内数据有序
数据写入Lucene后,并不是立即可查的,要生成Segment之后才能被查到。为了保证近实时的查询,ES会每隔一秒进行一次Refresh,Refresh就会调用到Lucene的Flush生成新的Segment
每个doc写入进来之后,按照写入顺序被分配一个docID,然后被IndexingChain处理,依次要对invert index、store fields、doc values和point values进行处理,有些数据会直接写到文件里,主要是store field和term vector,其他的数据会放到memory buffer中
在Flush时,首先根据设定的列排序,这个排序可以利用内存中的doc values,排序之后得到老的docID到新docID的映射,因为之前docID是按照写入顺序生成的,现在重排后,生成的是新的排列。对于已经写到文件中的数据,比如store field和term vector,需要从文件中读出来,重新排列后再写到一个新文件里,原来的文件就相当于一个临时文件。对于内存中的数据结构,直接在内存中重排后写到文件中。
相比没有IndexSorting时,对性能影响比较大的一块就是store field的重排,因为这部分需要从文件中读出再写回,而其他部分都是内存操作,性能影响稍小一些
Merge时保证新的Segment数据有序
由于Flush时Segment已经是有序的了,所以在Merge时也就可以采用非常高效的Merge Sort的方式进行
Elasticsearch 分布式原理Node篇
ES集群构成
配置文件 conf/elasticsearch.yml
node.master: true
node.data: false
当node.master为true时,其表示这个node是一个master的候选节点,可以参与选举,在ES的文档中常被称作 master-eligible node,类似于Leader和Candidate的关系。当node.data为true时,这个节点作为一个数据节点,会存储分配在该node上的shard的数据并负责这些shard的写入、查询等。此外,任何一个集群内的node都可以执行任何请求,其会负责将请求转发给对应的node进行处理
节点发现
ES ZenDiscovery在选举时没有term的概念,不能保证每轮每个节点只投一票
Node启动后,首先要通过节点发现功能加入集群。ZenDiscovery是ES自己实现的一套用于节点发现和选主等功能的模块
# 推荐这里设置为所有的master-eligible node
discovery.zen.ping.unicast.hosts: [x.x.x.1, x.x.x.2, x.x.x.3]
Master选举
保证选举出的master被多数派(quorum)的master-eligible node认可,以此来保证只有一个master
# 设置 quorum
discovery.zen.minimum_master_nodes: 2
master选举发起的时机
当一个节点发现包括自己在内的多数派的master-eligible节点认为集群没有master时,就可以发起master选举
- 该master-eligible节点的状态为
当前没有master
- 该master-eligible节点通过ZenDiscovery模块的ping操作询问其已知的集群其他节点的信息,获取到的状态都为
当前没有master
- 包括本节点在内,当前已有超过minimum_master_nodes个节点认为集群没有master
master选举的投票对象
当clusterStateVersion越大,优先级越高。这是为了保证新Master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失
class Some {
// 计算出选举的投票对象
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
assert hasEnoughCandidates(candidates);
List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
sortedCandidates.sort(MasterCandidate::compare);
return sortedCandidates.get(0);
}
// 先根据节点的clusterStateVersion比较,相同时,则按照节点的Id比较(Id为节点第一次启动时随机生成)
public static int compare(MasterCandidate c1, MasterCandidate c2) {
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
if (ret == 0) {
ret = compareNodes(c1.getNode(), c2.getNode());
}
return ret;
}
}
达成选举
假设Node_A选Node_B当Master
Node_A会向Node_B发送join请求,那么此时:
(1) 如果Node_B已经成为Master,那么Node_B就会把Node_A加入到集群中,然后发布最新的cluster_state, 最新的cluster_state就会包含Node_A的信息。相当于一次正常情况的新节点加入。对于Node_A,等新的cluster_state发布到Node_A的时候,Node_A也就完成join了
(2) 如果Node_B在竞选Master,那么Node_B会把这次join当作一张选票。对于这种情况,Node_A会等待一段时间,看Node_B是否能成为真正的Master,直到超时或者有别的Master选成功。、
(3) 如果Node_B认为自己不是Master(现在不是,将来也选不上),那么Node_B会拒绝这次join。对于这种情况,Node_A会开启下一轮选举
假设Node_A选自己当Master
此时NodeA会等别的node来join,即等待别的node的选票,当收集到超过半数的选票时,认为自己成为master,然后变更cluster_state中的master node为自己,并向集群发布这一消息
脑裂保证
一个节点可能在相邻的两轮选主中选择不同的主,恰好导致这两个被选的节点都成为了主。但是这种脑裂很快会自动恢复,因为不一致发生后某个master再次发布cluster_state时就会发现无法达到多数派条件,或者是发现它的follower并不构成多数派而自动降级为candidate等
错误检测
MasterFaultDetection
Master定期检测集群内其他的Node
如果Master检测到某个Node连不上了,会执行removeNode的操作,将节点从cluster_state中移除,并发布新的cluster_state。当各个模块apply新的cluster_state时,就会执行一些恢复操作,比如选择新的primaryShard或者replica,执行数据复制等
NodesFaultDetection
集群内其他的Node定期检测当前集群的Master
如果某个Node发现Master连不上了,会清空pending在内存中还未commit的new cluster_state,然后发起rejoin,重新加入集群(如果达到选举条件则触发新master选举)。
rejoin
Master发现自己已经不满足多数派条件(>=minimumMasterNodes)了,需要主动退出master状态(退出master状态并执行rejoin)以避免脑裂的发生
electMasterService.hasEnoughMasterNodes
当有节点连不上时,会执行removeNode。在执行removeNode时判断剩余的Node是否满足多数派条件,如果不满足,则执行rejoinpublishClusterState.publish
在publish新的cluster_state时,分为send阶段和commit阶段,send阶段要求多数派必须成功,然后再进行commit。如果在send阶段没有实现多数派返回成功,那么可能是有了新的master或者是无法连接到多数派个节点等,则master需要执行rejoinotherClusterStateVersion > localClusterState.version()
在对其他节点进行定期的ping时,发现有其他节点也是master,此时会比较本节点与另一个master节点的cluster_state的version,谁的version大谁成为master,version小的执行rejoin
集群扩缩容
扩容DataNode
rebalance
https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-reroute.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html
缩容DataNode
https://www.elastic.co/guide/en/elasticsearch/reference/current/allocation-filtering.html
PUT /_cluster/settings
{
"transient" : {
"cluster.routing.allocation.exclude._ip" : "10.0.0.1"
}
}
扩容MasterNode
先增大quorum,然后扩容节点
PUT /_cluster/settings
{
"persistent" : {
"discovery.zen.minimum_master_nodes" : 3
}
}'
缩容MasterNode
缩容MasterNode与扩容跟扩容是相反的流程,我们需要先把节点缩下来,再把quorum数调下来
Elasticsearch 分布式原理Meta篇
Meta
数据
Master节点管理Meta数据并通知其他节点,来驱动各个模块工作,比如创建Shard等
Meta是用来描述数据的数据。在ES中,Index的mapping结构、配置、持久化状态等就属于meta数据,集群的一些配置信息也属于meta。这类meta数据非常重要,假如记录某个index的meta数据丢失了,那么集群就认为这个index不再存在了。ES中的meta数据只能由master进行更新
Meta
数据结构
集群中的每个节点都会在内存中维护一个当前的ClusterState,表示当前集群的各种状态
ClusterState
class ClusterState {
long version; // 当前版本号,每次更新加1
String stateUUID; // 该state对应的唯一id
RoutingTable routingTable; // 所有index的路由表
DiscoveryNodes nodes; // 当前集群节点
MetaData metaData; // 集群的meta数据,需要持久化
ClusterBlocks blocks; // 用于屏蔽某些操作
ImmutableOpenMap<String, Custom> customs; // 自定义配置
ClusterName clusterName; // 集群名
}
MetaData
class MetaData {
String clusterUUID; // 集群的唯一id
long version; // 当前版本号,每次更新加1
Settings persistentSettings; // 持久化的集群设置
ImmutableOpenMap<String, IndexMetaData> indices; // 所有Index的Meta
ImmutableOpenMap<String, IndexTemplateMetaData> templates; // 所有模版的Meta
ImmutableOpenMap<String, Custom> customs; // 自定义配置
}
IndexMetaData
class IndexMetaData {
long version; // 当前版本号,每次更新加1
int routingNumShards:; // 用于routing的shard数, 只能是该Index的numberOfShards的倍数,用于split
State state; // Index的状态, OPEN或CLOSE
Settings settings; // numbersOfShards,numbersOfRepilicas等配置
ImmutableOpenMap<String, MappingMetaData> mappings; // Index的mapping
ImmutableOpenMap<String, Custom> customs; // 自定义配置。
ImmutableOpenMap<String, AliasMetaData> aliases; // 别名
long[] primaryTerms; // primaryTerm在每次Shard切换Primary时加1,用于保序
ImmutableOpenIntMap<Set<String>> inSyncAllocationIds; // 处于InSync状态的AllocationId,用于保证数据一致性
}
Meta的存储
位于节点的data目录
nodes/$node_id/_state/
global-1.st
存储MetaData中除去IndexMetaData的部分
node-0.st
存储NodeId
nodes/$node_id/indices/$index_id/_state/
state-2.st
存储IndexMetaData
nodes/$node_id/indices/$index_id/$shard_id/_state/
state-0.st
存储ShardStateMetaData,包含是否是primary和allocationId等信息
Meta的恢复
当Master进程决定进行恢复Meta时,它会向集群中的MasterNode和DataNode请求其机器上的MetaData。对于集群的Meta,选择其中version最大的版本。对于每个Index的Meta,也选择其中最大的版本。然后将集群的Meta和每个Index的Meta再组合起来,构成当前的最新Meta
ClusterState commit的一致性
MasterService
串行处理ClusterState的变更
有缺陷的两阶段提交
把Master发布ClusterState分成两步,第一步是向所有节点send最新的ClusterState,当有超过半数的master节点返回ack时,再发送commit请求,要求节点commit接收到的ClusterState。如果没有超过半数的节点返回ack,那么认为本次发布失败,同时退出master状态,执行rejoin重新加入集群
一致性问题:如果master在commit阶段,只commit了少数几个节点就出现了网络分区,将master与这几个少数节点分在了一起,其他节点可以互相访问。此时其他节点构成多数派,会选举出新的master,由于这部分节点中没有任何节点commit了新的ClusterState,所以新的master仍会使用更新前的ClusterState,造成Meta不一致
解决方案
实现一个标准的一致性算法,比如raft
raft算法中,follower接收到日志后就会进行持久化,写到磁盘上。ES中,节点接收到ClusterState只是放到内存中的一个队列中即返回,并不持久化
借助额外的组件保证meta一致性
用Zookeeper来保证Meta的一致性
使用共享存储来保存Meta
首先保证不会出现脑裂,然后可以使用共享存储来保存Meta
Elasticsearch 分布式原理Data篇
Replication写入流程
ES中每个Index会划分为多个Shard,Shard分布在不同的Node上,以此来实现分布式的存储和查询,支撑大规模的数据集。对于每个Shard,又会有多个Shard的副本,其中一个为Primary,其余的一个或多个为Replica。数据在写入时,会先写入Primary,由Primary将数据再同步给Replica。在读取时,为了提高读取能力,Primary和Replica都会接受读请求
1. 检查Active的Shard数
wait_for_active_shards
在每次写入前,该shard至少具有的active副本数
2. 写入Primary
3. 并发的向所有Replicate发起写入请求
4. 等所有Replicate返回或者失败后,返回给Client
如果Replica写入失败,ES会执行一些重试逻辑等,但最终并不强求一定要在多少个节点写入成功
如果一个Replica写失败了,Primary会将这个信息报告给Master,然后Master会在Meta中更新这个Index的InSyncAllocations配置,将这个Replica从中移除,移除后它就不再承担读请求
PacificA算法
一种用于日志复制系统的分布式一致性算法
- 强一致性
- 单Primary向多Secondary的数据同步模式
- 使用额外的一致性组件Configuration Manager维护Configuration
- 少数派Replica可用时仍可写入
名词解释
Replica Group
一个互为副本的数据集合叫做Replica Group,每个副本是一个Replica。一个Replica Group中只有一个副本是Primary,其余为Secondary
Configuration
一个Replica Group的Configuration描述了这个Replica Group包含哪些副本,其中Primary是谁等
Configuration Version
Configuration的版本号,每次Configuration发生变更时加1
Configuration Manager
管理Configuration的全局组件,其保证Configuration数据的一致性。Configuration变更会由某个Replica发起,带着Version发送给Configuration Manager,Configuration Manager会检查Version是否正确,如果不正确则拒绝更改
Query & Update
对一个Replica Group的操作分为两种,Query和Update,Query不会改变数据,Update会更改数据
Serial Number sn
代表每个Update操作执行的顺序,每次Update操作加1,为连续的数字
Prepared List
Update操作的准备序列
Committed List
Update操作的提交序列,提交序列中的操作一定不会丢失(除非全部副本挂掉)。在同一个Replica上,Committed List一定是Prepared List的前缀
Primary Invariant
任何时候,当一个Replica认为自己是Primary时,Configuration Manager中维护的Configuration也认为其是当前的Primary。任何时候,最多只有一个Replica认为自己是这个Replica Group的Primary
如果不能满足Primary Invariant,那么Query请求就可能发送给Old Primary,读到旧的数据
Primary会定期获取一个Lease,获取之后认为某段时间内自己肯定是Primary,一旦超过这个时间还未获取到新的Lease就退出Primary状态。只要各个机器的CPU不出现较大的时钟漂移,那么就能够保证Lease机制的有效性
实现Lease机制的方式是,Primary定期向所有Secondary发送心跳来获取Lease,而不是所有节点都向某个中心化组件获取Lease。这样的好处是分散了压力,不会出现中心化组件故障而导致所有节点失去Lease的情况
Commited Invariant
SecondaryCommittedList一定是PrimaryCommittedList的前缀,PrimaryCommittedList一定是SecondaryPreparedList的前缀
Query
Query只能发送给Primary,Primary根据最新commit的数据,返回对应的值。由于算法要求满足Primary Invariant,所以Query总是能读到最新commit的数据
Update
- Primary分配一个Serial Number(简称sn)给一个UpdateRequest
- Primary将这个UpdateRequest加入自己的Prepared List,同时向所有Secondary发送Prepare请求,要求将这个UpdateRequest加入Prepared List
- 当所有Replica都完成了Prepare,即所有Replica的Prepared List中都包含了该Update请求时,Primary开始Commit这个请求,即将这个UpdateRequest放入Committed List中,同时Apply这个Update。因为同一个Replica上,Committed List永远是Prepared List的前缀,所以Primary实际上是提高Committed Point,把这个Update Request包含进来
- 返回客户端,Update操作成功
当下一次Primary向Secondary发送请求时,会带上Primary当前的Committed Point,此时Secondary才会提高自己的Committed Point
Reconfiguration Invariant
当一个新的Primary在T时刻完成Reconciliation时,那么T时刻之前任何节点(包括原Primary)的Commited List都是新Primary当前Commited List的前缀
Reconfiguration Invariant表明了已经Commit的数据在Reconfiguration过程中不会丢
1. Secondary故障
当一个Secondary故障时,Primary向Configuration Manager发起Reconfiguration,将故障节点从Replica Group中删除。一旦移除这个Replica,它就不属于这个Replica Group了,所有请求都不会再发给它
假设某个Primary和Secondary发生了网络分区,但是都可以连接Configuration Manager。这时候Primary会检测到Secondary没有响应了,Secondary也会检测到Primary没有响应。此时两者都会试图发起Reconfiguration,将对方从Replica Group中移除,这里的策略是First Win的原则,谁先到Configuration Manager中更改成功,谁就留在Replica Group里,而另外一个已经不属于Replica Group了,也就无法再更新Configuration了。由于Primary会向Secondary请求一个Lease,在Lease有效期内Secondary不会执行Reconfiguration,而Primary的探测间隔必然是小于Lease时间的
2. Primary故障
当一个Primary故障时,Secondary会收不到Primary的心跳,如果超过Lease的时间,那么Secondary就会发起Reconfiguration,将Primary剔除,这里也是First Win的原则,哪个Secondary先成功,就会变成Primary
当一个Secondary变成Primary后,需要先经过一个叫做Reconciliation的阶段才能提供服务。由于上述的Commited Invariant,所以原先的Primary的Committed List一定是新的Primary的Prepared List的前缀,那么我们将新的Primary的Prepared List中的内容与当前Replica Group中的其他节点对齐,相当于把该节点上未Commit的记录在所有节点上再Commit一次,那么就一定包含之前所有的Commit记录
3. 新加节点
新加的节点需要先成为Secondary Candidate,这时候Primary就开始向其发送Prepare请求,此时这个节点还会追之前未同步过来的记录,一旦追平,就申请成为一个Secondary,然后Primary向Configuration Manager发起配置变更,将这个节点加入Replica Group
如果一个节点曾经在Replica Group中,由于临时发生故障被移除,现在需要重新加回来。此时这个节点上的Commited List中的数据肯定是已经被Commit的了,但是Prepared List中的数据未必被Commit,所以应该将未Commit的数据移除,从Committed Point开始向Primary请求数据
SequenceNumber、Checkpoint与故障恢复
Term和SequenceNumber
每个写操作都会分配两个值,Term和SequenceNumber。Term在每次Primary变更时都会加1,类似于PacificA论文中的Configuration Version。SequenceNumber在每次操作后加1,类似于PacificA论文中的Serial Number。由于写请求总是发给Primary,所以Term和SequenceNumber会由Primary分配,在向Replica发送同步请求时,会带上这两个值
LocalCheckpoint和GlobalCheckpoint
LocalCheckpoint代表本Shard中所有小于该值的请求都已经处理完毕。GlobalCheckpoint代表所有小于该值的请求在所有的Replica上都处理完毕。GlobalCheckpoint会由Primary进行维护,每个Replica会向Primary汇报自己的LocalCheckpoint,Primary根据这些信息来提升GlobalCheckpoint
GlobalCheckpoint是一个全局的安全位置,代表其前面的请求都被所有Replica正确处理了,可以应用在节点故障恢复后的数据回补。另一方面,GlobalCheckpoint也可以用于Translog的GC,因为之前的操作记录可以不保存了
快速故障恢复
当一个Replica故障时,ES会将其移除,当故障超过一定时间,ES会分配一个新的Replica到新的Node上,此时需要全量同步数据。但是如果之前故障的Replica回来了,就可以只回补故障之后的数据,追平后加回来即可,实现快速故障恢复。实现快速故障恢复的条件有两个,一个是能够保存故障期间所有的操作以及其顺序,另一个是能够知道从哪个点开始同步数据。第一个条件可以通过保存一定时间的Translog实现,第二个条件可以通过Checkpoint实现,所以就能够实现快速的故障恢复
ES与PacificA的比较
- Meta一致性和Data一致性分开处理:PacificA中通过Configuration Manager维护Configuration的一致性,ES中通过Master维护Meta的一致性
- 维护同步中的副本集合:PacificA中维护Replica Group,ES中维护InSyncAllocationIds
- SequenceNumber:在PacificA和ES中,写操作都具有SequenceNumber,记录操作顺序
Lucene 简介
简介
Apache Lucene是一个开源的高性能、可扩展的信息检索引擎,提供了强大的数据检索能力
- Scalable, High-Performance Indexing
- Powerful, Accurate and Efficient Search Algorithms
概念
Index(索引)
类似数据库的表的概念,但是没有Scheme,相当于Document的集合
Document(文档)
类似数据库内的行或者文档数据库内的文档的概念,写入Index的Document会被分配一个唯一的ID,即Sequence Number(更多被叫做DocId)
Field(字段)
一个Document会由一个或多个Field组成,Field是Lucene中数据索引的最小定义单位。Lucene提供多种不同类型的Field,例如StringField、TextField、LongFiled或NumericDocValuesField等
Term和Term Dictionary
Lucene中索引和搜索的最小单位,一个Field会由一个或多个Term组成,Term是由Field经过Analyzer(分词)产生。Term Dictionary即Term词典,是根据条件查找Term的基本索引
Segment
一个Index会由一个或多个sub-index构成,sub-index被称为Segment。Lucene的Segment设计思想,与LSM类似但又有些不同,继承了LSM中数据写入的优点,但是在查询上只能提供近实时而非实时查询
Lucene中的数据写入会先写内存的一个Buffer(类似LSM的MemTable,但是不可读),当Buffer内数据到一定量后会被flush成一个Segment,每个Segment有自己独立的索引,可独立被查询,但数据永远不能被更改。删除时,由另外一个文件保存需要被删除的文档的DocID。Index的查询需要对多个Segment进行查询并对结果进行合并,还需要处理被删除的文档,为了对查询进行优化,Lucene会有策略对多个Segment进行合并,这点与LSM对SSTable的Merge类似
Segment在被flush或commit之前,数据保存在内存中,是不可被搜索的。原因是Lucene中数据搜索依赖构建的索引(例如倒排依赖Term Dictionary),Lucene中对数据索引的构建会在Segment flush时,而非实时构建,目的是为了构建最高效索引
Sequence Number
DocId(即Sequence Number)实际上并不在Index内唯一,而是Segment内唯一,取值从0开始递增,且不一定连续,如果有Doc被删除,那可能会存在空洞。一个文档对应的DocId可能会发生变化,主要是发生在Segment合并时
Field(字段)
class Field {
protected final IndexableFieldType type;
protected final String name;
protected Object fieldsData;
}
interface IndexableFieldType {
boolean stored(); // 是否需要保存该字段
boolean tokenized(); // 是否做分词,针对TextField
boolean storeTermVectors(); // 是否储存Term Vector,对于长度较小的字段不建议开启
boolean omitNorms(); // 允许每个文档的每个字段都存储一个normalization factor
IndexOptions indexOptions();
DocValuesType docValuesType();
int pointDimensionCount(); // 多维数据的索引,一般经纬度数据会采取这个索引方式
int pointIndexDimensionCount();
int pointNumBytes();
int vectorDimension();
}
Term Vector
保存了一个文档内所有的term的相关信息,包括Term值、出现次数(frequencies)以及位置(positions)等,是一个per-document inverted index,提供了根据docid来查找该文档内所有term信息的能力。Term Vector的用途主要有两个,一是关键词高亮,二是做文档间的相似度匹配(more-like-this)
Normalization Factor
和搜索时的相关性计算有关的一个系数。Norms存储占一个字节,每个文档的每个字段都会独立存储一份,且Norms数据会全部加载到内存。若关闭了Norms,则无法做index-time boosting以及length normalization
IndexOptions
倒排索引的5种可选参数,用于选择该字段是否需要被索引,以及索引哪些内容
enum IndexOptions {
NONE,
DOCS,
DOCS_AND_FREQS,
DOCS_AND_FREQS_AND_POSITIONS,DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS
}
DocValue
正向索引(docid到field的一个列存)
enum DocValuesType {
NONE,
NUMERIC,
BINARY,
SORTED,
SORTED_NUMERIC,
SORTED_SET
}
数据结构
FST
Lucene使用FST数据结构来存储Term Dict Index
有向无环图,在范围,前缀搜索以及压缩率上都有明显的优势
BKDTree
Lucene IndexWriter
IndexWriterConfig
IndexDeletionPolicy:Lucene开放对commit point的管理,通过对commit point的管理可以实现例如snapshot等功能。Lucene默认配置的DeletionPolicy,只会保留最新的一个commit point
Similarity:搜索的核心是相关性,Similarity是相关性算法的抽象接口,Lucene默认实现了TF-IDF和BM25算法。相关性计算在数据写入和搜索时都会发生,数据写入时的相关性计算称为Index-time boosting,计算Normalizaiton并写入索引,搜索时的相关性计算称为query-time boosting
MergePolicy:Lucene内部数据写入会产生很多Segment,查询时会对多个Segment查询并合并结果。所以Segment的数量一定程度上会影响查询的效率,所以需要对Segment进行合并,合并的过程就称为Merge,而何时触发Merge由MergePolicy决定
MergeScheduler:当MergePolicy触发Merge后,执行Merge会由MergeScheduler来管理。Merge通常是比较耗CPU和IO的过程,MergeScheduler提供了对Merge过程定制管理的能力
Codec:Codec可以说是Lucene中最核心的部分,定义了Lucene内部所有类型索引的Encoder和Decoder。Lucene在Config这一层将Codec配置化,主要目的是提供对不同版本数据的处理能力。对于Lucene用户来说,这一层的定制需求通常较少,能玩Codec的通常都是顶级玩家了
IndexerThreadPool:管理IndexWriter内部索引线程(DocumentsWriterPerThread)池,这也是Lucene内部定制资源管理的一部分
FlushPolicy:FlushPolicy决定了In-memory buffer何时被flush,默认的实现会根据RAM大小和文档个数来判断Flush的时机,FlushPolicy会在每次文档add/update/delete时调用判定
MaxBufferedDoc:Lucene提供的默认FlushPolicy的实现FlushByRamOrCountsPolicy中允许DocumentsWriterPerThread使用的最大文档数上限,超过则触发Flush
RAMBufferSizeMB:Lucene提供的默认FlushPolicy的实现FlushByRamOrCountsPolicy中允许DocumentsWriterPerThread使用的最大内存上限,超过则触发flush
RAMPerThreadHardLimitMB:除了FlushPolicy能决定Flush外,Lucene还会有一个指标强制限制DocumentsWriterPerThread占用的内存大小,当超过阈值则强制flush
Analyzer:即分词器,这个通常是定制化最多的,特别是针对不同的语言
核心操作
addDocument:比较纯粹的一个API,就是向Lucene内新增一个文档。Lucene内部没有主键索引,所有新增文档都会被认为一个新的文档,分配一个独立的docId
updateDocuments:更新文档,但是和数据库的更新不太一样。数据库的更新是查询后更新,Lucene的更新是查询后删除再新增。流程是先delete by term,后add document。但是这个流程又和直接先调用delete后调用add效果不一样,只有update能够保证在Thread内部删除和新增保证原子性,详细流程在下一章节会细说
deleteDocument:删除文档,支持两种类型删除,by term和by query。在IndexWriter内部这两种删除的流程不太一样,在下一章节再细说
flush:触发强制flush,将所有Thread的In-memory buffer flush成segment文件,这个动作可以清理内存,强制对数据做持久化
prepareCommit/commit/rollback:commit后数据才可被搜索,commit是一个二阶段操作,prepareCommit是二阶段操作的第一个阶段,也可以通过调用commit一步完成,rollback提供了回滚到last commit的操作
maybeMerge/forceMerge:maybeMerge触发一次MergePolicy的判定,而forceMerge则触发一次强制merge
数据路径
并发模型
空间隔离式的数据写入方式
- 多线程并发调用IndexWriter的写接口,在IndexWriter内部具体请求会由DocumentsWriter来执行。DocumentsWriter内部在处理请求之前,会先根据当前执行操作的Thread来分配
DocumentsWriterPerThread
(DWPT) - 每个线程在其独立的DocumentsWriterPerThread空间内部进行数据处理,包括分词、相关性计算、索引构建等
- 数据处理完毕后,在DocumentsWriter层面执行一些后续动作,例如触发FlushPolicy的判定等
只需要对以上第一步和第三步进行加锁。每个DWPT内单独包含一个In-memory buffer,这个buffer最终会flush成不同的独立的segment文件
add & update
updateDocument
- 根据Thread分配DWPT
- 在DWPT内执行delete
- 在DWPT内执行add
delete
在IndexWriter内部会有一个全局的Deletion Queue,称为Global Deletion Queue,而在每个DWPT内部,还会有一个独立的Deletion Queue,称为Pending Updates。DWPT Pending Updates会与Global Deletion Queue进行双向同步,因为文档删除是全局范围的,不应该只发生在DWPT范围内
Segment中有一个特殊的文件叫live docs,内部是一个位图的数据结构,记录了这个Segment内部哪些DocId是存活的,哪些DocId是被删除的。所以删除的过程就是构建live docs标记位图的过程,数据实际上不会被真正删除,只是在live docs里会被标记删除。live docs只影响倒排,所以在live docs里被标记删除的文档没有办法通过倒排索引检索出,但是还能够通过doc id查询到store fields。文档数据最终是会被真正物理删除,这个过程会发生在merge时
flush
flush是将DWPT内In-memory buffer里的数据持久化到文件的过程,flush会在每次新增文档后由FlushPolicy判定自动触发,也可以通过IndexWriter的flush接口手动触发。每个DWPT会flush成一个segment文件,flush完成后这个segment文件是不可被搜索的,只有在commit之后,所有commit之前flush的文件才可被搜索
commit
commit时会触发数据的一次强制flush,commit完成后再此之前flush的数据才可被搜索。commit动作会触发生成一个commit point,commit point是一个文件。Commit point会由IndexDeletionPolicy管理,lucene默认配置的策略只会保留last commit point
merge
merge是对segment文件合并的动作,合并的好处是能够提高查询的效率以及回收一些被删除的文档。Merge会在segment文件flush时触发MergePolicy来判定自动触发,也可通过IndexWriter进行一次force merge
IndexingChain
在IndexWriter内部,indexing chain上索引构建顺序是invert index、store fields、doc values和point values
Codec
,每种类型索引的Encoder和Decoder
- BlockTreeTermsWriter:倒排索引对应的Codec,其中倒排表部分使用Lucene50PostingsWriter(Block方式写入倒排链)和Lucene50SkipWriter(对Block的SkipList索引),词典部分则是使用FST(针对倒排表Block级的词典索引)
- CompressingTermVectorsWriter:对应Term vector索引的Writer,底层是压缩Block格式
- CompressingStoredFieldsWriter:对应Store fields索引的Writer,底层是压缩Block格式
- Lucene70DocValuesConsumer:对应Doc values索引的Writer
- Lucene60PointsWriter:对应Point values索引的Writer
Redis 简介
概述
Remote Dictionary Server
Redis是一种支持key-value等多种数据结构的存储系统。可用于缓存,事件发布或订阅,高速队列等场景。支持网络,提供字符串,哈希,列表,队列,集合结构直接存取,基于内存,可持久化
数据类型
https://redis.io/docs/manual/data-types/
限制:值的长度限制为512MB,元素大小限制为2^32-1
string
https://redis.io/commands/?group=string incr, decr, incrby append, getrange, setrange setbit, getbit
list
lpush, ltrim, blpop
set
spop, srandmember, sinter
zset
zrange, zrank, zrangebyscore
hash
stream
https://redis.io/docs/manual/data-types/streams/#consumer-groups xadd, xrange, xrevrange, xread, xgroup,
Streams 是一个只追加的数据结构
序号范围扫描,id定位的复杂度O(logN)
# key: mystream
# stream id: * 或手工指定
# * 自动生成<millisecondsTime>-<sequenceNumber>
# sensor-id 1234 temperature 19.8 空格分隔为列表
xadd mystream * sensor-id 1234 temperature 19.8
xadd somestream 0-* baz qux
# 最小id - 和最大id +之间的所有数据
xrange mystream - + count 10
# 时间戳范围
xrange mystream <millisecondsTime_start> <millisecondsTime_end>
# (id1 表示id刚好比id1大的数据,可用于迭代扫描
xrange mystream (id1 + count 10
# 反序
xrevrange + - count 1
监听流
# 非阻塞地从多个流中获取id大于指定id的数据
xread [count 2] streams mystream1 mystream2 id_or_time1 id_or_time2
# block 0 阻塞0ms 即永久阻塞
# $表示最后一条数据
xread block 0 streams mystream $
消费组
# 创建一个消费组,并指定消费偏移量
# $ 最后一条消息 0 最早一条消息
xgroup create mystream mygroup $
# 不存在则创建流
xgroup create newstream mygroup $ mkstream
# > 特殊ID,指到目前为止,消息从未传递给同组的其他消费者,如果指定0之类的真实ID,则表示往后没有被xack的消息
xreadgroup group mygroup consumer_name count 1 streams mystream >
流程
线程模型核心是基于非阻塞的IO多路复用机制
持久化
RDB
把当前内存数据生成快照保存到硬盘
手动触发
手动触发对应save命令,会阻塞当前Redis服务器,直到RDB过程完成为止
自动触发
自动触发对应bgsave命令,Redis进程执行fork操作创建子进程,RDB持久化过程由子进程负责,完成后自动结束。阻塞只发生在fork阶段
save <seconds> <changes>
,表示xx秒内数据修改xx次时自动触发bgsave
- 从节点执行全量复制操作,主节点自动执行bgsave生成RDB文件并发送给从节点
- 默认情况下执行shutdown命令时,如果没有开启AOF持久化功能则 自动执行bgsave
AOF
以独立日志的方式记录每次写命令, 重启时再重新执行AOF文件中的命令达到恢复数据的目的
appendonly yes
开启AOF
appendfilename appendonly.aof
AOF文件名
- 所有的写入命令会追加到aof_buf(缓冲区)中
- AOF缓冲区根据对应的策略向硬盘做同步操作(fsync 策略:无fsync、每秒fsync 、每写fsync, 默认每秒)
- 定期对AOF文件进行重写,达到压缩的目的
- 当Redis服务器重启时,加载AOF文件进行数据恢复
AOF重写
- 手动触发:
bgrewriteaof
- 自动触发
auto-aof-rewrite-min-size
:运行AOF重写时文件最小体积,默认为64MBauto-aof-rewrite-percentage
:当前AOF文件空间aof_current_size
和上一次重写后AOF文件空间aof_base_size
的最小比值
info persistence
可查看aof_current_size
和aof_base_size
,以上两个条件同时满足即触发AOF重写
Redis 缓存
缓存问题
Redis 高可用,主从+哨兵,Redis cluster,避免全盘崩溃
本地 ehcache 缓存 + hystrix 限流&降级,避免 MySQL 被打死
缓存穿透
缓存穿透是指用户请求的数据在缓存中不存在即没有命中,同时在数据库中也不存在,导致用户每次请求该数据都要去数据库中查询一遍,然后返回空
- 布隆过滤器
布隆过滤器(Bloom Filter,简称BF),是一种空间效率高的概率型数据结构,用来检测集合中是否存在特定的元素
布隆过滤器由一个长度为m比特的位数组(bit array)与k个哈希函数(hash function)组成的数据结构。位数组初始化均为0,所有的哈希函数都可以分别把输入数据尽量均匀地散列。当要向布隆过滤器中插入一个元素时,该元素经过k个哈希函数计算产生k个哈希值,以哈希值作为位数组中的下标,将所有k个对应的比特值由0置为1
当要查询一个元素时,同样将其经过哈希函数计算产生哈希值,然后检查对应的k个比特值:如果有任意一个比特为0,表明该元素一定不在集合中;如果所有比特均为1,表明该集合有可能性在集合中(哈希碰撞)
- 返回空对象
当缓存未命中,查询持久层也为空,可以将返回的空对象写到缓存中。为了避免存储过多空对象,通常会给空对象设置一个过期时间
缓存击穿
缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库
- 使用互斥锁(mutex key)
让一个线程回写缓存,其他线程等待回写缓存线程执行完,重新读缓存
- 热点数据永不过期
针对热点key不设置过期时间,或者把过期时间存在key对应的value里,如果发现要过期了,通过一个后台的异步线程进行缓存的构建
缓存雪崩
缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,请求直接落到数据库上
- 均匀过期 设置不同的过期时间,让缓存失效的时间点尽量均匀。通常可以为有效期增加随机值或者统一规划有效期
- 双层缓存策略
- 加互斥锁
- 缓存永不过期
缓存预热
缓存预热就是系统上线后,将相关的缓存数据直接加载到缓存系统
缓存降级
缓存降级是指缓存失效或缓存服务器挂掉的情况下,不去访问数据库,直接返回默认数据或访问服务的内存数据
Redis的内存淘汰机制
maxmemory 0
,在64位操作系统下最大内存为操作系统剩余内存
淘汰策略
- noeviction 默认策略,对于写请求直接返回错误,不进行淘汰
- allkeys-lru 从所有的key中使用近似LRU算法(最近最少使用)进行淘汰
- volatile-lru 从设置了过期时间的key中使用近似LRU算法进行淘汰
- allkeys-random 从所有的key中随机淘汰
- volatile-random 从设置了过期时间的key中随机淘汰
- volatile-ttl 在设置了过期时间的key中根据key的过期时间进行淘汰,越早过期的越优先被淘汰
- allkeys-lfu 从所有的key中使用近似LFU算法(最少使用频率)进行淘汰
- volatile-lfu 设置了过期时间的key中使用近似LFU算法进行淘汰
缓存更新机制
Cache aside 旁路缓存
读请求:应用首先会判断缓存是否有该数据,缓存命中直接返回数据,缓存未命中即缓存穿透到数据库,从数据库查询数据然后回写到缓存中,最后返回数据给客户端 写请求:首先更新数据库,然后从缓存中删除该数据
更新缓存的时间要远大于数据库,故如果读请求早于写请求读到数据库的旧值,则会更早地把旧值写到缓存中,紧接着,写请求从缓存删除该旧值
- 先更新数据库,再更新缓存:写请求冲突,旧的写请求会在更新缓存时覆盖新的写请求
- 先删缓存,再更新数据库(或先更新数据库,再删除缓存):旧的读请求会在新的写请求删完缓存中的旧值之后,又写入自己读取到的旧值
Read/Write through
将数据库的同步委托给缓存提供程序Cache Provider
Write behind/back
即延迟写入
,应用程序更新数据时只更新缓存,Cache Provider每隔一段时间将数据刷新到数据库中
Redis 特性
发布订阅
事件推送
事件类型
__keyspace@db:key_pattern
键空间通知(key-space notification)__keyevent@db:ops_type
键事件通知(key-event notification)
执行del mykey
之后,相当于发送以下事件
publish __keyspace@0__:mykey del
publish __keyevent@0__:del mykey
开启E键事件通知,针对x过期事件(键访问触发检查而删除该键,或者后台查找并删除)
config set notify-keyspace-events Ex
Redis的过期事件发生的时机,是真正删除的时候,而不是在理论上生存时间达到零值时生成