跳轉至

Utilities API 參考

boring.utils

TransactionalFileWriter

Ensures atomic file writes using a temporary file. Prevents data corruption during concurrent access or crashes.

Source code in src/boring/core/utils.py
class TransactionalFileWriter:
    """
    Atomic file writer based on "write to a temp file, then rename".

    Every public ``write_*`` method checks that the target resolves inside the
    project root, writes the payload to a temporary file created next to the
    target, flushes and fsyncs it, and then swaps it into place with
    ``os.replace``.  Rejected paths and I/O failures are logged and reported
    by returning ``False``.
    """

    @staticmethod
    def _validate_path(file_path: Path) -> None:
        """
        Path jail: reject any target that resolves outside the project root.

        Args:
            file_path: Candidate path; it is resolved before the comparison.

        Raises:
            ValueError: If the resolved path is not located under
                ``settings.PROJECT_ROOT``.
        """
        # Imported at call time rather than at module import.
        from .config import settings

        abs_path = Path(file_path).resolve()
        abs_root = settings.PROJECT_ROOT.resolve()

        if not abs_path.is_relative_to(abs_root):
            raise ValueError(
                f"SECURITY ERROR: Path Traversal blocked. {file_path} is outside project root."
            )

    @staticmethod
    def _replace_with_retry(temp_path: str, file_path: Path) -> None:
        """
        Move ``temp_path`` over ``file_path`` using ``os.replace``.

        Up to five attempts are made.  A retry (after 0.2s, 0.4s, ... of
        sleep) only happens for errno 13 or a "WinError 32" sharing violation,
        i.e. when another process is holding the target open on Windows.

        Raises:
            OSError: On a non-retryable error, or when the last attempt fails.
        """
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                os.replace(temp_path, file_path)
                return
            except OSError as e:  # PermissionError is a subclass of OSError
                # Windows Error 32: Sharing violation
                lock_contention = getattr(e, "errno", 0) == 13 or "WinError 32" in str(e)
                if attempt < max_attempts - 1 and lock_contention:
                    time.sleep(0.2 * (attempt + 1))
                    continue
                raise

    @staticmethod
    def _atomic_write(file_path, label, write_payload, suffix=".tmp", binary=False) -> bool:
        """
        Shared implementation behind the public ``write_*`` methods.

        Args:
            file_path: Destination path (validated against the project root).
            label: Short payload name used in the failure log message.
            write_payload: Callable receiving the open temp-file handle and
                writing the payload into it.
            suffix: Suffix for the temporary file name.
            binary: Open the temp file as ``"wb"`` instead of UTF-8 text.

        Returns:
            ``True`` once the target has been replaced, ``False`` if the path
            was rejected or any step of the write failed.
        """
        try:
            TransactionalFileWriter._validate_path(file_path)
        except ValueError as e:
            logger.error(str(e))
            return False

        file_path = Path(file_path)
        temp_path = None
        try:
            # Directory creation and temp-file creation are inside the ``try``
            # so that their failures are reported as ``False`` like every
            # other I/O error instead of escaping to the caller.
            file_path.parent.mkdir(parents=True, exist_ok=True)

            # The temp file lives in the target directory so that the final
            # os.replace stays on one filesystem.
            temp_fd, temp_path = tempfile.mkstemp(
                suffix=suffix, prefix=".boring_", dir=file_path.parent
            )
            if binary:
                handle = os.fdopen(temp_fd, "wb")
            else:
                handle = os.fdopen(temp_fd, "w", encoding="utf-8")
            with handle as f:
                write_payload(f)
                f.flush()
                # Ensure data is written to disk before it becomes visible
                os.fsync(f.fileno())

            TransactionalFileWriter._replace_with_retry(temp_path, file_path)
            return True
        except Exception as e:
            if temp_path is not None:
                # Best-effort cleanup; the temp file may already be gone.
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass
            logger.error(f"Atomic {label} write failed for {file_path}: {e}")
            return False

    @staticmethod
    def write_json(file_path: Path, data: dict, indent: int = 4) -> bool:
        """
        Write a dictionary to a JSON file atomically.

        Args:
            file_path: Destination path inside the project root.
            data: Object to serialise (non-ASCII characters are kept as-is).
            indent: Indentation width passed to ``json.dump``.

        Returns:
            ``True`` on success, ``False`` if the path was rejected or the
            write failed (the reason is logged).
        """
        return TransactionalFileWriter._atomic_write(
            file_path,
            "JSON",
            lambda f: json.dump(data, f, indent=indent, ensure_ascii=False),
        )

    @staticmethod
    def write_text(file_path: Path, content: str) -> bool:
        """
        Write a string to a file atomically (UTF-8).

        Returns:
            ``True`` on success, ``False`` if the path was rejected or the
            write failed (the reason is logged).
        """
        return TransactionalFileWriter._atomic_write(
            file_path, "text", lambda f: f.write(content)
        )

    @staticmethod
    def write_gzip(file_path: Path, content: str) -> bool:
        """
        Write a string to a gzipped file atomically (V14.1).

        The compressed stream is written through the temp file's own handle so
        that it can be fsynced before the rename, matching ``write_json`` and
        ``write_text``.

        Returns:
            ``True`` on success, ``False`` if the path was rejected or the
            write failed (the reason is logged).
        """
        import gzip

        def _compress(raw):
            # Closing the gzip wrapper writes the gzip trailer but leaves the
            # underlying handle open for the flush + fsync that follow.
            with gzip.open(raw, "wt", encoding="utf-8") as gz:
                gz.write(content)

        return TransactionalFileWriter._atomic_write(
            file_path, "Gzip", _compress, suffix=".tmp.gz", binary=True
        )

