// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
	"bytes"
	"fmt"
	"sync"
	"sync/atomic"
)

// Tracker is used to track the memory usage during query execution.
// It contains an optional limit and can be arranged into a tree structure
// such that the consumption tracked by a Tracker is also tracked by
// its ancestors. The main idea comes from Apache Impala:
//
// https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/runtime/mem-tracker.h
//
// By default, memory consumption is tracked via calls to "Consume()", either to
// the tracker itself or to one of its descendents. A typical sequence of calls
// for a single Tracker is:
// 1. tracker.SetLabel() / tracker.SetActionOnExceed() / tracker.AttachTo()
// 2. tracker.Consume() / tracker.ReplaceChild() / tracker.BytesConsumed()
//
// NOTE: We only protect concurrent access to "bytesConsumed" and "children",
// that is to say:
// 1. Only "BytesConsumed()", "Consume()", "AttachTo()" and "Detach" are thread-safe.
// 2. Other operations of a Tracker tree is not thread-safe.
type Tracker struct {
martianzhang's avatar
martianzhang 已提交
41 42 43 44
	mu struct {
		sync.Mutex
		children []*Tracker // The children memory trackers
	}
martianzhang's avatar
martianzhang 已提交
45 46 47 48 49

	label          string // Label of this "Tracker".
	bytesConsumed  int64  // Consumed bytes.
	bytesLimit     int64  // Negative value means no limit.
	actionOnExceed ActionOnExceed
martianzhang's avatar
martianzhang 已提交
50
	parent         *Tracker // The parent memory tracker.
martianzhang's avatar
martianzhang 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
}

// NewTracker builds a Tracker that has no parent and no children.
//
//  1. "label" is the label used in the usage string.
//  2. "bytesLimit <= 0" means no limit.
//
// The returned Tracker starts with a LogOnExceed action; use
// SetActionOnExceed to replace it.
func NewTracker(label string, bytesLimit int64) *Tracker {
	tracker := new(Tracker)
	tracker.label = label
	tracker.bytesLimit = bytesLimit
	tracker.actionOnExceed = &LogOnExceed{}
	return tracker
}

// SetActionOnExceed replaces the action that Consume invokes on this Tracker
// once its consumption reaches its limit.
func (t *Tracker) SetActionOnExceed(a ActionOnExceed) {
	t.actionOnExceed = a
}

// SetLabel overwrites the label that identifies this Tracker in the string
// representation of the Tracker tree.
func (t *Tracker) SetLabel(label string) {
	t.label = label
}

// AttachTo attaches this memory tracker as a child to another Tracker. If it
// already has a parent, this function will remove it from the old parent.
// Its consumed memory usage is used to update all its ancestors.
func (t *Tracker) AttachTo(parent *Tracker) {
	if t.parent != nil {
		t.parent.remove(t)
	}
martianzhang's avatar
martianzhang 已提交
81 82 83
	parent.mu.Lock()
	parent.mu.children = append(parent.mu.children, t)
	parent.mu.Unlock()
martianzhang's avatar
martianzhang 已提交
84 85 86 87 88 89 90 91 92 93 94

	t.parent = parent
	t.parent.Consume(t.BytesConsumed())
}

// Detach detaches this Tracker from its parent. It does nothing when the
// Tracker has no parent.
func (t *Tracker) Detach() {
	parent := t.parent
	if parent == nil {
		// Nothing to detach from; calling remove on a nil parent would panic.
		return
	}
	parent.remove(t)
}

func (t *Tracker) remove(oldChild *Tracker) {
martianzhang's avatar
martianzhang 已提交
95 96 97
	t.mu.Lock()
	defer t.mu.Unlock()
	for i, child := range t.mu.children {
martianzhang's avatar
martianzhang 已提交
98 99 100 101
		if child != oldChild {
			continue
		}

martianzhang's avatar
martianzhang 已提交
102
		atomic.AddInt64(&t.bytesConsumed, -oldChild.BytesConsumed())
martianzhang's avatar
martianzhang 已提交
103
		oldChild.parent = nil
martianzhang's avatar
martianzhang 已提交
104
		t.mu.children = append(t.mu.children[:i], t.mu.children[i+1:]...)
martianzhang's avatar
martianzhang 已提交
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
		break
	}
}

