跳轉至

Loop API 參考 (Loop & Workflow)

boring.loop

Boring Loop State Machine V10.26

A refactored, state-pattern-based agent loop with clear separation of concerns.

Module Structure:

- base.py: LoopState ABC and utilities
- context.py: LoopContext shared state
- states/: Individual state implementations
- agent.py: StatefulAgentLoop context class

V10.26 Reorganization:

- shadow_mode: Human-in-the-loop protection (moved from root)
- workflow_manager: Workflow package management (moved from root)
- workflow_evolver: Dynamic workflow evolution (moved from root)
- background_agent: Async task execution (moved from root)
- transactions: Git-based snapshot/rollback (moved from root)

StatefulAgentLoop

State-pattern-based autonomous agent loop (V10.23 Enhanced).

This class is the Context in the State Pattern - it holds the current state and shared data, delegating logic to states.

V10.23 Features:

- Automatic session context syncing with intelligence modules
- Memory compaction to prevent context bloat
- Error/task/file access recording for pattern learning

Usage

loop = StatefulAgentLoop(model_name="gemini-2.0-flash")
loop.run()

Source code in src/boring/loop/agent.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
class StatefulAgentLoop:
    """
    State-pattern-based autonomous agent loop (V10.23 Enhanced).

    This class is the Context in the State Pattern - it holds
    the current state and shared data, delegating logic to states.

    V10.23 Features:
    - Automatic session context syncing with intelligence modules
    - Memory compaction to prevent context bloat
    - Error/task/file access recording for pattern learning

    Usage:
        loop = StatefulAgentLoop(model_name="gemini-2.0-flash")
        loop.run()
    """

    def __init__(
        self,
        model_name: str = settings.DEFAULT_MODEL,
        use_cli: bool = False,
        verbose: bool = False,
        prompt_file: Path | None = None,
        context_file: Path | None = None,
        verification_level: str = "STANDARD",
        interactive: bool = False,
    ):
        """Initialize the agent loop with configuration.

        Args:
            model_name: Model identifier handed to the Gemini client.
            use_cli: If True, shell out to the ``gemini`` CLI instead of the SDK.
            verbose: Print extra subsystem diagnostics at startup.
            prompt_file: Optional prompt file path; defaults to
                ``settings.PROJECT_ROOT / settings.PROMPT_FILE``.
            context_file: Accepted for interface compatibility; not read
                anywhere in this class.
            verification_level: Verification strictness label; upper-cased
                before being stored on the context.
            interactive: Interactive mode; skips Gemini client initialization
                in ``_init_subsystems``.
        """
        init_directories()

        # Create shared context
        self.context = LoopContext(
            model_name=model_name,
            use_cli=use_cli,
            verbose=verbose,
            interactive=interactive,
            verification_level=verification_level.upper(),
            project_root=settings.PROJECT_ROOT,
            log_dir=settings.LOG_DIR,
            prompt_file=prompt_file or settings.PROJECT_ROOT / settings.PROMPT_FILE,
        )

        # Initialize subsystems
        self._init_subsystems()

        # Initial state
        self._current_state: LoopState | None = None

        # RAG Watcher for auto-indexing
        self._rag_watcher: RAGWatcher | None = None

    def _init_subsystems(self) -> None:
        """Initialize all subsystems and inject into context."""
        ctx = self.context

        # Memory system
        ctx.memory = MemoryManager(ctx.project_root)

        # Code verifier
        ctx.verifier = CodeVerifier(ctx.project_root, ctx.log_dir)

        # Extensions manager
        ctx.extensions = ExtensionsManager(ctx.project_root)

        # SQLite storage for telemetry
        # Best-effort: on failure the loop still runs, just without telemetry.
        try:
            ctx.storage = create_storage(ctx.project_root, ctx.log_dir)
        except Exception as e:
            log_status(ctx.log_dir, "WARN", f"Failed to init storage: {e}")

        # Gemini client (SDK mode only)
        if not ctx.interactive:
            if not ctx.use_cli:
                ctx.gemini_client = create_gemini_client(
                    log_dir=ctx.log_dir, model_name=ctx.model_name
                )
                if not ctx.gemini_client:
                    raise RuntimeError("Failed to initialize Gemini SDK client")
            else:
                # Verify CLI is available
                if not shutil.which("gemini"):
                    raise RuntimeError(
                        "Gemini CLI not found in PATH. "
                        "Install with: npm install -g @google/gemini-cli"
                    )

        # RAG Watcher (for automatic re-indexing on file changes)
        # Also best-effort; watcher failure only disables auto re-indexing.
        try:
            self._rag_watcher = RAGWatcher(ctx.project_root)
        except Exception as e:
            log_status(ctx.log_dir, "WARN", f"Failed to init RAG watcher: {e}")

        # Log status
        if ctx.verbose:
            console.print(f"[dim]Memory: {ctx.memory.memory_dir}[/dim]")
            console.print(
                f"[dim]Verifier: ruff={ctx.verifier.tools.is_available('ruff')}, pytest={ctx.verifier.tools.is_available('pytest')}[/dim]"
            )

    def run(self) -> None:
        """Execute the main loop using state machine."""
        ctx = self.context

        # Check circuit breaker
        if should_halt_execution():
            console.print("[bold red]Circuit Breaker is OPEN. Execution halted.[/bold red]")
            log_status(ctx.log_dir, "CRITICAL", "Circuit Breaker is OPEN")

            if not self._handle_circuit_breaker_open():
                return

        # Display startup banner
        self._show_banner()

        # P4.2: Progress Persistence - Resume Session?
        progress = self._init_progress_manager()
        if progress and progress.has_progress():
            from rich.prompt import Confirm

            # Only ask if interactive or strict mode, otherwise maybe auto-resume?
            # For safety, let's ask if console is interactive.
            if not console.quiet and Confirm.ask(
                "[bold magenta]Found previous saved session. Resume?[/bold magenta]"
            ):
                if progress.restore_context(ctx):
                    console.print(f"[green]Restored session at Loop #{ctx.loop_count}[/green]")
                    log_status(
                        ctx.log_dir, "INFO", f"Restored session state (Loop {ctx.loop_count})"
                    )

        # Initialize tracking files
        from boring.paths import BoringPaths

        bp = BoringPaths(ctx.project_root)

        init_call_tracking(
            bp.state / ".call_count",
            bp.state / ".last_reset",
            bp.state / ".exit_signals",
        )

        # Deadlock Detector State
        _error_history = []

        # Start RAG Watcher (auto-index on file changes)
        if self._rag_watcher:
            try:
                from ..rag import create_rag_retriever

                retriever = create_rag_retriever(ctx.project_root)

                def on_file_change():
                    try:
                        retriever.build_index(incremental=True)
                        log_status(ctx.log_dir, "INFO", "[RAG] Incremental index complete")
                    except Exception as e:
                        log_status(ctx.log_dir, "WARN", f"[RAG] Re-index failed: {e}")

                self._rag_watcher.start(on_change=on_file_change)
                log_status(ctx.log_dir, "INFO", "[RAG] File watcher started")
            except ImportError:
                log_status(ctx.log_dir, "WARN", "[RAG] Watcher disabled (chromadb not installed)")
            except Exception as e:
                log_status(ctx.log_dir, "WARN", f"[RAG] Watcher failed to start: {e}")

        # Main loop
        try:
            from ..services.interrupt import InterruptHandler, start_interactive_shell

            interrupt_handler = InterruptHandler(
                save_callback=lambda: progress.save_progress(ctx) if progress else None
            )

            while ctx.should_continue():
                try:
                    # Check rate limits
                    # NOTE(review): these checks read PROJECT_ROOT/.call_count, but
                    # init_call_tracking above initialized bp.state/.call_count —
                    # confirm both resolve to the same file, else limits never trip.
                    if not can_make_call(
                        settings.PROJECT_ROOT / ".call_count", settings.MAX_HOURLY_CALLS
                    ):
                        if console.quiet:
                            ctx.mark_exit("Rate limit reached (Quiet/MCP mode)")
                            break
                        wait_for_reset(
                            settings.PROJECT_ROOT / ".call_count",
                            settings.PROJECT_ROOT / ".last_reset",
                            settings.MAX_HOURLY_CALLS,
                        )
                        console.print("[yellow]Rate limit reset. Resuming...[/yellow]")

                    # V10.23: Memory compaction check before each loop
                    self._v10_23_pre_loop_maintenance()

                    # Deadlock Detection (The "Stop Being Stupid" Fix)
                    # NOTE(review): str(sorted([])) == "[]" which is truthy, so
                    # error-free loops also enter this branch; three consecutive
                    # clean loops would compare equal and falsely trigger the
                    # deadlock exit. Consider guarding on ctx.errors_this_loop
                    # itself before stringifying — confirm intended behavior.
                    current_errors = str(sorted(ctx.errors_this_loop))
                    if current_errors:
                        _error_history.append(current_errors)
                        if len(_error_history) >= 3:
                            # Check if last 3 loops had SAME error set
                            if _error_history[-1] == _error_history[-2] == _error_history[-3]:
                                console.print(
                                    "[bold red]🛑 Deadlock Detected: Same error persisted for 3 loops.[/bold red]"
                                )
                                ctx.mark_exit("Deadlock Detected (Infinite Error Loop)")
                                break
                        # Keep history manageable
                        if len(_error_history) > 10:
                            _error_history.pop(0)

                    # Start new iteration
                    ctx.increment_loop()
                    log_status(ctx.log_dir, "LOOP", f"=== Starting Loop #{ctx.loop_count} ===")
                    console.print(f"\n[bold purple]=== Loop #{ctx.loop_count} ===[/bold purple]")

                    # V10.23: Record task for session tracking
                    ctx.record_task("loop_iteration", {"loop_count": ctx.loop_count})

                    # Run state machine for this iteration
                    self._run_state_machine()

                    # V10.23: Sync session context to RAG after each iteration
                    self._v10_23_sync_session_context()

                    # P4.2: Save Progress
                    if progress:
                        progress.save_progress(ctx)

                    # Check for exit
                    if ctx.should_exit:
                        break

                except KeyboardInterrupt:
                    action = interrupt_handler.handle_interrupt()
                    if action == "exit":
                        break
                    elif action == "shell":
                        start_interactive_shell(ctx)
                        # Optionally continue after shell
                        continue
                    elif action == "continue":
                        continue

        except Exception as e:
            # Fatal error in loop
            console.print(f"[bold red]Fatal Loop Error:[/bold red] {e}")
            log_status(ctx.log_dir, "CRITICAL", f"Fatal loop error: {e}")
            raise e

        # Cleanup
        if self._rag_watcher:
            self._rag_watcher.stop()
            log_status(ctx.log_dir, "INFO", "[RAG] File watcher stopped")

        BackupManager.cleanup_old_backups(keep_last=10)

        if ctx.exit_reason:
            console.print(f"[dim]Exit: {ctx.exit_reason}[/dim]")
        console.print("[dim]Agent loop finished.[/dim]")

        # BoringDone: Notify user that agent loop is complete
        # notify_done is guarded because it may be absent/None in some installs.
        if notify_done:
            success = not ctx.exit_reason or "error" not in ctx.exit_reason.lower()
            notify_done(
                task_name="Agent Loop",
                success=success,
                details=f"Completed {ctx.loop_count} iterations. {ctx.exit_reason or 'Ready for review.'}",
            )

    def _run_state_machine(self) -> None:
        """Execute the state machine for a single iteration."""
        # V11.0 System 2 Reasoning Trigger
        initial_state = ThinkingState()

        try:
            # Try to peek at prompt to assess complexity
            prompt_content = ""
            if self.context.prompt_file and self.context.prompt_file.exists():
                prompt_content = self.context.prompt_file.read_text(encoding="utf-8")

            if prompt_content:
                from ..mcp.tool_router import get_tool_router

                complexity = get_tool_router().assess_complexity(prompt_content)

                # 0.7 is the hand-tuned threshold for switching to ReasoningState.
                if complexity >= 0.7:
                    from .states.reasoning import ReasoningState

                    log_status(
                        self.context.log_dir,
                        "INFO",
                        f"[System 2] High complexity detected ({complexity:.2f}). Triggering Reasoning State.",
                    )
                    initial_state = ReasoningState()
        except Exception as e:
            log_status(self.context.log_dir, "DEBUG", f"Complexity assessment failed: {e}")

        # Start with chosen state
        self._current_state = initial_state

        # Standard state-machine pump: enter -> handle -> exit -> transition,
        # until a state returns None as its successor.
        while self._current_state is not None:
            state = self._current_state

            # Enter state
            state.on_enter(self.context)

            # Execute state logic
            result = state.handle(self.context)

            # Exit state
            state.on_exit(self.context)

            # Transition to next state
            self._current_state = state.next_state(self.context, result)

            # Log transition
            if self._current_state:
                log_status(
                    self.context.log_dir,
                    "INFO",
                    f"Transition: {state.name} -> {self._current_state.name}",
                )

    def _show_banner(self) -> None:
        """Display startup banner."""
        ctx = self.context
        console.print(
            Panel.fit(
                f"[bold green]Boring Autonomous Agent (v4.0 - State Pattern)[/bold green]\n"
                f"Mode: {'CLI' if ctx.use_cli else 'SDK'}\n"
                f"Model: {ctx.model_name}\n"
                f"Log Dir: {ctx.log_dir}",
                title="System Initialization",
            )
        )

    def _handle_circuit_breaker_open(self) -> bool:
        """Handle circuit breaker open state. Returns True if should continue."""
        try:
            from ..interactive import enter_interactive_mode

            should_resume = enter_interactive_mode(
                reason="Circuit Breaker OPEN - Too many consecutive failures",
                project_root=settings.PROJECT_ROOT,
                recent_errors=self.context.errors_this_loop,
            )

            if should_resume:
                console.print("[green]Resuming loop after interactive session...[/green]")
                return True
            else:
                console.print("[yellow]Aborting as requested.[/yellow]")
                return False

        except ImportError:
            # Interactive module unavailable: user must reset manually.
            console.print("[dim]Use 'boring reset-circuit' to reset manually.[/dim]")
            return False
        except KeyboardInterrupt:
            console.print("\n[yellow]Interrupted.[/yellow]")
            return False

    # =========================================================================
    # V10.23: Intelligence Integration Methods
    # =========================================================================

    def _v10_23_pre_loop_maintenance(self) -> None:
        """
        V10.23: Pre-loop maintenance tasks.

        - Compact context memory if needed
        - Clear stale caches
        - Update pattern decay scores
        """
        ctx = self.context

        try:
            # Memory compaction
            # hasattr guards keep this compatible with older LoopContext versions.
            if hasattr(ctx, "compact_if_needed"):
                compacted = ctx.compact_if_needed()
                if compacted:
                    log_status(ctx.log_dir, "INFO", "[V10.23] Context memory compacted")

            # Every 10 loops, trigger brain pattern decay update
            if ctx.loop_count > 0 and ctx.loop_count % 10 == 0:
                try:
                    from ..brain_manager import BrainManager

                    brain = BrainManager(ctx.project_root, ctx.log_dir)
                    if hasattr(brain, "update_pattern_decay"):
                        brain.update_pattern_decay()
                        log_status(ctx.log_dir, "DEBUG", "[V10.23] Pattern decay updated")
                except Exception:
                    pass  # Brain operations are optional

        except Exception as e:
            log_status(ctx.log_dir, "WARN", f"[V10.23] Pre-loop maintenance failed: {e}")

    def _v10_23_sync_session_context(self) -> None:
        """
        V10.23: Sync session context to intelligence modules.

        Propagates current task context to:
        - RAG retriever (for task-aware search)
        - IntelligentRanker (for task-aware ranking)
        - AdaptiveCache (for prediction hints)
        """
        ctx = self.context

        try:
            # Get session context from LoopContext
            if hasattr(ctx, "get_session_context_for_rag"):
                session_data = ctx.get_session_context_for_rag()

                if session_data:
                    # Sync to RAG
                    try:
                        from ..rag.rag_retriever import set_session_context

                        set_session_context(
                            task_type=session_data.get("task_type", "general"),
                            keywords=session_data.get("keywords", []),
                        )
                    except Exception:
                        pass  # RAG sync is optional

                    # Sync to IntelligentRanker
                    try:
                        from ..intelligence.intelligent_ranker import IntelligentRanker

                        ranker = IntelligentRanker(ctx.project_root)
                        ranker.set_session_context(
                            session_id=f"loop_{ctx.loop_count}",
                            task_type=session_data.get("task_type", "general"),
                            file_focus=session_data.get("recent_files", [])[:5],
                            error_context=session_data.get("recent_errors", [""])[0]
                            if session_data.get("recent_errors")
                            else "",
                        )
                    except Exception:
                        pass  # Ranker sync is optional

        except Exception as e:
            log_status(ctx.log_dir, "DEBUG", f"[V10.23] Session sync skipped: {e}")

    def _v10_23_record_loop_result(self, success: bool, error_msg: str = "") -> None:
        """
        V10.23: Record loop result for learning.

        Args:
            success: Whether the loop iteration succeeded
            error_msg: Error message if failed
        """
        ctx = self.context

        try:
            if not success and error_msg:
                # Record error for pattern learning
                if hasattr(ctx, "record_error"):
                    ctx.record_error(error_msg)

                # Also record in PredictiveAnalyzer for session insights
                try:
                    from ..intelligence.predictive_analyzer import PredictiveAnalyzer

                    analyzer = PredictiveAnalyzer(ctx.project_root, ctx.log_dir)
                    if hasattr(analyzer, "record_session_error"):
                        analyzer.record_session_error(
                            error_message=error_msg,
                            file_path="",  # Could be enhanced with actual file
                            session_id=f"loop_{ctx.loop_count}",
                        )
                except Exception:
                    pass

        except Exception as e:
            log_status(ctx.log_dir, "DEBUG", f"[V10.23] Result recording failed: {e}")

    def _init_progress_manager(self) -> Any | None:
        """Initialize progress manager safely."""
        try:
            from ..services.progress import ProgressManager

            return ProgressManager(self.context.project_root)
        except ImportError:
            # Progress persistence is optional; loop runs without it.
            return None

__init__(model_name=settings.DEFAULT_MODEL, use_cli=False, verbose=False, prompt_file=None, context_file=None, verification_level='STANDARD', interactive=False)

Initialize the agent loop with configuration.

Source code in src/boring/loop/agent.py
def __init__(
    self,
    model_name: str = settings.DEFAULT_MODEL,
    use_cli: bool = False,
    verbose: bool = False,
    prompt_file: Path | None = None,
    context_file: Path | None = None,
    verification_level: str = "STANDARD",
    interactive: bool = False,
):
    """Initialize the agent loop with configuration.

    Args:
        model_name: Model identifier handed to the Gemini client.
        use_cli: If True, shell out to the ``gemini`` CLI instead of the SDK.
        verbose: Print extra subsystem diagnostics at startup.
        prompt_file: Optional prompt file path; defaults to
            ``settings.PROJECT_ROOT / settings.PROMPT_FILE``.
        context_file: Accepted for interface compatibility; not read
            anywhere in this method.
        verification_level: Verification strictness label; upper-cased
            before being stored on the context.
        interactive: Interactive mode; skips Gemini client initialization.
    """
    init_directories()

    # Create shared context
    self.context = LoopContext(
        model_name=model_name,
        use_cli=use_cli,
        verbose=verbose,
        interactive=interactive,
        verification_level=verification_level.upper(),
        project_root=settings.PROJECT_ROOT,
        log_dir=settings.LOG_DIR,
        prompt_file=prompt_file or settings.PROJECT_ROOT / settings.PROMPT_FILE,
    )

    # Initialize subsystems
    self._init_subsystems()

    # Initial state
    self._current_state: LoopState | None = None

    # RAG Watcher for auto-indexing
    self._rag_watcher: RAGWatcher | None = None

run()

Execute the main loop using state machine.

