1#!/usr/bin/python3.4 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from acts import asserts 18from acts.test_decorators import test_tracker_info 19from acts.test_utils.net import connectivity_const as cconsts 20from acts.test_utils.wifi.aware import aware_const as aconsts 21from acts.controllers.ap_lib.hostapd_constants import BAND_2G 22from acts.controllers.ap_lib.hostapd_constants import BAND_5G 23from acts.test_utils.wifi.aware import aware_test_utils as autils 24from acts.test_utils.wifi import wifi_test_utils as wutils 25from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest 26from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest 27from scapy.all import * 28 29 30class MacRandomNoLeakageTest(AwareBaseTest, WifiBaseTest): 31 """Set of tests for Wi-Fi Aware MAC address randomization of NMI (NAN 32 management interface) and NDI (NAN data interface).""" 33 34 SERVICE_NAME = "GoogleTestServiceXYZ" 35 ping_msg = 'PING' 36 37 AWARE_DEFAULT_CHANNEL_5_BAND = 149 38 AWARE_DEFAULT_CHANNEL_24_BAND = 6 39 40 ENCR_TYPE_OPEN = 0 41 ENCR_TYPE_PASSPHRASE = 1 42 ENCR_TYPE_PMK = 2 43 44 PASSPHRASE = "This is some random passphrase - very very secure!!" 45 PMK = "ODU0YjE3YzdmNDJiNWI4NTQ2NDJjNDI3M2VkZTQyZGU=" 46 47 def setup_class(self): 48 super().setup_class() 49 50 asserts.assert_true(hasattr(self, 'packet_capture'), 51 "Needs packet_capture attribute to support sniffing.") 52 self.configure_packet_capture(channel_5g=self.AWARE_DEFAULT_CHANNEL_5_BAND, 53 channel_2g=self.AWARE_DEFAULT_CHANNEL_24_BAND) 54 55 def setup_test(self): 56 WifiBaseTest.setup_test(self) 57 AwareBaseTest.setup_test(self) 58 59 def teardown_test(self): 60 WifiBaseTest.teardown_test(self) 61 AwareBaseTest.teardown_test(self) 62 63 def verify_mac_no_leakage(self, pcap_procs, factory_mac_addresses, mac_addresses): 64 # Get 2G and 5G pcaps 65 pcap_fname = '%s_%s.pcap' % (pcap_procs[BAND_5G][1], BAND_5G.upper()) 66 pcap_5g = rdpcap(pcap_fname) 67 68 pcap_fname = '%s_%s.pcap' % (pcap_procs[BAND_2G][1], BAND_2G.upper()) 69 pcap_2g = rdpcap(pcap_fname) 70 pcaps = pcap_5g + pcap_2g 71 72 # Verify factory MAC is not leaked in both 2G and 5G pcaps 73 ads = [self.android_devices[0], self.android_devices[1]] 74 for i, mac in enumerate(factory_mac_addresses): 75 wutils.verify_mac_not_found_in_pcap(ads[i], mac, pcaps) 76 77 # Verify random MACs are being used and in pcaps 78 for i, mac in enumerate(mac_addresses): 79 wutils.verify_mac_is_found_in_pcap(ads[i], mac, pcaps) 80 81 def transfer_mac_format(self, mac): 82 """add ':' to mac String, and transfer to lower case 83 84 Args: 85 mac: String of mac without ':' 86 @return: Lower case String of mac like "xx:xx:xx:xx:xx:xx" 87 """ 88 return re.sub(r"(?<=\w)(?=(?:\w\w)+$)", ":", mac.lower()) 89 90 def start_aware(self, dut, is_publish): 91 """Start Aware attach, then start Publish/Subscribe based on role 92 93 Args: 94 dut: Aware device 95 is_publish: True for Publisher, False for subscriber 96 @:return: dict with Aware discovery session info 97 """ 98 aware_id = dut.droid.wifiAwareAttach(True) 99 autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED) 100 event = autils.wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED) 101 mac = self.transfer_mac_format(event["data"]["mac"]) 102 dut.log.info("NMI=%s", mac) 103 104 if is_publish: 105 config = autils.create_discovery_config(self.SERVICE_NAME, 106 aconsts.PUBLISH_TYPE_UNSOLICITED) 107 disc_id = dut.droid.wifiAwarePublish(aware_id, config) 108 autils.wait_for_event(dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED) 109 else: 110 config = autils.create_discovery_config(self.SERVICE_NAME, 111 aconsts.SUBSCRIBE_TYPE_PASSIVE) 112 disc_id = dut.droid.wifiAwareSubscribe(aware_id, config) 113 autils.wait_for_event(dut, aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED) 114 aware_session = {"awareId": aware_id, "discId": disc_id, "mac": mac} 115 return aware_session 116 117 def create_date_path(self, p_dut, pub_session, s_dut, sub_session, sec_type): 118 """Create NDP based on the security type(open, PMK, PASSPHRASE), run socket connect 119 120 Args: 121 p_dut: Publish device 122 p_disc_id: Publisher discovery id 123 peer_id_on_pub: peer id on publisher 124 s_dut: Subscribe device 125 s_disc_id: Subscriber discovery id 126 peer_id_on_sub: peer id on subscriber 127 sec_type: NDP security type(open, PMK or PASSPHRASE) 128 @:return: dict with NDP info 129 """ 130 131 passphrase = None 132 pmk = None 133 134 if sec_type == self.ENCR_TYPE_PASSPHRASE: 135 passphrase = self.PASSPHRASE 136 if sec_type == self.ENCR_TYPE_PMK: 137 pmk = self.PMK 138 139 p_req_key = autils.request_network( 140 p_dut, 141 p_dut.droid.wifiAwareCreateNetworkSpecifier(pub_session["discId"], 142 None, 143 passphrase, pmk)) 144 s_req_key = autils.request_network( 145 s_dut, 146 s_dut.droid.wifiAwareCreateNetworkSpecifier(sub_session["discId"], 147 sub_session["peerId"], 148 passphrase, pmk)) 149 150 p_net_event_nc = autils.wait_for_event_with_keys( 151 p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, 152 (cconsts.NETWORK_CB_KEY_EVENT, 153 cconsts.NETWORK_CB_CAPABILITIES_CHANGED), 154 (cconsts.NETWORK_CB_KEY_ID, p_req_key)) 155 s_net_event_nc = autils.wait_for_event_with_keys( 156 s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, 157 (cconsts.NETWORK_CB_KEY_EVENT, 158 cconsts.NETWORK_CB_CAPABILITIES_CHANGED), 159 (cconsts.NETWORK_CB_KEY_ID, s_req_key)) 160 p_net_event_lp = autils.wait_for_event_with_keys( 161 p_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, 162 (cconsts.NETWORK_CB_KEY_EVENT, 163 cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), 164 (cconsts.NETWORK_CB_KEY_ID, p_req_key)) 165 s_net_event_lp = autils.wait_for_event_with_keys( 166 s_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_NDP_TIMEOUT, 167 (cconsts.NETWORK_CB_KEY_EVENT, 168 cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), 169 (cconsts.NETWORK_CB_KEY_ID, s_req_key)) 170 171 p_aware_if = p_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME] 172 s_aware_if = s_net_event_lp["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME] 173 p_if_mac = self.transfer_mac_format(autils.get_mac_addr(p_dut, p_aware_if)) 174 p_dut.log.info("NDI %s=%s", p_aware_if, p_if_mac) 175 s_if_mac = self.transfer_mac_format(autils.get_mac_addr(s_dut, s_aware_if)) 176 s_dut.log.info("NDI %s=%s", s_aware_if, s_if_mac) 177 178 s_ipv6 = p_net_event_nc["data"][aconsts.NET_CAP_IPV6] 179 p_ipv6 = s_net_event_nc["data"][aconsts.NET_CAP_IPV6] 180 asserts.assert_true( 181 autils.verify_socket_connect(p_dut, s_dut, p_ipv6, s_ipv6, 0), 182 "Failed socket link with Pub as Server") 183 asserts.assert_true( 184 autils.verify_socket_connect(s_dut, p_dut, s_ipv6, p_ipv6, 0), 185 "Failed socket link with Sub as Server") 186 187 ndp_info = {"pubReqKey": p_req_key, "pubIfMac": p_if_mac, 188 "subReqKey": s_req_key, "subIfMac": s_if_mac} 189 return ndp_info 190 191 @test_tracker_info(uuid="c9c66873-a8e0-4830-8baa-ada03223bcef") 192 def test_ib_multi_data_path_mac_random_test(self): 193 """Verify there is no factory MAC Address leakage during the Aware discovery, NDP creation, 194 socket setup and IP service connection.""" 195 196 p_dut = self.android_devices[0] 197 s_dut = self.android_devices[1] 198 mac_addresses = [] 199 factory_mac_addresses = [] 200 sec_types = [self.ENCR_TYPE_PMK, self.ENCR_TYPE_PASSPHRASE] 201 202 self.log.info("Starting packet capture") 203 pcap_procs = wutils.start_pcap( 204 self.packet_capture, 'dual', self.test_name) 205 206 factory_mac_1 = p_dut.droid.wifigetFactorymacAddresses()[0] 207 p_dut.log.info("Factory Address: %s", factory_mac_1) 208 factory_mac_2 = s_dut.droid.wifigetFactorymacAddresses()[0] 209 s_dut.log.info("Factory Address: %s", factory_mac_2) 210 factory_mac_addresses.append(factory_mac_1) 211 factory_mac_addresses.append(factory_mac_2) 212 213 # Start Aware and exchange messages 214 publish_session = self.start_aware(p_dut, True) 215 subscribe_session = self.start_aware(s_dut, False) 216 mac_addresses.append(publish_session["mac"]) 217 mac_addresses.append(subscribe_session["mac"]) 218 discovery_event = autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_SERVICE_DISCOVERED) 219 subscribe_session["peerId"] = discovery_event['data'][aconsts.SESSION_CB_KEY_PEER_ID] 220 221 msg_id = self.get_next_msg_id() 222 s_dut.droid.wifiAwareSendMessage(subscribe_session["discId"], subscribe_session["peerId"], 223 msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES) 224 autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT) 225 pub_rx_msg_event = autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED) 226 publish_session["peerId"] = pub_rx_msg_event['data'][aconsts.SESSION_CB_KEY_PEER_ID] 227 228 msg_id = self.get_next_msg_id() 229 p_dut.droid.wifiAwareSendMessage(publish_session["discId"], publish_session["peerId"], 230 msg_id, self.ping_msg, aconsts.MAX_TX_RETRIES) 231 autils.wait_for_event(p_dut, aconsts.SESSION_CB_ON_MESSAGE_SENT) 232 autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_MESSAGE_RECEIVED) 233 234 # Create Aware NDP 235 p_req_keys = [] 236 s_req_keys = [] 237 for sec in sec_types: 238 ndp_info = self.create_date_path(p_dut, publish_session, s_dut, subscribe_session, sec) 239 p_req_keys.append(ndp_info["pubReqKey"]) 240 s_req_keys.append(ndp_info["subReqKey"]) 241 mac_addresses.append(ndp_info["pubIfMac"]) 242 mac_addresses.append(ndp_info["subIfMac"]) 243 244 # clean-up 245 for p_req_key in p_req_keys: 246 p_dut.droid.connectivityUnregisterNetworkCallback(p_req_key) 247 for s_req_key in s_req_keys: 248 s_dut.droid.connectivityUnregisterNetworkCallback(s_req_key) 249 p_dut.droid.wifiAwareDestroyAll() 250 s_dut.droid.wifiAwareDestroyAll() 251 252 self.log.info("Stopping packet capture") 253 wutils.stop_pcap(self.packet_capture, pcap_procs, False) 254 255 self.verify_mac_no_leakage(pcap_procs, factory_mac_addresses, mac_addresses) 256