// reservoir.go
package reservoir
import (
"encoding/binary"
"errors"
"math"
"math/rand"
"sync"
"sync/atomic"
)
var (
	// errInvalidReadIndex - Read was called with a slot index >= nSlots.
	errInvalidReadIndex = errors.New("invalid read index")
	// errBufferEntrySzTooSmall - requested record size cannot hold the
	// 16-byte slot header (8B timestamp + 8B message length).
	errBufferEntrySzTooSmall = errors.New("buffer entry size too small")
	// errInvalidMessageSize - a record's payload is larger than a slot.
	errInvalidMessageSize = errors.New("message exceeds configured buffer entry size")
	// errCatastrophe - an internal lock-bit invariant was violated;
	// used with panic() in the lockless implementation.
	errCatastrophe = errors.New("very bad...")
)
// Record - a buffer'able record. Implementations provide the fields the
// reservoirs persist: a timestamp, the serialized payload, and its size.
type Record interface {
	GetTimestamp() uint64 // timestamp written into the slot's first 8 bytes
	GetData() []byte      // serialized payload copied into the slot
	GetSize() uint64      // payload size; used to reject oversized records
}
// Reservoir - implements reservoir sampling using Devroye's Algo-L.
// For any arbitrary num. calls to r.Write(msg), retains `nSlots` msgs
// with equal probability.
// ref: https://dl.acm.org/doi/pdf/10.1145/198429.198435 (p. 485-486)
type Reservoir struct {
	nMsgs     uint64  // total messages observed so far (accessed atomically)
	nSlots    uint64  // fixed number of reservoir slots
	recSz     uint64  // bytes per slot: 8B timestamp + 8B length + payload
	nextWrite uint64  // message index at which the next store occurs (atomic)
	w         float64 // Algo-L's `W`; advanced after each steady-state store
	mut       *sync.RWMutex // guards data (and w/nextWrite on the write path)
	data      []byte        // flat backing buffer of nSlots * recSz bytes
}
// New - init new Reservoir with `nSlots` slots of `recSz` bytes each.
// Each slot holds an 8B timestamp, an 8B message length, and up to
// (recSz - 16)B of payload, so recSz must be at least 16. Returns an
// error when nSlots is zero or recSz is too small.
func New(nSlots uint64, recSz uint64) (*Reservoir, error) {
	if nSlots == 0 {
		// guard: nSlots == 0 would divide by zero in the W seed below
		// and make every steady-state rand draw in write() panic.
		return nil, errors.New("reservoir must have at least one slot")
	}
	if recSz < 16 {
		return nil, errBufferEntrySzTooSmall
	}
	// W := exp(log(U)/n) -- the initial `W` of Devroye's Algo-L.
	w := math.Exp(math.Log(rand.Float64()) / float64(nSlots))
	return &Reservoir{
		data:      make([]byte, nSlots*recSz),
		recSz:     recSz,
		nSlots:    nSlots,
		mut:       &sync.RWMutex{},
		w:         w,
		nextWrite: nSlots,
	}, nil
}
// Write - begins a write sequence which either exits quickly and
// skips processing the message OR holds a lock, serializes the
// record, and writes into the buffer. Returns errInvalidMessageSize
// (from r.write) when the record's payload cannot fit in a slot.
func (r *Reservoir) Write(rec Record) error {
	// write if r.nMsgs in [0, r.nSlots) or [r.nextWrite, \infty),
	// incr. and exit if r.nMsgs in [r.nSlots, r.nextWrite)
	//
	// WARN: it's safe, but avoid many concurrent write-requesters. If `nMsgs`
	// puts them on the write sequence, we end up doing extra serde + writes.
	// If it puts them on the short sequence, we end up not writing until
	// later than the desired `r.nextwrite` ...
	id := atomic.LoadUint64(&r.nMsgs)
	wr := atomic.LoadUint64(&r.nextWrite)
	if (r.nSlots <= id) && (id < wr) {
		// skip phase: count the message but do not store it
		atomic.AddUint64(&r.nMsgs, 1)
		return nil
	}
	r.mut.Lock()
	defer r.mut.Unlock()
	// call (presumably expensive) interface serde methods and write...
	ts, data := rec.GetTimestamp(), rec.GetData()
	if err := r.write(ts, data, id); err != nil {
		// NOTE: on error the message is NOT counted in nMsgs
		return err
	}
	atomic.AddUint64(&r.nMsgs, 1)
	return nil
}
// write - internal write; caller must hold r.mut. Serializes (ts, data)
// into a reservoir slot as 8B timestamp | 8B message size | payload.
// Returns errInvalidMessageSize when data cannot fit after the header.
func (r *Reservoir) write(ts uint64, data []byte, id uint64) error {
	// Must decide on `j`, the reservoir slot to write this data. If `id`
	// is _at least_ `r.nSlots`, then draw `j` ~ U[0, r.nSlots).
	// if `id` less than `r.nSlots` (as in the initial seeding),
	// reload `nMsgs` to get a correct count and use that as `j`.
	var j uint64
	if r.nSlots <= id {
		// Int63n, not Int31n: a slot count above MaxInt32 would
		// truncate to a non-positive bound and make rand panic.
		j = uint64(rand.Int63n(int64(r.nSlots)))
	} else {
		// CAREFUL! With many concurrent writers the count may exceed
		// `r.nSlots`. In this case fallback to `j` ~ U[0, r.nSlots)...
		j = atomic.LoadUint64(&r.nMsgs)
		if r.nSlots <= j {
			j = uint64(rand.Int63n(int64(r.nSlots)))
		}
	}
	// reject payloads that cannot fit after the 16-byte slot header
	// (recSz >= 16 is guaranteed by New)
	msgSz := uint64(len(data))
	if msgSz > r.recSz-16 {
		return errInvalidMessageSize
	}
	// write timestamp into the _first eight bytes_ of slot, message
	// size into the _second eight bytes_, and data into the remainder
	off := j * r.recSz
	binary.LittleEndian.PutUint64(r.data[off:], ts)
	binary.LittleEndian.PutUint64(r.data[off+8:], msgSz)
	copy(r.data[off+16:off+r.recSz], data)
	// NOTE: first writer to _START_ after the `r.nSlots`'th writer
	// finishes will be the first to updateWaitingPeriod()
	if r.nSlots <= id {
		r.updateWaitingPeriod()
	}
	return nil
}
// updateWaitingPeriod - corresponds to step (L.4) of Devroye's algo.,
// where `r.w` is `W` and `r.nextWrite` is a cumulative sum of skip
// lengths. Caller must hold r.mut (called only from r.write).
func (r *Reservoir) updateWaitingPeriod() {
	// W *= exp(log(U)/n) -- advance W before drawing the next skip.
	r.w *= math.Exp(math.Log(rand.Float64()) / float64(r.nSlots))
	// S := floor(log(U)/log(1-W)) records are SKIPPED and the record
	// after them is written, so the next write index advances by S+1.
	// (The original advanced by S only, skipping one record too few
	// relative to Algo-L in the referenced paper.)
	r.nextWrite += uint64(math.Log(rand.Float64())/math.Log(1-r.w)) + 1
}
// Read - returns the timestamp and message bytes stored at slot `j`,
// or errInvalidReadIndex when `j` is outside [0, nSlots). The returned
// slice aliases the reservoir's buffer and is only valid until the
// slot is overwritten.
func (r *Reservoir) Read(j uint64) (uint64, []byte, error) {
	if j >= r.nSlots {
		return 0, nil, errInvalidReadIndex
	}
	r.mut.RLock()
	defer r.mut.RUnlock()
	// slot layout: 8B timestamp | 8B msg length | up to (recSz-16)B data
	base := j * r.recSz
	ts := binary.LittleEndian.Uint64(r.data[base:])
	sz := binary.LittleEndian.Uint64(r.data[base+8:])
	return ts, r.data[base+16 : base+16+sz], nil
}
// Reset - restores the reservoir to its initial state: the message
// counter returns to zero, the next-write index to nSlots, W is drawn
// fresh, and the backing buffer is zeroed.
func (r *Reservoir) Reset() {
	r.mut.Lock()
	defer r.mut.Unlock()
	atomic.StoreUint64(&r.nMsgs, 0)
	atomic.StoreUint64(&r.nextWrite, r.nSlots)
	// re-seed W exactly as New does
	r.w = math.Exp(math.Log(rand.Float64()) / float64(r.nSlots))
	// zero the whole backing buffer in place
	for i := range r.data {
		r.data[i] = 0
	}
}
// Size - returns the fixed number of slots in the reservoir.
func (r *Reservoir) Size() uint64 {
	return r.nSlots
}
// lockless.go
package reservoir
import (
"math/rand"
"runtime"
"sync/atomic"
)
// LocklessReservoir - quick (almost surely not correct) impl. of a reservoir sampler
// w. algo-R and no locks, fine-grained control using a few uint32s...
type LocklessReservoir struct {
	nMsgs  uint64 // total messages observed so far (accessed atomically)
	nSlots uint64 // fixed number of reservoir slots
	recSz  uint64 // max record size accepted by Write
	data   []atomic.Pointer[Record] // one stored record pointer per slot
	// locks packs one guard bit per slot, 32 slots to a word.
	// NOTE(review): Write/Read CAS the whole word against 0, so taking
	// one slot's bit also excludes the 31 slots sharing its word.
	locks []uint32
}
// NewLockless - init new LocklessReservoir with `nSlots` slots, each
// accepting records of at most `recSz` bytes.
func NewLockless(nSlots uint64, recSz uint64) (*LocklessReservoir, error) {
	r := &LocklessReservoir{
		nSlots: nSlots,
		recSz:  recSz,
		data:   make([]atomic.Pointer[Record], nSlots),
		// one guard bit per slot, packed 32 to a uint32 word
		locks: make([]uint32, (nSlots+31)/32),
	}
	return r, nil
}
// Write - begins a write sequence: draw j ~ U[0, numMsgs]; the record
// is kept only when j lands in an occupied-reservoir slot (algo-R),
// otherwise the message is merely counted. Returns errInvalidMessageSize
// when the record is larger than recSz (the message is still counted).
func (r *LocklessReservoir) Write(rec Record) error {
	numMsgs := atomic.LoadUint64(&r.nMsgs)
	// Int63n, not Int31n: once numMsgs reaches MaxInt32 the old int32
	// conversion produced a non-positive bound and rand panicked.
	j := uint64(rand.Int63n(int64(numMsgs) + 1))
	// short path - incr. && exit
	if (r.nSlots <= j) && (r.nSlots <= numMsgs) {
		atomic.AddUint64(&r.nMsgs, 1)
		return nil
	}
	// long path - get `lock` and write
	if rec.GetSize() > r.recSz {
		atomic.AddUint64(&r.nMsgs, 1)
		return errInvalidMessageSize
	}
	// locate the index (l) and position (k) in `r.locks` which correspond to j
	var l, k uint32 = uint32(j / 32), uint32(1 << (j % 32))
	for {
		if atomic.CompareAndSwapUint32(&r.locks[l], 0, k) {
			r.data[j].Store(&rec)
			if !atomic.CompareAndSwapUint32(&r.locks[l], k, 0) {
				// failure to CAS here would be a bad correctness bug!
				// (unreachable `return` after panic removed)
				panic(errCatastrophe)
			}
			atomic.AddUint64(&r.nMsgs, 1)
			return nil
		}
		// word is held by another goroutine; yield and retry
		runtime.Gosched()
	}
}
// Read - reads the reservoir at slot `j`, returning the record's
// timestamp and data, zero values when the slot was never written,
// or errInvalidReadIndex when `j` is outside [0, nSlots).
func (r *LocklessReservoir) Read(j uint64) (uint64, []byte, error) {
	if r.nSlots <= j {
		return 0, nil, errInvalidReadIndex
	}
	// word (l) and bit mask (k) in r.locks guarding slot j
	var l, k uint32 = uint32(j / 32), uint32(1 << (j % 32))
	for {
		if atomic.CompareAndSwapUint32(&r.locks[l], 0, k) {
			rec := r.data[j].Load()
			if !atomic.CompareAndSwapUint32(&r.locks[l], k, 0) {
				// failure to CAS here would be a bad correctness bug!
				// (unreachable `return` after panic removed)
				panic(errCatastrophe)
			}
			// nil check must precede the dereference: the original
			// dereferenced first and panicked on any empty slot.
			if rec == nil {
				return 0, nil, nil
			}
			// *rec is already a Record; the old interface assertion
			// was redundant.
			drec := *rec
			return drec.GetTimestamp(), drec.GetData(), nil
		}
		// word is held by another goroutine; yield and retry
		runtime.Gosched()
	}
}
// ResetCounter - resets reservoir counter w.o. clearing data: after the
// call, Write treats the reservoir as freshly seeded, but previously
// stored records remain readable until overwritten.
func (r *LocklessReservoir) ResetCounter() {
	atomic.StoreUint64(&r.nMsgs, 0)
}