使用 Raft 与 WebSocket 构建一个支持实时可观测性的分布式 MapReduce 任务调度器


Hadoop YARN 的 JobTracker 对于许多特定场景而言,是一个过于庞大且迟钝的黑箱。任务提交后,我们能做的往往只有等待,并通过一个刷新率感人的 Web UI 艰难地追踪进度。当任务失败时,诊断过程无异于一场考古,需要在成堆的节点日志中大海捞针。这种延迟和不透明性,在需要快速迭代和精细化控制的计算任务中,成了一个难以忍受的瓶颈。我们需要的是一个轻量、高可用、并且能提供毫秒级实时反馈的分布式执行框架。

最初的构想很简单:一个中心化的 Master 节点负责接收任务、拆分、调度;一群 Worker 节点负责执行并实时上报状态。但这个简单模型立刻暴露了致命缺陷:Master 是一个明显的单点故障。在生产环境中,任何形式的单点故障都是不可接受的。

为了解决 Master 的高可用问题,我们引入了 Raft 协议。通过将 Master 节点组成一个小型集群,利用 Raft 的日志复制和领导者选举机制,我们可以保证即使当前 Leader 节点宕机,集群也能在秒级内选举出新的 Leader 并无缝接管,整个任务状态机得以保全。语言上选择 Go,它的并发模型和强大的网络库非常适合构建这类系统。

通信协议上,放弃了传统的 HTTP 轮询。为了实现真正的实时反馈——包括任务心跳、进度百分比、日志流、性能指标——WebSocket 是不二之选。它提供了一个持久化的双向通信管道,Worker 可以将数据流式地推送到 Master,而 Master 也能随时向 Worker 推送控制指令,如“取消任务”。

最终的技术选型确定为:

  • 核心状态管理: 基于 HashiCorp Raft 实现的分布式一致性状态机。
  • 实时通信: Gorilla WebSocket 用于 Master 与 Worker 间的双向流式通信。
  • 任务模型: 借鉴经典的 MapReduce 范式,但实现我们自己的轻量级调度逻辑。
  • 构建与部署: Go 语言,编译成静态二进制文件,配合简单的配置文件,实现无依赖部署。

一、基于 Raft 的高可用状态机

系统的核心是 Master 节点的状态机。这个状态机存储了所有 Job 和 Task 的生命周期信息:哪个 Job 正在运行、它的 Task 分布在哪些 Worker 上、每个 Task 的状态是待处理、运行中、已完成还是已失败。这些状态必须在所有 Master 节点间强一致。

Raft 库(如 hashicorp/raft)本身不关心你存储什么数据,它只负责将你提交的“日志条目”(Command)以一致的顺序应用到所有副本的 FSM (Finite State Machine) 上。因此,我们的首要任务是定义这个 FSM。

// fsm.go

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"sync"

	"github.com/hashicorp/raft"
)

// CommandType 定义了可以应用到状态机的操作类型
type CommandType int

const (
	SubmitJobCmd CommandType = iota
	UpdateTaskStatusCmd
)

// Command 是应用到FSM的日志条目结构
type Command struct {
	Type    CommandType `json:"type"`
	Payload []byte      `json:"payload"`
}

// Job 和 Task 的定义 (简化版)
type TaskStatus int
const (
    TaskPending TaskStatus = iota
    TaskRunning
    TaskCompleted
    TaskFailed
)

type Task struct {
	ID        string     `json:"id"`
	JobID     string     `json:"job_id"`
	Type      string     `json:"type"` // "map" or "reduce"
	Status    TaskStatus `json:"status"`
	WorkerID  string     `json:"worker_id"`
	InputPath string     `json:"input_path"`
	OutputPath string    `json:"output_path"`
}

type Job struct {
	ID    string           `json:"id"`
	Tasks map[string]*Task `json:"tasks"`
	Status string          `json:"status"` // "running", "completed", "failed"
}


// fsm 是我们实现的核心状态机
// 它存储了所有job的状态,并在Raft集群中保持一致
type fsm struct {
	mu   sync.RWMutex
	jobs map[string]*Job // jobID -> Job
}

func newFSM() *fsm {
	return &fsm{
		jobs: make(map[string]*Job),
	}
}

