From 44bbd34a260d4e447d7c849230a45b9e0a7d540a Mon Sep 17 00:00:00 2001 From: Sevhena Walker <83547364+Sevhena@users.noreply.github.com> Date: Wed, 19 Mar 2025 22:34:58 -0400 Subject: [PATCH 1/3] refined string type determination logic in SCL detector closes #519 --- .../detect_string_concat_in_loop.py | 376 +++++++++++++----- tests/analyzers/test_str_concat_analyzer.py | 27 +- 2 files changed, 305 insertions(+), 98 deletions(-) diff --git a/src/ecooptimizer/analyzers/astroid_analyzers/detect_string_concat_in_loop.py b/src/ecooptimizer/analyzers/astroid_analyzers/detect_string_concat_in_loop.py index 442c6452..7be20d66 100644 --- a/src/ecooptimizer/analyzers/astroid_analyzers/detect_string_concat_in_loop.py +++ b/src/ecooptimizer/analyzers/astroid_analyzers/detect_string_concat_in_loop.py @@ -1,11 +1,15 @@ from pathlib import Path import re +from typing import Any from astroid import nodes, util, parse, AttributeInferenceError +from ...config import CONFIG from ...data_types.custom_fields import Occurence, SCLInfo from ...data_types.smell import SCLSmell from ...utils.smell_enums import CustomSmell +logger = CONFIG["detectLogger"] + def detect_string_concat_in_loop(file_path: Path, tree: nodes.Module): """ @@ -21,34 +25,40 @@ def detect_string_concat_in_loop(file_path: Path, tree: nodes.Module): smells: list[SCLSmell] = [] in_loop_counter = 0 current_loops: list[nodes.NodeNG] = [] - # current_semlls = { var_name : ( index of smell, index of loop )} current_smells: dict[str, tuple[int, int]] = {} + logger.debug(f"Starting analysis of file: {file_path}") + logger.debug( + f"Initial state - smells: {smells}, in_loop_counter: {in_loop_counter}, current_loops: {current_loops}, current_smells: {current_smells}" + ) + def create_smell(node: nodes.Assign): nonlocal current_loops, current_smells + logger.debug(f"Creating smell for node: {node.as_string()}") if node.lineno and node.col_offset: - smells.append( - SCLSmell( - path=str(file_path), - module=file_path.name, - obj=None, - type="performance", - symbol="string-concat-loop", - message="String concatenation inside loop detected", - messageId=CustomSmell.STR_CONCAT_IN_LOOP.value, - confidence="UNDEFINED", - occurences=[create_smell_occ(node)], - additionalInfo=SCLInfo( - innerLoopLine=current_loops[ - current_smells[node.targets[0].as_string()][1] - ].lineno, # type: ignore - concatTarget=node.targets[0].as_string(), - ), - ) + smell = SCLSmell( + path=str(file_path), + module=file_path.name, + obj=None, + type="performance", + symbol="string-concat-loop", + message="String concatenation inside loop detected", + messageId=CustomSmell.STR_CONCAT_IN_LOOP.value, + confidence="UNDEFINED", + occurences=[create_smell_occ(node)], + additionalInfo=SCLInfo( + innerLoopLine=current_loops[ + current_smells[node.targets[0].as_string()][1] + ].lineno, # type: ignore + concatTarget=node.targets[0].as_string(), + ), ) + smells.append(smell) + logger.debug(f"Added smell: {smell}") def create_smell_occ(node: nodes.Assign | nodes.AugAssign) -> Occurence: + logger.debug(f"Creating occurrence for node: {node.as_string()}") return Occurence( line=node.lineno, # type: ignore endLine=node.end_lineno, @@ -59,30 +69,46 @@ def create_smell_occ(node: nodes.Assign | nodes.AugAssign) -> Occurence: def visit(node: nodes.NodeNG): nonlocal smells, in_loop_counter, current_loops, current_smells + logger.debug(f"Visiting node: {node.as_string()}") if isinstance(node, (nodes.For, nodes.While)): in_loop_counter += 1 current_loops.append(node) + logger.debug( + f"Entered loop. in_loop_counter: {in_loop_counter}, current_loops: {current_loops}" + ) + for stmt in node.body: visit(stmt) in_loop_counter -= 1 + logger.debug(f"Exited loop. in_loop_counter: {in_loop_counter}") current_smells = { key: val for key, val in current_smells.items() if val[1] != in_loop_counter } current_loops.pop() + logger.debug( + f"Updated current_smells: {current_smells}, current_loops: {current_loops}" + ) elif in_loop_counter > 0 and isinstance(node, nodes.Assign): target = None value = None - if len(node.targets) == 1 > 1: + if len(node.targets) != 1: + logger.debug(f"Skipping node due to multiple targets: {node.as_string()}") return target = node.targets[0] value = node.value + logger.debug( + f"Processing assignment node. target: {target.as_string()}, value: {value.as_string()}" + ) if target and isinstance(value, nodes.BinOp) and value.op == "+": + logger.debug( + f"Found binary operation with '+' in loop. target: {target.as_string()}, value: {value.as_string()}" + ) if ( target.as_string() not in current_smells and is_string_type(node) @@ -93,11 +119,13 @@ def visit(node: nodes.NodeNG): len(smells), in_loop_counter - 1, ) + logger.debug(f"Adding new smell to current_smells: {current_smells}") create_smell(node) elif target.as_string() in current_smells and is_concatenating_with_self( value, target ): smell_id = current_smells[target.as_string()][0] + logger.debug(f"Updating existing smell with id: {smell_id}") smells[smell_id].occurences.append(create_smell_occ(node)) else: for child in node.get_children(): @@ -106,6 +134,7 @@ def visit(node: nodes.NodeNG): def is_not_referenced(node: nodes.Assign): nonlocal current_loops + logger.debug(f"Checking if node is referenced: {node.as_string()}") loop_source_str = current_loops[-1].as_string() loop_source_str = loop_source_str.replace(node.as_string(), "", 1) lines = loop_source_str.splitlines() @@ -114,11 +143,16 @@ def is_not_referenced(node: nodes.Assign): line.find(node.targets[0].as_string()) != -1 and re.search(rf"\b{re.escape(node.targets[0].as_string())}\b\s*=", line) is None ): + logger.debug(f"Node is referenced in line: {line}") return False + logger.debug("Node is not referenced in loop") return True def is_concatenating_with_self(binop_node: nodes.BinOp, target: nodes.NodeNG): """Check if the BinOp node includes the target variable being added.""" + logger.debug( + f"Checking if binop_node is concatenating with self: {binop_node.as_string()}, target: {target.as_string()}" + ) def is_same_variable(var1: nodes.NodeNG, var2: nodes.NodeNG): if isinstance(var1, nodes.Name) and isinstance(var2, nodes.AssignName): @@ -133,105 +167,248 @@ def is_same_variable(var1: nodes.NodeNG, var2: nodes.NodeNG): return False left, right = binop_node.left, binop_node.right + logger.debug(f"Left: {left.as_string()}, Right: {right.as_string()}") return is_same_variable(left, target) or is_same_variable(right, target) - def is_string_type(node: nodes.Assign) -> bool: + def is_string_type(node: nodes.Assign, visited: set[str] | None = None) -> bool: + """Check if assignment target is inferred to be string type.""" + if visited is None: + visited = set() + target = node.targets[0] + target_name = target.as_string() - # Check type hints first - if has_type_hints_str(node, target): + if target_name in visited: + logger.debug(f"Cycle detected for {target_name}") + return False + visited.add(target_name) + + logger.debug(f"Checking string type for {target_name}") + + # Check explicit type hints first + if has_type_hints_str(node, target, visited): return True - # Infer types - for inferred in target.infer(): - if inferred.repr_name() == "str": - return True - if isinstance(inferred, util.UninferableBase): - print(f"here: {node}") - if has_str_format(node.value) or has_str_interpolation(node.value): + # Check string literals + if isinstance(node.value, nodes.Const) and isinstance(node.value.value, str): + logger.debug(f"String literal found: {node.as_string()}") + return True + + # Check string operations + if has_str_operation(node.value): + logger.debug(f"String operation pattern found: {node.as_string()}") + return True + + # Check inferred type + try: + inferred_types = list(node.value.infer()) + except util.InferenceError: + inferred_types = [util.Uninferable] + + if not any(isinstance(t, util.UninferableBase) for t in inferred_types): + return is_inferred_string(node, inferred_types) + + def get_top_level_rhs_vars(value: nodes.NodeNG) -> list[nodes.NodeNG]: + """Get top-level variables from RHS expression.""" + if isinstance(value, nodes.BinOp): + return get_top_level_rhs_vars(value.left) + get_top_level_rhs_vars(value.right) + else: + return [value] + + # Recursive check for RHS variables + rhs_vars = get_top_level_rhs_vars(node.value) + for rhs_node in rhs_vars: + if isinstance(rhs_node, nodes.Const): + if rhs_node.pytype() == "str": + logger.debug(f"String literal found in RHS: {rhs_node.as_string()}") return True - for var in node.value.nodes_of_class( - (nodes.Name, nodes.Attribute, nodes.Subscript) - ): - if var.as_string() == target.as_string(): - for inferred_target in var.infer(): - if inferred_target.repr_name() == "str": - return True + else: + return False + if isinstance(rhs_node, nodes.Call): + if rhs_node.func.as_string() == "str": + logger.debug(f"str() call found in RHS: {rhs_node.as_string()}") + return True + else: + return False + try: + inferred_types = list(rhs_node.infer()) + except util.InferenceError: + inferred_types = [util.Uninferable] + + if not any(isinstance(t, util.UninferableBase) for t in inferred_types): + return is_inferred_string(rhs_node, inferred_types) + var_name = rhs_node.as_string() + if var_name == target_name: + continue - print(f"Checking type hints for {var}") - if has_type_hints_str(node, var): - return True + logger.debug(f"Checking RHS variable: {var_name}") + if has_type_hints_str(node, rhs_node, visited): # Pass new visited set + return True return False - def has_type_hints_str(context: nodes.NodeNG, target: nodes.NodeNG) -> bool: - """Checks if a variable has an explicit type hint for `str`""" + def is_inferred_string(node: nodes.NodeNG, inferred_types: list[Any]) -> bool: + if all(t.repr_name() == "str" for t in inferred_types): + logger.debug(f"Definitively inferred as string: {node.as_string()}") + return True + else: + logger.debug(f"Definitively non-string: {node.as_string()}") + return False + + def has_type_hints_str(context: nodes.NodeNG, target: nodes.NodeNG, visited: set[str]) -> bool: + """Check for string type hints with simplified subscript handling and scope-aware checks.""" + + def check_annotation(annotation: nodes.NodeNG) -> bool: + """Check if annotation is strictly a string type.""" + annotation_str = annotation.as_string() + + if re.search(r"(^|[^|\w])str($|[^|\w])", annotation_str): + # Ensure it's not part of a union or optional + if not re.search(r"\b(str\s*[|]\s*\w|\w\s*[|]\s*str)\b", annotation_str): + return True + return False + + def is_allowed_target(node: nodes.NodeNG) -> bool: + """Check if target matches allowed patterns: + - self.var + - self.var[subscript] + - var[subscript] + - simple_var + """ + # Case 1: Simple Name (var) + if isinstance(node, (nodes.AssignName, nodes.Name)): + return True + + # Case 2: Direct self attribute (self.var) + if isinstance(node, (nodes.AssignAttr, nodes.Attribute)): + return ( + node.attrname.count(".") == 0 # No nested attributes + and node.expr.as_string() == "self" # Direct self attribute + ) + + # Case 3: Simple subscript (var[sub] or self.var[sub]) + if isinstance(node, nodes.Subscript): + # Check base is allowed + base = node.value + return ( + is_allowed_target(base) # Base must be allowed target + and not has_nested_subscript(base) # No nested subscripts + ) + + return False + + def has_nested_subscript(node: nodes.NodeNG) -> bool: + """Check for nested subscript/attribute structures""" + if isinstance(node, nodes.Subscript): + return not is_allowed_target(node.value) + if isinstance(node, nodes.Attribute): + return node.expr.as_string() != "self" + return False + + target_name = target.as_string() + + # First: Filter complex targets according to rules + if not is_allowed_target(target): + logger.debug(f"Skipping complex target: {target_name}") + return False + + # Handle simple subscript case (single level) + base_name = target.value.as_string() if isinstance(target, nodes.Subscript) else target_name parent = context.scope() - # Function argument type hints + # 1. Check function parameters if isinstance(parent, nodes.FunctionDef) and parent.args.args: - for arg, ann in zip(parent.args.args, parent.args.annotations): - print(f"arg: {arg}, target: {target}, ann: {ann}") - if arg.name == target.as_string() and ann and ann.as_string() == "str": + for arg, ann in zip(parent.args.args, parent.args.annotations or []): + if arg.name == base_name and ann and check_annotation(ann): return True - # Class attributes (annotations in class scope or __init__) - if "self." in target.as_string(): - class_def = parent.frame() - if not isinstance(class_def, nodes.ClassDef): - class_def = next( - ( - ancestor - for ancestor in context.node_ancestors() - if isinstance(ancestor, nodes.ClassDef) - ), - None, - ) + # 2. Check class attributes for self.* targets + if target_name.startswith("self."): + class_def = next( + (n for n in context.node_ancestors() if isinstance(n, nodes.ClassDef)), None + ) + if class_def: + attr_name = target_name.split("self.", 1)[1].split("[")[0] + try: + for attr in class_def.instance_attr(attr_name): + if isinstance(attr, nodes.AnnAssign) and check_annotation(attr.annotation): + return True + else: + try: + inferred_types = list(attr.infer()) + except util.InferenceError: + inferred_types = [util.Uninferable] + + if not any(isinstance(t, util.UninferableBase) for t in inferred_types): + return is_inferred_string(attr, inferred_types) + return has_type_hints_str(attr, target, visited) + except AttributeInferenceError: + pass + + def get_ordered_scope_nodes( + scope: nodes.NodeNG, target: nodes.NodeNG + ) -> list[nodes.NodeNG]: + """Get all nodes in scope in execution order, flattening nested blocks.""" + nodes_list = [] + for child in scope.body: + # Recursively flatten block nodes (loops, ifs, etc) + if child.lineno >= target.lineno: # type: ignore + break + if isinstance(child, (nodes.For, nodes.While, nodes.If)): + nodes_list.extend(get_ordered_scope_nodes(child, target)) + else: + nodes_list.append(child) + return nodes_list + + try: + current_lineno = target.lineno + scope_nodes = get_ordered_scope_nodes(parent, target) + + for child in scope_nodes: + if child.lineno >= current_lineno: # type: ignore + break + + # Check AnnAssigns + if isinstance(child, nodes.AnnAssign): + if ( + isinstance(child.target, nodes.AssignName) + and child.target.name == target_name + and check_annotation(child.annotation) + ): + return True - if class_def: - attr_name = target.as_string().replace("self.", "") - try: - for attr in class_def.instance_attr(attr_name): - if ( - isinstance(attr, nodes.AnnAssign) - and attr.annotation.as_string() == "str" - ): - return True - if any(inf.repr_name() == "str" for inf in attr.infer()): - return True - except AttributeInferenceError: - pass - - # Global/scope variable annotations before assignment - for child in parent.nodes_of_class((nodes.AnnAssign, nodes.Assign)): - if child == context: - break - if ( - isinstance(child, nodes.AnnAssign) - and child.target.as_string() == target.as_string() - ): - return child.annotation.as_string() == "str" - print("checking var types") - if isinstance(child, nodes.Assign) and is_string_type(child): - return True + for child in scope_nodes: + if child.lineno >= current_lineno: # type: ignore + break + + # Check Assigns + if isinstance(child, nodes.Assign): + if any( + isinstance(t, nodes.AssignName) and t.name == target_name + for t in child.targets + ): + if is_string_type(child, visited): + return True + except (ValueError, AttributeError): + pass return False - def has_str_format(node: nodes.NodeNG): - if isinstance(node, nodes.BinOp) and node.op == "+": - str_repr = node.as_string() - match = re.search("{.*}", str_repr) - if match: + def has_str_operation(node: nodes.NodeNG) -> bool: + """Check for string-specific operations.""" + if isinstance(node, nodes.JoinedStr): + logger.debug(f"Found f-string: {node.as_string()}") + return True + + if isinstance(node, nodes.Call) and isinstance(node.func, nodes.Attribute): + if node.func.attrname == "format": + logger.debug(f"Found .format() call: {node.as_string()}") return True - return False + if isinstance(node, nodes.BinOp) and node.op == "%": + logger.debug(f"Found % formatting: {node.as_string()}") + return True - def has_str_interpolation(node: nodes.NodeNG): - if isinstance(node, nodes.BinOp) and node.op == "+": - str_repr = node.as_string() - match = re.search("%[a-z]", str_repr) - if match: - return True return False def transform_augassign_to_assign(code_file: str): @@ -241,6 +418,7 @@ def transform_augassign_to_assign(code_file: str): :param code_file: The source code file as a string :return: The same string source code with all AugAssign stmts changed to Assign """ + logger.debug("Transforming AugAssign to Assign in code file") str_code = code_file.splitlines() for i in range(len(str_code)): @@ -253,14 +431,18 @@ def transform_augassign_to_assign(code_file: str): # Replace '+=' with '=' to form an Assign string str_code[i] = str_code[i].replace("+=", f"= {target_var} +", 1) + logger.debug(f"Transformed line {i}: {str_code[i]}") return "\n".join(str_code) # Change all AugAssigns to Assigns + logger.debug(f"Transforming AugAssign to Assign in file: {file_path}") tree = parse(transform_augassign_to_assign(file_path.read_text())) - # Start traversal + # Entry Point + logger.debug("Starting AST traversal") for child in tree.get_children(): visit(child) + logger.debug(f"Analysis complete. Detected smells: {smells}") return smells diff --git a/tests/analyzers/test_str_concat_analyzer.py b/tests/analyzers/test_str_concat_analyzer.py index 15b9f11d..f385b433 100644 --- a/tests/analyzers/test_str_concat_analyzer.py +++ b/tests/analyzers/test_str_concat_analyzer.py @@ -493,7 +493,7 @@ def test(a, b): assert smells[0].additionalInfo.innerLoopLine == 4 -def test_detects_cls_attr_type_hint_concat(): +def test_detects_instance_attr_type_hint_concat(): """Detects string concats where type is inferred from class attributes.""" code = """ class Test: @@ -520,6 +520,31 @@ def test(self, a): assert smells[0].additionalInfo.innerLoopLine == 9 +def test_detects_cls_attr_type_hint_concat(): + """Detects string concats where type is inferred from class attributes.""" + code = """ + class Test: + text = "word" + + def test(self, a): + result = a + for i in range(5): + result = result + self.text + + a = Test() + a.test("this ") + """ + with patch.object(Path, "read_text", return_value=code): + smells = detect_string_concat_in_loop(Path("fake.py"), parse(code)) + + assert len(smells) == 1 + assert isinstance(smells[0], SCLSmell) + + assert len(smells[0].occurences) == 1 + assert smells[0].additionalInfo.concatTarget == "result" + assert smells[0].additionalInfo.innerLoopLine == 7 + + def test_detects_inferred_str_type_concat(): """Detects string concat where type is inferred from the initial value assigned.""" code = """ From 4f93b144e821a398b171510754ed9af77afee791 Mon Sep 17 00:00:00 2001 From: Sevhena Walker <83547364+Sevhena@users.noreply.github.com> Date: Wed, 19 Mar 2025 22:38:34 -0400 Subject: [PATCH 2/3] logging changes and fixes remove print statements or replaced them with log messages filtered out debug logs from being sent over websocket to prevent overloading the network --- .../detect_long_element_chain.py | 2 - src/ecooptimizer/api/routes/refactor_smell.py | 46 ++++++++----------- src/ecooptimizer/api/routes/show_logs.py | 13 ++++-- .../concrete/long_element_chain.py | 2 +- 4 files changed, 30 insertions(+), 33 deletions(-) diff --git a/src/ecooptimizer/analyzers/ast_analyzers/detect_long_element_chain.py b/src/ecooptimizer/analyzers/ast_analyzers/detect_long_element_chain.py index 3fa39d86..ae729adb 100644 --- a/src/ecooptimizer/analyzers/ast_analyzers/detect_long_element_chain.py +++ b/src/ecooptimizer/analyzers/ast_analyzers/detect_long_element_chain.py @@ -35,11 +35,9 @@ def check_chain(node: ast.Subscript, chain_length: int = 0): chain_length += 1 current = current.value - print(chain_length) if chain_length >= threshold: # Create a descriptive message for the detected long chain message = f"Dictionary chain too long ({chain_length}/{threshold})" - print(node.lineno) # Instantiate a Smell object with details about the detected issue smell = LECSmell( path=str(file_path), diff --git a/src/ecooptimizer/api/routes/refactor_smell.py b/src/ecooptimizer/api/routes/refactor_smell.py index ae762401..799700a5 100644 --- a/src/ecooptimizer/api/routes/refactor_smell.py +++ b/src/ecooptimizer/api/routes/refactor_smell.py @@ -15,6 +15,8 @@ from ...measurements.codecarbon_energy_meter import CodeCarbonEnergyMeter from ...data_types.smell import Smell +logger = CONFIG["refactorLogger"] + router = APIRouter() analyzer_controller = AnalyzerController() refactorer_controller = RefactorerController() @@ -46,33 +48,29 @@ class RefactorResModel(BaseModel): @router.post("/refactor", response_model=RefactorResModel) def refactor(request: RefactorRqModel): """Handles the refactoring process for a given smell.""" - CONFIG["refactorLogger"].info(f"{'=' * 100}") - CONFIG["refactorLogger"].info("🔄 Received refactor request.") + logger.info(f"{'=' * 100}") + logger.info("🔄 Received refactor request.") try: - CONFIG["refactorLogger"].info( - f"🔍 Analyzing smell: {request.smell.symbol} in {request.source_dir}" - ) + logger.info(f"🔍 Analyzing smell: {request.smell.symbol} in {request.source_dir}") refactor_data, updated_smells = perform_refactoring(Path(request.source_dir), request.smell) - CONFIG["refactorLogger"].info( - f"✅ Refactoring process completed. Updated smells: {len(updated_smells)}" - ) + logger.info(f"✅ Refactoring process completed. Updated smells: {len(updated_smells)}") if refactor_data: refactor_data = clean_refactored_data(refactor_data) - CONFIG["refactorLogger"].info(f"{'=' * 100}\n") + logger.info(f"{'=' * 100}\n") return RefactorResModel(refactoredData=refactor_data, updatedSmells=updated_smells) - CONFIG["refactorLogger"].info(f"{'=' * 100}\n") + logger.info(f"{'=' * 100}\n") return RefactorResModel(updatedSmells=updated_smells) except OSError as e: - CONFIG["refactorLogger"].error(f"❌ OS error: {e!s}") + logger.error(f"❌ OS error: {e!s}") raise HTTPException(status_code=404, detail=str(e)) from e except Exception as e: - CONFIG["refactorLogger"].error(f"❌ Refactoring error: {e!s}") - CONFIG["refactorLogger"].info(f"{'=' * 100}\n") + logger.error(f"❌ Refactoring error: {e!s}") + logger.info(f"{'=' * 100}\n") raise HTTPException(status_code=400, detail=str(e)) from e @@ -80,21 +78,21 @@ def perform_refactoring(source_dir: Path, smell: Smell): """Executes the refactoring process for a given smell.""" target_file = Path(smell.path) - CONFIG["refactorLogger"].info( + logger.info( f"🚀 Starting refactoring for {smell.symbol} at line {smell.occurences[0].line} in {target_file}" ) if not source_dir.is_dir(): - CONFIG["refactorLogger"].error(f"❌ Directory does not exist: {source_dir}") + logger.error(f"❌ Directory does not exist: {source_dir}") raise OSError(f"Directory {source_dir} does not exist.") initial_emissions = measure_energy(target_file) if not initial_emissions: - CONFIG["refactorLogger"].error("❌ Could not retrieve initial emissions.") + logger.error("❌ Could not retrieve initial emissions.") raise RuntimeError("Could not retrieve initial emissions.") - CONFIG["refactorLogger"].info(f"📊 Initial emissions: {initial_emissions} kg CO2") + logger.info(f"📊 Initial emissions: {initial_emissions} kg CO2") temp_dir = mkdtemp(prefix="ecooptimizer-") source_copy = Path(temp_dir) / source_dir.name @@ -120,25 +118,21 @@ def perform_refactoring(source_dir: Path, smell: Smell): if not final_emissions: print("❌ Could not retrieve final emissions. Discarding refactoring.") - CONFIG["refactorLogger"].error( - "❌ Could not retrieve final emissions. Discarding refactoring." - ) + logger.error("❌ Could not retrieve final emissions. Discarding refactoring.") shutil.rmtree(temp_dir, onerror=remove_readonly) raise RuntimeError("Could not retrieve final emissions.") if CONFIG["mode"] == "production" and final_emissions >= initial_emissions: - CONFIG["refactorLogger"].info(f"📊 Final emissions: {final_emissions} kg CO2") - CONFIG["refactorLogger"].info("⚠️ No measured energy savings. Discarding refactoring.") + logger.info(f"📊 Final emissions: {final_emissions} kg CO2") + logger.info("⚠️ No measured energy savings. Discarding refactoring.") print("❌ Could not retrieve final emissions. Discarding refactoring.") shutil.rmtree(temp_dir, onerror=remove_readonly) raise EnergySavingsError(str(target_file), "Energy was not saved after refactoring.") - CONFIG["refactorLogger"].info( - f"✅ Energy saved! Initial: {initial_emissions}, Final: {final_emissions}" - ) + logger.info(f"✅ Energy saved! Initial: {initial_emissions}, Final: {final_emissions}") refactor_data = { "tempDir": temp_dir, @@ -188,5 +182,5 @@ def clean_refactored_data(refactor_data: dict[str, Any]): ], ) except KeyError as e: - CONFIG["refactorLogger"].error(f"❌ Missing expected key in refactored data: {e}") + logger.error(f"❌ Missing expected key in refactored data: {e}") raise HTTPException(status_code=500, detail=f"Missing key: {e}") from e diff --git a/src/ecooptimizer/api/routes/show_logs.py b/src/ecooptimizer/api/routes/show_logs.py index d9b1b647..7e689978 100644 --- a/src/ecooptimizer/api/routes/show_logs.py +++ b/src/ecooptimizer/api/routes/show_logs.py @@ -2,6 +2,7 @@ import asyncio from pathlib import Path +import re from fastapi import APIRouter, WebSocketException from fastapi.websockets import WebSocketState, WebSocket, WebSocketDisconnect from pydantic import BaseModel @@ -68,16 +69,20 @@ async def websocket_log_stream(websocket: WebSocket, log_file: Path): try: with log_file.open(encoding="utf-8") as file: - file.seek(0, 2) # Start at file end + file.seek(0, 2) # Seek to end of the file while not listener_task.done(): if websocket.application_state != WebSocketState.CONNECTED: raise WebSocketDisconnect(reason="Connection closed") - line = file.readline() if line: - await websocket.send_text(line) + match = re.search(r"\[\b(ERROR|DEBUG|INFO|WARNING|TRACE)\b]", line) + if match: + level = match.group(1) + + if level in ("INFO", "WARNING", "ERROR", "CRITICAL"): + await websocket.send_text(line) else: - await asyncio.sleep(0.5) + await asyncio.sleep(0.1) # Short sleep when no new lines except FileNotFoundError: await websocket.send_text("Error: Log file not found.") except WebSocketDisconnect as e: diff --git a/src/ecooptimizer/refactorers/concrete/long_element_chain.py b/src/ecooptimizer/refactorers/concrete/long_element_chain.py index dc246e3d..b38df65c 100644 --- a/src/ecooptimizer/refactorers/concrete/long_element_chain.py +++ b/src/ecooptimizer/refactorers/concrete/long_element_chain.py @@ -124,7 +124,7 @@ def _find_access_pattern_in_file(self, tree: ast.AST, path: Path): dict_name, full_access, nesting_level, line_number, col_offset, path, node ) self.access_patterns.add(access) - print(self.access_patterns) + # print(self.access_patterns) self.min_value = min(self.min_value, nesting_level) def extract_full_dict_access(self, node: ast.Subscript): From c52118245bd5368c6635680bb1caedd01d0c7fa4 Mon Sep 17 00:00:00 2001 From: Sevhena Walker <83547364+Sevhena@users.noreply.github.com> Date: Fri, 21 Mar 2025 22:35:01 -0400 Subject: [PATCH 3/3] Made additional fixes + added tests --- .../detect_string_concat_in_loop.py | 171 ++++++++++-------- tests/analyzers/test_str_concat_analyzer.py | 136 ++++++++++++-- 2 files changed, 211 insertions(+), 96 deletions(-) diff --git a/src/ecooptimizer/analyzers/astroid_analyzers/detect_string_concat_in_loop.py b/src/ecooptimizer/analyzers/astroid_analyzers/detect_string_concat_in_loop.py index 7be20d66..05a8c125 100644 --- a/src/ecooptimizer/analyzers/astroid_analyzers/detect_string_concat_in_loop.py +++ b/src/ecooptimizer/analyzers/astroid_analyzers/detect_string_concat_in_loop.py @@ -1,7 +1,7 @@ from pathlib import Path import re from typing import Any -from astroid import nodes, util, parse, AttributeInferenceError +from astroid import nodes, util, parse, extract_node, AttributeInferenceError from ...config import CONFIG from ...data_types.custom_fields import Occurence, SCLInfo @@ -170,35 +170,36 @@ def is_same_variable(var1: nodes.NodeNG, var2: nodes.NodeNG): logger.debug(f"Left: {left.as_string()}, Right: {right.as_string()}") return is_same_variable(left, target) or is_same_variable(right, target) - def is_string_type(node: nodes.Assign, visited: set[str] | None = None) -> bool: + def is_string_type( + node: nodes.Assign, visited: set[tuple[str, nodes.NodeNG]] | None = None + ) -> bool: """Check if assignment target is inferred to be string type.""" if visited is None: visited = set() target = node.targets[0] target_name = target.as_string() + scope = node.scope() - if target_name in visited: + if (target_name, scope) in visited: logger.debug(f"Cycle detected for {target_name}") return False - visited.add(target_name) logger.debug(f"Checking string type for {target_name}") # Check explicit type hints first + logger.debug("Checking explicit type hints") if has_type_hints_str(node, target, visited): return True - # Check string literals - if isinstance(node.value, nodes.Const) and isinstance(node.value.value, str): - logger.debug(f"String literal found: {node.as_string()}") - return True + visited.add((target_name, scope)) - # Check string operations - if has_str_operation(node.value): - logger.debug(f"String operation pattern found: {node.as_string()}") + # Check for string format with % operator + if has_percent_format(node.value): + logger.debug(f"String format with % operator found: {node.as_string()}") return True + logger.debug("Checking inferred types") # Check inferred type try: inferred_types = list(node.value.infer()) @@ -217,19 +218,19 @@ def get_top_level_rhs_vars(value: nodes.NodeNG) -> list[nodes.NodeNG]: # Recursive check for RHS variables rhs_vars = get_top_level_rhs_vars(node.value) + logger.debug(f"RHS Vars: {rhs_vars}") for rhs_node in rhs_vars: if isinstance(rhs_node, nodes.Const): - if rhs_node.pytype() == "str": + if rhs_node.pytype() == "builtins.str": logger.debug(f"String literal found in RHS: {rhs_node.as_string()}") return True else: return False - if isinstance(rhs_node, nodes.Call): - if rhs_node.func.as_string() == "str": - logger.debug(f"str() call found in RHS: {rhs_node.as_string()}") - return True - else: - return False + + if has_str_operation(rhs_node): + logger.debug(f"String operation found in RHS: {rhs_node.as_string()}") + return True + try: inferred_types = list(rhs_node.infer()) except util.InferenceError: @@ -237,6 +238,7 @@ def get_top_level_rhs_vars(value: nodes.NodeNG) -> list[nodes.NodeNG]: if not any(isinstance(t, util.UninferableBase) for t in inferred_types): return is_inferred_string(rhs_node, inferred_types) + var_name = rhs_node.as_string() if var_name == target_name: continue @@ -255,7 +257,9 @@ def is_inferred_string(node: nodes.NodeNG, inferred_types: list[Any]) -> bool: logger.debug(f"Definitively non-string: {node.as_string()}") return False - def has_type_hints_str(context: nodes.NodeNG, target: nodes.NodeNG, visited: set[str]) -> bool: + def has_type_hints_str( + context: nodes.NodeNG, target: nodes.NodeNG, visited: set[tuple[str, nodes.NodeNG]] + ) -> bool: """Check for string type hints with simplified subscript handling and scope-aware checks.""" def check_annotation(annotation: nodes.NodeNG) -> bool: @@ -275,36 +279,28 @@ def is_allowed_target(node: nodes.NodeNG) -> bool: - var[subscript] - simple_var """ + logger.debug(f"Checking if target is allowed: {node}") + node_string = node.as_string() + + if node_string.startswith("self."): + base_var = extract_node(node_string.removeprefix("self.")) + if isinstance(base_var, nodes.NodeNG): + return is_allowed_target(base_var) + # Case 1: Simple Name (var) if isinstance(node, (nodes.AssignName, nodes.Name)): return True # Case 2: Direct self attribute (self.var) if isinstance(node, (nodes.AssignAttr, nodes.Attribute)): - return ( - node.attrname.count(".") == 0 # No nested attributes - and node.expr.as_string() == "self" # Direct self attribute - ) + return node.expr.as_string().count(".") == 0 # Case 3: Simple subscript (var[sub] or self.var[sub]) if isinstance(node, nodes.Subscript): - # Check base is allowed - base = node.value - return ( - is_allowed_target(base) # Base must be allowed target - and not has_nested_subscript(base) # No nested subscripts - ) + return isinstance(node.value, nodes.Name) return False - def has_nested_subscript(node: nodes.NodeNG) -> bool: - """Check for nested subscript/attribute structures""" - if isinstance(node, nodes.Subscript): - return not is_allowed_target(node.value) - if isinstance(node, nodes.Attribute): - return node.expr.as_string() != "self" - return False - target_name = target.as_string() # First: Filter complex targets according to rules @@ -312,8 +308,12 @@ def has_nested_subscript(node: nodes.NodeNG) -> bool: logger.debug(f"Skipping complex target: {target_name}") return False - # Handle simple subscript case (single level) - base_name = target.value.as_string() if isinstance(target, nodes.Subscript) else target_name + # Get the object name of the subscripted target + base_name = ( + target.value.as_string().partition("[")[0] + if isinstance(target, nodes.Subscript) + else target_name + ) parent = context.scope() # 1. Check function parameters @@ -323,7 +323,7 @@ def has_nested_subscript(node: nodes.NodeNG) -> bool: return True # 2. Check class attributes for self.* targets - if target_name.startswith("self."): + if not context.as_string().startswith("self.") and target_name.startswith("self."): class_def = next( (n for n in context.node_ancestors() if isinstance(n, nodes.ClassDef)), None ) @@ -331,17 +331,22 @@ def has_nested_subscript(node: nodes.NodeNG) -> bool: attr_name = target_name.split("self.", 1)[1].split("[")[0] try: for attr in class_def.instance_attr(attr_name): - if isinstance(attr, nodes.AnnAssign) and check_annotation(attr.annotation): + assign = attr.parent + if isinstance(assign, nodes.AnnAssign) and check_annotation( + assign.annotation + ): return True - else: + elif isinstance(assign, nodes.Assign): try: - inferred_types = list(attr.infer()) + inferred_types = list(assign.value.infer()) except util.InferenceError: inferred_types = [util.Uninferable] if not any(isinstance(t, util.UninferableBase) for t in inferred_types): - return is_inferred_string(attr, inferred_types) - return has_type_hints_str(attr, target, visited) + return is_inferred_string(assign, inferred_types) + return is_string_type(assign, visited) + else: + return False except AttributeInferenceError: pass @@ -356,46 +361,57 @@ def get_ordered_scope_nodes( break if isinstance(child, (nodes.For, nodes.While, nodes.If)): nodes_list.extend(get_ordered_scope_nodes(child, target)) - else: + elif isinstance(child, (nodes.Assign, nodes.AnnAssign)): nodes_list.append(child) return nodes_list - try: - current_lineno = target.lineno - scope_nodes = get_ordered_scope_nodes(parent, target) + scope_nodes = get_ordered_scope_nodes(parent, target) - for child in scope_nodes: - if child.lineno >= current_lineno: # type: ignore - break + # Check for type hints in scope + for child in scope_nodes: + if isinstance(child, nodes.AnnAssign): + if ( + isinstance(child.target, nodes.AssignName) + and child.target.name == target_name + and check_annotation(child.annotation) + ): + return True - # Check AnnAssigns - if isinstance(child, nodes.AnnAssign): - if ( - isinstance(child.target, nodes.AssignName) - and child.target.name == target_name - and check_annotation(child.annotation) - ): - return True - - for child in scope_nodes: - if child.lineno >= current_lineno: # type: ignore - break + # Check for Assigns in scope + previous_assign = next( + ( + child + for child in reversed(scope_nodes) + if isinstance(child, nodes.Assign) + and any(target.as_string() == target_name for target in child.targets) + ), + None, + ) - # Check Assigns - if isinstance(child, nodes.Assign): - if any( - isinstance(t, nodes.AssignName) and t.name == target_name - for t in child.targets - ): - if is_string_type(child, visited): - return True - except (ValueError, AttributeError): - pass + if previous_assign: + if is_string_type(previous_assign, visited): + return True + + return False + + def has_percent_format(node: nodes.NodeNG) -> bool: + """ + Check if a node contains % string formatting by traversing BinOp structure. + Handles nested binary operations and ensures % is found at any level. + """ + if isinstance(node, nodes.BinOp): + left = node.left + if node.op == "%": + if isinstance(left, nodes.Const) and isinstance(left.value, str): + return True + if isinstance(node.right, nodes.BinOp): + return has_percent_format(node.right) return False def has_str_operation(node: nodes.NodeNG) -> bool: """Check for string-specific operations.""" + logger.debug(f"Checking string operation for node: {node}") if isinstance(node, nodes.JoinedStr): logger.debug(f"Found f-string: {node.as_string()}") return True @@ -405,9 +421,10 @@ def has_str_operation(node: nodes.NodeNG) -> bool: logger.debug(f"Found .format() call: {node.as_string()}") return True - if isinstance(node, nodes.BinOp) and node.op == "%": - logger.debug(f"Found % formatting: {node.as_string()}") - return True + if isinstance(node, nodes.Call) and isinstance(node.func, nodes.Name): + if node.func.name == "str": + logger.debug(f"Found str() call: {node.as_string()}") + return True return False diff --git a/tests/analyzers/test_str_concat_analyzer.py b/tests/analyzers/test_str_concat_analyzer.py index f385b433..a3c1834d 100644 --- a/tests/analyzers/test_str_concat_analyzer.py +++ b/tests/analyzers/test_str_concat_analyzer.py @@ -109,6 +109,44 @@ def update(self): assert smells[0].additionalInfo.innerLoopLine == 6 +def test_complex_object_sub_concat(): + """Detects += modifying a complex subscript object inside a loop.""" + code = """ + def update(): + val = {"key": ["word1", "word2"]} + for i in range(5): + val["key"][1] += str(i) + """ + with patch.object(Path, "read_text", return_value=code): + smells = detect_string_concat_in_loop(Path("fake.py"), parse(code)) + + assert len(smells) == 1 + assert isinstance(smells[0], SCLSmell) + + assert len(smells[0].occurences) == 1 + assert smells[0].additionalInfo.concatTarget == "val['key'][1]" + assert smells[0].additionalInfo.innerLoopLine == 4 + + +def test_complex_object_attr_concat(): + """Detects += modifying a complex attribute object inside a loop.""" + code = """ + def update(): + val = RandomeClass() + for i in range(5): + val.attr1.attr2 += str(i) + """ + with patch.object(Path, "read_text", return_value=code): + smells = detect_string_concat_in_loop(Path("fake.py"), parse(code)) + + assert len(smells) == 1 + assert isinstance(smells[0], SCLSmell) + + assert len(smells[0].occurences) == 1 + assert smells[0].additionalInfo.concatTarget == "val.attr1.attr2" + assert smells[0].additionalInfo.innerLoopLine == 4 + + def test_detects_dict_value_concat(): """Detects += modifying a dictionary value inside a loop.""" code = """ @@ -181,11 +219,10 @@ def reset(): def test_detects_nested_loop_concat(): """Detects concatenation inside nested loops.""" code = """ - def test(): - result = "" + def test(result): for i in range(3): for j in range(3): - result += str(j) + result += "hi" """ with patch.object(Path, "read_text", return_value=code): smells = detect_string_concat_in_loop(Path("fake.py"), parse(code)) @@ -195,7 +232,7 @@ def test(): assert len(smells[0].occurences) == 1 assert smells[0].additionalInfo.concatTarget == "result" - assert smells[0].additionalInfo.innerLoopLine == 5 + assert smells[0].additionalInfo.innerLoopLine == 4 def test_detects_complex_nested_loop_concat(): @@ -250,8 +287,7 @@ def test(): def test_detects_f_string_concat(): """Detects += using f-strings inside a loop.""" code = """ - def test(): - result = "" + def test(result): for i in range(5): result += f"{i}" """ @@ -263,14 +299,13 @@ def test(): assert len(smells[0].occurences) == 1 assert smells[0].additionalInfo.concatTarget == "result" - assert smells[0].additionalInfo.innerLoopLine == 4 + assert smells[0].additionalInfo.innerLoopLine == 3 def test_detects_percent_format_concat(): """Detects += using % formatting inside a loop.""" code = """ - def test(): - result = "" + def test(result): for i in range(5): result += "%d" % i """ @@ -282,14 +317,13 @@ def test(): assert len(smells[0].occurences) == 1 assert smells[0].additionalInfo.concatTarget == "result" - assert smells[0].additionalInfo.innerLoopLine == 4 + assert smells[0].additionalInfo.innerLoopLine == 3 def test_detects_str_format_concat(): """Detects += using .format() inside a loop.""" code = """ - def test(): - result = "" + def test(result): for i in range(5): result += "{}".format(i) """ @@ -301,7 +335,7 @@ def test(): assert len(smells[0].occurences) == 1 assert smells[0].additionalInfo.concatTarget == "result" - assert smells[0].additionalInfo.innerLoopLine == 4 + assert smells[0].additionalInfo.innerLoopLine == 3 # === False Positives (Should NOT Detect) === @@ -350,6 +384,19 @@ def test(): assert len(smells) == 0 +def test_ignores_inferred_number_addition_inside_loop(): + """Ensures number operations with the += format are NOT flagged.""" + code = """ + def test(num): + for i in range(5): + num += 1 + """ + with patch.object(Path, "read_text", return_value=code): + smells = detect_string_concat_in_loop(Path("fake.py"), parse(code)) + + assert len(smells) == 0 + + def test_ignores_concat_outside_loop(): """Ensures that string concatenation OUTSIDE a loop is NOT flagged.""" code = """ @@ -371,8 +418,7 @@ def test(): def test_detects_sequential_concat(): """Detects a variable concatenated multiple times in the same loop iteration.""" code = """ - def test(): - result = "" + def test(result): for i in range(5): result += str(i) result += "-" @@ -385,7 +431,7 @@ def test(): assert len(smells[0].occurences) == 2 assert smells[0].additionalInfo.concatTarget == "result" - assert smells[0].additionalInfo.innerLoopLine == 4 + assert smells[0].additionalInfo.innerLoopLine == 3 def test_detects_concat_with_prefix_and_suffix(): @@ -493,8 +539,8 @@ def test(a, b): assert smells[0].additionalInfo.innerLoopLine == 4 -def test_detects_instance_attr_type_hint_concat(): - """Detects string concats where type is inferred from class attributes.""" +def test_detects_instance_attr_inferred_concat(): + """Detects string concats where type is inferred from instance attributes.""" code = """ class Test: @@ -520,7 +566,7 @@ def test(self, a): assert smells[0].additionalInfo.innerLoopLine == 9 -def test_detects_cls_attr_type_hint_concat(): +def test_detects_inferred_cls_attr_concat(): """Detects string concats where type is inferred from class attributes.""" code = """ class Test: @@ -545,6 +591,58 @@ def test(self, a): assert smells[0].additionalInfo.innerLoopLine == 7 +def test_detects_instance_attr_type_hint_concat(): + """Detects string concats where type is taken the instance attribute type hint.""" + code = """ + class Test: + def __init__(self, word): + self.text: str = word + + def test(self, a): + result = a + for i in range(5): + result = result + self.text + + a = Test() + a.test("this ") + """ + with patch.object(Path, "read_text", return_value=code): + smells = detect_string_concat_in_loop(Path("fake.py"), parse(code)) + + assert len(smells) == 1 + assert isinstance(smells[0], SCLSmell) + + assert len(smells[0].occurences) == 1 + assert smells[0].additionalInfo.concatTarget == "result" + assert smells[0].additionalInfo.innerLoopLine == 8 + + +def test_detects_inst_attr_init_hint_concat(): + """Detects string concats where type is taken from constructor attribute type hint.""" + code = """ + class Test: + def __init__(self, word: str): + self.text = word + + def test(self, a): + result = a + for i in range(5): + result = result + self.text + + a = Test() + a.test("this ") + """ + with patch.object(Path, "read_text", return_value=code): + smells = detect_string_concat_in_loop(Path("fake.py"), parse(code)) + + assert len(smells) == 1 + assert isinstance(smells[0], SCLSmell) + + assert len(smells[0].occurences) == 1 + assert smells[0].additionalInfo.concatTarget == "result" + assert smells[0].additionalInfo.innerLoopLine == 8 + + def test_detects_inferred_str_type_concat(): """Detects string concat where type is inferred from the initial value assigned.""" code = """