write_json(file_path, data, indent=4) staticmethod

Write a dictionary to a JSON file atomically.

Source code in src/boring/core/utils.py
@staticmethod
def write_json(file_path: Path, data: dict, indent: int = 4) -> bool:
    """
    Write a dictionary to a JSON file atomically.

    The payload is written to a temporary file in the target directory,
    flushed and fsynced, then moved over the target with ``os.replace``.

    Args:
        file_path: Destination path; must resolve inside the project root.
        data: Object to serialise (non-ASCII characters are kept as-is).
        indent: Indentation width passed to ``json.dump``.

    Returns:
        ``True`` on success, ``False`` if the path was rejected or any step
        of the write failed (the reason is logged).
    """
    try:
        TransactionalFileWriter._validate_path(file_path)
    except ValueError as e:
        logger.error(str(e))
        return False

    file_path = Path(file_path)
    temp_path = None
    try:
        # mkdir/mkstemp are inside the ``try`` so their failures are reported
        # as ``False`` like every other I/O error instead of being raised.
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Create temporary file in the same directory
        temp_fd, temp_path = tempfile.mkstemp(
            suffix=".tmp", prefix=".boring_", dir=file_path.parent
        )
        with os.fdopen(temp_fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=indent, ensure_ascii=False)
            f.flush()
            # Ensure data is written to disk
            os.fsync(f.fileno())

        # Atomic replace with retry for Windows lock contention
        for attempt in range(5):
            try:
                os.replace(temp_path, file_path)
                break
            except OSError as e:  # PermissionError is a subclass of OSError
                # Windows Error 32: Sharing violation
                if attempt < 4 and (getattr(e, "errno", 0) == 13 or "WinError 32" in str(e)):
                    time.sleep(0.2 * (attempt + 1))
                    continue
                raise
        return True
    except Exception as e:
        if temp_path is not None:
            # Best-effort cleanup; the temp file may already be gone.
            try:
                os.unlink(temp_path)
            except OSError:
                pass
        logger.error(f"Atomic JSON write failed for {file_path}: {e}")
        return False

write_text(file_path, content) staticmethod

