elasticsearch 数据丢失的分析

大家关注分布式系统可靠性的话,也许读过Call me maybe这个博客系列。博主Kyle Kingsbury用自己开发的Jepsen测试过多种分布式系统(NoSQL、Message Queue等),其中包括我们常用的Elasticsearch(简称ES),先后针对ES 1.1.0ES 1.5.0测过两轮。得到相同的悲观结论:因为可靠性原因,ES不能作为主存储。具体来说,ES节点在录入数据时,如果进程由于软硬件原因崩溃,即使重新启动,一段时间内录入的数据也会丢失无法恢复。所以不适合要求数据100%不丢失的使用场景。

这个问题不止Kyle Kingsbury发现过,Elasticsearch Github Issues里其他用户同样咨询过这个问题。ES committer Shay Banon解释如下:

kimchy commented

The thought process we had is that the most common deployments of ES when its introduced to be used is next to a database, a replay-able upstream, or typically in a logging/metrics use case, where the default expected behavior of ES is to be semi geared towards faster insertion rate by not fsync‘ing each operation (though one can configure ES to do so).

也就是:ES确实不能作为100%可靠的主存储,我们设计时候目标是高写入速度,大家应该搭配个100%可靠的数据库(或者其他啥机制)作为主存储机制。但这段话说得很含糊,只说和fsync相关,具体fsync的每个操作是指啥?要弄清楚这个问题,需要对机制作分析。下面是我们对ES 1.7.0源码的跟踪结果:

ES添加一条记录(即index)的过程

数据恢复的两种方式

1. 分布式系统设计中的一个重要目标:系统可以从部分失效中自动恢复,而且不会严重地影响整体性能。特别是,当故障发生时,分布式系统应该可以一边进行恢复,一边降低服务质量继续操作。在容错的分布式系统中,从错误中恢复需要系统事先把它的状态保存到硬盘等存储中。它被称为快照,一般为减少空间,记录的是从上个状态点开始的增量变化。Elasticsearch中的translog(transaction log,记录ES的每一条增删改记录操作)文件存储着这样的信息。

2. 为了增强系统的可靠性和提高性能,分布式系统要对数据进行备份,因为如果一个文件系统已经实现数据多备份,那么当一个备份被破坏后,文件系统只需转换到另一个数据备份就可以继续运行下去。故障节点恢复时也可以从其他节点上的备份得到正确数据,来达到最终容错的目的。数据多备份另一个作用是防止数据破坏。ES中数据单位是shard,每个shard有配置指定的多个replica,即备份,存储在多个节点上。

也就是说:ES数据要做得能恢复,必须要么在本节点以translog存储到磁盘,要么以replica方式备份到其他节点。

备注

备份导致了一致性问题:每当一个备份更新后,该备份就变得与其他备份不同,为了保持各个备份一致,分布式系统需要以一种让人注意不到的暂时的不一致方式存储。而这种一致性处理不适当,也可能会导致数据的丢失。此中情况暂时超出文本内容,以后文章会讨论。

ES新增一条记录的代码分析

当ES通过Netty网络框架得到一条加记录请求时,并经过很多解析,最终启动在TransportShardResplicationOperationAction中的线程,启动代码,如下:

TransportShardReplicationOperationAction.java
if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
    try {
        if (internalRequest.request().operationThreaded()) {
            internalRequest.request().beforeLocalFork();
            threadPool.executor(executor).execute(new Runnable() {
                @Override
                public void run() {//===
                    try {
                        performOnPrimary(shard.id(), shard);
                    } catch (Throwable t) {
                        listener.onFailure(t);
                    }
                }
            });
        } else {
            performOnPrimary(shard.id(), shard);
        }
    } catch (Throwable t) {
        listener.onFailure(t);
    }
}

而对于我们的问题,在这个线程中找到一部分答案。如下图所示:

技术分享

  1. 1 shardOperationOnPrimary():主shard解析请求数据,得到存储相同内容的节点路由,一起添加到Lucene中。

    1. 1.1 prepareCreate(): 得到与请求数据相关的创建数据

      1. 1.1.1 create(): 用创建数据添加进Lucene的记录(Lucene称为一条document)

        1. 1.1.1.1 create(): 预备Lucene,添加document

          1. 1.1.1.1.1 addDocument():Lucene添加数据到shard的内存中

          2. 1.1.1.1.2 add(): 将添加这个操作,以及操作的数据,存储入translog的内存或者文件中(如果有两个文件current和trans新的数据信息两个都添加)

  2. 2 performReplicas(): 根据集群状态将创建数据发送给刚刚添加的这个shard的备份shard的其他节点(可能在此shard添加document过程中,此shard已经不再是主shard)

    1. 2.1 performOnReplica(): 依据节点情况将创建数据发送给刚刚添加的这个shard的一个备份shard或者自己添加(如果在刚刚添加的过程中,主shard改变成另外的shard时)

      1. 2.1.1 sendRequest(): 准备,并发发送添加的数据信息

        1. 2.1.1.1 sendRequest(): 将数据发送节点

          1. 2.1.1.1.1 write(): 利用Netty的ChannelFuture将数据异步发送走

    2. 2.2 onResponse():在performReplicas或者performOnResponse 只要确定向所有其他相关节点异步发送启动就回应客户端,由于多个函数调用表示结束就没有画出,特说明

  3. 3 sync():按照一定条件将translog中存储在内存中的操作数据写入translog文件中

  4. 4 flush():按照一定条件将Lucene新加入的数据提交到Lucene的shard文件中

    1. 4.1 newTransientTranslog():增加新的tanslog文件trans对象

    2. 4.2 commit():Lucene将新加入的数据提交到磁盘

    3. 4.3 makeTransientCurrent():将Lucene提交的数据的translog文件(current)删除,刚新增translog对象(trans)成为新的提交对象的translog

主Shard数据操作分析

数据在主Shard插入时,先得到相关数据(比如这条记录的id)并添加入Lucene的内存,在符合一定条件下存入磁盘,默认条件如下原代码:

TranslogService.java
public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, ThreadPool threadPool, IndexShard indexShard, Translog translog) {
  super(shardId, indexSettings);
  this.threadPool = threadPool;
  this.indexSettingsService = indexSettingsService;
  this.indexShard = indexShard;
  this.translog = translog;
  //translog对象存储所有操作(增加、更新)次数(包括文件中和内存中)的默认阈值
  this.flushThresholdOperations = componentSettings.getAsInt(FLUSH_THRESHOLD_OPS_KEY, componentSettings.getAsInt("flush_threshold", Integer.MAX_VALUE));
  //translog对象存储所有数据所占空间(包括文件中和内存中)的默认阈值
  this.flushThresholdSize = componentSettings.getAsBytesSize(FLUSH_THRESHOLD_SIZE_KEY, new ByteSizeValue(512, ByteSizeUnit.MB));
  //Lucene提交数据的时间间隔的默认阈值
  this.flushThresholdPeriod = componentSettings.getAsTime(FLUSH_THRESHOLD_PERIOD_KEY, TimeValue.timeValueMinutes(30));
  this.interval = componentSettings.getAsTime(FLUSH_THRESHOLD_INTERVAL_KEY, timeValueMillis(500000));
  this.disableFlush = componentSettings.getAsBoolean(FLUSH_THRESHOLD_DISABLE_FLUSH_KEY, false);

  logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);

  this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush());

  indexSettingsService.addListener(applySettings);
}

当translog满足任意阈值,或者Lucene超过flushThresholdPeriod时间没有提交,那么,ES 才会启动一线程将Lucene增加的数据存入磁盘,数据算稳定。默认情况translog也不是直接存入文件中的,如下代码

FsTranslog.java
public Location add(Operation operation) throws TranslogException {
  rwl.readLock().lock();
  boolean released = false;
  ReleasableBytesStreamOutput out = null;
  try {
    out = new ReleasableBytesStreamOutput(bigArrays);
    TranslogStreams.writeTranslogOperation(out, operation);
    ReleasableBytesReference bytes = out.bytes();
    Location location = current.add(bytes);
    if (syncOnEachOperation) {
      current.sync();
    }

    assert new BytesArray(current.read(location)).equals(bytes);

    FsTranslogFile trans = this.trans;
    if (trans != null) {
      try {
        location = trans.add(bytes);
      } catch (ClosedChannelException e) {
        // ignore
      }
    }
    Releasables.close(bytes);
    released = true;
    return location;
  } catch (Throwable e) {
    throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
  } finally {
    rwl.readLock().unlock();
    if (!released && out != null) {
      Releasables.close(out.bytes());
    }
  }
}

syncOnEachOperation = false (FsTranslog.java 中 81行),translog 会准备 64KB(FsTranslog.java 中116行)的内存,来暂存与客户输入相关数据,以减少IO的消耗。当然,并不是一定要累计到超过64KB的数据才会存储入 translog,ES 每隔 5s 有一个线程将translog 内存中的数据存入translog文件中,代码如下:

