1#!/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import pprint
18import queue
19
20import acts.base_test
21import acts.test_utils.wifi.wifi_test_utils as wutils
22import acts.utils
23from acts import asserts
24from acts.controllers.sl4a_lib import rpc_client
25
26WifiEnums = wutils.WifiEnums
27
28# Macros for RttParam keywords
29RttParam = WifiEnums.RttParam
30# Macros for RttManager
31Rtt = WifiEnums.Rtt
32RttBW = WifiEnums.RttBW
33RttPreamble = WifiEnums.RttPreamble
34RttPeerType = WifiEnums.RttPeerType
35RttType = WifiEnums.RttType
36
37ScanResult = WifiEnums.ScanResult
38RTT_MARGIN_OF_ERROR = WifiEnums.RTT_MARGIN_OF_ERROR
39
40
class WifiRTTRangingError(Exception):
    """Signals an error encountered during WifiScanner RTT ranging."""
    pass
43
44
45class WifiRttManagerTest(acts.base_test.BaseTestClass):
46    """Tests for wifi's RttManager APIs."""
47    tests = None
48    MAX_RTT_AP = 10
49
50    def __init__(self, controllers):
51        acts.base_test.BaseTestClass.__init__(self, controllers)
52        self.tests = ("test_support_check", "test_invalid_params",
53                      "test_capability_check",
54                      "test_rtt_ranging_single_AP_stress",
55                      "test_regular_scan_then_rtt_ranging_stress",
56                      "test_gscan_then_rtt_ranging_stress")
57
58    def setup_class(self):
59        self.dut = self.android_devices[0]
60        wutils.wifi_test_device_init(self.dut)
61        required_params = ("support_models", "stress_num", "vht80_5g",
62                           "actual_distance")
63        self.unpack_userparams(required_params)
64        asserts.assert_true(
65            self.actual_distance >= 5,
66            "Actual distance should be no shorter than 5 meters.")
67        self.visible_networks = (self.vht80_5g, )
68        self.default_rtt_params = {
69            RttParam.request_type: RttType.TYPE_TWO_SIDED,
70            RttParam.device_type: RttPeerType.PEER_TYPE_AP,
71            RttParam.preamble: RttPreamble.PREAMBLE_HT,
72            RttParam.bandwidth: RttBW.BW_80_SUPPORT
73        }
74        # Expected capability for devices that don't support RTT.
75        rtt_cap_neg = {
76            'lcrSupported': False,
77            'bwSupported': 0,
78            'twoSided11McRttSupported': False,
79            'preambleSupported': 0,
80            'oneSidedRttSupported': False,
81            'lciSupported': False
82        }
83        rtt_cap_shamu = {
84            'lcrSupported': False,
85            'bwSupported': 0x1C,
86            'twoSided11McRttSupported': True,
87            'preambleSupported': 6,
88            'oneSidedRttSupported': False,
89            'lciSupported': False
90        }
91        rtt_cap_bullhead = {
92            'lcrSupported': True,
93            'bwSupported': 0x1C,
94            'twoSided11McRttSupported': True,
95            'preambleSupported': 7,
96            'oneSidedRttSupported': True,
97            'lciSupported': True
98        }
99        rtt_cap_angler = {
100            'lcrSupported': True,
101            'bwSupported': 0x1C,
102            'twoSided11McRttSupported': True,
103            'preambleSupported': 6,
104            'oneSidedRttSupported': False,
105            'lciSupported': True
106        }
107        self.rtt_cap_table = {
108            "hammerhead": rtt_cap_neg,
109            "shamu": rtt_cap_shamu,
110            "volantis": rtt_cap_neg,
111            "volantisg": rtt_cap_neg,
112            "bullhead": rtt_cap_bullhead,
113            "angler": rtt_cap_angler
114        }
115
116    """Helper Functions"""
117
118    def invalid_params_logic(self, rtt_params):
119        try:
120            self.dut.droid.wifiRttStartRanging([rtt_params])
121        except rpc_client.Sl4aApiError as e:
122            e_str = str(e)
123            asserts.assert_true(
124                "IllegalArgumentException" in e_str,
125                "Missing IllegalArgumentException in %s." % e_str)
126            msg = "Got expected exception with invalid param %s." % rtt_params
127            self.log.info(msg)
128
129    def get_rtt_results(self, rtt_params):
130        """Starts RTT ranging and get results.
131
132        Args:
133            rtt_params: A list of dicts each representing an RttParam.
134
135        Returns:
136            Rtt ranging results.
137        """
138        self.log.debug("Start ranging with:\n%s" % pprint.pformat(rtt_params))
139        idx = self.dut.droid.wifiRttStartRanging(rtt_params)
140        event = None
141        try:
142            event = self.dut.ed.pop_events("WifiRttRanging%d" % idx, 30)
143            if event[0]["name"].endswith("onSuccess"):
144                results = event[0]["data"]["Results"]
145                result_len = len(results)
146                param_len = len(rtt_params)
147                asserts.assert_true(result_len == param_len,
148                                    "Expected %d results, got %d." %
149                                    (param_len, result_len))
150                # Add acceptable margin of error to results, which will be used
151                # during result processing.
152                for i, r in enumerate(results):
153                    bw_mode = rtt_params[i][RttParam.bandwidth]
154                    r[RttParam.margin] = RTT_MARGIN_OF_ERROR[bw_mode]
155            self.log.debug(pprint.pformat(event))
156            return event
157        except queue.Empty:
158            self.log.error("Waiting for RTT event timed out.")
159            return None
160
161    def network_selector(self, network_info):
162        """Decides if a network should be used for rtt ranging.
163
164        There are a few conditions:
165        1. This network supports 80211mc.
166        2. This network's info matches certain conditions.
167
168        This is added to better control which networks to range against instead
169        of blindly use all 80211mc networks in air.
170
171        Args:
172            network_info: A dict representing a WiFi network.
173
174        Returns:
175            True if the input network should be used for ranging, False
176            otherwise.
177        """
178        target_params = {
179            "is80211McRTTResponder": True,
180            WifiEnums.BSSID_KEY: self.vht80_5g[WifiEnums.BSSID_KEY],
181        }
182        for k, v in target_params.items():
183            if k not in network_info:
184                return False
185            if type(network_info[k]) is str:
186                network_info[k] = network_info[k].lower()
187                v = v.lower()
188            if network_info[k] != v:
189                return False
190        return True
191
192    def regular_scan_for_rtt_networks(self):
193        """Scans for 11mc-capable WiFi networks using regular wifi scan.
194
195        Networks are selected based on self.network_selector.
196
197        Returns:
198            A list of networks that have RTTResponders.
199        """
200        wutils.start_wifi_connection_scan(self.dut)
201        networks = self.dut.droid.wifiGetScanResults()
202        rtt_networks = []
203        for nw in networks:
204            if self.network_selector(nw):
205                rtt_networks.append(nw)
206        return rtt_networks
207
208    def gscan_for_rtt_networks(self):
209        """Scans for 11mc-capable WiFi networks using wifi gscan.
210
211        Networks are selected based on self.network_selector.
212
213        Returns:
214            A list of networks that have RTTResponders.
215        """
216        s = {
217            "reportEvents": WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT,
218            "band": WifiEnums.WIFI_BAND_BOTH,
219            "periodInMs": 10000,
220            "numBssidsPerScan": 32
221        }
222        idx = wutils.start_wifi_single_scan(self.android_devices[0],
223                                            s)["Index"]
224        self.log.info("Scan index is %d" % idx)
225        event_name = "WifiScannerScan%donFullResult" % idx
226
227        def condition(event):
228            nw = event["data"]["Results"][0]
229            return self.network_selector(nw)
230
231        rtt_networks = []
232        try:
233            for i in range(len(self.visible_networks)):
234                event = self.dut.ed.wait_for_event(event_name, condition, 30)
235                rtt_networks.append(event["data"]["Results"][0])
236            self.log.info("Waiting for gscan to finish.")
237            event_name = "WifiScannerScan%donResults" % idx
238            event = self.dut.ed.pop_event(event_name, 300)
239            total_network_cnt = len(event["data"]["Results"][0]["ScanResults"])
240            self.log.info("Found %d networks in total." % total_network_cnt)
241            self.log.debug(rtt_networks)
242            return rtt_networks
243        except queue.Empty:
244            self.log.error("Timed out waiting for gscan result.")
245
246    def process_rtt_events(self, events):
247        """Processes rtt ranging events.
248
249        Validates RTT event types.
250        Validates RTT response status and measured RTT values.
251        Enforces success rate.
252
253        Args:
254            events: A list of callback results from RTT ranging.
255        """
256        total = aborted = failure = invalid = out_of_range = 0
257        for e in events:
258            if e["name"].endswith("onAborted"):
259                aborted += 1
260            if e["name"].endswith("onFailure"):
261                failure += 1
262            if e["name"].endswith("onSuccess"):
263                results = e["data"]["Results"]
264                for r in results:
265                    total += 1
266                    # Status needs to be "success".
267                    status = r["status"]
268                    if status != Rtt.STATUS_SUCCESS:
269                        self.log.warning("Got error status %d." % status)
270                        invalid += 1
271                        continue
272                    # RTT value should be positive.
273                    value = r["rtt"]
274                    if value <= 0:
275                        self.log.warning("Got error RTT value %d." % value)
276                        invalid += 1
277                        continue
278                    # Vadlidate values in successful responses.
279                    acd = self.actual_distance
280                    margin = r[RttParam.margin]
281                    # If the distance is >= 0, check distance only.
282                    d = r["distance"] / 100.0
283                    if d > 0:
284                        # Distance should be in acceptable range.
285                        is_d_valid = (acd - margin) <= d <= acd + (margin)
286                        if not is_d_valid:
287                            self.log.warning(
288                                ("Reported distance %.2fm is out of the"
289                                 " acceptable range %.2f±%.2fm.") % (d, acd,
290                                                                     margin))
291                            out_of_range += 1
292                        continue
293                    # Check if the RTT value is in range.
294                    d = (value / 2) / 1E10 * wutils.SPEED_OF_LIGHT
295                    is_rtt_valid = (acd - margin) <= d <= (acd + margin)
296                    if not is_rtt_valid:
297                        self.log.warning((
298                            "Distance calculated from RTT value %d - %.2fm is "
299                            "out of the acceptable range %.2f±%dm.") %
300                                         (value, d, acd, margin))
301                        out_of_range += 1
302                        continue
303                    # Check if the RSSI value is in range.
304                    rssi = r["rssi"]
305                    # average rssi in 0.5dB steps, e.g. 143 implies -71.5dB,
306                    # so the valid range is 0 to 200
307                    is_rssi_valid = 0 <= rssi <= 200
308                    if not is_rssi_valid:
309                        self.log.warning(("Reported RSSI %d is out of the"
310                                          " acceptable range 0-200") % rssi)
311                        out_of_range += 1
312                        continue
313        self.log.info((
314            "Processed %d RTT events. %d aborted, %s failed. Among"
315            " the %d responses in successful callbacks, %s are invalid, %s has"
316            " RTT values that are out of range.") %
317                      (len(events), aborted, failure, total, invalid,
318                       out_of_range))
319        asserts.assert_true(total > 0, "No RTT response received.")
320        # Percentage of responses that are valid should be >= 90%.
321        valid_total = float(total - invalid)
322        valid_response_rate = valid_total / total
323        self.log.info("%.2f%% of the responses are valid." %
324                      (valid_response_rate * 100))
325        asserts.assert_true(valid_response_rate >= 0.9,
326                            "Valid response rate is below 90%%.")
327        # Among the valid responses, the percentage of having an in-range RTT
328        # value should be >= 67%.
329        valid_value_rate = (total - invalid - out_of_range) / valid_total
330        self.log.info("%.2f%% of valid responses have in-range RTT value" %
331                      (valid_value_rate * 100))
332        msg = "In-range response rate is below 67%%."
333        asserts.assert_true(valid_value_rate >= 0.67, msg)
334
335    def scan_then_rtt_ranging_stress_logic(self, scan_func):
336        """Test logic to scan then do rtt ranging based on the scan results.
337
338        Steps:
339        1. Start scan and get scan results.
340        2. Filter out the networks that support rtt in scan results.
341        3. Start rtt ranging against those networks that support rtt.
342        4. Repeat
343        5. Process RTT events.
344
345        Args:
346            scan_func: A function that does a wifi scan and only returns the
347                networks that support rtt in the scan results.
348
349        Returns:
350            True if rtt behaves as expected, False otherwise.
351        """
352        total = self.stress_num
353        failed = 0
354        all_results = []
355        for i in range(total):
356            self.log.info("Iteration %d" % i)
357            rtt_networks = scan_func()
358            if not rtt_networks:
359                self.log.warning("Found no rtt network, skip this iteration.")
360                failed += 1
361                continue
362            self.log.debug("Found rtt networks:%s" % rtt_networks)
363            rtt_params = []
364            for rn in rtt_networks:
365                rtt_params.append(self.rtt_config_from_scan_result(rn))
366            results = self.get_rtt_results(rtt_params)
367            if results:
368                self.log.debug(results)
369                all_results += results
370        self.process_rtt_events(all_results)
371
372    def rtt_config_from_scan_result(self, scan_result):
373        """Creates an Rtt configuration based on the scan result of a network.
374        """
375        scan_result_channel_width_to_rtt = {
376            ScanResult.CHANNEL_WIDTH_20MHZ: RttBW.BW_20_SUPPORT,
377            ScanResult.CHANNEL_WIDTH_40MHZ: RttBW.BW_40_SUPPORT,
378            ScanResult.CHANNEL_WIDTH_80MHZ: RttBW.BW_80_SUPPORT,
379            ScanResult.CHANNEL_WIDTH_160MHZ: RttBW.BW_160_SUPPORT,
380            ScanResult.CHANNEL_WIDTH_80MHZ_PLUS_MHZ: RttBW.BW_160_SUPPORT
381        }
382        p = {}
383        freq = scan_result[RttParam.frequency]
384        p[RttParam.frequency] = freq
385        p[RttParam.BSSID] = scan_result[WifiEnums.BSSID_KEY]
386        if freq > 5000:
387            p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT
388        else:
389            p[RttParam.preamble] = RttPreamble.PREAMBLE_HT
390        cf0 = scan_result[RttParam.center_freq0]
391        if cf0 > 0:
392            p[RttParam.center_freq0] = cf0
393        cf1 = scan_result[RttParam.center_freq1]
394        if cf1 > 0:
395            p[RttParam.center_freq1] = cf1
396        cw = scan_result["channelWidth"]
397        p[RttParam.channel_width] = cw
398        p[RttParam.bandwidth] = scan_result_channel_width_to_rtt[cw]
399        if scan_result["is80211McRTTResponder"]:
400            p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
401        else:
402            p[RttParam.request_type] = RttType.TYPE_ONE_SIDED
403        return p
404
405    """Tests"""
406
407    def test_invalid_params(self):
408        """Tests the sanity check function in RttManager.
409        """
410        param_list = [{
411            RttParam.device_type: 3
412        }, {
413            RttParam.device_type: 1,
414            RttParam.request_type: 3
415        }, {
416            RttParam.device_type: 1,
417            RttParam.request_type: 1,
418            RttParam.BSSID: None
419        }, {
420            RttParam.BSSID: "xxxxxxxx",
421            RttParam.number_burst: 1
422        }, {
423            RttParam.number_burst: 0,
424            RttParam.num_samples_per_burst: -1
425        }, {
426            RttParam.num_samples_per_burst: 32
427        }, {
428            RttParam.num_samples_per_burst: 5,
429            RttParam.num_retries_per_measurement_frame: -1
430        }, {
431            RttParam.num_retries_per_measurement_frame: 4
432        }, {
433            RttParam.num_retries_per_measurement_frame: 2,
434            RttParam.num_retries_per_FTMR: -1
435        }, {
436            RttParam.num_retries_per_FTMR: 4
437        }]
438        for param in param_list:
439            self.invalid_params_logic(param)
440        return True
441
442    def test_support_check(self):
443        """No device supports device-to-device RTT; only shamu and volantis
444        devices support device-to-ap RTT.
445        """
446        model = acts.utils.trim_model_name(self.dut.model)
447        asserts.assert_true(self.dut.droid.wifiIsDeviceToDeviceRttSupported(),
448                            "Device to device is supposed to be supported.")
449        if any([model in m for m in self.support_models]):
450            asserts.assert_true(self.dut.droid.wifiIsDeviceToApRttSupported(),
451                                "%s should support device-to-ap RTT." % model)
452            self.log.info("%s supports device-to-ap RTT as expected." % model)
453        else:
454            asserts.assert_false(
455                self.dut.droid.wifiIsDeviceToApRttSupported(),
456                "%s should not support device-to-ap RTT." % model)
457            self.log.info(
458                ("%s does not support device-to-ap RTT as expected.") % model)
459            asserts.abort_class(
460                "Device %s does not support RTT, abort." % model)
461        return True
462
463    def test_capability_check(self):
464        """Checks the capabilities params are reported as expected.
465        """
466        caps = self.dut.droid.wifiRttGetCapabilities()
467        asserts.assert_true(caps, "Unable to get rtt capabilities.")
468        self.log.debug("Got rtt capabilities %s" % caps)
469        model = acts.utils.trim_model_name(self.dut.model)
470        asserts.assert_true(model in self.rtt_cap_table,
471                            "Unknown model %s" % model)
472        expected_caps = self.rtt_cap_table[model]
473        for k, v in expected_caps.items():
474            asserts.assert_true(k in caps, "%s missing in capabilities." % k)
475            asserts.assert_true(v == caps[k], "Expected %s for %s, got %s." %
476                                (v, k, caps[k]))
477        return True
478
479    def test_discovery(self):
480        """Make sure all the expected 11mc BSSIDs are discovered properly, and
481        they are all reported as 802.11mc Rtt Responder.
482
483        Procedures:
484            1. Scan for wifi networks.
485
486        Expect:
487            All the RTT networks show up in scan results and their
488            "is80211McRTTResponder" is True.
489            All the non-RTT networks show up in scan results and their
490            "is80211McRTTResponder" is False.
491        """
492        wutils.start_wifi_connection_scan(self.dut)
493        scan_results = self.dut.droid.wifiGetScanResults()
494        self.log.debug(scan_results)
495        for n in visible_networks:
496            asserts.assert_true(
497                wutils.match_networks(n, scan_results),
498                "Network %s was not discovered properly." % n)
499        return True
500
501    def test_missing_bssid(self):
502        """Start Rtt ranging with a config that does not have BSSID set.
503        Should not get onSuccess.
504        """
505        p = {}
506        p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
507        p[RttParam.device_type] = RttPeerType.PEER_TYPE_AP
508        p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT
509        p[RttParam.bandwidth] = RttBW.BW_80_SUPPORT
510        p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key]
511        p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0]
512        results = self.get_rtt_results([p])
513        asserts.assert_true(results, "Did not get any result.")
514        self.log.info(pprint.pformat(results))
515
516    def test_rtt_ranging_single_AP_stress(self):
517        """Stress test for Rtt against one AP.
518
519        Steps:
520            1. Do RTT ranging against the self.vht80_5g BSSID.
521            2. Repeat self.stress_num times.
522            3. Verify RTT results.
523        """
524        p = {}
525        p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
526        p[RttParam.device_type] = RttPeerType.PEER_TYPE_AP
527        p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT
528        p[RttParam.bandwidth] = RttBW.BW_80_SUPPORT
529        p[RttParam.BSSID] = self.vht80_5g[WifiEnums.BSSID_KEY]
530        p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key]
531        p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0]
532        p[RttParam.channel_width] = ScanResult.CHANNEL_WIDTH_80MHZ
533        all_results = []
534        for i in range(self.stress_num):
535            self.log.info("RTT Ranging iteration %d" % (i + 1))
536            results = self.get_rtt_results([p])
537            if results:
538                all_results += results
539            else:
540                self.log.warning("Did not get result for iteration %d." % i)
541        frate = self.process_rtt_events(all_results)
542
543    def test_regular_scan_then_rtt_ranging_stress(self):
544        """Stress test for regular scan then start rtt ranging against the RTT
545        compatible networks found by the scan.
546
547        Steps:
548            1. Start a WiFi connection scan.
549            2. Get scan results.
550            3. Find all the 11mc capable BSSIDs and choose the ones to use
551               (self.network_selector)
552            4. Do RTT ranging against the selected BSSIDs, with the info from
553               the scan results.
554            5. Repeat self.stress_num times.
555            6. Verify RTT results.
556        """
557        scan_func = self.regular_scan_for_rtt_networks
558        self.scan_then_rtt_ranging_stress_logic(scan_func)
559
560    def test_gscan_then_rtt_ranging_stress(self):
561        """Stress test for gscan then start rtt ranging against the RTT
562        compatible networks found by the scan.
563
564        Steps:
565            1. Start a WifiScanner single shot scan on all channels.
566            2. Wait for full scan results of the expected 11mc capable BSSIDs.
567            3. Wait for single shot scan to finish on all channels.
568            4. Do RTT ranging against the selected BSSIDs, with the info from
569               the scan results.
570            5. Repeat self.stress_num times.
571            6. Verify RTT results.
572        """
573        scan_func = self.gscan_for_rtt_networks
574        self.scan_then_rtt_ranging_stress_logic(scan_func)
575