# Huawei AppGallery Crawler System Development Guide
> A guide to rebuilding the original Rust project with Python + MySQL + Vue 3
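## 📋 Table of Contents
- [1. Project Overview](#1-project-overview)
- [2. System Architecture](#2-system-architecture)
- [3. Data Source Analysis](#3-data-source-analysis)
- [4. Database Design](#4-database-design)
- [5. Backend Development](#5-backend-development)
- [6. Frontend Development](#6-frontend-development)
- [7. Deployment Guide](#7-deployment-guide)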
---
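## 1. Project Overview
### 1.1 Project Goals
Build a data collection and visualization system for Huawei AppGallery that:
- Automatically crawls app information from Huawei AppGallery
- Stores each app's basic information, version history, download count, ratings, and related data
- Provides a web interface for statistics, rankings, and trend analysis
- Lets users search, filter, and submit apps
### 1.2 Technology Stack
**Backend:**
- Python 3.10+
- FastAPI (web framework)
- SQLAlchemy (ORM)
- MySQL 8.0+
- APScheduler (scheduled jobs)
- httpx / aiohttp (async HTTP client)
**Frontend:**
- Vue 3 + TypeScript
- Vite (build tool)
- Element Plus / Ant Design Vue (UI component library)
- ECharts / Chart.js (charting library)
- Axios (HTTP client)
- Pinia (state management)
**Deployment:**
- Docker + Docker Compose
- Nginx (reverse proxy)
- Gunicorn / Uvicorn (ASGI server)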
---
## 2. System Architecture
### 2.1 Overall Architecture Diagram
```
┌─────────────────────────────────────────────────────────────┐
│                        User browser                         │
└────────────────────────┬────────────────────────────────────┘
                         │ HTTP/HTTPS
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                    Nginx (reverse proxy)                    │
└──────────┬──────────────────────────────────┬───────────────┘
           │                                  │
           │ /api/*                           │ /*
           ▼                                  ▼
┌──────────────────────┐           ┌──────────────────────────┐
│ FastAPI backend      │           │ Vue 3 static frontend    │
│ - REST API           │           │ - SPA                    │
│ - Data queries       │           │ - Data visualization     │
│ - Crawler scheduling │           └──────────────────────────┘
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐           ┌──────────────────────────┐
│ MySQL database       │◄──────────│ Crawler scheduler        │
│ - App info           │           │ - APScheduler            │
│ - History data       │           │ - Periodic sync          │
│ - Statistics         │           │ - Batch processing       │
└──────────────────────┘           └──────────┬───────────────┘
                                              │
                                              ▼
                                   ┌──────────────────────────┐
                                   │ Huawei AppGallery API    │
                                   │ - App info endpoint      │
                                   │ - Rating detail endpoint │
                                   └──────────────────────────┘
```
### 2.2 Core Modules
1. **Crawler module** - fetches data from the Huawei API
2. **Data processing module** - cleans, deduplicates, and stores data
3. **API service module** - exposes the RESTful API
4. **Scheduler module** - runs scheduled jobs and batch processing
5. **Frontend module** - data visualization and user interaction
---
## 3. Data Source Analysis
### 3.1 Huawei AppGallery API
**Basics:**
- API base URL: `https://web-drcn.hispace.dbankcloud.com/edge`
- Requests need dynamically obtained auth tokens (`interface-code` and `identity-id`)
- Tokens are valid for about 10 minutes and must be refreshed periodically
### 3.2 Main Endpoints
#### 3.2.1 Get Basic App Information
**Endpoint:** `POST /webedge/appinfo`
**Request headers:**
```http
Content-Type: application/json
User-Agent: HuaweiMarketCrawler/1.0
interface-code: {dynamically obtained token}
identity-id: {dynamically obtained token}
```
**Request body (query by package name):**
```json
{
  "pkgName": "com.huawei.hmsapp.appgallery",
  "locale": "zh_CN"
}
```
**Request body (query by app ID):**
```json
{
  "appId": "C1164531384803416384",
  "locale": "zh_CN"
}
```
**Example response:**
```json
{
  "appId": "C1164531384803416384",
  "name": "应用市场",
  "pkgName": "com.huawei.hmsapp.appgallery",
  "devId": "260086000000068459",
  "developerName": "华为软件技术有限公司",
  "devEnName": "Huawei Software Technologies Co., Ltd.",
  "kindName": "工具",
  "version": "6.3.2.302",
  "size": 76591487,
  "downCount": "14443706",
  "rateNum": "125000",
  "hot": "4.5",
  "icon": "https://...",
  "briefDes": "应用市场,点亮精彩生活",
  "description": "...",
  "releaseDate": 1234567890000,
  "targetSdk": "12",
  "minsdk": "9",
  ...
}
```
#### 3.2.2 Get App Rating Details
**Endpoint:** `POST /harmony/page-detail`
**Request body:**
```json
{
  "pageId": "webAgAppDetail|C1164531384803416384",
  "pageNum": 1,
  "pageSize": 100,
  "zone": ""
}
```
**Example response:**
```json
{
  "pages": [{
    "data": {
      "cardlist": {
        "layoutData": [{
          "type": "fl.card.comment",
          "data": [{
            "starInfo": "{\"averageRating\":\"4.5\",\"oneStarRatingCount\":100,\"twoStarRatingCount\":200,...}"
          }]
        }]
      }
    }
  }]
}
```
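Note that `starInfo` in the response above is itself a JSON document encoded as a string, so it has to be decoded a second time. The sketch below is one hedged way to walk the nested structure without raising on missing keys; the function name `extract_star_info` and the trimmed sample payload are illustrative assumptions, not part of the Huawei API.

```python
import json
from typing import Any, Dict, Optional


def extract_star_info(page_detail: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the decoded starInfo dict, or None if no comment card is present."""
    for page in page_detail.get("pages", []):
        layouts = page.get("data", {}).get("cardlist", {}).get("layoutData", [])
        for layout in layouts:
            if layout.get("type") != "fl.card.comment":
                continue
            for item in layout.get("data", []):
                raw = item.get("starInfo")
                if raw:
                    # starInfo is a JSON document embedded as a string
                    return json.loads(raw)
    return None


# Trimmed sample shaped like the example response (values are illustrative)
sample = {
    "pages": [{
        "data": {"cardlist": {"layoutData": [{
            "type": "fl.card.comment",
            "data": [{"starInfo": '{"averageRating":"4.5","oneStarRatingCount":100}'}],
        }]}}
    }]
}
star_info = extract_star_info(sample)
```

Returning `None` instead of raising lets the caller treat "no rating card" as a normal outcome, which matters for apps that have no ratings at all.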
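### 3.3 Token Acquisition Strategy
The tokens must be obtained dynamically from Huawei's web front end. Possible approaches:
1. **Option 1:** Use Selenium/Playwright to drive a browser and capture the tokens
2. **Option 2:** Reverse-engineer the JS code and reimplement the token generation algorithm
3. **Option 3:** Update the tokens manually at regular intervals (not recommended)
**Reference implementation (sketch):**
```python
from playwright.async_api import async_playwright


async def get_huawei_token() -> dict:
    """Open AppGallery in a headless browser and capture the token headers."""
    tokens = {}

    def handle_request(request):
        # Playwright exposes request header names in lower case
        headers = request.headers
        if 'interface-code' in headers and 'identity-id' in headers:
            tokens['interface_code'] = headers['interface-code']
            tokens['identity_id'] = headers['identity-id']

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        try:
            page = await browser.new_page()
            # Listen to outgoing requests to read the tokens
            page.on('request', handle_request)
            await page.goto('https://appgallery.huawei.com/')
            await page.wait_for_timeout(3000)
        finally:
            await browser.close()
    return tokens
```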
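### 3.4 Data Field Reference
**Core fields:**
- `appId` - unique app identifier (a length greater than 15 indicates a HarmonyOS app)
- `pkgName` - package name (unique)
- `name` - app name
- `developerName` - developer name
- `downCount` - download count (a string, e.g. "1000000+")
- `rateNum` - number of ratings
- `hot` - popularity score
- `version` - version number
- `size` - app size (bytes)
- `releaseDate` - release time (millisecond timestamp)
- `targetSdk` / `minsdk` - SDK versions
**Notes:**
1. Some fields may be empty, so default values are needed
2. The download count may contain a "+" sign and must be cleaned
3. Some apps (atomic services) have package names starting with `com.atomicservice` and carry no rating data
4. The JSON may contain `\0` characters, which must be stripped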
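The notes above can be sketched as a few small normalization helpers. This is a minimal illustration using only the standard library; the helper names are assumptions made for this example, and the download-count parser assumes the value is always a whole number.

```python
import re
from datetime import datetime, timezone
from typing import Any


def parse_down_count(raw: Any) -> int:
    """Strip '+', ',' and any other non-digit characters; empty values become 0."""
    digits = re.sub(r"\D", "", str(raw or ""))
    return int(digits) if digits else 0


def strip_nul(value: Any) -> Any:
    """Remove NUL characters from strings; other types pass through unchanged."""
    return value.replace("\x00", "") if isinstance(value, str) else value


def is_atomic_service(pkg_name: str) -> bool:
    """Atomic services carry no rating data, so callers can skip the rating request."""
    return pkg_name.startswith("com.atomicservice")


def is_harmony_app_id(app_id: str) -> bool:
    """Per the field reference, an appId longer than 15 characters is a HarmonyOS app."""
    return len(app_id) > 15


def release_datetime(release_ms: int) -> datetime:
    """Convert the millisecond releaseDate timestamp to an aware UTC datetime."""
    return datetime.fromtimestamp(release_ms / 1000, tz=timezone.utc)
```

For example, `parse_down_count("1000000+")` yields `1000000`, and `release_datetime(1234567890000)` falls in February 2009 (UTC).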
---
## 4. Database Design
### 4.1 MySQL Table Schemas
#### 4.1.1 Basic App Information Table (app_info)
```sql
CREATE TABLE `app_info` (
  `app_id` VARCHAR(50) PRIMARY KEY COMMENT 'Unique app ID',
  `alliance_app_id` VARCHAR(50) COMMENT 'Alliance app ID',
  `name` VARCHAR(255) NOT NULL COMMENT 'App name',
  `pkg_name` VARCHAR(255) NOT NULL UNIQUE COMMENT 'App package name',
  `dev_id` VARCHAR(50) NOT NULL COMMENT 'Developer ID',
  `developer_name` VARCHAR(255) NOT NULL COMMENT 'Developer name',
  `dev_en_name` VARCHAR(255) COMMENT 'Developer name (English)',
  `supplier` VARCHAR(255) COMMENT 'Supplier name',
  `kind_id` INT NOT NULL COMMENT 'App category ID',
  `kind_name` VARCHAR(100) NOT NULL COMMENT 'App category name',
  `tag_name` VARCHAR(255) COMMENT 'Tag name',
  `kind_type_id` INT NOT NULL COMMENT 'Type ID',
  `kind_type_name` VARCHAR(100) NOT NULL COMMENT 'Type name',
  `icon_url` TEXT NOT NULL COMMENT 'App icon URL',
  `brief_desc` TEXT NOT NULL COMMENT 'Brief description',
  `description` LONGTEXT NOT NULL COMMENT 'Full app description',
  `privacy_url` TEXT NOT NULL COMMENT 'Privacy policy URL',
  `ctype` INT NOT NULL COMMENT 'Client type',
  `detail_id` VARCHAR(100) NOT NULL COMMENT 'Detail page ID',
  `app_level` INT NOT NULL COMMENT 'App level',
  `jocat_id` INT NOT NULL COMMENT 'Category ID',
  `iap` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Has in-app purchases',
  `hms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on HMS',
  `tariff_type` VARCHAR(50) NOT NULL COMMENT 'Tariff type',
  `packing_type` INT NOT NULL COMMENT 'Packaging type',
  `order_app` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is a preinstalled app',
  `denpend_gms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on GMS',
  `denpend_hms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on HMS',
  `force_update` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Forces update',
  `img_tag` VARCHAR(50) NOT NULL COMMENT 'Image tag',
  `is_pay` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is paid',
  `is_disciplined` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is compliant',
  `is_shelves` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'Is listed',
  `submit_type` INT NOT NULL DEFAULT 0 COMMENT 'Submission type',
  `delete_archive` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Whether the archive is deleted',
  `charging` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is charged',
  `button_grey` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Button is greyed out',
  `app_gift` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Has gift pack',
  `free_days` INT NOT NULL DEFAULT 0 COMMENT 'Free days',
  `pay_install_type` INT NOT NULL DEFAULT 0 COMMENT 'Paid install type',
  `comment` JSON COMMENT 'Comment or annotation data',
  `listed_at` DATETIME NOT NULL COMMENT 'App listing time',
  `release_countries` JSON COMMENT 'Countries/regions where the app is released',
  `main_device_codes` JSON COMMENT 'Main device types the app supports',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  `updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
  INDEX `idx_pkg_name` (`pkg_name`),
  INDEX `idx_developer_name` (`developer_name`),
  INDEX `idx_kind_name` (`kind_name`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Basic app information';
```
#### 4.1.2 App Metrics Table (app_metrics)
```sql
CREATE TABLE `app_metrics` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key ID',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `version` VARCHAR(50) NOT NULL COMMENT 'Version number',
  `version_code` BIGINT NOT NULL COMMENT 'Version code',
  `size_bytes` BIGINT NOT NULL COMMENT 'App size (bytes)',
  `sha256` VARCHAR(64) NOT NULL COMMENT 'SHA256 checksum of the install package',
  `info_score` DECIMAL(3,1) NOT NULL COMMENT 'Info score',
  `info_rate_count` BIGINT NOT NULL COMMENT 'Number of info ratings',
  `download_count` BIGINT NOT NULL COMMENT 'Download count',
  `price` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT 'Price',
  `release_date` BIGINT NOT NULL COMMENT 'Release time (millisecond timestamp)',
  `new_features` TEXT COMMENT 'New feature description',
  `upgrade_msg` TEXT COMMENT 'Upgrade message',
  `target_sdk` VARCHAR(20) NOT NULL COMMENT 'Target SDK version',
  `min_sdk` VARCHAR(20) NOT NULL COMMENT 'Minimum SDK version',
  `compile_sdk_version` INT DEFAULT 0 COMMENT 'Compile SDK version',
  `min_hmos_api_level` INT DEFAULT 0 COMMENT 'Minimum HarmonyOS API level',
  `api_release_type` VARCHAR(50) DEFAULT 'Release' COMMENT 'API release type',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_pkg_name` (`pkg_name`),
  INDEX `idx_download_count` (`download_count`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='App metrics';
```
#### 4.1.3 App Rating Table (app_rating)
```sql
CREATE TABLE `app_rating` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key ID',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `average_rating` DECIMAL(3,2) NOT NULL COMMENT 'Average rating',
  `star_1_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 1-star ratings',
  `star_2_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 2-star ratings',
  `star_3_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 3-star ratings',
  `star_4_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 4-star ratings',
  `star_5_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 5-star ratings',
  `total_rating_count` INT NOT NULL DEFAULT 0 COMMENT 'Total number of ratings',
  `only_star_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of star-only ratings',
  `full_average_rating` VARCHAR(20) COMMENT 'Full-precision average rating',
  `source_type` VARCHAR(50) COMMENT 'Source type',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_pkg_name` (`pkg_name`),
  INDEX `idx_average_rating` (`average_rating`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='App ratings';
```
#### 4.1.4 Raw Data History Table (app_data_history)
```sql
CREATE TABLE `app_data_history` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key ID',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `raw_json_data` JSON NOT NULL COMMENT 'Raw app data JSON',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Raw data history';
```
#### 4.1.5 Rating History Table (app_rating_history)
```sql
CREATE TABLE `app_rating_history` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key ID',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `raw_json_rating` JSON NOT NULL COMMENT 'Raw rating data JSON',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Rating history';
```
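### 4.2 Index Optimization Suggestions
1. **Composite indexes:**
   - `(pkg_name, created_at)` - for querying an app's history by package name
   - `(developer_name, download_count)` - for developer rankings
   - `(kind_name, download_count)` - for category rankings
   - Note: `developer_name` and `kind_name` live in `app_info` while `download_count` lives in `app_metrics`. A MySQL index cannot span tables, so the last two indexes require copying one of the columns into the other table first.
2. **Full-text indexes:**
   - `name`, `brief_desc` - for app search
   - Note: MySQL's default full-text parser does not segment Chinese text; the built-in `ngram` parser is needed for Chinese app names.
3. **Partitioning strategy:**
   - Partition the history tables by month to speed up queries
   - Note: partitioned InnoDB tables cannot have foreign keys, and the partitioning column must be part of every unique key (including the primary key), so the history table definitions above would need to be adjusted first.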
---
## 5. Backend Development
### 5.1 Project Structure
```
backend/
├── app/
│   ├── __init__.py
│   ├── main.py                 # FastAPI application entry point
│   ├── config.py               # Settings
│   ├── database.py             # Database connection
│   ├── models/                 # SQLAlchemy models
│   │   ├── __init__.py
│   │   ├── app_info.py
│   │   ├── app_metrics.py
│   │   └── app_rating.py
│   ├── schemas/                # Pydantic models
│   │   ├── __init__.py
│   │   ├── app.py
│   │   └── response.py
│   ├── api/                    # API routes
│   │   ├── __init__.py
│   │   ├── apps.py
│   │   ├── rankings.py
│   │   ├── charts.py
│   │   └── submit.py
│   ├── crawler/                # Crawler module
│   │   ├── __init__.py
│   │   ├── huawei_api.py       # Huawei API wrapper
│   │   ├── token_manager.py    # Token management
│   │   └── data_processor.py   # Data processing
│   ├── scheduler/              # Scheduler module
│   │   ├── __init__.py
│   │   └── tasks.py
│   └── utils/                  # Utility functions
│       ├── __init__.py
│       └── helpers.py
├── requirements.txt
├── .env.example
└── README.md
```
### 5.2 Core Implementation
#### 5.2.1 Settings (config.py)
```python
from typing import List

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Load overrides from a local .env file
    model_config = SettingsConfigDict(env_file=".env")

    # Database settings
    MYSQL_HOST: str = "localhost"
    MYSQL_PORT: int = 3306
    MYSQL_USER: str = "root"
    MYSQL_PASSWORD: str = "password"
    MYSQL_DATABASE: str = "huawei_market"

    # Huawei API settings
    HUAWEI_API_BASE_URL: str = "https://web-drcn.hispace.dbankcloud.com/edge"
    HUAWEI_LOCALE: str = "zh_CN"

    # Crawler settings
    CRAWLER_INTERVAL: int = 1800   # Sync interval (seconds)
    CRAWLER_BATCH_SIZE: int = 100  # Batch size
    CRAWLER_TIMEOUT: int = 30      # Request timeout (seconds)

    # API settings
    API_PREFIX: str = "/api"
    API_TITLE: str = "Huawei AppGallery Data API"
    API_VERSION: str = "1.0.0"

    # Other settings
    DEBUG: bool = False
    CORS_ORIGINS: List[str] = ["http://localhost:5173", "http://localhost:3000"]

    @property
    def database_url(self) -> str:
        return (
            f"mysql+aiomysql://{self.MYSQL_USER}:{self.MYSQL_PASSWORD}"
            f"@{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DATABASE}"
        )


settings = Settings()
```
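One caveat with assembling the connection URL by string formatting: a password containing characters such as `@` or `/` produces a URL that is parsed incorrectly. A minimal sketch of percent-encoding the credentials first, using only the standard library; `build_database_url` is a hypothetical helper for illustration, not part of the project.

```python
from urllib.parse import quote


def build_database_url(
    user: str,
    password: str,
    host: str,
    port: int,
    database: str,
    driver: str = "mysql+aiomysql",
) -> str:
    """Assemble a database URL with percent-encoded credentials."""
    # safe="" forces '/' and '@' to be encoded as well
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return f"{driver}://{safe_user}:{safe_password}@{host}:{port}/{database}"


url = build_database_url("root", "p@ss/word", "localhost", 3306, "huawei_market")
```

The same encoding could be applied inside the `database_url` property if the deployment uses passwords with special characters.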
#### 5.2.2 Database Connection (database.py)
```python
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from app.config import settings

# Create the async engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.DEBUG,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
)

# Create the async session factory
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Declarative base class for all models
Base = declarative_base()


# FastAPI dependency: yields one session per request
async def get_db():
    # The context manager closes the session on exit
    async with AsyncSessionLocal() as session:
        yield session
```
#### 5.2.3 Data Model (models/app_info.py)
```python
from sqlalchemy import JSON, Boolean, Column, DateTime, Integer, String, Text
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.sql import func

from app.database import Base


class AppInfo(Base):
    # Only a subset of the app_info columns is mapped here. The NOT NULL columns
    # without defaults (ctype, detail_id, app_level, jocat_id, tariff_type,
    # packing_type, img_tag) must also be mapped and populated before inserts succeed.
    __tablename__ = "app_info"

    app_id = Column(String(50), primary_key=True, comment="Unique app ID")
    alliance_app_id = Column(String(50), comment="Alliance app ID")
    name = Column(String(255), nullable=False, comment="App name")
    pkg_name = Column(String(255), nullable=False, unique=True, index=True, comment="App package name")
    dev_id = Column(String(50), nullable=False, comment="Developer ID")
    developer_name = Column(String(255), nullable=False, index=True, comment="Developer name")
    dev_en_name = Column(String(255), comment="Developer name (English)")
    supplier = Column(String(255), comment="Supplier name")
    kind_id = Column(Integer, nullable=False, comment="App category ID")
    kind_name = Column(String(100), nullable=False, index=True, comment="App category name")
    tag_name = Column(String(255), comment="Tag name")
    kind_type_id = Column(Integer, nullable=False, comment="Type ID")
    kind_type_name = Column(String(100), nullable=False, comment="Type name")
    icon_url = Column(Text, nullable=False, comment="App icon URL")
    brief_desc = Column(Text, nullable=False, comment="Brief description")
    # LONGTEXT matches the DDL in section 4.1.1
    description = Column(LONGTEXT, nullable=False, comment="Full app description")
    privacy_url = Column(Text, nullable=False, comment="Privacy policy URL")

    # Boolean fields
    iap = Column(Boolean, default=False, comment="Has in-app purchases")
    hms = Column(Boolean, default=False, comment="Depends on HMS")
    is_pay = Column(Boolean, default=False, comment="Is paid")
    is_shelves = Column(Boolean, default=True, comment="Is listed")

    # JSON fields
    comment = Column(JSON, comment="Comment or annotation data")
    release_countries = Column(JSON, comment="Countries/regions where the app is released")
    main_device_codes = Column(JSON, comment="Main device types the app supports")

    # Timestamp fields
    listed_at = Column(DateTime, nullable=False, comment="App listing time")
    created_at = Column(DateTime, nullable=False, server_default=func.now(), comment="Creation time")
    updated_at = Column(DateTime, nullable=False, server_default=func.now(), onupdate=func.now(), comment="Update time")
```
#### 5.2.4 Huawei API Wrapper (crawler/huawei_api.py)
```python
import json
from typing import Any, Dict, Optional

import httpx

from app.config import settings
from app.crawler.token_manager import TokenManager


class HuaweiAPI:
    def __init__(self):
        self.base_url = settings.HUAWEI_API_BASE_URL
        self.locale = settings.HUAWEI_LOCALE
        self.token_manager = TokenManager()
        self.client = httpx.AsyncClient(timeout=settings.CRAWLER_TIMEOUT)

    async def _build_headers(self) -> Dict[str, str]:
        """Build the request headers with a valid token pair."""
        tokens = await self.token_manager.get_token()
        return {
            "Content-Type": "application/json",
            "User-Agent": "HuaweiMarketCrawler/1.0",
            "interface-code": tokens["interface_code"],
            "identity-id": tokens["identity_id"],
        }

    async def get_app_info(self, pkg_name: Optional[str] = None, app_id: Optional[str] = None) -> Dict[str, Any]:
        """Fetch basic app information by package name or app ID."""
        if not pkg_name and not app_id:
            raise ValueError("Either pkg_name or app_id is required")

        # Build the request
        url = f"{self.base_url}/webedge/appinfo"
        headers = await self._build_headers()
        body = {"locale": self.locale}
        if pkg_name:
            body["pkgName"] = pkg_name
        else:
            body["appId"] = app_id

        # Send the request
        response = await self.client.post(url, headers=headers, json=body)
        response.raise_for_status()

        # Clean the payload before returning it
        return self._clean_data(response.json())

    async def get_app_rating(self, app_id: str, pkg_name: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Fetch rating details; returns None when no rating data is available."""
        # Atomic services are identified by package name (not app ID) and have no ratings
        if pkg_name and pkg_name.startswith("com.atomicservice"):
            return None

        url = f"{self.base_url}/harmony/page-detail"
        headers = await self._build_headers()
        body = {
            "pageId": f"webAgAppDetail|{app_id}",
            "pageNum": 1,
            "pageSize": 100,
            "zone": "",
        }
        try:
            response = await self.client.post(url, headers=headers, json=body)
            response.raise_for_status()
            data = response.json()

            # Locate the comment card that carries the rating summary
            layouts = data["pages"][0]["data"]["cardlist"]["layoutData"]
            comment_cards = [card for card in layouts if card.get("type") == "fl.card.comment"]
            if not comment_cards:
                return None

            # starInfo is a JSON document embedded as a string
            star_info_str = comment_cards[0]["data"][0]["starInfo"]
            return json.loads(star_info_str)
        except (httpx.HTTPError, KeyError, IndexError, TypeError, ValueError) as e:
            print(f"Failed to fetch rating: {e}")
            return None

    def _clean_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Clean the raw payload."""
        # Strip \0 characters from string values
        for key, value in data.items():
            if isinstance(value, str):
                data[key] = value.replace('\x00', '')

        # Drop the AG-TraceId field
        data.pop('AG-TraceId', None)

        # Validate the appId length
        if len(data.get('appId', '')) < 15:
            raise ValueError("appId is shorter than 15 characters; this is probably an Android app")
        return data

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
```
#### 5.2.5 Token Manager (crawler/token_manager.py)
```python
import asyncio
from datetime import datetime, timedelta
from typing import Dict

from playwright.async_api import async_playwright


class TokenManager:
    def __init__(self):
        self.tokens: Dict[str, str] = {}
        self.token_expires_at: datetime = datetime.now()
        self.lock = asyncio.Lock()

    async def get_token(self) -> Dict[str, str]:
        """Return a valid token pair, refreshing it first if it has expired."""
        async with self.lock:
            if datetime.now() >= self.token_expires_at or not self.tokens:
                await self._refresh_token()
            return self.tokens

    async def _refresh_token(self):
        """Refresh the tokens by loading AppGallery in a headless browser."""
        print("Refreshing token...")
        tokens: Dict[str, str] = {}

        def handle_request(request):
            # Playwright exposes request header names in lower case
            headers = request.headers
            if 'interface-code' in headers and 'identity-id' in headers:
                tokens['interface_code'] = headers['interface-code']
                tokens['identity_id'] = headers['identity-id']

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                # Listen to outgoing requests to read the tokens
                page.on('request', handle_request)
                # Visit Huawei AppGallery
                await page.goto('https://appgallery.huawei.com/', wait_until='networkidle')
                await page.wait_for_timeout(3000)
            finally:
                await browser.close()

        if not tokens:
            raise RuntimeError("Failed to obtain token")
        self.tokens = tokens
        # Treat the tokens as valid for 10 minutes
        self.token_expires_at = datetime.now() + timedelta(minutes=10)
        print(f"Token refreshed; valid until: {self.token_expires_at}")
```
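Because the manager above launches a real browser, its caching logic is awkward to unit-test. The sketch below isolates the same idea (cache, expiry, lock) behind an injected fetch coroutine and an injectable clock, and refreshes slightly before expiry. `CachedTokenProvider`, `refresh_margin`, and `fake_fetch` are assumptions made for this illustration; they are not part of the project.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, Optional


class CachedTokenProvider:
    """Cache a token pair and refetch it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Awaitable[Dict[str, str]]],
        ttl_seconds: float = 600.0,      # about 10 minutes, as stated in section 3.1
        refresh_margin: float = 60.0,    # hypothetical safety margin
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._margin = refresh_margin
        self._clock = clock
        self._tokens: Optional[Dict[str, str]] = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    async def get(self) -> Dict[str, str]:
        # The lock ensures concurrent callers trigger only one refresh
        async with self._lock:
            stale = self._clock() >= self._expires_at - self._margin
            if self._tokens is None or stale:
                self._tokens = await self._fetch()
                self._expires_at = self._clock() + self._ttl
            return self._tokens


# Usage with a fake fetcher and a manually advanced clock
fetch_calls = []
now = [0.0]


async def fake_fetch() -> Dict[str, str]:
    fetch_calls.append(1)
    return {"interface_code": f"token-{len(fetch_calls)}", "identity_id": "id"}


async def scenario():
    provider = CachedTokenProvider(fake_fetch, clock=lambda: now[0])
    first = await provider.get()    # no token yet, so this fetches
    now[0] = 500.0
    second = await provider.get()   # still fresh, served from the cache
    now[0] = 541.0
    third = await provider.get()    # inside the 60 s margin, so this refetches
    return first, second, third


first, second, third = asyncio.run(scenario())
```

In production the `fetch` argument would be a coroutine that runs the Playwright capture, while tests can pass a stub as shown.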
#### 5.2.6 Data Processor (crawler/data_processor.py)
```python
import re
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.app_data_history import AppDataHistory
from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.models.app_rating_history import AppRatingHistory


class DataProcessor:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def save_app_data(
        self,
        app_data: Dict[str, Any],
        rating_data: Optional[Dict[str, Any]] = None,
        comment: Optional[Dict[str, Any]] = None
    ) -> Tuple[bool, bool, bool]:
        """
        Save app data.
        Returns: (app info written, new metric inserted, new rating inserted)
        """
        app_id = app_data['appId']
        pkg_name = app_data['pkgName']

        # Check whether the app already exists
        result = await self.db.execute(
            select(AppInfo).where(AppInfo.app_id == app_id)
        )
        existing_app = result.scalar_one_or_none()

        # Save basic app information
        info_inserted = False
        if not existing_app or await self._is_info_changed(existing_app, app_data):
            await self._save_app_info(app_data, comment)
            info_inserted = True

        # Save app metrics
        metric_inserted = False
        if await self._should_save_metric(app_id, app_data):
            await self._save_app_metric(app_data)
            metric_inserted = True

        # Save rating data
        rating_inserted = False
        if rating_data and await self._should_save_rating(app_id, rating_data):
            await self._save_app_rating(app_id, pkg_name, rating_data)
            rating_inserted = True

        # Save raw data history
        if info_inserted or metric_inserted:
            await self._save_data_history(app_id, pkg_name, app_data)
        if rating_inserted:
            await self._save_rating_history(app_id, pkg_name, rating_data)

        await self.db.commit()
        return info_inserted, metric_inserted, rating_inserted

    async def _save_app_info(self, data: Dict[str, Any], comment: Optional[Dict] = None):
        """Save basic app information."""
        app_info = AppInfo(
            app_id=data['appId'],
            alliance_app_id=data.get('allianceAppId', ''),
            name=data['name'],
            pkg_name=data['pkgName'],
            dev_id=data['devId'],
            developer_name=data['developerName'],
            dev_en_name=data.get('devEnName', ''),
            supplier=data.get('supplier', ''),
            kind_id=int(data['kindId']),
            kind_name=data['kindName'],
            tag_name=data.get('tagName'),
            kind_type_id=int(data['kindTypeId']),
            kind_type_name=data['kindTypeName'],
            icon_url=data['icon'],
            brief_desc=data['briefDes'],
            description=data['description'],
            privacy_url=data['privacyUrl'],
            iap=bool(data.get('iap', 0)),
            hms=bool(data.get('hms', 0)),
            is_pay=data.get('isPay') == '1',
            is_shelves=bool(data.get('isShelves', 1)),
            comment=comment,
            release_countries=data.get('releaseCountries', []),
            main_device_codes=data.get('mainDeviceCodes', []),
            listed_at=datetime.fromtimestamp(data.get('releaseDate', 0) / 1000)
        )
        # merge() acts as an upsert keyed on the primary key; add() would fail
        # with a duplicate-key error for an app that already exists
        await self.db.merge(app_info)

    async def _save_app_metric(self, data: Dict[str, Any]):
        """Save app metrics."""
        # Clean the download count
        download_count = self._parse_download_count(data.get('downCount', '0'))
        metric = AppMetrics(
            app_id=data['appId'],
            pkg_name=data['pkgName'],
            version=data['version'],
            version_code=int(data['versionCode']),
            size_bytes=int(data['size']),
            sha256=data.get('sha256', ''),
            info_score=float(data.get('hot', '0.0')),
            info_rate_count=int(data.get('rateNum', '0')),
            download_count=download_count,
            price=float(data.get('price', '0')),
            release_date=int(data.get('releaseDate', 0)),
            new_features=data.get('newFeatures', ''),
            upgrade_msg=data.get('upgradeMsg', ''),
            target_sdk=data.get('targetSdk', ''),
            min_sdk=data.get('minsdk', ''),
            compile_sdk_version=int(data.get('compileSdkVersion', 0)),
            min_hmos_api_level=int(data.get('minHmosApiLevel', 0)),
            api_release_type=data.get('apiReleaseType', 'Release')
        )
        self.db.add(metric)

    async def _save_app_rating(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Save app rating."""
        rating = AppRating(
            app_id=app_id,
            pkg_name=pkg_name,
            average_rating=float(data['averageRating']),
            star_1_count=int(data['oneStarRatingCount']),
            star_2_count=int(data['twoStarRatingCount']),
            star_3_count=int(data['threeStarRatingCount']),
            star_4_count=int(data['fourStarRatingCount']),
            star_5_count=int(data['fiveStarRatingCount']),
            total_rating_count=int(data['totalStarRatingCount']),
            only_star_count=int(data.get('onlyStarCount', 0)),
            full_average_rating=data.get('fullAverageRating', ''),
            source_type=data.get('sourceType', '')
        )
        self.db.add(rating)

    async def _save_data_history(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Store the raw app payload (attribute names assumed to match the app_data_history table)."""
        self.db.add(AppDataHistory(app_id=app_id, pkg_name=pkg_name, raw_json_data=data))

    async def _save_rating_history(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Store the raw rating payload (attribute names assumed to match the app_rating_history table)."""
        self.db.add(AppRatingHistory(app_id=app_id, pkg_name=pkg_name, raw_json_rating=data))

    def _parse_download_count(self, count_str: str) -> int:
        """Parse a download count string such as "1000000+"."""
        # Remove the + sign and every other non-digit character
        digits = re.sub(r'\D', '', str(count_str or ''))
        return int(digits) if digits else 0

    async def _is_info_changed(self, existing: AppInfo, new_data: Dict) -> bool:
        """Check whether the basic app information has changed."""
        # AppInfo has no version column; version changes are detected in _should_save_metric
        return (
            existing.name != new_data['name'] or
            existing.brief_desc != new_data.get('briefDes', '') or
            existing.description != new_data.get('description', '')
        )

    async def _should_save_metric(self, app_id: str, data: Dict) -> bool:
        """Decide whether a new metric row needs to be saved."""
        # Load the most recent metric row
        result = await self.db.execute(
            select(AppMetrics)
            .where(AppMetrics.app_id == app_id)
            .order_by(AppMetrics.created_at.desc())
            .limit(1)
        )
        latest_metric = result.scalar_one_or_none()
        if not latest_metric:
            return True

        # Compare the key fields
        return (
            latest_metric.version != data['version'] or
            latest_metric.download_count != self._parse_download_count(data.get('downCount', '0'))
        )

    async def _should_save_rating(self, app_id: str, data: Dict) -> bool:
        """Decide whether a new rating row needs to be saved."""
        result = await self.db.execute(
            select(AppRating)
            .where(AppRating.app_id == app_id)
            .order_by(AppRating.created_at.desc())
            .limit(1)
        )
        latest_rating = result.scalar_one_or_none()
        if not latest_rating:
            return True
        return (
            float(latest_rating.average_rating) != float(data['averageRating']) or
            latest_rating.total_rating_count != int(data['totalStarRatingCount'])
        )
```
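The "save only when something changed" rule used by `_should_save_metric` and `_should_save_rating` can be expressed as a pure function, which makes it easy to test without a database. The following is a minimal sketch; `should_save_snapshot` and `TRACKED_FIELDS` are hypothetical names, and the rows are plain dicts rather than ORM objects.

```python
from typing import Any, Mapping, Optional, Sequence

# Fields whose change justifies a new history row (mirrors the metric comparison)
TRACKED_FIELDS = ("version", "download_count")


def should_save_snapshot(
    latest: Optional[Mapping[str, Any]],
    candidate: Mapping[str, Any],
    fields: Sequence[str] = TRACKED_FIELDS,
) -> bool:
    """Return True when there is no previous row or any tracked field differs."""
    if latest is None:
        return True
    return any(latest.get(name) != candidate.get(name) for name in fields)


previous = {"version": "6.3.2.302", "download_count": 14443706}
unchanged = {"version": "6.3.2.302", "download_count": 14443706}
more_downloads = {"version": "6.3.2.302", "download_count": 14443800}
```

With this data, only the first crawl (`latest is None`) and the `more_downloads` payload would produce a new row; the `unchanged` payload is skipped, which keeps the history tables from filling with identical snapshots.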
#### 5.2.7 API Routes (api/apps.py)
```python
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Path, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.crawler.data_processor import DataProcessor
from app.crawler.huawei_api import HuaweiAPI
from app.database import get_db
from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.schemas.response import ApiResponse

router = APIRouter(prefix="/apps", tags=["Apps"])


def _to_dict(obj):
    """Convert an ORM object to a plain dict, skipping SQLAlchemy's internal state."""
    if obj is None:
        return None
    return {column.name: getattr(obj, column.name) for column in obj.__table__.columns}


async def _load_latest(db: AsyncSession, pkg_name: str):
    """Load the app info together with its most recent metric and rating rows."""
    result = await db.execute(
        select(AppInfo, AppMetrics, AppRating)
        .join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
        .outerjoin(AppRating, AppInfo.app_id == AppRating.app_id)
        .where(AppInfo.pkg_name == pkg_name)
        .order_by(AppMetrics.created_at.desc(), AppRating.created_at.desc())
        .limit(1)
    )
    return result.first()


def _apply_search(query, key: str, value: str, exact: bool):
    """Add an exact or substring filter on an AppInfo column."""
    column = getattr(AppInfo, key)
    if exact:
        return query.where(column == value)
    return query.where(column.like(f"%{value}%"))


@router.get("/pkg_name/{pkg_name}")
async def get_app_by_pkg_name(
    pkg_name: str,
    db: AsyncSession = Depends(get_db)
):
    """Look up an app by package name."""
    # Try to fetch fresh data from the API first
    api = HuaweiAPI()
    try:
        app_data = await api.get_app_info(pkg_name=pkg_name)
        # Atomic services have no rating data
        rating_data = None
        if not pkg_name.startswith("com.atomicservice"):
            rating_data = await api.get_app_rating(app_data['appId'])

        # Save to the database
        processor = DataProcessor(db)
        new_info, new_metric, new_rating = await processor.save_app_data(
            app_data, rating_data
        )

        # Query the complete record
        row = await _load_latest(db, pkg_name)
        return ApiResponse(
            success=True,
            data={
                "info": _to_dict(row[0]) if row else None,
                "metric": _to_dict(row[1]) if row else None,
                "rating": _to_dict(row[2]) if row else None,
                "new_info": new_info,
                "new_metric": new_metric,
                "new_rating": new_rating,
                "get_data": True
            }
        )
    except Exception as e:
        # Fall back to stored data; discard any half-finished transaction first
        await db.rollback()
        row = await _load_latest(db, pkg_name)
        if not row:
            raise HTTPException(status_code=404, detail=f"App {pkg_name} not found")
        return ApiResponse(
            success=True,
            data={
                "info": _to_dict(row[0]),
                "metric": _to_dict(row[1]),
                "rating": _to_dict(row[2]),
                "get_data": False,
                "error": str(e)
            }
        )
    finally:
        await api.close()


@router.get("/list/{page}")
async def get_app_list(
    page: int = Path(..., ge=1),
    page_size: int = Query(100, ge=1, le=500),
    detail: bool = True,
    sort: Optional[str] = None,
    desc: bool = True,
    search_key: Optional[str] = None,
    search_value: Optional[str] = None,
    search_exact: bool = False,
    db: AsyncSession = Depends(get_db)
):
    """Return a paginated list of apps."""
    info_columns = AppInfo.__table__.columns.keys()
    metric_columns = AppMetrics.__table__.columns.keys()

    # Only real AppInfo columns may be searched (search_key is user input)
    if search_key and search_key not in info_columns:
        raise HTTPException(status_code=400, detail=f"Unsupported search field: {search_key}")

    # Build the base query.
    # Note: the joins return one row per stored snapshot, so an app with several
    # metric rows appears more than once; join against the latest snapshot
    # (as in rankings.py) to deduplicate.
    if detail:
        query = select(AppInfo, AppMetrics, AppRating).join(
            AppMetrics, AppInfo.app_id == AppMetrics.app_id
        ).outerjoin(
            AppRating, AppInfo.app_id == AppRating.app_id
        )
    else:
        query = select(AppInfo)

    # Search filter
    count_query = select(func.count()).select_from(AppInfo)
    if search_key and search_value:
        query = _apply_search(query, search_key, search_value, search_exact)
        count_query = _apply_search(count_query, search_key, search_value, search_exact)

    # Sorting (AppMetrics columns are only available when detail is enabled)
    if sort:
        if detail and sort in metric_columns:
            order_column = getattr(AppMetrics, sort)
        elif sort in info_columns:
            order_column = getattr(AppInfo, sort)
        else:
            raise HTTPException(status_code=400, detail=f"Unsupported sort field: {sort}")
        query = query.order_by(order_column.desc() if desc else order_column.asc())
    elif detail:
        query = query.order_by(AppMetrics.download_count.desc())
    else:
        query = query.order_by(AppInfo.created_at.desc())

    # Total count
    total_result = await db.execute(count_query)
    total_count = total_result.scalar()

    # Pagination
    offset = (page - 1) * page_size
    query = query.offset(offset).limit(page_size)
    result = await db.execute(query)
    rows = result.all()

    # Format the rows
    if detail:
        data = [
            {"info": _to_dict(info), "metric": _to_dict(metric), "rating": _to_dict(rating)}
            for info, metric, rating in rows
        ]
    else:
        data = [_to_dict(row[0]) for row in rows]

    return ApiResponse(
        success=True,
        data=data,
        total=total_count,
        limit=page_size
    )


@router.get("/metrics/{pkg_name}")
async def get_app_metrics_history(
    pkg_name: str,
    db: AsyncSession = Depends(get_db)
):
    """Return the metric history of an app."""
    result = await db.execute(
        select(AppMetrics)
        .where(AppMetrics.pkg_name == pkg_name)
        .order_by(AppMetrics.created_at.desc())
    )
    metrics = result.scalars().all()
    return ApiResponse(
        success=True,
        data=[_to_dict(m) for m in metrics]
    )
```
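The substring search in the list endpoint interpolates user input straight into a `LIKE` pattern, so a search value containing `%` or `_` is interpreted as a wildcard. A hedged sketch of escaping those characters and of the page-to-offset arithmetic follows; the helper names are assumptions for this example. The escaped pattern is meant to be used with SQLAlchemy's `like(pattern, escape="\\")`.

```python
def escape_like(value: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so the value is matched literally."""
    return (
        value.replace(escape, escape + escape)  # escape the escape character first
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def contains_pattern(value: str) -> str:
    """Build a 'contains' pattern from untrusted input."""
    return f"%{escape_like(value)}%"


def page_to_offset(page: int, page_size: int) -> int:
    """Convert a 1-based page number to a row offset."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be at least 1")
    return (page - 1) * page_size


pattern = contains_pattern("50%_off")
offset = page_to_offset(3, 100)
```

Here `pattern` keeps the outer wildcards but neutralizes the ones inside the search text, and `offset` is 200 for the third page of 100 rows.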
#### 5.2.8 排行榜API (api/rankings.py)
```python
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from datetime import datetime, timedelta
from app.database import get_db
from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.schemas.response import ApiResponse
router = APIRouter(prefix="/rankings", tags=["排行榜"])
@router.get("/top-downloads")
async def get_top_downloads(
limit: int = Query(10, le=100),
exclude_pattern: str = Query(None),
db: AsyncSession = Depends(get_db)
):
"""下载量排行榜"""
# 子查询:获取每个应用的最新指标
subquery = (
select(
AppMetrics.app_id,
func.max(AppMetrics.created_at).label('max_created_at')
)
.group_by(AppMetrics.app_id)
.subquery()
)
# 主查询
query = (
select(AppInfo, AppMetrics)
.join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
.join(
subquery,
and_(
AppMetrics.app_id == subquery.c.app_id,
AppMetrics.created_at == subquery.c.max_created_at
)
)
.order_by(AppMetrics.download_count.desc())
.limit(limit)
)
# 排除模式
if exclude_pattern:
query = query.where(~AppInfo.pkg_name.like(f"%{exclude_pattern}%"))
result = await db.execute(query)
rows = result.all()
data = [
{
"app_id": row[0].app_id,
"name": row[0].name,
"pkg_name": row[0].pkg_name,
"developer_name": row[0].developer_name,
"icon_url": row[0].icon_url,
"download_count": row[1].download_count,
"version": row[1].version
}
for row in rows
]
return ApiResponse(success=True, data=data, limit=limit)
@router.get("/ratings")
async def get_top_ratings(
limit: int = Query(10, le=100),
db: AsyncSession = Depends(get_db)
):
"""评分排行榜"""
subquery = (
select(
AppRating.app_id,
func.max(AppRating.created_at).label('max_created_at')
)
.group_by(AppRating.app_id)
.subquery()
)
query = (
select(AppInfo, AppRating)
.join(AppRating, AppInfo.app_id == AppRating.app_id)
.join(
subquery,
and_(
AppRating.app_id == subquery.c.app_id,
AppRating.created_at == subquery.c.max_created_at
)
)
.where(AppRating.total_rating_count >= 100) # 至少100个评分
.order_by(AppRating.average_rating.desc())
.limit(limit)
)
result = await db.execute(query)
rows = result.all()
data = [
{
"app_id": row[0].app_id,
"name": row[0].name,
"pkg_name": row[0].pkg_name,
"developer_name": row[0].developer_name,
"icon_url": row[0].icon_url,
"average_rating": float(row[1].average_rating),
"total_rating_count": row[1].total_rating_count
}
for row in rows
]
return ApiResponse(success=True, data=data, limit=limit)
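@router.get("/developers")
async def get_top_developers(
    limit: int = Query(10, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Developer ranking (by number of apps)"""
    # Use only the latest metrics row per app. Joining the full metrics
    # history would count every snapshot as an app and sum downloads repeatedly.
    latest = (
        select(
            AppMetrics.app_id,
            func.max(AppMetrics.created_at).label('max_created_at')
        )
        .group_by(AppMetrics.app_id)
        .subquery()
    )
    query = (
        select(
            AppInfo.developer_name,
            func.count(AppInfo.app_id).label('app_count'),
            func.sum(AppMetrics.download_count).label('total_downloads')
        )
        .join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
        .join(
            latest,
            and_(
                AppMetrics.app_id == latest.c.app_id,
                AppMetrics.created_at == latest.c.max_created_at
            )
        )
        .group_by(AppInfo.developer_name)
        .order_by(func.count(AppInfo.app_id).desc())
        .limit(limit)
    )
    result = await db.execute(query)
    data = [
        {
            "developer_name": row[0],
            "app_count": row[1],
            # SUM() may come back as Decimal; convert for JSON output
            "total_downloads": int(row[2] or 0)
        }
        for row in result.all()
    ]
    return ApiResponse(success=True, data=data, limit=limit)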
```
#### 5.2.9 定时任务 (scheduler/tasks.py)
```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal
from app.config import settings
from app.crawler.huawei_api import HuaweiAPI
from app.crawler.data_processor import DataProcessor
import asyncio
import random
from datetime import datetime
class CrawlerScheduler:
def __init__(self):
self.scheduler = AsyncIOScheduler()
self.is_running = False
def start(self):
"""启动调度器"""
# 添加定时任务
self.scheduler.add_job(
self.sync_all_apps,
trigger=IntervalTrigger(seconds=settings.CRAWLER_INTERVAL),
id='sync_all_apps',
name='同步所有应用',
replace_existing=True
)
self.scheduler.start()
print(f"调度器已启动,同步间隔: {settings.CRAWLER_INTERVAL}秒")
def stop(self):
"""停止调度器"""
self.scheduler.shutdown()
print("调度器已停止")
    async def sync_all_apps(self):
        """Sync all known apps"""
        if self.is_running:
            print("Previous sync has not finished, skipping this run")
            return
        self.is_running = True
        api = None
        try:
            from datetime import datetime
            from sqlalchemy import select
            from app.models.app_info import AppInfo
            print(f"Starting sync of all apps - {datetime.now()}")
            api = HuaweiAPI()
            # Load every known package name, then release the session
            async with AsyncSessionLocal() as db:
                result = await db.execute(select(AppInfo.pkg_name))
                pkg_names = [row[0] for row in result.all()]
            # Shuffle so the crawl order differs between runs
            random.shuffle(pkg_names)
            print(f"{len(pkg_names)} apps to sync")
            total_processed = 0
            total_inserted = 0
            total_failed = 0
            for i in range(0, len(pkg_names), settings.CRAWLER_BATCH_SIZE):
                batch = pkg_names[i:i + settings.CRAWLER_BATCH_SIZE]
                # Process the batch concurrently
                tasks = [self._sync_single_app(api, pkg_name) for pkg_name in batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)
                # Tally the results
                for result in results:
                    total_processed += 1
                    if isinstance(result, Exception):
                        total_failed += 1
                    elif result:
                        total_inserted += 1
                print(f"Processed {total_processed}/{len(pkg_names)} apps")
                # Delay between batches
                await asyncio.sleep(0.5)
            print(f"Sync finished - processed: {total_processed}, updated: {total_inserted}, failed: {total_failed}")
        except Exception as e:
            print(f"Sync failed: {e}")
        finally:
            # Always reset the flag and release the HTTP client
            self.is_running = False
            if api is not None:
                await api.close()

    async def _sync_single_app(self, api: HuaweiAPI, pkg_name: str) -> bool:
        """Sync a single app"""
        try:
            # Fetch app data
            app_data = await api.get_app_info(pkg_name=pkg_name)
            rating_data = await api.get_app_rating(app_data['appId'])
            # One session per task: an AsyncSession must not be shared by
            # tasks that run concurrently under asyncio.gather.
            async with AsyncSessionLocal() as db:
                processor = DataProcessor(db)
                new_info, new_metric, new_rating = await processor.save_app_data(
                    app_data, rating_data
                )
            return bool(new_info or new_metric or new_rating)
        except Exception as e:
            print(f"Sync of {pkg_name} failed: {e}")
            # Re-raise so the caller's failure counter sees this error
            raise
# 全局调度器实例
scheduler = CrawlerScheduler()
```
#### 5.2.10 主应用 (main.py)
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.config import settings
from app.api import apps, rankings, charts, submit
from app.scheduler.tasks import scheduler
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 启动时
print("应用启动中...")
scheduler.start()
yield
# 关闭时
print("应用关闭中...")
scheduler.stop()
# 创建FastAPI应用
app = FastAPI(
title=settings.API_TITLE,
version=settings.API_VERSION,
lifespan=lifespan
)
# CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(apps.router, prefix=settings.API_PREFIX)
app.include_router(rankings.router, prefix=settings.API_PREFIX)
app.include_router(charts.router, prefix=settings.API_PREFIX)
app.include_router(submit.router, prefix=settings.API_PREFIX)
@app.get("/")
async def root():
return {"message": "华为应用市场数据API", "version": settings.API_VERSION}
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8000,
reload=settings.DEBUG
)
```
### 5.3 依赖文件 (requirements.txt)
```txt
fastapi==0.109.0
uvicorn[standard]==0.27.0
sqlalchemy==2.0.25
aiomysql==0.2.0
pydantic==2.5.3
pydantic-settings==2.1.0
httpx==0.26.0
playwright==1.41.0
apscheduler==3.10.4
python-dotenv==1.0.0
python-multipart==0.0.6
```
### 5.4 环境配置 (.env.example)
```env
# 数据库配置
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=huawei_market
# 华为API配置
HUAWEI_API_BASE_URL=https://web-drcn.hispace.dbankcloud.com/edge
HUAWEI_LOCALE=zh_CN
# 爬虫配置
CRAWLER_INTERVAL=1800
CRAWLER_BATCH_SIZE=100
CRAWLER_TIMEOUT=30
# API配置
API_PREFIX=/api
API_TITLE=华为应用市场数据API
API_VERSION=1.0.0
# 其他配置
DEBUG=False
CORS_ORIGINS=["http://localhost:5173","http://localhost:3000"]
```
---
## 6. 前端开发
### 6.1 项目结构
```
frontend/
├── public/
│ └── favicon.ico
├── src/
│ ├── assets/ # 静态资源
│ │ ├── styles/
│ │ │ └── main.css
│ │ └── images/
│ ├── components/ # 组件
│ │ ├── AppCard.vue
│ │ ├── AppTable.vue
│ │ ├── ChartCard.vue
│ │ ├── StatCard.vue
│ │ └── SearchBar.vue
│ ├── views/ # 页面
│ │ ├── Dashboard.vue
│ │ ├── AppDetail.vue
│ │ └── Rankings.vue
│ ├── api/ # API封装
│ │ ├── index.ts
│ │ └── apps.ts
│ ├── stores/ # 状态管理
│ │ └── app.ts
│ ├── types/ # 类型定义
│ │ └── app.ts
│ ├── utils/ # 工具函数
│ │ └── format.ts
│ ├── router/ # 路由
│ │ └── index.ts
│ ├── App.vue
│ └── main.ts
├── index.html
├── package.json
├── tsconfig.json
├── vite.config.ts
└── README.md
```
### 6.2 核心代码实现
#### 6.2.1 类型定义 (types/app.ts)
```typescript
export interface AppInfo {
app_id: string
name: string
pkg_name: string
developer_name: string
dev_en_name?: string
kind_name: string
kind_type_name: string
icon_url: string
brief_desc: string
description: string
privacy_url: string
iap: boolean
is_pay: boolean
listed_at: string
created_at: string
}
export interface AppMetric {
id: number
app_id: string
pkg_name: string
version: string
version_code: number
size_bytes: number
download_count: number
info_score: number
info_rate_count: number
price: number
release_date: number
target_sdk: string
min_sdk: string
created_at: string
}
export interface AppRating {
id: number
app_id: string
average_rating: number
star_1_count: number
star_2_count: number
star_3_count: number
star_4_count: number
star_5_count: number
total_rating_count: number
created_at: string
}
export interface FullAppInfo {
info: AppInfo
metric: AppMetric
rating?: AppRating
}
export interface ApiResponse<T = any> {
success: boolean
data: T
total?: number
limit?: number
timestamp: string
}
export interface MarketStats {
app_count: {
total: number
apps: number
atomic_services: number
}
developer_count: number
}
export interface RankingItem {
app_id: string
name: string
pkg_name: string
developer_name: string
icon_url: string
download_count?: number
average_rating?: number
total_rating_count?: number
}
```
#### 6.2.2 API封装 (api/apps.ts)
```typescript
import axios from 'axios'
import type { ApiResponse, FullAppInfo, MarketStats, RankingItem } from '@/types/app'
const api = axios.create({
baseURL: import.meta.env.VITE_API_BASE_URL || 'http://localhost:8000/api',
timeout: 30000
})
// 请求拦截器
api.interceptors.request.use(
config => {
// 可以在这里添加token等
return config
},
error => {
return Promise.reject(error)
}
)
// 响应拦截器
api.interceptors.response.use(
response => {
return response.data
},
error => {
console.error('API Error:', error)
return Promise.reject(error)
}
)
export const appsApi = {
// 获取市场统计信息
getMarketInfo: () =>
api.get<any, ApiResponse<MarketStats>>('/market_info'),
// 按包名查询应用
getAppByPkgName: (pkgName: string) =>
api.get<any, ApiResponse<FullAppInfo>>(`/apps/pkg_name/${pkgName}`),
// 按应用ID查询
getAppById: (appId: string) =>
api.get<any, ApiResponse<FullAppInfo>>(`/apps/app_id/${appId}`),
// 获取应用列表
getAppList: (params: {
page: number
page_size?: number
detail?: boolean
sort?: string
desc?: boolean
search_key?: string
search_value?: string
search_exact?: boolean
}) =>
api.get<any, ApiResponse<FullAppInfo[]>>(`/apps/list/${params.page}`, { params }),
// 获取应用指标历史
getAppMetrics: (pkgName: string) =>
api.get<any, ApiResponse<any[]>>(`/apps/metrics/${pkgName}`),
// 获取下载排行
getTopDownloads: (params?: { limit?: number; exclude_pattern?: string }) =>
api.get<any, ApiResponse<RankingItem[]>>('/rankings/top-downloads', { params }),
// 获取评分排行
getTopRatings: (params?: { limit?: number }) =>
api.get<any, ApiResponse<RankingItem[]>>('/rankings/ratings', { params }),
// 获取开发者排行
getTopDevelopers: (params?: { limit?: number }) =>
api.get<any, ApiResponse<any[]>>('/rankings/developers', { params }),
// 获取评分分布
getRatingDistribution: () =>
api.get<any, ApiResponse<Record<string, number>>>('/charts/rating'),
// 获取SDK分布
getMinSdkDistribution: () =>
api.get<any, ApiResponse<Record<string, number>>>('/charts/min_sdk'),
getTargetSdkDistribution: () =>
api.get<any, ApiResponse<Record<string, number>>>('/charts/target_sdk'),
// 投稿应用
submitApp: (data: {
pkg_name?: string
app_id?: string
comment?: any
}) =>
api.post<any, ApiResponse<any>>('/submit', data)
}
export default api
```
#### 6.2.3 状态管理 (stores/app.ts)
```typescript
import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
import { appsApi } from '@/api/apps'
import type { MarketStats, FullAppInfo } from '@/types/app'
export const useAppStore = defineStore('app', () => {
// 状态
const marketStats = ref<MarketStats | null>(null)
const appList = ref<FullAppInfo[]>([])
const currentPage = ref(1)
const pageSize = ref(100)
const totalCount = ref(0)
const loading = ref(false)
// 计算属性
const totalPages = computed(() => Math.ceil(totalCount.value / pageSize.value))
// 方法
const fetchMarketStats = async () => {
try {
const response = await appsApi.getMarketInfo()
if (response.success) {
marketStats.value = response.data
}
} catch (error) {
console.error('获取市场统计失败:', error)
}
}
const fetchAppList = async (params: {
page?: number
page_size?: number
sort?: string
desc?: boolean
search_key?: string
search_value?: string
search_exact?: boolean
} = {}) => {
loading.value = true
try {
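      const response = await appsApi.getAppList({
        // Spread first so an explicit `undefined` in params cannot
        // overwrite the defaults computed below
        ...params,
        page: params.page || currentPage.value,
        page_size: params.page_size || pageSize.value,
        detail: true
      })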
if (response.success) {
appList.value = response.data
totalCount.value = response.total || 0
currentPage.value = params.page || currentPage.value
}
} catch (error) {
console.error('获取应用列表失败:', error)
} finally {
loading.value = false
}
}
const searchApps = async (searchKey: string, searchValue: string, exact: boolean = false) => {
await fetchAppList({
page: 1,
search_key: searchKey,
search_value: searchValue,
search_exact: exact
})
}
return {
marketStats,
appList,
currentPage,
pageSize,
totalCount,
totalPages,
loading,
fetchMarketStats,
fetchAppList,
searchApps
}
})
```
#### 6.2.4 工具函数 (utils/format.ts)
```typescript
/**
* 格式化文件大小
*/
export function formatFileSize(bytes: number): string {
if (bytes === 0) return '0 B'
const k = 1024
const sizes = ['B', 'KB', 'MB', 'GB', 'TB']
const i = Math.floor(Math.log(bytes) / Math.log(k))
return Math.round(bytes / Math.pow(k, i) * 100) / 100 + ' ' + sizes[i]
}
/**
* 格式化下载量
*/
export function formatDownloadCount(count: number): string {
if (count >= 100000000) {
return (count / 100000000).toFixed(1) + '亿'
} else if (count >= 10000) {
return (count / 10000).toFixed(1) + '万'
}
return count.toString()
}
/**
* 格式化日期
*/
export function formatDate(date: string | number): string {
const d = new Date(date)
return d.toLocaleDateString('zh-CN', {
year: 'numeric',
month: '2-digit',
day: '2-digit',
hour: '2-digit',
minute: '2-digit'
})
}
/**
* 格式化评分
*/
export function formatRating(rating: number): string {
return rating.toFixed(1)
}
/**
* 获取星级数组
*/
export function getStarArray(rating: number): boolean[] {
const fullStars = Math.floor(rating)
const hasHalfStar = rating % 1 >= 0.5
const stars: boolean[] = []
for (let i = 0; i < 5; i++) {
stars.push(i < fullStars || (i === fullStars && hasHalfStar))
}
return stars
}
```
---
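## Appendix A: How to Obtain App Package Names
### A.1 From the Huawei AppGallery Web Page
#### Method 1: Extract from the URL
Open an app's detail page on Huawei AppGallery. The URL has the following format: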
```
https://appgallery.huawei.com/app/C1164531384803416384
```
Or:
```
https://appgallery.huawei.com/#/app/C1164531384803416384
```
**Note:** The URL contains the `app_id`, not the package name. A further lookup is needed to get the package name.
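Both URL forms shown above carry the same `app_id` after `/app/`, so one regular expression can pull it out. The helper below is a minimal sketch; the name `extract_app_id` is hypothetical, and it assumes IDs always look like `C` followed by digits, as in the examples above.

```python
import re
from typing import Optional

# Matches ".../app/C123..." as well as ".../#/app/C123..." detail-page URLs.
_APP_ID_RE = re.compile(r"/app/(C\d+)")


def extract_app_id(url: str) -> Optional[str]:
    """Return the app_id embedded in a detail-page URL, or None if absent."""
    match = _APP_ID_RE.search(url)
    return match.group(1) if match else None
```

The returned ID can then be passed to an app-info lookup to resolve the package name.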
#### Method 2: Extract from the page source
1. Open the app detail page
2. Right-click -> View page source
3. Search for `"pkgName"` or `"packageName"`
4. Look for content like this:
```json
{
"pkgName": "com.huawei.hmsapp.appgallery",
"appId": "C1164531384803416384",
...
}
```
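#### Method 3: Use the browser developer tools
1. Open the app detail page
2. Press F12 to open the developer tools
3. Switch to the Network tab
4. Refresh the page
5. Filter for XHR requests and find the request related to `appinfo`
6. Inspect the request's Response and find the `pkgName` field
**Example (what the screenshot shows):**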
```
Network -> XHR -> appinfo
Response:
{
"pkgName": "com.huawei.hmsapp.appgallery",
"name": "应用市场",
...
}
```
### A.2 From an Android Device
#### Method 1: Use ADB commands
If you have an Android device or emulator:
```bash
# 列出所有已安装应用的包名
adb shell pm list packages
# 列出第三方应用
adb shell pm list packages -3
# 搜索特定应用(例如包含 huawei 的)
adb shell pm list packages | grep huawei
# 获取当前运行应用的包名
adb shell dumpsys window | grep mCurrentFocus
```
**输出示例:**
```
package:com.huawei.hmsapp.appgallery
package:com.huawei.browser
package:com.huawei.music
```
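#### Method 2: Use an app-info viewer
Install an "app info viewer" type of app on the Android device, for example:
- **Package Name Viewer**
- **App Inspector**
- **Dev Tools**
These apps display the package names of installed apps directly.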
### A.3 Obtaining Package Names in Bulk
#### Method 1: Crawl Huawei AppGallery category pages
```python
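import asyncio
import httpx
from bs4 import BeautifulSoup

async def get_apps_from_category(category_id: str) -> list[str]:
    """Collect the app_ids linked from a category page"""
    # Note: this only works if the links are present in the returned HTML.
    # A page rendered client-side by JavaScript yields no results here.
    url = f"https://appgallery.huawei.com/Featured/{category_id}"
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    # Find app links
    app_ids = []
    for link in soup.find_all('a', href=True):
        href = link['href']
        if '/app/' in href:
            # Drop any query string or fragment that follows the ID
            app_id = href.split('/app/')[-1].split('?')[0].split('#')[0]
            if app_id and app_id not in app_ids:
                app_ids.append(app_id)
    return app_ids

# Usage example
if __name__ == "__main__":
    app_ids = asyncio.run(get_apps_from_category('10000000'))  # Tools category
    print(app_ids)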
```
#### Method 2: Guess by app ID
A Huawei app_id has the format `C` followed by 19 digits.
Apps can be discovered by iterating over a numeric range:
```python
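import asyncio
from app.crawler.huawei_api import HuaweiAPI

async def guess_app_ids(start: int, end: int):
    """Probe a range of app IDs"""
    api = HuaweiAPI()
    found_apps = []
    try:
        for i in range(start, end):
            app_id = f"C{i:019d}"
            try:
                app_data = await api.get_app_info(app_id=app_id)
                found_apps.append({
                    'app_id': app_id,
                    'pkg_name': app_data['pkgName'],
                    'name': app_data['name']
                })
                print(f"Found app: {app_data['name']} ({app_data['pkgName']})")
            except Exception:
                # No app with this ID, or the request failed: skip it
                pass
            # Pause between requests (see A.7, item 4)
            await asyncio.sleep(0.5)
    finally:
        await api.close()
    return found_apps

# Usage example
if __name__ == "__main__":
    apps = asyncio.run(guess_app_ids(1164531384803416384, 1164531384803416484))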
```
#### Method 3: Expand from an existing database
If you already have some app data, you can expand it in the following ways:
1. **同开发者的其他应用**
```sql
SELECT DISTINCT pkg_name
FROM app_info
WHERE developer_name = '华为软件技术有限公司'
```
2. **同分类的应用**
```sql
SELECT DISTINCT pkg_name
FROM app_info
WHERE kind_name = '工具'
```
3. **相关推荐应用**
- 访问应用详情页,查看"相关推荐"部分
- 提取推荐应用的 app_id
### A.4 常见应用包名示例
```python
# 华为系统应用
HUAWEI_SYSTEM_APPS = [
"com.huawei.hmsapp.appgallery", # 应用市场
"com.huawei.browser", # 浏览器
"com.huawei.music", # 音乐
"com.huawei.himovie", # 视频
"com.huawei.camera", # 相机
"com.huawei.health", # 运动健康
"com.huawei.wallet", # 钱包
]
# 热门第三方应用
POPULAR_APPS = [
"com.tencent.mm", # 微信
"com.tencent.mobileqq", # QQ
"com.sina.weibo", # 微博
"com.taobao.taobao", # 淘宝
"com.jingdong.app.mall", # 京东
"com.ss.android.ugc.aweme", # 抖音
]
# 鸿蒙元服务(包名特征)
ATOMIC_SERVICE_PATTERN = "com.atomicservice.*"
```
### A.5 包名命名规范
包名通常遵循以下规范:
**格式:** `com.公司名.应用名`
**示例:**
- `com.huawei.hmsapp.appgallery` - 华为应用市场
- `com.tencent.mm` - 腾讯微信
- `com.alibaba.android.rimet` - 阿里钉钉
**鸿蒙元服务:**
- `com.atomicservice.{19位数字}` - 元服务包名格式
### A.6 实用工具脚本
#### 从URL批量提取包名
```python
import re
from typing import List
from app.crawler.huawei_api import HuaweiAPI
async def extract_pkg_names_from_urls(urls: List[str]) -> List[dict]:
"""从URL列表批量提取包名"""
api = HuaweiAPI()
results = []
for url in urls:
# 从URL提取app_id
match = re.search(r'/app/([A-Z0-9]+)', url)
if not match:
continue
app_id = match.group(1)
try:
app_data = await api.get_app_info(app_id=app_id)
results.append({
'url': url,
'app_id': app_id,
'pkg_name': app_data['pkgName'],
'name': app_data['name']
})
except Exception as e:
print(f"处理 {url} 失败: {e}")
return results
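# Usage example
if __name__ == "__main__":
    import asyncio
    urls = [
        "https://appgallery.huawei.com/app/C1164531384803416384",
        "https://appgallery.huawei.com/app/C100000000000000001",
    ]
    # `await` is not allowed at module level, so drive the coroutine explicitly
    results = asyncio.run(extract_pkg_names_from_urls(urls))
    for r in results:
        print(f"{r['name']}: {r['pkg_name']}")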
```
#### 导出包名列表
```python
import csv
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.app_info import AppInfo
async def export_pkg_names_to_csv(db: AsyncSession, filename: str = "pkg_names.csv"):
"""导出所有包名到CSV文件"""
result = await db.execute(
select(AppInfo.pkg_name, AppInfo.name, AppInfo.developer_name)
.order_by(AppInfo.name)
)
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['包名', '应用名称', '开发者'])
for row in result:
writer.writerow([row.pkg_name, row.name, row.developer_name])
print(f"已导出到 {filename}")
```
### A.7 注意事项
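1. **Package name uniqueness**
- Each app's package name is unique within Huawei AppGallery
- The same app usually has the same package name across app stores, but this is not guaranteed
2. **Package name format validation**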
```python
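import re

def is_valid_pkg_name(pkg_name: str) -> bool:
    """Validate the package-name format (lowercase convention)"""
    # The first segment must start with a letter. Later segments may start
    # with a digit so that atomic-service names (com.atomicservice.<digits>) pass.
    pattern = r'^[a-z][a-z0-9_]*(\.[a-z0-9_]+)+$'
    return bool(re.match(pattern, pkg_name))

# Examples
print(is_valid_pkg_name("com.huawei.hmsapp.appgallery"))  # True
print(is_valid_pkg_name("Com.Huawei.App"))                # False (uppercase)
print(is_valid_pkg_name("huawei"))                        # False (only one segment)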
```
3. **元服务识别**
```python
def is_atomic_service(pkg_name: str) -> bool:
"""判断是否为元服务"""
return pkg_name.startswith("com.atomicservice.")
```
4. **获取频率限制**
- 避免过于频繁的请求
- 建议添加延迟:每次请求间隔 0.5-1 秒
- 使用批量处理时注意并发数量
5. **数据更新策略**
- 优先更新下载量高的应用
- 定期全量同步所有已知包名
- 新发现的包名及时入库
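The rate-limit advice in item 4 (a pause per request and a cap on concurrency) can be sketched with an `asyncio.Semaphore`. This is an illustrative helper under stated assumptions, not part of the original project: `fetch_throttled`, `max_concurrency`, and `delay_seconds` are hypothetical names, and `fetch` stands in for any coroutine such as a call to the Huawei API wrapper.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


async def fetch_throttled(
    items: Iterable[str],
    fetch: Callable[[str], Awaitable[object]],
    max_concurrency: int = 5,
    delay_seconds: float = 0.5,
) -> List[object]:
    """Run fetch(item) for every item with bounded concurrency.

    Results (or exceptions) are returned in the same order as the input.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def worker(item: str) -> object:
        async with semaphore:
            try:
                return await fetch(item)
            finally:
                # Keep the slot occupied for the pause, so each slot sends
                # at most one request per delay window.
                await asyncio.sleep(delay_seconds)

    return await asyncio.gather(
        *(worker(item) for item in items), return_exceptions=True
    )
```

With `max_concurrency=5` and `delay_seconds=0.5`, no more than five requests are in flight at once, and each of the five slots waits half a second after a request before starting the next.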
---
## 7. 部署指南
### 7.1 Docker 部署
#### 7.1.1 后端 Dockerfile
```dockerfile
# backend/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
default-libmysqlclient-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
# 安装 Playwright 依赖
RUN apt-get update && apt-get install -y \
libnss3 \
libnspr4 \
libatk1.0-0 \
libatk-bridge2.0-0 \
libcups2 \
libdrm2 \
libxkbcommon0 \
libxcomposite1 \
libxdamage1 \
libxfixes3 \
libxrandr2 \
libgbm1 \
libasound2
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 安装 Playwright 浏览器
RUN playwright install chromium
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```
#### 7.1.2 前端 Dockerfile
```dockerfile
# frontend/Dockerfile
FROM node:18-alpine as builder
WORKDIR /app
# 复制依赖文件
COPY package*.json ./
# 安装依赖
RUN npm ci
# 复制源代码
COPY . .
# 构建
RUN npm run build
# 生产环境
FROM nginx:alpine
# 复制构建产物
COPY --from=builder /app/dist /usr/share/nginx/html
# 复制 Nginx 配置
COPY nginx.conf /etc/nginx/conf.d/default.conf
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
```
#### 7.1.3 Nginx 配置
```nginx
# frontend/nginx.conf
server {
listen 80;
server_name localhost;
root /usr/share/nginx/html;
index index.html;
# Gzip 压缩
gzip on;
gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
# 前端路由
location / {
try_files $uri $uri/ /index.html;
}
# API 代理
location /api {
proxy_pass http://backend:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# 静态资源缓存
location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {
expires 1y;
add_header Cache-Control "public, immutable";
}
}
```
#### 7.1.4 Docker Compose
```yaml
# docker-compose.yml
version: '3.8'
services:
  mysql:
    image: mysql:8.0
    container_name: huawei_market_mysql
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
      MYSQL_DATABASE: ${MYSQL_DATABASE}
      MYSQL_USER: ${MYSQL_USER}
      MYSQL_PASSWORD: ${MYSQL_PASSWORD}
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql
      - ./backend/sql:/docker-entrypoint-initdb.d
    command: --default-authentication-plugin=mysql_native_password
    networks:
      - app_network
  backend:
    build:
      context: ./backend
      dockerfile: Dockerfile
    container_name: huawei_market_backend
    restart: always
    environment:
      MYSQL_HOST: mysql
      MYSQL_PORT: 3306
      MYSQL_USER: ${MYSQL_USER}
      MYSQL_PASSWORD: ${MYSQL_PASSWORD}
      MYSQL_DATABASE: ${MYSQL_DATABASE}
    ports:
      - "8000:8000"
    depends_on:
      - mysql
    volumes:
      - ./backend:/app
    networks:
      - app_network
  frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile
    container_name: huawei_market_frontend
    restart: always
    ports:
      - "80:80"
    depends_on:
      - backend
    networks:
      - app_network
volumes:
  mysql_data:
networks:
  app_network:
    driver: bridge
```
#### 7.1.5 环境变量文件
```env
# .env
MYSQL_ROOT_PASSWORD=root_password_here
MYSQL_DATABASE=huawei_market
MYSQL_USER=market_user
MYSQL_PASSWORD=user_password_here
```
### 7.2 部署步骤
#### 7.2.1 准备工作
```bash
# 1. 克隆项目
git clone <your-repo-url>
cd huawei-market-crawler
# 2. 创建环境变量文件
cp .env.example .env
# 编辑 .env 文件,填入实际配置
# 3. 创建必要的目录
mkdir -p backend/logs
mkdir -p mysql_data
```
#### 7.2.2 使用 Docker Compose 部署
```bash
# 构建并启动所有服务
docker-compose up -d --build
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f backend
# 停止服务
docker-compose down
# 停止并删除数据卷
docker-compose down -v
```
#### 7.2.3 初始化数据库
```bash
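# Scripts mounted at /docker-entrypoint-initdb.d run automatically the first
# time the MySQL container initializes an empty data directory.
# To run the init script manually instead, open a client in the container:
docker exec -it huawei_market_mysql mysql -u root -p
# Then, at the mysql> prompt:
#   USE huawei_market;
#   SOURCE /docker-entrypoint-initdb.d/init.sql;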
```
#### 7.2.4 验证部署
```bash
# 检查后端健康状态
curl http://localhost:8000/health
# 检查前端
curl http://localhost/
# 测试 API
curl http://localhost:8000/api/market_info
```
### 7.3 生产环境优化
#### 7.3.1 使用 Gunicorn 运行后端
```bash
# 安装 gunicorn
pip install gunicorn
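# Start command
# Note: every worker runs the app's lifespan hook, so with --workers 4 the
# APScheduler job from 5.2.9 starts once per worker. Run the scheduler in a
# single dedicated process, or use --workers 1 for the process that hosts it.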
gunicorn app.main:app \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--access-logfile logs/access.log \
--error-logfile logs/error.log \
--log-level info
```
#### 7.3.2 MySQL 优化配置
```ini
# my.cnf
[mysqld]
# 基础配置
max_connections = 500
max_allowed_packet = 64M
# InnoDB 配置
innodb_buffer_pool_size = 2G
innodb_log_file_size = 256M
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT
# Query cache: removed in MySQL 8.0. Do not set query_cache_type or
# query_cache_size; the server refuses to start on unknown variables.
# 慢查询日志
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
```
#### 7.3.3 Nginx 生产配置
```nginx
# /etc/nginx/sites-available/huawei-market
server {
listen 80;
server_name your-domain.com;
# 重定向到 HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name your-domain.com;
# SSL 证书
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
# SSL 配置
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
# 安全头
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
# 日志
access_log /var/log/nginx/huawei-market-access.log;
error_log /var/log/nginx/huawei-market-error.log;
# 前端
location / {
root /var/www/huawei-market/frontend;
try_files $uri $uri/ /index.html;
}
# API
location /api {
proxy_pass http://127.0.0.1:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
}
```
### 7.4 监控与维护
#### 7.4.1 日志管理
```python
# app/utils/logger.py
import logging
from logging.handlers import RotatingFileHandler
import os
def setup_logger(name: str, log_file: str, level=logging.INFO):
    """Configure a logger with a rotating file handler and console output"""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Avoid attaching duplicate handlers when called twice for the same name
    if logger.handlers:
        return logger
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    # Make sure the log directory exists (dirname is '' for a bare filename)
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    # File handler (rotates automatically)
    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(formatter)
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
```
#### 7.4.2 健康检查
```python
# app/api/health.py
from datetime import datetime
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from app.database import get_db
router = APIRouter(tags=["健康检查"])
@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""健康检查"""
try:
# 检查数据库连接
await db.execute(text("SELECT 1"))
return {
"status": "healthy",
"database": "connected",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "unhealthy",
"database": "disconnected",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
```
#### 7.4.3 性能监控
```python
# Monitor with Prometheus + Grafana
# 1. Install the package (shell command):
#    pip install prometheus-fastapi-instrumentator
# 2. Add the lines below to main.py, after the existing `app = FastAPI(...)`
#    call (do not create a second FastAPI instance):
from prometheus_fastapi_instrumentator import Instrumentator
Instrumentator().instrument(app).expose(app)
```
### 7.5 备份策略
```bash
#!/bin/bash
# backup.sh - 数据库备份脚本
BACKUP_DIR="/backup/mysql"
DATE=$(date +%Y%m%d_%H%M%S)
MYSQL_USER="root"
MYSQL_PASSWORD="your_password"
DATABASE="huawei_market"
# 创建备份目录
mkdir -p $BACKUP_DIR
# 备份数据库
mysqldump -u$MYSQL_USER -p$MYSQL_PASSWORD \
--single-transaction \
--routines \
--triggers \
$DATABASE > $BACKUP_DIR/backup_$DATE.sql
# 压缩备份文件
gzip $BACKUP_DIR/backup_$DATE.sql
# 删除7天前的备份
find $BACKUP_DIR -name "backup_*.sql.gz" -mtime +7 -delete
echo "备份完成: backup_$DATE.sql.gz"
```
---
## 8. 开发建议与最佳实践
### 8.1 代码规范
- **Python**: 遵循 PEP 8 规范,使用 Black 格式化
- **TypeScript**: 使用 ESLint + Prettier
- **提交信息**: 遵循 Conventional Commits 规范
### 8.2 测试策略
```python
# tests/test_crawler.py
import pytest
from app.crawler.huawei_api import HuaweiAPI
@pytest.mark.asyncio
async def test_get_app_info():
api = HuaweiAPI()
data = await api.get_app_info(pkg_name="com.huawei.hmsapp.appgallery")
assert data['pkgName'] == "com.huawei.hmsapp.appgallery"
assert 'name' in data
assert 'appId' in data
await api.close()
```
### 8.3 性能优化
1. **数据库查询优化**
- 使用索引
- 避免 N+1 查询
- 使用连接池
2. **缓存策略**
- Redis 缓存热门数据
- 前端使用 LocalStorage
3. **异步处理**
- 使用异步 I/O
- 批量处理数据
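The caching item above names Redis. As a smaller illustration of the same idea, the sketch below keeps hot results (for example, a rankings payload) in process memory with a time-to-live. It is an assumption-laden stand-in, not a Redis client: `TTLCache` and `get_or_compute` are hypothetical names, and an in-process cache is not shared between workers.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Tiny in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        """Return the cached value for key, recomputing it once it has expired."""
        now = self._clock()
        entry = self._store.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]
        value = compute()
        self._store[key] = (now + self._ttl, value)
        return value
```

A ranking endpoint could wrap its query in `get_or_compute("top-downloads:10", ...)` so that repeated requests within the TTL skip the database.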
### 8.4 安全建议
1. **API 安全**
- 添加 API 限流
- 使用 JWT 认证(如需要)
- 输入验证和清洗
2. **数据库安全**
- 使用参数化查询
- 最小权限原则
- 定期备份
3. **爬虫礼仪**
- 遵守 robots.txt
- 控制请求频率
- 使用合理的 User-Agent
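One concrete case of input validation and cleaning is user text that ends up inside a SQL `LIKE` pattern, where `%` and `_` act as wildcards. The helper below is a minimal sketch; `escape_like` is a hypothetical name, and the backslash escape character is an assumption that must match the `escape=` argument passed to the query.

```python
def escape_like(value: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so user input is matched literally."""
    return (
        value.replace(escape, escape + escape)  # escape the escape char first
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )
```

With SQLAlchemy, the escaped text can be used as `column.like(f"%{escape_like(text)}%", escape="\\")`, which keeps the value parameterized while treating `%` and `_` in the input as ordinary characters.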
---
## 9. 常见问题 FAQ
### Q1: Token 获取失败怎么办?
**A:**
1. 检查网络连接
2. 确认 Playwright 浏览器已安装
3. 尝试手动访问华为应用市场,检查是否需要验证码
4. 增加等待时间
### Q2: 数据库连接超时?
**A:**
1. 检查 MySQL 服务是否运行
2. 验证连接配置是否正确
3. 增加连接池大小
4. 检查防火墙设置
### Q3: 爬取速度太慢?
**A:**
1. 增加并发数量
2. 使用批量处理
3. 优化数据库写入
4. 考虑使用多台服务器分布式爬取
### Q4: 如何处理反爬虫?
**A:**
1. 降低请求频率
2. 使用代理IP池
3. 模拟真实浏览器行为
4. 定期更新 Token
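Lowering the request rate when the server pushes back is often done with retries and exponential backoff. The sketch below shows the general technique under stated assumptions; `retry_with_backoff` and its parameters are hypothetical, and it retries on any exception, which a real crawler would likely narrow to network and rate-limit errors.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Call operation() until it succeeds or max_attempts is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts:
                raise
            # Double the ceiling on each attempt, capped at max_delay
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Random jitter spreads out retries from concurrent tasks
            await asyncio.sleep(random.uniform(0, ceiling))
    # Only reached when max_attempts < 1
    raise RuntimeError("max_attempts must be at least 1")
```

A caller would pass a zero-argument coroutine function, for example `lambda: api.get_app_info(pkg_name=name)`.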
---
## 10. 参考资源
- **FastAPI 文档**: https://fastapi.tiangolo.com/
- **Vue 3 文档**: https://vuejs.org/
- **SQLAlchemy 文档**: https://docs.sqlalchemy.org/
- **Playwright 文档**: https://playwright.dev/python/
- **MySQL 文档**: https://dev.mysql.com/doc/
---
## Appendix B: Complete Project File List
### 后端文件清单
```
backend/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── database.py
│ ├── models/
│ ├── schemas/
│ ├── api/
│ ├── crawler/
│ ├── scheduler/
│ └── utils/
├── tests/
├── logs/
├── requirements.txt
├── .env
├── Dockerfile
└── README.md
```
### 前端文件清单
```
frontend/
├── public/
├── src/
│ ├── assets/
│ ├── components/
│ ├── views/
│ ├── api/
│ ├── stores/
│ ├── types/
│ ├── utils/
│ ├── router/
│ ├── App.vue
│ └── main.ts
├── package.json
├── vite.config.ts
├── tsconfig.json
├── Dockerfile
├── nginx.conf
└── README.md
```
---
**文档版本**: v1.0
**最后更新**: 2024年
**维护者**: [Your Name]
**许可证**: MIT
---
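## Appendix C: Package Name Discovery Strategies in the Original Project
The original Rust project uses several approaches to discover and obtain app package names; they are worth borrowing from.
### C.1 Overview of Core Strategies
The original project provides **7 standalone tools** for obtaining package names and app data: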
| 工具名 | 用途 | 策略 |
|--------|------|------|
| `guess_market` | 应用ID猜测 | 遍历指定范围的应用ID |
| `guess_rand` | 随机猜测 | 随机生成应用ID进行探测 |
| `guess_from_db` | 数据库扩展 | 基于已有数据推测相邻ID |
| `guess_large` | 大规模猜测 | 大范围ID扫描 |
| `get_nextmax` | 第三方数据源 | 从 nextmax.cn 获取 |
| `read_appgallery` | 应用市场爬取 | 直接爬取华为应用市场页面 |
| `read_pkg_name` | 批量导入 | 从文件读取包名列表 |
### C.2 方法详解
#### C.2.1 应用ID猜测法 (guess_market)
**原理:** 华为应用的 app_id 格式为固定前缀 + 数字,通过遍历数字范围来发现应用。
**app_id 格式:**
```
C576588020785 + 7位数字
例如: C5765880207856366961
```
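The prefix-plus-suffix format above can be expressed as a small generator. This is a sketch for illustration only; `candidate_app_ids` is a hypothetical name, and the inclusive upper bound and 7-digit zero padding follow the format described above.

```python
from typing import Iterator


def candidate_app_ids(prefix: str, start: int, end: int, width: int = 7) -> Iterator[str]:
    """Yield prefix + zero-padded suffix for every suffix in [start, end]."""
    for suffix in range(start, end + 1):
        yield f"{prefix}{suffix:0{width}d}"
```

For example, `candidate_app_ids("C576588020785", 6366961, 6366961)` yields the single ID `C5765880207856366961` shown above.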
**核心代码逻辑:**
```rust
// 定义扫描范围
let range = 2000000..=6390000;
let start = "C576588020785";
// 批量处理每批1000个
for bunch_id in range_vec.chunks(1000) {
let mut join_set = tokio::task::JoinSet::new();
for id in bunch_id.iter() {
let app_id = format!("{start}{id:07}"); // 格式化为7位数字
// 异步请求华为API
join_set.spawn(async move {
if let Ok(data) = query_app(&client, &api_url, &AppQuery::app_id(&app_id), &locale).await {
// 保存到数据库
db.save_app_data(&data.0, data.1.as_ref(), None, Some(comment)).await
}
});
}
join_set.join_all().await;
tokio::time::sleep(Duration::from_millis(25)).await; // 批次间延迟
}
```
**Python 实现示例:**
```python
import asyncio
from typing import List
async def guess_market_apps(
start_prefix: str = "C576588020785",
start_range: int = 2000000,
end_range: int = 6390000,
batch_size: int = 1000
):
"""通过ID猜测发现应用"""
api = HuaweiAPI()
db = Database()
for batch_start in range(start_range, end_range, batch_size):
batch_end = min(batch_start + batch_size, end_range)
tasks = []
for i in range(batch_start, batch_end):
app_id = f"{start_prefix}{i:07d}" # 7位数字不足补0
tasks.append(try_fetch_app(api, db, app_id))
# 并发执行
results = await asyncio.gather(*tasks, return_exceptions=True)
        # Tally results: try_fetch_app returns False on failure instead of
        # raising, so count only explicit True values
        success_count = sum(1 for r in results if r is True)
        print(f"Batch {batch_start}-{batch_end}: {success_count}/{len(tasks)} succeeded")
# 批次间延迟
await asyncio.sleep(0.025)
async def try_fetch_app(api: HuaweiAPI, db: Database, app_id: str):
"""尝试获取单个应用"""
try:
app_data = await api.get_app_info(app_id=app_id)
rating_data = await api.get_app_rating(app_id)
await db.save_app_data(app_data, rating_data, comment={
"user": "guess_market",
"method": "id_guessing"
})
print(f"✓ 发现应用: {app_data['name']} ({app_data['pkgName']})")
return True
except Exception as e:
# 应用不存在或请求失败,静默跳过
return False
```
**已知的应用ID前缀**
```python
KNOWN_APP_ID_PREFIXES = [
"C576588020785", # 主要前缀
"C69175", # 另一个前缀系列
# 可以通过分析已有数据发现更多前缀
]
```
#### C.2.2 随机猜测法 (guess_rand)
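**Principle:** Generate IDs at random within a known ID range to discover apps more efficiently.
**When to use:**
- The ID space is large, so sequential traversal is inefficient
- You want to find popular apps quickly (their IDs are usually newer)
**Core logic:**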
```rust
let code_start = 59067092904725_u64;
let size = 85170011059280_u64 - code_start;
let start = "C69175";
loop {
let mut ids: Vec<u64> = Vec::with_capacity(1000);
for _ in 0..1000 {
let id = code_start + (rng.next() % size); // 随机生成
ids.push(id);
}
// 批量处理这些随机ID
// ...
}
```
**Python 实现:**
```python
import random
async def guess_random_apps(
prefix: str = "C69175",
start: int = 59067092904725,
end: int = 85170011059280,
batch_size: int = 1000
):
"""随机猜测应用ID"""
api = HuaweiAPI()
db = Database()
while True:
# 生成随机ID批次
random_ids = [
f"{prefix}{random.randint(start, end)}"
for _ in range(batch_size)
]
tasks = [try_fetch_app(api, db, app_id) for app_id in random_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count = sum(1 for r in results if r is True)
print(f"随机批次: 成功 {success_count}/{batch_size}")
await asyncio.sleep(0.005)
```
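#### C.2.3 Database expansion (guess_from_db)

**Principle:** Starting from app IDs that are already known, assume that neighboring IDs may also be valid apps.

**Strategy:**
1. Fetch all known app_id values from the database
2. Parse each app_id into its prefix and numeric part
3. For each number, generate a range of ±1000
4. Merge overlapping ranges
5. Scan the merged ranges

**Core logic:**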
```rust
// 1. Fetch all known app IDs
let existing_app_ids = db.get_all_app_ids().await?;
// 2. Build an expansion range around each app ID
for app_id in existing_app_ids {
    if let Some((prefix, numeric_part)) = parse_app_id(&app_id) {
        let start_range = numeric_part.saturating_sub(1000);
        let end_range = numeric_part.saturating_add(1000);
        all_ranges.insert((prefix, start_range, end_range));
    }
}
// 3. Merge overlapping ranges
// e.g. (100, 1100) and (500, 1500) merge into (100, 1500)
// 4. Scan the merged ranges
for (prefix, start, end) in merged_ranges {
    for id in start..=end {
        let app_id = format!("{}{}", prefix, id);
        // Try to fetch the app
    }
}
```
**Python implementation:**
```python
import re
from typing import Optional, Tuple


def parse_app_id(app_id: str) -> Optional[Tuple[str, int]]:
    """Parse an app_id into (prefix, number); return None if it does not match."""
    match = re.match(r'^([A-Z]+)(\d+)$', app_id)
    if match:
        return match.group(1), int(match.group(2))
    return None


async def guess_from_database(expand_range: int = 1000):
    """Expand from the app IDs already stored in the database."""
    db = Database()
    # 1. Fetch all known app IDs
    existing_ids = await db.get_all_app_ids()
    # 2. Build expansion ranges, grouped by prefix
    ranges = {}
    for app_id in existing_ids:
        parsed = parse_app_id(app_id)
        if not parsed:
            continue
        prefix, num = parsed
        start = max(0, num - expand_range)
        end = num + expand_range
        ranges.setdefault(prefix, []).append((start, end))
    # 3. Merge overlapping ranges
    merged_ranges = {}
    for prefix, range_list in ranges.items():
        range_list.sort()
        merged = []
        current = range_list[0]
        for r in range_list[1:]:
            if r[0] <= current[1] + 1:
                # Overlapping or adjacent: merge
                current = (current[0], max(current[1], r[1]))
            else:
                merged.append(current)
                current = r
        merged.append(current)
        merged_ranges[prefix] = merged
    # 4. Scan the merged ranges
    for prefix, range_list in merged_ranges.items():
        for start, end in range_list:
            print(f"Scanning range: {prefix}{start} - {prefix}{end}")
            await guess_market_apps(prefix, start, end)
```
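The range-merging step (step 3 of the strategy) is the easiest part to get subtly wrong, so it can help to isolate it and check it on small inputs. The following is a minimal sketch, assuming inclusive `(start, end)` pairs; `merge_ranges` is a hypothetical helper name and not a function of the original project.

```python
from typing import List, Tuple


def merge_ranges(ranges: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Merge overlapping or adjacent inclusive (start, end) ranges."""
    merged: List[Tuple[int, int]] = []
    for start, end in sorted(ranges):
        if merged and start <= merged[-1][1] + 1:
            # Overlaps or touches the previous range: extend it
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


merged_example = merge_ranges([(500, 1500), (100, 1100), (5000, 7000)])
print(merged_example)  # [(100, 1500), (5000, 7000)]
```

The first two ranges overlap and collapse into one, so IDs 500 to 1100 are scanned once instead of twice; the third range is far away and stays separate.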
#### C.2.4 Bulk import from a file (read_pkg_name)

**Principle:** Read a list of package names from a text file and fetch the app data in bulk.

**Usage:**
```bash
# Create the package name list file
cat > pkg_names.txt << EOF
com.huawei.hmsapp.appgallery
com.tencent.mm
com.sina.weibo
EOF
# Run the tool
cargo run --bin read_pkg_name pkg_names.txt
```
**Core code:**
```rust
// Read the file path from the command-line arguments
let cli_file = std::env::args().nth(1).ok_or_else(|| anyhow::anyhow!("No file path provided"))?;
// Read the package names from the file
let pkg_names: Vec<String> = {
    let file = std::fs::File::open(&cli_file)?;
    let mut reader = std::io::BufReader::new(file);
    let mut pkg_names = Vec::new();
    let mut line = String::new();
    while reader.read_line(&mut line)? > 0 {
        pkg_names.push(line.trim().to_string());
        line.clear();
    }
    pkg_names.into_iter()
        .map(|l| l.trim_matches('\"').to_string())
        .collect()
};
// Sync in bulk
sync::sync_all(&client, &db, &config).await?;
```
**Python implementation:**
```python
import asyncio


async def read_pkg_names_from_file(filepath: str):
    """Read package names from a file and fetch them in batches."""
    # Read the package name list, skipping blank lines
    with open(filepath, 'r', encoding='utf-8') as f:
        pkg_names = [
            line.strip().strip('"').strip("'")
            for line in f
            if line.strip()
        ]
    print(f"Read {len(pkg_names)} package names from the file")
    # Fetch in batches of 100
    api = HuaweiAPI()
    db = Database()
    for i in range(0, len(pkg_names), 100):
        batch = pkg_names[i:i+100]
        tasks = [
            fetch_and_save_app(api, db, pkg_name)
            for pkg_name in batch
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
        print(f"Processed {min(i+100, len(pkg_names))}/{len(pkg_names)}")


async def fetch_and_save_app(api: HuaweiAPI, db: Database, pkg_name: str):
    """Fetch and save a single app."""
    try:
        app_data = await api.get_app_info(pkg_name=pkg_name)
        rating_data = await api.get_app_rating(app_data['appId'])
        await db.save_app_data(app_data, rating_data)
        print(f"✓ {pkg_name}")
    except Exception as e:
        print(f"✗ {pkg_name}: {e}")
```
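Hand-collected lists often contain the same package name more than once, and a line holding only quotes would turn into an empty name. A possible way to handle both is to clean the lines before building batches. This is a sketch under those assumptions; `clean_pkg_names` is a hypothetical helper and not part of the original project.

```python
from typing import Iterable, List


def clean_pkg_names(lines: Iterable[str]) -> List[str]:
    """Strip whitespace and quotes, drop blanks and duplicates, keep first-seen order."""
    seen = set()
    result = []
    for line in lines:
        name = line.strip().strip('"').strip("'")
        if name and name not in seen:
            seen.add(name)
            result.append(name)
    return result


raw_lines = ['com.tencent.mm\n', '"com.sina.weibo"\n', '\n', '""\n', 'com.tencent.mm\n']
cleaned = clean_pkg_names(raw_lines)
print(cleaned)  # ['com.tencent.mm', 'com.sina.weibo']
```

Because it accepts any iterable of strings, the function can be passed an open file object directly.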
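#### C.2.5 Bulk fetching from Substances (topics/collections)

**Principle:** Huawei AppGallery has a "topic" or "collection" feature; a single substance contains multiple apps.

**Substance ID format:**
```
Example pageId: webAgSubstanceDetail|12345 (12345 is the substance ID)
```

**Core logic:**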
```rust
pub async fn get_app_from_substance(
    client: &reqwest::Client,
    api_url: &str,
    substance_id: impl ToString,
) -> Result<(SubstanceData, JsonValue)> {
    // 1. Request the substance detail page
    let body = serde_json::json!({
        "pageId": format!("webAgSubstanceDetail|{}", substance_id.to_string()),
        "pageNum": 1,
        "pageSize": 100,
        "zone": "",
        "businessParam": { "animation": 0 }
    });
    let response = client.post(format!("{api_url}/harmony/page-detail"))
        .json(&body)
        .send()
        .await?;
    let data = response.json::<JsonValue>().await?;
    // 2. Parse the card data and extract the app IDs
    let layouts = data["pages"][0]["data"]["cardlist"]["layoutData"].as_array()?;
    let mut apps = Vec::new();
    for card in layouts {
        match card["type"].as_str()? {
            "com.huawei.hmsapp.appgallery.verticallistcard" => {
                // Vertical list card
                for app in card["data"].as_array()? {
                    if let Some(app_id) = app.get("appId") {
                        apps.push(AppQuery::app_id(app_id.as_str()?));
                    }
                }
            }
            "com.huawei.hmos.appgallery.scenariolistcard.landing" => {
                // Scenario list card
                let refs_list = card["data"][0]["refsList_app"].as_array()?;
                for app in refs_list {
                    if let Some(app_id) = app.get("appId") {
                        apps.push(AppQuery::app_id(app_id.as_str()?));
                    }
                }
            }
            _ => {}
        }
    }
    // 3. If there are more pages, keep fetching
    // Note: `card_id`, `id`, and `title` are not defined in this excerpt
    if data["hasMore"].as_i64()? != 0 {
        let more_apps = get_more_substance(client, api_url, card_id).await?;
        apps.extend(more_apps);
    }
    Ok((SubstanceData { id, title, apps }, data))
}
```
**Python implementation:**
```python
from typing import List


async def get_apps_from_substance(substance_id: str) -> List[str]:
    """Get the list of app IDs in a topic/collection."""
    api = HuaweiAPI()
    url = f"{api.base_url}/harmony/page-detail"
    body = {
        "pageId": f"webAgSubstanceDetail|{substance_id}",
        "pageNum": 1,
        "pageSize": 100,
        "zone": "",
        "businessParam": {"animation": 0}
    }
    tokens = await api.token_manager.get_token()
    headers = {
        "Content-Type": "application/json",
        "Interface-Code": tokens["interface_code"],
        "identity-id": tokens["identity_id"]
    }
    response = await api.client.post(url, json=body, headers=headers)
    data = response.json()
    app_ids = []
    layouts = data["pages"][0]["data"]["cardlist"]["layoutData"]
    for card in layouts:
        card_type = card.get("type", "")
        card_data = card.get("data", [])
        if card_type == "com.huawei.hmsapp.appgallery.verticallistcard":
            # Vertical list card
            for app in card_data:
                if "appId" in app:
                    app_ids.append(app["appId"])
        elif card_type == "com.huawei.hmos.appgallery.scenariolistcard.landing":
            # Scenario list card
            if card_data and "refsList_app" in card_data[0]:
                for app in card_data[0]["refsList_app"]:
                    if "appId" in app:
                        app_ids.append(app["appId"])
    # Handle pagination
    if data.get("hasMore", 0) != 0:
        # Note: layoutData above is nested under pages[0].data.cardlist;
        # check a real response to confirm where dataId lives
        card_id = data["cardlist"]["dataId"]
        more_apps = await get_more_substance_pages(api, card_id)
        app_ids.extend(more_apps)
    return app_ids


async def get_more_substance_pages(api: HuaweiAPI, card_id: str) -> List[str]:
    """Fetch the remaining pages of a topic."""
    app_ids = []
    page_num = 2
    has_more = True
    while has_more:
        url = f"{api.base_url}/harmony/card-list"
        body = {
            "dataId": card_id,
            "locale": "zh",
            "pageNum": page_num,
            "pageSize": 25
        }
        response = await api.client.post(url, json=body)
        data = response.json()
        has_more = data.get("hasMore", 0) != 0
        page_num += 1
        for card in data.get("layoutData", []):
            if card.get("type") == "com.huawei.hmsapp.appgallery.verticallistcard":
                for app in card.get("data", []):
                    if "appId" in app:
                        app_ids.append(app["appId"])
    return app_ids
```
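The card-parsing logic is repeated for the first page and for later pages, and it can only be exercised against the live API as written. One option is to pull it into a pure function that can be checked offline. The sketch below reuses the two card type strings and the `appId` / `refsList_app` keys from the code above; the function name `extract_app_ids` and the sample payload are assumptions made up for illustration.

```python
from typing import Any, Dict, List

VERTICAL_LIST_CARD = "com.huawei.hmsapp.appgallery.verticallistcard"
SCENARIO_LIST_CARD = "com.huawei.hmos.appgallery.scenariolistcard.landing"


def extract_app_ids(layouts: List[Dict[str, Any]]) -> List[str]:
    """Collect appId values from the two known card types; ignore other cards."""
    app_ids: List[str] = []
    for card in layouts:
        card_type = card.get("type", "")
        card_data = card.get("data") or []
        if card_type == VERTICAL_LIST_CARD:
            entries = card_data
        elif card_type == SCENARIO_LIST_CARD and card_data:
            entries = card_data[0].get("refsList_app", [])
        else:
            entries = []
        app_ids.extend(entry["appId"] for entry in entries if "appId" in entry)
    return app_ids


# Made-up payload shaped like the layoutData described above
sample_layouts = [
    {"type": VERTICAL_LIST_CARD, "data": [{"appId": "C001"}, {"name": "no id"}]},
    {"type": SCENARIO_LIST_CARD, "data": [{"refsList_app": [{"appId": "C002"}]}]},
    {"type": "some.other.card", "data": [{"appId": "C999"}]},
]
extracted = extract_app_ids(sample_layouts)
print(extracted)  # ['C001', 'C002']
```

Unknown card types are skipped rather than raising, which mirrors the `_ => {}` arm of the Rust version.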
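### C.3 Recommended combined strategy

**Initial stage (cold start):**
1. Use `guess_market` to scan known ID ranges
2. Crawl popular apps from the Huawei AppGallery home page
3. Manually collect the package names of some well-known apps

**Expansion stage:**
1. Use `guess_from_db` to expand from existing data
2. Use `guess_rand` to discover new apps at random
3. Periodically fetch apps in bulk from substances (topics/collections)

**Maintenance stage:**
1. Periodically sync data updates for known package names
2. Monitor the patterns in which new app IDs appear
3. Collect new package names from user submissions

**Efficiency optimization:**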
```python
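import asyncio


# Example of combining the strategies
async def comprehensive_discovery():
    """Combined discovery strategy."""
    # 1. Expand from the database first (high hit rate)
    await guess_from_database(expand_range=500)
    # 2. Scan a popular ID segment
    await guess_market_apps("C576588020785", 6000000, 6400000)
    # 3. Random probing (finds new apps), running in the background.
    #    Keep a reference so the task is not garbage-collected mid-run.
    random_task = asyncio.create_task(guess_random_apps())
    # 4. Periodically sync known apps
    await sync_known_apps()
    # Return the background task so the caller can await or cancel it
    return random_task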
```
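### C.4 Notes

1. **Request rate control**
   - Delay between batches: 25-50 ms
   - Timeout per request: 30 seconds
   - Keep concurrency at or below 1000
2. **Error handling**
   - App does not exist: skip silently
   - Network error: retry 3 times
   - Token expired: refresh automatically
3. **Deduplication**
   - Use app_id or pkg_name as the unique identifier
   - Check whether the record already exists in the database before inserting
4. **Performance monitoring**
   - Record the success rate (discovery rate)
   - Monitor request latency
   - Count the new apps discovered per hour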
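
The concurrency cap, per-request timeout, and retry count from items 1 and 2 can be enforced in one wrapper instead of in every crawler function. The following is a minimal sketch using only `asyncio`; `run_limited` and its parameters are hypothetical names, the linear backoff is an assumption, and "retry 3 times" is read here as three attempts in total.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_limited(
    factories: List[Callable[[], Awaitable[Any]]],
    max_concurrency: int = 1000,
    timeout: float = 30.0,
    attempts: int = 3,
    backoff: float = 0.05,
) -> List[Any]:
    """Run coroutine factories with a concurrency cap, a per-attempt timeout,
    and a fixed number of attempts. Each slot holds the result, or the last
    exception if every attempt failed."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(factory: Callable[[], Awaitable[Any]]) -> Any:
        last_error: Exception = RuntimeError("no attempt was made")
        for attempt in range(attempts):
            try:
                async with semaphore:  # at most max_concurrency calls in flight
                    return await asyncio.wait_for(factory(), timeout)
            except Exception as exc:  # includes timeouts
                last_error = exc
                if attempt < attempts - 1:
                    await asyncio.sleep(backoff * (attempt + 1))  # linear backoff
        return last_error

    return await asyncio.gather(*(run_one(f) for f in factories))


async def demo() -> List[Any]:
    calls = {"n": 0}

    async def flaky() -> str:
        # Fails twice with a simulated network error, then succeeds
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("simulated network error")
        return "ok"

    async def steady() -> str:
        return "fine"

    return await run_limited([flaky, steady], max_concurrency=2, timeout=1.0)


demo_results = asyncio.run(demo())
print(demo_results)  # ['ok', 'fine']
```

Factories are passed instead of coroutine objects because a coroutine can only be awaited once, so each retry needs a fresh one. "App does not exist" responses should be turned into a normal return value before reaching this wrapper, otherwise they would be retried like network errors.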

Used together, these methods allow the original project to discover and collect Huawei AppGallery app data efficiently.