本文目录导读:
我来提供一个Python接口封装的实用案例,以常见的天气API为例:
基础版:简单函数封装
import requests
import json
def get_weather(city, api_key='YOUR_API_KEY', timeout=10):
    """Fetch the current weather for a city from the OpenWeatherMap API.

    Args:
        city: City name, sent as the ``q`` query parameter.
        api_key: API key sent as the ``appid`` query parameter. The default
            is a placeholder; pass a real key.
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout by default, so a stalled connection would otherwise
            block forever.

    Returns:
        The decoded JSON body, or ``None`` if the request failed (network
        error, timeout, non-2xx status, or a body that is not valid JSON).
    """
    url = "https://api.openweathermap.org/data/2.5/weather"
    params = {
        'q': city,
        'appid': api_key,
        'units': 'metric',
        'lang': 'zh_cn',
    }
    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()  # turn 4xx/5xx responses into exceptions
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on older requests versions.
        print(f"请求失败: {e}")
        return None
# Usage example
weather_data = get_weather("Beijing")
if weather_data:
    temperature = weather_data['main']['temp']
    print(f"温度: {temperature}°C")
    description = weather_data['weather'][0]['description']
    print(f"天气: {description}")
进阶版:类封装
import requests
import time
from typing import Dict, Optional
class WeatherAPI:
    """Client for the OpenWeatherMap current-weather and forecast endpoints.

    All calls share one ``requests.Session`` and are spaced at least
    ``min_interval`` seconds apart.
    """

    # The ``forecast`` endpoint's ``cnt`` parameter counts 3-hour entries,
    # so one day corresponds to eight of them.
    _FORECAST_STEPS_PER_DAY = 8

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.openweathermap.org/data/2.5",
        min_interval: float = 1.0,
        timeout: float = 10,
    ):
        """Create a client.

        Args:
            api_key: API key sent as the ``appid`` query parameter.
            base_url: Root URL that endpoint names are appended to.
            min_interval: Minimum number of seconds between two requests.
            timeout: Per-request timeout in seconds.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.min_interval = min_interval
        self.timeout = timeout
        # Monotonic timestamp of the last finished attempt; -inf = "never".
        self.last_request_time = float("-inf")

    def _make_request(self, endpoint: str, params: Dict) -> Optional[Dict]:
        """Send a throttled GET request and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``base_url`` (e.g. ``"weather"``).
            params: Endpoint-specific query parameters. The dict is copied,
                never modified.

        Returns:
            The decoded JSON body, or ``None`` if the request failed.
        """
        # Copy so the caller's dict is left untouched.
        query = {
            **params,
            'appid': self.api_key,
            'units': 'metric',
            'lang': 'zh_cn',
        }

        # Throttle using a monotonic clock so wall-clock adjustments cannot
        # produce a negative elapsed time and an oversized sleep.
        wait = self.min_interval - (time.monotonic() - self.last_request_time)
        if wait > 0:
            time.sleep(wait)

        # Tolerate a trailing slash on base_url / leading slash on endpoint.
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        try:
            response = self.session.get(url, params=query, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on older requests versions.
            print(f"API请求失败: {e}")
            return None
        finally:
            # Stamp failed attempts too, so errors do not bypass the throttle.
            self.last_request_time = time.monotonic()

    def get_current_weather(self, city: str) -> Optional[Dict]:
        """Return the current weather for ``city``, or ``None`` on failure."""
        return self._make_request('weather', {'q': city})

    def get_forecast(self, city: str, days: int = 5) -> Optional[Dict]:
        """Return the forecast for ``city`` covering ``days`` days.

        ``days`` is converted to the number of 3-hour entries requested via
        ``cnt``. Returns ``None`` on failure.
        """
        params = {'q': city, 'cnt': days * self._FORECAST_STEPS_PER_DAY}
        return self._make_request('forecast', params)
# Usage example
api = WeatherAPI(api_key='YOUR_API_KEY')

# Current conditions
current = api.get_current_weather("Beijing")
if current:
    readings = current['main']
    print(f"当前温度: {readings['temp']}°C")
    print(f"湿度: {readings['humidity']}%")

# Forecast: print the first three entries
forecast = api.get_forecast("Beijing")
if forecast:
    for item in forecast['list'][:3]:
        timestamp = item['dt_txt']
        temperature = item['main']['temp']
        print(f"时间: {timestamp}, 温度: {temperature}°C")
实际案例:REST API封装模式
import requests
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import logging
# Configure logging: the root logger emits records at INFO level and above.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class HTTPMethod(Enum):
    """HTTP verbs the client can send; each value is the verb as a string."""

    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
@dataclass
class APIResponse:
    """Outcome of a single API call.

    Attributes:
        status_code: Status code reported for the call.
        data: Response payload, or ``None`` when there is none.
        error: Error description, or ``None`` (the default).
    """

    status_code: int
    data: Optional[Dict]
    error: Optional[str] = None
class BaseAPIClient:
    """Base HTTP client that sends requests through one ``requests.Session``.

    Every call returns an ``APIResponse``; request failures are reported
    through its ``error`` field instead of being raised.
    """

    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        """Create a client.

        Args:
            base_url: Root URL that endpoint paths are appended to.
            auth_token: Optional token. When given, it is sent as a Bearer
                ``Authorization`` header on every request.
        """
        self.base_url = base_url
        self.session = requests.Session()
        self.auth_token = auth_token
        if auth_token:
            self.session.headers.update({
                'Authorization': f'Bearer {auth_token}',
                'Content-Type': 'application/json',
            })

    def request(
        self,
        method: HTTPMethod,
        endpoint: str,
        params: Optional[Dict] = None,
        data: Optional[Dict] = None,
    ) -> APIResponse:
        """Send one request and wrap the outcome in an ``APIResponse``.

        Args:
            method: HTTP verb to use.
            endpoint: Path appended to ``base_url``.
            params: Optional query-string parameters.
            data: Optional body, sent as JSON.

        Returns:
            On success, the status code and the decoded JSON body; a body
            that is not JSON is returned as ``{"content": <text>}``. On
            failure, ``data`` is ``None`` and ``error`` holds the message.
        """
        # Tolerate a trailing slash on base_url / leading slash on endpoint.
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        try:
            response = self.session.request(
                method=method.value,
                url=url,
                params=params,
                json=data,
                timeout=30,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error("请求失败: %s", e)
            return APIResponse(
                # e.response is None when no HTTP response was received
                # (e.g. connection error); 500 is the fallback in that case.
                status_code=getattr(e.response, 'status_code', 500),
                data=None,
                error=str(e),
            )

        logger.info("请求成功: %s %s", method.value, url)
        try:
            response_data = response.json()
        except ValueError:
            # Only JSON decoding errors are expected here; a bare ``except``
            # would also swallow KeyboardInterrupt and SystemExit.
            response_data = {"content": response.text}
        return APIResponse(
            status_code=response.status_code,
            data=response_data,
        )
class WeatherClient(BaseAPIClient):
    """Weather client built on ``BaseAPIClient``."""

    def _query(self, **extra) -> Dict:
        """Build the query parameters for a weather request.

        Combines the call-specific parameters in ``extra`` with the shared
        defaults (metric units, Chinese descriptions) and the API key.
        """
        params = {**extra, 'units': 'metric', 'lang': 'zh_cn'}
        if self.auth_token:
            # OpenWeatherMap expects the API key in the ``appid`` query
            # parameter, so the token must be sent here as well.
            params['appid'] = self.auth_token
        return params

    def get_weather_by_city(self, city: str) -> APIResponse:
        """Return the current weather for a city name."""
        return self.request(
            HTTPMethod.GET,
            'weather',
            params=self._query(q=city),
        )

    def get_weather_by_coordinates(self, lat: float, lon: float) -> APIResponse:
        """Return the current weather for a latitude/longitude pair."""
        return self.request(
            HTTPMethod.GET,
            'weather',
            params=self._query(lat=lat, lon=lon),
        )
# Usage example
def main():
    """Fetch Beijing's weather with ``WeatherClient`` and print a short report."""
    client = WeatherClient(
        base_url='https://api.openweathermap.org/data/2.5',
        auth_token='YOUR_API_KEY',
    )
    response = client.get_weather_by_city('Beijing')

    # Guard clause: report the failure and stop when no payload came back.
    if not response.data:
        print(f"❌ 获取天气失败: {response.error}")
        return

    weather = response.data
    print("✅ 天气数据获取成功")
    print(f"城市: {weather.get('name', '未知')}")
    readings = weather['main']
    print(f"温度: {readings['temp']}°C")
    print(f"湿度: {readings['humidity']}%")
    print(f"天气: {weather['weather'][0]['description']}")


