`
uestzengting
  • 浏览: 94371 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hbase HLog源代码阅读笔记

 
阅读更多
HLog
当客户端往RegionServer上提交了一个更新操作后,会调用HLog的append方法往WAL上写一个节点,入口方法就是append
1.append
  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
    final long now)
  throws IOException {
    if (edits.isEmpty()) return;
    if (this.closed) {
      throw new IOException("Cannot append; log is closed");
    }
    synchronized (this.updateLock) {
      long seqNum = obtainSeqNum();
      byte [] hriKey = info.getEncodedNameAsBytes();
      this.lastSeqWritten.putIfAbsent(hriKey, seqNum);//存的是一个最老的sqeNum,这是代表,比该值等于或大于的数据都是没有持久化的
      HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
      doWrite(info, logKey, edits);//写数据,关键方法
      this.numEntries.incrementAndGet();
    }
    // Sync if catalog region, and if not then check if that table supports
    // deferred log flushing
    if (info.isMetaRegion() ||
        !info.getTableDesc().isDeferredLogFlush()) {
      this.sync();//如果是Meta表或是表不允许延迟同步,则立即同步
    }
  }
2.doWrite
  protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
  throws IOException {
    if (!this.enabled) {
      return;
    }
    if (!this.listeners.isEmpty()) {
      for (WALObserver i: this.listeners) {
        i.visitLogEntryBeforeWrite(info, logKey, logEdit);//观察者模式,以便调起其他需要通知的方法
      }
    }
    try {
      long now = System.currentTimeMillis();
      this.writer.append(new HLog.Entry(logKey, logEdit));//重要方法是这句
      long took = System.currentTimeMillis() - now;
      writeTime += took;
      writeOps++;
      if (took > 1000) {
        long len = 0;
        for(KeyValue kv : logEdit.getKeyValues()) {
          len += kv.getLength();
        }
        LOG.warn(String.format(
          "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
          Thread.currentThread().getName(), took, this.numEntries.get(),
          StringUtils.humanReadableInt(len)));//记录用时,如果超一秒则警告
      }
    } catch (IOException e) {
      LOG.fatal("Could not append. Requesting close of hlog", e);
      requestLogRoll();//如果写出错日志会被截断
      throw e;
    }
  }
SequenceFileLogWriter
3.append
  public void append(HLog.Entry entry) throws IOException {
    this.writer.append(entry.getKey(), entry.getEdit());
  }
SequenceFile.Writer
4.append
最终是调用hadoop的SequenceFile.Writer.append将数据持久化的。


当Region的memstore flush之后,会往HLog里写一条日志,标明哪个表的哪个分区在哪个sequenceId这里持久化过一遍
1.completeCacheFlush
  public void completeCacheFlush(final byte [] encodedRegionName,
      final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
  throws IOException {
    try {
      if (this.closed) {
        return;
      }
      synchronized (updateLock) {
        long now = System.currentTimeMillis();
        WALEdit edit = completeCacheFlushLogEdit();//这一句表名是Flush这种操作的日志
        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
            System.currentTimeMillis());//这一句表明该日志记录下了表名、分区名、当前的日志SequenceId
        this.writer.append(new Entry(key, edit));//这一句写入日志文件
        writeTime += System.currentTimeMillis() - now;
        writeOps++;
        this.numEntries.incrementAndGet();
        Long seq = this.lastSeqWritten.get(encodedRegionName);
        if (seq != null && logSeqId >= seq.longValue()) {
          this.lastSeqWritten.remove(encodedRegionName);//每个Region最后更新SequenceId被删除,表明该Region没有数据需要持久化。
        }
      }
      // sync txn to file system
      this.sync();//这种flush操作很重要,一定要同步到hdfs的其他节点上

    } finally {
      this.cacheFlushLock.unlock();
    }
  }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics