Python SSL/TLS配置
SSL/TLS是网络通信安全的基础。正确配置证书验证是防止中间人攻击的关键。
基本HTTPS请求
Python
import ssl
import urllib.request
# 默认验证证书
url = 'https://api.example.com/data'
response = urllib.request.urlopen(url)
data = response.read()
# 默认使用系统证书库,验证服务器证书
Python
import requests
# requests 默认验证证书
response = requests.get('https://api.example.com')
print(response.status_code)
# 禁用验证(仅测试使用,生产环境禁止)
# response = requests.get('https://api.example.com', verify=False) # 危险!
证书验证配置
Python
import ssl
# 创建SSL上下文
context = ssl.create_default_context()
# 指定证书文件
context.load_verify_locations('/path/to/ca-bundle.crt')
# 或使用系统默认
context = ssl.create_default_context()
# 验证模式
context.verify_mode = ssl.CERT_REQUIRED # 必须验证
# 检查主机名
context.check_hostname = True
# 使用上下文
import urllib.request
response = urllib.request.urlopen(url, context=context)
Python
import requests
# 指定CA证书路径
response = requests.get(
'https://api.example.com',
verify='/path/to/ca-bundle.crt'
)
# 使用自定义证书
session = requests.Session()
session.verify = '/path/to/company/ca.crt'
response = session.get('https://internal-api.company.com')
客户端证书
Python
import ssl
# 使用客户端证书
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
context.load_cert_chain(
certfile='/path/to/client.crt',
keyfile='/path/to/client.key',
password='key_password' # 如果密钥有密码
)
# 连接
import socket
with socket.create_connection(('server.example.com', 443)) as sock:
with context.wrap_socket(sock, server_hostname='server.example.com') as ssock:
print(ssock.version()) # TLS版本
ssock.send(b'GET / HTTP/1.0\r\n\r\n')
Python
import requests
# requests 使用客户端证书
response = requests.get(
'https://api.example.com',
cert=('client.crt', 'client.key')
)
# 密钥有密码时使用 Session
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
class ClientCertAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context()
context.load_cert_chain('client.crt', 'client.key', password='secret')
kwargs['ssl_context'] = context
return super().init_poolmanager(*args, **kwargs)
session = requests.Session()
session.mount('https://', ClientCertAdapter())
自签名证书处理
Python
import ssl
# 安全方式:添加自签名CA到信任库
context = ssl.create_default_context()
context.load_verify_locations('/path/to/self-signed-ca.crt')
# 生产环境不要禁用验证!
# 测试环境临时方案(仅用于开发)
context = ssl._create_unverified_context() # 不推荐
Python
# requests 处理自签名证书
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# 正确方式:信任自签名CA
session = requests.Session()
session.verify = '/path/to/company-ca.crt'
# 测试环境(不推荐)
# requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# session.verify = False
TLS版本控制
Python
import ssl
# 限制TLS版本(禁止旧版本)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.minimum_version = ssl.TLSVersion.TLSv1_2 # 最低TLS 1.2
context.maximum_version = ssl.TLSVersion.TLSv1_3 # 最高TLS 1.3
# 或使用选项设置
context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
# 加载默认CA
context.load_default_certs()
Python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
class TLSAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
ctx = create_urllib3_context()
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
kwargs['ssl_context'] = ctx
return super().init_poolmanager(*args, **kwargs)
session = requests.Session()
session.mount('https://', TLSAdapter())
证书信息检查
Python
import ssl
import socket
def get_cert_info(hostname: str, port: int = 443):
"获取服务器证书信息"
context = ssl.create_default_context()
with socket.create_connection((hostname, port)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
return {
'subject': dict(x[0] for x in cert.get('subject', [])),
'issuer': dict(x[0] for x in cert.get('issuer', [])),
'version': cert.get('version'),
'serial_number': cert.get('serialNumber'),
'not_before': cert.get('notBefore'),
'not_after': cert.get('notAfter'),
'san': cert.get('subjectAltName', []),
}
info = get_cert_info('example.com')
print(f"域名: {info['subject'].get('commonName')}")
print(f"有效期至: {info['not_after']}")
print(f"颁发者: {info['issuer'].get('organizationName')}")
Python
import ssl
from datetime import datetime
def check_cert_expiry(hostname: str) -> dict:
"检查证书过期"
context = ssl.create_default_context()
with socket.create_connection((hostname, 443)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
# 解析过期时间
expiry_str = cert['notAfter']
expiry = datetime.strptime(expiry_str, '%b %d %H:%M:%S %Y %Z')
remaining = expiry - datetime.now()
return {
'expiry_date': expiry,
'days_remaining': remaining.days,
'is_valid': remaining.days > 0
}
status = check_cert_expiry('example.com')
print(f"剩余 {status['days_remaining']} 天")
安全WebSocket
Python
import ssl
import websocket
# WebSocket SSL配置
url = 'wss://api.example.com/ws'
# 默认验证
ws = websocket.create_connection(url)
# 自定义SSL上下文
context = ssl.create_default_context()
context.verify_mode = ssl.CERT_REQUIRED
ws = websocket.create_connection(
url,
sslopt={'cert_reqs': ssl.CERT_REQUIRED, 'ssl_version': ssl.PROTOCOL_TLS}
)
# 客户端证书
ws = websocket.create_connection(
url,
sslopt={
'certfile': 'client.crt',
'keyfile': 'client.key'
}
)
安全HTTP服务器
Python
import ssl
from http.server import HTTPServer, SimpleHTTPRequestHandler
# HTTPS服务器配置
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain('server.crt', 'server.key')
# 设置安全选项
context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
context.minimum_version = ssl.TLSVersion.TLSv1_2
server = HTTPServer(('localhost', 8443), SimpleHTTPRequestHandler)
server.socket = context.wrap_socket(server.socket, server_side=True)
print("HTTPS服务器运行于 https://localhost:8443")
server.serve_forever()
证书固定(Certificate Pinning)
Python
import ssl
import hashlib
class PinningSSLContext:
"证书固定"
EXPECTED_HASHES = {
'example.com': 'sha256:ABC123...', # 预期证书哈希
}
def create_context(self, hostname: str):
"创建带固定的上下文"
context = ssl.create_default_context()
# 验证回调
def verify_cert(conn, cert, errno, depth, ok):
if depth == 0: # 服务器证书
cert_hash = hashlib.sha256(cert).hexdigest()
expected = self.EXPECTED_HASHES.get(hostname)
if expected and cert_hash != expected.split(':')[1]:
return False # 拒绝
return ok
# 注意:Python ssl模块不直接支持回调,需使用第三方库
return context
# 生产级实现建议使用 certifi + requests
要点总结
- 默认验证证书,禁用验证仅用于测试
- 自定义CA证书通过
verify参数指定 - 客户端证书使用
cert参数传递 - 限制TLS版本最低TLS 1.2
- 定期检查证书过期,提前更换
📝 发现内容有误?点击此处直接编辑