# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under both the MIT license found in the
# LICENSE-MIT file in the root directory of this source tree and the Apache
# License, Version 2.0 found in the LICENSE-APACHE file in the root directory
# of this source tree.
# pyre-strict
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from .assemble_bundle_types import BundleSpecItem, IncrementalContext
from .incremental_state import CodesignedOnCopy, IncrementalStateItem
FILES_TO_BE_IGNORED: Set[str] = {
# Storage of Finder settings, which shouldn't be added when enumerating files from sources
".DS_Store",
}
def should_assemble_incrementally(
spec: List[BundleSpecItem], incremental_context: IncrementalContext
) -> bool:
previous_run_state = incremental_context.state
if previous_run_state is None:
logging.getLogger(__name__).info(
"Decided not to assemble incrementally — no incremental state for previous build."
)
return False
if previous_run_state.versioned_if_macos != incremental_context.versioned_if_macos:
logging.getLogger(__name__).info(
"Decided not to assemble incrementally — current build and previous build have different versioned_if_macos settings."
)
return False
previously_codesigned = previous_run_state.codesigned
# If previously bundle was not code signed there should be no problems with code signing
# currently in incremental mode. Existing binaries could be code signed "on
# top" if that's needed.
if not previously_codesigned:
logging.getLogger(__name__).info(
"Decided to assemble incrementally — previous build is not codesigned."
)
return True
# For simplicity and correctness purposes instead of stripping code signatures we
# perform non-incremental run.
if not incremental_context.codesigned:
logging.getLogger(__name__).info(
"Decided not to assemble incrementally — current build is not codesigned, while previous build is codesigned."
)
return False
# If previous identity is different from the current one also perform non-incremental run.
if previous_run_state.codesign_identity != incremental_context.codesign_identity:
logging.getLogger(__name__).info(
"Decided not to assemble incrementally — previous vs current builds have mismatching codesigning identities."
)
return False
# If previous codesign arguments are different from the current ones also perform non-incremental run.
if previous_run_state.codesign_arguments != incremental_context.codesign_arguments:
logging.getLogger(__name__).info(
"Decided not to assemble incrementally — previous vs current builds have mismatching codesigning arguments."
)
return False
# If bundle from previous run was signed in a different configuration vs the current run (e.g. dry code signed while now regular code signing is required) perform non-incremental run.
if (
previous_run_state.codesign_configuration
!= incremental_context.codesign_configuration
):
logging.getLogger(__name__).info(
"Decided not to assemble incrementally — previous vs current builds have mismatching codesigning configurations."
)
return False
# If there is an artifact that was code signed on copy in previous run which is
# present in current run and not code signed on copy, we should perform
# non-incremental run for simplicity and correctness reasons.
current_codesigned_on_copy_items = {
codesigned_on_copy_item(
path=Path(i.dst),
entitlements=(
Path(i.codesign_entitlements) if i.codesign_entitlements else None
),
incremental_context=incremental_context,
codesign_flags_override=i.codesign_flags_override,
extra_codesign_paths=i.extra_codesign_paths,
)
for i in spec
if i.codesign_on_copy
}
codesigned_on_copy_paths_from_previous_build_which_are_present_in_current_build = _codesigned_on_copy_paths_from_previous_build_which_are_present_in_current_build(
previous_run_state.codesigned_on_copy,
{Path(i.dst) for i in spec},
)
codesign_on_copy_paths_are_compatible = codesigned_on_copy_paths_from_previous_build_which_are_present_in_current_build.issubset(
current_codesigned_on_copy_items
)
if not codesign_on_copy_paths_are_compatible:
logging.getLogger(__name__).info(
f"Decided not to assemble incrementally — there is at least one artifact `{list(codesigned_on_copy_paths_from_previous_build_which_are_present_in_current_build - current_codesigned_on_copy_items)[0]}` that was code signed on copy in previous build which is present in current run and not code signed on copy (or codesigned but with a different set of entitlements, flags or extra paths.)."
)
return codesign_on_copy_paths_are_compatible
def _codesigned_on_copy_paths_from_previous_build_which_are_present_in_current_build(
previously_codesigned_on_copy: List[CodesignedOnCopy],
all_input_files: Set[Path],
) -> Set[CodesignedOnCopy]:
all_input_files_and_directories = all_input_files | {
i for file in all_input_files for i in file.parents
}
return {
i
for i in previously_codesigned_on_copy
if i.path in all_input_files_and_directories
}
def _get_new_digest(action_metadata: Dict[Path, str], path: Path) -> str:
# While a resource file can be in a symlinked folder, like the `ghi/def` example below,
# ```
# project_root
# ├── abc
# │ └── def
# └── ghi -> abc
# ```
# In this case, Python would say `ghi/abc` not a symlink. However the `action_metadata` comes
# with the actual resolved path (`abc/def`). We need to resolve the path then.
# Given Python doesn't think it's a symlink, the `readlink` API wouldn't work either
resolved_path = path.resolve().relative_to(Path.cwd())
return action_metadata[resolved_path]
def calculate_incremental_state(
spec: List[BundleSpecItem], action_metadata: Dict[Path, str]
) -> List[IncrementalStateItem]:
"""
`action_metadata` maps Buck project relative paths to hash digest
for every input file of the action which executes this script
"""
result = []
source_with_destination_files = _source_with_destination_files(spec)
for src, dst in source_with_destination_files:
is_symlink = src.is_symlink()
new_digest = _get_new_digest(action_metadata, src) if not is_symlink else None
resolved_symlink = Path(os.readlink(src)) if is_symlink else None
result.append(
IncrementalStateItem(
source=src,
destination_relative_to_bundle=dst,
digest=new_digest,
resolved_symlink=resolved_symlink,
)
)
return result
def _source_with_destination_files(
spec: List[BundleSpecItem],
) -> List[Tuple[Path, Path]]:
"""
Returns:
Ordered mapping from source path to destination path (relative to bundle) for every file
present in a bundle. Directories that were parts of the spec are split into actual files.
"""
result = []
for spec_item in spec:
file_or_dir = Path(spec_item.src)
if file_or_dir.is_file():
if not spec_item.dst:
raise RuntimeError(
f'Invalid input bundle spec. File located at {file_or_dir} should not have `""` destination (only directories are allowed to have such value).'
)
result.append((file_or_dir, Path(spec_item.dst)))
elif file_or_dir.is_dir():
result.extend(
[
(file, spec_item.dst / file.relative_to(file_or_dir))
for file in _list_directory_deterministically(file_or_dir)
]
)
else:
raise RuntimeError(
f"Path {file_or_dir} is not a file and not a dir, don't know how to handle it."
)
return [(src, dst) for src, dst in result if src.name not in FILES_TO_BE_IGNORED]
def _list_directory_deterministically(directory: Path) -> List[Path]:
result = []
for current_dir_path, dir_names, file_names in os.walk(directory):
result += [Path(os.path.join(current_dir_path, f)) for f in sorted(file_names)]
# Sort in order for walk to be deterministic.
dir_names.sort()
return result
def codesigned_on_copy_item(
path: Path,
entitlements: Optional[Path],
incremental_context: IncrementalContext,
codesign_flags_override: Optional[List[str]],
extra_codesign_paths: Optional[List[str]],
) -> CodesignedOnCopy:
if entitlements is not None:
digest = incremental_context.metadata.get(entitlements)
if digest is None:
raise RuntimeError(
f"Expected digest for entitlements file path `{entitlements}` to be present in action metadata."
)
else:
digest = None
return CodesignedOnCopy(
path=path,
entitlements_digest=digest,
codesign_flags_override=codesign_flags_override,
extra_codesign_paths=extra_codesign_paths,
)