// buffer-code
// reservoir.go
package reservoir

import (
    "encoding/binary"
    "errors"
    "math"
    "math/rand"
    "sync"
    "sync/atomic"
)

var (
    // errInvalidReadIndex - Read was asked for a slot index >= nSlots.
    errInvalidReadIndex      = errors.New("invalid read index")
    // errBufferEntrySzTooSmall - New was given recSz < 16, too small to
    // hold the 8B timestamp + 8B length slot header.
    errBufferEntrySzTooSmall = errors.New("buffer entry size too small")
    // errInvalidMessageSize - a record's payload does not fit in a slot.
    errInvalidMessageSize    = errors.New("message exceeds configured buffer entry size")
    // errCatastrophe - an "impossible" unlock-CAS failure in the
    // lockless implementation; paired with panic.
    errCatastrophe           = errors.New("very bad...")
)

// Record - a buffer'able record. Implementations expose the record's
// timestamp, its raw payload bytes, and its size in bytes.
type Record interface {
    GetTimestamp() uint64
    GetData() []byte
    GetSize() uint64
}

// Reservoir - implements reservoir sampling using Devroye's Algo-L.
// For any arbitrary num. calls to r.Write(msg), retains `nSlots` msgs
// with equal probability.
// ref: https://dl.acm.org/doi/pdf/10.1145/198429.198435 (p. 485-486)
type Reservoir struct {
    nMsgs     uint64        // total messages seen; accessed atomically
    nSlots    uint64        // reservoir capacity in slots
    recSz     uint64        // bytes per slot: 8B ts + 8B len + payload
    nextWrite uint64        // message count that triggers the next write; accessed atomically
    w         float64       // the algorithm's W; mutated only under mut
    mut       *sync.RWMutex // guards data and w
    data      []byte        // flat slot buffer, nSlots*recSz bytes
}

// New - init new Reservoir with `nSlots` slots of `recSz` bytes each.
// Each slot stores an 8-byte timestamp, an 8-byte message length, and
// up to (recSz - 16) bytes of payload, hence the minimum record size.
// Returns an error when recSz cannot hold the 16-byte header, or when
// nSlots is zero — a zero-slot reservoir would divide by zero seeding
// `w` here and panic in the slot-selection rand call on first write.
func New(nSlots uint64, recSz uint64) (*Reservoir, error) {
    if nSlots == 0 {
        return nil, errors.New("reservoir must have at least one slot")
    }
    if recSz < 16 {
        return nil, errBufferEntrySzTooSmall
    }
    // W := exp(log(U)/n) — the algorithm's initial waiting-period seed.
    w := math.Exp(math.Log(rand.Float64()) / float64(nSlots))
    return &Reservoir{
        data:   make([]byte, nSlots*recSz),
        recSz:  recSz,
        nSlots: nSlots,
        mut:    &sync.RWMutex{},
        w:      w,
        // the first nSlots messages seed the reservoir; waiting-period
        // sampling begins once nMsgs reaches nSlots.
        nextWrite: nSlots,
    }, nil
}

// Write - begins a write sequence which either exits quickly and
// skips processing the message OR holds a lock, serializes the
// record, and writes into the buffer. A record rejected by the slot
// size check is NOT counted toward nMsgs (the error return precedes
// the increment).
func (r *Reservoir) Write(rec Record) error {

    // write if r.nMsgs in [0, r.nSlots) or [r.nextWrite, \infty),
    // incr. and exit if r.nMsgs in [r.nSlots, r.nextWrite)
    //
    // WARN: it's safe, but avoid many concurrent write-requesters. If `nMsgs`
    // puts them on the write sequence, we end up doing extra serde + writes.
    // If it puts them on the short sequence, we end up not writing until
    // later than the desired `r.nextwrite` ...
    id := atomic.LoadUint64(&r.nMsgs)
    wr := atomic.LoadUint64(&r.nextWrite)
    if (r.nSlots <= id) && (id < wr) {
        // inside the waiting period: count the message, keep nothing
        atomic.AddUint64(&r.nMsgs, 1)
        return nil
    }

    r.mut.Lock()
    defer r.mut.Unlock()

    // call (presumably expensive) interface serde methods and write...
    // NOTE(review): `id` may be stale by the time the lock is held; the
    // WARN above says this race is tolerated, not eliminated.
    ts, data := rec.GetTimestamp(), rec.GetData()
    if err := r.write(ts, data, id); err != nil {
        return err
    }

    atomic.AddUint64(&r.nMsgs, 1)
    return nil
}

