flamenco使用ssdp协议来连接manager和worker,关于SSDP(简单服务发现协议):https://zh.wikipedia.org/wiki/%E7%AE%80%E5%8D%95%E6%9C%8D%E5%8A%A1%E5%8F%91%E7%8E%B0%E5%8D%8F%E8%AE%AE

ssdp协议规定client接入网络时可以向一个server发送信息,server会根据信息判断自己是否有client请求的服务,如果有就响应它。同理server接入网络时会发送一次广播,client根据自己的策略处理接受到的信息。

floamenco使用 github.com/fromkeith/gossdp 包的ssdp协议功能。

Client

type Client struct {
	ssdp       *gossdp.ClientSsdp
	log        *zerolog.Logger
	wrappedLog *ssdpLogger

	mutex    *sync.Mutex
	urls     []string        // Preserves order
	seenURLs map[string]bool // Removes duplicates
}

func NewClient(logger zerolog.Logger) (*Client, error) {
	wrap := wrappedLogger(&logger)
	client := Client{
		log:        &logger,
		wrappedLog: wrap,

		mutex:    new(sync.Mutex),
		urls:     make([]string, 0),
		seenURLs: make(map[string]bool),
	}

	ssdp, err := gossdp.NewSsdpClientWithLogger(&client, wrap)
	if err != nil {
		return nil, fmt.Errorf("create UPnP/SSDP client: %w", err)
	}

	client.ssdp = ssdp
	return &client, nil
}

Client的核心是*gossdp.ClientSsdp指针,提供ssdp client的功能,urls保存它可以看到的server地址,seenURLs快速判断此url是否在它的保存的urls中。

Run

func (c *Client) Run(ctx context.Context) ([]string, error) {
    defer c.stopCleanly()

    log.Debug().Msg("waiting for UPnP/SSDP answer")
    go c.ssdp.Start()

    var waitTime time.Duration
    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(waitTime):
            if err := c.ssdp.ListenFor(FlamencoServiceType); err != nil {
                return nil, fmt.Errorf("unable to find Manager: %w", err)
            }
            waitTime = 1 * time.Second

            urls := c.receivedURLs()
            if len(urls) > 0 {
                return urls, nil
            }
        }
    }
}

启动ssdp客户端并等待应答,如果超时自动关闭client。

Server

// Server advertises services via UPnP/SSDP.
type Server struct {
	ssdp       *gossdp.Ssdp
	log        *zerolog.Logger
	wrappedLog *ssdpLogger
}

func NewServer(logger zerolog.Logger) (*Server, error) {
	wrap := wrappedLogger(&logger)
	ssdp, err := gossdp.NewSsdpWithLogger(nil, wrap)
	if err != nil {
		return nil, err
	}
	return &Server{ssdp, &logger, wrap}, nil
}

和Client一样,Server核心是一个gossdp.Ssdp指针。

AddAdvertisement

// AddAdvertisement adds a service advertisement for Flamenco Manager.
// Must be called before calling Run().
func (s *Server) AddAdvertisement(serviceLocation string) {
	// Define the service we want to advertise
	serverDef := gossdp.AdvertisableServer{
		ServiceType: FlamencoServiceType,  // "urn:flamenco:manager:0"
		DeviceUuid:  FlamencoUUID,    // "aa80bc5f-d0af-46b8-8630-23bd7e80ec4d"
		Location:    serviceLocation,  
		MaxAge:      3600, // Number of seconds this advertisement is valid for.
	}
	s.ssdp.AdvertiseServer(serverDef)
	s.log.Debug().Str("location", serviceLocation).Msg("UPnP/SSDP location registered")
}

// AddAdvertisementURLs constructs a service location from the given URLs, and
// adds the advertisement for it.
func (s *Server) AddAdvertisementURLs(baseURLs []url.URL) {
	for _, url := range baseURLs {
		url.Path = path.Join(url.Path, serviceDescriptionPath)
		s.AddAdvertisement(url.String())
	}
}

添加广播地址,先添加之后才能启动server。

Run

// Run starts the advertisement, and blocks until the context is closed.
func (s *Server) Run(ctx context.Context) {
	s.log.Info().Msg("UPnP/SSDP advertisement starting")

	isStopping := false

	go func() {
		// There is a bug in the SSDP library, where closing the server can cause a panic.
		defer func() {
			if isStopping {
				// Only capture a panic when we expect one.
				value := recover()
				s.log.Debug().Interface("value", value).Msg("recovered from panic in SSDP library")
			}
		}()

		s.ssdp.Start()
	}()

	<-ctx.Done()

	s.log.Debug().Msg("UPnP/SSDP advertisement stopping")

	// Sneakily disable warnings when shutting down, otherwise the read operation
	// from the UDP socket will cause a warning.
	tempLog := s.log.Level(zerolog.ErrorLevel)
	s.wrappedLog.zlog = &tempLog
	isStopping = true
	s.ssdp.Stop()
	s.wrappedLog.zlog = s.log

	s.log.Info().Msg("UPnP/SSDP advertisement stopped")
}

启动server服务。