if __name__ == "__main__":
    main()
异步版本(使用aiohttp)
import aiohttp
import asyncio
from typing import Dict, Optional
class AsyncWeatherAPI:
    """Asynchronous client for the OpenWeatherMap current-weather endpoint."""

    def __init__(self, api_key: str, timeout: float = 10.0):
        """Create a client.

        Args:
            api_key: API key sent as the ``appid`` query parameter.
            timeout: Total time budget per request, in seconds.
        """
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"
        self.timeout = timeout

    async def get_weather(self, city: str) -> Optional[Dict]:
        """Fetch the current weather for ``city``.

        Returns:
            The decoded JSON body on HTTP 200; ``None`` on any other status,
            on a network error or timeout, or when the body is not valid JSON.
        """
        params = {
            'q': city,
            'appid': self.api_key,
            'units': 'metric',
            'lang': 'zh_cn',
        }
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(f"{self.base_url}/weather", params=params) as response:
                    if response.status != 200:
                        return None
                    return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError, ValueError):
            # Honour the Optional return contract: one failing city must not
            # raise and abort an ``asyncio.gather`` over several cities.
            return None
# Async usage example
async def main():
    """Fetch the weather of several cities together and print each temperature."""
    api = AsyncWeatherAPI(api_key='YOUR_API_KEY')
    cities = ['Beijing', 'Shanghai', 'Guangzhou']
    # One coroutine per city, awaited together.
    results = await asyncio.gather(*(api.get_weather(name) for name in cities))
    for city, weather in zip(cities, results):
        if not weather:
            continue
        print(f"{city}: {weather['main']['temp']}°C")


# Run the async example:
# asyncio.run(main())
- 错误处理:使用try-except捕获网络错误
- 统一接口:提供一致的调用方式
- 配置管理:API密钥、基础URL等集中管理
- 重试机制:失败时可自动重试(上面的示例尚未实现;可将 `urllib3.util.retry.Retry` 通过 `requests.adapters.HTTPAdapter` 挂载到 Session 上来添加)
- 日志记录:方便调试和监控
- 限速控制:避免请求频率过高
选择合适的封装方式取决于你的具体需求:简单项目用函数封装,复杂项目用类或异步实现。
标签: 接口封装