// write - internal write; caller must hold r.mut. Serializes `ts` and
// `data` into one slot: bytes [0,8) timestamp, [8,16) payload length,
// [16,recSz) payload. Returns errInvalidMessageSize (leaving the
// buffer untouched) when the payload does not fit.
func (r *Reservoir) write(ts uint64, data []byte, id uint64) error {

    // Must decide on `j`, the reservoir slot to write this data. If `id`
    // is _at least_ `r.nSlots`, then draw `j` ~ U[0, r.nSlots).
    // if `id` less than `r.nSlots` (as in the initial seeding),
    // reload `nMsgs` to get a correct count and use that as `j`.
    var j uint64
    if r.nSlots <= id {
        // Int63n instead of Int31n: the old int32 cast wrapped for
        // nSlots > MaxInt32, and rand.Int31n panics when n <= 0.
        j = uint64(rand.Int63n(int64(r.nSlots)))
    } else {
        // CAREFUL! With many concurrent writers the count may exceed
        // `r.nSlots`. In this case fallback to `j` ~ U[0, r.nSlots)...
        j = atomic.LoadUint64(&r.nMsgs)
        if r.nSlots <= j {
            j = uint64(rand.Int63n(int64(r.nSlots)))
        }
    }

    // validate before mutating so an oversized message leaves the slot
    // untouched; recSz >= 16 is guaranteed by New.
    msgSz := uint64(len(data))
    if msgSz > r.recSz-16 {
        return errInvalidMessageSize
    }

    // slot layout: 8B timestamp | 8B message size | payload
    base := j * r.recSz
    binary.LittleEndian.PutUint64(r.data[base:], ts)
    binary.LittleEndian.PutUint64(r.data[base+8:], msgSz)
    copy(r.data[base+16:base+r.recSz], data)

    // NOTE: first writer to _START_ after the `r.nSlots`'th writer
    // finishes will be the first to updateWaitingPeriod()
    if r.nSlots <= id {
        r.updateWaitingPeriod()
    }
    return nil
}

// updateWaitingPeriod - corresponds to steps (L.4) of Devroye's algo.
// where `r.w` is `W` and `r.nextWrite` is a cumulative sum of skip
// lengths. Caller must hold r.mut.
func (r *Reservoir) updateWaitingPeriod() {
    // W := W * exp(log(U)/n)
    r.w *= math.Exp(math.Log(rand.Float64()) / float64(r.nSlots))
    // The paper's index update is i := i + floor(log(U)/log(1-W)) + 1:
    // skip S := floor(log(U)/log(1-W)) records, then write the record
    // *after* the skip — hence the trailing +1 (without it an S of 0
    // left nextWrite unchanged and over-sampled the next messages).
    // NOTE(review): the paper computes S with W *before* the update
    // above; verify the reordering is intended.
    r.nextWrite += uint64(math.Log(rand.Float64())/math.Log(1-r.w)) + 1
}

// Read - returns the timestamp and payload stored in slot `j`, or an
// error when `j` is outside the reservoir. NOTE: the returned byte
// slice aliases the internal buffer; it is not a copy.
func (r *Reservoir) Read(j uint64) (uint64, []byte, error) {
    if r.nSlots <= j {
        return 0, nil, errInvalidReadIndex
    }

    r.mut.RLock()
    defer r.mut.RUnlock()

    // slot layout: 8B timestamp | 8B message length | up to (recSz-16)B data
    base := j * r.recSz
    ts := binary.LittleEndian.Uint64(r.data[base:])
    msgSz := binary.LittleEndian.Uint64(r.data[base+8:])
    return ts, r.data[base+16 : base+16+msgSz], nil
}

// Reset - returns the reservoir to its initial state: zero messages
// seen, seeding phase restarted, `w` re-drawn, and the buffer zeroed.
func (r *Reservoir) Reset() {
    r.mut.Lock()
    defer r.mut.Unlock()

    atomic.StoreUint64(&r.nMsgs, 0)
    atomic.StoreUint64(&r.nextWrite, r.nSlots)
    r.w = math.Exp(math.Log(rand.Float64()) / float64(r.nSlots))

    // len(r.data) == nSlots*recSz, so zeroing the whole buffer clears
    // every slot.
    for i := range r.data {
        r.data[i] = 0
    }
}

