未验证 提交 d9485fb7 编写于 作者: S Sandy Xu 提交者: GitHub

add client information for each session (#491)

* meta: rename info version to redisVersion

* add client information for each session

* cmd: add `sessions` subcommand to list all client sessions

* meta/sql: sync session table when creating sessions to add 'info' column

* cmd/sessions: add --detail option to show sustained inode and locks related to each session

* cmd: remove `sessions` subcommand, and add its functions to `status`

* cmd/status: remove --detail flag, instead only show detailed information if sid is specified

* cmd/status: add -f/--format to specify format of output

* cmd/status: use only json format for now

* cmd/status: rename print to printJson
上级 5016705c
......@@ -88,6 +88,7 @@ func mount(c *cli.Context) error {
Retries: 10,
Strict: true,
CaseInsensi: strings.HasSuffix(mp, ":") && runtime.GOOS == "windows",
MountPoint: mp,
})
format, err := m.Load()
if err != nil {
......
......@@ -23,23 +23,48 @@ import (
"github.com/urfave/cli/v2"
)
type sections struct {
Setting *meta.Format
Sessions []*meta.Session
}
func printJson(v interface{}) {
output, err := json.MarshalIndent(v, "", " ")
if err != nil {
logger.Fatalf("json: %s", err)
}
fmt.Println(string(output))
}
func status(ctx *cli.Context) error {
setLoggerLevel(ctx)
if ctx.Args().Len() < 1 {
return fmt.Errorf("REDIS-URL is needed")
}
m := meta.NewClient(ctx.Args().Get(0), &meta.Config{Retries: 10, Strict: true})
if sid := ctx.Uint64("session"); sid != 0 {
s, err := m.GetSession(sid)
if err != nil {
logger.Fatalf("get session: %s", err)
}
printJson(s)
return nil
}
format, err := m.Load()
if err != nil {
logger.Fatalf("load setting: %s", err)
}
format.SecretKey = ""
format.EncryptKey = ""
data, err := json.MarshalIndent(format, "", " ")
sessions, err := m.ListSessions()
if err != nil {
logger.Fatalf("json: %s", err)
logger.Fatalf("list sessions: %s", err)
}
fmt.Println(string(data))
printJson(&sections{format, sessions})
return nil
}
......@@ -49,5 +74,12 @@ func statusFlags() *cli.Command {
Usage: "show status of JuiceFS",
ArgsUsage: "REDIS-URL",
Action: status,
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "session",
Aliases: []string{"s"},
Usage: "show detailed information (sustained inodes, locks) of the specified session (sid)",
},
},
}
}
......@@ -20,6 +20,7 @@ type Config struct {
Strict bool // update ctime
Retries int
CaseInsensi bool
MountPoint string
}
type Format struct {
......
......@@ -19,6 +19,9 @@ import (
"os"
"strings"
"syscall"
"time"
"github.com/juicedata/juicefs/pkg/version"
)
const (
......@@ -133,14 +136,47 @@ type Summary struct {
Dirs uint64
}
type SessionInfo struct {
Version string
Hostname string
MountPoint string
ProcessID int
}
type Flock struct {
Inode Ino
Owner uint64
Ltype string
}
type Plock struct {
Inode Ino
Owner uint64
Records []byte // FIXME: loadLocks
}
// Session contains detailed information of a client session
type Session struct {
Sid uint64
Heartbeat time.Time
SessionInfo
Sustained []Ino `json:",omitempty"`
Flocks []Flock `json:",omitempty"`
Plocks []Plock `json:",omitempty"`
}
// Meta is a interface for a meta service for file system.
type Meta interface {
// Init is used to initialize a meta service.
Init(format Format, force bool) error
// Load loads the existing setting of a formatted volume from meta service.
Load() (*Format, error)
// NewSession create a new client session.
// NewSession creates a new client session.
NewSession() error
// GetSession retrieves information of session with sid
GetSession(sid uint64) (*Session, error)
// ListSessions returns all client sessions.
ListSessions() ([]*Session, error)
// StatFS returns summary statistics of a volume.
StatFS(ctx Context, totalspace, availspace, iused, iavail *uint64) syscall.Errno
......@@ -261,3 +297,11 @@ func NewClient(uri string, conf *Config) Meta {
}
return m
}
func newSessionInfo() (*SessionInfo, error) {
host, err := os.Hostname()
if err != nil {
return nil, err
}
return &SessionInfo{Version: version.Version(), Hostname: host, ProcessID: os.Getpid()}, nil
}
......@@ -47,7 +47,7 @@ import (
POSIX lock: lockp$inode -> { $sid_$owner -> Plock(pid,ltype,start,end) }
Sessions: sessions -> [ $sid -> heartbeat ]
sustained: session$sid -> [$inode]
locked: locked$sid -> { lockf$inode or lockpinode }
locked: locked$sid -> { lockf$inode or lockp$inode }
Removed files: delfiles -> [$inode:$length -> seconds]
Slices refs: k$chunkid_$size -> refcount
......@@ -66,6 +66,7 @@ const usedSpace = "usedSpace"
const totalInodes = "totalInodes"
const delfiles = "delfiles"
const allSessions = "sessions"
const sessionInfos = "sessionInfos"
const sliceRefs = "sliceRef"
type redisMeta struct {
......@@ -237,6 +238,16 @@ func (r *redisMeta) NewSession() error {
return fmt.Errorf("create session: %s", err)
}
logger.Debugf("session is %d", r.sid)
info, err := newSessionInfo()
if err != nil {
return fmt.Errorf("new session info: %s", err)
}
info.MountPoint = r.conf.MountPoint
data, err := json.Marshal(info)
if err != nil {
return fmt.Errorf("json: %s", err)
}
r.rdb.HSet(Background, sessionInfos, r.sid, data)
r.shaLookup, err = r.rdb.ScriptLoad(Background, scriptLookup).Result()
if err != nil {
......@@ -255,6 +266,92 @@ func (r *redisMeta) NewSession() error {
return nil
}
func (r *redisMeta) getSession(sid string, detail bool) (*Session, error) {
ctx := Background
info, err := r.rdb.HGet(ctx, sessionInfos, sid).Bytes()
if err == redis.Nil { // legacy client has no info
info = []byte("{}")
} else if err != nil {
return nil, fmt.Errorf("HGet %s %s: %s", sessionInfos, sid, err)
}
var s Session
if err := json.Unmarshal(info, &s); err != nil {
return nil, fmt.Errorf("corrupted session info; json error: %s", err)
}
s.Sid, _ = strconv.ParseUint(sid, 10, 64)
if detail {
inodes, err := r.rdb.SMembers(ctx, r.sustained(int64(s.Sid))).Result()
if err != nil {
return nil, fmt.Errorf("SMembers %s: %s", sid, err)
}
s.Sustained = make([]Ino, 0, len(inodes))
for _, sinode := range inodes {
inode, _ := strconv.ParseUint(sinode, 10, 64)
s.Sustained = append(s.Sustained, Ino(inode))
}
locks, err := r.rdb.SMembers(ctx, r.lockedKey(int64(s.Sid))).Result()
if err != nil {
return nil, fmt.Errorf("SMembers %s: %s", sid, err)
}
s.Flocks = make([]Flock, 0, len(locks)) // greedy
s.Plocks = make([]Plock, 0, len(locks))
for _, lock := range locks {
owners, err := r.rdb.HGetAll(ctx, lock).Result()
if err != nil {
return nil, fmt.Errorf("HGetAll %s: %s", lock, err)
}
isFlock := strings.HasPrefix(lock, "lockf")
inode, _ := strconv.ParseUint(lock[5:], 10, 64)
for k, v := range owners {
parts := strings.Split(k, "_")
if parts[0] != sid {
continue
}
owner, _ := strconv.ParseUint(parts[1], 16, 64)
if isFlock {
s.Flocks = append(s.Flocks, Flock{Ino(inode), owner, v})
} else {
s.Plocks = append(s.Plocks, Plock{Ino(inode), owner, []byte(v)})
}
}
}
}
return &s, nil
}
func (r *redisMeta) GetSession(sid uint64) (*Session, error) {
key := strconv.FormatUint(sid, 10)
score, err := r.rdb.ZScore(Background, allSessions, key).Result()
if err != nil {
return nil, err
}
s, err := r.getSession(key, true)
if err != nil {
return nil, err
}
s.Heartbeat = time.Unix(int64(score), 0)
return s, nil
}
func (r *redisMeta) ListSessions() ([]*Session, error) {
keys, err := r.rdb.ZRangeWithScores(Background, allSessions, 0, -1).Result()
if err != nil {
return nil, err
}
sessions := make([]*Session, 0, len(keys))
for _, k := range keys {
s, err := r.getSession(k.Member.(string), false)
if err != nil {
logger.Errorf("get session: %s", err)
continue
}
s.Heartbeat = time.Unix(int64(k.Score), 0)
sessions = append(sessions, s)
}
return sessions, nil
}
func (r *redisMeta) OnMsg(mtype uint32, cb MsgCallback) {
r.msgCallbacks.Lock()
defer r.msgCallbacks.Unlock()
......@@ -1670,6 +1767,7 @@ func (r *redisMeta) cleanStaleSession(sid int64) {
}
if len(inodes) == 0 {
r.rdb.ZRem(ctx, allSessions, strconv.Itoa(int(sid)))
r.rdb.HDel(ctx, sessionInfos, strconv.Itoa(int(sid)))
logger.Infof("cleanup session %d", sid)
}
}
......
......@@ -109,6 +109,7 @@ type plock struct {
type session struct {
Sid uint64 `xorm:"pk"`
Heartbeat int64 `xorm:"notnull"`
Info []byte `xorm:"blob"`
}
type sustained struct {
......@@ -269,12 +270,24 @@ func (m *dbMeta) Load() (*Format, error) {
}
func (m *dbMeta) NewSession() error {
if err := m.engine.Sync2(new(session)); err != nil { // old client has no info field
return err
}
v, err := m.incrCounter("nextSession", 1)
if err != nil {
return fmt.Errorf("create session: %s", err)
}
info, err := newSessionInfo()
if err != nil {
return fmt.Errorf("new session info: %s", err)
}
info.MountPoint = m.conf.MountPoint
data, err := json.Marshal(info)
if err != nil {
return fmt.Errorf("json: %s", err)
}
err = m.txn(func(s *xorm.Session) error {
return mustInsert(s, &session{v, time.Now().Unix()})
return mustInsert(s, &session{v, time.Now().Unix(), data})
})
if err != nil {
return fmt.Errorf("insert new session: %s", err)
......@@ -289,6 +302,79 @@ func (m *dbMeta) NewSession() error {
return nil
}
func (m *dbMeta) getSession(row *session, detail bool) (*Session, error) {
var s Session
if row.Info == nil { // legacy client has no info
row.Info = []byte("{}")
}
if err := json.Unmarshal(row.Info, &s); err != nil {
return nil, fmt.Errorf("corrupted session info; json error: %s", err)
}
s.Sid = row.Sid
s.Heartbeat = time.Unix(row.Heartbeat, 0)
if detail {
var (
srows []sustained
frows []flock
prows []plock
)
if err := m.engine.Find(&srows, &sustained{Sid: s.Sid}); err != nil {
return nil, fmt.Errorf("find sustained %d: %s", s.Sid, err)
}
s.Sustained = make([]Ino, 0, len(srows))
for _, srow := range srows {
s.Sustained = append(s.Sustained, srow.Inode)
}
if err := m.engine.Find(&frows, &flock{Sid: s.Sid}); err != nil {
return nil, fmt.Errorf("find flock %d: %s", s.Sid, err)
}
s.Flocks = make([]Flock, 0, len(frows))
for _, frow := range frows {
s.Flocks = append(s.Flocks, Flock{frow.Inode, frow.Owner, string(frow.Ltype)})
}
if err := m.engine.Find(&prows, &plock{Sid: s.Sid}); err != nil {
return nil, fmt.Errorf("find plock %d: %s", s.Sid, err)
}
s.Plocks = make([]Plock, 0, len(prows))
for _, prow := range prows {
s.Plocks = append(s.Plocks, Plock{prow.Inode, prow.Owner, prow.Records})
}
}
return &s, nil
}
func (m *dbMeta) GetSession(sid uint64) (*Session, error) {
row := session{Sid: sid}
ok, err := m.engine.Get(&row)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("session not found: %d", sid)
}
return m.getSession(&row, true)
}
func (m *dbMeta) ListSessions() ([]*Session, error) {
var rows []session
err := m.engine.Find(&rows)
if err != nil {
return nil, err
}
sessions := make([]*Session, 0, len(rows))
for _, row := range rows {
s, err := m.getSession(&row, false)
if err != nil {
logger.Errorf("get session: %s", err)
continue
}
sessions = append(sessions, s)
}
return sessions, nil
}
func (m *dbMeta) OnMsg(mtype uint32, cb MsgCallback) {
m.msgCallbacks.Lock()
defer m.msgCallbacks.Unlock()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册