Source code in src/boring/loop/agent.py
def run(self) -> None:
    """Execute the main loop using state machine."""
    ctx = self.context

    # Check circuit breaker
    if should_halt_execution():
        console.print("[bold red]Circuit Breaker is OPEN. Execution halted.[/bold red]")
        log_status(ctx.log_dir, "CRITICAL", "Circuit Breaker is OPEN")

        if not self._handle_circuit_breaker_open():
            return

    # Display startup banner
    self._show_banner()

    # P4.2: Progress Persistence - Resume Session?
    progress = self._init_progress_manager()
    if progress and progress.has_progress():
        from rich.prompt import Confirm

        # Only ask if interactive or strict mode, otherwise maybe auto-resume?
        # For safety, let's ask if console is interactive.
        if not console.quiet and Confirm.ask(
            "[bold magenta]Found previous saved session. Resume?[/bold magenta]"
        ):
            if progress.restore_context(ctx):
                console.print(f"[green]Restored session at Loop #{ctx.loop_count}[/green]")
                log_status(
                    ctx.log_dir, "INFO", f"Restored session state (Loop {ctx.loop_count})"
                )

    # Initialize tracking files
    from boring.paths import BoringPaths

    bp = BoringPaths(ctx.project_root)

    init_call_tracking(
        bp.state / ".call_count",
        bp.state / ".last_reset",
        bp.state / ".exit_signals",
    )

    # Deadlock Detector State
    _error_history = []

    # Start RAG Watcher (auto-index on file changes)
    if self._rag_watcher:
        try:
            from ..rag import create_rag_retriever

            retriever = create_rag_retriever(ctx.project_root)

            def on_file_change():
                try:
                    retriever.build_index(incremental=True)
                    log_status(ctx.log_dir, "INFO", "[RAG] Incremental index complete")
                except Exception as e:
                    log_status(ctx.log_dir, "WARN", f"[RAG] Re-index failed: {e}")

            self._rag_watcher.start(on_change=on_file_change)
            log_status(ctx.log_dir, "INFO", "[RAG] File watcher started")
        except ImportError:
            log_status(ctx.log_dir, "WARN", "[RAG] Watcher disabled (chromadb not installed)")
        except Exception as e:
            log_status(ctx.log_dir, "WARN", f"[RAG] Watcher failed to start: {e}")

    # Main loop
    try:
        from ..services.interrupt import InterruptHandler, start_interactive_shell

        interrupt_handler = InterruptHandler(
            save_callback=lambda: progress.save_progress(ctx) if progress else None
        )

        while ctx.should_continue():
            try:
                # Check rate limits
                # NOTE(review): these checks read PROJECT_ROOT/.call_count, but
                # init_call_tracking above initialized bp.state/.call_count —
                # confirm both resolve to the same file, else limits never trip.
                if not can_make_call(
                    settings.PROJECT_ROOT / ".call_count", settings.MAX_HOURLY_CALLS
                ):
                    if console.quiet:
                        ctx.mark_exit("Rate limit reached (Quiet/MCP mode)")
                        break
                    wait_for_reset(
                        settings.PROJECT_ROOT / ".call_count",
                        settings.PROJECT_ROOT / ".last_reset",
                        settings.MAX_HOURLY_CALLS,
                    )
                    console.print("[yellow]Rate limit reset. Resuming...[/yellow]")

                # V10.23: Memory compaction check before each loop
                self._v10_23_pre_loop_maintenance()

                # Deadlock Detection (The "Stop Being Stupid" Fix)
                # NOTE(review): str(sorted([])) == "[]" which is truthy, so
                # error-free loops also enter this branch; three consecutive
                # clean loops would compare equal and falsely trigger the
                # deadlock exit. Consider guarding on ctx.errors_this_loop
                # itself before stringifying — confirm intended behavior.
                current_errors = str(sorted(ctx.errors_this_loop))
                if current_errors:
                    _error_history.append(current_errors)
                    if len(_error_history) >= 3:
                        # Check if last 3 loops had SAME error set
                        if _error_history[-1] == _error_history[-2] == _error_history[-3]:
                            console.print(
                                "[bold red]🛑 Deadlock Detected: Same error persisted for 3 loops.[/bold red]"
                            )
                            ctx.mark_exit("Deadlock Detected (Infinite Error Loop)")
                            break
                    # Keep history manageable
                    if len(_error_history) > 10:
                        _error_history.pop(0)

                # Start new iteration
                ctx.increment_loop()
                log_status(ctx.log_dir, "LOOP", f"=== Starting Loop #{ctx.loop_count} ===")
                console.print(f"\n[bold purple]=== Loop #{ctx.loop_count} ===[/bold purple]")

                # V10.23: Record task for session tracking
                ctx.record_task("loop_iteration", {"loop_count": ctx.loop_count})

                # Run state machine for this iteration
                self._run_state_machine()

                # V10.23: Sync session context to RAG after each iteration
                self._v10_23_sync_session_context()

                # P4.2: Save Progress
                if progress:
                    progress.save_progress(ctx)

                # Check for exit
                if ctx.should_exit:
                    break

            except KeyboardInterrupt:
                action = interrupt_handler.handle_interrupt()
                if action == "exit":
                    break
                elif action == "shell":
                    start_interactive_shell(ctx)
                    # Optionally continue after shell
                    continue
                elif action == "continue":
                    continue

    except Exception as e:
        # Fatal error in loop
        console.print(f"[bold red]Fatal Loop Error:[/bold red] {e}")
        log_status(ctx.log_dir, "CRITICAL", f"Fatal loop error: {e}")
        raise e

    # Cleanup
    if self._rag_watcher:
        self._rag_watcher.stop()
        log_status(ctx.log_dir, "INFO", "[RAG] File watcher stopped")

    BackupManager.cleanup_old_backups(keep_last=10)

    if ctx.exit_reason:
        console.print(f"[dim]Exit: {ctx.exit_reason}[/dim]")
    console.print("[dim]Agent loop finished.[/dim]")

    # BoringDone: Notify user that agent loop is complete
    # notify_done is guarded because it may be absent/None in some installs.
    if notify_done:
        success = not ctx.exit_reason or "error" not in ctx.exit_reason.lower()
        notify_done(
            task_name="Agent Loop",
            success=success,
            details=f"Completed {ctx.loop_count} iterations. {ctx.exit_reason or 'Ready for review.'}",
        )

BackgroundTask dataclass

Represents a background task.

Source code in src/boring/loop/background_agent.py
@dataclass
class BackgroundTask:
    """Represents a background task and its lifecycle metadata."""

    task_id: str  # Unique identifier, e.g. "task-<8 hex chars>"
    name: str  # Human-readable task name for display
    status: str  # pending, running, completed, failed
    created_at: datetime  # When the task was submitted
    started_at: datetime | None = None  # Set when execution begins
    completed_at: datetime | None = None  # Set when execution ends (success or failure)
    result: Any = None  # Return value of the task function on success
    error: str | None = None  # Stringified exception on failure
    progress: int = 0  # 0-100

BackgroundTaskRunner

Manages background task execution using thread pool.

Features:

- Non-blocking task submission
- Status tracking
- Result retrieval