// Size - returns the reservoir's capacity (number of slots).
func (r *Reservoir) Size() uint64 {
    return r.nSlots
}
// lockless.go
package reservoir

import (
    "math/rand"
    "runtime"
    "sync/atomic"
)

// LocklessReservoir - quick (almost surely not correct) impl. of a reservoir sampler
// w. algo-R and no locks, fine-grained control using a few uint32s...
type LocklessReservoir struct {
    nMsgs  uint64                   // total messages seen; accessed atomically
    nSlots uint64                   // number of record slots
    recSz  uint64                   // max record size Write accepts (bytes)
    data   []atomic.Pointer[Record] // one record pointer per slot; nil until first write
    locks  []uint32                 // spinlock bits, one per slot, packed 32 per word
    // NOTE(review): acquisition CASes a whole word from 0, so any held
    // bit blocks all 32 slots sharing that word.
}

// NewLockless - init new LocklessReservoir with `nSlots` record slots
// and a maximum accepted record size of `recSz` bytes. One spinlock
// bit is allocated per slot, packed 32 to a uint32.
func NewLockless(nSlots uint64, recSz uint64) (*LocklessReservoir, error) {
    nLockWords := (nSlots + 31) / 32 // round up: one bit per slot
    lr := &LocklessReservoir{
        nSlots: nSlots,
        recSz:  recSz,
        data:   make([]atomic.Pointer[Record], nSlots),
        locks:  make([]uint32, nLockWords),
    }
    return lr, nil
}

// Write - begins a write sequence (algo-R style): draw j ~ U[0, nMsgs];
// once the reservoir is full, a draw outside the slot range only bumps
// the message counter. Otherwise the slot's spinlock bit is taken and
// the record pointer is stored. An oversized record is counted toward
// nMsgs but returns errInvalidMessageSize without being stored.
func (r *LocklessReservoir) Write(rec Record) error {

    numMsgs := atomic.LoadUint64(&r.nMsgs)
    // Int63n instead of Int31n: the old int32 cast wrapped negative
    // once numMsgs reached 2^31, which makes rand.Int31n panic.
    j := uint64(rand.Int63n(int64(numMsgs) + 1))

    // short path - incr. && exit
    if (r.nSlots <= j) && (r.nSlots <= numMsgs) {
        atomic.AddUint64(&r.nMsgs, 1)
        return nil
    }

    // long path - get `lock` and write
    if rec.GetSize() > r.recSz {
        // still counted: the sampler "saw" this message
        atomic.AddUint64(&r.nMsgs, 1)
        return errInvalidMessageSize
    }

    // locate the index (l) and position (k) in `r.locks` which correspond to j
    var l, k uint32 = uint32(j / 32), uint32(1 << (j % 32))
    for {
        if atomic.CompareAndSwapUint32(&r.locks[l], 0, k) {
            r.data[j].Store(&rec)
            if !atomic.CompareAndSwapUint32(&r.locks[l], k, 0) {
                // failure to CAS here would be a bad correctness bug!
                panic(errCatastrophe)
            }
            atomic.AddUint64(&r.nMsgs, 1)
            return nil
        }
        runtime.Gosched()
    }
}

// Read - reads the reservoir at slot `j`. Returns (0, nil, nil) when
// the slot has never been written, and errInvalidReadIndex when `j`
// is out of range.
func (r *LocklessReservoir) Read(j uint64) (uint64, []byte, error) {
    if r.nSlots <= j {
        return 0, nil, errInvalidReadIndex
    }

    // locate the index (l) and bit (k) in `r.locks` which correspond to j
    var l, k uint32 = uint32(j / 32), uint32(1 << (j % 32))
    for {
        if atomic.CompareAndSwapUint32(&r.locks[l], 0, k) {
            rec := r.data[j].Load()
            if !atomic.CompareAndSwapUint32(&r.locks[l], k, 0) {
                // failure to CAS here would be a bad correctness bug!
                panic(errCatastrophe)
            }
            // BUG FIX: the empty-slot check must precede the
            // dereference; the old code did `(*rec)` first and
            // panicked on a never-written slot.
            if rec == nil {
                return 0, nil, nil
            }
            drec := *rec
            return drec.GetTimestamp(), drec.GetData(), nil
        }
        runtime.Gosched()
    }
}

// ResetCounter - resets the message counter to zero w.o. clearing the
// stored records or the spinlock words.
func (r *LocklessReservoir) ResetCounter() {
    atomic.StoreUint64(&r.nMsgs, 0)
}