Python输入验证与清洗
输入验证是防止注入攻击的核心手段。建立分层验证策略,确保只有通过检查的数据才能进入系统。
验证层次架构
Python
输入数据
↓
┌─────────────────────┐
│ 1. 类型验证 │ ← 确保正确的数据类型
└─────────────────────┘
↓
┌─────────────────────┐
│ 2. 格式验证 │ ← 符合预期的格式规范
└─────────────────────┘
↓
┌─────────────────────┐
│ 3. 范围验证 │ ← 在允许的范围内
└─────────────────────┘
↓
┌─────────────────────┐
│ 4. 内容验证 │ ← 不包含危险内容
└─────────────────────┘
↓
┌─────────────────────┐
│ 5. 业务验证 │ ← 符合业务规则
└─────────────────────┘
类型验证
Python
from typing import Any, Union
class TypeValidator:
    """Type-level checks: the first layer of the validation pipeline.

    Every method is static, returns the validated (possibly converted)
    value, and raises on invalid input.
    """

    @staticmethod
    def validate_type(value: Any, expected_type: Union[type, tuple]) -> Any:
        """Return *value* unchanged if it is an instance of *expected_type*.

        Args:
            value: The object to check.
            expected_type: A type, or a tuple of types, as accepted by
                ``isinstance``.

        Raises:
            TypeError: If *value* is not an instance of *expected_type*.
        """
        if not isinstance(value, expected_type):
            raise TypeError(
                f"期望类型 {expected_type}, 实际 {type(value).__name__}"
            )
        return value

    @staticmethod
    def validate_int(value: Any) -> int:
        """Convert *value* to ``int``, rejecting lossy or ambiguous inputs.

        Args:
            value: An int, an integral float, or anything else ``int()``
                accepts (for example a numeric string).

        Returns:
            The converted integer.

        Raises:
            ValueError: If *value* is a bool, a float with a fractional
                part (or NaN / infinity), or cannot be converted.
        """
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(value, bool):
            raise ValueError("无效整数")
        # int() would silently truncate 3.7 to 3; is_integer() is also False
        # for NaN and +/-infinity, so those are rejected here as well.
        if isinstance(value, float) and not value.is_integer():
            raise ValueError("无效整数")
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError) as exc:
            # OverflowError covers infinite values of other numeric types.
            raise ValueError("无效整数") from exc

    @staticmethod
    def validate_str(value: Any) -> str:
        """Return *value* as ``str``, decoding ``bytes`` as UTF-8.

        Raises:
            TypeError: If *value* is neither ``str`` nor ``bytes``.
            ValueError: If *value* is ``bytes`` that are not valid UTF-8
                (``UnicodeDecodeError`` is a ``ValueError`` subclass).
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return TypeValidator.validate_type(value, str)

    @staticmethod
    def validate_list(value: Any, item_type: Union[type, None] = None) -> list:
        """Check that *value* is a list, optionally checking every item.

        Args:
            value: The object to check.
            item_type: If given, every element must be an instance of it.

        Returns:
            The same list object that was passed in.

        Raises:
            TypeError: If *value* is not a list or an element has the
                wrong type.
        """
        TypeValidator.validate_type(value, list)
        if item_type is not None:
            for item in value:
                TypeValidator.validate_type(item, item_type)
        return value
# The validators are static methods, so they can be called through an
# instance just as well as through the class.
validator = TypeValidator()
safe_int = validator.validate_int(value="123")  # 123
safe_list = validator.validate_list(value=[1, 2, 3], item_type=int)
格式验证
Python
import re
from datetime import datetime
class FormatValidator:
    """Format-level checks: the value must match an expected textual shape.

    Every method is static, returns the normalized value, and raises
    ``ValueError`` when the input does not match.
    """

    # Pre-compiled patterns. The digit-based ones use re.ASCII so that
    # ``\d`` only matches 0-9 and not other Unicode decimal digits.
    EMAIL_PATTERN = re.compile(
        r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    )
    PHONE_PATTERN = re.compile(r'^\+?[1-9]\d{1,14}$', re.ASCII)
    USERNAME_PATTERN = re.compile(r'^[a-zA-Z0-9_]{3,20}$')
    DATE_PATTERN = re.compile(r'^\d{4}-\d{2}-\d{2}$', re.ASCII)
    URL_PATTERN = re.compile(
        r'^https?://[a-zA-Z0-9.-]+(:\d+)?(/.*)?$'
    )

    # NOTE: the methods use fullmatch() rather than match(). With match(),
    # a trailing ``$`` also matches just before a final newline, so a value
    # such as "name\n" would be accepted.

    @staticmethod
    def validate_email(email: str) -> str:
        """Return *email* stripped and lower-cased if it looks like an address.

        Raises:
            ValueError: If the normalized value does not match EMAIL_PATTERN.
        """
        email = email.strip().lower()
        if not FormatValidator.EMAIL_PATTERN.fullmatch(email):
            raise ValueError("无效邮箱格式")
        return email

    @staticmethod
    def validate_phone(phone: str) -> str:
        """Return *phone* stripped if it is an optional ``+`` and 2-15 digits.

        Raises:
            ValueError: If the stripped value does not match PHONE_PATTERN.
        """
        phone = phone.strip()
        if not FormatValidator.PHONE_PATTERN.fullmatch(phone):
            raise ValueError("无效电话格式")
        return phone

    @staticmethod
    def validate_username(name: str) -> str:
        """Return *name* stripped if it is 3-20 ASCII letters, digits or ``_``.

        Raises:
            ValueError: If the stripped value does not match USERNAME_PATTERN.
        """
        name = name.strip()
        if not FormatValidator.USERNAME_PATTERN.fullmatch(name):
            raise ValueError("用户名: 3-20字符,字母数字下划线")
        return name

    @staticmethod
    def validate_date(date_str: str) -> datetime:
        """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

        Raises:
            ValueError: If the text is not shaped like ``YYYY-MM-DD`` or
                does not name a real calendar date (for example 2023-02-30).
        """
        if not FormatValidator.DATE_PATTERN.fullmatch(date_str):
            raise ValueError("日期格式: YYYY-MM-DD")
        try:
            return datetime.strptime(date_str, '%Y-%m-%d')
        except ValueError as exc:
            raise ValueError("无效日期") from exc

    @staticmethod
    def validate_url(url: str, allowed_schemes: list = None) -> str:
        """Return *url* unchanged if its scheme is allowed and it has a host.

        Args:
            url: The URL to check.
            allowed_schemes: Accepted schemes; defaults to http and https.

        Raises:
            ValueError: If the scheme is not allowed or the host is missing.
        """
        from urllib.parse import urlparse
        parsed = urlparse(url)
        allowed = allowed_schemes or ['http', 'https']
        if parsed.scheme not in allowed:
            raise ValueError(f"仅允许协议: {allowed}")
        if not parsed.hostname:
            raise ValueError("缺少主机名")
        return url
# Example: validate_email returns the address stripped and lower-cased.
format_validator = FormatValidator()
safe_email = format_validator.validate_email(email="test@example.com")
范围验证
Python
from typing import Any
class RangeValidator:
    """Range-level checks: numeric bounds, lengths and set membership.

    Every method is static, returns its input unchanged on success, and
    raises ``ValueError`` otherwise.
    """

    @staticmethod
    def validate_int_range(value: int, min_val: int, max_val: int) -> int:
        """Return *value* if it lies within ``[min_val, max_val]`` inclusive."""
        out_of_range = value < min_val or value > max_val
        if out_of_range:
            raise ValueError(f"值必须在 {min_val} 到 {max_val} 之间")
        return value

    @staticmethod
    def validate_str_length(value: str, min_len: int, max_len: int) -> str:
        """Return *value* if its length lies within ``[min_len, max_len]``."""
        size = len(value)
        bad_length = size < min_len or size > max_len
        if bad_length:
            raise ValueError(f"长度必须在 {min_len} 到 {max_len} 之间")
        return value

    @staticmethod
    def validate_list_length(value: list, min_len: int, max_len: int) -> list:
        """Return *value* if it holds between *min_len* and *max_len* items.

        Too few and too many items are reported with different messages.
        """
        count = len(value)
        if count < min_len:
            raise ValueError(f"至少需要 {min_len} 个元素")
        elif count > max_len:
            raise ValueError(f"最多允许 {max_len} 个元素")
        return value

    @staticmethod
    def validate_in_set(value: Any, allowed: set) -> Any:
        """Return *value* if it is a member of *allowed*."""
        if value in allowed:
            return value
        raise ValueError(f"值必须是: {allowed}")
# Example: both calls succeed and hand back the value they were given.
range_validator = RangeValidator()
safe_age = range_validator.validate_int_range(value=25, min_val=0, max_val=150)
safe_choice = range_validator.validate_in_set(
    value="option1",
    allowed={"option1", "option2"},
)
内容清洗
Python
import re
from html.parser import HTMLParser
class ContentSanitizer:
    """Content-level sanitizers that strip or escape dangerous characters.

    Every method is static. Sanitizers return a cleaned copy of the input;
    ``sanitize_shell_arg`` and ``sanitize_filename`` raise ``ValueError``
    when nothing safe can be returned.
    """

    @staticmethod
    def remove_html_tags(text: str) -> str:
        """Return the text content of *text* with HTML tags dropped.

        Character references are decoded by the parser, so ``&lt;b&gt;``
        comes back as ``<b>``. The result is plain text and must still be
        escaped (see ``sanitize_html``) before being embedded in HTML.
        """
        class HTMLTagRemover(HTMLParser):
            def __init__(self):
                super().__init__()
                self.result = []

            def handle_data(self, data):
                self.result.append(data)

        remover = HTMLTagRemover()
        remover.feed(text)
        # close() flushes text the parser is still buffering; without it,
        # input such as "R&D" (an unterminated "&" near the end) is lost.
        remover.close()
        return ''.join(remover.result)

    @staticmethod
    def sanitize_html(text: str) -> str:
        """Escape ``& < > " '`` so *text* renders as literal text in HTML.

        Uses ``html.escape``, which replaces ``&`` first. Replacing it after
        ``<`` / ``>`` would escape the freshly inserted entities a second
        time (``<`` -> ``&lt;`` -> ``&amp;lt;``).
        """
        # Local import: the surrounding file only imports html.parser.
        from html import escape
        return escape(text, quote=True)

    @staticmethod
    def sanitize_sql(text: str) -> str:
        """Remove quote, statement-separator and comment tokens from *text*.

        NOTE: this is defence in depth only. It is not a replacement for
        parameterized queries.
        """
        dangerous_tokens = ("'", ";", "--", "/*", "*/")
        result = text
        # Repeat until stable: removing one token can splice a new one
        # together (e.g. "-/*-" becomes "--" after "/*" is removed).
        while True:
            cleaned = result
            for token in dangerous_tokens:
                cleaned = cleaned.replace(token, '')
            if cleaned == result:
                return result
            result = cleaned

    @staticmethod
    def sanitize_shell_arg(arg: str) -> str:
        """Return *arg* unchanged if it only contains word chars, ``- . /``.

        Values starting with ``-`` are accepted, so callers that pass the
        result to a command should guard against option injection
        themselves.

        Raises:
            ValueError: If *arg* is empty or contains any other character,
                including a trailing newline.
        """
        # fullmatch(): with match() and "$", "file\n" would be accepted,
        # and a newline is a command separator in a shell.
        if not re.fullmatch(r'[\w\-\.\/]+', arg):
            raise ValueError("参数包含危险字符")
        return arg

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Reduce *filename* to a single safe path component.

        Path separators and every character other than word characters,
        ``-`` and ``.`` are dropped, then all ``..`` sequences are removed.

        Raises:
            ValueError: If nothing but dots (or nothing at all) is left.
        """
        # Filter characters first so that removing a separator or other
        # character cannot re-create ".." afterwards ("./." -> "..").
        filename = re.sub(r'[^\w\-\.]', '', filename)
        while '..' in filename:
            filename = filename.replace('..', '')
        # "" and "." are not usable file names.
        if not filename.strip('.'):
            raise ValueError("无效文件名")
        return filename

    @staticmethod
    def remove_null_bytes(text: str) -> str:
        """Return *text* with every NUL character removed."""
        return text.replace('\x00', '')
