
RAG & Memory API Reference

boring.rag

RAG (Retrieval-Augmented Generation) System for Boring V10.24

Components:

- CodeIndexer: AST-based Python code chunking
- DependencyGraph: Function/class call graph
- RAGRetriever: Hybrid search (vector + graph)
- HyDEExpander: Query expansion with hypothetical documents (V10.24 NEW)
- CrossEncoderReranker: High-precision reranking (V10.24 NEW)
- EnsembleReranker: Multi-signal reranking (V10.24 NEW)

V10.24 Key Enhancements:

- HyDE: Generate hypothetical code for better semantic matching (+15-20% accuracy)
- Cross-Encoder Reranking: Fine-grained relevance scoring (+10-15% precision)
- Ensemble Reranking: Combine semantic, keyword, structure, and usage signals
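
The ensemble idea is a weighted combination of per-candidate relevance signals. The sketch below illustrates the principle only; the signal names and weights are hypothetical, not Boring's actual configuration:

```python
# Illustrative sketch of multi-signal ensemble reranking.
# Signal names and weights here are hypothetical examples,
# not the EnsembleReranker's real defaults.

def ensemble_score(signals: dict[str, float], weights: dict[str, float]) -> float:
    """Weighted sum of normalized relevance signals."""
    return sum(weights[name] * signals.get(name, 0.0) for name in weights)

candidates = [
    {"semantic": 0.82, "keyword": 0.40, "structure": 0.70, "usage": 0.10},
    {"semantic": 0.75, "keyword": 0.90, "structure": 0.20, "usage": 0.60},
]
weights = {"semantic": 0.5, "keyword": 0.2, "structure": 0.2, "usage": 0.1}

# Candidates with a strong single signal can be outranked by
# candidates that score moderately well across all signals.
ranked = sorted(candidates, key=lambda s: ensemble_score(s, weights), reverse=True)
```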

Usage

from boring.rag import RAGRetriever, create_rag_retriever
from boring.rag import HyDEExpander, CrossEncoderReranker

retriever = create_rag_retriever(project_root)
retriever.build_index()

Basic retrieval

results = retriever.retrieve("authentication error handling")

With HyDE expansion

hyde = HyDEExpander()
expanded = hyde.expand_query("how to handle login errors")
results = retriever.retrieve(expanded.hypothetical_document)

With cross-encoder reranking

reranker = CrossEncoderReranker()
reranked = reranker.rerank(query, [r.chunk.content for r in results], [r.score for r in results])

CodeChunk dataclass

A semantic chunk of code for embedding.

Source code in src/boring/rag/code_indexer.py
@dataclass
class CodeChunk:
    """A semantic chunk of code for embedding."""

    chunk_id: str
    file_path: str
    chunk_type: str  # "function", "class", "imports", "module_doc", "interface", "type_alias"
    name: str
    content: str
    start_line: int
    end_line: int
    dependencies: list[str] = field(default_factory=list)  # Functions/classes this chunk calls
    parent: str | None = None  # Parent class if method
    receiver: str | None = None  # Go method receiver type (V11.0)
    signature: str | None = None  # Function/class signature for quick reference
    docstring: str | None = None

CodeIndexer

Parse Python files and extract semantic chunks.

Features:

- AST-based parsing for accurate structure extraction
- Dependency tracking (what each function calls)
- Configurable chunk size limits

Source code in src/boring/rag/code_indexer.py
class CodeIndexer:
    """
    Parse Python files and extract semantic chunks.

    Features:
    - AST-based parsing for accurate structure extraction
    - Dependency tracking (what each function calls)
    - Configurable chunk size limits
    """

    SUPPORTED_EXTENSIONS: set[str] = {
        ".py",
        ".js",
        ".jsx",
        ".ts",
        ".tsx",
        ".go",
        ".rs",
        ".c",
        ".cpp",
        ".h",
        ".hpp",
        ".java",
        ".md",
    }

    IGNORED_DIRS: set[str] = {
        ".git",
        "__pycache__",
        "node_modules",
        ".venv",
        "venv",
        "htmlcov",
        ".pytest_cache",
        ".mypy_cache",
        ".ruff_cache",
        "dist",
        "build",
        "*.egg-info",
        ".boring_memory",
    }

    def __init__(
        self, project_root: Path, max_chunk_tokens: int = 500, include_init_files: bool = False
    ):
        self.project_root = Path(project_root)
        self.max_chunk_tokens = max_chunk_tokens
        self.include_init_files = include_init_files
        self.stats = IndexStats()

    def get_changed_files(self, since_commit: str) -> list[Path]:
        """
        Identify files changed between since_commit and HEAD.

        Args:
            since_commit: Git commit hash to compare against HEAD.

        Returns:
            List of absolute paths to changed files.
        """
        try:
            import subprocess

            # Check if this is a git repo
            if not (self.project_root / ".git").exists():
                return self.collect_files()

            # Run git diff
            cmd = ["git", "diff", "--name-only", "--diff-filter=ACMRT", since_commit, "HEAD"]
            result = subprocess.run(
                cmd, cwd=self.project_root, capture_output=True, text=True, check=True
            )

            changed_paths = []
            for line in result.stdout.splitlines():
                if not line.strip():
                    continue
                full_path = self.project_root / line.strip()
                if full_path.exists() and not self._should_skip_path(full_path):
                    changed_paths.append(full_path)

            return changed_paths
        except Exception as e:
            logger.warning(f"Git diff failed, falling back to full scan: {e}")
            return self.collect_files()

    def collect_files(self) -> list[Path]:
        """
        Collect all files that should be indexed.

        Returns:
            List of Path objects for all indexable files in the project.
        """
        files = []
        for root, dirs, filenames in os.walk(self.project_root):
            # Skip hidden directories and those in ignore list
            dirs[:] = [d for d in dirs if not self._should_skip_dir(d)]

            for filename in filenames:
                file_path = Path(root) / filename
                if self._should_skip_path(file_path):
                    continue
                files.append(file_path)
        return files

    def index_project(self, files_to_index: list[Path] | None = None) -> Iterator[CodeChunk]:
        """
        Index files in the project.

        Args:
            files_to_index: Optional list of files to process. If None, scans all.

        Yields:
            CodeChunk objects for each semantic unit
        """
        self.stats = IndexStats()

        target_files = files_to_index if files_to_index is not None else self.collect_files()

        for file_path in target_files:
            try:
                for chunk in self.index_file(file_path):
                    self.stats.total_chunks += 1
                    yield chunk
            except Exception as e:
                logger.warning(f"Failed to index {file_path}: {e}")
                self.stats.skipped_files += 1

    def index_file(self, file_path: Path) -> Iterator[CodeChunk]:
        """
        Extract chunks from a file (AST for Python, line-based for others).
        """
        if file_path.suffix.lower() == ".py":
            yield from self._index_python_file(file_path)
        else:
            yield from self._index_universal_file(file_path)

    def _should_skip_dir(self, dir_name: str) -> bool:
        """Helper to check if a directory should be skipped during walk."""
        return dir_name in self.IGNORED_DIRS or any(
            dir_name.endswith(ex[1:]) for ex in self.IGNORED_DIRS if ex.startswith("*")
        )

    def _get_rel_path(self, file_path: Path) -> str:
        """Get relative path from project root."""
        try:
            rel_path = str(file_path.relative_to(self.project_root))
            return rel_path.replace("\\", "/")
        except ValueError:
            return str(file_path).replace("\\", "/")

    def _index_universal_file(self, file_path: Path) -> Iterator[CodeChunk]:
        """
        Smart chunking for non-Python files using Tree-sitter or regex fallback.
        Supports C-style languages (JS, TS, Java, C++, Go, Rust) and Markdown.

        V11.0: Enhanced to capture interface, type_alias, namespace, and Go method receivers.
        """
        import re

        try:
            from .parser import TreeSitterParser

            ts_parser = TreeSitterParser()
        except ImportError:
            ts_parser = None

        try:
            content = file_path.read_text(encoding="utf-8")
        except Exception as e:
            logger.debug(f"Error reading {file_path}: {e}")
            return

        rel_path = self._get_rel_path(file_path)

        # 1. Try Tree-sitter Parsing (V11.0 Enhanced)
        if ts_parser and ts_parser.is_available():
            ts_chunks = ts_parser.parse_file(file_path)
            if ts_chunks:
                for chunk in ts_chunks:
                    # V11.0: Map parser chunk types to indexer chunk types
                    chunk_type_map = {
                        "function": "code_function",
                        "class": "code_class",
                        "method": "code_method",
                        "interface": "code_interface",
                        "type_alias": "code_type_alias",
                        "namespace": "code_namespace",
                    }
                    mapped_type = chunk_type_map.get(chunk.type, f"code_{chunk.type}")

                    yield CodeChunk(
                        chunk_id=self._make_id(rel_path, f"{chunk.type}_{chunk.name}"),
                        file_path=rel_path,
                        chunk_type=mapped_type,
                        name=chunk.name,
                        content=chunk.content,
                        start_line=chunk.start_line,
                        end_line=chunk.end_line,
                        receiver=getattr(chunk, "receiver", None),  # V11.0: Go method receiver
                    )
                # If we got chunks, assume we handled the file well enough (for now).
                # Optionally we could index the gaps as well, but definitions are key.
                return

        # 2. Fallback to Smart Regex Chunking (if Tree-sitter unavailable or unsupported language)
        lines = content.splitlines()

        # Regex patterns for common block starts
        # C/C++/Java/JS/TS/Go/Rust function/class definitions
        block_start = re.compile(
            r"^\s*(?:export\s+)?(?:public\s+|private\s+|protected\s+)?(?:async\s+)?(?:func|function|class|interface|struct|impl|const|let|var|type|def)\s+([a-zA-Z0-9_]+)"
        )
        # Markdown headers
        md_header = re.compile(r"^#{1,3}\s+(.+)")

        current_chunk_lines = []
        current_start_line = 1
        current_name = file_path.name

        for i, line in enumerate(lines):
            line_num = i + 1

            # Check for new block start if current chunk is getting big enough
            # or if we are just starting
            is_start = block_start.match(line) or md_header.match(line)

            # Decide to yield current chunk
            # 1. New block detected AND current chunk is substantial (>5 lines)
            # 2. Current chunk is too big (>50 lines)
            if (is_start and len(current_chunk_lines) > 5) or len(current_chunk_lines) >= 50:
                if current_chunk_lines:
                    # Yield previous chunk
                    chunk_content = "\n".join(current_chunk_lines)
                    yield CodeChunk(
                        chunk_id=self._make_id(rel_path, f"chunk_{current_start_line}"),
                        file_path=rel_path,
                        chunk_type="code_block",
                        name=current_name,
                        content=chunk_content,
                        start_line=current_start_line,
                        end_line=line_num - 1,
                    )
                    current_chunk_lines = []
                    current_start_line = line_num

                    if is_start:
                        current_name = is_start.group(1)

            current_chunk_lines.append(line)

            # If we matched a block start, update name for the *current* accumulating chunk
            if is_start and len(current_chunk_lines) == 1:
                current_name = is_start.group(1)

        # Yield remaining
        if current_chunk_lines:
            yield CodeChunk(
                chunk_id=self._make_id(rel_path, f"chunk_{current_start_line}"),
                file_path=rel_path,
                chunk_type="code_block",
                name=current_name,
                content="\n".join(current_chunk_lines),
                start_line=current_start_line,
                end_line=len(lines),
            )

    def _index_python_file(self, file_path: Path) -> Iterator[CodeChunk]:
        """Extract chunks from a single Python file using AST."""
        try:
            content = file_path.read_text(encoding="utf-8")
            tree = ast.parse(content)
        except (SyntaxError, UnicodeDecodeError) as e:
            logger.debug(f"Error parsing {file_path}: {e}")
            return

        rel_path = self._get_rel_path(file_path)
        lines = content.splitlines()

        # 1. Module docstring
        module_doc = ast.get_docstring(tree)
        if module_doc:
            yield CodeChunk(
                chunk_id=self._make_id(rel_path, "module_doc"),
                file_path=rel_path,
                chunk_type="module_doc",
                name=file_path.stem,
                content=module_doc,
                start_line=1,
                end_line=self._get_docstring_end_line(tree),
                docstring=module_doc,
            )

        # 2. Top-level imports (as a single chunk)
        imports = self._extract_imports(tree, lines)
        if imports:
            yield CodeChunk(
                chunk_id=self._make_id(rel_path, "imports"),
                file_path=rel_path,
                chunk_type="imports",
                name="imports",
                content=imports["content"],
                start_line=imports["start"],
                end_line=imports["end"],
                dependencies=imports["modules"],
            )

        # 3. Top-level functions and classes
        covered_lines = set()
        if module_doc:
            covered_lines.update(range(1, self._get_docstring_end_line(tree) + 1))
        if imports:
            covered_lines.update(range(imports["start"], imports["end"] + 1))

        for node in ast.iter_child_nodes(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                self.stats.functions += 1
                chunk = self._chunk_from_function(node, rel_path, lines)
                covered_lines.update(range(chunk.start_line, chunk.end_line + 1))
                yield chunk

            elif isinstance(node, ast.ClassDef):
                self.stats.classes += 1
                # Yield class header
                chunk = self._chunk_from_class(node, rel_path, lines)
                # Note: header covered lines
                covered_lines.update(range(chunk.start_line, chunk.end_line + 1))
                yield chunk

                # Yield methods separately
                for method in node.body:
                    if isinstance(method, (ast.FunctionDef, ast.AsyncFunctionDef)):
                        self.stats.methods += 1
                        m_chunk = self._chunk_from_function(
                            method, rel_path, lines, parent=node.name
                        )
                        covered_lines.update(range(m_chunk.start_line, m_chunk.end_line + 1))
                        yield m_chunk

        # 4. Fallback: Capture remaining top-level code as "script" chunks
        script_code_chunks = self._extract_script_chunks(tree, lines, covered_lines, rel_path)
        for chunk in script_code_chunks:
            self.stats.script_chunks += 1
            yield chunk

    def _chunk_from_function(
        self, node: ast.FunctionDef, file_path: str, lines: list[str], parent: str | None = None
    ) -> CodeChunk:
        """Create chunk from function definition."""
        start = node.lineno - 1
        end = node.end_lineno or (start + 1)
        content = "\n".join(lines[start:end])

        # Extract function signature
        sig_end = start
        for i, line in enumerate(lines[start:end]):
            if ":" in line and not line.strip().startswith("#"):
                sig_end = start + i
                break
        signature = "\n".join(lines[start : sig_end + 1])

        # Extract docstring
        docstring = ast.get_docstring(node)

        # Extract function calls (dependencies)
        deps = self._extract_dependencies(node)

        # Build qualified name
        name = f"{parent}.{node.name}" if parent else node.name

        return CodeChunk(
            chunk_id=self._make_id(file_path, name),
            file_path=file_path,
            chunk_type="method" if parent else "function",
            name=node.name,
            content=content,
            start_line=node.lineno,
            end_line=end,
            dependencies=deps,
            parent=parent,
            signature=signature.strip(),
            docstring=docstring,
        )

    def _chunk_from_class(self, node: ast.ClassDef, file_path: str, lines: list[str]) -> CodeChunk:
        """
        Create chunk from class definition (header + docstring only).
        Methods are extracted separately.
        """
        start = node.lineno - 1

        # Find where the class header ends (before first method)
        class_header_end = start + 1
        for child in node.body:
            if isinstance(child, ast.Expr) and isinstance(child.value, ast.Constant):
                # Docstring
                class_header_end = child.end_lineno or class_header_end
            elif isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # First method - stop before it
                break
            elif isinstance(child, ast.Assign):
                # Class variable
                class_header_end = child.end_lineno or class_header_end

        content = "\n".join(lines[start:class_header_end])

        # Extract base classes
        bases = []
        for base in node.bases:
            if isinstance(base, ast.Name):
                bases.append(base.id)
            elif isinstance(base, ast.Attribute):
                bases.append(base.attr)

        return CodeChunk(
            chunk_id=self._make_id(file_path, node.name),
            file_path=file_path,
            chunk_type="class",
            name=node.name,
            content=content,
            start_line=node.lineno,
            end_line=class_header_end,
            dependencies=bases,  # Base classes as dependencies
            docstring=ast.get_docstring(node),
        )

    def _extract_dependencies(self, node: ast.AST) -> list[str]:
        """Extract all function/method calls within a node."""
        deps: set[str] = set()

        for child in ast.walk(node):
            if isinstance(child, ast.Call):
                # Direct function call: func()
                if isinstance(child.func, ast.Name):
                    deps.add(child.func.id)
                # Method call: obj.method()
                elif isinstance(child.func, ast.Attribute):
                    deps.add(child.func.attr)

        # Filter out builtins and common functions
        builtins = {
            "print",
            "len",
            "str",
            "int",
            "float",
            "list",
            "dict",
            "set",
            "tuple",
            "range",
            "enumerate",
            "zip",
            "map",
            "filter",
            "open",
            "isinstance",
            "issubclass",
            "hasattr",
            "getattr",
            "setattr",
        }

        return sorted(deps - builtins)

    def _extract_imports(self, tree: ast.Module, lines: list[str]) -> dict | None:
        """Extract import statements from module."""
        import_nodes = []
        modules = []

        for node in ast.iter_child_nodes(tree):
            if isinstance(node, ast.Import):
                import_nodes.append(node)
                for alias in node.names:
                    modules.append(alias.name.split(".")[0])
            elif isinstance(node, ast.ImportFrom):
                import_nodes.append(node)
                if node.module:
                    modules.append(node.module.split(".")[0])

        if not import_nodes:
            return None

        start = min(n.lineno for n in import_nodes)
        end = max(n.end_lineno or n.lineno for n in import_nodes)

        return {
            "content": "\n".join(lines[start - 1 : end]),
            "start": start,
            "end": end,
            "modules": sorted(set(modules)),
        }

    def _extract_script_chunks(
        self, tree: ast.Module, lines: list[str], covered_lines: set[int], rel_path: str
    ) -> list[CodeChunk]:
        """Extract remaining top-level code as script chunks."""
        script_chunks = []

        # Collect all line ranges for non-indexed top-level nodes
        nodes_to_index = []
        for node in ast.iter_child_nodes(tree):
            if isinstance(
                node,
                (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom),
            ):
                continue
            if isinstance(node, ast.Expr) and isinstance(node.value, ast.Constant):
                continue

            node_start = node.lineno
            node_end = node.end_lineno or node_start

            # Ensure it's not a node that was partially covered (rare but safe)
            if any(line_num in covered_lines for line_num in range(node_start, node_end + 1)):
                continue

            nodes_to_index.append((node_start, node_end))

        if not nodes_to_index:
            return []

        # Sort by start line
        nodes_to_index.sort()

        current_chunk_start = nodes_to_index[0][0]
        current_chunk_end = nodes_to_index[0][1]

        for i in range(1, len(nodes_to_index)):
            n_start, n_end = nodes_to_index[i]

            # If the gap between nodes contains any covered lines, we must split
            has_gap_covered = any(
                line_num in covered_lines for line_num in range(current_chunk_end + 1, n_start)
            )

            if not has_gap_covered and n_start <= current_chunk_end + 5:  # Small gap allowed
                current_chunk_end = n_end
            else:
                # Split
                script_chunks.append(
                    self._create_script_chunk(
                        current_chunk_start, current_chunk_end, rel_path, lines
                    )
                )
                current_chunk_start = n_start
                current_chunk_end = n_end

        # Last one
        script_chunks.append(
            self._create_script_chunk(current_chunk_start, current_chunk_end, rel_path, lines)
        )

        return script_chunks

    def _create_script_chunk(
        self, start: int, end: int, rel_path: str, lines: list[str]
    ) -> CodeChunk:
        """Helper to create a script chunk."""
        content = "\n".join(lines[start - 1 : end])
        return CodeChunk(
            chunk_id=self._make_id(rel_path, f"script_{start}"),
            file_path=rel_path,
            chunk_type="script",
            name=f"script_L{start}",
            content=content,
            start_line=start,
            end_line=end,
            dependencies=[],  # Could extract deps here too if needed
        )

    def _get_docstring_end_line(self, tree: ast.Module) -> int:
        """Get the ending line of module docstring."""
        if tree.body and isinstance(tree.body[0], ast.Expr):
            if isinstance(tree.body[0].value, ast.Constant):
                return tree.body[0].end_lineno or 1
        return 1

    def _should_skip_path(self, path: Path) -> bool:
        """Check if path should be skipped."""
        parts = path.parts
        for ignored in self.IGNORED_DIRS:
            if ignored.startswith("*"):
                # Glob pattern like *.egg-info
                if any(p.endswith(ignored[1:]) for p in parts):
                    return True
            elif ignored in parts:
                return True
        return False

    def _make_id(self, file_path: str, name: str) -> str:
        """Generate unique chunk ID."""
        raw = f"{file_path}::{name}"
        # Use sha256 for ID generation (non-security) to satisfy bandit
        return hashlib.sha256(raw.encode()).hexdigest()[:12]

    def get_stats(self) -> IndexStats:
        """Return indexing statistics."""
        return self.stats
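
The chunk ID scheme used by `_make_id` (a SHA-256 digest of `file_path::name`, truncated to 12 hex characters) can be reproduced standalone:

```python
import hashlib

def make_id(file_path: str, name: str) -> str:
    # Mirrors CodeIndexer._make_id: sha256 is used for determinism,
    # not security; the digest is truncated for compact IDs.
    raw = f"{file_path}::{name}"
    return hashlib.sha256(raw.encode()).hexdigest()[:12]

chunk_id = make_id("src/auth.py", "AuthService.login")
```

Because the ID is derived from the path and qualified name, re-indexing an unchanged chunk always yields the same ID.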

get_changed_files(since_commit)

Identify files changed between since_commit and HEAD.

Parameters:

- since_commit (str): Git commit hash to compare against HEAD. Required.

Returns:

- list[Path]: List of absolute paths to changed files.

Source code in src/boring/rag/code_indexer.py
def get_changed_files(self, since_commit: str) -> list[Path]:
    """
    Identify files changed between since_commit and HEAD.

    Args:
        since_commit: Git commit hash to compare against HEAD.

    Returns:
        List of absolute paths to changed files.
    """
    try:
        import subprocess

        # Check if this is a git repo
        if not (self.project_root / ".git").exists():
            return self.collect_files()

        # Run git diff
        cmd = ["git", "diff", "--name-only", "--diff-filter=ACMRT", since_commit, "HEAD"]
        result = subprocess.run(
            cmd, cwd=self.project_root, capture_output=True, text=True, check=True
        )

        changed_paths = []
        for line in result.stdout.splitlines():
            if not line.strip():
                continue
            full_path = self.project_root / line.strip()
            if full_path.exists() and not self._should_skip_path(full_path):
                changed_paths.append(full_path)

        return changed_paths
    except Exception as e:
        logger.warning(f"Git diff failed, falling back to full scan: {e}")
        return self.collect_files()

collect_files()

Collect all files that should be indexed.

Returns:

- list[Path]: List of Path objects for all indexable files in the project.

Source code in src/boring/rag/code_indexer.py
def collect_files(self) -> list[Path]:
    """
    Collect all files that should be indexed.

    Returns:
        List of Path objects for all indexable files in the project.
    """
    files = []
    for root, dirs, filenames in os.walk(self.project_root):
        # Skip hidden directories and those in ignore list
        dirs[:] = [d for d in dirs if not self._should_skip_dir(d)]

        for filename in filenames:
            file_path = Path(root) / filename
            if self._should_skip_path(file_path):
                continue
            files.append(file_path)
    return files
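
The skip logic that `collect_files` relies on can be exercised in isolation. This sketch extracts `_should_skip_path` from the class, with the ignore set trimmed for brevity:

```python
from pathlib import Path

# Subset of CodeIndexer.IGNORED_DIRS, trimmed for the example.
IGNORED_DIRS = {".git", "__pycache__", "node_modules", ".venv", "dist", "build", "*.egg-info"}

def should_skip_path(path: Path) -> bool:
    # Mirrors CodeIndexer._should_skip_path: literal entries match any
    # path component exactly; entries starting with "*" act as suffix globs.
    parts = path.parts
    for ignored in IGNORED_DIRS:
        if ignored.startswith("*"):
            if any(p.endswith(ignored[1:]) for p in parts):
                return True
        elif ignored in parts:
            return True
    return False
```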

index_project(files_to_index=None)

Index files in the project.

Parameters:

- files_to_index (list[Path] | None): Optional list of files to process. If None, scans all. Default: None.

Yields:

- CodeChunk: CodeChunk objects for each semantic unit.

Source code in src/boring/rag/code_indexer.py
def index_project(self, files_to_index: list[Path] | None = None) -> Iterator[CodeChunk]:
    """
    Index files in the project.

    Args:
        files_to_index: Optional list of files to process. If None, scans all.

    Yields:
        CodeChunk objects for each semantic unit
    """
    self.stats = IndexStats()

    target_files = files_to_index if files_to_index is not None else self.collect_files()

    for file_path in target_files:
        try:
            for chunk in self.index_file(file_path):
                self.stats.total_chunks += 1
                yield chunk
        except Exception as e:
            logger.warning(f"Failed to index {file_path}: {e}")
            self.stats.skipped_files += 1

index_file(file_path)

Extract chunks from a file (AST for Python, line-based for others).

Source code in src/boring/rag/code_indexer.py
def index_file(self, file_path: Path) -> Iterator[CodeChunk]:
    """
    Extract chunks from a file (AST for Python, line-based for others).
    """
    if file_path.suffix.lower() == ".py":
        yield from self._index_python_file(file_path)
    else:
        yield from self._index_universal_file(file_path)

get_stats()

Return indexing statistics.

Source code in src/boring/rag/code_indexer.py
def get_stats(self) -> IndexStats:
    """Return indexing statistics."""
    return self.stats

IndexStats dataclass

Statistics about the indexed codebase.

Source code in src/boring/rag/code_indexer.py
@dataclass
class IndexStats:
    """Statistics about the indexed codebase."""

    total_files: int = 0
    total_chunks: int = 0
    functions: int = 0
    classes: int = 0
    methods: int = 0
    script_chunks: int = 0
    skipped_files: int = 0

DependencyGraph

Bidirectional dependency graph for code chunks.

Edges:

- callers[A] = {B, C} means B and C call A
- callees[A] = {X, Y} means A calls X and Y

Usage

graph = DependencyGraph(chunks)
callers = graph.get_callers(chunk_id)  # Who calls this?
impact = graph.get_impact_zone(chunk_id)  # What might break?

Source code in src/boring/rag/graph_builder.py
class DependencyGraph:
    """
    Bidirectional dependency graph for code chunks.

    Edges:
    - callers[A] = {B, C} means B and C call A
    - callees[A] = {X, Y} means A calls X and Y

    Usage:
        graph = DependencyGraph(chunks)
        callers = graph.get_callers(chunk_id)  # Who calls this?
        impact = graph.get_impact_zone(chunk_id)  # What might break?
    """

    def __init__(self, chunks: list[CodeChunk] | None = None):
        self._chunks: dict[str, CodeChunk] = {}
        self._name_to_ids: dict[str, set[str]] = defaultdict(set)  # name -> chunk_ids

        # Adjacency lists
        self.callers: dict[str, set[str]] = defaultdict(set)  # who calls this
        self.callees: dict[str, set[str]] = defaultdict(set)  # this calls who

        if chunks:
            self._build(chunks)

    def _build(self, chunks: list[CodeChunk]) -> None:
        """Build the graph from chunks."""
        # First pass: register all chunks by name
        for chunk in chunks:
            self._chunks[chunk.chunk_id] = chunk

            # Register by simple name
            self._name_to_ids[chunk.name].add(chunk.chunk_id)

            # Also register by qualified name (ClassName.method)
            if chunk.parent:
                qualified = f"{chunk.parent}.{chunk.name}"
                self._name_to_ids[qualified].add(chunk.chunk_id)

        # Second pass: build edges based on dependencies
        for chunk in chunks:
            for dep_name in chunk.dependencies:
                # Find all chunks matching this dependency name
                dep_ids = self._name_to_ids.get(dep_name, set())

                for dep_id in dep_ids:
                    # chunk calls dep
                    self.callees[chunk.chunk_id].add(dep_id)
                    self.callers[dep_id].add(chunk.chunk_id)

    def add_chunk(self, chunk: CodeChunk) -> None:
        """Add a single chunk to the graph."""
        self._chunks[chunk.chunk_id] = chunk
        self._name_to_ids[chunk.name].add(chunk.chunk_id)

        if chunk.parent:
            qualified = f"{chunk.parent}.{chunk.name}"
            self._name_to_ids[qualified].add(chunk.chunk_id)

        # Build edges for this chunk
        for dep_name in chunk.dependencies:
            dep_ids = self._name_to_ids.get(dep_name, set())
            for dep_id in dep_ids:
                self.callees[chunk.chunk_id].add(dep_id)
                self.callers[dep_id].add(chunk.chunk_id)

    def get_chunk(self, chunk_id: str) -> CodeChunk | None:
        """Get chunk by ID."""
        return self._chunks.get(chunk_id)

    def get_chunks_by_name(self, name: str) -> list[CodeChunk]:
        """Get all chunks matching a name."""
        ids = self._name_to_ids.get(name, set())
        return [self._chunks[cid] for cid in ids if cid in self._chunks]

    def get_callers(self, chunk_id: str) -> list[CodeChunk]:
        """
        Get all chunks that call this chunk.

        Use case: "Who depends on this function?"
        """
        caller_ids = self.callers.get(chunk_id, set())
        return [self._chunks[cid] for cid in caller_ids if cid in self._chunks]

    def get_callees(self, chunk_id: str) -> list[CodeChunk]:
        """
        Get all chunks that this chunk calls.

        Use case: "What does this function depend on?"
        """
        callee_ids = self.callees.get(chunk_id, set())
        return [self._chunks[cid] for cid in callee_ids if cid in self._chunks]

    def get_related_chunks(
        self, seed_chunks: list[CodeChunk], depth: int = 1, direction: str = "both"
    ) -> list[CodeChunk]:
        """
        Get related chunks via BFS on the dependency graph.

        Args:
            seed_chunks: Starting chunks
            depth: How many hops to traverse (default 1 per user decision)
            direction: "callers", "callees", or "both"

        Returns:
            Related chunks (excluding seeds)
        """
        visited: set[str] = {c.chunk_id for c in seed_chunks}
        frontier: set[str] = {c.chunk_id for c in seed_chunks}
        related: list[CodeChunk] = []

        for _ in range(depth):
            next_frontier: set[str] = set()

            for chunk_id in frontier:
                neighbors: set[str] = set()

                if direction in ("callers", "both"):
                    neighbors.update(self.callers.get(chunk_id, set()))

                if direction in ("callees", "both"):
                    neighbors.update(self.callees.get(chunk_id, set()))

                for neighbor_id in neighbors:
                    if neighbor_id not in visited:
                        visited.add(neighbor_id)
                        next_frontier.add(neighbor_id)
                        if neighbor_id in self._chunks:
                            related.append(self._chunks[neighbor_id])

            frontier = next_frontier

            if not frontier:
                break

        return related

    def get_impact_zone(self, modified_chunk_id: str, depth: int = 1) -> list[CodeChunk]:
        """
        Get the "impact zone" - all chunks that might break if this one changes.

        This returns all CALLERS (things that depend on the modified chunk).
        If you change a function, its callers might break.

        Args:
            modified_chunk_id: The chunk being modified
            depth: How many levels of callers to include (default 1)

        Returns:
            List of chunks that might be affected
        """
        if modified_chunk_id not in self._chunks:
            return []

        seed = [self._chunks[modified_chunk_id]]
        return self.get_related_chunks(seed, depth=depth, direction="callers")

    def get_context_for_modification(self, modified_chunk_id: str) -> dict[str, list[CodeChunk]]:
        """
        Get comprehensive context for modifying a chunk.

        Returns dict with:
        - callers: Who calls this (might break)
        - callees: What this calls (need to understand interface)
        - siblings: Other methods in same class
        """
        result = {
            "callers": self.get_callers(modified_chunk_id),
            "callees": self.get_callees(modified_chunk_id),
            "siblings": [],
        }

        chunk = self._chunks.get(modified_chunk_id)
        if chunk and chunk.parent:
            # Find other methods in the same class
            for cid, c in self._chunks.items():
                if c.parent == chunk.parent and cid != modified_chunk_id:
                    result["siblings"].append(c)

        return result

    def get_stats(self) -> GraphStats:
        """Get graph statistics."""
        total_edges = sum(len(callees) for callees in self.callees.values())
        total_nodes = len(self._chunks)

        max_callers = max((len(c) for c in self.callers.values()), default=0)
        max_callees = max((len(c) for c in self.callees.values()), default=0)

        return GraphStats(
            total_nodes=total_nodes,
            total_edges=total_edges,
            avg_connections=total_edges / total_nodes if total_nodes > 0 else 0,
            max_callers=max_callers,
            max_callees=max_callees,
        )

    def find_path(self, from_id: str, to_id: str, max_depth: int = 5) -> list[str] | None:
        """
        Find shortest path between two chunks (if exists).

        Useful for understanding how two distant pieces of code are connected.
        """
        if from_id not in self._chunks or to_id not in self._chunks:
            return None

        if from_id == to_id:
            return [from_id]

        # BFS
        visited = {from_id}
        queue = [(from_id, [from_id])]

        for _ in range(max_depth):
            if not queue:
                break

            next_queue = []
            for current, path in queue:
                # Check all neighbors (both callers and callees)
                neighbors = self.callers.get(current, set()) | self.callees.get(current, set())

                for neighbor in neighbors:
                    if neighbor == to_id:
                        return path + [neighbor]

                    if neighbor not in visited:
                        visited.add(neighbor)
                        next_queue.append((neighbor, path + [neighbor]))

            queue = next_queue

        return None  # No path found within max_depth

    def visualize(self, format: str = "mermaid", max_nodes: int = 50) -> str:
        """
        Generate a visualization of the dependency graph.

        Args:
            format: Output format ("mermaid" or "json")
            max_nodes: Maximum nodes to include (for readability)

        Returns:
            String representation of the graph
        """
        if format == "json":
            import json

            nodes = []
            edges = []
            for cid, chunk in list(self._chunks.items())[:max_nodes]:
                nodes.append({"id": cid, "name": chunk.name, "type": chunk.chunk_type})
                for callee in self.callees.get(cid, set()):
                    edges.append({"from": cid, "to": callee})
            return json.dumps({"nodes": nodes, "edges": edges}, indent=2)

        # Mermaid format
        lines = ["```mermaid", "flowchart TD"]

        # Add nodes (limit for readability)
        added = set()
        for cid, chunk in list(self._chunks.items())[:max_nodes]:
            safe_name = chunk.name.replace('"', "'")
            lines.append(f'    {cid[:8]}["{safe_name}"]')
            added.add(cid)

        # Add edges
        for cid in added:
            for callee in self.callees.get(cid, set()):
                if callee in added:
                    lines.append(f"    {cid[:8]} --> {callee[:8]}")

        lines.append("```")
        return "\n".join(lines)

add_chunk(chunk)

Add a single chunk to the graph.

Source code in src/boring/rag/graph_builder.py
def add_chunk(self, chunk: CodeChunk) -> None:
    """Add a single chunk to the graph."""
    self._chunks[chunk.chunk_id] = chunk
    self._name_to_ids[chunk.name].add(chunk.chunk_id)

    if chunk.parent:
        qualified = f"{chunk.parent}.{chunk.name}"
        self._name_to_ids[qualified].add(chunk.chunk_id)

    # Build edges for this chunk
    for dep_name in chunk.dependencies:
        dep_ids = self._name_to_ids.get(dep_name, set())
        for dep_id in dep_ids:
            self.callees[chunk.chunk_id].add(dep_id)
            self.callers[dep_id].add(chunk.chunk_id)

get_chunk(chunk_id)

Get chunk by ID.

Source code in src/boring/rag/graph_builder.py
def get_chunk(self, chunk_id: str) -> CodeChunk | None:
    """Get chunk by ID."""
    return self._chunks.get(chunk_id)

get_chunks_by_name(name)

Get all chunks matching a name.

Source code in src/boring/rag/graph_builder.py
def get_chunks_by_name(self, name: str) -> list[CodeChunk]:
    """Get all chunks matching a name."""
    ids = self._name_to_ids.get(name, set())
    return [self._chunks[cid] for cid in ids if cid in self._chunks]

get_callers(chunk_id)

Get all chunks that call this chunk.

Use case: "Who depends on this function?"

Source code in src/boring/rag/graph_builder.py
def get_callers(self, chunk_id: str) -> list[CodeChunk]:
    """
    Get all chunks that call this chunk.

    Use case: "Who depends on this function?"
    """
    caller_ids = self.callers.get(chunk_id, set())
    return [self._chunks[cid] for cid in caller_ids if cid in self._chunks]

get_callees(chunk_id)

Get all chunks that this chunk calls.

Use case: "What does this function depend on?"

Source code in src/boring/rag/graph_builder.py
def get_callees(self, chunk_id: str) -> list[CodeChunk]:
    """
    Get all chunks that this chunk calls.

    Use case: "What does this function depend on?"
    """
    callee_ids = self.callees.get(chunk_id, set())
    return [self._chunks[cid] for cid in callee_ids if cid in self._chunks]

get_related_chunks(seed_chunks, depth=1, direction='both')

Get related chunks via BFS on the dependency graph.

Parameters:
  seed_chunks (list[CodeChunk], required): Starting chunks
  depth (int, default 1): How many hops to traverse
  direction (str, default 'both'): "callers", "callees", or "both"

Returns:
  list[CodeChunk]: Related chunks (excluding seeds)

Source code in src/boring/rag/graph_builder.py
def get_related_chunks(
    self, seed_chunks: list[CodeChunk], depth: int = 1, direction: str = "both"
) -> list[CodeChunk]:
    """
    Get related chunks via BFS on the dependency graph.

    Args:
        seed_chunks: Starting chunks
        depth: How many hops to traverse (default 1 per user decision)
        direction: "callers", "callees", or "both"

    Returns:
        Related chunks (excluding seeds)
    """
    visited: set[str] = {c.chunk_id for c in seed_chunks}
    frontier: set[str] = {c.chunk_id for c in seed_chunks}
    related: list[CodeChunk] = []

    for _ in range(depth):
        next_frontier: set[str] = set()

        for chunk_id in frontier:
            neighbors: set[str] = set()

            if direction in ("callers", "both"):
                neighbors.update(self.callers.get(chunk_id, set()))

            if direction in ("callees", "both"):
                neighbors.update(self.callees.get(chunk_id, set()))

            for neighbor_id in neighbors:
                if neighbor_id not in visited:
                    visited.add(neighbor_id)
                    next_frontier.add(neighbor_id)
                    if neighbor_id in self._chunks:
                        related.append(self._chunks[neighbor_id])

        frontier = next_frontier

        if not frontier:
            break

    return related

get_impact_zone(modified_chunk_id, depth=1)

Get the "impact zone" - all chunks that might break if this one changes.

This returns all CALLERS (things that depend on the modified chunk). If you change a function, its callers might break.

Parameters:
  modified_chunk_id (str, required): The chunk being modified
  depth (int, default 1): How many levels of callers to include

Returns:
  list[CodeChunk]: List of chunks that might be affected
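The caller-only BFS that powers the impact zone can be sketched with a plain caller map. The graph below is hypothetical: editing "c" may break its caller "b", which may in turn break "a":

```python
# Hypothetical caller edges: callers[X] = chunks that call X.
callers = {"c": {"b"}, "b": {"a"}, "a": set()}

def impact_zone(callers, modified, depth=1):
    """Depth-limited BFS over caller edges only (sketch of the idea above)."""
    visited, frontier, affected = {modified}, {modified}, []
    for _ in range(depth):
        nxt = set()
        for node in frontier:
            for caller in callers.get(node, set()):
                if caller not in visited:
                    visited.add(caller)
                    nxt.add(caller)
                    affected.append(caller)
        frontier = nxt
    return affected

print(impact_zone(callers, "c", depth=1))          # ['b']
print(sorted(impact_zone(callers, "c", depth=2)))  # ['a', 'b']
```

Raising depth widens the blast radius: depth 1 is direct callers, depth 2 adds callers-of-callers.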

Source code in src/boring/rag/graph_builder.py
def get_impact_zone(self, modified_chunk_id: str, depth: int = 1) -> list[CodeChunk]:
    """
    Get the "impact zone" - all chunks that might break if this one changes.

    This returns all CALLERS (things that depend on the modified chunk).
    If you change a function, its callers might break.

    Args:
        modified_chunk_id: The chunk being modified
        depth: How many levels of callers to include (default 1)

    Returns:
        List of chunks that might be affected
    """
    if modified_chunk_id not in self._chunks:
        return []

    seed = [self._chunks[modified_chunk_id]]
    return self.get_related_chunks(seed, depth=depth, direction="callers")

get_context_for_modification(modified_chunk_id)

Get comprehensive context for modifying a chunk.

Returns dict with:
- callers: Who calls this (might break)
- callees: What this calls (need to understand interface)
- siblings: Other methods in same class

Source code in src/boring/rag/graph_builder.py
def get_context_for_modification(self, modified_chunk_id: str) -> dict[str, list[CodeChunk]]:
    """
    Get comprehensive context for modifying a chunk.

    Returns dict with:
    - callers: Who calls this (might break)
    - callees: What this calls (need to understand interface)
    - siblings: Other methods in same class
    """
    result = {
        "callers": self.get_callers(modified_chunk_id),
        "callees": self.get_callees(modified_chunk_id),
        "siblings": [],
    }

    chunk = self._chunks.get(modified_chunk_id)
    if chunk and chunk.parent:
        # Find other methods in the same class
        for cid, c in self._chunks.items():
            if c.parent == chunk.parent and cid != modified_chunk_id:
                result["siblings"].append(c)

    return result

get_stats()

Get graph statistics.

Source code in src/boring/rag/graph_builder.py
def get_stats(self) -> GraphStats:
    """Get graph statistics."""
    total_edges = sum(len(callees) for callees in self.callees.values())
    total_nodes = len(self._chunks)

    max_callers = max((len(c) for c in self.callers.values()), default=0)
    max_callees = max((len(c) for c in self.callees.values()), default=0)

    return GraphStats(
        total_nodes=total_nodes,
        total_edges=total_edges,
        avg_connections=total_edges / total_nodes if total_nodes > 0 else 0,
        max_callers=max_callers,
        max_callees=max_callees,
    )

find_path(from_id, to_id, max_depth=5)

Find shortest path between two chunks (if exists).

Useful for understanding how two distant pieces of code are connected.
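The search treats edges as undirected (callers and callees merged) and gives up after max_depth hops. A minimal sketch over a hypothetical four-node chain:

```python
# Hypothetical undirected adjacency (callers | callees merged per node).
edges = {"a": {"b"}, "b": {"a", "c"}, "c": {"b", "d"}, "d": {"c"}}

def find_path(edges, start, goal, max_depth=5):
    """Bounded BFS returning the shortest path, or None if out of reach."""
    if start == goal:
        return [start]
    visited, queue = {start}, [(start, [start])]
    for _ in range(max_depth):
        nxt = []
        for node, path in queue:
            for nb in edges.get(node, set()):
                if nb == goal:
                    return path + [nb]
                if nb not in visited:
                    visited.add(nb)
                    nxt.append((nb, path + [nb]))
        queue = nxt
    return None

print(find_path(edges, "a", "d"))               # ['a', 'b', 'c', 'd']
print(find_path(edges, "a", "d", max_depth=2))  # None (needs 3 hops)
```

The depth bound keeps the search cheap on large graphs at the cost of missing long-range connections.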

Source code in src/boring/rag/graph_builder.py
def find_path(self, from_id: str, to_id: str, max_depth: int = 5) -> list[str] | None:
    """
    Find shortest path between two chunks (if exists).

    Useful for understanding how two distant pieces of code are connected.
    """
    if from_id not in self._chunks or to_id not in self._chunks:
        return None

    if from_id == to_id:
        return [from_id]

    # BFS
    visited = {from_id}
    queue = [(from_id, [from_id])]

    for _ in range(max_depth):
        if not queue:
            break

        next_queue = []
        for current, path in queue:
            # Check all neighbors (both callers and callees)
            neighbors = self.callers.get(current, set()) | self.callees.get(current, set())

            for neighbor in neighbors:
                if neighbor == to_id:
                    return path + [neighbor]

                if neighbor not in visited:
                    visited.add(neighbor)
                    next_queue.append((neighbor, path + [neighbor]))

        queue = next_queue

    return None  # No path found within max_depth

visualize(format='mermaid', max_nodes=50)

Generate a visualization of the dependency graph.

Parameters:
  format (str, default 'mermaid'): Output format ("mermaid" or "json")
  max_nodes (int, default 50): Maximum nodes to include (for readability)

Returns:
  str: String representation of the graph
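The Mermaid output uses truncated chunk IDs as node handles, display names in brackets, and only emits edges whose endpoints both made the node cut. A minimal sketch with hypothetical IDs:

```python
# Hypothetical chunk IDs -> display names, plus one callee edge.
nodes = {"abcdef1234": "save", "1234abcdef": "validate"}
edges = {"abcdef1234": {"1234abcdef"}}

lines = ["```mermaid", "flowchart TD"]
for cid, name in nodes.items():
    # Truncate IDs to 8 chars for readable node handles.
    lines.append(f'    {cid[:8]}["{name}"]')
for src, dsts in edges.items():
    for dst in dsts:
        if dst in nodes:  # skip edges to nodes dropped by max_nodes
            lines.append(f"    {src[:8]} --> {dst[:8]}")
lines.append("```")
print("\n".join(lines))
```

Pasting the result into any Mermaid renderer draws the call graph as a top-down flowchart.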

Source code in src/boring/rag/graph_builder.py
def visualize(self, format: str = "mermaid", max_nodes: int = 50) -> str:
    """
    Generate a visualization of the dependency graph.

    Args:
        format: Output format ("mermaid" or "json")
        max_nodes: Maximum nodes to include (for readability)

    Returns:
        String representation of the graph
    """
    if format == "json":
        import json

        nodes = []
        edges = []
        for cid, chunk in list(self._chunks.items())[:max_nodes]:
            nodes.append({"id": cid, "name": chunk.name, "type": chunk.chunk_type})
            for callee in self.callees.get(cid, set()):
                edges.append({"from": cid, "to": callee})
        return json.dumps({"nodes": nodes, "edges": edges}, indent=2)

    # Mermaid format
    lines = ["```mermaid", "flowchart TD"]

    # Add nodes (limit for readability)
    added = set()
    for cid, chunk in list(self._chunks.items())[:max_nodes]:
        safe_name = chunk.name.replace('"', "'")
        lines.append(f'    {cid[:8]}["{safe_name}"]')
        added.add(cid)

    # Add edges
    for cid in added:
        for callee in self.callees.get(cid, set()):
            if callee in added:
                lines.append(f"    {cid[:8]} --> {callee[:8]}")

    lines.append("```")
    return "\n".join(lines)

GraphStats dataclass

Statistics about the dependency graph.

Source code in src/boring/rag/graph_builder.py
@dataclass
class GraphStats:
    """Statistics about the dependency graph."""

    total_nodes: int = 0
    total_edges: int = 0
    avg_connections: float = 0.0
    max_callers: int = 0
    max_callees: int = 0

HyDEExpander

HyDE (Hypothetical Document Embeddings) for improved code retrieval.

Instead of directly embedding the query, we first generate a hypothetical code snippet that would answer the query, then embed that.

This improves retrieval because:
1. Hypothetical code is closer in embedding space to actual code
2. Query intent is better captured
3. Technical jargon is made explicit
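The template path can be sketched in a few lines: detect a query type from keyword patterns, then fill a code template so the text to be embedded "looks like" real code. The patterns and template below are illustrative, not the library's own tables:

```python
# Toy query-type patterns and a single template (illustrative only).
PATTERNS = {"error": ["error", "exception", "fix"], "test": ["test", "verify"]}
TEMPLATE = "def handle_{kind}():\n    # Hypothetical code for: {query}\n    pass"

def expand(query):
    """Return (detected type, hypothetical code) for a query."""
    q = query.lower()
    kind = next((t for t, words in PATTERNS.items()
                 if any(w in q for w in words)), "general")
    return kind, TEMPLATE.format(kind=kind, query=query)

kind, doc = expand("fix the login error")
print(kind)                  # error
print(doc.splitlines()[0])   # def handle_error():
```

Even this crude expansion moves the query closer to code in embedding space, which is the core of the HyDE effect.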

Source code in src/boring/rag/hyde.py
class HyDEExpander:
    """
    HyDE (Hypothetical Document Embeddings) for improved code retrieval.

    Instead of directly embedding the query, we first generate a hypothetical
    code snippet that would answer the query, then embed that.

    This improves retrieval because:
    1. Hypothetical code is closer in embedding space to actual code
    2. Query intent is better captured
    3. Technical jargon is made explicit
    """

    # Code templates for different query types
    TEMPLATES = {
        "error": """
# Hypothetical code that handles this error
def handle_{error_type}():
    try:
        # Code that might cause: {query}
        pass
    except {error_type} as e:
        # Error handling for: {query}
        logger.error(f"Error: {{e}}")
        raise
""",
        "function": """
def {function_name}({params}):
    '''
    {docstring}

    Implements: {query}
    '''
    # Implementation for: {query}
    pass
""",
        "class": """
class {class_name}:
    '''
    {docstring}

    Purpose: {query}
    '''
    def __init__(self):
        # Initialize for: {query}
        pass
""",
        "test": """
def test_{test_name}():
    '''Test case for: {query}'''
    # Arrange
    # ...setup for {query}

    # Act
    result = function_under_test()

    # Assert
    assert result is not None
""",
        "general": """
# Code context for: {query}
# This code handles {keywords}

def solve_{sanitized_query}():
    '''
    Implementation for: {query}
    Keywords: {keywords}
    '''
    pass
""",
    }

    # Query type detection patterns
    QUERY_PATTERNS = {
        "error": ["error", "exception", "fix", "bug", "crash", "fail", "issue"],
        "function": ["function", "method", "implement", "create", "build", "write"],
        "class": ["class", "model", "object", "entity", "schema"],
        "test": ["test", "spec", "assert", "verify", "check", "validate"],
    }

    def __init__(self, use_llm: bool = False):
        """
        Initialize HyDE expander.

        Args:
            use_llm: If True, use LLM for better hypothetical generation.
                     If False, use template-based generation (faster, no API).
        """
        self.use_llm = use_llm
        self._llm_client = None

    def expand_query(self, query: str) -> HyDEResult:
        """
        Expand a query into a hypothetical document.

        Args:
            query: Natural language query or error message

        Returns:
            HyDEResult with hypothetical code and metadata
        """
        # Detect query type
        query_type = self._detect_query_type(query)

        # Extract keywords
        keywords = self._extract_keywords(query)

        # Generate hypothetical document
        if self.use_llm and self._get_llm_client():
            hypothetical_code = self._generate_with_llm(query, query_type)
        else:
            hypothetical_code = self._generate_from_template(query, query_type, keywords)

        # Build descriptive document
        hypothetical_document = f"""
Query: {query}
Type: {query_type}
Keywords: {", ".join(keywords)}

Hypothetical Implementation:
{hypothetical_code}
"""

        return HyDEResult(
            original_query=query,
            hypothetical_document=hypothetical_document,
            hypothetical_code=hypothetical_code,
            expanded_keywords=keywords,
            confidence=0.8 if self.use_llm else 0.6,
        )

    def _detect_query_type(self, query: str) -> str:
        """Detect the type of query based on keywords."""
        query_lower = query.lower()

        for query_type, patterns in self.QUERY_PATTERNS.items():
            if any(pattern in query_lower for pattern in patterns):
                return query_type

        return "general"

    def _extract_keywords(self, query: str) -> list[str]:
        """Extract meaningful keywords from query."""
        # Remove common stop words
        stop_words = {
            "the",
            "a",
            "an",
            "is",
            "are",
            "was",
            "were",
            "be",
            "been",
            "being",
            "have",
            "has",
            "had",
            "do",
            "does",
            "did",
            "will",
            "would",
            "could",
            "should",
            "may",
            "might",
            "must",
            "shall",
            "can",
            "need",
            "dare",
            "ought",
            "used",
            "to",
            "of",
            "in",
            "for",
            "on",
            "with",
            "at",
            "by",
            "from",
            "as",
            "into",
            "through",
            "during",
            "before",
            "after",
            "above",
            "below",
            "between",
            "under",
            "again",
            "further",
            "then",
            "once",
            "here",
            "there",
            "when",
            "where",
            "why",
            "how",
            "all",
            "each",
            "few",
            "more",
            "most",
            "other",
            "some",
            "such",
            "no",
            "nor",
            "not",
            "only",
            "own",
            "same",
            "so",
            "than",
            "too",
            "very",
            "just",
            "and",
            "but",
            "if",
            "or",
            "because",
            "until",
            "while",
            "although",
            "though",
            "that",
            "this",
            "these",
            "those",
            "what",
            "which",
            "who",
            "whom",
            "whose",
            "i",
            "me",
            "my",
            "myself",
            "we",
            "our",
            "ours",
            "ourselves",
            "you",
            "your",
            "yours",
            "yourself",
            "yourselves",
            "he",
            "him",
            "his",
            "himself",
            "she",
            "her",
            "hers",
            "herself",
            "it",
            "its",
            "itself",
            "they",
            "them",
            "their",
            "theirs",
            "themselves",
        }

        # Tokenize and filter
        words = query.lower().replace("_", " ").replace("-", " ").split()
        keywords = [
            word.strip(".,!?:;\"'()[]{}")
            for word in words
            if word not in stop_words and len(word) > 2
        ]

        # Add common programming terms based on context
        query_lower = query.lower()
        if "authentication" in query_lower or "auth" in query_lower:
            keywords.extend(["login", "token", "session", "user"])
        if "database" in query_lower or "db" in query_lower:
            keywords.extend(["query", "sql", "connection", "model"])
        if "api" in query_lower:
            keywords.extend(["endpoint", "request", "response", "http"])

        return list(set(keywords))[:10]  # Limit to 10 keywords

    def _generate_from_template(self, query: str, query_type: str, keywords: list[str]) -> str:
        """Generate hypothetical code from template."""
        template = self.TEMPLATES.get(query_type, self.TEMPLATES["general"])

        # Sanitize query for use in code
        sanitized = "_".join(w.lower() for w in query.split()[:3] if w.isalnum())

        # Extract potential names from query
        words = [w for w in query.split() if w[0].isupper() or w.endswith("Error")]
        error_type = next((w for w in words if "Error" in w or "Exception" in w), "ValueError")

        return template.format(
            query=query,
            keywords=", ".join(keywords),
            sanitized_query=sanitized or "solution",
            error_type=error_type,
            function_name=sanitized or "solve",
            class_name=sanitized.title().replace("_", "") or "Solution",
            test_name=sanitized or "feature",
            params="*args, **kwargs",
            docstring=query[:100],
        )

    def _get_llm_client(self):
        """Get LLM client for advanced generation."""
        if self._llm_client is None:
            try:
                from boring.gemini_client import get_client

                self._llm_client = get_client()
            except ImportError:
                logger.debug("LLM client not available for HyDE")
                self._llm_client = False
        return self._llm_client if self._llm_client else None

    def _generate_with_llm(self, query: str, query_type: str) -> str:
        """Generate hypothetical code using LLM."""
        client = self._get_llm_client()
        if not client:
            return self._generate_from_template(query, query_type, self._extract_keywords(query))

        prompt = f"""Generate a short Python code snippet that would be found when searching for:
"{query}"

Requirements:
- Write realistic code that answers this query
- Include function/class definitions with docstrings
- Include relevant comments
- Keep it under 20 lines
- Make it look like production code

Code:
```python
"""
        try:
            response = client.generate_content(prompt)
            code = response.text.strip()
            # Extract code block if present
            if "```python" in code:
                code = code.split("```python")[1].split("```")[0]
            elif "```" in code:
                code = code.split("```")[1].split("```")[0]
            return code
        except Exception as e:
            logger.warning(f"LLM generation failed: {e}")
            return self._generate_from_template(query, query_type, self._extract_keywords(query))

__init__(use_llm=False)

Initialize HyDE expander.

Parameters:
  use_llm (bool, default False): If True, use LLM for better hypothetical
  generation. If False, use template-based generation (faster, no API).
Source code in src/boring/rag/hyde.py
def __init__(self, use_llm: bool = False):
    """
    Initialize HyDE expander.

    Args:
        use_llm: If True, use LLM for better hypothetical generation.
                 If False, use template-based generation (faster, no API).
    """
    self.use_llm = use_llm
    self._llm_client = None

expand_query(query)

Expand a query into a hypothetical document.

Parameters:
  query (str, required): Natural language query or error message

Returns:
  HyDEResult: HyDEResult with hypothetical code and metadata

Source code in src/boring/rag/hyde.py
    def expand_query(self, query: str) -> HyDEResult:
        """
        Expand a query into a hypothetical document.

        Args:
            query: Natural language query or error message

        Returns:
            HyDEResult with hypothetical code and metadata
        """
        # Detect query type
        query_type = self._detect_query_type(query)

        # Extract keywords
        keywords = self._extract_keywords(query)

        # Generate hypothetical document
        if self.use_llm and self._get_llm_client():
            hypothetical_code = self._generate_with_llm(query, query_type)
        else:
            hypothetical_code = self._generate_from_template(query, query_type, keywords)

        # Build descriptive document
        hypothetical_document = f"""
Query: {query}
Type: {query_type}
Keywords: {", ".join(keywords)}

Hypothetical Implementation:
{hypothetical_code}
"""

        return HyDEResult(
            original_query=query,
            hypothetical_document=hypothetical_document,
            hypothetical_code=hypothetical_code,
            expanded_keywords=keywords,
            confidence=0.8 if self.use_llm else 0.6,
        )

HyDEResult dataclass

Result from HyDE query expansion.

Source code in src/boring/rag/hyde.py
@dataclass
class HyDEResult:
    """Result from HyDE query expansion."""

    original_query: str
    hypothetical_document: str
    hypothetical_code: str
    expanded_keywords: list[str]
    confidence: float
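
The template-based path of HyDE can be sketched standalone. This toy version is illustrative only (the `MiniHyDEResult` name, stopword list, and template are assumptions, not the library's internals): extract keywords from the query, then wrap them in a hypothetical function whose docstring restates the query, so the embedding of the query looks more like the embedding of real code.

```python
import re
from dataclasses import dataclass

# Illustrative stopword list -- not the library's actual set
_STOPWORDS = {"how", "to", "the", "a", "an", "in", "of", "for", "and", "or", "is", "do", "i"}


@dataclass
class MiniHyDEResult:
    original_query: str
    hypothetical_document: str
    expanded_keywords: list


def expand_query(query: str) -> MiniHyDEResult:
    """Turn a natural-language query into a hypothetical code snippet."""
    # Keyword extraction: lowercase tokens minus stopwords
    tokens = re.findall(r"[a-zA-Z_]+", query.lower())
    keywords = [t for t in tokens if t not in _STOPWORDS]

    # Template-based hypothetical code: a function name built from the keywords
    func_name = "_".join(keywords[:3]) or "handler"
    hypothetical_code = (
        f"def {func_name}():\n"
        f'    """{query}"""\n'
        f"    ...\n"
    )
    document = f"Query: {query}\nKeywords: {', '.join(keywords)}\n\n{hypothetical_code}"
    return MiniHyDEResult(query, document, keywords)


result = expand_query("how to handle login errors")
print(result.expanded_keywords)  # ['handle', 'login', 'errors']
```

The hypothetical document is then embedded in place of the raw query, which is what `retriever.retrieve(expanded.hypothetical_document)` does in the usage example above.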

TreeSitterParser

Wrapper for tree-sitter parsing.

Source code in src/boring/rag/parser.py
class TreeSitterParser:
    """Wrapper for tree-sitter parsing."""

    # File extension to language name mapping
    EXT_TO_LANG = {
        ".py": "python",
        ".js": "javascript",
        ".jsx": "javascript",
        ".ts": "typescript",
        ".tsx": "typescript",
        ".go": "go",
        ".java": "java",
        ".cpp": "cpp",
        ".cc": "cpp",
        ".c": "c",
        ".h": "c",
        ".rs": "rust",
        ".rb": "ruby",
        ".php": "php",
        ".kt": "kotlin",
        ".kts": "kotlin",
        ".scala": "scala",
    }

    # S-expression queries for extracting definitions
    # V11.0: Enhanced queries for cross-language precision
    # V11.1: Fixed JavaScript function_expression -> function node type
    QUERIES = {
        "python": """
            (function_definition
                name: (identifier) @name) @function
            (class_definition
                name: (identifier) @name) @class
            (decorated_definition
                definition: (function_definition
                    name: (identifier) @name)) @function
        """,
        "javascript": """
            ; Regular function declarations
            (function_declaration
                name: (identifier) @name) @function

            ; Class declarations
            (class_declaration
                name: (identifier) @name) @class

            ; Method definitions in classes
            (method_definition
                name: (property_identifier) @name) @method

            ; Arrow functions assigned to variables (React components pattern)
            (variable_declarator
                (identifier) @name
                (arrow_function)) @function

            ; Anonymous function expressions assigned to variables
            ; Note: In tree-sitter-languages, the node type is 'function' not 'function_expression'
            (variable_declarator
                (identifier) @name
                (function)) @function

            ; Export statement with function declaration
            (export_statement
                (function_declaration
                    name: (identifier) @name)) @function

            ; React.memo, React.forwardRef wrapped components
            (variable_declarator
                (identifier) @name
                (call_expression
                    (member_expression))) @function
        """,
        "typescript": """
            ; Function declarations
            (function_declaration
                name: (identifier) @name) @function

            ; Class declarations
            (class_declaration
                name: (type_identifier) @name) @class

            ; Interface declarations (V11.0 enhancement)
            (interface_declaration
                name: (type_identifier) @name) @interface

            ; Type alias declarations (V11.0 enhancement)
            (type_alias_declaration
                name: (type_identifier) @name) @type_alias

            ; Method definitions
            (method_definition
                name: (property_identifier) @name) @method

            ; Arrow functions (including React FC components)
            (variable_declarator
                name: (identifier) @name
                value: (arrow_function)) @function

            ; Typed arrow functions: const Component: React.FC = () => {}
            (lexical_declaration
                (variable_declarator
                    name: (identifier) @name
                    type: (type_annotation)
                    value: (arrow_function))) @function

            ; Export default function
            (export_statement
                declaration: (function_declaration
                    name: (identifier) @name)) @function

            ; Enum declarations
            (enum_declaration
                name: (identifier) @name) @class

            ; Abstract class
            (abstract_class_declaration
                name: (type_identifier) @name) @class
        """,
        "go": """
            ; Regular function declarations
            (function_declaration
                name: (identifier) @name) @function

            ; Method declarations with receiver (V11.0 enhancement)
            ; Captures both the method name and receiver type
            (method_declaration
                receiver: (parameter_list
                    (parameter_declaration
                        type: (_) @receiver_type))
                name: (field_identifier) @name) @method

            ; Type declarations (struct, interface)
            (type_declaration
                (type_spec
                    name: (type_identifier) @name)) @class

            ; Interface type specifically
            (type_declaration
                (type_spec
                    name: (type_identifier) @name
                    type: (interface_type))) @interface
        """,
        "java": """
            (method_declaration
                name: (identifier) @name) @function
            (class_declaration
                name: (identifier) @name) @class
            (interface_declaration
                name: (identifier) @name) @interface
            (constructor_declaration
                name: (identifier) @name) @function
            (enum_declaration
                name: (identifier) @name) @class
            (annotation_type_declaration
                name: (identifier) @name) @class
        """,
        "cpp": """
            ; Function definitions
            (function_definition
                declarator: (function_declarator
                    declarator: (identifier) @name)) @function

            ; Class specifiers
            (class_specifier
                name: (type_identifier) @name) @class

            ; Struct specifiers
            (struct_specifier
                name: (type_identifier) @name) @class

            ; Namespace definitions (V11.0 enhancement)
            (namespace_definition
                name: (namespace_identifier) @name) @namespace

            ; Template declarations (V11.0 enhancement)
            (template_declaration
                (function_definition
                    declarator: (function_declarator
                        declarator: (identifier) @name))) @function

            ; Template class
            (template_declaration
                (class_specifier
                    name: (type_identifier) @name)) @class
        """,
        "c": """
            (function_definition
                declarator: (function_declarator
                    declarator: (identifier) @name)) @function
            (struct_specifier
                name: (type_identifier) @name) @class
            (enum_specifier
                name: (type_identifier) @name) @class
            (type_definition
                declarator: (type_identifier) @name) @type_alias
        """,
        "rust": """
            (function_item
                name: (identifier) @name) @function
            (impl_item
                type: (type_identifier) @name) @class
            (struct_item
                name: (type_identifier) @name) @class
            (enum_item
                name: (type_identifier) @name) @class
            (trait_item
                name: (type_identifier) @name) @interface
            (type_item
                name: (type_identifier) @name) @type_alias
            (mod_item
                name: (identifier) @name) @namespace
        """,
        "ruby": """
            (method
                name: (identifier) @name) @function
            (class
                name: (constant) @name) @class
            (module
                name: (constant) @name) @class
            (singleton_method
                name: (identifier) @name) @function
        """,
        "php": """
            (function_definition
                name: (name) @name) @function
            (class_declaration
                name: (name) @name) @class
            (method_declaration
                name: (name) @name) @method
            (interface_declaration
                name: (name) @name) @interface
            (trait_declaration
                name: (name) @name) @class
        """,
        "kotlin": """
            (class_declaration
                (type_identifier) @name) @class
            (object_declaration
                (type_identifier) @name) @class
            (function_declaration
                (simple_identifier) @name) @function
            (property_declaration
                (variable_declaration
                    (simple_identifier) @name)) @method
        """,
        "scala": """
            (class_definition
                name: (identifier) @name) @class
            (object_definition
                name: (identifier) @name) @class
            (trait_definition
                name: (identifier) @name) @interface
            (function_definition
                name: (identifier) @name) @function
        """,
    }

    def __init__(self):
        self.parsers = {}

    def is_available(self) -> bool:
        """Check if tree-sitter is available."""
        return HAS_TREE_SITTER

    def get_language_for_file(self, file_path: Path) -> str | None:
        """Determine language from file extension."""
        return self.EXT_TO_LANG.get(file_path.suffix.lower())

    def parse_file(self, file_path: Path) -> list[ParsedChunk]:
        """
        Parse a file and extract semantic chunks.
        Returns empty list if language not supported or parser fails.
        """
        if not HAS_TREE_SITTER:
            return []

        lang_name = self.get_language_for_file(file_path)
        if not lang_name:
            return []

        try:
            content = file_path.read_text(encoding="utf-8")
        except Exception as e:
            logger.warning(f"Failed to read file {file_path}: {e}")
            return []

        return self.extract_chunks(content, lang_name)

    def extract_chunks(self, code: str, language: str) -> list[ParsedChunk]:
        """
        Extract chunks from code string using tree-sitter.

        V11.0: Enhanced to handle interface, type_alias, namespace, and Go method receivers.
        """
        if not HAS_TREE_SITTER:
            return []

        try:
            # Lazy load parser
            if language not in self.parsers:
                self.parsers[language] = get_parser(language)

            parser = self.parsers[language]
            tree = parser.parse(bytes(code, "utf8"))

            query_str = self.QUERIES.get(language)
            if not query_str:
                return []

            ts_language = get_language(language)
            query = ts_language.query(query_str)

            chunk_types = {
                "function",
                "class",
                "method",
                "interface",
                "type_alias",
                "namespace",
            }

            # V11.1: Type specificity ranking - more specific types take precedence
            # Higher number = more specific
            type_specificity = {
                "class": 1,
                "function": 2,
                "method": 3,
                "namespace": 4,
                "type_alias": 5,
                "interface": 6,  # interface is more specific than class for Go
            }

            result = []
            matches = query.matches(tree.root_node)

            # Map of node_id -> dict to avoid duplicates if multiple queries hit the same node
            processed_chunks = {}

            for _match_id, captures in matches:
                # In 0.21.3, captures is a dict mapping capture_name -> Node
                # (or list of Nodes depending on configuration, but usually Node for matches)

                # Identify the primary chunk node in this match
                chunk_node = None
                chunk_type = None
                for c_name, node in captures.items():
                    if c_name in chunk_types:
                        chunk_node = node
                        chunk_type = c_name
                        break

                if not chunk_node:
                    continue

                # Get or create the chunk entry
                node_id = chunk_node.id

                # V11.1: Check if this type is more specific than existing one
                if node_id in processed_chunks:
                    existing_type = processed_chunks[node_id]["type"]
                    existing_specificity = type_specificity.get(existing_type, 0)
                    new_specificity = type_specificity.get(chunk_type, 0)
                    if new_specificity > existing_specificity:
                        # Update to more specific type
                        processed_chunks[node_id]["type"] = chunk_type
                else:
                    start_line = chunk_node.start_point[0] + 1
                    end_line = chunk_node.end_point[0] + 1
                    content = chunk_node.text.decode("utf8")
                    lines = content.split("\n")
                    signature = lines[0] if lines else ""

                    # Capture signature spanning multiple lines
                    if "{" not in signature and len(lines) > 1:
                        for i, line in enumerate(lines[1:], 1):
                            signature += "\n" + line
                            if "{" in line or i >= 3:
                                break

                    processed_chunks[node_id] = {
                        "type": chunk_type,
                        "name": "anonymous",
                        "start_line": start_line,
                        "end_line": end_line,
                        "content": content,
                        "signature": signature.strip(),
                        "receiver": None,
                    }

                # Extract attributes from the same match
                chunk_data = processed_chunks[node_id]

                # Defensive capture name lookup
                capture_names = {
                    query.capture_names[k]: node
                    for k, node in captures.items()
                    if isinstance(k, int)
                }
                capture_names.update(
                    {k: node for k, node in captures.items() if isinstance(k, str)}
                )

                if "name" in capture_names:
                    name_node = capture_names["name"]
                    chunk_data["name"] = name_node.text.decode("utf8")

                if "receiver_type" in capture_names:
                    recv_node = capture_names["receiver_type"]
                    recv_text = recv_node.text.decode("utf8")
                    if recv_text.startswith("*"):
                        recv_text = recv_text[1:]
                    chunk_data["receiver"] = recv_text

            # Sort and return
            for data in processed_chunks.values():
                result.append(ParsedChunk(**data))

            return sorted(result, key=lambda x: x.start_line)

        except Exception as e:
            logger.error(f"Tree-sitter match failure for {language}: {e}")
            return []

    def validate_language_support(self, language: str, test_code: str) -> dict:
        """
        Validate that Tree-sitter queries work correctly for a given language.

        V11.0: Structured testing for cross-language precision.

        Args:
            language: Language name (e.g., 'go', 'typescript')
            test_code: Sample code to parse

        Returns:
            Dict with validation results
        """
        if not HAS_TREE_SITTER:
            return {"success": False, "error": "tree-sitter not available"}

        if language not in self.QUERIES:
            return {"success": False, "error": f"No query defined for language: {language}"}

        try:
            chunks = self.extract_chunks(test_code, language)
            return {
                "success": True,
                "language": language,
                "chunks_found": len(chunks),
                "chunk_types": list({c.type for c in chunks}),
                "chunk_names": [c.name for c in chunks],
                "receivers": [c.receiver for c in chunks if c.receiver],
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

is_available()

Check if tree-sitter is available.

Source code in src/boring/rag/parser.py
def is_available(self) -> bool:
    """Check if tree-sitter is available."""
    return HAS_TREE_SITTER
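
The `HAS_TREE_SITTER` flag checked here follows the common optional-dependency pattern: attempt the import once at module load, record the outcome in a flag, and let every entry point degrade gracefully instead of raising `ImportError` at call time. A minimal sketch (the `available` parameter and stubbed body are illustrative, not the library's signature):

```python
# Try the optional backend once; remember whether it is usable.
try:
    import tree_sitter  # noqa: F401
    HAS_TREE_SITTER = True
except ImportError:
    HAS_TREE_SITTER = False


def parse_file_text(code: str, available: bool = HAS_TREE_SITTER) -> list:
    """Parse `code` into chunks; degrade to an empty list without the backend."""
    if not available:
        return []  # graceful degradation, mirroring parse_file()/extract_chunks()
    # Real parsing would go here; stubbed for illustration.
    return [code]


print(parse_file_text("def f(): pass", available=False))  # []
```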

get_language_for_file(file_path)

Determine language from file extension.

Source code in src/boring/rag/parser.py
def get_language_for_file(self, file_path: Path) -> str | None:
    """Determine language from file extension."""
    return self.EXT_TO_LANG.get(file_path.suffix.lower())
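
Because the lookup lowercases the suffix first, mixed-case extensions resolve correctly and unknown extensions return `None`. A self-contained sketch using a subset of the `EXT_TO_LANG` table:

```python
from pathlib import Path

# Subset of the EXT_TO_LANG table; lookups are case-insensitive via suffix.lower()
EXT_TO_LANG = {".py": "python", ".ts": "typescript", ".go": "go", ".h": "c"}


def get_language_for_file(file_path: Path):
    """Map a file to its language name, or None if unsupported."""
    return EXT_TO_LANG.get(file_path.suffix.lower())


print(get_language_for_file(Path("src/Main.GO")))  # go
print(get_language_for_file(Path("notes.txt")))    # None
```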

parse_file(file_path)

Parse a file and extract semantic chunks. Returns empty list if language not supported or parser fails.

Source code in src/boring/rag/parser.py
def parse_file(self, file_path: Path) -> list[ParsedChunk]:
    """
    Parse a file and extract semantic chunks.
    Returns empty list if language not supported or parser fails.
    """
    if not HAS_TREE_SITTER:
        return []

    lang_name = self.get_language_for_file(file_path)
    if not lang_name:
        return []

    try:
        content = file_path.read_text(encoding="utf-8")
    except Exception as e:
        logger.warning(f"Failed to read file {file_path}: {e}")
        return []

    return self.extract_chunks(content, lang_name)

extract_chunks(code, language)

Extract chunks from code string using tree-sitter.

V11.0: Enhanced to handle interface, type_alias, namespace, and Go method receivers.

Source code in src/boring/rag/parser.py
def extract_chunks(self, code: str, language: str) -> list[ParsedChunk]:
    """
    Extract chunks from code string using tree-sitter.

    V11.0: Enhanced to handle interface, type_alias, namespace, and Go method receivers.
    """
    if not HAS_TREE_SITTER:
        return []

    try:
        # Lazy load parser
        if language not in self.parsers:
            self.parsers[language] = get_parser(language)

        parser = self.parsers[language]
        tree = parser.parse(bytes(code, "utf8"))

        query_str = self.QUERIES.get(language)
        if not query_str:
            return []

        ts_language = get_language(language)
        query = ts_language.query(query_str)

        chunk_types = {
            "function",
            "class",
            "method",
            "interface",
            "type_alias",
            "namespace",
        }

        # V11.1: Type specificity ranking - more specific types take precedence
        # Higher number = more specific
        type_specificity = {
            "class": 1,
            "function": 2,
            "method": 3,
            "namespace": 4,
            "type_alias": 5,
            "interface": 6,  # interface is more specific than class for Go
        }

        result = []
        matches = query.matches(tree.root_node)

        # Map of node_id -> dict to avoid duplicates if multiple queries hit the same node
        processed_chunks = {}

        for _match_id, captures in matches:
            # In 0.21.3, captures is a dict mapping capture_name -> Node
            # (or list of Nodes depending on configuration, but usually Node for matches)

            # Identify the primary chunk node in this match
            chunk_node = None
            chunk_type = None
            for c_name, node in captures.items():
                if c_name in chunk_types:
                    chunk_node = node
                    chunk_type = c_name
                    break

            if not chunk_node:
                continue

            # Get or create the chunk entry
            node_id = chunk_node.id

            # V11.1: Check if this type is more specific than existing one
            if node_id in processed_chunks:
                existing_type = processed_chunks[node_id]["type"]
                existing_specificity = type_specificity.get(existing_type, 0)
                new_specificity = type_specificity.get(chunk_type, 0)
                if new_specificity > existing_specificity:
                    # Update to more specific type
                    processed_chunks[node_id]["type"] = chunk_type
            else:
                start_line = chunk_node.start_point[0] + 1
                end_line = chunk_node.end_point[0] + 1
                content = chunk_node.text.decode("utf8")
                lines = content.split("\n")
                signature = lines[0] if lines else ""

                # Capture signature spanning multiple lines
                if "{" not in signature and len(lines) > 1:
                    for i, line in enumerate(lines[1:], 1):
                        signature += "\n" + line
                        if "{" in line or i >= 3:
                            break

                processed_chunks[node_id] = {
                    "type": chunk_type,
                    "name": "anonymous",
                    "start_line": start_line,
                    "end_line": end_line,
                    "content": content,
                    "signature": signature.strip(),
                    "receiver": None,
                }

            # Extract attributes from the same match
            chunk_data = processed_chunks[node_id]

            # Defensive capture name lookup
            capture_names = {
                query.capture_names[k]: node
                for k, node in captures.items()
                if isinstance(k, int)
            }
            capture_names.update(
                {k: node for k, node in captures.items() if isinstance(k, str)}
            )

            if "name" in capture_names:
                name_node = capture_names["name"]
                chunk_data["name"] = name_node.text.decode("utf8")

            if "receiver_type" in capture_names:
                recv_node = capture_names["receiver_type"]
                recv_text = recv_node.text.decode("utf8")
                if recv_text.startswith("*"):
                    recv_text = recv_text[1:]
                chunk_data["receiver"] = recv_text

        # Sort and return
        for data in processed_chunks.values():
            result.append(ParsedChunk(**data))

        return sorted(result, key=lambda x: x.start_line)

    except Exception as e:
        logger.error(f"Tree-sitter match failure for {language}: {e}")
        return []
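
Two small pieces of the logic above can be isolated for clarity: the V11.1 specificity rule (when multiple queries capture the same node, the more specific chunk type wins) and the Go receiver normalization that strips the pointer star. The helper names `merge_chunk_type` and `normalize_receiver` are illustrative, not library API:

```python
# V11.1 specificity ranking: higher number = more specific capture type.
TYPE_SPECIFICITY = {
    "class": 1, "function": 2, "method": 3,
    "namespace": 4, "type_alias": 5, "interface": 6,
}


def merge_chunk_type(existing: str, new: str) -> str:
    """Return whichever of two capture types is more specific."""
    if TYPE_SPECIFICITY.get(new, 0) > TYPE_SPECIFICITY.get(existing, 0):
        return new
    return existing


def normalize_receiver(recv_text: str) -> str:
    """Strip the pointer star from a Go receiver type: '*User' -> 'User'."""
    return recv_text[1:] if recv_text.startswith("*") else recv_text


# A Go interface node matched both as @class and as @interface:
print(merge_chunk_type("class", "interface"))  # interface
print(merge_chunk_type("interface", "class"))  # interface
print(normalize_receiver("*User"))             # User
```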

validate_language_support(language, test_code)

Validate that Tree-sitter queries work correctly for a given language.

V11.0: Structured testing for cross-language precision.

Parameters:

language (str): Language name (e.g., 'go', 'typescript'). Required.
test_code (str): Sample code to parse. Required.

Returns:

dict: Dict with validation results.

Source code in src/boring/rag/parser.py
def validate_language_support(self, language: str, test_code: str) -> dict:
    """
    Validate that Tree-sitter queries work correctly for a given language.

    V11.0: Structured testing for cross-language precision.

    Args:
        language: Language name (e.g., 'go', 'typescript')
        test_code: Sample code to parse

    Returns:
        Dict with validation results
    """
    if not HAS_TREE_SITTER:
        return {"success": False, "error": "tree-sitter not available"}

    if language not in self.QUERIES:
        return {"success": False, "error": f"No query defined for language: {language}"}

    try:
        chunks = self.extract_chunks(test_code, language)
        return {
            "success": True,
            "language": language,
            "chunks_found": len(chunks),
            "chunk_types": list({c.type for c in chunks}),
            "chunk_names": [c.name for c in chunks],
            "receivers": [c.receiver for c in chunks if c.receiver],
        }
    except Exception as e:
        return {"success": False, "error": str(e)}

RAGRetriever

Hybrid RAG retriever for code context.

Features:

- Semantic search via ChromaDB embeddings
- 1-layer graph expansion (per user decision)
- Smart jump: Agent can request deeper traversal on-demand
- Recency boost: recently modified files rank higher

Usage

retriever = RAGRetriever(project_root)
retriever.build_index()

Basic retrieval

results = retriever.retrieve("authentication error handling")

With graph expansion for specific function

context = retriever.get_modification_context("src/auth.py", "login")
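
The hybrid scoring idea behind this retriever (semantic similarity plus a recency boost) can be sketched without ChromaDB. This toy ranker uses plain cosine similarity and an illustrative `recency_weight` term; it is not the library's actual scoring:

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0


def toy_retrieve(query_vec, chunks, recency_weight=0.1, top_k=2):
    """Rank chunks by cosine similarity plus a small recency boost.

    `chunks` holds dicts with an embedding vector and an `age_days` field;
    more recently modified files get a larger boost (illustrative scoring).
    """
    scored = []
    for c in chunks:
        score = cosine(query_vec, c["vec"])
        score += recency_weight / (1.0 + c["age_days"])  # recency boost
        scored.append((score, c["name"]))
    return [name for _, name in sorted(scored, reverse=True)[:top_k]]


chunks = [
    {"name": "auth.login", "vec": [1.0, 0.0], "age_days": 2},
    {"name": "db.connect", "vec": [0.0, 1.0], "age_days": 0},
    {"name": "auth.logout", "vec": [0.9, 0.1], "age_days": 30},
]
print(toy_retrieve([1.0, 0.0], chunks))  # ['auth.login', 'auth.logout']
```

In the real retriever the vectors come from ChromaDB embeddings and the results are further expanded one layer along the dependency graph.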

Source code in src/boring/rag/rag_retriever.py
class RAGRetriever:
    """
    Hybrid RAG retriever for code context.

    Features:
    - Semantic search via ChromaDB embeddings
    - 1-layer graph expansion (per user decision)
    - Smart jump: Agent can request deeper traversal on-demand
    - Recency boost: recently modified files rank higher

    Usage:
        retriever = RAGRetriever(project_root)
        retriever.build_index()

        # Basic retrieval
        results = retriever.retrieve("authentication error handling")

        # With graph expansion for specific function
        context = retriever.get_modification_context("src/auth.py", "login")
    """

    # Default collection name in ChromaDB
    COLLECTION_NAME = "boring_code_rag"

    def __init__(
        self,
        project_root: Path,
        persist_dir: Path | None = None,
        collection_name: str | None = None,
        additional_roots: list[Path] | None = None,
    ):
        self.project_root = Path(project_root)
        self.persist_dir = persist_dir or (self.project_root / ".boring_memory" / "rag_db")
        self.collection_name = collection_name or self.COLLECTION_NAME

        # Multi-project support: list of all project roots to index
        self.all_project_roots: list[Path] = [self.project_root]
        if additional_roots:
            self.all_project_roots.extend([Path(p) for p in additional_roots])

        # Ensure persist directory exists
        self.persist_dir.mkdir(parents=True, exist_ok=True)

        # Components
        self.indexer = CodeIndexer(self.project_root)
        self.index_state = IndexState(self.project_root)
        self.graph: GraphRAG | None = None
        self._chunks: dict[str, CodeChunk] = {}
        self._file_to_chunks: dict[str, list[str]] = {}  # file_path -> chunk_ids

        # ChromaDB client
        self.client = None
        self.collection = None
        self._embedding_function = None

        if CHROMA_AVAILABLE:
            try:
                import chromadb
                from chromadb.config import Settings as ChromaSettings

                self.client = chromadb.PersistentClient(
                    path=str(self.persist_dir),
                    settings=ChromaSettings(anonymized_telemetry=False, allow_reset=True),
                )

                # V14.0: Use local embedding function for offline mode
                self._embedding_function = self._get_embedding_function()

                self.collection = self.client.get_or_create_collection(
                    name=self.collection_name,
                    metadata={"hnsw:space": "cosine"},
                    embedding_function=self._embedding_function,
                )
                logger.info(f"ChromaDB initialized at {self.persist_dir}")
            except Exception as e:
                logger.warning(f"Failed to initialize ChromaDB: {e}")
                self.client = None
                self.collection = None
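    # Usage sketch (illustrative comments only, not part of the class):
    # constructing a retriever over several sibling projects. Paths are
    # hypothetical; ChromaDB persists under <root>/.boring_memory/rag_db.
    #
    #     retriever = RAGRetriever(
    #         Path("/work/main_app"),
    #         additional_roots=[Path("/work/shared_lib")],
    #     )
    #     if retriever.is_available:
    #         retriever.build_index()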

    def _get_embedding_function(self):
        """
        V14.0: Get appropriate embedding function based on mode.

        Prefers local embedding for offline mode or when API is unavailable.
        """
        import os

        offline_mode = os.environ.get("BORING_OFFLINE_MODE", "").lower() == "true"

        try:
            from ..core.config import settings

            offline_mode = offline_mode or getattr(settings, "OFFLINE_MODE", False)
        except Exception:
            pass

        if offline_mode:
            try:
                from .local_embedding import get_chroma_embedding_function

                ef = get_chroma_embedding_function(offline_mode=True)
                if ef:
                    logger.info("Using local embedding function for offline mode")
                    return ef
            except ImportError:
                logger.debug("Local embedding not available")
            except Exception as e:
                logger.warning(f"Failed to load local embedding: {e}")

        # Default: return None to use ChromaDB's default embedding
        return None
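    # Sketch (comments only): forcing the local embedding path. The env var
    # is read when the retriever is constructed, so set it beforehand.
    #
    #     import os
    #     os.environ["BORING_OFFLINE_MODE"] = "true"
    #     retriever = RAGRetriever(project_root)  # uses local embeddings if available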

    @property
    def is_available(self) -> bool:
        """Check if RAG system is available."""
        return CHROMA_AVAILABLE and self.collection is not None

    def build_index(self, force: bool = False, incremental: bool = True) -> int:
        """
        Index the entire codebase.

        Args:
            force: If True, rebuild even if index exists
            incremental: If True (and not force), only index changed files.

        Returns:
            Number of chunks indexed
        """
        if not self.is_available:
            logger.warning("ChromaDB not available, skipping index build")
            return 0

        # 1. Handle Force Rebuild
        existing_count = self.collection.count()
        if force:
            logger.info("Force rebuild: clearing existing index")
            try:
                self.client.delete_collection(self.collection_name)
                self.collection = self.client.create_collection(
                    name=self.collection_name,
                    metadata={"hnsw:space": "cosine"},
                    embedding_function=self._embedding_function,
                )
                self.index_state = IndexState(self.project_root)  # Reset in-memory state
                # NOTE: IndexState has no clear() yet, so stale entries may
                # remain in the state file on disk; the full re-index below
                # overwrites them.
            except Exception as e:
                logger.error(f"Failed to clear collection: {e}")
                return 0

        # 2. Collect Files from all project roots (multi-project support)
        all_files = []
        for root in self.all_project_roots:
            indexer = CodeIndexer(root)
            all_files.extend(indexer.collect_files())

        # 3. Determine Changes
        current_commit = ""
        try:
            # Get current git commit hash
            import subprocess

            cmd = ["git", "rev-parse", "HEAD"]
            current_commit = subprocess.check_output(cmd, cwd=self.project_root, text=True).strip()
        except Exception:
            logger.debug("Could not get git commit hash")

        last_commit = self.index_state.get_last_commit()

        if incremental and not force and last_commit and current_commit:
            files_to_index = self.indexer.get_changed_files(last_commit)
            # get_changed_files() returns existing files whose content changed;
            # it does not report deletions, so stale files are detected
            # separately. collect_files() is cheap (just an os.walk); the
            # expensive part is parsing in index_file(). We therefore re-scan
            # everything for the stale check but only index the changed files.
            all_files_current = []
            for root in self.all_project_roots:
                idx = CodeIndexer(root)
                all_files_current.extend(idx.collect_files())

            stale_files_rel = self.index_state.get_stale_files(all_files_current)

            # files_to_index already holds only the git-diff changes; if it is
            # empty, nothing changed content-wise.
        else:
            # Full scan fallback
            files_to_index = all_files
            stale_files_rel = []

        if not files_to_index and not stale_files_rel and (last_commit == current_commit):
            logger.info("Index is up to date (commit match).")
            # We still need to load chunks into memory for graph building
            self._load_chunks_from_db()
            return existing_count

        # 4. Handle Deletions
        for rel_path in stale_files_rel:
            chunk_ids = self.index_state.state.get(rel_path, {}).get("chunks", [])
            if chunk_ids:
                try:
                    self.collection.delete(ids=chunk_ids)
                except Exception:
                    pass
            self.index_state.remove(rel_path)
            logger.info(f"Removed stale file: {rel_path}")

        # 5. Handle Indexing
        new_chunks_buffer = []
        total_indexed = 0

        # Reset indexer stats for accurate reporting
        self.indexer.stats = IndexStats()

        for file_path in files_to_index:
            # Clear old chunks for modified file
            rel_path = self.index_state._get_rel_path(file_path)
            old_ids = self.index_state.get_chunks_for_file(file_path)
            if old_ids:
                try:
                    self.collection.delete(ids=old_ids)
                except Exception:
                    pass

            # Generate new chunks
            try:
                chunks = list(self.indexer.index_file(file_path))
            except Exception as e:
                logger.warning(f"Failed to index {file_path}: {e}")
                self.indexer.stats.skipped_files += 1
                continue

            if not chunks:
                continue

            # Track stats per file
            self.indexer.stats.total_files += 1
            for chunk in chunks:
                self.indexer.stats.total_chunks += 1
                if chunk.chunk_type == "function":
                    self.indexer.stats.functions += 1
                elif chunk.chunk_type == "class":
                    self.indexer.stats.classes += 1
                elif chunk.chunk_type == "method":
                    self.indexer.stats.methods += 1
                elif chunk.chunk_type == "script":
                    self.indexer.stats.script_chunks += 1

            # Batch upsert logic preparation
            new_chunks_buffer.extend(chunks)

            # Update state with new IDs
            ids = [c.chunk_id for c in chunks]
            self.index_state.update(file_path, ids)
            total_indexed += len(chunks)

        # 6. Bulk Upsert to Chroma
        if new_chunks_buffer:
            ids = [c.chunk_id for c in new_chunks_buffer]
            documents = [self._chunk_to_document(c) for c in new_chunks_buffer]
            metadatas = [self._chunk_to_metadata(c) for c in new_chunks_buffer]

            batch_size = 100
            for i in range(0, len(new_chunks_buffer), batch_size):
                end = min(i + batch_size, len(new_chunks_buffer))
                try:
                    self.collection.upsert(
                        ids=ids[i:end], documents=documents[i:end], metadatas=metadatas[i:end]
                    )
                except Exception as e:
                    logger.error(f"Failed to upsert batch: {e}")

        # 7. Persist State
        if current_commit:
            self.index_state.update_commit(current_commit)
        self.index_state.save()

        # 8. Reload fully for graph building (Hybrid RAG needs graph)
        # Note: in a huge repo, loading all chunks can be heavy; ideally we
        # would load only the graph data we need. For V10 we load all chunks.
        self._load_chunks_from_db()

        # Update graph with new/all chunks
        # _load_chunks_from_db handles rebuilding self._chunks and self.graph

        logger.info(
            f"Indexed {len(files_to_index)} files ({total_indexed} chunks). Removed {len(stale_files_rel)} stale files."
        )

        return self.collection.count()
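    # Sketch (comments only): the three indexing strategies exposed above.
    #
    #     retriever.build_index()                   # incremental: git diff since last indexed commit
    #     retriever.build_index(incremental=False)  # full re-scan, keep the collection
    #     retriever.build_index(force=True)         # drop the collection and rebuild from scratch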

    def retrieve(
        self,
        query: str,
        n_results: int = 10,
        expand_graph: bool = True,
        file_filter: str | None = None,
        chunk_types: list[str] | None = None,
        threshold: float = 0.0,
        use_hyde: bool = True,
        use_rerank: bool = True,
    ) -> list[RetrievalResult]:
        """
        Retrieve relevant code chunks with caching.

        Args:
            query: Natural language query or error message
            n_results: Maximum results to return
            expand_graph: Whether to include 1-layer dependency context
            file_filter: Filter by file path substring (e.g., "auth")
            chunk_types: Filter by chunk types (e.g., ["function", "class"])
            threshold: Minimum relevance score (0.0 to 1.0)
            use_hyde: Whether to use HyDE query expansion (V10.24+)
            use_rerank: Whether to use Cross-Encoder reranking (V10.24+)

        Returns:
            List of RetrievalResult sorted by relevance
        """
        if not self.is_available:
            return []

        # Generate cache key
        cache_key = f"{query}:{n_results}:{expand_graph}:{file_filter}:{chunk_types}:{threshold}:{use_hyde}:{use_rerank}"

        # Check cache
        with _cache_lock:
            if cache_key in _query_cache:
                cached_results, cache_time = _query_cache[cache_key]
                if time.time() - cache_time < _QUERY_CACHE_TTL:
                    return cached_results

        # Perform actual retrieval
        results = self._retrieve_impl(
            query,
            n_results,
            expand_graph,
            file_filter,
            chunk_types,
            threshold,
            use_hyde=use_hyde,
            use_rerank=use_rerank,
        )

        # Update cache
        with _cache_lock:
            _query_cache[cache_key] = (results, time.time())

        return results
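    # Sketch (comments only): a filtered retrieval call. Values are
    # illustrative; chunk_types must match the types emitted by CodeIndexer.
    #
    #     results = retriever.retrieve(
    #         "token refresh logic",
    #         n_results=5,
    #         file_filter="auth",
    #         chunk_types=["function", "method"],
    #         threshold=0.3,
    #     )
    #     for r in results:
    #         print(r.score, r.retrieval_method, r.chunk.file_path, r.chunk.name)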

    def _retrieve_impl(
        self,
        query: str,
        n_results: int = 10,
        expand_graph: bool = True,
        file_filter: str | None = None,
        chunk_types: list[str] | None = None,
        threshold: float = 0.0,
        use_hyde: bool = True,
        use_rerank: bool = True,
    ) -> list[RetrievalResult]:
        """
        Internal implementation of retrieve without caching.
        """

        # 1. HyDE Expansion (V10.24+)
        search_query = query
        hyde_result: HyDEResult | None = None
        if use_hyde:
            try:
                expander = get_hyde_expander()
                hyde_result = expander.expand_query(query)
                search_query = hyde_result.hypothetical_code
                logger.debug(f"HyDE expansion applied for query: {query[:50]}...")
            except Exception as e:
                logger.warning(f"HyDE expansion failed: {e}")

        # Build ChromaDB filter
        where_filter = self._build_where_filter(file_filter, chunk_types)

        # Vector search
        try:
            results = self.collection.query(
                query_texts=[search_query],
                n_results=min(n_results * 2, 50),  # Fetch extra for graph expansion/reranking
                where=where_filter,
            )
        except Exception as e:
            logger.error(f"ChromaDB query failed: {e}")
            return []

        retrieved: list[RetrievalResult] = []
        seen_ids: set[str] = set()

        # Process vector search results
        if results and results.get("ids"):
            for i, chunk_id in enumerate(results["ids"][0]):
                if chunk_id in seen_ids:
                    continue
                seen_ids.add(chunk_id)

                # Calculate score from distance
                distance = results["distances"][0][i] if results.get("distances") else 0.5
                score = 1.0 - min(distance, 1.0)  # Convert distance to similarity

                # Filter by threshold
                if score < threshold:
                    continue

                # Get chunk from cache or reconstruct
                chunk = self._get_or_reconstruct_chunk(chunk_id, results, i)
                if not chunk:
                    continue

                retrieved.append(
                    RetrievalResult(
                        chunk=chunk, score=score, retrieval_method="vector", distance=distance
                    )
                )

        # 1-layer graph expansion (per user decision)
        if expand_graph and self.graph and retrieved:
            # Expand from top 3 results only (to limit context size)
            top_chunks = [r.chunk for r in retrieved[:3]]
            related = self.graph.get_related_chunks(top_chunks, depth=1)

            for chunk in related:
                if chunk.chunk_id not in seen_ids:
                    seen_ids.add(chunk.chunk_id)
                    retrieved.append(
                        RetrievalResult(
                            chunk=chunk,
                            score=0.5,  # Lower score for graph-expanded
                            retrieval_method="graph",
                        )
                    )

        # =================================================================
        # HYBRID SEARCH: Keyword boosting for better accuracy
        # =================================================================
        # Boost scores for chunks where query terms appear in name/content
        query_terms = set(query.lower().split())
        for result in retrieved:
            boost = 0.0
            chunk_name = result.chunk.name.lower() if result.chunk.name else ""

            # Strong boost for exact name match
            if any(term in chunk_name for term in query_terms):
                boost += 0.15

            # Medium boost for content keyword match
            chunk_content = result.chunk.content.lower()[:500] if result.chunk.content else ""
            matching_terms = sum(1 for term in query_terms if term in chunk_content)
            if matching_terms > 0:
                boost += min(0.1, matching_terms * 0.02)  # Cap at 0.1

            result.score = min(1.0, result.score + boost)  # Cap at 1.0

        # =================================================================
        # V10.23: SESSION CONTEXT BOOSTING
        # =================================================================
        session_ctx = get_session_context()
        if session_ctx:
            for result in retrieved:
                session_boost = 0.0
                chunk_file = result.chunk.file_path if result.chunk.file_path else ""

                # Boost for focus files
                if session_ctx.get("focus_files"):
                    for focus_file in session_ctx["focus_files"]:
                        if focus_file in chunk_file or chunk_file in focus_file:
                            session_boost += 0.2
                            break

                # Boost for task-type keywords
                task_type = session_ctx.get("task_type", "general")
                chunk_content = result.chunk.content.lower() if result.chunk.content else ""

                if task_type == "debugging":
                    if any(
                        kw in chunk_content
                        for kw in ["error", "exception", "try", "except", "catch"]
                    ):
                        session_boost += 0.1
                        result.task_relevance = 0.8
                elif task_type == "testing":
                    if any(kw in chunk_content for kw in ["test", "assert", "mock", "fixture"]):
                        session_boost += 0.1
                        result.task_relevance = 0.8
                elif task_type == "refactoring":
                    if any(kw in chunk_content for kw in ["class", "def", "function", "method"]):
                        session_boost += 0.05
                        result.task_relevance = 0.6

                # Boost for session keywords
                if session_ctx.get("keywords"):
                    matching = sum(
                        1 for kw in session_ctx["keywords"] if kw.lower() in chunk_content
                    )
                    session_boost += min(0.15, matching * 0.05)

                result.session_boost = session_boost
                result.score = min(1.0, result.score + session_boost)

        # =================================================================
        # INTELLIGENT RANKING: Re-rank based on usage patterns (V10.22+)
        # =================================================================
        ranker = _get_intelligent_ranker(self.project_root)
        if ranker:
            # V10.23: Pass session context to ranker
            context = {"session": session_ctx} if session_ctx else None
            retrieved = ranker.rerank(query, retrieved, top_k=n_results * 2, context=context)

        # =================================================================
        # HYBRID RAG: Cross-Encoder Reranking (V10.24+)
        # =================================================================
        if use_rerank and len(retrieved) > 1:
            try:
                reranker = get_ensemble_reranker()
                # Use ensemble reranker to combine semantic CE scores with metadata
                reranked_indices = reranker.rerank(
                    query=query,
                    chunks=[r.chunk for r in retrieved],
                    original_scores=[r.score for r in retrieved],
                    top_k=n_results,
                )

                # Reconstruct RetrievalResult list in new order
                new_retrieved = []
                for original_idx, score in reranked_indices:
                    res = retrieved[original_idx]
                    res.score = score  # Update with reranked score
                    new_retrieved.append(res)

                retrieved = new_retrieved
                logger.debug(f"Cross-Encoder reranking applied to {len(retrieved)} results")
            except Exception as e:
                logger.warning(f"Reranking failed: {e}")

        # Sort by score and limit
        retrieved.sort(key=lambda x: x.score, reverse=True)
        return retrieved[:n_results]

    def record_user_selection(self, chunk_id: str, query: str, session_id: str = ""):
        """
        Record that a user selected a specific chunk from results.

        This feedback improves future ranking for similar queries.

        Args:
            chunk_id: The chunk that was selected
            query: The query that produced the results
            session_id: Optional session identifier
        """
        ranker = _get_intelligent_ranker(self.project_root)
        if ranker:
            ranker.record_selection(chunk_id, query, session_id=session_id)
            logger.debug(f"Recorded selection: {chunk_id} for query: {query[:50]}")
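    # Sketch (comments only): closing the feedback loop after the agent (or a
    # user) picks one of the returned chunks.
    #
    #     results = retriever.retrieve("parse config file")
    #     if results:
    #         chosen = results[0]
    #         retriever.record_user_selection(chosen.chunk.chunk_id, "parse config file")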

    async def retrieve_async(
        self,
        query: str,
        n_results: int = 10,
        expand_graph: bool = True,
        file_filter: str | None = None,
        chunk_types: list[str] | None = None,
    ) -> list[RetrievalResult]:
        """
        Async version of retrieve for non-blocking operations.

        Wraps ChromaDB calls in a budgeted thread pool for async compatibility.
        """
        from ..core.resources import get_resources

        def _sync_retrieve():
            return self.retrieve(query, n_results, expand_graph, file_filter, chunk_types)

        return await get_resources().run_in_thread(_sync_retrieve)
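    # Sketch (comments only): awaiting retrieval from async code so the event
    # loop is not blocked by ChromaDB.
    #
    #     async def gather_context(retriever, query):
    #         results = await retriever.retrieve_async(query, n_results=5)
    #         return [r.chunk for r in results]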

    def get_modification_context(
        self, file_path: str, function_name: str | None = None, class_name: str | None = None
    ) -> dict[str, list[RetrievalResult]]:
        """
        Get comprehensive context for modifying a specific code location.

        This is the "smart" entry point that returns:
        - The target chunk itself
        - Its callers (might break)
        - Its callees (need to understand interface)
        - Sibling methods (if in a class)

        Args:
            file_path: Relative path to the file
            function_name: Name of function (optional)
            class_name: Name of class (optional)

        Returns:
            Dict with categorized context
        """
        result = {"target": [], "callers": [], "callees": [], "siblings": []}

        if not self.graph:
            return result

        # Find target chunk
        target_name = function_name or class_name
        if not target_name:
            return result

        # Look up by name
        candidates = self.graph.get_chunks_by_name(target_name)

        # Filter by file path if provided
        if file_path:
            candidates = [c for c in candidates if file_path in c.file_path]

        if not candidates:
            return result

        target = candidates[0]
        result["target"] = [RetrievalResult(chunk=target, score=1.0, retrieval_method="direct")]

        # Get context from graph
        context = self.graph.get_context_for_modification(target.chunk_id)

        for caller in context["callers"]:
            result["callers"].append(
                RetrievalResult(chunk=caller, score=0.8, retrieval_method="graph")
            )

        for callee in context["callees"]:
            result["callees"].append(
                RetrievalResult(chunk=callee, score=0.7, retrieval_method="graph")
            )

        for sibling in context["siblings"]:
            result["siblings"].append(
                RetrievalResult(chunk=sibling, score=0.6, retrieval_method="graph")
            )

        return result
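    # Sketch (comments only): impact analysis before editing a function.
    #
    #     ctx = retriever.get_modification_context("src/auth.py", function_name="login")
    #     for caller in ctx["callers"]:  # call sites that may break
    #         print(caller.chunk.file_path, caller.chunk.name)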

    def smart_expand(self, chunk_id: str, depth: int = 2) -> list[RetrievalResult]:
        """
        On-demand deeper graph traversal (Agent-triggered "smart jump").

        When 1-layer expansion isn't enough, the agent can request
        deeper traversal for specific chunks.

        Args:
            chunk_id: The chunk to expand from
            depth: How many layers to expand (default 2)

        Returns:
            Additional context chunks
        """
        if not self.graph:
            return []

        chunk = self.graph.get_chunk(chunk_id)
        if not chunk:
            return []

        related = self.graph.get_related_chunks([chunk], depth=depth)

        return [
            RetrievalResult(
                chunk=c,
                score=0.4,  # Lower score for deep expansion
                retrieval_method="smart_jump",
            )
            for c in related
        ]
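    # Sketch (comments only): deeper traversal when the default 1-layer
    # expansion is not enough. The chunk_id comes from an earlier result.
    #
    #     more = retriever.smart_expand(results[0].chunk.chunk_id, depth=3)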

    def generate_context_injection(
        self, query: str, max_tokens: int = 4000, include_signatures_only: bool = False
    ) -> str:
        """
        Generate context string for AI prompt injection.

        Args:
            query: The current task or error
            max_tokens: Maximum tokens (estimate: 4 chars = 1 token)
            include_signatures_only: If True, only include function signatures

        Returns:
            Formatted context string ready for prompt injection
        """
        results = self.retrieve(query, n_results=15, expand_graph=True)

        if not results:
            return ""

        parts = ["## 📚 Relevant Code Context (RAG)", ""]
        current_chars = 0
        max_chars = max_tokens * 4

        for result in results:
            chunk = result.chunk

            # Use signature if available and requested
            if include_signatures_only and chunk.signature:
                content = chunk.signature
            else:
                content = chunk.content

            # Format chunk
            method_tag = f"[{result.retrieval_method.upper()}]"
            location = f"`{chunk.file_path}` → `{chunk.name}`"
            lines = f"L{chunk.start_line}-{chunk.end_line}"

            chunk_content = f"""### {method_tag} {location} ({lines})
```python
{content}
```
"""
            # Check token budget
            chunk_chars = len(chunk_content)
            if current_chars + chunk_chars > max_chars:
                break

            parts.append(chunk_content)
            current_chars += chunk_chars

        return "\n".join(parts)
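    # Sketch (comments only): injecting retrieved context into a prompt under
    # a rough 4-chars-per-token budget (max_tokens=2000 -> ~8000 chars).
    #
    #     context = retriever.generate_context_injection(
    #         "fix the login timeout", max_tokens=2000, include_signatures_only=True
    #     )
    #     prompt = context + "\n\n" + user_request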

    def get_stats(self) -> RAGStats:
        """Get combined RAG statistics."""
        return RAGStats(
            index_stats=self.indexer.get_stats() if self.indexer else None,
            graph_stats=self.graph.get_stats() if self.graph else None,
            total_chunks_indexed=len(self._chunks),
            last_index_time=datetime.now().isoformat() if self._chunks else None,
            chroma_available=CHROMA_AVAILABLE,
        )

    def update_file(self, file_path: Path) -> int:
        """
        Incrementally update index for a single changed file.

        Args:
            file_path: Path to the modified file

        Returns:
            Number of chunks updated
        """
        if not self.is_available:
            return 0

        try:
            rel_path = str(file_path.relative_to(self.project_root))
            # Normalize to forward slashes for cross-platform consistency
            rel_path = rel_path.replace("\\", "/")
        except ValueError:
            rel_path = str(file_path).replace("\\", "/")

        # Remove old chunks for this file
        old_chunk_ids = self._file_to_chunks.get(rel_path, [])
        if old_chunk_ids:
            try:
                self.collection.delete(ids=old_chunk_ids)
            except Exception as e:
                logger.warning(f"Failed to delete old chunks: {e}")

        # Re-index the file
        try:
            new_chunks = list(self.indexer.index_file(file_path))
        except Exception as e:
            logger.warning(f"Failed to index {file_path}: {e}")
            return 0

        if not new_chunks:
            return 0

        # Update in-memory structures
        for chunk in new_chunks:
            self._chunks[chunk.chunk_id] = chunk
            if self.graph:
                self.graph.add_chunk(chunk)

        self._file_to_chunks[rel_path] = [c.chunk_id for c in new_chunks]

        # Upsert to ChromaDB
        try:
            self.collection.upsert(
                ids=[c.chunk_id for c in new_chunks],
                documents=[self._chunk_to_document(c) for c in new_chunks],
                metadatas=[self._chunk_to_metadata(c) for c in new_chunks],
            )
        except Exception as e:
            logger.error(f"Failed to upsert chunks: {e}")
            return 0

        return len(new_chunks)
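    # Sketch (comments only): wiring update_file() to a file watcher so edits
    # are re-indexed without a full build. The on_file_saved hook is a
    # hypothetical watcher callback, not part of this module.
    #
    #     def on_file_saved(path: Path):
    #         n = retriever.update_file(path)
    #         logger.debug(f"Re-indexed {path}: {n} chunks")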

    def clear(self) -> None:
        """Clear all indexed data."""
        if self.client and self.collection:
            try:
                self.client.delete_collection(self.collection_name)
                self.collection = self.client.create_collection(
                    name=self.collection_name,
                    metadata={"hnsw:space": "cosine"},
                    embedding_function=self._embedding_function,
                )
            except Exception as e:
                logger.error(f"Failed to clear collection: {e}")

        self._chunks.clear()
        self._file_to_chunks.clear()
        self.graph = None

    # -------------------------------------------------------------------------
    # Private helpers
    # -------------------------------------------------------------------------

    def _chunk_to_document(self, chunk: CodeChunk) -> str:
        """Convert chunk to semantic document for embedding."""
        parts = [f"{chunk.chunk_type}::{chunk.name}"]

        if chunk.docstring:
            parts.append(chunk.docstring)

        if chunk.signature:
            parts.append(chunk.signature)
        else:
            parts.append(chunk.content[:500])  # Limit content size

        return "\n".join(parts)

    def _chunk_to_metadata(self, chunk: CodeChunk) -> dict:
        """Convert chunk to metadata for filtering."""
        return {
            "file_path": chunk.file_path,
            "chunk_type": chunk.chunk_type,
            "name": chunk.name,
            "start_line": chunk.start_line,
            "end_line": chunk.end_line,
            "parent": chunk.parent or "",
            "has_docstring": bool(chunk.docstring),
        }

    def _build_where_filter(
        self, file_filter: str | None, chunk_types: list[str] | None
    ) -> dict | None:
        """Build ChromaDB where filter."""
        conditions = []

        if file_filter:
            conditions.append({"file_path": {"$contains": file_filter}})

        if chunk_types:
            if len(chunk_types) == 1:
                conditions.append({"chunk_type": chunk_types[0]})
            else:
                conditions.append({"chunk_type": {"$in": chunk_types}})

        if not conditions:
            return None

        if len(conditions) == 1:
            return conditions[0]

        return {"$and": conditions}

    def _get_or_reconstruct_chunk(
        self, chunk_id: str, results: dict, index: int
    ) -> CodeChunk | None:
        """Get chunk from cache or reconstruct from query results."""
        if chunk_id in self._chunks:
            return self._chunks[chunk_id]

        # Reconstruct from metadata
        if not results.get("metadatas"):
            return None

        meta = results["metadatas"][0][index]
        doc = results["documents"][0][index] if results.get("documents") else ""

        return CodeChunk(
            chunk_id=chunk_id,
            file_path=meta.get("file_path", "unknown"),
            chunk_type=meta.get("chunk_type", "unknown"),
            name=meta.get("name", "unknown"),
            content=doc.split("\n", 2)[-1] if doc else "",  # Drop the type::name header (and docstring line, if present)
            start_line=meta.get("start_line", 0),
            end_line=meta.get("end_line", 0),
            parent=meta.get("parent") or None,
        )

    def _load_chunks_from_db(self) -> None:
        """Load chunk metadata from existing ChromaDB collection."""
        if not self.collection:
            return

        try:
            # Get all items (limited for memory safety)
            results = self.collection.get(limit=10000, include=["metadatas", "documents"])

            if results and results.get("ids"):
                for i, chunk_id in enumerate(results["ids"]):
                    chunk = self._get_or_reconstruct_chunk(chunk_id, results, i)
                    if chunk:
                        self._chunks[chunk_id] = chunk

                        # Build file index
                        if chunk.file_path not in self._file_to_chunks:
                            self._file_to_chunks[chunk.file_path] = []
                        self._file_to_chunks[chunk.file_path].append(chunk_id)

                # Rebuild graph
                if self._chunks:
                    from .graph_builder import DependencyGraph

                    self.graph = DependencyGraph(list(self._chunks.values()))

                logger.info(f"Loaded {len(self._chunks)} chunks from existing index")
        except Exception as e:
            logger.warning(f"Failed to load chunks from DB: {e}")

    def _build_file_index(self, chunks: list[CodeChunk]) -> None:
        """Build file path to chunk ID mapping."""
        self._file_to_chunks.clear()
        for chunk in chunks:
            if chunk.file_path not in self._file_to_chunks:
                self._file_to_chunks[chunk.file_path] = []
            self._file_to_chunks[chunk.file_path].append(chunk.chunk_id)

is_available property

Check if RAG system is available.

build_index(force=False, incremental=True)

Index the entire codebase.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| force | bool | If True, rebuild even if index exists | False |
| incremental | bool | If True (and not force), only index changed files | True |

Returns:

| Type | Description |
| --- | --- |
| int | Number of chunks indexed |

Source code in src/boring/rag/rag_retriever.py
def build_index(self, force: bool = False, incremental: bool = True) -> int:
    """
    Index the entire codebase.

    Args:
        force: If True, rebuild even if index exists
        incremental: If True (and not force), only index changed files.

    Returns:
        Number of chunks indexed
    """
    if not self.is_available:
        logger.warning("ChromaDB not available, skipping index build")
        return 0

    # 1. Handle Force Rebuild
    existing_count = self.collection.count()
    if force:
        logger.info("Force rebuild: clearing existing index")
        try:
            self.client.delete_collection(self.collection_name)
            self.collection = self.client.create_collection(
                name=self.collection_name, metadata={"hnsw:space": "cosine"}
            )
            self.index_state = IndexState(self.project_root)  # Reset in-memory state
            # Note: IndexState reloads from disk, so the state file may still
            # hold stale entries until the save at the end overwrites it.
            # A dedicated IndexState.clear() would make this reset explicit.
        except Exception as e:
            logger.error(f"Failed to clear collection: {e}")
            return 0

    # 2. Collect Files from all project roots (multi-project support)
    all_files = []
    for root in self.all_project_roots:
        indexer = CodeIndexer(root)
        all_files.extend(indexer.collect_files())

    # 3. Determine Changes
    current_commit = ""
    try:
        # Get current git commit hash
        import subprocess

        cmd = ["git", "rev-parse", "HEAD"]
        current_commit = subprocess.check_output(cmd, cwd=self.project_root, text=True).strip()
    except Exception:
        logger.debug("Could not get git commit hash")

    last_commit = self.index_state.get_last_commit()

    if incremental and not force and last_commit and current_commit:
        files_to_index = self.indexer.get_changed_files(last_commit)
        # git diff only reports existing files that changed; it does not surface
        # deletions, so stale (deleted) files must be detected separately.
        # Re-scanning the tree is cheap (os.walk) compared to indexing (AST
        # parsing), so we re-scan everything for the stale check but only
        # re-index the changed files.
        all_files_current = []
        for root in self.all_project_roots:
            idx = CodeIndexer(root)
            all_files_current.extend(idx.collect_files())

        stale_files_rel = self.index_state.get_stale_files(all_files_current)

        # files_to_index stays limited to the git-diff changes; an empty list
        # means nothing changed content-wise since the last indexed commit.
    else:
        # Full scan fallback
        files_to_index = all_files
        stale_files_rel = []

    if not files_to_index and not stale_files_rel and (last_commit == current_commit):
        logger.info("Index is up to date (commit match).")
        # We still need to load chunks into memory for graph building
        self._load_chunks_from_db()
        return existing_count

    # 4. Handle Deletions
    for rel_path in stale_files_rel:
        chunk_ids = self.index_state.state.get(rel_path, {}).get("chunks", [])
        if chunk_ids:
            try:
                self.collection.delete(ids=chunk_ids)
            except Exception:
                pass
        self.index_state.remove(rel_path)
        logger.info(f"Removed stale file: {rel_path}")

    # 5. Handle Indexing
    new_chunks_buffer = []
    total_indexed = 0

    # Reset indexer stats for accurate reporting
    self.indexer.stats = IndexStats()

    for file_path in files_to_index:
        # Clear old chunks for the modified file
        old_ids = self.index_state.get_chunks_for_file(file_path)
        if old_ids:
            try:
                self.collection.delete(ids=old_ids)
            except Exception:
                pass

        # Generate new chunks
        try:
            chunks = list(self.indexer.index_file(file_path))
        except Exception as e:
            logger.warning(f"Failed to index {file_path}: {e}")
            self.indexer.stats.skipped_files += 1
            continue

        if not chunks:
            continue

        # Track stats per file
        self.indexer.stats.total_files += 1
        for chunk in chunks:
            self.indexer.stats.total_chunks += 1
            if chunk.chunk_type == "function":
                self.indexer.stats.functions += 1
            elif chunk.chunk_type == "class":
                self.indexer.stats.classes += 1
            elif chunk.chunk_type == "method":
                self.indexer.stats.methods += 1
            elif chunk.chunk_type == "script":
                self.indexer.stats.script_chunks += 1

        # Batch upsert logic preparation
        new_chunks_buffer.extend(chunks)

        # Update state with new IDs
        ids = [c.chunk_id for c in chunks]
        self.index_state.update(file_path, ids)
        total_indexed += len(chunks)

    # 6. Bulk Upsert to Chroma
    if new_chunks_buffer:
        ids = [c.chunk_id for c in new_chunks_buffer]
        documents = [self._chunk_to_document(c) for c in new_chunks_buffer]
        metadatas = [self._chunk_to_metadata(c) for c in new_chunks_buffer]

        batch_size = 100
        for i in range(0, len(new_chunks_buffer), batch_size):
            end = min(i + batch_size, len(new_chunks_buffer))
            try:
                self.collection.upsert(
                    ids=ids[i:end], documents=documents[i:end], metadatas=metadatas[i:end]
                )
            except Exception as e:
                logger.error(f"Failed to upsert batch: {e}")

    # 7. Persist State
    if current_commit:
        self.index_state.update_commit(current_commit)
    self.index_state.save()

    # 8. Reload fully for graph building (Hybrid RAG needs graph)
    # Note: in a huge repo, loading all chunks may be heavy; ideally only the
    # data needed for the graph would be loaded. For V10 we load all chunks.
    self._load_chunks_from_db()

    # Update graph with new/all chunks
    # _load_chunks_from_db handles rebuilding self._chunks and self.graph

    logger.info(
        f"Indexed {len(files_to_index)} files ({total_indexed} chunks). Removed {len(stale_files_rel)} stale files."
    )

    return self.collection.count()
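
The bulk-upsert step (step 6 above) writes chunks in fixed-size batches so a single oversized request cannot fail the whole build. The batching can be sketched in isolation; the `collection` parameter here stands in for a ChromaDB collection, and the helper name is illustrative:

```python
def upsert_in_batches(collection, ids, documents, metadatas, batch_size=100):
    """Upsert aligned id/document/metadata lists in fixed-size batches."""
    batches = 0
    for i in range(0, len(ids), batch_size):
        end = min(i + batch_size, len(ids))
        collection.upsert(
            ids=ids[i:end], documents=documents[i:end], metadatas=metadatas[i:end]
        )
        batches += 1
    return batches
```

With 250 chunks and the default batch size of 100, this issues three upserts of 100, 100, and 50 items.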

retrieve(query, n_results=10, expand_graph=True, file_filter=None, chunk_types=None, threshold=0.0, use_hyde=True, use_rerank=True)

Retrieve relevant code chunks with caching.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | Natural language query or error message | required |
| n_results | int | Maximum results to return | 10 |
| expand_graph | bool | Whether to include 1-layer dependency context | True |
| file_filter | str \| None | Filter by file path substring (e.g., "auth") | None |
| chunk_types | list[str] \| None | Filter by chunk types (e.g., ["function", "class"]) | None |
| threshold | float | Minimum relevance score (0.0 to 1.0) | 0.0 |
| use_hyde | bool | Whether to use HyDE query expansion (V10.24+) | True |
| use_rerank | bool | Whether to use Cross-Encoder reranking (V10.24+) | True |

Returns:

| Type | Description |
| --- | --- |
| list[RetrievalResult] | List of RetrievalResult sorted by relevance |

Source code in src/boring/rag/rag_retriever.py
def retrieve(
    self,
    query: str,
    n_results: int = 10,
    expand_graph: bool = True,
    file_filter: str | None = None,
    chunk_types: list[str] | None = None,
    threshold: float = 0.0,
    use_hyde: bool = True,
    use_rerank: bool = True,
) -> list[RetrievalResult]:
    """
    Retrieve relevant code chunks with caching.

    Args:
        query: Natural language query or error message
        n_results: Maximum results to return
        expand_graph: Whether to include 1-layer dependency context
        file_filter: Filter by file path substring (e.g., "auth")
        chunk_types: Filter by chunk types (e.g., ["function", "class"])
        threshold: Minimum relevance score (0.0 to 1.0)
        use_hyde: Whether to use HyDE query expansion (V10.24+)
        use_rerank: Whether to use Cross-Encoder reranking (V10.24+)

    Returns:
        List of RetrievalResult sorted by relevance
    """
    if not self.is_available:
        return []

    # Generate cache key
    cache_key = f"{query}:{n_results}:{expand_graph}:{file_filter}:{chunk_types}:{threshold}:{use_hyde}:{use_rerank}"

    # Check cache
    with _cache_lock:
        if cache_key in _query_cache:
            cached_results, cache_time = _query_cache[cache_key]
            if time.time() - cache_time < _QUERY_CACHE_TTL:
                return cached_results

    # Perform actual retrieval
    results = self._retrieve_impl(
        query,
        n_results,
        expand_graph,
        file_filter,
        chunk_types,
        threshold,
        use_hyde=use_hyde,
        use_rerank=use_rerank,
    )

    # Update cache
    with _cache_lock:
        _query_cache[cache_key] = (results, time.time())

    return results
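
The caching above memoizes results per parameter combination under a TTL. A minimal standalone sketch of that pattern, assuming an illustrative `fetch` callback and TTL value (the real code also includes the remaining parameters in the key and guards the cache with a lock):

```python
import time

_QUERY_CACHE_TTL = 300.0  # seconds; illustrative value
_query_cache: dict[str, tuple[list, float]] = {}

def cached_retrieve(query: str, n_results: int, fetch) -> list:
    """Return cached results if still fresh, otherwise call fetch and cache."""
    cache_key = f"{query}:{n_results}"
    hit = _query_cache.get(cache_key)
    if hit is not None and time.time() - hit[1] < _QUERY_CACHE_TTL:
        return hit[0]
    results = fetch(query, n_results)
    _query_cache[cache_key] = (results, time.time())
    return results
```

A repeated query within the TTL never reaches the underlying retrieval path a second time.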

record_user_selection(chunk_id, query, session_id='')

Record that a user selected a specific chunk from results.

This feedback improves future ranking for similar queries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| chunk_id | str | The chunk that was selected | required |
| query | str | The query that produced the results | required |
| session_id | str | Optional session identifier | '' |
Source code in src/boring/rag/rag_retriever.py
def record_user_selection(self, chunk_id: str, query: str, session_id: str = ""):
    """
    Record that a user selected a specific chunk from results.

    This feedback improves future ranking for similar queries.

    Args:
        chunk_id: The chunk that was selected
        query: The query that produced the results
        session_id: Optional session identifier
    """
    ranker = _get_intelligent_ranker(self.project_root)
    if ranker:
        ranker.record_selection(chunk_id, query, session_id=session_id)
        logger.debug(f"Recorded selection: {chunk_id} for query: {query[:50]}")

retrieve_async(query, n_results=10, expand_graph=True, file_filter=None, chunk_types=None) async

Async version of retrieve for non-blocking operations.

Wraps ChromaDB calls in a budgeted thread pool for async compatibility.

Source code in src/boring/rag/rag_retriever.py
async def retrieve_async(
    self,
    query: str,
    n_results: int = 10,
    expand_graph: bool = True,
    file_filter: str | None = None,
    chunk_types: list[str] | None = None,
) -> list[RetrievalResult]:
    """
    Async version of retrieve for non-blocking operations.

    Wraps ChromaDB calls in a budgeted thread pool for async compatibility.
    """
    from ..core.resources import get_resources

    def _sync_retrieve():
        return self.retrieve(query, n_results, expand_graph, file_filter, chunk_types)

    return await get_resources().run_in_thread(_sync_retrieve)
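
The pattern is a plain thread offload: the blocking ChromaDB call runs in a worker thread so the event loop stays responsive. A minimal equivalent using only the standard library (the real code routes through `get_resources().run_in_thread`, which adds a budgeted pool):

```python
import asyncio

async def retrieve_in_thread(sync_retrieve, query: str) -> list:
    """Run a blocking retrieval callable in a worker thread."""
    return await asyncio.to_thread(sync_retrieve, query)

# Example with a stand-in synchronous retriever.
results = asyncio.run(retrieve_in_thread(lambda q: [q.upper()], "auth"))
```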

get_modification_context(file_path, function_name=None, class_name=None)

Get comprehensive context for modifying a specific code location.

This is the "smart" entry point that returns:

- The target chunk itself
- Its callers (might break)
- Its callees (need to understand interface)
- Sibling methods (if in a class)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | str | Relative path to the file | required |
| function_name | str \| None | Name of function (optional) | None |
| class_name | str \| None | Name of class (optional) | None |

Returns:

| Type | Description |
| --- | --- |
| dict[str, list[RetrievalResult]] | Dict with categorized context |

Source code in src/boring/rag/rag_retriever.py
def get_modification_context(
    self, file_path: str, function_name: str | None = None, class_name: str | None = None
) -> dict[str, list[RetrievalResult]]:
    """
    Get comprehensive context for modifying a specific code location.

    This is the "smart" entry point that returns:
    - The target chunk itself
    - Its callers (might break)
    - Its callees (need to understand interface)
    - Sibling methods (if in a class)

    Args:
        file_path: Relative path to the file
        function_name: Name of function (optional)
        class_name: Name of class (optional)

    Returns:
        Dict with categorized context
    """
    result = {"target": [], "callers": [], "callees": [], "siblings": []}

    if not self.graph:
        return result

    # Find target chunk
    target_name = function_name or class_name
    if not target_name:
        return result

    # Look up by name
    candidates = self.graph.get_chunks_by_name(target_name)

    # Filter by file path if provided
    if file_path:
        candidates = [c for c in candidates if file_path in c.file_path]

    if not candidates:
        return result

    target = candidates[0]
    result["target"] = [RetrievalResult(chunk=target, score=1.0, retrieval_method="direct")]

    # Get context from graph
    context = self.graph.get_context_for_modification(target.chunk_id)

    for caller in context["callers"]:
        result["callers"].append(
            RetrievalResult(chunk=caller, score=0.8, retrieval_method="graph")
        )

    for callee in context["callees"]:
        result["callees"].append(
            RetrievalResult(chunk=callee, score=0.7, retrieval_method="graph")
        )

    for sibling in context["siblings"]:
        result["siblings"].append(
            RetrievalResult(chunk=sibling, score=0.6, retrieval_method="graph")
        )

    return result
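
The returned dict is keyed by relationship, with fixed scores per category (1.0 target, 0.8 callers, 0.7 callees, 0.6 siblings). A small sketch of flattening it for display; the dict-shaped results here are hypothetical stand-ins for real `RetrievalResult` objects:

```python
def summarize_context(ctx: dict) -> list[str]:
    """Flatten the categorized context into 'category: name (score)' lines."""
    lines = []
    for category in ("target", "callers", "callees", "siblings"):
        for r in ctx.get(category, []):
            lines.append(f"{category}: {r['name']} ({r['score']})")
    return lines
```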

smart_expand(chunk_id, depth=2)

On-demand deeper graph traversal (Agent-triggered "smart jump").

When 1-layer expansion isn't enough, the agent can request deeper traversal for specific chunks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| chunk_id | str | The chunk to expand from | required |
| depth | int | How many layers to expand (default 2) | 2 |

Returns:

| Type | Description |
| --- | --- |
| list[RetrievalResult] | Additional context chunks |

Source code in src/boring/rag/rag_retriever.py
def smart_expand(self, chunk_id: str, depth: int = 2) -> list[RetrievalResult]:
    """
    On-demand deeper graph traversal (Agent-triggered "smart jump").

    When 1-layer expansion isn't enough, the agent can request
    deeper traversal for specific chunks.

    Args:
        chunk_id: The chunk to expand from
        depth: How many layers to expand (default 2)

    Returns:
        Additional context chunks
    """
    if not self.graph:
        return []

    chunk = self.graph.get_chunk(chunk_id)
    if not chunk:
        return []

    related = self.graph.get_related_chunks([chunk], depth=depth)

    return [
        RetrievalResult(
            chunk=c,
            score=0.4,  # Lower score for deep expansion
            retrieval_method="smart_jump",
        )
        for c in related
    ]

generate_context_injection(query, max_tokens=4000, include_signatures_only=False)

Generate context string for AI prompt injection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | The current task or error | required |
| max_tokens | int | Maximum tokens (estimate: 4 chars = 1 token) | 4000 |
| include_signatures_only | bool | If True, only include function signatures | False |

Returns:

| Type | Description |
| --- | --- |
| str | Formatted context string ready for prompt injection |

Source code in src/boring/rag/rag_retriever.py
    def generate_context_injection(
        self, query: str, max_tokens: int = 4000, include_signatures_only: bool = False
    ) -> str:
        """
        Generate context string for AI prompt injection.

        Args:
            query: The current task or error
            max_tokens: Maximum tokens (estimate: 4 chars = 1 token)
            include_signatures_only: If True, only include function signatures

        Returns:
            Formatted context string ready for prompt injection
        """
        results = self.retrieve(query, n_results=15, expand_graph=True)

        if not results:
            return ""

        parts = ["## 📚 Relevant Code Context (RAG)", ""]
        current_chars = 0
        max_chars = max_tokens * 4

        for result in results:
            chunk = result.chunk

            # Use signature if available and requested
            if include_signatures_only and chunk.signature:
                content = chunk.signature
            else:
                content = chunk.content

            # Format chunk
            method_tag = f"[{result.retrieval_method.upper()}]"
            location = f"`{chunk.file_path}` → `{chunk.name}`"
            lines = f"L{chunk.start_line}-{chunk.end_line}"

            chunk_content = f"""### {method_tag} {location} ({lines})
```python
{content}
```
"""
            # Check token budget
            chunk_chars = len(chunk_content)
            if current_chars + chunk_chars > max_chars:
                break

            parts.append(chunk_content)
            current_chars += chunk_chars

        return "\n".join(parts)
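
The token budget is enforced with the 4-chars-per-token estimate: formatted sections are appended greedily until the character budget would be exceeded. That packing loop can be sketched on its own, with plain strings standing in for the formatted chunk sections:

```python
def pack_context(sections: list[str], max_tokens: int = 4000) -> str:
    """Greedily pack sections under a char budget of max_tokens * 4."""
    max_chars = max_tokens * 4
    parts, used = [], 0
    for section in sections:
        if used + len(section) > max_chars:
            break  # budget exhausted; drop the remaining sections
        parts.append(section)
        used += len(section)
    return "\n".join(parts)
```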

get_stats()

Get combined RAG statistics.

Source code in src/boring/rag/rag_retriever.py
def get_stats(self) -> RAGStats:
    """Get combined RAG statistics."""
    return RAGStats(
        index_stats=self.indexer.get_stats() if self.indexer else None,
        graph_stats=self.graph.get_stats() if self.graph else None,
        total_chunks_indexed=len(self._chunks),
        last_index_time=datetime.now().isoformat() if self._chunks else None,
        chroma_available=CHROMA_AVAILABLE,
    )

update_file(file_path)

Incrementally update index for a single changed file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | Path | Path to the modified file | required |

Returns:

| Type | Description |
| --- | --- |
| int | Number of chunks updated |

Source code in src/boring/rag/rag_retriever.py
def update_file(self, file_path: Path) -> int:
    """
    Incrementally update index for a single changed file.

    Args:
        file_path: Path to the modified file

    Returns:
        Number of chunks updated
    """
    if not self.is_available:
        return 0

    try:
        rel_path = str(file_path.relative_to(self.project_root))
        # Normalize to forward slashes for cross-platform consistency
        rel_path = rel_path.replace("\\", "/")
    except ValueError:
        rel_path = str(file_path).replace("\\", "/")

    # Remove old chunks for this file
    old_chunk_ids = self._file_to_chunks.get(rel_path, [])
    if old_chunk_ids:
        try:
            self.collection.delete(ids=old_chunk_ids)
        except Exception as e:
            logger.warning(f"Failed to delete old chunks: {e}")

    # Re-index the file
    try:
        new_chunks = list(self.indexer.index_file(file_path))
    except Exception as e:
        logger.warning(f"Failed to index {file_path}: {e}")
        return 0

    if not new_chunks:
        return 0

    # Update in-memory structures
    for chunk in new_chunks:
        self._chunks[chunk.chunk_id] = chunk
        if self.graph:
            self.graph.add_chunk(chunk)

    self._file_to_chunks[rel_path] = [c.chunk_id for c in new_chunks]

    # Upsert to ChromaDB
    try:
        self.collection.upsert(
            ids=[c.chunk_id for c in new_chunks],
            documents=[self._chunk_to_document(c) for c in new_chunks],
            metadatas=[self._chunk_to_metadata(c) for c in new_chunks],
        )
    except Exception as e:
        logger.error(f"Failed to upsert chunks: {e}")
        return 0

    return len(new_chunks)
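
The path handling above normalizes index keys to project-relative, forward-slash form so the same file maps to a single entry on any OS; files outside the project root fall back to their full path. A standalone sketch of that normalization:

```python
from pathlib import Path

def normalize_rel_path(file_path: Path, project_root: Path) -> str:
    """Project-relative, forward-slash key as update_file computes it."""
    try:
        rel = str(file_path.relative_to(project_root))
    except ValueError:  # file lives outside the project root
        rel = str(file_path)
    return rel.replace("\\", "/")
```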

clear()

Clear all indexed data.

Source code in src/boring/rag/rag_retriever.py
def clear(self) -> None:
    """Clear all indexed data."""
    if self.client and self.collection:
        try:
            self.client.delete_collection(self.collection_name)
            self.collection = self.client.create_collection(
                name=self.collection_name, metadata={"hnsw:space": "cosine"}
            )
        except Exception as e:
            logger.error(f"Failed to clear collection: {e}")

    self._chunks.clear()
    self._file_to_chunks.clear()
    self.graph = None

RAGStats dataclass

Combined statistics for RAG system.

Source code in src/boring/rag/rag_retriever.py
@dataclass
class RAGStats:
    """Combined statistics for RAG system."""

    index_stats: IndexStats | None = None
    graph_stats: GraphStats | None = None
    total_chunks_indexed: int = 0
    last_index_time: str | None = None
    chroma_available: bool = CHROMA_AVAILABLE
    # V10.23: Session stats
    session_context_active: bool = False
    session_boosts_applied: int = 0

RetrievalResult dataclass

A retrieved code chunk with relevance info.

Source code in src/boring/rag/rag_retriever.py
@dataclass
class RetrievalResult:
    """A retrieved code chunk with relevance info."""

    chunk: CodeChunk
    score: float
    retrieval_method: str  # "vector", "graph", "keyword", "session"
    distance: float | None = None
    # V10.23: Enhanced metadata
    session_boost: float = 0.0  # Boost from session context
    task_relevance: float = 0.0  # Task-type relevance score

CrossEncoderReranker

Cross-encoder based reranker for high-precision code retrieval.

Cross-encoders process query-document pairs jointly, allowing for deeper semantic understanding than bi-encoder approaches.

Models:

- ms-marco-MiniLM-L-6-v2: Fast, good for general text (default)
- cross-encoder/ms-marco-TinyBERT-L-2: Ultra-fast
- cross-encoder/stsb-roberta-base: Good for semantic similarity

Source code in src/boring/rag/reranker.py
class CrossEncoderReranker:
    """
    Cross-encoder based reranker for high-precision code retrieval.

    Cross-encoders process query-document pairs jointly, allowing for
    deeper semantic understanding than bi-encoder approaches.

    Models:
    - ms-marco-MiniLM-L-6-v2: Fast, good for general text (default)
    - cross-encoder/ms-marco-TinyBERT-L-2: Ultra-fast
    - cross-encoder/stsb-roberta-base: Good for semantic similarity
    """

    # Available models for different use cases
    MODELS = {
        "fast": "cross-encoder/ms-marco-TinyBERT-L-2",
        "balanced": "cross-encoder/ms-marco-MiniLM-L-6-v2",
        "accurate": "cross-encoder/stsb-roberta-base",
    }

    def __init__(self, model_name: str = "balanced", device: str = "cpu"):
        """
        Initialize cross-encoder reranker.

        Args:
            model_name: Model preset ("fast", "balanced", "accurate") or HuggingFace model name
            device: Device to use ("cpu", "cuda", "mps")
        """
        self.model_name = self.MODELS.get(model_name, model_name)
        self.device = device
        self._model: Any | None = None
        self._initialized = False

    def _ensure_model(self) -> bool:
        """Lazily initialize the model."""
        if self._initialized:
            return self._model is not None

        self._initialized = True

        if not DependencyManager.check_chroma():
            logger.debug(
                "CrossEncoder not available (sentence-transformers missing), using heuristic reranking"
            )
            return False

        try:
            from sentence_transformers import CrossEncoder

            self._model = CrossEncoder(self.model_name, device=self.device)
            logger.info(f"CrossEncoder initialized: {self.model_name}")
            return True
        except Exception as e:
            logger.warning(f"Failed to load CrossEncoder: {e}")
            return False

    def rerank(
        self,
        query: str,
        documents: list[str],
        original_scores: list[float],
        top_k: int = 10,
        weight_original: float = 0.3,
    ) -> list[tuple[int, RerankScore]]:
        """
        Rerank documents using cross-encoder.

        Args:
            query: Search query
            documents: List of document texts
            original_scores: Original retrieval scores
            top_k: Number of results to return
            weight_original: Weight for original score in combination

        Returns:
            List of (original_index, RerankScore) sorted by combined score
        """
        if not documents:
            return []

        if self._ensure_model():
            return self._rerank_with_model(
                query, documents, original_scores, top_k, weight_original
            )
        else:
            return self._rerank_heuristic(query, documents, original_scores, top_k, weight_original)

    def _rerank_with_model(
        self,
        query: str,
        documents: list[str],
        original_scores: list[float],
        top_k: int,
        weight_original: float,
    ) -> list[tuple[int, RerankScore]]:
        """Rerank using actual cross-encoder model."""
        # Create query-document pairs
        pairs = [(query, doc) for doc in documents]

        # Get cross-encoder scores
        try:
            scores = self._model.predict(pairs, show_progress_bar=False)
        except Exception as e:
            logger.warning(f"Cross-encoder prediction failed: {e}")
            return self._rerank_heuristic(query, documents, original_scores, top_k, weight_original)

        # Normalize scores to [0, 1]
        min_score = min(scores)
        max_score = max(scores)
        range_score = max_score - min_score if max_score > min_score else 1.0
        normalized_scores = [(s - min_score) / range_score for s in scores]

        # Combine with original scores
        results = []
        for i, (rerank_score, orig_score) in enumerate(
            zip(normalized_scores, original_scores, strict=True)
        ):
            combined = (1 - weight_original) * rerank_score + weight_original * orig_score
            results.append(
                (
                    i,
                    RerankScore(
                        original_score=orig_score,
                        rerank_score=rerank_score,
                        combined_score=combined,
                        confidence=0.9,
                        method="cross_encoder",
                    ),
                )
            )

        # Sort by combined score
        results.sort(key=lambda x: x[1].combined_score, reverse=True)
        return results[:top_k]

    def _rerank_heuristic(
        self,
        query: str,
        documents: list[str],
        original_scores: list[float],
        top_k: int,
        weight_original: float,
    ) -> list[tuple[int, RerankScore]]:
        """Heuristic reranking when cross-encoder unavailable."""
        query_lower = query.lower()
        query_terms = set(query_lower.split())

        results = []
        for i, (doc, orig_score) in enumerate(zip(documents, original_scores, strict=True)):
            doc_lower = doc.lower()

            # Heuristic scoring factors
            factors = []

            # 1. Exact phrase match (strongest signal)
            if query_lower in doc_lower:
                factors.append(0.4)

            # 2. Term coverage
            matching_terms = sum(1 for term in query_terms if term in doc_lower)
            term_coverage = matching_terms / len(query_terms) if query_terms else 0
            factors.append(term_coverage * 0.3)

            # 3. Position of first match (earlier is better)
            first_match_pos = min(
                (doc_lower.find(term) for term in query_terms if term in doc_lower),
                default=len(doc_lower),
            )
            position_score = 1.0 - (first_match_pos / len(doc_lower)) if doc_lower else 0
            factors.append(position_score * 0.2)

            # 4. Length penalty (very long docs might be less relevant)
            length_penalty = min(1.0, 500 / max(len(doc), 1))
            factors.append(length_penalty * 0.1)

            # Combine factors
            rerank_score = sum(factors)
            combined = (1 - weight_original) * rerank_score + weight_original * orig_score

            results.append(
                (
                    i,
                    RerankScore(
                        original_score=orig_score,
                        rerank_score=rerank_score,
                        combined_score=combined,
                        confidence=0.6,
                        method="heuristic",
                    ),
                )
            )

        results.sort(key=lambda x: x[1].combined_score, reverse=True)
        return results[:top_k]
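The four heuristic factors can be exercised in isolation. The sketch below mirrors the weights in `_rerank_heuristic` (0.4 phrase match, 0.3 term coverage, 0.2 match position, 0.1 length penalty); `heuristic_score` is an illustrative standalone function, not part of the module:

```python
def heuristic_score(query: str, doc: str) -> float:
    """Score a document against a query using the four weighted factors."""
    q, d = query.lower(), doc.lower()
    terms = set(q.split())
    score = 0.4 if q in d else 0.0                       # 1. exact phrase match
    hits = sum(1 for t in terms if t in d)
    score += (hits / len(terms) if terms else 0) * 0.3   # 2. term coverage
    first = min((d.find(t) for t in terms if t in d), default=len(d))
    score += (1.0 - first / len(d) if d else 0) * 0.2    # 3. earlier match is better
    score += min(1.0, 500 / max(len(d), 1)) * 0.1        # 4. length penalty
    return score

docs = ["def login_error_handler(): ...", "unrelated utility code"]
scores = [heuristic_score("login error", d) for d in docs]
# the chunk containing both query terms outranks the unrelated one
```

A document matching no terms still earns the length bonus (0.1 here), which keeps short chunks from scoring exactly zero.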

__init__(model_name='balanced', device='cpu')

Initialize cross-encoder reranker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_name` | `str` | Model preset (`"fast"`, `"balanced"`, `"accurate"`) or HuggingFace model name | `'balanced'` |
| `device` | `str` | Device to use (`"cpu"`, `"cuda"`, `"mps"`) | `'cpu'` |
Source code in src/boring/rag/reranker.py
def __init__(self, model_name: str = "balanced", device: str = "cpu"):
    """
    Initialize cross-encoder reranker.

    Args:
        model_name: Model preset ("fast", "balanced", "accurate") or HuggingFace model name
        device: Device to use ("cpu", "cuda", "mps")
    """
    self.model_name = self.MODELS.get(model_name, model_name)
    self.device = device
    self._model: Any | None = None
    self._initialized = False

rerank(query, documents, original_scores, top_k=10, weight_original=0.3)

Rerank documents using cross-encoder.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | Search query | *required* |
| `documents` | `list[str]` | List of document texts | *required* |
| `original_scores` | `list[float]` | Original retrieval scores | *required* |
| `top_k` | `int` | Number of results to return | `10` |
| `weight_original` | `float` | Weight for original score in combination | `0.3` |

Returns:

| Type | Description |
| --- | --- |
| `list[tuple[int, RerankScore]]` | List of (original_index, RerankScore) sorted by combined score |

Source code in src/boring/rag/reranker.py
def rerank(
    self,
    query: str,
    documents: list[str],
    original_scores: list[float],
    top_k: int = 10,
    weight_original: float = 0.3,
) -> list[tuple[int, RerankScore]]:
    """
    Rerank documents using cross-encoder.

    Args:
        query: Search query
        documents: List of document texts
        original_scores: Original retrieval scores
        top_k: Number of results to return
        weight_original: Weight for original score in combination

    Returns:
        List of (original_index, RerankScore) sorted by combined score
    """
    if not documents:
        return []

    if self._ensure_model():
        return self._rerank_with_model(
            query, documents, original_scores, top_k, weight_original
        )
    else:
        return self._rerank_heuristic(query, documents, original_scores, top_k, weight_original)

EnsembleReranker

Ensemble reranker combining multiple signals.

Combines:

1. Cross-encoder/heuristic scores
2. Keyword matching
3. Code structure analysis
4. Usage patterns (from IntelligentRanker)

Source code in src/boring/rag/reranker.py
class EnsembleReranker:
    """
    Ensemble reranker combining multiple signals.

    Combines:
    1. Cross-encoder/heuristic scores
    2. Keyword matching
    3. Code structure analysis
    4. Usage patterns (from IntelligentRanker)
    """

    def __init__(self, weights: dict[str, float] | None = None):
        """
        Initialize ensemble reranker.

        Args:
            weights: Custom weights for each signal
        """
        self.weights = weights or {
            "semantic": 0.35,
            "keyword": 0.25,
            "structure": 0.20,
            "usage": 0.20,
        }
        self.cross_encoder = CrossEncoderReranker()

    def rerank(
        self,
        query: str,
        chunks: list,  # list of CodeChunk or similar
        original_scores: list[float],
        usage_scores: dict[str, float] | None = None,
        top_k: int = 10,
    ) -> list[tuple[int, float]]:
        """
        Rerank using ensemble of signals.

        Args:
            query: Search query
            chunks: List of code chunks with content and metadata
            original_scores: Original retrieval scores
            usage_scores: Optional usage-based scores from IntelligentRanker
            top_k: Number of results to return

        Returns:
            List of (original_index, final_score) sorted by score
        """
        if not chunks:
            return []

        documents = [self._chunk_to_text(chunk) for chunk in chunks]

        # 1. Get semantic/cross-encoder scores
        ce_results = self.cross_encoder.rerank(query, documents, original_scores, top_k=len(chunks))
        semantic_scores = {idx: score.rerank_score for idx, score in ce_results}

        # 2. Keyword matching scores
        keyword_scores = self._compute_keyword_scores(query, chunks)

        # 3. Structure scores
        structure_scores = self._compute_structure_scores(query, chunks)

        # 4. Usage scores (if provided)
        usage_scores = usage_scores or {}

        # Combine all signals
        final_scores = []
        for i, chunk in enumerate(chunks):
            chunk_id = getattr(chunk, "chunk_id", str(i))

            combined = (
                self.weights["semantic"] * semantic_scores.get(i, 0)
                + self.weights["keyword"] * keyword_scores.get(i, 0)
                + self.weights["structure"] * structure_scores.get(i, 0)
                + self.weights["usage"] * usage_scores.get(chunk_id, 0.5)
            )

            final_scores.append((i, combined))

        final_scores.sort(key=lambda x: x[1], reverse=True)
        return final_scores[:top_k]

    def _chunk_to_text(self, chunk) -> str:
        """Convert chunk to searchable text."""
        parts = []

        if hasattr(chunk, "name") and chunk.name:
            parts.append(f"Name: {chunk.name}")

        if hasattr(chunk, "signature") and chunk.signature:
            parts.append(f"Signature: {chunk.signature}")

        if hasattr(chunk, "docstring") and chunk.docstring:
            parts.append(f"Doc: {chunk.docstring}")

        if hasattr(chunk, "content") and chunk.content:
            # Limit content length
            content = chunk.content[:500]
            parts.append(f"Code: {content}")

        return "\n".join(parts)

    def _compute_keyword_scores(self, query: str, chunks: list) -> dict[int, float]:
        """Compute keyword matching scores."""
        query_terms = set(query.lower().split())
        scores = {}

        for i, chunk in enumerate(chunks):
            text = self._chunk_to_text(chunk).lower()

            matching = sum(1 for term in query_terms if term in text)
            scores[i] = matching / len(query_terms) if query_terms else 0

        return scores

    def _compute_structure_scores(self, query: str, chunks: list) -> dict[int, float]:
        """Compute code structure relevance scores."""
        scores = {}
        query_lower = query.lower()

        # Determine expected chunk types from query
        expected_types = []
        if any(kw in query_lower for kw in ["class", "model", "schema"]):
            expected_types.append("class")
        if any(kw in query_lower for kw in ["function", "method", "def"]):
            expected_types.extend(["function", "method"])
        if any(kw in query_lower for kw in ["test", "spec"]):
            expected_types.append("function")  # Tests are usually functions

        for i, chunk in enumerate(chunks):
            score = 0.5  # Default

            chunk_type = getattr(chunk, "chunk_type", "")

            # Boost for matching chunk type
            if expected_types and chunk_type in expected_types:
                score += 0.3

            # Boost for docstring presence
            if hasattr(chunk, "docstring") and chunk.docstring:
                score += 0.1

            # Boost for reasonable size
            if hasattr(chunk, "content"):
                length = len(chunk.content)
                if 50 < length < 500:  # Sweet spot
                    score += 0.1

            scores[i] = min(1.0, score)

        return scores
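The final combination in `EnsembleReranker.rerank` is a weighted sum of the four per-chunk signals against the default weight map. A toy illustration with made-up signal values:

```python
# default weights from EnsembleReranker.__init__
weights = {"semantic": 0.35, "keyword": 0.25, "structure": 0.20, "usage": 0.20}

# per-signal scores for a single chunk (illustrative values only)
signals = {"semantic": 0.9, "keyword": 0.5, "structure": 0.7, "usage": 0.5}

final = sum(weights[name] * signals[name] for name in weights)
# 0.35*0.9 + 0.25*0.5 + 0.20*0.7 + 0.20*0.5 = 0.68
```

Note that a chunk absent from `usage_scores` defaults to 0.5 in `rerank`, so missing usage data neither boosts nor penalizes a result.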

__init__(weights=None)

Initialize ensemble reranker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `weights` | `dict[str, float] \| None` | Custom weights for each signal | `None` |
Source code in src/boring/rag/reranker.py
def __init__(self, weights: dict[str, float] | None = None):
    """
    Initialize ensemble reranker.

    Args:
        weights: Custom weights for each signal
    """
    self.weights = weights or {
        "semantic": 0.35,
        "keyword": 0.25,
        "structure": 0.20,
        "usage": 0.20,
    }
    self.cross_encoder = CrossEncoderReranker()

rerank(query, chunks, original_scores, usage_scores=None, top_k=10)

Rerank using ensemble of signals.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | Search query | *required* |
| `chunks` | `list` | List of code chunks with content and metadata | *required* |
| `original_scores` | `list[float]` | Original retrieval scores | *required* |
| `usage_scores` | `dict[str, float] \| None` | Optional usage-based scores from IntelligentRanker | `None` |
| `top_k` | `int` | Number of results to return | `10` |

Returns:

| Type | Description |
| --- | --- |
| `list[tuple[int, float]]` | List of (original_index, final_score) sorted by score |

Source code in src/boring/rag/reranker.py
def rerank(
    self,
    query: str,
    chunks: list,  # list of CodeChunk or similar
    original_scores: list[float],
    usage_scores: dict[str, float] | None = None,
    top_k: int = 10,
) -> list[tuple[int, float]]:
    """
    Rerank using ensemble of signals.

    Args:
        query: Search query
        chunks: List of code chunks with content and metadata
        original_scores: Original retrieval scores
        usage_scores: Optional usage-based scores from IntelligentRanker
        top_k: Number of results to return

    Returns:
        List of (original_index, final_score) sorted by score
    """
    if not chunks:
        return []

    documents = [self._chunk_to_text(chunk) for chunk in chunks]

    # 1. Get semantic/cross-encoder scores
    ce_results = self.cross_encoder.rerank(query, documents, original_scores, top_k=len(chunks))
    semantic_scores = {idx: score.rerank_score for idx, score in ce_results}

    # 2. Keyword matching scores
    keyword_scores = self._compute_keyword_scores(query, chunks)

    # 3. Structure scores
    structure_scores = self._compute_structure_scores(query, chunks)

    # 4. Usage scores (if provided)
    usage_scores = usage_scores or {}

    # Combine all signals
    final_scores = []
    for i, chunk in enumerate(chunks):
        chunk_id = getattr(chunk, "chunk_id", str(i))

        combined = (
            self.weights["semantic"] * semantic_scores.get(i, 0)
            + self.weights["keyword"] * keyword_scores.get(i, 0)
            + self.weights["structure"] * structure_scores.get(i, 0)
            + self.weights["usage"] * usage_scores.get(chunk_id, 0.5)
        )

        final_scores.append((i, combined))

    final_scores.sort(key=lambda x: x[1], reverse=True)
    return final_scores[:top_k]

RerankScore dataclass

Score from cross-encoder reranking.

Source code in src/boring/rag/reranker.py
@dataclass
class RerankScore:
    """Score from cross-encoder reranking."""

    original_score: float
    rerank_score: float
    combined_score: float
    confidence: float
    method: str  # "cross_encoder", "heuristic", "ensemble"

expand_query_with_hyde(query, use_llm=False)

Convenience function to expand a query using HyDE.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | Natural language query | *required* |
| `use_llm` | `bool` | Whether to use LLM for generation | `False` |

Returns:

| Type | Description |
| --- | --- |
| `HyDEResult` | HyDEResult with hypothetical document |

Source code in src/boring/rag/hyde.py
def expand_query_with_hyde(query: str, use_llm: bool = False) -> HyDEResult:
    """
    Convenience function to expand a query using HyDE.

    Args:
        query: Natural language query
        use_llm: Whether to use LLM for generation

    Returns:
        HyDEResult with hypothetical document
    """
    expander = get_hyde_expander(use_llm)
    return expander.expand_query(query)
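The core HyDE idea is to embed a plausible *answer* instead of the query itself, so the vector lands closer to real code chunks in embedding space. A minimal template-based sketch of that idea (an approximation of the non-LLM path; `sketch_hypothetical_doc` is illustrative and not the actual `HyDEExpander` implementation):

```python
def sketch_hypothetical_doc(query: str) -> str:
    """Turn a natural-language query into hypothetical code for embedding."""
    ident = "_".join(query.lower().split())  # e.g. "handle login errors" -> handle_login_errors
    return (
        f"def {ident}():\n"
        f'    """{query}."""\n'
        f"    ...\n"
    )

doc = sketch_hypothetical_doc("handle login errors")
# doc now looks like a function definition, which matches the shape of
# indexed CodeChunk contents far better than the raw query string does
```

The hypothetical document is then passed to `retriever.retrieve(...)` in place of the original query, as shown in the module-level usage example.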

create_rag_retriever(project_root=None, persist_dir=None)

Factory function to create RAGRetriever with standard project paths.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `project_root` | `Path \| None` | Project root directory | `None` |
| `persist_dir` | `Path \| None` | Optional custom persist directory | `None` |

Returns:

| Type | Description |
| --- | --- |
| `RAGRetriever` | RAGRetriever instance |

Source code in src/boring/rag/rag_retriever.py
def create_rag_retriever(
    project_root: Path | None = None, persist_dir: Path | None = None
) -> RAGRetriever:
    """
    Factory function to create RAGRetriever with standard project paths.

    Args:
        project_root: Project root directory
        persist_dir: Optional custom persist directory

    Returns:
        RAGRetriever instance
    """
    if project_root is None:
        project_root = Path.cwd()

    return RAGRetriever(project_root=project_root, persist_dir=persist_dir)