An Optimistic Concurrency Control Manager

I took a few days off work this week and wanted to write some fun code while I was out. Learning more about concurrency management has been on my list for a while, so let's go with that. Broadly, databases tend to manage concurrency with either pessimistic (e.g. Two-Phase Locking) or optimistic (e.g. Timestamp Ordering) protocols. In a textbook implementation of OCC, a DB is made serializable by taking no locks during a preliminary working phase and then write-locking the DB during a short validation and write phase.

Today, I roughly implemented Kung and Robinson's On Optimistic Methods for Concurrency Control (1981) and wrote a small (≤200 LOC) Go package that can be used to wrap an arbitrary data store and give it OCC semantics. Because direct ports of papers are hardly ever fun, I made the following changes:

  1. I collapse CreateSet, WriteSet, and DeleteSet into WriteSet. These ops are all writes; treating them as such gives us a much tighter implementation.

  2. (🚨) I record CurrentTxnID for each read a transaction executes, and only validate its ReadSet against committed transactions with TxnID up to the last read. This should give us a consistently faster validation phase.

  3. Rather than maintaining a ReadSet and WriteSet of object IDs, I represent each as a slice of bits (really uint64s). On read or write, I set a bit in the corresponding array by hashing the object's ID. With this optimization, the validation phase is just a few bitwise ANDs. This is effectively a bloom filter, and regrettably means that transactions with many reads will have an inflated commit failure rate.

  4. I bound the size of the log by txnRetention. Because of (3), I can allocate one slice proportional to hashBits · txnRetention. This gives us O(1) access to a committed transaction's WriteSet hash. In exchange, we must fail all transactions more than txnRetention commits old. Fair trade; we'll take it…

N.B.: I am not strictly anti-GitHub, but I'd prefer to share large code snippets without (1) linking to another domain, (2) wasting a ton of space inline, or (3) having my code shoveled into the maw of Copilot. I'm working out some ways to get a gist-like interface onto my website. For the time being, full code for the OCC manager is here locally and on GitHub.

In the example below I initialize a noopStore that satisfies occ.Store and use it to manage several dozen concurrent readers and writers. Acknowledging that this is a trivial example, we're looking at the ability to verify several million operations per second.

package main

import (
    "math/rand"
    "sync"
    "occ"
)

type noopStore int

func (s noopStore) Write(ds []interface{}) error {
    return nil
}

func (s noopStore) Read(id uint64) (interface{}, error) {
    return 1, nil
}

func (s noopStore) Merge(old, new interface{}) (interface{}, error) {
    return 1, nil
}

func main() {

    var (
        numEvents, numWorkers  int    = 1000000, 48
        hashBits, txnRetention uint64 = 1024, 10000
        numObjects             uint64 = 1024 * 1024
        workCh                        = make(chan uint64)
        wg                            = sync.WaitGroup{}
    )

    DB := occ.NewManager(noopStore(1), hashBits, txnRetention)

    wg.Add(numWorkers)
    for workerID := 0; workerID < numWorkers; workerID++ {
        go func() {
            defer wg.Done()
            c := DB.NewConnection()
            for r := range workCh {
                c.Begin()
                c.Read(r)
                c.Write(r, 1)
                c.Commit()
            }
        }()
    }

    for i := 0; i < numEvents; i++ {
        workCh <- (rand.Uint64() % numObjects)
    }
    close(workCh)
    wg.Wait()

    // A contrived example; `commitRate` fluctuates with quite a few factors:
    // number of items read, max concurrency, hashBits, variation in exec
    // time, etc. This is about as favorable as it gets...
    // 
    // 2024/08/23 13:38:20 commitRate: 0.994779, execTime: 387.828209ms

}

Still need a lot of testing for correctness.