ingress-nginx-helm/vendor/github.com/ncabatoff/process-exporter/proc/tracker.go
Manuel Alejandro de Brito Fontes 9278f0cad2
Update metric dependencies (#5023)
2020-02-06 09:50:13 -03:00

445 lines
13 KiB
Go

package proc
import (
"fmt"
"log"
"os/user"
"strconv"
"time"
seq "github.com/ncabatoff/go-seq/seq"
common "github.com/ncabatoff/process-exporter"
)
// Tracker follows processes across update cycles and records their metrics.
type Tracker struct {
	// namer decides which processes get tracked and under what group name.
	namer common.MatchNamer
	// tracked holds the state of every process being monitored. A process
	// is marked as ignored (known, but deliberately not tracked) by storing
	// nil as its value in this map.
	tracked map[ID]*trackedProc
	// procIds maps a pid to the ID last seen with that pid. It exists so
	// the tracked entry of a parent process can be found from a ppid.
	procIds map[int]ID
	// trackChildren makes the Tracker also track descendants of the
	// processes the namer asked for.
	trackChildren bool
	// trackThreads makes the Tracker track per-thread metrics.
	trackThreads bool
	// alwaysRecheck disables ignoring: untracked processes are re-examined
	// on every cycle in case their comm has changed.
	alwaysRecheck bool
	// username caches the uid -> user name lookups done by lookupUid.
	username map[int]string
	// debug turns on diagnostic logging.
	debug bool
}

// Delta is a distinct type with the same structure as Counts. It signals
// that the contents are not totals but the difference between two totals.
type Delta Counts

// trackedThread is the per-thread state kept between update cycles.
type trackedThread struct {
	// name is the thread's name.
	name string
	// accum is the most recent set of totals read for the thread.
	accum Counts
	// latest is how much the totals grew since the previous cycle.
	latest Delta
	// lastUpdate is the cycle timestamp at which the thread was last seen.
	lastUpdate time.Time
	// wchan is the thread's wchan as last read; empty when there is none.
	wchan string
}

// trackedProc accumulates metrics for one process and remembers the
// group name assigned to it.
type trackedProc struct {
	// lastUpdate is stamped on every update cycle; entries carrying an
	// older stamp belong to processes that have exited.
	lastUpdate time.Time
	// static is the Static data captured when the process was first tracked.
	static Static
	// metrics is the most recent Metrics reading for the process.
	metrics Metrics
	// lastaccum is the increment to the counters seen in the last update.
	lastaccum Delta
	// groupName is the tag for this process given by the namer.
	groupName string
	// threads is the per-thread state, keyed by thread ID.
	threads map[ThreadID]trackedThread
}

// ThreadUpdate describes what changed for a thread since the last cycle.
type ThreadUpdate struct {
	// ThreadName is the name of the thread.
	ThreadName string
	// Latest is how much the counts increased since the last cycle.
	Latest Delta
}

// Update reports the latest stats for a process.
type Update struct {
	// GroupName is the name given to the process by the namer.
	GroupName string
	// Latest is how much the counts increased since the last cycle.
	Latest Delta
	// Memory is the current memory usage.
	Memory
	// Filedesc is the current fd usage/limit.
	Filedesc
	// Start is the time the process started.
	Start time.Time
	// NumThreads is the number of threads.
	NumThreads uint64
	// States is how many processes are in which run state.
	States
	// Wchans is how many threads are in each non-empty wchan.
	Wchans map[string]int
	// Threads are the thread updates for this process, if the Tracker
	// has trackThreads==true.
	Threads []ThreadUpdate
}

// CollectErrors counts the non-fatal errors hit while collecting process
// metrics.
type CollectErrors struct {
	// Read is incremented every time GetMetrics() returns an error other
	// than the process having disappeared, i.e. when even the basics for
	// the process could not be loaded.
	Read int
	// Partial is incremented every time some metrics (e.g. I/O) could not
	// be collected for a tracked proc although the basic stuff such as
	// cmdline and core stats was still available.
	Partial int
}
// lessUpdateGroupName reports whether x's group name sorts before y's.
func lessUpdateGroupName(x, y Update) bool {
	return y.GroupName > x.GroupName
}
// lessThreadUpdate reports whether x orders before y according to
// seq.Compare.
func lessThreadUpdate(x, y ThreadUpdate) bool {
	order := seq.Compare(x, y)
	return order < 0
}
// lessCounts reports whether x orders before y according to seq.Compare.
func lessCounts(x, y Counts) bool {
	order := seq.Compare(x, y)
	return order < 0
}
// getUpdate builds the Update reported for this process from its current
// state: group name, latest counter increments, memory, file descriptors,
// start time, thread count, run states and wchan tallies.
//
// The process's own wchan, when non-empty, is counted once. Per-thread
// detail is added only when more than one thread is being tracked; each
// such thread contributes a ThreadUpdate and, if it has a wchan, one more
// count for that wchan. Threads are appended in map iteration order, so
// their order is unspecified.
func (tp *trackedProc) getUpdate() Update {
	u := Update{
		GroupName:  tp.groupName,
		Latest:     tp.lastaccum,
		Memory:     tp.metrics.Memory,
		Filedesc:   tp.metrics.Filedesc,
		Start:      tp.static.StartTime,
		NumThreads: tp.metrics.NumThreads,
		States:     tp.metrics.States,
		Wchans:     make(map[string]int),
	}
	if tp.metrics.Wchan != "" {
		u.Wchans[tp.metrics.Wchan] = 1
	}
	if len(tp.threads) > 1 {
		// The final length is known, so allocate once instead of letting
		// append grow the slice repeatedly.
		u.Threads = make([]ThreadUpdate, 0, len(tp.threads))
		for _, tt := range tp.threads {
			u.Threads = append(u.Threads, ThreadUpdate{ThreadName: tt.name, Latest: tt.latest})
			if tt.wchan != "" {
				u.Wchans[tt.wchan]++
			}
		}
	}
	return u
}
// NewTracker returns a Tracker configured with the given namer and flags.
// Its tracked, procIds and username maps start out empty.
func NewTracker(namer common.MatchNamer, trackChildren, trackThreads, alwaysRecheck, debug bool) *Tracker {
	t := new(Tracker)

	t.namer = namer
	t.trackChildren = trackChildren
	t.trackThreads = trackThreads
	t.alwaysRecheck = alwaysRecheck
	t.debug = debug

	t.tracked = make(map[ID]*trackedProc)
	t.procIds = make(map[int]ID)
	t.username = make(map[int]string)

	return t
}
// track starts tracking the process described by idinfo under groupName,
// replacing any existing entry for the same ID. When idinfo carries
// threads, each one is recorded with its current counts as the baseline,
// a zero latest delta and a zero lastUpdate.
func (t *Tracker) track(groupName string, idinfo IDInfo) {
	entry := &trackedProc{
		groupName: groupName,
		static:    idinfo.Static,
		metrics:   idinfo.Metrics,
	}

	if len(idinfo.Threads) > 0 {
		byID := make(map[ThreadID]trackedThread)
		for _, thr := range idinfo.Threads {
			byID[thr.ThreadID] = trackedThread{
				name:  thr.ThreadName,
				accum: thr.Counts,
				wchan: thr.Wchan,
			}
		}
		entry.threads = byID
	}

	t.tracked[idinfo.ID] = entry
}
// ignore marks id as ignored by storing nil for it in the tracked map.
// It does nothing when alwaysRecheck is set, so that the process is
// examined again on the next cycle.
func (t *Tracker) ignore(id ID) {
	if t.alwaysRecheck {
		return
	}
	t.tracked[id] = nil
}
// update folds a fresh reading into the tracked process. lastaccum becomes
// the difference between the new and the previously stored counts, and the
// stored metrics and lastUpdate are replaced.
//
// With more than one thread, per-thread state is refreshed: a thread seen
// before gets its latest delta computed against its previous totals, a new
// thread starts with a zero delta, and threads missing from this reading
// are dropped. With one thread or none, per-thread state is cleared.
//
// cerrs is accepted but not used here.
func (tp *trackedProc) update(metrics Metrics, now time.Time, cerrs *CollectErrors, threads []Thread) {
	// Resource consumption since the last cycle.
	tp.lastaccum = metrics.Counts.Sub(tp.metrics.Counts)
	tp.metrics = metrics
	tp.lastUpdate = now

	if len(threads) <= 1 {
		tp.threads = nil
		return
	}

	if tp.threads == nil {
		tp.threads = make(map[ThreadID]trackedThread)
	}
	for _, thr := range threads {
		cur := trackedThread{
			name:       thr.ThreadName,
			accum:      thr.Counts,
			lastUpdate: now,
			wchan:      thr.Wchan,
		}
		if prev, seen := tp.threads[thr.ThreadID]; seen {
			cur.latest = thr.Counts.Sub(prev.accum)
		}
		tp.threads[thr.ThreadID] = cur
	}

	// Anything not stamped with this cycle's time was absent from threads.
	for tid, tt := range tp.threads {
		if tt.lastUpdate != now {
			delete(tp.threads, tid)
		}
	}
}
// handleProc processes a single proc for the cycle stamped updateTime.
//
// A proc that is already tracked has its metrics updated and nil is
// returned. A proc that is marked as ignored is skipped without reading
// anything further. A proc that is neither is returned as a non-nil IDInfo
// and its pid is recorded in procIds.
//
// A proc vanishing while its info is being read out of /proc is not an
// error: nothing is returned and the tracker is left unchanged. The
// returned CollectErrors counts read failures other than ErrProcNotExist
// and partial-collection failures.
func (t *Tracker) handleProc(proc Proc, updateTime time.Time) (*IDInfo, CollectErrors) {
	var cerrs CollectErrors

	procID, err := proc.GetProcID()
	if err != nil {
		if t.debug {
			log.Printf("error getting proc ID for pid %+v: %v", proc.GetPid(), err)
		}
		return nil, cerrs
	}

	// A nil entry means the proc is being ignored: nothing to do.
	last, known := t.tracked[procID]
	if known && last == nil {
		return nil, cerrs
	}

	metrics, softerrors, err := proc.GetMetrics()
	if err != nil {
		if t.debug {
			log.Printf("error reading metrics for %+v: %v", procID, err)
		}
		// Usually the proc has exited, i.e. we lost the race. That is
		// not counted as an error.
		if err != ErrProcNotExist {
			cerrs.Read++
		}
		return nil, cerrs
	}

	threads, err := proc.GetThreads()
	if err != nil {
		if t.debug {
			log.Printf("can't read thread metrics for %+v: %v", procID, err)
		}
		softerrors |= 1
	}
	cerrs.Partial += softerrors

	// When thread data is available, the context-switch counters are
	// replaced by the sum over all threads and each thread's states are
	// added to the process's states.
	if len(threads) > 0 {
		metrics.Counts.CtxSwitchNonvoluntary = 0
		metrics.Counts.CtxSwitchVoluntary = 0
		for _, thr := range threads {
			metrics.Counts.CtxSwitchNonvoluntary += thr.Counts.CtxSwitchNonvoluntary
			metrics.Counts.CtxSwitchVoluntary += thr.Counts.CtxSwitchVoluntary
			metrics.States.Add(thr.States)
		}
	}

	if known {
		last.update(metrics, updateTime, &cerrs, threads)
		return nil, cerrs
	}

	static, err := proc.GetStatic()
	if err != nil {
		if t.debug {
			log.Printf("error reading static details for %+v: %v", procID, err)
		}
		return nil, cerrs
	}

	newProc := &IDInfo{ID: procID, Static: static, Metrics: metrics, Threads: threads}
	if t.debug {
		log.Printf("found new proc: %s", newProc)
	}

	// If a process we already know had this pid, drop it from the tracked
	// map now; otherwise the cleanup in update() would remove the procIds
	// entry created just below.
	if stale, reused := t.procIds[procID.Pid]; reused {
		delete(t.tracked, stale)
	}
	t.procIds[procID.Pid] = procID

	return newProc, cerrs
}
// update walks procs, refreshing the metrics of every tracked process and
// collecting the ones not seen before. Tracked processes that did not show
// up in this pass are removed from the tracked and procIds maps. It
// returns the new processes and the summed non-fatal error counts, or an
// error if closing the iterator fails.
func (t *Tracker) update(procs Iter) ([]IDInfo, CollectErrors, error) {
	var (
		fresh []IDInfo
		total CollectErrors
	)
	now := time.Now()

	for procs.Next() {
		found, cerrs := t.handleProc(procs, now)
		total.Read += cerrs.Read
		total.Partial += cerrs.Partial
		if found != nil {
			fresh = append(fresh, *found)
		}
	}

	if err := procs.Close(); err != nil {
		return nil, total, fmt.Errorf("Error reading procs: %v", err)
	}

	// Instead of building a new map each cycle to find procs that have
	// gone away, every proc still present had its lastUpdate bumped to now
	// above. This second pass removes the entries left with an older stamp.
	for procID, pinfo := range t.tracked {
		// TODO is this a bug? Ignored procs (nil entries) are skipped, so
		// we never see them go away and their procIds and tracked entries
		// may be leaking.
		if pinfo != nil && pinfo.lastUpdate != now {
			delete(t.tracked, procID)
			delete(t.procIds, procID.Pid)
		}
	}

	return fresh, total, nil
}
// checkAncestry decides whether the new process idinfo should be tracked
// because of its ancestry. It looks up the parent: a tracked parent makes
// this process tracked under the same group name; an ignored parent, or a
// parent pid with no procIds entry, makes it ignored. When the parent is
// itself one of newprocs the decision recurses towards the root. It
// returns the group name the process was tracked under, or "" if it was
// ignored.
func (t *Tracker) checkAncestry(idinfo IDInfo, newprocs map[ID]IDInfo) string {
	parentID := t.procIds[idinfo.ParentPid]
	if parentID.Pid < 1 {
		// Reached the root of the process tree without finding a tracked
		// parent.
		if t.debug {
			log.Printf("ignoring unmatched proc with no matched parent: %+v", idinfo)
		}
		t.ignore(idinfo.ID)
		return ""
	}

	// Parent already known to the tracker, either tracked or ignored.
	if parent, known := t.tracked[parentID]; known {
		if parent == nil {
			// The parent is ignored, so this one is too.
			t.ignore(idinfo.ID)
			return ""
		}
		if t.debug {
			log.Printf("matched as %q because child of %+v: %+v",
				parent.groupName, parentID, idinfo)
		}
		t.track(parent.groupName, idinfo)
		return parent.groupName
	}

	// Parent is another new process: resolve it first.
	if parentInfo, isNew := newprocs[parentID]; isNew {
		name := t.checkAncestry(parentInfo, newprocs)
		if name != "" {
			if t.debug {
				log.Printf("matched as %q because child of %+v: %+v",
					name, parentID, idinfo)
			}
			// A tracked ancestor means the whole lineage is tracked.
			t.track(name, idinfo)
			return name
		}
	}

	// The parent is dead (we never saw it) or nothing in the ancestry is
	// tracked.
	if t.debug {
		log.Printf("ignoring unmatched proc with no matched parent: %+v", idinfo)
	}
	t.ignore(idinfo.ID)
	return ""
}
// lookupUid returns the user name for uid, consulting the username cache
// first. If the lookup fails, the decimal uid itself is used as the name.
// The result is cached either way.
func (t *Tracker) lookupUid(uid int) string {
	if cached, ok := t.username[uid]; ok {
		return cached
	}

	// Start from the numeric form and upgrade it if the lookup succeeds.
	name := strconv.Itoa(uid)
	if u, err := user.LookupId(name); err == nil {
		name = u.Username
	}

	t.username[uid] = name
	return name
}
// Update refreshes the tracker's state from iter. Metrics of processes
// already tracked are updated, new processes the namer wants are tracked,
// and, when trackChildren is set, so are new processes descended from a
// tracked one. It returns the non-fatal error counts and one Update per
// tracked process, or an error if the scan failed fatally.
func (t *Tracker) Update(iter Iter) (CollectErrors, []Update, error) {
	newProcs, colErrs, err := t.update(iter)
	if err != nil {
		return colErrs, nil, err
	}

	// Step 1: track every new proc the namer matches on its attributes;
	// set the rest aside.
	untracked := make(map[ID]IDInfo)
	for _, idinfo := range newProcs {
		attrs := common.ProcAttributes{
			Name:      idinfo.Name,
			Cmdline:   idinfo.Cmdline,
			Username:  t.lookupUid(idinfo.EffectiveUID),
			PID:       idinfo.Pid,
			StartTime: idinfo.StartTime,
		}
		wanted, gname := t.namer.MatchAndName(attrs)
		if !wanted {
			untracked[idinfo.ID] = idinfo
			continue
		}
		if t.debug {
			log.Printf("matched as %q: %+v", gname, idinfo)
		}
		t.track(gname, idinfo)
	}

	// Step 2: track any remaining new proc whose parent is tracked.
	if t.trackChildren {
		for _, idinfo := range untracked {
			// An entry may already exist because an earlier iteration
			// tracked or ignored this proc while resolving a descendant.
			if _, seen := t.tracked[idinfo.ID]; !seen {
				t.checkAncestry(idinfo, untracked)
			}
		}
	}

	updates := []Update{}
	for _, tproc := range t.tracked {
		if tproc == nil {
			continue
		}
		updates = append(updates, tproc.getUpdate())
	}
	return colErrs, updates, nil
}