Source code in src/boring/loop/background_agent.py
class BackgroundTaskRunner:
    """
    Manages background task execution using thread pool.

    Features:
    - Non-blocking task submission
    - Status tracking
    - Result retrieval
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self, max_workers: int = 4):
        if self._initialized:
            return
        self._initialized = True
        self.executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="boring-bg-")
        self.tasks: dict[str, BackgroundTask] = {}
        self.futures: dict[str, Any] = {}

    def submit(
        self,
        func: Callable,
        *args: Any,
        name: str = "Background task",
        **kwargs: Any,
    ) -> str:
        """
        Submit a task for background execution.

        Args:
            func: Function to execute
            name: Task name for display
            *args: Arguments to pass to func
            **kwargs: Keyword arguments to pass to func

        Returns:
            task_id for tracking
        """
        task_id = f"task-{uuid.uuid4().hex[:8]}"

        task = BackgroundTask(
            task_id=task_id,
            name=name,
            status="pending",
            created_at=datetime.now(),
        )
        self.tasks[task_id] = task

        def wrapper():
            task.status = "running"
            task.started_at = datetime.now()
            try:
                result = func(*args, **kwargs)
                task.result = result
                task.status = "completed"
            except Exception as e:
                task.error = str(e)
                task.status = "failed"
            finally:
                task.completed_at = datetime.now()

        future = self.executor.submit(wrapper)
        self.futures[task_id] = future

        return task_id

    def get_status(self, task_id: str) -> dict:
        """Get status of a task."""
        task = self.tasks.get(task_id)
        if not task:
            return {"status": "not_found", "task_id": task_id}

        return {
            "task_id": task.task_id,
            "name": task.name,
            "status": task.status,
            "created_at": task.created_at.isoformat(),
            "started_at": task.started_at.isoformat() if task.started_at else None,
            "completed_at": task.completed_at.isoformat() if task.completed_at else None,
            "progress": task.progress,
            "result": task.result if task.status == "completed" else None,
            "error": task.error,
        }

    def get_result(self, task_id: str, timeout: float = None) -> dict:
        """Wait for and get result of a task."""
        future = self.futures.get(task_id)
        if not future:
            return {"status": "not_found", "task_id": task_id}

        try:
            future.result(timeout=timeout)
        except Exception:
            pass

        return self.get_status(task_id)

    def list_tasks(self, status_filter: str = None) -> list[dict]:
        """List all tasks, optionally filtered by status."""
        tasks = []
        for task in self.tasks.values():
            if status_filter and task.status != status_filter:
                continue
            tasks.append(
                {
                    "task_id": task.task_id,
                    "name": task.name,
                    "status": task.status,
                    "created_at": task.created_at.isoformat(),
                }
            )
        return sorted(tasks, key=lambda x: x["created_at"], reverse=True)

    def cancel(self, task_id: str) -> dict:
        """Attempt to cancel a pending task."""
        future = self.futures.get(task_id)
        if not future:
            return {"status": "not_found", "task_id": task_id}

        task = self.tasks[task_id]
        if task.status != "pending":
            return {"status": "cannot_cancel", "message": f"Task is {task.status}"}

        if future.cancel():
            task.status = "cancelled"
            return {"status": "cancelled", "task_id": task_id}

        return {"status": "cancel_failed", "task_id": task_id}

    def shutdown(self, wait: bool = True):
        """Shutdown the executor."""
        self.executor.shutdown(wait=wait)

submit(func, *args, name='Background task', **kwargs)

Submit a task for background execution.

Parameters:

Name Type Description Default
func Callable

Function to execute

required
name str

Task name for display

'Background task'
*args Any

Arguments to pass to func

()
**kwargs Any

Keyword arguments to pass to func

{}

Returns:

Type Description
str

task_id for tracking

Source code in src/boring/loop/background_agent.py
def submit(
    self,
    func: Callable,
    *args: Any,
    name: str = "Background task",
    **kwargs: Any,
) -> str:
    """
    Submit a task for background execution.

    Args:
        func: Function to execute
        *args: Positional arguments to pass to func
        name: Task name for display (keyword-only)
        **kwargs: Keyword arguments to pass to func

    Returns:
        task_id for tracking
    """
    task_id = f"task-{uuid.uuid4().hex[:8]}"

    task = BackgroundTask(
        task_id=task_id,
        name=name,
        status="pending",
        created_at=datetime.now(),
    )
    self.tasks[task_id] = task

    def wrapper():
        # Runs on a worker thread; records the lifecycle onto the task.
        task.status = "running"
        task.started_at = datetime.now()
        try:
            task.result = func(*args, **kwargs)
            task.status = "completed"
        except Exception as e:
            task.error = str(e)
            task.status = "failed"
        finally:
            task.completed_at = datetime.now()

    future = self.executor.submit(wrapper)
    self.futures[task_id] = future

    return task_id

get_status(task_id)

Get status of a task.

Source code in src/boring/loop/background_agent.py
def get_status(self, task_id: str) -> dict:
    """Return a status snapshot for the given task id."""
    task = self.tasks.get(task_id)
    if task is None:
        return {"status": "not_found", "task_id": task_id}

    def _iso(stamp):
        # started_at / completed_at may be unset while the task is pending.
        return stamp.isoformat() if stamp else None

    snapshot = {
        "task_id": task.task_id,
        "name": task.name,
        "status": task.status,
        "created_at": task.created_at.isoformat(),
        "started_at": _iso(task.started_at),
        "completed_at": _iso(task.completed_at),
        "progress": task.progress,
        # The result is only exposed once the task has actually completed.
        "result": task.result if task.status == "completed" else None,
        "error": task.error,
    }
    return snapshot

get_result(task_id, timeout=None)

Wait for and get result of a task.

Source code in src/boring/loop/background_agent.py
def get_result(self, task_id: str, timeout: float = None) -> dict:
    """Block until the task finishes (or timeout elapses), then report status."""
    fut = self.futures.get(task_id)
    if fut is None:
        return {"status": "not_found", "task_id": task_id}

    try:
        fut.result(timeout=timeout)
    except Exception:
        # Failures and timeouts are surfaced through the status dict instead.
        pass

    return self.get_status(task_id)

list_tasks(status_filter=None)

List all tasks, optionally filtered by status.

Source code in src/boring/loop/background_agent.py
def list_tasks(self, status_filter: str = None) -> list[dict]:
    """List all tasks (optionally only those matching status_filter), newest first."""
    summaries = [
        {
            "task_id": t.task_id,
            "name": t.name,
            "status": t.status,
            "created_at": t.created_at.isoformat(),
        }
        for t in self.tasks.values()
        if not status_filter or t.status == status_filter
    ]
    summaries.sort(key=lambda item: item["created_at"], reverse=True)
    return summaries

cancel(task_id)

Attempt to cancel a pending task.

Source code in src/boring/loop/background_agent.py
def cancel(self, task_id: str) -> dict:
    """Attempt to cancel a pending task."""
    fut = self.futures.get(task_id)
    if fut is None:
        return {"status": "not_found", "task_id": task_id}

    task = self.tasks[task_id]
    if task.status != "pending":
        # Running or finished work can no longer be cancelled.
        return {"status": "cannot_cancel", "message": f"Task is {task.status}"}

    if not fut.cancel():
        return {"status": "cancel_failed", "task_id": task_id}

    task.status = "cancelled"
    return {"status": "cancelled", "task_id": task_id}

shutdown(wait=True)

Shutdown the executor.

Source code in src/boring/loop/background_agent.py
def shutdown(self, wait: bool = True):
    """Shut down the worker pool; block for in-flight tasks when wait is True."""
    pool = self.executor
    pool.shutdown(wait=wait)

LoopState

Bases: ABC

Abstract base class for all loop states.

Each state encapsulates:

1. The logic to execute (handle)
2. The decision of what state comes next (next_state)

Usage

state = ThinkingState()
result = state.handle(context)  # Execute logic
next_state = state.next_state(context, result)  # Get transition

Source code in src/boring/loop/base.py
class LoopState(ABC):
    """
    Abstract base class for all loop states.

    Each state encapsulates:
    1. The logic to execute (handle)
    2. The decision of what state comes next (next_state)

    Usage:
        state = ThinkingState()
        result = state.handle(context)  # Execute logic
        next_state = state.next_state(context, result)  # Get transition
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable state name for logging/telemetry."""
        pass

    @abstractmethod
    def handle(self, context: "LoopContext") -> StateResult:
        """
        Execute the state's logic.

        Args:
            context: Shared mutable context containing all loop state

        Returns:
            StateResult indicating outcome for transition logic
        """
        pass

    @abstractmethod
    def next_state(self, context: "LoopContext", result: StateResult) -> Optional["LoopState"]:
        """
        Determine the next state based on context and execution result.

        Args:
            context: Current loop context
            result: Result from handle() execution

        Returns:
            Next state instance, or None to exit the loop
        """
        pass

    def on_enter(self, context: "LoopContext") -> None:
        """Optional hook called when entering this state."""
        pass

    def on_exit(self, context: "LoopContext") -> None:
        """Optional hook called when leaving this state."""
        pass

name abstractmethod property

Human-readable state name for logging/telemetry.

handle(context) abstractmethod

Execute the state's logic.

Parameters:

Name Type Description Default
context LoopContext

Shared mutable context containing all loop state

required

Returns:

Type Description
StateResult

StateResult indicating outcome for transition logic

Source code in src/boring/loop/base.py
@abstractmethod
def handle(self, context: "LoopContext") -> StateResult:
    """
    Run this state's work against the shared loop context.

    Args:
        context: Shared mutable context containing all loop state

    Returns:
        StateResult describing the outcome, used by the transition logic
    """
    ...

next_state(context, result) abstractmethod

Determine the next state based on context and execution result.

Parameters:

Name Type Description Default
context LoopContext

Current loop context

required
result StateResult

Result from handle() execution

required

Returns:

Type Description
Optional[LoopState]

Next state instance, or None to exit the loop

Source code in src/boring/loop/base.py
@abstractmethod
def next_state(self, context: "LoopContext", result: StateResult) -> Optional["LoopState"]:
    """
    Choose the state to run next, given the current context and outcome.

    Args:
        context: Current loop context
        result: Result from handle() execution

    Returns:
        The next state instance, or None to exit the loop
    """
    ...

on_enter(context)

Optional hook called when entering this state.

Source code in src/boring/loop/base.py
def on_enter(self, context: "LoopContext") -> None:
    """Hook invoked when this state becomes active; does nothing by default."""
    return None

on_exit(context)

Optional hook called when leaving this state.

Source code in src/boring/loop/base.py
def on_exit(self, context: "LoopContext") -> None:
    """Hook invoked when this state is left; does nothing by default."""
    return None

LoopContext dataclass

Shared mutable context passed between all states (V10.23 Enhanced).

This replaces the scattered instance variables in the old AgentLoop, providing a clean, explicit container for loop state.

V10.23 features: - Sliding window for error/task history - Memory usage tracking - Task context for RAG integration

Source code in src/boring/loop/context.py
@dataclass
class LoopContext:
    """
    Shared mutable context passed between all states (V10.23 Enhanced).

    This replaces the scattered instance variables in the old AgentLoop,
    providing a clean, explicit container for loop state.

    V10.23 features:
    - Sliding window for error/task history
    - Memory usage tracking
    - Task context for RAG integration
    """

    # === Configuration (immutable during loop) ===
    model_name: str = settings.DEFAULT_MODEL
    use_cli: bool = False
    verbose: bool = False
    interactive: bool = False
    verification_level: str = "STANDARD"
    project_root: Path = field(default_factory=lambda: settings.PROJECT_ROOT)
    log_dir: Path = field(default_factory=lambda: settings.LOG_DIR)
    prompt_file: Path = field(default_factory=lambda: settings.PROJECT_ROOT / settings.PROMPT_FILE)

    # === Injected Subsystems ===
    gemini_client: Optional["GeminiClient"] = None
    memory: Optional["MemoryManager"] = None
    verifier: Optional["CodeVerifier"] = None
    storage: Optional["SQLiteStorage"] = None
    extensions: Optional["ExtensionsManager"] = None

    # === Loop Counters ===
    loop_count: int = 0
    max_loops: int = field(default_factory=lambda: settings.MAX_LOOPS)
    retry_count: int = 0
    max_retries: int = 3
    empty_output_count: int = 0

    # === Timing ===
    loop_start_time: float = 0.0
    state_start_time: float = 0.0

    # === Generation State ===
    output_content: str = ""
    output_file: Path | None = None
    function_calls: list[dict[str, Any]] = field(default_factory=list)
    status_report: dict[str, Any] | None = None

    # === Patching State ===
    files_modified: list[str] = field(default_factory=list)
    files_created: list[str] = field(default_factory=list)
    patch_errors: list[str] = field(default_factory=list)

    # === Verification State ===
    verification_passed: bool = False
    verification_error: str = ""

    # === Control Flags ===
    should_exit: bool = False
    exit_reason: str = ""

    # === Accumulated Errors ===
    errors_this_loop: list[str] = field(default_factory=list)
    tasks_completed: list[str] = field(default_factory=list)

    # === V10.23: Enhanced State Tracking ===
    error_history: list[dict[str, Any]] = field(default_factory=list)  # Sliding window
    task_history: list[dict[str, Any]] = field(default_factory=list)  # Sliding window
    file_access_history: list[str] = field(default_factory=list)  # Sliding window
    current_task_type: str = "general"  # "debugging", "feature", "refactoring", "testing"
    session_keywords: list[str] = field(default_factory=list)
    memory_warnings: int = 0
    prompt_cache: dict[str, Any] = field(default_factory=dict)

    def start_loop(self) -> None:
        """Reset per-loop state at beginning of each iteration."""
        self.loop_start_time = time.time()
        self.output_content = ""
        self.output_file = None
        self.function_calls = []
        self.status_report = None
        self.files_modified = []
        self.files_created = []
        self.patch_errors = []
        self.verification_passed = False
        self.verification_error = ""
        self.errors_this_loop = []
        self.tasks_completed = []

    def start_state(self) -> None:
        """Record state entry time for telemetry."""
        self.state_start_time = time.time()

    def get_state_duration(self) -> float:
        """Get duration of current state in seconds."""
        return time.time() - self.state_start_time

    def get_loop_duration(self) -> float:
        """Get duration of current loop in seconds."""
        return time.time() - self.loop_start_time

    def increment_loop(self) -> None:
        """Increment loop counter and reset retry count."""
        self.loop_count += 1
        self.retry_count = 0
        self.start_loop()

    def increment_retry(self) -> None:
        """Increment retry counter."""
        self.retry_count += 1

    def can_retry(self) -> bool:
        """Check if more retries are allowed."""
        return self.retry_count < self.max_retries

    def should_continue(self) -> bool:
        """Check if loop should continue."""
        return not self.should_exit and self.loop_count < self.max_loops

    def mark_exit(self, reason: str) -> None:
        """Mark loop for exit with reason."""
        self.should_exit = True
        self.exit_reason = reason

    # =========================================================================
    # V10.23: Enhanced Memory Management Methods
    # =========================================================================

    def record_error(self, error_type: str, error_msg: str, file_path: str = "") -> None:
        """
        V10.23: Record an error with sliding window management.

        Keeps only the most recent MAX_ERROR_HISTORY errors.
        """
        self.error_history.append(
            {
                "type": error_type,
                "message": error_msg[:500],  # Truncate long messages
                "file": file_path,
                "timestamp": time.time(),
                "loop": self.loop_count,
            }
        )

        # Sliding window: keep only recent errors
        if len(self.error_history) > MAX_ERROR_HISTORY:
            self.error_history = self.error_history[-MAX_ERROR_HISTORY:]

    def record_task(self, task_description: str, status: str = "completed") -> None:
        """
        V10.23: Record a completed task with sliding window management.
        """
        self.task_history.append(
            {
                "description": task_description[:200],
                "status": status,
                "timestamp": time.time(),
                "loop": self.loop_count,
            }
        )

        # Sliding window
        if len(self.task_history) > MAX_TASK_HISTORY:
            self.task_history = self.task_history[-MAX_TASK_HISTORY:]

    def record_file_access(self, file_path: str) -> None:
        """
        V10.23: Record file access for RAG context.
        """
        # Avoid duplicates in recent history
        if file_path not in self.file_access_history[-10:]:
            self.file_access_history.append(file_path)

        # Sliding window
        if len(self.file_access_history) > MAX_FILE_HISTORY:
            self.file_access_history = self.file_access_history[-MAX_FILE_HISTORY:]

    def set_task_context(self, task_type: str, keywords: list[str] | None = None) -> None:
        """
        V10.23: Set the current task context for RAG integration.

        Args:
            task_type: One of "debugging", "feature", "refactoring", "testing", "general"
            keywords: Keywords extracted from the current task
        """
        self.current_task_type = task_type
        if keywords:
            # Merge with existing keywords, dropping duplicates while preserving
            # insertion order (a plain set would make the "last 50" cut
            # arbitrary), then keep only the 50 most recent.
            merged = dict.fromkeys(self.session_keywords)
            merged.update(dict.fromkeys(keywords))
            self.session_keywords = list(merged)[-50:]

    def get_recent_focus_files(self, limit: int = 5) -> list[str]:
        """
        V10.23: Get recently accessed files for RAG focus.
        """
        # Deduplicate while preserving order (most recent first)
        seen = set()
        result = []
        for f in reversed(self.file_access_history):
            if f not in seen:
                seen.add(f)
                result.append(f)
                if len(result) >= limit:
                    break
        return result

    def get_error_summary(self) -> dict[str, int]:
        """
        V10.23: Get summary of recent errors by type.
        """
        from collections import Counter

        return dict(Counter(e["type"] for e in self.error_history))

    def get_session_context_for_rag(self) -> dict:
        """
        V10.23: Get session context formatted for RAG retriever.
        """
        return {
            "task_type": self.current_task_type,
            "focus_files": self.get_recent_focus_files(),
            "keywords": self.session_keywords[-20:],  # Last 20 keywords
            "recent_errors": [e["type"] for e in self.error_history[-5:]],
        }

    def estimate_memory_usage(self) -> dict[str, int]:
        """
        V10.23: Estimate memory usage of context data.

        NOTE: sys.getsizeof is shallow (container only, not contents) and
        output_content is counted in characters, so this is a rough estimate.
        """
        import sys

        def safe_sizeof(obj) -> int:
            try:
                return sys.getsizeof(obj)
            except TypeError:
                return 0

        return {
            "error_history": safe_sizeof(self.error_history),
            "task_history": safe_sizeof(self.task_history),
            "file_access_history": safe_sizeof(self.file_access_history),
            "function_calls": safe_sizeof(self.function_calls),
            "output_content": len(self.output_content),
            "total_estimate": (
                safe_sizeof(self.error_history)
                + safe_sizeof(self.task_history)
                + safe_sizeof(self.file_access_history)
                + safe_sizeof(self.function_calls)
                + len(self.output_content)
            ),
        }

    def compact_if_needed(self, threshold_kb: int = 1024) -> bool:
        """
        V10.23: Compact history if memory exceeds threshold.

        Returns:
            True if compaction was performed
        """
        memory = self.estimate_memory_usage()
        total_kb = memory["total_estimate"] / 1024

        if total_kb > threshold_kb:
            self.memory_warnings += 1

            # Aggressive compaction
            self.error_history = self.error_history[-20:]
            self.task_history = self.task_history[-30:]
            self.file_access_history = self.file_access_history[-50:]
            self.session_keywords = self.session_keywords[-20:]

            return True
        return False

    def get_context_summary(self) -> str:
        """
        V10.23: Get a human-readable summary of context state.
        """
        memory = self.estimate_memory_usage()
        error_summary = self.get_error_summary()

        lines = [
            "📊 Loop Context Summary (V10.23)",
            f"├─ Loop: {self.loop_count}/{self.max_loops}",
            f"├─ Task Type: {self.current_task_type}",
            f"├─ Files Accessed: {len(self.file_access_history)}",
            f"├─ Errors Recorded: {len(self.error_history)}",
        ]

        if error_summary:
            top_errors = sorted(error_summary.items(), key=lambda x: x[1], reverse=True)[:3]
            lines.append(f"│  └─ Top: {', '.join(f'{k}({v})' for k, v in top_errors)}")

        lines.extend(
            [
                f"├─ Tasks Completed: {len(self.task_history)}",
                f"├─ Memory Usage: ~{memory['total_estimate'] / 1024:.1f} KB",
                f"└─ Memory Warnings: {self.memory_warnings}",
            ]
        )

        return "\n".join(lines)

start_loop()

Reset per-loop state at beginning of each iteration.

Source code in src/boring/loop/context.py
def start_loop(self) -> None:
    """Clear all per-iteration state and stamp the iteration start time."""
    self.loop_start_time = time.time()
    # Generation state
    self.output_content = ""
    self.output_file = None
    self.function_calls = []
    self.status_report = None
    # Patching state
    self.files_modified, self.files_created, self.patch_errors = [], [], []
    # Verification state
    self.verification_passed = False
    self.verification_error = ""
    # Accumulated results
    self.errors_this_loop, self.tasks_completed = [], []

start_state()

Record state entry time for telemetry.

Source code in src/boring/loop/context.py
def start_state(self) -> None:
    """Stamp the wall-clock time at which the current state was entered."""
    now = time.time()
    self.state_start_time = now

get_state_duration()

Get duration of current state in seconds.

Source code in src/boring/loop/context.py
def get_state_duration(self) -> float:
    """Return seconds elapsed since the current state was entered."""
    elapsed = time.time() - self.state_start_time
    return elapsed

get_loop_duration()

Get duration of current loop in seconds.

Source code in src/boring/loop/context.py
def get_loop_duration(self) -> float:
    """Return seconds elapsed since the current loop iteration began."""
    elapsed = time.time() - self.loop_start_time
    return elapsed

increment_loop()

Increment loop counter and reset retry count.

Source code in src/boring/loop/context.py
def increment_loop(self) -> None:
    """Advance to the next iteration: bump the counter, clear retries, reset state."""
    self.loop_count = self.loop_count + 1
    self.retry_count = 0
    self.start_loop()

increment_retry()

Increment retry counter.

Source code in src/boring/loop/context.py
def increment_retry(self) -> None:
    """Bump the retry counter for the current loop."""
    self.retry_count = self.retry_count + 1

can_retry()

Check if more retries are allowed.

Source code in src/boring/loop/context.py
def can_retry(self) -> bool:
    """True while the retry budget for this loop is not exhausted."""
    return not (self.retry_count >= self.max_retries)

should_continue()

Check if loop should continue.

Source code in src/boring/loop/context.py
def should_continue(self) -> bool:
    """True when no exit was requested and the loop budget remains."""
    if self.should_exit:
        return False
    return self.loop_count < self.max_loops

mark_exit(reason)

Mark loop for exit with reason.

Source code in src/boring/loop/context.py
def mark_exit(self, reason: str) -> None:
    """Request loop termination, recording why."""
    self.exit_reason = reason
    self.should_exit = True

record_error(error_type, error_msg, file_path='')

V10.23: Record an error with sliding window management.

Keeps only the most recent MAX_ERROR_HISTORY errors.

Source code in src/boring/loop/context.py
def record_error(self, error_type: str, error_msg: str, file_path: str = "") -> None:
    """
    V10.23: Record an error with sliding window management.

    Keeps only the most recent MAX_ERROR_HISTORY errors.
    """
    entry = {
        "type": error_type,
        "message": error_msg[:500],  # Cap very long messages
        "file": file_path,
        "timestamp": time.time(),
        "loop": self.loop_count,
    }
    self.error_history.append(entry)

    # Trim to the sliding window of most recent entries.
    overflow = len(self.error_history) - MAX_ERROR_HISTORY
    if overflow > 0:
        self.error_history = self.error_history[overflow:]

record_task(task_description, status='completed')

V10.23: Record a completed task with sliding window management.

Source code in src/boring/loop/context.py
def record_task(self, task_description: str, status: str = "completed") -> None:
    """
    V10.23: Record a completed task with sliding window management.
    """
    entry = {
        "description": task_description[:200],  # Cap long descriptions
        "status": status,
        "timestamp": time.time(),
        "loop": self.loop_count,
    }
    self.task_history.append(entry)

    # Trim to the sliding window of most recent entries.
    overflow = len(self.task_history) - MAX_TASK_HISTORY
    if overflow > 0:
        self.task_history = self.task_history[overflow:]

record_file_access(file_path)

V10.23: Record file access for RAG context.

Source code in src/boring/loop/context.py
def record_file_access(self, file_path: str) -> None:
    """
    V10.23: Record file access for RAG context.
    """
    recent = self.file_access_history[-10:]
    # Skip paths already present in the recent window.
    if file_path not in recent:
        self.file_access_history.append(file_path)

    # Trim to the sliding window of most recent entries.
    overflow = len(self.file_access_history) - MAX_FILE_HISTORY
    if overflow > 0:
        self.file_access_history = self.file_access_history[overflow:]

set_task_context(task_type, keywords=None)

V10.23: Set the current task context for RAG integration.

Parameters:

Name Type Description Default
task_type str

One of "debugging", "feature", "refactoring", "testing", "general"

required
keywords list[str] | None

Keywords extracted from the current task

None
Source code in src/boring/loop/context.py
def set_task_context(self, task_type: str, keywords: list[str] | None = None) -> None:
    """
    V10.23: Set the current task context for RAG integration.

    Args:
        task_type: One of "debugging", "feature", "refactoring", "testing", "general"
        keywords: Keywords extracted from the current task
    """
    self.current_task_type = task_type
    if keywords:
        # Merge with existing, keeping unique
        existing = set(self.session_keywords)
        existing.update(keywords)
        self.session_keywords = list(existing)[-50:]  # Keep last 50

get_recent_focus_files(limit=5)

V10.23: Get recently accessed files for RAG focus.

Source code in src/boring/loop/context.py
def get_recent_focus_files(self, limit: int = 5) -> list[str]:
    """
    V10.23: Get recently accessed files for RAG focus.
    """
    # Walk history newest-first, keeping only each path's first occurrence.
    unique: dict[str, None] = {}
    for path in reversed(self.file_access_history):
        if path not in unique:
            unique[path] = None
        if len(unique) >= limit:
            break
    return list(unique)

get_error_summary()

V10.23: Get summary of recent errors by type.

Source code in src/boring/loop/context.py
def get_error_summary(self) -> dict[str, int]:
    """
    V10.23: Get summary of recent errors by type.
    """
    counts: dict[str, int] = {}
    for entry in self.error_history:
        kind = entry["type"]
        counts[kind] = counts.get(kind, 0) + 1
    return counts

get_session_context_for_rag()

V10.23: Get session context formatted for RAG retriever.

Source code in src/boring/loop/context.py
def get_session_context_for_rag(self) -> dict:
    """
    V10.23: Get session context formatted for RAG retriever.
    """
    recent_error_types = [entry["type"] for entry in self.error_history[-5:]]
    return {
        "task_type": self.current_task_type,
        "focus_files": self.get_recent_focus_files(),
        "keywords": self.session_keywords[-20:],  # Last 20 keywords
        "recent_errors": recent_error_types,
    }

estimate_memory_usage()

V10.23: Estimate memory usage of context data.

Source code in src/boring/loop/context.py
def estimate_memory_usage(self) -> dict[str, int]:
    """
    V10.23: Estimate memory usage of context data.
    """
    import sys

    def _size(obj) -> int:
        # getsizeof can raise TypeError for exotic objects; count those as 0.
        try:
            return sys.getsizeof(obj)
        except TypeError:
            return 0

    parts = {
        "error_history": _size(self.error_history),
        "task_history": _size(self.task_history),
        "file_access_history": _size(self.file_access_history),
        "function_calls": _size(self.function_calls),
        "output_content": len(self.output_content),
    }
    parts["total_estimate"] = sum(parts.values())
    return parts

compact_if_needed(threshold_kb=1024)

V10.23: Compact history if memory exceeds threshold.

Returns:

Type Description
bool

True if compaction was performed

Source code in src/boring/loop/context.py
def compact_if_needed(self, threshold_kb: int = 1024) -> bool:
    """
    V10.23: Trim history buffers when estimated memory crosses *threshold_kb*.

    Returns:
        True if compaction was performed
    """
    usage_kb = self.estimate_memory_usage()["total_estimate"] / 1024
    if usage_kb <= threshold_kb:
        return False

    self.memory_warnings += 1

    # Aggressive compaction: keep only the newest entries of each buffer.
    self.error_history = self.error_history[-20:]
    self.task_history = self.task_history[-30:]
    self.file_access_history = self.file_access_history[-50:]
    self.session_keywords = self.session_keywords[-20:]
    return True

get_context_summary()

V10.23: Get a human-readable summary of context state.

Source code in src/boring/loop/context.py
def get_context_summary(self) -> str:
    """
    V10.23: Render a human-readable, tree-style summary of the context state.
    """
    usage = self.estimate_memory_usage()
    errors_by_type = self.get_error_summary()

    parts = []
    parts.append("📊 Loop Context Summary (V10.23)")
    parts.append(f"├─ Loop: {self.loop_count}/{self.max_loops}")
    parts.append(f"├─ Task Type: {self.current_task_type}")
    parts.append(f"├─ Files Accessed: {len(self.file_access_history)}")
    parts.append(f"├─ Errors Recorded: {len(self.error_history)}")

    if errors_by_type:
        # Show at most the three most frequent error types.
        ranked = sorted(errors_by_type.items(), key=lambda kv: kv[1], reverse=True)[:3]
        parts.append(f"│  └─ Top: {', '.join(f'{k}({v})' for k, v in ranked)}")

    parts.append(f"├─ Tasks Completed: {len(self.task_history)}")
    parts.append(f"├─ Memory Usage: ~{usage['total_estimate'] / 1024:.1f} KB")
    parts.append(f"└─ Memory Warnings: {self.memory_warnings}")

    return "\n".join(parts)

AgentLoop

The main autonomous agent loop. Manages the lifecycle of Generate -> Backup -> Patch -> Verify -> Self-Correct.

V3.0 Features: - Memory System: Persistent state across loops - Advanced Verification: Linting + Testing - Extensions Support: context7, criticalthink

Source code in src/boring/loop/legacy.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
class AgentLoop:
    """
    The main autonomous agent loop.
    Manages the lifecycle of Generate -> Backup -> Patch -> Verify -> Self-Correct.

    V3.0 Features:
    - Memory System: Persistent state across loops
    - Advanced Verification: Linting + Testing
    - Extensions Support: context7, criticalthink
    """

    def __init__(
        self,
        model_name: str = settings.DEFAULT_MODEL,
        use_cli: bool = False,
        verbose: bool = False,
        prompt_file: Path | None = None,
        context_file: Path | None = None,
        verification_level: str = "STANDARD",  # BASIC, STANDARD, FULL
    ):
        """Set up logging paths, subsystems, and the Gemini backend.

        Args:
            model_name: Gemini model identifier used for generation.
            use_cli: If True, use the external ``gemini`` CLI binary; otherwise
                initialize the Python SDK client.
            verbose: Emit extra diagnostics about the subsystems at startup.
            prompt_file: Override for the prompt file path (defaults to
                settings.PROMPT_FILE under the project root).
            context_file: Override for the context file path (defaults to
                settings.CONTEXT_FILE under the project root).
            verification_level: Verification depth: BASIC, STANDARD, or FULL.

        Raises:
            RuntimeError: If the CLI binary cannot be found on PATH (CLI mode),
                or the Gemini client fails to initialize (SDK mode).
        """
        init_directories()
        self.log_dir = settings.LOG_DIR
        self.model_name = model_name
        self.use_cli = use_cli
        self.verbose = verbose
        self.verification_level = verification_level

        self.prompt_file = prompt_file or settings.PROJECT_ROOT / settings.PROMPT_FILE
        self.context_file = context_file or settings.PROJECT_ROOT / settings.CONTEXT_FILE

        # Initialize subsystems
        self.memory = MemoryManager(settings.PROJECT_ROOT)
        self.verifier = CodeVerifier(settings.PROJECT_ROOT, self.log_dir)
        self.extensions = ExtensionsManager(settings.PROJECT_ROOT)

        # Loop state (reset per loop by _execute_sdk)
        self._empty_output_count = 0
        self._loop_start_time = 0.0
        self._files_modified_this_loop: list[str] = []
        self._tasks_completed_this_loop: list[str] = []
        self._errors_this_loop: list[str] = []

        # Initialize Gemini Client (for SDK mode)
        self.gemini_client: GeminiClient | None = None
        self.gemini_cli_cmd: str | None = None

        if self.use_cli:
            self.gemini_cli_cmd = shutil.which("gemini")
            if not self.gemini_cli_cmd:
                raise RuntimeError(
                    "Gemini CLI not found in PATH. Install with: npm install -g @google/gemini-cli"
                )
            console.print(f"[green]Using Gemini CLI: {self.gemini_cli_cmd}[/green]")
        else:
            try:
                self.gemini_client = create_gemini_client(
                    log_dir=self.log_dir, model_name=model_name
                )
                if not self.gemini_client:
                    raise RuntimeError("Failed to initialize Gemini client.")

                console.print(f"[green]Using Gemini SDK (Model: {model_name})[/green]")
            except Exception as e:
                console.print("[bold red]Failed to initialize Gemini client:[/bold red]")
                console.print(f"[red]{str(e)}[/red]")
                raise RuntimeError("Failed to initialize Gemini client.") from e

        # Show subsystem status
        if self.verbose:
            console.print(f"[dim]Memory: {self.memory.memory_dir}[/dim]")
            console.print(
                f"[dim]Verifier: ruff={self.verifier.tools.is_available('ruff')}, pytest={self.verifier.tools.is_available('pytest')}[/dim]"
            )
            ext_report = self.extensions.create_extensions_report()
            console.print(f"[dim]{ext_report}[/dim]")

    def run(self, max_duration: int | None = None):
        """Start the main loop.

        Runs Generate -> Patch -> Verify -> Self-Correct iterations until
        settings.MAX_LOOPS is reached, the AI signals exit with a completed
        plan, the global timeout elapses, or an unrecoverable error occurs.

        Args:
            max_duration: Optional global wall-clock budget in seconds; the
                loop aborts once exceeded.
        """
        start_time = time.time()
        if should_halt_execution():
            console.print("[bold red]Circuit Breaker is OPEN. Execution halted.[/bold red]")
            log_status(self.log_dir, "CRITICAL", "Circuit Breaker is OPEN.")

            # === HUMAN-IN-THE-LOOP: Enter interactive mode ===
            try:
                from ..interactive import enter_interactive_mode

                should_resume = enter_interactive_mode(
                    reason="Circuit Breaker OPEN - Too many consecutive failures",
                    project_root=settings.PROJECT_ROOT,
                    recent_errors=self._errors_this_loop
                    if hasattr(self, "_errors_this_loop")
                    else [],
                )

                if should_resume:
                    console.print("[green]Resuming loop after interactive session...[/green]")
                    # Continue to loop start (don't return)
                else:
                    console.print("[yellow]Aborting as requested.[/yellow]")
                    return
            except ImportError:
                # Interactive module unavailable: fall back to manual reset.
                console.print("[dim]Use 'boring reset-circuit' to reset manually.[/dim]")
                return
            except KeyboardInterrupt:
                console.print("\n[yellow]Interrupted.[/yellow]")
                return

        console.print(
            Panel.fit(
                f"[bold green]Boring Autonomous Agent (v2.0)[/bold green]\n"
                f"Mode: {'CLI' if self.use_cli else 'SDK'}\n"
                f"Model: {self.model_name}\n"
                f"Log Dir: {self.log_dir}",
                title="System Initialization",
            )
        )

        # Initialize tracking (ensure files exist)
        init_call_tracking(
            settings.PROJECT_ROOT / ".call_count",
            settings.PROJECT_ROOT / ".last_reset",
            settings.PROJECT_ROOT / ".exit_signals",
        )

        loop_count = 0
        while loop_count < settings.MAX_LOOPS:
            # Check for global timeout
            if max_duration and (time.time() - start_time) > max_duration:
                console.print(
                    f"[bold red]Global timeout of {max_duration}s reached. Aborting.[/bold red]"
                )
                log_status(
                    self.log_dir, "WARN", f"AgentLoop aborted: reached {max_duration}s timeout"
                )
                break

            loop_count += 1

            # rate limit check: block until the hourly call budget resets
            if not can_make_call(settings.PROJECT_ROOT / ".call_count", settings.MAX_HOURLY_CALLS):
                wait_for_reset(
                    settings.PROJECT_ROOT / ".call_count",
                    settings.PROJECT_ROOT / ".last_reset",
                    settings.MAX_HOURLY_CALLS,
                )
                console.print("[yellow]Rate limit reset. Resuming...[/yellow]")

            log_status(self.log_dir, "LOOP", f"=== Starting Loop #{loop_count} ===")
            console.print(f"\n[bold purple]=== Starting Loop #{loop_count} ===[/bold purple]")

            # 1. Generate
            try:
                success, output_content, output_file = self._generate_step(loop_count)
            except Exception as e:
                # Unexpected generation crash is unrecoverable: stop the loop.
                log_status(self.log_dir, "CRITICAL", f"Generation failed: {e}")
                record_loop_result(loop_count, 0, True, 0)
                break

            if not success:
                log_status(
                    self.log_dir,
                    "ERROR",
                    f"Generation failed (Loop #{loop_count}): {output_content}",
                )
                record_loop_result(loop_count, 0, True, len(output_content))
                continue

            # 2. Patch & Backup
            # The file_patcher now handles backup internally using BackupManager(loop_id)
            try:
                files_changed = process_gemini_output(
                    output_file=output_file,
                    project_root=settings.PROJECT_ROOT,
                    log_dir=self.log_dir,
                    loop_id=loop_count,
                )
            except Exception as e:
                log_status(self.log_dir, "ERROR", f"Patching failed: {e}")
                record_loop_result(loop_count, 0, True, len(output_content))
                continue

            if files_changed == 0:
                log_status(self.log_dir, "WARN", "No files changed in this loop.")

                # === EMPTY OUTPUT FEEDBACK (Fix #2) ===
                # If the AI produced output but we couldn't parse it, tell the AI
                if len(output_content) > 0:  # AI did output something
                    # [V12.0 Enhancement] Print the output for the user (One-Shot Mode support)
                    console.print(
                        Panel(
                            output_content,
                            title="[bold blue]Gemini Response[/bold blue]",
                            border_style="blue",
                        )
                    )

                if len(output_content) > 100:
                    console.print(
                        "[bold yellow]⚠️ AI output could not be parsed into file changes.[/bold yellow]"
                    )
                    empty_output_count = getattr(self, "_empty_output_count", 0) + 1
                    self._empty_output_count = empty_output_count

                    if empty_output_count < 3:  # Prevent infinite feedback loops
                        feedback_prompt = self._create_format_feedback()
                        self._save_loop_summary(
                            loop_count, "FAILED", "Output not parseable. Sending format feedback."
                        )
                        self._self_correct(feedback_prompt, loop_count)
                        continue
                    else:
                        log_status(
                            self.log_dir, "ERROR", "Too many unparseable outputs. Breaking loop."
                        )
                        self._save_loop_summary(loop_count, "FAILED", "Repeated format errors.")
                        break
                else:
                    self._empty_output_count = 0  # Reset counter on truly empty output

                record_loop_result(loop_count, 0, False, len(output_content))
            else:
                self._empty_output_count = 0  # Reset counter on success

                # 3. Dependency Check
                check_and_install_dependencies(output_content)

                # 4. Advanced Verification (using CodeVerifier)
                verification_passed, error_msg = self.verifier.verify_project(
                    self.verification_level
                )

                if not verification_passed:
                    log_status(self.log_dir, "ERROR", f"Verification Failed: {error_msg[:200]}")
                    console.print("[bold red]Verification Failed[/bold red]")

                    # V14: Emit Event
                    FlowEventBus.emit(
                        FlowEvent.ON_LINT_FAIL,
                        project_path=str(settings.PROJECT_ROOT),
                        error=error_msg,
                    )

                    # Record failure and learn from error
                    record_loop_result(loop_count, files_changed, True, len(output_content))
                    self._save_loop_summary(
                        loop_count, "FAILED", f"Verification: {error_msg[:100]}"
                    )
                    self.memory.record_error_pattern("verification_error", error_msg[:200])
                    self._errors_this_loop.append(error_msg[:200])

                    # 5. Self-Correction with detailed feedback
                    console.print("[bold yellow]Triggering Self-Correction...[/bold yellow]")
                    self._self_correct(error_msg, loop_count)
                    continue

                # Record Success
                duration = time.time() - self._loop_start_time

                # Record to Memory System
                loop_memory = LoopMemory(
                    loop_id=loop_count,
                    timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
                    status="SUCCESS",
                    files_modified=self._files_modified_this_loop,
                    tasks_completed=self._tasks_completed_this_loop,
                    errors=[],
                    ai_output_summary=output_content[:500] if output_content else "",
                    duration_seconds=duration,
                )
                self.memory.record_loop(loop_memory)
                record_loop_result(loop_count, files_changed, False, len(output_content))
                self._save_loop_summary(loop_count, "SUCCESS", f"Modified {files_changed} file(s).")

                # V14: Emit Event
                if files_changed > 0:
                    FlowEventBus.emit(
                        FlowEvent.POST_BUILD,
                        project_path=str(settings.PROJECT_ROOT),
                        modified_files=self._files_modified_this_loop,
                    )

            # 6. Analyze & Update Task
            increment_call_counter(settings.PROJECT_ROOT / ".call_count")
            analysis = analyze_response(output_file, loop_count)

            # === EXIT DETECTION HARDENING (Fix #3) ===
            # Only exit if BOTH: AI signals exit AND @fix_plan.md has no unchecked items
            should_exit = False
            if analysis.get("analysis", {}).get("exit_signal", False):
                plan_complete = self._check_plan_completion()
                if plan_complete:
                    console.print("[bold green]✅ All tasks complete. Agent exiting.[/bold green]")
                    should_exit = True
                else:
                    console.print(
                        "[yellow]⚠️ AI signaled exit but @fix_plan.md has unchecked items. Continuing.[/yellow]"
                    )
                    log_status(
                        self.log_dir, "WARN", "EXIT_SIGNAL ignored: plan has unchecked items."
                    )

            if should_exit:
                break

            log_status(self.log_dir, "LOOP", f"=== Completed Loop #{loop_count} ===")

        # Cleanup old backups after loop completes
        BackupManager.cleanup_old_backups(keep_last=10)
        console.print("[dim]Agent loop finished.[/dim]")

    def _generate_step(self, loop_count: int) -> tuple[bool, str, Path]:
        """Dispatch the AI generation step to the CLI or SDK backend.

        Returns:
            (success, raw response text, path of the log file the response
            was written to).
        """
        stamp = time.strftime("%Y-%m-%d_%H-%M-%S")
        destination = self.log_dir / f"gemini_output_{stamp}.log"
        runner = self._execute_cli if self.use_cli else self._execute_sdk
        return runner(loop_count, destination)

    def _get_file_tree_str(self) -> str:
        """Return a fenced-markdown snippet describing the src/ layout.

        Primary path shells out to the Windows ``tree`` command; when that is
        unavailable or fails, falls back to listing up to 20 Python files
        under src/, and finally to a static placeholder message.
        """
        listing = ""
        try:
            proc = subprocess.run(
                ["cmd", "/c", "tree", "/F", "/A", "src"],
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                timeout=10,
                cwd=settings.PROJECT_ROOT,
            )
            if proc.returncode == 0 and proc.stdout:
                listing = proc.stdout[:2000]  # cap injected context size
        except Exception as e:
            logger.debug("Failed to get file tree using 'tree' command: %s", e)
            # Fallback for systems without 'tree' or if it fails
            try:
                src_dir = settings.PROJECT_ROOT / "src"
                if src_dir.exists():
                    relative_paths = [
                        str(f.relative_to(settings.PROJECT_ROOT)) for f in src_dir.rglob("*.py")
                    ]
                    listing = "Fallback: Listing up to 20 Python files in src/\n" + "\n".join(
                        relative_paths[:20]  # Limit to 20 for brevity
                    )
            except Exception as e:
                logger.debug("Failed to list project files as fallback: %s", e)
                listing = "Could not generate project file tree."
        return f"\n\n# PROJECT STRUCTURE (src/)\n```\n{listing}\n```\n"

    def _execute_sdk(self, loop_count: int, output_file: Path) -> tuple[bool, str, Path]:
        """Execute using Python SDK with comprehensive context injection.

        Assembles the prompt and context (memory, task plan, project tree,
        recent git diff, extensions) and calls the Gemini client with retry,
        writing the raw response to *output_file*.

        Returns:
            (success, response_text, output_file)
        """
        # Reset loop state
        self._loop_start_time = time.time()
        self._files_modified_this_loop = []
        self._tasks_completed_this_loop = []
        self._errors_this_loop = []

        # Read prompt/context
        prompt = (
            self.prompt_file.read_text(encoding="utf-8")
            if self.prompt_file.exists()
            else "No prompt found."
        )
        context = (
            self.context_file.read_text(encoding="utf-8") if self.context_file.exists() else ""
        )

        # === ENHANCED CONTEXT INJECTION (V3.0) ===

        # 1. Inject Memory System Context (project state, history, learned patterns)
        memory_context = self.memory.generate_context_injection()
        if memory_context:
            context += f"\n\n{memory_context}"

        # 2. Inject Task Plan (@fix_plan.md)
        task_file = settings.PROJECT_ROOT / settings.TASK_FILE
        if task_file.exists():
            task_content = task_file.read_text(encoding="utf-8")
            context += f"\n\n# CURRENT PLAN STATUS (@fix_plan.md)\n{task_content}\n"

        # 3. Inject Project Structure (tree)
        context += self._get_file_tree_str()

        # 4. Inject Recent Git Changes
        try:
            git_result = subprocess.run(
                ["git", "diff", "--stat", "HEAD~1"],
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                timeout=10,
                cwd=settings.PROJECT_ROOT,
            )
            if git_result.returncode == 0 and git_result.stdout.strip():
                context += f"\n\n# RECENT GIT CHANGES\n```\n{git_result.stdout[:1000]}\n```\n"
        except Exception as e:
            # Best-effort: missing git/history must not block generation.
            logger.debug("Failed to get git changes: %s", e)

        # 5. Inject Extensions Info
        ext_context = self.extensions.setup_auto_extensions()
        if ext_context:
            context += ext_context

        # 6. Enhance prompt with extensions (auto-add 'use context7' if relevant)
        prompt = self.extensions.enhance_prompt_with_extensions(prompt)

        # === CONTEXT INJECTION END ===

        # The SDK client may internally fall back to the CLI; label accordingly.
        if self.gemini_client and getattr(self.gemini_client, "backend", "sdk") == "cli":
            backend_msg = "CLI (Internal Fallback)"
            title_msg = "Gemini CLI Progress"
        else:
            backend_msg = "SDK"
            title_msg = "Gemini SDK Progress"

        console.print(
            f"[blue]Generating with {backend_msg}... (Timeout: {settings.TIMEOUT_MINUTES}m)[/blue]"
        )
        if self.verbose:
            console.print(f"[dim]Context size: {len(context)} chars[/dim]")

        with Live(console=console, screen=False, auto_refresh=True) as live:
            progress = Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                TimeElapsedColumn(),
                console=console,
            )
            spinner_text = "Gemini Thinking..." if backend_msg == "SDK" else "Gemini CLI Running..."
            progress.add_task(f"[cyan]{spinner_text}", total=None)
            live.update(Panel(progress, title=f"[bold blue]{title_msg}[/bold blue]"))

            response_text, success = self.gemini_client.generate_with_retry(
                prompt=prompt,
                context=context,
                system_instruction="",  # Already in client init
            )

        # Write output
        try:
            output_file.write_text(response_text, encoding="utf-8")
        except Exception as e:
            logger.debug("Failed to write output file: %s", e)

        return success, response_text, output_file

    def _execute_cli(self, loop_count: int, output_file: Path) -> tuple[bool, str, Path]:
        """Execute using Gemini CLI Adapter (Privacy Mode).

        Chooses the local LLM when BORING_OFFLINE_MODE=1, otherwise the CLI
        adapter; injects smart context (or a file-tree fallback) plus the task
        plan, runs generation with tool support, executes any returned tool
        calls, and writes the response to *output_file*.

        Returns:
            (success, response_text, output_file)
        """
        # Note: We now use GeminiCLIAdapter instead of raw subprocess here
        # But for AgentLoop backward compatibility, we can recreate the adapter logic
        # or better yet, use the self.gemini_client which SHOULD be the adapter if --backend cli was used

        # However, AgentLoop initialization in main.py sets self.gemini_client only for SDK mode.
        # We need to fix AgentLoop to accept an adapter or initialize it.

        # Let's fix this method to use the proper CLI structure
        import os

        if os.environ.get("BORING_OFFLINE_MODE") == "1":
            from ..llm.local_llm import LocalLLM

            console.print("[bold green]Using Offline Local LLM[/bold green]")
            # LocalLLM needs to be compatible with adapter interface
            adapter = LocalLLM(
                model_path=self.model_name if self.model_name.startswith("local/") else None,
            )
        else:
            from ..cli_client import GeminiCLIAdapter

            adapter = GeminiCLIAdapter(
                model_name=self.model_name,
                log_dir=self.log_dir,
                timeout_seconds=settings.TIMEOUT_MINUTES * 60,
            )

        prompt = self.prompt_file.read_text(encoding="utf-8") if self.prompt_file.exists() else ""

        # Inject Context - Smart Context (RAG Lite) if available
        # Or simple file tree injection
        context_str = ""

        # Try Smart Context
        try:
            from ..context_selector import create_context_selector

            selector = create_context_selector(settings.PROJECT_ROOT)
            context_str = selector.generate_context_injection(prompt)
        except ImportError:
            logger.debug("Smart Context (RAG Lite) not available. Falling back to file tree.")
            pass  # Continue to inject file tree

        # If smart context is empty, inject file tree as a fallback
        if not context_str:
            context_str += self._get_file_tree_str()

        # Add Task Plan to context_str
        task_file = settings.PROJECT_ROOT / settings.TASK_FILE
        if task_file.exists():
            task_content = task_file.read_text(encoding="utf-8")
            context_str += f"\n\n# CURRENT PLAN STATUS (@fix_plan.md)\n{task_content}\n"

        console.print(
            f"[blue]Generating with CLI (Privacy Mode)... (Timeout: {settings.TIMEOUT_MINUTES}m)[/blue]"
        )

        with Live(console=console, screen=False, auto_refresh=True) as live:
            progress = Progress(
                SpinnerColumn(),
                TextColumn("{task.description}"),
                TimeElapsedColumn(),
                console=console,
            )
            progress.add_task("[cyan]Gemini CLI Running...", total=None)
            live.update(Panel(progress, title="[bold blue]CLI Progress[/bold blue]"))

            # Load Tools for CLI
            from ..mcp.tools.agents import boring_web_search

            tools = [boring_web_search]

            # Use generate_with_tools if available
            response = adapter.generate_with_tools(prompt=prompt, context=context_str, tools=tools)

            response_text = response.text
            success = response.success

            # Handle tool calls if any returned: support both attribute-style
            # and dict-style response shapes.
            function_calls = None
            if hasattr(response, "function_calls") and response.function_calls:
                function_calls = response.function_calls
            elif isinstance(response, dict) and "function_calls" in response:
                function_calls = response["function_calls"]

            if function_calls:
                console.print(
                    f"[bold magenta]🛠️ CLI Tool Call Requested: {function_calls}[/bold magenta]"
                )
                # Execute tool calls and append results
                tool_results = []
                for fc in function_calls:
                    tool_name = (
                        fc.get("name") if isinstance(fc, dict) else getattr(fc, "name", None)
                    )
                    tool_args = (
                        fc.get("args", {}) if isinstance(fc, dict) else getattr(fc, "args", {})
                    )

                    if tool_name == "boring_web_search":
                        try:
                            result = boring_web_search(**tool_args)
                            tool_results.append(f"[Tool Result: {tool_name}]\n{result}")
                            console.print(f"[green]✅ Tool executed: {tool_name}[/green]")
                        except Exception as e:
                            tool_results.append(f"[Tool Error: {tool_name}] {e}")
                            console.print(f"[red]❌ Tool failed: {tool_name} - {e}[/red]")
                    else:
                        console.print(f"[yellow]⚠️ Unknown tool: {tool_name}[/yellow]")

                # Append tool results to response
                if tool_results:
                    response_text += "\n\n# Tool Execution Results\n" + "\n".join(tool_results)

        # Write output log
        try:
            output_file.write_text(response_text, encoding="utf-8")
        except Exception as e:
            logger.debug("Failed to write CLI output file: %s", e)

        return success, response_text, output_file

    def _verify_project_syntax(self, files_to_check: list[str] | None = None) -> tuple[bool, str]:
        """
        Checks syntax of Python files.

        Optimization: Only checks modified files first.
        If import error occurs, expands to full src check.

        Args:
            files_to_check: List of modified files to check (optional)

        Returns:
            (True, "") on success, otherwise (False, error message).
        """
        if files_to_check is None:
            # Default to the files touched during the current loop.
            files_to_check = self._files_modified_this_loop

        candidates = [
            Path(name) for name in files_to_check if name.endswith(".py") and Path(name).exists()
        ]

        if not candidates:
            # Nothing modified means there is nothing to validate.
            return True, ""

        log_status(
            self.log_dir, "INFO", f"Syntax check: {len(candidates)} modified file(s)"
        )

        for candidate in candidates:
            ok, error = check_syntax(candidate)
            if ok:
                continue
            # An import failure may implicate files beyond the modified set,
            # so widen the scan to the entire src tree.
            if "import" in error.lower():
                log_status(
                    self.log_dir, "WARN", "Import error detected, expanding check..."
                )
                return self._full_syntax_check()
            return False, error

        return True, ""

    def _full_syntax_check(self) -> tuple[bool, str]:
        """Full syntax check of all Python files in src.

        Returns:
            (True, "") when every file parses (or src/ is absent),
            otherwise (False, error) for the first failing file.
        """
        root = settings.PROJECT_ROOT / "src"
        if not root.exists():
            return True, ""

        # Stop at the first file that fails to parse.
        for candidate in root.rglob("*.py"):
            ok, error = check_syntax(candidate)
            if not ok:
                return False, error
        return True, ""

    def _self_correct(self, error_msg: str, loop_count: int):
        """
        Feeds the error back to Gemini for immediate correction.

        Args:
            error_msg: Verification-failure (or format-feedback) text that is
                embedded in the correction prompt.
            loop_count: Current loop index, used to name the correction log.

        NOTE(review): only runs in SDK mode — when self.gemini_client is None
        (CLI mode), the correction is silently skipped.
        """
        correction_prompt = f"""
