1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
# TODO: Decide whether to move this module to a common directory rather than
# the one specific to apollo.
# TODO: The move is contingent on understanding the functions that should be
# supported by the DUT (sec_device).
21
"""A generic library of Bluetooth-related functions.

The connection is assumed to be between an Android phone and any DUT (referred
to as the secondary device) that supports the following calls:
        sec_device.turn_on_bluetooth()
        sec_device.is_bt_enabled()
        sec_device.bluetooth_address
        sec_device.set_pairing_mode()
        sec_device.factory_reset()

The status-check helpers additionally read sec_device.dut_type and call
sec_device.get_bt_status(); bt_connect reads sec_device.device_name when it
logs a "not paired" error.
"""
32import queue
33import time
34from logging import Logger
35
36from acts import asserts
37from acts.controllers.buds_lib import tako_trace_logger
38from acts.utils import TimeoutError
39from acts.utils import wait_until
40
# Default expected Bluetooth profile status per supported DUT type. Every
# profile starts out as 'FALSE'; callers override individual entries with the
# states they expect. To support a new device, add its profile names here.
# NOTE(review): the profile names differ between 'rio' and 'apollo' (spacing
# and trailing periods) - presumably they mirror each device's own status
# report; confirm before normalizing them.
WEARABLE_BT_PROTOCOLS = {
    'rio': dict.fromkeys(
        ('Comp. App', 'HFP (pri.)', 'HFP (sec.)', 'A2DP (pri.)',
         'A2DP (sec.)'),
        'FALSE'),
    'apollo': dict.fromkeys(
        ('Comp', 'HFP(pri.)', 'HFP(sec.)', 'A2DP(pri)', 'A2DP(sec)'),
        'FALSE'),
}
58
59
class BTUtilsError(Exception):
    """Raised when a BTUtils Bluetooth operation cannot be completed."""