sanitizer = ContentSanitizer()
safe_text = sanitizer.sanitize_html(text="<script>alert('xss')</script>")
# Output: the same text with <, > and the quote characters replaced by HTML
# entities (it starts with "&lt;script&gt;"), so it displays as literal text.
白名单验证
Python
from typing import Any
class WhitelistValidator:
    """Validate dicts against a whitelist of field names and values.

    Attributes are plain dicts keyed by field name:
    ``allowed_fields`` maps to a list of validator callables,
    ``allowed_values`` maps to the set of values accepted for that field.
    """

    def __init__(self):
        self.allowed_fields = {}
        self.allowed_values = {}

    def add_allowed_field(self, name: str, validators: list):
        """Register *name* as a known field with its validator chain."""
        self.allowed_fields[name] = validators

    def add_allowed_value(self, name: str, values: set):
        """Restrict field *name* to the given set of values."""
        self.allowed_values[name] = values

    def validate_dict(self, data: dict) -> dict:
        """Return a new dict holding the validated values of *data*.

        Each value is passed through its field's validators in order (every
        validator receives the previous one's return value); the final value
        is then checked against the field's allowed values, if any.

        Raises:
            ValueError: If *data* has a field that was never registered, or
                a final value is not among the allowed values.
        """
        # Reject anything that was not explicitly whitelisted.
        unexpected = set(data.keys()) - set(self.allowed_fields.keys())
        if unexpected:
            raise ValueError(f"未知字段: {unexpected}")

        cleaned = {}
        for field, current in data.items():
            for check in self.allowed_fields[field]:
                current = check(current)
            # The value whitelist applies to the already-transformed value.
            if field in self.allowed_values and current not in self.allowed_values[field]:
                raise ValueError(f"{field}: 不允许的值")
            cleaned[field] = current
        return cleaned
