1# Copyright 2019 - The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""OtaTools class.""" 15 16import logging 17import os 18import stat 19import subprocess 20import tempfile 21 22from acloud import errors 23from acloud.internal import constants 24from acloud.internal.lib import utils 25 26logger = logging.getLogger(__name__) 27 28_BIN_DIR_NAME = "bin" 29_LPMAKE = "lpmake" 30_BUILD_SUPER_IMAGE = "build_super_image" 31_AVBTOOL = "avbtool" 32_SGDISK = "sgdisk" 33_SIMG2IMG = "simg2img" 34_MK_COMBINED_IMG = "mk_combined_img" 35 36_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30 37_AVBTOOL_TIMEOUT_SECS = 30 38_MK_COMBINED_IMG_TIMEOUT_SECS = 180 39 40_MISSING_OTA_TOOLS_MSG = ("%(tool_name)s is not found. Run `make otatools` " 41 "in build environment, or set --local-tool to an " 42 "extracted otatools.zip.") 43 44 45def FindOtaTools(search_paths): 46 """Find OTA tools in the search paths and in build environment. 47 48 Args: 49 search_paths: List of paths, the directories to search for OTA tools. 50 51 Returns: 52 The directory containing OTA tools. 53 54 Raises: 55 errors.CheckPathError if OTA tools are not found. 56 """ 57 for search_path in search_paths: 58 if os.path.isfile(os.path.join(search_path, _BIN_DIR_NAME, 59 _BUILD_SUPER_IMAGE)): 60 return search_path 61 62 host_out_dir = os.environ.get(constants.ENV_ANDROID_HOST_OUT) 63 if (host_out_dir and 64 os.path.isfile(os.path.join(host_out_dir, _BIN_DIR_NAME, 65 _BUILD_SUPER_IMAGE))): 66 return host_out_dir 67 68 raise errors.CheckPathError(_MISSING_OTA_TOOLS_MSG % 69 {"tool_name": "OTA tool directory"}) 70 71 72class OtaTools(object): 73 """The class that executes OTA tool commands.""" 74 75 def __init__(self, ota_tools_dir): 76 self._ota_tools_dir = os.path.abspath(ota_tools_dir) 77 78 def _GetBinary(self, name): 79 """Get an executable file from _ota_tools_dir. 80 81 Args: 82 name: String, the file name. 83 84 Returns: 85 String, the absolute path. 86 87 Raises: 88 errors.NoExecuteCmd if the file does not exist. 89 """ 90 path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name) 91 if not os.path.isfile(path): 92 raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG % 93 {"tool_name": name}) 94 mode = os.stat(path).st_mode 95 os.chmod(path, mode | (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | 96 stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)) 97 return path 98 99 @staticmethod 100 def _ExecuteCommand(*command, **popen_args): 101 """Execute a command and log the output. 102 103 This method waits for the process to terminate. It kills the process 104 if it's interrupted due to timeout. 105 106 Args: 107 command: Strings, the command. 108 popen_kwargs: The arguments to be passed to subprocess.Popen. 109 110 Raises: 111 errors.SubprocessFail if the process returns non-zero. 112 """ 113 proc = None 114 try: 115 logger.info("Execute %s", command) 116 popen_args["stdin"] = subprocess.PIPE 117 popen_args["stdout"] = subprocess.PIPE 118 popen_args["stderr"] = subprocess.PIPE 119 proc = subprocess.Popen(command, **popen_args) 120 stdout, stderr = proc.communicate() 121 logger.info("%s stdout: %s", command[0], stdout) 122 logger.info("%s stderr: %s", command[0], stderr) 123 124 if proc.returncode != 0: 125 raise errors.SubprocessFail("%s returned %d." % 126 (command[0], proc.returncode)) 127 finally: 128 if proc and proc.poll() is None: 129 logger.info("Kill %s", command[0]) 130 proc.kill() 131 132 @staticmethod 133 def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image): 134 """Rewrite lpmake and image paths in misc_info.txt. 135 136 Misc info consists of multiple lines of <key>=<value>. 137 Sample input_file: 138 lpmake=lpmake 139 dynamic_partition_list= system system_ext product vendor 140 141 Sample output_file: 142 lpmake=/path/to/lpmake 143 dynamic_partition_list= system system_ext product vendor 144 system_image=/path/to/system.img 145 system_ext_image=/path/to/system_ext.img 146 product_image=/path/to/product.img 147 vendor_image=/path/to/vendor.img 148 149 This method replaces lpmake with the specified path, and sets 150 *_image for every partition in dynamic_partition_list. 151 152 Args: 153 output_file: The output file object. 154 input_file: The input file object. 155 lpmake_path: The path to lpmake binary. 156 get_image: A function that takes the partition name as the 157 parameter and returns the image path. 158 """ 159 partition_names = () 160 for line in input_file: 161 split_line = line.strip().split("=", 1) 162 if len(split_line) < 2: 163 split_line = (split_line[0], "") 164 if split_line[0] == "dynamic_partition_list": 165 partition_names = split_line[1].split() 166 elif split_line[0] == "lpmake": 167 output_file.write("lpmake=%s\n" % lpmake_path) 168 continue 169 elif split_line[0].endswith("_image"): 170 continue 171 output_file.write(line) 172 173 if not partition_names: 174 logger.w("No dynamic partition list in misc info.") 175 176 for partition_name in partition_names: 177 output_file.write("%s_image=%s\n" % 178 (partition_name, get_image(partition_name))) 179 180 @utils.TimeExecute(function_description="Build super image") 181 @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS) 182 def BuildSuperImage(self, output_path, misc_info_path, get_image): 183 """Use build_super_image to create a super image. 184 185 Args: 186 output_path: The path to the output super image. 187 misc_info_path: The path to the misc info that provides parameters 188 to create the super image. 189 get_image: A function that takes the partition name as the 190 parameter and returns the image path. 191 """ 192 build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE) 193 lpmake = self._GetBinary(_LPMAKE) 194 195 new_misc_info_path = None 196 try: 197 with open(misc_info_path, "r") as misc_info: 198 with tempfile.NamedTemporaryFile( 199 prefix="misc_info_", suffix=".txt", 200 delete=False) as new_misc_info: 201 new_misc_info_path = new_misc_info.name 202 self._RewriteMiscInfo(new_misc_info, misc_info, lpmake, 203 get_image) 204 205 self._ExecuteCommand(build_super_image, new_misc_info_path, 206 output_path) 207 finally: 208 if new_misc_info_path: 209 os.remove(new_misc_info_path) 210 211 @utils.TimeExecute(function_description="Make disabled vbmeta image.") 212 @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS) 213 def MakeDisabledVbmetaImage(self, output_path): 214 """Use avbtool to create a vbmeta image with verification disabled. 215 216 Args: 217 output_path: The path to the output vbmeta image. 218 """ 219 avbtool = self._GetBinary(_AVBTOOL) 220 self._ExecuteCommand(avbtool, "make_vbmeta_image", 221 "--flag", "2", 222 "--padding_size", "4096", 223 "--output", output_path) 224 225 @staticmethod 226 def _RewriteSystemQemuConfig(output_file, input_file, get_image): 227 """Rewrite image paths in system-qemu-config.txt. 228 229 Sample input_file: 230 out/target/product/generic_x86_64/vbmeta.img vbmeta 1 231 out/target/product/generic_x86_64/super.img super 2 232 233 Sample output_file: 234 /path/to/vbmeta.img vbmeta 1 235 /path/to/super.img super 2 236 237 This method replaces the first entry of each line with the path 238 returned by get_image. 239 240 Args: 241 output_file: The output file object. 242 input_file: The input file object. 243 get_image: A function that takes the partition name as the 244 parameter and returns the image path. 245 """ 246 for line in input_file: 247 split_line = line.split() 248 if len(split_line) == 3: 249 output_file.write("%s %s %s\n" % (get_image(split_line[1]), 250 split_line[1], 251 split_line[2])) 252 else: 253 output_file.write(line) 254 255 @utils.TimeExecute(function_description="Make combined image") 256 @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS) 257 def MkCombinedImg(self, output_path, system_qemu_config_path, get_image): 258 """Use mk_combined_img to create a disk image. 259 260 Args: 261 output_path: The path to the output disk image. 262 system_qemu_config: The path to the config that provides the 263 parition information on the disk. 264 get_image: A function that takes the partition name as the 265 parameter and returns the image path. 266 """ 267 mk_combined_img = self._GetBinary(_MK_COMBINED_IMG) 268 sgdisk = self._GetBinary(_SGDISK) 269 simg2img = self._GetBinary(_SIMG2IMG) 270 271 new_config_path = None 272 try: 273 with open(system_qemu_config_path, "r") as config: 274 with tempfile.NamedTemporaryFile( 275 prefix="system-qemu-config_", suffix=".txt", 276 delete=False) as new_config: 277 new_config_path = new_config.name 278 self._RewriteSystemQemuConfig(new_config, config, 279 get_image) 280 281 mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img} 282 self._ExecuteCommand(mk_combined_img, 283 "-i", new_config_path, 284 "-o", output_path, 285 env=mk_combined_img_env) 286 finally: 287 if new_config_path: 288 os.remove(new_config_path) 289