
A basic Write Ahead Log

This weekend I decided to add some basic persistence to my Key-Value store. I considered going directly into B-Trees or LSM trees, but they are quite involved. Moving forward, I want to focus more on the distributed side of the store than on low-level storage details (saving those for later!), so for now I decided to implement something simpler: a basic Write Ahead Log (WAL, for short).

What is a Write Ahead Log?

This post from Oskar Dudycz does a much better job than I could at explaining the theory, so I will just give the high-level summary.

When a database receives an update (a new ‘set’ operation), it has to apply it to its in-memory representation of the store, which will eventually be backed by persistent disk storage. It will also replicate the data across other shards. If the database fails in the middle of those operations, the data could be lost, or the system could be left in an inconsistent state!

So how do we solve this problem? We just keep a file where we append all our updates sequentially. Appending to a file is simple and quick, and if we do this before acknowledging the update to the client, we end up with a sequence of operations safely (well, maybe) stored on disk that will help us reconstruct the state of our store in the case of a crash.
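
To make the ordering concrete, here is a minimal, self-contained sketch; the names and types are purely illustrative, not the store's real API:

(* A sketch of the write-ahead discipline, in plain Stdlib OCaml. *)
type 'v store = {
  wal : out_channel;              (* append-only log file *)
  mem : (string, 'v) Hashtbl.t;   (* in-memory state *)
}

let set store ~key ~value =
  output_value store.wal (key, value); (* 1. record the intent in the log *)
  flush store.wal;                     (* 2. push it towards the disk *)
  Hashtbl.replace store.mem key value  (* 3. only then apply it in memory,
                                          and acknowledge the client *)

The key point is the ordering: by the time the client hears back, the operation is already in the log.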

A forever growing log?

Well, it doesn’t have to be: you can save checkpoints! When the WAL goes over a threshold (based on size, time, or both), we just take a snapshot of the data in the store and save it as a checkpoint. Once the snapshot is safely on disk, we can truncate the WAL.
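
As a sketch (hypothetical names again; my real version comes later in this post):

(* Hypothetical sketch of checkpoint rotation: once the log passes a
   threshold, snapshot the state and truncate the log, since all of its
   entries are now covered by the checkpoint. *)
let maybe_rotate ~operations ~threshold ~save_snapshot ~truncate_wal =
  if operations > threshold then begin
    save_snapshot ();  (* full copy of the current in-memory state *)
    truncate_wal ();   (* its entries are redundant with the snapshot *)
    true               (* tell the caller to reset its counter *)
  end
  else false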

My implementation

For this version I went with a very naive approach. All the updates to the store are currently serialized, so I know there is only one thread writing to the WAL, making it safe for me to do the checkpointing and WAL truncation at any point.

Also, I’m not paying much attention to performance: I’m opening and closing files all the time. I’ve never done any benchmarking or performance testing in OCaml, so I’m saving that for a future post!

The Storage Interface

First I extracted the storage backend for the KV store into an interface (before, I was using a Hashtbl directly):

open! Base

type t

val create : unit -> t
val put : t -> key:Model.Key.t -> value:Model.value -> unit
val get : t -> key:Model.Key.t -> Model.value option
val iter : t -> f:(key:Model.Key.t -> data:Model.value -> unit) -> unit

And moved my Hashtbl implementation into its own module:

open! Base

type t = (Model.Key.t, Model.value) Hashtbl.t

let create () = Hashtbl.create (module Model.Key) 

let put table ~key ~value =
  Hashtbl.set table ~key ~data:value

let get table ~key =
  Hashtbl.find table key

let iter table ~f =
  Hashtbl.iteri table ~f

Now I can follow the same pattern for the WAL, and swap implementations later. This is the main type:

type t = {
  table : Storage_hashtbl.t;
  (** The in-memory table backing the store *)
  wal_path : string;
  (** The path to the file storing the write ahead log *)
  checkpoint_path : string;
  (** The path to the file storing the checkpoint *)
  mutable operations : int;
  (** The number of operations so far, used to trigger the checkpoint save *)
}

Note that for storage we use the Storage_hashtbl module we defined above: we keep the same Hashtbl implementation at the core of the new module!

The store currently supports two types, int32 and string, for both keys and values. We can represent each one the same way in both the WAL and the checkpoint file:

  • A first byte for the type: 0 for int32, 1 for string
  • If it’s an int32, the next 4 bytes contain the value
  • If it’s a string, the next 4 bytes contain the size, and the next n bytes the contents of the string.
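
For example, the string "hi" would be stored as the bytes 01 00 00 00 02 68 69: the tag, the 4-byte size, and the two payload bytes (assuming the size is big-endian; the Encoding module isn’t shown here, so the byte order is my guess). The store_value function writes values in this layout:
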
let store_value channel = function
  | Model.Num n ->
    (* Type tag 0: int32 *)
    Out_channel.output_byte channel 0;
    let buffer = Bytes.create 4 in
    let _ = Encoding.push_int32_exn 0 buffer n in
    Out_channel.output channel buffer 0 4
  | Model.String s ->
    (* Type tag 1: length-prefixed string *)
    Out_channel.output_byte channel 1;
    let len = String.length s in
    let buffer = Bytes.create (4 + len) in
    let _ = Encoding.push_str_exn 0 buffer s in
    Out_channel.output channel buffer 0 (Bytes.length buffer)
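
The Encoding module itself isn’t shown in this post, but from the call sites we can guess its shape: each push function writes at a given offset and returns the offset just past what it wrote. A minimal sketch, assuming big-endian encoding (the real module may differ):

(* Hypothetical sketch of the Encoding helpers used above, assuming
   4-byte big-endian lengths and values. Each function writes at [pos]
   and returns the offset just past what it wrote. *)
let push_int32_exn pos buffer n =
  Stdlib.Bytes.set_int32_be buffer pos n;
  pos + 4

let push_str_exn pos buffer s =
  let len = String.length s in
  Stdlib.Bytes.set_int32_be buffer pos (Stdlib.Int32.of_int len);
  Stdlib.Bytes.blit_string s 0 buffer (pos + 4) len;
  pos + 4 + len

Similarly, store_key (used below) isn’t shown; presumably it mirrors store_value, matching on Model.Key.t instead of Model.value.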

This means we can use the same function to load both the WAL and the checkpoint:

let load_data table path =
  In_channel.with_open_gen [Open_rdonly; Open_binary] 0o666 path
    (fun channel ->
       let rec read () =
         match In_channel.input_byte channel with
         | None -> () (* EOF *)
         | Some key_type ->
           let key = match key_type with
             | 0 -> Model.Key.Num (get_num channel)
             | 1 -> Model.Key.String (get_str channel)
             | _ -> failwith "Type not supported!"
           in
           match In_channel.input_byte channel with
           | None -> () (* truncated entry: a key without a value *)
           | Some value_type ->
             let value = match value_type with
               | 0 -> Model.Num (get_num channel)
               | 1 -> Model.String (get_str channel)
               | _ -> failwith "Type not supported!"
             in
             Storage_hashtbl.put table.table ~key ~value;
             read ()
       in
       read ()
    )
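
The get_num and get_str helpers aren’t shown either; here is a plausible sketch of them, assuming the same big-endian layout and OCaml’s Stdlib.In_channel:

(* Hypothetical readers matching the encoding sketch above: a 4-byte
   big-endian int32, and a length-prefixed string. Both raise on
   unexpected EOF. *)
let get_num channel =
  match In_channel.really_input_string channel 4 with
  | Some s -> Stdlib.String.get_int32_be s 0
  | None -> failwith "Unexpected EOF"

let get_str channel =
  let len = Stdlib.Int32.to_int (get_num channel) in
  match In_channel.really_input_string channel len with
  | Some s -> s
  | None -> failwith "Unexpected EOF"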

We also have a function to save the checkpoint on demand:

let save_checkpoint table =
  (* Open_trunc: a fresh checkpoint replaces any previous one *)
  Out_channel.with_open_gen [Open_binary; Open_creat; Open_trunc; Open_append] 0o666 table.checkpoint_path
    (fun out ->
       Storage_hashtbl.iter table.table ~f:(fun ~key ~data:value ->
           store_key out key;
           store_value out value;
           Out_channel.flush out))

And all of this happens when we get updates. First we add one to our operation count and append to the WAL. If our count goes over the limit, we trigger the checkpoint, persisting all our data and truncating the WAL.

let put table ~key ~value =
  table.operations <- table.operations + 1;
  Out_channel.with_open_gen [Open_binary; Open_creat; Open_append] 0o666 table.wal_path
    (fun wal ->
       store_key wal key;
       store_value wal value;
       Out_channel.flush wal;
       Storage_hashtbl.put table.table ~key ~value);
  if table.operations > max_wal_size then begin
    save_checkpoint table;
    Stdlib.Sys.remove table.wal_path;
    table.operations <- 0
  end
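
I don’t show get here, but since reads never touch the log, presumably it just delegates to the in-memory table:

(* Reads never touch the WAL: get simply delegates to the
   in-memory table. *)
let get table ~key =
  Storage_hashtbl.get table.table ~key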

We also have to make sure we load the checkpoint and the WAL during startup:

let create () =
  let table = {
    table = Storage_hashtbl.create ();
    wal_path = "./storage.bin";
    checkpoint_path = "./checkpoint.bin";
    operations = 0;
  } in
  if Stdlib.Sys.file_exists table.checkpoint_path then load_checkpoint table;
  if Stdlib.Sys.file_exists table.wal_path then begin
    load_wal table;
    save_checkpoint table;
    Stdlib.Sys.remove table.wal_path
  end;
  table
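
load_checkpoint and load_wal aren’t shown above, but since load_data already takes a path, they are presumably thin wrappers (defined before create in the actual module):

(* Presumed thin wrappers over load_data; sketches of the two loaders
   used in create. *)
let load_checkpoint table = load_data table table.checkpoint_path
let load_wal table = load_data table table.wal_path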

Is it still quick?

No.

As you can see in the code above, we are opening and flushing the file for every single write. In a real-life system we would find a way to keep the file open for longer, and maybe batch writes. But this is functional, and it provides a baseline to compare performance against, so it should be a decent starting point, I think.
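
As a sketch of that first optimization (hypothetical: it assumes the record holds an already-open wal channel instead of a path, which is not how the code above works):

(* Hypothetical variant of put where the record keeps an open channel,
   opened once at create time, so each write skips the open/close.
   Truncating on checkpoint would then need to reopen the file. *)
let put table ~key ~value =
  table.operations <- table.operations + 1;
  store_key table.wal key;
  store_value table.wal value;
  Out_channel.flush table.wal;
  Storage_hashtbl.put table.table ~key ~value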

So, I wrote a quick benchmark to see where we stand today. Let’s run 1000 sets and gets, using the raw Hashtbl and the WAL-backed one, and see what the price of safety (at least naive safety) is!

open! Core
open! Core_bench

(* dune exec -- ./bench.exe -ascii -quota 0.25 *)
let amount = 1000
let value_pairs =
  List.init amount ~f:(fun i -> ("key" ^ string_of_int i, Int32.of_int_exn i))

module type Storage = sig
  type t

  val create : unit -> t
  val put : t -> key:Kvlib.Model.Key.t -> value:Kvlib.Model.value -> unit
  val get : t -> key:Kvlib.Model.Key.t -> Kvlib.Model.value option
end

module Bench (S : Storage) = struct
  let run_bench () =
    let table = S.create () in
    List.iter value_pairs
      ~f:(fun (key, value) ->
          let key = Kvlib.Model.Key.String key in
          let value = Kvlib.Model.Num value in
          S.put table ~key ~value;
          let _ = S.get table ~key in
          ()
        )
end 

module Bench_hashtbl = Bench(Kvlib.Storage_hashtbl)
module Bench_wal = Bench(Kvlib.Write_ahead_log)

let benchmarks =
  [ "Hashtable", Bench_hashtbl.run_bench
  ; "WAL", Bench_wal.run_bench]

let () =
  List.map benchmarks ~f:(fun (name, test) ->
      Core_bench.Bench.Test.create ~name test)
  |> Core_bench.Bench.make_command
  |> Command_unix.run

And if I run it:

$ dune exec -- ./bench.exe -ascii -quota 0.25

Entering directory '/home/tony/projects/ocaml/ocledis'
Leaving directory '/home/tony/projects/ocaml/ocledis'
Estimated testing time 500ms (2 benchmarks x 250ms). Change using '-quota'.

  Name            Time/Run    mWd/Run   mjWd/Run   Prom/Run   Percentage
 ----------- -------------- ---------- ---------- ---------- ------------
  Hashtable       248.27us    18.60kw    11.65kw    10.11kw        0.21%
  WAL         119_260.79us   809.65kw    17.89kw    16.35kw      100.00%

So, it seems we are now about 480 times slower…

While this is OK for this experiment, it’s nowhere near acceptable for a production system, so I will have to implement some optimizations and see if we can make that a bit better!

In the meantime, all the code is available here.