顾名思义,这个类用于执行worker上需要执行的各种命令,是worker中最核心的功能。先看它的定义:

type CommandExecutor struct {
	cli         CommandLineRunner
	listener    CommandListener
	timeService TimeService
 
	// registry maps a command name to a function that runs that command.
	registry map[string]commandCallable
}

CommandExecutor中有四个变量:
cli:用于在本计算机上执行cli命令。
listener:用于将命令执行的结果和日志上传至manager,以便于查看任务的执行情况。
timeService:时间服务。
registry:保存每种任务和它们需要执行的函数的映射,比如blender-render类型的任务就需要执行计算机上的blender程序。commandCallable的定义为:

type commandCallable func(ctx context.Context, logger zerolog.Logger, taskID string, cmd api.Command) error

初始化:

func NewCommandExecutor(cli CommandLineRunner, listener CommandListener, timeService TimeService) *CommandExecutor {
	ce := &CommandExecutor{
		cli:         cli,
		listener:    listener,
		timeService: timeService,
	}
	// Registry of supported commands. Having this as a map (instead of a big
	// switch statement) makes it possible to do things like reporting the list of
	// supported commands.
	ce.registry = map[string]commandCallable{
		// misc
		"echo":  ce.cmdEcho,
		"sleep": ce.cmdSleep,
		"exec":  ce.cmdExec,
		// blender
		"blender-render": ce.cmdBlenderRender,
		// ffmpeg
		"frames-to-video": ce.cmdFramesToVideo,
		// file-management
		"move-directory": ce.cmdMoveDirectory,
		"copy-file":      ce.cmdCopyFile,
	}
	return ce
}

CommandExecutor的初始化参数就是前3个成员变量,将成员变量都赋值后,再将registry初始化,可以看到每种类型的任务都有它们对应的commandCallable,比如echo对应ce.cmdEcho,这就是worker支持的所有任务类型。

CommandLineRunner成员

// CommandLineRunner is an interface around exec.CommandContext().
//
//go:generate go run github.com/golang/mock/mockgen -destination mocks/cli_runner.gen.go -package mocks projects.blender.org/studio/flamenco/internal/worker CommandLineRunner
type CommandLineRunner interface {
	CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd
	RunWithTextOutput(
		ctx context.Context,
		logger zerolog.Logger,
		execCmd *exec.Cmd,
		logChunker cli_runner.LogChunker,
		lineChannel chan<- string,
	) error
}

注释已经描述了,CommandLineRunner是一个包裹exec.CommandContext的接口,exec.CommandContext就是Go语言中执行CMD命令的工具。这个接口有两个函数,CommandContext用来获取exec.Cmd,RunWithTextOutput用来执行CMD命令。

CLIRunner

实现CommandLineRunner这个接口的类叫做CLIRunner,CLIRunner中没有任何成员变量,单纯地实现了接口中的两个函数。

type CLIRunner struct {
}

CommandContext函数

func (cli *CLIRunner) CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd {
	return exec.CommandContext(ctx, name, arg...)
}

非常简单,创建并返回一个exec.Cmd指针。

RunWithTextOutput函数

// RunWithTextOutput runs a command and sends its output line-by-line to the
// lineChannel. Stdout and stderr are combined.
// Before returning. RunWithTextOutput() waits for the subprocess, to ensure it
// doesn't become defunct.
//
// Note that all output read from the command is logged via `logChunker` as
// well, so the receiving end of the `lineChannel` does not have to do this.
func (cli *CLIRunner) RunWithTextOutput(
	ctx context.Context,
	logger zerolog.Logger,
	execCmd *exec.Cmd,
	logChunker LogChunker,
	lineChannel chan<- string,
) error {
	outPipe, err := execCmd.StdoutPipe()
	if err != nil {
		return err
	}
	execCmd.Stderr = execCmd.Stdout // Redirect stderr to stdout.
    ...
	if err := execCmd.Start(); err != nil {
		logger.Error().Err(err).Msg("error starting CLI execution")
		return err
	}
 
	subprocPID := execCmd.Process.Pid
	logger = logger.With().Int("pid", subprocPID).Logger()
    ...
}

这里只看它的核心代码部分,除了记录Cmd执行的日志以外,他唯一的功能就是execCmd.Start()开始执行Cmd命令。

CommandListener成员

// CommandListener sends the result of commands (log, output files) to the Manager.
type CommandListener interface {
    // LogProduced sends any logging to whatever service for storing logging.
    // logLines are concatenated.
    LogProduced(ctx context.Context, taskID string, logLines ...string) error
    // OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video).
    OutputProduced(ctx context.Context, taskID string, outputLocation string) error
}

CommandListener也是一个接口,用于将worker的任务执行日志和任务结果提交给manager,用来观测各个任务的详细执行情况。
LogProduced:用于上传日志到manager。
OutputProduced:用于上传任务输出(一般是渲染图片)到manager。

Listener

Listener是实现CommandListener接口的类。它的定义如下:

// Listener listens to the result of task and command execution, and sends it to the Manager.
type Listener struct {
	client         FlamencoClient
	buffer         UpstreamBuffer
	outputUploader *OutputUploader
}
// NewListener creates a new Listener that will send updates to the API client.
func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener {
	l := &Listener{
		client:         client,
		buffer:         buffer,
		outputUploader: NewOutputUploader(client),
	}
	return l
}