// Apply 将Raft日志应用到状态机
// 这是FSM接口的核心方法,必须是确定性的
func (f *fsm) Apply(log *raft.Log) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()

	var cmd Command
	if err := json.Unmarshal(log.Data, &cmd); err != nil {
		panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
	}

	switch cmd.Type {
	case SubmitJobCmd:
		var job Job
		if err := json.Unmarshal(cmd.Payload, &job); err != nil {
			return fmt.Errorf("failed to unmarshal job payload: %w", err)
		}
		f.jobs[job.ID] = &job
		// 在真实项目中,这里会触发任务调度逻辑
		return nil
	case UpdateTaskStatusCmd:
		var taskUpdate struct {
			JobID  string     `json:"job_id"`
			TaskID string     `json:"task_id"`
			Status TaskStatus `json:"status"`
            WorkerID string    `json:"worker_id"`
		}
		if err := json.Unmarshal(cmd.Payload, &taskUpdate); err != nil {
			return fmt.Errorf("failed to unmarshal task update payload: %w", err)
		}
		if job, ok := f.jobs[taskUpdate.JobID]; ok {
			if task, ok := job.Tasks[taskUpdate.TaskID]; ok {
				task.Status = taskUpdate.Status
                task.WorkerID = taskUpdate.WorkerID
			}
		}
		// 这里可以检查job是否所有task都完成了
		return nil
	default:
		return fmt.Errorf("unrecognized command type: %d", cmd.Type)
	}
}

// Snapshot 用于创建状态机的快照,用于日志压缩
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// 序列化整个状态机
	data, err := json.Marshal(f.jobs)
	if err != nil {
		return nil, err
	}
	return &fsmSnapshot{store: data}, nil
}

// Restore 从快照中恢复状态机状态
func (f *fsm) Restore(rc io.ReadCloser) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	data, err := io.ReadAll(rc)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, &f.jobs)
}

type fsmSnapshot struct {
	store []byte
}

func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	if _, err := sink.Write(s.store); err != nil {
		sink.Cancel()
		return err
	}
	return sink.Close()
}

func (s *fsmSnapshot) Release() {}

这里的关键在于 Apply 方法。任何对系统状态的修改,比如提交一个新 Job 或更新一个 Task 的状态,都必须封装成一个 Command,序列化后通过 raft.Apply() 提交。Raft 保证了这个 command 会被写入其内部的 WAL (Write-Ahead Log),并被复制到集群中的大多数节点,最后才会调用每个节点的 Apply 方法。这个过程保证了即使发生主节点切换,新主节点的状态机也和旧主节点完全一致。

sequenceDiagram
    participant Client
    participant Leader
    participant Follower1
    participant Follower2

    Client->>Leader: raft.Apply(SubmitJobCmd)
    Leader->>Leader: Append command to local log
    par
        Leader->>Follower1: Replicate log entry
        Leader->>Follower2: Replicate log entry
    end
    Follower1-->>Leader: Acknowledge
    Follower2-->>Leader: Acknowledge
    Leader->>Leader: Commit log entry (majority acked)
    Leader->>Leader: Apply command to its FSM
    par
        Leader->>Follower1: Notify commit
        Leader->>Follower2: Notify commit
    end
    Follower1->>Follower1: Apply command to its FSM
    Follower2->>Follower2: Apply command to its FSM
    Leader-->>Client: Apply operation successful

二、Worker 与 Master 的实时 WebSocket 通道

一旦 Master 集群建立起来,下一步就是 Worker 节点如何与之通信。Worker 需要连接到当前的 Leader 节点,并在连接断开时能够自动发现新的 Leader。

每个 Worker 启动后,会尝试连接配置文件中指定的 Master 节点列表。它会轮询 /status 这样的 HTTP 端点,直到找到返回 Leader 地址的节点。一旦确定 Leader,Worker 会建立一个到该 Leader 的 WebSocket 连接。

// worker/client.go
package main

import (
	"log"
	"net/url"
	"time"

	"github.com/gorilla/websocket"
)

// Message 定义了worker和master之间通信的结构
type Message struct {
	Type    string      `json:"type"` // "heartbeat", "log", "status_update", "task_result"
	Payload interface{} `json:"payload"`
    TraceID string      `json:"trace_id"`
    SpanID  string      `json:"span_id"`
}

// connectToMaster 循环尝试连接,直到成功
func connectToMaster(addr string) (*websocket.Conn, error) {
	u := url.URL{Scheme: "ws", Host: addr, Path: "/ws"}
	log.Printf("connecting to %s", u.String())

	var conn *websocket.Conn
	var err error

	// 带有退避策略的重连逻辑
	for i := 0; i < 5; i++ {
		conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
		if err == nil {
			log.Println("successfully connected to master")
			return conn, nil
		}
		log.Printf("failed to connect to master: %v. retrying in %d seconds...", err, 2*(i+1))
		time.Sleep(time.Duration(2*(i+1)) * time.Second)
	}
	return nil, err
}

