manager的api_impl模块负责flamenco所提供的API的具体实现,可以视为flamenco向外提供的接口。flamenco中的api都是使用OpenAPI来定义的,关于OpenAPI的相关信息可以查看官方文档: https://learn.openapis.org/

在pkg中定义API

flamenco的API(REST API)都在pkg模块定义,使用OpenAPI语法,以GetVersion为例,在flamenco-openapi.yaml中定义GetVersion的使用格式:

  /api/v3/version:
    summary: Clients can use this to check this is actually a Flamenco server.
    get:
      summary: Get the Flamenco version of this Manager
      operationId: getVersion
      tags: [meta]
      responses:
        "200":
          description: normal response
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/FlamencoVersion"
  ...
  components:
  schemas:
    FlamencoVersion:
      type: object
      properties:
        "version":
          type: string
          description: >
            Version of this Manager, meant for human consumption. For release
            builds it is the same as `shortversion`, for other builds it also
            includes the `git` version info.
        "shortversion": { type: string }
        "name": { type: string }
        "git": { type: string }
      required: [version, shortversion, name, git]
      example:
        version: "3.3-alpha0 (v3.2-76-gdd34d538)"
        shortversion: 3.3-alpha0
        name: Your Manager
        git: v3.2-76-gdd34d538

这意味着我们使用http向地址 http://{manager-ip}/api/v3/version发送一个GET方法,就可以获取到flamencao的版本号,它以一个json格式的字符串表示。

同样,在pkg模块中,定义了这个API的响应函数:

type ServerInterfaceWrapper struct {
	Handler ServerInterface
}
...
// GetVersion converts echo context to params.
func (w *ServerInterfaceWrapper) GetVersion(ctx echo.Context) error {
	var err error

	// Invoke the callback with all the unmarshalled arguments
	err = w.Handler.GetVersion(ctx)
	return err
}

从上面的代码可以看出,真正含有GetVersion方法的类是ServerInterface,而ServerInterfaceWrapper是包装器。pkg模块中的接口都类似一个包装器,用于把各模块对外提供的接口都整合到一起。

在api_impl中实现API

接下来就是ServerInterface的实现,它位于api_impl模块的meta.go文件中:

func (f *Flamenco) GetVersion(e echo.Context) error {
	return e.JSON(http.StatusOK, api.FlamencoVersion{
		Version:      appinfo.ExtendedVersion(),
		Shortversion: appinfo.ApplicationVersion,
		Name:         f.config.Get().ManagerName,
		Git:          appinfo.ApplicationGitHash,
	})
}

GetVersion函数将appinfo和config中的信息读取出来,打包成json格式,并返回出去,对应flamenco-openapi.yaml中声明的查询版本的responses格式。

Flamenco类

在上面的GetVersion函数中还可以发现,GetVersion其实是Flamenco的成员函数,在Go语言中,如果一个类的成员函数实现了某一个接口的方法,那么这个来就自动实现了这个接口,所以Flamenco就是实现了ServerInterface的类。Flamenco类的定义如下:

type Flamenco struct {
	jobCompiler    JobCompiler
	persist        PersistenceService
	broadcaster    ChangeBroadcaster
	logStorage     LogStorage
	config         ConfigService
	stateMachine   TaskStateMachine
	shaman         Shaman
	clock          TimeService
	lastRender     LastRendered
	localStorage   LocalStorage
	sleepScheduler WorkerSleepScheduler
	jobDeleter     JobDeleter

	// The task scheduler can be locked to prevent multiple Workers from getting
	// the same task. It is also used for certain other queries, like
	// `MayWorkerRun` to prevent similar race conditions.
	taskSchedulerMutex sync.Mutex

	// done is closed by Flamenco when it wants the application to shut down and
	// restart itself from scratch.
	done chan struct{}
}

var _ api.ServerInterface = (*Flamenco)(nil)

Flamenco包含了很多成员变量,而每一个成员变量就是Manager中的一个功能模块,所以Flamenco类也可以看作是Manager功能的整合。ServerInterface 初始化时就是一个Flamenco类型的空指针。

Interface