Write a string to a file atomically.

Source code in src/boring/core/utils.py
@staticmethod
def write_text(file_path: Path, content: str) -> bool:
    """
    Write a string to a file atomically (UTF-8).

    The text is written to a temporary file in the target directory, flushed
    and fsynced, then moved over the target with ``os.replace``.

    Args:
        file_path: Destination path; must resolve inside the project root.
        content: Text to write.

    Returns:
        ``True`` on success, ``False`` if the path was rejected or any step
        of the write failed (the reason is logged).
    """
    try:
        TransactionalFileWriter._validate_path(file_path)
    except ValueError as e:
        logger.error(str(e))
        return False

    file_path = Path(file_path)
    temp_path = None
    try:
        # mkdir/mkstemp are inside the ``try`` so their failures are reported
        # as ``False`` like every other I/O error instead of being raised.
        file_path.parent.mkdir(parents=True, exist_ok=True)

        temp_fd, temp_path = tempfile.mkstemp(
            suffix=".tmp", prefix=".boring_", dir=file_path.parent
        )
        with os.fdopen(temp_fd, "w", encoding="utf-8") as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())

        # Atomic replace with retry for Windows lock contention
        for attempt in range(5):
            try:
                os.replace(temp_path, file_path)
                break
            except OSError as e:  # PermissionError is a subclass of OSError
                if attempt < 4 and (getattr(e, "errno", 0) == 13 or "WinError 32" in str(e)):
                    time.sleep(0.2 * (attempt + 1))
                    continue
                raise
        return True
    except Exception as e:
        if temp_path is not None:
            # Best-effort cleanup; the temp file may already be gone.
            try:
                os.unlink(temp_path)
            except OSError:
                pass
        logger.error(f"Atomic text write failed for {file_path}: {e}")
        return False

write_gzip(file_path, content) staticmethod

Write a string to a gzipped file atomically (V14.1).

Source code in src/boring/core/utils.py
@staticmethod
def write_gzip(file_path: Path, content: str) -> bool:
    """
    Write a string to a gzipped file atomically (V14.1).

    The compressed stream is written through the temporary file's own handle
    so that it can be flushed and fsynced before ``os.replace`` moves it over
    the target.

    Args:
        file_path: Destination path; must resolve inside the project root.
        content: Text to compress (encoded as UTF-8).

    Returns:
        ``True`` on success, ``False`` if the path was rejected or any step
        of the write failed (the reason is logged).
    """
    try:
        TransactionalFileWriter._validate_path(file_path)
    except ValueError as e:
        logger.error(str(e))
        return False

    import gzip

    file_path = Path(file_path)
    temp_path = None
    try:
        # mkdir/mkstemp are inside the ``try`` so their failures are reported
        # as ``False`` like every other I/O error instead of being raised.
        file_path.parent.mkdir(parents=True, exist_ok=True)

        temp_fd, temp_path = tempfile.mkstemp(
            suffix=".tmp.gz", prefix=".boring_", dir=file_path.parent
        )
        with os.fdopen(temp_fd, "wb") as raw:
            # Closing the gzip wrapper writes the gzip trailer but leaves
            # ``raw`` open, so the data can be forced to disk afterwards.
            with gzip.open(raw, "wt", encoding="utf-8") as gz:
                gz.write(content)
            raw.flush()
            os.fsync(raw.fileno())

        # Atomic replace with retry for Windows lock contention
        for attempt in range(5):
            try:
                os.replace(temp_path, file_path)
                break
            except OSError as e:  # PermissionError is a subclass of OSError
                if attempt < 4 and (getattr(e, "errno", 0) == 13 or "WinError 32" in str(e)):
                    time.sleep(0.2 * (attempt + 1))
                    continue
                raise
        return True
    except Exception as e:
        if temp_path is not None:
            # Best-effort cleanup; the temp file may already be gone.
            try:
                os.unlink(temp_path)
            except OSError:
                pass
        logger.error(f"Atomic Gzip write failed for {file_path}: {e}")
        return False

