Skip to content

Graph API

builder

GraphBuilder

Source code in src/nervapack/graph/builder.py
class GraphBuilder:
    """
    Builds, persists and prunes a directed graph of files and code entities.

    File nodes use the ID ``file:<path>``; entity nodes use
    ``<type>:<path>:<name>:<start_line>``. Every file node points at each of
    its entities through an edge with ``relation="DEFINES"``.
    """

    def __init__(self):
        self.graph = nx.DiGraph()

    def build_from_entities(self, entities: List[ParsedEntity]) -> nx.DiGraph:
        """
        Takes a list of ParsedEntity and adds them to the directed graph.

        Files are nodes and entities are nodes; a DEFINES edge connects each
        file to every entity found in it. Nodes are added to ``self.graph``,
        so successive calls keep extending the same graph.

        Args:
            entities: Parsed entities exposing ``type``, ``name``,
                ``file_path``, ``start_line``, ``end_line`` and ``content``.

        Returns:
            The builder's graph (the same object as ``self.graph``).
        """
        for entity in entities:
            # File node: created only the first time a path is seen.
            file_node_id = f"file:{entity.file_path}"
            if not self.graph.has_node(file_node_id):
                self.graph.add_node(file_node_id, type="file", path=entity.file_path)

            # Entity node: type, path, name and start line form the unique ID.
            entity_node_id = f"{entity.type}:{entity.file_path}:{entity.name}:{entity.start_line}"
            self.graph.add_node(
                entity_node_id,
                type=entity.type,
                name=entity.name,
                file_path=entity.file_path,
                start_line=entity.start_line,
                end_line=entity.end_line,
                content=entity.content,
            )

            # Edge from File -> Entity
            self.graph.add_edge(file_node_id, entity_node_id, relation="DEFINES")

        return self.graph

    def save_graph(self, path: str = ".nervapack/graph.graphml"):
        """
        Writes the graph to ``path`` in GraphML format.

        The parent directory is created when it does not exist yet.

        Args:
            path: Destination file; may be a bare filename.
        """
        import os

        # dirname() is "" for a bare filename, and os.makedirs("") raises
        # FileNotFoundError, so only create a directory when there is one.
        parent_dir = os.path.dirname(path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        nx.write_graphml(self.graph, path)

    def load_graph(self, path: str = ".nervapack/graph.graphml"):
        """
        Replaces the builder's graph with the one read from a GraphML file.

        Args:
            path: GraphML file to read.

        Returns:
            The loaded graph, which is also stored on ``self.graph``.
        """
        self.graph = nx.read_graphml(path)
        return self.graph

    def remove_nodes_for_file(self, file_path: str):
        """
        Removes the file node and all entities associated with it.

        Entities are matched through their ``file_path`` attribute. Nothing
        happens when the graph holds no node for ``file_path``.

        Args:
            file_path: Path of the file whose nodes should be dropped.
        """
        nodes_to_remove = []
        file_node_id = f"file:{file_path}"
        if self.graph.has_node(file_node_id):
            nodes_to_remove.append(file_node_id)

        # Collect first and remove afterwards, so the node view is never
        # mutated while it is being iterated.
        for node, data in self.graph.nodes(data=True):
            if data.get("file_path") == file_path:
                nodes_to_remove.append(node)

        self.graph.remove_nodes_from(nodes_to_remove)

build_from_entities(entities)

Takes a list of ParsedEntity and constructs a directed graph. Files are nodes, entities are nodes. Each file node is connected to the entities it defines by a DEFINES edge.

Source code in src/nervapack/graph/builder.py
def build_from_entities(self, entities: List[ParsedEntity]) -> nx.DiGraph:
    """
    Populate the graph from parsed entities and return it.

    Every entity contributes a ``file:<path>`` node (added only when the
    graph does not have it yet), an entity node carrying the entity's
    metadata, and a DEFINES edge from the file node to the entity node.
    """
    graph = self.graph

    for item in entities:
        # One node per source file.
        file_key = f"file:{item.file_path}"
        if not graph.has_node(file_key):
            graph.add_node(file_key, type="file", path=item.file_path)

        # Type, path, name and start line together identify the entity.
        entity_key = f"{item.type}:{item.file_path}:{item.name}:{item.start_line}"
        attributes = {
            "type": item.type,
            "name": item.name,
            "file_path": item.file_path,
            "start_line": item.start_line,
            "end_line": item.end_line,
            "content": item.content,
        }
        graph.add_node(entity_key, **attributes)

        # The file defines the entity.
        graph.add_edge(file_key, entity_key, relation="DEFINES")

    return self.graph

remove_nodes_for_file(file_path)

Removes the file node and all entities associated with it.

Source code in src/nervapack/graph/builder.py
def remove_nodes_for_file(self, file_path: str):
    """Drop the node of ``file_path`` together with every node recorded for that file."""
    file_key = f"file:{file_path}"

    # The file node itself goes first, when the graph knows it.
    doomed = [file_key] if self.graph.has_node(file_key) else []

    # Then every node whose ``file_path`` attribute names this file.
    doomed.extend(
        node_id
        for node_id, attrs in self.graph.nodes(data=True)
        if attrs.get("file_path") == file_path
    )

    self.graph.remove_nodes_from(doomed)

retrieval

RetrievalMetadata dataclass

Metadata about the graph traversal process.

Source code in src/nervapack/graph/retrieval.py
@dataclass
class RetrievalMetadata:
    """Metadata about the graph traversal process.

    The field notes below describe the values GraphRetriever.retrieve_context
    stores when it builds an instance.
    """
    # Requested start node IDs that exist in the graph.
    seed_nodes: List[str]
    # Visited nodes that are not seeds, in the order they were visited.
    expanded_nodes: List[str]
    # Number of nodes in the retrieved sub-graph.
    total_nodes: int
    # Largest hop count at which a node was visited (0 when only seeds were).
    traversal_depth: int
    # One entry per edge recorded while enqueuing an unvisited neighbour.
    edges_followed: List[Tuple[str, str, str]]  # (source, target, relation)

GraphRetriever

Source code in src/nervapack/graph/retrieval.py
class GraphRetriever:
    """
    Extracts neighbourhoods of a graph and renders them for display.

    Attributes:
        graph: The directed graph that traversals run against.
        last_metadata: Statistics of the most recent ``retrieve_context``
            call, or ``None`` before the first call.
    """

    def __init__(self, graph: nx.DiGraph):
        self.graph = graph
        self.last_metadata: Optional[RetrievalMetadata] = None

    def retrieve_context(self, start_node_ids: List[str], max_hops: int = 2) -> nx.DiGraph:
        """
        Retrieves a sub-graph using K-Hop BFS from the given start nodes.

        Edges are followed in both directions (successors and predecessors).
        No pruning of high-degree "hub" nodes is performed.

        Also tracks metadata about the traversal which can be accessed via
        self.last_metadata.

        Args:
            start_node_ids: Node IDs to start from; IDs that are not in the
                graph are ignored.
            max_hops: Nodes reached at this hop count are included but not
                expanded further.

        Returns:
            A copy of the sub-graph induced by every visited node.
        """
        # Local import keeps the block self-contained.
        from collections import deque

        seed_nodes = [nid for nid in start_node_ids if self.graph.has_node(nid)]
        # Set mirror of the seeds: O(1) membership instead of scanning the list.
        seed_lookup = set(seed_nodes)

        # deque gives O(1) pops from the left; list.pop(0) shifts every item.
        queue = deque((nid, 0) for nid in seed_nodes)

        visited = set()
        expanded_nodes = []
        edges_followed = []
        max_depth_reached = 0

        while queue:
            current_node, hops = queue.popleft()

            # A node can be enqueued several times before its first visit.
            if current_node in visited:
                continue

            visited.add(current_node)
            max_depth_reached = max(max_depth_reached, hops)

            # Anything that is not a seed was reached by expansion.
            if current_node not in seed_lookup:
                expanded_nodes.append(current_node)

            if hops >= max_hops:
                continue

            # Outgoing edges
            for neighbor in self.graph.neighbors(current_node):
                if neighbor not in visited:
                    relation = self._relation_of(current_node, neighbor)
                    edges_followed.append((current_node, neighbor, relation))
                    queue.append((neighbor, hops + 1))

            # Also traverse incoming edges
            for predecessor in self.graph.predecessors(current_node):
                if predecessor not in visited:
                    relation = self._relation_of(predecessor, current_node)
                    edges_followed.append((predecessor, current_node, relation))
                    queue.append((predecessor, hops + 1))

        # Store metadata
        self.last_metadata = RetrievalMetadata(
            seed_nodes=seed_nodes,
            expanded_nodes=expanded_nodes,
            total_nodes=len(visited),
            traversal_depth=max_depth_reached,
            edges_followed=edges_followed,
        )

        return self.graph.subgraph(visited).copy()

    def _relation_of(self, source, target) -> str:
        """Return the ``relation`` attribute of edge source -> target, or "unknown"."""
        edge_data = self.graph.get_edge_data(source, target)
        return edge_data.get("relation", "unknown") if edge_data else "unknown"

    def format_as_markdown(self, subgraph: nx.DiGraph) -> str:
        """
        Formats the retrieved sub-graph into a minimized, clean Markdown block.

        File nodes are skipped. The remaining nodes are grouped by their
        ``file_path`` attribute ("Unknown" when absent), and each group is
        listed in ascending ``start_line`` order.

        Args:
            subgraph: Graph whose nodes carry the entity attributes.

        Returns:
            The Markdown document as a single string.
        """
        markdown_lines = ["# NervaPack Context Retrieval\n"]

        # Group by file
        files = {}
        for _, data in subgraph.nodes(data=True):
            if data.get('type') == 'file':
                continue
            files.setdefault(data.get('file_path', 'Unknown'), []).append(data)

        for file_path, nodes in files.items():
            markdown_lines.append(f"## File: `{file_path}`\n")
            # Sort by start line
            for node_data in sorted(nodes, key=lambda x: x.get('start_line', 0)):
                node_type = node_data.get('type', 'entity').upper()
                name = node_data.get('name', 'Unknown')
                lines = f"(L{node_data.get('start_line', '?')}-L{node_data.get('end_line', '?')})"
                markdown_lines.append(f"### {node_type}: {name} {lines}")

                # Entities without content get a heading only.
                content = node_data.get('content', '')
                if content:
                    markdown_lines.append("```")
                    markdown_lines.append(content)
                    markdown_lines.append("```\n")

        return "\n".join(markdown_lines)

    def get_source_files(self, subgraph: nx.DiGraph) -> List[str]:
        """
        Return deduplicated file paths of all non-file nodes in the subgraph.

        Paths keep the order in which they are first encountered; nodes with
        a missing or empty ``file_path`` are ignored.
        """
        seen = set()
        result = []
        for _, data in subgraph.nodes(data=True):
            fp = data.get("file_path")
            if fp and data.get("type") != "file" and fp not in seen:
                seen.add(fp)
                result.append(fp)
        return result

retrieve_context(start_node_ids, max_hops=2)

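Retrieves a sub-graph using K-Hop BFS from the given start nodes, following both outgoing and incoming edges. Note: the docstring mentions using Betweenness Centrality to prune high-degree "hub" nodes, but the code shown below performs no such pruning.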

Also tracks metadata about the traversal which can be accessed via self.last_metadata.

Source code in src/nervapack/graph/retrieval.py
def retrieve_context(self, start_node_ids: List[str], max_hops: int = 2) -> nx.DiGraph:
    """
    Retrieves a sub-graph using K-Hop BFS from the given start nodes.

    Edges are followed in both directions (successors and predecessors).
    No pruning of high-degree "hub" nodes is performed.

    Also tracks metadata about the traversal which can be accessed via self.last_metadata.

    Args:
        start_node_ids: Node IDs to start from; IDs that are not in the
            graph are ignored.
        max_hops: Nodes reached at this hop count are included but not
            expanded further.

    Returns:
        A copy of the sub-graph induced by every visited node.
    """
    # Local import keeps the block self-contained.
    from collections import deque

    seed_nodes = [nid for nid in start_node_ids if self.graph.has_node(nid)]
    # Set mirror of the seeds: O(1) membership instead of scanning the list.
    seed_lookup = set(seed_nodes)

    # deque gives O(1) pops from the left; list.pop(0) shifts every item.
    queue = deque((nid, 0) for nid in seed_nodes)

    visited = set()
    expanded_nodes = []
    edges_followed = []
    max_depth_reached = 0

    while queue:
        current_node, hops = queue.popleft()

        # A node can be enqueued several times before its first visit.
        if current_node in visited:
            continue

        visited.add(current_node)
        max_depth_reached = max(max_depth_reached, hops)

        # Anything that is not a seed was reached by expansion.
        if current_node not in seed_lookup:
            expanded_nodes.append(current_node)

        if hops >= max_hops:
            continue

        # Outgoing edges
        for neighbor in self.graph.neighbors(current_node):
            if neighbor not in visited:
                edge_data = self.graph.get_edge_data(current_node, neighbor)
                relation = edge_data.get("relation", "unknown") if edge_data else "unknown"
                edges_followed.append((current_node, neighbor, relation))
                queue.append((neighbor, hops + 1))

        # Also traverse incoming edges
        for predecessor in self.graph.predecessors(current_node):
            if predecessor not in visited:
                edge_data = self.graph.get_edge_data(predecessor, current_node)
                relation = edge_data.get("relation", "unknown") if edge_data else "unknown"
                edges_followed.append((predecessor, current_node, relation))
                queue.append((predecessor, hops + 1))

    # Store metadata
    self.last_metadata = RetrievalMetadata(
        seed_nodes=seed_nodes,
        expanded_nodes=expanded_nodes,
        total_nodes=len(visited),
        traversal_depth=max_depth_reached,
        edges_followed=edges_followed,
    )

    return self.graph.subgraph(visited).copy()

format_as_markdown(subgraph)

Formats the retrieved sub-graph into a minimized, clean Markdown block.

Source code in src/nervapack/graph/retrieval.py
def format_as_markdown(self, subgraph: nx.DiGraph) -> str:
    """
    Render the entities of a retrieved sub-graph as a compact Markdown document.

    File nodes are left out. The other nodes are bucketed by their
    ``file_path`` attribute and each bucket is listed by ascending
    ``start_line``.
    """
    def start_of(attrs):
        # Entities without a start line sort as line 0.
        return attrs.get('start_line', 0)

    out = ["# NervaPack Context Retrieval\n"]

    # Bucket the entity attribute dicts per file, in first-seen order.
    by_file = {}
    for _, attrs in subgraph.nodes(data=True):
        if attrs.get('type') != 'file':
            by_file.setdefault(attrs.get('file_path', 'Unknown'), []).append(attrs)

    for path, entries in by_file.items():
        out.append(f"## File: `{path}`\n")
        entries.sort(key=start_of)

        for attrs in entries:
            kind = attrs.get('type', 'entity').upper()
            label = attrs.get('name', 'Unknown')
            first = attrs.get('start_line', '?')
            last = attrs.get('end_line', '?')
            out.append(f"### {kind}: {label} (L{first}-L{last})")

            # Only entities that carry content get a fenced block.
            body = attrs.get('content', '')
            if body:
                out.extend(("```", body, "```\n"))

    return "\n".join(out)

get_source_files(subgraph)

Return deduplicated file paths of all non-file nodes in the subgraph.

Source code in src/nervapack/graph/retrieval.py
def get_source_files(self, subgraph: nx.DiGraph) -> List[str]:
    """List each file path referenced by a non-file node once, in first-seen order."""
    # A dict doubles as an insertion-ordered set of the paths seen so far.
    ordered_paths = {}
    for _, attrs in subgraph.nodes(data=True):
        if attrs.get("type") == "file":
            continue
        path = attrs.get("file_path")
        if path:
            ordered_paths.setdefault(path, None)
    return list(ordered_paths)