package dd
import (
"math"
"math/bits"
"sync"
"sync/atomic"
)
// DDSketch is a sketch that answers quantile queries with bounded relative
// error over an arbitrary range of observations. The referenced paper also
// describes the sketch as fully mergeable.
// ref: https://www.vldb.org/pvldb/vol12/p2195-masson.pdf
type DDSketch struct {
// coef scales log2(x) into a bucket index; NewDDSketch sets it to
// 1/log2(gamma).
coef float64
// gamma is (1+alpha)/(1-alpha), as computed by NewDDSketch.
gamma float64
// count is the total number of observations; Add and FastAdd increment it
// atomically.
count uint64
// head is the first node of the singly linked bucket list; addNode keeps
// it pointing at the lowest bucket index.
head *node
// mut is held by Add and FastAdd while they call addNode.
mut *sync.Mutex
// lookup maps a bucket index to its node.
lookup map[uint64]*node
}
// node is one bucket of the sketch, stored as an element of a singly linked
// list.
type node struct {
// val is the bucket index computed by Add or FastAdd.
val uint64
// count is the number of observations recorded in this bucket.
count uint64
// ptr is the next node in the list, or nil at the tail.
ptr *node
}
// NewDDSketch returns an empty sketch configured for relative accuracy alpha.
//
// gamma is (1+alpha)/(1-alpha) and coef is 1/log2(gamma), so that
// log2(x)*coef equals log_gamma(x) when observations are bucketed.
// NOTE(review): alpha is not validated here; presumably callers pass
// 0 < alpha < 1 — confirm.
func NewDDSketch(alpha float64) *DDSketch {
	gamma := (1 + alpha) / (1 - alpha)

	// log10(gamma) * log2(10) is log2(gamma) by change of base.
	log2Gamma := math.Log10(gamma) * math.Log2(10)

	sketch := new(DDSketch)
	sketch.gamma = gamma
	sketch.coef = 1.0 / log2Gamma
	sketch.lookup = make(map[uint64]*node)
	sketch.mut = new(sync.Mutex)
	return sketch
}
// Add records one observation in the sketch.
//
// The bucket index is ceil(log_gamma(diff)), as in algorithm 1 (sec. 2.1) of
// Masson, Rim & Lee. It is computed as ceil(log2(diff) * coef) so that only a
// base-2 logarithm and one multiplication are needed. The ceiling matters:
// Quantile reports 2*gamma^i/(gamma+1) for bucket i, which is only within the
// relative-error bound when bucket i covers (gamma^(i-1), gamma^i].
//
// Observations of 0 and 1 both land in bucket 0. log2(0) is -Inf, and
// converting that to uint64 is implementation-dependent, so it is never
// computed.
//
// The bucket lookup and the insertion of a new bucket both happen under
// dd.mut: the map is written by addNode, so reading it without the lock is a
// data race, and checking and inserting under one lock keeps two callers from
// creating the same bucket twice.
func (dd *DDSketch) Add(diff uint64) {
	atomic.AddUint64(&dd.count, 1)

	var j uint64
	if diff > 1 {
		j = uint64(math.Ceil(math.Log2(float64(diff)) * dd.coef))
	}

	dd.mut.Lock()
	defer dd.mut.Unlock()

	if n, ok := dd.lookup[j]; ok {
		// Atomic because Quantile may read bucket counts without the lock.
		atomic.AddUint64(&n.count, 1)
		return
	}
	dd.addNode(&node{val: j, count: 1})
}
// FastAdd records one observation, approximating Add by replacing
// log2(diff) with the bit length of diff (bits.Len64, i.e.
// 64 - LeadingZeros64). The bit length is 0 for diff == 0 and
// floor(log2(diff))+1 otherwise; it is multiplied by coef and truncated to
// obtain the bucket index.
//
// Per the original author's note, the bit-count computation is roughly 30-50%
// faster than Log2, and the effect of the approximation on query accuracy has
// not been worked out.
//
// The bucket lookup and the insertion of a new bucket both happen under
// dd.mut: the map is written by addNode, so reading it without the lock is a
// data race, and checking and inserting under one lock keeps two callers from
// creating the same bucket twice.
func (dd *DDSketch) FastAdd(diff uint64) {
	atomic.AddUint64(&dd.count, 1)

	j := uint64(float64(bits.Len64(diff)) * dd.coef)

	dd.mut.Lock()
	defer dd.mut.Unlock()

	if n, ok := dd.lookup[j]; ok {
		// Atomic because Quantile may read bucket counts without the lock.
		atomic.AddUint64(&n.count, 1)
		return
	}
	dd.addNode(&node{val: j, count: 1})
}
// Quantile returns an estimate of the q-th quantile of the observed values,
// following algorithm 2 (sec. 2.1) of Masson, Rim & Lee.
//
// q is clamped to [0, 1]; NaN is treated as 0. An empty sketch yields 0.
//
// Buckets are walked in ascending order, accumulating their counts, for as
// long as the running count is <= floor(q*(n-1)). The estimate for the bucket
// i where the walk stops is 2*gamma^i/(gamma+1). The comparison has to be
// "<=" rather than "<": with "<" the walk stops one bucket early, so q = 1
// would not reach the highest bucket.
//
// Counters are read with atomic loads because Add and FastAdd update them
// atomically. The list itself is traversed without taking dd.mut.
// NOTE(review): confirm whether Quantile is expected to run concurrently
// with writers.
func (dd *DDSketch) Quantile(q float64) float64 {
	cn := dd.head
	if cn == nil {
		return 0.0
	}

	// Negative or NaN q would make the float-to-uint64 conversion below
	// implementation-dependent.
	switch {
	case math.IsNaN(q) || q < 0:
		q = 0
	case q > 1:
		q = 1
	}

	var maxRank uint64
	if total := atomic.LoadUint64(&dd.count); total > 0 {
		maxRank = total - 1
	}
	threshold := uint64(q * float64(maxRank))

	seen := atomic.LoadUint64(&cn.count)
	for seen <= threshold && cn.ptr != nil {
		cn = cn.ptr
		seen += atomic.LoadUint64(&cn.count)
	}
	return 2 * math.Pow(dd.gamma, float64(cn.val)) / (dd.gamma + 1)
}
// addNode registers nn in the lookup map and links it into the bucket list,
// which is kept sorted by ascending val. Add and FastAdd call it while
// holding dd.mut.
//
// A node whose val is larger than every existing bucket is appended at the
// tail, so its observations are reachable when the list is walked.
//
// If a bucket with the same val already exists (two callers can both miss the
// lookup before either takes the lock), nn's count is folded into the
// existing node, so the list never holds two nodes for one index.
func (dd *DDSketch) addNode(nn *node) {
	if existing, ok := dd.lookup[nn.val]; ok {
		atomic.AddUint64(&existing.count, nn.count)
		return
	}
	dd.lookup[nn.val] = nn

	// link is the pointer that must end up referring to nn: either dd.head
	// or the ptr field of the last node whose val is below nn.val.
	link := &dd.head
	for *link != nil && (*link).val < nn.val {
		link = &(*link).ptr
	}
	nn.ptr = *link
	*link = nn
}