job_compilers模块的主要功能是支持用户自定义flamenco上的job,flamenco默认有一个渲染场景的job类型供用户使用,但是也支持用户根据自身的需要编写job。自定义的job使用javascript来编码,将其放在manager程序文件夹下的script目录中(没有此目录时可以自己创建),manager会自动识别并编译这些job,后续就可以通过API来使用它们了。
关于job的详细介绍,可参考:https://flamenco.blender.org/usage/job-types/
job_compilers模块提供以下3个功能:
type JobCompiler interface {
ListJobTypes() api.AvailableJobTypes
GetJobType(typeName string) (api.AvailableJobType, error)
Compile(ctx context.Context, job api.SubmittedJob) (*job_compilers.AuthoredJob, error)
}
goja
job使用javascript编写的,flamenco主要使用go编写,所以需要一种让两种编程语言相互协同的手段,flamenco使用的时goja库。goja 包提供了一套完整的 JavaScript 运行环境,包括了解析器、虚拟机、标准库等,可以在 Go 程序中通过直接调用函数或间接调用脚本来执行 JavaScript 代码。goja 还支持绑定和导出本地 Go 函数或类型,使 JavaScript 代码能够与 Go 代码进行交互。 关于goja库更详细的介绍可以查看:https://pkg.go.dev/github.com/dop251/goja#section-documentation
scripts.go
script.go负责从磁盘中读取job文件,并创建javascript运行所需的虚拟机环境。
job_compilers中有如下数据类型定义:
// Service contains job compilers defined in JavaScript.
type Service struct {
compilers map[string]Compiler // Mapping from job type name to the job compiler of that type.
registry *require.Registry // Goja module registry.
timeService TimeService
// mutex protects 'compilers' from race conditions.
mutex *sync.Mutex
}
type Compiler struct {
jobType string
program *goja.Program // Compiled JavaScript file.
filename string // The filename of that JS file.
}
type VM struct {
runtime *goja.Runtime // Goja VM containing the job compiler script.
compiler Compiler // Program loaded into this VM.
jobTypeEtag string // Etag for this particular job type.
}
// jobCompileFunc is a function that fills job.Tasks.
type jobCompileFunc func(job *AuthoredJob) error
Service是job_campilers的核心,包含所有job的编译器,VM是job执行的虚拟机,它们都包含了Compiler结构体。Compiler中的program就是定义job的javascript文件编译而来的代码。script.go主要实现两个函数:
-
loadScripts():从文件夹中读取job文件,并将它的compile函数储存到 Service.compilers 中。
-
compilerVMForJobType(jobTypeName string):输入一个job类型,返回该job运行的虚拟机环境,即上面的VM结构体。
在compilerVMForJobType函数中,有这样一段代码:
runtime := newGojaVM(s.registry)
if _, err := runtime.RunProgram(program.program); err != nil {
return nil, err
}
他调用newGojaVM函数创建了一个goja虚拟机,用于初始化最终返回的VM结构体。newGojaVM函数的实现如下:
func newGojaVM(registry *require.Registry) *goja.Runtime {
vm := goja.New()
vm.SetFieldNameMapper(goja.UncapFieldNameMapper())
mustSet := func(name string, value interface{}) {
err := vm.Set(name, value)
if err != nil {
log.Panic().Err(err).Msgf("unable to register '%s' in Goja VM", name)
}
}
// Set some global functions.
mustSet("print", jsPrint)
mustSet("alert", jsAlert)
mustSet("frameChunker", jsFrameChunker)
mustSet("formatTimestampLocal", jsFormatTimestampLocal)
// Pre-import some useful modules.
registry.Enable(vm)
mustSet("author", require.Require(vm, "author"))
mustSet("path", require.Require(vm, "path"))
mustSet("process", require.Require(vm, "process"))
return vm
}
可以看到,在创建goja的虚拟机时,这里向其中注入了几个方法,比如用于调试的jsPrint,在虚拟机中的名称为print,这就表示在使用javascript编写job时,可以使用print函数来间接使用jsPrint打印信息,其他几个函数也是同理。除了函数外,这里也注入了几个模块,也可以理解成类,path模块提供了manager运行的路径信息,process模块提供了当前路径和当前的运行平台(windows, linux),author模块则是比较核心的模块,接下来详细介绍。
author
author模块专门提供一些方法让用户来自定义job,是job编写中最常使用的模块。
author中有如下数据结构定义:
// Author allows scripts to author tasks and commands.
type Author struct {
runtime *goja.Runtime
}
// AuthoredJob就是自定义的Job,它包含了多个Task
type AuthoredJob struct {
JobID string
WorkerTagUUID string
Name string
JobType string
Priority int
Status api.JobStatus
Created time.Time
Settings JobSettings
Metadata JobMetadata
Storage JobStorageInfo
Tasks []AuthoredTask
}
type JobSettings map[string]interface{}
type JobMetadata map[string]string
// AuthoredTask是Job中具体执行的任务,任务的执行命令保存在Commands中
type AuthoredTask struct {
// Tasks already get their UUID in the authoring stage. This makes it simpler
// to store the dependencies, as the code doesn't have to worry about value
// vs. pointer semantics. Tasks can always be unambiguously referenced by
// their UUID.
UUID string
Name string
Type string
Priority int
Commands []AuthoredCommand
// Dependencies are tasks that need to be completed before this one can run.
Dependencies []*AuthoredTask `json:"omitempty" yaml:"omitempty"`
}
type AuthoredCommand struct {
Name string
Parameters AuthoredCommandParameters
}
author模块向两个javascript提供了两个函数,Task和Command,用来在自定以Job时创建Task和它所执行的命令。两个函数的具体实现:
// 输入任务的名称和类型,返回AuthoredTask
func (a *Author) Task(name string, taskType string) (*AuthoredTask, error) {
name = strings.TrimSpace(name)
taskType = strings.TrimSpace(taskType)
if name == "" {
return nil, errors.New("author.Task(name, type): name is required")
}
if taskType == "" {
return nil, errors.New("author.Task(name, type): type is required")
}
at := AuthoredTask{
uuid.New(),
name,
taskType,
50, // TODO: handle default priority somehow.
make([]AuthoredCommand, 0), // 创建的是一个没有command的空任务
make([]*AuthoredTask, 0),
}
return &at, nil
}
// 输入需要执行的cmd命令和参数,返回一个Command
func (a *Author) Command(cmdName string, parameters AuthoredCommandParameters) (*AuthoredCommand, error) {
ac := AuthoredCommand{cmdName, parameters}
return &ac, nil
}
除此之外,author模块也提供了AddTask(将一个AuthoredTask加入到AuthoredJob)、AddCommand(将一个Command加入到AuthoredTask中),AddDependency(将一个AuthoredTask加入到另一个AuthoredTask的依赖中)三个函数,加上上面的Task和Command函数,自定义job所需的接口就全部准备好了,后面三个add函数的实现比较简单,append到对应的数据中就行,不再详细分析。
job_compilers
job_compilers用于整合上述的所有模块,实现自定义job的加载和编译服务,并实现此模块需要提供的三个接口(ListJobTypes(),GetJobType(typeName string),Compile(ctx context.Context, job api.SubmittedJob))。前两个接口比较简单,这里以Compile为例来分析。先看Compile接口的实现:
func (s *Service) Compile(ctx context.Context, sj api.SubmittedJob) (*AuthoredJob, error) {
// 获得该job类型的VM
vm, err := s.compilerVMForJobType(sj.Type)
if err != nil {
return nil, err
}
if err := vm.checkJobTypeEtag(sj); err != nil {
return nil, err
}
// Create an AuthoredJob from this SubmittedJob.
// 注意这里的Settings和Metadata是空的
aj := AuthoredJob{
JobID: uuid.New(),
Created: s.timeService.Now(),
Name: sj.Name,
JobType: sj.Type,
Priority: sj.Priority,
Status: api.JobStatusUnderConstruction,
Settings: make(JobSettings),
Metadata: make(JobMetadata),
}
if sj.Settings != nil {
for key, value := range sj.Settings.AdditionalProperties {
aj.Settings[key] = value
}
}
if sj.Metadata != nil {
for key, value := range sj.Metadata.AdditionalProperties {
aj.Metadata[key] = value
}
}
if sj.Storage != nil && sj.Storage.ShamanCheckoutId != nil {
aj.Storage.ShamanCheckoutID = *sj.Storage.ShamanCheckoutId
}
if sj.WorkerTag != nil {
aj.WorkerTagUUID = *sj.WorkerTag
}
// 从VM中获取该job的compiler,它们在manager启动时就加载好了
compiler, err := vm.getCompileJob()
if err != nil {
return nil, err
}
// 调用这个compiler函数编译该job,主要为填充job的task和command
if err := compiler(&aj); err != nil {
return nil, err
}
log.Info().
Int("num_tasks", len(aj.Tasks)).
Str("name", aj.Name).
Str("jobtype", aj.JobType).
Str("job", aj.JobID).
Msg("job compiled")
return &aj, nil
}
vm.getCompileJob()的实现如下:
func (vm *VM) getCompileJob() (jobCompileFunc, error) {
// 所谓的compiler由用户自定义,在javascript中名称为compileJob
compileJob, isCallable := goja.AssertFunction(vm.runtime.Get("compileJob"))
if !isCallable {
// TODO: construct a more elaborate Error type that contains this info, instead of logging here.
log.Error().
Str("jobType", vm.compiler.jobType).
Str("script", vm.compiler.filename).
Msg("script does not define a compileJob(job) function")
return nil, ErrScriptIncomplete
}
// TODO: wrap this in a nicer way.
return func(job *AuthoredJob) error {
_, err := compileJob(nil, vm.runtime.ToValue(job))
return err
}, nil
}
Compile的功能可以总结为:当用户提交job时,通过该job的类型寻找对应的compiler(这个job的类型可能是manager自带的,也可能是用户自己定义的,其中包括了该类型的任务的参和compiler),使用该compiler构建好job的task和对应的command,封装到一个完整的AuthoredJob中,后面就可以发送给worker执行了。
一个Job_type示例
以manager中自带的echo_and_sleep的job类型举例,它的定义如下:
const JOB_TYPE = {
label: "Echo Sleep Test", // job type 的名称
settings: [ // 对应JobSettings
{ key: "message", type: "string", required: true },
{ key: "sleep_duration_seconds", type: "int32", default: 1 },
{ key: "sleep_repeats", type: "int32", default: 1 },
]
};
function compileJob(job) { // 该job type的compiler,名称必须为compileJob,接受AuthoredJob参数
const settings = job.settings;
const echoTask = author.Task("echo", "misc"); // 使用author的Task函数创建一个task,它的名称为scho,类型为misc
echoTask.addCommand(author.Command("echo", {message: settings.message})); // 使用addCommand为这个task添加echo命令
job.addTask(echoTask); // 将这个task加入job
// 同理,加入sleep的task
for (let repeat=0; repeat < settings.sleep_repeats; repeat++) {
const sleepTask = author.Task("sleep", "misc")
sleepTask.addCommand(author.Command("sleep", {duration_in_seconds: settings.sleep_duration_seconds}))
sleepTask.addDependency(echoTask); // Ensure sleeping happens after echo, and not at the same time.
job.addTask(sleepTask);
}
}