A Priority Queue On The Cheap

I wrote a small vector search service earlier this year and wanted to write about a tiny component of the retrieval logic. I was using an approximate nearest neighbors index that partitioned a dataset into sections. This class of index is called an Inverted File Index, and it is a common choice among datastores with vector search support.

In my implementation, each call to Server.Query located the M sections nearest to a user’s query vector and then performed parallel sequential scans over those sections via calls to query. The query function created a capped priority queue, scanned an entire section, and returned its contents to Server.Query to be merged and sent back to the user.

// Query - Public RPC
func (s *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {

    // omitted: search region centroids, using a PQ to rank the top `M` of `K` regions,
    // and pass `M` cluster UUIDs to `clustersToSearch`. Then start a parallel search
    // worker for each section, pushing results into a global PQ to reduce `M` x `limit`
    // vectors down to `limit` vectors.
    var globaltop = newprioQueue(req.Limit)

    var wg sync.WaitGroup
    for _, uuid := range clustersToSearch {
        wg.Add(1)
        go func(id string, wg *sync.WaitGroup) {
            defer wg.Done()
            regiontop, _ := s.stores[id].query(req.Data, req.Limit)
            globaltop.m.Lock()
            defer globaltop.m.Unlock()
            for _, res := range regiontop {
                globaltop.tryPush(&obj{value: res.record, priority: res.score})
            }
        }(uuid, &wg)
    }
    wg.Wait()
    return &pb.QueryResponse{Records: globaltop.Results()}, nil
}
// query - internal
func (vs *VectorStore) query(search []float64, limit int) ([]*obj, error) {
    var nearest = newprioQueue(limit)
    for _, v := range vs.vectors {
        nearest.tryPush(
            &obj{value: v.id, priority: d.distance(v.data, search)},
        )
    }
    return nearest.objs, nil
}

For this to work well, I needed a reasonably performant priority queue. But this was meant to be a toy; what I was really looking for was a passable, low-effort solution that would let me finish this proof-of-concept quickly. The priorityQueue example from the heap documentation seemed like a great place to start.

Unlike a traditional priority queue, what I refer to as a “capped priority queue” should only ever hold the K items closest to the query vector. To fit this requirement, the example PQ would either need to be constantly sorted and pruned, or another structure would need to wrap the queue to manage inserts (i.e. peek at its maximum value). Rather than writing a wrapper, I just implemented my own minimal capped priority queue.
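For context, the wrapper I decided not to write might have looked something like the sketch below: a max-heap built on container/heap keeps the worst retained candidate at the root, so "peek at the maximum" is a constant-time read and each insert costs O(log K). (The cappedHeap type and its names here are hypothetical, not part of the service.)

```go
package main

import (
	"container/heap"
	"fmt"
)

// maxHeap keeps the largest distance at the root, so checking whether a new
// candidate beats the worst retained candidate is a cheap peek at index 0.
type maxHeap []float64

func (h maxHeap) Len() int           { return len(h) }
func (h maxHeap) Less(i, j int) bool { return h[i] > h[j] }
func (h maxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x any)        { *h = append(*h, x.(float64)) }
func (h *maxHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// cappedHeap admits a candidate while under capacity, or when it beats the
// current worst; either way an insert costs O(log k).
type cappedHeap struct {
	k int
	h maxHeap
}

func (c *cappedHeap) tryPush(p float64) {
	switch {
	case c.h.Len() < c.k:
		heap.Push(&c.h, p)
	case p < c.h[0]: // peek at the maximum retained distance
		c.h[0] = p
		heap.Fix(&c.h, 0) // restore the heap invariant in O(log k)
	}
}

func main() {
	c := &cappedHeap{k: 3}
	for _, p := range []float64{9, 4, 7, 1, 8, 2} {
		c.tryPush(p)
	}
	fmt.Println(c.h) // the three smallest distances, maximum at the root
}
```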

type obj struct {
    value    string
    priority float64
}

// prioQueue - maintains a sorted, fixed-size collection ordered by priority
type prioQueue struct {
    m    sync.Mutex
    objs []*obj
}

// newprioQueue - create a new PQ of size `lim`, initialized with +Inf priorities
func newprioQueue(lim int) *prioQueue {
    var pqobjs = make([]*obj, lim)
    for i := range pqobjs {
        pqobjs[i] = &obj{value: "", priority: math.Inf(1)}
    }
    return &prioQueue{objs: pqobjs}
}

// NOTE: Sample code omits implementations for Len, Swap, Less, to implement sort.Interface
// see: https://cs.opensource.google/go/go/+/go1.21.3:src/sort/sort.go;l=14

// tryPush - our object of study; it seems like it sorts too much, but how bad is it?
func (pq prioQueue) tryPush(o *obj) {
    if o.priority < pq.objs[len(pq.objs)-1].priority {
        pq.objs[len(pq.objs)-1] = o
        sort.Sort(pq)
    }
}

This implementation contains very little logic. When tryPush is called with an object, that object is only inserted if its priority is below the maximum priority currently in the PQ. Following a push, we sort the PQ to move the new object into its correct position. I was concerned with the implementation of tryPush; it looks a little suspicious. How often are we really going to need to call sort.Sort(pq)? To accept this solution, I needed to get a grasp of this function’s cost in terms of objects scanned (N) and return limit (K).

$$
\begin{aligned}
E[\text{\# Sorts}] &= \sum_{i=1}^{N} \Pr\big(\textit{Accept } i^{th} \textit{ Obj.}\big) && \\
&= K \cdot \sum_{i=1}^{N} i^{-1} && \textit{Uniformity} \implies i^{th} \textit{ object has } P(\textit{Accept}) \approx \tfrac{K}{i} \\
&= K \cdot H_N && \textit{def'n Harmonic Number} \\
&\approx K \cdot \log(N) && \textit{approx. Harmonic Number}
\end{aligned}
$$

Using an approximation of the harmonic numbers we can estimate the number of sorts per query. For example, if a query requires getting the top 10 items from a section of 100,000 items, we can estimate around 115 calls to sort. If this section is 1M items, we can expect around 138 sorts. Overall, this doesn’t seem too bad!

I verified this estimate with an unscientific test that involved attaching a counter to the PQ. Because we don’t perform a sort on the first K items, our theoretical estimate of 10 · log(1M) ≈ 138.2 is higher than our observed mean (135.3 ± 9.8). Not bad!

Finally, we can approximate a cost associated with each call to query. Assuming a sort function that runs in K log(K), we find a cost function that’s quadratic with respect to the number of vectors requested, but only logarithmic in vectors scanned.

$$
\begin{aligned}
\text{Cost} &\approx K\log(K) \cdot K\log(N) \\
&\approx K^2\,\log(K)\log(N)
\end{aligned}
$$

Note: I created a DB of 200K vectors of dimension 768, partitioned the DB into √200000 ≈ 450 sections, and served 10,000 sequential requests for a random vector’s twenty nearest neighbors by L2 distance.

This is OK if we assume K is relatively small. If K were large, there’s an easy performance improvement to be had by temporarily allowing the PQ to remain unsorted and grow up to α · K (α ≥ 1) observations. Once the probability of triggering a sort is low, we sort once, truncate all observations in excess of K, and proceed as normal. In practice, this type of optimization is likely overkill. I ran a quick (probably too short) profile of the service as it served normal traffic and found that the sort.Sort operation isn’t negligible, but is much less time-consuming than the (admittedly under-optimized) distance calculations.
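That buffering idea could look something like the sketch below (a hypothetical bufferedTopK, not code from the service): accepted candidates pile up unsorted until the buffer reaches α · K entries, then one sort truncates it back to the K best and tightens the acceptance cutoff.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// bufferedTopK defers sorting: candidates below the current cutoff accumulate
// in an unsorted buffer of up to alpha*k entries, and a single sort-and-truncate
// runs only when the buffer fills.
type bufferedTopK struct {
	k, max int
	cutoff float64
	buf    []float64
}

func newBufferedTopK(k int, alpha float64) *bufferedTopK {
	return &bufferedTopK{k: k, max: int(alpha * float64(k)), cutoff: math.Inf(1)}
}

func (b *bufferedTopK) tryPush(p float64) {
	if p >= b.cutoff {
		return
	}
	b.buf = append(b.buf, p)
	if len(b.buf) >= b.max {
		b.compact()
	}
}

// compact sorts once, keeps the k best, and tightens the acceptance cutoff.
func (b *bufferedTopK) compact() {
	sort.Float64s(b.buf)
	if len(b.buf) > b.k {
		b.buf = b.buf[:b.k]
	}
	if len(b.buf) == b.k {
		b.cutoff = b.buf[b.k-1]
	}
}

// results returns the k (or fewer) smallest priorities seen, in sorted order.
func (b *bufferedTopK) results() []float64 {
	b.compact()
	return b.buf
}

func main() {
	b := newBufferedTopK(3, 2.0)
	for _, p := range []float64{9, 4, 7, 1, 8, 2, 5} {
		b.tryPush(p)
	}
	fmt.Println(b.results())
}
```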

(pprof) list prioQueue
Total: 19.31s
ROUTINE ====== github.com/dmw2151/avdb.prioQueue.tryPush in /avdb/pq.go
40ms      600ms (flat, cum)  3.11% of Total
   .          .     61:func (pq prioQueue) tryPush(o *obj) {
20ms       20ms     62:   if o.priority < pq.objs[len(pq.objs)-1].priority {
10ms       10ms     63:           pq.objs[len(pq.objs)-1] = o
   .      560ms     64:           sort.Sort(pq)
   .          .     65:   }
10ms       10ms     66:}
(pprof) list distance     
Total: 19.31s
ROUTINE ====== github.com/dmw2151/avdb.L2Distancer.distance in /avdb/distancer.go
11.36s     11.36s (flat, cum) 58.83% of Total
   .          .     22:func (d L2Distancer) distance(a, b []float64) float64 {
   .          .     23:   var dist float64
   .          .     24:   var diff float64
9.69s      9.69s    25:   for i, _ := range a {
1.59s      1.59s    26:           diff = (a[i] - b[i])
 70ms       70ms    27:           dist += diff * diff
   .          .     28:   }
 10ms       10ms    29:   return math.Sqrt(dist)
   .          .     30:}