目录
参考论文: Scaling Distributed Machine Learning with the Parameter Server
现实中,训练数据的数量可能达到1TB到1PB之间,而训练过程中的参数可能会达到\(10^9\)
(十亿)到\(10^12\)
(千亿)。而往往这些模型的参数需要被所有的worker节点频繁的访问,就有如下问题与挑战:
通用的分布式系统通常都是:每次迭代都强制同步,通常在几十个节点上,它们的性能可以表现的很好,但是在大规模集群中,这样的每次迭代强制同步的机制会因为木桶效应变得很慢。
Mahout 基于 Hadoop,MLI 基于 Spark,它们(Spark与MLI)采用的都是 Iterative MapReduce 的架构。它们能够保持迭代之间的状态,并且执行策略也更加优化了。但是,由于这两种方法都采用同步迭代的通信方式,使得它们很容易因为个别机器的低性能导致全局性能的降低。
为了解决这个问题,Graphlab 采用图形抽象的方式进行异步调度通信。但是它缺少了以 MapReduce 为基础架构的弹性扩展性,并且它使用粗粒度的snapshots来进行恢复,这两点都会阻碍到可扩展性。parameter server 正是吸取Graphlab异步机制的优势,并且解决了其在可扩展性方面的劣势。
在parameter server中,每个 server 实际上都只负责分到的部分参数(servers共同维持一个全局的共享参数),而每个 work 也只分到部分数据和处理任务。
上图中,每个子节点都只维护自己分配到的参数(黑色),自己部分更新之后,将计算结果(例如,参数)传回到主节点,进行全局的更新(比如平均操作之类的),主节点再向子节点传送新的参数。
servers 与 workers 之间的通信如下:
parameter server 中,参数都是可以被表示成(key, value)的集合,比如一个最小化损失函数的问题,key就是feature ID,而value就是它的权值。对于稀疏参数,不存在的key,就可以认为是0。
workers 跟 servers 之间通过 push 跟 pull 来通信。 worker 通过 push 将计算好的梯度发送到server,然后通过 pull 从server更新参数。 为了提高计算性能和带宽效率,parameter server 允许用户使用Range Push 跟 Range Pull操作。
range push/pull:发送和接收特定Range中的参数。
如果 iter1 需要在 iter0 computation,push 跟 pull 都完成后才能开始,那么就是Synchronous,反之就是Asynchronous.
Asynchronous Task:能够提高系统的效率(因为节省了很多等待的过程),但是,它的缺点就是容易降低算法的收敛速率。
系统性能和算法收敛速率的trade-off需要考虑的因素:
考虑到用户使用的时候会有不同的情况,parameter server 为用户提供了多种任务依赖方式:
\(\tau \)
作为最大的延时时间。也就是说,只有\(>\tau \)
之前的任务都被完成了,才能开始一个新的任务。极端的情况:\(\tau = 0\)
情况就是 Sequential;\(\tau = \infty \)
情况就是 Sequential;一个bounded delay 的 PGD (proximal gradient descent)算法的系统运行流程:
如何选择\(\tau \)
parameter server 使用 vector clock 来记录每个节点中参数的时间戳,能够用来跟踪状态或避免数据的重复发送。但是,假设有n个节点,m个参数,那么vector clock的空间复杂度就是\(O(n*m)\)
。当有几千个节点和几十亿的参数时,对于内存和带宽来说都是不可实现的。
parameter server 在push跟pull的时候,都是rang-based,这就带来了一个好处:这个range里面的参数共享的是同一个时间戳,这显然可以大大降低空间复杂度。
每次从一个range里再提取一个range,最多会生成3个新的 vector clocks(一分为三)。假设总共m个参数,\(k\)
是算法中产生的所有的range,那么空间复杂度就变成了\(O(k*m)\)
。
一条 message 包括:时间戳,len(range)对k-v:
\[
[vc(R), (k_1, v_1), . . . , (k_p, v_p)] k_j \in R \; \; and\;\; j \in \{1, . . . p\}
\]
这是parameter server 中最基本的通信格式,不仅仅是共享的参数才有,task 的message也是这样的格式,只要把这里的(key, value) 改成 (task ID, 参数/返回值)。
由于机器学习问题通常都需要很高的网络带宽,因此信息的压缩是必须的。
另外,key 的压缩和 value 的压缩可以同时进行。
另外,还有用户自定义过滤: 对于机器学习优化问题比如梯度下降来说,并不是每次计算的梯度对于最终优化都是有价值的,用户可以通过自定义的规则过滤一些不必要的传送,再进一步压缩带宽cost:
parameter server 在数据一致性上,使用的是传统的一致性哈希算法,参数key与server node id被插入到一个hash ring中。在分布式系统中,动态增加和移除节点的同时还能保证系统存储与key分配的性能效率。
每个节点都复制了它逆时钟方向的k个节点中的key。图中,k=2,\(S_1\)
复制了\(S_2\)
和\(S_3\)
内的key。
两种方式保证slave跟master之间的数据一致性:
两个worker 节点分别向server传送x和y。server 首先通过一定方式(如:\(f(x+y)\)
)进行aggregate,然后再进行复制操作;
当有n个worker的时候,复制只需要\(k/n\)
的带宽。通常来说,k(复制次数)是一个很小的常数,而n的值大概是几百到几千;
要想实现系统的容错以及动态的扩展系统规模,必须要求系统能够支持动态添加和移除节点。
当有一个 server节点添加进来时:
当有一个worker节点W添加进来时: