// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
	"bytes"
	"fmt"
	"sync"
	"sync/atomic"
)

// Tracker is used to track the memory usage during query execution.
// It contains an optional limit and can be arranged into a tree structure
// such that the consumption tracked by a Tracker is also tracked by
// its ancestors. The main idea comes from Apache Impala:
//
// https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/runtime/mem-tracker.h
//
// By default, memory consumption is tracked via calls to "Consume()", either to
// the tracker itself or to one of its descendants. A typical sequence of calls
// for a single Tracker is:
// 1. tracker.SetLabel() / tracker.SetActionOnExceed() / tracker.AttachTo()
// 2. tracker.Consume() / tracker.ReplaceChild() / tracker.BytesConsumed()
//
// NOTE: We only protect concurrent access to "bytesConsumed" and "children",
// that is to say:
// 1. Only "BytesConsumed()", "Consume()", "AttachTo()" and "Detach()" are thread-safe.
// 2. Other operations of a Tracker tree are not thread-safe.
type Tracker struct {
martianzhang's avatar
martianzhang 已提交
41 42 43 44
	mu struct {
		sync.Mutex
		children []*Tracker // The children memory trackers
	}
martianzhang's avatar
martianzhang 已提交
45

martianzhang's avatar
martianzhang 已提交
46 47 48 49
	label          fmt.Stringer // Label of this "Tracker".
	bytesConsumed  int64        // Consumed bytes.
	bytesLimit     int64        // Negative value means no limit.
	maxConsumed    int64        // max number of bytes consumed during execution.
martianzhang's avatar
martianzhang 已提交
50
	actionOnExceed ActionOnExceed
martianzhang's avatar
martianzhang 已提交
51
	parent         *Tracker // The parent memory tracker.
martianzhang's avatar
martianzhang 已提交
52 53 54 55 56
}

// NewTracker creates a memory tracker.
//	1. "label" is the label used in the usage string.
//	2. "bytesLimit < 0" means no limit.
martianzhang's avatar
martianzhang 已提交
57
func NewTracker(label fmt.Stringer, bytesLimit int64) *Tracker {
martianzhang's avatar
martianzhang 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70
	return &Tracker{
		label:          label,
		bytesLimit:     bytesLimit,
		actionOnExceed: &LogOnExceed{},
	}
}

// SetActionOnExceed sets the action when memory usage is out of memory quota.
// Consume() calls a.Action() with the outermost Tracker whose positive limit
// has been reached. The field is assigned without taking any lock.
func (t *Tracker) SetActionOnExceed(a ActionOnExceed) {
	t.actionOnExceed = a
}

// SetLabel sets the label of a Tracker.
martianzhang's avatar
martianzhang 已提交
71
func (t *Tracker) SetLabel(label fmt.Stringer) {
martianzhang's avatar
martianzhang 已提交
72 73 74 75 76 77 78 79 80 81
	t.label = label
}

// AttachTo attaches this memory tracker as a child to another Tracker. If it
// already has a parent, this function will remove it from the old parent.
// Its consumed memory usage is used to update all its ancestors.
func (t *Tracker) AttachTo(parent *Tracker) {
	if t.parent != nil {
		t.parent.remove(t)
	}
martianzhang's avatar
martianzhang 已提交
82 83 84
	parent.mu.Lock()
	parent.mu.children = append(parent.mu.children, t)
	parent.mu.Unlock()
martianzhang's avatar
martianzhang 已提交
85 86 87 88 89 90 91 92 93 94 95

	t.parent = parent
	t.parent.Consume(t.BytesConsumed())
}

// Detach detaches this Tracker from its parent by asking the parent to
// remove it. It is a no-op when the Tracker has no parent, e.g. for a root
// tracker or when Detach() is called a second time.
func (t *Tracker) Detach() {
	parent := t.parent
	if parent == nil {
		// Nothing to detach from; calling remove() on a nil parent would panic.
		return
	}
	parent.remove(t)
}

func (t *Tracker) remove(oldChild *Tracker) {
martianzhang's avatar
martianzhang 已提交
96 97 98
	t.mu.Lock()
	defer t.mu.Unlock()
	for i, child := range t.mu.children {
martianzhang's avatar
martianzhang 已提交
99 100 101 102
		if child != oldChild {
			continue
		}

martianzhang's avatar
martianzhang 已提交
103
		atomic.AddInt64(&t.bytesConsumed, -oldChild.BytesConsumed())
martianzhang's avatar
martianzhang 已提交
104
		oldChild.parent = nil
martianzhang's avatar
martianzhang 已提交
105
		t.mu.children = append(t.mu.children[:i], t.mu.children[i+1:]...)
martianzhang's avatar
martianzhang 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
		break
	}
}

