1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""
17A comprehensive interface for performing test actions on an Apollo device.
18"""
19
20import time
21
22from acts.controllers.buds_lib.apollo_lib import DeviceError
23from acts.controllers.buds_lib.test_actions.agsa_acts import AgsaOTAError
24from acts.controllers.buds_lib.test_actions.base_test_actions import BaseTestAction
25from acts.controllers.buds_lib.test_actions.base_test_actions import timed_action
26from acts.controllers.buds_lib.test_actions.bt_utils import BTUtils
27from acts.libs.utils.timer import TimeRecorder
28from acts.test_utils.tel.tel_test_utils import initiate_call
29from acts.test_utils.tel.tel_test_utils import wait_for_droid_in_call
30from acts.utils import wait_until
31
32PACKAGE_NAME_AGSA = 'com.google.android.googlequicksearchbox'
33PACKAGE_NAME_GMS = 'com.google.android.gms'
34PACKAGE_NAME_NEARBY = 'com.google.android.gms.policy_nearby'
35PACKAGE_NAME_SETTINGS = 'com.android.settings'
36BISTO_MP_DETECT_HEADER = 'Pixel Buds'
37BISTO_MP_DEVICE_TEXT = 'Pixel Buds'
38BISTO_MP_DETECT_TEXT = BISTO_MP_DETECT_HEADER + BISTO_MP_DEVICE_TEXT
39BISTO_MP_CANCEL_TEXT = 'CANCEL'
40BISTO_MP_CONNECT_TEXT = 'TAP TO CONNECT'
41BISTO_MP_CONNECT_FAIL_TEXT = 'Can\'t connect to'
42BISTO_MP_CONNECT_RETRY_TEXT = 'TRY AGAIN'
43BISTO_MP_CONNECTED_TEXT = 'Now set up your Google Assistant'
44BISTO_MP_CONNECTED_EXIT_TEXT = 'NO THANKS'
45BISTO_MP_EXIT_PROMPT_TEXT = 'Exit setup?'
46BISTO_MP_EXIT_CONFIRM_TEXT = 'EXIT'
47PROFILES_CONNECTED = {
48    'HFP(pri.)': 'TRUE',
49    'A2DP(pri)': 'TRUE',
50}
51PROFILES_DISCONNECTED = {
52    'HFP(pri.)': 'FALSE',
53    'A2DP(pri)': 'FALSE',
54}
55COMP_PROFILE_CONNECTED = {'Comp': 'TRUE'}
56COMP_PROFILE_DISCONNECTED = {'Comp': 'FALSE'}
57AVRCPSTATUS = 'AvrcpPlayPause'
58DEFAULT_TIMEOUT = 60  # wait 60 seconds max for bond/connect.
59DEFAULT_CMD_INTERVAL = 0.5  # default interval between serial commands
60DEFAULT_CMD_RETRY = 5  # default retry times when a command failed.
61DEFAULT_BT_PROFILES = [
62    'HFP Pri', 'HFP Sec', 'A2DP Pri', 'A2DP Sec', 'CTRL', 'AUDIO', 'DEBUG',
63    'TRANS'
64]
65DEFAULT_BT_STATUS = ['A2DP(pri)', 'HFP(pri.)', 'Comp']
66
67
68class TestActsError(Exception):
69    """Exception from Apollo Acts Error."""
70
71
72class ApolloTestActions(BaseTestAction):
73    """Test action class for all Apollo test actions."""
74
75    def __init__(self, apollo_dev, logger=None):
76        """
77        Args:
78             apollo_dev: apollo.lib.apollo_lib.Device the Apollo device
79        """
80        super(ApolloTestActions, self).__init__(logger)
81        self.dut = apollo_dev
82        # need a embedded timer for connection time measurements.
83        self.measurement_timer = TimeRecorder()
84
85    def bluetooth_get_status(self):
86        status = self.dut.get_bt_status()
87        self.logger.info(status)
88
89    def wait_for_bluetooth_disconnection(self, timeout=60):
90        """ Set pairing mode and disconnect.
91
92        This action will wait until the apollo profiles are false.
93
94        Args:
95             timeout: integer, timeout value in seconds.
96        """
97        result = True
98        apollo_status = self.dut.get_bt_status()
99        self.logger.info('Waiting for the disconnection.')
100        time.sleep(1)
101        ini_time = time.time()
102        while len(apollo_status) != len(
103            [s for s in apollo_status.values() if s == 'FALSE']):
104            apollo_status = self.dut.get_bt_status()
105            if (time.time() - ini_time) > timeout:
106                self.logger.warning('Timeout waiting for the disconnection.')
107                return False
108            time.sleep(1)
109        return result
110
111    def pair(self, phone, companion_app=True):
112        """Pairs phone with apollo and validates bluetooth profiles.
113
114        Args:
115            phone: android phone
116            apollo: apollo device
117            companion_app (optional): True if the phone has a companion app
118                                      installed. False otherwise.
119
120        Raises:
121            TestActsError: Bluetooth pairing failed/ Dut BT status check failed.
122        """
123        bt_util = BTUtils()
124        target_addr = self.dut.bluetooth_address
125        if bt_util.android_device_in_connected_state(phone, target_addr):
126            self.logger.info('Already paired and connected, skipping pairing.')
127        else:
128            if bt_util.android_device_in_paired_state(phone, target_addr):
129                self.logger.info(
130                    'Device is paired but not connected, unpair first.')
131                if not bt_util.bt_unpair(phone, self.dut):
132                    raise TestActsError('Unable to unpair the device')
133            bt_util.bt_pair_and_connect(phone, self.dut)
134            self.logger.info('DEVICE PAIRED')
135            if companion_app:
136                profiles = PROFILES_CONNECTED.copy()
137                profiles.update(COMP_PROFILE_CONNECTED)
138            else:
139                profiles = PROFILES_CONNECTED
140            self.logger.info(profiles)
141            if not bt_util.check_device_bt(device=self.dut, profiles=profiles):
142                raise TestActsError('Dut BT status check failed.')
143            else:
144                return True
145
146    def unpair(self, phone, companion_app=True, factory_reset_dut=True):
147        """Unpairs phone from apollo and validates bluetooth profiles.
148
149        Args:
150            phone: android phone
151            apollo: apollo device
152            companion_app (optional): True if the phone has a companion app
153                                      installed. False otherwise.
154
155        Raises:
156            TestActsError: Bluetooth unpairing/Dut BT status check failed.
157        """
158        bt_util = BTUtils()
159        target_addr = self.dut.bluetooth_address
160        if not bt_util.android_device_in_paired_state(phone, target_addr):
161            self.logger.info('Device is already unpaired, skipping unpairing.')
162        else:
163            result = bt_util.bt_unpair(
164                phone, self.dut, factory_reset_dut=factory_reset_dut)
165            if not result:
166                raise TestActsError('Bluetooth unpairing failed.')
167            if companion_app:
168                profiles = PROFILES_DISCONNECTED.copy()
169                profiles.update(COMP_PROFILE_DISCONNECTED)
170            else:
171                profiles = PROFILES_DISCONNECTED
172            if not bt_util.check_device_bt(device=self.dut, profiles=profiles):
173                raise TestActsError('Dut BT status check failed.')
174            else:
175                return True
176
177    def is_paired(self, phone):
178        """Check if the given apollo is paired with the android device.
179
180        Args:
181            phone: android phone
182            apollo: apollo device
183
184        Returns:
185            Bool: True if apollo is paired with the phone.
186        """
187        bt_util = BTUtils()
188        target_addr = self.dut.bluetooth_address
189        return bt_util.android_device_in_paired_state(phone, target_addr)
190
191    def send_music_play_event_and_validate(self):
192        """Send the play event on Apollo and validate the response and DSP
193        Status.
194
195        Raises:
196            TestActsError: Error while playing the music.
197        """
198        play_detection_timeout = 1
199        if self.dut.is_streaming():
200            self.logger.info('Music already streaming. Skipping play event..')
201            return
202        self.logger.info('Playing video...')
203        is_played = self.dut.music_control_events(
204            AVRCPSTATUS, self.dut.apollo_log_regex.AVRCP_PLAY_REGEX)
205        if not is_played:
206            self.logger.error('AVRCP Played status not found')
207            raise TestActsError('AVRCP Played status not found.')
208        wait_until(
209            lambda: self.dut.is_streaming(),
210            play_detection_timeout,
211            sleep_s=0.25)
212        if not self.dut.is_streaming():
213            self.logger.error('Device is NOT in a deviceA2DPStreaming state')
214            raise TestActsError(
215                'Device is NOT in a deviceA2DPStreaming state.')
216
217    def send_music_pause_event_and_validate(self):
218        """Send the pause event on Apollo and validate the responses and DSP
219        Status.
220
221        Raises:
222            TestActsError: Error while pausing the music.
223        """
224        paused_detection_timeout = 10
225        if not self.dut.is_streaming():
226            self.logger.info('Music not streaming. Skipping pause event..')
227            return
228        self.logger.info("Pausing video...")
229        is_paused = self.dut.music_control_events(
230            AVRCPSTATUS, self.dut.apollo_log_regex.AVRCP_PAUSE_REGEX)
231        if not is_paused:
232            self.logger.error('AVRCP Paused statue not found')
233            raise TestActsError('AVRCP Paused status not found.')
234        wait_until(
235            lambda: not self.dut.is_streaming(),
236            paused_detection_timeout,
237            sleep_s=0.25)
238        if self.dut.is_streaming():
239            self.logger.error('Device is still in deviceA2DPStreaming state')
240            raise TestActsError(
241                'Device is still in deviceA2DPStreaming state.')
242
243    def vol_down_and_validate(self):
244        """Send volume down twice and validate by comparing two levels
245
246        Raises:
247            TestActsError: Error
248        """
249        self.logger.info('Decreasing volume')
250        before_vol = self.dut.volume('Down', 1)
251        time.sleep(2)
252        after_vol = self.dut.volume('Down', 1)
253        if not after_vol or not before_vol or after_vol >= before_vol:
254            self.logger.error(
255                'Unable to decrease the volume. Before: %s. After: %s' %
256                (before_vol, after_vol))
257            raise TestActsError('error decreasing volume')
258
259    def vol_up_and_validate(self):
260        """Send volume up twice and validate by comparing two levels
261
262        Raises:
263            TestActsError: Error
264        """
265        self.logger.info('Increasing volume')
266        before_vol = self.dut.volume('Up', 1)
267        time.sleep(2)
268        after_vol = self.dut.volume('Up', 1)
269        if not after_vol or not before_vol or after_vol <= before_vol:
270            self.logger.error(
271                'Unable to increase the volume. Before: %s. After: %s' %
272                (before_vol, after_vol))
273            raise TestActsError('error increasing volume')
274
275    def call_and_validate_ringing(self,
276                                  calling_phone,
277                                  number_to_call,
278                                  call_retries=10):
279        for i in range(call_retries):
280            initiate_call(self.logger, calling_phone, number_to_call)
281            is_calling = wait_for_droid_in_call(
282                self.logger, calling_phone, max_time=10)
283            if is_calling:
284                self.logger.info('Call initiated!')
285                break
286            else:
287                self.logger.warning('Call is not initiating.')
288                if i == call_retries:
289                    self.logger.error('Call initiation retries exhausted')
290                    raise TestActsError(
291                        '%s retries failed to initiate the call' %
292                        (call_retries))
293            self.logger.warning('Retrying call...')
294        # wait for offhook state and return
295        wait_until(
296            (lambda: calling_phone.droid.telecomGetCallState() == 'OFFHOOK'),
297            timeout_s=40,
298            condition=True,
299            sleep_s=.5)
300        self.logger.info('Phone call initiated on %s' % calling_phone.serial)
301
302    def answer_phone_and_validate_call_received(self, receiving_phone):
303        # wait until the phone rings (assumes that a call is initiated prior to
304        # running the command)
305        wait_until(
306            lambda: receiving_phone.droid.telecomGetCallState() == 'RINGING',
307            timeout_s=40,
308            condition=True,
309            sleep_s=.5)
310        self.logger.info('Ring detected on %s - now answering the call...' %
311                         (receiving_phone.serial))
312        # answer the phone call
313        self.dut.tap()
314        # wait until OFFHOOK state
315        wait_until(
316            lambda: receiving_phone.droid.telecomGetCallState() == 'OFFHOOK',
317            timeout_s=40,
318            condition=True,
319            sleep_s=.5)
320
321    def hangup_phone_and_validate_call_hung(self, receiving_phone):
322        # wait for phone to be in OFFHOOK state (assumed that a call is answered
323        # and engaged)
324        wait_until(
325            lambda: receiving_phone.droid.telecomGetCallState() == 'OFFHOOK',
326            timeout_s=40,
327            condition=True,
328            sleep_s=.5)
329        # end the call (post and pre 1663 have different way of ending call)
330        self.logger.info(
331            'Hanging up the call on %s...' % receiving_phone.serial)
332        if self.dut.version < 1663:
333            self.dut.tap()
334        else:
335            self.dut.hold(duration=100)
336        # wait for idle state
337        wait_until(
338            lambda: receiving_phone.droid.telecomGetCallState() == 'IDLE',
339            timeout_s=40,
340            condition=True,
341            sleep_s=.5)
342
343    @timed_action
344    def factory_reset(self):
345        ret = False
346        try:
347            self.dut.factory_reset()
348            ret = True
349        except DeviceError as ex:
350            self.logger.warning('Failed to reset Apollo: %s' % ex)
351        return ret
352
353    @timed_action
354    def wait_for_magic_pairing_notification(self, android_act, timeout=60):
355        dut_detected = False
356        start_time = time.time()
357        self.logger.info('Waiting for MP prompt: %s' % BISTO_MP_DEVICE_TEXT)
358        while not dut_detected:
359            android_act.dut.ui_util.uia.wait.update()
360            self.sleep(1)
361            if android_act.dut.ui_util.uia(
362                    textContains=BISTO_MP_DETECT_HEADER, enabled=True).exists:
363                if android_act.dut.ui_util.uia(
364                        textContains=BISTO_MP_DEVICE_TEXT,
365                        enabled=True).exists:
366                    self.logger.info('DUT Apollo MP prompt detected!')
367                    dut_detected = True
368                else:
369                    self.logger.info(
370                        'NONE DUT Apollo MP prompt detected! Cancel and RETRY!'
371                    )
372                    android_act.dut.ui_util.click_by_text(BISTO_MP_CANCEL_TEXT)
373            if time.time() - start_time > timeout:
374                break
375        if not dut_detected:
376            self.logger.info(
377                'Failed to get %s MP prompt' % BISTO_MP_DEVICE_TEXT)
378        return dut_detected
379
380    @timed_action
381    def start_magic_pairing(self, android_act, timeout=30, retries=3):
382        paired = False
383        android_act.dut.ui_util.click_by_text(
384            BISTO_MP_CONNECT_TEXT, timeout=timeout)
385        connect_start_time = time.time()
386        count = 0
387        timeout = 30
388
389        while not paired and count < retries:
390            android_act.dut.ui_util.uia.wait.update()
391            self.sleep(1)
392            if time.time() - connect_start_time > timeout:
393                self.logger.info('Time out! %s seconds' % time)
394                android_act.app_force_close_agsa()
395                self.logger.info('Timeout(s): %s' % timeout)
396                break
397            if android_act.dut.ui_util.uia(
398                    textContains=BISTO_MP_CONNECT_FAIL_TEXT,
399                    enabled=True).exists:
400                count += 1
401                self.logger.info('MP FAILED! Retry %s.' % count)
402                android_act.dut.ui_util.click_by_text(
403                    BISTO_MP_CONNECT_RETRY_TEXT)
404                connect_start_time = time.time()
405            elif android_act.dut.ui_util.uia(
406                    textContains=BISTO_MP_CONNECTED_TEXT, enabled=True).exists:
407                self.logger.info('MP SUCCESSFUL! Exiting AGSA...')
408                paired = True
409                android_act.dut.ui_util.click_by_text(
410                    BISTO_MP_CONNECTED_EXIT_TEXT)
411                android_act.dut.ui_util.wait_for_text(
412                    BISTO_MP_EXIT_PROMPT_TEXT)
413                android_act.dut.ui_util.click_by_text(
414                    BISTO_MP_EXIT_CONFIRM_TEXT)
415        return paired
416
417    @timed_action
418    def turn_bluetooth_on(self):
419        self.dut.cmd('pow 1')
420        return True
421
422    @timed_action
423    def turn_bluetooth_off(self):
424        self.dut.cmd('pow 0')
425        return True
426
427    @timed_action
428    def wait_for_bluetooth_a2dp_hfp(self,
429                                    timeout=DEFAULT_TIMEOUT,
430                                    interval=DEFAULT_CMD_INTERVAL):
431        """Wait for BT connection by checking if A2DP and HFP connected.
432
433        This is used for BT pair+connect test.
434
435        Args:
436            timeout: float, timeout value in second.
437            interval: float, float, interval between polling BT profiles.
438            timer: TimeRecorder, time recorder to save the connection time.
439        """
440        # Need to check these two profiles
441        pass_profiles = ['A2DP Pri', 'HFP Pri']
442        # TODO(b/122730302): Change to just raise an error
443        ret = False
444        try:
445            ret = self._wait_for_bluetooth_profile_connection(
446                pass_profiles, timeout, interval, self.measurement_timer)
447        except DeviceError as ex:
448            self.logger.warning('Failed to wait for BT connection: %s' % ex)
449        return ret
450
451    def _wait_for_bluetooth_profile_connection(self, profiles_to_check,
452                                               timeout, interval, timer):
453        """A generic method to wait for specified BT profile connection.
454
455        Args:
456            profiles_to_check: list, profile names (A2DP, HFP, etc.) to be
457                               checked.
458            timeout: float, timeout value in second.
459            interval: float, interval between polling BT profiles.
460            timer: TimeRecorder, time recorder to save the connection time.
461
462        Returns:
463            bool, True if checked profiles are connected, False otherwise.
464        """
465        timer.start_timer(profiles_to_check, force=True)
466        start_time = time.time()
467        while time.time() - start_time < timeout:
468            profiles = self._bluetooth_check_profile_connection()
469            for profile in profiles:
470                if profiles[profile]:
471                    timer.stop_timer(profile)
472            # now check if the specified profile connected.
473            all_connected = True
474            for profile in profiles_to_check:
475                if not profiles[profile]:
476                    all_connected = False
477                    break
478            if all_connected:
479                return True
480            time.sleep(interval)
481        # make sure the profile timer are stopped.
482        timer.stop_timer(profiles_to_check)
483        return False
484
485    def _bluetooth_check_profile_connection(self):
486        """Return profile connection in a boolean dict.
487
488        key=<profile name>, val = T/F
489        """
490        profiles = dict()
491        output = self.dut.get_conn_devices()
492        # need to strip all whitespaces.
493        conn_devs = {}
494
495        for key in output:
496            conn_devs[key.strip()] = output[key].strip()
497        for key in conn_devs:
498            self.logger.info('%s:%s' % (key, conn_devs[key]))
499            if 'XXXXXXXX' in conn_devs[key]:
500                profiles[key] = conn_devs[key]
501            else:
502                profiles[key] = False
503        return profiles
504
505    @timed_action
506    def wait_for_bluetooth_status_connection_all(
507            self, timeout=DEFAULT_TIMEOUT, interval=DEFAULT_CMD_INTERVAL):
508        """Wait for BT connection by checking if A2DP, HFP and COMP connected.
509
510        This is used for BT reconnect test.
511
512        Args:
513            timeout: float, timeout value in second.
514            interval: float, float, interval between polling BT profiles.
515        """
516        ret = False
517        self.measurement_timer.start_timer(DEFAULT_BT_STATUS, force=True)
518        # All profile not connected by default.
519        connected_status = {key: False for key in DEFAULT_BT_STATUS}
520        start_time = time.time()
521        while time.time() < start_time + timeout:
522            try:
523                time.sleep(interval)
524                status = self.dut.get_bt_status()
525                for key in DEFAULT_BT_STATUS:
526                    if (not connected_status[key] and key in status
527                            and 'TRUE' == status[key]):
528                        self.measurement_timer.stop_timer(key)
529                        connected_status[key] = True
530                        self.logger.info(
531                            'BT status %s connected at %fs.' %
532                            (key, self.measurement_timer.elapsed(key)))
533                if False not in connected_status.values():
534                    ret = True
535                    break
536            except DeviceError as ex:
537                self.logger.warning(
538                    'Device exception when waiting for reconnection: %s' % ex)
539        self.measurement_timer.stop_timer(DEFAULT_BT_STATUS)
540        return ret
541
542    def initiate_ota_via_agsa_verify_transfer_completion_in_logcat(
543            self,
544            agsa_action,
545            dfu_path,
546            destination=None,
547            force=True,
548            apply_image=True,
549            reconnect=True):
550        """
551        Starts an OTA by issuing an intent to AGSA after copying the dfu file to
552        the appropriate location on the phone
553
554        Args:
555            agsa_action: projects.agsa.lib.test_actions.agsa_acts
556                         .AgsaTestActions
557            dfu_path: string - absolute path of dfu file
558            destination: string - absolute path of file on phone if not
559                         specified will use
560                         /storage/emulated/0/Android/data/com.google.android
561                         .googlequicksearchbox/files/download_cache/apollo.dfu
562            force: value set in the intent sent to AGSA
563            True if success False otherwise
564        """
565        try:
566            agsa_action.initiate_agsa_and_wait_until_transfer(
567                dfu_path, destination=destination, force=force)
568            if apply_image:
569                # set in case
570                self.dut.set_in_case(reconnect=reconnect)
571        except AgsaOTAError as ex:
572            self.logger.error('Failed to OTA via AGSA %s' % ex)
573            return False
574        except DeviceError as ex:
575            self.logger.error('Failed to bring up device %s' % ex)
576            return False
577        return True
578
579    @timed_action
580    def wait_for_bluetooth_a2dp_hfp_rfcomm_connect(
581            self, address, timeout=DEFAULT_TIMEOUT,
582            interval=DEFAULT_CMD_INTERVAL):
583        """Wait for BT reconnection by checking if A2DP, HFP and COMP connected
584        to the specified address.
585
586        This is used for BT connection switch test.
587
588        Args:
589            address: str, MAC of the address to connect.
590            timeout: float, timeout value in second.
591            interval: float, float, interval between polling BT profiles.
592
593        Returns:
594            True if the specified address is connected. False otherwise.
595        """
596        last_4_hex = address.replace(':', '')[-4:].lower()
597        profiles_to_check = ['HFP Pri', 'A2DP Pri', 'CTRL', 'AUDIO']
598        self.measurement_timer.start_timer(profiles_to_check, force=True)
599        end_time = time.time() + timeout
600        all_connected = True
601        while time.time() < end_time:
602            all_connected = True
603            profiles = self._bluetooth_check_profile_connection()
604            for profile in profiles_to_check:
605                if (profile in profiles and profiles[profile]
606                        and last_4_hex in profiles[profile].lower()):
607                    self.measurement_timer.stop_timer(profile)
608                else:
609                    all_connected = False
610            if all_connected:
611                break
612            time.sleep(interval)
613        # make sure the profile timer are stopped.
614        self.measurement_timer.stop_timer(profiles_to_check)
615
616        return all_connected
617