// ReplaceChild removes the old child specified in "oldChild" and add a new
// child specified in "newChild". old child's memory consumption will be
// removed and new child's memory consumption will be added.
func (t *Tracker) ReplaceChild(oldChild, newChild *Tracker) {
	if newChild == nil {
		t.remove(oldChild)
		return
	}

	newConsumed := newChild.BytesConsumed()
	newChild.parent = t

martianzhang's avatar
martianzhang 已提交
121 122
	t.mu.Lock()
	for i, child := range t.mu.children {
martianzhang's avatar
martianzhang 已提交
123 124 125 126 127 128
		if child != oldChild {
			continue
		}

		newConsumed -= oldChild.BytesConsumed()
		oldChild.parent = nil
martianzhang's avatar
martianzhang 已提交
129
		t.mu.children[i] = newChild
martianzhang's avatar
martianzhang 已提交
130 131
		break
	}
martianzhang's avatar
martianzhang 已提交
132
	t.mu.Unlock()
martianzhang's avatar
martianzhang 已提交
133 134 135 136 137 138 139 140 141

	t.Consume(newConsumed)
}

// Consume is used to consume a memory usage. "bytes" can be a negative value,
// which means this is a memory release operation.
func (t *Tracker) Consume(bytes int64) {
	var rootExceed *Tracker
	for tracker := t; tracker != nil; tracker = tracker.parent {
martianzhang's avatar
martianzhang 已提交
142
		if atomic.AddInt64(&tracker.bytesConsumed, bytes) >= tracker.bytesLimit && tracker.bytesLimit > 0 {
martianzhang's avatar
martianzhang 已提交
143 144 145 146 147 148 149 150 151 152
			rootExceed = tracker
		}
	}
	if rootExceed != nil {
		rootExceed.actionOnExceed.Action(rootExceed)
	}
}

// BytesConsumed returns the consumed memory usage value in bytes.
func (t *Tracker) BytesConsumed() int64 {
martianzhang's avatar
martianzhang 已提交
153
	return atomic.LoadInt64(&t.bytesConsumed)
martianzhang's avatar
martianzhang 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
}

// String renders the Tracker tree rooted at t as text. The result starts
// with a newline, followed by the output of toString for t.
func (t *Tracker) String() string {
	var out bytes.Buffer
	out.WriteString("\n")
	t.toString("", &out)
	return out.String()
}

func (t *Tracker) toString(indent string, buffer *bytes.Buffer) {
	fmt.Fprintf(buffer, "%s\"%s\"{\n", indent, t.label)
	if t.bytesLimit > 0 {
		fmt.Fprintf(buffer, "%s  \"quota\": %s\n", indent, t.bytesToString(t.bytesLimit))
	}
	fmt.Fprintf(buffer, "%s  \"consumed\": %s\n", indent, t.bytesToString(t.BytesConsumed()))

martianzhang's avatar
martianzhang 已提交
170 171 172 173
	t.mu.Lock()
	for i := range t.mu.children {
		if t.mu.children[i] != nil {
			t.mu.children[i].toString(indent+"  ", buffer)
martianzhang's avatar
martianzhang 已提交
174 175
		}
	}
martianzhang's avatar
martianzhang 已提交
176
	t.mu.Unlock()
martianzhang's avatar
martianzhang 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
	buffer.WriteString(indent + "}\n")
}

// bytesToString formats "numBytes" using the largest unit among GB, MB and KB
// (powers of 1024) whose size the magnitude of "numBytes" reaches, falling
// back to a plain byte count below 1 KB. An exact unit boundary uses the
// larger unit, e.g. 1024 is "1 KB". Negative values keep their sign.
func (t *Tracker) bytesToString(numBytes int64) string {
	const (
		kb = 1 << 10
		mb = 1 << 20
		gb = 1 << 30
	)

	size := float64(numBytes)
	// Pick the unit by magnitude so releases (negative values) are scaled
	// the same way as consumption.
	magnitude := size
	if magnitude < 0 {
		magnitude = -magnitude
	}

	switch {
	case magnitude >= gb:
		return fmt.Sprintf("%v GB", size/gb)
	case magnitude >= mb:
		return fmt.Sprintf("%v MB", size/mb)
	case magnitude >= kb:
		return fmt.Sprintf("%v KB", size/kb)
	}
	return fmt.Sprintf("%v Bytes", numBytes)
}