use tree_sitter::Node; use archlens_domain::{CodeElement, CodeElementKind, Relationship, RelationshipKind}; use crate::extraction_context::ExtractionContext; use crate::language_extractor::LanguageExtractor; pub struct PythonExtractor; impl LanguageExtractor for PythonExtractor { fn tree_sitter_language(&self) -> tree_sitter::Language { tree_sitter_python::LANGUAGE.into() } fn extract_types(&self, root: &Node, source: &str, ctx: &mut ExtractionContext) { // collect_classes handles class elements, inheritance, and field compositions // in a single pass — Python's relationship extraction is interleaved with type extraction collect_classes(root, source, ctx); } fn extract_relationships(&self, _root: &Node, _source: &str, _ctx: &mut ExtractionContext) { // Relationships are collected inside collect_classes for Python } fn extract_imports(&self, root: &Node, source: &str, ctx: &mut ExtractionContext) { collect_imports(root, source, ctx); } } fn collect_classes(node: &Node, source: &str, ctx: &mut ExtractionContext) { let mut cursor = node.walk(); for child in node.children(&mut cursor) { if child.kind() != "class_definition" { continue; } let Some(name_node) = child.child_by_field_name("name") else { continue; }; let name = &source[name_node.byte_range()]; let line = child.start_position().row + 1; let methods = child .child_by_field_name("body") .map(|body| collect_methods(&body, source)) .unwrap_or_default(); match CodeElement::new(name, CodeElementKind::Class, ctx.file_path().clone(), line) { Ok(element) => ctx.add_element(element.with_methods(methods)), Err(e) => { ctx.add_warning(ctx.file_path().clone(), line, &e.to_string()); continue; } } if let Some(superclasses) = child.child_by_field_name("superclasses") { collect_inheritance(&superclasses, source, name, ctx); } if let Some(body) = child.child_by_field_name("body") { collect_typed_fields(&body, source, name, ctx); collect_constructor_params(&body, source, name, ctx); } } } fn collect_inheritance( superclasses: &Node, source: &str, class_name: &str, ctx: &mut ExtractionContext, ) { let mut cursor = superclasses.walk(); for child in superclasses.children(&mut cursor) { if child.kind() == "identifier" { let base_name = &source[child.byte_range()]; if !is_python_builtin(base_name) && let Ok(rel) = Relationship::new(class_name, base_name, RelationshipKind::Inheritance) { ctx.add_relationship(rel); } } } } const PYTHON_BUILTINS: &[&str] = &[ "str", "int", "float", "bool", "bytes", "list", "dict", "set", "tuple", "None", "type", "object", "Exception", "BaseException", "Optional", "Any", "Union", "List", "Dict", "Set", "Tuple", "Callable", "Sequence", "Mapping", "Iterable", "Iterator", "Generator", "Coroutine", "AsyncGenerator", "ClassVar", "Final", "Literal", "TypeVar", "Generic", "Protocol", "runtime_checkable", "Self", ]; fn is_python_builtin(name: &str) -> bool { PYTHON_BUILTINS.contains(&name) } const STDLIB_MODULES: &[&str] = &[ "os", "sys", "typing", "logging", "json", "re", "io", "abc", "collections", "datetime", "enum", "functools", "hashlib", "http", "importlib", "inspect", "itertools", "math", "pathlib", "pickle", "random", "shutil", "signal", "socket", "string", "subprocess", "tempfile", "threading", "time", "traceback", "unittest", "urllib", "uuid", "warnings", "contextlib", "dataclasses", "copy", "struct", "base64", "csv", "glob", "operator", "textwrap", "asyncio", "concurrent", "multiprocessing", ]; fn is_external_import(module: &str) -> bool { let top = module.split('.').next().unwrap_or(module); if STDLIB_MODULES.contains(&top) { return true; } if top.starts_with('_') { return true; } false } fn collect_imports(node: &Node, source: &str, ctx: &mut ExtractionContext) { let file_name = std::path::Path::new(ctx.file_path().as_str()) .file_stem() .and_then(|s| s.to_str()) .unwrap_or("unknown") .to_string(); let mut cursor = node.walk(); for child in node.children(&mut cursor) { match child.kind() { "import_statement" => { let mut name_cursor = child.walk(); for name_child in child.children(&mut name_cursor) { if name_child.kind() == "dotted_name" { let module = &source[name_child.byte_range()]; if !is_external_import(module) && let Ok(rel) = Relationship::new(&file_name, module, RelationshipKind::Import) { ctx.add_relationship(rel); } } } } "import_from_statement" => { if let Some(module_node) = child.child_by_field_name("module_name") { let module = &source[module_node.byte_range()]; if !is_external_import(module) && let Ok(rel) = Relationship::new(&file_name, module, RelationshipKind::Import) { ctx.add_relationship(rel); } } } _ => {} } } } fn collect_methods(body: &Node, source: &str) -> Vec { let mut methods = Vec::new(); let mut cursor = body.walk(); for child in body.children(&mut cursor) { if child.kind() != "function_definition" { continue; } let Some(name_node) = child.child_by_field_name("name") else { continue; }; let fn_name = &source[name_node.byte_range()]; if fn_name.starts_with('_') { continue; } let params = child .child_by_field_name("parameters") .map(|p| extract_python_params(&p, source)) .unwrap_or_default(); let ret = child .child_by_field_name("return_type") .map(|n| source[n.byte_range()].trim().to_string()) .unwrap_or_default(); let sig = if ret.is_empty() { format!("+{fn_name}({params})") } else { format!("+{fn_name}({params}) -> {ret}") }; methods.push(sig); } methods } fn extract_python_params(params_node: &Node, source: &str) -> String { let mut parts = Vec::new(); let mut cursor = params_node.walk(); for param in params_node.children(&mut cursor) { match param.kind() { "typed_parameter" => { if let Some(type_node) = param.child_by_field_name("type") { let mut inner = param.walk(); let name = param .children(&mut inner) .find(|c| c.kind() == "identifier") .map(|c| &source[c.byte_range()]) .unwrap_or_default(); if name != "self" && name != "cls" && !name.is_empty() { let ty = source[type_node.byte_range()].trim(); parts.push(format!("{name}: {ty}")); } } } "identifier" => { let name = &source[param.byte_range()]; if name != "self" && name != "cls" { parts.push(name.to_string()); } } _ => {} } } parts.join(", ") } fn collect_constructor_params( body: &Node, source: &str, class_name: &str, ctx: &mut ExtractionContext, ) { let mut cursor = body.walk(); for child in body.children(&mut cursor) { if child.kind() != "function_definition" { continue; } let Some(fn_name) = child.child_by_field_name("name") else { continue; }; if &source[fn_name.byte_range()] != "__init__" { continue; } let Some(params) = child.child_by_field_name("parameters") else { continue; }; let mut param_cursor = params.walk(); for param in params.children(&mut param_cursor) { if param.kind() == "typed_parameter" && let Some(type_node) = param.child_by_field_name("type") { let type_text = &source[type_node.byte_range()]; let base_type = type_text.split('[').next().unwrap_or(type_text).trim(); if base_type != class_name && !base_type.is_empty() && !is_python_builtin(base_type) && let Ok(rel) = Relationship::new(class_name, base_type, RelationshipKind::Composition) { ctx.add_relationship(rel); } } } } } fn collect_typed_fields(body: &Node, source: &str, class_name: &str, ctx: &mut ExtractionContext) { collect_typed_fields_recursive(body, source, class_name, ctx); } fn collect_typed_fields_recursive( node: &Node, source: &str, class_name: &str, ctx: &mut ExtractionContext, ) { let mut cursor = node.walk(); for child in node.children(&mut cursor) { if (child.kind() == "assignment" || child.kind() == "typed_assignment") && let Some(type_node) = child.child_by_field_name("type") { let type_text = &source[type_node.byte_range()]; let base_type = type_text.split('[').next().unwrap_or(type_text).trim(); if base_type != class_name && !base_type.is_empty() && !is_python_builtin(base_type) && let Ok(rel) = Relationship::new(class_name, base_type, RelationshipKind::Composition) { ctx.add_relationship(rel); } } collect_typed_fields_recursive(&child, source, class_name, ctx); } }