1#!/usr/bin/env python3.4
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import re
18import sys
19import random
20import time
21
22import acts.controllers.packet_capture as packet_capture
23import acts.signals as signals
24import acts.test_utils.wifi.rpm_controller_utils as rutils
25import acts.test_utils.wifi.wifi_datastore_utils as dutils
26import acts.test_utils.wifi.wifi_test_utils as wutils
27
28from acts import asserts
29from acts.base_test import BaseTestClass
30from acts.controllers.ap_lib import hostapd_constants
31from acts.test_decorators import test_tracker_info
32from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
33
34WifiEnums = wutils.WifiEnums
35
36WAIT_BEFORE_CONNECTION = 1
37SINGLE_BAND = 1
38DUAL_BAND = 2
39
40TIMEOUT = 60
41TEST = 'test_'
42PING_ADDR = 'www.google.com'
43
44NUM_LINK_PROBES = 3
45PROBE_DELAY_SEC = 1
46
47
48class WifiChaosTest(WifiBaseTest):
49    """ Tests for wifi IOT
50
51        Test Bed Requirement:
52          * One Android device
53          * Wi-Fi IOT networks visible to the device
54    """
55
56    def __init__(self, configs):
57        BaseTestClass.__init__(self, configs)
58        self.generate_interop_tests()
59
60    def randomize_testcases(self):
61        """Randomize the list of hosts and build a random order of tests,
62           based on SSIDs, keeping all the relevant bands of an AP together.
63
64        """
65        temp_tests = list()
66        hosts = self.user_params['interop_host']
67
68        random.shuffle(hosts)
69
70        for host in hosts:
71            ssid_2g = None
72            ssid_5g = None
73            info = dutils.show_device(host)
74
75            # Based on the information in datastore, add to test list if
76            # AP has 2G band.
77            if 'ssid_2g' in info:
78                ssid_2g = info['ssid_2g']
79                temp_tests.append(TEST + ssid_2g)
80
81            # Based on the information in datastore, add to test list if
82            # AP has 5G band.
83            if 'ssid_5g' in info:
84                ssid_5g = info['ssid_5g']
85                temp_tests.append(TEST + ssid_5g)
86
87        self.tests = temp_tests
88
89    def generate_interop_testcase(self, base_test, testcase_name, ssid_dict):
90        """Generates a single test case from the given data.
91
92        Args:
93            base_test: The base test case function to run.
94            testcase_name: The name of the test case.
95            ssid_dict: The information about the network under test.
96        """
97        ssid = testcase_name
98        test_tracker_uuid = ssid_dict[testcase_name]['uuid']
99        hostname = ssid_dict[testcase_name]['host']
100        if not testcase_name.startswith('test_'):
101            testcase_name = 'test_%s' % testcase_name
102        test_case = test_tracker_info(uuid=test_tracker_uuid)(
103            lambda: base_test(ssid, hostname))
104        setattr(self, testcase_name, test_case)
105        self.tests.append(testcase_name)
106
107    def generate_interop_tests(self):
108        for ssid_dict in self.user_params['interop_ssid']:
109            testcase_name = list(ssid_dict)[0]
110            self.generate_interop_testcase(self.interop_base_test,
111                                           testcase_name, ssid_dict)
112        self.randomize_testcases()
113
114    def setup_class(self):
115        self.dut = self.android_devices[0]
116        self.admin = 'admin' + str(random.randint(1000001, 12345678))
117        wutils.wifi_test_device_init(self.dut)
118        # Set country code explicitly to "US".
119        wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
120
121        asserts.assert_true(
122            self.lock_pcap(),
123            "Could not lock a Packet Capture. Aborting Interop test.")
124
125        wutils.wifi_toggle_state(self.dut, True)
126
127    def lock_pcap(self):
128        """Lock a Packet Capturere to use for the test."""
129
130        # Get list of devices from the datastore.
131        locked_pcap = False
132        devices = dutils.get_devices()
133
134        for device in devices:
135
136            device_name = device['hostname']
137            device_type = device['ap_label']
138            if device_type == 'PCAP' and not device['lock_status']:
139                if dutils.lock_device(device_name, self.admin):
140                    self.pcap_host = device_name
141                    host = device['ip_address']
142                    self.log.info("Locked Packet Capture device: %s" % device_name)
143                    locked_pcap = True
144                    break
145                else:
146                    self.log.warning("Failed to lock %s PCAP." % device_name)
147
148        if not locked_pcap:
149            return False
150
151        pcap_config = {'ssh_config':{'user':'root'} }
152        pcap_config['ssh_config']['host'] = host
153
154        self.pcap = packet_capture.PacketCapture(pcap_config)
155        return True
156
157    def setup_test(self):
158        self.dut.droid.wakeLockAcquireBright()
159        self.dut.droid.wakeUpNow()
160
161    def on_pass(self, test_name, begin_time):
162        wutils.stop_pcap(self.pcap, self.pcap_procs, True)
163
164    def on_fail(self, test_name, begin_time):
165        # Sleep to ensure all failed packets are captured.
166        time.sleep(5)
167        wutils.stop_pcap(self.pcap, self.pcap_procs, False)
168        self.dut.take_bug_report(test_name, begin_time)
169        self.dut.cat_adb_log(test_name, begin_time)
170
171    def teardown_test(self):
172        self.dut.droid.wakeLockRelease()
173        self.dut.droid.goToSleepNow()
174
175    def teardown_class(self):
176        # Unlock the PCAP.
177        if not dutils.unlock_device(self.pcap_host):
178            self.log.warning("Failed to unlock %s PCAP. Check in datastore.")
179
180
181    """Helper Functions"""
182
183    def scan_and_connect_by_id(self, network, net_id):
184        """Scan for network and connect using network id.
185
186        Args:
187            net_id: Integer specifying the network id of the network.
188
189        """
190        ssid = network[WifiEnums.SSID_KEY]
191        wutils.start_wifi_connection_scan_and_ensure_network_found(self.dut,
192                                                                   ssid)
193        wutils.wifi_connect_by_id(self.dut, net_id)
194
195    def run_ping(self, sec):
196        """Run ping for given number of seconds.
197
198        Args:
199            sec: Time in seconds to run teh ping traffic.
200
201        """
202        self.log.info("Finding Gateway...")
203        route_response = self.dut.adb.shell("ip route get 8.8.8.8")
204        gateway_ip = re.search('via (.*) dev', str(route_response)).group(1)
205        self.log.info("Gateway IP = %s" % gateway_ip)
206        self.log.info("Running ping for %d seconds" % sec)
207        result = self.dut.adb.shell("ping -w %d %s" % (sec, gateway_ip),
208                                    timeout=sec + 1)
209        self.log.debug("Ping Result = %s" % result)
210        if "100% packet loss" in result:
211            raise signals.TestFailure("100% packet loss during ping")
212
213    def send_link_probes(self, network):
214        """
215        Send link probes, and verify that the device and AP did not crash.
216        Also verify that at least one link probe succeeded.
217
218        Steps:
219        1. Send a few link probes.
220        2. Ensure that the device and AP did not crash (by checking that the
221           device remains connected to the expected network).
222        """
223        results = wutils.send_link_probes(
224            self.dut, NUM_LINK_PROBES, PROBE_DELAY_SEC)
225
226        self.log.info("Link Probe results: %s" % (results,))
227
228        wifi_info = self.dut.droid.wifiGetConnectionInfo()
229        expected = network[WifiEnums.SSID_KEY]
230        actual = wifi_info[WifiEnums.SSID_KEY]
231        asserts.assert_equal(
232            expected, actual,
233            "Device did not remain connected after sending link probes!")
234
235    def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip):
236        """UNlock the AP in datastore and turn off the AP.
237
238        Args:
239            hostname: Hostname of the AP.
240            rpm_port: Port number on the RPM for the AP.
241            rpm_ip: RPM's IP address.
242
243        """
244        # Un-Lock AP in datastore.
245        self.log.debug("Un-lock AP in datastore")
246        if not dutils.unlock_device(hostname):
247            self.log.warning("Failed to unlock %s AP. Check AP in datastore.")
248        # Turn OFF AP from the RPM port.
249        rutils.turn_off_ap(rpm_port, rpm_ip)
250
251    def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip,
252                               release_ap):
253        """Run connect/disconnect to a given network in loop.
254
255           Args:
256               network: Dict, network information.
257               hostname: Hostanme of the AP to connect to.
258               rpm_port: Port number on the RPM for the AP.
259               rpm_ip: Port number on the RPM for the AP.
260               release_ap: Flag to determine if we should turn off the AP yet.
261
262           Raises: TestFailure if the network connection fails.
263
264        """
265        for attempt in range(5):
266            try:
267                begin_time = time.time()
268                ssid = network[WifiEnums.SSID_KEY]
269                net_id = self.dut.droid.wifiAddNetwork(network)
270                asserts.assert_true(net_id != -1, "Add network %s failed" % network)
271                self.log.info("Connecting to %s" % ssid)
272                self.scan_and_connect_by_id(network, net_id)
273                self.run_ping(10)
274                # TODO(b/133369482): uncomment once bug is resolved
275                # self.send_link_probes(network)
276                wutils.wifi_forget_network(self.dut, ssid)
277                time.sleep(WAIT_BEFORE_CONNECTION)
278            except Exception as e:
279                self.log.error("Connection to %s network failed on the %d "
280                               "attempt with exception %s." % (ssid, attempt, e))
281                # TODO:(bmahadev) Uncomment after scan issue is fixed.
282                # self.dut.take_bug_report(ssid, begin_time)
283                # self.dut.cat_adb_log(ssid, begin_time)
284                if release_ap:
285                    self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
286                raise signals.TestFailure("Failed to connect to %s" % ssid)
287
288    def get_band_and_chan(self, ssid):
289        """Get the band and channel information from SSID.
290
291        Args:
292            ssid: SSID of the network.
293
294        """
295        ssid_info = ssid.split('_')
296        self.band = ssid_info[-1]
297        for item in ssid_info:
298            # Skip over the router model part.
299            if 'ch' in item and item != ssid_info[0]:
300                self.chan = re.search(r'(\d+)',item).group(0)
301                return
302        raise signals.TestFailure("Channel information not found in SSID.")
303
304    def interop_base_test(self, ssid, hostname):
305        """Base test for all the connect-disconnect interop tests.
306
307        Args:
308            ssid: string, SSID of the network to connect to.
309            hostname: string, hostname of the AP.
310
311        Steps:
312            1. Lock AP in datstore.
313            2. Turn on AP on the rpm switch.
314            3. Run connect-disconnect in loop.
315            4. Turn off AP on the rpm switch.
316            5. Unlock AP in datastore.
317
318        """
319        network = {}
320        network['password'] = 'password'
321        network['SSID'] = ssid
322        release_ap = False
323        wutils.reset_wifi(self.dut)
324
325        # Lock AP in datastore.
326        self.log.info("Lock AP in datastore")
327
328        ap_info = dutils.show_device(hostname)
329
330        # If AP is locked by a different test admin, then we skip.
331        if ap_info['lock_status'] and ap_info['locked_by'] != self.admin:
332            raise signals.TestSkip("AP %s is locked, skipping test" % hostname)
333
334        if not dutils.lock_device(hostname, self.admin):
335            self.log.warning("Failed to lock %s AP. Unlock AP in datastore"
336                             " and try again.")
337            raise signals.TestFailure("Failed to lock AP")
338
339        band = SINGLE_BAND
340        if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info):
341            band = DUAL_BAND
342        if (band == SINGLE_BAND) or (
343                band == DUAL_BAND and '5G' in ssid):
344            release_ap = True
345
346        # Get AP RPM attributes and Turn ON AP.
347        rpm_ip = ap_info['rpm_ip']
348        rpm_port = ap_info['rpm_port']
349
350        rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip)
351        self.log.info("Finished turning ON AP.")
352        # Experimental. Some APs take upto a min to come online.
353        time.sleep(60)
354
355        self.get_band_and_chan(ssid)
356        self.pcap.configure_monitor_mode(self.band, self.chan)
357        self.pcap_procs = wutils.start_pcap(
358                self.pcap, self.band.lower(), self.test_name)
359        self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip,
360                                    release_ap)
361
362        # Un-lock only if it's a single band AP or we are running the last band.
363        if release_ap:
364            self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
365