Huawei AppGallery Crawler System Development Guide
A guide to rebuilding the original Rust project with Python + MySQL + Vue3
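1. Project Overview
1.1 Project Goals
Build a data collection and visualization system for Huawei AppGallery that:
- Automatically crawls app information from Huawei AppGallery
- Stores each app's basic information, version history, download counts, ratings, and related data
- Provides a web interface for statistics, rankings, and trend analysis
- Lets users search, filter, and submit apps
1.2 Technology Stack
Backend:
- Python 3.10+
- FastAPI (web framework)
- SQLAlchemy (ORM)
- MySQL 8.0+
- APScheduler (scheduled jobs)
- httpx / aiohttp (asynchronous HTTP clients)
Frontend:
- Vue 3 + TypeScript
- Vite (build tool)
- Element Plus / Ant Design Vue (UI component libraries)
- ECharts / Chart.js (charting libraries)
- Axios (HTTP client)
- Pinia (state management)
Deployment:
- Docker + Docker Compose
- Nginx (reverse proxy)
- Uvicorn (ASGI server), optionally run as Gunicorn workers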
2. System Architecture
2.1 Overall Architecture
┌─────────────────────────────────────────────────────────────┐
│                        User browser                         │
└────────────────────────┬────────────────────────────────────┘
                         │ HTTP/HTTPS
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                    Nginx (reverse proxy)                    │
└──────────┬──────────────────────────────────┬───────────────┘
           │                                  │
           │ /api/*                           │ /*
           ▼                                  ▼
┌──────────────────────┐          ┌──────────────────────────┐
│ FastAPI backend      │          │ Vue3 static frontend     │
│ - REST API           │          │ - SPA                    │
│ - Data queries       │          │ - Data visualization     │
│ - Crawler scheduling │          └──────────────────────────┘
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐          ┌──────────────────────────┐
│ MySQL database       │◄─────────│ Crawler scheduler        │
│ - App information    │          │ - APScheduler            │
│ - Historical data    │          │ - Periodic sync          │
│ - Statistics         │          │ - Batch processing       │
└──────────────────────┘          └──────────┬───────────────┘
                                             │
                                             ▼
                                  ┌──────────────────────────┐
                                  │ Huawei AppGallery API    │
                                  │ - App info endpoint      │
                                  │ - Rating detail endpoint │
                                  └──────────────────────────┘
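2.2 Core Modules
- Crawler module - fetches data from the Huawei API
- Data processing module - cleans, deduplicates, and stores data
- API service module - exposes a RESTful API
- Scheduler module - runs scheduled jobs and batch processing
- Frontend module - data visualization and user interaction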
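3. Data Source Analysis
3.1 Huawei AppGallery API
Basic information:
- API base URL: https://web-drcn.hispace.dbankcloud.com/edge
- Authentication tokens (interface-code and identity-id) must be obtained dynamically
- Tokens are valid for about 10 minutes and must be refreshed periodically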
3.2 Main Endpoints
3.2.1 Get Basic App Information
Endpoint: POST /webedge/appinfo
Request headers:
Content-Type: application/json
User-Agent: HuaweiMarketCrawler/1.0
interface-code: {dynamically obtained token}
identity-id: {dynamically obtained token}
Request body (lookup by package name):
{
  "pkgName": "com.huawei.hmsapp.appgallery",
  "locale": "zh_CN"
}
Request body (lookup by app ID):
{
  "appId": "C1164531384803416384",
  "locale": "zh_CN"
}
Response example:
{
  "appId": "C1164531384803416384",
  "name": "应用市场",
  "pkgName": "com.huawei.hmsapp.appgallery",
  "devId": "260086000000068459",
  "developerName": "华为软件技术有限公司",
  "devEnName": "Huawei Software Technologies Co., Ltd.",
  "kindName": "工具",
  "version": "6.3.2.302",
  "size": 76591487,
  "downCount": "14443706",
  "rateNum": "125000",
  "hot": "4.5",
  "icon": "https://...",
  "briefDes": "应用市场,点亮精彩生活",
  "description": "...",
  "releaseDate": 1234567890000,
  "targetSdk": "12",
  "minsdk": "9",
  ...
}
3.2.2 Get App Rating Details
Endpoint: POST /harmony/page-detail
Request body:
{
  "pageId": "webAgAppDetail|C1164531384803416384",
  "pageNum": 1,
  "pageSize": 100,
  "zone": ""
}
Response example (note that starInfo is itself a JSON-encoded string and needs a second decode):
{
  "pages": [{
    "data": {
      "cardlist": {
        "layoutData": [{
          "type": "fl.card.comment",
          "data": [{
            "starInfo": "{\"averageRating\":\"4.5\",\"oneStarRatingCount\":100,\"twoStarRatingCount\":200,...}"
          }]
        }]
      }
    }
  }]
}
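The nested layout above can be walked defensively rather than indexed directly. The following is a minimal sketch, assuming the response has the shape shown in the example; `extract_star_info` and the sample payload are hypothetical names and data introduced only for illustration.

```python
import json
from typing import Any, Dict, Optional


def extract_star_info(response: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the decoded starInfo object, or None when no comment card exists."""
    for page in response.get("pages", []):
        layouts = page.get("data", {}).get("cardlist", {}).get("layoutData", [])
        for layout in layouts:
            if layout.get("type") != "fl.card.comment":
                continue
            for item in layout.get("data", []):
                raw = item.get("starInfo")
                if raw:
                    # starInfo is a JSON string embedded in JSON: decode it again
                    return json.loads(raw)
    return None


# Hypothetical sample payload mirroring the response example
sample = {
    "pages": [{
        "data": {
            "cardlist": {
                "layoutData": [{
                    "type": "fl.card.comment",
                    "data": [{
                        "starInfo": json.dumps({
                            "averageRating": "4.5",
                            "oneStarRatingCount": 100,
                            "twoStarRatingCount": 200,
                        })
                    }],
                }]
            }
        }
    }]
}

star_info = extract_star_info(sample)
print(star_info["averageRating"])
```

Using `.get()` with empty defaults means a response without a comment card yields `None` instead of raising `KeyError` or `IndexError`.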
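3.3 Token Acquisition Strategy
The tokens must be obtained dynamically from Huawei's web front end. Possible approaches:
- Option 1: Drive a real browser with Selenium or Playwright and capture the tokens
- Option 2: Reverse-engineer the JavaScript and reimplement the token generation algorithm
- Option 3: Update the tokens manually at intervals (not recommended)
Reference implementation (sketch):
from playwright.async_api import async_playwright

async def get_huawei_token():
    tokens = {}

    async def handle_request(request):
        # Capture the tokens from any outgoing request that carries both headers
        headers = request.headers
        if 'interface-code' in headers and 'identity-id' in headers:
            tokens['interface_code'] = headers['interface-code']
            tokens['identity_id'] = headers['identity-id']

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        try:
            page = await browser.new_page()
            # Intercept network requests to read the token headers
            page.on('request', handle_request)
            await page.goto('https://appgallery.huawei.com/')
            await page.wait_for_timeout(3000)
        finally:
            await browser.close()
    return tokens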
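3.4 Data Fields
Core fields:
- appId - unique app identifier (a length greater than 15 indicates a HarmonyOS app)
- pkgName - package name (unique)
- name - app name
- developerName - developer name
- downCount - download count (a string, e.g. "1000000+")
- rateNum - number of ratings
- hot - popularity score
- version - version number
- size - app size (bytes)
- releaseDate - release time (millisecond timestamp)
- targetSdk / minsdk - SDK versions
Notes:
- Some fields may be empty, so default values are needed
- Download counts may contain a "+" sign and must be cleaned
- Some apps (atomic services) have package names starting with com.atomicservice and have no rating data
- The JSON may contain \0 characters, which must be stripped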
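The notes above translate into a few small cleaning helpers. This is a sketch under the assumptions stated in the notes; the function names are hypothetical and not part of the original project.

```python
import re
from typing import Any


def parse_download_count(raw: Any) -> int:
    """Turn values such as "1,000,000+" into an int; empty or missing values become 0.

    Assumption: counts are whole numbers, so every non-digit character is noise.
    """
    digits = re.sub(r"\D", "", str(raw or ""))
    return int(digits) if digits else 0


def strip_nul(value: Any) -> Any:
    """Recursively remove NUL characters from strings inside dicts and lists."""
    if isinstance(value, str):
        return value.replace("\x00", "")
    if isinstance(value, dict):
        return {key: strip_nul(item) for key, item in value.items()}
    if isinstance(value, list):
        return [strip_nul(item) for item in value]
    return value


def is_atomic_service(pkg_name: str) -> bool:
    """Atomic services have no rating data, so callers can skip the rating request."""
    return pkg_name.startswith("com.atomicservice")


print(parse_download_count("1,000,000+"))
print(strip_nul({"name": "App\x00", "tags": ["a\x00b"]}))
print(is_atomic_service("com.atomicservice.example"))
```

Cleaning recursively matters because a NUL character can appear in nested lists or objects, not only in top-level string fields.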
4. Database Design
4.1 MySQL Table Structure
4.1.1 App Basic Information Table (app_info)
CREATE TABLE `app_info` (
  `app_id` VARCHAR(50) PRIMARY KEY COMMENT 'Unique app ID',
  `alliance_app_id` VARCHAR(50) COMMENT 'Alliance app ID',
  `name` VARCHAR(255) NOT NULL COMMENT 'App name',
  `pkg_name` VARCHAR(255) NOT NULL UNIQUE COMMENT 'App package name',
  `dev_id` VARCHAR(50) NOT NULL COMMENT 'Developer ID',
  `developer_name` VARCHAR(255) NOT NULL COMMENT 'Developer name',
  `dev_en_name` VARCHAR(255) COMMENT 'Developer name (English)',
  `supplier` VARCHAR(255) COMMENT 'Supplier name',
  `kind_id` INT NOT NULL COMMENT 'App category ID',
  `kind_name` VARCHAR(100) NOT NULL COMMENT 'App category name',
  `tag_name` VARCHAR(255) COMMENT 'Tag name',
  `kind_type_id` INT NOT NULL COMMENT 'Type ID',
  `kind_type_name` VARCHAR(100) NOT NULL COMMENT 'Type name',
  `icon_url` TEXT NOT NULL COMMENT 'App icon URL',
  `brief_desc` TEXT NOT NULL COMMENT 'Short description',
  `description` LONGTEXT NOT NULL COMMENT 'Full app description',
  `privacy_url` TEXT NOT NULL COMMENT 'Privacy policy URL',
  `ctype` INT NOT NULL COMMENT 'Client type',
  `detail_id` VARCHAR(100) NOT NULL COMMENT 'Detail page ID',
  `app_level` INT NOT NULL COMMENT 'App level',
  `jocat_id` INT NOT NULL COMMENT 'Category ID',
  `iap` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Has in-app purchases',
  `hms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on HMS',
  `tariff_type` VARCHAR(50) NOT NULL COMMENT 'Tariff type',
  `packing_type` INT NOT NULL COMMENT 'Packaging type',
  `order_app` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is a pre-installed app',
  `denpend_gms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on GMS',
  `denpend_hms` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Depends on HMS',
  `force_update` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Forces update',
  `img_tag` VARCHAR(50) NOT NULL COMMENT 'Image tag',
  `is_pay` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is paid',
  `is_disciplined` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is compliant',
  `is_shelves` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'Is listed',
  `submit_type` INT NOT NULL DEFAULT 0 COMMENT 'Submission type',
  `delete_archive` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Delete archive flag',
  `charging` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Is charged',
  `button_grey` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Button greyed out',
  `app_gift` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'Has gift pack',
  `free_days` INT NOT NULL DEFAULT 0 COMMENT 'Free days',
  `pay_install_type` INT NOT NULL DEFAULT 0 COMMENT 'Paid install type',
  `comment` JSON COMMENT 'Comment or annotation data',
  `listed_at` DATETIME NOT NULL COMMENT 'App listing time',
  `release_countries` JSON COMMENT 'Countries/regions where the app is released',
  `main_device_codes` JSON COMMENT 'Main device types supported by the app',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  `updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Updated at',
  INDEX `idx_pkg_name` (`pkg_name`),
  INDEX `idx_developer_name` (`developer_name`),
  INDEX `idx_kind_name` (`kind_name`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='App basic information table';
4.1.2 App Metrics Table (app_metrics)
CREATE TABLE `app_metrics` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `version` VARCHAR(50) NOT NULL COMMENT 'Version number',
  `version_code` BIGINT NOT NULL COMMENT 'Version code',
  `size_bytes` BIGINT NOT NULL COMMENT 'App size (bytes)',
  `sha256` VARCHAR(64) NOT NULL COMMENT 'SHA-256 checksum of the install package',
  `info_score` DECIMAL(3,1) NOT NULL COMMENT 'Score from the info endpoint',
  `info_rate_count` BIGINT NOT NULL COMMENT 'Rating count from the info endpoint',
  `download_count` BIGINT NOT NULL COMMENT 'Download count',
  `price` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT 'Price',
  `release_date` BIGINT NOT NULL COMMENT 'Release time (millisecond timestamp)',
  `new_features` TEXT COMMENT 'New feature description',
  `upgrade_msg` TEXT COMMENT 'Upgrade message',
  `target_sdk` VARCHAR(20) NOT NULL COMMENT 'Target SDK version',
  `min_sdk` VARCHAR(20) NOT NULL COMMENT 'Minimum SDK version',
  `compile_sdk_version` INT DEFAULT 0 COMMENT 'Compile SDK version',
  `min_hmos_api_level` INT DEFAULT 0 COMMENT 'Minimum HarmonyOS API level',
  `api_release_type` VARCHAR(50) DEFAULT 'Release' COMMENT 'API release type',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_pkg_name` (`pkg_name`),
  INDEX `idx_download_count` (`download_count`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='App metrics table';
4.1.3 App Rating Table (app_rating)
CREATE TABLE `app_rating` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `average_rating` DECIMAL(3,2) NOT NULL COMMENT 'Average rating',
  `star_1_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 1-star ratings',
  `star_2_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 2-star ratings',
  `star_3_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 3-star ratings',
  `star_4_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 4-star ratings',
  `star_5_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of 5-star ratings',
  `total_rating_count` INT NOT NULL DEFAULT 0 COMMENT 'Total number of ratings',
  `only_star_count` INT NOT NULL DEFAULT 0 COMMENT 'Number of star-only ratings',
  `full_average_rating` VARCHAR(20) COMMENT 'Full-precision average rating',
  `source_type` VARCHAR(50) COMMENT 'Source type',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_pkg_name` (`pkg_name`),
  INDEX `idx_average_rating` (`average_rating`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='App rating table';
4.1.4 Raw Data History Table (app_data_history)
CREATE TABLE `app_data_history` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `raw_json_data` JSON NOT NULL COMMENT 'Raw app data as JSON',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Raw data history table';
4.1.5 Rating History Table (app_rating_history)
CREATE TABLE `app_rating_history` (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key',
  `app_id` VARCHAR(50) NOT NULL COMMENT 'App ID',
  `pkg_name` VARCHAR(255) NOT NULL COMMENT 'App package name',
  `raw_json_rating` JSON NOT NULL COMMENT 'Raw rating data as JSON',
  `created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
  FOREIGN KEY (`app_id`) REFERENCES `app_info`(`app_id`) ON DELETE CASCADE,
  FOREIGN KEY (`pkg_name`) REFERENCES `app_info`(`pkg_name`) ON DELETE CASCADE,
  INDEX `idx_app_id` (`app_id`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Rating history table';
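4.2 Index Optimization Suggestions
- Composite indexes:
  - (pkg_name, created_at) - history lookups by package name
  - (developer_name, download_count) - developer rankings
  - (kind_name, download_count) - category rankings
  - Note: in the schema above, developer_name and kind_name live in app_info while download_count lives in app_metrics. A composite index cannot span two tables, so the last two indexes require denormalizing those columns into a single table first.
- Full-text indexes:
  - name, brief_desc - app search (for Chinese text, MySQL's built-in ngram full-text parser is the usual choice)
- Partitioning:
  - Partition the history tables by month to speed up queries
  - Note: MySQL does not allow foreign keys on partitioned InnoDB tables, and the partitioning column must be part of every unique key, including the primary key. The history tables in 4.1.4 and 4.1.5 would need their FOREIGN KEY clauses dropped and their primary key changed to (id, created_at) before they can be partitioned on created_at.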
5. Backend Development
5.1 Project Structure
backend/
├── app/
│   ├── __init__.py
│   ├── main.py                   # FastAPI application entry point
│   ├── config.py                 # Configuration
│   ├── database.py               # Database connection
│   ├── models/                   # SQLAlchemy models
│   │   ├── __init__.py
│   │   ├── app_info.py
│   │   ├── app_metrics.py
│   │   ├── app_rating.py
│   │   ├── app_data_history.py   # imported by crawler/data_processor.py
│   │   └── app_rating_history.py # imported by crawler/data_processor.py
│   ├── schemas/                  # Pydantic models
│   │   ├── __init__.py
│   │   ├── app.py
│   │   └── response.py
│   ├── api/                      # API routes
│   │   ├── __init__.py
│   │   ├── apps.py
│   │   ├── rankings.py
│   │   ├── charts.py
│   │   └── submit.py
│   ├── crawler/                  # Crawler module
│   │   ├── __init__.py
│   │   ├── huawei_api.py         # Huawei API wrapper
│   │   ├── token_manager.py      # Token management
│   │   └── data_processor.py     # Data processing
│   ├── scheduler/                # Scheduler module
│   │   ├── __init__.py
│   │   └── tasks.py
│   └── utils/                    # Utility functions
│       ├── __init__.py
│       └── helpers.py
├── requirements.txt
├── .env.example
└── README.md
5.2 Core Implementation
5.2.1 Configuration (config.py)
from typing import List
from urllib.parse import quote_plus

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Values can be overridden through environment variables or a local .env file
    model_config = SettingsConfigDict(env_file=".env")

    # Database settings
    MYSQL_HOST: str = "localhost"
    MYSQL_PORT: int = 3306
    MYSQL_USER: str = "root"
    MYSQL_PASSWORD: str = "password"
    MYSQL_DATABASE: str = "huawei_market"

    # Huawei API settings
    HUAWEI_API_BASE_URL: str = "https://web-drcn.hispace.dbankcloud.com/edge"
    HUAWEI_LOCALE: str = "zh_CN"

    # Crawler settings
    CRAWLER_INTERVAL: int = 1800    # Sync interval (seconds)
    CRAWLER_BATCH_SIZE: int = 100   # Batch size
    CRAWLER_TIMEOUT: int = 30       # Request timeout (seconds)

    # API settings
    API_PREFIX: str = "/api"
    API_TITLE: str = "Huawei AppGallery Data API"
    API_VERSION: str = "1.0.0"

    # Miscellaneous
    DEBUG: bool = False
    CORS_ORIGINS: List[str] = ["http://localhost:5173", "http://localhost:3000"]

    @property
    def database_url(self) -> str:
        # URL-encode the password so special characters do not break the DSN
        password = quote_plus(self.MYSQL_PASSWORD)
        return (
            f"mysql+aiomysql://{self.MYSQL_USER}:{password}"
            f"@{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DATABASE}"
        )


settings = Settings()
5.2.2 Database Connection (database.py)
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from app.config import settings

# Create the async engine
engine = create_async_engine(
    settings.database_url,
    echo=settings.DEBUG,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True
)

# Create the async session factory
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# Declarative base class for all models
Base = declarative_base()


# FastAPI dependency
async def get_db():
    # The context manager closes the session when the request finishes
    async with AsyncSessionLocal() as session:
        yield session
5.2.3 Data Model (models/app_info.py)
from sqlalchemy import Column, String, Integer, Text, DateTime, Boolean, JSON
from sqlalchemy.sql import func

from app.database import Base


class AppInfo(Base):
    __tablename__ = "app_info"

    app_id = Column(String(50), primary_key=True, comment="Unique app ID")
    alliance_app_id = Column(String(50), comment="Alliance app ID")
    name = Column(String(255), nullable=False, comment="App name")
    pkg_name = Column(String(255), nullable=False, unique=True, index=True, comment="App package name")
    dev_id = Column(String(50), nullable=False, comment="Developer ID")
    developer_name = Column(String(255), nullable=False, index=True, comment="Developer name")
    dev_en_name = Column(String(255), comment="Developer name (English)")
    supplier = Column(String(255), comment="Supplier name")
    kind_id = Column(Integer, nullable=False, comment="App category ID")
    kind_name = Column(String(100), nullable=False, index=True, comment="App category name")
    tag_name = Column(String(255), comment="Tag name")
    kind_type_id = Column(Integer, nullable=False, comment="Type ID")
    kind_type_name = Column(String(100), nullable=False, comment="Type name")
    icon_url = Column(Text, nullable=False, comment="App icon URL")
    brief_desc = Column(Text, nullable=False, comment="Short description")
    description = Column(Text, nullable=False, comment="Full app description")
    privacy_url = Column(Text, nullable=False, comment="Privacy policy URL")

    # Boolean fields
    iap = Column(Boolean, default=False, comment="Has in-app purchases")
    hms = Column(Boolean, default=False, comment="Depends on HMS")
    is_pay = Column(Boolean, default=False, comment="Is paid")
    is_shelves = Column(Boolean, default=True, comment="Is listed")

    # JSON fields
    comment = Column(JSON, comment="Comment or annotation data")
    release_countries = Column(JSON, comment="Countries/regions where the app is released")
    main_device_codes = Column(JSON, comment="Main device types supported by the app")

    # Timestamps
    listed_at = Column(DateTime, nullable=False, comment="App listing time")
    created_at = Column(DateTime, nullable=False, server_default=func.now(), comment="Created at")
    updated_at = Column(DateTime, nullable=False, server_default=func.now(), onupdate=func.now(), comment="Updated at")

    # Only a subset of the columns from 4.1.1 is mapped here. The remaining NOT NULL
    # columns without a default (ctype, detail_id, app_level, jocat_id, tariff_type,
    # packing_type, img_tag) must also be mapped and populated, or inserts will fail.
5.2.4 Huawei API Wrapper (crawler/huawei_api.py)
import json
from typing import Any, Dict, Optional

import httpx

from app.config import settings
from app.crawler.token_manager import TokenManager


class HuaweiAPI:
    def __init__(self):
        self.base_url = settings.HUAWEI_API_BASE_URL
        self.locale = settings.HUAWEI_LOCALE
        # Each instance owns its own TokenManager, so reuse one HuaweiAPI
        # instance where possible to avoid refreshing tokens on every call
        self.token_manager = TokenManager()
        self.client = httpx.AsyncClient(timeout=settings.CRAWLER_TIMEOUT)

    async def _build_headers(self) -> Dict[str, str]:
        """Build the request headers, including the current auth tokens."""
        tokens = await self.token_manager.get_token()
        return {
            "Content-Type": "application/json",
            "User-Agent": "HuaweiMarketCrawler/1.0",
            "interface-code": tokens["interface_code"],
            "identity-id": tokens["identity_id"]
        }

    async def get_app_info(self, pkg_name: Optional[str] = None, app_id: Optional[str] = None) -> Dict[str, Any]:
        """Fetch basic app information by package name or app ID."""
        if not pkg_name and not app_id:
            raise ValueError("Either pkg_name or app_id must be provided")

        # Build the request; pkg_name takes precedence when both are given
        url = f"{self.base_url}/webedge/appinfo"
        body = {"locale": self.locale}
        if pkg_name:
            body["pkgName"] = pkg_name
        else:
            body["appId"] = app_id

        # Send the request
        response = await self.client.post(url, headers=await self._build_headers(), json=body)
        response.raise_for_status()

        # Clean the data
        return self._clean_data(response.json())

    async def get_app_rating(self, app_id: str, pkg_name: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Fetch rating details for an app, or None if unavailable."""
        # Skip atomic services: they are identified by package name, not by app ID
        if pkg_name and pkg_name.startswith("com.atomicservice"):
            return None

        url = f"{self.base_url}/harmony/page-detail"
        body = {
            "pageId": f"webAgAppDetail|{app_id}",
            "pageNum": 1,
            "pageSize": 100,
            "zone": ""
        }
        try:
            response = await self.client.post(url, headers=await self._build_headers(), json=body)
            response.raise_for_status()
            data = response.json()

            # Locate the comment card and decode the embedded starInfo JSON string
            layouts = data["pages"][0]["data"]["cardlist"]["layoutData"]
            comment_cards = [card for card in layouts if card.get("type") == "fl.card.comment"]
            if not comment_cards:
                return None
            star_info_str = comment_cards[0]["data"][0]["starInfo"]
            return json.loads(star_info_str)
        except (httpx.HTTPError, KeyError, IndexError, TypeError, ValueError) as e:
            print(f"Failed to fetch rating: {e}")
            return None

    def _clean_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Clean a raw app info response."""
        # Strip \0 characters from top-level string values
        for key, value in data.items():
            if isinstance(value, str):
                data[key] = value.replace('\x00', '')

        # Drop the AG-TraceId field
        data.pop('AG-TraceId', None)

        # Validate the appId length
        if len(data.get('appId', '')) < 15:
            raise ValueError("appId is shorter than 15 characters; this may be an Android app")
        return data

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
5.2.5 Token Manager (crawler/token_manager.py)
import asyncio
from datetime import datetime, timedelta
from typing import Dict

from playwright.async_api import async_playwright


class TokenManager:
    def __init__(self):
        self.tokens: Dict[str, str] = {}
        self.token_expires_at: datetime = datetime.now()
        self.lock = asyncio.Lock()

    async def get_token(self) -> Dict[str, str]:
        """Return valid tokens, refreshing them first if they have expired."""
        async with self.lock:
            if datetime.now() >= self.token_expires_at or not self.tokens:
                await self._refresh_token()
            return self.tokens

    async def _refresh_token(self):
        """Refresh the tokens by loading the AppGallery site in a headless browser."""
        print("Refreshing token...")
        tokens = {}

        async def handle_request(request):
            # Capture the tokens from any outgoing request that carries both headers
            headers = request.headers
            if 'interface-code' in headers and 'identity-id' in headers:
                tokens['interface_code'] = headers['interface-code']
                tokens['identity_id'] = headers['identity-id']

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                page.on('request', handle_request)
                # Visit Huawei AppGallery and give the page time to issue API requests
                await page.goto('https://appgallery.huawei.com/', wait_until='networkidle')
                await page.wait_for_timeout(3000)
            finally:
                await browser.close()

        if not tokens:
            raise RuntimeError("Unable to obtain token")
        self.tokens = tokens
        # Treat the token as valid for 10 minutes
        self.token_expires_at = datetime.now() + timedelta(minutes=10)
        print(f"Token refreshed, valid until: {self.token_expires_at}")
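The expiry logic can be exercised without a browser by injecting the fetch function and the clock. The sketch below is one possible shape, not the project's implementation: `CachedToken`, the `fetch` callable, and the 540-second TTL (refreshing one minute before the roughly 10-minute expiry) are all assumptions made for illustration.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict


class CachedToken:
    """Cache a token dict and refetch it once it is older than ttl_seconds."""

    def __init__(
        self,
        fetch: Callable[[], Awaitable[Dict[str, str]]],
        ttl_seconds: float = 540.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._tokens: Dict[str, str] = {}
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    async def get(self) -> Dict[str, str]:
        # The lock ensures concurrent callers trigger at most one refresh
        async with self._lock:
            if not self._tokens or self._clock() >= self._expires_at:
                self._tokens = await self._fetch()
                self._expires_at = self._clock() + self._ttl
            return self._tokens


async def demo():
    calls = 0
    now = [0.0]  # fake clock, advanced manually

    async def fake_fetch() -> Dict[str, str]:
        nonlocal calls
        calls += 1
        return {"interface_code": f"code-{calls}", "identity_id": f"id-{calls}"}

    cache = CachedToken(fake_fetch, ttl_seconds=540, clock=lambda: now[0])
    first = await cache.get()   # cache is empty, so this fetches
    second = await cache.get()  # still fresh, served from the cache
    now[0] = 600.0              # jump past the TTL
    third = await cache.get()   # expired, so this fetches again
    return calls, first, second, third


calls, first, second, third = asyncio.run(demo())
print(calls, third["interface_code"])
```

A monotonic clock is used as the default because it is unaffected by wall-clock adjustments, which matters for short-lived tokens.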
5.2.6 Data Processor (crawler/data_processor.py)
import re
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.app_data_history import AppDataHistory
from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.models.app_rating_history import AppRatingHistory


class DataProcessor:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def save_app_data(
        self,
        app_data: Dict[str, Any],
        rating_data: Optional[Dict[str, Any]] = None,
        comment: Optional[Dict[str, Any]] = None
    ) -> Tuple[bool, bool, bool]:
        """
        Save app data.
        Returns: (app info written, new metrics row inserted, new rating row inserted)
        """
        app_id = app_data['appId']
        pkg_name = app_data['pkgName']

        # Check whether the app already exists
        result = await self.db.execute(
            select(AppInfo).where(AppInfo.app_id == app_id)
        )
        existing_app = result.scalar_one_or_none()

        # Save basic app information
        info_inserted = False
        if not existing_app or await self._is_info_changed(existing_app, app_data):
            await self._save_app_info(app_data, comment)
            info_inserted = True

        # Save app metrics
        metric_inserted = False
        if await self._should_save_metric(app_id, app_data):
            await self._save_app_metric(app_data)
            metric_inserted = True

        # Save rating data
        rating_inserted = False
        if rating_data and await self._should_save_rating(app_id, rating_data):
            await self._save_app_rating(app_id, pkg_name, rating_data)
            rating_inserted = True

        # Save the raw payloads to the history tables
        if info_inserted or metric_inserted:
            await self._save_data_history(app_id, pkg_name, app_data)
        if rating_inserted:
            await self._save_rating_history(app_id, pkg_name, rating_data)

        await self.db.commit()
        return info_inserted, metric_inserted, rating_inserted

    async def _save_app_info(self, data: Dict[str, Any], comment: Optional[Dict] = None):
        """Insert or update basic app information."""
        app_info = AppInfo(
            app_id=data['appId'],
            alliance_app_id=data.get('allianceAppId', ''),
            name=data['name'],
            pkg_name=data['pkgName'],
            dev_id=data['devId'],
            developer_name=data['developerName'],
            dev_en_name=data.get('devEnName', ''),
            supplier=data.get('supplier', ''),
            kind_id=int(data['kindId']),
            kind_name=data['kindName'],
            tag_name=data.get('tagName'),
            kind_type_id=int(data['kindTypeId']),
            kind_type_name=data['kindTypeName'],
            icon_url=data['icon'],
            brief_desc=data['briefDes'],
            description=data['description'],
            privacy_url=data['privacyUrl'],
            iap=bool(data.get('iap', 0)),
            hms=bool(data.get('hms', 0)),
            is_pay=data.get('isPay') == '1',
            is_shelves=bool(data.get('isShelves', 1)),
            comment=comment,
            release_countries=data.get('releaseCountries', []),
            main_device_codes=data.get('mainDeviceCodes', []),
            listed_at=datetime.fromtimestamp(data.get('releaseDate', 0) / 1000)
        )
        # merge() acts as an upsert keyed on the primary key; add() would fail
        # with a duplicate-key error when the app already exists
        await self.db.merge(app_info)

    async def _save_app_metric(self, data: Dict[str, Any]):
        """Insert a new metrics snapshot."""
        # Clean the download count
        download_count = self._parse_download_count(data.get('downCount', '0'))
        metric = AppMetrics(
            app_id=data['appId'],
            pkg_name=data['pkgName'],
            version=data['version'],
            version_code=int(data['versionCode']),
            size_bytes=int(data['size']),
            sha256=data.get('sha256', ''),
            # "or 0" also covers fields that are present but empty
            info_score=float(data.get('hot') or 0),
            info_rate_count=int(data.get('rateNum') or 0),
            download_count=download_count,
            price=float(data.get('price') or 0),
            release_date=int(data.get('releaseDate') or 0),
            new_features=data.get('newFeatures', ''),
            upgrade_msg=data.get('upgradeMsg', ''),
            target_sdk=data.get('targetSdk', ''),
            min_sdk=data.get('minsdk', ''),
            compile_sdk_version=int(data.get('compileSdkVersion') or 0),
            min_hmos_api_level=int(data.get('minHmosApiLevel') or 0),
            api_release_type=data.get('apiReleaseType', 'Release')
        )
        self.db.add(metric)

    async def _save_app_rating(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Insert a new rating snapshot."""
        rating = AppRating(
            app_id=app_id,
            pkg_name=pkg_name,
            average_rating=float(data['averageRating']),
            star_1_count=int(data['oneStarRatingCount']),
            star_2_count=int(data['twoStarRatingCount']),
            star_3_count=int(data['threeStarRatingCount']),
            star_4_count=int(data['fourStarRatingCount']),
            star_5_count=int(data['fiveStarRatingCount']),
            total_rating_count=int(data['totalStarRatingCount']),
            only_star_count=int(data.get('onlyStarCount', 0)),
            full_average_rating=data.get('fullAverageRating', ''),
            source_type=data.get('sourceType', '')
        )
        self.db.add(rating)

    async def _save_data_history(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Store the raw app payload (field names assumed to match table 4.1.4)."""
        self.db.add(AppDataHistory(app_id=app_id, pkg_name=pkg_name, raw_json_data=data))

    async def _save_rating_history(self, app_id: str, pkg_name: str, data: Dict[str, Any]):
        """Store the raw rating payload (field names assumed to match table 4.1.5)."""
        self.db.add(AppRatingHistory(app_id=app_id, pkg_name=pkg_name, raw_json_rating=data))

    def _parse_download_count(self, count_str: str) -> int:
        """Parse a download count string such as "1,000,000+"."""
        # Remove the "+" sign and every other non-digit character
        digits = re.sub(r'\D', '', str(count_str or ''))
        return int(digits) if digits else 0

    async def _is_info_changed(self, existing: AppInfo, new_data: Dict) -> bool:
        """Check whether the basic app information has changed."""
        # The version lives in app_metrics, not app_info, so version changes
        # are detected by _should_save_metric instead
        return (
            existing.name != new_data['name'] or
            existing.brief_desc != new_data.get('briefDes', '') or
            existing.description != new_data.get('description', '')
        )

    async def _should_save_metric(self, app_id: str, data: Dict) -> bool:
        """Decide whether a new metrics snapshot should be saved."""
        # Load the most recent metrics row
        result = await self.db.execute(
            select(AppMetrics)
            .where(AppMetrics.app_id == app_id)
            .order_by(AppMetrics.created_at.desc())
            .limit(1)
        )
        latest_metric = result.scalar_one_or_none()
        if not latest_metric:
            return True

        # Compare the key fields
        return (
            latest_metric.version != data['version'] or
            latest_metric.download_count != self._parse_download_count(data.get('downCount', '0'))
        )

    async def _should_save_rating(self, app_id: str, data: Dict) -> bool:
        """Decide whether a new rating snapshot should be saved."""
        result = await self.db.execute(
            select(AppRating)
            .where(AppRating.app_id == app_id)
            .order_by(AppRating.created_at.desc())
            .limit(1)
        )
        latest_rating = result.scalar_one_or_none()
        if not latest_rating:
            return True
        return (
            float(latest_rating.average_rating) != float(data['averageRating']) or
            latest_rating.total_rating_count != int(data['totalStarRatingCount'])
        )
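The processor only writes a new snapshot when selected key fields differ from the latest stored row. That rule can be isolated as a pure function, which makes it easy to unit test without a database. This is an illustrative sketch; `snapshot_changed` and the sample dictionaries are hypothetical.

```python
from typing import Any, Dict, Iterable, Optional


def snapshot_changed(
    latest: Optional[Dict[str, Any]],
    incoming: Dict[str, Any],
    keys: Iterable[str],
) -> bool:
    """Return True when there is no stored snapshot or any key field differs."""
    if latest is None:
        return True
    return any(latest.get(key) != incoming.get(key) for key in keys)


stored = {"version": "6.3.2.302", "download_count": 14443706}
same = {"version": "6.3.2.302", "download_count": 14443706}
newer = {"version": "6.3.2.302", "download_count": 14450000}

print(snapshot_changed(None, same, ["version", "download_count"]))
print(snapshot_changed(stored, same, ["version", "download_count"]))
print(snapshot_changed(stored, newer, ["version", "download_count"]))
```

Comparing only a short list of key fields keeps the history tables from growing on every crawl when nothing meaningful has changed.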
5.2.7 API Routes (api/apps.py)
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Path, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.crawler.data_processor import DataProcessor
from app.crawler.huawei_api import HuaweiAPI
from app.database import get_db
from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.schemas.response import ApiResponse

router = APIRouter(prefix="/apps", tags=["Apps"])

# Columns that clients may search on; anything else is rejected
SEARCHABLE_COLUMNS = {"name", "pkg_name", "developer_name", "kind_name"}


def _to_dict(obj):
    """Serialize a model instance by column; __dict__ would leak SQLAlchemy internals."""
    if obj is None:
        return None
    return {column.name: getattr(obj, column.name) for column in obj.__table__.columns}


async def _latest_snapshot(db: AsyncSession, pkg_name: str):
    """Load the app with its most recent metrics and rating rows."""
    result = await db.execute(
        select(AppInfo, AppMetrics, AppRating)
        .join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
        .outerjoin(AppRating, AppInfo.app_id == AppRating.app_id)
        .where(AppInfo.pkg_name == pkg_name)
        .order_by(AppMetrics.created_at.desc(), AppRating.created_at.desc())
        .limit(1)
    )
    return result.first()


@router.get("/pkg_name/{pkg_name}")
async def get_app_by_pkg_name(
    pkg_name: str,
    db: AsyncSession = Depends(get_db)
):
    """Look up an app by package name."""
    extra = {}
    # Try to fetch fresh data from the Huawei API first
    api = HuaweiAPI()
    try:
        app_data = await api.get_app_info(pkg_name=pkg_name)
        rating_data = None
        if not pkg_name.startswith("com.atomicservice"):
            rating_data = await api.get_app_rating(app_data['appId'])

        # Save to the database
        processor = DataProcessor(db)
        new_info, new_metric, new_rating = await processor.save_app_data(
            app_data, rating_data
        )
        extra = {
            "new_info": new_info,
            "new_metric": new_metric,
            "new_rating": new_rating,
            "get_data": True
        }
    except Exception as e:
        # Fall back to the data already in the database
        await db.rollback()
        extra = {"get_data": False, "error": str(e)}
    finally:
        await api.close()

    row = await _latest_snapshot(db, pkg_name)
    if not row:
        raise HTTPException(status_code=404, detail=f"App {pkg_name} not found")
    return ApiResponse(
        success=True,
        data={
            "info": _to_dict(row[0]),
            "metric": _to_dict(row[1]),
            "rating": _to_dict(row[2]),
            **extra
        }
    )


@router.get("/list/{page}")
async def get_app_list(
    page: int = Path(..., ge=1),
    page_size: int = Query(100, ge=1, le=500),
    detail: bool = True,
    sort: Optional[str] = None,
    desc: bool = True,
    search_key: Optional[str] = None,
    search_value: Optional[str] = None,
    search_exact: bool = False,
    db: AsyncSession = Depends(get_db)
):
    """Return a paginated list of apps."""
    # Build the base query. In detail mode an app appears once per stored
    # metrics/rating snapshot; see the latest-snapshot subquery in
    # api/rankings.py for a way to keep only the newest row per app.
    if detail:
        query = select(AppInfo, AppMetrics, AppRating).join(
            AppMetrics, AppInfo.app_id == AppMetrics.app_id
        ).outerjoin(
            AppRating, AppInfo.app_id == AppRating.app_id
        )
    else:
        query = select(AppInfo)

    # Search filter, restricted to a whitelist of columns
    filters = []
    if search_key and search_value:
        if search_key not in SEARCHABLE_COLUMNS:
            raise HTTPException(status_code=400, detail=f"Unsupported search_key: {search_key}")
        column = getattr(AppInfo, search_key)
        filters.append(column == search_value if search_exact else column.like(f"%{search_value}%"))
    if filters:
        query = query.where(*filters)

    # Sorting; metrics columns are only available when the metrics table is joined
    if sort:
        model = AppMetrics if detail and sort in AppMetrics.__table__.columns else AppInfo
        if sort not in model.__table__.columns:
            raise HTTPException(status_code=400, detail=f"Unsupported sort column: {sort}")
        order_column = getattr(model, sort)
    else:
        order_column = AppMetrics.download_count if detail else AppInfo.created_at
    query = query.order_by(order_column.desc() if desc else order_column.asc())

    # Total count
    count_query = select(func.count()).select_from(AppInfo)
    if filters:
        count_query = count_query.where(*filters)
    total_count = (await db.execute(count_query)).scalar()

    # Pagination
    offset = (page - 1) * page_size
    result = await db.execute(query.offset(offset).limit(page_size))
    rows = result.all()

    # Format the rows
    if detail:
        data = [
            {"info": _to_dict(row[0]), "metric": _to_dict(row[1]), "rating": _to_dict(row[2])}
            for row in rows
        ]
    else:
        data = [_to_dict(row[0]) for row in rows]
    return ApiResponse(
        success=True,
        data=data,
        total=total_count,
        limit=page_size
    )


@router.get("/metrics/{pkg_name}")
async def get_app_metrics_history(
    pkg_name: str,
    db: AsyncSession = Depends(get_db)
):
    """Return the metrics history for an app, newest first."""
    result = await db.execute(
        select(AppMetrics)
        .where(AppMetrics.pkg_name == pkg_name)
        .order_by(AppMetrics.created_at.desc())
    )
    metrics = result.scalars().all()
    return ApiResponse(
        success=True,
        data=[_to_dict(m) for m in metrics]
    )
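The list endpoint takes a 1-based page number and converts it to an offset. A small helper makes that arithmetic explicit and also derives the total page count a frontend pager typically needs. This is a sketch; `page_window` is a hypothetical name, not part of the API above.

```python
import math
from typing import Tuple


def page_window(page: int, page_size: int, total: int) -> Tuple[int, int, int]:
    """Return (offset, limit, total_pages) for a 1-based page number."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be at least 1")
    offset = (page - 1) * page_size
    total_pages = math.ceil(total / page_size)
    return offset, page_size, total_pages


print(page_window(page=3, page_size=100, total=250))
```

With 250 rows and a page size of 100, page 3 starts at offset 200 and is the last of 3 pages.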
5.2.8 Rankings API (api/rankings.py)
from typing import Optional

from fastapi import APIRouter, Depends, Query
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.models.app_info import AppInfo
from app.models.app_metrics import AppMetrics
from app.models.app_rating import AppRating
from app.schemas.response import ApiResponse

router = APIRouter(prefix="/rankings", tags=["Rankings"])


def _latest_subquery(model):
    """Subquery returning the newest created_at per app for the given model."""
    return (
        select(
            model.app_id,
            func.max(model.created_at).label('max_created_at')
        )
        .group_by(model.app_id)
        .subquery()
    )


@router.get("/top-downloads")
async def get_top_downloads(
    limit: int = Query(10, ge=1, le=100),
    exclude_pattern: Optional[str] = Query(None),
    db: AsyncSession = Depends(get_db)
):
    """Download count ranking."""
    # Subquery: the latest metrics row for each app
    latest = _latest_subquery(AppMetrics)

    # Main query
    query = (
        select(AppInfo, AppMetrics)
        .join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
        .join(
            latest,
            and_(
                AppMetrics.app_id == latest.c.app_id,
                AppMetrics.created_at == latest.c.max_created_at
            )
        )
    )
    # Exclude package names matching the pattern
    if exclude_pattern:
        query = query.where(~AppInfo.pkg_name.like(f"%{exclude_pattern}%"))
    query = query.order_by(AppMetrics.download_count.desc()).limit(limit)

    result = await db.execute(query)
    rows = result.all()
    data = [
        {
            "app_id": row[0].app_id,
            "name": row[0].name,
            "pkg_name": row[0].pkg_name,
            "developer_name": row[0].developer_name,
            "icon_url": row[0].icon_url,
            "download_count": row[1].download_count,
            "version": row[1].version
        }
        for row in rows
    ]
    return ApiResponse(success=True, data=data, limit=limit)


@router.get("/ratings")
async def get_top_ratings(
    limit: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Rating ranking."""
    latest = _latest_subquery(AppRating)
    query = (
        select(AppInfo, AppRating)
        .join(AppRating, AppInfo.app_id == AppRating.app_id)
        .join(
            latest,
            and_(
                AppRating.app_id == latest.c.app_id,
                AppRating.created_at == latest.c.max_created_at
            )
        )
        .where(AppRating.total_rating_count >= 100)  # require at least 100 ratings
        .order_by(AppRating.average_rating.desc())
        .limit(limit)
    )
    result = await db.execute(query)
    rows = result.all()
    data = [
        {
            "app_id": row[0].app_id,
            "name": row[0].name,
            "pkg_name": row[0].pkg_name,
            "developer_name": row[0].developer_name,
            "icon_url": row[0].icon_url,
            "average_rating": float(row[1].average_rating),
            "total_rating_count": row[1].total_rating_count
        }
        for row in rows
    ]
    return ApiResponse(success=True, data=data, limit=limit)


@router.get("/developers")
async def get_top_developers(
    limit: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Developer ranking (by number of apps)."""
    # Join only the latest metrics row per app. Joining the full history would
    # count each app once per snapshot and sum download counts across snapshots.
    latest = _latest_subquery(AppMetrics)
    app_count = func.count(AppInfo.app_id.distinct())
    query = (
        select(
            AppInfo.developer_name,
            app_count.label('app_count'),
            func.sum(AppMetrics.download_count).label('total_downloads')
        )
        .join(AppMetrics, AppInfo.app_id == AppMetrics.app_id)
        .join(
            latest,
            and_(
                AppMetrics.app_id == latest.c.app_id,
                AppMetrics.created_at == latest.c.max_created_at
            )
        )
        .group_by(AppInfo.developer_name)
        .order_by(app_count.desc())
        .limit(limit)
    )
    result = await db.execute(query)
    rows = result.all()
    data = [
        {
            "developer_name": row[0],
            "app_count": row[1],
            "total_downloads": int(row[2] or 0)
        }
        for row in rows
    ]
    return ApiResponse(success=True, data=data, limit=limit)
5.2.9 定时任务 (scheduler/tasks.py)
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal
from app.config import settings
from app.crawler.huawei_api import HuaweiAPI
from app.crawler.data_processor import DataProcessor
import asyncio
import random
from datetime import datetime
class CrawlerScheduler:
    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.is_running = False

    def start(self):
        """Start the scheduler."""
        # Register the periodic sync job
        self.scheduler.add_job(
            self.sync_all_apps,
            trigger=IntervalTrigger(seconds=settings.CRAWLER_INTERVAL),
            id='sync_all_apps',
            name='Sync all apps',
            replace_existing=True
        )
        self.scheduler.start()
        print(f"Scheduler started, sync interval: {settings.CRAWLER_INTERVAL}s")

    def stop(self):
        """Stop the scheduler."""
        self.scheduler.shutdown()
        print("Scheduler stopped")

    async def sync_all_apps(self):
        """Sync every known app."""
        if self.is_running:
            print("Previous sync has not finished; skipping this run")
            return
        self.is_running = True
        print(f"Starting full sync - {datetime.now()}")
        api = None
        try:
            from sqlalchemy import select
            from app.models.app_info import AppInfo
            # Load all package names, then release the session
            async with AsyncSessionLocal() as db:
                result = await db.execute(select(AppInfo.pkg_name))
                pkg_names = [row[0] for row in result.all()]
            # Shuffle so every run visits apps in a different order
            random.shuffle(pkg_names)
            print(f"{len(pkg_names)} apps to sync")
            api = HuaweiAPI()
            total_processed = 0
            total_inserted = 0
            total_failed = 0
            for i in range(0, len(pkg_names), settings.CRAWLER_BATCH_SIZE):
                batch = pkg_names[i:i + settings.CRAWLER_BATCH_SIZE]
                # Process the batch concurrently
                tasks = [self._sync_single_app(api, pkg_name) for pkg_name in batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)
                # Tally results; exceptions are returned, not raised
                for pkg_name, result in zip(batch, results):
                    total_processed += 1
                    if isinstance(result, Exception):
                        total_failed += 1
                        print(f"Sync failed for {pkg_name}: {result}")
                    elif result:
                        total_inserted += 1
                print(f"Processed {total_processed}/{len(pkg_names)} apps")
                # Pause between batches
                await asyncio.sleep(0.5)
            print(f"Sync finished - processed: {total_processed}, updated: {total_inserted}, failed: {total_failed}")
        except Exception as e:
            print(f"Sync failed: {e}")
        finally:
            # Always release the HTTP client and the running flag
            if api is not None:
                await api.close()
            self.is_running = False

    async def _sync_single_app(self, api: HuaweiAPI, pkg_name: str) -> bool:
        """Sync one app. Exceptions propagate so the caller can count failures."""
        # Fetch app data
        app_data = await api.get_app_info(pkg_name=pkg_name)
        rating_data = await api.get_app_rating(app_data['appId'])
        # An AsyncSession must not be shared by concurrent tasks,
        # so each task saves through its own session.
        async with AsyncSessionLocal() as db:
            processor = DataProcessor(db)
            new_info, new_metric, new_rating = await processor.save_app_data(
                app_data, rating_data
            )
        return bool(new_info or new_metric or new_rating)

# Global scheduler instance
scheduler = CrawlerScheduler()
5.2.10 主应用 (main.py)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.config import settings
from app.api import apps, rankings, charts, submit
from app.scheduler.tasks import scheduler
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 启动时
print("应用启动中...")
scheduler.start()
yield
# 关闭时
print("应用关闭中...")
scheduler.stop()
# 创建FastAPI应用
app = FastAPI(
title=settings.API_TITLE,
version=settings.API_VERSION,
lifespan=lifespan
)
# CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(apps.router, prefix=settings.API_PREFIX)
app.include_router(rankings.router, prefix=settings.API_PREFIX)
app.include_router(charts.router, prefix=settings.API_PREFIX)
app.include_router(submit.router, prefix=settings.API_PREFIX)
@app.get("/")
async def root():
return {"message": "华为应用市场数据API", "version": settings.API_VERSION}
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8000,
reload=settings.DEBUG
)
5.3 依赖文件 (requirements.txt)
fastapi==0.109.0
uvicorn[standard]==0.27.0
sqlalchemy==2.0.25
aiomysql==0.2.0
pydantic==2.5.3
pydantic-settings==2.1.0
httpx==0.26.0
playwright==1.41.0
apscheduler==3.10.4
python-dotenv==1.0.0
python-multipart==0.0.6
5.4 环境配置 (.env.example)
# 数据库配置
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=huawei_market
# 华为API配置
HUAWEI_API_BASE_URL=https://web-drcn.hispace.dbankcloud.com/edge
HUAWEI_LOCALE=zh_CN
# 爬虫配置
CRAWLER_INTERVAL=1800
CRAWLER_BATCH_SIZE=100
CRAWLER_TIMEOUT=30
# API配置
API_PREFIX=/api
API_TITLE=华为应用市场数据API
API_VERSION=1.0.0
# 其他配置
DEBUG=False
CORS_ORIGINS=["http://localhost:5173","http://localhost:3000"]
6. 前端开发
6.1 项目结构
frontend/
├── public/
│ └── favicon.ico
├── src/
│ ├── assets/ # 静态资源
│ │ ├── styles/
│ │ │ └── main.css
│ │ └── images/
│ ├── components/ # 组件
│ │ ├── AppCard.vue
│ │ ├── AppTable.vue
│ │ ├── ChartCard.vue
│ │ ├── StatCard.vue
│ │ └── SearchBar.vue
│ ├── views/ # 页面
│ │ ├── Dashboard.vue
│ │ ├── AppDetail.vue
│ │ └── Rankings.vue
│ ├── api/ # API封装
│ │ ├── index.ts
│ │ └── apps.ts
│ ├── stores/ # 状态管理
│ │ └── app.ts
│ ├── types/ # 类型定义
│ │ └── app.ts
│ ├── utils/ # 工具函数
│ │ └── format.ts
│ ├── router/ # 路由
│ │ └── index.ts
│ ├── App.vue
│ └── main.ts
├── index.html
├── package.json
├── tsconfig.json
├── vite.config.ts
└── README.md
6.2 核心代码实现
6.2.1 类型定义 (types/app.ts)
export interface AppInfo {
app_id: string
name: string
pkg_name: string
developer_name: string
dev_en_name?: string
kind_name: string
kind_type_name: string
icon_url: string
brief_desc: string
description: string
privacy_url: string
iap: boolean
is_pay: boolean
listed_at: string
created_at: string
}
export interface AppMetric {
id: number
app_id: string
pkg_name: string
version: string
version_code: number
size_bytes: number
download_count: number
info_score: number
info_rate_count: number
price: number
release_date: number
target_sdk: string
min_sdk: string
created_at: string
}
export interface AppRating {
id: number
app_id: string
average_rating: number
star_1_count: number
star_2_count: number
star_3_count: number
star_4_count: number
star_5_count: number
total_rating_count: number
created_at: string
}
export interface FullAppInfo {
info: AppInfo
metric: AppMetric
rating?: AppRating
}
export interface ApiResponse<T = any> {
success: boolean
data: T
total?: number
limit?: number
timestamp: string
}
export interface MarketStats {
app_count: {
total: number
apps: number
atomic_services: number
}
developer_count: number
}
export interface RankingItem {
app_id: string
name: string
pkg_name: string
developer_name: string
icon_url: string
download_count?: number
average_rating?: number
total_rating_count?: number
}
6.2.2 API封装 (api/apps.ts)
import axios from 'axios'
import type { ApiResponse, FullAppInfo, MarketStats, RankingItem } from '@/types/app'
const api = axios.create({
baseURL: import.meta.env.VITE_API_BASE_URL || 'http://localhost:8000/api',
timeout: 30000
})
// 请求拦截器
api.interceptors.request.use(
config => {
// 可以在这里添加token等
return config
},
error => {
return Promise.reject(error)
}
)
// 响应拦截器
api.interceptors.response.use(
response => {
return response.data
},
error => {
console.error('API Error:', error)
return Promise.reject(error)
}
)
export const appsApi = {
// 获取市场统计信息
getMarketInfo: () =>
api.get<any, ApiResponse<MarketStats>>('/market_info'),
// 按包名查询应用
getAppByPkgName: (pkgName: string) =>
api.get<any, ApiResponse<FullAppInfo>>(`/apps/pkg_name/${pkgName}`),
// 按应用ID查询
getAppById: (appId: string) =>
api.get<any, ApiResponse<FullAppInfo>>(`/apps/app_id/${appId}`),
// 获取应用列表
getAppList: (params: {
page: number
page_size?: number
detail?: boolean
sort?: string
desc?: boolean
search_key?: string
search_value?: string
search_exact?: boolean
}) =>
api.get<any, ApiResponse<FullAppInfo[]>>(`/apps/list/${params.page}`, { params }),
// 获取应用指标历史
getAppMetrics: (pkgName: string) =>
api.get<any, ApiResponse<any[]>>(`/apps/metrics/${pkgName}`),
// 获取下载排行
getTopDownloads: (params?: { limit?: number; exclude_pattern?: string }) =>
api.get<any, ApiResponse<RankingItem[]>>('/rankings/top-downloads', { params }),
// 获取评分排行
getTopRatings: (params?: { limit?: number }) =>
api.get<any, ApiResponse<RankingItem[]>>('/rankings/ratings', { params }),
// 获取开发者排行
getTopDevelopers: (params?: { limit?: number }) =>
api.get<any, ApiResponse<any[]>>('/rankings/developers', { params }),
// 获取评分分布
getRatingDistribution: () =>
api.get<any, ApiResponse<Record<string, number>>>('/charts/rating'),
// 获取SDK分布
getMinSdkDistribution: () =>
api.get<any, ApiResponse<Record<string, number>>>('/charts/min_sdk'),
getTargetSdkDistribution: () =>
api.get<any, ApiResponse<Record<string, number>>>('/charts/target_sdk'),
// 投稿应用
submitApp: (data: {
pkg_name?: string
app_id?: string
comment?: any
}) =>
api.post<any, ApiResponse<any>>('/submit', data)
}
export default api
6.2.3 状态管理 (stores/app.ts)
import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
import { appsApi } from '@/api/apps'
import type { MarketStats, FullAppInfo } from '@/types/app'
export const useAppStore = defineStore('app', () => {
// 状态
const marketStats = ref<MarketStats | null>(null)
const appList = ref<FullAppInfo[]>([])
const currentPage = ref(1)
const pageSize = ref(100)
const totalCount = ref(0)
const loading = ref(false)
// 计算属性
const totalPages = computed(() => Math.ceil(totalCount.value / pageSize.value))
// 方法
const fetchMarketStats = async () => {
try {
const response = await appsApi.getMarketInfo()
if (response.success) {
marketStats.value = response.data
}
} catch (error) {
console.error('获取市场统计失败:', error)
}
}
const fetchAppList = async (params: {
page?: number
page_size?: number
sort?: string
desc?: boolean
search_key?: string
search_value?: string
search_exact?: boolean
} = {}) => {
loading.value = true
try {
const response = await appsApi.getAppList({
// Spread first so an explicit `page: undefined` cannot override the fallbacks below
...params,
detail: true,
page: params.page || currentPage.value,
page_size: params.page_size || pageSize.value
})
if (response.success) {
appList.value = response.data
totalCount.value = response.total || 0
currentPage.value = params.page || currentPage.value
}
} catch (error) {
console.error('获取应用列表失败:', error)
} finally {
loading.value = false
}
}
const searchApps = async (searchKey: string, searchValue: string, exact: boolean = false) => {
await fetchAppList({
page: 1,
search_key: searchKey,
search_value: searchValue,
search_exact: exact
})
}
return {
marketStats,
appList,
currentPage,
pageSize,
totalCount,
totalPages,
loading,
fetchMarketStats,
fetchAppList,
searchApps
}
})
6.2.4 工具函数 (utils/format.ts)
/**
* 格式化文件大小
*/
export function formatFileSize(bytes: number): string {
if (bytes === 0) return '0 B'
const k = 1024
const sizes = ['B', 'KB', 'MB', 'GB', 'TB']
const i = Math.floor(Math.log(bytes) / Math.log(k))
return Math.round(bytes / Math.pow(k, i) * 100) / 100 + ' ' + sizes[i]
}
/**
* 格式化下载量
*/
export function formatDownloadCount(count: number): string {
if (count >= 100000000) {
return (count / 100000000).toFixed(1) + '亿'
} else if (count >= 10000) {
return (count / 10000).toFixed(1) + '万'
}
return count.toString()
}
/**
* 格式化日期
*/
export function formatDate(date: string | number): string {
const d = new Date(date)
return d.toLocaleDateString('zh-CN', {
year: 'numeric',
month: '2-digit',
day: '2-digit',
hour: '2-digit',
minute: '2-digit'
})
}
/**
* 格式化评分
*/
export function formatRating(rating: number): string {
return rating.toFixed(1)
}
/**
* 获取星级数组
*/
export function getStarArray(rating: number): boolean[] {
const fullStars = Math.floor(rating)
const hasHalfStar = rating % 1 >= 0.5
const stars: boolean[] = []
for (let i = 0; i < 5; i++) {
stars.push(i < fullStars || (i === fullStars && hasHalfStar))
}
return stars
}
附录A:如何获取应用包名
A.1 从华为应用市场网页获取
方法1:从URL中提取
访问华为应用市场应用详情页,URL格式如下:
https://appgallery.huawei.com/app/C1164531384803416384
或者:
https://appgallery.huawei.com/#/app/C1164531384803416384
注意: URL中的是 app_id,不是包名。需要进一步获取包名。
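A minimal sketch of pulling the app_id out of either URL form shown above. The helper name `extract_app_id` and the assumption that an ID is `C` followed by digits are illustrative and not taken from the original project.

```python
import re
from typing import Optional

# Assumption: an app_id is "C" followed by digits, as in the example URLs above.
_APP_ID_RE = re.compile(r"/app/(C\d+)")

def extract_app_id(url: str) -> Optional[str]:
    """Return the app_id embedded in an AppGallery detail URL, or None."""
    match = _APP_ID_RE.search(url)
    return match.group(1) if match else None

# Both URL styles (with and without the "#/" fragment) yield the same ID.
print(extract_app_id("https://appgallery.huawei.com/app/C1164531384803416384"))
print(extract_app_id("https://appgallery.huawei.com/#/app/C1164531384803416384"))
```

The returned value is still an app_id; resolving it to a package name requires a lookup by app ID.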
Method 2: Extract from the page source
- Open the app detail page
- Right-click -> View page source
- Search for "pkgName" or "packageName"
- Look for content like this:
{
"pkgName": "com.huawei.hmsapp.appgallery",
"appId": "C1164531384803416384",
...
}
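Method 3: Use the browser developer tools
- Open the app detail page
- Press F12 to open the developer tools
- Switch to the Network tab
- Refresh the page
- Filter for XHR requests and find the request related to appinfo
- Inspect the request's Response and find the pkgName field
Example of what the tools show: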
Network -> XHR -> appinfo
Response:
{
"pkgName": "com.huawei.hmsapp.appgallery",
"name": "应用市场",
...
}
A.2 从安卓设备获取
方法1:使用 ADB 命令
如果你有安卓设备或模拟器:
# 列出所有已安装应用的包名
adb shell pm list packages
# 列出第三方应用
adb shell pm list packages -3
# 搜索特定应用(例如包含 huawei 的)
adb shell pm list packages | grep huawei
# 获取当前运行应用的包名
adb shell dumpsys window | grep mCurrentFocus
输出示例:
package:com.huawei.hmsapp.appgallery
package:com.huawei.browser
package:com.huawei.music
方法2:使用应用信息查看器
在安卓设备上安装 "应用信息查看器" 类的应用,例如:
- Package Name Viewer
- App Inspector
- Dev Tools
这些应用可以直接显示已安装应用的包名。
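A.3 Collecting package names in bulk
Method 1: Crawl an AppGallery category page
# Requires beautifulsoup4, which is not listed in requirements.txt
import asyncio
import httpx
from bs4 import BeautifulSoup

async def get_apps_from_category(category_id: str) -> list[str]:
    """Collect the app IDs linked from a category page."""
    url = f"https://appgallery.huawei.com/Featured/{category_id}"
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    # Find app links and keep each ID once, in page order
    app_ids = []
    for link in soup.find_all('a', href=True):
        href = link['href']
        if '/app/' in href:
            app_id = href.split('/app/')[-1]
            if app_id not in app_ids:
                app_ids.append(app_id)
    return app_ids

# Usage example ("await" is not valid at module top level, so use asyncio.run)
app_ids = asyncio.run(get_apps_from_category('10000000'))  # "Tools" category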
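Method 2: Guess app IDs
A Huawei app_id has the form C + 19 digits.
Apps can be discovered by iterating over a numeric range:
import asyncio
from app.crawler.huawei_api import HuaweiAPI

async def guess_app_ids(start: int, end: int):
    """Probe every app ID in [start, end)."""
    api = HuaweiAPI()
    found_apps = []
    try:
        for i in range(start, end):
            app_id = f"C{i:019d}"
            try:
                app_data = await api.get_app_info(app_id=app_id)
            except Exception:
                # No such app, or the request failed: move on to the next ID
                continue
            found_apps.append({
                'app_id': app_id,
                'pkg_name': app_data['pkgName'],
                'name': app_data['name']
            })
            print(f"Found app: {app_data['name']} ({app_data['pkgName']})")
    finally:
        await api.close()
    return found_apps

# Usage example
apps = asyncio.run(guess_app_ids(1164531384803416384, 1164531384803416484))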
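Method 3: Expand from the existing database
If some app data is already stored, it can be used to find more apps:
- Other apps by the same developer
  SELECT DISTINCT pkg_name FROM app_info WHERE developer_name = '华为软件技术有限公司';
- Apps in the same category
  SELECT DISTINCT pkg_name FROM app_info WHERE kind_name = '工具';
- Related recommendations
  - Open an app detail page and look at the "related recommendations" section
  - Extract the app_id of each recommended app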
A.4 常见应用包名示例
# 华为系统应用
HUAWEI_SYSTEM_APPS = [
"com.huawei.hmsapp.appgallery", # 应用市场
"com.huawei.browser", # 浏览器
"com.huawei.music", # 音乐
"com.huawei.himovie", # 视频
"com.huawei.camera", # 相机
"com.huawei.health", # 运动健康
"com.huawei.wallet", # 钱包
]
# 热门第三方应用
POPULAR_APPS = [
"com.tencent.mm", # 微信
"com.tencent.mobileqq", # QQ
"com.sina.weibo", # 微博
"com.taobao.taobao", # 淘宝
"com.jingdong.app.mall", # 京东
"com.ss.android.ugc.aweme", # 抖音
]
# 鸿蒙元服务(包名特征)
ATOMIC_SERVICE_PATTERN = "com.atomicservice.*"
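The pattern above is a shell-style glob, so one way to apply it is with the standard library's `fnmatch`. The sketch below splits a list of package names into atomic services and regular apps; the function name `split_by_kind` and the sample atomic-service name are hypothetical.

```python
from fnmatch import fnmatchcase

ATOMIC_SERVICE_PATTERN = "com.atomicservice.*"

def split_by_kind(pkg_names):
    """Split package names into (atomic_services, regular_apps)."""
    atomic, regular = [], []
    for name in pkg_names:
        # fnmatchcase is case-sensitive on every platform
        if fnmatchcase(name, ATOMIC_SERVICE_PATTERN):
            atomic.append(name)
        else:
            regular.append(name)
    return atomic, regular

atomic, regular = split_by_kind([
    "com.tencent.mm",
    "com.atomicservice.1234567890123456789",  # hypothetical example name
    "com.huawei.browser",
])
print(atomic)
print(regular)
```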
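A.5 Package naming conventions
Package names usually follow this convention:
Format: com.<company>.<app>
Examples:
- com.huawei.hmsapp.appgallery - Huawei AppGallery
- com.tencent.mm - Tencent WeChat
- com.alibaba.android.rimet - Alibaba DingTalk
HarmonyOS atomic services:
- com.atomicservice.{19 digits} - atomic service package name format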
A.6 实用工具脚本
从URL批量提取包名
import re
import httpx
from typing import List
async def extract_pkg_names_from_urls(urls: List[str]) -> List[dict]:
"""从URL列表批量提取包名"""
api = HuaweiAPI()
results = []
for url in urls:
# 从URL提取app_id
match = re.search(r'/app/([A-Z0-9]+)', url)
if not match:
continue
app_id = match.group(1)
try:
app_data = await api.get_app_info(app_id=app_id)
results.append({
'url': url,
'app_id': app_id,
'pkg_name': app_data['pkgName'],
'name': app_data['name']
})
except Exception as e:
print(f"处理 {url} 失败: {e}")
return results
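# Usage example ("await" is not valid at module top level, so wrap it in main())
import asyncio

async def main():
    urls = [
        "https://appgallery.huawei.com/app/C1164531384803416384",
        "https://appgallery.huawei.com/app/C100000000000000001",
    ]
    results = await extract_pkg_names_from_urls(urls)
    for r in results:
        print(f"{r['name']}: {r['pkg_name']}")

asyncio.run(main())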
导出包名列表
import csv
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.app_info import AppInfo
async def export_pkg_names_to_csv(db: AsyncSession, filename: str = "pkg_names.csv"):
"""导出所有包名到CSV文件"""
result = await db.execute(
select(AppInfo.pkg_name, AppInfo.name, AppInfo.developer_name)
.order_by(AppInfo.name)
)
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['包名', '应用名称', '开发者'])
for row in result:
writer.writerow([row.pkg_name, row.name, row.developer_name])
print(f"已导出到 {filename}")
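A.7 Notes
- Package name uniqueness
  - Each app's package name is unique within AppGallery
  - The same app normally uses the same package name across app stores
- Package name format validation
  import re

  def is_valid_pkg_name(pkg_name: str) -> bool:
      """Validate the package name format."""
      # Lowercase segments separated by dots, at least two segments.
      # Atomic service names (com.atomicservice.<digits>) have an all-digit
      # segment and therefore do NOT match this pattern.
      pattern = r'^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)+$'
      return bool(re.match(pattern, pkg_name))

  # Examples
  print(is_valid_pkg_name("com.huawei.hmsapp.appgallery"))  # True
  print(is_valid_pkg_name("Com.Huawei.App"))                # False (uppercase)
  print(is_valid_pkg_name("huawei"))                        # False (only one segment)
- Identifying atomic services
  def is_atomic_service(pkg_name: str) -> bool:
      """Return True if the package name belongs to an atomic service."""
      return pkg_name.startswith("com.atomicservice.")
- Request rate limits
  - Avoid sending requests too frequently
  - Add a delay of 0.5-1 second between requests
  - Keep an eye on concurrency when processing in batches
- Data update strategy
  - Update apps with high download counts first
  - Periodically run a full sync of all known package names
  - Store newly discovered package names promptly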
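The rate-limit notes above (a 0.5-1 second delay and bounded concurrency) could be combined roughly as follows. This is a sketch under assumptions: `run_throttled` and `fake_fetch` are hypothetical names, and the real crawler would pass a coroutine that calls the Huawei API instead.

```python
import asyncio
import random

async def run_throttled(items, worker, max_concurrency=5, min_delay=0.5, max_delay=1.0):
    """Run worker(item) for every item with bounded concurrency and a
    random pause before each call. Results keep the input order;
    exceptions are returned in place rather than raised."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with semaphore:
            # Pause inside the slot so at most max_concurrency calls
            # start per delay window
            await asyncio.sleep(random.uniform(min_delay, max_delay))
            return await worker(item)

    return await asyncio.gather(*(guarded(i) for i in items), return_exceptions=True)

async def _demo():
    async def fake_fetch(pkg_name):  # stand-in for a real API call
        return pkg_name.upper()
    return await run_throttled(
        ["com.tencent.mm", "com.sina.weibo"], fake_fetch,
        max_concurrency=1, min_delay=0.01, max_delay=0.02,
    )

print(asyncio.run(_demo()))
```

With these parameters the effective request rate is roughly `max_concurrency` divided by the average delay, so both knobs need to be tuned together.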
7. 部署指南
7.1 Docker 部署
7.1.1 后端 Dockerfile
# backend/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
default-libmysqlclient-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
# 安装 Playwright 依赖
RUN apt-get update && apt-get install -y \
libnss3 \
libnspr4 \
libatk1.0-0 \
libatk-bridge2.0-0 \
libcups2 \
libdrm2 \
libxkbcommon0 \
libxcomposite1 \
libxdamage1 \
libxfixes3 \
libxrandr2 \
libgbm1 \
libasound2
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 安装 Playwright 浏览器
RUN playwright install chromium
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
7.1.2 前端 Dockerfile
# frontend/Dockerfile
FROM node:18-alpine as builder
WORKDIR /app
# 复制依赖文件
COPY package*.json ./
# 安装依赖
RUN npm ci
# 复制源代码
COPY . .
# 构建
RUN npm run build
# 生产环境
FROM nginx:alpine
# 复制构建产物
COPY --from=builder /app/dist /usr/share/nginx/html
# 复制 Nginx 配置
COPY nginx.conf /etc/nginx/conf.d/default.conf
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]
7.1.3 Nginx 配置
# frontend/nginx.conf
server {
listen 80;
server_name localhost;
root /usr/share/nginx/html;
index index.html;
# Gzip 压缩
gzip on;
gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
# 前端路由
location / {
try_files $uri $uri/ /index.html;
}
# API 代理
location /api {
proxy_pass http://backend:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# 静态资源缓存
location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {
expires 1y;
add_header Cache-Control "public, immutable";
}
}
7.1.4 Docker Compose
# docker-compose.yml
version: '3.8'
services:
mysql:
image: mysql:8.0
container_name: huawei_market_mysql
restart: always
environment:
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
MYSQL_DATABASE: ${MYSQL_DATABASE}
MYSQL_USER: ${MYSQL_USER}
MYSQL_PASSWORD: ${MYSQL_PASSWORD}
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
- ./backend/sql:/docker-entrypoint-initdb.d
command: --default-authentication-plugin=mysql_native_password
networks:
- app_network
backend:
build:
context: ./backend
dockerfile: Dockerfile
container_name: huawei_market_backend
restart: always
environment:
MYSQL_HOST: mysql
MYSQL_PORT: 3306
MYSQL_USER: ${MYSQL_USER}
MYSQL_PASSWORD: ${MYSQL_PASSWORD}
MYSQL_DATABASE: ${MYSQL_DATABASE}
ports:
- "8000:8000"
depends_on:
- mysql
volumes:
- ./backend:/app
networks:
- app_network
frontend:
build:
context: ./frontend
dockerfile: Dockerfile
container_name: huawei_market_frontend
restart: always
ports:
- "80:80"
depends_on:
- backend
networks:
- app_network
volumes:
mysql_data:
networks:
app_network:
driver: bridge
7.1.5 环境变量文件
# .env
MYSQL_ROOT_PASSWORD=root_password_here
MYSQL_DATABASE=huawei_market
MYSQL_USER=market_user
MYSQL_PASSWORD=user_password_here
7.2 部署步骤
7.2.1 准备工作
# 1. 克隆项目
git clone <your-repo-url>
cd huawei-market-crawler
# 2. 创建环境变量文件
cp .env.example .env
# 编辑 .env 文件,填入实际配置
# 3. 创建必要的目录
mkdir -p backend/logs
mkdir -p mysql_data
7.2.2 使用 Docker Compose 部署
# 构建并启动所有服务
docker-compose up -d --build
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f backend
# 停止服务
docker-compose down
# 停止并删除数据卷
docker-compose down -v
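7.2.3 Initialize the database
# Scripts in ./backend/sql are mounted at /docker-entrypoint-initdb.d and run
# automatically the first time MySQL starts with an empty data volume.
# To run the script manually (for example, if the volume already existed):
# Enter the MySQL container
docker exec -it huawei_market_mysql mysql -u root -p
# Run the initialization script
mysql> USE huawei_market;
mysql> SOURCE /docker-entrypoint-initdb.d/init.sql;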
7.2.4 验证部署
# 检查后端健康状态
curl http://localhost:8000/health
# 检查前端
curl http://localhost/
# 测试 API
curl http://localhost:8000/api/market_info
7.3 生产环境优化
7.3.1 使用 Gunicorn 运行后端
# 安装 gunicorn
pip install gunicorn
# 启动命令
gunicorn app.main:app \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--access-logfile logs/access.log \
--error-logfile logs/error.log \
--log-level info
7.3.2 MySQL 优化配置
# my.cnf
[mysqld]
# 基础配置
max_connections = 500
max_allowed_packet = 64M
# InnoDB 配置
innodb_buffer_pool_size = 2G
innodb_log_file_size = 256M
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT
# Query cache: removed in MySQL 8.0. Do not set query_cache_type or
# query_cache_size; the server refuses to start on unknown variables.
# 慢查询日志
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
7.3.3 Nginx 生产配置
# /etc/nginx/sites-available/huawei-market
server {
listen 80;
server_name your-domain.com;
# 重定向到 HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name your-domain.com;
# SSL 证书
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
# SSL 配置
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
# 安全头
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
# 日志
access_log /var/log/nginx/huawei-market-access.log;
error_log /var/log/nginx/huawei-market-error.log;
# 前端
location / {
root /var/www/huawei-market/frontend;
try_files $uri $uri/ /index.html;
}
# API
location /api {
proxy_pass http://127.0.0.1:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
}
7.4 监控与维护
7.4.1 日志管理
# app/utils/logger.py
import logging
from logging.handlers import RotatingFileHandler
import os
def setup_logger(name: str, log_file: str, level=logging.INFO):
"""配置日志"""
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 确保日志目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# 文件处理器(自动轮转)
file_handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(formatter)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger = logging.getLogger(name)
logger.setLevel(level)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
7.4.2 健康检查
# app/api/health.py
from datetime import datetime
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from app.database import get_db
router = APIRouter(tags=["健康检查"])
@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""健康检查"""
try:
# 检查数据库连接
await db.execute(text("SELECT 1"))
return {
"status": "healthy",
"database": "connected",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "unhealthy",
"database": "disconnected",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
7.4.3 性能监控
# 使用 Prometheus + Grafana 监控
# 1. 安装 prometheus-fastapi-instrumentator
pip install prometheus-fastapi-instrumentator
# 2. Add to main.py, after the existing `app = FastAPI(...)` call
#    (instrument the existing app instead of creating a new one)
from prometheus_fastapi_instrumentator import Instrumentator
Instrumentator().instrument(app).expose(app)
7.5 备份策略
#!/bin/bash
# backup.sh - 数据库备份脚本
BACKUP_DIR="/backup/mysql"
DATE=$(date +%Y%m%d_%H%M%S)
MYSQL_USER="root"
MYSQL_PASSWORD="your_password"
DATABASE="huawei_market"
# 创建备份目录
mkdir -p $BACKUP_DIR
# 备份数据库
mysqldump -u$MYSQL_USER -p$MYSQL_PASSWORD \
--single-transaction \
--routines \
--triggers \
$DATABASE > $BACKUP_DIR/backup_$DATE.sql
# 压缩备份文件
gzip $BACKUP_DIR/backup_$DATE.sql
# 删除7天前的备份
find $BACKUP_DIR -name "backup_*.sql.gz" -mtime +7 -delete
echo "备份完成: backup_$DATE.sql.gz"
8. Development advice and best practices
8.1 Code style
- Python: follow PEP 8 and format with Black
- TypeScript: use ESLint + Prettier
- Commit messages: follow the Conventional Commits specification
8.2 Testing strategy
# tests/test_crawler.py
import pytest
from app.crawler.huawei_api import HuaweiAPI
@pytest.mark.asyncio
async def test_get_app_info():
api = HuaweiAPI()
data = await api.get_app_info(pkg_name="com.huawei.hmsapp.appgallery")
assert data['pkgName'] == "com.huawei.hmsapp.appgallery"
assert 'name' in data
assert 'appId' in data
await api.close()
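8.3 Performance optimization
- Database query optimization
  - Use indexes
  - Avoid N+1 queries
  - Use a connection pool
- Caching strategy
  - Cache hot data in Redis
  - Use LocalStorage on the frontend
- Asynchronous processing
  - Use asynchronous I/O
  - Process data in batches
8.4 Security recommendations
- API security
  - Add API rate limiting
  - Use JWT authentication (if needed)
  - Validate and sanitize input
- Database security
  - Use parameterized queries
  - Apply the principle of least privilege
  - Back up regularly
- Crawler etiquette
  - Respect robots.txt
  - Throttle request frequency
  - Use a reasonable User-Agent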
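For the API rate limiting item, one common technique is a token bucket. The sketch below is framework-independent and not part of the original project; the class name `TokenBucket` and its parameters are assumptions, and the injectable `clock` exists only to make the behavior easy to test.

```python
import time

class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated = clock()

    def allow(self) -> bool:
        """Consume one token if available and report whether the request may proceed."""
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=5, capacity=10)
print(bucket.allow())
```

In a web service, one bucket would typically be kept per client (for example, per IP address), and a request would be rejected with HTTP 429 when `allow()` returns False.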
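9. FAQ
Q1: What if fetching the token fails?
A:
- Check the network connection
- Confirm that the Playwright browser is installed
- Open AppGallery manually and check whether a CAPTCHA is required
- Increase the wait time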
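Increasing the wait time can be automated by retrying with a growing delay. The following is a generic sketch, not the project's actual token logic; `retry_async` and `flaky_fetch` are hypothetical names.

```python
import asyncio

async def retry_async(func, attempts=3, base_delay=1.0, factor=2.0):
    """Await func() until it succeeds or `attempts` tries are used up.
    The pause between tries grows by `factor` each time."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except Exception as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            print(f"attempt {attempt} failed ({exc}); retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
            delay *= factor

async def _demo():
    calls = {"n": 0}

    async def flaky_fetch():  # stand-in for a token fetch that fails twice
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("token not ready")
        return "token-value"

    return await retry_async(flaky_fetch, attempts=5, base_delay=0.01)

print(asyncio.run(_demo()))
```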
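Q2: The database connection times out?
A:
- Check that the MySQL service is running
- Verify that the connection settings are correct
- Increase the connection pool size
- Check the firewall settings
Q3: Crawling is too slow?
A:
- Increase concurrency
- Use batch processing
- Optimize database writes
- Consider distributing the crawl across several servers
Q4: How to deal with anti-crawling measures?
A:
- Lower the request frequency
- Use a pool of proxy IPs
- Simulate real browser behavior
- Refresh the token regularly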
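Since the token is described earlier as valid for roughly 10 minutes, regular refreshing can be handled by a small cache that re-fetches shortly before expiry. This is a sketch under assumptions: `TokenCache`, the 8-minute TTL, and the synchronous `fetch` callable are illustrative, and a real crawler would likely use an async fetch.

```python
import time

class TokenCache:
    """Cache a token and fetch a new one once it is older than ttl_seconds."""

    def __init__(self, fetch, ttl_seconds=480.0, clock=time.monotonic):
        self._fetch = fetch          # callable returning a fresh token
        self._ttl = ttl_seconds      # assumption: refresh after 8 of the ~10 minutes
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return the cached token, refreshing it first if missing or expired."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            self._token = self._fetch()
            self._expires_at = now + self._ttl
        return self._token

counter = {"n": 0}

def fake_fetch():  # stand-in for the real token acquisition
    counter["n"] += 1
    return f"token-{counter['n']}"

cache = TokenCache(fake_fetch)
print(cache.get(), cache.get())  # second call reuses the cached token
```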
10. 参考资源
- FastAPI 文档: https://fastapi.tiangolo.com/
- Vue 3 文档: https://vuejs.org/
- SQLAlchemy 文档: https://docs.sqlalchemy.org/
- Playwright 文档: https://playwright.dev/python/
- MySQL 文档: https://dev.mysql.com/doc/
附录B:完整项目清单
后端文件清单
backend/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── database.py
│ ├── models/
│ ├── schemas/
│ ├── api/
│ ├── crawler/
│ ├── scheduler/
│ └── utils/
├── tests/
├── logs/
├── requirements.txt
├── .env
├── Dockerfile
└── README.md
前端文件清单
frontend/
├── public/
├── src/
│ ├── assets/
│ ├── components/
│ ├── views/
│ ├── api/
│ ├── stores/
│ ├── types/
│ ├── utils/
│ ├── router/
│ ├── App.vue
│ └── main.ts
├── package.json
├── vite.config.ts
├── tsconfig.json
├── Dockerfile
├── nginx.conf
└── README.md
文档版本: v1.0
最后更新: 2024年
维护者: [Your Name]
许可证: MIT
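Appendix C: Package name discovery strategies in the original project
The original Rust project uses several inventive methods to discover and fetch app package names, and they are well worth borrowing.
C.1 Overview of core strategies
The original project ships 7 standalone tools for obtaining package names and app data:
| Tool | Purpose | Strategy |
|---|---|---|
| guess_market | App ID guessing | Iterate over a given range of app IDs |
| guess_rand | Random guessing | Probe randomly generated app IDs |
| guess_from_db | Database expansion | Infer neighboring IDs from existing data |
| guess_large | Large-scale guessing | Scan a wide ID range |
| get_nextmax | Third-party data source | Fetch from nextmax.cn |
| read_appgallery | AppGallery crawling | Crawl AppGallery pages directly |
| read_pkg_name | Bulk import | Read a list of package names from a file |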
C.2 方法详解
C.2.1 应用ID猜测法 (guess_market)
原理: 华为应用的 app_id 格式为固定前缀 + 数字,通过遍历数字范围来发现应用。
app_id 格式:
C576588020785 + 7位数字
例如: C5765880207856366961
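To make the format concrete, the sketch below builds an app_id from the prefix and a 7-digit suffix, and splits one back apart. The helper names `build_app_id` and `split_app_id` are hypothetical; only the prefix and the example ID come from the text above.

```python
from typing import Optional

def build_app_id(prefix: str, number: int, width: int = 7) -> str:
    """Join a known prefix with a zero-padded numeric suffix."""
    return f"{prefix}{number:0{width}d}"

def split_app_id(app_id: str, prefix: str) -> Optional[int]:
    """Return the numeric suffix if app_id starts with prefix, else None."""
    if not app_id.startswith(prefix):
        return None
    suffix = app_id[len(prefix):]
    return int(suffix) if suffix.isdigit() else None

PREFIX = "C576588020785"
print(build_app_id(PREFIX, 6366961))               # C5765880207856366961
print(split_app_id("C5765880207856366961", PREFIX))  # 6366961
```

Zero padding matters: a suffix such as 42 must be rendered as 0000042 so the resulting ID keeps its full length.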
核心代码逻辑:
// 定义扫描范围
let range = 2000000..=6390000;
let start = "C576588020785";
// 批量处理(每批1000个)
for bunch_id in range_vec.chunks(1000) {
let mut join_set = tokio::task::JoinSet::new();
for id in bunch_id.iter() {
let app_id = format!("{start}{id:07}"); // 格式化为7位数字
// 异步请求华为API
join_set.spawn(async move {
if let Ok(data) = query_app(&client, &api_url, &AppQuery::app_id(&app_id), &locale).await {
// 保存到数据库
db.save_app_data(&data.0, data.1.as_ref(), None, Some(comment)).await
}
});
}
join_set.join_all().await;
tokio::time::sleep(Duration::from_millis(25)).await; // 批次间延迟
}
Python 实现示例:
import asyncio
from typing import List
async def guess_market_apps(
start_prefix: str = "C576588020785",
start_range: int = 2000000,
end_range: int = 6390000,
batch_size: int = 1000
):
"""通过ID猜测发现应用"""
api = HuaweiAPI()
db = Database()
for batch_start in range(start_range, end_range, batch_size):
batch_end = min(batch_start + batch_size, end_range)
tasks = []
for i in range(batch_start, batch_end):
app_id = f"{start_prefix}{i:07d}" # 7位数字,不足补0
tasks.append(try_fetch_app(api, db, app_id))
# 并发执行
results = await asyncio.gather(*tasks, return_exceptions=True)
# Tally results: try_fetch_app returns False on failure, so only True counts as a success
success_count = sum(1 for r in results if r is True)
print(f"批次 {batch_start}-{batch_end}: 成功 {success_count}/{len(tasks)}")
# 批次间延迟
await asyncio.sleep(0.025)
async def try_fetch_app(api: HuaweiAPI, db: Database, app_id: str):
"""尝试获取单个应用"""
try:
app_data = await api.get_app_info(app_id=app_id)
rating_data = await api.get_app_rating(app_id)
await db.save_app_data(app_data, rating_data, comment={
"user": "guess_market",
"method": "id_guessing"
})
print(f"✓ 发现应用: {app_data['name']} ({app_data['pkgName']})")
return True
except Exception as e:
# 应用不存在或请求失败,静默跳过
return False
已知的应用ID前缀:
KNOWN_APP_ID_PREFIXES = [
"C576588020785", # 主要前缀
"C69175", # 另一个前缀系列
# 可以通过分析已有数据发现更多前缀
]
C.2.2 随机猜测法 (guess_rand)
原理: 在已知的ID范围内随机生成ID,提高发现效率。
适用场景:
- ID空间很大,顺序遍历效率低
- 想要快速发现热门应用(通常ID较新)
核心逻辑:
let code_start = 59067092904725_u64;
let size = 85170011059280_u64 - code_start;
let start = "C69175";
loop {
let mut ids: Vec<u64> = Vec::with_capacity(1000);
for _ in 0..1000 {
let id = code_start + (rng.next() % size); // 随机生成
ids.push(id);
}
// 批量处理这些随机ID
// ...
}
Python 实现:
import random
async def guess_random_apps(
prefix: str = "C69175",
start: int = 59067092904725,
end: int = 85170011059280,
batch_size: int = 1000
):
"""随机猜测应用ID"""
api = HuaweiAPI()
db = Database()
while True:
# 生成随机ID批次
random_ids = [
f"{prefix}{random.randint(start, end)}"
for _ in range(batch_size)
]
tasks = [try_fetch_app(api, db, app_id) for app_id in random_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count = sum(1 for r in results if r is True)
print(f"随机批次: 成功 {success_count}/{batch_size}")
await asyncio.sleep(0.005)
C.2.3 数据库扩展法 (guess_from_db)
原理: 基于已有的应用ID,推测其相邻的ID可能也是有效应用。
策略:
- 从数据库获取所有已知的 app_id
- 解析每个 app_id 的前缀和数字部分
- 对每个数字,生成 ±1000 的范围
- 合并重叠的范围
- 扫描这些范围
核心逻辑:
// 1. 获取所有已知app_id
let existing_app_ids = db.get_all_app_ids().await?;
// 2. 为每个app_id生成扩展范围
for app_id in existing_app_ids {
if let Some((prefix, numeric_part)) = parse_app_id(&app_id) {
let start_range = numeric_part.saturating_sub(1000);
let end_range = numeric_part.saturating_add(1000);
all_ranges.insert((prefix, start_range, end_range));
}
}
// 3. 合并重叠范围
// 例如: (100, 1100) 和 (500, 1500) 合并为 (100, 1500)
// 4. 扫描合并后的范围
for (prefix, start, end) in merged_ranges {
for id in start..=end {
let app_id = format!("{}{}", prefix, id);
// 尝试获取应用
}
}
Python 实现:
from typing import Tuple, Optional
import re
def parse_app_id(app_id: str) -> Optional[Tuple[str, int]]:
"""解析app_id,返回(前缀, 数字)"""
match = re.match(r'^([A-Z]+)(\d+)$', app_id)
if match:
return match.group(1), int(match.group(2))
return None
async def guess_from_database(expand_range: int = 1000):
"""基于数据库已有数据扩展"""
db = Database()
# 1. 获取所有已知app_id
existing_ids = await db.get_all_app_ids()
# 2. 生成扩展范围
ranges = {}
for app_id in existing_ids:
parsed = parse_app_id(app_id)
if not parsed:
continue
prefix, num = parsed
start = max(0, num - expand_range)
end = num + expand_range
if prefix not in ranges:
ranges[prefix] = []
ranges[prefix].append((start, end))
# 3. 合并重叠范围
merged_ranges = {}
for prefix, range_list in ranges.items():
range_list.sort()
merged = []
current = range_list[0]
for r in range_list[1:]:
if r[0] <= current[1] + 1:
# 重叠或相邻,合并
current = (current[0], max(current[1], r[1]))
else:
merged.append(current)
current = r
merged.append(current)
merged_ranges[prefix] = merged
# 4. 扫描范围
api = HuaweiAPI()
for prefix, range_list in merged_ranges.items():
for start, end in range_list:
print(f"扫描范围: {prefix}{start} - {prefix}{end}")
await guess_market_apps(prefix, start, end)
C.2.4 从文件批量导入 (read_pkg_name)
原理: 从文本文件读取包名列表,批量获取应用数据。
使用方式:
# 创建包名列表文件
cat > pkg_names.txt << EOF
com.huawei.hmsapp.appgallery
com.tencent.mm
com.sina.weibo
EOF
# 运行工具
cargo run --bin read_pkg_name pkg_names.txt
Core code:
use std::io::BufRead; // brings read_line into scope
// Take the file path from the command-line arguments
let cli_file = std::env::args().nth(1).ok_or_else(|| anyhow::anyhow!("No file path provided"))?;
// Read the package names from the file
let pkg_names: Vec<String> = {
    let file = std::fs::File::open(&cli_file)?;
    let mut reader = std::io::BufReader::new(file);
    let mut pkg_names = Vec::new();
    let mut line = String::new();
    while reader.read_line(&mut line)? > 0 {
        pkg_names.push(line.trim().to_string());
        line.clear();
    }
    // Strip surrounding double quotes from each name
    pkg_names.into_iter()
        .map(|l| l.trim_matches('\"').to_string())
        .collect()
};
// Batch sync
sync::sync_all(&client, &db, &config).await?;
Python implementation:
import asyncio

async def read_pkg_names_from_file(filepath: str):
    """Read package names from a file and fetch them in batches."""
    # Read the package-name list, skipping blank lines and stripping quotes
    with open(filepath, 'r', encoding='utf-8') as f:
        pkg_names = [
            line.strip().strip('"').strip("'")
            for line in f
            if line.strip()
        ]
    print(f"Read {len(pkg_names)} package names from the file")
    # Fetch in batches of 100
    api = HuaweiAPI()
    db = Database()
    for i in range(0, len(pkg_names), 100):
        batch = pkg_names[i:i+100]
        tasks = [
            fetch_and_save_app(api, db, pkg_name)
            for pkg_name in batch
        ]
        await asyncio.gather(*tasks, return_exceptions=True)
        print(f"Processed {min(i+100, len(pkg_names))}/{len(pkg_names)}")

async def fetch_and_save_app(api: HuaweiAPI, db: Database, pkg_name: str):
    """Fetch and save a single app."""
    try:
        app_data = await api.get_app_info(pkg_name=pkg_name)
        rating_data = await api.get_app_rating(app_data['appId'])
        await db.save_app_data(app_data, rating_data)
        print(f"✓ {pkg_name}")
    except Exception as e:
        print(f"✗ {pkg_name}: {e}")
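Hand-assembled package lists often contain duplicates and stray quotes, so it can help to normalize the lines before any requests are sent. The following is a minimal sketch of that cleaning step as a pure function; the name `normalize_pkg_names` is hypothetical and is not part of the original project.

```python
from typing import Iterable, List

def normalize_pkg_names(lines: Iterable[str]) -> List[str]:
    """Strip whitespace and surrounding quotes, drop blanks, de-duplicate in order."""
    cleaned = (line.strip().strip('"').strip("'") for line in lines)
    # dict.fromkeys keeps the first occurrence of each name and preserves order
    return list(dict.fromkeys(name for name in cleaned if name))

sample = ['com.tencent.mm\n', '"com.sina.weibo"\n', '\n', 'com.tencent.mm\n']
print(normalize_pkg_names(sample))
# -> ['com.tencent.mm', 'com.sina.weibo']
```

Because the function accepts any iterable of strings, an open file object can be passed to it directly.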
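C.2.5 Batch fetching from a substance (topic/collection)
How it works: AppGallery groups apps into "topics" or "collections"; a single substance contains multiple apps.
Substance ID format:
e.g. webAgSubstanceDetail|12345
Core logic: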
pub async fn get_app_from_substance(
    client: &reqwest::Client,
    api_url: &str,
    substance_id: impl ToString,
) -> Result<(SubstanceData, JsonValue)> {
    // Note: the extraction of card_id, id, and title is omitted in this excerpt.
    // 1. Request the substance details
    let body = serde_json::json!({
        "pageId": format!("webAgSubstanceDetail|{}", substance_id.to_string()),
        "pageNum": 1,
        "pageSize": 100,
        "zone": "",
        "businessParam": { "animation": 0 }
    });
    let response = client.post(format!("{api_url}/harmony/page-detail"))
        .json(&body)
        .send()
        .await?;
    let data = response.json::<JsonValue>().await?;
    // 2. Parse the card data and extract the app IDs
    let layouts = data["pages"][0]["data"]["cardlist"]["layoutData"].as_array()?;
    let mut apps = Vec::new();
    for card in layouts {
        match card["type"].as_str()? {
            "com.huawei.hmsapp.appgallery.verticallistcard" => {
                // Vertical list card
                for app in card["data"].as_array()? {
                    if let Some(app_id) = app.get("appId") {
                        apps.push(AppQuery::app_id(app_id.as_str()?));
                    }
                }
            }
            "com.huawei.hmos.appgallery.scenariolistcard.landing" => {
                // Scenario list card
                let refs_list = card["data"][0]["refsList_app"].as_array()?;
                for app in refs_list {
                    if let Some(app_id) = app.get("appId") {
                        apps.push(AppQuery::app_id(app_id.as_str()?));
                    }
                }
            }
            _ => {}
        }
    }
    // 3. If there are more pages, keep fetching
    if data["hasMore"].as_i64()? != 0 {
        let more_apps = get_more_substance(client, api_url, card_id).await?;
        apps.extend(more_apps);
    }
    Ok((SubstanceData { id, title, apps }, data))
}
Python implementation:
from typing import List

async def get_apps_from_substance(substance_id: str) -> List[str]:
    """Fetch the list of app IDs from a topic/collection."""
    api = HuaweiAPI()
    url = f"{api.base_url}/harmony/page-detail"
    body = {
        "pageId": f"webAgSubstanceDetail|{substance_id}",
        "pageNum": 1,
        "pageSize": 100,
        "zone": "",
        "businessParam": {"animation": 0}
    }
    tokens = await api.token_manager.get_token()
    headers = {
        "Content-Type": "application/json",
        "interface-code": tokens["interface_code"],
        "identity-id": tokens["identity_id"]
    }
    response = await api.client.post(url, json=body, headers=headers)
    data = response.json()
    app_ids = []
    layouts = data["pages"][0]["data"]["cardlist"]["layoutData"]
    for card in layouts:
        card_type = card.get("type", "")
        card_data = card.get("data", [])
        if card_type == "com.huawei.hmsapp.appgallery.verticallistcard":
            # Vertical list card: apps are listed directly in the card data
            for app in card_data:
                if "appId" in app:
                    app_ids.append(app["appId"])
        elif card_type == "com.huawei.hmos.appgallery.scenariolistcard.landing":
            # Scenario list card: apps are nested under refsList_app
            if card_data and "refsList_app" in card_data[0]:
                for app in card_data[0]["refsList_app"]:
                    if "appId" in app:
                        app_ids.append(app["appId"])
    # Handle pagination
    if data.get("hasMore", 0) != 0:
        card_id = data["cardlist"]["dataId"]
        more_apps = await get_more_substance_pages(api, card_id)
        app_ids.extend(more_apps)
    return app_ids

async def get_more_substance_pages(api: HuaweiAPI, card_id: str) -> List[str]:
    """Fetch the remaining pages of a topic, starting from page 2."""
    app_ids = []
    page_num = 2
    has_more = True
    while has_more:
        url = f"{api.base_url}/harmony/card-list"
        body = {
            "dataId": card_id,
            "locale": "zh",
            "pageNum": page_num,
            "pageSize": 25
        }
        response = await api.client.post(url, json=body)
        data = response.json()
        has_more = data.get("hasMore", 0) != 0
        page_num += 1
        for card in data.get("layoutData", []):
            if card.get("type") == "com.huawei.hmsapp.appgallery.verticallistcard":
                for app in card.get("data", []):
                    if "appId" in app:
                        app_ids.append(app["appId"])
    return app_ids
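Both functions above repeat the same card-parsing logic, so one option is to factor it into a helper that works on already-decoded JSON and can be exercised offline. The following is a minimal sketch: the name `extract_app_ids` is hypothetical, and the sample payload is invented for illustration, mirroring only the fields that the code above reads.

```python
from typing import Any, Dict, List

VERTICAL_LIST_CARD = "com.huawei.hmsapp.appgallery.verticallistcard"
SCENARIO_LIST_CARD = "com.huawei.hmos.appgallery.scenariolistcard.landing"

def extract_app_ids(layout_data: List[Dict[str, Any]]) -> List[str]:
    """Collect appId values from the two card types handled above."""
    app_ids: List[str] = []
    for card in layout_data:
        card_type = card.get("type", "")
        card_data = card.get("data") or []
        if card_type == VERTICAL_LIST_CARD:
            apps = card_data
        elif card_type == SCENARIO_LIST_CARD:
            apps = card_data[0].get("refsList_app", []) if card_data else []
        else:
            apps = []  # unknown card types are ignored
        app_ids.extend(app["appId"] for app in apps if "appId" in app)
    return app_ids

# Invented sample payload (not a real API response)
sample_layout = [
    {"type": VERTICAL_LIST_CARD, "data": [{"appId": "C100"}, {"name": "no id"}]},
    {"type": SCENARIO_LIST_CARD, "data": [{"refsList_app": [{"appId": "C200"}]}]},
    {"type": "some.other.card", "data": [{"appId": "C300"}]},
]
print(extract_app_ids(sample_layout))
# -> ['C100', 'C200']
```

Keeping the parsing separate from the HTTP calls means a change in the card layout only needs to be handled in one place.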
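C.3 Combined strategy recommendations
Initial phase (cold start):
- Use guess_market to scan known ID ranges
- Crawl popular apps from the AppGallery home page
- Manually collect the package names of some well-known apps
Expansion phase:
- Use guess_from_db to expand from existing data
- Use guess_rand to discover new apps at random
- Periodically batch-fetch apps from substances (topic collections)
Maintenance phase:
- Periodically sync data updates for known package names
- Monitor the patterns in which new app IDs appear
- Collect new package names from user submissions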
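Efficiency optimization:
import asyncio

# Combined strategy example
async def comprehensive_discovery():
    """Combined discovery strategy."""
    # 1. Expand from the database first (high hit rate)
    await guess_from_database(expand_range=500)
    # 2. Scan a popular ID segment
    await guess_market_apps("C576588020785", 6000000, 6400000)
    # 3. Random probing (discovers new apps), running in the background
    random_task = asyncio.create_task(guess_random_apps())
    # 4. Periodically sync known apps
    await sync_known_apps()
    # Return the task so the caller holds a reference and it is not garbage-collected
    return random_task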
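C.4 Notes
- Request rate control
  - Delay between batches: 25-50 ms
  - Per-request timeout: 30 seconds
  - Concurrency: no more than 1000 recommended
- Error handling
  - App does not exist: skip silently
  - Network error: retry 3 times
  - Token expired: refresh automatically
- Data deduplication
  - Use app_id or pkg_name as the unique identifier
  - Check whether the record already exists in the database before inserting
- Performance monitoring
  - Record the success rate (discovery rate)
  - Monitor request latency
  - Count the new apps discovered per hour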
Used together, these methods let the original project discover and collect AppGallery app data efficiently.