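Before introducing TensorFlow's various distributed Strategy classes, we first need to look at the foundation of distribution: the distributed environment. Only with a solid foundation can we clear away obstacles in the later analysis and get twice the result with half the effort. This article walks through the static logic of the Master.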
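Other articles in this series:
[Translation] TensorFlow Distributed, paper series: "TensorFlow : Large-Scale Machine Learning on Heterogeneous Distributed Systems"
[Translation] TensorFlow Distributed, paper series: "Implementation of Control Flow in TensorFlow"
[Source Code Analysis] TensorFlow Distributed Environment (1): Overall Architecture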
A Server runs two RPC services: MasterService and WorkerService. When a Client connects to a Server, that Server plays the Master role and the Client talks to its MasterService. MasterService also coordinates and controls the execution of multiple WorkerServices.
The Master role is implemented by the Master Service. This is a gRPC service that interacts with a set of remote distributed devices in order to coordinate multiple worker services.
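The Client calls the Master Service through GrpcSession. Because this is an RPC service, the Client and MasterService need an agreed interface specification. The service is declared in master_service.proto, which names the request and response message of every method. The message bodies themselves are defined in master.proto.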
```protobuf
service MasterService {
  // Creates a session.
  rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse);

  // Extends a session.
  rpc ExtendSession(ExtendSessionRequest) returns (ExtendSessionResponse);

  // Prepares future partial run calls.
  rpc PartialRunSetup(PartialRunSetupRequest) returns (PartialRunSetupResponse);

  // Drives the graph computation.
  rpc RunStep(RunStepRequest) returns (RunStepResponse);

  // Closes a session.
  rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);

  // List the devices usable by the master.
  rpc ListDevices(ListDevicesRequest) returns (ListDevicesResponse);

  // Close and abandon all existing sessions. Ongoing computations
  // will no longer affect fresh ones via the resources in containers listed in
  // the ResetRequest. See ResetRequest for more details.
  rpc Reset(ResetRequest) returns (ResetResponse);

  // Registers a callable for execution with RunCallable.
  rpc MakeCallable(MakeCallableRequest) returns (MakeCallableResponse);

  // Executes a callable registered with MakeCallable.
  rpc RunCallable(RunCallableRequest) returns (RunCallableResponse);

  // Frees resources associated with a callable registered with MakeCallable.
  rpc ReleaseCallable(ReleaseCallableRequest) returns (ReleaseCallableResponse);
}
```

The Client reaches the remote MasterService through the MasterInterface interface. MasterInterface is an abstract class that defines how a Client communicates with the TensorFlow Master service. It supports both RPC-based master implementations and in-process master implementations that need no RPC round trip. Every MasterInterface method is synchronous, so the Client calls the services of the remote MasterService as if they were local functions.
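MasterInterface has two implementations, LocalMaster and GrpcRemoteMaster. Both are used to communicate with the Master service. The interface is defined as follows: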
```cpp
class MasterInterface {
 public:
  virtual ~MasterInterface() {}
  virtual Status CreateSession(CallOptions* call_options,
                               const CreateSessionRequest* request,
                               CreateSessionResponse* response) = 0;

  virtual Status ExtendSession(CallOptions* call_options,
                               const ExtendSessionRequest* request,
                               ExtendSessionResponse* response) = 0;

  virtual Status PartialRunSetup(CallOptions* call_options,
                                 const PartialRunSetupRequest* request,
                                 PartialRunSetupResponse* response) {
    return errors::Unimplemented("Partial run not implemented for this master");
  }

  virtual Status RunStep(CallOptions* call_options,
                         RunStepRequestWrapper* request,
                         MutableRunStepResponseWrapper* response) = 0;

  virtual Status RunStep(CallOptions* call_options,
                         const RunStepRequest* request,
                         RunStepResponse* response) {
    std::unique_ptr<RunStepRequestWrapper> wrapped_request(
        new ProtoRunStepRequest(request));
    std::unique_ptr<MutableRunStepResponseWrapper> wrapped_response(
        new NonOwnedProtoRunStepResponse(response));
    return RunStep(call_options, wrapped_request.get(), wrapped_response.get());
  }

  virtual MutableRunStepRequestWrapper* CreateRunStepRequest() {
    MutableProtoRunStepRequest* ret = new MutableProtoRunStepRequest;
    ret->request_.set_request_id(GetUniqueRequestId());
    return ret;
  }

  virtual MutableRunStepResponseWrapper* CreateRunStepResponse() {
    return new OwnedProtoRunStepResponse;
  }

  virtual Status CloseSession(CallOptions* call_options,
                              const CloseSessionRequest* request,
                              CloseSessionResponse* response) = 0;

  virtual Status ListDevices(CallOptions* call_options,
                             const ListDevicesRequest* request,
                             ListDevicesResponse* response) = 0;

  virtual Status Reset(CallOptions* call_options, const ResetRequest* request,
                       ResetResponse* response) = 0;

  virtual Status MakeCallable(CallOptions* call_options,
                              const MakeCallableRequest* request,
                              MakeCallableResponse* response) = 0;

  virtual Status RunCallable(CallOptions* call_options,
                             const RunCallableRequest* request,
                             RunCallableResponse* response) = 0;

  virtual Status ReleaseCallable(CallOptions* call_options,
                                 const ReleaseCallableRequest* request,
                                 ReleaseCallableResponse* response) = 0;

 protected:
  // NOTE: This should only be called by implementations of this
  // interface whose CreateRunStepResponse() method returns a
  // proto-based wrappers for the RunStepResponse message.
  RunStepResponse* get_proto_from_wrapper(
      MutableRunStepResponseWrapper* wrapper) {
    return wrapper->get_proto();
  }
};
```

The choice between the two works as follows:
- If the Client and the Master are in the same process, LocalMaster is used directly.
- Otherwise GrpcRemoteMaster is used to reach the remote GrpcMasterService over gRPC.

In the figure, the Master inside each of the two rectangles is the actual Master class, which implements the Master functionality.

Figure 1: Master logical structure
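A minimal C++ sketch of this arrangement shows why the client does not care which implementation it holds. The client only calls a synchronous abstract interface. The concrete object either calls into an in-process master (the LocalMaster role) or would issue an RPC (the GrpcRemoteMaster role). All names here are hypothetical (`MiniMasterInterface`, `InProcessMaster`, `FakeRemoteMaster`, `ClientCreate`). Requests are plain strings rather than protobufs, and there is no `CallOptions`.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified stand-in for MasterInterface: every call is synchronous.
class MiniMasterInterface {
 public:
  virtual ~MiniMasterInterface() = default;
  // Returns a session handle for the given (serialized) graph.
  virtual std::string CreateSession(const std::string& graph_def) = 0;
  // Reports which transport served the call, for illustration only.
  virtual std::string Transport() const = 0;
};

// Plays the LocalMaster role: calls straight into an in-process master.
class InProcessMaster : public MiniMasterInterface {
 public:
  std::string CreateSession(const std::string& graph_def) override {
    return "local-session:" + graph_def;
  }
  std::string Transport() const override { return "in-process"; }
};

// Plays the GrpcRemoteMaster role; a real implementation would send an RPC here.
class FakeRemoteMaster : public MiniMasterInterface {
 public:
  std::string CreateSession(const std::string& graph_def) override {
    return "remote-session:" + graph_def;
  }
  std::string Transport() const override { return "rpc"; }
};

// Client code is written once, against the interface only.
std::string ClientCreate(MiniMasterInterface& master, const std::string& graph) {
  return master.CreateSession(graph);
}
```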
The following pseudocode shows how a client interacts with the master. This is the distributed case, where GrpcRemoteMaster talks to the remote MasterService over gRPC.
```
stub = NewStub("/job:mnist/replica:0/task:0")
{handle} = stub->CreateSession({graph_def})
do {
  stub->RunStep({handle, {feeds}, {fetches}})
  // The client can evaluate a predicate locally, based on the
  // result of fetches, to determine whether to terminate. For
  // example, it might fetch the loss and evaluate whether it is less
  // than some threshold.
} while (!should_stop({fetches}));
stub->CloseSession({handle})
```

When the Client makes a call, GrpcSession first tries to obtain a local master through LocalMaster. Only if none is found does it fall back to GrpcRemoteMaster. In the local case the Client and the master are on the same node, and LocalMaster lets them communicate directly inside the process. This gives Clients in the same process a more efficient Master service.
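The client loop in the pseudocode above can be turned into a small runnable C++ sketch. The loop creates a session once, calls RunStep repeatedly, decides locally when to stop by inspecting the fetched value, and then closes the session. It runs against a hypothetical in-memory `FakeMaster` whose fetched "loss" halves on every RunStep. The halving, the `0.1` threshold and the handle format are assumptions made only for this example.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical in-memory master: each RunStep halves the session's "loss".
class FakeMaster {
 public:
  std::string CreateSession(const std::string& graph_def) {
    (void)graph_def;  // a real master would keep and prune the graph
    std::string handle = "session_" + std::to_string(next_id_++);
    loss_[handle] = 1.0;  // initial value of the fetched tensor
    return handle;
  }
  double RunStep(const std::string& handle) { return loss_.at(handle) /= 2.0; }
  void CloseSession(const std::string& handle) { loss_.erase(handle); }
  bool HasSession(const std::string& handle) const { return loss_.count(handle) > 0; }

 private:
  int next_id_ = 0;
  std::map<std::string, double> loss_;
};

// Mirrors the pseudocode: CreateSession, RunStep until should_stop, CloseSession.
int TrainUntil(FakeMaster& master, double threshold) {
  const std::string handle = master.CreateSession("graph_def");
  int steps = 0;
  double loss = 0.0;
  do {
    loss = master.RunStep(handle);  // "fetch" the loss
    ++steps;
  } while (!(loss < threshold));    // should_stop is evaluated on the client
  master.CloseSession(handle);
  return steps;
}
```

With a threshold of 0.1 the loss goes 0.5, 0.25, 0.125, 0.0625, so `TrainUntil` returns 4. The stopping decision lives entirely on the client side, as the comment in the pseudocode describes.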
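LocalMaster is defined as follows. Its main member is master_impl_. LocalMaster is really just a thin shell that forwards every call to master_impl_, the object invoked directly when the Client and the master are not on different nodes.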
```cpp
class LocalMaster : public MasterInterface {
 private:
  Master* master_impl_;  // Not owned.
  const int64 default_timeout_in_ms_;

  // See LocalMaster::Lookup for the factory function that creates
  // objects of this type.
  LocalMaster(Master* master_impl, const int64 default_timeout_in_ms);

  TF_DISALLOW_COPY_AND_ASSIGN(LocalMaster);
};
```

LocalMaster uses a static variable, local_master_registry_, as its registry.
```cpp
typedef std::unordered_map<string, MasterInfo> LocalMasterRegistry;

LocalMasterRegistry* local_master_registry() {
  static LocalMasterRegistry* local_master_registry_ = new LocalMasterRegistry;
  return local_master_registry_;
}
```

During initialization, GrpcServer calls the following code. It registers the Master it created in the LocalMaster registry under its target, a string that starts with "grpc://".
```cpp
LocalMaster::Register(target(), master_impl_.get(),
                      config.operation_timeout_in_ms());
```

Register simply inserts the master into the static variable local_master_registry_:
```cpp
/* static */
void LocalMaster::Register(const string& target, Master* master,
                           int64 default_timeout_in_ms) {
  mutex_lock l(*get_local_master_registry_lock());
  local_master_registry()->insert(
      {target, MasterInfo(master, default_timeout_in_ms)});
}
```

When GrpcSession::Create is called and the Client and the Master are in the same process, Lookup finds the registered Master locally. It returns a LocalMaster whose master_impl_ is set to that Master. If nothing is found, Lookup returns null. GrpcSession::Create then creates a GrpcRemoteMaster, which interacts with the remote Master.
```cpp
/* static */
std::unique_ptr<LocalMaster> LocalMaster::Lookup(const string& target) {
  std::unique_ptr<LocalMaster> ret;
  mutex_lock l(*get_local_master_registry_lock());
  auto iter = local_master_registry()->find(target);
  if (iter != local_master_registry()->end()) {
    ret.reset(new LocalMaster(iter->second.master,
                              iter->second.default_timeout_in_ms));
  }
  return ret;
}
```

The figure below shows the same-process case: Lookup succeeds, and a LocalMaster is created for local operation.

Figure 2: Master operation within one process
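Now consider the cross-process case, process by process:
- Process 1: the LocalMaster does not point to any Master, because no Server was started locally. The first step of GrpcSession::Create (Lookup) fails and returns null. GrpcSession::Create then runs its second step and creates a GrpcRemoteMaster for remote interaction.
- Process 2: the LocalMaster does not point to any Master either, because no client calls GrpcSession::Create there.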

Figure 3: Master operation across processes
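Taken together, Register, Lookup and the fallback in GrpcSession::Create amount to a mutex-protected, process-wide map keyed by the target string. If the map has an entry, the local master is used; otherwise an RPC client is built. The sketch below is a simplified illustration, not the TensorFlow implementation. `MasterRegistry` and `ChooseMaster` are hypothetical names, `Master` is an empty stand-in struct, and the boolean mirrors the `use_rpc_for_inprocess_master` option.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <unordered_map>

struct Master {};  // stand-in for tensorflow::Master

// Hypothetical process-wide registry, analogous to local_master_registry().
class MasterRegistry {
 public:
  static MasterRegistry& Get() {
    // Heap-allocated and never destroyed, like local_master_registry_.
    static MasterRegistry* registry = new MasterRegistry;
    return *registry;
  }
  void Register(const std::string& target, Master* master) {
    std::lock_guard<std::mutex> lock(mu_);
    masters_.insert({target, master});  // like the original, keeps an existing entry
  }
  // Returns nullptr when no server in this process registered `target`.
  Master* Lookup(const std::string& target) {
    std::lock_guard<std::mutex> lock(mu_);
    auto it = masters_.find(target);
    return it == masters_.end() ? nullptr : it->second;
  }

 private:
  std::mutex mu_;
  std::unordered_map<std::string, Master*> masters_;
};

// Mirrors the branch in GrpcSession::Create: prefer the in-process master.
std::string ChooseMaster(const std::string& target, bool use_rpc_for_inprocess_master) {
  Master* local = nullptr;
  if (!use_rpc_for_inprocess_master) local = MasterRegistry::Get().Lookup(target);
  return local != nullptr ? "LocalMaster" : "GrpcRemoteMaster";
}
```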
LocalMaster calls its member master_impl_ to do the actual work:
```cpp
Status LocalMaster::CreateSession(CallOptions* call_options,
                                  const CreateSessionRequest* request,
                                  CreateSessionResponse* response) {
  Notification n;
  Status ret;
  master_impl_->CreateSession(request, response, [&n, &ret](const Status& s) {
    ret.Update(s);
    n.Notify();
  });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}

Status LocalMaster::ExtendSession(CallOptions* call_options,
                                  const ExtendSessionRequest* request,
                                  ExtendSessionResponse* response) {
  Notification n;
  Status ret;
  master_impl_->ExtendSession(request, response, [&n, &ret](const Status& s) {
    ret.Update(s);
    n.Notify();
  });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}

Status LocalMaster::RunStep(CallOptions* call_options,
                            RunStepRequestWrapper* request,
                            MutableRunStepResponseWrapper* response) {
  Notification n;
  Status ret;
  master_impl_->RunStep(call_options, request, response,
                        [&n, &ret](const Status& s) {
                          ret.Update(s);
                          n.Notify();
                        });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}
```

GrpcRemoteMaster is a gRPC client implementation. It ultimately calls the GrpcMasterService on the remote Master through a Stub, so the call looks just like a local function call. The remote GrpcMasterService implements every method defined by the MasterService service and is the real entity behind it. When a GrpcRemoteMaster instance is created, the address and port of the Master service must be specified through target, and a corresponding RPC channel is created. Strictly speaking, both GrpcSession and GrpcRemoteMaster are part of the Client implementation.
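The three LocalMaster methods shown above share one idiom, sketched below with the standard library. The method starts an asynchronous call on master_impl_ and blocks on a Notification until the done-callback fires. It gives up if the timeout expires first. `MiniNotification` and `CallSync` are hypothetical, and a string stands in for `Status`. The `shared_ptr` is only this sketch's way of keeping the callback safe if the waiter stops early. The real code handles cancellation and the borrowed request/response objects in its own way.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical one-shot notification in the spirit of tensorflow::Notification.
class MiniNotification {
 public:
  void Notify() {
    std::lock_guard<std::mutex> lock(mu_);
    notified_ = true;
    cv_.notify_all();
  }
  // Returns false if `timeout` elapses before Notify() is called.
  bool WaitFor(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mu_);
    return cv_.wait_for(lock, timeout, [this] { return notified_; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool notified_ = false;
};

// "Status" is simplified to a string; Done is the completion callback.
using Done = std::function<void(const std::string& status)>;
// An asynchronous call in the style of Master::CreateSession(req, resp, done).
using AsyncCall = std::function<void(Done)>;

// Blocks until the async call reports completion or the timeout expires.
std::string CallSync(const AsyncCall& call, std::chrono::milliseconds timeout) {
  // Shared ownership keeps the callback's targets alive even if we stop waiting.
  auto n = std::make_shared<MiniNotification>();
  auto result = std::make_shared<std::string>();
  call([n, result](const std::string& status) {
    *result = status;  // written before Notify(), hence visible to the waiter
    n->Notify();
  });
  if (!n->WaitFor(timeout)) return "DEADLINE_EXCEEDED";
  return *result;
}
```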
GrpcRemoteMaster is defined as follows. It is built mainly around a MasterServiceStub.
```cpp
// GrpcRemoteMaster is an implementation of the MasterInterface
// that uses gRPC to talk to the Master service.
class GrpcRemoteMaster : public MasterInterface {
  using MasterServiceStub = grpc::MasterService::Stub;

 public:
  explicit GrpcRemoteMaster(const SharedGrpcChannelPtr& client_channel)
      : stub_(grpc::MasterService::NewStub(client_channel)) {}

  ~GrpcRemoteMaster() override {}

  std::unique_ptr<MasterServiceStub> stub_;
};
```

GrpcRemoteMaster's job is simple. It calls the corresponding method of the remote Master service through a gRPC stub.
Take CreateSession as an example. It is implemented with CallWithRetry.
```cpp
Status CreateSession(CallOptions* call_options,
                     const CreateSessionRequest* request,
                     CreateSessionResponse* response) override {
  return CallWithRetry(call_options, request, response,
                       &MasterServiceStub::CreateSession);
}
```

CallWithRetry is shown below. It invokes the Stub method through `s = FromGrpcStatus((stub_.get()->*pfunc)(&ctx, *request, response))`. It retries only when the result is UNAVAILABLE, at most kMaxRetries times, and sleeps for a backoff between attempts.
```cpp
template <typename Request, typename Response>
Status CallWithRetry(CallOptions* call_options, const Request* request,
                     Response* response,
                     ::grpc::Status (MasterServiceStub::*pfunc)(
                         ::grpc::ClientContext*, const Request&, Response*),
                     string trace_string = {}) {
  absl::Duration timeout = absl::Milliseconds(call_options->GetTimeout());
  absl::Time expired_time = absl::FromUnixMicros(Env::Default()->NowMicros());
  if (timeout > absl::ZeroDuration()) {
    expired_time += timeout;
  }
  Status s;
  for (int num_retries = 0;; ++num_retries) {
    ::grpc::ClientContext ctx;
    std::unique_ptr<profiler::TraceMe> trace;
    if (!trace_string.empty()) {
      trace.reset(NewTraceRpc(trace_string, &ctx));
    }
    ctx.set_fail_fast(false);
    if (timeout > absl::ZeroDuration()) {
      // We do not modify the timeout here to match legacy behavior. However,
      // this could violate the contract of tensorflow::Session. If we retry
      // an RPC just before the deadline is exceeded, we will still set the
      // timeout to the original value. This leads to the overall timeout
      // being double what was expected.
      ctx.set_deadline(absl::ToChronoTime(absl::Now() + timeout));
    }
    s = FromGrpcStatus((stub_.get()->*pfunc)(&ctx, *request, response));
    if (!errors::IsUnavailable(s)) {
      return s;
    }
    // TODO(b/117162170): we may want to make this configurable.
    constexpr int kMaxRetries = 10;
    if (num_retries >= kMaxRetries) {
      return s;
    }
    absl::Time now = absl::FromUnixMicros(Env::Default()->NowMicros());
    const absl::Time deadline_with_backoff =
        now + absl::Microseconds(ComputeBackoffMicroseconds(num_retries));
    // Wait for a short period of time before retrying the RPC. If our
    // backoff would put us past the RPC deadline, we truncate it to ensure
    // our RPC starts before the deadline.
    const auto backoff_until =
        (timeout <= absl::ZeroDuration() ||
         expired_time > deadline_with_backoff)
            ? deadline_with_backoff
            : expired_time;
    Env::Default()->SleepForMicroseconds(
        absl::ToInt64Microseconds(backoff_until - now));
    now = absl::FromUnixMicros(Env::Default()->NowMicros());
    if (now > expired_time && timeout > absl::ZeroDuration()) {
      // If timeout_in_ms is set, exit the retry loop on timeout.
      return errors::DeadlineExceeded(ctx.debug_error_string());
    }
  }
}
```

Next, look at the Stub. It is implemented with gRPC from "//tensorflow/core/protobuf/master_service.proto".
```cpp
class Stub final : public StubInterface {
 public:
  Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
  ::grpc::Status CreateSession(::grpc::ClientContext* context,
                               const CreateSessionRequest& request,
                               CreateSessionResponse* response) override;
  ::grpc::Status ExtendSession(::grpc::ClientContext* context,
                               const ExtendSessionRequest& request,
                               ExtendSessionResponse* response) override;
  ::grpc::Status PartialRunSetup(::grpc::ClientContext* context,
                                 const PartialRunSetupRequest& request,
                                 PartialRunSetupResponse* response) override;
  ::grpc::Status RunStep(::grpc::ClientContext* context,
                         const RunStepRequest& request,
                         RunStepResponse* response) override;
  ::grpc::Status CloseSession(::grpc::ClientContext* context,
                              const CloseSessionRequest& request,
                              CloseSessionResponse* response) override;
  ::grpc::Status ListDevices(::grpc::ClientContext* context,
                             const ListDevicesRequest& request,
                             ListDevicesResponse* response) override;
  ::grpc::Status Reset(::grpc::ClientContext* context,
                       const ResetRequest& request,
                       ResetResponse* response) override;
  ::grpc::Status MakeCallable(::grpc::ClientContext* context,
                              const MakeCallableRequest& request,
                              MakeCallableResponse* response) override;
  ::grpc::Status RunCallable(::grpc::ClientContext* context,
                             const RunCallableRequest& request,
                             RunCallableResponse* response) override;
  ::grpc::Status ReleaseCallable(::grpc::ClientContext* context,
                                 const ReleaseCallableRequest& request,
                                 ReleaseCallableResponse* response) override;

 private:
  std::shared_ptr< ::grpc::ChannelInterface> channel_;
  const ::grpc::internal::RpcMethod rpcmethod_CreateSession_;
  const ::grpc::internal::RpcMethod rpcmethod_ExtendSession_;
  const ::grpc::internal::RpcMethod rpcmethod_PartialRunSetup_;
  const ::grpc::internal::RpcMethod rpcmethod_RunStep_;
  const ::grpc::internal::RpcMethod rpcmethod_CloseSession_;
  const ::grpc::internal::RpcMethod rpcmethod_ListDevices_;
  const ::grpc::internal::RpcMethod rpcmethod_Reset_;
  const ::grpc::internal::RpcMethod rpcmethod_MakeCallable_;
  const ::grpc::internal::RpcMethod rpcmethod_RunCallable_;
  const ::grpc::internal::RpcMethod rpcmethod_ReleaseCallable_;
};
```

The corresponding remote methods are:
```cpp
static const char* grpcMasterService_method_names[] = {
    "/tensorflow.MasterService/CreateSession",
    "/tensorflow.MasterService/ExtendSession",
    "/tensorflow.MasterService/PartialRunSetup",
    "/tensorflow.MasterService/RunStep",
    "/tensorflow.MasterService/CloseSession",
    "/tensorflow.MasterService/ListDevices",
    "/tensorflow.MasterService/Reset",
    "/tensorflow.MasterService/MakeCallable",
    "/tensorflow.MasterService/RunCallable",
    "/tensorflow.MasterService/ReleaseCallable",
};

std::unique_ptr<MasterService::Stub> MasterService::NewStub(
    const std::shared_ptr< ::grpc::ChannelInterface>& channel,
    const ::grpc::StubOptions& options) {
  std::unique_ptr<MasterService::Stub> stub(new MasterService::Stub(channel));
  return stub;
}
```

Internally, the Stub calls gRPC to send the request:
```cpp
::grpc::Status MasterService::Stub::CreateSession(
    ::grpc::ClientContext* context, const CreateSessionRequest& request,
    CreateSessionResponse* response) {
  return ::grpc::internal::BlockingUnaryCall(
      channel_.get(), rpcmethod_CreateSession_, context, request, response);
}
```

So, in the GrpcRemoteMaster case, GrpcRemoteMaster receives the request from the gRPC session and forwards it to the gRPC master service. Along the way the call passes through the chain GrpcSession -> GrpcRemoteMaster -> GrpcMasterService -> Master -> MasterSession.
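The retry logic of CallWithRetry on this path combines two ideas. First, it invokes a stub method through a pointer-to-member (`(stub_.get()->*pfunc)(...)`). Second, it retries only UNAVAILABLE results with a backoff, caps retries at `kMaxRetries = 10`, and truncates the backoff at the overall deadline. The sketch below reproduces that control flow with a fake clock so that it runs deterministically. `FakeClock`, `FakeStub`, `Code`, the doubling `BackoffMs` schedule and the per-call cost are hypothetical. TensorFlow uses gRPC contexts, real sleeps and its own `ComputeBackoffMicroseconds` instead.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

enum class Code { kOk, kUnavailable, kDeadlineExceeded };

// Hypothetical fake clock so the retry loop can run deterministically.
struct FakeClock {
  std::int64_t now_ms = 0;
};

// Hypothetical stub: each call costs `cost_ms`; it answers UNAVAILABLE
// `failures_left` times and then succeeds.
struct FakeStub {
  FakeStub(FakeClock* c, std::int64_t cost, int failures)
      : clock(c), cost_ms(cost), failures_left(failures) {}
  Code CreateSession(const int& request, int* response) {
    ++calls;
    clock->now_ms += cost_ms;
    if (failures_left > 0) {
      --failures_left;
      return Code::kUnavailable;
    }
    *response = request + 1;
    return Code::kOk;
  }
  FakeClock* clock;
  std::int64_t cost_ms;
  int failures_left;
  int calls = 0;
};

// Hypothetical backoff schedule: 1, 2, 4, ... ms.
std::int64_t BackoffMs(int num_retries) {
  return std::int64_t{1} << std::min(num_retries, 10);
}

// Same control flow as CallWithRetry, minus gRPC contexts and tracing.
template <typename Req, typename Resp>
Code CallWithRetry(FakeStub* stub, Code (FakeStub::*pfunc)(const Req&, Resp*),
                   const Req& request, Resp* response, std::int64_t timeout_ms) {
  FakeClock* clock = stub->clock;
  std::int64_t expired = clock->now_ms;
  if (timeout_ms > 0) expired += timeout_ms;
  constexpr int kMaxRetries = 10;
  for (int num_retries = 0;; ++num_retries) {
    // Call through a pointer-to-member, like (stub_.get()->*pfunc)(...).
    const Code s = (stub->*pfunc)(request, response);
    if (s != Code::kUnavailable) return s;  // only UNAVAILABLE is retried
    if (num_retries >= kMaxRetries) return s;
    const std::int64_t now = clock->now_ms;
    const std::int64_t with_backoff = now + BackoffMs(num_retries);
    // Truncate the backoff so that we never sleep past the overall deadline.
    const std::int64_t until =
        (timeout_ms <= 0 || expired > with_backoff) ? with_backoff : expired;
    clock->now_ms = std::max(now, until);  // "sleep"; a negative sleep does nothing
    if (timeout_ms > 0 && clock->now_ms > expired) return Code::kDeadlineExceeded;
  }
}
```

In the checks below, a stub that fails three times succeeds on the fourth call. A stub that is always down gives up after 11 calls (the first attempt plus 10 retries). With a 5 ms timeout, the loop stops with a deadline error after the third call.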
When a GrpcSession is created, its Create method first looks for a local Master. If one is found, Create returns a LocalMaster, as described earlier. If Lookup finds nothing, Create calls NewGrpcMaster to create a GrpcRemoteMaster.
```cpp
/* static */
Status GrpcSession::Create(const SessionOptions& options,
                           std::unique_ptr<GrpcSession>* out_session) {
  std::unique_ptr<GrpcSession> session(new GrpcSession(options));
  std::unique_ptr<MasterInterface> master;
  // For testing, we enable the client to disable the use of the local
  // master registry, so that the RPC stack is exercised.
  if (!options.config.rpc_options().use_rpc_for_inprocess_master()) {
    master = LocalMaster::Lookup(options.target);
  }
  if (!master) {
    SharedGrpcChannelPtr master_channel;
    TF_RETURN_IF_ERROR(
        NewHostPortGrpcChannel(options.target.substr(kSchemePrefixLength),
                               &options.config.rpc_options(), &master_channel));
    // Create a GrpcRemoteMaster to talk to the remote Master.
    master.reset(NewGrpcMaster(master_channel));
  } else {
    session->is_local_ = true;
  }
  session->SetRemoteMaster(std::move(master));
  *out_session = std::move(session);
  return Status::OK();
}
```

NewGrpcMaster is simply:
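```cpp
MasterInterface* NewGrpcMaster(const SharedGrpcChannelPtr& channel) {
  return new GrpcRemoteMaster(channel);
}
```

GrpcMasterService implements the MasterService RPC service. It receives RPC requests from Clients, delegates each one to master_impl_ (an instance of the Master class), and sends the response back once the Master has finished.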
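The server side described in the following paragraphs (HandleRPCsLoop, ENQUEUE_REQUEST, and handlers that re-enqueue themselves) follows the gRPC asynchronous-server pattern, modeled here in single-threaded C++:
- A fixed number of request slots is posted per method before the loop starts.
- The loop pulls tags from a completion queue and calls `OnCompleted`.
- Each handler posts a fresh slot for its own method.
- A null tag stands for the shutdown alarm.

This is not gRPC code, and everything here is hypothetical: `MiniCompletionQueue`, `MiniMasterService`, `Deliver` and the simplified `Tag`. Real gRPC holds an RPC until a slot is posted instead of refusing it, and its `Next()` blocks until shutdown.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Tag {
  virtual ~Tag() = default;
  virtual void OnCompleted(bool ok) = 0;
};

// Stand-in for ::grpc::ServerCompletionQueue; Next() returns false once drained.
class MiniCompletionQueue {
 public:
  void Post(Tag* tag, bool ok) { events_.emplace_back(tag, ok); }
  bool Next(Tag** tag, bool* ok) {
    if (events_.empty()) return false;
    *tag = events_.front().first;
    *ok = events_.front().second;
    events_.pop_front();
    return true;
  }

 private:
  std::deque<std::pair<Tag*, bool>> events_;
};

class MiniMasterService {
 public:
  // Analogue of ENQUEUE_REQUEST: park one call object that can accept `method`.
  void EnqueueRequest(const std::string& method) {
    if (!is_shutdown_) waiting_[method].push_back(std::make_unique<PendingCall>(this, method));
  }
  // Pre-post slots before serving, as HandleRPCsLoop does.
  void Start() {
    EnqueueRequest("CreateSession");
    for (int i = 0; i < 100; ++i) EnqueueRequest("RunStep");
  }
  // Simulates matching an incoming RPC to a parked call; false if none is parked.
  bool Deliver(MiniCompletionQueue* cq, const std::string& method) {
    auto& slots = waiting_[method];
    if (slots.empty()) return false;
    active_.push_back(std::move(slots.front()));
    slots.pop_front();
    cq->Post(active_.back().get(), true);
    return true;
  }
  void Shutdown(MiniCompletionQueue* cq) {
    is_shutdown_ = true;      // later EnqueueRequest calls become no-ops
    cq->Post(nullptr, true);  // null tag plays the role of the shutdown alarm
  }
  void HandleRPCsLoop(MiniCompletionQueue* cq) {
    Tag* tag = nullptr;
    bool ok = false;
    while (cq->Next(&tag, &ok)) {
      if (tag != nullptr) tag->OnCompleted(ok);
      // A null tag means shutdown; the real loop calls cq_->Shutdown() here.
    }
  }
  int Handled(const std::string& m) { return handled_[m]; }
  std::size_t Waiting(const std::string& m) { return waiting_[m].size(); }

 private:
  struct PendingCall : Tag {
    PendingCall(MiniMasterService* s, std::string m) : server(s), method(std::move(m)) {}
    void OnCompleted(bool ok) override {
      if (!ok) return;
      ++server->handled_[method];      // "call master_impl_" and reply
      server->EnqueueRequest(method);  // re-post a slot, like each handler does
    }
    MiniMasterService* server;
    std::string method;
  };
  bool is_shutdown_ = false;
  std::map<std::string, std::deque<std::unique_ptr<PendingCall>>> waiting_;
  std::vector<std::unique_ptr<PendingCall>> active_;
  std::map<std::string, int> handled_;
};
```

Because every handler re-posts its slot, the number of RunStep calls that can be accepted at once stays at 100. After shutdown, nothing is re-posted.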
In GrpcServer, master_service_ holds the GrpcMasterService.
```cpp
// Create the Master and the corresponding GrpcMasterService.
master_impl_ = CreateMaster(&master_env_);
master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);
```

GrpcServer runs GrpcMasterService's HandleRPCsLoop method on the master_thread_ thread.
```cpp
master_thread_.reset(
    env_->StartThread(ThreadOptions(), "TF_master_service",
                      [this] { master_service_->HandleRPCsLoop(); }));
```

GrpcMasterService is defined as follows. master_impl_ is the master pointer passed in by the Server, an instance of the Master class:
```cpp
class GrpcMasterService : public AsyncServiceInterface {
  Master* master_impl_ = nullptr;  // Not owned.
  std::unique_ptr<::grpc::ServerCompletionQueue> cq_;
  grpc::MasterService::AsyncService master_service_;

  mutex mu_;
  bool is_shutdown_ TF_GUARDED_BY(mu_);
  const ConfigProto default_session_config_;
  ::grpc::Alarm* shutdown_alarm_ = nullptr;

  template <class RequestMessage, class ResponseMessage>
  using MasterCall = Call<GrpcMasterService, grpc::MasterService::AsyncService,
                          RequestMessage, ResponseMessage>;
};
```

When GrpcMasterService is constructed, it registers its AsyncService with the gRPC ServerBuilder and obtains the gRPC completion queue cq_:
```cpp
GrpcMasterService(Master* master, const ConfigProto& default_session_config,
                  ::grpc::ServerBuilder* builder)
    : master_impl_(master),
      is_shutdown_(false),
      default_session_config_(default_session_config) {
  builder->RegisterService(&master_service_);
  cq_ = builder->AddCompletionQueue();
}
```

As mentioned earlier, the master_thread_ thread runs GrpcMasterService's HandleRPCsLoop method. HandleRPCsLoop dispatches incoming RPC messages to GrpcMasterService's internal handler functions. The main loop is:
```cpp
void HandleRPCsLoop() override {
  ENQUEUE_REQUEST(CreateSession, true);
  ENQUEUE_REQUEST(ExtendSession, false);
  for (int i = 0; i < 100; ++i) {
    ENQUEUE_REQUEST(PartialRunSetup, false);
    ENQUEUE_REQUEST(RunStep, true);
  }
  ENQUEUE_REQUEST(CloseSession, false);
  ENQUEUE_REQUEST(ListDevices, false);
  ENQUEUE_REQUEST(Reset, false);
  ENQUEUE_REQUEST(MakeCallable, false);
  for (int i = 0; i < 100; ++i) {
    ENQUEUE_REQUEST(RunCallable, true);
  }
  ENQUEUE_REQUEST(ReleaseCallable, false);

  void* tag;
  bool ok;
  while (cq_->Next(&tag, &ok)) {
    UntypedCall<GrpcMasterService>::Tag* callback_tag =
        static_cast<UntypedCall<GrpcMasterService>::Tag*>(tag);
    if (callback_tag) {
      callback_tag->OnCompleted(this, ok);
    } else {
      // NOTE(mrry): A null callback_tag indicates that this is
      // the shutdown alarm.
      cq_->Shutdown();
    }
  }
}
```

This code follows some best practices built around ENQUEUE_REQUEST:
- Before entering the loop, it pre-posts request slots for every method, 100 each for the frequently called PartialRunSetup, RunStep and RunCallable, so that many calls of the same kind can be accepted concurrently.
- Inside the loop, a null tag from the completion queue signals the shutdown alarm.

The ENQUEUE_REQUEST macro is:
```cpp
#define ENQUEUE_REQUEST(method, supports_cancel)                              \
  do {                                                                        \
    mutex_lock l(mu_);                                                        \
    if (!is_shutdown_) {                                                      \
      Call<GrpcMasterService, grpc::MasterService::AsyncService,              \
           method##Request, method##Response>::                               \
          EnqueueRequest(&master_service_, cq_.get(),                         \
                         &grpc::MasterService::AsyncService::Request##method, \
                         &GrpcMasterService::method##Handler,                 \
                         (supports_cancel));                                  \
    }                                                                         \
  } while (0)
```

Each message handler calls master_impl_ to do the work. When the Master finishes, it invokes a lambda callback that sends the response message back to the Client. At the end, the handler calls ENQUEUE_REQUEST again to post a new request of the same type. For example, the handler below eventually returns a CreateSessionResponse to the Client.
```cpp
// RPC handler for creating a session.
void CreateSessionHandler(
    MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
  CreateSessionRequest* rewritten_req = new CreateSessionRequest;
  rewritten_req->mutable_config()->MergeFrom(default_session_config_);
  rewritten_req->MergeFrom(call->request);
  master_impl_->CreateSession(rewritten_req, &call->response,
                              [call, rewritten_req](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                                delete rewritten_req;
                              });
  ENQUEUE_REQUEST(CreateSession, true);
}
```

The APIs provided by GrpcMasterService are:
```cpp
static const char* grpcMasterService_method_names[] = {
    "/tensorflow.MasterService/CreateSession",
    "/tensorflow.MasterService/ExtendSession",
    "/tensorflow.MasterService/PartialRunSetup",
    "/tensorflow.MasterService/RunStep",
    "/tensorflow.MasterService/CloseSession",
    "/tensorflow.MasterService/ListDevices",
    "/tensorflow.MasterService/Reset",
    "/tensorflow.MasterService/MakeCallable",
    "/tensorflow.MasterService/RunCallable",
    "/tensorflow.MasterService/ReleaseCallable",
};
```

Below we analyze three of them in detail: CreateSession, ExtendSession and RunStep.
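The CreateSessionRequest message carries the computation graph and the configuration set by the Client. On receiving the request, the Master creates a MasterSession instance for this Client, together with a session_handle that uniquely identifies that instance. This is done through the Master member `std::unordered_map<string, MasterSession*> sessions_`, and session_handle is a string.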
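The Master returns a CreateSessionResponse message to the Client. It carries two fields:
- session_handle, which the Client passes in later calls to identify its MasterSession.
- graph_version, the initial graph version to be used in the next ExtendSession call.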

Figure 4: CreateSession
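The bookkeeping behind CreateSession can be sketched as a handle-to-session map that issues a process-unique handle for each new session. This is a simplified model, not TensorFlow's Master. `MiniMaster`, `MiniMasterSession`, `CreateSessionResult` and the `session_N` handle format are hypothetical. Sessions are owned through `std::unique_ptr` rather than the raw `MasterSession*` stored in `sessions_`, and starting `graph_version` at 0 is this sketch's choice.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical, heavily simplified MasterSession: it only remembers its graph.
struct MiniMasterSession {
  std::string graph_def;
  std::int64_t graph_version = 0;
};

// The two fields a CreateSessionResponse carries back to the Client.
struct CreateSessionResult {
  std::string session_handle;
  std::int64_t graph_version;
};

class MiniMaster {
 public:
  CreateSessionResult CreateSession(const std::string& graph_def) {
    std::lock_guard<std::mutex> lock(mu_);
    // Any process-unique string works as a handle; this format is an assumption.
    std::string handle = "session_" + std::to_string(next_id_++);
    auto session = std::make_unique<MiniMasterSession>();
    session->graph_def = graph_def;
    const std::int64_t version = session->graph_version;
    sessions_[handle] = std::move(session);
    return {handle, version};
  }
  // Returns false if the handle is unknown (for example, already closed).
  bool CloseSession(const std::string& handle) {
    std::lock_guard<std::mutex> lock(mu_);
    return sessions_.erase(handle) > 0;
  }
  std::size_t NumSessions() {
    std::lock_guard<std::mutex> lock(mu_);
    return sessions_.size();
  }

 private:
  std::mutex mu_;
  std::int64_t next_id_ = 0;
  std::map<std::string, std::unique_ptr<MiniMasterSession>> sessions_;
};
```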
The handler code is:
```cpp
// RPC handler for creating a session.
void CreateSessionHandler(
    MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
  CreateSessionRequest* rewritten_req = new CreateSessionRequest;
  rewritten_req->mutable_config()->MergeFrom(default_session_config_);
  rewritten_req->MergeFrom(call->request);
  master_impl_->CreateSession(rewritten_req, &call->response,
                              [call, rewritten_req](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                                delete rewritten_req;
                              });
  ENQUEUE_REQUEST(CreateSession, true);
}
```

Once the Session exists, the Client can use ExtendSession to tell the Master that it needs to extend the existing computation graph. Subgraphs can only be appended; existing nodes cannot be modified or deleted.
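The ExtendSessionRequest message contains three fields:
- session_handle, the session to extend.
- graph_def, the nodes to append.
- current_graph_version, the graph version the Client is extending.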
The ExtendSessionResponse message returns new_graph_version, which is used in the next ExtendSession call.

Figure 5: ExtendSession
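The append-only, versioned contract of ExtendSession can be illustrated with a small C++ model. It rejects a stale `current_graph_version` and any attempt to redefine an existing node. Otherwise it returns the incremented version, which the Client passes in the next call. `VersionedGraph` is hypothetical: the graph is reduced to a set of node names, and errors are signaled with `-1` instead of a `Status`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <string>
#include <vector>

// Hypothetical session graph that only supports appending nodes.
class VersionedGraph {
 public:
  // Returns new_graph_version, or -1 if the version is stale or a node already exists.
  std::int64_t Extend(std::int64_t current_graph_version,
                      const std::vector<std::string>& new_nodes) {
    if (current_graph_version != version_) return -1;  // caller must send the latest version
    for (const auto& node : new_nodes) {
      if (nodes_.count(node) > 0) return -1;           // existing nodes cannot be redefined
    }
    nodes_.insert(new_nodes.begin(), new_nodes.end());
    return ++version_;                                  // version for the next Extend call
  }
  std::size_t size() const { return nodes_.size(); }

 private:
  std::int64_t version_ = 0;
  std::set<std::string> nodes_;
};
```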
The code is:
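```cpp
// RPC handler for extending a session.
void ExtendSessionHandler(
    MasterCall<ExtendSessionRequest, ExtendSessionResponse>* call) {
  master_impl_->ExtendSession(&call->request, &call->response,
                              [call](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                              });
  ENQUEUE_REQUEST(ExtendSession, false);
}
```

The client runs RunStep iteratively. The RunStepRequest message has quite a few fields, for example:
- session_handle
- feed, the tensors to feed in
- fetch, the names of the tensors to return
- target, nodes to run without fetching their outputs
- options
- partial_run_handle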
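The RunStepResponse message mainly carries:
- tensor, the fetched tensors.
- metadata, returned if requested in the options.
- status_code and status_error_message, filled in when the request sets store_errors_in_response_body.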

Figure 6: RunStep
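RunStepRequest also carries a `request_id` (see the message definition below). Every request gets a unique id, a retried request reuses its id, and 0 disables retry detection. The Master keeps a `RecentRequestIds` member for this purpose. The following hypothetical bounded detector (`RecentIds`, with FIFO eviction and a caller-chosen capacity) only illustrates the idea and is not TensorFlow's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <unordered_set>

// Hypothetical duplicate-request detector, loosely modeled on RecentRequestIds.
class RecentIds {
 public:
  explicit RecentIds(std::size_t capacity) : capacity_(capacity > 0 ? capacity : 1) {}
  // Returns true if the request should run, false if this id was seen recently.
  bool TrackUnique(std::int64_t request_id) {
    if (request_id == 0) return true;  // 0 disables retry detection
    if (seen_.count(request_id) > 0) return false;
    if (order_.size() == capacity_) {  // forget the oldest id
      seen_.erase(order_.front());
      order_.pop_front();
    }
    order_.push_back(request_id);
    seen_.insert(request_id);
    return true;
  }

 private:
  std::size_t capacity_;
  std::deque<std::int64_t> order_;
  std::unordered_set<std::int64_t> seen_;
};
```

Because the detector only remembers a bounded window of ids, a retry that arrives after its id has been evicted is no longer recognized. The final check below shows this.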
The message definitions are:
```protobuf
message RunStepRequest {
  // REQUIRED: session_handle must be returned by a CreateSession call
  // to the same master service.
  string session_handle = 1;

  // Tensors to be fed in the step. Each feed is a named tensor.
  repeated NamedTensorProto feed = 2;

  // Fetches. A list of tensor names. The caller expects a tensor to
  // be returned for each fetch[i] (see RunStepResponse.tensor). The
  // order of specified fetches does not change the execution order.
  repeated string fetch = 3;

  // Target Nodes. A list of node names. The named nodes will be run
  // to but their outputs will not be fetched.
  repeated string target = 4;

  // Options for the run call.
  RunOptions options = 5;

  // Partial run handle (optional). If specified, this will be a partial run
  // execution, run up to the specified fetches.
  string partial_run_handle = 6;

  // If true then some errors, e.g., execution errors that have long
  // error messages, may return an OK RunStepResponse with the actual
  // error saved in the status_code/status_error_message fields of the
  // response body. This is a workaround since the RPC subsystem may
  // truncate long metadata messages.
  bool store_errors_in_response_body = 7;

  // Unique identifier for this request. Every RunStepRequest must
  // have a unique request_id, and retried RunStepRequest must have
  // the same request_id. If request_id is zero, retry detection is disabled.
  int64 request_id = 8;
}

message RunStepResponse {
  // NOTE: The order of the returned tensors may or may not match
  // the fetch order specified in RunStepRequest.
  repeated NamedTensorProto tensor = 1;

  // Returned metadata if requested in the options.
  RunMetadata metadata = 2;

  // If store_errors_in_response_body is true in the request, then
  // optionally the server may return an OK status for the RPC and
  // fill the true status into the fields below, to allow for messages
  // that are too long to fit in metadata.
  error.Code status_code = 3;
  string status_error_message = 4;
}
```

The handler code is:
```cpp
// RPC handler for running one step in a session.
void RunStepHandler(MasterCall<RunStepRequest, RunStepResponse>* call) {
  auto* trace = TraceRpc("RunStep/Server", call->client_metadata());
  CallOptions* call_opts = new CallOptions;
  if (call->request.options().timeout_in_ms() > 0) {
    call_opts->SetTimeout(call->request.options().timeout_in_ms());
  } else {
    call_opts->SetTimeout(default_session_config_.operation_timeout_in_ms());
  }
  RunStepRequestWrapper* wrapped_request =
      new ProtoRunStepRequest(&call->request);
  MutableRunStepResponseWrapper* wrapped_response =
      new NonOwnedProtoRunStepResponse(&call->response);
  call->SetCancelCallback([call_opts]() { call_opts->StartCancel(); });
  master_impl_->RunStep(
      call_opts, wrapped_request, wrapped_response,
      [call, call_opts, wrapped_request, trace](const Status& status) {
        call->ClearCancelCallback();
        delete call_opts;
        delete wrapped_request;
        delete trace;
        if (call->request.store_errors_in_response_body() && !status.ok()) {
          call->response.set_status_code(status.code());
          call->response.set_status_error_message(status.error_message());
          call->SendResponse(ToGrpcStatus(Status::OK()));
        } else {
          call->SendResponse(ToGrpcStatus(status));
        }
      });
  ENQUEUE_REQUEST(RunStep, true);
}
```

As mentioned earlier, GrpcServer creates an instance of the Master class:
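```cpp
std::unique_ptr<Master> GrpcServer::CreateMaster(MasterEnv* master_env) {
  return std::unique_ptr<Master>(new Master(master_env, 0.0));
}
```

So, after receiving a Client message, the GrpcMasterService thread calls master_impl_ in the corresponding handler. In other words, it delegates the business logic to the Master class, as CreateSessionHandler above does with `master_impl_->CreateSession(...)`. Next, let's see how the Master handles these calls.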
Master is in fact not a subclass of MasterInterface. It is declared in tensorflow/core/distributed_runtime/master.h and implemented in master.cc. As its member variable sessions_ shows, its main job is managing MasterSession instances.
```cpp
class Master {
 private:
  typedef Master ME;

  // Not owned.
  MasterEnv* env_ = nullptr;

  // Owned.
  mutex mu_;

  // shutdown_ is set to true by the dtor.
  condition_variable shutdown_cv_;
  bool shutdown_ TF_GUARDED_BY(mu_) = false;
  Thread* gc_thread_;

  // Maps session handles to sessions.
  std::unordered_map<string, MasterSession*> sessions_ TF_GUARDED_BY(mu_);

  // Moving average of step times.
  MovingAverage last_1000_steps_ TF_GUARDED_BY(mu_);

  // Cumulative number of steps executed.
  int64 step_count_ TF_GUARDED_BY(mu_);

  // If a session is not active for this many seconds, it will be
  // closed automatically.
  const double session_gc_seconds_;

  // Used to track ids for incoming requests so we can detect duplicates.
  RecentRequestIds recent_request_ids_;
};
```

Let's recall what was covered in the previous article.
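The core of distributed execution is how the computation graph is manipulated, but the work is split across three roles: Client, Master and Worker.
The Client builds the computation graph and the Worker performs the actual computation. But how does the Worker know what to compute? TensorFlow inserts a Master role between the two to handle coordination and scheduling.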
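Although Master is not a subclass of MasterInterface, it implements the actual business logic of MasterService. Concretely, the Master manages MasterSession instances through sessions_:
- It creates a MasterSession for each CreateSession request.
- It routes later calls such as ExtendSession, RunStep and CloseSession to the right MasterSession by session_handle.
- It automatically closes sessions that stay inactive for session_gc_seconds_ seconds, as the comment on that member says.

The MasterSession, in turn, coordinates the Workers that execute the graph.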
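Master's `session_gc_seconds_` member (see the class definition above) closes sessions that have been inactive for too long. A minimal model of one garbage-collection pass is sketched below. `IdleSessionCollector` and its explicit `now_seconds` argument are assumptions. So is the rule that a non-positive threshold disables collection; note that GrpcServer::CreateMaster above passes 0.0 for this parameter.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical model of Master's idle-session garbage collection.
class IdleSessionCollector {
 public:
  explicit IdleSessionCollector(double gc_seconds) : gc_seconds_(gc_seconds) {}
  // Records activity on a session (e.g. on every RunStep).
  void Touch(const std::string& handle, double now_seconds) {
    last_access_[handle] = now_seconds;
  }
  // One GC pass: closes every session idle for longer than gc_seconds_.
  std::vector<std::string> CollectIdle(double now_seconds) {
    std::vector<std::string> closed;
    if (gc_seconds_ <= 0) return closed;  // assumption: non-positive disables GC
    for (auto it = last_access_.begin(); it != last_access_.end();) {
      if (now_seconds - it->second > gc_seconds_) {
        closed.push_back(it->first);
        it = last_access_.erase(it);
      } else {
        ++it;
      }
    }
    return closed;
  }
  std::size_t size() const { return last_access_.size(); }

 private:
  double gc_seconds_;
  std::map<std::string, double> last_access_;
};
```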
This completes the introduction to the Master's static structure. Its concrete functionality will be covered later, in the articles on Session.