1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16""" 17A comprehensive interface for performing test actions on an Apollo device. 18""" 19 20import time 21 22from acts.controllers.buds_lib.apollo_lib import DeviceError 23from acts.controllers.buds_lib.test_actions.agsa_acts import AgsaOTAError 24from acts.controllers.buds_lib.test_actions.base_test_actions import BaseTestAction 25from acts.controllers.buds_lib.test_actions.base_test_actions import timed_action 26from acts.controllers.buds_lib.test_actions.bt_utils import BTUtils 27from acts.libs.utils.timer import TimeRecorder 28from acts.test_utils.tel.tel_test_utils import initiate_call 29from acts.test_utils.tel.tel_test_utils import wait_for_droid_in_call 30from acts.utils import wait_until 31 32PACKAGE_NAME_AGSA = 'com.google.android.googlequicksearchbox' 33PACKAGE_NAME_GMS = 'com.google.android.gms' 34PACKAGE_NAME_NEARBY = 'com.google.android.gms.policy_nearby' 35PACKAGE_NAME_SETTINGS = 'com.android.settings' 36BISTO_MP_DETECT_HEADER = 'Pixel Buds' 37BISTO_MP_DEVICE_TEXT = 'Pixel Buds' 38BISTO_MP_DETECT_TEXT = BISTO_MP_DETECT_HEADER + BISTO_MP_DEVICE_TEXT 39BISTO_MP_CANCEL_TEXT = 'CANCEL' 40BISTO_MP_CONNECT_TEXT = 'TAP TO CONNECT' 41BISTO_MP_CONNECT_FAIL_TEXT = 'Can\'t connect to' 42BISTO_MP_CONNECT_RETRY_TEXT = 'TRY AGAIN' 43BISTO_MP_CONNECTED_TEXT = 'Now set up your Google Assistant' 44BISTO_MP_CONNECTED_EXIT_TEXT = 'NO THANKS' 45BISTO_MP_EXIT_PROMPT_TEXT = 'Exit setup?' 46BISTO_MP_EXIT_CONFIRM_TEXT = 'EXIT' 47PROFILES_CONNECTED = { 48 'HFP(pri.)': 'TRUE', 49 'A2DP(pri)': 'TRUE', 50} 51PROFILES_DISCONNECTED = { 52 'HFP(pri.)': 'FALSE', 53 'A2DP(pri)': 'FALSE', 54} 55COMP_PROFILE_CONNECTED = {'Comp': 'TRUE'} 56COMP_PROFILE_DISCONNECTED = {'Comp': 'FALSE'} 57AVRCPSTATUS = 'AvrcpPlayPause' 58DEFAULT_TIMEOUT = 60 # wait 60 seconds max for bond/connect. 59DEFAULT_CMD_INTERVAL = 0.5 # default interval between serial commands 60DEFAULT_CMD_RETRY = 5 # default retry times when a command failed. 61DEFAULT_BT_PROFILES = [ 62 'HFP Pri', 'HFP Sec', 'A2DP Pri', 'A2DP Sec', 'CTRL', 'AUDIO', 'DEBUG', 63 'TRANS' 64] 65DEFAULT_BT_STATUS = ['A2DP(pri)', 'HFP(pri.)', 'Comp'] 66 67 68class TestActsError(Exception): 69 """Exception from Apollo Acts Error.""" 70 71 72class ApolloTestActions(BaseTestAction): 73 """Test action class for all Apollo test actions.""" 74 75 def __init__(self, apollo_dev, logger=None): 76 """ 77 Args: 78 apollo_dev: apollo.lib.apollo_lib.Device the Apollo device 79 """ 80 super(ApolloTestActions, self).__init__(logger) 81 self.dut = apollo_dev 82 # need a embedded timer for connection time measurements. 83 self.measurement_timer = TimeRecorder() 84 85 def bluetooth_get_status(self): 86 status = self.dut.get_bt_status() 87 self.logger.info(status) 88 89 def wait_for_bluetooth_disconnection(self, timeout=60): 90 """ Set pairing mode and disconnect. 91 92 This action will wait until the apollo profiles are false. 93 94 Args: 95 timeout: integer, timeout value in seconds. 96 """ 97 result = True 98 apollo_status = self.dut.get_bt_status() 99 self.logger.info('Waiting for the disconnection.') 100 time.sleep(1) 101 ini_time = time.time() 102 while len(apollo_status) != len( 103 [s for s in apollo_status.values() if s == 'FALSE']): 104 apollo_status = self.dut.get_bt_status() 105 if (time.time() - ini_time) > timeout: 106 self.logger.warning('Timeout waiting for the disconnection.') 107 return False 108 time.sleep(1) 109 return result 110 111 def pair(self, phone, companion_app=True): 112 """Pairs phone with apollo and validates bluetooth profiles. 113 114 Args: 115 phone: android phone 116 apollo: apollo device 117 companion_app (optional): True if the phone has a companion app 118 installed. False otherwise. 119 120 Raises: 121 TestActsError: Bluetooth pairing failed/ Dut BT status check failed. 122 """ 123 bt_util = BTUtils() 124 target_addr = self.dut.bluetooth_address 125 if bt_util.android_device_in_connected_state(phone, target_addr): 126 self.logger.info('Already paired and connected, skipping pairing.') 127 else: 128 if bt_util.android_device_in_paired_state(phone, target_addr): 129 self.logger.info( 130 'Device is paired but not connected, unpair first.') 131 if not bt_util.bt_unpair(phone, self.dut): 132 raise TestActsError('Unable to unpair the device') 133 bt_util.bt_pair_and_connect(phone, self.dut) 134 self.logger.info('DEVICE PAIRED') 135 if companion_app: 136 profiles = PROFILES_CONNECTED.copy() 137 profiles.update(COMP_PROFILE_CONNECTED) 138 else: 139 profiles = PROFILES_CONNECTED 140 self.logger.info(profiles) 141 if not bt_util.check_device_bt(device=self.dut, profiles=profiles): 142 raise TestActsError('Dut BT status check failed.') 143 else: 144 return True 145 146 def unpair(self, phone, companion_app=True, factory_reset_dut=True): 147 """Unpairs phone from apollo and validates bluetooth profiles. 148 149 Args: 150 phone: android phone 151 apollo: apollo device 152 companion_app (optional): True if the phone has a companion app 153 installed. False otherwise. 154 155 Raises: 156 TestActsError: Bluetooth unpairing/Dut BT status check failed. 157 """ 158 bt_util = BTUtils() 159 target_addr = self.dut.bluetooth_address 160 if not bt_util.android_device_in_paired_state(phone, target_addr): 161 self.logger.info('Device is already unpaired, skipping unpairing.') 162 else: 163 result = bt_util.bt_unpair( 164 phone, self.dut, factory_reset_dut=factory_reset_dut) 165 if not result: 166 raise TestActsError('Bluetooth unpairing failed.') 167 if companion_app: 168 profiles = PROFILES_DISCONNECTED.copy() 169 profiles.update(COMP_PROFILE_DISCONNECTED) 170 else: 171 profiles = PROFILES_DISCONNECTED 172 if not bt_util.check_device_bt(device=self.dut, profiles=profiles): 173 raise TestActsError('Dut BT status check failed.') 174 else: 175 return True 176 177 def is_paired(self, phone): 178 """Check if the given apollo is paired with the android device. 179 180 Args: 181 phone: android phone 182 apollo: apollo device 183 184 Returns: 185 Bool: True if apollo is paired with the phone. 186 """ 187 bt_util = BTUtils() 188 target_addr = self.dut.bluetooth_address 189 return bt_util.android_device_in_paired_state(phone, target_addr) 190 191 def send_music_play_event_and_validate(self): 192 """Send the play event on Apollo and validate the response and DSP 193 Status. 194 195 Raises: 196 TestActsError: Error while playing the music. 197 """ 198 play_detection_timeout = 1 199 if self.dut.is_streaming(): 200 self.logger.info('Music already streaming. Skipping play event..') 201 return 202 self.logger.info('Playing video...') 203 is_played = self.dut.music_control_events( 204 AVRCPSTATUS, self.dut.apollo_log_regex.AVRCP_PLAY_REGEX) 205 if not is_played: 206 self.logger.error('AVRCP Played status not found') 207 raise TestActsError('AVRCP Played status not found.') 208 wait_until( 209 lambda: self.dut.is_streaming(), 210 play_detection_timeout, 211 sleep_s=0.25) 212 if not self.dut.is_streaming(): 213 self.logger.error('Device is NOT in a deviceA2DPStreaming state') 214 raise TestActsError( 215 'Device is NOT in a deviceA2DPStreaming state.') 216 217 def send_music_pause_event_and_validate(self): 218 """Send the pause event on Apollo and validate the responses and DSP 219 Status. 220 221 Raises: 222 TestActsError: Error while pausing the music. 223 """ 224 paused_detection_timeout = 10 225 if not self.dut.is_streaming(): 226 self.logger.info('Music not streaming. Skipping pause event..') 227 return 228 self.logger.info("Pausing video...") 229 is_paused = self.dut.music_control_events( 230 AVRCPSTATUS, self.dut.apollo_log_regex.AVRCP_PAUSE_REGEX) 231 if not is_paused: 232 self.logger.error('AVRCP Paused statue not found') 233 raise TestActsError('AVRCP Paused status not found.') 234 wait_until( 235 lambda: not self.dut.is_streaming(), 236 paused_detection_timeout, 237 sleep_s=0.25) 238 if self.dut.is_streaming(): 239 self.logger.error('Device is still in deviceA2DPStreaming state') 240 raise TestActsError( 241 'Device is still in deviceA2DPStreaming state.') 242 243 def vol_down_and_validate(self): 244 """Send volume down twice and validate by comparing two levels 245 246 Raises: 247 TestActsError: Error 248 """ 249 self.logger.info('Decreasing volume') 250 before_vol = self.dut.volume('Down', 1) 251 time.sleep(2) 252 after_vol = self.dut.volume('Down', 1) 253 if not after_vol or not before_vol or after_vol >= before_vol: 254 self.logger.error( 255 'Unable to decrease the volume. Before: %s. After: %s' % 256 (before_vol, after_vol)) 257 raise TestActsError('error decreasing volume') 258 259 def vol_up_and_validate(self): 260 """Send volume up twice and validate by comparing two levels 261 262 Raises: 263 TestActsError: Error 264 """ 265 self.logger.info('Increasing volume') 266 before_vol = self.dut.volume('Up', 1) 267 time.sleep(2) 268 after_vol = self.dut.volume('Up', 1) 269 if not after_vol or not before_vol or after_vol <= before_vol: 270 self.logger.error( 271 'Unable to increase the volume. Before: %s. After: %s' % 272 (before_vol, after_vol)) 273 raise TestActsError('error increasing volume') 274 275 def call_and_validate_ringing(self, 276 calling_phone, 277 number_to_call, 278 call_retries=10): 279 for i in range(call_retries): 280 initiate_call(self.logger, calling_phone, number_to_call) 281 is_calling = wait_for_droid_in_call( 282 self.logger, calling_phone, max_time=10) 283 if is_calling: 284 self.logger.info('Call initiated!') 285 break 286 else: 287 self.logger.warning('Call is not initiating.') 288 if i == call_retries: 289 self.logger.error('Call initiation retries exhausted') 290 raise TestActsError( 291 '%s retries failed to initiate the call' % 292 (call_retries)) 293 self.logger.warning('Retrying call...') 294 # wait for offhook state and return 295 wait_until( 296 (lambda: calling_phone.droid.telecomGetCallState() == 'OFFHOOK'), 297 timeout_s=40, 298 condition=True, 299 sleep_s=.5) 300 self.logger.info('Phone call initiated on %s' % calling_phone.serial) 301 302 def answer_phone_and_validate_call_received(self, receiving_phone): 303 # wait until the phone rings (assumes that a call is initiated prior to 304 # running the command) 305 wait_until( 306 lambda: receiving_phone.droid.telecomGetCallState() == 'RINGING', 307 timeout_s=40, 308 condition=True, 309 sleep_s=.5) 310 self.logger.info('Ring detected on %s - now answering the call...' % 311 (receiving_phone.serial)) 312 # answer the phone call 313 self.dut.tap() 314 # wait until OFFHOOK state 315 wait_until( 316 lambda: receiving_phone.droid.telecomGetCallState() == 'OFFHOOK', 317 timeout_s=40, 318 condition=True, 319 sleep_s=.5) 320 321 def hangup_phone_and_validate_call_hung(self, receiving_phone): 322 # wait for phone to be in OFFHOOK state (assumed that a call is answered 323 # and engaged) 324 wait_until( 325 lambda: receiving_phone.droid.telecomGetCallState() == 'OFFHOOK', 326 timeout_s=40, 327 condition=True, 328 sleep_s=.5) 329 # end the call (post and pre 1663 have different way of ending call) 330 self.logger.info( 331 'Hanging up the call on %s...' % receiving_phone.serial) 332 if self.dut.version < 1663: 333 self.dut.tap() 334 else: 335 self.dut.hold(duration=100) 336 # wait for idle state 337 wait_until( 338 lambda: receiving_phone.droid.telecomGetCallState() == 'IDLE', 339 timeout_s=40, 340 condition=True, 341 sleep_s=.5) 342 343 @timed_action 344 def factory_reset(self): 345 ret = False 346 try: 347 self.dut.factory_reset() 348 ret = True 349 except DeviceError as ex: 350 self.logger.warning('Failed to reset Apollo: %s' % ex) 351 return ret 352 353 @timed_action 354 def wait_for_magic_pairing_notification(self, android_act, timeout=60): 355 dut_detected = False 356 start_time = time.time() 357 self.logger.info('Waiting for MP prompt: %s' % BISTO_MP_DEVICE_TEXT) 358 while not dut_detected: 359 android_act.dut.ui_util.uia.wait.update() 360 self.sleep(1) 361 if android_act.dut.ui_util.uia( 362 textContains=BISTO_MP_DETECT_HEADER, enabled=True).exists: 363 if android_act.dut.ui_util.uia( 364 textContains=BISTO_MP_DEVICE_TEXT, 365 enabled=True).exists: 366 self.logger.info('DUT Apollo MP prompt detected!') 367 dut_detected = True 368 else: 369 self.logger.info( 370 'NONE DUT Apollo MP prompt detected! Cancel and RETRY!' 371 ) 372 android_act.dut.ui_util.click_by_text(BISTO_MP_CANCEL_TEXT) 373 if time.time() - start_time > timeout: 374 break 375 if not dut_detected: 376 self.logger.info( 377 'Failed to get %s MP prompt' % BISTO_MP_DEVICE_TEXT) 378 return dut_detected 379 380 @timed_action 381 def start_magic_pairing(self, android_act, timeout=30, retries=3): 382 paired = False 383 android_act.dut.ui_util.click_by_text( 384 BISTO_MP_CONNECT_TEXT, timeout=timeout) 385 connect_start_time = time.time() 386 count = 0 387 timeout = 30 388 389 while not paired and count < retries: 390 android_act.dut.ui_util.uia.wait.update() 391 self.sleep(1) 392 if time.time() - connect_start_time > timeout: 393 self.logger.info('Time out! %s seconds' % time) 394 android_act.app_force_close_agsa() 395 self.logger.info('Timeout(s): %s' % timeout) 396 break 397 if android_act.dut.ui_util.uia( 398 textContains=BISTO_MP_CONNECT_FAIL_TEXT, 399 enabled=True).exists: 400 count += 1 401 self.logger.info('MP FAILED! Retry %s.' % count) 402 android_act.dut.ui_util.click_by_text( 403 BISTO_MP_CONNECT_RETRY_TEXT) 404 connect_start_time = time.time() 405 elif android_act.dut.ui_util.uia( 406 textContains=BISTO_MP_CONNECTED_TEXT, enabled=True).exists: 407 self.logger.info('MP SUCCESSFUL! Exiting AGSA...') 408 paired = True 409 android_act.dut.ui_util.click_by_text( 410 BISTO_MP_CONNECTED_EXIT_TEXT) 411 android_act.dut.ui_util.wait_for_text( 412 BISTO_MP_EXIT_PROMPT_TEXT) 413 android_act.dut.ui_util.click_by_text( 414 BISTO_MP_EXIT_CONFIRM_TEXT) 415 return paired 416 417 @timed_action 418 def turn_bluetooth_on(self): 419 self.dut.cmd('pow 1') 420 return True 421 422 @timed_action 423 def turn_bluetooth_off(self): 424 self.dut.cmd('pow 0') 425 return True 426 427 @timed_action 428 def wait_for_bluetooth_a2dp_hfp(self, 429 timeout=DEFAULT_TIMEOUT, 430 interval=DEFAULT_CMD_INTERVAL): 431 """Wait for BT connection by checking if A2DP and HFP connected. 432 433 This is used for BT pair+connect test. 434 435 Args: 436 timeout: float, timeout value in second. 437 interval: float, float, interval between polling BT profiles. 438 timer: TimeRecorder, time recorder to save the connection time. 439 """ 440 # Need to check these two profiles 441 pass_profiles = ['A2DP Pri', 'HFP Pri'] 442 # TODO(b/122730302): Change to just raise an error 443 ret = False 444 try: 445 ret = self._wait_for_bluetooth_profile_connection( 446 pass_profiles, timeout, interval, self.measurement_timer) 447 except DeviceError as ex: 448 self.logger.warning('Failed to wait for BT connection: %s' % ex) 449 return ret 450 451 def _wait_for_bluetooth_profile_connection(self, profiles_to_check, 452 timeout, interval, timer): 453 """A generic method to wait for specified BT profile connection. 454 455 Args: 456 profiles_to_check: list, profile names (A2DP, HFP, etc.) to be 457 checked. 458 timeout: float, timeout value in second. 459 interval: float, interval between polling BT profiles. 460 timer: TimeRecorder, time recorder to save the connection time. 461 462 Returns: 463 bool, True if checked profiles are connected, False otherwise. 464 """ 465 timer.start_timer(profiles_to_check, force=True) 466 start_time = time.time() 467 while time.time() - start_time < timeout: 468 profiles = self._bluetooth_check_profile_connection() 469 for profile in profiles: 470 if profiles[profile]: 471 timer.stop_timer(profile) 472 # now check if the specified profile connected. 473 all_connected = True 474 for profile in profiles_to_check: 475 if not profiles[profile]: 476 all_connected = False 477 break 478 if all_connected: 479 return True 480 time.sleep(interval) 481 # make sure the profile timer are stopped. 482 timer.stop_timer(profiles_to_check) 483 return False 484 485 def _bluetooth_check_profile_connection(self): 486 """Return profile connection in a boolean dict. 487 488 key=<profile name>, val = T/F 489 """ 490 profiles = dict() 491 output = self.dut.get_conn_devices() 492 # need to strip all whitespaces. 493 conn_devs = {} 494 495 for key in output: 496 conn_devs[key.strip()] = output[key].strip() 497 for key in conn_devs: 498 self.logger.info('%s:%s' % (key, conn_devs[key])) 499 if 'XXXXXXXX' in conn_devs[key]: 500 profiles[key] = conn_devs[key] 501 else: 502 profiles[key] = False 503 return profiles 504 505 @timed_action 506 def wait_for_bluetooth_status_connection_all( 507 self, timeout=DEFAULT_TIMEOUT, interval=DEFAULT_CMD_INTERVAL): 508 """Wait for BT connection by checking if A2DP, HFP and COMP connected. 509 510 This is used for BT reconnect test. 511 512 Args: 513 timeout: float, timeout value in second. 514 interval: float, float, interval between polling BT profiles. 515 """ 516 ret = False 517 self.measurement_timer.start_timer(DEFAULT_BT_STATUS, force=True) 518 # All profile not connected by default. 519 connected_status = {key: False for key in DEFAULT_BT_STATUS} 520 start_time = time.time() 521 while time.time() < start_time + timeout: 522 try: 523 time.sleep(interval) 524 status = self.dut.get_bt_status() 525 for key in DEFAULT_BT_STATUS: 526 if (not connected_status[key] and key in status 527 and 'TRUE' == status[key]): 528 self.measurement_timer.stop_timer(key) 529 connected_status[key] = True 530 self.logger.info( 531 'BT status %s connected at %fs.' % 532 (key, self.measurement_timer.elapsed(key))) 533 if False not in connected_status.values(): 534 ret = True 535 break 536 except DeviceError as ex: 537 self.logger.warning( 538 'Device exception when waiting for reconnection: %s' % ex) 539 self.measurement_timer.stop_timer(DEFAULT_BT_STATUS) 540 return ret 541 542 def initiate_ota_via_agsa_verify_transfer_completion_in_logcat( 543 self, 544 agsa_action, 545 dfu_path, 546 destination=None, 547 force=True, 548 apply_image=True, 549 reconnect=True): 550 """ 551 Starts an OTA by issuing an intent to AGSA after copying the dfu file to 552 the appropriate location on the phone 553 554 Args: 555 agsa_action: projects.agsa.lib.test_actions.agsa_acts 556 .AgsaTestActions 557 dfu_path: string - absolute path of dfu file 558 destination: string - absolute path of file on phone if not 559 specified will use 560 /storage/emulated/0/Android/data/com.google.android 561 .googlequicksearchbox/files/download_cache/apollo.dfu 562 force: value set in the intent sent to AGSA 563 True if success False otherwise 564 """ 565 try: 566 agsa_action.initiate_agsa_and_wait_until_transfer( 567 dfu_path, destination=destination, force=force) 568 if apply_image: 569 # set in case 570 self.dut.set_in_case(reconnect=reconnect) 571 except AgsaOTAError as ex: 572 self.logger.error('Failed to OTA via AGSA %s' % ex) 573 return False 574 except DeviceError as ex: 575 self.logger.error('Failed to bring up device %s' % ex) 576 return False 577 return True 578 579 @timed_action 580 def wait_for_bluetooth_a2dp_hfp_rfcomm_connect( 581 self, address, timeout=DEFAULT_TIMEOUT, 582 interval=DEFAULT_CMD_INTERVAL): 583 """Wait for BT reconnection by checking if A2DP, HFP and COMP connected 584 to the specified address. 585 586 This is used for BT connection switch test. 587 588 Args: 589 address: str, MAC of the address to connect. 590 timeout: float, timeout value in second. 591 interval: float, float, interval between polling BT profiles. 592 593 Returns: 594 True if the specified address is connected. False otherwise. 595 """ 596 last_4_hex = address.replace(':', '')[-4:].lower() 597 profiles_to_check = ['HFP Pri', 'A2DP Pri', 'CTRL', 'AUDIO'] 598 self.measurement_timer.start_timer(profiles_to_check, force=True) 599 end_time = time.time() + timeout 600 all_connected = True 601 while time.time() < end_time: 602 all_connected = True 603 profiles = self._bluetooth_check_profile_connection() 604 for profile in profiles_to_check: 605 if (profile in profiles and profiles[profile] 606 and last_4_hex in profiles[profile].lower()): 607 self.measurement_timer.stop_timer(profile) 608 else: 609 all_connected = False 610 if all_connected: 611 break 612 time.sleep(interval) 613 # make sure the profile timer are stopped. 614 self.measurement_timer.stop_timer(profiles_to_check) 615 616 return all_connected 617