提交 a5028afd 编写于 作者: J jingxiaolu

async-libfuse: import async-libfuse

Import async_fuse of github.com/datenlord/datenlord as initial
version of async-libfuse
Signed-off-by: Njingxiaolu <lujingxiao@huawei.com>
上级 5fc76f68
[package]
name = "async_fuse"
version = "0.1.0"
authors = ["Pu Wang <nicolas.weeks@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.31"
env_logger = "0.6.0"
futures = "0.3.5"
futures-core = "0.3.5"
futures-io = "0.3.5"
futures-util = "0.3.5"
lazy_static = "1.4.0"
libc = "0.2"
log = "0.4.6"
nix = "0.17.0"
pin-project-lite ="0.1.7"
smol = "0.1.11"
[features]
abi-7-9 = []
abi-7-10 = ["abi-7-9"]
abi-7-11 = ["abi-7-10"]
abi-7-12 = ["abi-7-11"]
abi-7-13 = ["abi-7-12"]
abi-7-14 = ["abi-7-13"]
abi-7-15 = ["abi-7-14"]
abi-7-16 = ["abi-7-15"]
abi-7-17 = ["abi-7-16"]
abi-7-18 = ["abi-7-17"]
abi-7-19 = ["abi-7-18"]
木兰宽松许可证, 第2版
木兰宽松许可证, 第2版
2020年1月 http://license.coscl.org.cn/MulanPSL2
您对“软件”的复制、使用、修改及分发受木兰宽松许可证,第2版(“本许可证”)的如下条款的约束:
0. 定义
“软件”是指由“贡献”构成的许可在“本许可证”下的程序和相关文档的集合。
“贡献”是指由任一“贡献者”许可在“本许可证”下的受版权法保护的作品。
“贡献者”是指将受版权法保护的作品许可在“本许可证”下的自然人或“法人实体”。
“法人实体”是指提交贡献的机构及其“关联实体”。
“关联实体”是指,对“本许可证”下的行为方而言,控制、受控制或与其共同受控制的机构,此处的控制是指有受控方或共同受控方至少50%直接或间接的投票权、资金或其他有价证券。
1. 授予版权许可
每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的版权许可,您可以复制、使用、修改、分发其“贡献”,不论修改与否。
2. 授予专利许可
每个“贡献者”根据“本许可证”授予您永久性的、全球性的、免费的、非独占的、不可撤销的(根据本条规定撤销除外)专利许可,供您制造、委托制造、使用、许诺销售、销售、进口其“贡献”或以其他方式转移其“贡献”。前述专利许可仅限于“贡献者”现在或将来拥有或控制的其“贡献”本身或其“贡献”与许可“贡献”时的“软件”结合而将必然会侵犯的专利权利要求,不包括对“贡献”的修改或包含“贡献”的其他结合。如果您或您的“关联实体”直接或间接地,就“软件”或其中的“贡献”对任何人发起专利侵权诉讼(包括反诉或交叉诉讼)或其他专利维权行动,指控其侵犯专利权,则“本许可证”授予您对“软件”的专利许可自您提起诉讼或发起维权行动之日终止。
3. 无商标许可
“本许可证”不提供对“贡献者”的商品名称、商标、服务标志或产品名称的商标许可,但您为满足第4条规定的声明义务而必须使用除外。
4. 分发限制
您可以在任何媒介中将“软件”以源程序形式或可执行形式重新分发,不论修改与否,但您必须向接收者提供“本许可证”的副本,并保留“软件”中的版权、商标、专利及免责声明。
5. 免责声明与责任限制
“软件”及其中的“贡献”在提供时不带任何明示或默示的担保。在任何情况下,“贡献者”或版权所有者不对任何人因使用“软件”或其中的“贡献”而引发的任何直接或间接损失承担责任,不论因何种原因导致或者基于何种法律理论,即使其曾被建议有此种损失的可能性。
6. 语言
“本许可证”以中英文双语表述,中英文版本具有同等法律效力。如果中英文版本存在任何冲突不一致,以中文版为准。
条款结束
如何将木兰宽松许可证,第2版,应用到您的软件
如果您希望将木兰宽松许可证,第2版,应用到您的新软件,为了方便接收者查阅,建议您完成如下三步:
1, 请您补充如下声明中的空白,包括软件名、软件的首次发表年份以及您作为版权人的名字;
2, 请您在软件包的一级目录下创建以“LICENSE”为名的文件,将整个许可证文本放入该文件中;
3, 请将如下声明文本放入每个源文件的头部注释中。
Copyright (c) [Year] [name of copyright holder]
[Software Name] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
Mulan Permissive Software License,Version 2
Mulan Permissive Software License,Version 2 (Mulan PSL v2)
January 2020 http://license.coscl.org.cn/MulanPSL2
Your reproduction, use, modification and distribution of the Software shall be subject to Mulan PSL v2 (this License) with the following terms and conditions:
0. Definition
Software means the program and related documents which are licensed under this License and comprise all Contribution(s).
Contribution means the copyrightable work licensed by a particular Contributor under this License.
Contributor means the Individual or Legal Entity who licenses its copyrightable work under this License.
Legal Entity means the entity making a Contribution and all its Affiliates.
Affiliates means entities that control, are controlled by, or are under common control with the acting entity under this License, ‘control’ means direct or indirect ownership of at least fifty percent (50%) of the voting power, capital or other securities of controlled or commonly controlled entity.
1. Grant of Copyright License
Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable copyright license to reproduce, use, modify, or distribute its Contribution, with modification or not.
2. Grant of Patent License
Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable (except for revocation under this Section) patent license to make, have made, use, offer for sale, sell, import or otherwise transfer its Contribution, where such patent license is only limited to the patent claims owned or controlled by such Contributor now or in future which will be necessarily infringed by its Contribution alone, or by combination of the Contribution with the Software to which the Contribution was contributed. The patent license shall not apply to any modification of the Contribution, and any other combination which includes the Contribution. If you or your Affiliates directly or indirectly institute patent litigation (including a cross claim or counterclaim in a litigation) or other patent enforcement activities against any individual or entity by alleging that the Software or any Contribution in it infringes patents, then any patent license granted to you under this License for the Software shall terminate as of the date such litigation or activity is filed or taken.
3. No Trademark License
No trademark license is granted to use the trade names, trademarks, service marks, or product names of Contributor, except as required to fulfill notice requirements in Section 4.
4. Distribution Restriction
You may distribute the Software in any medium with or without modification, whether in source or executable forms, provided that you provide recipients with a copy of this License and retain copyright, patent, trademark and disclaimer statements in the Software.
5. Disclaimer of Warranty and Limitation of Liability
THE SOFTWARE AND CONTRIBUTION IN IT ARE PROVIDED WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED. IN NO EVENT SHALL ANY CONTRIBUTOR OR COPYRIGHT HOLDER BE LIABLE TO YOU FOR ANY DAMAGES, INCLUDING, BUT NOT LIMITED TO ANY DIRECT, OR INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES ARISING FROM YOUR USE OR INABILITY TO USE THE SOFTWARE OR THE CONTRIBUTION IN IT, NO MATTER HOW IT’S CAUSED OR BASED ON WHICH LEGAL THEORY, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
6. Language
THIS LICENSE IS WRITTEN IN BOTH CHINESE AND ENGLISH, AND THE CHINESE VERSION AND ENGLISH VERSION SHALL HAVE THE SAME LEGAL EFFECT. IN THE CASE OF DIVERGENCE BETWEEN THE CHINESE AND ENGLISH VERSIONS, THE CHINESE VERSION SHALL PREVAIL.
END OF THE TERMS AND CONDITIONS
How to Apply the Mulan Permissive Software License,Version 2 (Mulan PSL v2) to Your Software
To apply the Mulan PSL v2 to your work, for easy identification by recipients, you are suggested to complete following three steps:
i Fill in the blanks in following statement, including insert your software name, the year of the first publication of your software, and your name identified as the copyright owner;
ii Create a file named “LICENSE” which contains the whole context of this License in the first directory of your software package;
iii Attach the statement to the appropriate annotated syntax at the beginning of each source file.
Copyright (c) [Year] [name of copyright holder]
[Software Name] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
use anyhow::{anyhow, Context, Result};
use nix::{
fcntl::{self, FcntlArg, FdFlag, OFlag},
ioctl_read,
sys::stat::Mode,
unistd::close,
};
use std::os::unix::io::RawFd;
use super::session::Session;
#[derive(Debug)]
pub(crate) struct Channel {
// channel fd cloned from session fd
chan_fd: RawFd,
}
impl Channel {
#[allow(dead_code)]
pub async fn new(session: &Session) -> Result<Channel> {
let devname = "/dev/fuse";
let clonefd = fcntl::open(devname, OFlag::O_RDWR | OFlag::O_CLOEXEC, Mode::empty());
let clonefd = match clonefd {
Err(err) => {
return Err(anyhow!("fuse: failed to open {:?}: {:?}", devname, err));
}
Ok(fd) => fd,
};
if let Err(err) = fcntl::fcntl(clonefd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) {
return Err(anyhow::anyhow!(
"fuse: failed to set clonefd to FD_CLOEXEC: {:?}",
err
));
}
ioctl_read!(clone, 299, 0, i32);
let mut masterfd = session.fd();
let res = unsafe { clone(clonefd, &mut masterfd) };
if let Err(err) = res {
close(clonefd).context("fuse: failed to close clone device")?;
return Err(anyhow::anyhow!(
"fuse: failed to clone device fd: {:?}\n",
err
));
}
Ok(Channel { chan_fd: clonefd })
}
#[allow(dead_code)]
pub fn fd(&self) -> RawFd {
self.chan_fd
}
}
use nix::sys::stat::SFlag;
use std::ffi::{CStr, OsStr, OsString};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::RawFd;
use std::ptr;
use super::super::protocol::INum;
#[cfg(target_os = "linux")]
use libc::{dirent64 as dirent, readdir64_r as readdir_r};
#[cfg(not(target_os = "linux"))]
use libc::{dirent, readdir_r};
#[derive(Debug)]
pub(crate) struct Dir(ptr::NonNull<libc::DIR>);
impl Dir {
/// Converts from a file descriptor, closing it on success or failure.
pub fn from_fd(fd: RawFd) -> nix::Result<Self> {
let d = unsafe { libc::fdopendir(fd) };
if d.is_null() {
let e = nix::Error::last();
unsafe { libc::close(fd) };
return Err(e);
};
// Always guaranteed to be non-null by the previous check
Ok(Dir(ptr::NonNull::new(d).unwrap()))
}
}
// `Dir` is safe to pass from one thread to another, as it's not reference-counted.
unsafe impl Send for Dir {}
#[derive(Debug)]
pub(crate) struct DirEntry {
ino: INum,
entry_type: SFlag,
name: OsString,
}
impl DirEntry {
pub fn new(ino: INum, name: OsString, entry_type: SFlag) -> Self {
Self {
ino,
name,
entry_type,
}
}
/// Returns the inode number (`d_ino`) of the underlying `dirent`.
pub fn ino(&self) -> u64 {
self.ino
}
/// Returns the bare file name of this directory entry without any other leading path component.
pub fn entry_name(&self) -> &OsStr {
self.name.as_os_str()
}
/// Returns the type of this directory entry, if known.
///
/// See platform `readdir(3)` or `dirent(5)` manpage for when the file type is known;
/// notably, some Linux filesystems don't implement this. The caller should use `stat` or
/// `fstat` if this returns `None`.
pub fn entry_type(&self) -> SFlag {
self.entry_type
}
fn from_dirent(de: dirent) -> Self {
let ino = de.d_ino as INum;
let name = unsafe { OsStr::from_bytes(CStr::from_ptr(de.d_name.as_ptr()).to_bytes()) };
let entry_type = match de.d_type {
libc::DT_FIFO => SFlag::S_IFIFO,
libc::DT_CHR => SFlag::S_IFCHR,
libc::DT_BLK => SFlag::S_IFBLK,
libc::DT_DIR => SFlag::S_IFDIR,
libc::DT_REG => SFlag::S_IFREG,
libc::DT_LNK => SFlag::S_IFLNK,
libc::DT_SOCK => SFlag::S_IFSOCK,
/* libc::DT_UNKNOWN | */ _ => panic!("failed to recognize file type"),
};
Self {
ino,
name: name.into(),
entry_type,
}
}
}
impl Iterator for Dir {
type Item = nix::Result<DirEntry>;
fn next(&mut self) -> Option<Self::Item> {
unsafe {
// Note: POSIX specifies that portable applications should dynamically allocate a
// buffer with room for a `d_name` field of size `pathconf(..., _PC_NAME_MAX)` plus 1
// for the NUL byte. It doesn't look like the std library does this; it just uses
// fixed-sized buffers (and libc's dirent seems to be sized so this is appropriate).
// Probably fine here too then.
let mut ent = std::mem::MaybeUninit::<dirent>::uninit();
let mut result = ptr::null_mut();
if let Err(e) =
nix::errno::Errno::result(readdir_r(self.0.as_ptr(), ent.as_mut_ptr(), &mut result))
{
return Some(Err(e));
}
if result.is_null() {
return None;
}
assert_eq!(result, ent.as_mut_ptr());
let dirent = ent.assume_init();
Some(Ok(DirEntry::from_dirent(dirent)))
}
}
}
#[cfg(test)]
mod test {
use futures::stream::StreamExt;
use nix::fcntl::{self, OFlag};
use nix::sys::stat::Mode;
use smol::blocking;
use super::Dir;
#[test]
fn test_dir() -> nix::Result<()> {
smol::run(async {
let oflags = OFlag::O_RDONLY | OFlag::O_DIRECTORY;
let fd = blocking!(fcntl::open(".", oflags, Mode::empty()))?;
let dir = Dir::from_fd(fd)?;
let mut dir = smol::iter(dir);
while let Some(entry) = dir.next().await {
let entry = entry?;
println!(
"read file name={:?}, ino={}, type:={:?}",
entry.entry_name(),
entry.ino(),
entry.entry_type()
);
}
Ok(())
})
}
}
此差异已折叠。
此差异已折叠。
use anyhow::{self, Context};
use log::debug;
use nix::fcntl::{self, OFlag};
use nix::sys::stat::{self, FileStat, Mode, SFlag};
use smol::blocking;
use std::ffi::OsString;
use std::os::unix::io::RawFd;
use std::path::Path;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use super::super::protocol::{FuseAttr, INum};
#[derive(Clone, Copy, Debug)]
pub struct FileAttr {
/// Inode number
pub ino: INum,
/// Size in bytes
pub size: u64,
/// Size in blocks
pub blocks: u64,
/// Time of last access
pub atime: SystemTime,
/// Time of last modification
pub mtime: SystemTime,
/// Time of last change
pub ctime: SystemTime,
/// Time of creation (macOS only)
pub crtime: SystemTime,
/// Kind of file (directory, file, pipe, etc)
pub kind: SFlag,
/// Permissions
pub perm: u16,
/// Number of hard links
pub nlink: u32,
/// User id
pub uid: u32,
/// Group id
pub gid: u32,
/// Rdev
pub rdev: u32,
/// Flags (macOS only, see chflags(2))
pub flags: u32,
}
pub fn parse_oflag(flags: u32) -> OFlag {
debug_assert!(
flags < std::i32::MAX as u32,
"helper_parse_oflag() found flags={} overflow, larger than u16::MAX",
flags,
);
let oflags = OFlag::from_bits_truncate(flags as i32);
debug!("helper_parse_oflag() read file flags={:?}", oflags);
oflags
}
pub fn parse_mode(mode: u32) -> Mode {
debug_assert!(
mode < std::u16::MAX as u32,
"helper_parse_mode() found mode={} overflow, larger than u16::MAX",
mode,
);
#[cfg(target_os = "linux")]
let fmode = Mode::from_bits_truncate(mode);
#[cfg(target_os = "macos")]
let fmode = Mode::from_bits_truncate(mode as u16);
debug!("helper_parse_mode() read file mode={:?}", fmode);
fmode
}
pub fn parse_mode_bits(mode: u32) -> u16 {
#[cfg(target_os = "linux")]
let bits = parse_mode(mode).bits() as u16;
#[cfg(target_os = "macos")]
let bits = parse_mode(mode).bits();
bits
}
pub fn parse_sflag(flags: u32) -> SFlag {
debug_assert!(
flags < std::u16::MAX as u32,
"parse_sflag() found flags={} overflow, larger than u16::MAX",
flags,
);
#[cfg(target_os = "linux")]
let sflag = SFlag::from_bits_truncate(flags);
#[cfg(target_os = "macos")]
let sflag = SFlag::from_bits_truncate(flags as u16);
debug!("convert_sflag() read file type={:?}", sflag);
sflag
}
pub async fn open_dir(path: impl AsRef<Path>) -> nix::Result<RawFd> {
let oflags = OFlag::O_RDONLY | OFlag::O_DIRECTORY;
let path = path.as_ref().to_path_buf();
let dfd = blocking!(fcntl::open(path.as_os_str(), oflags, Mode::empty()))?;
Ok(dfd)
}
pub async fn open_dir_at(dfd: RawFd, child_name: OsString) -> nix::Result<RawFd> {
let oflags = OFlag::O_RDONLY | OFlag::O_DIRECTORY;
let dir_fd = blocking!(fcntl::openat(
dfd,
child_name.as_os_str(),
oflags,
Mode::empty()
))?;
Ok(dir_fd)
}
pub async fn load_attr(fd: RawFd) -> nix::Result<FileAttr> {
let st = blocking!(stat::fstat(fd))?;
#[cfg(target_os = "macos")]
fn build_crtime(st: &FileStat) -> Option<SystemTime> {
UNIX_EPOCH.checked_add(Duration::new(
st.st_birthtime as u64,
st.st_birthtime_nsec as u32,
))
}
#[cfg(target_os = "linux")]
fn build_crtime(_st: &FileStat) -> Option<SystemTime> {
None
}
let atime = UNIX_EPOCH.checked_add(Duration::new(st.st_atime as u64, st.st_atime_nsec as u32));
let mtime = UNIX_EPOCH.checked_add(Duration::new(st.st_mtime as u64, st.st_mtime_nsec as u32));
let ctime = UNIX_EPOCH.checked_add(Duration::new(st.st_ctime as u64, st.st_ctime_nsec as u32));
let crtime = build_crtime(&st);
let perm = parse_mode_bits(st.st_mode as u32);
debug!("load_attr() got file permission={}", perm);
let kind = parse_sflag(st.st_mode as u32);
let nt = SystemTime::now();
let attr = FileAttr {
ino: st.st_ino,
size: st.st_size as u64,
blocks: st.st_blocks as u64,
atime: atime.unwrap_or(nt),
mtime: mtime.unwrap_or(nt),
ctime: ctime.unwrap_or(nt),
crtime: crtime.unwrap_or(nt),
kind,
perm,
nlink: st.st_nlink as u32,
uid: st.st_uid,
gid: st.st_gid,
rdev: st.st_rdev as u32,
#[cfg(target_os = "linux")]
flags: 0,
#[cfg(target_os = "macos")]
flags: st.st_flags,
};
Ok(attr)
}
pub fn time_from_system_time(system_time: &SystemTime) -> anyhow::Result<(u64, u32)> {
let duration = system_time.duration_since(UNIX_EPOCH).context(format!(
"failed to convert SystemTime={:?} to Duration",
system_time
))?;
Ok((duration.as_secs(), duration.subsec_nanos()))
}
pub fn mode_from_kind_and_perm(kind: SFlag, perm: u16) -> u32 {
(match kind {
SFlag::S_IFIFO => libc::S_IFIFO,
SFlag::S_IFCHR => libc::S_IFCHR,
SFlag::S_IFBLK => libc::S_IFBLK,
SFlag::S_IFDIR => libc::S_IFDIR,
SFlag::S_IFREG => libc::S_IFREG,
SFlag::S_IFLNK => libc::S_IFLNK,
SFlag::S_IFSOCK => libc::S_IFSOCK,
_ => panic!("unknown SFlag type={:?}", kind),
}) as u32
| perm as u32
}
pub fn convert_to_fuse_attr(attr: FileAttr) -> anyhow::Result<FuseAttr> {
let (atime_secs, atime_nanos) = time_from_system_time(&attr.atime)?;
let (mtime_secs, mtime_nanos) = time_from_system_time(&attr.mtime)?;
let (ctime_secs, ctime_nanos) = time_from_system_time(&attr.ctime)?;
#[cfg(target_os = "macos")]
let (crtime_secs, crtime_nanos) = time_from_system_time(&attr.crtime)?;
Ok(FuseAttr {
ino: attr.ino,
size: attr.size,
blocks: attr.blocks,
atime: atime_secs,
mtime: mtime_secs,
ctime: ctime_secs,
#[cfg(target_os = "macos")]
crtime: crtime_secs,
atimensec: atime_nanos,
mtimensec: mtime_nanos,
ctimensec: ctime_nanos,
#[cfg(target_os = "macos")]
crtimensec: crtime_nanos,
mode: mode_from_kind_and_perm(attr.kind, attr.perm),
nlink: attr.nlink,
uid: attr.uid,
gid: attr.gid,
rdev: attr.rdev,
#[cfg(target_os = "macos")]
flags: attr.flags,
#[cfg(feature = "abi-7-9")]
blksize: 0, // TODO: find a proper way to set block size
#[cfg(feature = "abi-7-9")]
padding: 0,
})
}
use futures::io::AsyncRead;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use log::debug;
use pin_project_lite::pin_project;
use std::iter;
use std::pin::Pin;
pin_project! {
#[derive(Debug)]
pub(crate) struct FuseBufReadStream<R> {
#[pin]
reader: R,
buf: Vec<u8>,
cap: usize,
}
}
impl<R: AsyncRead> FuseBufReadStream<R> {
#[allow(dead_code)]
pub fn with_capacity(capacity: usize, reader: R) -> FuseBufReadStream<R> {
FuseBufReadStream {
reader,
buf: iter::repeat(0u8).take(capacity).collect(),
cap: capacity,
}
}
}
impl<R: AsyncRead> Stream for FuseBufReadStream<R> {
type Item = std::io::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
unsafe {
this.buf.set_len(this.buf.capacity());
}
let read_size = futures_core::ready!(this.reader.poll_read(cx, this.buf))?;
debug!("FuseBufReadStream read {} bytes", read_size);
if read_size == 0 {
return Poll::Ready(None);
}
unsafe {
this.buf.set_len(read_size);
}
Poll::Ready(Some(Ok(std::mem::replace(
this.buf,
// TODO: FIXME! It should preallocate all Vec<u8> buffers before reading
iter::repeat(0u8).take(*this.cap).collect(),
))))
}
}
#[cfg(test)]
mod test {
use futures::prelude::*;
use futures::stream::StreamExt;
use smol::{self, blocking};
use std::fs::{self, File};
use std::io;
use super::FuseBufReadStream;
#[test]
fn test_for_each_concurrent() -> io::Result<()> {
smol::run(async move {
let capacity = 1024;
let dir = blocking!(fs::read_dir("./src"))?;
let dir = smol::iter(dir);
dir.try_for_each_concurrent(2, |entry| async move {
let path = entry.path();
if path.is_dir() {
println!("skip directory: {:?}", path);
} else {
println!("read file: {:?}", path);
let file = blocking!(File::open(path))?;
let file = smol::reader(file);
FuseBufReadStream::with_capacity(capacity, file)
.for_each_concurrent(10, |res| async move {
match res {
Ok(byte_vec) => {
println!("read {} bytes", byte_vec.len());
let output_length = 16;
if byte_vec.len() > output_length {
println!(
"first {} bytes: {:?}",
output_length,
&byte_vec[..output_length]
);
} else {
println!("total bytes: {:?}", byte_vec);
}
// Ok::<(), Error>(())
}
Err(err) => {
println!("read file failed, the error is: {:?}", err);
}
}
})
.await;
}
Ok(())
})
.await?;
Ok(())
})
}
}
此差异已折叠。
此差异已折叠。
use log::debug;
mod channel;
mod fs;
mod fuse_read;
mod fuse_reply;
mod fuse_request;
mod mount;
mod protocol;
mod session;
use session::*;
fn main() -> anyhow::Result<()> {
env_logger::init();
let mountpoint = match std::env::args_os().nth(1) {
Some(path) => path,
None => {
return Err(anyhow::anyhow!(
"no mount path input, the usage: {} <MOUNTPOINT>",
std::env::args().next().unwrap(), // safe to use unwrap here
));
}
};
debug!("mount point: {:?}", mountpoint);
smol::run(async move {
let ss = Session::new(&mountpoint).await?;
ss.run().await?;
Ok(())
})
}
#[cfg(test)]
mod test {
mod integration_tests;
mod test_util;
use futures::prelude::*;
use futures::stream::StreamExt;
use smol::{self, blocking};
use std::fs::{self, File};
use std::io;
#[test]
fn test_async_iter() -> io::Result<()> {
smol::run(async move {
let dir = blocking!(fs::read_dir("."))?;
let mut dir = smol::iter(dir);
while let Some(entry) = dir.next().await {
let path = entry?.path();
if path.is_file() {
println!("read file: {:?}", path);
let file = blocking!(File::open(path))?;
let mut file = smol::reader(file);
let mut buf = vec![];
file.read_to_end(&mut buf).await?;
let output_length = 16;
if buf.len() > output_length {
println!("first {} bytes: {:?}", output_length, &buf[..output_length]);
} else {
println!("total bytes: {:?}", buf);
}
} else {
println!("skip directory: {:?}", path);
}
}
Ok(())
})
}
}
use anyhow::{self, Context};
use log::{debug, info};
use nix::fcntl::{self, OFlag};
use nix::sys::stat::{self, Mode};
use smol::blocking;
use std::ffi::CString;
use std::fs;
use std::os::raw::c_void;
use std::os::unix::io::RawFd;
use std::path::Path;
use param::*;
#[cfg(target_os = "linux")]
mod param {
// https://github.com/torvalds/linux/blob/master/include/uapi/linux/mount.h#L11
// TODO: use mount flags from libc
// pub const MS_RDONLY: u64 = 1; // Mount read-only
pub const MS_NOSUID: u64 = 2; // Ignore suid and sgid bits
pub const MS_NODEV: u64 = 4; // Disallow access to device special files
pub const MNT_FORCE: i32 = 1; // Force un-mount
}
#[cfg(target_os = "macos")]
mod param {
// https://github.com/apple/darwin-xnu/blob/master/bsd/sys/mount.h#L288
// TODO: use mount flags from libc
// pub const MNT_RDONLY: i32 = 0x00000001; // read only filesystem
pub const MNT_NOSUID: i32 = 0x00000008; // don't honor setuid bits on fs
pub const MNT_NODEV: i32 = 0x00000010; // don't interpret special files
pub const MNT_FORCE: i32 = 0x00080000; // force unmount or readonly change
pub const MNT_NOUSERXATTR: i32 = 0x01000000; // Don't allow user extended attributes
pub const MNT_NOATIME: i32 = 0x10000000; // disable update of file access time
pub const PAGE_SIZE: u32 = 4096;
pub const FUSE_DEFAULT_BLOCKSIZE: u32 = 4096;
pub const FUSE_DEFAULT_DAEMON_TIMEOUT: u32 = 60; // seconds
pub const FUSE_DEFAULT_IOSIZE: u32 = 16 * PAGE_SIZE;
pub const FUSE_IOC_MAGIC: u8 = b'F';
pub const FUSE_IOC_TYPE_MODE: u8 = 5;
pub const FUSE_FSSUBTYPE_UNKNOWN: u32 = 0;
pub const FUSE_MOPT_DEBUG: u64 = 0x0000000000000040;
pub const FUSE_MOPT_FSNAME: u64 = 0x0000000000001000;
pub const FUSE_MOPT_NO_APPLEXATTR: u64 = 0x0000000000800000;
use libc::size_t;
pub const MFSTYPENAMELEN: size_t = 16; // length of fs type name including null
pub const MAXPATHLEN: size_t = 1024; //PATH_MAX
#[repr(C)]
pub(crate) struct FuseMountArgs {
pub mntpath: [u8; MAXPATHLEN], // path to the mount point
pub fsname: [u8; MAXPATHLEN], // file system description string
pub fstypename: [u8; MFSTYPENAMELEN], // file system type name
pub volname: [u8; MAXPATHLEN], // volume name
pub altflags: u64, // see mount-time flags below
pub blocksize: u32, // fictitious block size of our "storage"
pub daemon_timeout: u32, // timeout in seconds for upcalls to daemon
pub fsid: u32, // optional custom value for part of fsid[0]
pub fssubtype: u32, // file system sub type id
pub iosize: u32, // maximum size for reading or writing
pub random: u32, // random "secret" from device
pub rdev: u32, // dev_t for the /dev/osxfuse{n} in question
}
}
#[cfg(target_os = "linux")]
pub async fn umount(short_path: impl AsRef<Path>) -> anyhow::Result<()> {
use nix::unistd;
use std::process::Command;
let mount_path = short_path.as_ref().to_path_buf();
blocking!(
let mntpnt = mount_path.as_os_str();
if unistd::geteuid().is_root() {
// direct umount
#[cfg(target_arch = "aarch64")]
let result = unsafe { libc::umount2(mntpnt as *const _ as *const u8, MNT_FORCE) };
#[cfg(target_arch = "x86_64")]
let result =
unsafe { libc::umount2(mntpnt as *const _ as *const u8 as *const i8, MNT_FORCE) };
if result == 0 {
Ok(())
} else {
Err(anyhow::anyhow!(
"failed to umount fuse device, the error is: {}",
nix::Error::last(),
))
}
} else {
// use fusermount to umount
let umount_handle = Command::new("fusermount")
.arg("-uz") // lazy umount
.arg(mntpnt)
.output()
.expect("fusermount command failed to start");
if umount_handle.status.success() {
Ok(())
} else {
let stderr = String::from_utf8_lossy(&umount_handle.stderr);
debug!("fusermount failed to umount, the error is: {}", &stderr);
Err(anyhow::anyhow!(
"fusermount failed to umount fuse device, the error is: {}",
&stderr,
))
}
}
)
}
#[cfg(target_os = "linux")]
pub async fn mount(mount_point: impl AsRef<Path>) -> anyhow::Result<RawFd> {
use nix::unistd;
if unistd::geteuid().is_root() {
// direct umount
direct_mount(mount_point).await
} else {
// use fusermount to mount
fuser_mount(mount_point).await
}
}
#[cfg(target_os = "linux")]
async fn fuser_mount(mount_point: impl AsRef<Path>) -> anyhow::Result<RawFd> {
use nix::cmsg_space;
use nix::sys::socket::{
self, AddressFamily, ControlMessageOwned, MsgFlags, SockFlag, SockType,
};
use nix::sys::uio::IoVec;
use std::process::Command;
let mount_path = mount_point.as_ref().to_path_buf();
let (local, remote) = blocking!(socket::socketpair(
AddressFamily::Unix,
SockType::Stream,
None,
SockFlag::empty(),
))
.context("failed to create socket pair")?;
let mount_handle = blocking!(Command::new("fusermount")
.arg("-o")
// fusermount option allow_other only allowed if user_allow_other is set in /etc/fuse.conf
.arg("nosuid,nodev,noexec,nonempty") // rw,async,noatime,auto_unmount,allow_other
.arg(mount_path.as_os_str())
.env("_FUSE_COMMFD", remote.to_string())
.output())
.context("fusermount command failed to start")?;
assert!(
mount_handle.status.success(),
"failed to run fusermount, the error is: {}",
String::from_utf8_lossy(&mount_handle.stderr),
);
info!(
"fusermount path={:?} to FUSE device successfully!",
mount_point.as_ref(),
);
blocking!(
let mut buf = [0u8; 5];
let iov = [IoVec::from_mut_slice(&mut buf[..])];
let mut cmsgspace = cmsg_space!([RawFd; 1]);
let msg = socket::recvmsg(local, &iov, Some(&mut cmsgspace), MsgFlags::empty())
.context("failed to receive from fusermount")?;
let mut mount_fd = -1;
for cmsg in msg.cmsgs() {
if let ControlMessageOwned::ScmRights(fd) = cmsg {
debug_assert_eq!(fd.len(), 1);
mount_fd = fd[0];
} else {
panic!("unexpected cmsg");
}
}
debug_assert_ne!(mount_fd, -1);
Ok(mount_fd)
)
}
#[cfg(target_os = "linux")]
async fn direct_mount(mount_point: impl AsRef<Path>) -> anyhow::Result<RawFd> {
use nix::sys::stat::SFlag;
use nix::unistd;
let mount_point = mount_point.as_ref().to_path_buf();
let devpath = Path::new("/dev/fuse");
let dev_fd = blocking!(fcntl::open(devpath, OFlag::O_RDWR, Mode::empty()))
.context("failed to open fuse device")?;
let full_path = blocking!(fs::canonicalize(mount_point))?;
let cstr_path = full_path
.to_str()
.expect("full mount path to string failed");
let mntpath = CString::new(cstr_path).expect("CString::new failed");
let fstype = CString::new("fuse").expect("CString::new failed");
let fsname = CString::new("/dev/fuse").expect("CString::new failed");
let mnt_sb =
blocking!(stat::stat(&full_path)).context("failed to get the file stat of mount point")?;
let opts = format!(
"fd={},rootmode={:o},user_id={},group_id={}",
dev_fd,
mnt_sb.st_mode & SFlag::S_IFMT.bits(),
unistd::getuid().as_raw(),
unistd::getgid().as_raw()
);
let opts = CString::new(&*opts).expect("CString::new failed");
debug!("direct mount opts={:?}", &opts);
blocking!(
let result = unsafe { libc::mount(
fsname.as_ptr(),
mntpath.as_ptr(),
fstype.as_ptr(),
MS_NOSUID | MS_NODEV,
opts.as_ptr() as *const c_void,
)};
if result == 0 {
info!("mount path={:?} to FUSE device={:?} successfully!", mntpath, devpath);
Ok(dev_fd)
} else {
// let e = Errno::from_i32(errno::errno());
// debug!("errno={},the error is: {:?}", errno::errno(), e);
// let mount_fail_str = "mount failed!";
// #[cfg(target_arch = "aarch64")]
// libc::perror(mount_fail_str.as_ptr());
// #[cfg(target_arch = "x86_64")]
// libc::perror(mount_fail_str.as_ptr() as *const i8);
Err(anyhow::anyhow!(
"failed to mount to fuse device, the error is: {}",
nix::Error::last(),
))
}
)
}
#[cfg(any(target_os = "macos"))]
pub async fn umount(mount_point: impl AsRef<Path>) -> nix::Result<()> {
let mntpnt = mount_point.as_ref().to_path_buf();
blocking!(
let mntpnt = mntpnt.as_os_str();
let res = unsafe { libc::unmount(mntpnt as *const _ as *const u8 as *const i8, MNT_FORCE) };
if res < 0 {
Err(nix::Error::last())
} else {
Ok(())
}
)
}
#[cfg(any(target_os = "macos"))]
pub async fn mount(mount_point: impl AsRef<Path>) -> anyhow::Result<RawFd> {
let mount_point = mount_point.as_ref().to_path_buf();
let devpath = Path::new("/dev/osxfuse1");
let fd = blocking!(fcntl::open(devpath, OFlag::O_RDWR, Mode::empty()))
.context("failed to open fuse device")?;
let sb = blocking!(stat::fstat(fd).context("failed to get the file stat of fuse device"))?;
// use ioctl to read device random secret
// osxfuse/support/mount_osxfuse/mount_osxfuse.c#L1099
// result = ioctl(fd, FUSEDEVIOCGETRANDOM, &drandom);
// FUSEDEVIOCGETRANDOM // osxfuse/common/fuse_ioctl.h#L43
let drandom = blocking!(
let mut drandom: u32 = 0;
nix::ioctl_read!(fuse_read_random, FUSE_IOC_MAGIC, FUSE_IOC_TYPE_MODE, u32);
let result = unsafe { fuse_read_random(fd, &mut drandom as *mut _)? };
debug_assert_eq!(result, 0);
debug!("successfully read drandom={}", drandom);
Ok::<_, anyhow::Error>(drandom)
)?;
let full_path = blocking!(fs::canonicalize(mount_point))?;
let cstr_path = full_path
.to_str()
.expect("full mount path to string failed");
let mntpath = CString::new(cstr_path)?;
let fstype = CString::new("osxfuse")?;
let fsname = CString::new("macfuse")?;
let fstypename = CString::new("")?;
let volname = CString::new("OSXFUSE Volume 0 (macfuse)")?;
// (fuse_mount_args) args = {
// mntpath = "/private/tmp/hello"
// fsname = "macfuse@osxfuse0"
// fstypename = ""
// volname = "OSXFUSE Volume 0 (macfuse)"
// altflags = 64
// blocksize = 4096
// daemon_timeout = 60
// fsid = 0
// fssubtype = 0
// iosize = 65536
// random = 1477356727
// rdev = 587202560
// }
fn copy_slice<T: Copy>(from: &[T], to: &mut [T]) {
debug_assert!(to.len() >= from.len());
to[..from.len()].copy_from_slice(&from);
}
let mut mntpath_slice = [0u8; MAXPATHLEN];
copy_slice(mntpath.as_bytes(), &mut mntpath_slice);
let mut fsname_slice = [0u8; MAXPATHLEN];
copy_slice(fsname.as_bytes(), &mut fsname_slice);
let mut fstypename_slice = [0u8; MFSTYPENAMELEN];
copy_slice(fstypename.as_bytes(), &mut fstypename_slice);
let mut volname_slice = [0u8; MAXPATHLEN];
copy_slice(volname.as_bytes(), &mut volname_slice);
let mut mnt_args = FuseMountArgs {
mntpath: mntpath_slice,
fsname: fsname_slice,
fstypename: fstypename_slice,
volname: volname_slice,
altflags: FUSE_MOPT_DEBUG | FUSE_MOPT_FSNAME | FUSE_MOPT_NO_APPLEXATTR,
blocksize: FUSE_DEFAULT_BLOCKSIZE,
daemon_timeout: FUSE_DEFAULT_DAEMON_TIMEOUT,
fsid: 0,
fssubtype: FUSE_FSSUBTYPE_UNKNOWN,
iosize: FUSE_DEFAULT_IOSIZE,
random: drandom,
rdev: sb.st_rdev as u32,
};
blocking!(
let result = unsafe {
libc::mount(
fstype.as_ptr(),
mntpath.as_ptr(),
MNT_NOSUID | MNT_NODEV | MNT_NOUSERXATTR | MNT_NOATIME,
&mut mnt_args as *mut _ as *mut c_void,
)
};
if result == 0 {
info!("mount path={:?} to FUSE device={:?} successfully!", mntpath, devpath);
Ok(fd)
} else {
// let e = Errno::from_i32(errno::errno());
// error!("errno={}, the error is: {:?}", errno::errno(), e);
// let mount_fail_str = "mount failed!";
// unsafe { libc::perror(mount_fail_str.as_ptr() as *const i8); }
Err(anyhow::anyhow!(
"failed to mount to fuse device, the error is: {}",
nix::Error::last(),
))
}
)
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
use log::{debug, info}; // warn, error
use smol::Task;
use std::fs;
use std::path::Path;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use super::super::{mount, session::Session};
pub const DEFAULT_MOUNT_DIR: &str = "../fuse_test";
pub const FILE_CONTENT: &str = "0123456789ABCDEF";
pub fn setup(mount_path: impl AsRef<Path>) -> anyhow::Result<JoinHandle<()>> {
env_logger::init();
let mut mount_dir = mount_path.as_ref().to_path_buf();
mount_dir = smol::block_on(async move {
let result = mount::umount(&mount_dir).await;
if result.is_ok() {
debug!("umounted {:?} before setup", mount_dir);
}
mount_dir
});
if mount_dir.exists() {
fs::remove_dir_all(&mount_dir)?;
}
fs::create_dir_all(&mount_dir)?;
let abs_root_path = fs::canonicalize(&mount_dir)?;
let fs_task = Task::spawn(async move {
async fn run_fs(mount_point: impl AsRef<Path>) -> anyhow::Result<()> {
let ss = Session::new(mount_point).await?;
ss.run().await?;
Ok(())
};
if let Err(e) = run_fs(&abs_root_path).await {
panic!("failed to run filesystem, the error is: {}", e);
}
});
// do not block main thread
let th = thread::spawn(|| {
smol::run(async { fs_task.await });
debug!("spawned a thread for smol::run()");
});
debug!("async_fuse task spawned");
let seconds = 2;
debug!("sleep {} seconds for setup", seconds);
thread::sleep(Duration::new(seconds, 0));
info!("setup finished");
Ok(th)
}
pub fn teardown(mount_dir: impl AsRef<Path>, th: JoinHandle<()>) -> anyhow::Result<()> {
info!("begin teardown");
let seconds = 1;
debug!("sleep {} seconds for teardown", seconds);
thread::sleep(Duration::new(seconds, 0));
smol::block_on(async {
mount::umount(&mount_dir).await.unwrap(); // TODO: remove unwrap()
});
let abs_mount_path = fs::canonicalize(mount_dir)?;
fs::remove_dir_all(&abs_mount_path)?;
th.join().unwrap(); // TODO: remove unwrap()
Ok(())
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册