Hadoop YARN 的 JobTracker 对于许多特定场景而言,是一个过于庞大且迟钝的黑箱。任务提交后,我们能做的往往只有等待,并通过一个刷新率感人的 Web UI 艰难地追踪进度。当任务失败时,诊断过程无异于一场考古,需要在成堆的节点日志中大海捞针。这种延迟和不透明性,在需要快速迭代和精细化控制的计算任务中,成了一个难以忍受的瓶颈。我们需要的是一个轻量、高可用、并且能提供毫秒级实时反馈的分布式执行框架。
最初的构想很简单:一个中心化的 Master 节点负责接收任务、拆分、调度;一群 Worker 节点负责执行并实时上报状态。但这个简单模型立刻暴露了致命缺陷:Master 是一个明显的单点故障。在生产环境中,任何形式的单点故障都是不可接受的。
为了解决 Master 的高可用问题,我们引入了 Raft 协议。通过将 Master 节点组成一个小型集群,利用 Raft 的日志复制和领导者选举机制,我们可以保证即使当前 Leader 节点宕机,集群也能在秒级内选举出新的 Leader 并无缝接管,整个任务状态机得以保全。语言上选择 Go,它的并发模型和强大的网络库非常适合构建这类系统。
通信协议上,放弃了传统的 HTTP 轮询。为了实现真正的实时反馈——包括任务心跳、进度百分比、日志流、性能指标——WebSocket 是不二之选。它提供了一个持久化的双向通信管道,Worker 可以将数据流式地推送到 Master,而 Master 也能随时向 Worker 推送控制指令,如“取消任务”。
最终的技术选型确定为:
- 核心状态管理: 基于 HashiCorp Raft 实现的分布式一致性状态机。
- 实时通信: Gorilla WebSocket 用于 Master 与 Worker 间的双向流式通信。
- 任务模型: 借鉴经典的 MapReduce 范式,但实现我们自己的轻量级调度逻辑。
- 构建与部署: Go 语言,编译成静态二进制文件,配合简单的配置文件,实现无依赖部署。
一、基于 Raft 的高可用状态机
系统的核心是 Master 节点的状态机。这个状态机存储了所有 Job 和 Task 的生命周期信息:哪个 Job 正在运行、它的 Task 分布在哪些 Worker 上、每个 Task 的状态是待处理、运行中、已完成还是已失败。这些状态必须在所有 Master 节点间强一致。
Raft 库(如 hashicorp/raft)本身不关心你存储什么数据,它只负责将你提交的“日志条目”(Command)以一致的顺序应用到所有副本的 FSM (Finite State Machine) 上。因此,我们的首要任务是定义这个 FSM。
// fsm.go
package main
import (
"encoding/json"
"fmt"
"io"
"sync"
"github.com/hashicorp/raft"
)
// CommandType 定义了可以应用到状态机的操作类型
type CommandType int
const (
SubmitJobCmd CommandType = iota
UpdateTaskStatusCmd
)
// Command 是应用到FSM的日志条目结构
type Command struct {
Type CommandType `json:"type"`
Payload []byte `json:"payload"`
}
// Job 和 Task 的定义 (简化版)
type TaskStatus int
const (
TaskPending TaskStatus = iota
TaskRunning
TaskCompleted
TaskFailed
)
type Task struct {
ID string `json:"id"`
JobID string `json:"job_id"`
Type string `json:"type"` // "map" or "reduce"
Status TaskStatus `json:"status"`
WorkerID string `json:"worker_id"`
InputPath string `json:"input_path"`
OutputPath string `json:"output_path"`
}
type Job struct {
ID string `json:"id"`
Tasks map[string]*Task `json:"tasks"`
Status string `json:"status"` // "running", "completed", "failed"
}
// fsm 是我们实现的核心状态机
// 它存储了所有job的状态,并在Raft集群中保持一致
type fsm struct {
mu sync.RWMutex
jobs map[string]*Job // jobID -> Job
}
func newFSM() *fsm {
return &fsm{
jobs: make(map[string]*Job),
}
}
// Apply 将Raft日志应用到状态机
// 这是FSM接口的核心方法,必须是确定性的
func (f *fsm) Apply(log *raft.Log) interface{} {
f.mu.Lock()
defer f.mu.Unlock()
var cmd Command
if err := json.Unmarshal(log.Data, &cmd); err != nil {
panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
}
switch cmd.Type {
case SubmitJobCmd:
var job Job
if err := json.Unmarshal(cmd.Payload, &job); err != nil {
return fmt.Errorf("failed to unmarshal job payload: %w", err)
}
f.jobs[job.ID] = &job
// 在真实项目中,这里会触发任务调度逻辑
return nil
case UpdateTaskStatusCmd:
var taskUpdate struct {
JobID string `json:"job_id"`
TaskID string `json:"task_id"`
Status TaskStatus `json:"status"`
WorkerID string `json:"worker_id"`
}
if err := json.Unmarshal(cmd.Payload, &taskUpdate); err != nil {
return fmt.Errorf("failed to unmarshal task update payload: %w", err)
}
if job, ok := f.jobs[taskUpdate.JobID]; ok {
if task, ok := job.Tasks[taskUpdate.TaskID]; ok {
task.Status = taskUpdate.Status
task.WorkerID = taskUpdate.WorkerID
}
}
// 这里可以检查job是否所有task都完成了
return nil
default:
return fmt.Errorf("unrecognized command type: %d", cmd.Type)
}
}
// Snapshot 用于创建状态机的快照,用于日志压缩
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
f.mu.RLock()
defer f.mu.RUnlock()
// 序列化整个状态机
data, err := json.Marshal(f.jobs)
if err != nil {
return nil, err
}
return &fsmSnapshot{store: data}, nil
}
// Restore 从快照中恢复状态机状态
func (f *fsm) Restore(rc io.ReadCloser) error {
f.mu.Lock()
defer f.mu.Unlock()
data, err := io.ReadAll(rc)
if err != nil {
return err
}
return json.Unmarshal(data, &f.jobs)
}
type fsmSnapshot struct {
store []byte
}
func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
if _, err := sink.Write(s.store); err != nil {
sink.Cancel()
return err
}
return sink.Close()
}
func (s *fsmSnapshot) Release() {}
这里的关键在于 Apply 方法。任何对系统状态的修改,比如提交一个新 Job 或更新一个 Task 的状态,都必须封装成一个 Command,序列化后通过 raft.Apply() 提交。Raft 保证了这个 command 会被写入其内部的 WAL (Write-Ahead Log),并被复制到集群中的大多数节点,最后才会调用每个节点的 Apply 方法。这个过程保证了即使发生主节点切换,新主节点的状态机也和旧主节点完全一致。
sequenceDiagram
participant Client
participant Leader
participant Follower1
participant Follower2
Client->>Leader: raft.Apply(SubmitJobCmd)
Leader->>Leader: Append command to local log
par
Leader->>Follower1: Replicate log entry
Leader->>Follower2: Replicate log entry
end
Follower1-->>Leader: Acknowledge
Follower2-->>Leader: Acknowledge
Leader->>Leader: Commit log entry (majority acked)
Leader->>Leader: Apply command to its FSM
par
Leader->>Follower1: Notify commit
Leader->>Follower2: Notify commit
end
Follower1->>Follower1: Apply command to its FSM
Follower2->>Follower2: Apply command to its FSM
Leader-->>Client: Apply operation successful
二、Worker 与 Master 的实时 WebSocket 通道
一旦 Master 集群建立起来,下一步就是 Worker 节点如何与之通信。Worker 需要连接到当前的 Leader 节点,并在连接断开时能够自动发现新的 Leader。
每个 Worker 启动后,会尝试连接配置文件中指定的 Master 节点列表。它会轮询 /status 这样的 HTTP 端点,直到找到返回 Leader 地址的节点。一旦确定 Leader,Worker 会建立一个到该 Leader 的 WebSocket 连接。
// worker/client.go
package main
import (
"log"
"net/url"
"time"
"github.com/gorilla/websocket"
)
// Message 定义了worker和master之间通信的结构
type Message struct {
Type string `json:"type"` // "heartbeat", "log", "status_update", "task_result"
Payload interface{} `json:"payload"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
}
// connectToMaster 循环尝试连接,直到成功
func connectToMaster(addr string) (*websocket.Conn, error) {
u := url.URL{Scheme: "ws", Host: addr, Path: "/ws"}
log.Printf("connecting to %s", u.String())
var conn *websocket.Conn
var err error
// 带有退避策略的重连逻辑
for i := 0; i < 5; i++ {
conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
if err == nil {
log.Println("successfully connected to master")
return conn, nil
}
log.Printf("failed to connect to master: %v. retrying in %d seconds...", err, 2*(i+1))
time.Sleep(time.Duration(2*(i+1)) * time.Second)
}
return nil, err
}
func main() {
// 实际应用中,地址应从配置中读取
masterAddr := "localhost:8080"
conn, err := connectToMaster(masterAddr)
if err != nil {
log.Fatalf("could not connect to master after several retries: %v", err)
}
defer conn.Close()
// 启动一个goroutine来处理从Master接收的消息
go readLoop(conn)
// 主goroutine负责发送心跳和任务状态
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 发送心跳
heartbeat := Message{Type: "heartbeat", Payload: "I am alive"}
if err := conn.WriteJSON(heartbeat); err != nil {
log.Println("write heartbeat error:", err)
// 连接可能已断开,触发重连
return
}
// ... 其他逻辑,比如发送任务进度
}
}
}
// readLoop 负责读取和处理来自master的指令
func readLoop(conn *websocket.Conn) {
for {
var msg Message
err := conn.ReadJSON(&msg)
if err != nil {
log.Println("read error:", err)
// 连接断开,需要外部逻辑来处理重连
return
}
switch msg.Type {
case "assign_task":
// 解析任务并开始执行
log.Printf("received task assignment: %+v", msg.Payload)
// go executeTask(msg.Payload)
case "cancel_task":
// 取消正在执行的任务
log.Printf("received cancel command for task: %+v", msg.Payload)
}
}
}
在 Master 端,需要一个 WebSocket 管理器来处理所有 Worker 的连接。
// master/websocket_handler.go
package main
import (
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 在生产中应有更严格的检查
},
}
type Hub struct {
mu sync.RWMutex
workers map[string]*websocket.Conn // workerID -> conn
}
func newHub() *Hub {
return &Hub{
workers: make(map[string]*websocket.Conn),
}
}
func (h *Hub) handleWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("upgrade error:", err)
return
}
defer conn.Close()
// 注册worker, workerID应该在握手时提供
workerID := r.Header.Get("X-Worker-ID")
if workerID == "" {
log.Println("worker ID not provided")
return
}
h.mu.Lock()
h.workers[workerID] = conn
h.mu.Unlock()
log.Printf("worker %s connected", workerID)
defer func() {
h.mu.Lock()
delete(h.workers, workerID)
h.mu.Unlock()
log.Printf("worker %s disconnected", workerID)
}()
for {
// 读取来自worker的消息 (心跳,日志,状态更新)
// 并通过 raft.Apply() 更新 FSM
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("unexpected close error: %v", err)
}
break
}
log.Printf("received message from %s: %s", workerID, message)
// 在这里解析消息,并将其转换为FSM的Command
}
}
三、任务调度与可观测性
一个 Job 被提交后,Master Leader 节点负责将其分解为 Map 和 Reduce 任务。调度逻辑的核心在于将这些 Task 分配给当前空闲的 Worker。
graph TD
subgraph Job Lifecycle
A[Client Submits Job] --> B{Master Leader};
B --> C{Parse Job & Create Tasks};
C --> D{Store Job in Raft FSM};
D --> E{Find Idle Workers};
E --> F[Assign Map Tasks via WebSocket];
end
subgraph Task Execution
G[Worker Receives Task] --> H{Executes Map Function};
H --> I[Streams Logs & Progress via WebSocket];
I --> J{Writes Intermediate Output};
J --> K[Report Task Completion];
end
subgraph Shuffle & Reduce
L{All Mappers Done} --> M{Master Schedules Reducers};
M --> N[Assign Reduce Tasks];
O[Reducer Fetches Intermediate Data] --> P{Executes Reduce Function};
P --> Q[Writes Final Output];
Q --> R{Report Job Completion};
end
B -- Raft Log Replication --> S((Follower Nodes));
K -- Update FSM --> D;
R -- Update FSM --> D;
为了实现真正的“可观测性”,而不仅仅是“监控”,我们不能满足于简单的日志和指标。我们需要将一个任务的完整生命周期串联起来。这里引入了分布式追踪的概念。
当一个 Job 被提交时,Master 会为它创建一个全局唯一的 TraceID。
- Job Span: Master 为整个 Job 创建一个 Root Span。
- Task Spans: 每当一个 Map 或 Reduce 任务被调度时,Master 会为它创建一个 Child Span,其 Parent 是 Job Span。
- Context Propagation:
TraceID和SpanID会作为元数据,通过 WebSocket 的assign_task消息传递给 Worker。 - Worker-side Tracing: Worker 在执行任务和上报日志、状态时,会带上这些 ID。
// WebSocket message from worker to master
{
"type": "log_entry",
"trace_id": "abc-123-xyz-789",
"span_id": "span-map-task-001",
"payload": {
"timestamp": "2023-10-27T10:45:15Z",
"level": "INFO",
"message": "Processing line 1000 of input split..."
}
}
这种结构化的信息流彻底改变了调试方式。当一个 Job 变慢时,我们不再需要 grep 几百个日志文件。我们可以直接在 Jaeger 或 Zipkin 这样的追踪系统中,按 TraceID 查询,看到整个 Job 的甘特图:
- 哪个 Map 任务启动最慢?
- 哪个 Reduce 任务是“长尾任务”(Straggler)拖慢了整个 Job?
- 某个任务失败前,它的日志流是怎样的?
所有这些信息都上下文关联,一目了然。
四、构建与部署工具
构建与工具 在这个项目中体现在简化和标准化上。我们使用一个简单的 Makefile 来管理构建过程。
# Makefile
.PHONY: all build clean master worker
GO = go
GO_FLAGS = -v
LDFLAGS = -ldflags="-s -w"
MASTER_BIN = ./bin/master
WORKER_BIN = ./bin/worker
all: build
build: master worker
master:
@echo "Building master binary..."
@$(GO) build $(GO_FLAGS) $(LDFLAGS) -o $(MASTER_BIN) ./master/
worker:
@echo "Building worker binary..."
@$(GO) build $(GO_FLAGS) $(LDFLAGS) -o $(WORKER_BIN) ./worker/
clean:
@echo "Cleaning up..."
@rm -f $(MASTER_BIN) $(WORKER_BIN)
配置文件使用 TOML 格式,清晰易读。
# config/master.toml
node_id = "master-1"
listen_addr = "0.0.0.0:8080" # for http/ws
raft_addr = "127.0.0.1:9001"
raft_dir = "/var/lib/dist-mr/master-1"
# 对于集群中的第一个节点,bootstrap为true
bootstrap = true
# 其他节点的配置
# [[peers]]
# node_id = "master-2"
# address = "127.0.0.1:9002"
# [[peers]]
# node_id = "master-3"
# address = "127.0.0.1:9003"
# config/worker.toml
worker_id = "worker-01"
# worker会轮询这个列表来发现leader
master_addresses = ["localhost:8080", "localhost:8081", "localhost:8082"]
部署时,只需将编译好的 master 二进制文件和配置文件分发到几台机器上,启动即可形成 Raft 集群。同样,worker 二进制文件可以分发到任意数量的计算节点上。这种无外部依赖的部署方式,在弹性伸缩的环境中尤其具有优势。
局限性与未来迭代方向
这个系统虽然解决了核心的高可用和实时可观测性问题,但它远非一个完备的生产级系统。
首先,Shuffle 阶段的实现非常初级。目前的设计可能依赖一个共享网络文件系统(如 NFS)来存储 Map 任务的中间输出,这会成为性能瓶颈和另一个潜在的故障点。一个更健壮的实现需要 Worker 之间能直接点对点地拉取数据,或者通过一个专门的 Shuffle Service。
其次,资源管理是完全缺席的。Worker 节点无差别地执行任务,没有考虑节点的负载、内存、CPU 使用情况。集成资源隔离(例如通过 cgroups 或容器运行时)和更智能的、考虑数据本地性的调度算法,是通往生产环境的必经之路。
最后,任务的序列化和执行方式也较为简单。当前可能只支持执行编译进 Worker 二进制文件中的特定函数。一个可扩展的系统需要支持动态加载代码,例如通过 WebAssembly (WASM) 或者插件化的方式运行用户自定义的 Map 和 Reduce 函数,这将极大地提升系统的灵活性和适用范围。