package com.hm.dfs.namenode.server; import java.util.LinkedList; import java.util.List; /** * 负责管理edits log日志的核心组件 */ public class FSEditlog { /** * 当前递增的txid序号 */ private Long txidSeq = 0L; private volatile Long maxTxid = 0L; /** * 是否有线程执行刷入磁盘操作标志位 */ private volatile boolean isSyncRunning = false; private volatile boolean isWaitSync = false; private ThreadLocal threadLocal = new ThreadLocal(); DoubleBuffer doubleBuffer; FSEditlog() { doubleBuffer = new DoubleBuffer(); } /** * 记录editlog日志 * * @param context */ public void logEdit(String context) { System.out.println("记录editlog日志信息"); //注意这里需要加锁,因为有可能多个线程对txidSeq进行操作 synchronized (this) { txidSeq++; //往本地线程副本中放入txid值 threadLocal.set(txidSeq); Long txid = txidSeq; EditLog editLog = new EditLog(context, txid); doubleBuffer.write(editLog); } logSync(); } /** * 将内存缓存中的数据刷入到磁盘中 * 注意这里其实是通过一个线程批量的将数据刷入到磁盘 */ private void logSync() { //重复加锁 synchronized (this) { //判断是否有线程执行刷入磁盘 if (isSyncRunning) { Long currTxid = threadLocal.get(); if (currTxid < maxTxid) { return; } //是否有线程在排队等待同步,如果有也直接退出 if (isWaitSync) { return; } isWaitSync = true; while (isSyncRunning) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } isWaitSync = false; } //交换两块缓存区的数据 doubleBuffer.setReadyToSynch(); // 然后可以保存一下当前要同步到磁盘中去的最大的txid // 此时editLogBuffer中的syncBuffer这块区域,交换完以后这里可能有多条数据 maxTxid = doubleBuffer.getMaxTxid(); //更改标志位 isSyncRunning = true; } //可以执行刷磁盘操作 doubleBuffer.flush(); //重新加锁更改标志位 synchronized (this) { // 同步完了磁盘之后,就会将标志位复位,再释放锁 isSyncRunning = false; // 唤醒可能正在等待他同步完磁盘的线程 notifyAll(); } } /** * 用来刷入到内存双缓存的editlog对象 */ class EditLog { private String context; private Long txidSeq; public EditLog(String context, Long txidSeq) { this.context = context; this.txidSeq = txidSeq; } } /** * 内存双缓存区 */ class DoubleBuffer { /** * 专门用来承载线程写入的logbuffer */ private LinkedList currentBuffer = new LinkedList<>(); /** * 专门用来做数据同步的logBuffer */ private LinkedList syncBuffer = new LinkedList<>(); private void write(EditLog editLog) { currentBuffer.add(editLog); } /** * 交换两块缓存区,为同步数据做准备 */ public void setReadyToSynch() { LinkedList tmp = currentBuffer; currentBuffer = syncBuffer; syncBuffer = tmp; } /** * 获取内存缓存区中最大的txid值 * * @return */ public Long getMaxTxid() { return syncBuffer.getLast().txidSeq; } /** * 将同步数据的logbuffer刷入到磁盘去 */ private void flush() { for (EditLog editLog : syncBuffer) { System.out.println("将log数据刷入到磁盘中去"); //这里其实就应该用流处理 } syncBuffer.clear(); } } }