import asyncio from datetime import datetime, timedelta from typing import Dict from playwright.async_api import async_playwright class TokenManager: def __init__(self): self.tokens: Dict[str, str] = {} self.token_expires_at: datetime = datetime.now() self.lock = asyncio.Lock() async def get_token(self) -> Dict[str, str]: """获取有效的token""" async with self.lock: if datetime.now() >= self.token_expires_at or not self.tokens: await self._refresh_token() return self.tokens async def _refresh_token(self): """刷新token""" print("正在刷新token...") async with async_playwright() as p: browser = await p.chromium.launch(headless=True) page = await browser.new_page() # 拦截请求获取token tokens = {} async def handle_request(request): headers = request.headers if 'interface-code' in headers: tokens['interface_code'] = headers['interface-code'] tokens['identity_id'] = headers['identity-id'] page.on('request', handle_request) # 访问华为应用市场 await page.goto('https://appgallery.huawei.com/', wait_until='networkidle') await page.wait_for_timeout(3000) await browser.close() if tokens: self.tokens = tokens # token有效期设为10分钟 self.token_expires_at = datetime.now() + timedelta(minutes=10) print(f"Token刷新成功,有效期至: {self.token_expires_at}") else: raise Exception("无法获取token")