package statsd
import (
"strconv"
)
// MessageTooLongError is an error returned when a sample, event or service check is too large once serialized. See
// WithMaxBytesPerPayload option for more details.
type MessageTooLongError struct{}
func (e MessageTooLongError) Error() string {
return "message too long. See 'WithMaxBytesPerPayload' documentation."
}
var errBufferFull = MessageTooLongError{}
type partialWriteError string
func (e partialWriteError) Error() string { return string(e) }
const errPartialWrite = partialWriteError("value partially written")
const metricOverhead = 512
// statsdBuffer is a buffer containing statsd messages
// this struct methods are NOT safe for concurrent use
type statsdBuffer struct {
buffer []byte
maxSize int
maxElements int
elementCount int
}
func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer {
return &statsdBuffer{
buffer: make([]byte, 0, maxSize+metricOverhead), // pre-allocate the needed size + metricOverhead to avoid having Go re-allocate on it's own if an element does not fit
maxSize: maxSize,
maxElements: maxElements,
}
}
func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendTimestamp(b.buffer, timestamp)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate)
b.buffer = appendTimestamp(b.buffer, timestamp)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendHistogram(b.buffer, namespace, globalTags, name, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
// writeAggregated serialized as many values as possible in the current buffer and return the position in values where it stopped.
func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int, rate float64) (int, error) {
if b.elementCount >= b.maxElements {
return 0, errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendHeader(b.buffer, namespace, name)
// buffer already full
if len(b.buffer)+tagSize > b.maxSize {
b.buffer = originalBuffer
return 0, errBufferFull
}
// We add as many value as possible
var position int
for idx, v := range values {
previousBuffer := b.buffer
if idx != 0 {
b.buffer = append(b.buffer, ':')
}
b.buffer = strconv.AppendFloat(b.buffer, v, 'f', precision, 64)
// Should we stop serializing and switch to another buffer
if len(b.buffer)+tagSize > b.maxSize {
b.buffer = previousBuffer
break
}
position = idx + 1
}
// we could not add a single value
if position == 0 {
b.buffer = originalBuffer
return 0, errBufferFull
}
b.buffer = append(b.buffer, '|')
b.buffer = append(b.buffer, metricSymbol...)
b.buffer = appendRate(b.buffer, rate)
b.buffer = appendTagsAggregated(b.buffer, globalTags, tags)
b.buffer = appendContainerID(b.buffer)
b.writeSeparator()
b.elementCount++
if position != len(values) {
return position, errPartialWrite
}
return position, nil
}
func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendDistribution(b.buffer, namespace, globalTags, name, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
func (b *statsdBuffer) writeSet(namespace string, globalTags []string, name string, value string, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendSet(b.buffer, namespace, globalTags, name, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
func (b *statsdBuffer) writeTiming(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendTiming(b.buffer, namespace, globalTags, name, value, tags, rate)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
func (b *statsdBuffer) writeEvent(event *Event, globalTags []string) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendEvent(b.buffer, event, globalTags)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
func (b *statsdBuffer) writeServiceCheck(serviceCheck *ServiceCheck, globalTags []string) error {
if b.elementCount >= b.maxElements {
return errBufferFull
}
originalBuffer := b.buffer
b.buffer = appendServiceCheck(b.buffer, serviceCheck, globalTags)
b.writeSeparator()
return b.validateNewElement(originalBuffer)
}
func (b *statsdBuffer) validateNewElement(originalBuffer []byte) error {
if len(b.buffer) > b.maxSize {
b.buffer = originalBuffer
return errBufferFull
}
b.elementCount++
return nil
}
func (b *statsdBuffer) writeSeparator() {
b.buffer = append(b.buffer, '\n')
}
func (b *statsdBuffer) reset() {
b.buffer = b.buffer[:0]
b.elementCount = 0
}
func (b *statsdBuffer) bytes() []byte {
return b.buffer
}