# 使用示例
def _require_positive_id(value):
    """Return *value* if it is greater than zero, otherwise raise ValueError."""
    if value <= 0:
        raise ValueError("id: 必须为正整数")
    return value


whitelist = WhitelistValidator()

# A validator must raise to reject a value. Returning None for bad input
# would let the field through with the value None instead of failing.
whitelist.add_allowed_field('action', [
    lambda v: str(v).strip(),
])
# The permitted actions are enforced by the validator's own value whitelist.
whitelist.add_allowed_value('action', {'create', 'read', 'update', 'delete'})

whitelist.add_allowed_field('id', [
    int,
    _require_positive_id,
])

safe_data = whitelist.validate_dict({'action': 'create', 'id': '1'})
组合验证器
Python
from typing import Any, Callable, List
class CompositeValidator:
    """An ordered chain of validator callables applied one after another."""

    def __init__(self):
        self.validators: List[Callable] = []

    def add(self, validator: Callable) -> 'CompositeValidator':
        """Append *validator* to the chain and return ``self`` for chaining."""
        self.validators.append(validator)
        return self

    def validate(self, value: Any) -> Any:
        """Feed *value* through every validator in insertion order.

        Each validator receives the return value of the previous one; the
        last return value is the result. With no validators registered the
        input is returned unchanged.
        """
        current = value
        for step in self.validators:
            current = step(current)
        return current
