# Huawei AppGallery Crawler System Development Guide

> A guide to rebuilding the original Rust project with Python + MySQL + Vue3

## 📋 Table of Contents

- [1. Project Overview](#1-project-overview)
- [2. System Architecture](#2-system-architecture)
- [3. Data Source Analysis](#3-data-source-analysis)
- [4. Database Design](#4-database-design)
- [5. Backend Development](#5-backend-development)
- [6. Frontend Development](#6-frontend-development)
- [7. Deployment Guide](#7-deployment-guide)

---

## 1. Project Overview

### 1.1 Project Goals

Build a data collection and visualization system for Huawei AppGallery that:

- Automatically crawls app information from Huawei AppGallery
- Stores each app's basic information, version history, download count, rating, and related data
- Provides a web interface for statistics, rankings, and trend analysis
- Lets users search, filter, and submit apps

### 1.2 Technology Stack

**Backend:**
- Python 3.10+
- FastAPI (web framework)
- SQLAlchemy (ORM)
- MySQL 8.0+
- APScheduler (scheduled jobs)
- httpx / aiohttp (async HTTP client)

**Frontend:**
- Vue 3 + TypeScript
- Vite (build tool)
- Element Plus / Ant Design Vue (UI component library)
- ECharts / Chart.js (charting library)
- Axios (HTTP client)
- Pinia (state management)

**Deployment:**
- Docker + Docker Compose
- Nginx (reverse proxy)
- Gunicorn / Uvicorn (ASGI server)

---

## 2. System Architecture

### 2.1 Overall Architecture Diagram

```
┌─────────────────────────────────────────────────────────────┐
│                        User Browser                         │
└────────────────────────┬────────────────────────────────────┘
                         │ HTTP/HTTPS
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                    Nginx (reverse proxy)                    │
└──────────┬──────────────────────────────────┬───────────────┘
           │ /api/*                           │ /*
           ▼                                  ▼
┌──────────────────────┐          ┌──────────────────────────┐
│  FastAPI backend     │          │  Vue3 static frontend    │
│  - REST API          │          │  - SPA                   │
│  - Data queries      │          │  - Data visualization    │
│  - Crawler scheduling│          └──────────────────────────┘
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐          ┌──────────────────────────┐
│  MySQL database      │◄─────────│  Crawler scheduler       │
│  - App info          │          │  - APScheduler           │
│  - History data      │          │  - Periodic sync         │
│  - Statistics        │          │  - Batch processing      │
└──────────────────────┘          └──────────┬───────────────┘
                                             │
                                             ▼
                                  ┌──────────────────────────┐
                                  │  Huawei AppGallery API   │
                                  │  - App info endpoint     │
                                  │  - Rating detail endpoint│
                                  └──────────────────────────┘
```

### 2.2 Core Modules

1. **Crawler module** - fetches data from the Huawei API
2. **Data processing module** - cleans, deduplicates, and stores data
3. **API service module** - exposes the RESTful API
4. **Scheduling module** - scheduled jobs and batch processing
5. **Frontend module** - data visualization and interaction

---

## 3. Data Source Analysis

### 3.1 Huawei AppGallery API

**Basics:**
- API Base URL: `https://web-drcn.hispace.dbankcloud.com/edge`
- Requests need dynamically obtained authentication tokens (`interface-code` and `identity-id`)
- Tokens are valid for about 10 minutes and must be refreshed periodically

### 3.2 Main Endpoints

#### 3.2.1 Get basic app information

**Endpoint:** `POST /webedge/appinfo`

**Request headers:**
```http
Content-Type: application/json
User-Agent: HuaweiMarketCrawler/1.0
interface-code: {dynamically obtained token}
identity-id: {dynamically obtained token}
```

**Request body (query by package name):**
```json
{
  "pkgName": "com.huawei.hmsapp.appgallery",
  "locale": "zh_CN"
}
```

**Request body (query by app ID):**
```json
{
  "appId": "C1164531384803416384",
  "locale": "zh_CN"
}
```

**Sample response:**
```json
{
  "appId": "C1164531384803416384",
  "name": "应用市场",
  "pkgName": "com.huawei.hmsapp.appgallery",
  "devId": "260086000000068459",
  "developerName": "华为软件技术有限公司",
  "devEnName": "Huawei Software Technologies Co., Ltd.",
  "kindName": "工具",
  "version": "6.3.2.302",
  "size": 76591487,
  "downCount": "14443706",
  "rateNum": "125000",
  "hot": "4.5",
  "icon": "https://...",
  "briefDes": "应用市场,点亮精彩生活",
  "description": "...",
  "releaseDate": 1234567890000,
  "targetSdk": "12",
  "minsdk": "9",
  ...
}
```

#### 3.2.2 Get app rating details

**Endpoint:** `POST /harmony/page-detail`

**Request body:**
```json
{
  "pageId": "webAgAppDetail|C1164531384803416384",
  "pageNum": 1,
  "pageSize": 100,
  "zone": ""
}
```

**Sample response:**
```json
{
  "pages": [{
    "data": {
      "cardlist": {
        "layoutData": [{
          "type": "fl.card.comment",
          "data": [{
            "starInfo": "{\"averageRating\":\"4.5\",\"oneStarRatingCount\":100,\"twoStarRatingCount\":200,...}"
          }]
        }]
      }
    }
  }]
}
```

Note that `starInfo` is itself a JSON-encoded string and has to be parsed a second time.

### 3.3 Token Acquisition Strategy

The tokens have to be obtained dynamically from Huawei's web frontend. Suggested approaches:

1. **Option 1:** Drive a real browser with Selenium/Playwright and capture the tokens
2. **Option 2:** Reverse-engineer the JavaScript and reimplement the token generation algorithm
3.
   **Option 3:** Update the tokens manually at regular intervals (not recommended)

**Reference implementation (pseudocode):**
```python
from playwright.async_api import async_playwright


async def get_huawei_token():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()

        # Intercept outgoing requests and copy the token headers
        tokens = {}

        async def handle_request(request):
            headers = request.headers
            # Require both headers so a partial match cannot raise KeyError
            if 'interface-code' in headers and 'identity-id' in headers:
                tokens['interface_code'] = headers['interface-code']
                tokens['identity_id'] = headers['identity-id']

        page.on('request', handle_request)

        await page.goto('https://appgallery.huawei.com/')
        await page.wait_for_timeout(3000)

        await browser.close()
        return tokens
```

### 3.4 Data Fields

**Core fields:**
- `appId` - unique app identifier (a length above 15 indicates a HarmonyOS app)
- `pkgName` - package name (unique)
- `name` - app name
- `developerName` - developer name
- `downCount` - download count (a string, e.g. "1000000+")
- `rateNum` - number of ratings
- `hot` - popularity score
- `version` - version number
- `size` - app size (bytes)
- `releaseDate` - release time (millisecond timestamp)
- `targetSdk` / `minsdk` - SDK versions

**Notes:**
1. Some fields may be empty and need default values
2. The download count may contain a "+" sign and must be cleaned before conversion
3. Some apps (atomic services) have package names starting with `com.atomicservice` and carry no rating data
4. The JSON may contain `\0` characters, which must be stripped

---

## 4. Database Design

### 4.1 MySQL Table Structure

#### 4.1.1 App basic information table (app_info)

```sql
CREATE TABLE `app_info` (
  `app_id` VARCHAR(50) PRIMARY KEY COMMENT 'Unique app ID',
  `alliance_app_id` VARCHAR(50) COMMENT 'Alliance app ID',
  `name` VARCHAR(255) NOT NULL COMMENT 'App name',
  `pkg_name` VARCHAR(255) NOT NULL UNIQUE COMMENT 'App package name',
  `dev_id` VARCHAR(50) NOT NULL COMMENT 'Developer ID',
  `developer_name` VARCHAR(255) NOT NULL COMMENT 'Developer name',
  `dev_en_name` VARCHAR(255) COMMENT 'Developer name (English)',
  `supplier` VARCHAR(255) COMMENT 'Supplier name',
  `kind_id` INT NOT NULL COMMENT 'App category ID',
  `kind_name` VARCHAR(100) NOT NULL COMMENT 'App category name',
  `tag_name` VARCHAR(255) COMMENT 'Tag name',
  `kind_type_id` INT NOT NULL COMMENT 'Type ID',
  `kind_type_name` VARCHAR(100) NOT NULL COMMENT 'Type name',
  `icon_url` TEXT NOT NULL COMMENT 'App icon URL',
  `brief_desc` TEXT NOT NULL COMMENT 'Short description',
  `description` LONGTEXT NOT NULL COMMENT 'Full app description',
  `privacy_url` TEXT NOT NULL COMMENT 'Privacy policy URL',
  `ctype` INT NOT NULL COMMENT 'Client type',
  `detail_id` VARCHAR(100) NOT NULL COMMENT 'Detail page ID',
  `app_level` INT NOT NULL COMMENT 'App level',
  `jocat_id` INT NOT NULL COMMENT 'Category ID',
  `iap` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Has in-app purchases',
  `hms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on HMS',
  `tariff_type` VARCHAR(50) NOT NULL COMMENT 'Tariff type',
  `packing_type` INT NOT NULL COMMENT 'Packaging type',
  `order_app` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is a preinstalled app',
  `denpend_gms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on GMS',
  `denpend_hms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on HMS',
  `force_update` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Forces update',
  `img_tag` VARCHAR(50) NOT NULL COMMENT 'Image tag',
  `is_pay` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is paid',
  `is_disciplined` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is compliant',
  `is_shelves` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'Is listed',
  `submit_type` INT NOT NULL DEFAULT 0 COMMENT 'Submission type',
  `delete_archive` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Archive deleted',
  `charging` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is charged',
  `button_grey` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Button greyed out',
  `app_gift` TINYINT(1)
    NOT NULL DEFAULT 0 COMMENT 'Has gift pack',
  `free_days` INT NOT NULL DEFAULT 0 COMMENT 'Free days',
  `pay_install_type` INT NOT NULL DEFAULT 0 COMMENT 'Paid install type',
  `comment` JSON COMMENT 'Comment or annotation data',
  `listed_at` DATETIME NOT NULL COMMENT 'App listing time',
  `release_countries` JSON COMMENT 'Countries/regions where the app is released',
  `main_device_codes` JSON COMMENT 'Main device types the app supports',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  `updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Updated at',
  -- pkg_name is already indexed through its UNIQUE constraint,
  -- so no separate idx_pkg_name is declared here
  INDEX `idx_developer_name` (`developer_name`),
  INDEX `idx_kind_name` (`kind_name`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='App basic information';
```

#### 4.1.2 App metrics table (app_metrics)

```sql
CREATE TABLE `app_metrics` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `version` VARCHAR(50) NOT NULL COMMENT 'Version number',
  `version_code` BIGINT NOT NULL COMMENT 'Version code',
  `size_bytes` BIGINT NOT NULL COMMENT 'App size (bytes)',
  `sha256` VARCHAR(64) NOT NULL COMMENT 'SHA256 checksum of the package',
  `info_score` DECIMAL(3,1) NOT NULL COMMENT 'Score from the app info endpoint',
  `info_rate_count` BIGINT NOT NULL COMMENT 'Rating count from the app info endpoint',
  `download_count` BIGINT NOT NULL COMMENT 'Download count',
  `price` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT 'Price',
  `release_date` BIGINT NOT NULL COMMENT 'Release time (millisecond timestamp)',
  `new_features` TEXT COMMENT 'New features description',
  `upgrade_msg` TEXT COMMENT 'Upgrade message',
  `target_sdk` VARCHAR(20) NOT NULL COMMENT 'Target SDK version',
  `min_sdk` VARCHAR(20) NOT NULL COMMENT 'Minimum SDK version',
  `compile_sdk_version` INT DEFAULT 0 COMMENT 'Compile SDK version',
  `min_hmos_api_level` INT DEFAULT 0 COMMENT 'Minimum HarmonyOS API level',
  `api_release_type` VARCHAR(50) DEFAULT 'Release' COMMENT 'API release type',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON
    DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_pkg_name` (`pkg_name`),
  INDEX `idx_download_count` (`download_count`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='App metrics';
```

#### 4.1.3 App rating table (app_rating)

```sql
CREATE TABLE `app_rating` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `average_rating` DECIMAL(3,2) NOT NULL COMMENT 'Average rating',
  `star_1_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 1-star ratings',
  `star_2_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 2-star ratings',
  `star_3_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 3-star ratings',
  `star_4_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 4-star ratings',
  `star_5_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 5-star ratings',
  `total_rating_count` INT NOT NULL DEFAULT 0 COMMENT 'Total number of ratings',
  `only_star_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of star-only ratings',
  `full_average_rating` VARCHAR(20) COMMENT 'Full-precision average rating',
  `source_type` VARCHAR(50) COMMENT 'Source type',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_pkg_name` (`pkg_name`),
  INDEX `idx_average_rating` (`average_rating`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='App ratings';
```

#### 4.1.4 Raw data history table (app_data_history)

```sql
CREATE TABLE `app_data_history` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `raw_json_data` JSON NOT NULL COMMENT 'Raw app data JSON',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX
    `idx_app_id` (`app_id`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Raw data history';
```

#### 4.1.5 Rating history table (app_rating_history)

```sql
CREATE TABLE `app_rating_history` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `raw_json_rating` JSON NOT NULL COMMENT 'Raw rating data JSON',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Rating history';
```

### 4.2 Index Optimization Suggestions

1. **Composite indexes:**
   - `(pkg_name, created_at)` - for querying history by package name
   - `(developer_name, download_count)` - for developer rankings
   - `(kind_name, download_count)` - for category rankings

   `developer_name` and `kind_name` live in `app_info`, while `download_count` lives in `app_metrics`, and a MySQL index cannot span two tables. The last two entries therefore only apply if those columns are denormalized into one table; otherwise index each side of the join separately.

2. **Full-text indexes:**
   - `name`, `brief_desc` - for app search. MySQL's default full-text parser splits on whitespace, so Chinese text needs the built-in `ngram` parser.

3. **Partitioning strategy:**
   - Partition the history tables by month to speed up queries. Partitioned InnoDB tables do not support foreign keys, and every unique key (including the primary key) must contain the partitioning column, so the history tables as defined above would first have to drop their foreign keys and extend the primary key to `(id, created_at)`.

---

## 5. Backend Development

### 5.1 Project Structure

```
backend/
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI application entry point
│   ├── config.py               # Configuration
│   ├── database.py             # Database connection
│   ├── models/                 # SQLAlchemy models
│   │   ├── __init__.py
│   │   ├── app_info.py
│   │   ├── app_metrics.py
│   │   └── app_rating.py
│   ├── schemas/                # Pydantic models
│   │   ├── __init__.py
│   │   ├── app.py
│   │   └── response.py
│   ├── api/                    # API routes
│   │   ├── __init__.py
│   │   ├── apps.py
│   │   ├── rankings.py
│   │   ├── charts.py
│   │   └── submit.py
│   ├── crawler/                # Crawler module
│   │   ├── __init__.py
│   │   ├── huawei_api.py       # Huawei API wrapper
│   │   ├── token_manager.py    # Token management
│   │   └── data_processor.py   # Data processing
│   ├── scheduler/              # Scheduling module
│   │   ├── __init__.py
│   │   └── tasks.py
│   └── utils/                  # Utility functions
│       ├── __init__.py
│       └── helpers.py
├── requirements.txt
├── .env.example
└── README.md
```

### 5.2 Core Code

#### 5.2.1 Configuration (config.py)

```python
from typing import List

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Values are read from the environment or from .env
    model_config = SettingsConfigDict(env_file=".env")

    # Database settings
    MYSQL_HOST: str = "localhost"
    MYSQL_PORT: int = 3306
    MYSQL_USER: str = "root"
    MYSQL_PASSWORD: str = "password"
    MYSQL_DATABASE: str = "huawei_market"

    # Huawei API settings
    HUAWEI_API_BASE_URL: str = "https://web-drcn.hispace.dbankcloud.com/edge"
    HUAWEI_LOCALE: str = "zh_CN"

    # Crawler settings
    CRAWLER_INTERVAL: int = 1800   # Sync interval (seconds)
    CRAWLER_BATCH_SIZE: int = 100  # Batch size
    CRAWLER_TIMEOUT: int = 30      # Request timeout (seconds)

    # API settings
    API_PREFIX: str = "/api"
    API_TITLE: str = "Huawei AppGallery Data API"
    API_VERSION: str = "1.0.0"

    # Miscellaneous
    DEBUG: bool = False
    CORS_ORIGINS: List[str] = ["http://localhost:5173", "http://localhost:3000"]

    @property
    def database_url(self) -> str:
        return f"mysql+aiomysql://{self.MYSQL_USER}:{self.MYSQL_PASSWORD}@{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DATABASE}"


settings = Settings()
```

#### 5.2.2 Database connection (database.py)

```python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker

from app.config import settings

# Async engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.DEBUG,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
)

# Async session factory
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Declarative base class
Base = declarative_base()


# FastAPI dependency
async def get_db():
    # The context manager closes the session on exit,
    # so no explicit close() is needed
    async with AsyncSessionLocal() as session:
        yield session
```

#### 5.2.3 Data model (models/app_info.py)

```python
from sqlalchemy import Column, String, Integer, Text, DateTime, Boolean, JSON, DECIMAL, BigInteger
from sqlalchemy.sql import func

from app.database import Base


class AppInfo(Base):
    __tablename__ = "app_info"

    # NOTE: the DDL in section 4.1.1 declares further NOT NULL columns
    # without defaults (ctype, detail_id, app_level, jocat_id, tariff_type,
    # packing_type, img_tag) that are not mapped here. Map them or give
    # them defaults, otherwise inserts fail in MySQL strict mode.

    app_id = Column(String(50), primary_key=True, comment="Unique app ID")
    alliance_app_id = Column(String(50), comment="Alliance app ID")
    name = Column(String(255), nullable=False, comment="App name")
    pkg_name = Column(String(255), nullable=False, unique=True, index=True, comment="App package name")
    dev_id = Column(String(50), nullable=False, comment="Developer ID")
    developer_name = Column(String(255), nullable=False, index=True, comment="Developer name")
    dev_en_name = Column(String(255), comment="Developer name (English)")
    supplier = Column(String(255), comment="Supplier name")
    kind_id = Column(Integer, nullable=False, comment="App category ID")
    kind_name = Column(String(100), nullable=False, index=True, comment="App category name")
    tag_name = Column(String(255), comment="Tag name")
    kind_type_id = Column(Integer, nullable=False, comment="Type ID")
    kind_type_name = Column(String(100), nullable=False, comment="Type name")
    icon_url = Column(Text, nullable=False, comment="App icon URL")
    brief_desc = Column(Text, nullable=False, comment="Short description")
    description = Column(Text, nullable=False, comment="Full app description")
    privacy_url = Column(Text, nullable=False, comment="Privacy policy URL")

    # Boolean fields
    iap = Column(Boolean, default=False, comment="Has in-app purchases")
    hms = Column(Boolean, default=False, comment="Depends on HMS")
    is_pay = Column(Boolean, default=False, comment="Is paid")
    is_shelves = Column(Boolean, default=True, comment="Is listed")

    # JSON fields
    comment = Column(JSON, comment="Comment or annotation data")
    release_countries = Column(JSON,
                               comment="Countries/regions where the app is released")
    main_device_codes = Column(JSON, comment="Main device types the app supports")

    # Time fields
    listed_at = Column(DateTime, nullable=False, comment="App listing time")
    created_at = Column(DateTime, nullable=False, server_default=func.now(), comment="Created at")
    updated_at = Column(DateTime, nullable=False, server_default=func.now(), onupdate=func.now(), comment="Updated at")
```

#### 5.2.4 Huawei API wrapper (crawler/huawei_api.py)

```python
import json
from typing import Optional, Dict, Any

import httpx

from app.config import settings
from app.crawler.token_manager import TokenManager


class HuaweiAPI:
    def __init__(self):
        self.base_url = settings.HUAWEI_API_BASE_URL
        self.locale = settings.HUAWEI_LOCALE
        self.token_manager = TokenManager()
        self.client = httpx.AsyncClient(timeout=settings.CRAWLER_TIMEOUT)

    async def get_app_info(self, pkg_name: Optional[str] = None, app_id: Optional[str] = None) -> Dict[str, Any]:
        """Fetch basic app information."""
        if not pkg_name and not app_id:
            raise ValueError("Either pkg_name or app_id must be provided")

        # Get tokens
        tokens = await self.token_manager.get_token()

        # Build the request
        url = f"{self.base_url}/webedge/appinfo"
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "HuaweiMarketCrawler/1.0",
            "interface-code": tokens["interface_code"],
            "identity-id": tokens["identity_id"]
        }

        body = {"locale": self.locale}
        if pkg_name:
            body["pkgName"] = pkg_name
        else:
            body["appId"] = app_id

        # Send the request
        response = await self.client.post(url, headers=headers, json=body)
        response.raise_for_status()

        data = response.json()

        # Clean the data
        return self._clean_data(data)

    async def get_app_rating(self, app_id: str, pkg_name: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Fetch app rating details."""
        # Skip atomic services. They are identified by package name
        # (section 3.4), so pass pkg_name when it is known; the original
        # check against app_id still applies when pkg_name is omitted.
        if (pkg_name or app_id).startswith("com.atomicservice"):
            return None

        tokens = await self.token_manager.get_token()

        url = f"{self.base_url}/harmony/page-detail"
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "HuaweiMarketCrawler/1.0",
            "interface-code": tokens["interface_code"],
            "identity-id": tokens["identity_id"]
        }

        body = {
            "pageId": f"webAgAppDetail|{app_id}",
            "pageNum":
                1,
            "pageSize": 100,
            "zone": ""
        }

        try:
            response = await self.client.post(url, headers=headers, json=body)
            response.raise_for_status()

            data = response.json()

            # Locate the comment card that carries the rating data
            layouts = data["pages"][0]["data"]["cardlist"]["layoutData"]
            comment_cards = [l for l in layouts if l.get("type") == "fl.card.comment"]

            if not comment_cards:
                return None

            # starInfo is a JSON-encoded string, so it is parsed separately
            star_info_str = comment_cards[0]["data"][0]["starInfo"]
            return json.loads(star_info_str)

        except Exception as e:
            print(f"Failed to fetch rating: {e}")
            return None

    def _clean_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Clean the response data."""
        # Strip \0 characters from top-level string values
        for key, value in data.items():
            if isinstance(value, str):
                data[key] = value.replace('\x00', '')

        # Drop AG-TraceId
        data.pop('AG-TraceId', None)

        # Validate the appId length
        if len(data.get('appId', '')) < 15:
            raise ValueError("appId is shorter than 15 characters; this may be an Android app")

        return data

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
```

#### 5.2.5 Token manager (crawler/token_manager.py)

```python
import asyncio
from datetime import datetime, timedelta
from typing import Dict

from playwright.async_api import async_playwright


class TokenManager:
    def __init__(self):
        self.tokens: Dict[str, str] = {}
        self.token_expires_at: datetime = datetime.now()
        self.lock = asyncio.Lock()

    async def get_token(self) -> Dict[str, str]:
        """Return valid tokens, refreshing them when expired."""
        async with self.lock:
            if datetime.now() >= self.token_expires_at or not self.tokens:
                await self._refresh_token()
            return self.tokens

    async def _refresh_token(self):
        """Refresh the tokens."""
        print("Refreshing token...")

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()

            # Intercept requests and copy the token headers
            tokens = {}

            async def handle_request(request):
                headers = request.headers
                # Require both headers so a partial match cannot raise KeyError
                if 'interface-code' in headers and 'identity-id' in headers:
                    tokens['interface_code'] = headers['interface-code']
                    tokens['identity_id'] = headers['identity-id']

            page.on('request', handle_request)

            # Visit Huawei AppGallery
            await page.goto('https://appgallery.huawei.com/', wait_until='networkidle')
            await page.wait_for_timeout(3000)

            await browser.close()

        if tokens:
            self.tokens = tokens
            # Treat the tokens as valid for 10 minutes
            self.token_expires_at = datetime.now() + timedelta(minutes=10)
            print(f"Token refreshed, valid until: {self.token_expires_at}")
        else:
            raise RuntimeError("Unable to obtain token")
```

#### 5.2.6 Data processor (crawler/data_processor.py)

```python
from typing import Dict, Any, Optional, Tuple
from datetime import datetime

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.models.app_data_history import AppDataHistory
from app.models.app_rating_history import AppRatingHistory


class DataProcessor:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def save_app_data(
        self,
        app_data: Dict[str, Any],
        rating_data: Optional[Dict[str, Any]] = None,
        comment: Optional[Dict[str, Any]] = None
    ) -> Tuple[bool, bool, bool]:
        """
        Save app data.
        Returns: (app info written, new metric inserted, new rating inserted)
        """
        app_id = app_data['appId']
        pkg_name = app_data['pkgName']

        # Check whether the app already exists
        result = await self.db.execute(
            select(AppInfo).where(AppInfo.app_id == app_id)
        )
        existing_app = result.scalar_one_or_none()

        # Save basic app information
        info_inserted = False
        if not existing_app or await self._is_info_changed(existing_app, app_data):
            await self._save_app_info(app_data, comment)
            info_inserted = True

        # Save app metrics
        metric_inserted = False
        if await self._should_save_metric(app_id, app_data):
            await self._save_app_metric(app_data)
            metric_inserted = True

        # Save rating data
        rating_inserted = False
        if rating_data and await self._should_save_rating(app_id, rating_data):
            await self._save_app_rating(app_id, pkg_name, rating_data)
            rating_inserted = True

        # Save raw data history
        if info_inserted or metric_inserted:
            await self._save_data_history(app_id, pkg_name, app_data)

        if rating_inserted:
            await self._save_rating_history(app_id, pkg_name, rating_data)

        await self.db.commit()

        return info_inserted, metric_inserted, rating_inserted

    async def _save_app_info(self, data: Dict[str, Any], comment: Optional[Dict] = None):
        """Save basic app information."""
        app_info = AppInfo(
            app_id=data['appId'],
            alliance_app_id=data.get('allianceAppId', ''),
            name=data['name'],
            pkg_name=data['pkgName'],
            dev_id=data['devId'],
            developer_name=data['developerName'],
            dev_en_name=data.get('devEnName', ''),
            supplier=data.get('supplier', ''),
            kind_id=int(data['kindId']),
            kind_name=data['kindName'],
            tag_name=data.get('tagName'),
            kind_type_id=int(data['kindTypeId']),
            kind_type_name=data['kindTypeName'],
            icon_url=data['icon'],
            brief_desc=data['briefDes'],
            description=data['description'],
            privacy_url=data['privacyUrl'],
            iap=bool(data.get('iap', 0)),
            hms=bool(data.get('hms', 0)),
            is_pay=data.get('isPay') == '1',
            is_shelves=bool(data.get('isShelves', 1)),
            comment=comment,
            release_countries=data.get('releaseCountries', []),
            main_device_codes=data.get('mainDeviceCodes', []),
            listed_at=datetime.fromtimestamp(data.get('releaseDate', 0) / 1000)
        )

        # merge() acts as an upsert on the primary key; add() would raise
        # an IntegrityError for an app that already exists
        await self.db.merge(app_info)

    async def _save_app_metric(self, data: Dict[str, Any]):
        """Save app metrics."""
        # Clean the download count
        download_count = self._parse_download_count(data.get('downCount', '0'))

        metric = AppMetrics(
            app_id=data['appId'],
            pkg_name=data['pkgName'],
            version=data['version'],
            version_code=int(data['versionCode']),
            size_bytes=int(data['size']),
            sha256=data.get('sha256', ''),
            info_score=float(data.get('hot', '0.0')),
            info_rate_count=int(data.get('rateNum', '0')),
            download_count=download_count,
            price=float(data.get('price', '0')),
            release_date=int(data.get('releaseDate', 0)),
            new_features=data.get('newFeatures', ''),
            upgrade_msg=data.get('upgradeMsg', ''),
            target_sdk=data.get('targetSdk', ''),
            min_sdk=data.get('minsdk', ''),
            compile_sdk_version=int(data.get('compileSdkVersion', 0)),
            min_hmos_api_level=int(data.get('minHmosApiLevel', 0)),
            api_release_type=data.get('apiReleaseType', 'Release')
        )

        self.db.add(metric)

    async def _save_app_rating(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Save app rating."""
        rating = AppRating(
            app_id=app_id,
            pkg_name=pkg_name,
            average_rating=float(data['averageRating']),
            star_1_count=int(data['oneStarRatingCount']),
            star_2_count=int(data['twoStarRatingCount']),
            star_3_count=int(data['threeStarRatingCount']),
            star_4_count=int(data['fourStarRatingCount']),
            star_5_count=int(data['fiveStarRatingCount']),
            total_rating_count=int(data['totalStarRatingCount']),
            only_star_count=int(data.get('onlyStarCount', 0)),
            full_average_rating=data.get('fullAverageRating', ''),
            source_type=data.get('sourceType', '')
        )

        self.db.add(rating)

    # The two history helpers below are called by save_app_data but were
    # missing from the original listing. These are minimal versions that
    # assume the models mirror the DDL in sections 4.1.4 and 4.1.5.
    async def _save_data_history(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Archive the raw app payload."""
        self.db.add(AppDataHistory(app_id=app_id, pkg_name=pkg_name, raw_json_data=data))

    async def _save_rating_history(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Archive the raw rating payload."""
        self.db.add(AppRatingHistory(app_id=app_id, pkg_name=pkg_name, raw_json_rating=data))

    def _parse_download_count(self, count_str: str) -> int:
        """Parse a download-count string such as "1000000+"."""
        # Keep digits only, which drops "+", "," and any other characters
        digits = ''.join(ch for ch in str(count_str) if ch.isdigit())
        return int(digits) if digits else 0

    async def _is_info_changed(self, existing: AppInfo, new_data: Dict) -> bool:
        """Check whether the app information has changed."""
        # AppInfo has no version column (versions are tracked in AppMetrics),
        # so only fields stored on app_info are compared here
        return (
            existing.name != new_data['name'] or
            existing.description != new_data.get('description', '')
        )

    async def _should_save_metric(self, app_id: str, data: Dict) -> bool:
        """Decide whether a new metrics row is needed."""
        # Load the latest metrics row
        result = await self.db.execute(
            select(AppMetrics)
            .where(AppMetrics.app_id == app_id)
            .order_by(AppMetrics.created_at.desc())
            .limit(1)
        )
        latest_metric = result.scalar_one_or_none()

        if not latest_metric:
            return True

        # Compare key fields
        return (
            latest_metric.version != data['version'] or
            latest_metric.download_count != self._parse_download_count(data.get('downCount', '0'))
        )

    async def _should_save_rating(self, app_id: str, data: Dict) -> bool:
        """Decide whether a new rating row is needed."""
        result = await self.db.execute(
            select(AppRating)
            .where(AppRating.app_id == app_id)
            .order_by(AppRating.created_at.desc())
            .limit(1)
        )
        latest_rating = result.scalar_one_or_none()

        if not latest_rating:
            return True

        return (
            float(latest_rating.average_rating) != float(data['averageRating']) or
            latest_rating.total_rating_count != int(data['totalStarRatingCount'])
        )
```

#### 5.2.7 API routes (api/apps.py)

```python
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func

from app.database import get_db
from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.schemas.response import ApiResponse
from app.crawler.huawei_api import HuaweiAPI
from app.crawler.data_processor import DataProcessor

router = APIRouter(prefix="/apps", tags=["apps"])


def to_dict(obj):
    """Serialize mapped columns only.

    obj.__dict__ also carries SQLAlchemy's internal _sa_instance_state,
    which is not JSON-serializable. None (e.g. a missing rating from the
    outer join) is passed through.
    """
    if obj is None:
        return None
    return {c.name: getattr(obj, c.name) for c in obj.__table__.columns}


async def load_app_row(db: AsyncSession, pkg_name: str):
    """Return the (info, metric, rating) row with the newest metric, or None."""
    result = await db.execute(
        select(AppInfo, AppMetrics, AppRating)
        .join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
        .outerjoin(AppRating, AppInfo.app_id == AppRating.app_id)
        .where(AppInfo.pkg_name == pkg_name)
        .order_by(AppMetrics.created_at.desc(), AppRating.created_at.desc())
        .limit(1)
    )
    return result.first()


@router.get("/pkg_name/{pkg_name}")
async def get_app_by_pkg_name(
    pkg_name: str,
    db: AsyncSession = Depends(get_db)
):
    """Look up an app by package name."""
    # Try to fetch fresh data from the Huawei API first
    api = HuaweiAPI()
    try:
        app_data = await api.get_app_info(pkg_name=pkg_name)
        rating_data = await api.get_app_rating(app_data['appId'])

        # Save to the database
        processor = DataProcessor(db)
        new_info, new_metric, new_rating = await processor.save_app_data(
            app_data, rating_data
        )

        # Load the complete record
        row = await load_app_row(db, pkg_name)

        return ApiResponse(
            success=True,
            data={
                "info": to_dict(row[0]) if row else None,
                "metric": to_dict(row[1]) if row else None,
                "rating": to_dict(row[2]) if row else None,
                "new_info": new_info,
                "new_metric": new_metric,
                "new_rating": new_rating,
                "get_data": True
            }
        )
    except Exception as e:
        # Fall back to the data already in the database. Roll back first
        # in case the failure left the session in an aborted transaction.
        await db.rollback()
        row = await load_app_row(db, pkg_name)

        if not row:
            raise HTTPException(status_code=404, detail=f"App {pkg_name} not found")

        return ApiResponse(
            success=True,
            data={
                "info": to_dict(row[0]),
                "metric": to_dict(row[1]),
                "rating": to_dict(row[2]),
                "get_data": False,
                "error": str(e)
            }
        )
    finally:
        await api.close()


@router.get("/list/{page}")
async def get_app_list(
    page: int = 1,
    page_size: int = Query(100, le=500),
    detail: bool = True,
    sort: Optional[str] = None,
    desc: bool = True,
    search_key: Optional[str] = None,
    search_value: Optional[str] = None,
    search_exact: bool = False,
    db: AsyncSession = Depends(get_db)
):
    """Return a page of apps."""
    # Base query
    # NOTE: with detail=True this joins every historical metrics and rating
    # row, so an app can appear more than once; restrict the join to the
    # latest rows (as in rankings.py) if one row per app is required.
    if detail:
        query = select(AppInfo, AppMetrics, AppRating).join(
            AppMetrics, AppInfo.app_id == AppMetrics.app_id
        ).outerjoin(
            AppRating, AppInfo.app_id == AppRating.app_id
        )
    else:
        query = select(AppInfo)

    count_query = select(func.count()).select_from(AppInfo)

    # Search filter, applied to both the page query and the count query
    if search_key and search_value:
        # Only real AppInfo columns are accepted; getattr on arbitrary
        # user input would expose any attribute of the model class
        if search_key not in AppInfo.__table__.columns.keys():
            raise HTTPException(status_code=400, detail=f"Unknown search field: {search_key}")
        column = getattr(AppInfo, search_key)
        condition = column == search_value if search_exact else column.like(f"%{search_value}%")
        query = query.where(condition)
        count_query = count_query.where(condition)

    # Sorting
    if sort:
        if detail and sort in AppMetrics.__table__.columns.keys():
            order_column = getattr(AppMetrics, sort)
        elif sort in AppInfo.__table__.columns.keys():
            order_column = getattr(AppInfo, sort)
        else:
            raise HTTPException(status_code=400, detail=f"Unknown sort field: {sort}")
        query = query.order_by(order_column.desc() if desc else order_column.asc())
    elif detail:
        # AppMetrics is only joined when detail=True
        query = query.order_by(AppMetrics.download_count.desc())

    # Total count
    total_result = await db.execute(count_query)
    total_count = total_result.scalar()

    # Pagination
    offset = (page - 1) * page_size
    query = query.offset(offset).limit(page_size)

    result = await db.execute(query)
    rows = result.all()

    # Format the rows
    data = []
    for row in rows:
        if detail:
            data.append({
                "info": to_dict(row[0]),
                "metric": to_dict(row[1]),
                "rating": to_dict(row[2])
            })
        else:
            data.append(to_dict(row[0]))

    return ApiResponse(
        success=True,
        data=data,
        total=total_count,
        limit=page_size
    )


@router.get("/metrics/{pkg_name}")
async def get_app_metrics_history(
    pkg_name: str,
    db: AsyncSession = Depends(get_db)
):
    """Return the metrics history of an app."""
    result = await db.execute(
        select(AppMetrics)
        .where(AppMetrics.pkg_name == pkg_name)
        .order_by(AppMetrics.created_at.desc())
    )
    metrics = result.scalars().all()

    return ApiResponse(
        success=True,
        data=[to_dict(m) for m in metrics]
    )
```

#### 5.2.8 Rankings API (api/rankings.py)

```python
from typing import Optional

from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_

from app.database import get_db
from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.schemas.response import ApiResponse

router = APIRouter(prefix="/rankings", tags=["rankings"])


@router.get("/top-downloads")
async def get_top_downloads(
    limit: int = Query(10, le=100),
    exclude_pattern: Optional[str] = Query(None),
    db: AsyncSession = Depends(get_db)
):
    """Download ranking."""
    # Subquery: timestamp of the latest metrics row per app
    subquery = (
        select(
            AppMetrics.app_id,
            func.max(AppMetrics.created_at).label('max_created_at')
        )
        .group_by(AppMetrics.app_id)
        .subquery()
    )

    # Main query
    query = (
        select(AppInfo, AppMetrics)
        .join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
        .join(
            subquery,
            and_(
                AppMetrics.app_id == subquery.c.app_id,
                AppMetrics.created_at == subquery.c.max_created_at
            )
        )
    )

    # Exclusion pattern
    if exclude_pattern:
        query = query.where(~AppInfo.pkg_name.like(f"%{exclude_pattern}%"))

    query = query.order_by(AppMetrics.download_count.desc()).limit(limit)

    result = await db.execute(query)
    rows = result.all()

    data = [
        {
            "app_id": row[0].app_id,
            "name": row[0].name,
            "pkg_name": row[0].pkg_name,
            "developer_name": row[0].developer_name,
            "icon_url": row[0].icon_url,
            "download_count": row[1].download_count,
            "version": row[1].version
        }
        for row in rows
    ]

    return ApiResponse(success=True, data=data, limit=limit)


@router.get("/ratings")
async def get_top_ratings(
    limit: int = Query(10, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Rating ranking."""
    subquery = (
        select(
            AppRating.app_id,
            func.max(AppRating.created_at).label('max_created_at')
        )
        .group_by(AppRating.app_id)
        .subquery()
    )

    query = (
        select(AppInfo, AppRating)
        .join(AppRating, AppInfo.app_id == AppRating.app_id)
        .join(
            subquery,
            and_(
                AppRating.app_id == subquery.c.app_id,
                AppRating.created_at == subquery.c.max_created_at
            )
        )
        .where(AppRating.total_rating_count >= 100)  # At least 100 ratings
        .order_by(AppRating.average_rating.desc())
        .limit(limit)
    )

    result = await db.execute(query)
    rows = result.all()

    data = [
        {
            "app_id": row[0].app_id,
            "name": row[0].name,
            "pkg_name": row[0].pkg_name,
            "developer_name": row[0].developer_name,
            "icon_url": row[0].icon_url,
            "average_rating": float(row[1].average_rating),
            "total_rating_count": row[1].total_rating_count
        }
        for row in rows
    ]

    return ApiResponse(success=True, data=data, limit=limit)


@router.get("/developers")
async def get_top_developers(
    limit: int = Query(10, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Developer ranking (by number of apps)."""
    # Join only the latest metrics row per app. Joining the full metrics
    # history would count each app once per snapshot and sum its download
    # count repeatedly.
    latest = (
        select(
            AppMetrics.app_id,
            func.max(AppMetrics.created_at).label('max_created_at')
        )
        .group_by(AppMetrics.app_id)
        .subquery()
    )

    query = (
        select(
            AppInfo.developer_name,
            func.count(AppInfo.app_id).label('app_count'),
            func.sum(AppMetrics.download_count).label('total_downloads')
        )
        .join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
        .join(
            latest,
            and_(
                AppMetrics.app_id == latest.c.app_id,
                AppMetrics.created_at == latest.c.max_created_at
            )
        )
        .group_by(AppInfo.developer_name)
        .order_by(func.count(AppInfo.app_id).desc())
        .limit(limit)
    )

    result = await db.execute(query)
    rows = result.all()

    data = [
        {
            "developer_name": row[0],
            "app_count": row[1],
            # SUM() comes back as Decimal from MySQL; convert for JSON
            "total_downloads": int(row[2] or 0)
        }
        for row in rows
    ]

    return ApiResponse(success=True, data=data, limit=limit)
```

#### 5.2.9 Scheduled jobs (scheduler/tasks.py)

```python
import asyncio
import random
from datetime import datetime  # needed by sync_all_apps for its log line

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import AsyncSessionLocal
from app.config import settings
from app.crawler.huawei_api import HuaweiAPI
from app.crawler.data_processor import DataProcessor


class CrawlerScheduler:
    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.is_running = False

    def start(self):
        """Start the scheduler"""
        # Register the periodic sync job
        self.scheduler.add_job(
            self.sync_all_apps,
            trigger=IntervalTrigger(seconds=settings.CRAWLER_INTERVAL),
            id='sync_all_apps',
            name='Sync all apps',
            replace_existing=True
        )

        self.scheduler.start()
        print(f"Scheduler started, sync interval: {settings.CRAWLER_INTERVAL}s")

    def stop(self):
        """Stop the scheduler"""
        self.scheduler.shutdown()
        print("Scheduler stopped")

    async def sync_all_apps(self):
        """Sync every known app"""
        if self.is_running:
            print("Previous sync is still running, skipping this run")
            return

        self.is_running = True
        print(f"Starting full sync - {datetime.now()}")

        api = None
        try:
            from sqlalchemy import select
            from app.models.app_info import AppInfo

            # Load all package names, then release the session before crawling
            async with AsyncSessionLocal() as db:
                result = await db.execute(select(AppInfo.pkg_name))
                pkg_names = [row[0] for row in result.all()]

            # Shuffle so each run visits the apps in a different order
            random.shuffle(pkg_names)

            print(f"{len(pkg_names)} apps to sync")

            # Process in batches
            api = HuaweiAPI()

            total_processed = 0
            total_inserted = 0
            total_failed = 0

            for i in range(0, len(pkg_names), settings.CRAWLER_BATCH_SIZE):
                batch = pkg_names[i:i + settings.CRAWLER_BATCH_SIZE]

                # Run one batch concurrently
                tasks = [
                    self._sync_single_app(api, pkg_name)
                    for pkg_name in batch
                ]
                results = await asyncio.gather(*tasks, return_exceptions=True)

                # Tally the outcomes
                for outcome in results:
                    total_processed += 1
                    if isinstance(outcome, Exception):
                        total_failed += 1
                    elif outcome:
                        total_inserted += 1

                print(f"Processed {total_processed}/{len(pkg_names)} apps")

                # Pause between batches
                await asyncio.sleep(0.5)

            print(f"Sync finished - processed: {total_processed}, updated: {total_inserted}, failed: {total_failed}")

        except Exception as e:
            print(f"Sync failed: {e}")
        finally:
            # Always release the HTTP client and the running flag
            if api is not None:
                await api.close()
            self.is_running = False

    async def _sync_single_app(
        self,
        api: HuaweiAPI,
        pkg_name: str
    ) -> bool:
        """Sync a single app"""
        try:
            # Fetch the app data
            app_data = await api.get_app_info(pkg_name=pkg_name)
            rating_data = await api.get_app_rating(app_data['appId'])

            # Save the data. An AsyncSession must not be shared between
            # concurrently running tasks, so each task opens its own session.
            async with AsyncSessionLocal() as session:
                processor = DataProcessor(session)
                new_info, new_metric, new_rating = await processor.save_app_data(
                    app_data, rating_data
                )

            return bool(new_info or new_metric or new_rating)

        except Exception as e:
print(f"同步 {pkg_name} 失败: {e}") return False # 全局调度器实例 scheduler = CrawlerScheduler() ``` #### 5.2.10 主应用 (main.py) ```python from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from app.config import settings from app.api import apps, rankings, charts, submit from app.scheduler.tasks import scheduler @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" # 启动时 print("应用启动中...") scheduler.start() yield # 关闭时 print("应用关闭中...") scheduler.stop() # 创建FastAPI应用 app = FastAPI( title=settings.API_TITLE, version=settings.API_VERSION, lifespan=lifespan ) # CORS中间件 app.add_middleware( CORSMiddleware, allow_origins=settings.CORS_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 注册路由 app.include_router(apps.router, prefix=settings.API_PREFIX) app.include_router(rankings.router, prefix=settings.API_PREFIX) app.include_router(charts.router, prefix=settings.API_PREFIX) app.include_router(submit.router, prefix=settings.API_PREFIX) @app.get("/") async def root(): return {"message": "华为应用市场数据API", "version": settings.API_VERSION} @app.get("/health") async def health_check(): return {"status": "healthy"} if __name__ == "__main__": import uvicorn uvicorn.run( "app.main:app", host="0.0.0.0", port=8000, reload=settings.DEBUG ) ``` ### 5.3 依赖文件 (requirements.txt) ```txt fastapi==0.109.0 uvicorn[standard]==0.27.0 sqlalchemy==2.0.25 aiomysql==0.2.0 pydantic==2.5.3 pydantic-settings==2.1.0 httpx==0.26.0 playwright==1.41.0 apscheduler==3.10.4 python-dotenv==1.0.0 python-multipart==0.0.6 ``` ### 5.4 环境配置 (.env.example) ```env # 数据库配置 MYSQL_HOST=localhost MYSQL_PORT=3306 MYSQL_USER=root MYSQL_PASSWORD=your_password MYSQL_DATABASE=huawei_market # 华为API配置 HUAWEI_API_BASE_URL=https://web-drcn.hispace.dbankcloud.com/edge HUAWEI_LOCALE=zh_CN # 爬虫配置 CRAWLER_INTERVAL=1800 CRAWLER_BATCH_SIZE=100 CRAWLER_TIMEOUT=30 # API配置 API_PREFIX=/api API_TITLE=华为应用市场数据API API_VERSION=1.0.0 # 其他配置 
DEBUG=False
CORS_ORIGINS=["http://localhost:5173","http://localhost:3000"]
```

---

## 6. Frontend Development

### 6.1 Project Structure

```
frontend/
├── public/
│   └── favicon.ico
├── src/
│   ├── assets/              # Static assets
│   │   ├── styles/
│   │   │   └── main.css
│   │   └── images/
│   ├── components/          # Components
│   │   ├── AppCard.vue
│   │   ├── AppTable.vue
│   │   ├── ChartCard.vue
│   │   ├── StatCard.vue
│   │   └── SearchBar.vue
│   ├── views/               # Pages
│   │   ├── Dashboard.vue
│   │   ├── AppDetail.vue
│   │   └── Rankings.vue
│   ├── api/                 # API wrappers
│   │   ├── index.ts
│   │   └── apps.ts
│   ├── stores/              # State management
│   │   └── app.ts
│   ├── types/               # Type definitions
│   │   └── app.ts
│   ├── utils/               # Utility functions
│   │   └── format.ts
│   ├── router/              # Routing
│   │   └── index.ts
│   ├── App.vue
│   └── main.ts
├── index.html
├── package.json
├── tsconfig.json
├── vite.config.ts
└── README.md
```

### 6.2 Core Implementation

#### 6.2.1 Type Definitions (types/app.ts)

```typescript
export interface AppInfo {
  app_id: string
  name: string
  pkg_name: string
  developer_name: string
  dev_en_name?: string
  kind_name: string
  kind_type_name: string
  icon_url: string
  brief_desc: string
  description: string
  privacy_url: string
  iap: boolean
  is_pay: boolean
  listed_at: string
  created_at: string
}

export interface AppMetric {
  id: number
  app_id: string
  pkg_name: string
  version: string
  version_code: number
  size_bytes: number
  download_count: number
  info_score: number
  info_rate_count: number
  price: number
  release_date: number
  target_sdk: string
  min_sdk: string
  created_at: string
}

export interface AppRating {
  id: number
  app_id: string
  average_rating: number
  star_1_count: number
  star_2_count: number
  star_3_count: number
  star_4_count: number
  star_5_count: number
  total_rating_count: number
  created_at: string
}

export interface FullAppInfo {
  info: AppInfo
  metric: AppMetric
  rating?: AppRating
}

// Generic envelope returned by every backend endpoint
export interface ApiResponse<T> {
  success: boolean
  data: T
  total?: number
  limit?: number
  timestamp: string
}

export interface MarketStats {
  app_count: {
    total: number
    apps: number
    atomic_services: number
  }
  developer_count: number
}

export interface RankingItem {
  app_id: string
  name:
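string
  pkg_name: string
  developer_name: string
  icon_url: string
  download_count?: number
  average_rating?: number
  total_rating_count?: number
}
```

#### 6.2.2 API Wrapper (api/apps.ts)

```typescript
import axios from 'axios'
import type { ApiResponse, AppMetric, FullAppInfo, MarketStats, RankingItem } from '@/types/app'

const api = axios.create({
  baseURL: import.meta.env.VITE_API_BASE_URL || 'http://localhost:8000/api',
  timeout: 30000
})

// Request interceptor
api.interceptors.request.use(
  config => {
    // A token or other headers can be attached here
    return config
  },
  error => {
    return Promise.reject(error)
  }
)

// Response interceptor: unwrap the axios response so callers receive the body
api.interceptors.response.use(
  response => {
    return response.data
  },
  error => {
    console.error('API Error:', error)
    return Promise.reject(error)
  }
)

// The interceptor above returns the response body, so the second type
// argument (the resolved value) is set to the ApiResponse envelope.
export const appsApi = {
  // Market statistics
  getMarketInfo: () =>
    api.get<unknown, ApiResponse<MarketStats>>('/market_info'),

  // Look up an app by package name
  getAppByPkgName: (pkgName: string) =>
    api.get<unknown, ApiResponse<FullAppInfo>>(`/apps/pkg_name/${pkgName}`),

  // Look up an app by app ID
  getAppById: (appId: string) =>
    api.get<unknown, ApiResponse<FullAppInfo>>(`/apps/app_id/${appId}`),

  // App list
  getAppList: (params: {
    page: number
    page_size?: number
    detail?: boolean
    sort?: string
    desc?: boolean
    search_key?: string
    search_value?: string
    search_exact?: boolean
  }) =>
    api.get<unknown, ApiResponse<FullAppInfo[]>>(`/apps/list/${params.page}`, { params }),

  // Metrics history of an app
  getAppMetrics: (pkgName: string) =>
    api.get<unknown, ApiResponse<AppMetric[]>>(`/apps/metrics/${pkgName}`),

  // Download ranking
  getTopDownloads: (params?: { limit?: number; exclude_pattern?: string }) =>
    api.get<unknown, ApiResponse<RankingItem[]>>('/rankings/top-downloads', { params }),

  // Rating ranking
  getTopRatings: (params?: { limit?: number }) =>
    api.get<unknown, ApiResponse<RankingItem[]>>('/rankings/ratings', { params }),

  // Developer ranking (fields as returned by /rankings/developers in 5.2.8)
  getTopDevelopers: (params?: { limit?: number }) =>
    api.get<
      unknown,
      ApiResponse<{ developer_name: string; app_count: number; total_downloads: number }[]>
    >('/rankings/developers', { params }),

  // Rating distribution
  // Placeholder payload type; tighten it to match the /charts response shape
  getRatingDistribution: () =>
    api.get<unknown, ApiResponse<Record<string, unknown>>>('/charts/rating'),

  // SDK distribution (same placeholder payload type as above)
  getMinSdkDistribution: () =>
    api.get<unknown, ApiResponse<Record<string, unknown>>>('/charts/min_sdk'),

  getTargetSdkDistribution: () =>
    api.get<unknown, ApiResponse<Record<string, unknown>>>('/charts/target_sdk'),

  // Submit an app (placeholder payload type for the response)
  submitApp: (data: {
    pkg_name?: string
    app_id?: string
    comment?: any
  }) =>
    api.post<unknown, ApiResponse<unknown>>('/submit', data)
}

export default api
```

#### 6.2.3 State Management (stores/app.ts)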
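The store below merges caller-supplied list parameters with its own paging defaults and derives the page count from the total. A minimal sketch of those two steps as pure functions, so they can be checked without Pinia or axios; `ListParams`, `PagingDefaults`, `resolveListParams` and `totalPages` are hypothetical names introduced here for illustration and are not part of the original store:

```typescript
// Hypothetical helpers for illustration; not part of the original store.
export type ListParams = {
  page?: number
  page_size?: number
  sort?: string
  desc?: boolean
  search_key?: string
  search_value?: string
  search_exact?: boolean
}

export type PagingDefaults = { page: number; page_size: number }

// Spread the caller's params first, then fill in page and page_size.
// Because the fallbacks are applied last, a key that is passed as
// `undefined` cannot overwrite a default.
export function resolveListParams(
  params: ListParams,
  defaults: PagingDefaults
): ListParams & PagingDefaults {
  return {
    ...params,
    page: params.page ?? defaults.page,
    page_size: params.page_size ?? defaults.page_size
  }
}

// Number of pages needed to show `totalCount` rows at `pageSize` rows per page.
export function totalPages(totalCount: number, pageSize: number): number {
  return pageSize > 0 ? Math.ceil(totalCount / pageSize) : 0
}
```

The order of the spread is the design point: if the defaults came first and `...params` last, a call such as `resolveListParams({ page: undefined }, ...)` would send `page: undefined` to the API.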
```typescript
import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
import { appsApi } from '@/api/apps'
import type { MarketStats, FullAppInfo } from '@/types/app'

export const useAppStore = defineStore('app', () => {
  // State
  const marketStats = ref<MarketStats | null>(null)
  const appList = ref<FullAppInfo[]>([])
  const currentPage = ref(1)
  const pageSize = ref(100)
  const totalCount = ref(0)
  const loading = ref(false)

  // Computed
  const totalPages = computed(() => Math.ceil(totalCount.value / pageSize.value))

  // Actions
  const fetchMarketStats = async () => {
    try {
      const response = await appsApi.getMarketInfo()
      if (response.success) {
        marketStats.value = response.data
      }
    } catch (error) {
      console.error('Failed to fetch market statistics:', error)
    }
  }

  const fetchAppList = async (params: {
    page?: number
    page_size?: number
    sort?: string
    desc?: boolean
    search_key?: string
    search_value?: string
    search_exact?: boolean
  } = {}) => {
    loading.value = true
    try {
      // Spread params first so an explicit `undefined` cannot overwrite the defaults
      const response = await appsApi.getAppList({
        ...params,
        page: params.page || currentPage.value,
        page_size: params.page_size || pageSize.value,
        detail: true
      })

      if (response.success) {
        appList.value = response.data
        totalCount.value = response.total || 0
        currentPage.value = params.page || currentPage.value
      }
    } catch (error) {
      console.error('Failed to fetch the app list:', error)
    } finally {
      loading.value = false
    }
  }

  const searchApps = async (searchKey: string, searchValue: string, exact: boolean = false) => {
    await fetchAppList({
      page: 1,
      search_key: searchKey,
      search_value: searchValue,
      search_exact: exact
    })
  }

  return {
    marketStats,
    appList,
    currentPage,
    pageSize,
    totalCount,
    totalPages,
    loading,
    fetchMarketStats,
    fetchAppList,
    searchApps
  }
})
```

#### 6.2.4 Utility Functions (utils/format.ts)

```typescript
/**
 * Format a file size
 */
export function formatFileSize(bytes: number): string {
  if (bytes === 0) return '0 B'
  const k = 1024
  const sizes = ['B', 'KB', 'MB', 'GB', 'TB']
  // Clamp the index so very large values still map to the last unit
  const i = Math.min(Math.floor(Math.log(bytes) / Math.log(k)), sizes.length - 1)
  return Math.round(bytes / Math.pow(k, i) * 100) / 100 + ' ' + sizes[i]
}

/**
 * Format a download count
 */
export function formatDownloadCount(count: number): string { if (count >= 100000000) { return (count / 100000000).toFixed(1) + '亿' } else if (count >= 10000) { return (count / 10000).toFixed(1) + '万' } return count.toString() } /** * 格式化日期 */ export function formatDate(date: string | number): string { const d = new Date(date) return d.toLocaleDateString('zh-CN', { year: 'numeric', month: '2-digit', day: '2-digit', hour: '2-digit', minute: '2-digit' }) } /** * 格式化评分 */ export function formatRating(rating: number): string { return rating.toFixed(1) } /** * 获取星级数组 */ export function getStarArray(rating: number): boolean[] { const fullStars = Math.floor(rating) const hasHalfStar = rating % 1 >= 0.5 const stars: boolean[] = [] for (let i = 0; i < 5; i++) { stars.push(i < fullStars || (i === fullStars && hasHalfStar)) } return stars } ``` --- ## 附录A:如何获取应用包名 ### A.1 从华为应用市场网页获取 #### 方法1:从URL中提取 访问华为应用市场应用详情页,URL格式如下: ``` https://appgallery.huawei.com/app/C1164531384803416384 ``` 或者: ``` https://appgallery.huawei.com/#/app/C1164531384803416384 ``` **注意:** URL中的是 `app_id`,不是包名。需要进一步获取包名。 #### 方法2:从网页源码中提取 1. 打开应用详情页 2. 右键 -> 查看网页源代码 3. 搜索 `"pkgName"` 或 `"packageName"` 4. 找到类似这样的内容: ```json { "pkgName": "com.huawei.hmsapp.appgallery", "appId": "C1164531384803416384", ... } ``` #### 方法3:使用浏览器开发者工具 1. 打开应用详情页 2. 按 F12 打开开发者工具 3. 切换到 Network(网络)标签 4. 刷新页面 5. 筛选 XHR 请求,找到 `appinfo` 相关的请求 6. 查看请求的 Response,找到 `pkgName` 字段 **示例截图说明:** ``` Network -> XHR -> appinfo Response: { "pkgName": "com.huawei.hmsapp.appgallery", "name": "应用市场", ... 
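}
```

### A.2 Getting Package Names from an Android Device

#### Method 1: Using ADB commands

If you have an Android device or emulator:

```bash
# List the package names of all installed apps
adb shell pm list packages

# List third-party apps only
adb shell pm list packages -3

# Search for specific apps (for example, names containing "huawei")
adb shell pm list packages | grep huawei

# Show the package name of the app currently in the foreground
adb shell dumpsys window | grep mCurrentFocus
```

**Sample output:**

```
package:com.huawei.hmsapp.appgallery
package:com.huawei.browser
package:com.huawei.music
```

#### Method 2: Using an app info viewer

Install an "app info viewer" style app on the Android device, for example:

- **Package Name Viewer**
- **App Inspector**
- **Dev Tools**

These apps display the package names of installed apps directly.

### A.3 Getting Package Names in Bulk

#### Method 1: Crawling AppGallery category pages

```python
import asyncio

import httpx
from bs4 import BeautifulSoup


async def get_apps_from_category(category_id: str):
    """Collect app IDs from a category page"""
    url = f"https://appgallery.huawei.com/Featured/{category_id}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        # Note: the site appears to be rendered client-side, so the static
        # HTML may contain few or no app links; a headless browser may be needed.
        soup = BeautifulSoup(response.text, 'html.parser')

        # Find links that point to app detail pages
        app_links = soup.find_all('a', href=True)
        app_ids = []

        for link in app_links:
            href = link['href']
            if '/app/' in href:
                app_id = href.split('/app/')[-1]
                app_ids.append(app_id)

        return app_ids


# Usage example
app_ids = asyncio.run(get_apps_from_category('10000000'))  # Tools category
```

#### Method 2: Guessing app IDs

An AppGallery app_id has the form `C` + 19 digits.

Apps can be discovered by iterating over a numeric range:

```python
import asyncio

from app.crawler.huawei_api import HuaweiAPI


async def guess_app_ids(start: int, end: int):
    """Probe a range of app IDs"""
    api = HuaweiAPI()
    found_apps = []

    try:
        for i in range(start, end):
            app_id = f"C{i:019d}"
            try:
                app_data = await api.get_app_info(app_id=app_id)
                found_apps.append({
                    'app_id': app_id,
                    'pkg_name': app_data['pkgName'],
                    'name': app_data['name']
                })
                print(f"Found app: {app_data['name']} ({app_data['pkgName']})")
            except Exception:
                # No app behind this ID, or the request failed: move on
                continue
    finally:
        await api.close()

    return found_apps


# Usage example
apps = asyncio.run(guess_app_ids(1164531384803416384, 1164531384803416484))
```

#### Method 3: Expanding from an existing database

If some app data is already available, it can be expanded as follows:

1. **Other apps by the same developer**

   ```sql
   SELECT DISTINCT pkg_name
   FROM app_info
   WHERE developer_name = '华为软件技术有限公司'
   ```

2. **Apps in the same category**

   ```sql
   SELECT DISTINCT pkg_name
   FROM app_info
   WHERE kind_name = '工具'
   ```

3.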
**相关推荐应用** - 访问应用详情页,查看"相关推荐"部分 - 提取推荐应用的 app_id ### A.4 常见应用包名示例 ```python # 华为系统应用 HUAWEI_SYSTEM_APPS = [ "com.huawei.hmsapp.appgallery", # 应用市场 "com.huawei.browser", # 浏览器 "com.huawei.music", # 音乐 "com.huawei.himovie", # 视频 "com.huawei.camera", # 相机 "com.huawei.health", # 运动健康 "com.huawei.wallet", # 钱包 ] # 热门第三方应用 POPULAR_APPS = [ "com.tencent.mm", # 微信 "com.tencent.mobileqq", # QQ "com.sina.weibo", # 微博 "com.taobao.taobao", # 淘宝 "com.jingdong.app.mall", # 京东 "com.ss.android.ugc.aweme", # 抖音 ] # 鸿蒙元服务(包名特征) ATOMIC_SERVICE_PATTERN = "com.atomicservice.*" ``` ### A.5 包名命名规范 包名通常遵循以下规范: **格式:** `com.公司名.应用名` **示例:** - `com.huawei.hmsapp.appgallery` - 华为应用市场 - `com.tencent.mm` - 腾讯微信 - `com.alibaba.android.rimet` - 阿里钉钉 **鸿蒙元服务:** - `com.atomicservice.{19位数字}` - 元服务包名格式 ### A.6 实用工具脚本 #### 从URL批量提取包名 ```python import re import httpx from typing import List async def extract_pkg_names_from_urls(urls: List[str]) -> List[dict]: """从URL列表批量提取包名""" api = HuaweiAPI() results = [] for url in urls: # 从URL提取app_id match = re.search(r'/app/([A-Z0-9]+)', url) if not match: continue app_id = match.group(1) try: app_data = await api.get_app_info(app_id=app_id) results.append({ 'url': url, 'app_id': app_id, 'pkg_name': app_data['pkgName'], 'name': app_data['name'] }) except Exception as e: print(f"处理 {url} 失败: {e}") return results # 使用示例 urls = [ "https://appgallery.huawei.com/app/C1164531384803416384", "https://appgallery.huawei.com/app/C100000000000000001", ] results = await extract_pkg_names_from_urls(urls) for r in results: print(f"{r['name']}: {r['pkg_name']}") ``` #### 导出包名列表 ```python import csv from sqlalchemy import select from app.models.app_info import AppInfo async def export_pkg_names_to_csv(db: AsyncSession, filename: str = "pkg_names.csv"): """导出所有包名到CSV文件""" result = await db.execute( select(AppInfo.pkg_name, AppInfo.name, AppInfo.developer_name) .order_by(AppInfo.name) ) with open(filename, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) 
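        writer.writerow(['Package name', 'App name', 'Developer'])

        for row in result:
            writer.writerow([row.pkg_name, row.name, row.developer_name])

    print(f"Exported to {filename}")
```

### A.7 Notes

1. **Package name uniqueness**
   - Each app's package name is unique within AppGallery
   - The same app usually keeps the same package name across app stores

2. **Package name format validation**

   ```python
   import re

   def is_valid_pkg_name(pkg_name: str) -> bool:
       """Validate the package name format"""
       # At least two dot-separated segments; each segment starts with a
       # lowercase letter. Segments that start with a digit are rejected, so
       # atomic-service names (com.atomicservice.<digits>) do not pass this
       # check; test them with is_atomic_service() first.
       pattern = r'^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)+$'
       return bool(re.match(pattern, pkg_name))

   # Examples
   print(is_valid_pkg_name("com.huawei.hmsapp.appgallery"))  # True
   print(is_valid_pkg_name("Com.Huawei.App"))  # False (uppercase letters)
   print(is_valid_pkg_name("huawei"))  # False (fewer than 2 segments)
   ```

3. **Atomic service detection**

   ```python
   def is_atomic_service(pkg_name: str) -> bool:
       """Return True if the package name belongs to an atomic service"""
       return pkg_name.startswith("com.atomicservice.")
   ```

4. **Request rate limits**
   - Avoid sending requests too frequently
   - Add a delay of 0.5 to 1 second between requests
   - Watch the concurrency level when processing in batches

5. **Data refresh strategy**
   - Refresh apps with high download counts first
   - Run a full sync of all known package names periodically
   - Store newly discovered package names promptly

---

## 7. Deployment Guide

### 7.1 Docker Deployment

#### 7.1.1 Backend Dockerfile

```dockerfile
# backend/Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    default-libmysqlclient-dev \
    pkg-config \
    && rm -rf /var/lib/apt/lists/*

# Install Playwright's system dependencies
RUN apt-get update && apt-get install -y \
    libnss3 \
    libnspr4 \
    libatk1.0-0 \
    libatk-bridge2.0-0 \
    libcups2 \
    libdrm2 \
    libxkbcommon0 \
    libxcomposite1 \
    libxdamage1 \
    libxfixes3 \
    libxrandr2 \
    libgbm1 \
    libasound2 \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency list
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Install the Playwright browser
RUN playwright install chromium

# Copy the application code
COPY . .

# Expose the port
EXPOSE 8000

# Start command
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```

#### 7.1.2 Frontend Dockerfile

```dockerfile
# frontend/Dockerfile
FROM node:18-alpine as builder

WORKDIR /app

# Copy the dependency manifests
COPY package*.json ./

# Install dependencies
RUN npm ci

# Copy the source code
COPY . .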
# 构建 RUN npm run build # 生产环境 FROM nginx:alpine # 复制构建产物 COPY --from=builder /app/dist /usr/share/nginx/html # 复制 Nginx 配置 COPY nginx.conf /etc/nginx/conf.d/default.conf EXPOSE 80 CMD ["nginx", "-g", "daemon off;"] ``` #### 7.1.3 Nginx 配置 ```nginx # frontend/nginx.conf server { listen 80; server_name localhost; root /usr/share/nginx/html; index index.html; # Gzip 压缩 gzip on; gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript; # 前端路由 location / { try_files $uri $uri/ /index.html; } # API 代理 location /api { proxy_pass http://backend:8000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } # 静态资源缓存 location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ { expires 1y; add_header Cache-Control "public, immutable"; } } ``` #### 7.1.4 Docker Compose ```yaml # docker-compose.yml version: '3.8' services: mysql: image: mysql:8.0 container_name: huawei_market_mysql restart: always environment: MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD} MYSQL_DATABASE: ${MYSQL_DATABASE} MYSQL_USER: ${MYSQL_USER} MYSQL_PASSWORD: ${MYSQL_PASSWORD} ports: - "3306:3306" volumes: - mysql_data:/var/lib/mysql - ./backend/sql:/docker-entrypoint-initdb.d command: --default-authentication-plugin=mysql_native_password networks: - app_network backend: build: context: ./backend dockerfile: Dockerfile container_name: huawei_market_backend restart: always environment: MYSQL_HOST: mysql MYSQL_PORT: 3306 MYSQL_USER: ${MYSQL_USER} MYSQL_PASSWORD: ${MYSQL_PASSWORD} MYSQL_DATABASE: ${MYSQL_DATABASE} ports: - "8000:8000" depends_on: - mysql volumes: - ./backend:/app networks: - app_network frontend: build: context: ./frontend dockerfile: Dockerfile container_name: huawei_market_frontend restart: always ports: - "80:80" depends_on: - backend networks: - app_network volumes: mysql_data: networks: 
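  app_network:
    driver: bridge
```

#### 7.1.5 Environment Variable File

```env
# .env
MYSQL_ROOT_PASSWORD=root_password_here
MYSQL_DATABASE=huawei_market
MYSQL_USER=market_user
MYSQL_PASSWORD=user_password_here
```

### 7.2 Deployment Steps

#### 7.2.1 Preparation

```bash
# 1. Clone the project
git clone <repository-url>
cd huawei-market-crawler

# 2. Create the environment variable file
cp .env.example .env
# Edit .env and fill in the real configuration

# 3. Create the required directories
mkdir -p backend/logs
mkdir -p mysql_data
```

#### 7.2.2 Deploying with Docker Compose

```bash
# Build and start all services
docker-compose up -d --build

# Check service status
docker-compose ps

# Follow the logs
docker-compose logs -f backend

# Stop the services
docker-compose down

# Stop the services and delete the data volumes
docker-compose down -v
```

#### 7.2.3 Initializing the Database

```bash
# Scripts mounted into /docker-entrypoint-initdb.d run automatically the first
# time the MySQL container initializes an empty data directory. Run the steps
# below only if the schema is missing.

# Open a MySQL shell in the container
docker exec -it huawei_market_mysql mysql -u root -p

# Run the init script
mysql> USE huawei_market;
mysql> SOURCE /docker-entrypoint-initdb.d/init.sql;
```

#### 7.2.4 Verifying the Deployment

```bash
# Check backend health
curl http://localhost:8000/health

# Check the frontend
curl http://localhost/

# Test the API
curl http://localhost:8000/api/market_info
```

### 7.3 Production Optimization

#### 7.3.1 Running the Backend with Gunicorn

```bash
# Install gunicorn
pip install gunicorn

# Start command
# Note: every worker runs the FastAPI lifespan handler, so with --workers 4
# the crawler scheduler from 5.2.10 starts once per worker. Run the scheduler
# in a single dedicated process, or use one worker for the crawling instance.
gunicorn app.main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --access-logfile logs/access.log \
  --error-logfile logs/error.log \
  --log-level info
```

#### 7.3.2 MySQL Tuning

```ini
# my.cnf
[mysqld]
# Basic settings
max_connections = 500
max_allowed_packet = 64M

# InnoDB settings
innodb_buffer_pool_size = 2G
innodb_log_file_size = 256M
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT

# Query cache: removed in MySQL 8.0, so query_cache_type and
# query_cache_size must not be set (mysqld refuses to start on
# unknown variables)

# Slow query log
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
```

#### 7.3.3 Nginx Production Configuration

```nginx
# /etc/nginx/sites-available/huawei-market
server {
    listen 80;
    server_name your-domain.com;

    # Redirect to HTTPS
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name your-domain.com;

    # SSL certificate
    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;

    # SSL settings
    ssl_protocols TLSv1.2 TLSv1.3;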
    ssl_ciphers HIGH:!aNULL:!MD5;
    ssl_prefer_server_ciphers on;

    # Security headers
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-XSS-Protection "1; mode=block" always;

    # Logs
    access_log /var/log/nginx/huawei-market-access.log;
    error_log /var/log/nginx/huawei-market-error.log;

    # Frontend
    location / {
        root /var/www/huawei-market/frontend;
        try_files $uri $uri/ /index.html;
    }

    # API
    location /api {
        proxy_pass http://127.0.0.1:8000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }
}
```

### 7.4 Monitoring and Maintenance

#### 7.4.1 Log Management

```python
# app/utils/logger.py
import logging
from logging.handlers import RotatingFileHandler
import os


def setup_logger(name: str, log_file: str, level=logging.INFO):
    """Configure a logger that writes to a rotating file and to the console"""
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Reuse the existing handlers if this logger was already configured,
    # otherwise every call would duplicate each log line
    if logger.handlers:
        return logger

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Make sure the log directory exists (dirname is '' for a bare file name)
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    # File handler (rotates automatically)
    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(formatter)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger
```

#### 7.4.2 Health Check

```python
# app/api/health.py
from datetime import datetime

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from app.database import get_db

router = APIRouter(tags=["健康检查"])


@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
    """Health check"""
    try:
        # Verify the database connection
        await db.execute(text("SELECT 1"))

        return {
"status": "healthy", "database": "connected", "timestamp": datetime.now().isoformat() } except Exception as e: return { "status": "unhealthy", "database": "disconnected", "error": str(e), "timestamp": datetime.now().isoformat() } ``` #### 7.4.3 性能监控 ```bash # 使用 Prometheus + Grafana 监控 # 1. 安装 prometheus-fastapi-instrumentator pip install prometheus-fastapi-instrumentator # 2. 在 main.py 中添加 from prometheus_fastapi_instrumentator import Instrumentator app = FastAPI() Instrumentator().instrument(app).expose(app) ``` ### 7.5 备份策略 ```bash #!/bin/bash # backup.sh - 数据库备份脚本 BACKUP_DIR="/backup/mysql" DATE=$(date +%Y%m%d_%H%M%S) MYSQL_USER="root" MYSQL_PASSWORD="your_password" DATABASE="huawei_market" # 创建备份目录 mkdir -p $BACKUP_DIR # 备份数据库 mysqldump -u$MYSQL_USER -p$MYSQL_PASSWORD \ --single-transaction \ --routines \ --triggers \ $DATABASE > $BACKUP_DIR/backup_$DATE.sql # 压缩备份文件 gzip $BACKUP_DIR/backup_$DATE.sql # 删除7天前的备份 find $BACKUP_DIR -name "backup_*.sql.gz" -mtime +7 -delete echo "备份完成: backup_$DATE.sql.gz" ``` --- ## 8. 开发建议与最佳实践 ### 8.1 代码规范 - **Python**: 遵循 PEP 8 规范,使用 Black 格式化 - **TypeScript**: 使用 ESLint + Prettier - **提交信息**: 遵循 Conventional Commits 规范 ### 8.2 测试策略 ```python # tests/test_crawler.py import pytest from app.crawler.huawei_api import HuaweiAPI @pytest.mark.asyncio async def test_get_app_info(): api = HuaweiAPI() data = await api.get_app_info(pkg_name="com.huawei.hmsapp.appgallery") assert data['pkgName'] == "com.huawei.hmsapp.appgallery" assert 'name' in data assert 'appId' in data await api.close() ``` ### 8.3 性能优化 1. **数据库查询优化** - 使用索引 - 避免 N+1 查询 - 使用连接池 2. **缓存策略** - Redis 缓存热门数据 - 前端使用 LocalStorage 3. **异步处理** - 使用异步 I/O - 批量处理数据 ### 8.4 安全建议 1. **API 安全** - 添加 API 限流 - 使用 JWT 认证(如需要) - 输入验证和清洗 2. **数据库安全** - 使用参数化查询 - 最小权限原则 - 定期备份 3. **爬虫礼仪** - 遵守 robots.txt - 控制请求频率 - 使用合理的 User-Agent --- ## 9. 常见问题 FAQ ### Q1: Token 获取失败怎么办? **A:** 1. 检查网络连接 2. 确认 Playwright 浏览器已安装 3. 尝试手动访问华为应用市场,检查是否需要验证码 4. 增加等待时间 ### Q2: 数据库连接超时? **A:** 1. 
检查 MySQL 服务是否运行 2. 验证连接配置是否正确 3. 增加连接池大小 4. 检查防火墙设置 ### Q3: 爬取速度太慢? **A:** 1. 增加并发数量 2. 使用批量处理 3. 优化数据库写入 4. 考虑使用多台服务器分布式爬取 ### Q4: 如何处理反爬虫? **A:** 1. 降低请求频率 2. 使用代理IP池 3. 模拟真实浏览器行为 4. 定期更新 Token --- ## 10. 参考资源 - **FastAPI 文档**: https://fastapi.tiangolo.com/ - **Vue 3 文档**: https://vuejs.org/ - **SQLAlchemy 文档**: https://docs.sqlalchemy.org/ - **Playwright 文档**: https://playwright.dev/python/ - **MySQL 文档**: https://dev.mysql.com/doc/ --- ## 附录B:完整项目清单 ### 后端文件清单 ``` backend/ ├── app/ │ ├── __init__.py │ ├── main.py │ ├── config.py │ ├── database.py │ ├── models/ │ ├── schemas/ │ ├── api/ │ ├── crawler/ │ ├── scheduler/ │ └── utils/ ├── tests/ ├── logs/ ├── requirements.txt ├── .env ├── Dockerfile └── README.md ``` ### 前端文件清单 ``` frontend/ ├── public/ ├── src/ │ ├── assets/ │ ├── components/ │ ├── views/ │ ├── api/ │ ├── stores/ │ ├── types/ │ ├── utils/ │ ├── router/ │ ├── App.vue │ └── main.ts ├── package.json ├── vite.config.ts ├── tsconfig.json ├── Dockerfile ├── nginx.conf └── README.md ``` --- **文档版本**: v1.0 **最后更新**: 2024年 **维护者**: [Your Name] **许可证**: MIT --- ## 附录C:原项目中的包名获取策略 原 Rust 项目使用了多种创新的方法来发现和获取应用包名,这些方法非常值得借鉴。 ### C.1 核心策略概览 原项目提供了 **7 个独立工具** 用于获取包名和应用数据: | 工具名 | 用途 | 策略 | |--------|------|------| | `guess_market` | 应用ID猜测 | 遍历指定范围的应用ID | | `guess_rand` | 随机猜测 | 随机生成应用ID进行探测 | | `guess_from_db` | 数据库扩展 | 基于已有数据推测相邻ID | | `guess_large` | 大规模猜测 | 大范围ID扫描 | | `get_nextmax` | 第三方数据源 | 从 nextmax.cn 获取 | | `read_appgallery` | 应用市场爬取 | 直接爬取华为应用市场页面 | | `read_pkg_name` | 批量导入 | 从文件读取包名列表 | ### C.2 方法详解 #### C.2.1 应用ID猜测法 (guess_market) **原理:** 华为应用的 app_id 格式为固定前缀 + 数字,通过遍历数字范围来发现应用。 **app_id 格式:** ``` C576588020785 + 7位数字 例如: C5765880207856366961 ``` **核心代码逻辑:** ```rust // 定义扫描范围 let range = 2000000..=6390000; let start = "C576588020785"; // 批量处理(每批1000个) for bunch_id in range_vec.chunks(1000) { let mut join_set = tokio::task::JoinSet::new(); for id in bunch_id.iter() { let app_id = format!("{start}{id:07}"); // 格式化为7位数字 // 异步请求华为API 
        join_set.spawn(async move {
            if let Ok(data) = query_app(&client, &api_url, &AppQuery::app_id(&app_id), &locale).await {
                // Save to the database
                db.save_app_data(&data.0, data.1.as_ref(), None, Some(comment)).await;
            }
        });
    }

    join_set.join_all().await;
    tokio::time::sleep(Duration::from_millis(25)).await; // delay between batches
}
```

**Python implementation example:**

```python
import asyncio
from typing import List

from app.crawler.huawei_api import HuaweiAPI
# `Database` is assumed to be the project's async persistence wrapper
# (it must provide save_app_data); adjust the import to the real module.


async def guess_market_apps(
    start_prefix: str = "C576588020785",
    start_range: int = 2000000,
    end_range: int = 6390000,
    batch_size: int = 1000
):
    """Discover apps by guessing IDs (end_range is inclusive, as in the Rust code)"""
    api = HuaweiAPI()
    db = Database()

    try:
        for batch_start in range(start_range, end_range + 1, batch_size):
            batch_end = min(batch_start + batch_size, end_range + 1)
            tasks = []

            for i in range(batch_start, batch_end):
                app_id = f"{start_prefix}{i:07d}"  # 7 digits, zero-padded
                tasks.append(try_fetch_app(api, db, app_id))

            # Run the batch concurrently
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # try_fetch_app returns False on failure, so count only True results
            success_count = sum(1 for r in results if r is True)
            print(f"Batch {batch_start}-{batch_end - 1}: {success_count}/{len(tasks)} succeeded")

            # Delay between batches
            await asyncio.sleep(0.025)
    finally:
        await api.close()


async def try_fetch_app(api: HuaweiAPI, db: Database, app_id: str):
    """Try to fetch a single app"""
    try:
        app_data = await api.get_app_info(app_id=app_id)
        rating_data = await api.get_app_rating(app_id)

        await db.save_app_data(app_data, rating_data, comment={
            "user": "guess_market",
            "method": "id_guessing"
        })

        print(f"✓ Found app: {app_data['name']} ({app_data['pkgName']})")
        return True
    except Exception:
        # The app does not exist or the request failed: skip silently
        return False
```

**Known app ID prefixes:**

```python
KNOWN_APP_ID_PREFIXES = [
    "C576588020785",  # main prefix
    "C69175",         # another prefix series
    # More prefixes can be found by analysing the existing data
]
```

#### C.2.2 Random Guessing (guess_rand)

**Principle:** Generate IDs at random within a known ID range to improve discovery efficiency.

**When to use it:**
- The ID space is large and sequential traversal is inefficient
- The goal is to find popular apps quickly (their IDs are usually newer)

**Core logic:**

```rust
let code_start = 59067092904725_u64;
let size = 85170011059280_u64 - code_start;
let start = "C69175";

loop {
    let mut ids: Vec<u64> = Vec::with_capacity(1000);
    for _ in 0..1000 {
        let id = code_start + (rng.next() % size); // random ID
        ids.push(id);
    }

    //
批量处理这些随机ID // ... } ``` **Python 实现:** ```python import random async def guess_random_apps( prefix: str = "C69175", start: int = 59067092904725, end: int = 85170011059280, batch_size: int = 1000 ): """随机猜测应用ID""" api = HuaweiAPI() db = Database() while True: # 生成随机ID批次 random_ids = [ f"{prefix}{random.randint(start, end)}" for _ in range(batch_size) ] tasks = [try_fetch_app(api, db, app_id) for app_id in random_ids] results = await asyncio.gather(*tasks, return_exceptions=True) success_count = sum(1 for r in results if r is True) print(f"随机批次: 成功 {success_count}/{batch_size}") await asyncio.sleep(0.005) ``` #### C.2.3 数据库扩展法 (guess_from_db) **原理:** 基于已有的应用ID,推测其相邻的ID可能也是有效应用。 **策略:** 1. 从数据库获取所有已知的 app_id 2. 解析每个 app_id 的前缀和数字部分 3. 对每个数字,生成 ±1000 的范围 4. 合并重叠的范围 5. 扫描这些范围 **核心逻辑:** ```rust // 1. 获取所有已知app_id let existing_app_ids = db.get_all_app_ids().await?; // 2. 为每个app_id生成扩展范围 for app_id in existing_app_ids { if let Some((prefix, numeric_part)) = parse_app_id(&app_id) { let start_range = numeric_part.saturating_sub(1000); let end_range = numeric_part.saturating_add(1000); all_ranges.insert((prefix, start_range, end_range)); } } // 3. 合并重叠范围 // 例如: (100, 1100) 和 (500, 1500) 合并为 (100, 1500) // 4. 扫描合并后的范围 for (prefix, start, end) in merged_ranges { for id in start..=end { let app_id = format!("{}{}", prefix, id); // 尝试获取应用 } } ``` **Python 实现:** ```python from typing import Tuple, Optional import re def parse_app_id(app_id: str) -> Optional[Tuple[str, int]]: """解析app_id,返回(前缀, 数字)""" match = re.match(r'^([A-Z]+)(\d+)$', app_id) if match: return match.group(1), int(match.group(2)) return None async def guess_from_database(expand_range: int = 1000): """基于数据库已有数据扩展""" db = Database() # 1. 获取所有已知app_id existing_ids = await db.get_all_app_ids() # 2. 
    # Build the expansion ranges, grouped by prefix
    ranges = {}
    for app_id in existing_ids:
        parsed = parse_app_id(app_id)
        if not parsed:
            continue

        prefix, num = parsed
        start = max(0, num - expand_range)
        end = num + expand_range
        ranges.setdefault(prefix, []).append((start, end))

    # 3. Merge overlapping ranges
    merged_ranges = {}
    for prefix, range_list in ranges.items():
        range_list.sort()
        merged = []
        current = range_list[0]

        for r in range_list[1:]:
            if r[0] <= current[1] + 1:
                # Overlapping or adjacent: merge
                current = (current[0], max(current[1], r[1]))
            else:
                merged.append(current)
                current = r

        merged.append(current)
        merged_ranges[prefix] = merged

    # 4. Scan the merged ranges
    for prefix, range_list in merged_ranges.items():
        for start, end in range_list:
            print(f"Scanning range: {prefix}{start} - {prefix}{end}")
            await guess_market_apps(prefix, start, end)
```

#### C.2.4 Batch import from a file (read_pkg_name)

**Principle:** Read a list of package names from a text file and fetch the app data in batches.

**Usage:**

```bash
# Create the package-name list file
cat > pkg_names.txt << EOF
com.huawei.hmsapp.appgallery
com.tencent.mm
com.sina.weibo
EOF

# Run the tool
cargo run --bin read_pkg_name pkg_names.txt
```

**Core code:**

```rust
use std::io::BufRead; // brings `read_line` into scope

// Take the file path from the command-line arguments
let cli_file = std::env::args()
    .nth(1)
    .ok_or_else(|| anyhow::anyhow!("No file path provided"))?;

// Read the package names from the file
let pkg_names: Vec<String> = {
    let file = std::fs::File::open(&cli_file)?;
    let mut reader = std::io::BufReader::new(file);
    let mut pkg_names = Vec::new();
    let mut line = String::new();
    while reader.read_line(&mut line)? > 0 {
        pkg_names.push(line.trim().to_string());
        line.clear();
    }
    // Strip surrounding double quotes from each entry
    pkg_names
        .into_iter()
        .map(|l| l.trim_matches('\"').to_string())
        .collect()
};

// Batch sync
sync::sync_all(&client, &db, &config).await?;
```

**Python implementation:**

```python
import asyncio


async def read_pkg_names_from_file(filepath: str):
    """Read package names from a file and fetch them in batches."""
    # Read the package-name list, skipping blank lines and stripping quotes
    with open(filepath, 'r', encoding='utf-8') as f:
        pkg_names = [
            line.strip().strip('"').strip("'")
            for line in f
            if line.strip()
        ]

    print(f"Read {len(pkg_names)} package names from the file")

    # Fetch in batches of 100
    api = HuaweiAPI()
    db = Database()

    for i in range(0, len(pkg_names), 100):
        batch = pkg_names[i:i+100]
        tasks = [
            fetch_and_save_app(api, db, pkg_name)
            for pkg_name in batch
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
        print(f"Processed {min(i+100, len(pkg_names))}/{len(pkg_names)}")


async def fetch_and_save_app(api: HuaweiAPI, db: Database, pkg_name: str):
    """Fetch and save a single app."""
    try:
        app_data = await api.get_app_info(pkg_name=pkg_name)
        rating_data = await api.get_app_rating(app_data['appId'])
        await db.save_app_data(app_data, rating_data)
        print(f"✓ {pkg_name}")
    except Exception as e:
        print(f"✗ {pkg_name}: {e}")
```

#### C.2.5 Batch fetching from a substance (topic/collection)

**Principle:** AppGallery has "topic" or "collection" pages, called substances; one substance contains multiple apps.

**Substance ID format:**

```
Example: webAgSubstanceDetail|12345
```

**Core logic:**

```rust
pub async fn get_app_from_substance(
    client: &reqwest::Client,
    api_url: &str,
    substance_id: impl ToString,
) -> Result<(SubstanceData, JsonValue)> {
    // 1. Request the substance details
    let body = serde_json::json!({
        "pageId": format!("webAgSubstanceDetail|{}", substance_id.to_string()),
        "pageNum": 1,
        "pageSize": 100,
        "zone": "",
        "businessParam": {
            "animation": 0
        }
    });

    let response = client.post(format!("{api_url}/harmony/page-detail"))
        .json(&body)
        .send()
        .await?;

    let data = response.json::<JsonValue>().await?;

    // 2. Parse the card data and extract the app IDs
    let layouts = data["pages"][0]["data"]["cardlist"]["layoutData"].as_array()?;
    let mut apps = Vec::new();

    for card in layouts {
        match card["type"].as_str()?
{ "com.huawei.hmsapp.appgallery.verticallistcard" => { // 竖向列表卡片 for app in card["data"].as_array()? { if let Some(app_id) = app.get("appId") { apps.push(AppQuery::app_id(app_id.as_str()?)); } } } "com.huawei.hmos.appgallery.scenariolistcard.landing" => { // 场景列表卡片 let refs_list = card["data"][0]["refsList_app"].as_array()?; for app in refs_list { if let Some(app_id) = app.get("appId") { apps.push(AppQuery::app_id(app_id.as_str()?)); } } } _ => {} } } // 3. 如果有更多页,继续获取 if data["hasMore"].as_i64()? != 0 { let more_apps = get_more_substance(client, api_url, card_id).await?; apps.extend(more_apps); } Ok((SubstanceData { id, title, apps }, data)) } ``` **Python 实现:** ```python async def get_apps_from_substance(substance_id: str) -> List[str]: """从主题/合集获取应用列表""" api = HuaweiAPI() url = f"{api.base_url}/harmony/page-detail" body = { "pageId": f"webAgSubstanceDetail|{substance_id}", "pageNum": 1, "pageSize": 100, "zone": "", "businessParam": {"animation": 0} } tokens = await api.token_manager.get_token() headers = { "Content-Type": "application/json", "Interface-Code": tokens["interface_code"], "identity-id": tokens["identity_id"] } response = await api.client.post(url, json=body, headers=headers) data = response.json() app_ids = [] layouts = data["pages"][0]["data"]["cardlist"]["layoutData"] for card in layouts: card_type = card.get("type", "") card_data = card.get("data", []) if card_type == "com.huawei.hmsapp.appgallery.verticallistcard": for app in card_data: if "appId" in app: app_ids.append(app["appId"]) elif card_type == "com.huawei.hmos.appgallery.scenariolistcard.landing": if card_data and "refsList_app" in card_data[0]: for app in card_data[0]["refsList_app"]: if "appId" in app: app_ids.append(app["appId"]) # 处理分页 if data.get("hasMore", 0) != 0: card_id = data["cardlist"]["dataId"] more_apps = await get_more_substance_pages(api, card_id) app_ids.extend(more_apps) return app_ids async def get_more_substance_pages(api: HuaweiAPI, card_id: str) -> List[str]: 
"""获取主题的更多页""" app_ids = [] page_num = 2 has_more = True while has_more: url = f"{api.base_url}/harmony/card-list" body = { "dataId": card_id, "locale": "zh", "pageNum": page_num, "pageSize": 25 } response = await api.client.post(url, json=body) data = response.json() has_more = data.get("hasMore", 0) != 0 page_num += 1 for card in data.get("layoutData", []): if card.get("type") == "com.huawei.hmsapp.appgallery.verticallistcard": for app in card.get("data", []): if "appId" in app: app_ids.append(app["appId"]) return app_ids ``` ### C.3 综合策略建议 **初始阶段(冷启动):** 1. 使用 `guess_market` 扫描已知的ID范围 2. 从华为应用市场首页爬取热门应用 3. 手动收集一些知名应用的包名 **扩展阶段:** 1. 使用 `guess_from_db` 基于已有数据扩展 2. 使用 `guess_rand` 随机发现新应用 3. 定期从 substance(主题合集)批量获取 **维护阶段:** 1. 定期同步已知包名的数据更新 2. 监控新应用ID的出现模式 3. 从用户投稿获取新包名 **效率优化:** ```python # 组合策略示例 async def comprehensive_discovery(): """综合发现策略""" # 1. 先从数据库扩展(成功率高) await guess_from_database(expand_range=500) # 2. 扫描热门ID段 await guess_market_apps("C576588020785", 6000000, 6400000) # 3. 随机探测(发现新应用) asyncio.create_task(guess_random_apps()) # 后台运行 # 4. 定期同步已知应用 await sync_known_apps() ``` ### C.4 注意事项 1. **请求频率控制** - 批次间延迟:25-50ms - 单个请求超时:30秒 - 并发数:建议不超过1000 2. **错误处理** - 应用不存在:静默跳过 - 网络错误:重试3次 - Token过期:自动刷新 3. **数据去重** - 使用 app_id 或 pkg_name 作为唯一标识 - 插入前检查数据库是否已存在 4. **性能监控** - 记录成功率(发现率) - 监控请求耗时 - 统计每小时发现的新应用数 这些方法的组合使用,使得原项目能够高效地发现和收集华为应用市场的应用数据。