Pytorch 分布式训练
手动配置分布式训练
该方法自定义程度化较高,但环境等需自己配置,代码写起来较繁杂
准备环境
1 |
|
num_workers
线程数,一般去cpu线程数的 1/2,或取gpu数量。但取多了会占大量内存pin_memory
固定数据在内存中的地址,可加快读取速度,但可能会导致占用内存大prefetch_factor
预先取多少个batch到内存中,默认为2,调大可加快读取速度persistent_workers
每次迭代结束是否保留进程,默认为False,可加快读写速度collate_fn
默认将[(data 1, label 1), (data 2, label 2), …]
转化为[[data 1, data 2, ...], [label 1, label 2, ...]]
若要自定义collate_fn
则需自行转换
执行函数
1 |
|
结果写入和保存
由于是多卡推理/训练,涉及到文件读写冲突问题,因此需要制定策略防止文件写冲突
- 每张卡各自写到自己的文件,整个训练/推理过程结束完最后再合并(推荐,并行写入更快)
- 只有一个结果文件,每张卡轮流写入(进程写入结果文件要排队,降低效率)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52import fcntl
def write_result_to_file(batch, results, rank):
''' 每个线程的结果写入临时文件, 或者单独写入一个文件'''
sim_temp_path = f"./res/temp/results_rank_{rank}.txt"
is_header = False if os.path.exists(sim_temp_path) else True
# ... 结果处理,得到写入文件的格式
new_df = pd.DataFrame(new_rows) # 要写入文件的格式
# 写入临时文件
with open(sim_temp_path, 'a') as f:
# 独占锁
fcntl.flock(f, fcntl.LOCK_EX)
try:
new_df.to_csv(f, sep='\t', index=False, header=is_header, mode='a')
except Exception as e:
print(f"Raise Error: {e}")
finally:
# 解锁
fcntl.flock(f, fcntl.LOCK_UN)
'''
当 num_workers 设定大于gpu数量时,一个gpu可能会执行多个线程的任务。
当线程1再cuda:0上执行完,然后执行写入临时文件。若线程1的写文件还没执行完,线程2也在
cuda:0上执行完,也开始写入临时文件,就会发生冲突
因此需要一个互斥锁来保证两者的写操作冲突
'''
def merge_results_from_files(world_size, save_path):
'''
将每个gpu的结果合并到一起
'''
is_header = False if os.path.exists(save_path) else True
# 合并每个rank的结果
for rank in range(world_size):
sim_temp_path = f"./res/temp/results_rank_{rank}.txt"
rank_file = pd.read_table(sim_temp_path, sep='\t', encoding="utf-8")
rank_file.to_csv(save_path, sep='\t', index=False, header=is_header, mode='a')
print(f"Finish merge file to: {save_path}")
def delete_temp_file():
'''删除临时文件(可选)'''
temp_folder = "./res/temp/"
temp_file_names = [f"results_rank_{rank}.txt" for rank in range(torch.cuda.device_count())]
for file_name in temp_file_names:
file_path = os.path.join(temp_folder, file_name)
if os.path.isfile(file_path):
try:
os.remove(file_path)
print(f"Delete file: {file_path} successfully")
except Exception as e:
print(f"Raise Error when delete {file_path}: {e}")
自动配置分布式训练
另一种分布式训练写法,就是使用torchrun来执行python文件。运行的主函数只需关注每一个gpu的代码怎么运行即可,torchrun会自动分配环境给每一gpu。该方法只需考虑每个 gpu 对应的执行函数即可,代码写起来较为简单,也无需考虑文件互斥的问题,运行时直接 torchrun 自动执行分布式环境
一个典型的例子:CLIP
执行函数
1 |
|
执行脚本
1 |
|
Pytorch 分布式训练
https://guokent.github.io/developnotes/distribute/