I wrote a small vector search service earlier this year and wanted to write about a tiny component of the retrieval logic. I was using an approximate nearest neighbors index that partitioned a dataset into sections. This class of index is called an Inverted File Index and it is a common choice among many datastores with vector search support.
In my implementation, each call to Server.Query located the sections nearest to a user's query vector and then performed parallel sequential scans on those sections via a call to query. The query function created a capped priority queue, scanned the entire section, and then returned its contents to Server.Query to be merged and sent back to the user.
// Query - Public RPC
func (s *Server) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
	// omitted: search region centroids, uses a PQ to rank top `M` of `K` regions.
	// Passes `M` cluster UUIDs to `clustersToSearch`, initializes the parallel search,
	// and starts a worker for each section. Push results into a global PQ to reduce
	// from `M` x `limit` vectors to `limit` vectors.
	var wg sync.WaitGroup
	var globalTop = newprioQueue(req.Limit)
	for _, uuid := range clustersToSearch {
		wg.Add(1)
		go func(id string, wg *sync.WaitGroup) {
			defer wg.Done()
			regionTop, _ := s.stores[id].query(req.Data, req.Limit)
			globalTop.m.Lock()
			defer globalTop.m.Unlock()
			for _, res := range regionTop {
				globalTop.tryPush(&obj{value: res.record, priority: res.score})
			}
		}(uuid, &wg)
	}
	wg.Wait()
	return &pb.QueryResponse{Records: globalTop.Results()}, nil
}

// query - internal
func (vs *VectorStore) query(search []float64, limit int) ([]*obj, error) {
	var nearest = newprioQueue(limit)
	for _, v := range vs.vectors {
		nearest.tryPush(
			&obj{value: v.id, priority: d.distance(v.data, search)},
		)
	}
	return nearest.objs, nil
}

For this to work well, I needed a reasonably performant priority queue. But this was meant to be a toy; what I was looking for was a passable, low-effort solution to finish this proof-of-concept quickly. The priorityQueue example from the heap documentation seemed like a great place to start.
Unlike a traditional priority queue, what I refer to as a "capped priority queue" should only ever hold the items closest to the query vector. To fit this requirement, the example PQ would either need to be constantly sorted and pruned, or another structure would need to wrap the queue to manage inserts (i.e. peek at its maximum value). Rather than writing a wrapper, I just implemented my own capped priority queue.
type obj struct {
	value    string
	priority float64
}

// prioQueue - just maintains a sorted collection based on priority
type prioQueue struct {
	objs []*obj
}

// newprioQueue - create a new PQ, initialized w. +Inf priorities
func newprioQueue(lim int) *prioQueue {
	var pqobjs = make([]*obj, lim)
	for i := range pqobjs {
		pqobjs[i] = &obj{value: "", priority: math.Inf(1)}
	}
	return &prioQueue{objs: pqobjs}
}

// NOTE: Sample code omits implementations for Len, Swap, Less, to implement sort.Interface
// see: https://cs.opensource.google/go/go/+/go1.21.3:src/sort/sort.go;l=14

// tryPush - Our object of study, seems like it sorts too much, but how bad is it?
func (pq prioQueue) tryPush(o *obj) {
	if o.priority < pq.objs[len(pq.objs)-1].priority {
		pq.objs[len(pq.objs)-1] = o
		sort.Sort(pq)
	}
}

This implementation contains very little
logic. When tryPush is called with an object, that object is only inserted if its priority is below the maximum priority currently in the PQ. Following a push, we sort the PQ to move the new object to its correct position. I was concerned with the implementation of tryPush. It looks a little suspicious: how often are we really going to need to call sort.Sort(pq)? To accept this solution, I needed to get a grasp of this function's cost in terms of the number of objects scanned, n, and the return limit, k.
Using an approximation of the harmonic numbers, we can estimate the number of sorts per query at roughly k * ln(n). For example, if a query requires getting the top 10 items from a section of 100,000 items, we can estimate around 115 calls to sort. If this section is 1M items, we can expect around 138 sorts. Overall, this doesn't seem too bad!
I verified this estimate with an unscientific test that involved attaching a counter to the PQ. Because we don't perform a sort on the first k items, our theoretical estimate of k * ln(n) is slightly higher than our observed mean. Not bad!
Finally, we can approximate a total sorting cost associated with each call to query. Assuming a sort function that runs in O(k * log(k)), and roughly k * ln(n) sorts per scan, we find a cost function that's quadratic with respect to the number of vectors requested, but only logarithmic in the number of vectors scanned.
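As a sanity check on that shape, here's the arithmetic sketched out. The cost model below is an assumption (k * ln(n) sorts at about k * log2(k) comparisons each, constants dropped); only the ratios matter.

```go
package main

import (
	"fmt"
	"math"
)

// sortCost estimates total comparisons spent in sort.Sort per query:
// roughly k*ln(n) sorts, each costing about k*log2(k) comparisons.
func sortCost(n, k float64) float64 {
	return k * math.Log(n) * k * math.Log2(k)
}

func main() {
	base := sortCost(1e6, 10)
	// Doubling the limit k more than quadruples the estimated cost...
	fmt.Printf("double k: %.1fx\n", sortCost(1e6, 20)/base) // prints "double k: 5.2x"
	// ...while scanning 100x more vectors barely moves it.
	fmt.Printf("100x n:   %.1fx\n", sortCost(1e8, 10)/base) // prints "100x n:   1.3x"
}
```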
Note: I created a DB of 200K vectors of dimension 768, partitioned the DB into sections, and served 10,000 sequential requests for a random vector's twenty nearest neighbors by distance.
This is OK if we assume k is relatively small. If k were large, there's an easy performance improvement to be had by temporarily allowing the PQ to remain unsorted and grow to some multiple of k observations. Once the probability of triggering a sort is low, we sort once, truncate all observations in excess of k, and proceed as normal. In practice, this type of optimization is likely overkill. I ran a quick (probably too short) profile of the service as it served normal traffic and found that the sort.Sort operation isn't negligible, but is much less time-consuming than the (admittedly under-optimized) distance calculations.
(pprof) list prioQueue
Total: 19.31s
ROUTINE ====== github.com/dmw2151/avdb.prioQueue.tryPush in /avdb/pq.go
40ms 600ms (flat, cum) 3.11% of Total
. . 61:func (pq prioQueue) tryPush(o *obj) {
20ms 20ms 62: if o.priority < pq.objs[len(pq.objs)-1].priority {
10ms 10ms 63: pq.objs[len(pq.objs)-1] = o
. 560ms 64: sort.Sort(pq)
. . 65: }
10ms 10ms 66:}

(pprof) list distance
Total: 19.31s
ROUTINE ====== github.com/dmw2151/avdb.L2Distancer.distance in /avdb/distancer.go
11.36s 11.36s (flat, cum) 58.83% of Total
. . 22:func (d L2Distancer) distance(a, b []float64) float64 {
. . 23: var dist float64
. . 24: var diff float64
9.69s 9.69s 25: for i, _ := range a {
1.59s 1.59s 26: diff = (a[i] - b[i])
70ms 70ms 27: dist += diff * diff
. . 28: }
10ms 10ms 29: return math.Sqrt(dist)
. . 30:}
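For completeness, the batch-then-truncate idea mentioned above might look roughly like the sketch below. This is hypothetical (the bufferedPQ type is not from the service): candidates below the last known k-th best accumulate unsorted, and a single sort plus truncate runs only when the buffer fills, amortizing the sort cost.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// bufferedPQ keeps the k best (lowest) priorities seen, but amortizes
// sorting: candidates below the current threshold accumulate unsorted,
// and only when the buffer reaches capacity do we sort and truncate.
type bufferedPQ struct {
	buf       []float64
	k         int
	cap       int     // buffer size before compacting, e.g. 4*k
	threshold float64 // k-th best priority as of the last compact
}

func newBufferedPQ(k int) *bufferedPQ {
	return &bufferedPQ{k: k, cap: 4 * k, threshold: math.Inf(1)}
}

func (b *bufferedPQ) tryPush(p float64) {
	if p >= b.threshold {
		return
	}
	b.buf = append(b.buf, p)
	if len(b.buf) >= b.cap {
		b.compact()
	}
}

// compact sorts once, truncates to the k best, and tightens the threshold.
func (b *bufferedPQ) compact() {
	if len(b.buf) == 0 {
		return
	}
	sort.Float64s(b.buf)
	if len(b.buf) > b.k {
		b.buf = b.buf[:b.k]
	}
	b.threshold = b.buf[len(b.buf)-1]
}

func main() {
	b := newBufferedPQ(3)
	for _, p := range []float64{9, 2, 7, 1, 8, 3, 6, 4, 5} {
		b.tryPush(p)
	}
	b.compact() // final sort + truncate before reading results
	fmt.Println(b.buf) // prints "[1 2 3]"
}
```

Given the profile above, though, the distance calculations would be the first thing to optimize, not the queue.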