- Applied security fixes to: fix_cycle.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/src/agentic_security/fix_cycle.py:
Added:
- path = Path(f).resolve(strict=True) # strict=True ensures all parts exist...
- try:...
- path.relative_to(base_dir)...
- except ValueError:...
- preexec_fn=os.setsid, # Ensure process group isolation...
- umask=0o077 # Restrictive file creation mask...
- if not temp_path.parent.samefile(changelog_path.parent):...
- raise ValueError("Temporary file must be in same directory as changelog")...
- with open(temp_path, 'r') as f:...
- if f.read() != safe_entry:...
- os.chmod(temp_path, 0o644)...
Modified:
- Changed: raise ValueError(f"File not found: {f}") → raise ValueError("File content verification failed")
- Changed: start_new_session=True # Isolate the process group → start_new_session=True, # Isolate the process group
- Changed: fd = os.open(temp_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) → fd = os.open(temp_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
Removed:
- path = Path(f).resolve()
- if not str(path).startswith(str(base_dir)):
- if not path.exists():
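Note on the fix_cycle.py path check: the removed `str(path).startswith(str(base_dir))` test compares raw strings, so a sibling directory that shares the prefix (for example `project_evil` next to `project`) would pass it. The added `resolve(strict=True)` plus `relative_to(base_dir)` compares whole path components. A minimal sketch of that pattern follows; the function name `resolve_inside` and the error message are hypothetical and not taken from fix_cycle.py.

```python
from pathlib import Path


def resolve_inside(base_dir: Path, candidate: str) -> Path:
    """Return the resolved path, or raise ValueError if it escapes base_dir.

    Hypothetical helper illustrating the resolve/relative_to pattern.
    """
    base = base_dir.resolve(strict=True)
    # strict=True raises FileNotFoundError when any path component is missing,
    # which replaces a separate path.exists() check.
    path = (base / candidate).resolve(strict=True)
    try:
        # relative_to compares path components, not string prefixes, so
        # "<base>_evil/file" is rejected even though the strings share a prefix.
        path.relative_to(base)
    except ValueError:
        raise ValueError(f"Path escapes base directory: {candidate}") from None
    return path
```

Calling `resolve_inside(base, "../project_evil/bad.py")` raises `ValueError`, while a plain `startswith` comparison on the same resolved path would have accepted it.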
- Applied security fixes to: /workspaces/agentic-security/tests/test_features.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/test_features.py: Failed to generate diff summary: list index out of range
- Applied security fixes to: /workspaces/agentic-security/tests/samples/app.py, /workspaces/agentic-security/tests/samples/auth.py, /workspaces/agentic-security/tests/samples/api.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/api.py:
Added:
- r'<!ENTITY',...
- r'<!DOCTYPE',...
- r'<!ELEMENT',...
- r'<!ATTLIST',...
- r'<?xml-stylesheet',...
- r'data:',...
- r'file:',...
- r'gopher:',...
- r'http:',...
- r'ftp:'...
- ]...
- if not isinstance(response_data, str):...
- parse_float=decimal.Decimal, # Use Decimal for precise floating point...
- parse_constant=lambda x: ValueError(f'Invalid constant {x}') # Reject inf/nan...
- if not isinstance(parsed_data, (dict, list)):...
- New import: from typing import Union, Dict, List, Any
- ALLOWED_COMMANDS = {'ls', 'dir', 'echo', 'pwd'}...
- base_cmd = cmd_args[0].lower()...
- if base_cmd not in ALLOWED_COMMANDS:...
- for arg in cmd_args[1:]:...
- if arg.startswith('-'):...
- if any(c in arg for c in ';&|$()`'):...
- cmd_args,...
- text=True,...
- shell=False, # Prevent shell injection...
- timeout=10, # Prevent hanging...
- check=True # Raise on non-zero exit...
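Note on the command validation entries above: taken together they describe an allowlist check applied to the `shlex.split` output before `subprocess.run(..., shell=False)` is called. The sketch below reassembles those entries into one function so the order of checks is visible. The function name `validate_command` and the constant `FORBIDDEN_CHARS` are hypothetical; the allowlist, the character set and the error messages are copied from the logged lines, and the actual layout in api.py may differ.

```python
import shlex
from typing import List

ALLOWED_COMMANDS = {"ls", "dir", "echo", "pwd"}
FORBIDDEN_CHARS = ";&|$()`"  # hypothetical name for the logged character set


def validate_command(cmd: str) -> List[str]:
    """Split cmd and return the argument list, or raise if it is not allowed."""
    if not isinstance(cmd, str):
        raise TypeError("Command must be a string")
    cmd_args = shlex.split(cmd)
    if not cmd_args:
        raise ValueError("Empty command")
    # Only the program name is matched against the allowlist.
    base_cmd = cmd_args[0].lower()
    if base_cmd not in ALLOWED_COMMANDS:
        raise ValueError(f"Command not allowed: {base_cmd}")
    for arg in cmd_args[1:]:
        # Flags are rejected outright, then shell metacharacters.
        if arg.startswith("-"):
            raise ValueError(f"Command flags not allowed: {arg}")
        if any(c in arg for c in FORBIDDEN_CHARS):
            raise ValueError(f"Invalid character in argument: {arg}")
    return cmd_args
```

The returned list is what would be handed to `subprocess.run` with `shell=False`, `timeout=10` and `check=True`, as in the logged call.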
Modified:
- Changed: """Secure XML parsing with XXE protection""" → """Secure XML parsing with XXE protection and strict validation"""
- Changed: """Secure XML parsing with XXE protection""" → """Secure response handling with validation"""
- Changed: """Secure XML parsing with XXE protection""" → """Secure command execution with strict validation"""
- Changed: # Additional input validation → # Comprehensive input validation
- Changed: # Additional input validation → # Additional argument validation
- Changed: # Reject XML strings with potential malicious patterns → """Secure XML parsing with XXE protection and strict validation"""
- Changed: # Reject XML strings with potential malicious patterns → raise ValueError(f"Invalid XML string: Potentially malicious content detected: {pattern}")
- Changed: malicious_patterns = [r'<!ENTITY', r'<!DOCTYPE'] → malicious_patterns = [
- Changed: raise ValueError("Invalid XML string: Potential XXE attack detected") → raise ValueError(f"Invalid XML string: Potentially malicious content detected: {pattern}")
- Changed: raise ValueError("Invalid XML string: Potential XXE attack detected") → raise ValueError(f"Invalid character in argument: {arg}")
- Changed: # Handle invalid input with proper error logging → logging.error("Invalid input type for XML parsing")
- Changed: logging.error(f"Error: {e}") → logging.error("Invalid input type for XML parsing")
- Changed: logging.error(f"Error: {e}") → logging.error(f"XML Validation Error: {str(e)}")
- Changed: logging.error(f"Error: {e}") → logging.error("Invalid response data type")
- Changed: logging.error(f"Error: {e}") → logging.error(f"JSON Processing Error: {str(e)}")
- Changed: logging.error(f"Error: {e}") → logging.error("Invalid command type")
- Changed: logging.error(f"Error: {e}") → logging.error(f"Command Execution Error: {str(e)}")
- Changed: return None → return parsed_data
- Changed: return None → return result.stdout
- Changed: """Secure response handling""" → """Secure response handling with validation"""
- Changed: return json.loads(response_data) → parsed_data = json.loads(
- Changed: return json.loads(response_data) → response_data,
- Changed: return json.loads(response_data) → return parsed_data
- Changed: except json.JSONDecodeError: → except (json.JSONDecodeError, ValueError) as e:
- Changed: def parse_xml(xml_string): → if not isinstance(xml_string, str):
- Changed: # Secure XML parsing → # Set strict parsing options
- Changed: return ET.fromstring(xml_string, parser=parser) → if not isinstance(xml_string, str):
- Changed: import requests → import json
- Changed: import requests → import decimal
- Changed: import requests → import subprocess
- Changed: import requests → import shlex
- Changed: import requests → import re
- Changed: import subprocess → import json
- Changed: import subprocess → import decimal
- Changed: import subprocess → import logging
- Changed: import subprocess → import subprocess
- Changed: import subprocess → import shlex
- Changed: import subprocess → import re
- Changed: import subprocess → result = subprocess.run(
- Changed: import subprocess → stdout=subprocess.PIPE,
- Changed: import subprocess → stderr=subprocess.PIPE,
- Changed: # Validate and sanitize input XML string → # Validate parsed data structure
- Changed: try: → try:
- Changed: if not xml_string: → if not isinstance(xml_string, str):
- Changed: if not xml_string: → if not isinstance(cmd, str):
- Changed: if not xml_string: → if not cmd_args:
- Changed: raise ValueError("Empty XML string") → logging.error("Invalid input type for XML parsing")
- Changed: raise ValueError("Empty XML string") → raise TypeError("XML input must be a string")
- Changed: raise ValueError("Empty XML string") → raise TypeError("Response data must be a string")
- Changed: raise ValueError("Empty XML string") → raise ValueError("Response must be a JSON object or array")
- Changed: raise ValueError("Empty XML string") → raise TypeError("Command must be a string")
- Changed: raise ValueError("Empty XML string") → raise ValueError("Empty command")
- Changed: raise ValueError("Empty XML string") → raise ValueError(f"Command not allowed: {base_cmd}")
- Changed: raise ValueError("Empty XML string") → raise ValueError(f"Command flags not allowed: {arg}")
- Changed: raise ValueError("Empty XML string") → raise ValueError(f"Invalid character in argument: {arg}")
- Changed: except ValueError as e: → except (json.JSONDecodeError, ValueError) as e:
- Changed: except ValueError as e: → except (ValueError, subprocess.SubprocessError) as e:
- Changed: # Parse XML string securely → # Validate parsed data structure
- Changed: try: → try:
- Changed: tree = ET.fromstring(xml_string, parser=parser) → if not isinstance(xml_string, str):
- Changed: except ET.ParseError as e: → except (json.JSONDecodeError, ValueError) as e:
- Changed: except ET.ParseError as e: → except (ValueError, subprocess.SubprocessError) as e:
- Changed: # Handle XML parsing errors → # Set strict parsing options
- Changed: return tree → return parsed_data
- Changed: return tree → return result.stdout
- Changed: # Secure command execution → """Secure command execution with strict validation"""
- Changed: cmd_args = shlex.split(cmd) → cmd_args = shlex.split(cmd)
- Changed: return result.stdout → return parsed_data
- Changed: return result.stdout → return result.stdout
Removed:
- def make_request(url):
- return requests.get(url, verify=False)
- import defusedxml.ElementTree as ET
- parser = ET.XMLParser(resolve_entities=False)
- import defusedxml.ElementTree as ET
- def make_request(url):
- return requests.get(url, verify=False)
- parser = ET.XMLParser(resolve_entities=False)
- xml_string = xml_string.strip()
- print(f"Error: {e}")
- print(f"Error: {e}")
- result = subprocess.run(cmd_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
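Note on the api.py JSON entries: `json.loads` calls the `parse_constant` hook for `NaN`, `Infinity` and `-Infinity` and uses whatever the hook returns. If the hook is written exactly as the logged line shows, a lambda that returns `ValueError(...)`, the exception object would become a value in the parsed data instead of being raised. The sketch below shows a hook that raises; the names `_reject_constant` and `parse_response` are hypothetical, and the error messages are the ones recorded in this log.

```python
import decimal
import json


def _reject_constant(name: str):
    # Raising here aborts json.loads; merely returning an exception object
    # would place that object into the parsed result.
    raise ValueError(f"Invalid constant {name}")


def parse_response(response_data: str):
    """Parse JSON, rejecting NaN/Infinity and roots that are not dict or list."""
    if not isinstance(response_data, str):
        raise TypeError("Response data must be a string")
    parsed_data = json.loads(
        response_data,
        parse_float=decimal.Decimal,  # keep decimal precision
        parse_constant=_reject_constant,
    )
    if not isinstance(parsed_data, (dict, list)):
        raise ValueError("Response must be a JSON object or array")
    return parsed_data
```

Because `json.JSONDecodeError` is a subclass of `ValueError`, a caller that catches `ValueError` covers malformed JSON, rejected constants and the root-type check alike.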
Changes in /workspaces/agentic-security/tests/samples/app.py:
Added:
- subprocess.run(['echo', user_input], shell=False, check=True)...
- subprocess.run(['echo', user_input], shell=False, check=True)...
Modified:
- Changed: import os → import subprocess
- Changed: import os → import shlex
- Changed: # Security Issue 2: Command Injection → # Use subprocess with shell=False for safe command execution
- Changed: # Command injection vulnerability → # Safe command execution using subprocess
- Changed: return f"{comment}" → return f"{escape(comment)}"
Removed:
- os.system(f"echo {user_input}")
- os.system(f"echo {user_input}")
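Note on the app.py changes: replacing `os.system(f"echo {user_input}")` with an argument list means no shell parses the input, so characters such as `;` reach the program as literal text. The sketch below illustrates this in a portable way. It is an assumption-laden stand-in: the Python interpreter is used in place of `echo`, the function names are hypothetical, and `html.escape` from the standard library is assumed for `escape`, since the log does not show where app.py imports it from.

```python
import html
import subprocess
import sys


def echo_argument(user_input: str) -> str:
    """Pass user_input as one argv entry and return what the child printed."""
    result = subprocess.run(
        # Stand-in for ['echo', user_input]: the child prints its first argument.
        [sys.executable, "-c", "import sys; print(sys.argv[1])", user_input],
        shell=False,          # no shell, so ';' and '|' are not interpreted
        capture_output=True,
        text=True,
        check=True,
        timeout=10,
    )
    return result.stdout.rstrip("\n")


def render_comment(comment: str) -> str:
    """Escape HTML metacharacters before the comment is placed in a page."""
    return html.escape(comment)
```

With this, `echo_argument("hello; echo injected")` returns the whole string unchanged rather than running a second command, and `render_comment("<script>")` yields `&lt;script&gt;`.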
Changes in /workspaces/agentic-security/tests/samples/auth.py:
Added:
- New class: TokenError
- """Custom exception for token-related errors"""...
- if not isinstance(password, str) or not isinstance(stored_hash, bytes):...
- if not isinstance(password, str):...
- raise ValueError("Password must be a string")...
- if not isinstance(user_data, dict):...
- raise ValueError("User data must be a dictionary")...
- safe_data = {...
- 'user_id': str(user_data.get('user_id', '')),...
- 'username': str(user_data.get('username', '')),...
- 'role': str(user_data.get('role', 'user'))...
- }...
- Args:...
- secret: Secret key used for token verification...
- Dict containing verified token data or None if verification fails...
- Raises:...
- TokenError: If token format or content is invalid...
- if not isinstance(token, str) or not isinstance(secret, str):...
- raise TokenError("Invalid token or secret format")...
- if not isinstance(data, dict):...
- raise TokenError("Invalid token payload structure")...
- required_fields = {...
- 'user_id': str,...
- 'username': str,...
- 'role': str,...
- 'exp': (int, float) # exp can be int or float timestamp...
- }...
- for field, expected_type in required_fields.items():...
- if field not in data:...
- raise TokenError(f"Missing required field: {field}")...
- if not isinstance(data[field], expected_type):...
- raise TokenError(f"Invalid type for field: {field}")...
- except Exception as e:...
- raise TokenError(f"Token verification failed: {str(e)}")...
Modified:
- Updated function: authenticate_user
- Updated function: generate_token
- Changed: """Secure token generation with proper secret and expiration""" → """Secure token generation with proper secret and expiration"""
- Changed: """Secure token generation with proper secret and expiration""" → Secure token verification with expiration check and safe deserialization
- Changed: secret = secrets.token_hex(32) # Generate secure secret key → secret = secrets.token_hex(32)
- Changed: user_data['exp'] = datetime.utcnow() + timedelta(hours=expiry_hours) → safe_data['exp'] = datetime.utcnow() + timedelta(hours=expiry_hours)
- Changed: token = jwt.encode(user_data, secret, algorithm='HS256') → token = jwt.encode(safe_data, secret, algorithm='HS256')
- Changed: return token, secret → return False
- Changed: return token, secret → return token, secret
- Changed: return token, secret → token: JWT token string
- Updated function: hash_password
- Changed: def verify_token(token, secret): → return token, secret
- Updated function: verify_token
- Changed: """Secure token verification with expiration check""" → """Secure token generation with proper secret and expiration"""
- Changed: """Secure token verification with expiration check""" → Secure token verification with expiration check and safe deserialization
- Changed: from datetime import datetime, timedelta → from typing import Dict, Optional, Tuple, Union
- Changed: data = jwt.decode(token, secret, algorithms=['HS256']) → token = jwt.encode(safe_data, secret, algorithm='HS256')
- Changed: return data → return False
- Changed: return data → return token, secret
- Changed: return data → Returns:
- Changed: return None → return False
- Changed: return None → return token, secret
- Changed: return None → Returns:
Removed:
- import bcrypt
- """Secure password hashing with bcrypt"""
- salt = bcrypt.gensalt()
- return bcrypt.hashpw(password.encode(), salt)
- import secrets
- try:
- except jwt.ExpiredSignatureError:
- except jwt.InvalidTokenError: Changes in /workspaces/agentic-security/tests/samples/api.py: Added:
- r'<!ENTITY',...
- r'<!DOCTYPE',...
- r'<!ELEMENT',...
- r'<!ATTLIST',...
- r'<?xml-stylesheet',...
- r'data:',...
- r'file:',...
- r'gopher:',...
- r'http:',...
- r'ftp:'...
- ]...
- if not isinstance(response_data, str):...
- parse_float=decimal.Decimal, # Use Decimal for precise floating point...
- parse_constant=lambda x: ValueError(f'Invalid constant {x}') # Reject inf/nan...
- if not isinstance(parsed_data, (dict, list)):...
- New import: from typing import Union, Dict, List, Any
- ALLOWED_COMMANDS = {'ls', 'dir', 'echo', 'pwd'}...
- base_cmd = cmd_args[0].lower()...
- if base_cmd not in ALLOWED_COMMANDS:...
- for arg in cmd_args[1:]:...
- if arg.startswith('-'):...
- if any(c in arg for c in ';&|$()`'):...
- cmd_args,...
- text=True,...
- shell=False, # Prevent shell injection...
- timeout=10, # Prevent hanging...
- check=True # Raise on non-zero exit...
Modified:
- Changed: """Secure XML parsing with XXE protection""" → """Secure XML parsing with XXE protection and strict validation"""
- Changed: """Secure XML parsing with XXE protection""" → """Secure response handling with validation"""
- Changed: """Secure XML parsing with XXE protection""" → """Secure command execution with strict validation"""
- Changed: # Additional input validation → # Comprehensive input validation
- Changed: # Additional input validation → # Additional argument validation
- Changed: # Reject XML strings with potential malicious patterns → """Secure XML parsing with XXE protection and strict validation"""
- Changed: # Reject XML strings with potential malicious patterns → raise ValueError(f"Invalid XML string: Potentially malicious content detected: {pattern}")
- Changed: malicious_patterns = [r'<!ENTITY', r'<!DOCTYPE'] → malicious_patterns = [
- Changed: raise ValueError("Invalid XML string: Potential XXE attack detected") → raise ValueError(f"Invalid XML string: Potentially malicious content detected: {pattern}")
- Changed: raise ValueError("Invalid XML string: Potential XXE attack detected") → raise ValueError(f"Invalid character in argument: {arg}")
- Changed: # Handle invalid input with proper error logging → logging.error("Invalid input type for XML parsing")
- Changed: logging.error(f"Error: {e}") → logging.error("Invalid input type for XML parsing")
- Changed: logging.error(f"Error: {e}") → logging.error(f"XML Validation Error: {str(e)}")
- Changed: logging.error(f"Error: {e}") → logging.error("Invalid response data type")
- Changed: logging.error(f"Error: {e}") → logging.error(f"JSON Processing Error: {str(e)}")
- Changed: logging.error(f"Error: {e}") → logging.error("Invalid command type")
- Changed: logging.error(f"Error: {e}") → logging.error(f"Command Execution Error: {str(e)}")
- Changed: return None → return parsed_data
- Changed: return None → return result.stdout
- Changed: """Secure response handling""" → """Secure response handling with validation"""
- Changed: return json.loads(response_data) → parsed_data = json.loads(
- Changed: return json.loads(response_data) → response_data,
- Changed: return json.loads(response_data) → return parsed_data
- Changed: except json.JSONDecodeError: → except (json.JSONDecodeError, ValueError) as e:
- Changed: return None → return parsed_data
- Changed: return None → return result.stdout
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: def parse_xml(xml_string): → if not isinstance(xml_string, str):
- Changed: # Secure XML parsing → # Set strict parsing options
- Changed: return ET.fromstring(xml_string, parser=parser) → if not isinstance(xml_string, str):
- Changed: import requests → import json
- Changed: import requests → import decimal
- Changed: import requests → import subprocess
- Changed: import requests → import shlex
- Changed: import requests → import re
- Changed: import subprocess → import json
- Changed: import subprocess → import decimal
- Changed: import subprocess → import logging
- Changed: import subprocess → import subprocess
- Changed: import subprocess → import shlex
- Changed: import subprocess → import re
- Changed: import subprocess → result = subprocess.run(
- Changed: import subprocess → stdout=subprocess.PIPE,
- Changed: import subprocess → stderr=subprocess.PIPE,
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: →
- Changed: def parse_xml(xml_string): → if not isinstance(xml_string, str):
- Changed: """Secure XML parsing with XXE protection""" → """Secure XML parsing with XXE protection and strict validation"""
- Changed: """Secure XML parsing with XXE protection""" → """Secure response handling with validation"""
- Changed: """Secure XML parsing with XXE protection""" → """Secure command execution with strict validation"""
- Changed: # Validate and sanitize input XML string → # Validate parsed data structure
- Changed: try: → try:
- Changed: if not xml_string: → if not isinstance(xml_string, str):
- Changed: if not xml_string: → if not isinstance(cmd, str):
- Changed: if not xml_string: → if not cmd_args:
- Changed: raise ValueError("Empty XML string") → logging.error("Invalid input type for XML parsing")
- Changed: raise ValueError("Empty XML string") → raise TypeError("XML input must be a string")
- Changed: raise ValueError("Empty XML string") → raise TypeError("Response data must be a string")
- Changed: raise ValueError("Empty XML string") → raise ValueError("Response must be a JSON object or array")
- Changed: raise ValueError("Empty XML string") → raise TypeError("Command must be a string")
- Changed: raise ValueError("Empty XML string") → raise ValueError("Empty command")
- Changed: raise ValueError("Empty XML string") → raise ValueError(f"Command not allowed: {base_cmd}")
- Changed: raise ValueError("Empty XML string") → raise ValueError(f"Command flags not allowed: {arg}")
- Changed: raise ValueError("Empty XML string") → raise ValueError(f"Invalid character in argument: {arg}")
- Changed: except ValueError as e: → except (json.JSONDecodeError, ValueError) as e:
- Changed: except ValueError as e: → except (ValueError, subprocess.SubprocessError) as e:
- Changed: return None → return parsed_data
- Changed: return None → return result.stdout
- Changed: # Parse XML string securely → # Validate parsed data structure
- Changed: try: → try:
- Changed: tree = ET.fromstring(xml_string, parser=parser) → if not isinstance(xml_string, str):
- Changed: except ET.ParseError as e: → except (json.JSONDecodeError, ValueError) as e:
- Changed: except ET.ParseError as e: → except (ValueError, subprocess.SubprocessError) as e:
- Changed: # Handle XML parsing errors → # Set strict parsing options
- Changed: return None → return parsed_data
- Changed: return None → return result.stdout
- Changed: return tree → return parsed_data
- Changed: return tree → return result.stdout
- Changed: # Secure command execution → """Secure command execution with strict validation"""
- Changed: cmd_args = shlex.split(cmd) → cmd_args = shlex.split(cmd)
- Changed: return result.stdout → return parsed_data
- Changed: return result.stdout → return result.stdout
Removed:
- def make_request(url):
- return requests.get(url, verify=False)
- import defusedxml.ElementTree as ET
- parser = ET.XMLParser(resolve_entities=False)
- import defusedxml.ElementTree as ET
- def make_request(url):
- return requests.get(url, verify=False)
- parser = ET.XMLParser(resolve_entities=False)
- xml_string = xml_string.strip()
- print(f"Error: {e}")
- print(f"Error: {e}")
- result = subprocess.run(cmd_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
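The command-execution checks listed above (type check, `shlex.split`, allowlist, flag and character validation) could fit together roughly as in the sketch below. This is not the code in api.py: `ALLOWED_COMMANDS`, the `SAFE_ARG` pattern, the blanket rejection of flags, and the function names are assumptions for illustration; only the error messages come from the log.

```python
import re
import shlex
import subprocess

# Hypothetical allowlist and argument pattern; the log does not show the real ones.
ALLOWED_COMMANDS = {"echo", "ls"}
SAFE_ARG = re.compile(r"[\w./=-]+")


def validate_command(cmd):
    """Split cmd into arguments and raise if any validation step fails."""
    if not isinstance(cmd, str):
        raise TypeError("Command must be a string")
    cmd_args = shlex.split(cmd)
    if not cmd_args:
        raise ValueError("Empty command")
    base_cmd = cmd_args[0]
    if base_cmd not in ALLOWED_COMMANDS:
        raise ValueError(f"Command not allowed: {base_cmd}")
    for arg in cmd_args[1:]:
        # Assumed policy: reject every flag, then any unexpected character.
        if arg.startswith("-"):
            raise ValueError(f"Command flags not allowed: {arg}")
        if not SAFE_ARG.fullmatch(arg):
            raise ValueError(f"Invalid character in argument: {arg}")
    return cmd_args


def run_command(cmd):
    """Run a validated command without a shell and return its stdout."""
    result = subprocess.run(
        validate_command(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=True,
        timeout=10,
    )
    return result.stdout
```

Passing a list to `subprocess.run` (and never `shell=True`) means the arguments are not re-interpreted by a shell, so the validation applies to exactly what gets executed.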
- Applied security fixes to: /workspaces/agentic-security/tests/samples
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples:Failed to generate diff summary: [Errno 21] Is a directory: '/workspaces/agentic-security/tests/samples'
- Applied security fixes to: /workspaces/agentic-security/tests/samples/auth.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/auth.py: Added:
- New import: import hmac
- New import: from functools import wraps
- New import: from time import time
- New class: AuthenticationError
- _auth_attempts = {}...
- _MAX_ATTEMPTS = 5...
- _LOCKOUT_TIME = 300 # 5 minutes in seconds...
- New function: _check_rate_limit
- current_time = time()...
- if username in _auth_attempts:...
- attempts, lockout_time = _auth_attempts[username]...
- if current_time < lockout_time:...
- raise AuthenticationError("Too many attempts. Please try again later.")...
- if current_time - lockout_time > _LOCKOUT_TIME:...
- _auth_attempts[username] = (1, current_time)...
- else:...
- if attempts >= _MAX_ATTEMPTS:...
- _auth_attempts[username] = (attempts, current_time)...
- raise AuthenticationError("Too many attempts. Please try again later.")...
- _auth_attempts[username] = (attempts + 1, current_time)...
- else:...
- _auth_attempts[username] = (1, current_time)...
- _check_rate_limit(username)...
- is_valid = hmac.compare_digest(...
- if not is_valid:...
- raise AuthenticationError("Invalid credentials")...
- raise AuthenticationError("Authentication failed")...
- 'alg': 'HS512',...
- 'enc': 'none'...
- data = jwt.decode(...
- token,...
- secret,...
- algorithms=['HS512'],...
- options={...
- 'verify_signature': True,...
- 'verify_exp': True,...
- 'verify_nbf': True,...
- 'verify_iat': True,...
- 'verify_aud': True,...
- 'require': ['exp', 'iat', 'nbf']...
- }...
Modified:
- Changed: """Secure authentication with bcrypt""" → """Custom exception for authentication-related errors"""
- Changed: """Secure authentication with bcrypt""" → """Check if authentication attempts are within allowed limits"""
- Changed: """Secure authentication with bcrypt""" → """Secure authentication with bcrypt and rate limiting"""
- Changed: """Secure authentication with bcrypt""" → raise AuthenticationError("Invalid input format")
- Changed: """Secure authentication with bcrypt""" → except AuthenticationError:
- Changed: return False → return True
- Changed: return bcrypt.checkpw(password.encode(), stored_hash) → bcrypt.hashpw(password.encode(), stored_hash),
- Changed: except Exception: → except AuthenticationError:
- Changed: except Exception: → except Exception as e:
- Changed: return False → return True
- Changed: # Use more secure JWT options → # Use more secure JWT options with additional headers
- Changed: 'typ': 'JWT' → 'typ': 'JWT',
- Changed: 'typ': 'JWT' → 'cty': 'JWT',
Removed:
- data = jwt.decode(token, secret, algorithms=['HS256'])
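For orientation, an in-memory attempt limiter along the lines of `_check_rate_limit` might look like the sketch below. It is a simplified variant, not the logic recorded in the log: the `AttemptTracker` class, the injectable `clock` parameter, and the fixed-window reset are assumptions; `_MAX_ATTEMPTS`, `_LOCKOUT_TIME`, and the error message are taken from the entries above.

```python
from time import time

_MAX_ATTEMPTS = 5
_LOCKOUT_TIME = 300  # 5 minutes in seconds


class AuthenticationError(Exception):
    """Custom exception for authentication-related errors"""


class AttemptTracker:
    """Hypothetical helper that counts attempts per username in a fixed window."""

    def __init__(self, clock=time):
        self._clock = clock  # injectable clock (assumption, eases testing)
        self._attempts = {}  # username -> (attempt count, window start time)

    def check(self, username):
        """Record one attempt, or raise if the user is over the limit."""
        now = self._clock()
        count, window_start = self._attempts.get(username, (0, now))
        if now - window_start > _LOCKOUT_TIME:
            # The window has expired, so start counting again.
            count, window_start = 0, now
        if count >= _MAX_ATTEMPTS:
            raise AuthenticationError("Too many attempts. Please try again later.")
        self._attempts[username] = (count + 1, window_start)
```

A module-level dictionary like this is per-process only, which is presumably why the later entries move the counters into Redis.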
- Applied security fixes to: /workspaces/agentic-security/tests/samples/auth.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/auth.py:Failed to generate diff summary: list index out of range
- Applied security fixes to: /workspaces/agentic-security/tests/samples/auth.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/auth.py: Added:
- New import: from typing import Dict, Optional
- _redis_pool: Optional[ConnectionPool] = None...
- _redis_pool = ConnectionPool(...
- host='localhost',...
- port=6379,...
- db=0,...
- ssl=True,...
- ssl_cert_reqs='required',...
- ssl_ca_certs='/etc/ssl/certs/ca-certificates.crt',...
- max_connections=10,...
- socket_timeout=5.0,...
- retry_on_timeout=True...
- if not isinstance(ip_address, str) or not ip_address.strip():...
- raise AuthenticationError("Invalid IP address format")...
- pipe = redis.pipeline()...
- raise AuthenticationError("Too many attempts from this IP address")...
- pipe.execute()...
- 'zip': 'none', # Explicitly disable compression...
- 'x5t': secrets.token_urlsafe(32), # Add thumbprint for key tracking...
- 'jku': None, # Explicitly disable JWK URL...
- 'x5u': None # Explicitly disable x509 URL...
Modified:
- Changed: # Redis connection for rate limiting → from redis.connection import ConnectionPool
- Changed: # Redis connection for rate limiting → # Redis connection pool for rate limiting
- Changed: """Get Redis connection with lazy initialization""" → from redis.connection import ConnectionPool
- Changed: """Get Redis connection with lazy initialization""" → # Redis connection pool for rate limiting
- Changed: """Get Redis connection with lazy initialization""" → """Get Redis connection with connection pooling and SSL verification"""
- Changed: global _redis_client → global _redis_pool, _redis_client
- Changed: global _redis_client → if _redis_pool is None:
- Changed: _redis_client = Redis(host='localhost', port=6379, db=0, ssl=True) → _redis_client = Redis(connection_pool=_redis_pool)
- Updated function: _check_rate_limit
- Changed: def _check_rate_limit(username: str) -> None: → _check_rate_limit(username, ip_address)
- Changed: redis_key = f"{_REDIS_KEY_PREFIX}{username}" → user_key = f"{_REDIS_KEY_PREFIX}user:{username}"
- Changed: redis_key = f"{_REDIS_KEY_PREFIX}{username}" → ip_key = f"{_REDIS_KEY_PREFIX}ip:{ip_address}"
- Changed: attempts = int(redis.get(redis_key) or 0) → user_attempts = int(redis.get(user_key) or 0)
- Changed: attempts = int(redis.get(redis_key) or 0) → ip_attempts = int(redis.get(ip_key) or 0)
- Changed: attempts = int(redis.get(redis_key) or 0) → lockout_remaining = redis.ttl(ip_key)
- Changed: if attempts >= _MAX_ATTEMPTS: → if user_attempts >= _MAX_ATTEMPTS:
- Changed: if attempts >= _MAX_ATTEMPTS: → if ip_attempts >= _MAX_ATTEMPTS * 2: # Higher limit for IPs
- Changed: lockout_remaining = redis.ttl(redis_key) → lockout_remaining = redis.ttl(user_key)
- Changed: lockout_remaining = redis.ttl(redis_key) → lockout_remaining = redis.ttl(ip_key)
- Changed: lockout_remaining = redis.ttl(redis_key) → if lockout_remaining > 0:
- Changed: logger.warning(f"Rate limit exceeded for user: {username}") → logger.warning(f"User rate limit exceeded: {username}")
- Changed: logger.warning(f"Rate limit exceeded for user: {username}") → logger.warning(f"IP rate limit exceeded: {ip_address}")
- Changed: # Increment attempts and set expiry → # Increment both counters atomically
- Changed: redis.incr(redis_key) → redis = _get_redis()
- Changed: redis.incr(redis_key) → pipe.incr(user_key)
- Changed: redis.incr(redis_key) → pipe.incr(ip_key)
- Changed: redis.expire(redis_key, _LOCKOUT_TIME) → pipe.expire(user_key, _LOCKOUT_TIME)
- Changed: redis.expire(redis_key, _LOCKOUT_TIME) → pipe.expire(ip_key, _LOCKOUT_TIME)
- Updated function: authenticate_user
- Changed: _check_rate_limit(username) → def _check_rate_limit(username: str, ip_address: str) -> None:
- Changed: _check_rate_limit(username) → # Rate limit both by username and IP
- Changed: _check_rate_limit(username) → _check_rate_limit(username, ip_address)
- Changed: # Use more secure JWT options with additional headers → # Use more secure JWT options with enhanced headers
- Changed: 'kid': secrets.token_hex(8), → 'kid': secrets.token_hex(16), # Increased key ID length
- Changed: 'enc': 'none' → 'enc': 'none',
Removed:
- current_time = time()
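The per-user and per-IP counters described in this entry could be combined roughly as follows. This is a sketch under assumptions: the function takes the client as a parameter instead of calling `_get_redis()`, the value of `_REDIS_KEY_PREFIX` is invented, and the user-limit error message is a placeholder. The `get`, `pipeline`, `incr`, `expire`, and `execute` calls are standard redis-py client methods.

```python
_MAX_ATTEMPTS = 5
_LOCKOUT_TIME = 300  # seconds
_REDIS_KEY_PREFIX = "auth_attempts:"  # assumed value; not shown in the log


class AuthenticationError(Exception):
    """Custom exception for authentication-related errors"""


def check_rate_limit(redis, username, ip_address):
    """Raise if either counter is over its limit, otherwise bump both."""
    user_key = f"{_REDIS_KEY_PREFIX}user:{username}"
    ip_key = f"{_REDIS_KEY_PREFIX}ip:{ip_address}"

    user_attempts = int(redis.get(user_key) or 0)
    ip_attempts = int(redis.get(ip_key) or 0)

    if user_attempts >= _MAX_ATTEMPTS:
        raise AuthenticationError("Too many attempts for this user")
    if ip_attempts >= _MAX_ATTEMPTS * 2:  # Higher limit for IPs
        raise AuthenticationError("Too many attempts from this IP address")

    # Queue both increments and expiries, then send them in one batch.
    pipe = redis.pipeline()
    pipe.incr(user_key)
    pipe.incr(ip_key)
    pipe.expire(user_key, _LOCKOUT_TIME)
    pipe.expire(ip_key, _LOCKOUT_TIME)
    pipe.execute()
```

Note that the two reads happen before the pipeline runs, so concurrent requests can each pass the check and push a counter slightly past its limit; only the four queued writes are applied together.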
- Applied security fixes to: /workspaces/agentic-security/tests/samples/auth.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/auth.py: Added:
- New import: from redis.retry import Retry
- New import: from redis.backoff import ExponentialBackoff
- New import: from redis.exceptions import ConnectionError, TimeoutError
- retry = Retry(ExponentialBackoff(), 3)...
- socket_keepalive=True,...
- retry=retry,...
- health_check_interval=30...
- try:...
- except (ConnectionError, TimeoutError) as e:...
- logger.error(f"Redis connection failed: {str(e)}")...
- raise AuthenticationError("Authentication service temporarily unavailable")...
- username = username.lower().strip()...
- ip_address = ip_address.strip()...
- windows = {...
- 'minute': 60,...
- 'hour': 3600,...
- 'day': 86400...
- }...
- limits = {...
- 'minute': 3,...
- 'hour': 10,...
- 'day': 20...
- }...
- current_time = int(time())...
- pepper = os.getenv('PASSWORD_PEPPER', secrets.token_hex(32))...
- salted_pass = f"{password}{pepper}".encode()...
- audience: Expected token audience to validate against...
- """Security configuration settings"""...
- New import: import os
- New import: from datetime import timedelta
- RATE_LIMIT_WINDOWS = {...
- 'minute': 60,...
- 'hour': 3600,...
- 'day': 86400...
- }...
- RATE_LIMIT_ATTEMPTS = {...
- 'minute': 3,...
- 'hour': 10,...
- 'day': 20...
- }...
- MIN_PASSWORD_LENGTH = 12...
- PASSWORD_ROUNDS = 14...
- PASSWORD_PEPPER = os.getenv('PASSWORD_PEPPER', None)...
- TOKEN_EXPIRY = timedelta(hours=24)...
- TOKEN_ALGORITHM = 'HS512'...
- REQUIRED_CLAIMS = ['exp', 'iat', 'nbf', 'aud', 'sub']...
- REDIS_CONFIG = {...
- 'host': os.getenv('REDIS_HOST', 'localhost'),...
- 'port': int(os.getenv('REDIS_PORT', 6379)),...
- 'db': int(os.getenv('REDIS_DB', 0)),...
- 'ssl': True,...
- 'ssl_cert_reqs': 'required',...
- 'ssl_ca_certs': '/etc/ssl/certs/ca-certificates.crt',...
- 'socket_timeout': 5.0,...
- 'socket_keepalive': True,...
- 'health_check_interval': 30...
- }...
Modified:
- Changed: """Get Redis connection with connection pooling and SSL verification""" → """Get Redis connection with connection pooling, SSL verification and retries"""
- Changed: retry_on_timeout=True → retry_on_timeout=True,
- Changed: if _redis_client is None: → if _redis_client is None:
- Changed: if _redis_client is None: → _redis_client = Redis(connection_pool=_redis_pool)
- Changed: if _redis_client is None: → _redis_client.ping()
- Changed: if _redis_client is None: → return _redis_client
- Changed: _redis_client = Redis(connection_pool=_redis_pool) → if _redis_client is None:
- Changed: _redis_client = Redis(connection_pool=_redis_pool) → _redis_client = Redis(connection_pool=_redis_pool)
- Changed: return _redis_client → if _redis_client is None:
- Changed: return _redis_client → _redis_client.ping()
- Changed: return _redis_client → return _redis_client
- Changed: # Use stronger work factor for bcrypt → # Use stronger work factor for bcrypt and add pepper
- Changed: salt = bcrypt.gensalt(rounds=12) → salt = bcrypt.gensalt(rounds=14)
- Changed: return bcrypt.hashpw(password.encode(), salt) → return bcrypt.hashpw(salted_pass, salt)
- Updated function: verify_token
Removed:
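One way to read the `windows` and `limits` dictionaries added in this entry is as a set of sliding windows that are all checked on each attempt. The sketch below assumes attempts are available as a list of timestamps; the log does not show how auth.py actually stores or counts them, and `exceeded_windows` is a hypothetical name.

```python
RATE_LIMIT_WINDOWS = {"minute": 60, "hour": 3600, "day": 86400}
RATE_LIMIT_ATTEMPTS = {"minute": 3, "hour": 10, "day": 20}


def exceeded_windows(attempt_times, current_time):
    """Return the names of the windows whose attempt limit has been reached."""
    exceeded = []
    for name, seconds in RATE_LIMIT_WINDOWS.items():
        # Count attempts that fall inside this window.
        recent = sum(1 for t in attempt_times if current_time - t < seconds)
        if recent >= RATE_LIMIT_ATTEMPTS[name]:
            exceeded.append(name)
    return exceeded
```

A caller would reject the login when the returned list is non-empty; a burst of three attempts trips only the minute window, while slower sustained guessing is caught by the hour or day window.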
- Applied security fixes to: /workspaces/agentic-security/tests/samples/auth.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/auth.py:Failed to generate diff summary: list index out of range
- Applied security fixes to: /workspaces/agentic-security/tests/samples/auth.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/auth.py: Added:
- _MAX_POOL_SIZE = 10...
- _MAX_POOL_TIMEOUT = 5...
- """Security configuration constants"""...
- New import: from typing import Dict, Final
- RATE_LIMIT_WINDOWS: Final[Dict[str, int]] = {...
- 'minute': 60,...
- 'hour': 3600,...
- 'day': 86400,...
- 'week': 604800...
- }...
- RATE_LIMIT_ATTEMPTS: Final[Dict[str, int]] = {...
- 'minute': 3,...
- 'hour': 10,...
- 'day': 20,...
- 'week': 50...
- }...
- REDIS_MAX_POOL_SIZE: Final[int] = 10...
- REDIS_POOL_TIMEOUT: Final[int] = 5...
- REDIS_CONNECT_TIMEOUT: Final[float] = 2.0...
- REDIS_SOCKET_TIMEOUT: Final[float] = 3.0...
- MIN_PASSWORD_LENGTH: Final[int] = 16...
- BCRYPT_ROUNDS: Final[int] = 16...
- PASSWORD_COMPLEXITY: Final[Dict[str, str]] = {...
- 'uppercase': r'[A-Z]',...
- 'lowercase': r'[a-z]',...
- 'numbers': r'\d',...
- 'special': r'[!@#$%^&*(),.?":{}|<>]'...
- }...
- TOKEN_EXPIRY_HOURS: Final[int] = 1...
- TOKEN_ALGORITHM: Final[str] = 'HS512'...
- REQUIRED_TOKEN_CLAIMS: Final[tuple] = (...
- 'exp', 'iat', 'nbf', 'aud', 'sub', 'jti',...
- 'auth_time', 'nonce', 'sid'...
- KEY_ROTATION_HOURS: Final[int] = 24...
- KEY_LENGTH_BYTES: Final[int] = 64 # 512 bits...
Modified:
- Changed: # Redis connection pool for rate limiting → # Redis connection pool for rate limiting with strict limits
- Changed: # Redis connection pool for rate limiting → # Redis security settings
- Changed: 'max_connections': int(os.getenv('REDIS_MAX_CONNECTIONS', '20')), → 'max_connections': int(os.getenv('REDIS_MAX_CONNECTIONS', str(_MAX_POOL_SIZE))),
- Changed: # Use work factor 15 for increased security → # Use work factor 16 for increased security (industry recommended minimum)
- Changed: # Use work factor 15 for increased security → # Password security
- Changed: salt = bcrypt.gensalt(rounds=15) → salt = bcrypt.gensalt(rounds=16)
- Changed: expiry_hours: int = 12, # Reduced default expiry → expiry_hours: int = 1, # Reduced token lifetime for security
Removed:
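The `MIN_PASSWORD_LENGTH` and `PASSWORD_COMPLEXITY` constants added here suggest a policy check along the following lines. The `password_problems` function and its return format are assumptions; the log shows only the constants, not how they are applied.

```python
import re

MIN_PASSWORD_LENGTH = 16
PASSWORD_COMPLEXITY = {
    "uppercase": r"[A-Z]",
    "lowercase": r"[a-z]",
    "numbers": r"\d",
    "special": r'[!@#$%^&*(),.?":{}|<>]',
}


def password_problems(password):
    """Return the names of the policy rules that the password fails."""
    problems = []
    if len(password) < MIN_PASSWORD_LENGTH:
        problems.append("length")
    for name, pattern in PASSWORD_COMPLEXITY.items():
        # Each rule passes if the pattern matches anywhere in the password.
        if not re.search(pattern, password):
            problems.append(name)
    return problems
```

An empty list means the password satisfies every rule; returning the failed rule names lets the caller build a specific error message.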
- Applied security fixes to: /workspaces/agentic-security/tests/samples/auth.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/auth.py: Added:
- New import: import ipaddress
- try:...
- ipaddress.ip_address(ip_address)...
- except ValueError:...
- raise AuthenticationError("Invalid IP address format")...
- ip_address: str = None...
- if ip_address:...
- redis = _get_redis()...
- token_key = f"token_gen:{ip_address}"...
- token_count = redis.incr(token_key)...
- if token_count == 1:...
- redis.expire(token_key, 3600) # 1 hour window...
- if token_count > 10: # Max 10 tokens per hour per IP...
- raise TokenError("Token generation rate limit exceeded")...
- try:...
- jwt.get_unverified_header(token)...
- except jwt.InvalidTokenError:...
- return None...
- New import: from auth_config import KEY_ROTATION_INTERVAL, MINIMUM_KEY_AGE, MAX_KEY_AGE
- key_age = time.time() - os.path.getmtime(self._key_path)...
- if key_age < MINIMUM_KEY_AGE:...
- return...
- if key_age > MAX_KEY_AGE:...
- return...
- if key_age > KEY_ROTATION_INTERVAL:...
- """Security configuration constants"""...
- New import: from typing import Dict, Final
- New import: import os
- TOKEN_RATE_LIMIT_WINDOW: Final[int] = 3600 # 1 hour...
- TOKEN_RATE_LIMIT_MAX: Final[int] = 10 # tokens per window...
- KEY_ROTATION_INTERVAL: Final[int] = 3600 # 1 hour...
- MINIMUM_KEY_AGE: Final[int] = 300 # 5 minutes...
- MAX_KEY_AGE: Final[int] = 7200 # 2 hours...
- REDIS_SECURITY_CONFIG: Final[Dict] = {...
- 'ssl': True,...
- 'ssl_cert_reqs': 'required',...
- 'ssl_ca_certs': os.getenv('REDIS_CA_CERTS', '/etc/ssl/certs/ca-certificates.crt'),...
- 'socket_timeout': 3.0,...
- 'socket_connect_timeout': 2.0,...
- 'health_check_interval': 30,...
- 'retry_on_timeout': True...
- }...
- TOKEN_SECURITY_CONFIG: Final[Dict] = {...
- 'algorithms': ['HS512'],...
- 'verify_signature': True,...
- 'verify_exp': True,...
- 'verify_nbf': True,...
- 'verify_iat': True,...
- 'verify_aud': True,...
- 'require': ['exp', 'iat', 'nbf', 'aud', 'sub', 'jti']...
- }...
Modified:
- Changed: expiry_hours: int = 1, # Reduced token lifetime for security → expiry_hours: int = 1,
- Changed: device_info: Optional[Dict] = None → device_info: Optional[Dict] = None,
- Changed: # Decode with explicit type verification → # Rate limit token generation by IP
- Changed: # Decode with explicit type verification → # Then decode with explicit verification
- Changed: """Rotate the secret key""" → """Rotate the secret key with safety checks"""
Removed:
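The three key-age thresholds in this entry imply a small decision table, sketched below as a pure function. The `rotation_action` name and the returned labels are assumptions; in auth.py the checks appear inside a method that calls `self._generate_new_key()`, and the log does not show its full body.

```python
KEY_ROTATION_INTERVAL = 3600  # 1 hour
MINIMUM_KEY_AGE = 300  # 5 minutes
MAX_KEY_AGE = 7200  # 2 hours


def rotation_action(key_age):
    """Map a key age in seconds to 'skip', 'scheduled', or 'force'."""
    if key_age < MINIMUM_KEY_AGE:
        return "skip"  # key is too new to rotate
    if key_age > MAX_KEY_AGE:
        return "force"  # key is too old, rotate regardless
    if key_age > KEY_ROTATION_INTERVAL:
        return "scheduled"  # normal rotation after the interval
    return "skip"
```

In the logged code the age comes from `time.time() - os.path.getmtime(self._key_path)`, so the decision depends on the key file's modification time.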
- Applied security fixes to: /workspaces/agentic-security/tests/samples/auth.py
- Changes made based on provided instructions
Changes in /workspaces/agentic-security/tests/samples/auth.py: Added:
- ip = ipaddress.ip_address(ip_address)...
- if ip.is_private or ip.is_loopback or ip.is_multicast:...
- except ValueError:...
- try:...
- pepper = os.getenv('PASSWORD_PEPPER')...
- salted_pass = f"{password}{pepper}".encode('utf-8')...
- logger.warning(f"Failed login attempt for user: {username}")...
- logger.info(f"Successful authentication for user: {username}")...
- logger.error(f"Authentication error: {str(e)}")...
- if not all(isinstance(x, str) for x in filter(None, [token, secret, audience])):...
- redis = _get_redis()...
- verify_key = f"token_verify:{datetime.utcnow().strftime('%Y%m%d%H')}"...
- if int(redis.incr(verify_key)) > 1000: # Max 1000 verifications per hour...
- MAX_KEY_AGE, KEY_LENGTH_BYTES)...
- current_time = time.time()...
- try:...
- if key_stats.st_mode & 0o777 != 0o600:...
- logger.warning("Forcing key rotation due to age")...
- logger.info("Performing scheduled key rotation")...
- except OSError as e:...
- logger.error(f"Key file error during rotation: {str(e)}")...
- """Centralized security configuration"""...
- New import: from typing import Dict, Final
- New import: import os
- New import: from datetime import timedelta
- RATE_LIMIT_WINDOWS: Final[Dict[str, int]] = {...
- 'minute': 60,...
- 'hour': 3600,...
- 'day': 86400,...
- 'week': 604800...
- }...
- RATE_LIMIT_ATTEMPTS: Final[Dict[str, int]] = {...
- 'minute': 3,...
- 'hour': 10,...
- 'day': 20,...
- 'week': 50...
- }...
- REDIS_CONFIG: Final[Dict] = {...
- 'host': os.getenv('REDIS_HOST', 'localhost'),...
- 'port': int(os.getenv('REDIS_PORT', '6379')),...
- 'db': int(os.getenv('REDIS_DB', '0')),...
- 'password': os.getenv('REDIS_PASSWORD'),...
- 'ssl': True,...
- 'ssl_cert_reqs': 'required',...
- 'ssl_ca_certs': os.getenv('REDIS_CA_CERTS', '/etc/ssl/certs/ca-certificates.crt'),...
- 'ssl_certfile': os.getenv('REDIS_CERT_FILE'),...
- 'ssl_keyfile': os.getenv('REDIS_KEY_FILE'),...
- 'socket_timeout': 3.0,...
- 'socket_connect_timeout': 2.0,...
- 'socket_keepalive': True,...
- 'health_check_interval': 30,...
- 'retry_on_timeout': True,...
- 'max_connections': 10,...
- 'retry_on_error': [TimeoutError, ConnectionError]...
- }...
- MIN_PASSWORD_LENGTH: Final[int] = 16...
- BCRYPT_ROUNDS: Final[int] = 16...
- PASSWORD_COMPLEXITY: Final[Dict[str, str]] = {...
- 'uppercase': r'[A-Z]',...
- 'lowercase': r'[a-z]',...
- 'numbers': r'\d',...
- 'special': r'[!@#$%^&*(),.?":{}|<>]'...
- }...
- TOKEN_EXPIRY: Final[timedelta] = timedelta(hours=1)...
- TOKEN_ALGORITHM: Final[str] = 'HS512'...
- REQUIRED_TOKEN_CLAIMS: Final[tuple] = (...
- 'exp', 'iat', 'nbf', 'aud', 'sub', 'jti',...
- 'auth_time', 'nonce', 'sid'...
- TOKEN_RATE_LIMIT_WINDOW: Final[int] = 3600 # 1 hour...
- TOKEN_RATE_LIMIT_MAX: Final[int] = 10 # tokens per window...
- MINIMUM_KEY_AGE: Final[int] = 300 # 5 minutes...
- MAX_KEY_AGE: Final[int] = 7200 # 2 hours...
- KEY_LENGTH_BYTES: Final[int] = 64 # 512 bits...
Modified:
- Changed: if not isinstance(password, str) or not isinstance(stored_hash, bytes): → if not isinstance(stored_hash, bytes):
- Changed: raise AuthenticationError("Invalid input format") → raise AuthenticationError("Invalid input types")
- Changed: raise AuthenticationError("Invalid input format") → raise AuthenticationError("Invalid hash format")
- Changed: raise AuthenticationError("Invalid input format") → raise AuthenticationError("Invalid IP address")
- Changed: raise AuthenticationError("Invalid input format") → raise AuthenticationError("Invalid IP address format")
- Changed: raise AuthenticationError("Invalid input format") → raise AuthenticationError("Security configuration error")
- Changed: raise AuthenticationError("Invalid input format") → raise TokenError("Invalid input types")
- Changed: _check_rate_limit(username, ip_address) → _check_rate_limit(username, str(ip))
- Changed: # Use constant-time comparison → # Add pepper before comparing
- Changed: # Use constant-time comparison → # Use constant-time comparison with pepper
- Changed: is_valid = hmac.compare_digest( → if not hmac.compare_digest(computed_hash, stored_hash):
- Changed: bcrypt.hashpw(password.encode(), stored_hash), → computed_hash = bcrypt.hashpw(salted_pass, stored_hash)
- Changed: ) → )
- Changed: if not is_valid: → if not pepper:
- Changed: Secure token verification with expiration check and safe deserialization → Secure token verification with enhanced security checks
- Changed: if not isinstance(token, str) or not isinstance(secret, str): → if not all(isinstance(x, str) for x in (username, password, ip_address)):
- Changed: if not isinstance(token, str) or not isinstance(secret, str): → if not isinstance(stored_hash, bytes):
- Changed: raise TokenError("Invalid token or secret format") → raise AuthenticationError("Invalid input types")
- Changed: raise TokenError("Invalid token or secret format") → raise AuthenticationError("Invalid hash format")
- Changed: raise TokenError("Invalid token or secret format") → raise AuthenticationError("Invalid IP address")
- Changed: raise TokenError("Invalid token or secret format") → raise AuthenticationError("Invalid IP address format")
- Changed: raise TokenError("Invalid token or secret format") → raise TokenError("Invalid input types")
- Changed: raise TokenError("Invalid token or secret format") → raise TokenError("Token verification rate limit exceeded")
- Changed: raise TokenError("Invalid token or secret format") → logger.error("Invalid key file permissions")
- Changed: """Rotate the secret key with safety checks""" → """Rotate the secret key with enhanced security checks"""
- Changed: from auth_config import KEY_ROTATION_INTERVAL, MINIMUM_KEY_AGE, MAX_KEY_AGE → from auth_config import (KEY_ROTATION_INTERVAL, MINIMUM_KEY_AGE,
- Changed: # Check if key is too new to rotate → # Check if key is too new to rotate
- Changed: # Check if key is too new to rotate → logger.debug("Key too new for rotation")
- Changed: # Check if key is too new to rotate → # Force rotation if key is too old
- Changed: key_age = time.time() - os.path.getmtime(self._key_path) → key_age = current_time - os.path.getmtime(self._key_path)
- Changed: key_age = time.time() - os.path.getmtime(self._key_path) → key_stats = os.stat(self._key_path)
- Changed: if key_age < MINIMUM_KEY_AGE: → if key_age < MINIMUM_KEY_AGE:
- Changed: if key_age < MINIMUM_KEY_AGE: → if key_age > MAX_KEY_AGE:
- Changed: if key_age < MINIMUM_KEY_AGE: → if key_age > KEY_ROTATION_INTERVAL:
- Changed: # Force rotation if key is too old → # Check if key is too new to rotate
- Changed: # Force rotation if key is too old → # Force rotation if key is too old
- Changed: # Force rotation if key is too old → # Normal rotation after interval
- Changed: # Force rotation if key is too old → # Key rotation settings
- Changed: if key_age > MAX_KEY_AGE: → if key_age < MINIMUM_KEY_AGE:
- Changed: if key_age > MAX_KEY_AGE: → if key_age > MAX_KEY_AGE:
- Changed: if key_age > MAX_KEY_AGE: → if key_age > KEY_ROTATION_INTERVAL:
- Changed: # Normal rotation after interval → # Force rotation if key is too old
- Changed: # Normal rotation after interval → # Normal rotation after interval
- Changed: # Normal rotation after interval → # Key rotation settings
- Changed: if key_age > KEY_ROTATION_INTERVAL: → if key_age < MINIMUM_KEY_AGE:
- Changed: if key_age > KEY_ROTATION_INTERVAL: → if key_age > MAX_KEY_AGE:
- Changed: if key_age > KEY_ROTATION_INTERVAL: → if key_age > KEY_ROTATION_INTERVAL:
- Changed: if key_age > KEY_ROTATION_INTERVAL: → KEY_ROTATION_INTERVAL: Final[int] = 3600 # 1 hour
Removed:
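The IP checks added in this entry (`ipaddress.ip_address` followed by the `is_private`, `is_loopback`, and `is_multicast` tests) might be wrapped as in this sketch. The `validate_client_ip` name is hypothetical, and pairing each check with a particular error message is an inference from the messages listed above, not something the log states.

```python
import ipaddress


class AuthenticationError(Exception):
    """Custom exception for authentication-related errors"""


def validate_client_ip(ip_address):
    """Return the normalized address, or raise if it is malformed or non-public."""
    try:
        ip = ipaddress.ip_address(ip_address)
    except ValueError:
        raise AuthenticationError("Invalid IP address format") from None
    # Assumed policy: reject addresses that cannot identify a remote client.
    if ip.is_private or ip.is_loopback or ip.is_multicast:
        raise AuthenticationError("Invalid IP address")
    return str(ip)
```

Returning `str(ip)` matches the logged change from `_check_rate_limit(username, ip_address)` to `_check_rate_limit(username, str(ip))`, so the rate-limit key is built from the parsed form of the address. Rejecting private ranges also blocks clients behind an internal proxy or on a local network, which may or may not be intended.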