Skip to content

Python 微服务开发指南

目录

服务发现

Consul

python
import consul
from flask import Flask

# Flask application object that the route decorator below attaches to.
app = Flask(__name__)
# Consul client built with no arguments, i.e. with the library's default connection settings.
c = consul.Consul()

# Register the service
def register_service():
    """Register this instance with the Consul agent under the name ``user-service``.

    The service ID, address, port and tags are fixed values in this example.
    """
    registration = {
        'name': 'user-service',
        'service_id': 'user-service-1',
        'address': 'localhost',
        'port': 5000,
        'tags': ['user', 'api'],
    }
    c.agent.service.register(**registration)

# Discover the service
def discover_service(service_name):
    """Return the entries Consul's health endpoint reports for ``service_name``.

    The query is made with ``passing=True``; the index that comes back with
    the entries is discarded.
    """
    _index, passing_entries = c.health.service(service_name, passing=True)
    return passing_entries

@app.route('/users')
def get_users():
    """Proxy ``/users`` to the first ``user-service`` instance found in Consul.

    Returns:
        The upstream body, status code and content type, or
        ``("Service not found", 404)`` when discovery returns no instances.
    """
    # Imported locally because this snippet's import block does not import requests.
    import requests

    # Discover the user service
    services = discover_service('user-service')
    if not services:
        return "Service not found", 404

    target = services[0]['Service']
    url = f"http://{target['Address']}:{target['Port']}/users"
    # Call the user service; the timeout keeps a hung instance from blocking this worker.
    upstream = requests.get(url, timeout=5)
    # A requests.Response is not a valid Flask return value, so relay its parts instead.
    return (
        upstream.content,
        upstream.status_code,
        {'Content-Type': upstream.headers.get('Content-Type', 'application/json')},
    )

etcd

python
import etcd3
from flask import Flask

# Flask application object that the route decorator below attaches to.
app = Flask(__name__)
# etcd client built with no arguments, i.e. with the library's default connection settings.
etcd = etcd3.client()

# Register the service
def register_service():
    """Store the service address in etcd under the key ``/services/user-service``."""
    service_key = '/services/user-service'
    encoded_address = 'localhost:5000'.encode()
    etcd.put(service_key, encoded_address)

# Discover the service
def discover_service(service_name):
    """Look up ``/services/<service_name>`` in etcd.

    Returns:
        The stored value decoded to ``str``, or ``None`` when the lookup
        yields no (or an empty) value.
    """
    raw_address, _metadata = etcd.get(f'/services/{service_name}')
    if not raw_address:
        return None
    return raw_address.decode()

@app.route('/users')
def get_users():
    """Proxy ``/users`` to the ``user-service`` address stored in etcd.

    Returns:
        The upstream body, status code and content type, or
        ``("Service not found", 404)`` when no address is registered.
    """
    # Imported locally because this snippet's import block does not import requests.
    import requests

    # Discover the user service
    service_address = discover_service('user-service')
    if not service_address:
        return "Service not found", 404

    # Call the user service; the timeout keeps a hung instance from blocking this worker.
    upstream = requests.get(f"http://{service_address}/users", timeout=5)
    # A requests.Response is not a valid Flask return value, so relay its parts instead.
    return (
        upstream.content,
        upstream.status_code,
        {'Content-Type': upstream.headers.get('Content-Type', 'application/json')},
    )

配置中心

Apollo

python
from apollo.apollo_client import ApolloClient
from flask import Flask

# Flask application object that the route decorator below attaches to.
app = Flask(__name__)
# Apollo client for app 'user-service' in cluster 'default', pointed at a
# config server on localhost:8080.
client = ApolloClient(
    app_id='user-service',
    cluster='default',
    config_server_url='http://localhost:8080'
)

# Fetch a configuration value
def get_config(key):
    """Return whatever the Apollo client yields for ``key``."""
    config_value = client.get_value(key)
    return config_value

@app.route('/users')
def get_users():
    """Handle ``/users``: read the database settings, then answer with a fixed body.

    The fetched settings are placeholders in this example; nothing connects
    to a database yet.
    """
    # Fetch the database configuration (host first, then port)
    db_host, db_port = (get_config(key) for key in ('db.host', 'db.port'))
    # Connect to the database using the configuration
    reply = ("Users", 200)
    return reply

Nacos

python
from nacos import NacosClient
from flask import Flask

# Flask application object that the route decorator below attaches to.
app = Flask(__name__)
# Nacos client pointed at localhost:8848, using the "public" namespace.
client = NacosClient(
    server_addresses="localhost:8848",
    namespace="public"
)

# Fetch a configuration entry
def get_config(data_id, group):
    """Return whatever the Nacos client yields for ``data_id`` in ``group``."""
    config_content = client.get_config(data_id, group)
    return config_content

@app.route('/users')
def get_users():
    """Handle ``/users``: read the service's Nacos config, then answer with a fixed body.

    The fetched configuration is a placeholder in this example; nothing
    connects to a database yet.
    """
    # Fetch the database configuration
    db_config = get_config(data_id='user-service', group='DEFAULT_GROUP')
    # Connect to the database using the configuration
    reply = ("Users", 200)
    return reply

服务治理

熔断器

python
from pybreaker import CircuitBreaker
from flask import Flask

# Flask application object that the route decorator below attaches to.
app = Flask(__name__)
# Circuit breaker applied as a decorator to the outbound call below.
# NOTE(review): fail_max=5 / reset_timeout=60 presumably mean "open after 5
# failures, allow a retry after 60 seconds" -- confirm against the pybreaker docs.
breaker = CircuitBreaker(
    fail_max=5,
    reset_timeout=60
)

@breaker
def call_user_service():
    """Fetch ``/users`` from the user service through the circuit breaker.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: on connection errors, timeouts, or an HTTP
            error status (via ``raise_for_status``); these are raised inside
            the breaker-wrapped call.
    """
    # Imported locally because this snippet's import block does not import requests.
    import requests

    # Without a timeout a hung upstream would block forever and never raise.
    response = requests.get('http://user-service/users', timeout=5)
    response.raise_for_status()
    return response.json()

@app.route('/users')
def get_users():
    """Handle ``/users`` by delegating to ``call_user_service``.

    Any exception raised by the call is answered with
    ``("Service unavailable", 503)``.
    """
    try:
        users = call_user_service()
    except Exception:
        return "Service unavailable", 503
    return users

限流器

python
from ratelimit import limits, sleep_and_retry
from flask import Flask

# Flask application object that the route decorator below attaches to.
app = Flask(__name__)

# Length of the rate-limit window; passed as ``period`` to the limiter below.
ONE_MINUTE = 60
# Number of calls allowed per window; passed as ``calls`` to the limiter below.
MAX_REQUESTS_PER_MINUTE = 100

# NOTE: @sleep_and_retry is intentionally not applied here. It catches the
# rate-limit exception and sleeps, so the caller could never answer 429.
@limits(calls=MAX_REQUESTS_PER_MINUTE, period=ONE_MINUTE)
def call_user_service():
    """Fetch ``/users`` from the user service, subject to the rate limit.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ratelimit.RateLimitException: when more than ``MAX_REQUESTS_PER_MINUTE``
            calls are made within ``ONE_MINUTE``.
        requests.RequestException: on connection errors, timeouts, or an HTTP
            error status (via ``raise_for_status``).
    """
    # Imported locally because this snippet's import block does not import requests.
    import requests

    # Without a timeout a hung upstream would block this worker indefinitely.
    response = requests.get('http://user-service/users', timeout=5)
    response.raise_for_status()
    return response.json()


@app.route('/users')
def get_users():
    """Handle ``/users``, distinguishing rate limiting from upstream failure.

    Returns:
        The user service's JSON on success, ``("Too many requests", 429)``
        when the limiter rejects the call, or ``("Service unavailable", 503)``
        when the upstream request fails.
    """
    # Local imports: neither name is provided by this snippet's import block.
    import requests
    from ratelimit import RateLimitException

    try:
        return call_user_service()
    except RateLimitException:
        return "Too many requests", 429
    except (requests.RequestException, ValueError):
        # ValueError covers a body that is not valid JSON on older requests versions.
        return "Service unavailable", 503