CRITICAL: The code you just generated caused a Verification Failure.
Error: {error_msg}

You must FIX this error immediately.
Do not change functionality, just fix the syntax/error.
Output the corrected full file content.
"""
        # We reuse the _generate_step but override the prompt file temporarily?
        # A bit hacky. Better to call client directly.
        if self.gemini_client:
            console.print("[bold yellow]Attempting Self-Correction via SDK...[/bold yellow]")
            response, success = self.gemini_client.generate_with_retry(prompt=correction_prompt)
            if success:
                # We manually write the response to a file so process_gemini_output can read it
                corr_file = self.log_dir / f"correction_loop_{loop_count}.log"
                corr_file.write_text(response, encoding="utf-8")

                # Backup again? Yes, always backup before write.
                process_gemini_output(
                    output_file=corr_file,
                    project_root=settings.PROJECT_ROOT,
                    log_dir=self.log_dir,
                    loop_id=loop_count,
                )

    def _create_format_feedback(self) -> str:
        """Creates a feedback prompt when AI output cannot be parsed.

        The returned text re-states the required <file> tag format and the
        mandatory BORING_STATUS trailer so the model can retry correctly.
        """
        return """
CRITICAL FEEDBACK: Your previous output could NOT be parsed into file changes.
The system requires you to output code using a specific format.

## REQUIRED FORMAT (Use XML tags):
<file path="src/example.py">
# Complete file content here
def my_function():
    pass
</file>