safe_read_text(file_path)

Safely read text file with UTF-8 encoding.

Source code in src/boring/core/utils.py
def safe_read_text(file_path: Path) -> str:
    """
    Return the contents of ``file_path`` decoded as UTF-8.

    Undecodable bytes are substituted (``errors="replace"``).  If the read
    fails for any reason, a warning is printed to the console and an empty
    string is returned.
    """
    try:
        target = Path(file_path)
        text = target.read_text(encoding="utf-8", errors="replace")
    except Exception as exc:
        console.print(f"[yellow]Failed to read {file_path}: {exc}[/yellow]")
        return ""
    return text

check_syntax(file_path)

Checks if a Python file has valid syntax. Returns (is_valid, error_message).

Source code in src/boring/core/utils.py
def check_syntax(file_path: Path) -> tuple[bool, str]:
    """
    Check whether a Python file parses.

    Args:
        file_path: Path of the file to check.  A plain string is accepted as
            well, matching what ``safe_read_text`` accepts.

    Returns:
        ``(is_valid, error_message)``; ``error_message`` is empty on success.
    """
    try:
        source = safe_read_text(file_path)
        if not source:
            # NOTE: an empty result is treated as a read failure; an existing
            # but empty file is reported the same way.
            return False, f"Could not read {file_path}"
        ast.parse(source)
        return True, ""
    except SyntaxError as e:
        # Path(...) so that a ``str`` argument does not fail on ``.name``.
        return False, f"SyntaxError in {Path(file_path).name} line {e.lineno}: {e.msg}"
    except Exception as e:
        return False, f"Error checking syntax for {Path(file_path).name}: {str(e)}"

check_and_install_dependencies(code_content)

Scans code for imports and, after asking the user for confirmation, installs missing packages using pip. Note: this is a heuristic approach.

Source code in src/boring/core/utils.py
def check_and_install_dependencies(code_content: str):
    """
    Scan code for imports and offer to pip-install the ones that are missing.

    The code is parsed with ``ast``; the top-level package of every absolute
    ``import x`` / ``from x import y`` is collected.  Relative imports
    (``from .x import y``) refer to modules of the surrounding project and
    are never treated as installable packages.  Each collected module is
    imported; on ``ImportError`` the user is asked to confirm installation of
    the mapped package name (default: no, and auto-denied when the prompt
    cannot be shown).

    Note: This is a heuristics-based approach.

    Args:
        code_content: Python source text to analyse.  If it cannot be parsed,
            the function returns without doing anything.
    """
    # Analyze AST for robust import detection
    try:
        tree = ast.parse(code_content)
    except Exception:
        # If code is not parseable, we can't detect imports reliably
        return

    imports = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                imports.add(alias.name.split(".")[0])
        elif isinstance(node, ast.ImportFrom):
            # level > 0 marks a relative import; installing a same-named
            # package from the index for it would be wrong and unsafe.
            if not node.level and node.module:
                imports.add(node.module.split(".")[0])

    # No standard-library filter: just try to import. If that fails, offer to
    # install. Sorted so the prompts appear in a stable order.
    for module_name in sorted(imports):
        if not module_name:
            continue

        try:
            __import__(module_name)
        except ImportError:
            console.print(f"[yellow]Module '{module_name}' missing.[/yellow]")

            # [SECURITY FIX] Human-in-the-loop for dependency installation
            # Prevent RCE via malicious package names
            package_name = _map_module_to_package(module_name)

            msg = f"⚠️  Security Alert: The agent wants to install '{package_name}'. Allow?"
            try:
                # Default to NO for security
                should_install = typer.confirm(msg, default=False)
            except Exception:
                # If non-interactive (e.g. headless), auto-deny
                console.print(
                    f"[red]Headless mode: Auto-denying installation of {package_name}[/red]"
                )
                should_install = False

            if should_install:
                try:
                    # Force UTF-8 for subprocess to avoid Windows encoding issues
                    env = os.environ.copy()
                    env["PYTHONIOENCODING"] = "utf-8"

                    subprocess.check_call(
                        [sys.executable, "-m", "pip", "install", package_name],
                        env=env,
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                    )
                    console.print(f"[green]Successfully installed {package_name}[/green]")
                except subprocess.CalledProcessError:
                    console.print(f"[red]Failed to install {package_name}. Ignoring.[/red]")
            else:
                console.print(
                    f"[yellow]Skipping installation of {package_name}. Code may fail.[/yellow]"
                )

