1#
2#   Copyright 2018 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16from acts import asserts
17from acts import base_test
18from acts.test_decorators import test_tracker_info
19from acts.test_utils.net.net_test_utils import start_tcpdump
20from acts.test_utils.net.net_test_utils import stop_tcpdump
21from acts.test_utils.wifi import wifi_test_utils as wutils
22
23
24from scapy.all import IP
25from scapy.all import TCP
26from scapy.all import UDP
27from scapy.all import Raw
28from scapy.all import rdpcap
29from scapy.all import Scapy_Exception
30
31
class ProxyTest(base_test.BaseTestClass):
    """Network proxy tests.

    Each test sets a global proxy on the device (PAC url or manual
    server/port/bypass host), sends https requests to a bypassed and a
    non-bypassed host while tcpdump is running, and then inspects the
    capture for DNS queries and proxy-port traffic mentioning each host.
    """

    def setup_class(self):
        """Unpack the required user params and connect the DUT to wifi."""
        self.dut = self.android_devices[0]
        req_params = ("proxy_pac", "proxy_server",
                      "proxy_port", "bypass_host", "non_bypass_host",
                      "wifi_network")
        self.unpack_userparams(req_param_names=req_params)
        wutils.wifi_test_device_init(self.dut)
        wutils.wifi_toggle_state(self.dut, True)
        wutils.start_wifi_connection_scan_and_ensure_network_found(
            self.dut, self.wifi_network["SSID"])
        wutils.wifi_connect(self.dut, self.wifi_network)
        # Handle of the tcpdump capture currently running; None when idle.
        self.tcpdump_pid = None
        # Stored as an int because it is compared with TCP dport values.
        self.proxy_port = int(self.proxy_port)

    def teardown_test(self):
        """Reset the global proxy and log an error if one is still set."""
        self.dut.droid.connectivityResetGlobalProxy()
        if self.dut.droid.connectivityGetGlobalProxy():
            self.log.error("Failed to reset global proxy settings")

    def teardown_class(self):
        """Reset wifi on the DUT."""
        wutils.reset_wifi(self.dut)

    def on_fail(self, test_name, begin_time):
        """Take a bug report on the DUT for the given test.

        Args:
            1. test_name: name of the test, passed to take_bug_report
            2. begin_time: begin time of the test, passed to take_bug_report
        """
        self.dut.take_bug_report(test_name, begin_time)

    # Helper methods

    def _verify_http_request(self, ad):
        """ Send https requests to the bypassed and non-bypassed hosts

        Steps:
            1. Send a request to "https://<host>" for
                a. Host that is bypassed by proxy server
                b. Host that goes through proxy server
            2. Assert that each request returns a truthy result

        Args:
            1. ad: dut to run http requests
        """
        for host in (self.bypass_host, self.non_bypass_host):
            url = "https://%s" % host
            result = ad.droid.httpRequestString(url)
            asserts.assert_true(result, "Http request failed for %s" % url)

    def _verify_proxy_server(self, pcap_file, bypass_host, hostname):
        """ Check the capture for DNS and proxy traffic mentioning hostname

        Args:
            1. pcap_file: path of the pcap file to read
            2. bypass_host: truthy if hostname is expected to bypass the
               proxy, falsy if it is expected to go through the proxy
            3. hostname: hostname requested

        Returns:
            False if
                a. bypass_host is set, proxy traffic for hostname was found
                   and no DNS query for hostname was found, or
                b. bypass_host is not set, a DNS query for hostname was
                   found and no proxy traffic for hostname was found
            True otherwise
        """
        self.log.info("Checking proxy server for query to: %s" % hostname)
        try:
            packets = rdpcap(pcap_file)
        except Scapy_Exception:
            # asserts.fail is expected to raise, so "packets" is always
            # bound past this point.
            asserts.fail("Not a valid pcap file")

        dns_query = False
        http_query = False
        for pkt in packets:
            summary = "%s" % pkt.summary()
            # DNS query: UDP packet to port 53 whose summary names the host.
            if UDP in pkt and pkt[UDP].dport == 53 and hostname in summary:
                dns_query = True
                # NOTE(review): stopping here means proxy traffic that shows
                # up later in the capture is never counted, so the verdict
                # for a non-bypassed host depends on packet order. Kept as
                # is - confirm whether this is intended.
                break
            # Proxy traffic: TCP packet to the proxy port whose raw payload
            # (stringified) contains the hostname.
            if (TCP in pkt and pkt[TCP].dport == self.proxy_port
                    and Raw in pkt and hostname in str(pkt[Raw])):
                http_query = True

        self.log.info("Bypass hostname set to: %s" % bypass_host)
        self.log.info("Found DNS query for host: %s" % dns_query)
        self.log.info("Found HTTP query for host: %s" % http_query)

        if bypass_host:
            # Bypassed host seen only via the proxy.
            misrouted = http_query and not dns_query
        else:
            # Non-bypassed host seen only via DNS.
            misrouted = dns_query and not http_query
        return not misrouted

    def _test_proxy(self):
        """ Test pac proxy and manual proxy settings

        Steps:
            1. Start tcpdump
            2. Run http requests
            3. Stop tcpdump
            4. Verify the packets from tcpdump have valid queries
        """

        # start tcpdump on the device
        self.tcpdump_pid = start_tcpdump(self.dut, self.test_name)

        try:
            # verify http requests
            self._verify_http_request(self.dut)
        finally:
            # stop tcpdump on the device even when a request assertion
            # fails, so the capture is not left running after the test
            pcap_file = stop_tcpdump(
                self.dut, self.tcpdump_pid, self.test_name)
            self.tcpdump_pid = None

        # verify proxy server
        result = self._verify_proxy_server(pcap_file, True, self.bypass_host)
        asserts.assert_true(result, "Proxy failed for %s" % self.bypass_host)
        result = self._verify_proxy_server(
            pcap_file, False, self.non_bypass_host)
        asserts.assert_true(
            result, "Proxy failed for %s" % self.non_bypass_host)

    # Test Cases

    @test_tracker_info(uuid="16881315-1a50-48ce-bd36-7b0d2f21b734")
    def test_pac_proxy_over_wifi(self):
        """ Test proxy with auto config over wifi

        Steps:
            1. Connect to a wifi network
            2. Set a global proxy with auto config
            3. Do a http request on the hostnames
            4. Verify that no DNS packets seen for non bypassed hostnames
            5. Verify that DNS packets seen for bypassed hostnames
        """
        # set global pac proxy
        self.log.info("Setting global proxy to: %s" % self.proxy_pac)
        self.dut.droid.connectivitySetGlobalPacProxy(self.proxy_pac)
        global_proxy = self.dut.droid.connectivityGetGlobalProxy()
        asserts.assert_true(global_proxy['PacUrl'] == self.proxy_pac,
                            "Failed to set pac proxy")

        # test proxy
        self._test_proxy()

    @test_tracker_info(uuid="4d3361f6-866d-423c-9ed7-5a6943575fe9")
    def test_manual_proxy_over_wifi(self):
        """ Test manual proxy over wifi

        Steps:
            1. Connect to a wifi network
            2. Set a global manual proxy with proxy server, port & bypass URLs
            3. Do a http request on the hostnames
            4. Verify that no DNS packets are seen for non bypassed hostnames
            5. Verify that DNS packets seen for bypassed hostnames
        """
        # set global manual proxy
        self.log.info("Setting global proxy to: %s %s %s" %
                      (self.proxy_server, self.proxy_port, self.bypass_host))
        self.dut.droid.connectivitySetGlobalProxy(self.proxy_server,
                                                  self.proxy_port,
                                                  self.bypass_host)
        global_proxy = self.dut.droid.connectivityGetGlobalProxy()
        asserts.assert_true(global_proxy['Hostname'] == self.proxy_server,
                            "Failed to set manual proxy")

        # test proxy
        self._test_proxy()