Python 微服务开发指南
目录
服务发现
Consul
python
import consul
from flask import Flask
# Flask application that exposes this example's HTTP routes.
app = Flask(__name__)
# Consul client created with the library's default connection settings.
c = consul.Consul()
# Register the service
def register_service():
    """Register the 'user-service-1' instance with the Consul agent.

    The name, ID, address, port and tags are fixed values for this example.
    """
    registration = {
        'name': 'user-service',
        'service_id': 'user-service-1',
        'address': 'localhost',
        'port': 5000,
        'tags': ['user', 'api'],
    }
    c.agent.service.register(**registration)
# Discover the service
def discover_service(service_name):
    """Return the Consul health entries for ``service_name``.

    Queries ``c.health.service`` with ``passing=True`` and returns the second
    element of the ``(index, entries)`` pair; the index is discarded.
    """
    _index, healthy_entries = c.health.service(service_name, passing=True)
    return healthy_entries
@app.route('/users')
def get_users():
    """Proxy ``GET /users`` to the first discovered 'user-service' instance.

    Returns:
        The upstream JSON body and status code, or a 404 response when
        discovery returns no instance.
    """
    # Imported locally: this example never imports ``requests`` at module
    # level, so the call below would otherwise fail with NameError.
    import requests

    # Discover the user service.
    services = discover_service('user-service')
    if not services:
        return "Service not found", 404

    # Call the first instance returned by discovery.
    target = services[0]['Service']
    upstream = requests.get(
        f"http://{target['Address']}:{target['Port']}/users"
    )
    # A requests.Response is not a valid Flask return value; hand back the
    # decoded body and the upstream status code instead.
    return upstream.json(), upstream.status_code

etcd
python
import etcd3
from flask import Flask
# Flask application that exposes this example's HTTP routes.
app = Flask(__name__)
# etcd client created with the library's default connection settings.
etcd = etcd3.client()
# Register the service
def register_service():
    """Store the user service's address in etcd under its service key."""
    service_key = '/services/user-service'
    encoded_address = 'localhost:5000'.encode()
    etcd.put(service_key, encoded_address)
# Discover the service
def discover_service(service_name):
    """Look up the address stored in etcd for ``service_name``.

    Returns:
        The decoded value, or ``None`` when etcd returns a falsy value for
        the key.
    """
    raw_value, _metadata = etcd.get(f'/services/{service_name}')
    if not raw_value:
        return None
    return raw_value.decode()
@app.route('/users')
def get_users():
    """Proxy ``GET /users`` to the address registered for 'user-service'.

    Returns:
        The upstream JSON body and status code, or a 404 response when no
        address is registered.
    """
    # Imported locally: this example never imports ``requests`` at module
    # level, so the call below would otherwise fail with NameError.
    import requests

    # Discover the user service.
    service_address = discover_service('user-service')
    if not service_address:
        return "Service not found", 404

    # Call the user service.
    upstream = requests.get(f"http://{service_address}/users")
    # A requests.Response is not a valid Flask return value; hand back the
    # decoded body and the upstream status code instead.
    return upstream.json(), upstream.status_code

配置中心
Apollo
python
from apollo.apollo_client import ApolloClient
from flask import Flask
# Flask application that exposes this example's HTTP routes.
app = Flask(__name__)
# Apollo client for the 'user-service' app in the 'default' cluster, pointed
# at a config server on localhost:8080.
client = ApolloClient(
    app_id='user-service',
    cluster='default',
    config_server_url='http://localhost:8080'
)
# Fetch configuration
def get_config(key):
    """Return the value the Apollo client provides for ``key``."""
    value = client.get_value(key)
    return value
@app.route('/users')
def get_users():
    """Handle ``GET /users``: read the database settings, then respond.

    The fetched host and port are not used yet; connecting to the database
    is left as a placeholder in this example.
    """
    # Fetch the database settings (host first, then port).
    db_host, db_port = get_config('db.host'), get_config('db.port')
    # Placeholder: connect to the database using the settings above.
    return "Users", 200

Nacos
python
from nacos import NacosClient
from flask import Flask
# Flask application that exposes this example's HTTP routes.
app = Flask(__name__)
# Nacos client pointed at localhost:8848, using the "public" namespace.
client = NacosClient(
    server_addresses="localhost:8848",
    namespace="public"
)
# Fetch configuration
def get_config(data_id, group):
    """Return the Nacos configuration stored under ``data_id`` in ``group``."""
    config = client.get_config(data_id, group)
    return config
@app.route('/users')
def get_users():
    """Handle ``GET /users``: load the service's configuration, then respond.

    The loaded configuration is not used yet; connecting to the database is
    left as a placeholder in this example.
    """
    # Fetch the database configuration.
    data_id, group = 'user-service', 'DEFAULT_GROUP'
    db_config = get_config(data_id, group)
    # Placeholder: connect to the database using the configuration above.
    return "Users", 200

服务治理
熔断器
python
from pybreaker import CircuitBreaker
from flask import Flask
# Flask application that exposes this example's HTTP routes.
app = Flask(__name__)
# Circuit breaker shared by the calls below.
# NOTE(review): presumably opens after 5 failures and allows a retry after
# 60 seconds -- confirm against the pybreaker documentation.
breaker = CircuitBreaker(
    fail_max=5,
    reset_timeout=60
)
@breaker
def call_user_service():
    """Fetch the user list from the user service through the circuit breaker.

    Returns:
        The decoded JSON body of the upstream response.

    Raises:
        requests.HTTPError: when the upstream answers with an error status
            (raised by ``raise_for_status``).
    """
    # Imported locally: this example never imports ``requests`` at module
    # level, so the call below would otherwise fail with NameError.
    import requests

    response = requests.get('http://user-service/users')
    # Turn error statuses into exceptions so failures surface to the caller
    # (and to the breaker wrapping this function).
    response.raise_for_status()
    return response.json()
@app.route('/users')
def get_users():
    """Handle ``GET /users`` by delegating to ``call_user_service``.

    Any exception raised by the call is answered with a 503 response.
    """
    try:
        payload = call_user_service()
    except Exception:
        return "Service unavailable", 503
    return payload

限流器
python
from ratelimit import limits, sleep_and_retry
from flask import Flask
# Flask application that exposes this example's HTTP routes.
app = Flask(__name__)
# Length of the rate-limit window passed as ``period`` below.
ONE_MINUTE = 60
# Number of calls allowed per window, passed as ``calls`` below.
MAX_REQUESTS_PER_MINUTE = 100
@sleep_and_retry
@limits(calls=MAX_REQUESTS_PER_MINUTE, period=ONE_MINUTE)
def call_user_service():
    """Fetch the user list from the user service under the rate limit.

    Returns:
        The decoded JSON body of the upstream response.

    Raises:
        requests.HTTPError: when the upstream answers with an error status
            (raised by ``raise_for_status``).
    """
    # Imported locally: this example never imports ``requests`` at module
    # level, so the call below would otherwise fail with NameError.
    import requests

    response = requests.get('http://user-service/users')
    # Turn error statuses into exceptions so failures surface to the caller.
    response.raise_for_status()
    return response.json()
@app.route('/users')
def get_users():
try:
return call_user_service()
except Exception as e:
return "Too many requests", 429API 网关
Kong
python
from flask import Flask, request
import requests
# Flask application acting as the gateway for the routes below.
app = Flask(__name__)
@app.route('/users', methods=['GET'])
def get_users():
    """Forward ``GET /users`` to the user service and relay its answer.

    Returns:
        The upstream JSON body together with the upstream status code.
    """
    # Do not forward the inbound Host header: it names this gateway, not the
    # upstream, and would override the Host that requests derives from the
    # target URL.
    forwarded_headers = {
        name: value
        for name, value in request.headers
        if name.lower() != 'host'
    }
    # Forward the request to the user service.
    response = requests.get(
        'http://user-service/users',
        headers=forwarded_headers,
    )
    return response.json(), response.status_code
@app.route('/orders', methods=['GET'])
def get_orders():
    """Forward ``GET /orders`` to the order service and relay its answer.

    Returns:
        The upstream JSON body together with the upstream status code.
    """
    # Do not forward the inbound Host header: it names this gateway, not the
    # upstream, and would override the Host that requests derives from the
    # target URL.
    forwarded_headers = {
        name: value
        for name, value in request.headers
        if name.lower() != 'host'
    }
    # Forward the request to the order service.
    response = requests.get(
        'http://order-service/orders',
        headers=forwarded_headers,
    )
    return response.json(), response.status_code

消息队列
RabbitMQ
python
import pika
from flask import Flask
# Flask application for this example.
app = Flask(__name__)
# Connect to RabbitMQ on localhost.
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
# Channel used by the publish/consume helpers below.
channel = connection.channel()
# Declare the queue the helpers publish to and consume from.
channel.queue_declare(queue='user_queue')
# Publish a message
def publish_message(message):
    """Publish ``message`` with routing key 'user_queue' on the '' exchange."""
    queue_name = 'user_queue'
    channel.basic_publish(exchange='', routing_key=queue_name, body=message)
# Consume messages
def consume_message():
    """Register a printing callback on 'user_queue' and start consuming.

    Messages are consumed with ``auto_ack=True``; each body is printed.
    """
    def handle_delivery(ch, method, properties, body):
        print(f"Received message: {body}")

    consume_options = {
        'queue': 'user_queue',
        'on_message_callback': handle_delivery,
        'auto_ack': True,
    }
    channel.basic_consume(**consume_options)
    channel.start_consuming()

Kafka
python
import json

from flask import Flask
from kafka import KafkaProducer, KafkaConsumer
# Flask application for this example.
app = Flask(__name__)
# Create the producer; each value is serialized to UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# Create the consumer for 'user-topic'; each value is decoded from UTF-8 JSON.
consumer = KafkaConsumer(
    'user-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# Publish a message
def publish_message(message):
    """Send ``message`` to 'user-topic' through the module-level producer."""
    topic = 'user-topic'
    producer.send(topic, message)
# Consume messages
def consume_message():
    """Iterate over the module-level consumer and print each record's value."""
    for record in consumer:
        payload = record.value
        print(f"Received message: {payload}")

分布式追踪
Jaeger
python
from jaeger_client import Config
from flask import Flask
import opentracing
# Flask application that exposes this example's HTTP routes.
app = Flask(__name__)
# Initialize Jaeger
def init_tracer():
    """Build the Jaeger configuration for 'user-service' and return its tracer.

    The configuration uses a 'const' sampler with ``param`` 1 and enables
    logging.
    """
    sampler_settings = {
        'type': 'const',
        'param': 1,
    }
    tracer_settings = {
        'sampler': sampler_settings,
        'logging': True,
    }
    jaeger_config = Config(
        config=tracer_settings,
        service_name='user-service',
    )
    return jaeger_config.initialize_tracer()
# Module-level tracer used by the route handlers below.
tracer = init_tracer()
@app.route('/users')
def get_users():
    """Handle ``GET /users`` inside a 'get_users' span.

    The span is tagged with the HTTP method and URL before responding.
    """
    span_tags = (
        ('http.method', 'GET'),
        ('http.url', '/users'),
    )
    with tracer.start_span('get_users') as span:
        for tag_name, tag_value in span_tags:
            span.set_tag(tag_name, tag_value)
        # Request handling goes here.
        return "Users", 200

容器化部署
Docker
dockerfile
# Base image: slim Python 3.9.
FROM python:3.9-slim
# Subsequent instructions run relative to /app.
WORKDIR /app
# Copy the dependency list first and install it without pip's cache.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the build context into the image.
COPY . .
CMD ["python", "app.py"]

Kubernetes
yaml
# Deployment running three replicas of the user-service container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  # Pods are selected by the same label the pod template assigns below.
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: user-service:latest
          ports:
            - containerPort: 5000
          # Consul connection settings handed to the container.
          env:
            - name: CONSUL_HOST
              value: consul-server
            - name: CONSUL_PORT
              value: "8500"
---
# ClusterIP Service mapping port 80 to port 5000 on pods labelled
# app: user-service.
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 80
      targetPort: 5000
  type: ClusterIP

监控告警
Prometheus
python
from prometheus_client import Counter, Histogram
from flask import Flask
# Flask application that exposes this example's HTTP routes.
app = Flask(__name__)
# Define the metrics.
# Request counter, labelled by HTTP method and endpoint.
http_requests_total = Counter(
    'http_requests_total',
    'Total number of HTTP requests',
    ['method', 'endpoint']
)
# Request duration histogram, labelled by HTTP method and endpoint.
http_request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration in seconds',
    ['method', 'endpoint']
)
@app.route('/users')
def get_users():
    """Handle ``GET /users`` while recording request count and duration.

    Both metrics are labelled with method 'GET' and endpoint '/users'.
    """
    route_labels = {'method': 'GET', 'endpoint': '/users'}
    http_requests_total.labels(**route_labels).inc()
    request_timer = http_request_duration.labels(**route_labels).time()
    with request_timer:
        # Request handling goes here.
        return "Users", 200

Grafana
python
from grafana_api.grafana_face import GrafanaFace
from flask import Flask
# Flask application for this example.
app = Flask(__name__)
# Create the Grafana client.
# NOTE(review): the credentials are hard-coded to admin/admin here; load them
# from configuration or a secret store outside of a local demo.
grafana = GrafanaFace(
    auth=('admin', 'admin'),
    host='localhost:3000'
)
# Create the dashboard
def create_dashboard():
    """Send the 'User Service Dashboard' definition to Grafana.

    The dashboard has a single graph panel charting
    ``rate(http_requests_total[5m])`` from the 'Prometheus' datasource, and
    is submitted with ``overwrite`` set to True.
    """
    request_rate_target = {
        'expr': 'rate(http_requests_total[5m])',
    }
    requests_panel = {
        'title': 'HTTP Requests',
        'type': 'graph',
        'datasource': 'Prometheus',
        'targets': [request_rate_target],
    }
    payload = {
        'dashboard': {
            'title': 'User Service Dashboard',
            'panels': [requests_panel],
        },
        'overwrite': True,
    }
    grafana.dashboard.update_dashboard(payload)