1# Copyright 2018, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Integration Finder class.
17"""
18
19# pylint: disable=line-too-long
20
21import copy
22import logging
23import os
24import re
25import xml.etree.ElementTree as ElementTree
26
27import atest_error
28import constants
29
30from test_finders import test_info
31from test_finders import test_finder_base
32from test_finders import test_finder_utils
33from test_runners import atest_tf_test_runner
34
# Find integration name based on file path of integration config xml file.
# Group matches "foo/bar" given "blah/res/config/blah/res/config/foo/bar.xml".
# The leading ".*" is greedy, so the name starts after the LAST "/res/config/".
# The dot of the ".xml" suffix is escaped so that only a literal ".xml"
# extension is accepted.
_INT_NAME_RE = re.compile(r'^.*\/res\/config\/(?P<int_name>.*)\.xml$')
# Module names whose paths are searched as Tradefed integration dirs.
_TF_TARGETS = frozenset(['tradefed', 'tradefed-contrib'])
# Module names whose paths are searched as google-tradefed integration dirs.
_GTF_TARGETS = frozenset(['google-tradefed', 'google-tradefed-contrib'])
# Modules whose configs live under _TF_RES_DIR relative to the module path.
_CONTRIB_TARGETS = frozenset(['google-tradefed-contrib'])
_TF_RES_DIR = '../res/config'
42
43
class TFIntegrationFinder(test_finder_base.TestFinderBase):
    """Integration Finder class.

    Resolves an integration config name, or a path to an integration config
    xml file, into TestInfo objects by searching the integration dirs
    obtained from module info.
    """
    # Identifier of this finder.
    NAME = 'INTEGRATION'
    # Runner name recorded in every TestInfo produced by this finder.
    _TEST_RUNNER = atest_tf_test_runner.AtestTradefedTestRunner.NAME
48
49
50    def __init__(self, module_info=None):
51        super(TFIntegrationFinder, self).__init__()
52        self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)
53        self.module_info = module_info
54        # TODO: Break this up into AOSP/google_tf integration finders.
55        self.tf_dirs, self.gtf_dirs = self._get_integration_dirs()
56        self.integration_dirs = self.tf_dirs + self.gtf_dirs
57
58    def _get_mod_paths(self, module_name):
59        """Return the paths of the given module name."""
60        if self.module_info:
61            # Since aosp/801774 merged, the path of test configs have been
62            # changed to ../res/config.
63            if module_name in _CONTRIB_TARGETS:
64                mod_paths = self.module_info.get_paths(module_name)
65                return [os.path.join(path, _TF_RES_DIR) for path in mod_paths]
66            return self.module_info.get_paths(module_name)
67        return []
68
69    def _get_integration_dirs(self):
70        """Get integration dirs from MODULE_INFO based on targets.
71
72        Returns:
73            A tuple of lists of strings of integration dir rel to repo root.
74        """
75        tf_dirs = list(filter(None, [d for x in _TF_TARGETS for d in self._get_mod_paths(x)]))
76        gtf_dirs = list(filter(None, [d for x in _GTF_TARGETS for d in self._get_mod_paths(x)]))
77        return tf_dirs, gtf_dirs
78
79    def _get_build_targets(self, rel_config):
80        config_file = os.path.join(self.root_dir, rel_config)
81        xml_root = self._load_xml_file(config_file)
82        targets = test_finder_utils.get_targets_from_xml_root(xml_root,
83                                                              self.module_info)
84        if self.gtf_dirs:
85            targets.add(constants.GTF_TARGET)
86        return frozenset(targets)
87
88    def _load_xml_file(self, path):
89        """Load an xml file with option to expand <include> tags
90
91        Args:
92            path: A string of path to xml file.
93
94        Returns:
95            An xml.etree.ElementTree.Element instance of the root of the tree.
96        """
97        tree = ElementTree.parse(path)
98        root = tree.getroot()
99        self._load_include_tags(root)
100        return root
101
102    #pylint: disable=invalid-name
103    def _load_include_tags(self, root):
104        """Recursively expand in-place the <include> tags in a given xml tree.
105
106        Python xml libraries don't support our type of <include> tags. Logic
107        used below is modified version of the built-in ElementInclude logic
108        found here:
109        https://github.com/python/cpython/blob/2.7/Lib/xml/etree/ElementInclude.py
110
111        Args:
112            root: The root xml.etree.ElementTree.Element.
113
114        Returns:
115            An xml.etree.ElementTree.Element instance with
116            include tags expanded.
117        """
118        i = 0
119        while i < len(root):
120            elem = root[i]
121            if elem.tag == 'include':
122                # expand included xml file
123                integration_name = elem.get('name')
124                if not integration_name:
125                    logging.warning('skipping <include> tag with no "name" value')
126                    continue
127                full_paths = self._search_integration_dirs(integration_name)
128                node = None
129                if full_paths:
130                    node = self._load_xml_file(full_paths[0])
131                if node is None:
132                    raise atest_error.FatalIncludeError("can't load %r" %
133                                                        integration_name)
134                node = copy.copy(node)
135                if elem.tail:
136                    node.tail = (node.tail or "") + elem.tail
137                root[i] = node
138            i = i + 1
139
140    def _search_integration_dirs(self, name):
141        """Search integration dirs for name and return full path.
142        Args:
143            name: A string of integration name as seen in tf's list configs.
144
145        Returns:
146            A list of test path.
147        """
148        test_files = []
149        for integration_dir in self.integration_dirs:
150            abs_path = os.path.join(self.root_dir, integration_dir)
151            found_test_files = test_finder_utils.run_find_cmd(
152                test_finder_utils.FIND_REFERENCE_TYPE.INTEGRATION,
153                abs_path, name)
154            if found_test_files:
155                test_files.extend(found_test_files)
156        return test_files
157
158    def find_test_by_integration_name(self, name):
159        """Find the test info matching the given integration name.
160
161        Args:
162            name: A string of integration name as seen in tf's list configs.
163
164        Returns:
165            A populated TestInfo namedtuple if test found, else None
166        """
167        class_name = None
168        if ':' in name:
169            name, class_name = name.split(':')
170        test_files = self._search_integration_dirs(name)
171        if test_files is None:
172            return None
173        # Don't use names that simply match the path,
174        # must be the actual name used by TF to run the test.
175        t_infos = []
176        for test_file in test_files:
177            t_info = self._get_test_info(name, test_file, class_name)
178            if t_info:
179                t_infos.append(t_info)
180        return t_infos
181
182    def _get_test_info(self, name, test_file, class_name):
183        """Find the test info matching the given test_file and class_name.
184
185        Args:
186            name: A string of integration name as seen in tf's list configs.
187            test_file: A string of test_file full path.
188            class_name: A string of user's input.
189
190        Returns:
191            A populated TestInfo namedtuple if test found, else None.
192        """
193        match = _INT_NAME_RE.match(test_file)
194        if not match:
195            logging.error('Integration test outside config dir: %s',
196                          test_file)
197            return None
198        int_name = match.group('int_name')
199        if int_name != name:
200            logging.warning('Input (%s) not valid integration name, '
201                            'did you mean: %s?', name, int_name)
202            return None
203        rel_config = os.path.relpath(test_file, self.root_dir)
204        filters = frozenset()
205        if class_name:
206            class_name, methods = test_finder_utils.split_methods(class_name)
207            test_filters = []
208            if '.' in class_name:
209                test_filters.append(test_info.TestFilter(class_name, methods))
210            else:
211                logging.warning('Looking up fully qualified class name for: %s.'
212                                'Improve speed by using fully qualified names.',
213                                class_name)
214                paths = test_finder_utils.find_class_file(self.root_dir,
215                                                          class_name)
216                if not paths:
217                    return None
218                for path in paths:
219                    class_name = (
220                        test_finder_utils.get_fully_qualified_class_name(
221                            path))
222                    test_filters.append(test_info.TestFilter(
223                        class_name, methods))
224            filters = frozenset(test_filters)
225        return test_info.TestInfo(
226            test_name=name,
227            test_runner=self._TEST_RUNNER,
228            build_targets=self._get_build_targets(rel_config),
229            data={constants.TI_REL_CONFIG: rel_config,
230                  constants.TI_FILTER: filters})
231
232    def find_int_test_by_path(self, path):
233        """Find the first test info matching the given path.
234
235        Strategy:
236            path_to_integration_file --> Resolve to INTEGRATION
237            # If the path is a dir, we return nothing.
238            path_to_dir_with_integration_files --> Return None
239
240        Args:
241            path: A string of the test's path.
242
243        Returns:
244            A list of populated TestInfo namedtuple if test found, else None
245        """
246        path, _ = test_finder_utils.split_methods(path)
247
248        # Make sure we're looking for a config.
249        if not path.endswith('.xml'):
250            return None
251
252        # TODO: See if this can be generalized and shared with methods above
253        # create absolute path from cwd and remove symbolic links
254        path = os.path.realpath(path)
255        if not os.path.exists(path):
256            return None
257        int_dir = test_finder_utils.get_int_dir_from_path(path,
258                                                          self.integration_dirs)
259        if int_dir:
260            rel_config = os.path.relpath(path, self.root_dir)
261            match = _INT_NAME_RE.match(rel_config)
262            if not match:
263                logging.error('Integration test outside config dir: %s',
264                              rel_config)
265                return None
266            int_name = match.group('int_name')
267            return [test_info.TestInfo(
268                test_name=int_name,
269                test_runner=self._TEST_RUNNER,
270                build_targets=self._get_build_targets(rel_config),
271                data={constants.TI_REL_CONFIG: rel_config,
272                      constants.TI_FILTER: frozenset()})]
273        return None
274