robust_write_file(path, content)

Write text to file with retries.

Source code in src/boring/utils/__init__.py
@retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5))
def robust_write_file(path: Path, content: str) -> None:
    """
    Store ``content`` in ``path`` as UTF-8 text.

    The call is wrapped by ``retry(stop=stop_after_attempt(3),
    wait=wait_fixed(0.5))`` -- presumably up to three attempts, 0.5s apart;
    confirm against the decorator's library.
    """
    path.write_text(content, encoding="utf-8")
    return None

robust_read_file(path)

Read text from file with retries.

Source code in src/boring/utils/__init__.py
@retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5))
def robust_read_file(path: Path) -> str:
    """
    Load ``path`` and return its contents decoded as UTF-8.

    The call is wrapped by ``retry(stop=stop_after_attempt(3),
    wait=wait_fixed(0.5))`` -- presumably up to three attempts, 0.5s apart;
    confirm against the decorator's library.
    """
    text = path.read_text(encoding="utf-8")
    return text

get_project_tree(path, ignore_patterns=None, max_depth=-1, prefix='')

Recursively lists the contents of a directory, similar to the tree command.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path | The starting directory path. | required |
| ignore_patterns | list | A list of glob patterns to ignore. | None |
| max_depth | int | The maximum depth to traverse. -1 for no limit. | -1 |
| prefix | str | Internal parameter for recursion, do not use. | '' |

Returns:

| Type | Description |
| --- | --- |
| str | A string representing the directory tree. |

Source code in src/boring/utils/__init__.py
def get_project_tree(
    path: Path, ignore_patterns: list = None, max_depth: int = -1, prefix: str = ""
) -> str:
    """
    Recursively lists the contents of a directory, similar to the `tree` command.

    Directories are listed before files; within each group entries are sorted
    case-insensitively by name.

    Args:
        path: The starting directory path.
        ignore_patterns: A list of glob patterns to ignore (matched with
            ``Path.match``).
        max_depth: The maximum depth to traverse. -1 for no limit.
        prefix: Internal parameter for recursion, do not use.

    Returns:
        A string representing the directory tree.
    """
    if ignore_patterns is None:
        ignore_patterns = []

    lines = []
    if prefix == "":
        # Only the outermost call prints the root directory name.
        lines.append(f"{path.name}/\n")

    if max_depth == 0:
        return "".join(lines)

    entries = sorted(path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower()))
    visible = [
        entry
        for entry in entries
        if not any(entry.match(pattern) for pattern in ignore_patterns)
    ]

    sub_max_depth = max_depth - 1 if max_depth != -1 else -1
    last_index = len(visible) - 1

    for i, entry in enumerate(visible):
        # "Last" must be judged against the entries actually shown; counting
        # ignored entries would leave the final branch drawn as "├── ".
        is_last = i == last_index
        connector = "└── " if is_last else "├── "
        entry_name = f"{entry.name}/" if entry.is_dir() else entry.name
        lines.append(f"{prefix}{connector}{entry_name}\n")

        if entry.is_dir():
            extension_prefix = "    " if is_last else "│   "
            lines.append(
                get_project_tree(
                    entry, ignore_patterns, sub_max_depth, prefix + extension_prefix
                )
            )

    return "".join(lines)