From 69d61be9fd52d4ba036c68614e72d3f7a3044f89 Mon Sep 17 00:00:00 2001 From: djedruszczak <93140157+djedruszczak@users.noreply.github.com> Date: Thu, 11 Nov 2021 10:14:17 -0500 Subject: [PATCH 01/12] Add tool for reporting out of order/duplicate metrics grouped by name or tag (#112) Adds mt-kafka-mdm-report-out-of-order, which consumes metrics from kafka and discovers those which are out of order or duplicates. It then groups these metrics by name or a specific tag using an index built from Cassandra, and outputs the results. --- .../aggregate.go | 50 ++++++ cmd/mt-kafka-mdm-report-out-of-order/flags.go | 146 ++++++++++++++++++ .../inputooofinder.go | 130 ++++++++++++++++ cmd/mt-kafka-mdm-report-out-of-order/main.go | 131 ++++++++++++++++ 4 files changed, 457 insertions(+) create mode 100644 cmd/mt-kafka-mdm-report-out-of-order/aggregate.go create mode 100644 cmd/mt-kafka-mdm-report-out-of-order/flags.go create mode 100644 cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go create mode 100644 cmd/mt-kafka-mdm-report-out-of-order/main.go diff --git a/cmd/mt-kafka-mdm-report-out-of-order/aggregate.go b/cmd/mt-kafka-mdm-report-out-of-order/aggregate.go new file mode 100644 index 000000000..260f9dacf --- /dev/null +++ b/cmd/mt-kafka-mdm-report-out-of-order/aggregate.go @@ -0,0 +1,50 @@ +package main + +import ( + "strings" + + log "github.com/sirupsen/logrus" +) + +type Aggregate struct { + Count int + OutOfOrderCount int + DuplicateCount int +} + +func aggregateByName(tracker Tracker) map[string]Aggregate { + aggregates := map[string]Aggregate{} + + for _, track := range tracker { + aggregate, _ := aggregates[track.Name] + aggregate.Count += track.Count + aggregate.OutOfOrderCount += track.OutOfOrderCount + aggregate.DuplicateCount += track.DuplicateCount + aggregates[track.Name] = aggregate + } + + return aggregates +} + +func aggregateByTag(tracker Tracker, groupByTag string) map[string]Aggregate { + aggregates := map[string]Aggregate{} + + for _, track := range tracker { + for _, tag := range track.Tags { + kv := strings.Split(tag, "=") + if len(kv) != 2 { + log.Errorf("unexpected tag encoding for metric with name=%q tag=%q", track.Name, tag) + continue + } + if kv[0] == groupByTag { + aggregate, _ := aggregates[kv[1]] + aggregate.Count += track.Count + aggregate.OutOfOrderCount += track.OutOfOrderCount + aggregate.DuplicateCount += track.DuplicateCount + aggregates[kv[1]] = aggregate + } + } + } + + return aggregates +} diff --git a/cmd/mt-kafka-mdm-report-out-of-order/flags.go b/cmd/mt-kafka-mdm-report-out-of-order/flags.go new file mode 100644 index 000000000..7645c13b1 --- /dev/null +++ b/cmd/mt-kafka-mdm-report-out-of-order/flags.go @@ -0,0 +1,146 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/grafana/globalconf" + "github.com/grafana/metrictank/idx/cassandra" + inKafkaMdm "github.com/grafana/metrictank/input/kafkamdm" + log "github.com/sirupsen/logrus" +) + +type Flags struct { + flagSet *flag.FlagSet + + RunDuration time.Duration + Config string + PartitionFrom int + PartitionTo int + ReorderWindow uint + Prefix string + Substr string + GroupByName bool + GroupByTag string +} + +func NewFlags() *Flags { + var flags Flags + + flags.flagSet = flag.NewFlagSet("application flags", flag.ExitOnError) + flags.flagSet.DurationVar(&flags.RunDuration, "run-duration", 5*time.Minute, "the duration of time to run the program") + flags.flagSet.StringVar(&flags.Config, "config", "/etc/metrictank/metrictank.ini", "configuration file path") + 
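+	// The two partition flags below select which index partitions newInputOOOFinder preloads from Cassandra.
+	// With the default partition-to of -1, only the partition given by partition-from is loaded.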
flags.flagSet.IntVar(&flags.PartitionFrom, "partition-from", 0, "the partition to load the index from") + flags.flagSet.IntVar(&flags.PartitionTo, "partition-to", -1, "load the index from all partitions up to this one (exclusive). If unset, only the partition defined with \"--partition-from\" is loaded from") + flags.flagSet.UintVar(&flags.ReorderWindow, "reorder-window", 1, "the size of the reorder buffer window") + flags.flagSet.StringVar(&flags.Prefix, "prefix", "", "only report metrics with a name that has this prefix") + flags.flagSet.StringVar(&flags.Substr, "substr", "", "only report metrics with a name that has this substring") + flags.flagSet.BoolVar(&flags.GroupByName, "group-by-name", false, "group out-of-order metrics by name") + flags.flagSet.StringVar(&flags.GroupByTag, "group-by-tag", "", "group out-of-order metrics by the specified tag") + + flags.flagSet.Usage = flags.Usage + return &flags +} + +func (flags *Flags) Parse(args []string) { + err := flags.flagSet.Parse(args) + if err != nil { + log.Fatalf("failed to parse application flags %v: %s", args, err.Error) + os.Exit(1) + } + + path := "" + if _, err := os.Stat(flags.Config); err == nil { + path = flags.Config + } + config, err := globalconf.NewWithOptions(&globalconf.Options{ + Filename: path, + EnvPrefix: "MT_", + }) + if err != nil { + log.Fatalf("error with configuration file: %s", err.Error()) + os.Exit(1) + } + _ = cassandra.ConfigSetup() + inKafkaMdm.ConfigSetup() + config.Parse() + + if flags.GroupByName == false && flags.GroupByTag == "" { + log.Fatalf("must specify at least one of -group-by-name or -group-by-tag") + os.Exit(1) + } + + if flags.ReorderWindow < 1 { + log.Fatalf("-reorder-window must be greater than zero") + os.Exit(1) + } +} + +func (flags *Flags) Usage() { + fmt.Fprintln(os.Stderr, "mt-kafka-mdm-report-out-of-order") + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, "Inspects what's flowing through kafka (in mdm format) and reports out of order data grouped by metric name or tag, taking into account the reorder buffer)") + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, "# Mechanism") + fmt.Fprintln(os.Stderr, "* it sniffs points being added on a per-series (metric Id) level") + fmt.Fprintln(os.Stderr, "* for every series, tracks the last 'correct' point. E.g. 
a point that was able to be added to the series because its timestamp is higher than any previous timestamp") + fmt.Fprintln(os.Stderr, "* if for any series, a point comes in with a timestamp equal or lower than the last point correct point - which metrictank would not add unless it falls within the reorder buffer - it triggers an event for this out-of-order point") + fmt.Fprintln(os.Stderr, "* the reorder buffer is described by the window size") + fmt.Fprintln(os.Stderr, "Usage:") + fmt.Fprintln(os.Stderr, " mt-kafka-mdm-report-out-of-order [flags]") + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, "Example output:") + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, " total metric points count=2710806") + fmt.Fprintln(os.Stderr, " total out-of-order metric points count=3878") + fmt.Fprintln(os.Stderr, " out-of-order metric points grouped by name:") + fmt.Fprintln(os.Stderr, " out-of-order metric points for name=\"fruit.weight\" count=4 percentGroup=4.301075 percentClass=0.096131 percentTotal=0.000129") + fmt.Fprintln(os.Stderr, " out-of-order metric points for name=\"fruit.height\" count=1 percentGroup=4.545455 percentClass=0.024033 percentTotal=0.000032") + fmt.Fprintln(os.Stderr, " ...") + fmt.Fprintln(os.Stderr, " out-of-order metric points grouped by tag=\"fruit\":") + fmt.Fprintln(os.Stderr, " out-of-order metric points for tag=\"fruit\" value=\"apple\" count=80 percentGroup=5.856515 percentClass=2.062919 percentTotal=0.002951") + fmt.Fprintln(os.Stderr, " out-of-order metric points for tag=\"fruit\" value=\"orange\" count=2912 percentGroup=0.306267 percentClass=75.090253 percentTotal=0.107422") + fmt.Fprintln(os.Stderr, " ...") + fmt.Fprintln(os.Stderr, " total duplicate metric points count=12760") + fmt.Fprintln(os.Stderr, " duplicate metric points grouped by name:") + fmt.Fprintln(os.Stderr, " duplicate metric points for name=\"fruit.width\" count=105 percentGroup=19.266055 percentClass=0.760704 percentTotal=0.003397") + fmt.Fprintln(os.Stderr, " duplicate metric points for name=\"fruit.length\" count=123 percentGroup=15.688776 percentClass=0.891111 percentTotal=0.003979") + fmt.Fprintln(os.Stderr, " ...") + fmt.Fprintln(os.Stderr, " duplicate metric points grouped by tag=\"fruit\":") + fmt.Fprintln(os.Stderr, " duplicate metric points for tag=\"fruit\" value=\"banana\" count=4002 percentGroup=17.201066 percentClass=31.363636 percentTotal=0.147631") + fmt.Fprintln(os.Stderr, " duplicate metric points for tag=\"fruit\" value=\"orange\" count=4796 percentGroup=0.504415 percentClass=37.586207 percentTotal=0.176922") + fmt.Fprintln(os.Stderr, " ...") + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, "Fields:") + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, " name: the name of the metric (when grouped by name)") + fmt.Fprintln(os.Stderr, " tag: the tag key (when grouped by tag)") + fmt.Fprintln(os.Stderr, " value: the tag value (when grouped by tag)") + fmt.Fprintln(os.Stderr, " count: the number of metric points") + fmt.Fprintln(os.Stderr, " the example above shows that 4002 metric points that had tag \"fruit\"=\"banana\" were duplicates") + fmt.Fprintln(os.Stderr, " percentGroup: the percentage of all of the metric points which had the same name/tag (depending on grouping) that were out of order/duplicates (depending on classification)") + fmt.Fprintln(os.Stderr, " the example above shows that ~4.301% of all metric points with name \"fruit.weight\" were out of order") + fmt.Fprintln(os.Stderr, " percentClass: the percentage of all of the metric points which were out of 
order/duplicates (depending on classification) that had this name/tag (depending on grouping)") + fmt.Fprintln(os.Stderr, " the example above shows that ~2.063% of all metric points that were out of order had tag fruit=apple") + fmt.Fprintln(os.Stderr, " percentTotal: the percentage of all metric points that had this name/tag (depending on grouping) and were out of order/duplicates (depending on classification)") + fmt.Fprintln(os.Stderr, " the example above shows that ~0.177% of all metric points had tag \"fruit\"=\"orange\" and were duplicates") + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, "flags:") + flags.flagSet.PrintDefaults() + fmt.Fprintln(os.Stderr) + fmt.Fprintln(os.Stderr, "EXAMPLES:") + fmt.Fprintln(os.Stderr, " mt-kafka-mdm-report-out-of-order -group-by-name -config metrictank.ini -partition-from 0") + fmt.Fprintln(os.Stderr, " mt-kafka-mdm-report-out-of-order -group-by-name -group-by-tag namespace -config metrictank.ini -partition-from 0 -partition-to 3 -reorder-window 5 -run-duration 5m") +} + +func ParseFlags() Flags { + flags := NewFlags() + + flag.Usage = flags.Usage + + flags.Parse(os.Args[1:]) + + return *flags +} diff --git a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go new file mode 100644 index 000000000..5f1c31df4 --- /dev/null +++ b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go @@ -0,0 +1,130 @@ +package main + +import ( + "os" + "sync" + "time" + + "github.com/grafana/metrictank/idx/cassandra" + "github.com/grafana/metrictank/mdata" + "github.com/grafana/metrictank/mdata/errors" + "github.com/grafana/metrictank/schema" + "github.com/grafana/metrictank/schema/msg" + log "github.com/sirupsen/logrus" +) + +type Track struct { + Name string + Tags []string + + reorderBuffer *mdata.ReorderBuffer + + Count int + OutOfOrderCount int + DuplicateCount int +} + +type Tracker map[schema.MKey]Track + +// find out-of-order and duplicate metrics +type inputOOOFinder struct { + reorderWindow uint32 + tracker Tracker + + lock sync.Mutex +} + +func (ip inputOOOFinder) Tracker() Tracker { + return ip.tracker +} + +func newInputOOOFinder(partitionFrom int, partitionTo int, reorderWindow uint32) *inputOOOFinder { + cassandraIndex := cassandra.New(cassandra.CliConfig) + err := cassandraIndex.InitBare() + if err != nil { + log.Fatalf("error initializing cassandra index: %s", err.Error()) + os.Exit(1) + } + + metricDefinitions := make([]schema.MetricDefinition, 0) + for partition := partitionFrom; (partitionTo == -1 && partition == partitionFrom) || (partitionTo > 0 && partition < partitionTo); partition++ { + metricDefinitions = cassandraIndex.LoadPartitions([]int32{int32(partition)}, metricDefinitions, time.Now()) + } + + tracker := Tracker{} + for _, metricDefinition := range metricDefinitions { + tracker[metricDefinition.Id] = Track{ + Name: metricDefinition.Name, + Tags: metricDefinition.Tags, + reorderBuffer: mdata.NewReorderBuffer(reorderWindow, uint32(metricDefinition.Interval), false), + Count: 0, + OutOfOrderCount: 0, + DuplicateCount: 0, + } + } + + return &inputOOOFinder{ + reorderWindow: reorderWindow, + tracker: tracker, + + lock: sync.Mutex{}, + } +} + +func (ip *inputOOOFinder) incrementCounts(metricKey schema.MKey, metricTime int64, track Track, partition int32) { + track.Count++ + + _, err := track.reorderBuffer.Add(uint32(metricTime), 0) // ignore value + if err == errors.ErrMetricTooOld { + track.OutOfOrderCount++ + } else if err == errors.ErrMetricNewValueForTimestamp { + 
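+		// the reorder buffer already accepted a point for this timestamp, so this point counts as a duplicate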
track.DuplicateCount++ + } else if err != nil { + log.Errorf("failed to add metric with Name=%q and timestamp=%d from partition=%d to reorder buffer: %s", track.Name, metricTime, partition, err) + return + } + + ip.tracker[metricKey] = track +} + +func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition int32) { + metricKey, err := schema.MKeyFromString(metric.Id) + if err != nil { + log.Errorf("failed to get metric key from id=%v: %s", metric.Id, err.Error()) + return + } + + ip.lock.Lock() + defer ip.lock.Unlock() + + track, exists := ip.tracker[metricKey] + if !exists { + ip.tracker[metricKey] = Track{ + Name: metric.Name, + Tags: metric.Tags, + reorderBuffer: mdata.NewReorderBuffer(ip.reorderWindow, uint32(metric.Interval), false), + Count: 0, + OutOfOrderCount: 0, + DuplicateCount: 0, + } + } + + ip.incrementCounts(metricKey, metric.Time, track, partition) +} + +func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.Format, partition int32) { + ip.lock.Lock() + defer ip.lock.Unlock() + + track, exists := ip.tracker[mp.MKey] + if !exists { + log.Errorf("track for metric with key=%v from partition=%d not found", mp.MKey, partition) + return + } + + ip.incrementCounts(mp.MKey, int64(mp.Time), track, partition) +} + +func (ip *inputOOOFinder) ProcessIndexControlMsg(msg schema.ControlMsg, partition int32) { + +} diff --git a/cmd/mt-kafka-mdm-report-out-of-order/main.go b/cmd/mt-kafka-mdm-report-out-of-order/main.go new file mode 100644 index 000000000..029f419e4 --- /dev/null +++ b/cmd/mt-kafka-mdm-report-out-of-order/main.go @@ -0,0 +1,131 @@ +package main + +import ( + "context" + "math/rand" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + inKafkaMdm "github.com/grafana/metrictank/input/kafkamdm" + "github.com/grafana/metrictank/logger" + log "github.com/sirupsen/logrus" +) + +func configureLogging() { + formatter := &logger.TextFormatter{} + formatter.TimestampFormat = "2006-01-02 15:04:05.000" + log.SetFormatter(formatter) + log.SetLevel(log.InfoLevel) +} + +func filter(tracker Tracker, prefix string, substr string) { + for key, track := range tracker { + if prefix != "" && !strings.HasPrefix(track.Name, prefix) { + delete(tracker, key) + } + if substr != "" && !strings.Contains(track.Name, substr) { + delete(tracker, key) + } + } +} + +func asPercent(numerator int, denominator int) float32 { + return float32(numerator) / float32(denominator) * 100 +} + +func aggregateAndLog(tracker Tracker, groupByName bool, groupByTag string) { + count := 0 + outOfOrderCount := 0 + duplicateCount := 0 + for _, track := range tracker { + count += track.Count + outOfOrderCount += track.OutOfOrderCount + duplicateCount += track.DuplicateCount + } + + var aggregatedByName map[string]Aggregate + if groupByName { + aggregatedByName = aggregateByName(tracker) + } + + var aggregatedByTag map[string]Aggregate + if groupByTag != "" { + aggregatedByTag = aggregateByTag(tracker, groupByTag) + } + + log.Infof("total metric points count=%d", count) + + log.Infof("total out-of-order metric points count=%d", outOfOrderCount) + if groupByName && outOfOrderCount > 0 { + log.Info("out-of-order metric points grouped by name:") + for name, aggregate := range aggregatedByName { + if aggregate.OutOfOrderCount > 0 { + log.Infof("out-of-order metric points for name=%q count=%d percentGroup=%f percentClass=%f percentTotal=%f", name, aggregate.OutOfOrderCount, asPercent(aggregate.OutOfOrderCount, aggregate.Count), asPercent(aggregate.OutOfOrderCount, 
outOfOrderCount), asPercent(aggregate.OutOfOrderCount, count)) + } + } + } + if groupByTag != "" && outOfOrderCount > 0 { + log.Infof("out-of-order metric points grouped by tag=%q:", groupByTag) + for tag, aggregate := range aggregatedByTag { + if aggregate.OutOfOrderCount > 0 { + log.Infof("out-of-order metric points for tag=%q value=%q count=%d percentGroup=%f percentClass=%f percentTotal=%f", groupByTag, tag, aggregate.OutOfOrderCount, asPercent(aggregate.OutOfOrderCount, aggregate.Count), asPercent(aggregate.OutOfOrderCount, outOfOrderCount), asPercent(aggregate.OutOfOrderCount, count)) + } + } + } + + log.Infof("total duplicate metric points count=%d", duplicateCount) + if groupByName && duplicateCount > 0 { + log.Info("duplicate metric points grouped by name:") + for name, aggregate := range aggregatedByName { + if aggregate.DuplicateCount > 0 { + log.Infof("duplicate metric points for name=%q count=%d percentGroup=%f percentClass=%f percentTotal=%f", name, aggregate.DuplicateCount, asPercent(aggregate.DuplicateCount, aggregate.Count), asPercent(aggregate.DuplicateCount, duplicateCount), asPercent(aggregate.DuplicateCount, count)) + } + } + } + if groupByTag != "" && duplicateCount > 0 { + log.Infof("duplicate metric points grouped by tag=%q:", groupByTag) + for tag, aggregate := range aggregatedByTag { + if aggregate.DuplicateCount > 0 { + log.Infof("duplicate metric points for tag=%q value=%q count=%d percentGroup=%f percentClass=%f percentTotal=%f", groupByTag, tag, aggregate.DuplicateCount, asPercent(aggregate.DuplicateCount, aggregate.Count), asPercent(aggregate.DuplicateCount, duplicateCount), asPercent(aggregate.DuplicateCount, count)) + } + } + } +} + +func main() { + configureLogging() + + flags := ParseFlags() + + inKafkaMdm.ConfigProcess("mt-kafka-mdm-report-out-of-order" + strconv.Itoa(rand.Int())) + kafkaMdm := inKafkaMdm.New() + + inputOOOFinder := newInputOOOFinder( + flags.PartitionFrom, + flags.PartitionTo, + uint32(flags.ReorderWindow), + ) + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + ctx, cancel := context.WithCancel(context.Background()) + kafkaMdm.Start(inputOOOFinder, cancel) + select { + case sig := <-sigChan: + log.Infof("Received signal %q. Shutting down", sig) + case <-ctx.Done(): + log.Info("Mdm input plugin signalled a fatal error. Shutting down") + case <-time.After(flags.RunDuration): + log.Infof("Finished scanning") + } + kafkaMdm.Stop() + + tracker := inputOOOFinder.Tracker() + filter(tracker, flags.Prefix, flags.Substr) + aggregateAndLog(tracker, flags.GroupByName, flags.GroupByTag) +} From e5f9b620592b31201de5930f063861f8bee70785 Mon Sep 17 00:00:00 2001 From: Dominik Jedruszczak Date: Fri, 12 Nov 2021 10:26:49 -0500 Subject: [PATCH 02/12] Update tool documentation --- docs/tools.md | 97 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 10 deletions(-) diff --git a/docs/tools.md b/docs/tools.md index b7fb7c1c3..2c9f3e1f0 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -407,16 +407,6 @@ reads rowkeys from stdin and deletes them from the index. 
only BigTable is suppo ``` -## mt-indexdump-rules-analyzer - -``` -Usage of ./mt-indexdump-rules-analyzer: -reads metric names from stdin and reports the number of metrics that match each index-rules.conf rule - -index-rules-file string - name of file which defines the max-stale times (default "/etc/metrictank/index-rules.conf") -``` - - ## mt-index-migrate ``` @@ -538,6 +528,93 @@ mt-index-prune --verbose --partition-from 0 --partition-to 8 cass -hosts cassand ``` +## mt-indexdump-rules-analyzer + +``` +Usage of ./mt-indexdump-rules-analyzer: +reads metric names from stdin and reports the number of metrics that match each index-rules.conf rule + -index-rules-file string + name of file which defines the max-stale times (default "/etc/metrictank/index-rules.conf") +``` + + +## mt-kafka-mdm-report-out-of-order + +``` +mt-kafka-mdm-report-out-of-order + +Inspects what's flowing through kafka (in mdm format) and reports out of order data grouped by metric name or tag, taking into account the reorder buffer) + +# Mechanism +* it sniffs points being added on a per-series (metric Id) level +* for every series, tracks the last 'correct' point. E.g. a point that was able to be added to the series because its timestamp is higher than any previous timestamp +* if for any series, a point comes in with a timestamp equal or lower than the last point correct point - which metrictank would not add unless it falls within the reorder buffer - it triggers an event for this out-of-order point +* the reorder buffer is described by the window size +Usage: + mt-kafka-mdm-report-out-of-order [flags] + +Example output: + + total metric points count=2710806 + total out-of-order metric points count=3878 + out-of-order metric points grouped by name: + out-of-order metric points for name="fruit.weight" count=4 percentGroup=4.301075 percentClass=0.096131 percentTotal=0.000129 + out-of-order metric points for name="fruit.height" count=1 percentGroup=4.545455 percentClass=0.024033 percentTotal=0.000032 + ... + out-of-order metric points grouped by tag="fruit": + out-of-order metric points for tag="fruit" value="apple" count=80 percentGroup=5.856515 percentClass=2.062919 percentTotal=0.002951 + out-of-order metric points for tag="fruit" value="orange" count=2912 percentGroup=0.306267 percentClass=75.090253 percentTotal=0.107422 + ... + total duplicate metric points count=12760 + duplicate metric points grouped by name: + duplicate metric points for name="fruit.width" count=105 percentGroup=19.266055 percentClass=0.760704 percentTotal=0.003397 + duplicate metric points for name="fruit.length" count=123 percentGroup=15.688776 percentClass=0.891111 percentTotal=0.003979 + ... + duplicate metric points grouped by tag="fruit": + duplicate metric points for tag="fruit" value="banana" count=4002 percentGroup=17.201066 percentClass=31.363636 percentTotal=0.147631 + duplicate metric points for tag="fruit" value="orange" count=4796 percentGroup=0.504415 percentClass=37.586207 percentTotal=0.176922 + ... 
+ +Fields: + + name: the name of the metric (when grouped by name) + tag: the tag key (when grouped by tag) + value: the tag value (when grouped by tag) + count: the number of metric points + the example above shows that 4002 metric points that had tag "fruit"="banana" were duplicates + percentGroup: the percentage of all of the metric points which had the same name/tag (depending on grouping) that were out of order/duplicates (depending on classification) + the example above shows that ~4.301% of all metric points with name "fruit.weight" were out of order + percentClass: the percentage of all of the metric points which were out of order/duplicates (depending on classification) that had this name/tag (depending on grouping) + the example above shows that ~2.063% of all metric points that were out of order had tag fruit=apple + percentTotal: the percentage of all metric points that had this name/tag (depending on grouping) and were out of order/duplicates (depending on classification) + the example above shows that ~0.177% of all metric points had tag "fruit"="orange" and were duplicates + +flags: + -config string + configuration file path (default "/etc/metrictank/metrictank.ini") + -group-by-name + group out-of-order metrics by name + -group-by-tag string + group out-of-order metrics by the specified tag + -partition-from int + the partition to load the index from + -partition-to int + load the index from all partitions up to this one (exclusive). If unset, only the partition defined with "--partition-from" is loaded from (default -1) + -prefix string + only report metrics with a name that has this prefix + -reorder-window uint + the size of the reorder buffer window (default 1) + -run-duration duration + the duration of time to run the program (default 5m0s) + -substr string + only report metrics with a name that has this substring + +EXAMPLES: + mt-kafka-mdm-report-out-of-order -group-by-name -config metrictank.ini -partition-from 0 + mt-kafka-mdm-report-out-of-order -group-by-name -group-by-tag namespace -config metrictank.ini -partition-from 0 -partition-to 3 -reorder-window 5 -run-duration 5m +``` + + ## mt-kafka-mdm-sniff ``` From 3243f89a69bafa382226840da75f9e54e07b0cc1 Mon Sep 17 00:00:00 2001 From: Dominik Jedruszczak Date: Fri, 12 Nov 2021 10:38:01 -0500 Subject: [PATCH 03/12] Restructure files to match contributing guidelines --- cmd/mt-kafka-mdm-report-out-of-order/flags.go | 20 +++--- .../inputooofinder.go | 66 ++++++++--------- cmd/mt-kafka-mdm-report-out-of-order/main.go | 72 +++++++++---------- 3 files changed, 79 insertions(+), 79 deletions(-) diff --git a/cmd/mt-kafka-mdm-report-out-of-order/flags.go b/cmd/mt-kafka-mdm-report-out-of-order/flags.go index 7645c13b1..f7890ec4a 100644 --- a/cmd/mt-kafka-mdm-report-out-of-order/flags.go +++ b/cmd/mt-kafka-mdm-report-out-of-order/flags.go @@ -12,6 +12,16 @@ import ( log "github.com/sirupsen/logrus" ) +func ParseFlags() Flags { + flags := NewFlags() + + flag.Usage = flags.Usage + + flags.Parse(os.Args[1:]) + + return *flags +} + type Flags struct { flagSet *flag.FlagSet @@ -134,13 +144,3 @@ func (flags *Flags) Usage() { fmt.Fprintln(os.Stderr, " mt-kafka-mdm-report-out-of-order -group-by-name -config metrictank.ini -partition-from 0") fmt.Fprintln(os.Stderr, " mt-kafka-mdm-report-out-of-order -group-by-name -group-by-tag namespace -config metrictank.ini -partition-from 0 -partition-to 3 -reorder-window 5 -run-duration 5m") } - -func ParseFlags() Flags { - flags := NewFlags() - - flag.Usage = flags.Usage - - 
flags.Parse(os.Args[1:]) - - return *flags -} diff --git a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go index 5f1c31df4..e6dbfeced 100644 --- a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go +++ b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go @@ -13,19 +13,6 @@ import ( log "github.com/sirupsen/logrus" ) -type Track struct { - Name string - Tags []string - - reorderBuffer *mdata.ReorderBuffer - - Count int - OutOfOrderCount int - DuplicateCount int -} - -type Tracker map[schema.MKey]Track - // find out-of-order and duplicate metrics type inputOOOFinder struct { reorderWindow uint32 @@ -34,10 +21,6 @@ type inputOOOFinder struct { lock sync.Mutex } -func (ip inputOOOFinder) Tracker() Tracker { - return ip.tracker -} - func newInputOOOFinder(partitionFrom int, partitionTo int, reorderWindow uint32) *inputOOOFinder { cassandraIndex := cassandra.New(cassandra.CliConfig) err := cassandraIndex.InitBare() @@ -71,22 +54,6 @@ func newInputOOOFinder(partitionFrom int, partitionTo int, reorderWindow uint32) } } -func (ip *inputOOOFinder) incrementCounts(metricKey schema.MKey, metricTime int64, track Track, partition int32) { - track.Count++ - - _, err := track.reorderBuffer.Add(uint32(metricTime), 0) // ignore value - if err == errors.ErrMetricTooOld { - track.OutOfOrderCount++ - } else if err == errors.ErrMetricNewValueForTimestamp { - track.DuplicateCount++ - } else if err != nil { - log.Errorf("failed to add metric with Name=%q and timestamp=%d from partition=%d to reorder buffer: %s", track.Name, metricTime, partition, err) - return - } - - ip.tracker[metricKey] = track -} - func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition int32) { metricKey, err := schema.MKeyFromString(metric.Id) if err != nil { @@ -128,3 +95,36 @@ func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.F func (ip *inputOOOFinder) ProcessIndexControlMsg(msg schema.ControlMsg, partition int32) { } + +func (ip *inputOOOFinder) incrementCounts(metricKey schema.MKey, metricTime int64, track Track, partition int32) { + track.Count++ + + _, err := track.reorderBuffer.Add(uint32(metricTime), 0) // ignore value + if err == errors.ErrMetricTooOld { + track.OutOfOrderCount++ + } else if err == errors.ErrMetricNewValueForTimestamp { + track.DuplicateCount++ + } else if err != nil { + log.Errorf("failed to add metric with Name=%q and timestamp=%d from partition=%d to reorder buffer: %s", track.Name, metricTime, partition, err) + return + } + + ip.tracker[metricKey] = track +} + +func (ip inputOOOFinder) Tracker() Tracker { + return ip.tracker +} + +type Tracker map[schema.MKey]Track + +type Track struct { + Name string + Tags []string + + reorderBuffer *mdata.ReorderBuffer + + Count int + OutOfOrderCount int + DuplicateCount int +} diff --git a/cmd/mt-kafka-mdm-report-out-of-order/main.go b/cmd/mt-kafka-mdm-report-out-of-order/main.go index 029f419e4..30fd33f4c 100644 --- a/cmd/mt-kafka-mdm-report-out-of-order/main.go +++ b/cmd/mt-kafka-mdm-report-out-of-order/main.go @@ -15,6 +15,40 @@ import ( log "github.com/sirupsen/logrus" ) +func main() { + configureLogging() + + flags := ParseFlags() + + inKafkaMdm.ConfigProcess("mt-kafka-mdm-report-out-of-order" + strconv.Itoa(rand.Int())) + kafkaMdm := inKafkaMdm.New() + + inputOOOFinder := newInputOOOFinder( + flags.PartitionFrom, + flags.PartitionTo, + uint32(flags.ReorderWindow), + ) + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, 
syscall.SIGINT, syscall.SIGTERM) + + ctx, cancel := context.WithCancel(context.Background()) + kafkaMdm.Start(inputOOOFinder, cancel) + select { + case sig := <-sigChan: + log.Infof("Received signal %q. Shutting down", sig) + case <-ctx.Done(): + log.Info("Mdm input plugin signalled a fatal error. Shutting down") + case <-time.After(flags.RunDuration): + log.Infof("Finished scanning") + } + kafkaMdm.Stop() + + tracker := inputOOOFinder.Tracker() + filter(tracker, flags.Prefix, flags.Substr) + aggregateAndLog(tracker, flags.GroupByName, flags.GroupByTag) +} + func configureLogging() { formatter := &logger.TextFormatter{} formatter.TimestampFormat = "2006-01-02 15:04:05.000" @@ -33,10 +67,6 @@ func filter(tracker Tracker, prefix string, substr string) { } } -func asPercent(numerator int, denominator int) float32 { - return float32(numerator) / float32(denominator) * 100 -} - func aggregateAndLog(tracker Tracker, groupByName bool, groupByTag string) { count := 0 outOfOrderCount := 0 @@ -96,36 +126,6 @@ func aggregateAndLog(tracker Tracker, groupByName bool, groupByTag string) { } } -func main() { - configureLogging() - - flags := ParseFlags() - - inKafkaMdm.ConfigProcess("mt-kafka-mdm-report-out-of-order" + strconv.Itoa(rand.Int())) - kafkaMdm := inKafkaMdm.New() - - inputOOOFinder := newInputOOOFinder( - flags.PartitionFrom, - flags.PartitionTo, - uint32(flags.ReorderWindow), - ) - - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - ctx, cancel := context.WithCancel(context.Background()) - kafkaMdm.Start(inputOOOFinder, cancel) - select { - case sig := <-sigChan: - log.Infof("Received signal %q. Shutting down", sig) - case <-ctx.Done(): - log.Info("Mdm input plugin signalled a fatal error. Shutting down") - case <-time.After(flags.RunDuration): - log.Infof("Finished scanning") - } - kafkaMdm.Stop() - - tracker := inputOOOFinder.Tracker() - filter(tracker, flags.Prefix, flags.Substr) - aggregateAndLog(tracker, flags.GroupByName, flags.GroupByTag) +func asPercent(numerator int, denominator int) float32 { + return float32(numerator) / float32(denominator) * 100 } From 01804443da8155e021f53b8b77a01bb4b4e61d53 Mon Sep 17 00:00:00 2001 From: Dominik Jedruszczak Date: Fri, 12 Nov 2021 11:15:38 -0500 Subject: [PATCH 04/12] Add mt-kafka-mdm-report-out-of-order to .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 05a7c9da3..917023736 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ /cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff /cmd/mt-keygen/mt-keygen /cmd/mt-parrot/mt-parrot +/cmd/mt-kafka-mdm-report-out-of-order/mt-kafka-mdm-report-out-of-order /cmd/mt-schemas-explain/mt-schemas-explain /cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl /cmd/mt-store-cat/mt-store-cat From 355c28c4b68223f038d8582905a9a3b951f3d7dc Mon Sep 17 00:00:00 2001 From: Dominik Jedruszczak Date: Fri, 12 Nov 2021 12:15:06 -0500 Subject: [PATCH 05/12] Fix .gitignore order --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 917023736..5e614631a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,12 +16,12 @@ /cmd/mt-index-migrate/mt-index-migrate /cmd/mt-index-prune/mt-index-prune /cmd/mt-indexdump-rules-analyzer/mt-indexdump-rules-analyzer +/cmd/mt-kafka-mdm-report-out-of-order/mt-kafka-mdm-report-out-of-order /cmd/mt-kafka-mdm-sniff-out-of-order/mt-kafka-mdm-sniff-out-of-order /cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff 
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff /cmd/mt-keygen/mt-keygen /cmd/mt-parrot/mt-parrot -/cmd/mt-kafka-mdm-report-out-of-order/mt-kafka-mdm-report-out-of-order /cmd/mt-schemas-explain/mt-schemas-explain /cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl /cmd/mt-store-cat/mt-store-cat From bce2ae0b8d08d6bed9b041efc20bb6b9e18b5ab6 Mon Sep 17 00:00:00 2001 From: Dominik Jedruszczak Date: Mon, 29 Nov 2021 16:05:37 -0500 Subject: [PATCH 06/12] Fix go vet --- cmd/mt-kafka-mdm-report-out-of-order/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-kafka-mdm-report-out-of-order/flags.go b/cmd/mt-kafka-mdm-report-out-of-order/flags.go index f7890ec4a..a2415d18d 100644 --- a/cmd/mt-kafka-mdm-report-out-of-order/flags.go +++ b/cmd/mt-kafka-mdm-report-out-of-order/flags.go @@ -57,7 +57,7 @@ func NewFlags() *Flags { func (flags *Flags) Parse(args []string) { err := flags.flagSet.Parse(args) if err != nil { - log.Fatalf("failed to parse application flags %v: %s", args, err.Error) + log.Fatalf("failed to parse application flags %v: %s", args, err.Error()) os.Exit(1) } From c16f94f4b9dcfc0e96fb635023d87aa16131d826 Mon Sep 17 00:00:00 2001 From: Dominik Jedruszczak Date: Wed, 1 Dec 2021 12:30:58 -0500 Subject: [PATCH 07/12] Rebase master --- cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go index e6dbfeced..0e3e965b3 100644 --- a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go +++ b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go @@ -99,7 +99,7 @@ func (ip *inputOOOFinder) ProcessIndexControlMsg(msg schema.ControlMsg, partitio func (ip *inputOOOFinder) incrementCounts(metricKey schema.MKey, metricTime int64, track Track, partition int32) { track.Count++ - _, err := track.reorderBuffer.Add(uint32(metricTime), 0) // ignore value + _, _, err := track.reorderBuffer.Add(uint32(metricTime), 0) // ignore value if err == errors.ErrMetricTooOld { track.OutOfOrderCount++ } else if err == errors.ErrMetricNewValueForTimestamp { From 32322d70ef083da1e17395b9c49fc13ec10d9d68 Mon Sep 17 00:00:00 2001 From: Dominik Jedruszczak Date: Thu, 2 Dec 2021 13:04:51 -0500 Subject: [PATCH 08/12] Manually reorder docs --- docs/tools.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/tools.md b/docs/tools.md index 2c9f3e1f0..2931297e0 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -407,6 +407,16 @@ reads rowkeys from stdin and deletes them from the index. 
only BigTable is suppo ``` +## mt-indexdump-rules-analyzer + +``` +Usage of ./mt-indexdump-rules-analyzer: +reads metric names from stdin and reports the number of metrics that match each index-rules.conf rule + -index-rules-file string + name of file which defines the max-stale times (default "/etc/metrictank/index-rules.conf") +``` + + ## mt-index-migrate ``` @@ -528,16 +538,6 @@ mt-index-prune --verbose --partition-from 0 --partition-to 8 cass -hosts cassand ``` -## mt-indexdump-rules-analyzer - -``` -Usage of ./mt-indexdump-rules-analyzer: -reads metric names from stdin and reports the number of metrics that match each index-rules.conf rule - -index-rules-file string - name of file which defines the max-stale times (default "/etc/metrictank/index-rules.conf") -``` - - ## mt-kafka-mdm-report-out-of-order ``` From 6a4545e029bad34dbc11ef4c5a3e403aaf852faf Mon Sep 17 00:00:00 2001 From: Dominik Jedruszczak Date: Fri, 10 Dec 2021 16:12:00 -0500 Subject: [PATCH 09/12] Fix segmentation fault --- cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go index 0e3e965b3..f2a8e614f 100644 --- a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go +++ b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go @@ -66,7 +66,7 @@ func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition track, exists := ip.tracker[metricKey] if !exists { - ip.tracker[metricKey] = Track{ + track = Track{ Name: metric.Name, Tags: metric.Tags, reorderBuffer: mdata.NewReorderBuffer(ip.reorderWindow, uint32(metric.Interval), false), From 7a32b2a05a2b82e46c2570352300735689274635 Mon Sep 17 00:00:00 2001 From: djedruszczak <93140157+djedruszczak@users.noreply.github.com> Date: Mon, 9 May 2022 14:17:27 -0400 Subject: [PATCH 10/12] Update cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go Co-authored-by: Dieter Plaetinck --- cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go index f2a8e614f..581c905a9 100644 --- a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go +++ b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go @@ -56,6 +56,7 @@ func newInputOOOFinder(partitionFrom int, partitionTo int, reorderWindow uint32) func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition int32) { metricKey, err := schema.MKeyFromString(metric.Id) + // we loaded all definitions from cassandra at construction time, and any new MetricData in kafka should always be preceeded by the corresponding MetricDefinition (in the same kafka topic), so this should never happen if err != nil { log.Errorf("failed to get metric key from id=%v: %s", metric.Id, err.Error()) return From ea8a4346883ce40bd46f5aa608cb88d409d57aae Mon Sep 17 00:00:00 2001 From: djedruszczak Date: Mon, 9 May 2022 14:19:17 -0400 Subject: [PATCH 11/12] Gofmt --- cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go index 581c905a9..9819eeeac 100644 --- a/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go +++ b/cmd/mt-kafka-mdm-report-out-of-order/inputooofinder.go @@ -56,7 +56,7 @@ func 
newInputOOOFinder(partitionFrom int, partitionTo int, reorderWindow uint32) func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition int32) { metricKey, err := schema.MKeyFromString(metric.Id) - // we loaded all definitions from cassandra at construction time, and any new MetricData in kafka should always be preceeded by the corresponding MetricDefinition (in the same kafka topic), so this should never happen + // we loaded all definitions from cassandra at construction time, and any new MetricData in kafka should always be preceeded by the corresponding MetricDefinition (in the same kafka topic), so this should never happen if err != nil { log.Errorf("failed to get metric key from id=%v: %s", metric.Id, err.Error()) return From 9b82dcef2f7457ce0b677c1c7849492226ca74e8 Mon Sep 17 00:00:00 2001 From: djedruszczak <93140157+djedruszczak@users.noreply.github.com> Date: Mon, 9 May 2022 14:23:29 -0400 Subject: [PATCH 12/12] Update docs/tools.md Co-authored-by: Dieter Plaetinck --- docs/tools.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tools.md b/docs/tools.md index 2931297e0..9e2598c01 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -549,7 +549,7 @@ Inspects what's flowing through kafka (in mdm format) and reports out of order d * it sniffs points being added on a per-series (metric Id) level * for every series, tracks the last 'correct' point. E.g. a point that was able to be added to the series because its timestamp is higher than any previous timestamp * if for any series, a point comes in with a timestamp equal or lower than the last point correct point - which metrictank would not add unless it falls within the reorder buffer - it triggers an event for this out-of-order point -* the reorder buffer is described by the window size +* unlike metrictank where different metric matching patterns can result in different reorder buffers, this tool uses a single reorder buffer size for all metrics (see -reorder-window parameter) Usage: mt-kafka-mdm-report-out-of-order [flags]
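
The three percent fields reported for each group all come from the same asPercent helper, only with different denominators. Below is a minimal, self-contained Go sketch that reproduces the fruit=banana duplicate line from the example output above; the per-group denominator of 23266 does not appear in the example and is back-computed here from its percentGroup value, so treat it as an assumption.

```go
package main

import "fmt"

// asPercent mirrors the helper in cmd/mt-kafka-mdm-report-out-of-order/main.go.
func asPercent(numerator, denominator int) float32 {
	return float32(numerator) / float32(denominator) * 100
}

func main() {
	const (
		groupDuplicates = 4002    // duplicate points tagged fruit=banana (from the example output)
		groupTotal      = 23266   // all points tagged fruit=banana; back-computed from percentGroup, assumed
		classTotal      = 12760   // all duplicate points (from the example output)
		overallTotal    = 2710806 // all points consumed during the run (from the example output)
	)

	fmt.Printf("percentGroup=%f\n", asPercent(groupDuplicates, groupTotal))   // ~17.201066
	fmt.Printf("percentClass=%f\n", asPercent(groupDuplicates, classTotal))   // ~31.363636
	fmt.Printf("percentTotal=%f\n", asPercent(groupDuplicates, overallTotal)) // ~0.147631
}
```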
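
The patch adds no tests for the aggregation helpers. The sketch below shows one possible test for aggregateByTag, assuming the Track, Tracker and Aggregate types exactly as defined in this patch; the metric id strings used to build MKeys are fabricated and only serve to produce two distinct map keys.

```go
package main

import (
	"strings"
	"testing"

	"github.com/grafana/metrictank/schema"
)

func TestAggregateByTag(t *testing.T) {
	// fabricated ids of the form "<org>.<32 hex chars>", only needed for distinct MKeys
	k1, err := schema.MKeyFromString("1." + strings.Repeat("0", 32))
	if err != nil {
		t.Fatal(err)
	}
	k2, err := schema.MKeyFromString("1." + strings.Repeat("1", 32))
	if err != nil {
		t.Fatal(err)
	}

	tracker := Tracker{
		k1: {Name: "fruit.weight", Tags: []string{"fruit=apple"}, Count: 10, OutOfOrderCount: 2, DuplicateCount: 1},
		k2: {Name: "fruit.height", Tags: []string{"fruit=apple", "dc=us-east"}, Count: 5, DuplicateCount: 3},
	}

	// both tracks carry fruit=apple, so their counts should be summed under value "apple"
	got := aggregateByTag(tracker, "fruit")
	want := Aggregate{Count: 15, OutOfOrderCount: 2, DuplicateCount: 4}
	if got["apple"] != want {
		t.Fatalf("aggregateByTag(tracker, \"fruit\")[\"apple\"] = %+v, want %+v", got["apple"], want)
	}
}
```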