func main() {
	// 实际应用中,地址应从配置中读取
	masterAddr := "localhost:8080" 
	
    conn, err := connectToMaster(masterAddr)
	if err != nil {
		log.Fatalf("could not connect to master after several retries: %v", err)
	}
	defer conn.Close()
	
	// 启动一个goroutine来处理从Master接收的消息
	go readLoop(conn)

	// 主goroutine负责发送心跳和任务状态
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// 发送心跳
			heartbeat := Message{Type: "heartbeat", Payload: "I am alive"}
			if err := conn.WriteJSON(heartbeat); err != nil {
				log.Println("write heartbeat error:", err)
				// 连接可能已断开,触发重连
				return 
			}
		// ... 其他逻辑,比如发送任务进度
		}
	}
}

// readLoop 负责读取和处理来自master的指令
func readLoop(conn *websocket.Conn) {
	for {
		var msg Message
		err := conn.ReadJSON(&msg)
		if err != nil {
			log.Println("read error:", err)
			// 连接断开,需要外部逻辑来处理重连
			return
		}

		switch msg.Type {
		case "assign_task":
			// 解析任务并开始执行
			log.Printf("received task assignment: %+v", msg.Payload)
			// go executeTask(msg.Payload)
		case "cancel_task":
			// 取消正在执行的任务
			log.Printf("received cancel command for task: %+v", msg.Payload)
		}
	}
}

在 Master 端,需要一个 WebSocket 管理器来处理所有 Worker 的连接。

// master/websocket_handler.go
package main

import (
    "log"
    "net/http"
    "sync"
    
    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true // 在生产中应有更严格的检查
    },
}

type Hub struct {
    mu      sync.RWMutex
    workers map[string]*websocket.Conn // workerID -> conn
}

func newHub() *Hub {
    return &Hub{
        workers: make(map[string]*websocket.Conn),
    }
}

func (h *Hub) handleWS(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println("upgrade error:", err)
        return
    }
    defer conn.Close()
    
    // 注册worker, workerID应该在握手时提供
    workerID := r.Header.Get("X-Worker-ID")
    if workerID == "" {
        log.Println("worker ID not provided")
        return
    }
    
    h.mu.Lock()
    h.workers[workerID] = conn
    h.mu.Unlock()

    log.Printf("worker %s connected", workerID)

    defer func() {
        h.mu.Lock()
        delete(h.workers, workerID)
        h.mu.Unlock()
        log.Printf("worker %s disconnected", workerID)
    }()

    for {
        // 读取来自worker的消息 (心跳,日志,状态更新)
        // 并通过 raft.Apply() 更新 FSM
        _, message, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("unexpected close error: %v", err)
			}
			break
		}
        log.Printf("received message from %s: %s", workerID, message)
        // 在这里解析消息,并将其转换为FSM的Command
    }
}

三、任务调度与可观测性

一个 Job 被提交后,Master Leader 节点负责将其分解为 Map 和 Reduce 任务。调度逻辑的核心在于将这些 Task 分配给当前空闲的 Worker。

graph TD
    subgraph Job Lifecycle
        A[Client Submits Job] --> B{Master Leader};
        B --> C{Parse Job & Create Tasks};
        C --> D{Store Job in Raft FSM};
        D --> E{Find Idle Workers};
        E --> F[Assign Map Tasks via WebSocket];
    end

    subgraph Task Execution
        G[Worker Receives Task] --> H{Executes Map Function};
        H --> I[Streams Logs & Progress via WebSocket];
        I --> J{Writes Intermediate Output};
        J --> K[Report Task Completion];
    end

    subgraph Shuffle & Reduce
        L{All Mappers Done} --> M{Master Schedules Reducers};
        M --> N[Assign Reduce Tasks];
        O[Reducer Fetches Intermediate Data] --> P{Executes Reduce Function};
        P --> Q[Writes Final Output];
        Q --> R{Report Job Completion};
    end

    B -- Raft Log Replication --> S((Follower Nodes));
    K -- Update FSM --> D;
    R -- Update FSM --> D;

为了实现真正的“可观测性”,而不仅仅是“监控”,我们不能满足于简单的日志和指标。我们需要将一个任务的完整生命周期串联起来。这里引入了分布式追踪的概念。

