1#!/usr/bin/env python3.4 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import pprint 18import queue 19 20import acts.base_test 21import acts.test_utils.wifi.wifi_test_utils as wutils 22import acts.utils 23from acts import asserts 24from acts.controllers.sl4a_lib import rpc_client 25 26WifiEnums = wutils.WifiEnums 27 28# Macros for RttParam keywords 29RttParam = WifiEnums.RttParam 30# Macros for RttManager 31Rtt = WifiEnums.Rtt 32RttBW = WifiEnums.RttBW 33RttPreamble = WifiEnums.RttPreamble 34RttPeerType = WifiEnums.RttPeerType 35RttType = WifiEnums.RttType 36 37ScanResult = WifiEnums.ScanResult 38RTT_MARGIN_OF_ERROR = WifiEnums.RTT_MARGIN_OF_ERROR 39 40 41class WifiRTTRangingError(Exception): 42 """Error in WifiScanner Rtt.""" 43 44 45class WifiRttManagerTest(acts.base_test.BaseTestClass): 46 """Tests for wifi's RttManager APIs.""" 47 tests = None 48 MAX_RTT_AP = 10 49 50 def __init__(self, controllers): 51 acts.base_test.BaseTestClass.__init__(self, controllers) 52 self.tests = ("test_support_check", "test_invalid_params", 53 "test_capability_check", 54 "test_rtt_ranging_single_AP_stress", 55 "test_regular_scan_then_rtt_ranging_stress", 56 "test_gscan_then_rtt_ranging_stress") 57 58 def setup_class(self): 59 self.dut = self.android_devices[0] 60 wutils.wifi_test_device_init(self.dut) 61 required_params = ("support_models", "stress_num", "vht80_5g", 62 "actual_distance") 63 self.unpack_userparams(required_params) 64 asserts.assert_true( 65 self.actual_distance >= 5, 66 "Actual distance should be no shorter than 5 meters.") 67 self.visible_networks = (self.vht80_5g, ) 68 self.default_rtt_params = { 69 RttParam.request_type: RttType.TYPE_TWO_SIDED, 70 RttParam.device_type: RttPeerType.PEER_TYPE_AP, 71 RttParam.preamble: RttPreamble.PREAMBLE_HT, 72 RttParam.bandwidth: RttBW.BW_80_SUPPORT 73 } 74 # Expected capability for devices that don't support RTT. 75 rtt_cap_neg = { 76 'lcrSupported': False, 77 'bwSupported': 0, 78 'twoSided11McRttSupported': False, 79 'preambleSupported': 0, 80 'oneSidedRttSupported': False, 81 'lciSupported': False 82 } 83 rtt_cap_shamu = { 84 'lcrSupported': False, 85 'bwSupported': 0x1C, 86 'twoSided11McRttSupported': True, 87 'preambleSupported': 6, 88 'oneSidedRttSupported': False, 89 'lciSupported': False 90 } 91 rtt_cap_bullhead = { 92 'lcrSupported': True, 93 'bwSupported': 0x1C, 94 'twoSided11McRttSupported': True, 95 'preambleSupported': 7, 96 'oneSidedRttSupported': True, 97 'lciSupported': True 98 } 99 rtt_cap_angler = { 100 'lcrSupported': True, 101 'bwSupported': 0x1C, 102 'twoSided11McRttSupported': True, 103 'preambleSupported': 6, 104 'oneSidedRttSupported': False, 105 'lciSupported': True 106 } 107 self.rtt_cap_table = { 108 "hammerhead": rtt_cap_neg, 109 "shamu": rtt_cap_shamu, 110 "volantis": rtt_cap_neg, 111 "volantisg": rtt_cap_neg, 112 "bullhead": rtt_cap_bullhead, 113 "angler": rtt_cap_angler 114 } 115 116 """Helper Functions""" 117 118 def invalid_params_logic(self, rtt_params): 119 try: 120 self.dut.droid.wifiRttStartRanging([rtt_params]) 121 except rpc_client.Sl4aApiError as e: 122 e_str = str(e) 123 asserts.assert_true( 124 "IllegalArgumentException" in e_str, 125 "Missing IllegalArgumentException in %s." % e_str) 126 msg = "Got expected exception with invalid param %s." % rtt_params 127 self.log.info(msg) 128 129 def get_rtt_results(self, rtt_params): 130 """Starts RTT ranging and get results. 131 132 Args: 133 rtt_params: A list of dicts each representing an RttParam. 134 135 Returns: 136 Rtt ranging results. 137 """ 138 self.log.debug("Start ranging with:\n%s" % pprint.pformat(rtt_params)) 139 idx = self.dut.droid.wifiRttStartRanging(rtt_params) 140 event = None 141 try: 142 event = self.dut.ed.pop_events("WifiRttRanging%d" % idx, 30) 143 if event[0]["name"].endswith("onSuccess"): 144 results = event[0]["data"]["Results"] 145 result_len = len(results) 146 param_len = len(rtt_params) 147 asserts.assert_true(result_len == param_len, 148 "Expected %d results, got %d." % 149 (param_len, result_len)) 150 # Add acceptable margin of error to results, which will be used 151 # during result processing. 152 for i, r in enumerate(results): 153 bw_mode = rtt_params[i][RttParam.bandwidth] 154 r[RttParam.margin] = RTT_MARGIN_OF_ERROR[bw_mode] 155 self.log.debug(pprint.pformat(event)) 156 return event 157 except queue.Empty: 158 self.log.error("Waiting for RTT event timed out.") 159 return None 160 161 def network_selector(self, network_info): 162 """Decides if a network should be used for rtt ranging. 163 164 There are a few conditions: 165 1. This network supports 80211mc. 166 2. This network's info matches certain conditions. 167 168 This is added to better control which networks to range against instead 169 of blindly use all 80211mc networks in air. 170 171 Args: 172 network_info: A dict representing a WiFi network. 173 174 Returns: 175 True if the input network should be used for ranging, False 176 otherwise. 177 """ 178 target_params = { 179 "is80211McRTTResponder": True, 180 WifiEnums.BSSID_KEY: self.vht80_5g[WifiEnums.BSSID_KEY], 181 } 182 for k, v in target_params.items(): 183 if k not in network_info: 184 return False 185 if type(network_info[k]) is str: 186 network_info[k] = network_info[k].lower() 187 v = v.lower() 188 if network_info[k] != v: 189 return False 190 return True 191 192 def regular_scan_for_rtt_networks(self): 193 """Scans for 11mc-capable WiFi networks using regular wifi scan. 194 195 Networks are selected based on self.network_selector. 196 197 Returns: 198 A list of networks that have RTTResponders. 199 """ 200 wutils.start_wifi_connection_scan(self.dut) 201 networks = self.dut.droid.wifiGetScanResults() 202 rtt_networks = [] 203 for nw in networks: 204 if self.network_selector(nw): 205 rtt_networks.append(nw) 206 return rtt_networks 207 208 def gscan_for_rtt_networks(self): 209 """Scans for 11mc-capable WiFi networks using wifi gscan. 210 211 Networks are selected based on self.network_selector. 212 213 Returns: 214 A list of networks that have RTTResponders. 215 """ 216 s = { 217 "reportEvents": WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT, 218 "band": WifiEnums.WIFI_BAND_BOTH, 219 "periodInMs": 10000, 220 "numBssidsPerScan": 32 221 } 222 idx = wutils.start_wifi_single_scan(self.android_devices[0], 223 s)["Index"] 224 self.log.info("Scan index is %d" % idx) 225 event_name = "WifiScannerScan%donFullResult" % idx 226 227 def condition(event): 228 nw = event["data"]["Results"][0] 229 return self.network_selector(nw) 230 231 rtt_networks = [] 232 try: 233 for i in range(len(self.visible_networks)): 234 event = self.dut.ed.wait_for_event(event_name, condition, 30) 235 rtt_networks.append(event["data"]["Results"][0]) 236 self.log.info("Waiting for gscan to finish.") 237 event_name = "WifiScannerScan%donResults" % idx 238 event = self.dut.ed.pop_event(event_name, 300) 239 total_network_cnt = len(event["data"]["Results"][0]["ScanResults"]) 240 self.log.info("Found %d networks in total." % total_network_cnt) 241 self.log.debug(rtt_networks) 242 return rtt_networks 243 except queue.Empty: 244 self.log.error("Timed out waiting for gscan result.") 245 246 def process_rtt_events(self, events): 247 """Processes rtt ranging events. 248 249 Validates RTT event types. 250 Validates RTT response status and measured RTT values. 251 Enforces success rate. 252 253 Args: 254 events: A list of callback results from RTT ranging. 255 """ 256 total = aborted = failure = invalid = out_of_range = 0 257 for e in events: 258 if e["name"].endswith("onAborted"): 259 aborted += 1 260 if e["name"].endswith("onFailure"): 261 failure += 1 262 if e["name"].endswith("onSuccess"): 263 results = e["data"]["Results"] 264 for r in results: 265 total += 1 266 # Status needs to be "success". 267 status = r["status"] 268 if status != Rtt.STATUS_SUCCESS: 269 self.log.warning("Got error status %d." % status) 270 invalid += 1 271 continue 272 # RTT value should be positive. 273 value = r["rtt"] 274 if value <= 0: 275 self.log.warning("Got error RTT value %d." % value) 276 invalid += 1 277 continue 278 # Vadlidate values in successful responses. 279 acd = self.actual_distance 280 margin = r[RttParam.margin] 281 # If the distance is >= 0, check distance only. 282 d = r["distance"] / 100.0 283 if d > 0: 284 # Distance should be in acceptable range. 285 is_d_valid = (acd - margin) <= d <= acd + (margin) 286 if not is_d_valid: 287 self.log.warning( 288 ("Reported distance %.2fm is out of the" 289 " acceptable range %.2f±%.2fm.") % (d, acd, 290 margin)) 291 out_of_range += 1 292 continue 293 # Check if the RTT value is in range. 294 d = (value / 2) / 1E10 * wutils.SPEED_OF_LIGHT 295 is_rtt_valid = (acd - margin) <= d <= (acd + margin) 296 if not is_rtt_valid: 297 self.log.warning(( 298 "Distance calculated from RTT value %d - %.2fm is " 299 "out of the acceptable range %.2f±%dm.") % 300 (value, d, acd, margin)) 301 out_of_range += 1 302 continue 303 # Check if the RSSI value is in range. 304 rssi = r["rssi"] 305 # average rssi in 0.5dB steps, e.g. 143 implies -71.5dB, 306 # so the valid range is 0 to 200 307 is_rssi_valid = 0 <= rssi <= 200 308 if not is_rssi_valid: 309 self.log.warning(("Reported RSSI %d is out of the" 310 " acceptable range 0-200") % rssi) 311 out_of_range += 1 312 continue 313 self.log.info(( 314 "Processed %d RTT events. %d aborted, %s failed. Among" 315 " the %d responses in successful callbacks, %s are invalid, %s has" 316 " RTT values that are out of range.") % 317 (len(events), aborted, failure, total, invalid, 318 out_of_range)) 319 asserts.assert_true(total > 0, "No RTT response received.") 320 # Percentage of responses that are valid should be >= 90%. 321 valid_total = float(total - invalid) 322 valid_response_rate = valid_total / total 323 self.log.info("%.2f%% of the responses are valid." % 324 (valid_response_rate * 100)) 325 asserts.assert_true(valid_response_rate >= 0.9, 326 "Valid response rate is below 90%%.") 327 # Among the valid responses, the percentage of having an in-range RTT 328 # value should be >= 67%. 329 valid_value_rate = (total - invalid - out_of_range) / valid_total 330 self.log.info("%.2f%% of valid responses have in-range RTT value" % 331 (valid_value_rate * 100)) 332 msg = "In-range response rate is below 67%%." 333 asserts.assert_true(valid_value_rate >= 0.67, msg) 334 335 def scan_then_rtt_ranging_stress_logic(self, scan_func): 336 """Test logic to scan then do rtt ranging based on the scan results. 337 338 Steps: 339 1. Start scan and get scan results. 340 2. Filter out the networks that support rtt in scan results. 341 3. Start rtt ranging against those networks that support rtt. 342 4. Repeat 343 5. Process RTT events. 344 345 Args: 346 scan_func: A function that does a wifi scan and only returns the 347 networks that support rtt in the scan results. 348 349 Returns: 350 True if rtt behaves as expected, False otherwise. 351 """ 352 total = self.stress_num 353 failed = 0 354 all_results = [] 355 for i in range(total): 356 self.log.info("Iteration %d" % i) 357 rtt_networks = scan_func() 358 if not rtt_networks: 359 self.log.warning("Found no rtt network, skip this iteration.") 360 failed += 1 361 continue 362 self.log.debug("Found rtt networks:%s" % rtt_networks) 363 rtt_params = [] 364 for rn in rtt_networks: 365 rtt_params.append(self.rtt_config_from_scan_result(rn)) 366 results = self.get_rtt_results(rtt_params) 367 if results: 368 self.log.debug(results) 369 all_results += results 370 self.process_rtt_events(all_results) 371 372 def rtt_config_from_scan_result(self, scan_result): 373 """Creates an Rtt configuration based on the scan result of a network. 374 """ 375 scan_result_channel_width_to_rtt = { 376 ScanResult.CHANNEL_WIDTH_20MHZ: RttBW.BW_20_SUPPORT, 377 ScanResult.CHANNEL_WIDTH_40MHZ: RttBW.BW_40_SUPPORT, 378 ScanResult.CHANNEL_WIDTH_80MHZ: RttBW.BW_80_SUPPORT, 379 ScanResult.CHANNEL_WIDTH_160MHZ: RttBW.BW_160_SUPPORT, 380 ScanResult.CHANNEL_WIDTH_80MHZ_PLUS_MHZ: RttBW.BW_160_SUPPORT 381 } 382 p = {} 383 freq = scan_result[RttParam.frequency] 384 p[RttParam.frequency] = freq 385 p[RttParam.BSSID] = scan_result[WifiEnums.BSSID_KEY] 386 if freq > 5000: 387 p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT 388 else: 389 p[RttParam.preamble] = RttPreamble.PREAMBLE_HT 390 cf0 = scan_result[RttParam.center_freq0] 391 if cf0 > 0: 392 p[RttParam.center_freq0] = cf0 393 cf1 = scan_result[RttParam.center_freq1] 394 if cf1 > 0: 395 p[RttParam.center_freq1] = cf1 396 cw = scan_result["channelWidth"] 397 p[RttParam.channel_width] = cw 398 p[RttParam.bandwidth] = scan_result_channel_width_to_rtt[cw] 399 if scan_result["is80211McRTTResponder"]: 400 p[RttParam.request_type] = RttType.TYPE_TWO_SIDED 401 else: 402 p[RttParam.request_type] = RttType.TYPE_ONE_SIDED 403 return p 404 405 """Tests""" 406 407 def test_invalid_params(self): 408 """Tests the sanity check function in RttManager. 409 """ 410 param_list = [{ 411 RttParam.device_type: 3 412 }, { 413 RttParam.device_type: 1, 414 RttParam.request_type: 3 415 }, { 416 RttParam.device_type: 1, 417 RttParam.request_type: 1, 418 RttParam.BSSID: None 419 }, { 420 RttParam.BSSID: "xxxxxxxx", 421 RttParam.number_burst: 1 422 }, { 423 RttParam.number_burst: 0, 424 RttParam.num_samples_per_burst: -1 425 }, { 426 RttParam.num_samples_per_burst: 32 427 }, { 428 RttParam.num_samples_per_burst: 5, 429 RttParam.num_retries_per_measurement_frame: -1 430 }, { 431 RttParam.num_retries_per_measurement_frame: 4 432 }, { 433 RttParam.num_retries_per_measurement_frame: 2, 434 RttParam.num_retries_per_FTMR: -1 435 }, { 436 RttParam.num_retries_per_FTMR: 4 437 }] 438 for param in param_list: 439 self.invalid_params_logic(param) 440 return True 441 442 def test_support_check(self): 443 """No device supports device-to-device RTT; only shamu and volantis 444 devices support device-to-ap RTT. 445 """ 446 model = acts.utils.trim_model_name(self.dut.model) 447 asserts.assert_true(self.dut.droid.wifiIsDeviceToDeviceRttSupported(), 448 "Device to device is supposed to be supported.") 449 if any([model in m for m in self.support_models]): 450 asserts.assert_true(self.dut.droid.wifiIsDeviceToApRttSupported(), 451 "%s should support device-to-ap RTT." % model) 452 self.log.info("%s supports device-to-ap RTT as expected." % model) 453 else: 454 asserts.assert_false( 455 self.dut.droid.wifiIsDeviceToApRttSupported(), 456 "%s should not support device-to-ap RTT." % model) 457 self.log.info( 458 ("%s does not support device-to-ap RTT as expected.") % model) 459 asserts.abort_class( 460 "Device %s does not support RTT, abort." % model) 461 return True 462 463 def test_capability_check(self): 464 """Checks the capabilities params are reported as expected. 465 """ 466 caps = self.dut.droid.wifiRttGetCapabilities() 467 asserts.assert_true(caps, "Unable to get rtt capabilities.") 468 self.log.debug("Got rtt capabilities %s" % caps) 469 model = acts.utils.trim_model_name(self.dut.model) 470 asserts.assert_true(model in self.rtt_cap_table, 471 "Unknown model %s" % model) 472 expected_caps = self.rtt_cap_table[model] 473 for k, v in expected_caps.items(): 474 asserts.assert_true(k in caps, "%s missing in capabilities." % k) 475 asserts.assert_true(v == caps[k], "Expected %s for %s, got %s." % 476 (v, k, caps[k])) 477 return True 478 479 def test_discovery(self): 480 """Make sure all the expected 11mc BSSIDs are discovered properly, and 481 they are all reported as 802.11mc Rtt Responder. 482 483 Procedures: 484 1. Scan for wifi networks. 485 486 Expect: 487 All the RTT networks show up in scan results and their 488 "is80211McRTTResponder" is True. 489 All the non-RTT networks show up in scan results and their 490 "is80211McRTTResponder" is False. 491 """ 492 wutils.start_wifi_connection_scan(self.dut) 493 scan_results = self.dut.droid.wifiGetScanResults() 494 self.log.debug(scan_results) 495 for n in visible_networks: 496 asserts.assert_true( 497 wutils.match_networks(n, scan_results), 498 "Network %s was not discovered properly." % n) 499 return True 500 501 def test_missing_bssid(self): 502 """Start Rtt ranging with a config that does not have BSSID set. 503 Should not get onSuccess. 504 """ 505 p = {} 506 p[RttParam.request_type] = RttType.TYPE_TWO_SIDED 507 p[RttParam.device_type] = RttPeerType.PEER_TYPE_AP 508 p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT 509 p[RttParam.bandwidth] = RttBW.BW_80_SUPPORT 510 p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key] 511 p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0] 512 results = self.get_rtt_results([p]) 513 asserts.assert_true(results, "Did not get any result.") 514 self.log.info(pprint.pformat(results)) 515 516 def test_rtt_ranging_single_AP_stress(self): 517 """Stress test for Rtt against one AP. 518 519 Steps: 520 1. Do RTT ranging against the self.vht80_5g BSSID. 521 2. Repeat self.stress_num times. 522 3. Verify RTT results. 523 """ 524 p = {} 525 p[RttParam.request_type] = RttType.TYPE_TWO_SIDED 526 p[RttParam.device_type] = RttPeerType.PEER_TYPE_AP 527 p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT 528 p[RttParam.bandwidth] = RttBW.BW_80_SUPPORT 529 p[RttParam.BSSID] = self.vht80_5g[WifiEnums.BSSID_KEY] 530 p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key] 531 p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0] 532 p[RttParam.channel_width] = ScanResult.CHANNEL_WIDTH_80MHZ 533 all_results = [] 534 for i in range(self.stress_num): 535 self.log.info("RTT Ranging iteration %d" % (i + 1)) 536 results = self.get_rtt_results([p]) 537 if results: 538 all_results += results 539 else: 540 self.log.warning("Did not get result for iteration %d." % i) 541 frate = self.process_rtt_events(all_results) 542 543 def test_regular_scan_then_rtt_ranging_stress(self): 544 """Stress test for regular scan then start rtt ranging against the RTT 545 compatible networks found by the scan. 546 547 Steps: 548 1. Start a WiFi connection scan. 549 2. Get scan results. 550 3. Find all the 11mc capable BSSIDs and choose the ones to use 551 (self.network_selector) 552 4. Do RTT ranging against the selected BSSIDs, with the info from 553 the scan results. 554 5. Repeat self.stress_num times. 555 6. Verify RTT results. 556 """ 557 scan_func = self.regular_scan_for_rtt_networks 558 self.scan_then_rtt_ranging_stress_logic(scan_func) 559 560 def test_gscan_then_rtt_ranging_stress(self): 561 """Stress test for gscan then start rtt ranging against the RTT 562 compatible networks found by the scan. 563 564 Steps: 565 1. Start a WifiScanner single shot scan on all channels. 566 2. Wait for full scan results of the expected 11mc capable BSSIDs. 567 3. Wait for single shot scan to finish on all channels. 568 4. Do RTT ranging against the selected BSSIDs, with the info from 569 the scan results. 570 5. Repeat self.stress_num times. 571 6. Verify RTT results. 572 """ 573 scan_func = self.gscan_for_rtt_networks 574 self.scan_then_rtt_ranging_stress_logic(scan_func) 575