欢迎投稿

今日深度:

ElasticSearch并发操作之乐观锁的使用,elasticsearch并发

ElasticSearch并发操作之乐观锁的使用,elasticsearch并发


上篇介绍了关于ES嵌套索引的增删改,本篇就接着上篇主题继续深入聊一下,上篇的添加和更新操作,其实是不安全的,所有的数据库db系统都会存在并发问题像关系型数据库MySQL,Oracle,SQL Server默认采用的是悲观锁。

在ElasticSearch中采用的乐观锁,下面先熟悉下什么是乐观锁和悲观锁:

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库如果提供类似于write_condition机制的其实都是提供的乐观锁。

两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。

从上面的介绍中,我们不难发现es为什么要采用乐观锁,因为es大部分场景下都是一个读多写少的系统,如果按照悲观锁的策略,会大大降低es的吞吐,当然并发问题是真实存在,下面给大家分享实际工作中遇到的并发问题。

最好的方式是在设计上就排除并发问题,比如我们的一个项目消费kafka,经过计算后的数据存入es中,如果不设计打入kafka时的策略,就可能会遇到并发插入和更新问题,sparkstreaming集成kafka时,kafka的有多少个分区,就需要给spark设置相应数目的Executors进程,比如10个kafka分区,现在有10个sparkstreaming进程在处理数据,同一个usser用户同一时刻的数据,如果被分发带不同的机器上计算完更新到es,那么就会遇到并发问题。比如是对一个数累加操作,原始是100,A进程和B进程同时读到这条数据做更新,A进程加10,B进程加20,正确的结果应该是130,但是由于并发更新,可能会导致A进程的累加操作丢失,最终的结果是120,或者B进程的累加操作丢失,那么最终的结果就是110,不管怎么更新在不考虑锁的情况下,都会导致数据有问题。那么如果我能将同一个用户的数据发送到kafka里面同一个分区内,那么就容易了,如果都在同一个分区内,一个分区内的数据处理是串行的这样就能避免并发问题。

当然如果不能避免,我们就需要通过es的乐观锁类解决并发问题。下面来看下在es中如何使用乐观锁处理并发问题,首先看下并发插入的问题,多个进程同时得到一个用户的数据,然后同时插入es,如果不加锁,后到的数据是会覆盖掉前面的数据,实际我们想要的是,如果存在并发插入,那么第二条数据应该是以更新的方式添加的,而不是覆盖。

如何实现?

在插入时,使用es提供的create(true)方法,标记同一个时刻插入的数据,只会有一条数据插入成功,插入失败的会抛出文档已经存在的异常,那么应用程序端捕捉异常在代码里控制重试插入。重试时候会判断该条数据是否已经存在,如果存在就更新。

scala代码如下:

def  insert(active:String,indexTime:Long,indexTemplate:String):Unit={
        val json = JSON.toJSON(active).toString
        val irb=client.prepareIndex(indexTemplate, activeIndexName,pid);
        irb.setCreate(true)//保证同一时刻只会有一个数据插入成功
        try{
          irb.setSource(json).execute().actionGet()
        }catch {
          case e: DocumentAlreadyExistsException=>{
            
            Thread.sleep(fail_sleep*1000)////插入失败时,等待1秒后重试
            insert(active,indexTime,indexTemplate);//重新插入
          }
          case e:Exception=>{
            log.error(插入失败,异常:"+ ExceptionUtils.getStackTrace(e))
          }
        }
	}

上面说的是插入时的并发问题解决策略,接着我们看下更新时候遇到并发问题如何处理,主要有2种思路:

(1)如果是针对某个数值做累加或者减,可以使用es服务端冲突重试机制解决,这个方式比较简单,不需要 我们在程序中处理并发逻辑,我们所需要做的就是评估同一条数据的并发程度,然后设置合理重试次数就行,在重试之后如果仍然失败就会抛出异常,然后我们针对做处理。

核心代码如下:

-       val sb_json = new StringBuffer("ctx._source.ct +=  inc");
        val params: java.util.HashMap[String, Int] = new java.util.HashMap[String, Int]
        params.put("inc", active.getCt)

        val script = new Script(sb_json.toString(), ScriptService.ScriptType.INLINE, "groovy", params)
        val up=client.prepareUpdate(indexTemplate,activeIndexName,pid)
        up.setRefresh(true)
        up.setScript(script)
        up.setRetryOnConflict(retryConunt)//设置重试次数
        up.get()

(2)此外,我们还可以通过es内部维护的version字段来自定义实现灵活控制的乐观锁。

我们知道当我们第一次插入一条数据成功时,es返回的reponse里面会给出当前这条数据的_version=1,如果我们更新这条数据前,读取这条数据当前的version=1,然后在更新时候只有携带的version=1时才能更新成功,如果更新成功version会加1,同一时刻当有两个进程都携带version=1去更新数据,最终只会有一条数据更新成功,只要更新成功version会累加=2,然后其他进程会更新失败,报版本冲突,因为最新是2,其他的都是1,所以更新失败,会抛出冲突异常:

{
   "error": {
      "root_cause": [
         {
            "type": "version_conflict_engine_exception",
            "reason": "[blog][1]: version conflict, current [2], provided [1]",
            "index": "website",
            "shard": "3"
         }
      ],
      "type": "version_conflict_engine_exception",
      "reason": "[blog][1]: version conflict, current [2], provided [1]",
      "index": "website",
      "shard": "3"
   },
   "status": 409
}

内部维护的version可以在更新和删除的api时使用

下面我们看一下使用外部version来控制乐观锁,上面的version每次更新成功的+1操作都是es内部维护的,除此之外我们还可以使用外部自定义维护的版本进行插入,删除,更新操作:

比如

PUT /website/blog/2?version=5&version_type=external
{
  "title": "My first external blog entry",
  "text":  "Starting to get the hang of this..."
}

结果:

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 5,
  "created":  true
}

现在我们指定version=10去更新后,返回的新响应如下:

PUT /website/blog/2?version=10&version_type=external
{
  "title": "My first external blog entry",
  "text":  "This is a piece of cake..."
}
//===================

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 10,
  "created":  false
}

如果再次执行上面的那个请求就会失败,因为新版本必须大于已经存在的版本号

利用这个特性,我们也可以将时间戳当做版本,传进去,能保证当前的数据只有是最新的数据才能插入更新

总结:

本篇主要了介绍了es里面乐观锁的使用,如果仅仅增量的累加或者累减操作,不关注顺序,关注最终结果,我们可以使用es服务端保证冲突重试就行,这样非常方便的就解决了并发冲突问题,如果关注增量顺序,比如索引和更新操作默认采用的最后的数据覆盖以前的数据,如果冲突了我们可以使用version字段来处理冲突问题,此外version可以使用es内部维护的version值,也可以使用我们外部应用传过来的值,并指定version去使用乐观锁进行更新。


有什么问题可以扫码关注

www.htsjk.Com true http://www.htsjk.com/Elasticsearch/36388.html NewsArticle ElasticSearch并发操作之乐观锁的使用,elasticsearch并发 上篇介绍了关于ES嵌套索引的增删改,本篇就接着上篇主题继续深入聊一下,上篇的添加和更新操作,其实是不安全的,所有的数据...
相关文章
    暂无相关文章
评论暂时关闭