API 网关

Kong（下例用 Flask 手写转发逻辑来演示网关的路由职责，并未调用 Kong 本身）

python
from flask import Flask, request
import requests

# Flask application object that the gateway routes below attach to.
app = Flask(__name__)

@app.route('/users', methods=['GET'])
def get_users():
    """Forward a GET ``/users`` request to the user service and relay its reply.

    Returns:
        The upstream body, status code and content type.
    """
    # Forward the request to the user service.
    # Host names the gateway rather than the upstream, and Accept-Encoding is
    # left to requests so it only negotiates encodings it can decode itself.
    skipped = ('host', 'accept-encoding')
    forwarded_headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() not in skipped
    }
    response = requests.get(
        'http://user-service/users',
        headers=forwarded_headers,
        timeout=5,
    )
    # Relay the raw body so a non-JSON upstream error page does not raise here.
    return (
        response.content,
        response.status_code,
        {'Content-Type': response.headers.get('Content-Type', 'application/json')},
    )

@app.route('/orders', methods=['GET'])
def get_orders():
    """Forward a GET ``/orders`` request to the order service and relay its reply.

    Returns:
        The upstream body, status code and content type.
    """
    # Forward the request to the order service.
    # Host names the gateway rather than the upstream, and Accept-Encoding is
    # left to requests so it only negotiates encodings it can decode itself.
    skipped = ('host', 'accept-encoding')
    forwarded_headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() not in skipped
    }
    response = requests.get(
        'http://order-service/orders',
        headers=forwarded_headers,
        timeout=5,
    )
    # Relay the raw body so a non-JSON upstream error page does not raise here.
    return (
        response.content,
        response.status_code,
        {'Content-Type': response.headers.get('Content-Type', 'application/json')},
    )

消息队列

RabbitMQ

python
import pika
from flask import Flask

# Flask application object for this example.
app = Flask(__name__)

# Connect to RabbitMQ on localhost. This runs at module level, so the
# connection is opened as soon as the module is imported.
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
# Channel used by both the publish and consume helpers below.
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue='user_queue')

# Publish a message
def publish_message(message):
    """Publish ``message`` with routing key ``user_queue`` on the empty-named exchange."""
    target_queue = 'user_queue'
    channel.basic_publish(exchange='', routing_key=target_queue, body=message)

# Consume messages
def consume_message():
    """Register a printing callback on ``user_queue`` and start consuming.

    The consumer is registered with ``auto_ack=True``.
    """
    def _print_body(ch, method, properties, body):
        # Only the body is used; the other arguments are part of the callback signature.
        print(f"Received message: {body}")

    consume_options = {
        'queue': 'user_queue',
        'on_message_callback': _print_body,
        'auto_ack': True,
    }
    channel.basic_consume(**consume_options)
    channel.start_consuming()

Kafka

python
import json

from flask import Flask
from kafka import KafkaProducer, KafkaConsumer

# Flask application object for this example.
app = Flask(__name__)

# Create the producer: values are JSON-encoded and sent as UTF-8 bytes.
producer = KafkaProducer(
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
    bootstrap_servers=['localhost:9092'],
)

# Create the consumer: reads 'user-topic' and decodes each value from UTF-8 JSON.
consumer = KafkaConsumer(
    'user-topic',
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
    bootstrap_servers=['localhost:9092'],
)

# Publish a message
def publish_message(message):
    """Send ``message`` to the ``user-topic`` topic; the result of ``send`` is discarded."""
    topic = 'user-topic'
    producer.send(topic, message)

# Consume messages
def consume_message():
    """Iterate over the consumer and print the value of every record it yields."""
    for record in consumer:
        print(f"Received message: {record.value}")

分布式追踪

Jaeger

python
from jaeger_client import Config
from flask import Flask
import opentracing