# 预定义验证器工厂
class ValidatorFactory:
    """Factory for ready-made ``CompositeValidator`` pipelines.

    NOTE: the email, username and safe_text pipelines begin with ``str()``,
    so non-string inputs are validated by their string form (``None``
    becomes the text "None").
    """

    @staticmethod
    def email() -> CompositeValidator:
        """Build a pipeline: stringify and strip, lower-case, check format."""
        pipeline = CompositeValidator()
        pipeline.add(lambda raw: str(raw).strip())
        pipeline.add(lambda text: text.lower())
        pipeline.add(FormatValidator.validate_email)
        return pipeline

    @staticmethod
    def username() -> CompositeValidator:
        """Build a pipeline: stringify and strip, then check the format."""
        pipeline = CompositeValidator()
        pipeline.add(lambda raw: str(raw).strip())
        pipeline.add(FormatValidator.validate_username)
        return pipeline

    @staticmethod
    def age() -> CompositeValidator:
        """Build a pipeline: convert to int, then require 0 to 150."""
        pipeline = CompositeValidator()
        pipeline.add(TypeValidator.validate_int)
        pipeline.add(lambda number: RangeValidator.validate_int_range(number, 0, 150))
        return pipeline

    @staticmethod
    def safe_text(max_length: int = 1000) -> CompositeValidator:
        """Build a pipeline: stringify, drop tags, escape HTML, cap length.

        The length limit is applied last, i.e. to the escaped text.
        """
        pipeline = CompositeValidator()
        pipeline.add(lambda raw: str(raw))
        pipeline.add(ContentSanitizer.remove_html_tags)
        pipeline.add(ContentSanitizer.sanitize_html)
        pipeline.add(
            lambda text: RangeValidator.validate_str_length(text, 0, max_length)
        )
        return pipeline
# 使用
# Build each pipeline once, then reuse it for every value.
email_validator = ValidatorFactory.email()
username_validator = ValidatorFactory.username()

