## 1. Introduction: The Data Value Behind Weibo's Hot Search List

As one of China's largest social media platforms, Weibo maintains a hot search list that reflects trending events, public attention, and opinion shifts in real time. For public opinion monitoring, market research, and content creation, this data is extremely valuable. This article walks through building an efficient Weibo hot search collection system with modern Python crawling techniques (asynchronous requests plus API reverse engineering) that periodically captures the hot search list and deep-crawls topic content.

## 2. Tech Stack Overview: A Modern Crawler Toolkit

Before writing any code, here are the core technologies used in this project:

- HTTP clients
  - httpx: asynchronous HTTP client with HTTP/2 support, generally faster than the classic requests library
  - aiohttp: asynchronous HTTP client/server framework
- Asynchronous programming
  - asyncio: Python's native async I/O framework
  - anyio: a more unified async API layer
- Data parsing
  - parsel: extraction library based on XPath and CSS selectors
  - json: handling the JSON payloads returned by the APIs
- Scheduling and storage
  - apscheduler: advanced Python scheduler
  - sqlalchemy: SQL toolkit and ORM
  - redis: caching and message queuing
- Anti-anti-crawling strategies
  - header rotation, IP proxy pools, browser fingerprint emulation

## 3. Project Architecture

```text
Weibo hot search collection system
├── Collection layer (Async Spider)
│   ├── Hot search list fetcher
│   ├── Topic deep-crawl module
│   └── Anti-anti-crawling module
├── Storage layer (Database)
│   ├── Hot search table
│   ├── Topic content table
│   └── User profile table (optional)
├── Scheduling layer (Scheduler)
│   ├── Timed triggers
│   ├── Failure retries
│   └── Task monitoring
└── Data API layer (API)
    ├── Real-time queries
    ├── Historical analysis
    └── Data export
```

## 4. Core Implementation

### 4.1 Environment and Dependencies

```text
# requirements.txt
httpx[http2]>=0.24.0   # the http2 extra is required for http2=True in spider.py
aiohttp>=3.8.0
anyio>=3.0.0
parsel>=1.8.0
apscheduler>=3.10.0
sqlalchemy>=2.0.0
redis>=4.5.0
pydantic>=2.0.0
pydantic-settings       # BaseSettings lives in a separate package since pydantic v2
pandas>=2.0.0
playwright>=1.35.0      # for pages that need JS rendering
tenacity                # retry decorator used in spider.py
aiosqlite               # async SQLite driver referenced by DATABASE_URL
fastapi                 # api.py
uvicorn                 # api.py
```

### 4.2 Configuration

```python
# config.py
from pydantic_settings import BaseSettings
from typing import List, Optional


class Settings(BaseSettings):
    # Weibo API endpoints
    WEIBO_HOT_API: str = "https://weibo.com/ajax/statuses/hot_band"
    WEIBO_TOPIC_API: str = "https://weibo.com/ajax/statuses/topic_band"
    WEIBO_DETAIL_API: str = "https://weibo.com/ajax/statuses/show"

    # Request settings
    REQUEST_TIMEOUT: int = 30
    CONCURRENT_LIMIT: int = 10
    RETRY_TIMES: int = 3

    # Proxy settings
    PROXY_ENABLED: bool = False
    PROXY_POOL: List[str] = []

    # Database settings
    DATABASE_URL: str = "sqlite+aiosqlite:///weibo_hot.db"
    REDIS_URL: str = "redis://localhost:6379/0"

    # Scheduling settings
    COLLECTION_INTERVAL: int = 300    # 5 minutes
    DEEP_CRAWL_INTERVAL: int = 3600   # 1 hour

    # Browser simulation settings
    USE_BROWSER: bool = False
    BROWSER_PATH: Optional[str] = None

    class Config:
        env_file = ".env"


settings = Settings()
```
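Because `Settings` inherits from pydantic's `BaseSettings`, every field above can be overridden through environment variables or the `.env` file without touching the code. A minimal sketch under that assumption (the file name `settings_demo.py` is purely illustrative):

```python
# settings_demo.py -- illustrative only
import os

# Environment variables take precedence over the defaults declared in Settings.
os.environ["COLLECTION_INTERVAL"] = "600"   # collect every 10 minutes
os.environ["PROXY_ENABLED"] = "true"

from config import Settings

settings = Settings()
print(settings.COLLECTION_INTERVAL)  # 600
print(settings.PROXY_ENABLED)        # True
```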
### 4.3 Data Models

```python
# models.py
from sqlalchemy import Column, Integer, String, DateTime, Text, JSON
from sqlalchemy.orm import declarative_base
from datetime import datetime

Base = declarative_base()


class HotSearch(Base):
    """One entry of the hot search list."""
    __tablename__ = "hot_searches"

    id = Column(Integer, primary_key=True, autoincrement=True)
    rank = Column(Integer, nullable=False, comment="rank on the list")
    keyword = Column(String(255), nullable=False, index=True, comment="search keyword")
    url = Column(String(500), comment="topic URL")
    tag = Column(String(50), comment="label such as 热 (hot), 新 (new), 爆 (breaking)")
    hot_value = Column(Integer, comment="heat value")
    category = Column(String(100), comment="category")
    created_at = Column(DateTime, default=datetime.now, index=True)
    raw_data = Column(JSON, comment="raw API payload")

    def __repr__(self):
        return f"HotSearch(rank={self.rank}, keyword={self.keyword}, hot_value={self.hot_value})"


class TopicContent(Base):
    """Detailed content of a topic."""
    __tablename__ = "topic_contents"

    id = Column(Integer, primary_key=True, autoincrement=True)
    topic_id = Column(String(100), nullable=False, index=True)
    keyword = Column(String(255), nullable=False, index=True)
    title = Column(String(500), comment="topic title")
    description = Column(Text, comment="topic description")
    read_count = Column(Integer, comment="read count")
    discuss_count = Column(Integer, comment="discussion count")
    origin_url = Column(String(500), comment="original link")
    top_posts = Column(JSON, comment="top posts under the topic")
    related_topics = Column(JSON, comment="related topics")
    crawl_time = Column(DateTime, default=datetime.now, index=True)

    def __repr__(self):
        return f"TopicContent(keyword={self.keyword}, read={self.read_count})"


class CrawlLog(Base):
    """Log entry for one crawl task."""
    __tablename__ = "crawl_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    task_type = Column(String(50), nullable=False)
    status = Column(String(20), nullable=False)  # success, failed, partial
    items_count = Column(Integer, default=0)
    error_message = Column(Text)
    created_at = Column(DateTime, default=datetime.now, index=True)
```

### 4.4 The Asynchronous Spider

```python
# spider.py
import asyncio
import json
import random
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential


@dataclass
class RequestConfig:
    """Per-request configuration."""
    headers: Dict[str, str]
    timeout: float = 30.0
    verify_ssl: bool = True
    follow_redirects: bool = True


class WeiboHotSearchSpider:
    """Asynchronous spider for the Weibo hot search list."""

    def __init__(self, config=None):
        self.config = config or {}
        self.client = None
        # User-Agent pool (must exist before the request config picks from it)
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]
        self.request_config = self._create_request_config()

    def _create_request_config(self) -> RequestConfig:
        """Build the default request headers."""
        headers = {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }
        return RequestConfig(headers=headers)

    @asynccontextmanager
    async def get_client(self):
        """Yield a configured async HTTP client."""
        limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
        timeout = httpx.Timeout(timeout=self.request_config.timeout)
        async with httpx.AsyncClient(
            headers=self.request_config.headers,
            timeout=timeout,
            limits=limits,
            verify=self.request_config.verify_ssl,
            follow_redirects=self.request_config.follow_redirects,
            http2=True,  # enable HTTP/2
        ) as client:
            yield client

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def fetch_hot_search(self) -> Dict[str, Any]:
        """Fetch the hot search list via Weibo's JSON API (no HTML parsing required)."""
        api_url = "https://weibo.com/ajax/statuses/hot_band"

        async with self.get_client() as client:
            try:
                # Random delay so requests are not fired too frequently
                await asyncio.sleep(random.uniform(1, 3))

                response = await client.get(
                    api_url,
                    params={
                        "source": "weibo",
                        "type": "uid",
                        "uid": "",
                        "_t": int(datetime.now().timestamp() * 1000),
                    },
                )
                response.raise_for_status()

                data = response.json()
                return self._parse_hot_search_data(data)
            except httpx.RequestError as e:
                print(f"Request failed: {e}")
                raise
            except json.JSONDecodeError as e:
                print(f"JSON decoding failed: {e}")
                raise

    def _parse_hot_search_data(self, raw_data: Dict) -> Dict[str, Any]:
        """Normalize the raw hot search payload."""
        result = {
            "timestamp": datetime.now().isoformat(),
            "total": 0,
            "hot_searches": [],
            "topics": [],
            "raw_data": raw_data,
        }

        if "data" not in raw_data or "band_list" not in raw_data["data"]:
            return result

        band_list = raw_data["data"]["band_list"]
        result["total"] = len(band_list)

        for item in band_list:
            hot_item = {
                "rank": item.get("rank", 0),
                "keyword": item.get("word", ""),
                "url": f"https://s.weibo.com/weibo?q={item.get('word_scheme', '')}",
                "tag": item.get("label_name", ""),
                "hot_value": item.get("num", 0),
                "category": item.get("category", ""),
                "is_ad": item.get("is_ad", 0) == 1,
                "note": item.get("note", ""),
                "real_pos": item.get("realpos", 0),
                "raw_item": item,
            }
            result["hot_searches"].append(hot_item)

            # Collect topic info for the later deep crawl
            if "mblog" in item and item["mblog"]:
                result["topics"].append({
                    "topic_id": item["mblog"].get("id", ""),
                    "keyword": hot_item["keyword"],
                    "title": item["mblog"].get("text", "")[:100],
                })

        return result

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=5))
    async def fetch_topic_detail(self, topic_id: str, keyword: str) -> Optional[Dict]:
        """Fetch topic details via the Weibo topic API."""
        api_url = "https://weibo.com/ajax/statuses/topic_band"

        async with self.get_client() as client:
            try:
                await asyncio.sleep(random.uniform(0.5, 1.5))

                params = {
                    "q": keyword,
                    "page": 1,
                    "count": 20,
                    "_t": int(datetime.now().timestamp() * 1000),
                }
                response = await client.get(api_url, params=params)
                response.raise_for_status()

                data = response.json()
                return self._parse_topic_data(data, keyword)
            except Exception as e:
                print(f"Failed to fetch topic detail for {keyword}: {e}")
                return None

    def _parse_topic_data(self, raw_data: Dict, keyword: str) -> Dict[str, Any]:
        """Normalize the raw topic payload."""
        result = {
            "keyword": keyword,
            "crawl_time": datetime.now().isoformat(),
            "read_count": 0,
            "discuss_count": 0,
            "top_posts": [],
            "related_topics": [],
            "raw_data": raw_data,
        }

        if "data" not in raw_data:
            return result

        data = raw_data["data"]

        # Read and discussion counters
        if "cardlistInfo" in data:
            card_info = data["cardlistInfo"]
            result["read_count"] = card_info.get("total", 0)
            result["discuss_count"] = card_info.get("page_size", 0)

        # Top posts under the topic
        if "cards" in data:
            for card in data["cards"]:
                if "mblog" in card:
                    blog = card["mblog"]
                    post = {
                        "id": blog.get("id", ""),
                        "text": blog.get("text", ""),
                        "user": blog.get("user", {}).get("screen_name", ""),
                        "reposts": blog.get("reposts_count", 0),
                        "comments": blog.get("comments_count", 0),
                        "attitudes": blog.get("attitudes_count", 0),
                        "created_at": blog.get("created_at", ""),
                    }
                    result["top_posts"].append(post)

        return result

    async def fetch_multiple_topics(self, topics: List[Dict],
                                    max_concurrent: int = 5) -> List[Dict]:
        """Fetch several topic details concurrently, bounded by a semaphore."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_with_semaphore(topic: Dict) -> Optional[Dict]:
            async with semaphore:
                return await self.fetch_topic_detail(
                    topic.get("topic_id", ""),
                    topic.get("keyword", ""),
                )

        tasks = [fetch_with_semaphore(topic) for topic in topics]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Drop results that are exceptions
        valid_results = []
        for result in results:
            if isinstance(result, dict):
                valid_results.append(result)
            elif result is not None:
                print(f"Task raised an exception: {type(result).__name__}")

        return valid_results
```
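Before wiring the spider into the scheduler, it helps to smoke-test it on its own. A minimal sketch using the class above (keep in mind that Weibo may additionally require a valid Cookie header, so an unauthenticated request can come back empty or be rejected):

```python
# quick_test.py -- illustrative smoke test for WeiboHotSearchSpider
import asyncio

from spider import WeiboHotSearchSpider


async def main():
    spider = WeiboHotSearchSpider()
    data = await spider.fetch_hot_search()
    print(f"Fetched {data['total']} entries at {data['timestamp']}")
    # Field names below are the ones produced by _parse_hot_search_data
    for item in data["hot_searches"][:5]:
        print(item["rank"], item["keyword"], item["hot_value"])


if __name__ == "__main__":
    asyncio.run(main())
```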
### 4.5 Database Layer

```python
# database.py
from contextlib import asynccontextmanager
from datetime import datetime
from typing import AsyncGenerator, Dict, List

from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from models import Base, HotSearch, TopicContent, CrawlLog


class DatabaseManager:
    """Async database manager."""

    def __init__(self, database_url: str):
        self.engine = create_async_engine(
            database_url,
            echo=False,
            pool_pre_ping=True,
            pool_recycle=3600,
        )
        self.async_session = async_sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False
        )

    async def init_db(self):
        """Create the tables if they do not exist yet."""
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    @asynccontextmanager
    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Session context manager with commit/rollback handling."""
        async with self.async_session() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

    async def save_hot_searches(self, hot_searches: List[Dict]) -> int:
        """Persist hot search entries, skipping duplicates within the same collection run."""
        saved_count = 0
        async with self.get_session() as session:
            for item in hot_searches:
                # The collection timestamp is attached by the caller; fall back to now()
                created_at = (
                    datetime.fromisoformat(item["timestamp"])
                    if item.get("timestamp")
                    else datetime.now()
                )

                # Skip keywords already stored for the same collection time
                existing = await session.execute(
                    select(HotSearch).where(
                        HotSearch.keyword == item["keyword"],
                        HotSearch.created_at == created_at,
                    )
                )
                if existing.scalar_one_or_none() is None:
                    hot_search = HotSearch(
                        rank=item["rank"],
                        keyword=item["keyword"],
                        url=item.get("url", ""),
                        tag=item.get("tag", ""),
                        hot_value=item.get("hot_value", 0),
                        category=item.get("category", ""),
                        created_at=created_at,
                        raw_data=item.get("raw_item", {}),
                    )
                    session.add(hot_search)
                    saved_count += 1

        return saved_count

    async def save_topic_contents(self, topics: List[Dict]) -> int:
        """Persist topic detail records."""
        saved_count = 0
        async with self.get_session() as session:
            for topic in topics:
                # Build a topic_id from the keyword and the current hour
                topic_id = f"{topic['keyword']}_{datetime.now().strftime('%Y%m%d%H')}"

                crawl_time = (
                    datetime.fromisoformat(topic["crawl_time"])
                    if topic.get("crawl_time")
                    else datetime.now()
                )
                topic_content = TopicContent(
                    topic_id=topic_id,
                    keyword=topic["keyword"],
                    title=topic.get("title", "")[:200],
                    description=topic.get("description", ""),
                    read_count=topic.get("read_count", 0),
                    discuss_count=topic.get("discuss_count", 0),
                    origin_url=topic.get("url", ""),
                    top_posts=topic.get("top_posts", []),
                    related_topics=topic.get("related_topics", []),
                    crawl_time=crawl_time,
                )
                session.add(topic_content)
                saved_count += 1

        return saved_count

    async def get_recent_hot_searches(self, limit: int = 50) -> List[Dict]:
        """Return the most recent hot search entries."""
        async with self.get_session() as session:
            result = await session.execute(
                select(HotSearch)
                .order_by(desc(HotSearch.created_at), HotSearch.rank)
                .limit(limit)
            )
            searches = result.scalars().all()

            return [
                {
                    "rank": s.rank,
                    "keyword": s.keyword,
                    "hot_value": s.hot_value,
                    "tag": s.tag,
                    "created_at": s.created_at.isoformat(),
                }
                for s in searches
            ]

    async def log_crawl_task(self, task_type: str, status: str,
                             items_count: int = 0, error_msg: str = ""):
        """Record the outcome of one crawl task."""
        async with self.get_session() as session:
            log = CrawlLog(
                task_type=task_type,
                status=status,
                items_count=items_count,
                error_message=error_msg,
            )
            session.add(log)
```
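The default `DATABASE_URL` (`sqlite+aiosqlite:///weibo_hot.db`) relies on the `aiosqlite` driver listed in the requirements. A minimal usage sketch of the manager on its own (the file name is illustrative):

```python
# db_demo.py -- illustrative only
import asyncio

from config import settings
from database import DatabaseManager


async def main():
    db = DatabaseManager(settings.DATABASE_URL)
    await db.init_db()  # create the tables on first run
    recent = await db.get_recent_hot_searches(limit=10)
    for row in recent:
        print(row["created_at"], row["rank"], row["keyword"])


if __name__ == "__main__":
    asyncio.run(main())
```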
### 4.6 Task Scheduler

```python
# scheduler.py
import asyncio
import logging
from datetime import datetime
from typing import Callable, Dict

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger


class TaskScheduler:
    """Async task scheduler built on APScheduler."""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.logger = logging.getLogger(__name__)

    def add_interval_task(self, func: Callable, seconds: int,
                          args: tuple = (), kwargs: Dict = None):
        """Register a job that runs at a fixed interval."""
        trigger = IntervalTrigger(seconds=seconds)
        self.scheduler.add_job(
            func,
            trigger,
            args=args,
            kwargs=kwargs or {},
            id=f"{func.__name__}_{datetime.now().timestamp()}",
            replace_existing=True,
        )
        self.logger.info(f"Added interval task: {func.__name__}, every {seconds}s")

    def add_cron_task(self, func: Callable, hour: str = "*", minute: str = "0",
                      args: tuple = (), kwargs: Dict = None):
        """Register a cron-style job."""
        trigger = CronTrigger(hour=hour, minute=minute)
        self.scheduler.add_job(
            func,
            trigger,
            args=args,
            kwargs=kwargs or {},
            id=f"{func.__name__}_cron",
            replace_existing=True,
        )
        self.logger.info(f"Added cron task: {func.__name__} at {hour}:{minute}")

    async def start(self):
        """Start the scheduler and keep the event loop alive."""
        if not self.scheduler.running:
            self.scheduler.start()
            self.logger.info("Task scheduler started")

        # Keep the main coroutine running
        try:
            while True:
                await asyncio.sleep(3600)  # wake up once an hour
        except (KeyboardInterrupt, SystemExit):
            self.shutdown()

    def shutdown(self):
        """Stop the scheduler."""
        if self.scheduler.running:
            self.scheduler.shutdown()
            self.logger.info("Task scheduler stopped")
```
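Since `AsyncIOScheduler` runs coroutine jobs directly on the event loop, the scheduler can also be tried out in isolation. A minimal sketch using the class above (the heartbeat job is a stand-in for a real collection task):

```python
# scheduler_demo.py -- illustrative only
import asyncio
import logging
from datetime import datetime

from scheduler import TaskScheduler

logging.basicConfig(level=logging.INFO)


async def heartbeat():
    # Stand-in for a real collection coroutine
    print("tick", datetime.now().isoformat())


async def main():
    ts = TaskScheduler()
    ts.add_interval_task(heartbeat, seconds=5)
    await ts.start()  # blocks until interrupted (Ctrl+C)


if __name__ == "__main__":
    asyncio.run(main())
```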
### 4.7 Main Program

```python
# main.py
import asyncio
import logging
import signal
import sys
from contextlib import AsyncExitStack

from config import settings
from spider import WeiboHotSearchSpider
from database import DatabaseManager
from scheduler import TaskScheduler

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("weibo_crawler.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)


class WeiboHotSearchCollector:
    """Main program that ties the spider, database and scheduler together."""

    def __init__(self):
        self.spider = WeiboHotSearchSpider()
        self.db = DatabaseManager(settings.DATABASE_URL)
        self.scheduler = TaskScheduler()
        self.is_running = True
        self.cached_topics = []

        # Graceful shutdown on SIGINT / SIGTERM
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Signal handler."""
        logger.info(f"Received signal {signum}, shutting down...")
        self.is_running = False

    async def initialize(self):
        """Initialize the database and register the scheduled jobs."""
        logger.info("Initializing the Weibo hot search collector...")

        await self.db.init_db()
        logger.info("Database initialized")

        # Collect the hot search list at a fixed interval
        self.scheduler.add_interval_task(
            self.collect_hot_searches,
            settings.COLLECTION_INTERVAL,
        )

        # Deep-crawl topic details once per hour, at minute 30
        self.scheduler.add_cron_task(
            self.collect_topic_details,
            hour="*",
            minute="30",
        )
        logger.info("Scheduled jobs registered")

    async def collect_hot_searches(self):
        """Collect the hot search list."""
        logger.info("Collecting the Weibo hot search list...")
        try:
            hot_data = await self.spider.fetch_hot_search()

            # Attach the collection timestamp so the database layer can deduplicate
            for item in hot_data["hot_searches"]:
                item["timestamp"] = hot_data["timestamp"]

            saved_count = await self.db.save_hot_searches(hot_data["hot_searches"])

            await self.db.log_crawl_task("hot_search", "success", saved_count)
            logger.info(f"Hot search collection finished, {saved_count} rows saved")

            # Only the top 10 entries are kept for the deep crawl
            topics_for_deep_crawl = []
            for item in hot_data["hot_searches"][:10]:
                if "raw_item" in item and "mblog" in item["raw_item"]:
                    topics_for_deep_crawl.append({
                        "keyword": item["keyword"],
                        "topic_id": item["raw_item"]["mblog"].get("id", ""),
                        "rank": item["rank"],
                    })

            # Cache the topic info for the deep crawl (Redis would also work here)
            self.cached_topics = topics_for_deep_crawl

        except Exception as e:
            logger.error(f"Hot search collection failed: {e}")
            await self.db.log_crawl_task("hot_search", "failed", error_msg=str(e))

    async def collect_topic_details(self):
        """Deep-crawl the cached topics."""
        if not self.cached_topics:
            logger.warning("No cached topics to crawl")
            return

        logger.info(f"Deep-crawling {len(self.cached_topics)} topics...")
        try:
            topic_details = await self.spider.fetch_multiple_topics(
                self.cached_topics, max_concurrent=3
            )

            saved_count = await self.db.save_topic_contents(topic_details)

            await self.db.log_crawl_task("topic_detail", "success", saved_count)
            logger.info(f"Topic detail collection finished, {saved_count} rows saved")

        except Exception as e:
            logger.error(f"Topic detail collection failed: {e}")
            await self.db.log_crawl_task("topic_detail", "failed", error_msg=str(e))

    async def run_once(self):
        """Run every collection task a single time."""
        logger.info("Running a one-off collection...")
        await self.collect_hot_searches()
        await asyncio.sleep(5)
        await self.collect_topic_details()
        logger.info("One-off collection finished")

    async def run_scheduled(self):
        """Run the scheduled collection loop."""
        logger.info("Starting scheduled collection...")
        await self.initialize()
        await self.scheduler.start()

    async def cleanup(self):
        """Release resources."""
        logger.info("Cleaning up...")
        # Additional cleanup logic can go here
        logger.info("Cleanup finished")


async def main():
    """Entry point."""
    collector = WeiboHotSearchCollector()

    # AsyncExitStack keeps room for managing more async resources later
    async with AsyncExitStack():
        try:
            # Run mode is chosen from the command line
            if len(sys.argv) > 1 and sys.argv[1] == "--once":
                await collector.run_once()
            else:
                await collector.run_scheduled()

            # Keep the program alive until a shutdown signal arrives
            while collector.is_running:
                await asyncio.sleep(1)

        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            await collector.cleanup()

    logger.info("Exited normally")


if __name__ == "__main__":
    # Require Python 3.8 or newer
    if sys.version_info < (3, 8):
        print("Please use Python 3.8 or newer")
        sys.exit(1)

    asyncio.run(main())
```
### 4.8 Enhanced Anti-Anti-Crawling Strategies

```python
# anti_anti_spider.py
import hashlib
import random
import time
from typing import Dict


class AntiAntiSpider:
    """Helpers for working around common anti-crawling measures."""

    def __init__(self):
        self.cookie_pool = []
        self.proxy_pool = []
        self.last_request_time = 0.0
        self.min_request_interval = 1.0  # minimum seconds between requests

    def generate_fingerprint(self) -> Dict[str, str]:
        """Build a browser-like header fingerprint."""
        return {
            "User-Agent": self.get_random_user_agent(),
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
        }

    def get_random_user_agent(self) -> str:
        """Pick a random User-Agent string."""
        user_agents = [
            # Chrome
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            # Firefox
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/120.0",
            # Safari
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
        ]
        return random.choice(user_agents)

    def add_delay(self):
        """Enforce a randomized minimum interval between requests."""
        current_time = time.time()
        elapsed = current_time - self.last_request_time
        if elapsed < self.min_request_interval:
            sleep_time = self.min_request_interval - elapsed + random.uniform(0.5, 2.0)
            time.sleep(sleep_time)
        self.last_request_time = time.time()

    def generate_signature(self, params: Dict, secret: str = "weibo") -> str:
        """Build a request signature (mimicking a signed API)."""
        # Sort the parameters and concatenate them
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
        # Append the secret and hash with MD5
        sign_str = param_str + secret
        return hashlib.md5(sign_str.encode()).hexdigest()

    def rotate_proxy(self) -> Dict[str, str]:
        """Pick a proxy from the pool."""
        if not self.proxy_pool:
            return {}
        proxy = random.choice(self.proxy_pool)
        return {"http": proxy, "https": proxy}

    def simulate_human_behavior(self):
        """Roughly imitate human pacing."""
        behaviors = [
            lambda: time.sleep(random.uniform(0.1, 0.5)),
            lambda: None,  # placeholder for a simulated click
            lambda: None,  # placeholder for a simulated scroll
        ]
        random.choice(behaviors)()
```

### 4.9 Data Export and API Endpoints

```python
# api.py
import os
from datetime import datetime

import pandas as pd
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse

from config import settings
from database import DatabaseManager

app = FastAPI(title="Weibo hot search data API", version="1.0.0")

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global database manager
db_manager = None


@app.on_event("startup")
async def startup():
    """Create the database manager on startup."""
    global db_manager
    db_manager = DatabaseManager(settings.DATABASE_URL)
    await db_manager.init_db()


@app.get("/")
async def root():
    """Health check."""
    return {"message": "Weibo hot search data API", "status": "running"}


@app.get("/api/hot-searches/recent")
async def get_recent_hot_searches(
    limit: int = Query(50, ge=1, le=1000),
    hours: int = Query(24, ge=1, le=720),
):
    """Return the most recent hot search records."""
    try:
        searches = await db_manager.get_recent_hot_searches(limit)
        return {
            "code": 0,
            "message": "success",
            "data": searches,
            "count": len(searches),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/hot-searches/trend")
async def get_hot_search_trend(
    keyword: str = Query(..., min_length=1),
    days: int = Query(7, ge=1, le=30),
):
    """Return the heat trend of a keyword."""
    try:
        # The trend query itself is omitted here for brevity
        return {
            "code": 0,
            "message": "success",
            "data": {"keyword": keyword, "trend": []},
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/topics/{keyword}")
async def get_topic_detail(
    keyword: str,
    limit: int = Query(20, ge=1, le=100),
):
    """Return details for one topic."""
    try:
        # The topic detail query is omitted here for brevity
        return {"code": 0, "message": "success", "data": {"keyword": keyword}}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/export/csv")
async def export_to_csv(
    start_date: str = Query(None),
    end_date: str = Query(None),
    background_tasks: BackgroundTasks = None,
):
    """Export data as a CSV file."""
    try:
        # Build the query conditions and real export logic here;
        # a mock frame is generated for demonstration purposes.
        df = pd.DataFrame({
            "rank": range(1, 51),
            "keyword": [f"test keyword {i}" for i in range(50)],
            "hot_value": [1000000 - i * 10000 for i in range(50)],
            "time": [datetime.now().isoformat()] * 50,
        })

        filename = f"weibo_hot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        filepath = f"./exports/{filename}"

        os.makedirs("./exports", exist_ok=True)
        df.to_csv(filepath, index=False, encoding="utf-8-sig")

        return FileResponse(filepath, filename=filename, media_type="text/csv")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/stats/summary")
async def get_stats_summary():
    """Return an aggregate statistics summary."""
    try:
        # The statistics logic is omitted here for brevity
        return {
            "code": 0,
            "message": "success",
            "data": {
                "total_records": 10000,
                "last_update": datetime.now().isoformat(),
                "top_keywords": [],
                "hot_categories": [],
            },
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=True)
```

## 5. Deployment and Operation

### 5.1 Environment Setup

```bash
# 1. Clone the project
git clone <repository-url>
cd weibo-hot-search-crawler

# 2. Create a virtual environment
python -m venv venv
# Windows
venv\Scripts\activate
# Linux / macOS
source venv/bin/activate

# 3. Install dependencies
pip install -r requirements.txt

# 4. Configure environment variables
cp .env.example .env
# Edit .env to configure the database and API settings
```

### 5.2 Running the Project

```bash
# 1. One-off run (useful for testing)
python main.py --once

# 2. Start the scheduled collector
python main.py

# 3. Start the API service
python api.py

# 4. Run with Docker
docker build -t weibo-crawler .
docker run -d --name weibo-crawler weibo-crawler
```

### 5.3 Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency list
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the project files
COPY . .

# Create the export directory
RUN mkdir -p /app/exports

# Run the collector
CMD ["python", "main.py"]
```

## 6. Optimization Tips and Caveats

### 6.1 Performance

- Connection pooling: tune the HTTP connection pool size so connections are not rebuilt for every request.
- Caching: keep frequently accessed data in Redis.
- Batch inserts: write to the database in batches to reduce I/O round trips (see the sketch below).
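As a concrete illustration of the batch-insert tip, the per-row `session.add()` loop in `DatabaseManager.save_hot_searches` could be replaced by a single bulk statement. A minimal sketch using SQLAlchemy's `insert()` construct; the method name `bulk_save_hot_searches` is hypothetical and is meant to be added to the `DatabaseManager` class above (deduplication is skipped here for brevity):

```python
# Sketch: bulk-insert variant of save_hot_searches for DatabaseManager
from datetime import datetime
from typing import Dict, List

from sqlalchemy import insert

from models import HotSearch


async def bulk_save_hot_searches(self, hot_searches: List[Dict]) -> int:
    """Insert a whole batch of hot search rows in one round trip."""
    if not hot_searches:
        return 0

    rows = [
        {
            "rank": item["rank"],
            "keyword": item["keyword"],
            "url": item.get("url", ""),
            "tag": item.get("tag", ""),
            "hot_value": item.get("hot_value", 0),
            "category": item.get("category", ""),
            "created_at": datetime.now(),
            "raw_data": item.get("raw_item", {}),
        }
        for item in hot_searches
    ]

    async with self.get_session() as session:
        # Passing a list of dicts turns this into an executemany-style bulk INSERT
        await session.execute(insert(HotSearch), rows)

    return len(rows)
```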
### 6.2 Dealing with Anti-Crawling Measures

- IP proxies: use a pool of high-quality proxy IPs.
- Request randomization: randomize request intervals and headers.
- CAPTCHA handling: integrate a CAPTCHA recognition service when needed.
- Browser emulation: use Playwright for pages that are rendered by JavaScript.

### 6.3 Data Quality

- Deduplication: build a reliable deduplication mechanism.
- Anomaly detection: watch for abnormal fluctuations in the collected data.
- Validation: check the format and completeness of every record.

### 6.4 Legal and Compliance Notes

- Respect the site's robots.txt.
- Throttle the request rate so the target site is not put under pressure.
- Use the data in accordance with applicable laws and regulations.
- Do not collect users' private information.

## 7. Possible Extensions

- Real-time monitoring and alerting for unusual hot searches.
- Sentiment analysis of topic content (a small sketch follows below).
- Public opinion analysis built on top of the hot search history.
- Data visualization dashboards for an at-a-glance view.
- Cross-platform comparison of trending topics.
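As a starting point for the sentiment analysis extension, here is a minimal sketch built on the third-party `snownlp` package (not part of the requirements above, so it has to be installed separately); `SnowNLP(text).sentiments` returns a score between 0 (negative) and 1 (positive):

```python
# sentiment_demo.py -- illustrative sketch for the sentiment-analysis extension
# pip install snownlp
from snownlp import SnowNLP


def score_posts(posts):
    """Attach a 0-1 sentiment score to each post dict produced by the spider."""
    scored = []
    for post in posts:
        text = post.get("text", "")
        if not text:
            continue
        s = SnowNLP(text)
        scored.append({**post, "sentiment": round(s.sentiments, 3)})
    return scored


if __name__ == "__main__":
    sample = [
        {"text": "这个消息太棒了，大家都很开心"},
        {"text": "非常失望，体验很差"},
    ]
    for item in score_posts(sample):
        print(item["sentiment"], item["text"])
```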