# Flask application object that the traced route below attaches to.
app = Flask(__name__)

# Initialise Jaeger
def init_tracer():
    """Build and return a Jaeger tracer for ``user-service``.

    The configuration uses a ``const`` sampler with ``param`` 1 and sets the
    ``logging`` option to ``True``.
    """
    sampler_settings = {'type': 'const', 'param': 1}
    tracer_settings = {'sampler': sampler_settings, 'logging': True}
    jaeger_config = Config(config=tracer_settings, service_name='user-service')
    return jaeger_config.initialize_tracer()

# Tracer created once at import time and used by the route below.
tracer = init_tracer()

@app.route('/users')
def get_users():
    """Handle ``/users`` inside a ``get_users`` span tagged with method and URL."""
    span_tags = (('http.method', 'GET'), ('http.url', '/users'))
    with tracer.start_span('get_users') as active_span:
        for tag_name, tag_value in span_tags:
            active_span.set_tag(tag_name, tag_value)
        # Handle the request
        return "Users", 200

容器化部署

Docker

dockerfile
# Base image: the slim variant of Python 3.9.
FROM python:3.9-slim

# All following relative paths resolve against /app inside the image.
WORKDIR /app

# Copy only the dependency list first so the install layer can be cached
# independently of changes to the application source.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the build context into the image.
COPY . .

# Start the application by running app.py with the Python interpreter.
CMD ["python", "app.py"]

Kubernetes

yaml
# Deployment that runs three replicas of the user-service container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        # Same label as spec.selector.matchLabels, so these pods are selected.
        app: user-service
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        ports:
        - containerPort: 5000
        env:
        # Consul location handed to the container as environment variables.
        - name: CONSUL_HOST
          value: consul-server
        - name: CONSUL_PORT
          # Quoted so the value is a string rather than a YAML integer.
          value: "8500"
---
# ClusterIP Service in front of the pods labelled app=user-service.
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
  # Service port 80 is mapped to port 5000 on the selected pods.
  - port: 80
    targetPort: 5000
  type: ClusterIP

监控告警

Prometheus

python
from prometheus_client import Counter, Histogram
from flask import Flask

# Flask application object that the instrumented route below attaches to.
app = Flask(__name__)

# Define the metrics
# Request counter, labelled by HTTP method and endpoint.
http_requests_total = Counter(
    'http_requests_total',
    'Total number of HTTP requests',
    ['method', 'endpoint']
)

# Request duration histogram, labelled by HTTP method and endpoint.
http_request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration in seconds',
    ['method', 'endpoint']
)

@app.route('/users')
def get_users():
    """Handle ``/users`` while counting the request and timing the handler body."""
    metric_labels = {'method': 'GET', 'endpoint': '/users'}
    http_requests_total.labels(**metric_labels).inc()
    request_timer = http_request_duration.labels(**metric_labels).time()
    with request_timer:
        # Handle the request
        return "Users", 200

Grafana

python
from grafana_api.grafana_face import GrafanaFace
from flask import Flask

# Flask application object for this example.
app = Flask(__name__)

# Create the Grafana client
# NOTE(review): the credentials ('admin', 'admin') are hard-coded here;
# load them from configuration outside of a demo.
grafana = GrafanaFace(
    auth=('admin', 'admin'),
    host='localhost:3000'
)

# Create the dashboard
def create_dashboard():
    """Send the "User Service Dashboard" definition to Grafana with ``overwrite`` set.

    The dashboard has a single graph panel that uses the ``Prometheus``
    datasource and the expression ``rate(http_requests_total[5m])``.
    """
    requests_panel = {
        'title': 'HTTP Requests',
        'type': 'graph',
        'datasource': 'Prometheus',
        'targets': [{'expr': 'rate(http_requests_total[5m])'}],
    }
    payload = {
        'dashboard': {
            'title': 'User Service Dashboard',
            'panels': [requests_panel],
        },
        'overwrite': True,
    }
    grafana.dashboard.update_dashboard(payload)

启航团队技术文档