persistence是manager中最复杂的一个模块,复制manager数据的持久化工作,它提供了很多持久化数据的接口给上层应用,比如worker信息的保存、更新,job信息的创建、保存、状态更新、删除等。
manager中使用spllite作为持久化的工具,同时使用gorm库来访问数据库。gorm库是go语言中的一个进行ORM的库,可以以go语言的形式来进行数据库的访问,而不需要编写大量的sql语句,关于gorm的详细信息可以查看 https://gorm.io/docs/ 。
DB
DB是persistence模块中的数据库对象,persistence的接口都由它来提供,它的定义如下:
// DB provides the database interface.
type DB struct {
gormDB *gorm.DB
}
// Model contains the common database fields for most model structs.
// It is a copy of the gorm.Model struct, but without the `DeletedAt` field.
// Soft deletion is not used by Flamenco. If it ever becomes necessary to
// support soft-deletion, see https://gorm.io/docs/delete.html#Soft-Delete
type Model struct {
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
}
DB包含了一个*gorm.DB类型的结构体,Model 是数据库中所有model结构的公共字段。
OpenDB(ctx context.Context, dsn string) (*DB, error)用于创建DB,核心代码:
...
dialector := sqlite.Open(dsn)
gormDB, err := gorm.Open(dialector, config)
if err != nil {
return nil, err
}
db := DB{
gormDB: gormDB,
}
...
(db *DB) Close() error用于关闭DB连接:
// Close closes the connection to the database.
func (db *DB) Close() error {
sqldb, err := db.gormDB.DB()
if err != nil {
return err
}
return sqldb.Close()
}
当DB创建之后,自动为数据库关联以下model,
err := db.gormDB.AutoMigrate(
&Job{},
&JobBlock{},
&JobStorageInfo{},
&LastRendered{},
&SleepSchedule{},
&Task{},
&TaskFailure{},
&Worker{},
&WorkerTag{},
)
这些就是数据库中包含的表结构,目前它们没有任何字段。
jobs
jobs专门用于对flamenco中job的持久化,包括储存、更新、删除和查找等,它涉及的model如下:
type Job struct {
Model // Model是所有表包含的公共字段
UUID string `gorm:"type:char(36);default:'';unique;index"`
Name string `gorm:"type:varchar(64);default:''"`
JobType string `gorm:"type:varchar(32);default:''"`
Priority int `gorm:"type:smallint;default:0"`
Status api.JobStatus `gorm:"type:varchar(32);default:''"`
Activity string `gorm:"type:varchar(255);default:''"`
Settings StringInterfaceMap `gorm:"type:jsonb"`
Metadata StringStringMap `gorm:"type:jsonb"`
DeleteRequestedAt sql.NullTime
Storage JobStorageInfo `gorm:"embedded;embeddedPrefix:storage_"`
WorkerTagID *uint
WorkerTag *WorkerTag `gorm:"foreignkey:WorkerTagID;references:ID;constraint:OnDelete:SET NULL"`
}
type Task struct {
Model
UUID string `gorm:"type:char(36);default:'';unique;index"`
Name string `gorm:"type:varchar(64);default:''"`
Type string `gorm:"type:varchar(32);default:''"`
JobID uint `gorm:"default:0"`
Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"`
Priority int `gorm:"type:smallint;default:50"`
Status api.TaskStatus `gorm:"type:varchar(16);default:''"`
// Which worker is/was working on this.
WorkerID *uint
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:SET NULL"`
LastTouchedAt time.Time `gorm:"index"` // Should contain UTC timestamps.
// Dependencies are tasks that need to be completed before this one can run.
Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"`
Commands Commands `gorm:"type:jsonb"`
Activity string `gorm:"type:varchar(255);default:''"`
}
// TaskFailure keeps track of which Worker failed which Task.
type TaskFailure struct {
// Don't include the standard Gorm ID, UpdatedAt, or DeletedAt fields, as they're useless here.
// Entries will never be updated, and should never be soft-deleted but just purged from existence.
CreatedAt time.Time
TaskID uint `gorm:"primaryKey;autoIncrement:false"`
Task *Task `gorm:"foreignkey:TaskID;references:ID;constraint:OnDelete:CASCADE"`
WorkerID uint `gorm:"primaryKey;autoIncrement:false"`
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
}
以保存job的状态为例,它的代码如下:
// SaveJobStatus saves the job's Status and Activity fields.
func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
tx := db.gormDB.WithContext(ctx).
Model(j).
Updates(Job{Status: j.Status, Activity: j.Activity})
if tx.Error != nil {
return jobError(tx.Error, "saving job status")
}
return nil
}
SaveJobStatus会更新传入job的Status和Activity两个字段。
其余的增删查改逻辑基本类似,不再详细介绍。jobs提供了以下持久化的接口:
-
StoreAuthoredJob:保存编译好的job。
-
FetchJob:通过jobID获取一个job。
-
SaveJobPriority:保存(更新)job的优先级。
-
FetchTask:通过taskID获取一个task。
-
FetchTaskFailureList:获取执行某个task失败的所有worker。
-
SaveTask:保存task。
-
SaveTaskActivity:保存task的active状态。
-
TaskTouchedByWorker:将一个task标记为’touched‘,这个操作发生在worker上,用于超时检测。
-
QueryJobs:查询符合条件的所有job。
-
QueryJobTaskSummaries:查询一个job下的所有task。
workers
workers用于manager对连接到它的所有worker进行数据管理,manager上会储存worker的信息,以便于对worker进行控制(让一个worker休眠),或者为worker分配job。
workers所涉及的model:
type Worker struct {
Model
DeletedAt gorm.DeletedAt `gorm:"index"`
UUID string `gorm:"type:char(36);default:'';unique;index"`
Secret string `gorm:"type:varchar(255);default:''"`
Name string `gorm:"type:varchar(64);default:''"`
Address string `gorm:"type:varchar(39);default:'';index"` // 39 = max length of IPv6 address.
Platform string `gorm:"type:varchar(16);default:''"`
Software string `gorm:"type:varchar(32);default:''"`
Status api.WorkerStatus `gorm:"type:varchar(16);default:''"`
LastSeenAt time.Time `gorm:"index"` // Should contain UTC timestamps.
StatusRequested api.WorkerStatus `gorm:"type:varchar(16);default:''"`
LazyStatusRequest bool `gorm:"type:smallint;default:false"`
SupportedTaskTypes string `gorm:"type:varchar(255);default:''"` // comma-separated list of task types.
Tags []*WorkerTag `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"`
}
type WorkerTag struct {
Model
UUID string `gorm:"type:char(36);default:'';unique;index"`
Name string `gorm:"type:varchar(64);default:'';unique"`
Description string `gorm:"type:varchar(255);default:''"`
Workers []*Worker `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"`
}
提供的持久化接口:
-
CreateWorker:创建一个worker记录。
-
FetchWorker:通过uuid查找worker。
-
FetchWorkers:查找连接到此manager的所有worker。
-
FetchWorkerTask:获得最近被指定到此worker上的task。
-
SaveWorker:保存(更新)worker记录。
-
SaveWorkerStatus:保存(更新)worker状态。
-
WorkerSeen:将一个worker标记为此manager可见,用于超时检测。
-
DeleteWorker:通过uuid删除一个worker。
除了jobs和workers这两个主要模块,persistence还提供了一些其余的持久化接口,比如last_render图片的保存。就实现逻辑而言,基本都是不同表之间的连接以及增删查改,因此这里只描述每个接口的功能,不详细研究他们的实现方式。