当一个 Job 被提交时,Master 会为它创建一个全局唯一的 TraceID

  1. Job Span: Master 为整个 Job 创建一个 Root Span。
  2. Task Spans: 每当一个 Map 或 Reduce 任务被调度时,Master 会为它创建一个 Child Span,其 Parent 是 Job Span。
  3. Context Propagation: TraceIDSpanID 会作为元数据,通过 WebSocket 的 assign_task 消息传递给 Worker。
  4. Worker-side Tracing: Worker 在执行任务和上报日志、状态时,会带上这些 ID。
// WebSocket message from worker to master
{
  "type": "log_entry",
  "trace_id": "abc-123-xyz-789",
  "span_id": "span-map-task-001",
  "payload": {
    "timestamp": "2023-10-27T10:45:15Z",
    "level": "INFO",
    "message": "Processing line 1000 of input split..."
  }
}

这种结构化的信息流彻底改变了调试方式。当一个 Job 变慢时,我们不再需要 grep 几百个日志文件。我们可以直接在 Jaeger 或 Zipkin 这样的追踪系统中,按 TraceID 查询,看到整个 Job 的甘特图:

  • 哪个 Map 任务启动最慢?
  • 哪个 Reduce 任务是“长尾任务”(Straggler)拖慢了整个 Job?
  • 某个任务失败前,它的日志流是怎样的?

所有这些信息都上下文关联,一目了然。

四、构建与部署工具

构建与工具 在这个项目中体现在简化和标准化上。我们使用一个简单的 Makefile 来管理构建过程。

# Makefile
.PHONY: all build clean master worker

GO = go
GO_FLAGS = -v
LDFLAGS = -ldflags="-s -w"

MASTER_BIN = ./bin/master
WORKER_BIN = ./bin/worker

all: build

build: master worker

master:
	@echo "Building master binary..."
	@$(GO) build $(GO_FLAGS) $(LDFLAGS) -o $(MASTER_BIN) ./master/

worker:
	@echo "Building worker binary..."
	@$(GO) build $(GO_FLAGS) $(LDFLAGS) -o $(WORKER_BIN) ./worker/

clean:
	@echo "Cleaning up..."
	@rm -f $(MASTER_BIN) $(WORKER_BIN)

配置文件使用 TOML 格式,清晰易读。

# config/master.toml
node_id = "master-1"
listen_addr = "0.0.0.0:8080" # for http/ws
raft_addr = "127.0.0.1:9001"
raft_dir = "/var/lib/dist-mr/master-1"

# 对于集群中的第一个节点,bootstrap为true
bootstrap = true 

# 其他节点的配置
# [[peers]]
#   node_id = "master-2"
#   address = "127.0.0.1:9002"
# [[peers]]
#   node_id = "master-3"
#   address = "127.0.0.1:9003"
# config/worker.toml
worker_id = "worker-01"
# worker会轮询这个列表来发现leader
master_addresses = ["localhost:8080", "localhost:8081", "localhost:8082"] 

部署时,只需将编译好的 master 二进制文件和配置文件分发到几台机器上,启动即可形成 Raft 集群。同样,worker 二进制文件可以分发到任意数量的计算节点上。这种无外部依赖的部署方式,在弹性伸缩的环境中尤其具有优势。

局限性与未来迭代方向

这个系统虽然解决了核心的高可用和实时可观测性问题,但它远非一个完备的生产级系统。

首先,Shuffle 阶段的实现非常初级。目前的设计可能依赖一个共享网络文件系统(如 NFS)来存储 Map 任务的中间输出,这会成为性能瓶颈和另一个潜在的故障点。一个更健壮的实现需要 Worker 之间能直接点对点地拉取数据,或者通过一个专门的 Shuffle Service。

其次,资源管理是完全缺席的。Worker 节点无差别地执行任务,没有考虑节点的负载、内存、CPU 使用情况。集成资源隔离(例如通过 cgroups 或容器运行时)和更智能的、考虑数据本地性的调度算法,是通往生产环境的必经之路。

最后,任务的序列化和执行方式也较为简单。当前可能只支持执行编译进 Worker 二进制文件中的特定函数。一个可扩展的系统需要支持动态加载代码,例如通过 WebAssembly (WASM) 或者插件化的方式运行用户自定义的 Map 和 Reduce 函数,这将极大地提升系统的灵活性和适用范围。


  目录