其中有3个成员变量,client用于与manager通信,buffer用来保存需要上传的信息,outputUploader实现上传操作,在它的初始化中,outputUploader也是需要client来进行初始化的,outputUploader就是Listener的核心。

LogProduced

func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	return l.buffer.SendTaskUpdate(ctx, taskID, update)
}
 
// LogProduced sends any logging to whatever service for storing logging.
func (l *Listener) LogProduced(ctx context.Context, taskID string, logLines ...string) error {
    return l.sendTaskUpdate(ctx, taskID, api.TaskUpdateJSONRequestBody{
        Log: ptr(strings.Join(logLines, "\n")),
    })
}

LogProduced将最新的日志按行送到manager,其中调用了buffer.SendTaskUpdate方法,这是UpstreamBuffer中的一个方法。
日志传输的流程暂略。

OutputProduced

// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video).
func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error {
	l.outputUploader.OutputProduced(taskID, outputLocation)
	return nil
}

由outputUploader执行文件传输操作。

OutputUploader

这是执行真正文件传输操作的类,

// OutputUploader sends (downscaled versions of) rendered images to Flamenco
// Manager. Only one image is sent at a time. A queue of a single image is kept,
// where newly queued images replace older ones.
type OutputUploader struct {
	client FlamencoClient
	queue  *last_in_one_out_queue.LastInOneOutQueue[TaskOutput]
}
 
type TaskOutput struct {
	TaskID   string
	Filename string
}
 
func NewOutputUploader(client FlamencoClient) *OutputUploader {
	return &OutputUploader{
		client: client,
		queue:  last_in_one_out_queue.New[TaskOutput](),
	}
}

可以看到,其中有一个FlamencoClient和一个queue,queue中保存了每一个任务的信息,信息由TaskID和Filename组成。
OutputUploader中的OutputProduced函数如下:

// OutputProduced enqueues the given filename for processing.
func (ou *OutputUploader) OutputProduced(taskID, filename string) {
    // TODO: Before enqueueing (and thus overwriting any previously queued item),
    // check that this file can actually be handled by the Last Rendered system of
    // Flamenco. It would be a shame if a perfectly-good JPEG file is kicked off
    // the queue by an EXR file we can't handle.
    item := TaskOutput{taskID, filename}
    ou.queue.Enqueue(item)
}

它并未做任务传输操作,而是将需要传输的文件加入队列。
OutputUploader中还有一个Run函数,它是这样的:

func (ou *OutputUploader) Run(ctx context.Context) {
    log.Info().Msg("output uploader: running")
    defer log.Info().Msg("output uploader: shutting down")
 
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        ou.queue.Run(ctx)
    }()
 
runLoop:
    for {
        select {
        case <-ctx.Done():
            break runLoop
        case item := <-ou.queue.Item():
            ou.process(ctx, item)
        }
    }
 
    wg.Wait()
}

由此可以看出,worker的文件传输是异步的,它只要将需要传输的任务加入队列中,另一个goroutine会调用process函数来对每个任务进行处理,
process函数:

// process loads the given image, converts it to JPEG, and uploads it to
// Flamenco Manager.
func (ou *OutputUploader) process(ctx context.Context, item TaskOutput) {
    ...
    jpegBytes := loadAsJPEG(item.Filename)
    if len(jpegBytes) == 0 {
        return // loadAsJPEG() already logged the error.
    }
 
    // Upload to Manager.
    jpegReader := bytes.NewReader(jpegBytes)
    resp, err := ou.client.TaskOutputProducedWithBodyWithResponse(
        ctx, item.TaskID, "image/jpeg", jpegReader)
    if err != nil {
        logger.Error().Err(err).Msg("output uploader: unable to send image to Manager")
        return
    }
    ...
}

将需要上传的图片解析为JPEG格式,然后上传到manager。

CommandExecutor的其他成员函数

Run

func (ce *CommandExecutor) Run(ctx context.Context, taskID string, cmd api.Command) error {
    logger := log.With().Str("task", string(taskID)).Str("command", cmd.Name).Logger()
    logger.Info().Interface("parameters", cmd.Parameters).Msg("running Flamenco command")
 
    runner, ok := ce.registry[cmd.Name]
    if !ok {
        return fmt.Errorf("unknown command: %q", cmd.Name)
    }
 
    return runner(ctx, logger, taskID, cmd)
}

Run函数是CommandExecutor执行cmd命令的函数,传入一个cmd,根据cmd.Name在registry中找到一个commandCallable类型的函数,这里叫做runner,然后调用这个函数。registry中保存了每个cmd.Name和需要执行的函数的映射,在CommandExecutor构建时就初始化了,比如”blender-render”对应cmdBlenderRender,cmdBlenderRender中就会调用CLIRunner的RunWithTextOutput来执行cmd命令。

总结

CommandExecutor是worker中用来执行cmd命令的函数,在flamenco中,manager中的每一项任务最终都具体到一个cmd命令,可能是简单的echo命令,可能是文件管理命令,最常用的是blender xxxx的后台渲染命令。CommandExecutor提供了命令执行,日志上传,命令执行结果上传的功能,不过它可以执行的命令是有限制的,在初始化时所有可执行的命令类型都被记录在registry中。