提交 0b5ba119 编写于 作者: ayu_programer's avatar ayu_programer

提交分布式文件系统代码

上级
# 项目整体背景
比如说现在要执行一个命令,hadoop fs -mkdir /usr/warehouse
来创建一个目录节点,这里其实是做了两件事--
1. 在内存里的目录树中加入对应的目录节点
2. 在磁盘里写入一条edits log,记录本次元数据的修改
**而针对于hdfs client 去创建目录的话,会给hdfs NameNode发送一条rpc接口调用的请求,
来调用mkdir接口,在那个接口里面就会完成上述的两件事情**
## 分布式存储系统的创建目录功能实现
注意一点的就是内存里面的文件目录树肯定是多线程并发写的资源,所以必须要用synchronized
来保护起来
1. 创建目录树时首先是把路径截为数组
2. 遍历数组,判断是否是第一个节点""" 如果是的话直接continue
3. 接着会走递归方法
3.1 递归中首先会判断dirTree 内存目录树中是否有字节点,如果没有直接退出
3.2 然后如果有字节点的话那么就取出子节点,并获取字节点的path跟当前的splitpath比较,如果相等直接返回字节点目录
3.3 如果不相等的话,那么就递归调用--参数传入子节点和splitpath,
并判断返回结果是否为空,如果不为空也直接返回结果
4. 判断递归调用的结果是否为空,如果不为空,就将结果赋给父节点目录,并continue
5. 如果递归调用共结果为空,那么就直接基于splitpath new出来一个目录对象加入到父目录对象中
## edits_log的全局txid机制以及双缓冲机制实现
1. 首先创建一个成员变量--txidSeq:即为当前递增的txid的序号,并在记录日志方法中对txidSeq进行++操作
2. 创建一个内存双缓存
2.1 里面放入一个用于记录日志的currentBuffer,还放入一个同步日志的synchBuffer
2.2 用于记录日志的write方法
2.3 用于currentBuffer和synchBuffer交换数据的方法
2.4 用于synchBuffer刷入数据到磁盘的同步方法
3. 在logEdit()方法中将日志信息写入到buffer缓存区
4 注意在写入缓存区需要加锁---因为有可能多个线程对txidSeq进行操作
### 基于editlog实现分段加锁机制
### 基于wait与notify实现edits_log批量刷磁盘
通过分段加锁,把耗时的写磁盘或者写网络放在锁代码块之外。
需要注意的点:需要体会在哪个地方wait,又在哪个地方通过notify进行唤醒
基于editlog实现分段加锁和基于wait与notify实现edits_log批量刷磁盘整体逻辑
1. 首先还是会再次加锁 synchronized(this)
2. 然后判断是否正在刷磁盘的标志位--isSyncRunning 是否为true
如果为true,表示有线程正在刷磁盘
2.1 那么就取出当前线程的txid,并获取editLogBuffer中最大的txid,两者进行比较
2.2 如果txid小于editLogBuffer中最大的txid说明当前线程的edit log已经被线程刷入或将要刷入磁盘,然后直接return 做别的事
2.3 反之如果小于,那就再继续看是否有线程在等待刷磁盘的标志位isWaitSync是否为true,如果有也直接return
2.4 反之如果isWaitSync 为false,那么当前这个线程需要把标志位设为true
2.5 然后执行while(isSyncRunning)---wait(2000) 操作,注意这一步如果执行到wait,意味着会释放锁,然后2s后继续执行while
2.6 更改isWaitSync是否为false
3. 如果为false表示没有正在刷磁盘的,
3.1 那么需要交换currentBuffer与synchBuffer;
3.2 保存当前要同步到磁盘中去的最大的txid;
3.3 更改标志位isSyncRunning为true
4. 执行到这里线程就出了再次加锁的代码块,这个时候其他的线程就可以重新进入logEdit中的代码块,将edit log信息写入到内存缓冲
5. 然后这个线程会执行刷磁盘操作
6. 执行完刷磁盘操作后就会再次更改标志位isSyncRunning 设为false,
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hm.demo</groupId>
<artifactId>distributed-filesystem</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.hm.dfs.namenode.server;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* 负责管理文件目录树的核心组件
*/
public class FSDirectory {
public static void main(String[] args) {
String path = "/usr/warehouse/hive";
FSDirectory directory = new FSDirectory();
directory.mkdir(path);
System.out.println(directory.dirTree);
}
/**
* 内存中的文件目录树
*/
private INodeDirectory dirTree;
public FSDirectory() {
this.dirTree = new INodeDirectory("/");
}
/**
* @param path
*/
public void mkdir(String path) {
// path = /usr/warehouse/hive
// 你应该先判断一下,“/”根目录下有没有一个“usr”目录的存在
// 如果说有的话,那么再判断一下,“/usr”目录下,有没有一个“/warehouse”目录的存在
// 如果说没有,那么就得先创建一个“/warehosue”对应的目录,挂在“/usr”目录下
// 接着再对“/hive”这个目录创建一个节点挂载上去
synchronized (dirTree) {
System.out.println("创建目录树");
//1 先把目录path截开
String[] pathArr = path.split("/");
INodeDirectory parentDir = new INodeDirectory("/");
//2 for循环处理判断
//如果是首节点--‘’的时候退出
for (String splitPath : pathArr) {
if (Objects.equals(splitPath.trim(), "")) {
continue;
}
//根据要新增的节点和内存目录树进行递归处理
INodeDirectory childDir = findDirectory(dirTree, splitPath);
if (childDir != null) {
parentDir = childDir;
continue;
}
//如果directory目录结构为空--即只有一个”“
//那么就直接加入进来
INodeDirectory childNode = new INodeDirectory(splitPath);
parentDir.addChildren(childNode);
}
System.out.println(parentDir.getPath());
}
}
/**
* 递归查找节点
*
* @param dirTree
* @param splitPath
* @return
*/
private INodeDirectory findDirectory(INodeDirectory dirTree, String splitPath) {
//先是判断内存目录树下是否有字节点,如果没有直接返回
if (dirTree.getChildren().size() == 0) {
return null;
}
//如果内存树下有子节点
for (INode node : dirTree.getChildren()) {
if (node instanceof INodeDirectory) {
INodeDirectory childDir = (INodeDirectory) node;
String childPath = childDir.getPath();
//如果相等--有--就返回目录树下的目录
if (Objects.equals(childPath, splitPath)) {
return childDir;
}
//如果不相等--没有--就返回
INodeDirectory targetDir = findDirectory(childDir, splitPath);
if (targetDir != null) {
return targetDir;
}
}
}
return null;
}
/**
* 代表文件目录树中的一个节点
*/
private interface INode {
}
/**
* 代表文件目录树中的一个目录
*/
public static class INodeDirectory implements INode {
//这是一个目录
private String path;
//用来保存字节点
private List<INode> children;
public INodeDirectory(String path) {
this.path = path;
this.children = new ArrayList<INode>();
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public List<INode> getChildren() {
return children;
}
public void addChildren(INode iNode) {
this.children = children;
}
}
/**
* 代表文件目录树中的一个文件
*/
public static class INodeFile implements INode {
//文件名称
private String fileName;
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
}
}
package com.hm.dfs.namenode.server;
import java.util.LinkedList;
import java.util.List;
/**
* 负责管理edits log日志的核心组件
*/
public class FSEditlog {
/**
* 当前递增的txid序号
*/
private Long txidSeq = 0L;
private volatile Long maxTxid = 0L;
/**
* 是否有线程执行刷入磁盘操作标志位
*/
private volatile boolean isSyncRunning = false;
private volatile boolean isWaitSync = false;
private ThreadLocal<Long> threadLocal = new ThreadLocal();
DoubleBuffer doubleBuffer;
FSEditlog() {
doubleBuffer = new DoubleBuffer();
}
/**
* 记录editlog日志
*
* @param context
*/
public void logEdit(String context) {
System.out.println("记录editlog日志信息");
//注意这里需要加锁,因为有可能多个线程对txidSeq进行操作
synchronized (this) {
txidSeq++;
//往本地线程副本中放入txid值
threadLocal.set(txidSeq);
Long txid = txidSeq;
EditLog editLog = new EditLog(context, txid);
doubleBuffer.write(editLog);
}
logSync();
}
/**
* 将内存缓存中的数据刷入到磁盘中
* 注意这里其实是通过一个线程批量的将数据刷入到磁盘
*/
private void logSync() {
//重复加锁
synchronized (this) {
//判断是否有线程执行刷入磁盘
if (isSyncRunning) {
Long currTxid = threadLocal.get();
if (currTxid < maxTxid) {
return;
}
//是否有线程在排队等待同步,如果有也直接退出
if (isWaitSync) {
return;
}
isWaitSync = true;
while (isSyncRunning) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isWaitSync = false;
}
//交换两块缓存区的数据
doubleBuffer.setReadyToSynch();
// 然后可以保存一下当前要同步到磁盘中去的最大的txid
// 此时editLogBuffer中的syncBuffer这块区域,交换完以后这里可能有多条数据
maxTxid = doubleBuffer.getMaxTxid();
//更改标志位
isSyncRunning = true;
}
//可以执行刷磁盘操作
doubleBuffer.flush();
//重新加锁更改标志位
synchronized (this) {
// 同步完了磁盘之后,就会将标志位复位,再释放锁
isSyncRunning = false;
// 唤醒可能正在等待他同步完磁盘的线程
notifyAll();
}
}
/**
* 用来刷入到内存双缓存的editlog对象
*/
class EditLog {
private String context;
private Long txidSeq;
public EditLog(String context, Long txidSeq) {
this.context = context;
this.txidSeq = txidSeq;
}
}
/**
* 内存双缓存区
*/
class DoubleBuffer {
/**
* 专门用来承载线程写入的logbuffer
*/
private LinkedList<EditLog> currentBuffer = new LinkedList<>();
/**
* 专门用来做数据同步的logBuffer
*/
private LinkedList<EditLog> syncBuffer = new LinkedList<>();
private void write(EditLog editLog) {
currentBuffer.add(editLog);
}
/**
* 交换两块缓存区,为同步数据做准备
*/
public void setReadyToSynch() {
LinkedList<EditLog> tmp = currentBuffer;
currentBuffer = syncBuffer;
syncBuffer = tmp;
}
/**
* 获取内存缓存区中最大的txid值
*
* @return
*/
public Long getMaxTxid() {
return syncBuffer.getLast().txidSeq;
}
/**
* 将同步数据的logbuffer刷入到磁盘去
*/
private void flush() {
for (EditLog editLog : syncBuffer) {
System.out.println("将log数据刷入到磁盘中去");
//这里其实就应该用流处理
}
syncBuffer.clear();
}
}
}
package com.hm.dfs.namenode.server;
/**
* 负责管理元数据的核心组件
*/
public class FSNamesystem {
private FSDirectory directory;
private FSEditlog editlog;
public FSNamesystem() {
this.directory = new FSDirectory();
this.editlog = new FSEditlog();
}
/**
* 创建目录
* @param path
* @return
*/
public Boolean mkdir(String path) {
this.directory.mkdir(path);
this.editlog.logEdit("创建了一个目录:" + path);
return true;
}
}
package com.hm.dfs.namenode.server;
/**
* NameNode核心启动类
*/
public class NameNode {
private volatile boolean shouldRun;
/**
*负责管理元数据的核心组件
*
*/
private FSNamesystem fsNamesystem;
/**
* 负责对外的rpc调用请求server
*/
private NameNodeRpcServer nameNodeRpcServer;
public NameNode(){
shouldRun = true;
}
/**
* 初始化nameNode
*/
private void initialize(){
this.fsNamesystem = new FSNamesystem();
this.nameNodeRpcServer = new NameNodeRpcServer(fsNamesystem);
nameNodeRpcServer.start();
}
/**
* 执行run方法
*/
private void run(){
//如果为true就会一直等待
while (shouldRun){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
NameNode nameNode = new NameNode();
nameNode.initialize();
nameNode.run();
}
}
package com.hm.dfs.namenode.server;
/**
* NameNode的RPC服务接口
*/
public class NameNodeRpcServer {
private FSNamesystem namesystem;
/**
* 构造方法 根据namesystem构造
* @param fsNamesystem
*/
public NameNodeRpcServer(FSNamesystem fsNamesystem) {
namesystem = fsNamesystem;
}
/**
* 创建目录
*
* @param path 目录路径
* @return 是否创建成功
*/
public Boolean mkdir(String path) {
//把FSNamesystem创建目录逻辑引入进来
return this.namesystem.mkdir(path);
// return true;
}
/**
* 开始监听指定的rpc server端口号
*/
public void start() {
System.out.println("开始监听指定的rpc server的端口号,开始接收请求.......");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册