并发冲突解决

4/10/2020 EsException

摘要

System:Centos7.X
JDK Version:1.8
Es Version:6.5.4 Es Version:7.X

# 一:概述

# 异常信息

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[_doc][1]: version conflict, current version [17] is different than the one provided [16]",
        "index_uuid": "GpWRwRXPS0qNoZ-FR0X3aA",
        "shard": "3",
        "index": "ccjjltx"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[_doc][1]: version conflict, current version [17] is different than the one provided [16]",
    "index_uuid": "GpWRwRXPS0qNoZ-FR0X3aA",
    "shard": "3",
    "index": "ccjjltx"
  },
  "status": 409
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 背景

es的并发冲突问题,会导致数据不准确。当并发操作es的线程越多,或者读取一份数据,供用户查询和操作的时间越长,在这段时间里,如果数据被其他用户修改,那么我们拿到的就是旧数据,基于旧数据去操作,就会导致错误的结果

# 并发控制方案

悲观锁、乐观锁并发控制方案简单说明

  • 悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
  • 乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。

两者之间的优缺点

悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是并发效率很低,同一时间只能有一条线程操作数据 悲观锁并发能力很高,不给数据加锁,大量线程并发操作,但是每次更新的时候,都要先比对版本号,然后可能需要更新数据,再次修改,再写

# ES乐观锁

# _version字段说明

当向es插入一条数据时:

PUT ccjjltx/_doc/1
{
  "name": "ccj"
}
1
2
3
4
结果
{
  "_index" : "ccjjltx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

第一次创建document时,它的_version就是1,每次对document进行修改或删除,都会对这个_version进行版本号的加1,哪怕是删除,也会对这条数据的版本号加1 操作示例:修改数据两次,然后再删除 修改一次

PUT ccjjltx/_doc/1
{
  "name": "ccjjltx"
}
1
2
3
4
结果
{
  "_index" : "ccjjltx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

再修改一次,用post进行修改

POST ccjjltx/_doc/1/_update
{
  "doc": {
    "name": "jltx"
  }
}
1
2
3
4
5
6
结果
{
  "_index" : "ccjjltx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

再删除,删除后_version也加1了

DELETE ccjjltx/_doc/1
1
结果
{
  "found": true,
  "_index": "test_index",
  "_type": "test_type",
  "_id": "1",
  "_version": 4,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 基于_version的乐观锁并发控制

** 每次更新数据时都带上_version参数,_version参数的值必须和更新前查询出来的_version值一致时,才能更新成功(即:先查询当前数据的_version的值,假设为now_1,然后更新时,带上参数_version=now_1去更新);如果_version的版本号不一致的话,此次的更新失败 ** 新增一条数据

PUT ccjjltx/_doc/2
{
  "name": "ccj"
}
1
2
3
4
结果
{
  "_index" : "ccjjltx",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

基于上面新插入的数据进行更新,即_version=1,此时更新成功

PUT ccjjltx/_doc/2?version=1
{
  "name": "ccjjltx"
}
1
2
3
4
结果
{
  "_index" : "ccjjltx",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

再基于_version=1去更新,更新失败(原因是版本不一致)

PUT ccjjltx/_doc/2?version=1
{
  "name": "ccjjltx"
}
1
2
3
4
结果
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[_doc][2]: version conflict, current version [2] is different than the one provided [1]",
        "index_uuid": "GpWRwRXPS0qNoZ-FR0X3aA",
        "shard": "2",
        "index": "ccjjltx"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[_doc][2]: version conflict, current version [2] is different than the one provided [1]",
    "index_uuid": "GpWRwRXPS0qNoZ-FR0X3aA",
    "shard": "2",
    "index": "ccjjltx"
  },
  "status": 409
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 基于external version的乐观锁并发控制

es提供了一个新特性,就是说,你可以不用基于它提供的内部_version版本号进行并发控制,可以基于自己维护的一个版本号来进行并发控制。

参数写法

?version=1&version_type=external
1

?version=1 和 ?version=1&version_type=external的区别

  • _version,只有当你提供的version与es中的_version一模一样的时候,才可以进行修改,只要不一样,就报错;
  • 当version_type=external的时候,只有当你提供的version比es中的_version大的时候,才能完成修改
GET ccjjltx/_doc/2
1
结果
{
  "_index" : "ccjjltx",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 2,
  "found" : true,
  "_source" : {
    "name" : "ccjjltx"
  }
}
1
2
3
4
5
6
7
8
9
10

如果external version更此条数据,_version的值应该大于2

PUT ccjjltx/_doc/2?version=5&version_type=external
{
  "name": "ccj"
}
1
2
3
4
结果
{
  "_index" : "ccjjltx",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 5,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# _seq_no 与 _primary_term

ES是分布式的。当文档发生创建、更新、删除等写操作时,新版本的文档必须复制到集群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许 顺序是乱的 。ES使用 _seq_no(sequence number) 和 _primary_term(primary term) 来确保文档的旧版本不会覆盖新的版本。sequence number在每次操作后递增,因此新的操作的_seq_no一定高于老的操作的_seq_n 创建对应文档

PUT ccc/_doc/1
{
  "name": "ccj",
  "age": 10
}
1
2
3
4
5
结果
{
  "_index" : "ccc",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET /demo/_doc/1
## response
{
  "_index" : "demo",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "name" : "ddd",
    "age" : 10
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

返回值中,我们可以看到sequence number 对应的 _seq_no 和primary term对应的 _primary_term字段。在一批update操作后,_seq_no_primary_term 的相关value变到了以下状态。

{
  "_index" : "demo",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 20,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 19,
  "_primary_term" : 2
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

此时可以根据 _seq_no_primary_term 乐观更新。sequence number 和 the primary term 独一无二地标识一次变动。

PUT demo/_doc/1?if_seq_no=19&if_primary_term=2
{
  "name" : "dsadas",
  "age" : 20
}
1
2
3
4
5

response

{
  "_index" : "demo",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 21,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 20,
  "_primary_term" : 2
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

如果并发了,此时可能有一个 _seq_no 仍为19的记录请求进行update,将会报错。

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[_doc][1]: version conflict, required seqNo [19], primary term [2]. current document has seqNo [20] and primary term [2]",
        "index_uuid": "aTLF5rchS7C1jbRTyMfdgQ",
        "shard": "3",
        "index": "demo"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[_doc][1]: version conflict, required seqNo [19], primary term [2]. current document has seqNo [20] and primary term [2]",
    "index_uuid": "aTLF5rchS7C1jbRTyMfdgQ",
    "shard": "3",
    "index": "demo"
  },
  "status": 409
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

在es中,如何能够解决这种并发冲突的问题?

  • 通过_version版本号的方式进行乐观锁并发控制

在es内部第次一创建document的时候,它的_version默认会是1,之后进行的删除和修改的操作_version都会增加1。可以看到删除一个document之后,再进行同一个id的document添加操作,版本号是加1而不是初始化为1,从而可以说明document并不是正真地被物理删除,它的一些版本号信息一样会存在,而是会在某个时刻一起被清除。

在es后台,有很多类似于replica同步的请求,这些请求都是异步多线程的,对于多个修改请求是乱序的,因此会使用_version乐观锁来控制这种并发的请求处理。当后续的修改请求先到达,对应修改成功之后_version会加1,然后检测到之前的修改到达会直接丢弃掉该请求;而当后续的修改请求按照正常顺序到达则会正常修改然后_version在前一次修改后的基础上加1(此时_version可能就是3,会基于之前修改后的状态)。

es提供了一个外部版本号的乐观控制方案来替代内部的_version。例如:

?version=1&version_type=external
1

和内在的_version的区别在于。对于内在_version=1,只有在后续请求满足?_version=1的时候才能够更新成功;对于外部_version=1,只有在后续请求满足?_version>1才能够修改成功。


  • 通过悲观锁的方式进行乐观锁并发控制

1.全局锁,通过doc来进行对整个index上锁

一个线程进行操作之前创建一个锁,例如:

PUT /lockindex/locktype/global/_create
{}
1
2

同时如果有另一个线程要进行相关更新操作,那么同样执行上述代码是会报错。在上述线程执行完DELETE对应doc之后,该线程就可以重新获取到doc的锁从而执行自己的一些列操作。

这种方式,操作很简单,但是锁住了整个index,导致整个系统的并发能力很低。

2.document锁,粒度更细的锁 需要通过脚本来实现:

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}
1
2
3
4
5
6
7
8

process_id,很重要,会在lock中,设置对对应的doc加锁的进程的id,这样其他进程过来的时候,才知道,这条数据已经被别人给锁了 assert false,不是当前进程加锁的话,则抛出异常 ctx.op=’noop’,不做任何修改 params,里面有个process_id,是你的要执行增删改操作的进程的唯一id

对于同一个process_id的进程是都可以来修改doc,但是用不同的process_id去修改已经上锁的其他process_id是会assert false抛错。

3.共享锁与排它锁 共享锁:数据是共享的,多个线程可以获取同一个数据的共享锁,然后对这个数据执行读操作 排它锁:只能有一个线程获取排它锁,然后执行更新操作

共享锁与排他锁是互斥的特性,如果有一个线程想要去修改一个数据,也就是获取一个排它锁,此时需要等待其他所有的共享锁先释放掉才能够进行操作,反之亦然。

首先添加共享锁,其他线程也可以来读取数据:

judge-lock-2.groovy: if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++

POST /fs/lock/1/_update
{
  "upsert": {
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

如果其他线程也需要获取共享锁,那么执行上述同样的代码即可,最终只是lock_count加1了:

GET /fs/lock/1

{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 3,
  "found": true,
  "_source": {
    "lock_type": "shared",
    "lock_count": 3
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

当添加排他锁的时候:

PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
1
2

则会报错

对共享锁进行解锁:

POST /fs/lock/1/_update
{
  "script": {
    "lang": "groovy",
    "file": "unlock-shared"
  }
}
1
2
3
4
5
6
7

添加过多少个共享锁,对应的执行解锁操作相应次数即可完全解锁。每次解锁lock_count对应减1,当为0的时候就将/fs/lock/1删除

对应的解除排它锁:

DELETE /fs/lock/1
1

# 官方文档

# 参考链接

最后更新: 5/7/2020, 1:39:11 PM