62
63
class BTUtils(object):
    """A utility that provides access to bluetooth controls.

    This class is to be maintained as a generic class such that it is
    compatible with any device that pairs with a phone.

    Every duration returned by the methods below is a difference of two
    time.time() readings and is therefore expressed in seconds.
    """

    def __init__(self):
        # Timeout handed to wait_until() by every polling loop in this class
        # (presumably seconds - confirm against acts.utils.wait_until).
        self.default_timeout = 60
        self.logger = tako_trace_logger.TakoTraceLogger(Logger(__file__))

    def bt_pair_and_connect(self, pri_device, sec_device):
        """Pair and connect a pri_device to a sec_device.

        Args:
          pri_device: an android device with sl4a installed.
          sec_device: a wearable device.

        Returns:
          A (connected, elapsed) tuple. connected is the result reported by
          bt_connect; elapsed is the pairing time plus the connection time,
          in seconds.

        Raises:
          BTUtilsError: if Bluetooth cannot be enabled or pairing times out.
          Whatever bt_connect raises is propagated unchanged.
        """
        pair_time = self.bt_pair(pri_device, sec_device)
        connect_result, connect_time = self.bt_connect(pri_device, sec_device)
        return connect_result, pair_time + connect_time

    def bt_pair(self, pri_device, sec_device):
        """Pair a pri_device to a sec_device.

        Args:
          pri_device: an android device with sl4a installed.
          sec_device: a wearable device.

        Returns:
          The time, in seconds, taken to execute the pairing flow.

        Raises:
          BTUtilsError: if Bluetooth cannot be turned on for either device,
            or if the devices are not bonded before the timeout expires.
        """
        start_time = time.time()
        # Enable BT on the primary device if it's not currently ON.
        if not pri_device.droid.bluetoothCheckState():
            pri_device.droid.bluetoothToggleState(True)
            try:
                pri_device.ed.pop_event(event_name='BluetoothStateChangedOn',
                                        timeout=10)
            except queue.Empty as err:
                raise BTUtilsError(
                    'Failed to toggle Bluetooth on the primary device.'
                ) from err
        sec_device.turn_on_bluetooth()
        if not sec_device.is_bt_enabled():
            raise BTUtilsError('Could not turn on Bluetooth on secondary '
                               'devices')
        target_addr = sec_device.bluetooth_address
        sec_device.set_pairing_mode()

        pri_device.droid.bluetoothDiscoverAndBond(target_addr)
        # Loop until we have bonded successfully or timeout.
        self.logger.info('Verifying devices are bonded')
        try:
            wait_until(
                lambda: self.android_device_in_paired_state(pri_device,
                                                            target_addr),
                self.default_timeout)
        except TimeoutError as err:
            raise BTUtilsError('bt_pair failed: {}'.format(err)) from err
        return time.time() - start_time

    def bt_connect(self, pri_device, sec_device):
        """Connect a previously paired sec_device to a pri_device.

        Args:
          pri_device: an android device with sl4a installed.
          sec_device: a wearable device.

        Returns:
          A (connected, elapsed) tuple: (False, 0) when the devices are not
          bonded, otherwise (True, elapsed) with elapsed in seconds.

        Raises:
          Whatever wait_until raises when the connection is not observed
          within self.default_timeout is propagated unchanged (callers may
          already depend on that exception type).
        """
        start_time = time.time()
        target_addr = sec_device.bluetooth_address
        # First check that devices are bonded. This is done inline rather
        # than via android_device_in_paired_state() so that no "successfully
        # bonded" message is logged here.
        bonded_devices = pri_device.droid.bluetoothGetBondedDevices()
        if not any(dev['address'] == target_addr for dev in bonded_devices):
            self.logger.error('Not paired to %s', sec_device.device_name)
            return False, 0

        self.logger.info('Attempting to connect.')
        pri_device.droid.bluetoothConnectBonded(target_addr)

        self.logger.info('Verifying devices are connected')
        wait_until(
            lambda: self.android_device_in_connected_state(pri_device,
                                                           target_addr),
            self.default_timeout)
        return True, time.time() - start_time

    def android_device_in_paired_state(self, device, mac_address):
        """Check whether mac_address is in the device's bonded list.

        Args:
          device: an android device with sl4a installed.
          mac_address: Bluetooth address to look for.

        Returns:
          True (after logging the success) if an entry of
          bluetoothGetBondedDevices() has this address, False otherwise.
        """
        bonded_devices = device.droid.bluetoothGetBondedDevices()
        if any(d['address'] == mac_address for d in bonded_devices):
            self.logger.info('Successfully bonded to device')
            return True
        return False

    def android_device_in_connected_state(self, device, mac_address):
        """Check whether mac_address is in the device's connected list.

        Args:
          device: an android device with sl4a installed.
          mac_address: Bluetooth address to look for.

        Returns:
          True (after logging the success) if an entry of
          bluetoothGetConnectedDevices() has this address, False otherwise.
        """
        connected_devices = device.droid.bluetoothGetConnectedDevices()
        if any(d['address'] == mac_address for d in connected_devices):
            self.logger.info('Successfully connected to device')
            return True
        return False

    def bt_unpair(self, pri_device, sec_device, factory_reset_dut=True):
        """Unpair a wearable device from an Android device.

        Args:
          pri_device: an android device with sl4a installed.
          sec_device: a wearable device.
          factory_reset_dut: when True, sec_device.factory_reset() is called
            once the devices are unpaired.

        Returns:
          A (True, elapsed) tuple. elapsed is 0 when the devices were already
          unpaired; otherwise it is the time in seconds taken by the flow,
          including the optional factory reset.

        Raises:
          The failure raised by asserts.assert_true when the unbond request
          is rejected, and whatever wait_until raises when the devices are
          still bonded after self.default_timeout.
        """
        target_address = sec_device.bluetooth_address
        if not self.android_device_in_paired_state(pri_device, target_address):
            self.logger.debug('Already unpaired.')
            return True, 0
        self.logger.debug('Unpairing from %s' % target_address)
        start_time = time.time()
        asserts.assert_true(
            pri_device.droid.bluetoothUnbond(target_address),
            'Failed to request device unpairing.')

        # Check that devices have unpaired successfully.
        self.logger.debug('Verifying devices are unpaired')

        # Loop until we have unbonded successfully or timeout: wait for the
        # paired-state check to become False.
        wait_until(
            lambda: self.android_device_in_paired_state(pri_device,
                                                        target_address),
            self.default_timeout,
            condition=False)

        self.logger.info('Successfully unpaired from %s' % target_address)
        if factory_reset_dut:
            self.logger.info('Factory reset DUT')
            sec_device.factory_reset()
        return True, time.time() - start_time

    def check_device_bt(self, device, **kwargs):
        """Check the Bluetooth connection status from device.

        Args:
          device: a wearable device.
          **kwargs: additional parameters; only 'profiles' is read and it is
            forwarded to check_dut_status.

        Returns:
          True if the bt status check succeeds. False otherwise, including
          when device.dut_type has no entry in WEARABLE_BT_PROTOCOLS.
        """
        if device.dut_type not in WEARABLE_BT_PROTOCOLS:
            # Explicit False (instead of an implicit None) to honour the
            # documented return contract for unsupported devices.
            return False
        return self.check_dut_status(device, kwargs.get('profiles'))

    def check_dut_status(self, device, profiles=None):
        """Check the Bluetooth connection status from rio/apollo device.

        Args:
          device: rio/apollo device
          profiles: A dict of profiles, eg. {'HFP (pri.)': 'TRUE', 'Comp. App':
            'TRUE', 'A2DP (pri.)': 'TRUE'}. Entries override the defaults in
            WEARABLE_BT_PROTOCOLS for this call only. None or an empty dict
            means "expect the defaults".

        Returns:
          True if the device reports the expected status before the timeout
          expires, False otherwise (the mismatches are logged as an error).

        Raises:
          BTUtilsError: if device.get_bt_status() returns None.
        """
        self.logger.info(profiles)
        # Work on a copy: writing the overrides into WEARABLE_BT_PROTOCOLS
        # itself would leak one call's expectations into every later call.
        expected = self._merge_profiles(
            WEARABLE_BT_PROTOCOLS[device.dut_type], profiles)
        try:
            wait_until(lambda: self._compare_profile(device, expected),
                       self.default_timeout)
        except TimeoutError:
            status = device.get_bt_status()
            self.logger.error(self._get_formatted_output(expected, status))
            return False
        return True

    @staticmethod
    def _merge_profiles(defaults, overrides):
        """Return a new dict holding defaults updated with overrides."""
        merged = dict(defaults)
        if overrides:
            merged.update(overrides)
        return merged

    @staticmethod
    def _find_mismatches(expected, actual):
        """List (key, expected, actual) for each key whose values differ.

        A key that is absent from actual counts as a mismatch and is reported
        with an actual value of None.
        """
        return [(key, value, actual.get(key))
                for key, value in expected.items()
                if key not in actual or actual[key] != value]

    def _get_formatted_output(self, expected, actual):
        """On BT status mismatch generate formatted output string.

        Args:
          expected: Expected BT status hash.
          actual: Actual BT status hash from the device.

        Returns:
          Formatted mismatch string; empty when nothing differs.

        Raises:
          BTUtilsError: When actual is None.
        """
        if actual is None:
            raise BTUtilsError('None is not expected.')
        mismatch_format = '{}: Expected {} Actual {}. '
        return ''.join(
            mismatch_format.format(*mismatch)
            for mismatch in self._find_mismatches(expected, actual))

    def _compare_profile(self, device, expected):
        """Compare input expected profile with actual.

        Args:
          device: device providing get_bt_status().
          expected: Expected BT status hash.

        Returns:
          True if every expected entry matches the reported status.

        Raises:
          BTUtilsError: When the device reports None as its status.
        """
        actual = device.get_bt_status()
        if actual is None:
            raise BTUtilsError('None is not expected.')
        return not self._find_mismatches(expected, actual)
295