DCL Store - Distributed Commit Log

April 12, 2026

Overview

DCL Store is a distributed append-only commit log service built in Go. It provides strongly-consistent replicated storage for applications where write ordering and durability matter — the kind of infrastructure that backs systems like Apache Kafka or NATS JetStream.

The system combines several distributed systems primitives: HashiCorp Raft for consensus and log replication, Serf for gossip-based cluster membership discovery, gRPC with Protocol Buffers for the service layer, memory-mapped file indices for fast record lookups, and Casbin for role-based access control. All inter-node and client-server communication is encrypted with mutual TLS.

Explore the source code on GitHub.

Architecture

Agent Orchestration

The agent is the top-level coordinator that bootstraps every subsystem in a specific dependency order. Each setup phase returns an error, and the entire startup aborts on any failure.

func New(config Config) (*Agent, error) {
	a := &Agent{
		Config:    config,
		shutdowns: make(chan struct{}),
	}
	setup := []func() error{
		a.setupLogger,
		a.setupMux,
		a.setupLog,
		a.setupServer,
		a.setupMembership,
	}

	for _, fn := range setup {
		if err := fn(); err != nil {
			return nil, err
		}
	}

	go a.serve()
	return a, nil
}

The function table pattern keeps the bootstrap sequence readable and makes it straightforward to add new subsystems. Shutdown mirrors this in reverse — membership leaves first, then the gRPC server drains, then the log flushes and closes. Getting this order wrong means a node could accept writes after leaving the Raft group or drop in-flight RPCs.
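The reverse-order shutdown can be sketched with the same function-table idea. This is a minimal illustration rather than the project's actual shutdown code: the step type, the runAll and reversed helpers, and the step names are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a named lifecycle function. The names used in main are
// hypothetical stand-ins for the agent's real subsystems.
type step struct {
	name string
	fn   func() error
}

// runAll runs steps in order and stops at the first error. It returns the
// names of the steps that completed before the failure.
func runAll(steps []step) ([]string, error) {
	var done []string
	for _, s := range steps {
		if err := s.fn(); err != nil {
			return done, fmt.Errorf("%s: %w", s.name, err)
		}
		done = append(done, s.name)
	}
	return done, nil
}

// reversed returns a copy of steps in reverse order, so a shutdown sequence
// can be derived from the startup sequence.
func reversed(steps []step) []step {
	out := make([]step, len(steps))
	for i, s := range steps {
		out[len(steps)-1-i] = s
	}
	return out
}

func ok() error   { return nil }
func fail() error { return errors.New("boom") }

func main() {
	startup := []step{{"log", ok}, {"server", ok}, {"membership", ok}}
	done, err := runAll(reversed(startup))
	fmt.Println(done, err)
}
```

Deriving the shutdown list from the startup list keeps the two from drifting apart when a subsystem is added.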

Segmented Log Storage

The core log is split into segments, each backed by a store file and an index file. When a segment fills up, a new one is created. Each append writes to the active segment, and the log checks for rotation after every write.

func (s *segment) Append(record *api.Record) (offset uint64, err error) {
	cur := s.nextOffset
	record.Offset = cur

	p, err := proto.Marshal(record)
	if err != nil {
		return 0, err
	}

	_, pos, err := s.store.Append(p)
	if err != nil {
		return 0, err
	}

	if err = s.index.Write(
		uint32(s.nextOffset-uint64(s.baseOffset)),
		pos,
	); err != nil {
		return 0, err
	}

	s.nextOffset++
	return cur, nil
}

The segment performs three steps in sequence: serialize the protobuf record, append the bytes to the store (getting back the byte position), then write an index entry mapping the relative offset to that store position. The sequence is not atomic. As written, nothing rolls back the store append if the index write fails, so the store can hold bytes that no index entry points to. Index offsets are relative to the segment's base offset, which keeps index entries small (4 bytes instead of 8) and enables efficient segment-level operations.
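The relative-offset arithmetic is small enough to show on its own. The sketch below is illustrative only: relOffset and absOffset are hypothetical helper names, and the range check is an addition that the Append code above does not perform.

```go
package main

import (
	"fmt"
	"math"
)

// relOffset converts an absolute log offset into the 4-byte value stored in
// a segment's index. ok is false when the offset falls outside the range
// this segment can represent.
func relOffset(baseOffset, abs uint64) (rel uint32, ok bool) {
	if abs < baseOffset {
		return 0, false
	}
	d := abs - baseOffset
	if d > math.MaxUint32 {
		return 0, false
	}
	return uint32(d), true
}

// absOffset reverses relOffset for a given segment base.
func absOffset(baseOffset uint64, rel uint32) uint64 {
	return baseOffset + uint64(rel)
}

func main() {
	rel, ok := relOffset(1000, 1003)
	fmt.Println(rel, ok, absOffset(1000, rel))
}
```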

Length-Prefixed Store

The store is the actual file where record bytes live. Each record is written with an 8-byte length prefix so the reader knows exactly how many bytes to allocate before reading the payload.

func (s *store) Append(p []byte) (n uint64, pos uint64, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	pos = s.size

	if err := binary.Write(s.buf, enc, uint64(len(p))); err != nil {
		return 0, 0, err
	}

	w, err := s.buf.Write(p)
	if err != nil {
		return 0, 0, err
	}

	w += lenWidth
	s.size += uint64(w)
	return uint64(w), pos, nil
}

The length prefix is critical — without it, a reader would need to scan byte-by-byte to find record boundaries. Writing through a bufio.Writer batches small writes into larger I/O operations, which matters when the log is under heavy append load. The returned position is the byte offset where this record starts, which the segment's index stores for O(1) lookups.

Memory-Mapped Index

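Each segment has an index backed by mmap. The index maps record offsets to byte positions in the store file, with one entry per record. Each entry is exactly 12 bytes: 4 bytes for the relative offset and 8 bytes for the store position. Because entries are fixed-width, the location of entry n is simply n times the entry width.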

func (i *index) Read(in int64) (out uint32, pos uint64, err error) {
	if i.size == 0 {
		return 0, 0, io.EOF
	}

	if in == -1 {
		out = uint32((i.size / entWidth) - 1)
	} else {
		out = uint32(in)
	}

	pos = uint64(out) * entWidth
	if i.size < pos+entWidth {
		return 0, 0, io.EOF
	}

	out = enc.Uint32(i.mmap[pos : pos+offWidth])
	pos = enc.Uint64(i.mmap[pos+offWidth : pos+entWidth])
	return out, pos, nil
}

Memory mapping lets the OS page index data in and out as needed — frequently accessed entries stay hot in the page cache while old segments can be evicted. The special case in == -1 returns the last entry, which is used during segment recovery to determine where to resume appending after a restart.

Raft Consensus

Writes go through Raft consensus before being applied to the local log. The finite state machine receives committed entries and applies them deterministically to each node's local log.

func (l *fsm) Apply(record *raft.Log) interface{} {
	buf := record.Data
	reqType := RequestType(buf[0])
	reqMsg := buf[1:]
	switch reqType {
	case AppendRequestType:
		return l.applyAppend(reqMsg)
	}
	return nil
}

func (l *fsm) applyAppend(b []byte) interface{} {
	var req api.AppendRequest
	err := proto.Unmarshal(b, &req)
	if err != nil {
		return err
	}
	offset, err := l.log.Append(req.Record)
	if err != nil {
		return err
	}
	return &api.AppendResponse{Offset: offset}
}

The first byte of each Raft log entry encodes the request type, and the rest is a protobuf payload. This type-byte prefix pattern makes it cheap to route entries to the right handler without unmarshaling the full message first. The write path flows: client -> gRPC Append -> DistributedLog.apply() -> raft.Apply() -> FSM -> local log. Every node applies the same sequence, so they all converge to identical state.

Cluster Discovery

Nodes find each other through Serf, a gossip protocol. When a new node joins the Serf cluster, an event fires and the handler adds it to the Raft group.

func (m *Membership) eventHandler() {
	for e := range m.events {
		switch e.EventType() {
		case serf.EventMemberJoin:
			for _, member := range e.(serf.MemberEvent).Members {
				if m.isLocal(member) {
					continue
				}
				m.handleJoin(member)
			}
		case serf.EventMemberLeave, serf.EventMemberFailed:
			for _, member := range e.(serf.MemberEvent).Members {
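				if m.isLocal(member) {
					// Returning (rather than continuing) exits the handler
					// loop once the local node itself leaves or is marked failed.
					return
				}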
				m.handleLeave(member)
			}
		}
	}
}

This decouples cluster discovery from consensus. Serf handles the "who is in the cluster" question with lightweight gossip, and Raft handles "who can vote on writes." When handleJoin fires, it calls DistributedLog.Join() which issues raft.AddVoter(). When a node fails or leaves, handleLeave calls raft.RemoveServer(). This gives the cluster self-healing behavior — no manual intervention needed when nodes come and go.
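The separation between the two layers can be expressed as a small interface that the consensus side implements. The sketch below is a toy model, not the project's code: Handler, voters, event, and dispatch are hypothetical names, the addresses are made up, and unlike the handler above it skips local events in both cases instead of returning.

```go
package main

import "fmt"

type eventType int

const (
	memberJoin eventType = iota
	memberLeave
)

// event is a simplified stand-in for a gossip membership event.
type event struct {
	typ        eventType
	name, addr string
}

// Handler is what the discovery layer needs from the consensus layer.
type Handler interface {
	Join(name, addr string) error
	Leave(name string) error
}

// voters is a toy Handler that tracks the voting set in a map.
type voters map[string]string

func (v voters) Join(name, addr string) error { v[name] = addr; return nil }
func (v voters) Leave(name string) error      { delete(v, name); return nil }

// dispatch forwards membership events to h, skipping events about the
// local node.
func dispatch(local string, events []event, h Handler) error {
	for _, e := range events {
		if e.name == local {
			continue
		}
		var err error
		switch e.typ {
		case memberJoin:
			err = h.Join(e.name, e.addr)
		case memberLeave:
			err = h.Leave(e.name)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	v := voters{}
	_ = dispatch("node-0", []event{
		{memberJoin, "node-0", "10.0.0.1:8400"},
		{memberJoin, "node-1", "10.0.0.2:8400"},
	}, v)
	fmt.Println(len(v))
}
```

Keeping the interface this narrow means the gossip layer can be tested with a fake like voters, without starting a Raft cluster.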

Leader-Aware Load Balancing

The custom gRPC load balancer routes requests based on whether they're writes or reads. The picker inspects the RPC method name to decide where to send each request.

func (p *Picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.RLock()
	defer p.mu.RUnlock()

	var result balancer.PickResult

	if strings.Contains(info.FullMethodName, "Append") ||
		len(p.followers) == 0 {
		result.SubConn = p.leader
	} else if strings.Contains(info.FullMethodName, "Read") {
		result.SubConn = p.nextFollower()
	}

	if result.SubConn == nil {
		return result, balancer.ErrNoSubConnAvailable
	}

	return result, nil
}

Append operations always go to the leader because writes must go through Raft consensus. Reads are round-robined across followers, which scales read throughput horizontally. If there are no followers (single-node cluster or all followers are down), reads fall through to the leader. The resolver that feeds this picker discovers servers by calling the GetServers RPC, which returns the current Raft configuration with leader/follower status.
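The picker calls nextFollower, which is not shown above. One plausible shape for it is an atomic counter taken modulo the number of followers. The sketch below is an assumption about how such a helper could look, using strings in place of gRPC SubConns; roundRobin and next are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin cycles through a fixed list of followers. The string elements
// stand in for gRPC SubConns.
type roundRobin struct {
	followers []string
	current   atomic.Uint64
}

// next returns the next follower in rotation. The counter is advanced
// atomically, so concurrent callers each get a distinct slot. ok is false
// when there are no followers, in which case the caller would fall back to
// the leader.
func (r *roundRobin) next() (follower string, ok bool) {
	if len(r.followers) == 0 {
		return "", false
	}
	n := r.current.Add(1) - 1
	return r.followers[n%uint64(len(r.followers))], true
}

func main() {
	rr := &roundRobin{followers: []string{"f1", "f2", "f3"}}
	for i := 0; i < 4; i++ {
		f, _ := rr.next()
		fmt.Println(f)
	}
}
```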

Difficult Parts

Snapshot Restore and State Recovery

When a new node joins or a node falls too far behind, it needs to recover state from a snapshot rather than replaying the entire log. The restore process reads length-prefixed records from the snapshot and replays them into a fresh local log.

func (f *fsm) Restore(r io.ReadCloser) error {
	b := make([]byte, lenWidth)
	var buf bytes.Buffer

	for i := 0; ; i++ {
		_, err := io.ReadFull(r, b)
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}

		size := int64(enc.Uint64(b))
		if _, err = io.CopyN(&buf, r, size); err != nil {
			return err
		}

		record := &api.Record{}
		if err = proto.Unmarshal(buf.Bytes(), record); err != nil {
			return err
		}

		if i == 0 {
			f.log.Config.Segment.InitialOffset = record.Offset
			if err := f.log.Reset(); err != nil {
				return err
			}
		}
		if _, err = f.log.Append(record); err != nil {
			return err
		}
		buf.Reset()
	}
	return nil
}

The critical detail is in the i == 0 check. The first record in the snapshot determines the initial offset for the restored log — without this, the log would start at offset 0, causing offset collisions with new writes. The log is fully reset before replay to prevent state contamination from a partial previous state.

Memory-Mapped Index Lifecycle

The index has a tricky lifecycle with mmap. On creation, the file is extended (via truncate) to MaxIndexBytes so that the whole mapped region is backed by the file; the length of a mapping is fixed when it is created. On close, the file is truncated back down to the actual data size. This means a crash between creation and close leaves an oversized file on disk. The recovery path handles this by reading the actual size from the OS stat and only treating that portion as valid data, even though the file is physically larger.

The shutdown sequence is also order-sensitive: flush the mmap, sync the file, truncate, then close. Skipping the mmap sync before truncate risks data loss because the OS may not have written dirty pages back to disk yet.

Connection Multiplexing

The agent runs both Raft RPC and gRPC on a single TCP port using cmux. The multiplexer reads the first byte of each incoming connection to classify it — Raft peers write a 0x01 byte as a protocol identifier, and everything else is routed to gRPC.

func (a *Agent) setupLog() error {
	raftLn := a.mux.Match(func(reader io.Reader) bool {
		b := make([]byte, 1)
		if _, err := reader.Read(b); err != nil {
			return false
		}
		return bytes.Equal(b, []byte{byte(log.RaftRPC)})
	})

	logConfig := log.Config{}
	logConfig.Raft.StreamLayer = log.NewStreamLayer(
		raftLn,
		a.Config.ServerTLSConfig,
		a.Config.PeerTLSConfig,
	)
	logConfig.Raft.LocalID = raft.ServerID(a.Config.NodeName)
	logConfig.Raft.Bootstrap = a.Config.Bootstrap

	var err error
	a.log, err = log.NewDistributedLog(a.Config.DataDir, logConfig)
	if err != nil {
		return err
	}

	if a.Config.Bootstrap {
		err = a.log.WaitForLeader(3 * time.Second)
	}
	return err
}

The TLS handshake happens after protocol detection, not before. This matters because the Raft stream layer wraps the classified connection in TLS on the Dial side and the Accept side separately. If TLS happened first, the encrypted bytes would be unrecognizable to the multiplexer's byte-sniffing logic.