Flamenco类中的每一个成员变量都是一个Interface,负责Manager中的某一个功能,以JobCompiler为例,它的定义如下:

type JobCompiler interface {
	ListJobTypes() api.AvailableJobTypes
	GetJobType(typeName string) (api.AvailableJobType, error)
	Compile(ctx context.Context, job api.SubmittedJob) (*job_compilers.AuthoredJob, error)
}

它实现3个功能:列出所有类型的Job;查询一个Job的类型;编译Job。当我们需要实现Job编译相关的API时,就可以使用它的功能。以现有的SubmitJob为例,这是最为常用的提交渲染任务的API,它的实现如下:

func (f *Flamenco) SubmitJob(e echo.Context) error {
	logger := requestLogger(e)

    // 将提交的任务信息绑定到job变量
	var job api.SubmitJobJSONRequestBody
	if err := e.Bind(&job); err != nil {
		logger.Warn().Err(err).Msg("bad request received")
		return sendAPIError(e, http.StatusBadRequest, "invalid format")
	}

	logger = logger.With().
		Str("type", job.Type).
		Str("name", job.Name).
		Logger()
	logger.Info().Msg("new Flamenco job received")

	ctx := e.Request().Context()
	// 编译这个job,其中使用了JobCompiler中的方法
	authoredJob, err := f.compileSubmittedJob(ctx, logger, api.SubmittedJob(job))
	switch {
	case errors.Is(err, job_compilers.ErrJobTypeBadEtag):
		logger.Info().Err(err).Msg("rejecting submitted job because its settings are outdated, refresh the job type")
		return sendAPIError(e, http.StatusPreconditionFailed, "rejecting job because its settings are outdated, refresh the job type")
	case err != nil:
		logger.Warn().Err(err).Msg("error compiling job")
		// TODO: make this a more specific error object for this API call.
		return sendAPIError(e, http.StatusBadRequest, err.Error())
	}

	logger = logger.With().Str("job_id", authoredJob.JobID).Logger()

	// TODO: check whether this job should be queued immediately or start paused.
	authoredJob.Status = api.JobStatusQueued
    // 将此job储存起来
	if err := f.persist.StoreAuthoredJob(ctx, *authoredJob); err != nil {
		logger.Error().Err(err).Msg("error persisting job in database")
		return sendAPIError(e, http.StatusInternalServerError, "error persisting job in database")
	}

	dbJob, err := f.persist.FetchJob(ctx, authoredJob.JobID)
	if err != nil {
		logger.Error().Err(err).Msg("unable to retrieve just-stored job from database")
		return sendAPIError(e, http.StatusInternalServerError, "error retrieving job from database")
	}
    // 发布job更新消息
	jobUpdate := webupdates.NewJobUpdate(dbJob)
	f.broadcaster.BroadcastNewJob(jobUpdate)

	apiJob := jobDBtoAPI(dbJob)
	return e.JSON(http.StatusOK, apiJob)
}

compileSubmittedJob的实现如下:

func (f *Flamenco) compileSubmittedJob(ctx context.Context, logger zerolog.Logger, submittedJob api.SubmittedJob) (*job_compilers.AuthoredJob, error) {
	// Replace the special "manager" platform with the Manager's actual platform.
	if submittedJob.SubmitterPlatform == "manager" {
		submittedJob.SubmitterPlatform = runtime.GOOS
	}

	if submittedJob.TypeEtag == nil || *submittedJob.TypeEtag == "" {
		logger.Warn().Msg("job submitted without job type etag, refresh the job types in the Blender add-on")
	}

	// Before compiling the job, replace the two-way variables. This ensures all
	// the tasks also use those.
	replaceTwoWayVariables(f.config, &submittedJob)
    // 在这里调用了JobCompiler中的方法
	return f.jobCompiler.Compile(ctx, submittedJob)
}

总结

api_impl模块用于实现manager提供的所有API,他把manager上的基础功能整合到一起,提供更易用的接口给开发者。它也是整个manager程序的入口,当启动flamenco-manager程序时,Flamenco类的对象会被创建,其中的各个功能模块开始启动。