// ReplaceChild removes the old child specified in "oldChild" and add a new
// child specified in "newChild". old child's memory consumption will be
// removed and new child's memory consumption will be added.
func (t *Tracker) ReplaceChild(oldChild, newChild *Tracker) {
	if newChild == nil {
		t.remove(oldChild)
		return
	}

	newConsumed := newChild.BytesConsumed()
	newChild.parent = t

martianzhang's avatar
martianzhang 已提交
122 123
	t.mu.Lock()
	for i, child := range t.mu.children {
martianzhang's avatar
martianzhang 已提交
124 125 126 127 128 129
		if child != oldChild {
			continue
		}

		newConsumed -= oldChild.BytesConsumed()
		oldChild.parent = nil
martianzhang's avatar
martianzhang 已提交
130
		t.mu.children[i] = newChild
martianzhang's avatar
martianzhang 已提交
131 132
		break
	}
martianzhang's avatar
martianzhang 已提交
133
	t.mu.Unlock()
martianzhang's avatar
martianzhang 已提交
134 135 136 137 138 139 140 141 142

	t.Consume(newConsumed)
}

// Consume is used to consume a memory usage. "bytes" can be a negative value,
// which means this is a memory release operation.
func (t *Tracker) Consume(bytes int64) {
	var rootExceed *Tracker
	for tracker := t; tracker != nil; tracker = tracker.parent {
martianzhang's avatar
martianzhang 已提交
143
		if atomic.AddInt64(&tracker.bytesConsumed, bytes) >= tracker.bytesLimit && tracker.bytesLimit > 0 {
martianzhang's avatar
martianzhang 已提交
144 145
			rootExceed = tracker
		}
martianzhang's avatar
martianzhang 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158

		if tracker.parent == nil {
			// since we only need a total memory usage during execution,
			// we only record max consumed bytes in root(statement-level) for performance.
			for {
				maxNow := atomic.LoadInt64(&tracker.maxConsumed)
				consumed := atomic.LoadInt64(&tracker.bytesConsumed)
				if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) {
					continue
				}
				break
			}
		}
martianzhang's avatar
martianzhang 已提交
159 160 161 162 163 164 165 166
	}
	if rootExceed != nil {
		rootExceed.actionOnExceed.Action(rootExceed)
	}
}

// BytesConsumed returns the consumed memory usage value in bytes.
func (t *Tracker) BytesConsumed() int64 {
martianzhang's avatar
martianzhang 已提交
167
	return atomic.LoadInt64(&t.bytesConsumed)
martianzhang's avatar
martianzhang 已提交
168 169
}

// MaxConsumed returns the max number of bytes consumed during execution.
func (t *Tracker) MaxConsumed() int64 {
	// Atomic read, matching the atomic updates of this field in Consume().
	peak := atomic.LoadInt64(&t.maxConsumed)
	return peak
}

// String returns the string representation of this Tracker tree.
func (t *Tracker) String() string {
	// The output starts on a fresh line, followed by the tree rendered by
	// toString() with no initial indentation.
	var out bytes.Buffer
	out.WriteString("\n")
	t.toString("", &out)
	return out.String()
}

func (t *Tracker) toString(indent string, buffer *bytes.Buffer) {
	fmt.Fprintf(buffer, "%s\"%s\"{\n", indent, t.label)
	if t.bytesLimit > 0 {
martianzhang's avatar
martianzhang 已提交
185
		fmt.Fprintf(buffer, "%s  \"quota\": %s\n", indent, t.BytesToString(t.bytesLimit))
martianzhang's avatar
martianzhang 已提交
186
	}
martianzhang's avatar
martianzhang 已提交
187
	fmt.Fprintf(buffer, "%s  \"consumed\": %s\n", indent, t.BytesToString(t.BytesConsumed()))
martianzhang's avatar
martianzhang 已提交
188

martianzhang's avatar
martianzhang 已提交
189 190 191 192
	t.mu.Lock()
	for i := range t.mu.children {
		if t.mu.children[i] != nil {
			t.mu.children[i].toString(indent+"  ", buffer)
martianzhang's avatar
martianzhang 已提交
193 194
		}
	}
martianzhang's avatar
martianzhang 已提交
195
	t.mu.Unlock()
martianzhang's avatar
martianzhang 已提交
196 197 198
	buffer.WriteString(indent + "}\n")
}

martianzhang's avatar
martianzhang 已提交
199 200
// BytesToString converts the memory consumption to a readable string.
func (t *Tracker) BytesToString(numBytes int64) string {
martianzhang's avatar
martianzhang 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
	GB := float64(numBytes) / float64(1<<30)
	if GB > 1 {
		return fmt.Sprintf("%v GB", GB)
	}

	MB := float64(numBytes) / float64(1<<20)
	if MB > 1 {
		return fmt.Sprintf("%v MB", MB)
	}

	KB := float64(numBytes) / float64(1<<10)
	if KB > 1 {
		return fmt.Sprintf("%v KB", KB)
	}

	return fmt.Sprintf("%v Bytes", numBytes)
}