LocalIndexShardGeteway.java
class Sync implements Runnable {
  @Override
  public void run() {
    // don‘t re-schedule  if its closed..., we are done
    if (indexShard.state() == IndexShardState.CLOSED) {
      return;
    }
    if (indexShard.state() == IndexShardState.STARTED && indexShard.translog().syncNeeded()) {
      threadPool.executor(ThreadPool.Names.FLUSH).execute(new Runnable() {
        @Override
        public void run() {
          try {
            indexShard.translog().sync();//= xiancheng
          } catch (Exception e) {
            if (indexShard.state() == IndexShardState.STARTED) {
              logger.warn("failed to sync translog", e);
            }
          }
          if (indexShard.state() != IndexShardState.CLOSED) {
            flushScheduler = threadPool.schedule(syncInterval, ThreadPool.Names.SAME, Sync.this);
          }
        }
      });
    } else {
      flushScheduler = threadPool.schedule(syncInterval, ThreadPool.Names.SAME, Sync.this);
    }
  }
}

当然,我们也可以不用默认方式,在配置文件(config/elasticsearch.yml)中添加一句:index.gateway.local.sync: 0,syncOnEachOperation 会为 true,translog 也不会准备 64KB 这样的内存存储输入数据。

如上所述,translog中的数据和Lucene中的数据,一段时间内都不会直接存入文件,而是保存在内存中。此时节点宕机重启动,数据就可能会失去。通过数据复制未必都能挽救成功。

数据复制

由时序图可知,数据插入后进行才进行数据复制,那此时宕机数据是否直接丢失?不会。如果真出现这种情况,主节点就会回应客户端插入失败,客户可以选择稍后重新发起请求。 所以分析数据复制,也是有必要的,当然此时的逻辑有点乱,需要仔细的想这个过程。提示:因为插入数据时,此节点为主节点,但是经过一系列的操作后, 可能发送数据时发现自己主节点的头衔就被别人霸占的现象(比如此节点在插入时,因为网络原因没有和其他节点维持心跳,其他节选出新的主节点)。但是最终是通过调用Netty的ChannelFuture将数据传送出去,如下源代码:

NettyTransport.java
NettyHeader.writeHeader(buffer, requestId, status, version);
ChannelFuture future = targetChannel.write(buffer);
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);
addedReleaseListener = true;
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);

由上面的代码,buffer是由Netty的ChannelFuture 来完成发送任务的,是个异步事件。由于数据复制是异步的,所以不会阻塞,客户端会由

listener.onResponse(response.response());

发送给客户,但是可能在客户得到回应时,复制过程还没有完成,所以在此时节点出现宕机,数据丢失。

 Tip

ChannelFuture:异步结果,这个是Netty异步事件处理的关键,当一个事件被处理时,可以直接以ChannelFuture的形式直接返回,不用在当前操作中被阻塞。可以通过 ChannelFuture得到最终的执行结果,具体的做法是在ChannelFuture添加监听器listener,当操作最终被执行完 后,listener会被触发,我们可以在listener的回调函数中预定义我们的业务代码

总结

由上可知,存储是异步由内存存入磁盘,不能保证数据不被丢失,而复制也是也不发送,节点没有得到成功传输的答复就回应了客户端,更别说在复制结点上是否能正确存储,所以更不能保证数据不被丢失。

性能还是可靠性?

通过把Translog 文件可以直接写入磁盘,能避免数据在本文中可能出现的丢失情况。但是后果就是Shay Banon所说的牺牲了高写入速度。

那么究竟直接写入磁盘会对性能有多大影响呢?我写了个脚本测试,写入10000条小的数据,速度减小了7~8倍,损失很大。

解决这个问题有如下几种办法:

1. 不算办法的办法就是忽视这个问题。大部分场景送入ES的日志并不关键,数据丢失无妨。或者ES只是作为方便文本查询的辅助工具,主数据在更可靠的存储媒介上。比如HBase一些方案就是通过Coprecessor让ES为文本建立倒排索引。这种场景下也能容忍数据丢失。

2. 乐观数据恢复机制。数据送入ES后仍然保留一段时间,监控ES宕机事件,ES重启后用查询判断最后输入数据的位置,从下一条数据开始重送。这也是我们目前的做法。

3. 重新设计ES这块的逻辑,毕竟很多NoSQL能同时做到高可靠和高写入。具体方案我们目前还在研究中。

瀚思原创技术博文,如需转载,请联系marketing_ops@hansight.com

文章来自:http://www.cnblogs.com/bmaker/p/5472523.html
© 2021 jiaocheng.bubufx.com  联系我们
ICP备案:鲁ICP备09046678号-3