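Each worker has a state that governs how it behaves. For example, a worker in the sleep state does not take any task from the Manager until its state changes to awake.
The worker manages its states with the State pattern: each state lives in its own module, which defines the worker's behavior in that state.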
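To make the idea concrete, here is a minimal Go sketch of the State pattern under simplifying assumptions: each state contributes one entry function kept in a lookup table, and a state change stores the new state under a mutex and then runs that state's function. The names `miniWorker`, `stateFunc`, `changeState` and the status values are hypothetical; Flamenco itself uses one `gotoStateXxx` method per state, as gotoStateAwake below shows.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkerStatus mirrors the idea of api.WorkerStatus; the values are hypothetical.
type WorkerStatus string

const (
	StatusAwake  WorkerStatus = "awake"
	StatusAsleep WorkerStatus = "asleep"
)

// stateFunc is the entry point of one state and holds that state's behavior.
type stateFunc func(w *miniWorker)

type miniWorker struct {
	mu     sync.Mutex
	state  WorkerStatus
	states map[WorkerStatus]stateFunc
	log    []string // records which entry functions ran
}

func newMiniWorker() *miniWorker {
	w := &miniWorker{}
	w.states = map[WorkerStatus]stateFunc{
		StatusAwake: func(w *miniWorker) {
			w.log = append(w.log, "awake: start fetching tasks")
		},
		StatusAsleep: func(w *miniWorker) {
			w.log = append(w.log, "asleep: ignore all tasks")
		},
	}
	return w
}

// changeState switches to the requested state and runs its entry function.
// Unknown states are rejected so the worker never enters an undefined mode.
func (w *miniWorker) changeState(s WorkerStatus) error {
	enter, ok := w.states[s]
	if !ok {
		return fmt.Errorf("unknown state %q", s)
	}
	w.mu.Lock()
	w.state = s
	w.mu.Unlock()
	enter(w)
	return nil
}

func main() {
	w := newMiniWorker()
	_ = w.changeState(StatusAsleep)
	_ = w.changeState(StatusAwake)
	fmt.Println(w.state, w.log)
}
```

Adding a state then means adding one entry to the table, without touching the code of the other states.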
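Taking the awake state as an example, let's look at how the worker operates while it is working.
When the worker switches to the awake state, it executes the following function: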
func (w *Worker) gotoStateAwake(ctx context.Context) {
	w.stateMutex.Lock()
	w.state = api.WorkerStatusAwake
	w.stateMutex.Unlock()
	w.doneWg.Add(2)
	w.ackStateChange(ctx, w.state)
	go w.runStateAwake(ctx)
}
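Step one: the state is set to api.WorkerStatusAwake. The write is guarded by stateMutex so that it is safe with respect to other goroutines that read the state.
Step two: w.doneWg.Add(2) increments the doneWg WaitGroup counter by two. Then ackStateChange is called directly (not with go) to notify the Manager of the state change, and runStateAwake, the work the worker performs while awake, is started in a new goroutine. Only runStateAwake runs asynchronously here.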
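The two steps above can be sketched as a self-contained program. This is not Flamenco's code: `miniWorker`, `shutdown`, `demo` and the fake `ackStateChange` are hypothetical, and the sketch registers a single goroutine with `doneWg.Add(1)`, whereas the real function calls `Add(2)`. What it shows is the ordering: update the state under the mutex, call `Add` before starting the goroutine, acknowledge the change synchronously, and let the goroutine call `Done` on exit so that a shutdown can close `doneChan` and then `Wait`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type miniWorker struct {
	stateMutex sync.Mutex
	state      string

	doneChan chan struct{}  // closed on shutdown
	doneWg   sync.WaitGroup // goroutines that must finish before shutdown returns

	acked []string // states reported to the (fake) Manager
	ticks int      // awake-loop iterations, guarded by stateMutex
}

func newMiniWorker() *miniWorker {
	return &miniWorker{doneChan: make(chan struct{})}
}

// ackStateChange stands in for notifying the Manager; it only records the state.
func (w *miniWorker) ackStateChange(state string) {
	w.acked = append(w.acked, state)
}

func (w *miniWorker) gotoStateAwake(ctx context.Context) {
	w.stateMutex.Lock()
	w.state = "awake"
	w.stateMutex.Unlock()

	// Register the goroutine before starting it, so Wait cannot miss it.
	w.doneWg.Add(1)
	// Synchronous: the acknowledgement happens before the loop starts.
	w.ackStateChange("awake")
	go w.runStateAwake(ctx)
}

func (w *miniWorker) runStateAwake(ctx context.Context) {
	defer w.doneWg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case <-w.doneChan:
			return
		case <-time.After(time.Millisecond):
			w.stateMutex.Lock()
			w.ticks++
			w.stateMutex.Unlock()
		}
	}
}

// shutdown signals every goroutine and blocks until all of them have returned.
func (w *miniWorker) shutdown() {
	close(w.doneChan)
	w.doneWg.Wait()
}

// demo enters the awake state, lets the loop run briefly, then shuts down.
func demo() *miniWorker {
	w := newMiniWorker()
	w.gotoStateAwake(context.Background())
	time.Sleep(10 * time.Millisecond)
	w.shutdown()
	return w
}

func main() {
	w := demo()
	fmt.Println("acked:", w.acked, "state:", w.state, "loop ran:", w.ticks > 0)
}
```

Calling `Add` inside the goroutine instead would race with `Wait`, which is why the counter is incremented before the `go` statement.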
runStateAwake
// runStateAwake fetches a task and executes it, in an endless loop.
func (w *Worker) runStateAwake(ctx context.Context) {
	...
	for {
		task := w.fetchTask(ctx)
		if task == nil {
			return
		}
		// The task runner's listener will be responsible for sending results back
		// to the Manager. This code only needs to fetch a task and run it.
		err := w.runTask(ctx, *task)
		if err != nil {
			...
		}
		// Do some rate limiting. This is mostly useful while developing.
		select {
		case <-ctx.Done():
			return
		case <-time.After(durationTaskComplete):
		}
	}
}
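In this function the worker loops continuously: fetchTask obtains a task from the Manager that has not been executed yet, and runTask executes it. Sending the results back to the Manager is the job of the task runner's listener, not of this loop. The loop exits when fetchTask returns nil or the context is cancelled, and between tasks it waits durationTaskComplete as a simple rate limit.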
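A stripped-down version of this loop is sketched below, assuming hypothetical `fetchFunc`/`runFunc` callbacks in place of `w.fetchTask` and `w.runTask`. As in the original, the loop ends when fetching returns nil or the context is cancelled, and it pauses between tasks; failed tasks are only counted here, whereas the real code handles the error in the elided branch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// task is a hypothetical stand-in for api.AssignedTask.
type task struct{ name string }

// fetchFunc returns the next task, or nil when fetching was cancelled.
type fetchFunc func(ctx context.Context) *task

// runFunc executes one task; reporting results is someone else's job.
type runFunc func(ctx context.Context, t task) error

// awakeLoop fetches and runs tasks until fetch returns nil or ctx is
// cancelled, pausing between tasks as a simple rate limit.
func awakeLoop(ctx context.Context, fetch fetchFunc, run runFunc, pause time.Duration) (ok, failed int) {
	for {
		t := fetch(ctx)
		if t == nil {
			return
		}
		if err := run(ctx, *t); err != nil {
			failed++
		} else {
			ok++
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(pause):
		}
	}
}

// demo drains a fixed queue; one task fails.
func demo() (ok, failed int) {
	queue := []task{{"render-1"}, {"bad"}, {"render-2"}}
	fetch := func(context.Context) *task {
		if len(queue) == 0 {
			return nil
		}
		t := queue[0]
		queue = queue[1:]
		return &t
	}
	run := func(_ context.Context, t task) error {
		if t.name == "bad" {
			return errors.New("command failed")
		}
		return nil
	}
	return awakeLoop(context.Background(), fetch, run, time.Millisecond)
}

// demoCancelled shows the loop stopping via ctx although tasks keep coming.
func demoCancelled() (ok, failed int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	fetch := func(context.Context) *task { return &task{"endless"} }
	run := func(context.Context, task) error { cancel(); return nil }
	return awakeLoop(ctx, fetch, run, time.Hour)
}

func main() {
	fmt.Println(demo())
	fmt.Println(demoCancelled())
}
```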
fetchTask
// fetchTask periodically tries to fetch a task from the Manager, returning it when obtained.
// Returns nil when a task could not be obtained and the periodic loop was cancelled.
func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
	...
	var wait time.Duration
	for {
		select {
		case <-ctx.Done():
			logger.Debug().Msg("task fetching interrupted by context cancellation")
			return nil
		case <-w.doneChan:
			logger.Debug().Msg("task fetching interrupted by shutdown")
			return nil
		case <-time.After(wait):
		}
		logger.Debug().Msg("fetching tasks")
		resp, err := w.client.ScheduleTaskWithResponse(ctx)
		if err != nil {
			log.Error().Err(err).Msg("error obtaining task")
			wait = durationFetchFailed
			continue
		}
		switch {
		case resp.JSON200 != nil:
			log.Info().
				Interface("task", resp.JSON200).
				Msg("obtained task")
			return resp.JSON200
		case ...
			...
		}
	}
}
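fetchTask is also a loop. Before each attempt it waits for the duration in wait: the first attempt happens immediately, because wait starts at its zero value, and after a failed request wait is set to durationFetchFailed before retrying. The loop returns nil if the context is cancelled or the worker shuts down (doneChan). Once ScheduleTaskWithResponse returns a response that contains a task, fetchTask returns resp.JSON200. resp is of type ScheduleTaskResponse, defined as follows: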
type ScheduleTaskResponse struct {
	Body         []byte
	HTTPResponse *http.Response
	JSON200      *AssignedTask
	JSON403      *SecurityError
	JSON423      *WorkerStateChange
}
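So resp.JSON200, returned above, is a pointer to an AssignedTask (it is populated when the Manager answers with HTTP 200), that is, the following struct: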
// AssignedTask is a task as it is received by the Worker.
type AssignedTask struct {
	Commands    []Command  `json:"commands"`
	Job         string     `json:"job"`
	JobPriority int        `json:"job_priority"`
	JobType     string     `json:"job_type"`
	Name        string     `json:"name"`
	Priority    int        `json:"priority"`
	Status      TaskStatus `json:"status"`
	TaskType    string     `json:"task_type"`
	Uuid        string     `json:"uuid"`
}
runTask
// runTask runs the given task.
func (w *Worker) runTask(ctx context.Context, task api.AssignedTask) error {
	// Create a sub-context to manage the life-span of both the running of the
	// task and the loop to check whether we're still allowed to run it.
	taskCtx, taskCancel := context.WithCancel(ctx)
	defer taskCancel()
	var taskRunnerErr, abortReason error
	// Run the actual task in a separate goroutine.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer taskCancel()
		taskRunnerErr = w.taskRunner.Run(taskCtx, task)
	}()
	...
	// Wait for the task runner to either complete or abort.
	wg.Wait()
	...
	return taskRunnerErr
}
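After obtaining a task, runTask starts a goroutine that executes it under a sub-context (taskCtx), then blocks on the WaitGroup until the task runner completes or is aborted. The worker's taskRunner is a TaskExecutor.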
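The elided parts of runTask check whether the worker is still allowed to run the task and record an `abortReason`. One plausible shape of that pattern, purely as an assumption, is sketched below: the task runs in a goroutine under `taskCtx`, the calling goroutine polls a hypothetical `mayRun` check on a ticker, and a negative answer cancels `taskCtx` so the task returns early. `runner`, `allowed`, `errAborted`, `slowTask` and `demo` are illustrative names, not Flamenco's.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// runner stands in for w.taskRunner.Run; allowed asks "may this task keep running?".
type runner func(ctx context.Context) error
type allowed func() bool

var errAborted = errors.New("task aborted: no longer allowed to run")

// runTask runs the task under a sub-context and polls mayRun every interval;
// a negative answer cancels the sub-context and becomes the returned error.
func runTask(ctx context.Context, run runner, mayRun allowed, interval time.Duration) error {
	taskCtx, taskCancel := context.WithCancel(ctx)
	defer taskCancel()

	var taskRunnerErr, abortReason error
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer taskCancel() // a finished task also ends the polling loop below
		taskRunnerErr = run(taskCtx)
	}()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
poll:
	for {
		select {
		case <-taskCtx.Done():
			break poll
		case <-ticker.C:
			if !mayRun() {
				abortReason = errAborted
				taskCancel()
				break poll
			}
		}
	}

	wg.Wait() // taskRunnerErr is safe to read only after the goroutine is done
	if abortReason != nil {
		return abortReason
	}
	return taskRunnerErr
}

// slowTask takes d to finish, or returns ctx.Err() early when cancelled.
func slowTask(d time.Duration) runner {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func demo() (finished, aborted error) {
	always := func() bool { return true }
	never := func() bool { return false }
	finished = runTask(context.Background(), slowTask(5*time.Millisecond), always, time.Millisecond)
	aborted = runTask(context.Background(), slowTask(time.Hour), never, time.Millisecond)
	return
}

func main() {
	finished, aborted := demo()
	fmt.Println("finished:", finished, "aborted:", aborted)
}
```

Keeping the check in the calling goroutine and the work in a separate one means either side can end the shared sub-context, and `wg.Wait` guarantees the task goroutine has returned before runTask does.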
TaskExecutor is defined as follows:
type TaskExecutor struct {
	cmdRunner CommandRunner
	listener  TaskExecutionListener
}
Its CommandRunner field is an interface:
type CommandRunner interface {
	Run(ctx context.Context, taskID string, cmd api.Command) error
}
It is implemented by CommandExecutor.
TaskExecutionListener is also an interface:
// TaskExecutionListener sends task lifecycle events (start/fail/complete) to the Manager.
type TaskExecutionListener interface {
	// TaskStarted tells the Manager that task execution has started.
	TaskStarted(ctx context.Context, taskID string) error
	// TaskFailed tells the Manager the task failed for some reason.
	TaskFailed(ctx context.Context, taskID string, reason string) error
	// TaskCompleted tells the Manager the task has been completed.
	TaskCompleted(ctx context.Context, taskID string) error
}
It is implemented by Listener.
CommandExecutor and Listener are covered in detail in the CommandExecutor section.
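The exact behavior of TaskExecutor.Run is not shown here. As a rough, hypothetical sketch, an executor holding these two interfaces could report the start, run the task's commands in order, report a failure at the first error, and otherwise report completion. The interfaces follow the signatures quoted above, but `Command` is reduced to a name, `Run` takes a task ID and a command list instead of an `api.AssignedTask`, and `fakeRunner`/`recordingListener` exist only to exercise the flow.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Command is a simplified stand-in for api.Command.
type Command struct{ Name string }

type CommandRunner interface {
	Run(ctx context.Context, taskID string, cmd Command) error
}

type TaskExecutionListener interface {
	TaskStarted(ctx context.Context, taskID string) error
	TaskFailed(ctx context.Context, taskID string, reason string) error
	TaskCompleted(ctx context.Context, taskID string) error
}

type TaskExecutor struct {
	cmdRunner CommandRunner
	listener  TaskExecutionListener
}

// Run announces the start, runs commands in order and stops at the first failure.
func (te *TaskExecutor) Run(ctx context.Context, taskID string, cmds []Command) error {
	if err := te.listener.TaskStarted(ctx, taskID); err != nil {
		return fmt.Errorf("reporting start: %w", err)
	}
	for _, cmd := range cmds {
		if err := te.cmdRunner.Run(ctx, taskID, cmd); err != nil {
			reason := fmt.Sprintf("command %q failed: %v", cmd.Name, err)
			if lerr := te.listener.TaskFailed(ctx, taskID, reason); lerr != nil {
				return fmt.Errorf("%w (reporting the failure also failed: %v)", err, lerr)
			}
			return err
		}
	}
	return te.listener.TaskCompleted(ctx, taskID)
}

// fakeRunner fails on commands named "fail".
type fakeRunner struct{}

func (fakeRunner) Run(_ context.Context, _ string, cmd Command) error {
	if cmd.Name == "fail" {
		return errors.New("exit status 1")
	}
	return nil
}

// recordingListener stores the lifecycle events it receives.
type recordingListener struct{ events []string }

func (l *recordingListener) TaskStarted(context.Context, string) error {
	l.events = append(l.events, "started")
	return nil
}

func (l *recordingListener) TaskFailed(context.Context, string, string) error {
	l.events = append(l.events, "failed")
	return nil
}

func (l *recordingListener) TaskCompleted(context.Context, string) error {
	l.events = append(l.events, "completed")
	return nil
}

// Compile-time checks that the fakes satisfy the interfaces.
var (
	_ CommandRunner         = fakeRunner{}
	_ TaskExecutionListener = (*recordingListener)(nil)
)

func demo(names ...string) ([]string, error) {
	l := &recordingListener{}
	te := &TaskExecutor{cmdRunner: fakeRunner{}, listener: l}
	var cmds []Command
	for _, n := range names {
		cmds = append(cmds, Command{Name: n})
	}
	err := te.Run(context.Background(), "task-1", cmds)
	return l.events, err
}

func main() {
	fmt.Println(demo("blender-render", "move-file"))
	fmt.Println(demo("blender-render", "fail", "move-file"))
}
```

The `var _ ... = ...` lines are compile-time assertions that the fake types satisfy the interfaces, a common Go idiom when components such as CommandExecutor and Listener are wired together through interfaces.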