本文旨在分享我在处理多TB数据集时的一些经验教训。内容主要集中在数据集规模扩大的过程中可能遇到的问题,以及我采取的一些应对措施。希望你在阅读这篇文章时,正好在等待某个任务的完成!
请记住,这不是一份严格的指南,而是介绍一些概念并解释为什么你应该开始应用它们。还有许多其他工具可能优于我使用的这些工具,我强烈鼓励你主动探索这些工具。主动探索是你职业成长的关键。
本文分为两个部分:单机扩展和多机扩展。目标是最大限度地利用现有资源,并尽快实现目标。
最后,我想强调的是,没有任何优化或扩展能够弥补一个有缺陷的算法。在扩展之前,评估你的算法是至关重要的。这应该始终是你的第一步,为你的工作提供一个有信心的指导。
单机扩展
Joblib
计算是扩展时首先想到的瓶颈。可以通过几种不同的实际方式来扩展计算。如果你是一名数据科学家或机器学习工程师,你可能已经熟悉了Joblib,这是一个用于并行运行代码(以及其他功能)的库。它通常在其他库中使用,例如scikit-learn或XGBoost。
使用Joblib并行化某些东西的过程非常简单,如下所示(为了清晰起见,已从Joblib文档中修改):
from joblib import Parallel, delayed
from math import sqrt
parallel_mapper = Parallel(n_jobs=-1)
delayed_func = delayed(sqrt)
jobs = [
delayed_func(x**2)
for x in range(10)
]
parallel_mapper(jobs)
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Joblib是扩展并行工作负载的绝佳方式。它在scikit-learn等工具中使用,对于许多工作负载都非常可靠。这还没有考虑到它在记忆化或快速压缩持久性方面的其他优秀功能。Joblib对于使函数可并行化运行在所有CPU核心上非常有帮助。
GNU Parallel
GNU Parallel是一个用于CLI中预处理或提取数据的强大工具。与Joblib不同,它可以在脚本之外使用,非常通用。你甚至可以并行运行其他Python脚本。最常见的用例之一是同时解压多个文件。以下是我的做法:
> ls
random_0.zip random_2.zip random_4.zip random_6.zip random_8.zip
random_1.zip random_3.zip random_5.zip random_7.zip random_9.zip
...
> mkdir output
> ls | parallel --eta --bar "unzip -q {} -d output/"
100% 10:0=0s random_9.zip
> ls output/
random_0.txt random_2.txt random_4.txt random_6.txt random_8.txt
random_1.txt random_3.txt random_5.txt random_7.txt random_9.txt
...
如果你以前使用过Linux终端,这些命令相当简单。主要部分是将文件名通过管道传递给parallel,以便unzip可以解压它们。
对于任何任务,一旦你有了一个用于单个文件的bash命令,你可以通过稍微修改你的命令来使其并行化。默认情况下,parallel使用所有可用的CPU核心,并且可以使用ssh在多台机器上执行命令,这意味着它可以用作临时计算集群。
另一个用例是下载大量文件。使用wget和parallel以及一个要下载的文件列表,可以写一个简单的一行命令来并行下载所有文件。其他工具如axel和aria2c也能很好地完成这项任务,但当我需要下载许多较小文件时,我发现这种方法效果更好。
需要注意的是,虽然可以使用这个方法下载许多文件,但要注意这可能会给服务器带来压力,创建多个连接,导致网络拥塞和其他用户性能下降,甚至被视为DOS攻击。这种增加的服务器负载对于较小的网站或带宽有限的服务器来说特别成问题。有名的是,aria2c曾拒绝增加最大连接数从16增加,即使计算机变得更快,网络带宽大幅增加。我同意他们的决定,建议你在下载时负责任地行事。
另一个我要提到的是,虽然使用Parallel可以更快地完成工作,但管理bash命令可能会很困难,尤其是对于团队中的初学者,他们可能更习惯于Python/传统编程语言。因此,我通常建议将Parallel保留用于一次性任务,而不是在bash中编写复杂的ETL管道。可维护的代码仅次于不写代码。
多机扩展
何时开始使用多台机器
何时切换到使用多台机器(例如Spark或我最喜欢的Dask)的一个关键标志是,当计算时间太长而无法满足你的需求时。这可能是实验、数据处理或其他工作。如果我坚持使用单个实例,即使是在AWS的u-24tb1.112xlarge(一台强大的机器)上,有些任务估计需要几个月甚至一年的时间才能完成计算。我反对任何形式的浪费,越能充分利用现有资源越好。
通过切换到多台较小的机器,你可以利用比大型实例更多的性能优势。根据你的扩展解决方案,水平扩展允许你几乎线性地扩展CPU、内存和网络,具体取决于你使用的实例数量。
大多数相当大的EC2实例提供高达10 GBit的互联网速度,这有助于缓解IO瓶颈,尤其是当你快速将数据流入或流出S3时。如果你的工作负载需要50 Gbit/s的数据流入,你可以选择使用m7i.48xlarge实例,每小时成本为$9.6768,运行速度为50 GBit,或者四个m7i.8xlarge实例,每个实例每小时成本为$1.6128,总共$6.4512,每小时具有相同的网络带宽。
我选择网络速度和成本作为两个关注点,但如果你想最大化内存和CPU使用,我们可以比较之前提到的u-24tb1.112xlarge。以相同的成本,你可以租用135个m7i.8xlarge实例。这将给你4320个CPU(实例数量的10倍),17.28TB内存,以及1687.5 GBit互联网速度(大约实例速度的17倍)!虽然内存较少,但我这里使用的是通用实例,而不是内存优化型实例。使用内存优化型等效实例,我们可以获得34.56 TB内存,同时还享受使用多台机器的其他好处(冗余、对实例大小更精细的控制等)。
此外,使用正确的后端,我可以根据我的用例、编排工具或财务部门允许的情况扩展到任意数量的实例。这种可扩展性是一个关键优势,使你能够满足工作负载需求,而不受限于单个实例的能力。
如同所有事情一样,不同的方法各有利弊。你的任务是评估每种解决方案的优缺点,并确定哪种方法最适合你的用例。在最大化性能和最小化成本之间找到平衡是一项良好的直觉构建练习。
然而,鉴于这些巨大优势,我只建议在理解你面临的瓶颈之后再使用多个实例。我曾见过团队在理解他们的用例之前就开始扩展和过度设计他们的计算方法。我可能也曾是这些团队的一员,在学到了教训之前。在某些情况下,编写良好的cli工具比整个spark集群处理数据还快。
不同计算模型
对于完全并行工作负载
完全并行工作负载通常比其他类型的工作负载更容易扩展。我们已经讨论了如何使用Joblib或Parallel来扩展计算,那么如何扩展到多台机器呢?有很多工具可以用来扩展计算。我建议使用AWS Batch或AWS Lambda来处理一次性的完全并行工作负载。Batch具有可扩展性,并且通过spot定价,你可以以按需实例的一小部分成本完成大部分任务,比在单台机器上并行运行所需时间少得多。有其他工具可以使用(例如GCP的Cloud Run),但我只能推荐AWS Batch用于长时间运行的任务,因为这是我过去使用过的。
由于设置集群可能非常耗时且超出了本文范围,我这里包含了一个链接,以防你有兴趣自己探索这一点:
需要注意的是,你工作的总体吞吐量将受到读写速度限制,而不是计算速度。如果你正在从数据库读取/写入,那么数据库可能成为瓶颈(甚至崩溃)。S3是一个可行的读写选项,因为它设计得更好,但它仍然有其限制。每个分区前缀每秒3500次写入和5500次读取。S3设计为用户在扩展时不可见,因此你对其如何适应增加的吞吐量几乎没有控制权。
一旦数据进入S3(或你使用的任何服务),你可以将其传输到任何需要的地方。
这种设置非常繁琐,但适合一次性任务进行扩展。经过几次迭代后,你可以将设置时间减少到几分钟,这取决于你自动化过程和团队需求的程度。一般来说,我发现设置时间值得节省计算和工程时间,但你可以理解我不愿意将其用于每项任务。
分析工作负载
分析工作负载稍微难以扩展一些。这是因为你通常处理的是单个数据集,并尝试对该数据集进行大量操作。你可能还会有一些交互元素,例如在Jupyter Notebook中运行的东西。我用来扩展分析工作负载的首选工具是Dask,备选的是Spark。Dask和Spark都是开源工具,可以让你将工作负载扩展到多台机器,各有优缺点。这些工具也可以本地使用,它们对DataFrame(Dask DataFrame和Spark Dataframe)的实现可以用来扩展现有工作负载。
Dask更容易设置和安装。我可以通过一个简单命令(pip install "dask[complete]")在几分钟内本地运行Dask。而Spark需要更多设置,我发现本地运行更具挑战性。Dask还具有任何使用Pandas或Numpy的数据科学家都能快速上手的优势,而了解Spark则是一项完全不同的技能集。Dask还与几个PyData工具集成得更好,这意味着你可以立即利用它们。然而,尽管如此,Spark及其生态系统相比之下更加成熟,很可能你的团队已经投入了时间建立了Spark集群。我偶尔会遇到Dask的bug或性能问题,而Spark因其成熟度而被认为更加稳定。Dask也不适合长时间运行计算。
鉴于此,我的一般建议是:
- 如果你是一个没有大数据或分布式计算基础设施的小团队或初创公司,我建议至少尝试一下Dask,无论团队对Spark是否有经验。在你花时间本地运行Spark之前,你可能已经用Dask验证了你的用例,并且你的团队将能够利用PyData领域中的其他工具。
- 如果你已经是一个使用Spark或其他重要数据基础设施的大型组织的一部分,那么除非有令人信服的理由,否则建议继续使用它。我推荐观看Eric Dill关于《Is Spark Still Relevant?》的视频,了解为什么大型组织更喜欢使用Spark而不是更现代的工具。这视频已有五年历史,因此某些观点可能已过时。尽管如此,你仍然应该尝试Dask,因为你可以同时使用这两者。
结论
总之,管理和扩展多TB数据集需要对你的数据和可用工具有深入了解。通过利用Joblib和GNU Parallel进行单机扩展,你可以最大限度地提高计算资源效率。当需要超越单机扩展时,AWS Batch、Dask和Spark提供了针对各种工作负载(从完全并行任务到复杂分析操作)的强大解决方案。
关键要点是在扩展之前优化你的算法,以确保不只是放大低效之处。积极探索和适应新工具可以显著提升你的性能和成本效益。成功扩展不仅仅依赖于纯粹计算能力,还需要战略规划和资源管理。拥抱学习曲线,你将能够自信且熟练地处理即使是最大的数据库。