Diff of /mmaction/apis/test.py [000000] .. [6d389a]

# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import pickle
import shutil
import tempfile
# TODO import test functions from mmcv and delete them from mmaction2
import warnings

import mmcv
import torch
import torch.distributed as dist
from mmcv.runner import get_dist_info

try:
    from mmcv.engine import (single_gpu_test, multi_gpu_test,
                             collect_results_gpu, collect_results_cpu)
    from_mmcv = True
except (ImportError, ModuleNotFoundError):
    warnings.warn(
        'DeprecationWarning: single_gpu_test, multi_gpu_test, '
        'collect_results_cpu, collect_results_gpu from mmaction2 will be '
        'deprecated. Please install mmcv from the master branch.')
    from_mmcv = False

if not from_mmcv:

    def single_gpu_test(model, data_loader):  # noqa: F811
        """Test model with a single gpu.

        This method tests the model with a single gpu and displays a test
        progress bar.

        Args:
            model (nn.Module): Model to be tested.
            data_loader (DataLoader): PyTorch data loader.

        Returns:
            list: The prediction results.
        """
        model.eval()
        results = []
        dataset = data_loader.dataset
        prog_bar = mmcv.ProgressBar(len(dataset))
        for data in data_loader:
            with torch.no_grad():
                result = model(return_loss=False, **data)
            results.extend(result)

            # use the first key as main key to calculate the batch size
            batch_size = len(next(iter(data.values())))
            for _ in range(batch_size):
                prog_bar.update()
        return results
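
    # A minimal usage sketch (assumption: `recognizer` is a built MMAction2
    # model and `data_loader` is a test dataloader created elsewhere;
    # MMDataParallel is the mmcv.parallel wrapper that scatters
    # DataContainer inputs to the gpu):
    #
    #     from mmcv.parallel import MMDataParallel
    #     model = MMDataParallel(recognizer, device_ids=[0])
    #     outputs = single_gpu_test(model, data_loader)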

    def multi_gpu_test(  # noqa: F811
            model, data_loader, tmpdir=None, gpu_collect=True):
        """Test model with multiple gpus.

        This method tests the model with multiple gpus and collects the
        results under two different modes: gpu and cpu modes. By setting
        'gpu_collect=True', it encodes results to gpu tensors and uses gpu
        communication for results collection. In cpu mode it saves the
        results on different gpus to 'tmpdir' and collects them by the rank 0
        worker.

        Args:
            model (nn.Module): Model to be tested.
            data_loader (DataLoader): PyTorch data loader.
            tmpdir (str): Path of directory to save the temporary results from
                different gpus under cpu mode. Default: None
            gpu_collect (bool): Option to use either gpu or cpu to collect
                results. Default: True

        Returns:
            list: The prediction results.
        """
        model.eval()
        results = []
        dataset = data_loader.dataset
        rank, world_size = get_dist_info()
        if rank == 0:
            prog_bar = mmcv.ProgressBar(len(dataset))
        for data in data_loader:
            with torch.no_grad():
                result = model(return_loss=False, **data)
            results.extend(result)

            if rank == 0:
                # use the first key as main key to calculate the batch size
                batch_size = len(next(iter(data.values())))
                for _ in range(batch_size * world_size):
                    prog_bar.update()

        # collect results from all ranks
        if gpu_collect:
            results = collect_results_gpu(results, len(dataset))
        else:
            results = collect_results_cpu(results, len(dataset), tmpdir)
        return results
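
    # A minimal distributed usage sketch (assumption: the process group is
    # already initialised, e.g. by torch.distributed.launch, and
    # `recognizer`/`data_loader` are built elsewhere):
    #
    #     from mmcv.parallel import MMDistributedDataParallel
    #     model = MMDistributedDataParallel(
    #         recognizer.cuda(),
    #         device_ids=[torch.cuda.current_device()],
    #         broadcast_buffers=False)
    #     outputs = multi_gpu_test(model, data_loader, gpu_collect=True)
    #     # only rank 0 gets the gathered list; other ranks receive None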

    def collect_results_cpu(result_part, size, tmpdir=None):  # noqa: F811
        """Collect results in cpu mode.

        It saves the results on different gpus to 'tmpdir', where the rank 0
        worker collects them.

        Args:
            result_part (list): Results to be collected.
            size (int): Result size.
            tmpdir (str): Path of directory to save the temporary results from
                different gpus under cpu mode. Default: None

        Returns:
            list: Ordered results.
        """
        rank, world_size = get_dist_info()
        # create a tmp dir if it is not specified
        if tmpdir is None:
            MAX_LEN = 512
            # 32 is whitespace
            dir_tensor = torch.full((MAX_LEN, ),
                                    32,
                                    dtype=torch.uint8,
                                    device='cuda')
            if rank == 0:
                mmcv.mkdir_or_exist('.dist_test')
                tmpdir = tempfile.mkdtemp(dir='.dist_test')
                tmpdir = torch.tensor(
                    bytearray(tmpdir.encode()),
                    dtype=torch.uint8,
                    device='cuda')
                dir_tensor[:len(tmpdir)] = tmpdir
            dist.broadcast(dir_tensor, 0)
            tmpdir = dir_tensor.cpu().numpy().tobytes().decode().rstrip()
        else:
            tmpdir = osp.join(tmpdir, '.dist_test')
            mmcv.mkdir_or_exist(tmpdir)
        # synchronizes all processes to make sure tmpdir exists
        dist.barrier()
        # dump the part result to the dir
        mmcv.dump(result_part, osp.join(tmpdir, f'part_{rank}.pkl'))
        # synchronizes all processes for loading pickle file
        dist.barrier()
        # collect all parts
        if rank != 0:
            return None
        # load results of all parts from tmp dir
        part_list = []
        for i in range(world_size):
            part_file = osp.join(tmpdir, f'part_{i}.pkl')
            part_list.append(mmcv.load(part_file))
        # sort the results
        ordered_results = []
        for res in zip(*part_list):
            ordered_results.extend(list(res))
        # the dataloader may pad some samples
        ordered_results = ordered_results[:size]
        # remove tmp dir
        shutil.rmtree(tmpdir)
        return ordered_results
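
    # Why `zip(*part_list)` restores dataset order (a worked sketch assuming
    # the usual DistributedSampler interleaving, where rank r holds samples
    # r, r + world_size, r + 2 * world_size, ...):
    #
    #     world_size = 2
    #     part_list = [[r0, r2, r4], [r1, r3, r5]]
    #     zip(*part_list)  -> (r0, r1), (r2, r3), (r4, r5)
    #     ordered_results  -> [r0, r1, r2, r3, r4, r5]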

    def collect_results_gpu(result_part, size):  # noqa: F811
        """Collect results in gpu mode.

        It encodes results to gpu tensors and uses gpu communication for
        results collection.

        Args:
            result_part (list): Results to be collected.
            size (int): Result size.

        Returns:
            list: Ordered results.
        """
        rank, world_size = get_dist_info()
        # dump result part to tensor with pickle
        part_tensor = torch.tensor(
            bytearray(pickle.dumps(result_part)),
            dtype=torch.uint8,
            device='cuda')
        # gather all result part tensor shape
        shape_tensor = torch.tensor(part_tensor.shape, device='cuda')
        shape_list = [shape_tensor.clone() for _ in range(world_size)]
        dist.all_gather(shape_list, shape_tensor)
        # padding result part tensor to max length
        shape_max = torch.tensor(shape_list).max()
        part_send = torch.zeros(shape_max, dtype=torch.uint8, device='cuda')
        part_send[:shape_tensor[0]] = part_tensor
        part_recv_list = [
            part_tensor.new_zeros(shape_max) for _ in range(world_size)
        ]
        # gather all result part
        dist.all_gather(part_recv_list, part_send)

        if rank == 0:
            part_list = []
            for recv, shape in zip(part_recv_list, shape_list):
                part_list.append(
                    pickle.loads(recv[:shape[0]].cpu().numpy().tobytes()))
            # sort the results
            ordered_results = []
            for res in zip(*part_list):
                ordered_results.extend(list(res))
            # the dataloader may pad some samples
            ordered_results = ordered_results[:size]
            return ordered_results
        return None
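
    # How the gpu path round-trips variable-length pickles (a sketch with
    # illustrative byte lengths, not taken from a real run): every rank
    # serialises its part to a uint8 cuda tensor, the first all_gather
    # shares the per-rank lengths, every tensor is zero-padded to the max
    # length so the second all_gather sees equal shapes, and rank 0 slices
    # each buffer back to its true length before unpickling:
    #
    #     rank 0: 10 bytes -> pad to 16 -> recv[:10] -> pickle.loads(...)
    #     rank 1: 16 bytes -> pad to 16 -> recv[:16] -> pickle.loads(...)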