safe_email = email_validator.validate(value=" TEST@Example.COM ")
safe_name = username_validator.validate(value="alice123")
数据验证装饰器
Python
from functools import wraps
def validate_input(**validators):
    """Decorator factory that validates a function's arguments by name.

    Args:
        **validators: Maps a parameter name to a callable. The callable
            receives the argument and returns the value to pass on; it
            rejects input by raising.

    Returns:
        A decorator. The wrapped function runs the matching validator on
        every argument that was supplied, whether it was passed
        positionally or by keyword. Parameters left at their default are
        not validated.
    """
    def decorator(func):
        # Local import so the decorator does not rely on a file-level import.
        import inspect

        try:
            signature = inspect.signature(func)
        except (TypeError, ValueError):
            # Some callables expose no signature; only keyword arguments
            # can be matched to names in that case.
            signature = None

        plain_names = set()
        var_keyword = None
        if signature is not None:
            for name, parameter in signature.parameters.items():
                if parameter.kind is inspect.Parameter.VAR_KEYWORD:
                    var_keyword = name
                elif parameter.kind is not inspect.Parameter.VAR_POSITIONAL:
                    plain_names.add(name)

        @wraps(func)
        def wrapper(*args, **kwargs):
            if signature is None:
                for param, validator in validators.items():
                    if param in kwargs:
                        kwargs[param] = validator(kwargs[param])
                return func(*args, **kwargs)

            # bind() maps positional arguments onto parameter names, so they
            # cannot slip past validation.
            bound = signature.bind(*args, **kwargs)
            named = bound.arguments
            # Keyword arguments collected by a **kwargs parameter, if any.
            extras = named.get(var_keyword, {}) if var_keyword else {}
            for param, validator in validators.items():
                if param in plain_names and param in named:
                    named[param] = validator(named[param])
                elif param in extras:
                    extras[param] = validator(extras[param])
            return func(*bound.args, **bound.kwargs)
        return wrapper
    return decorator
# 使用示例
@validate_input(
    email=ValidatorFactory.email().validate,
    age=ValidatorFactory.age().validate,
    username=ValidatorFactory.username().validate,
)
def register_user(email, age, username):
    """Return the registration data as a dict of email, age and username."""
    return dict(email=email, age=age, username=username)
# Arguments are passed by keyword, matching the validator names above.
result = register_user(email="user@example.com", age="25", username="alice")
JSON输入验证
Python
import json
class JSONInputValidator:
    """Parse a JSON document and validate it against a simple schema.

    The schema maps a field name to a rules dict. Rules read by this class:
    ``type`` ('int', 'str' or 'email'), ``required``, ``default``, and
    ``min`` / ``max`` (applied only when both are present).
    """

    def __init__(self, schema: dict):
        self.schema = schema

    def validate(self, data: str) -> dict:
        """Parse *data* as JSON and return the validated fields.

        Raises:
            ValueError: If *data* is not valid JSON, the top-level value is
                not an object, a required field is missing, or a field
                validator rejects its value.
            TypeError: If a 'str' or 'email' field holds a non-string value.
        """
        try:
            parsed = json.loads(data)
        except json.JSONDecodeError as exc:
            raise ValueError("无效JSON格式") from exc
        # Valid JSON may also be a list, number, string or null; the schema
        # check below only makes sense for an object.
        if not isinstance(parsed, dict):
            raise ValueError("JSON顶层必须是对象")
        return self._validate_schema(parsed, self.schema)

    def _validate_schema(self, data: dict, schema: dict) -> dict:
        """Return a dict with one validated entry per schema field.

        Fields present in *data* but absent from *schema* are dropped.
        Missing optional fields get the rule's ``default`` (``None`` when
        no default is given).
        """
        result = {}
        for field, rules in schema.items():
            if field not in data:
                if rules.get('required', False):
                    raise ValueError(f"缺少必需字段: {field}")
                result[field] = rules.get('default')
                continue

            value = data[field]
            type_ = rules.get('type')

            # Type / format validation. An unrecognized type is not checked.
            if type_ == 'int':
                value = TypeValidator.validate_int(value)
            elif type_ == 'str':
                value = TypeValidator.validate_str(value)
            elif type_ == 'email':
                # Check the type first so a non-string value fails the same
                # way as for 'str' instead of with an AttributeError.
                value = FormatValidator.validate_email(
                    TypeValidator.validate_str(value)
                )

            # Range validation.
            if 'min' in rules and 'max' in rules:
                value = RangeValidator.validate_int_range(
                    value, rules['min'], rules['max']
                )
            result[field] = value
        return result
# Schema示例
# One rules dict per field; "age" is optional and limited to 0-150.
user_schema = dict(
    name=dict(type='str', required=True),
    email=dict(type='email', required=True),
    age=dict(type='int', min=0, max=150, required=False),
)

validator = JSONInputValidator(schema=user_schema)
safe_data = validator.validate(
    data='{"name": "Alice", "email": "alice@example.com"}'
)
要点总结
- 分层验证:类型→格式→范围→内容→业务
- 白名单优先:只允许已知安全的输入
- 内容清洗:移除/转义危险字符
- 组合验证器:灵活组合验证逻辑
- 验证与清洗分离:验证负责拒绝无效输入,清洗负责对有效输入做安全处理
📝 发现内容有误?点击此处直接编辑