## RULES:
1. You MUST use <file path="...">...</file> tags for EVERY file you want to create or modify.
2. Output the COMPLETE file content, not just changes or diffs.
3. The path must be relative to the project root (e.g., "src/main.py", not "/home/user/project/src/main.py").
4. Do NOT use markdown code blocks (```python) for file content - use XML tags only.

## ALSO REQUIRED:
At the END of your response, include the status block:
---BORING_STATUS---
STATUS: IN_PROGRESS
TASKS_COMPLETED_THIS_LOOP: 0
EXIT_SIGNAL: false
---END_BORING_STATUS---

Please try again with the correct format.
"""

    def _save_loop_summary(self, loop_count: int, status: str, message: str):
        """
        Persist a short markdown summary of the finished loop.

        The next iteration reads ``.last_loop_summary`` from the project root
        for continuity. Writing is best-effort: failures are logged at debug
        level and otherwise ignored.
        """
        lines = [
            f"## Loop #{loop_count} Summary",
            f"- **Status:** {status}",
            f"- **Message:** {message}",
            f"- **Timestamp:** {time.strftime('%Y-%m-%d %H:%M:%S')}",
            "",  # trailing newline
        ]
        target = settings.PROJECT_ROOT / ".last_loop_summary"
        try:
            target.write_text("\n".join(lines), encoding="utf-8")
        except Exception as e:
            logger.debug("Failed to write loop summary: %s", e)

    def _check_plan_completion(self) -> bool:
        """
        Report whether the plan file (@fix_plan.md) is fully complete.

        A plan counts as complete when it contains no unchecked markdown
        checkboxes. A missing plan file, or any error while reading it, is
        treated as complete so a broken plan can never wedge the agent.
        """
        plan_path = settings.PROJECT_ROOT / settings.TASK_FILE
        if not plan_path.exists():
            # No plan file = nothing to check, allow exit
            return True

        open_markers = ("- [ ]", "* [ ]")
        done_markers = ("- [x]", "- [X]", "* [x]", "* [X]")

        try:
            text = plan_path.read_text(encoding="utf-8")
            unchecked_count = sum(text.count(m) for m in open_markers)
            if unchecked_count == 0:
                return True

            # Count completed items purely for the status log line.
            checked_count = sum(text.count(m) for m in done_markers)
            log_status(
                self.log_dir,
                "INFO",
                f"Plan status: {checked_count} done, {unchecked_count} remaining",
            )
            return False
        except Exception:
            return True  # On error, allow exit

run(max_duration=None)

Start the main loop.

Source code in src/boring/loop/legacy.py
def run(self, max_duration: int | None = None):
    """Start the main loop.

    Drives generate -> patch -> verify -> analyze iterations until the AI
    signals a validated exit, ``settings.MAX_LOOPS`` is reached, or the
    optional wall-clock budget is exceeded.

    Args:
        max_duration: Optional budget in seconds; checked at the top of
            each iteration, so a running iteration is never interrupted.
    """
    start_time = time.time()
    # Circuit breaker: too many consecutive failures halts automation and
    # hands control to a human before anything else runs.
    if should_halt_execution():
        console.print("[bold red]Circuit Breaker is OPEN. Execution halted.[/bold red]")
        log_status(self.log_dir, "CRITICAL", "Circuit Breaker is OPEN.")

        # === HUMAN-IN-THE-LOOP: Enter interactive mode ===
        try:
            from ..interactive import enter_interactive_mode

            should_resume = enter_interactive_mode(
                reason="Circuit Breaker OPEN - Too many consecutive failures",
                project_root=settings.PROJECT_ROOT,
                recent_errors=self._errors_this_loop
                if hasattr(self, "_errors_this_loop")
                else [],
            )

            if should_resume:
                console.print("[green]Resuming loop after interactive session...[/green]")
                # Continue to loop start (don't return)
            else:
                console.print("[yellow]Aborting as requested.[/yellow]")
                return
        except ImportError:
            # Interactive module unavailable: nothing to do but bail out.
            console.print("[dim]Use 'boring reset-circuit' to reset manually.[/dim]")
            return
        except KeyboardInterrupt:
            console.print("\n[yellow]Interrupted.[/yellow]")
            return

    console.print(
        Panel.fit(
            f"[bold green]Boring Autonomous Agent (v2.0)[/bold green]\n"
            f"Mode: {'CLI' if self.use_cli else 'SDK'}\n"
            f"Model: {self.model_name}\n"
            f"Log Dir: {self.log_dir}",
            title="System Initialization",
        )
    )

    # Initialize rate-limit / exit-signal tracking files (ensure they exist)
    init_call_tracking(
        settings.PROJECT_ROOT / ".call_count",
        settings.PROJECT_ROOT / ".last_reset",
        settings.PROJECT_ROOT / ".exit_signals",
    )

    loop_count = 0
    while loop_count < settings.MAX_LOOPS:
        # Check for global timeout
        if max_duration and (time.time() - start_time) > max_duration:
            console.print(
                f"[bold red]Global timeout of {max_duration}s reached. Aborting.[/bold red]"
            )
            log_status(
                self.log_dir, "WARN", f"AgentLoop aborted: reached {max_duration}s timeout"
            )
            break

        loop_count += 1

        # Hourly rate-limit check; blocks (wait_for_reset) until quota returns.
        if not can_make_call(settings.PROJECT_ROOT / ".call_count", settings.MAX_HOURLY_CALLS):
            wait_for_reset(
                settings.PROJECT_ROOT / ".call_count",
                settings.PROJECT_ROOT / ".last_reset",
                settings.MAX_HOURLY_CALLS,
            )
            console.print("[yellow]Rate limit reset. Resuming...[/yellow]")

        log_status(self.log_dir, "LOOP", f"=== Starting Loop #{loop_count} ===")
        console.print(f"\n[bold purple]=== Starting Loop #{loop_count} ===[/bold purple]")

        # 1. Generate: ask the model for this iteration's output.
        try:
            success, output_content, output_file = self._generate_step(loop_count)
        except Exception as e:
            # Unexpected generation crash is fatal for the whole loop.
            log_status(self.log_dir, "CRITICAL", f"Generation failed: {e}")
            record_loop_result(loop_count, 0, True, 0)
            break

        if not success:
            # Reported (non-crash) generation failure: skip to next iteration.
            log_status(
                self.log_dir,
                "ERROR",
                f"Generation failed (Loop #{loop_count}): {output_content}",
            )
            record_loop_result(loop_count, 0, True, len(output_content))
            continue

        # 2. Patch & Backup
        # The file_patcher now handles backup internally using BackupManager(loop_id)
        try:
            files_changed = process_gemini_output(
                output_file=output_file,
                project_root=settings.PROJECT_ROOT,
                log_dir=self.log_dir,
                loop_id=loop_count,
            )
        except Exception as e:
            log_status(self.log_dir, "ERROR", f"Patching failed: {e}")
            record_loop_result(loop_count, 0, True, len(output_content))
            continue

        if files_changed == 0:
            log_status(self.log_dir, "WARN", "No files changed in this loop.")

            # === EMPTY OUTPUT FEEDBACK (Fix #2) ===
            # The AI produced output we could not parse into file changes;
            # show it to the user and feed format guidance back to the AI.
            if len(output_content) > 0:  # AI did output something
                # [V12.0 Enhancement] Print the output for the user (One-Shot Mode support)
                console.print(
                    Panel(
                        output_content,
                        title="[bold blue]Gemini Response[/bold blue]",
                        border_style="blue",
                    )
                )

            if len(output_content) > 100:
                console.print(
                    "[bold yellow]⚠️ AI output could not be parsed into file changes.[/bold yellow]"
                )
                empty_output_count = getattr(self, "_empty_output_count", 0) + 1
                self._empty_output_count = empty_output_count

                if empty_output_count < 3:  # Prevent infinite feedback loops
                    feedback_prompt = self._create_format_feedback()
                    self._save_loop_summary(
                        loop_count, "FAILED", "Output not parseable. Sending format feedback."
                    )
                    self._self_correct(feedback_prompt, loop_count)
                    continue
                else:
                    log_status(
                        self.log_dir, "ERROR", "Too many unparseable outputs. Breaking loop."
                    )
                    self._save_loop_summary(loop_count, "FAILED", "Repeated format errors.")
                    break
            else:
                self._empty_output_count = 0  # Reset counter on (near-)empty output

            record_loop_result(loop_count, 0, False, len(output_content))
        else:
            self._empty_output_count = 0  # Reset counter on success

            # 3. Dependency Check: auto-install anything the new code imports.
            check_and_install_dependencies(output_content)

            # 4. Advanced Verification (using CodeVerifier)
            verification_passed, error_msg = self.verifier.verify_project(
                self.verification_level
            )

            if not verification_passed:
                log_status(self.log_dir, "ERROR", f"Verification Failed: {error_msg[:200]}")
                console.print("[bold red]Verification Failed[/bold red]")

                # V14: Emit Event
                FlowEventBus.emit(
                    FlowEvent.ON_LINT_FAIL,
                    project_path=str(settings.PROJECT_ROOT),
                    error=error_msg,
                )

                # Record failure and learn from error
                record_loop_result(loop_count, files_changed, True, len(output_content))
                self._save_loop_summary(
                    loop_count, "FAILED", f"Verification: {error_msg[:100]}"
                )
                self.memory.record_error_pattern("verification_error", error_msg[:200])
                self._errors_this_loop.append(error_msg[:200])

                # 5. Self-Correction with detailed feedback
                console.print("[bold yellow]Triggering Self-Correction...[/bold yellow]")
                self._self_correct(error_msg, loop_count)
                continue

            # Record Success
            # NOTE(review): _loop_start_time is set outside this method
            # (presumably in _generate_step) — confirm it is always set
            # before a successful loop reaches this point.
            duration = time.time() - self._loop_start_time

            # Record to Memory System
            loop_memory = LoopMemory(
                loop_id=loop_count,
                timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
                status="SUCCESS",
                files_modified=self._files_modified_this_loop,
                tasks_completed=self._tasks_completed_this_loop,
                errors=[],
                ai_output_summary=output_content[:500] if output_content else "",
                duration_seconds=duration,
            )
            self.memory.record_loop(loop_memory)
            record_loop_result(loop_count, files_changed, False, len(output_content))
            self._save_loop_summary(loop_count, "SUCCESS", f"Modified {files_changed} file(s).")

            # V14: Emit Event
            if files_changed > 0:
                FlowEventBus.emit(
                    FlowEvent.POST_BUILD,
                    project_path=str(settings.PROJECT_ROOT),
                    modified_files=self._files_modified_this_loop,
                )

        # 6. Analyze & Update Task
        increment_call_counter(settings.PROJECT_ROOT / ".call_count")
        analysis = analyze_response(output_file, loop_count)

        # === EXIT DETECTION HARDENING (Fix #3) ===
        # Only exit if BOTH: AI signals exit AND @fix_plan.md has no unchecked items
        should_exit = False
        if analysis.get("analysis", {}).get("exit_signal", False):
            plan_complete = self._check_plan_completion()
            if plan_complete:
                console.print("[bold green]✅ All tasks complete. Agent exiting.[/bold green]")
                should_exit = True
            else:
                console.print(
                    "[yellow]⚠️ AI signaled exit but @fix_plan.md has unchecked items. Continuing.[/yellow]"
                )
                log_status(
                    self.log_dir, "WARN", "EXIT_SIGNAL ignored: plan has unchecked items."
                )

        if should_exit:
            break

        log_status(self.log_dir, "LOOP", f"=== Completed Loop #{loop_count} ===")

    # Cleanup old backups after loop completes
    BackupManager.cleanup_old_backups(keep_last=10)
    console.print("[dim]Agent loop finished.[/dim]")

OperationSeverity

Bases: Enum

Severity levels for operations.

Source code in src/boring/loop/shadow_mode.py
class OperationSeverity(Enum):
    """Severity levels for operations.

    Consumed by ShadowModeGuard.check_operation: in ENABLED mode only
    HIGH/CRITICAL operations are queued for approval; in STRICT mode every
    write is queued regardless of severity.
    """

    LOW = "low"  # Read operations, non-destructive queries
    MEDIUM = "medium"  # Large edits, batch operations
    HIGH = "high"  # File deletion, config changes
    CRITICAL = "critical"  # Secrets, system files, mass deletion

PendingOperation dataclass

An operation awaiting human approval.

Source code in src/boring/loop/shadow_mode.py
@dataclass
class PendingOperation:
    """An operation awaiting human approval.

    ``approved`` is a tri-state flag: None while pending, True once
    approved, False once rejected.
    """

    operation_id: str
    operation_type: str
    file_path: str
    severity: OperationSeverity
    description: str
    preview: str  # What the change looks like
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    approved: bool | None = None
    approver_note: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-friendly dict; preview is clipped to 500 chars."""
        # The enum is flattened to its string value so the dict round-trips
        # through JSON persistence.
        clipped_preview = self.preview[:500]
        serialized = {
            "operation_id": self.operation_id,
            "operation_type": self.operation_type,
            "file_path": self.file_path,
            "severity": self.severity.value,
            "description": self.description,
            "preview": clipped_preview,
            "timestamp": self.timestamp,
            "approved": self.approved,
            "approver_note": self.approver_note,
        }
        return serialized

ShadowModeGuard

Intercepts and validates operations before execution.

Modes (per user decision — ENABLED is the default):

- DISABLED: all operations are auto-approved
- ENABLED: only HIGH/CRITICAL operations require approval
- STRICT: ALL file modifications require approval

Design principles:

- Read operations are NEVER blocked (avoids alert fatigue)
- Only side-effect operations need approval
- Quick approve/reject via callback or queue

Source code in src/boring/loop/shadow_mode.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
class ShadowModeGuard:
    """
    Intercepts and validates operations before execution.

    Modes (per user decision - ENABLED is default):
    - DISABLED: All operations auto-approved
    - ENABLED: Only HIGH/CRITICAL ops require approval
    - STRICT: ALL file modifications require approval

    Design principles:
    - Read operations NEVER blocked (avoid alert fatigue)
    - Only side-effect operations need approval
    - Quick approve/reject via callback or queue
    """

    # Substrings that mark a path as potentially containing secrets
    SENSITIVE_PATTERNS = {
        ".env",
        "secret",
        "password",
        "credential",
        "key",
        "token",
        "auth",
        "private",
        "api_key",
        ".pem",
        ".key",
    }

    # Config files that warrant extra caution
    CONFIG_PATTERNS = {
        "config",
        "settings",
        "pyproject.toml",
        "package.json",
        "docker-compose",
        "Dockerfile",
        ".yaml",
        ".yml",
    }

    # Files that should NEVER be modified by automation
    PROTECTED_FILES = {".git/config", ".git/HEAD", "~/.ssh/", "/etc/"}

    def __init__(
        self,
        project_root: Path,
        mode: ShadowModeLevel = ShadowModeLevel.ENABLED,
        approval_callback: ApprovalCallback | None = None,
        pending_file: Path | None = None,
    ):
        """
        Initialize Shadow Mode guard.

        Args:
            project_root: Project root directory
            mode: Protection level (default ENABLED)
            approval_callback: Sync callback for approval (returns bool)
            pending_file: Path to save pending operations queue
        """
        self.project_root = Path(project_root)
        self._mode = mode  # Use private attr to avoid triggering setter persistence
        self.approval_callback = approval_callback

        self.pending_file = pending_file or (self.project_root / ".boring_pending_approval.json")
        self._mode_file = self.project_root / ".boring_shadow_mode"

        self.pending_queue: list[PendingOperation] = []
        self._operation_counter = 0

        # Load persisted mode (overrides constructor default if file exists)
        self._load_mode()

        # Load any existing pending operations
        self._load_pending()

        # V12.4: File Integrity Monitor
        try:
            from ..security.integrity import FileIntegrityMonitor

            self.integrity_monitor = FileIntegrityMonitor(self.project_root)
            # Monitor config and protected files
            # NOTE(review): PROTECTED_FILES entries like "~/.ssh/" joined
            # onto project_root yield paths inside the project tree; confirm
            # whether absolute/home paths were intended to be expanded.
            monitored = [
                self.project_root / f
                for f in self.PROTECTED_FILES
                if (self.project_root / f).exists()
            ]
            # Add sensitive config patterns
            for pattern in self.CONFIG_PATTERNS:
                # Basic glob for top-level configs matches
                if not pattern.startswith("."):
                    for f in self.project_root.glob(f"{pattern}*"):
                        if f.is_file():
                            monitored.append(f)

            if monitored:
                self.integrity_monitor.snapshot_files(monitored)

        except ImportError:
            self.integrity_monitor = None
            logger.warning("FileIntegrityMonitor not available")

    @property
    def mode(self) -> ShadowModeLevel:
        """Get current protection mode."""
        return self._mode

    @mode.setter
    def mode(self, value: ShadowModeLevel) -> None:
        """Set protection mode and persist to disk."""
        self._mode = value
        self._persist_mode()

    def check_operation(self, operation: dict[str, Any]) -> PendingOperation | None:
        """
        Check if an operation should be blocked for approval.

        Args:
            operation: Dict with 'name' and 'args'

        Returns:
            PendingOperation if blocked, None if auto-approved
        """
        if self.mode == ShadowModeLevel.DISABLED:
            return None

        op_name = operation.get("name", "")
        args = operation.get("args", {})

        # Classify operation
        pending = self._classify_operation(op_name, args)
        if not pending:
            return None

        # Check trust rules BEFORE blocking
        trust_manager = _get_trust_manager(self.project_root)
        if trust_manager:
            severity_str = pending.severity.value if pending.severity else "medium"
            matched_rule = trust_manager.check_trust(op_name, args, severity_str)
            if matched_rule:
                logger.info(f"✅ Auto-approved by trust rule: {op_name}")
                return None  # Trusted operation, no blocking

        # Check protection level
        if self.mode == ShadowModeLevel.STRICT:
            # Block ALL write operations (unless trusted above)
            return pending

        # ENABLED mode: only block HIGH and CRITICAL
        if pending.severity in (OperationSeverity.HIGH, OperationSeverity.CRITICAL):
            return pending

        # Fix: removed a duplicated, unreachable `return None` that followed
        # this statement (copy-paste artifact).
        return None  # Auto-approve LOW/MEDIUM

    def verify_system_integrity(self) -> list[str]:
        """
        Check for silent modifications to protected files.
        V12.4 Security Feature.

        Returns:
            List of detected silent modifications (empty when the monitor
            is unavailable or nothing changed).
        """
        if self.integrity_monitor:
            return self.integrity_monitor.detect_silent_modifications()
        return []

    def request_approval(self, pending: PendingOperation) -> bool:
        """
        Request approval for a pending operation.

        Tries the synchronous callback first; on failure or absence, the
        operation is queued to disk for asynchronous review.

        Returns:
            True if approved, False if rejected/pending
        """
        # Try callback first
        if self.approval_callback:
            try:
                return self.approval_callback(pending)
            except Exception as e:
                logger.warning(f"Approval callback failed: {e}")

        # Fall back to queue
        self.pending_queue.append(pending)
        self._save_pending()

        logger.info(
            f"⚠️ Operation queued for approval: {pending.operation_type} "
            f"on {pending.file_path} ({pending.severity.value})"
        )

        return False  # Not approved yet

    def approve_operation(self, operation_id: str, note: str | None = None) -> bool:
        """
        Approve a pending operation by ID.

        The entry stays in the queue (with approved=True) so callers can
        later query its status via is_operation_approved().

        Returns:
            True if found and approved
        """
        for op in self.pending_queue:
            if op.operation_id == operation_id:
                op.approved = True
                op.approver_note = note
                self._save_pending()
                return True
        return False

    def reject_operation(self, operation_id: str, note: str | None = None) -> bool:
        """
        Reject a pending operation by ID.

        Unlike approval, a rejected entry is removed from the queue, so a
        subsequent is_operation_approved() returns None rather than False.

        Returns:
            True if found and rejected
        """
        for op in self.pending_queue:
            if op.operation_id == operation_id:
                op.approved = False
                op.approver_note = note
                self._remove_pending(operation_id)
                return True
        return False

    def get_pending_operations(self) -> list[PendingOperation]:
        """Get all pending operations awaiting approval."""
        return [op for op in self.pending_queue if op.approved is None]

    def clear_pending(self) -> int:
        """Clear all pending operations (approved ones included). Returns count cleared."""
        count = len(self.pending_queue)
        self.pending_queue.clear()
        self._save_pending()
        return count

    def is_operation_approved(self, operation_id: str) -> bool | None:
        """
        Check if an operation has been approved.

        Returns:
            True if approved, False if rejected, None if pending
        """
        for op in self.pending_queue:
            if op.operation_id == operation_id:
                return op.approved
        return None

    def _classify_operation(self, op_name: str, args: dict[str, Any]) -> PendingOperation | None:
        """Classify an operation by severity.

        Rules are evaluated top-down; the first match wins, so sensitive and
        config paths are caught before the catch-all write rule.
        """
        file_path = args.get("file_path", "") or args.get("path", "")

        # Generate unique ID
        self._operation_counter += 1
        op_id = f"op_{self._operation_counter}_{datetime.now().strftime('%H%M%S')}"

        # ==================
        # FILE DELETION - HIGH
        # ==================
        if op_name in ("delete_file", "remove_file", "rm"):
            return PendingOperation(
                operation_id=op_id,
                operation_type="DELETE",
                file_path=file_path,
                severity=OperationSeverity.HIGH,
                description=f"Delete file: {file_path}",
                preview="[File will be permanently deleted]",
            )

        # ==================
        # SECRETS/SENSITIVE - CRITICAL
        # ==================
        if op_name in ("write_file", "create_file", "search_replace"):
            if self._is_sensitive_file(file_path):
                content = args.get("content", "") or args.get("replace", "")
                return PendingOperation(
                    operation_id=op_id,
                    operation_type="SENSITIVE_CHANGE",
                    file_path=file_path,
                    severity=OperationSeverity.CRITICAL,
                    description=f"Modify sensitive file: {file_path}",
                    preview=self._safe_preview(content),
                )

        # ==================
        # CONFIG FILES - HIGH
        # ==================
        if op_name in ("write_file", "create_file", "search_replace"):
            if self._is_config_file(file_path):
                content = args.get("content", "") or args.get("replace", "")
                return PendingOperation(
                    operation_id=op_id,
                    operation_type="CONFIG_CHANGE",
                    file_path=file_path,
                    severity=OperationSeverity.HIGH,
                    description=f"Modify config file: {file_path}",
                    preview=self._safe_preview(content),
                )

        # ==================
        # LARGE DELETIONS - MEDIUM
        # ==================
        if op_name == "search_replace":
            search_content = args.get("search", "")
            if len(search_content) > 1000 or search_content.count("\n") > 30:
                return PendingOperation(
                    operation_id=op_id,
                    operation_type="LARGE_EDIT",
                    file_path=file_path,
                    severity=OperationSeverity.MEDIUM,
                    description=f"Large edit in {file_path} ({len(search_content)} chars, {search_content.count(chr(10))} lines)",
                    preview=f"Removing:\n{search_content[:300]}...",
                )

        # ==================
        # SHELL COMMANDS - HIGH
        # ==================
        if op_name in ("exec", "shell", "run_command", "subprocess"):
            cmd = args.get("command", "") or args.get("cmd", "")
            return PendingOperation(
                operation_id=op_id,
                operation_type="SHELL_COMMAND",
                file_path="[shell]",
                severity=OperationSeverity.HIGH,
                description="Execute shell command",
                preview=cmd[:200],
            )

        # ==================
        # PROTECTED PATHS - CRITICAL
        # ==================
        if self._is_protected_path(file_path):
            return PendingOperation(
                operation_id=op_id,
                operation_type="PROTECTED_PATH",
                file_path=file_path,
                severity=OperationSeverity.CRITICAL,
                description=f"Attempt to modify protected path: {file_path}",
                preview="[BLOCKED - Protected system path]",
            )

        # ==================
        # CATCH-ALL WRITES - LOW
        # ==================
        if op_name in ("write_file", "create_file", "search_replace", "apply_patch"):
            content = args.get("content", "") or args.get("replace", "")
            return PendingOperation(
                operation_id=op_id,
                operation_type="WRITE_FILE",
                file_path=file_path,
                severity=OperationSeverity.LOW,
                description=f"Modify file: {file_path}",
                preview=self._safe_preview(content),
            )

        return None  # No special handling needed

    def _is_sensitive_file(self, path: str) -> bool:
        """Check if file might contain sensitive data (substring match, case-insensitive)."""
        path_lower = path.lower()
        return any(pattern in path_lower for pattern in self.SENSITIVE_PATTERNS)

    def _is_config_file(self, path: str) -> bool:
        """Check if file is a configuration file (substring match, case-insensitive)."""
        path_lower = path.lower()
        return any(pattern in path_lower for pattern in self.CONFIG_PATTERNS)

    def _is_protected_path(self, path: str) -> bool:
        """Check if path is in protected list (case-sensitive substring match)."""
        return any(protected in path for protected in self.PROTECTED_FILES)

    def _safe_preview(self, content: str, max_len: int = 300) -> str:
        """Create safe preview of content (redact sensitive data)."""
        if not content:
            return "[empty]"

        # Redact potential secrets
        import re

        redacted = re.sub(
            r'(password|secret|key|token|api_key)\s*[=:]\s*["\']?[^"\'\s]+',
            r"\1=[REDACTED]",
            content,
            flags=re.IGNORECASE,
        )

        if len(redacted) > max_len:
            return redacted[:max_len] + "..."

        return redacted

    def _load_pending(self) -> None:
        """Load pending operations from file (best-effort; corrupt files are logged and ignored)."""
        if self.pending_file.exists():
            try:
                data = json.loads(self.pending_file.read_text())
                self.pending_queue = [
                    PendingOperation(
                        operation_id=op["operation_id"],
                        operation_type=op["operation_type"],
                        file_path=op["file_path"],
                        severity=OperationSeverity(op["severity"]),
                        description=op["description"],
                        preview=op["preview"],
                        timestamp=op.get("timestamp", ""),
                        approved=op.get("approved"),
                        approver_note=op.get("approver_note"),
                    )
                    for op in data
                ]
            except Exception as e:
                logger.warning(f"Failed to load pending operations: {e}")

    def _save_pending(self) -> None:
        """Save pending operations to file (atomic via TransactionalFileWriter)."""
        try:
            data = [op.to_dict() for op in self.pending_queue]
            TransactionalFileWriter.write_json(self.pending_file, data, indent=2)
        except Exception as e:
            logger.warning(f"Failed to save pending operations: {e}")

    def _remove_pending(self, operation_id: str) -> None:
        """Remove an operation from the queue and persist the change."""
        self.pending_queue = [op for op in self.pending_queue if op.operation_id != operation_id]
        self._save_pending()

    def _persist_mode(self) -> None:
        """Persist current mode to disk for cross-session consistency."""
        try:
            TransactionalFileWriter.write_text(self._mode_file, self._mode.value)
            logger.debug(f"Shadow Mode persisted: {self._mode.value}")
        except Exception as e:
            logger.warning(f"Failed to persist Shadow Mode: {e}")

    def _load_mode(self) -> None:
        """Load persisted mode from disk if available; unknown values are ignored."""
        if self._mode_file.exists():
            try:
                mode_str = self._mode_file.read_text().strip().upper()
                if mode_str in ("DISABLED", "ENABLED", "STRICT"):
                    self._mode = ShadowModeLevel[mode_str]
                    logger.debug(f"Loaded persisted Shadow Mode: {mode_str}")
            except Exception as e:
                logger.warning(f"Failed to load persisted Shadow Mode: {e}")

mode property writable

Get current protection mode.

__init__(project_root, mode=ShadowModeLevel.ENABLED, approval_callback=None, pending_file=None)

Initialize Shadow Mode guard.

Parameters:

Name Type Description Default
project_root Path

Project root directory

required
mode ShadowModeLevel

Protection level (default ENABLED)

ENABLED
approval_callback ApprovalCallback | None

Sync callback for approval (returns bool)

None
pending_file Path | None

Path to save pending operations queue

None
Source code in src/boring/loop/shadow_mode.py
def __init__(
    self,
    project_root: Path,
    mode: ShadowModeLevel = ShadowModeLevel.ENABLED,
    approval_callback: ApprovalCallback | None = None,
    pending_file: Path | None = None,
):
    """
    Initialize Shadow Mode guard.

    Args:
        project_root: Project root directory
        mode: Protection level (default ENABLED)
        approval_callback: Sync callback for approval (returns bool)
        pending_file: Path to save pending operations queue
    """
    self.project_root = Path(project_root)
    # Assign the private attribute directly so construction does not trigger
    # the mode property's persistence side effect.
    self._mode = mode
    self.approval_callback = approval_callback

    self.pending_file = pending_file or (self.project_root / ".boring_pending_approval.json")
    self._mode_file = self.project_root / ".boring_shadow_mode"

    self.pending_queue: list[PendingOperation] = []
    self._operation_counter = 0

    # A mode persisted on disk wins over the constructor argument.
    self._load_mode()

    # Re-hydrate approvals still waiting from a previous session.
    self._load_pending()

    # V12.4: File Integrity Monitor
    try:
        from ..security.integrity import FileIntegrityMonitor

        self.integrity_monitor = FileIntegrityMonitor(self.project_root)
        # Snapshot protected files that actually exist under the project root.
        tracked = [
            self.project_root / name
            for name in self.PROTECTED_FILES
            if (self.project_root / name).exists()
        ]
        # Also watch top-level files matching sensitive config patterns.
        for pattern in self.CONFIG_PATTERNS:
            if pattern.startswith("."):
                continue
            for candidate in self.project_root.glob(f"{pattern}*"):
                if candidate.is_file():
                    tracked.append(candidate)

        if tracked:
            self.integrity_monitor.snapshot_files(tracked)

    except ImportError:
        self.integrity_monitor = None
        logger.warning("FileIntegrityMonitor not available")

check_operation(operation)

Check if an operation should be blocked for approval.

Parameters:

Name Type Description Default
operation dict[str, Any]

Dict with 'name' and 'args'

required

Returns:

Type Description
PendingOperation | None

PendingOperation if blocked, None if auto-approved

Source code in src/boring/loop/shadow_mode.py
def check_operation(self, operation: dict[str, Any]) -> PendingOperation | None:
    """
    Check if an operation should be blocked for approval.

    Args:
        operation: Dict with 'name' and 'args'

    Returns:
        PendingOperation if blocked, None if auto-approved
    """
    # DISABLED mode short-circuits: everything is auto-approved.
    if self.mode == ShadowModeLevel.DISABLED:
        return None

    op_name = operation.get("name", "")
    args = operation.get("args", {})

    # Classify operation; unclassified operations are not sensitive.
    pending = self._classify_operation(op_name, args)
    if not pending:
        return None

    # Check trust rules BEFORE blocking
    trust_manager = _get_trust_manager(self.project_root)
    if trust_manager:
        severity_str = pending.severity.value if pending.severity else "medium"
        matched_rule = trust_manager.check_trust(op_name, args, severity_str)
        if matched_rule:
            logger.info(f"✅ Auto-approved by trust rule: {op_name}")
            return None  # Trusted operation, no blocking

    # Check protection level
    if self.mode == ShadowModeLevel.STRICT:
        # Block ALL write operations (unless trusted above)
        return pending

    # ENABLED mode: only block HIGH and CRITICAL
    if pending.severity in (OperationSeverity.HIGH, OperationSeverity.CRITICAL):
        return pending

    return None  # Auto-approve LOW/MEDIUM

verify_system_integrity()

Check for silent modifications to protected files. V12.4 Security Feature.

Source code in src/boring/loop/shadow_mode.py
def verify_system_integrity(self) -> list[str]:
    """
    Check for silent modifications to protected files.
    V12.4 Security Feature.
    """
    monitor = self.integrity_monitor
    if monitor:
        return monitor.detect_silent_modifications()
    # No monitor available (import failed at construction): nothing to report.
    return []

request_approval(pending)

Request approval for a pending operation.

Returns:

Type Description
bool

True if approved, False if rejected/pending

Source code in src/boring/loop/shadow_mode.py
def request_approval(self, pending: PendingOperation) -> bool:
    """
    Request approval for a pending operation.

    Returns:
        True if approved, False if rejected/pending
    """
    # Prefer the synchronous callback when one was supplied; a failing
    # callback falls through to the queue instead of raising.
    callback = self.approval_callback
    if callback:
        try:
            return callback(pending)
        except Exception as e:
            logger.warning(f"Approval callback failed: {e}")

    # No callback (or it failed): park the operation for later review.
    self.pending_queue.append(pending)
    self._save_pending()

    logger.info(
        f"⚠️ Operation queued for approval: {pending.operation_type} "
        f"on {pending.file_path} ({pending.severity.value})"
    )

    return False  # Not approved yet

approve_operation(operation_id, note=None)

Approve a pending operation by ID.

Returns:

Type Description
bool

True if found and approved

Source code in src/boring/loop/shadow_mode.py
def approve_operation(self, operation_id: str, note: str = None) -> bool:
    """
    Approve a pending operation by ID.

    Returns:
        True if found and approved
    """
    for op in self.pending_queue:
        if op.operation_id == operation_id:
            op.approved = True
            op.approver_note = note
            self._save_pending()
            return True
    return False

reject_operation(operation_id, note=None)

Reject a pending operation by ID.

Returns:

Type Description
bool

True if found and rejected

Source code in src/boring/loop/shadow_mode.py
def reject_operation(self, operation_id: str, note: str = None) -> bool:
    """
    Reject a pending operation by ID.

    Returns:
        True if found and rejected
    """
    for op in self.pending_queue:
        if op.operation_id == operation_id:
            op.approved = False
            op.approver_note = note
            self._remove_pending(operation_id)
            return True
    return False

get_pending_operations()

Get all pending operations awaiting approval.

Source code in src/boring/loop/shadow_mode.py
def get_pending_operations(self) -> list[PendingOperation]:
    """Return queued operations that have not been approved or rejected yet."""
    undecided = []
    for op in self.pending_queue:
        # approved is None while the decision is still pending.
        if op.approved is None:
            undecided.append(op)
    return undecided

clear_pending()

Clear all pending operations. Returns count cleared.

Source code in src/boring/loop/shadow_mode.py
def clear_pending(self) -> int:
    """Clear all pending operations. Returns count cleared."""
    cleared = len(self.pending_queue)
    del self.pending_queue[:]
    self._save_pending()
    return cleared

is_operation_approved(operation_id)

Check if an operation has been approved.

Returns:

Type Description
bool | None

True if approved, False if rejected, None if pending

Source code in src/boring/loop/shadow_mode.py
def is_operation_approved(self, operation_id: str) -> bool | None:
    """
    Check if an operation has been approved.

    Returns:
        True if approved, False if rejected, None if pending
    """
    for op in self.pending_queue:
        if op.operation_id == operation_id:
            return op.approved
    return None

ShadowModeLevel

Bases: Enum

Shadow mode protection levels.

Source code in src/boring/loop/shadow_mode.py
class ShadowModeLevel(Enum):
    """Protection levels for Shadow Mode.

    DISABLED auto-approves every operation, ENABLED (the default) blocks
    only HIGH/CRITICAL operations, and STRICT blocks all write operations.
    """

    DISABLED = "DISABLED"
    ENABLED = "ENABLED"
    STRICT = "STRICT"

TransactionManager

Manages atomic transactions using Git snapshots.

Provides: - start() - Create a checkpoint before making changes - commit() - Confirm changes and clear checkpoint - rollback() - Revert to checkpoint

Source code in src/boring/loop/transactions.py
class TransactionManager:
    """
    Manages atomic transactions using Git snapshots.

    Provides:
    - start() - Create a checkpoint before making changes
    - commit() - Confirm changes and clear checkpoint
    - rollback() - Revert to checkpoint
    """

    def __init__(self, project_root: Path):
        self.project_root = Path(project_root)
        # On-disk record of the active transaction, so state survives restarts.
        self.state_file = self.project_root / ".boring_transaction"
        self.current_transaction: TransactionState | None = None

    def _run_git(self, args: list[str], with_retry: bool = False) -> tuple[bool, str]:
        """
        Run a git command and return (success, output).

        Args:
            args: Git command arguments
            with_retry: If True, use exponential backoff retry for file lock issues

        Returns:
            Tuple of (success, output)
        """
        # Prevent git from prompting for credentials or GPG keys
        env = os.environ.copy()
        env["GIT_TERMINAL_PROMPT"] = "0"

        def _execute_git() -> tuple[bool, str]:
            try:
                # Add -c commit.gpgsign=false to avoid GPG passphrase prompts
                cmd = ["git", "-c", "commit.gpgsign=false"] + args

                result = subprocess.run(
                    cmd,
                    cwd=self.project_root,
                    capture_output=True,
                    text=True,
                    timeout=30,  # fail fast rather than hanging the caller
                    env=env,
                )
                # Prefer stdout; fall back to stderr so failures carry a message.
                return result.returncode == 0, result.stdout.strip() or result.stderr.strip()
            except subprocess.TimeoutExpired as e:
                # Re-raised as OSError so the retry wrapper below can catch it.
                raise OSError(f"Git command timed out: {e}")
            except Exception as e:
                return False, str(e)

        if with_retry:
            # Use retry mechanism for operations that may encounter file locks
            @retry_with_backoff(max_retries=5, initial_delay=0.1, max_delay=5.0)
            def _execute_with_retry() -> tuple[bool, str]:
                success, output = _execute_git()
                if not success and self._is_file_lock_error(output):
                    # Raising OSError drives the backoff decorator's retry path.
                    raise OSError(f"File lock detected: {output}")
                return success, output

            try:
                return _execute_with_retry()
            except OSError as e:
                return False, str(e)
        else:
            return _execute_git()

    def _is_file_lock_error(self, error_message: str) -> bool:
        """Check if error message indicates a file lock issue."""
        # Substrings seen in git/OS errors when a file is held by another process.
        lock_indicators = [
            "cannot lock",
            "unable to create",
            "Permission denied",
            "Access is denied",
            "being used by another process",
            "cannot create",
            "file is busy",
            "resource busy",
        ]
        error_lower = error_message.lower()
        return any(indicator.lower() in error_lower for indicator in lock_indicators)

    def _check_file_locks_before_operation(self) -> dict:
        """
        Check for file locks before performing rollback operation.

        Returns:
            Dict with status and any locked files found
        """
        locked_files = FileLockDetector.find_locked_files(self.project_root)

        if locked_files:
            # Attempt to wait for files to unlock
            still_locked = []
            for file_path in locked_files:
                if not FileLockDetector.wait_for_file_unlock(file_path, timeout=3.0):
                    still_locked.append(str(file_path))

            if still_locked:
                # "warning" (not "error"): callers proceed with caution.
                return {
                    "status": "warning",
                    "locked_files": still_locked,
                    "message": f"Found {len(still_locked)} locked files. Proceeding with caution.",
                }

        return {"status": "ok", "locked_files": []}

    def _get_current_commit(self) -> str:
        """Get current HEAD commit hash."""
        success, output = self._run_git(["rev-parse", "HEAD"])
        # Empty string signals "no commit available" (e.g. fresh repo).
        return output if success else ""

    def _get_changed_files(self) -> list[str]:
        """Get list of uncommitted changed files."""
        success, output = self._run_git(["status", "--porcelain"])
        if not success or not output:
            return []
        res = []
        for line in output.split("\n"):
            if line:
                # Porcelain lines are "XY <path>": skip the 2-char status + space.
                res.append(line[3:])
        return res

    def start(self, description: str = "Boring transaction") -> dict:
        """
        Start a new transaction by creating a Git checkpoint.

        Args:
            description: Description of the transaction

        Returns:
            Transaction state as dict
        """
        if self.current_transaction and self.current_transaction.is_active:
            return {
                "status": "error",
                "message": "Transaction already in progress. Commit or rollback first.",
                "transaction_id": self.current_transaction.transaction_id,
            }

        # Check if we're in a git repo
        success, _ = self._run_git(["rev-parse", "--git-dir"])
        if not success:
            return {
                "status": "error",
                "message": "Not a Git repository. Please initialize with: git init",
            }

        # Get current state
        commit_hash = self._get_current_commit()
        changed_files = self._get_changed_files()

        # Create stash if there are uncommitted changes
        stash_ref = None
        if changed_files:
            success, output = self._run_git(
                ["stash", "push", "-m", f"boring-transaction-{datetime.now().isoformat()}"]
            )
            if success:
                stash_ref = "stash@{0}"

        # Generate transaction ID
        transaction_id = f"tx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

        # Create transaction state
        # NOTE: commit_hash holds EITHER a stash ref OR a commit hash; commit()
        # and rollback() branch on the "stash" prefix.
        self.current_transaction = TransactionState(
            transaction_id=transaction_id,
            started_at=datetime.now(),
            commit_hash=stash_ref or commit_hash,
            description=description,
            files_changed=changed_files,
            is_active=True,
        )

        # Save state to file
        self._save_state()

        return {
            "status": "success",
            "transaction_id": transaction_id,
            "checkpoint": commit_hash,
            "files_stashed": len(changed_files) if stash_ref else 0,
            "message": "Transaction started. Use rollback to revert if needed.",
        }

    def commit(self) -> dict:
        """
        Commit the transaction, keeping all changes.

        Returns:
            Result as dict
        """
        if not self.current_transaction or not self.current_transaction.is_active:
            return {"status": "error", "message": "No active transaction to commit"}

        transaction_id = self.current_transaction.transaction_id

        # Clear the stash if we created one
        # NOTE(review): assumes our stash is still stash@{0}; a stash created
        # after start() would be dropped instead — confirm this invariant holds.
        if self.current_transaction.commit_hash.startswith("stash"):
            self._run_git(["stash", "drop", "stash@{0}"])

        # Mark transaction as complete
        self.current_transaction.is_active = False
        self._clear_state()

        return {
            "status": "success",
            "transaction_id": transaction_id,
            "message": "Transaction committed. Changes are permanent.",
        }

    def rollback(self) -> dict:
        """
        Rollback to the checkpoint, discarding all changes since start().

        V11.0 Enhancements:
        - Pre-execution file lock detection
        - Exponential backoff retry for Windows file locking
        - Graceful recovery on disk write failures

        Returns:
            Result as dict
        """
        if not self.current_transaction or not self.current_transaction.is_active:
            return {"status": "error", "message": "No active transaction to rollback"}

        transaction_id = self.current_transaction.transaction_id
        checkpoint = self.current_transaction.commit_hash

        # V11.0: Check for file locks before operation
        lock_check = self._check_file_locks_before_operation()
        if lock_check["status"] == "warning":
            # Locked files are logged but do not abort the rollback.
            logger.warning(
                f"Proceeding with rollback despite locked files: {lock_check['locked_files']}"
            )

        # Track rollback progress for graceful recovery
        rollback_state = {"checkout_done": False, "clean_done": False, "restore_done": False}

        try:
            # Discard all current changes (with retry for file locks)
            success, output = self._run_git(["checkout", "--", "."], with_retry=True)
            if not success:
                return {
                    "status": "error",
                    "transaction_id": transaction_id,
                    "message": f"Failed to checkout: {output}",
                    "partial_state": rollback_state,
                }
            rollback_state["checkout_done"] = True

            # Remove untracked files/dirs; failure here is non-fatal.
            success, output = self._run_git(["clean", "-fd"], with_retry=True)
            if not success:
                logger.warning(f"Clean operation had issues: {output}")
            rollback_state["clean_done"] = True

            # Restore stashed changes if applicable
            if checkpoint.startswith("stash"):
                success, output = self._run_git(["stash", "pop"], with_retry=True)
                if not success:
                    # Attempt graceful recovery: try stash apply instead
                    success, output = self._run_git(["stash", "apply", "stash@{0}"])
                    if success:
                        # Apply worked, drop the stash
                        self._run_git(["stash", "drop", "stash@{0}"])
                    else:
                        return {
                            "status": "partial",
                            "transaction_id": transaction_id,
                            "message": f"Rolled back but stash restore failed: {output}. "
                            "Your changes are still in the stash.",
                            "partial_state": rollback_state,
                        }
                rollback_state["restore_done"] = True
            else:
                # Hard reset to the commit (with retry)
                success, output = self._run_git(["reset", "--hard", checkpoint], with_retry=True)
                if not success:
                    return {
                        "status": "partial",
                        "transaction_id": transaction_id,
                        "message": f"Reset failed: {output}",
                        "partial_state": rollback_state,
                    }
                rollback_state["restore_done"] = True

            # Clear transaction
            self.current_transaction.is_active = False
            self._clear_state()

            return {
                "status": "success",
                "transaction_id": transaction_id,
                "message": "Transaction rolled back. All changes since start() have been reverted.",
            }

        except Exception as e:
            logger.error(f"Rollback encountered an unexpected error: {e}")
            return {
                "status": "error",
                "transaction_id": transaction_id,
                "message": f"Rollback failed with error: {e}",
                "partial_state": rollback_state,
            }

    def status(self) -> dict:
        """Get current transaction status."""
        # Refresh from disk so status reflects state written by other sessions.
        self._load_state()

        if not self.current_transaction or not self.current_transaction.is_active:
            return {"status": "idle", "message": "No active transaction"}

        return {
            "status": "active",
            "transaction_id": self.current_transaction.transaction_id,
            "started_at": self.current_transaction.started_at.isoformat(),
            "description": self.current_transaction.description,
            "files_at_start": self.current_transaction.files_changed,
            "current_changes": self._get_changed_files(),
        }

    def _save_state(self) -> None:
        """Save transaction state to file."""
        import json

        # NOTE(review): plain write_text is not atomic (unlike the
        # TransactionalFileWriter used elsewhere) — a crash mid-write can
        # leave a corrupt state file; _load_state tolerates that by resetting.
        if self.current_transaction:
            data = {
                "transaction_id": self.current_transaction.transaction_id,
                "started_at": self.current_transaction.started_at.isoformat(),
                "commit_hash": self.current_transaction.commit_hash,
                "description": self.current_transaction.description,
                "files_changed": self.current_transaction.files_changed,
                "is_active": self.current_transaction.is_active,
            }
            self.state_file.write_text(json.dumps(data, indent=2), encoding="utf-8")

    def _load_state(self) -> None:
        """Load transaction state from file."""
        import json

        if self.state_file.exists():
            try:
                data = json.loads(self.state_file.read_text(encoding="utf-8"))
                self.current_transaction = TransactionState(
                    transaction_id=data["transaction_id"],
                    started_at=datetime.fromisoformat(data["started_at"]),
                    commit_hash=data["commit_hash"],
                    description=data["description"],
                    files_changed=data.get("files_changed", []),
                    is_active=data.get("is_active", True),
                )
            except Exception:
                # Corrupt/unreadable state file: treat as no active transaction.
                self.current_transaction = None

    def _clear_state(self) -> None:
        """Clear saved transaction state."""
        if self.state_file.exists():
            self.state_file.unlink()
        self.current_transaction = None

start(description='Boring transaction')

Start a new transaction by creating a Git checkpoint.

Parameters:

Name Type Description Default
description str

Description of the transaction

'Boring transaction'

Returns:

Type Description
dict

Transaction state as dict

Source code in src/boring/loop/transactions.py
def start(self, description: str = "Boring transaction") -> dict:
    """
    Start a new transaction by creating a Git checkpoint.

    Args:
        description: Description of the transaction

    Returns:
        Transaction state as dict
    """
    # Only one transaction may be active at a time.
    if self.current_transaction and self.current_transaction.is_active:
        return {
            "status": "error",
            "message": "Transaction already in progress. Commit or rollback first.",
            "transaction_id": self.current_transaction.transaction_id,
        }

    # Check if we're in a git repo
    success, _ = self._run_git(["rev-parse", "--git-dir"])
    if not success:
        return {
            "status": "error",
            "message": "Not a Git repository. Please initialize with: git init",
        }

    # Get current state
    commit_hash = self._get_current_commit()
    changed_files = self._get_changed_files()

    # Create stash if there are uncommitted changes
    stash_ref = None
    if changed_files:
        success, output = self._run_git(
            ["stash", "push", "-m", f"boring-transaction-{datetime.now().isoformat()}"]
        )
        if success:
            stash_ref = "stash@{0}"

    # Generate transaction ID
    transaction_id = f"tx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

    # Create transaction state
    # NOTE: commit_hash holds EITHER a stash ref OR a commit hash; commit()
    # and rollback() branch on the "stash" prefix to tell them apart.
    self.current_transaction = TransactionState(
        transaction_id=transaction_id,
        started_at=datetime.now(),
        commit_hash=stash_ref or commit_hash,
        description=description,
        files_changed=changed_files,
        is_active=True,
    )

    # Save state to file
    self._save_state()

    return {
        "status": "success",
        "transaction_id": transaction_id,
        "checkpoint": commit_hash,
        "files_stashed": len(changed_files) if stash_ref else 0,
        "message": "Transaction started. Use rollback to revert if needed.",
    }

commit()

Commit the transaction, keeping all changes.

Returns:

Type Description
dict

Result as dict

Source code in src/boring/loop/transactions.py
def commit(self) -> dict:
    """
    Commit the transaction, keeping all changes.

    Returns:
        Result as dict
    """
    tx = self.current_transaction
    if not tx or not tx.is_active:
        return {"status": "error", "message": "No active transaction to commit"}

    tx_id = tx.transaction_id

    # The stash checkpoint is no longer needed once changes are being kept.
    if tx.commit_hash.startswith("stash"):
        self._run_git(["stash", "drop", "stash@{0}"])

    # Finalize: deactivate and remove the persisted state file.
    tx.is_active = False
    self._clear_state()

    return {
        "status": "success",
        "transaction_id": tx_id,
        "message": "Transaction committed. Changes are permanent.",
    }

rollback()

Rollback to the checkpoint, discarding all changes since start().

V11.0 Enhancements: - Pre-execution file lock detection - Exponential backoff retry for Windows file locking - Graceful recovery on disk write failures

Returns:

Type Description
dict

Result as dict

Source code in src/boring/loop/transactions.py
def rollback(self) -> dict:
    """
    Rollback to the checkpoint, discarding all changes since start().

    V11.0 Enhancements:
    - Pre-execution file lock detection
    - Exponential backoff retry for Windows file locking
    - Graceful recovery on disk write failures

    Returns:
        Result as dict
    """
    if not self.current_transaction or not self.current_transaction.is_active:
        return {"status": "error", "message": "No active transaction to rollback"}

    transaction_id = self.current_transaction.transaction_id
    checkpoint = self.current_transaction.commit_hash

    # V11.0: Check for file locks before operation
    lock_check = self._check_file_locks_before_operation()
    if lock_check["status"] == "warning":
        # Locked files are logged but do not abort the rollback.
        logger.warning(
            f"Proceeding with rollback despite locked files: {lock_check['locked_files']}"
        )

    # Track rollback progress for graceful recovery
    rollback_state = {"checkout_done": False, "clean_done": False, "restore_done": False}

    try:
        # Discard all current changes (with retry for file locks)
        success, output = self._run_git(["checkout", "--", "."], with_retry=True)
        if not success:
            return {
                "status": "error",
                "transaction_id": transaction_id,
                "message": f"Failed to checkout: {output}",
                "partial_state": rollback_state,
            }
        rollback_state["checkout_done"] = True

        # Remove untracked files/dirs; a failure here is non-fatal.
        success, output = self._run_git(["clean", "-fd"], with_retry=True)
        if not success:
            logger.warning(f"Clean operation had issues: {output}")
        rollback_state["clean_done"] = True

        # Restore stashed changes if applicable
        # NOTE(review): assumes the transaction's stash is still stash@{0} —
        # confirm no other stashes are created while a transaction is active.
        if checkpoint.startswith("stash"):
            success, output = self._run_git(["stash", "pop"], with_retry=True)
            if not success:
                # Attempt graceful recovery: try stash apply instead
                success, output = self._run_git(["stash", "apply", "stash@{0}"])
                if success:
                    # Apply worked, drop the stash
                    self._run_git(["stash", "drop", "stash@{0}"])
                else:
                    return {
                        "status": "partial",
                        "transaction_id": transaction_id,
                        "message": f"Rolled back but stash restore failed: {output}. "
                        "Your changes are still in the stash.",
                        "partial_state": rollback_state,
                    }
            rollback_state["restore_done"] = True
        else:
            # Hard reset to the commit (with retry)
            success, output = self._run_git(["reset", "--hard", checkpoint], with_retry=True)
            if not success:
                return {
                    "status": "partial",
                    "transaction_id": transaction_id,
                    "message": f"Reset failed: {output}",
                    "partial_state": rollback_state,
                }
            rollback_state["restore_done"] = True

        # Clear transaction
        self.current_transaction.is_active = False
        self._clear_state()

        return {
            "status": "success",
            "transaction_id": transaction_id,
            "message": "Transaction rolled back. All changes since start() have been reverted.",
        }

    except Exception as e:
        logger.error(f"Rollback encountered an unexpected error: {e}")
        return {
            "status": "error",
            "transaction_id": transaction_id,
            "message": f"Rollback failed with error: {e}",
            "partial_state": rollback_state,
        }

status()

Get current transaction status.

Source code in src/boring/loop/transactions.py
def status(self) -> dict:
    """Get current transaction status."""
    # Re-read persisted state so status reflects other sessions too.
    self._load_state()

    tx = self.current_transaction
    if not tx or not tx.is_active:
        return {"status": "idle", "message": "No active transaction"}

    return {
        "status": "active",
        "transaction_id": tx.transaction_id,
        "started_at": tx.started_at.isoformat(),
        "description": tx.description,
        "files_at_start": tx.files_changed,
        "current_changes": self._get_changed_files(),
    }

TransactionState dataclass

State of a transaction.

Source code in src/boring/loop/transactions.py
@dataclass
class TransactionState:
    """State of a transaction (one checkpoint created by TransactionManager.start)."""

    transaction_id: str  # e.g. "tx-YYYYMMDD-HHMMSS"
    started_at: datetime
    commit_hash: str  # Git commit or stash reference
    description: str  # Human-readable purpose of the transaction
    files_changed: list[str] = field(default_factory=list)  # Dirty files at start()
    is_active: bool = True  # False once committed or rolled back

ProjectContext dataclass

Captured state of a project for context-aware evolution.

Source code in src/boring/loop/workflow_evolver.py
@dataclass
class ProjectContext:
    """Captured state of a project for context-aware evolution."""

    name: str = "unknown"  # Project directory name
    language: str = "unknown"  # Primary language ("javascript", "python", ...)
    frameworks: list[str] = field(default_factory=list)  # Detected frameworks
    has_git: bool = False  # True when a .git directory exists
    last_evolved: str | None = None  # Timestamp of the last evolution, if any
    vibe_score: float = 0.0

ProjectContextDetector

Analyzes codebase to detect project context.

Source code in src/boring/loop/workflow_evolver.py
class ProjectContextDetector:
    """Inspects marker files in a project tree to infer its context."""

    def __init__(self, project_root: Path):
        self.root = project_root

    def detect(self) -> ProjectContext:
        """Run detection heuristics."""
        ctx = ProjectContext(name=self.root.name)

        # Version control: presence of a .git directory is enough.
        ctx.has_git = (self.root / ".git").exists()

        # JavaScript: package.json marks the project; dependencies reveal frameworks.
        package_json = self.root / "package.json"
        if package_json.exists():
            ctx.language = "javascript"
            try:
                manifest = json.loads(package_json.read_text(encoding="utf-8"))
                deps = {**manifest.get("dependencies", {}), **manifest.get("devDependencies", {})}
                for dep, framework in (
                    ("next", "nextjs"),
                    ("react", "react"),
                    ("typescript", "typescript"),
                ):
                    if dep in deps:
                        ctx.frameworks.append(framework)
            except Exception:
                pass

        # Python: pyproject.toml or requirements.txt; scan text for frameworks.
        pyproject = self.root / "pyproject.toml"
        requirements = self.root / "requirements.txt"
        if pyproject.exists() or requirements.exists():
            ctx.language = "python"
            if pyproject.exists():
                text = pyproject.read_text(encoding="utf-8")
            elif requirements.exists():
                text = requirements.read_text(encoding="utf-8")
            else:
                text = ""
            lowered = text.lower()
            for marker in ("django", "flask", "fastapi"):
                if marker in lowered:
                    ctx.frameworks.append(marker)

        return ctx

detect()

Run detection heuristics.

Source code in src/boring/loop/workflow_evolver.py
def detect(self) -> ProjectContext:
    """Run detection heuristics over the project tree."""
    ctx = ProjectContext(name=self.root.name)

    # Version control presence
    if (self.root / ".git").exists():
        ctx.has_git = True

    pkg = self.root / "package.json"
    if pkg.exists():
        ctx.language = "javascript"
        try:
            parsed = json.loads(pkg.read_text(encoding="utf-8"))
            merged = dict(parsed.get("dependencies", {}))
            merged.update(parsed.get("devDependencies", {}))
            for dep_name, fw in (
                ("next", "nextjs"),
                ("react", "react"),
                ("typescript", "typescript"),
            ):
                if dep_name in merged:
                    ctx.frameworks.append(fw)
        except Exception:
            pass

    py_marker = self.root / "pyproject.toml"
    req_marker = self.root / "requirements.txt"
    if py_marker.exists() or req_marker.exists():
        ctx.language = "python"
        # Read whichever dependency manifest is available.
        text = ""
        if py_marker.exists():
            text = py_marker.read_text(encoding="utf-8")
        elif req_marker.exists():
            text = req_marker.read_text(encoding="utf-8")

        for fw in ("django", "flask", "fastapi"):
            if fw in text.lower():
                ctx.frameworks.append(fw)

    return ctx

WorkflowEvolver

Manages workflow evolution, history, and dreaming.

Source code in src/boring/loop/workflow_evolver.py
class WorkflowEvolver:
    """Manages workflow evolution, history, and dreaming.

    Workflows are markdown files under ``<root>/.agent/workflows``. Pristine
    copies are kept in the ``_base`` subdirectory so an evolution can be
    reverted, and every change is appended to ``_evolution_history.json``.
    """

    def __init__(self, project_root: Path, log_dir: Path | None = None):
        self.root = project_root
        # Log directory defaults to <project_root>/logs when not supplied.
        self.log_dir = log_dir or project_root / "logs"
        self.workflows_dir = self.root / ".agent" / "workflows"
        self.backup_dir = self.workflows_dir / "_base"
        self.history_file = self.workflows_dir / "_evolution_history.json"

        # Ensure directories exist
        self.workflows_dir.mkdir(parents=True, exist_ok=True)
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def evolve_workflow(self, name: str, content: str, reason: str) -> dict:
        """Modify an existing workflow with backup and history.

        Args:
            name: Workflow name (file stem, without ``.md``).
            content: Full replacement markdown; must start with YAML frontmatter.
            reason: Human-readable justification recorded in the history log.

        Returns:
            Dict with ``status`` "SUCCESS" (plus short hashes) or "ERROR".
        """
        target = self.workflows_dir / f"{name}.md"
        if not target.exists():
            # Try finding it in our templates if missing (Future)
            return {"status": "ERROR", "error": f"Workflow {name} not found."}

        # Validate content (simple frontmatter check)
        if not content.strip().startswith("---"):
            return {"status": "ERROR", "error": "New content must have YAML frontmatter."}

        # Create Backup if first time (the pristine copy is never overwritten)
        backup = self.backup_dir / f"{name}.md"
        backup_created = False
        if not backup.exists():
            shutil.copy2(target, backup)
            backup_created = True

        old_content = target.read_text(encoding="utf-8")
        old_hash = hashlib.sha256(old_content.encode()).hexdigest()
        new_hash = hashlib.sha256(content.encode()).hexdigest()

        # Save new content
        target.write_text(content, encoding="utf-8")

        # Log History
        self._log_evolution(name, old_hash, new_hash, reason)

        return {
            "status": "SUCCESS",
            "workflow": name,
            "old_hash": old_hash[:8],
            "new_hash": new_hash[:8],
            "backup_created": backup_created,
        }

    def reset_workflow(self, name: str) -> dict:
        """Restore a workflow from its backup (the ``_base`` copy)."""
        backup = self.backup_dir / f"{name}.md"
        target = self.workflows_dir / f"{name}.md"

        if not backup.exists():
            return {"status": "ERROR", "error": "No backup found to reset from."}

        shutil.copy2(backup, target)
        return {"status": "SUCCESS", "message": f"Workflow {name} reset to base."}

    def backup_all_workflows(self) -> dict:
        """Create backups for all md files in workflows dir.

        Returns:
            Mapping of workflow stem -> True if a new backup was written,
            False if a backup already existed.
        """
        results = {}
        for wf in self.workflows_dir.glob("*.md"):
            backup = self.backup_dir / wf.name
            if not backup.exists():
                shutil.copy2(wf, backup)
                results[wf.stem] = True
            else:
                results[wf.stem] = False
        return results

    def get_workflow_status(self, name: str) -> dict:
        """Check if a workflow is evolved and return hashes.

        ``is_evolved`` is False when no backup exists (nothing to compare against).
        """
        target = self.workflows_dir / f"{name}.md"
        backup = self.backup_dir / f"{name}.md"

        if not target.exists():
            return {"status": "ERROR", "error": "Not found"}

        current_hash = hashlib.sha256(target.read_text(encoding="utf-8").encode()).hexdigest()
        base_hash = None
        if backup.exists():
            base_hash = hashlib.sha256(backup.read_text(encoding="utf-8").encode()).hexdigest()

        return {
            "status": "SUCCESS",
            "name": name,
            "is_evolved": current_hash != base_hash if base_hash else False,
            "current_hash": current_hash[:8],
            "base_hash": base_hash[:8] if base_hash else None,
        }

    def dream_next_steps(self) -> str:
        """Sage Mode: Propose future roadmaps based on project state."""
        detector = ProjectContextDetector(self.root)
        ctx = detector.detect()

        # Simplified dreaming logic
        advice = []
        if ctx.language == "python":
            advice.append(
                "🐍 **Python Power**: Consider adding type hints or a Sphinx/MkDocs setup."
            )
        if "react" in ctx.frameworks:
            advice.append(
                "⚛️ **React Pulse**: I see React! Maybe add component tests with Jest/Vitest?"
            )

        if not ctx.has_git:
            advice.append(
                "📦 **Quick Tip**: No Git repo detected. Use `git init` for better version tracking."
            )

        # Always return at least one line of advice.
        if not advice:
            advice.append("🔮 **Future Vision**: Keep building! The project looks clean.")

        return "\n".join(advice)

    def learn_from_session(self) -> str:
        """Sage Mode: Archive and learn patterns."""
        # Integrates with boring learn in real implementation
        return "🧠 Knowledge extracted and saved to Brain."

    def _log_evolution(self, name: str, old_h: str, new_h: str, reason: str) -> None:
        """Append one evolution record to the JSON history file."""
        history = []
        if self.history_file.exists():
            try:
                history = json.loads(self.history_file.read_text(encoding="utf-8"))
            except Exception:
                # Corrupt history is treated as empty rather than fatal (best-effort).
                pass

        history.append(
            {
                "timestamp": datetime.now().isoformat(),
                "workflow": name,
                "old_hash": old_h,
                "new_hash": new_h,
                "reason": reason,
            }
        )

        self.history_file.write_text(json.dumps(history, indent=2), encoding="utf-8")

evolve_workflow(name, content, reason)

Modify an existing workflow with backup and history.

Source code in src/boring/loop/workflow_evolver.py
def evolve_workflow(self, name: str, content: str, reason: str) -> dict:
    """Modify an existing workflow with backup and history."""
    target = self.workflows_dir / f"{name}.md"
    if not target.exists():
        # Unknown workflow: nothing to evolve.
        return {"status": "ERROR", "error": f"Workflow {name} not found."}

    if not content.strip().startswith("---"):
        # Reject replacements that lack YAML frontmatter.
        return {"status": "ERROR", "error": "New content must have YAML frontmatter."}

    # Preserve the pristine version once, on the first evolution only.
    backup = self.backup_dir / f"{name}.md"
    first_backup = not backup.exists()
    if first_backup:
        shutil.copy2(target, backup)

    previous = target.read_text(encoding="utf-8")
    previous_digest = hashlib.sha256(previous.encode()).hexdigest()
    replacement_digest = hashlib.sha256(content.encode()).hexdigest()

    # Persist the new content, then record the change.
    target.write_text(content, encoding="utf-8")
    self._log_evolution(name, previous_digest, replacement_digest, reason)

    return {
        "status": "SUCCESS",
        "workflow": name,
        "old_hash": previous_digest[:8],
        "new_hash": replacement_digest[:8],
        "backup_created": first_backup,
    }

reset_workflow(name)

Restore a workflow from its backup.

Source code in src/boring/loop/workflow_evolver.py
def reset_workflow(self, name: str) -> dict:
    """Restore a workflow from its backup."""
    backup_copy = self.backup_dir / f"{name}.md"
    live_copy = self.workflows_dir / f"{name}.md"

    if not backup_copy.exists():
        return {"status": "ERROR", "error": "No backup found to reset from."}

    # Overwrite the evolved file with the pristine base version.
    shutil.copy2(backup_copy, live_copy)
    return {"status": "SUCCESS", "message": f"Workflow {name} reset to base."}

backup_all_workflows()

Create backups for all md files in workflows dir.

Source code in src/boring/loop/workflow_evolver.py
def backup_all_workflows(self) -> dict:
    """Create backups for all md files in workflows dir."""
    outcome = {}
    for source in self.workflows_dir.glob("*.md"):
        destination = self.backup_dir / source.name
        already_backed_up = destination.exists()
        if not already_backed_up:
            shutil.copy2(source, destination)
        # True means a fresh backup was written by this call.
        outcome[source.stem] = not already_backed_up
    return outcome

get_workflow_status(name)

Check if a workflow is evolved and return hashes.

Source code in src/boring/loop/workflow_evolver.py
def get_workflow_status(self, name: str) -> dict:
    """Check if a workflow is evolved and return hashes."""
    live = self.workflows_dir / f"{name}.md"
    base = self.backup_dir / f"{name}.md"

    if not live.exists():
        return {"status": "ERROR", "error": "Not found"}

    def _digest(path):
        # SHA-256 over the decoded text, matching the evolution log hashes.
        return hashlib.sha256(path.read_text(encoding="utf-8").encode()).hexdigest()

    current_hash = _digest(live)
    base_hash = _digest(base) if base.exists() else None

    return {
        "status": "SUCCESS",
        "name": name,
        # Without a backup there is nothing to compare, so not "evolved".
        "is_evolved": bool(base_hash) and current_hash != base_hash,
        "current_hash": current_hash[:8],
        "base_hash": base_hash[:8] if base_hash else None,
    }

dream_next_steps()

Sage Mode: Propose future roadmaps based on project state.

Source code in src/boring/loop/workflow_evolver.py
def dream_next_steps(self) -> str:
    """Sage Mode: Propose future roadmaps based on project state."""
    ctx = ProjectContextDetector(self.root).detect()

    suggestions = []
    if ctx.language == "python":
        suggestions.append(
            "🐍 **Python Power**: Consider adding type hints or a Sphinx/MkDocs setup."
        )
    if "react" in ctx.frameworks:
        suggestions.append(
            "⚛️ **React Pulse**: I see React! Maybe add component tests with Jest/Vitest?"
        )
    if not ctx.has_git:
        suggestions.append(
            "📦 **Quick Tip**: No Git repo detected. Use `git init` for better version tracking."
        )

    # Guarantee at least one line of advice.
    if not suggestions:
        suggestions.append("🔮 **Future Vision**: Keep building! The project looks clean.")

    return "\n".join(suggestions)

learn_from_session()

Sage Mode: Archive and learn patterns.

Source code in src/boring/loop/workflow_evolver.py
def learn_from_session(self):
    """Sage Mode: Archive and learn patterns.

    Placeholder: the real implementation integrates with ``boring learn``.
    """
    return "🧠 Knowledge extracted and saved to Brain."

WorkflowManager

Manages the lifecycle of Boring Workflows: discovery (local), exporting (packaging), and installing (importing).

Source code in src/boring/loop/workflow_manager.py
class WorkflowManager:
    """
    Manages the lifecycle of Boring Workflows:
    - Discovery (Local)
    - Exporting (Packaging)
    - Installing (Importing)
    """

    def __init__(self, project_root: Path | None = None):
        # Falls back to the globally configured project root when not given.
        self.project_root = project_root or settings.PROJECT_ROOT
        self.workflows_dir = self.project_root / ".agent" / "workflows"
        self.base_dir = self.workflows_dir / "_base"

        # Ensure directories exist
        self.workflows_dir.mkdir(parents=True, exist_ok=True)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def _parse_frontmatter(self, content: str) -> dict[str, str]:
        """Robustly parse YAML frontmatter from markdown.

        Only flat ``key: value`` lines are recognized; nested YAML is ignored.
        """
        metadata = {}
        # Regex to find frontmatter block
        match = re.search(r"^---\s*\n(.*?)\n---\s*\n", content, re.DOTALL)
        if match:
            yaml_block = match.group(1)
            # Simple line-based YAML parser (dependency-free)
            for line in yaml_block.split("\n"):
                if ":" in line:
                    key, value = line.split(":", 1)
                    metadata[key.strip()] = value.strip()
        return metadata

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    def _fetch_url(self, url: str) -> str:
        """Fetch URL content with retry logic (3 attempts, exponential backoff)."""
        log_status(self.project_root / "logs", "INFO", f"Downloading workflow from {url}...")
        with urllib.request.urlopen(url, timeout=10) as response:  # nosec B310  # URL is user-supplied and supported
            return response.read().decode("utf-8")

    def list_local_workflows(self) -> list[str]:
        """List all available .md workflows in the project (by file stem)."""
        if not self.workflows_dir.exists():
            return []
        return [f.stem for f in self.workflows_dir.glob("*.md")]

    def export_workflow(self, name: str, author: str = "user") -> tuple[Path | None, str]:
        """
        Package a local workflow into a .bwf.json file.

        Args:
            name: Workflow name (without .md or .json)
            author: Author name

        Returns:
            (Path to created file, Message)
        """
        source_file = self.workflows_dir / f"{name}.md"
        if not source_file.exists():
            return None, f"Workflow '{name}' not found."

        try:
            content = source_file.read_text(encoding="utf-8")

            # Use robust parser
            fm_data = self._parse_frontmatter(content)
            description = fm_data.get("description", f"Exported workflow: {name}")
            tags = []

            # Create package
            metadata = WorkflowMetadata(
                name=name, version="1.0.0", description=description, author=author, tags=tags
            )

            package = WorkflowPackage(metadata=metadata, content=content)

            # Write to file (placed at the project root, not the workflows dir)
            output_filename = f"{name}.bwf.json"
            output_path = self.project_root / output_filename
            output_path.write_text(package.to_json(), encoding="utf-8")

            return output_path, f"Successfully exported to {output_filename}"

        except Exception as e:
            return None, f"Export failed: {str(e)}"

    def install_workflow(self, source: str) -> tuple[bool, str]:
        """
        Install a workflow from a local file path or a URL.

        Args:
            source: File path (e.g., 'my-flow.bwf.json') or URL

        Returns:
            (Success boolean, Message)
        """
        try:
            pkg_json = ""

            # 1. Fetch content
            if source.startswith(("http://", "https://")):
                pkg_json = self._fetch_url(source)
            else:
                # Local file; relative paths are resolved against the project root
                path = Path(source)
                if not path.is_absolute():
                    path = self.project_root / path

                if not path.exists():
                    return False, f"Source file not found: {source}"

                pkg_json = path.read_text(encoding="utf-8")

            # 2. Parse & Validate
            try:
                package = WorkflowPackage.from_json(pkg_json)
            except json.JSONDecodeError:
                return False, "Invalid JSON format."
            except Exception as e:
                return False, f"Invalid workflow package structure: {e}"

            # 3. Install (the installed name comes from the package metadata,
            # not from the source filename)
            target_name = package.metadata.name
            target_file = self.workflows_dir / f"{target_name}.md"

            # Backup existing if present
            if target_file.exists():
                backup_path = self.base_dir / f"{target_name}.md.bak"
                shutil.copy2(target_file, backup_path)
                log_status(
                    self.project_root / "logs",
                    "INFO",
                    f"Backed up existing workflow to {backup_path.name}",
                )

            # Write new content
            target_file.write_text(package.content, encoding="utf-8")

            return (
                True,
                f"Successfully installed workflow '{target_name}' (v{package.metadata.version}) by {package.metadata.author}",
            )

        except urllib.error.URLError as e:
            return False, f"Network error: {e}"
        except Exception as e:
            return False, f"Installation failed: {str(e)}"

    def publish_workflow(self, name: str, token: str, public: bool = True) -> tuple[bool, str]:
        """
        Publish a workflow to GitHub Gist (The "Serverless Registry").

        Args:
            name: Workflow name
            token: GitHub Personal Access Token
            public: Whether to make the Gist public

        Returns:
            (Success, Message/URL)
        """
        source_file = self.workflows_dir / f"{name}.md"
        if not source_file.exists():
            return False, f"Workflow '{name}' not found."

        try:
            # 1. Export locally first to get JSON content
            export_path, _ = self.export_workflow(name, "Anonymous")
            if not export_path:
                return False, "Failed to package workflow."

            content = export_path.read_text(encoding="utf-8")
            filename = f"{name}.bwf.json"

            # 2. Upload to GitHub Gist
            log_status(self.project_root / "logs", "INFO", f"Publishing '{name}' to GitHub Gist...")

            payload = {
                "description": f"Boring Workflow: {name}",
                "public": public,
                "files": {filename: {"content": content}},
            }

            req = urllib.request.Request(
                "https://api.github.com/gists",
                data=json.dumps(payload).encode("utf-8"),
                headers={
                    "Authorization": f"token {token}",
                    "Accept": "application/vnd.github.v3+json",
                    "User-Agent": "Boring-Agent",
                },
                method="POST",
            )

            with urllib.request.urlopen(req) as response:  # nosec B310  # URL is trusted GitHub Gist
                result = json.loads(response.read().decode("utf-8"))
                # NOTE(review): the value below is discarded — looks like dead
                # code left over from an earlier revision; candidate for removal.
                result.get("html_url")
                # Get raw url of the file
                raw_url = result["files"][filename]["raw_url"]

                # Cleanup temporary export file
                if export_path.exists():
                    export_path.unlink()

                return True, f"Scan this to install:\nboring workflow install {raw_url}"

        except urllib.error.HTTPError as e:
            if e.code == 401:
                return False, "Authentication failed. Check your GITHUB_TOKEN."
            return False, f"GitHub API Error: {e.code} {e.reason}"
        except Exception as e:
            return False, f"Publish failed: {str(e)}"

list_local_workflows()

List all available .md workflows in the project.

Source code in src/boring/loop/workflow_manager.py
def list_local_workflows(self) -> list[str]:
    """List all available .md workflows in the project (by file stem)."""
    folder = self.workflows_dir
    return [entry.stem for entry in folder.glob("*.md")] if folder.exists() else []

export_workflow(name, author='user')

Package a local workflow into a .bwf.json file.

Parameters:

Name Type Description Default
name str

Workflow name (without .md or .json)

required
author str

Author name

'user'

Returns:

Type Description
tuple[Path | None, str]

(Path to created file, Message)

Source code in src/boring/loop/workflow_manager.py
def export_workflow(self, name: str, author: str = "user") -> tuple[Path | None, str]:
    """
    Package a local workflow into a .bwf.json file.

    Args:
        name: Workflow name (without .md or .json)
        author: Author name

    Returns:
        (Path to created file, Message)
    """
    source_file = self.workflows_dir / f"{name}.md"
    if not source_file.exists():
        return None, f"Workflow '{name}' not found."

    try:
        content = source_file.read_text(encoding="utf-8")

        # Pull the description out of the frontmatter when present.
        frontmatter = self._parse_frontmatter(content)
        package = WorkflowPackage(
            metadata=WorkflowMetadata(
                name=name,
                version="1.0.0",
                description=frontmatter.get("description", f"Exported workflow: {name}"),
                author=author,
                tags=[],
            ),
            content=content,
        )

        # The package file lands at the project root, not the workflows dir.
        output_filename = f"{name}.bwf.json"
        destination = self.project_root / output_filename
        destination.write_text(package.to_json(), encoding="utf-8")

        return destination, f"Successfully exported to {output_filename}"

    except Exception as e:
        return None, f"Export failed: {str(e)}"

install_workflow(source)

Install a workflow from a local file path or a URL.

Parameters:

Name Type Description Default
source str

File path (e.g., 'my-flow.bwf.json') or URL

required

Returns:

Type Description
tuple[bool, str]

(Success boolean, Message)

Source code in src/boring/loop/workflow_manager.py
def install_workflow(self, source: str) -> tuple[bool, str]:
    """
    Install a workflow from a local file path or a URL.

    Args:
        source: File path (e.g., 'my-flow.bwf.json') or URL

    Returns:
        (Success boolean, Message)
    """
    try:
        # 1. Fetch the raw package JSON, either remotely or from disk.
        if source.startswith(("http://", "https://")):
            pkg_json = self._fetch_url(source)
        else:
            candidate = Path(source)
            if not candidate.is_absolute():
                candidate = self.project_root / candidate
            if not candidate.exists():
                return False, f"Source file not found: {source}"
            pkg_json = candidate.read_text(encoding="utf-8")

        # 2. Parse & validate the package structure.
        try:
            package = WorkflowPackage.from_json(pkg_json)
        except json.JSONDecodeError:
            return False, "Invalid JSON format."
        except Exception as e:
            return False, f"Invalid workflow package structure: {e}"

        # 3. Install, backing up any workflow we are about to overwrite.
        target_name = package.metadata.name
        target_file = self.workflows_dir / f"{target_name}.md"

        if target_file.exists():
            backup_path = self.base_dir / f"{target_name}.md.bak"
            shutil.copy2(target_file, backup_path)
            log_status(
                self.project_root / "logs",
                "INFO",
                f"Backed up existing workflow to {backup_path.name}",
            )

        target_file.write_text(package.content, encoding="utf-8")

        return (
            True,
            f"Successfully installed workflow '{target_name}' (v{package.metadata.version}) by {package.metadata.author}",
        )

    except urllib.error.URLError as e:
        return False, f"Network error: {e}"
    except Exception as e:
        return False, f"Installation failed: {str(e)}"

publish_workflow(name, token, public=True)

Publish a workflow to GitHub Gist (The "Serverless Registry").

Parameters:

Name Type Description Default
name str

Workflow name

required
token str

GitHub Personal Access Token

required
public bool

Whether to make the Gist public

True

Returns:

Type Description
tuple[bool, str]

(Success, Message/URL)

Source code in src/boring/loop/workflow_manager.py
def publish_workflow(self, name: str, token: str, public: bool = True) -> tuple[bool, str]:
    """
    Publish a workflow to GitHub Gist (The "Serverless Registry").

    Args:
        name: Workflow name
        token: GitHub Personal Access Token
        public: Whether to make the Gist public

    Returns:
        (Success, Message/URL)
    """
    source_file = self.workflows_dir / f"{name}.md"
    if not source_file.exists():
        return False, f"Workflow '{name}' not found."

    try:
        # 1. Export locally first to get JSON content
        export_path, _ = self.export_workflow(name, "Anonymous")
        if not export_path:
            return False, "Failed to package workflow."

        content = export_path.read_text(encoding="utf-8")
        filename = f"{name}.bwf.json"

        # 2. Upload to GitHub Gist
        log_status(self.project_root / "logs", "INFO", f"Publishing '{name}' to GitHub Gist...")

        payload = {
            "description": f"Boring Workflow: {name}",
            "public": public,
            "files": {filename: {"content": content}},
        }

        req = urllib.request.Request(
            "https://api.github.com/gists",
            data=json.dumps(payload).encode("utf-8"),
            headers={
                "Authorization": f"token {token}",
                "Accept": "application/vnd.github.v3+json",
                "User-Agent": "Boring-Agent",
            },
            method="POST",
        )

        with urllib.request.urlopen(req) as response:  # nosec B310  # URL is trusted GitHub Gist
            result = json.loads(response.read().decode("utf-8"))
            # The raw file URL is what `boring workflow install` consumes.
            # (A previous revision also read result["html_url"] but discarded it.)
            raw_url = result["files"][filename]["raw_url"]

            # Cleanup temporary export file
            if export_path.exists():
                export_path.unlink()

            return True, f"Scan this to install:\nboring workflow install {raw_url}"

    except urllib.error.HTTPError as e:
        if e.code == 401:
            return False, "Authentication failed. Check your GITHUB_TOKEN."
        return False, f"GitHub API Error: {e.code} {e.reason}"
    except Exception as e:
        return False, f"Publish failed: {str(e)}"

WorkflowMetadata dataclass

Metadata for a workflow package.

Source code in src/boring/loop/workflow_manager.py
@dataclass
class WorkflowMetadata:
    """Metadata for a workflow package."""

    name: str
    version: str
    description: str
    author: str = "Anonymous"
    created_at: float = 0.0
    tags: list[str] = None

    def __post_init__(self):
        if self.tags is None:
            self.tags = []
        if self.created_at == 0.0:
            self.created_at = time.time()

WorkflowPackage dataclass

Represents a portable workflow package (.bwf.json). Includes metadata and the actual markdown content.

Source code in src/boring/loop/workflow_manager.py
@dataclass
class WorkflowPackage:
    """
    Represents a portable workflow package (.bwf.json).
    Includes metadata and the actual markdown content.
    """

    metadata: WorkflowMetadata
    content: str  # The markdown content
    config: dict[str, Any] | None = None  # Optional extra config

    def to_json(self) -> str:
        """Serialize to a pretty-printed JSON string (non-ASCII preserved)."""
        return json.dumps(asdict(self), indent=2, ensure_ascii=False)

    @classmethod
    def from_json(cls, json_str: str) -> "WorkflowPackage":
        """Deserialize from JSON string with validation.

        Raises:
            ValueError: on malformed JSON or an invalid package structure.
        """
        try:
            data = json.loads(json_str)
        except json.JSONDecodeError as e:
            # Chain the cause so the original parse position is preserved.
            raise ValueError(f"Invalid JSON: {e}") from e

        if not isinstance(data, dict):
            raise ValueError("Workflow package must be a JSON object")

        # Validate required fields
        required = ["metadata", "content"]
        missing = [f for f in required if f not in data]
        if missing:
            raise ValueError(f"Missing required fields: {', '.join(missing)}")

        meta_data = data.get("metadata", {})
        if not isinstance(meta_data, dict):
            raise ValueError("Metadata must be an object")

        # Filter valid fields for Metadata to avoid crashes on extra keys
        valid_keys = {"name", "version", "description", "author", "created_at", "tags"}
        filtered_meta = {k: v for k, v in meta_data.items() if k in valid_keys}

        # Ensure name exists (required by dataclass)
        if "name" not in filtered_meta:
            raise ValueError("Metadata missing 'name'")

        # Provide defaults for optional-but-expected fields
        filtered_meta.setdefault("version", "0.0.0")
        filtered_meta.setdefault("description", "No description")

        metadata = WorkflowMetadata(**filtered_meta)
        return cls(metadata=metadata, content=data.get("content", ""), config=data.get("config"))

to_json()

Serialize to JSON string.

Source code in src/boring/loop/workflow_manager.py
def to_json(self) -> str:
    """Serialize this package to pretty-printed JSON (non-ASCII preserved)."""
    payload = asdict(self)
    return json.dumps(payload, indent=2, ensure_ascii=False)

from_json(json_str) classmethod

Deserialize from JSON string with validation.

Source code in src/boring/loop/workflow_manager.py
@classmethod
def from_json(cls, json_str: str) -> "WorkflowPackage":
    """Deserialize from JSON string with validation.

    Raises:
        ValueError: on malformed JSON or an invalid package structure.
    """
    try:
        data = json.loads(json_str)
    except json.JSONDecodeError as e:
        # Chain the original decode error so its position info survives.
        raise ValueError(f"Invalid JSON: {e}") from e

    if not isinstance(data, dict):
        raise ValueError("Workflow package must be a JSON object")

    # Validate required fields
    required = ["metadata", "content"]
    missing = [f for f in required if f not in data]
    if missing:
        raise ValueError(f"Missing required fields: {', '.join(missing)}")

    meta_data = data.get("metadata", {})
    if not isinstance(meta_data, dict):
        raise ValueError("Metadata must be an object")

    # Filter valid fields for Metadata to avoid crashes on extra keys
    valid_keys = {"name", "version", "description", "author", "created_at", "tags"}
    filtered_meta = {k: v for k, v in meta_data.items() if k in valid_keys}

    # Ensure name exists (required by dataclass)
    if "name" not in filtered_meta:
        raise ValueError("Metadata missing 'name'")

    # Provide defaults for optional-but-expected fields
    if "version" not in filtered_meta:
        filtered_meta["version"] = "0.0.0"
    if "description" not in filtered_meta:
        filtered_meta["description"] = "No description"

    metadata = WorkflowMetadata(**filtered_meta)
    return cls(metadata=metadata, content=data.get("content", ""), config=data.get("config"))

create_shadow_guard(project_root, mode='ENABLED', interactive=False)

Create a Shadow Mode guard with sensible defaults.

Parameters:

Name Type Description Default
project_root Path

Project root directory

required
mode str

"DISABLED", "ENABLED", or "STRICT"

'ENABLED'
interactive bool

If True, use console UI for approval

False

Returns:

Type Description
ShadowModeGuard

Configured ShadowModeGuard

Source code in src/boring/loop/shadow_mode.py
def create_shadow_guard(
    project_root: Path, mode: str = "ENABLED", interactive: bool = False
) -> ShadowModeGuard:
    """
    Create a Shadow Mode guard with sensible defaults.

    Args:
        project_root: Project root directory
        mode: "DISABLED", "ENABLED", or "STRICT"
        interactive: If True, use console UI for approval

    Returns:
        Configured ShadowModeGuard
    """
    # Unknown mode strings fall back to ENABLED rather than raising.
    try:
        level = ShadowModeLevel[mode.upper()]
    except KeyError:
        level = ShadowModeLevel.ENABLED

    approval = interactive_approval_ui if interactive else None
    return ShadowModeGuard(project_root=project_root, mode=level, approval_callback=approval)