import os.path as osp
import shutil
import tempfile
import time
from collections import defaultdict
import mmcv
import torch
import torch.distributed as dist
from mmcv.runner import get_dist_info
[docs]def single_gpu_test(model,
data_loader,
show=False,
out_dir=None,
show_score_thr=0.3):
"""Test model with single gpu.
Args:
model (nn.Module): Model to be tested.
data_loader (nn.Dataloader): Pytorch data loader.
show (bool): If True, visualize the prediction results (Not supported
for now). Defaults to False.
out_dir (str): Path of directory to save the visualization results (Not
supported for now). Defaults to None.
show_score_thr (float): The score threthold of visualization (Not
supported for now). Defaults to 0.3.
Returns:
dict[str, list]: The prediction results.
"""
model.eval()
results = defaultdict(list)
dataset = data_loader.dataset
prog_bar = mmcv.ProgressBar(len(dataset))
for i, data in enumerate(data_loader):
with torch.no_grad():
result = model(return_loss=False, rescale=True, **data)
for k, v in result.items():
results[k].append(v)
if show or out_dir:
pass # TODO
batch_size = data['img'][0].size(0)
for _ in range(batch_size):
prog_bar.update()
return results
[docs]def multi_gpu_test(model, data_loader, tmpdir=None, gpu_collect=False):
"""Test model with multiple gpus.
This method tests model with multiple gpus and collects the results
under two different modes: gpu and cpu modes. By setting 'gpu_collect=True'
it encodes results to gpu tensors and use gpu communication for results
collection. On cpu mode it saves the results on different gpus to 'tmpdir'
and collects them by the rank 0 worker. 'gpu_collect=True' is not
supported for now.
Args:
model (nn.Module): Model to be tested.
data_loader (nn.Dataloader): Pytorch data loader.
tmpdir (str): Path of directory to save the temporary results from
different gpus under cpu mode. Defaults to None.
gpu_collect (bool): Option to use either gpu or cpu to collect results.
Defaults to False.
Returns:
dict[str, list]: The prediction results.
"""
model.eval()
results = defaultdict(list)
dataset = data_loader.dataset
rank, world_size = get_dist_info()
if rank == 0:
prog_bar = mmcv.ProgressBar(len(dataset))
time.sleep(2) # This line can prevent deadlock problem in some cases.
for i, data in enumerate(data_loader):
with torch.no_grad():
result = model(return_loss=False, rescale=True, **data)
for k, v in result.items():
results[k].append(v)
if rank == 0:
batch_size = data['img'][0].size(0)
for _ in range(batch_size * world_size):
prog_bar.update()
# collect results from all ranks
if gpu_collect:
raise NotImplementedError
else:
results = collect_results_cpu(results, tmpdir)
return results
def collect_results_cpu(result_part, tmpdir=None):
"""Collect results on cpu mode.
Saves the results on different gpus to 'tmpdir' and collects them by the
rank 0 worker.
Args:
result_part (dict[list]): The part of prediction results.
tmpdir (str): Path of directory to save the temporary results from
different gpus under cpu mode. If is None, use `tempfile.mkdtemp()`
to make a temporary path. Defaults to None.
Returns:
dict[str, list]: The prediction results.
"""
rank, world_size = get_dist_info()
# create a tmp dir if it is not specified
if tmpdir is None:
MAX_LEN = 512
# 32 is whitespace
dir_tensor = torch.full((MAX_LEN, ),
32,
dtype=torch.uint8,
device='cuda')
if rank == 0:
tmpdir = tempfile.mkdtemp()
tmpdir = torch.tensor(
bytearray(tmpdir.encode()), dtype=torch.uint8, device='cuda')
dir_tensor[:len(tmpdir)] = tmpdir
dist.broadcast(dir_tensor, 0)
tmpdir = dir_tensor.cpu().numpy().tobytes().decode().rstrip()
else:
mmcv.mkdir_or_exist(tmpdir)
# dump the part result to the dir
mmcv.dump(result_part, osp.join(tmpdir, f'part_{rank}.pkl'))
dist.barrier()
# collect all parts
if rank != 0:
return None
else:
# load results of all parts from tmp dir
part_list = defaultdict(list)
for i in range(world_size):
part_file = osp.join(tmpdir, f'part_{i}.pkl')
part_file = mmcv.load(part_file)
for k, v in part_file.items():
part_list[k].extend(v)
shutil.rmtree(tmpdir)
return part_list