怎么使用wordpress做网站做教育网站的er图

张小明 2026/1/13 6:58:15
怎么使用wordpress做网站,做教育网站的er图,网站建设过程中要怎么打开速度,做网站要用什么软件引言在数字经济时代#xff0c;本地生活服务数据已成为企业决策和用户选择的重要依据。从餐饮点评、酒店预订到生活娱乐#xff0c;这些数据蕴含着巨大的商业价值。本文将介绍如何使用Python最新爬虫技术#xff0c;构建一个高效、稳定的本地生活服务数据采集系统#xff0…引言在数字经济时代本地生活服务数据已成为企业决策和用户选择的重要依据。从餐饮点评、酒店预订到生活娱乐这些数据蕴含着巨大的商业价值。本文将介绍如何使用Python最新爬虫技术构建一个高效、稳定的本地生活服务数据采集系统涵盖异步请求、智能解析、反爬对抗等核心技术。技术栈概览请求库: aiohttp (异步HTTP客户端)解析库: parsel lxml (支持XPath和CSS选择器)浏览器自动化: Playwright (处理动态加载内容)数据存储: PostgreSQL SQLAlchemy ORM代理中间件: 智能代理轮换系统任务调度: Celery Redis (分布式爬虫)数据清洗: Pandas 正则表达式核心代码实现1. 异步基础爬虫类pythonimport asyncio import aiohttp from abc import ABC, abstractmethod from typing import Dict, List, Optional, Any import logging from urllib.parse import urljoin import hashlib import json class AsyncBaseSpider(ABC): 异步爬虫基类 def __init__(self, name: str, concurrency: int 10): self.name name self.concurrency concurrency self.session: Optional[aiohttp.ClientSession] None self.logger logging.getLogger(fspider.{name}) self.semaphore asyncio.Semaphore(concurrency) # 默认请求头 self.default_headers { Accept: text/html,application/xhtmlxml,application/xml;q0.9,image/webp,*/*;q0.8, Accept-Language: zh-CN,zh;q0.9,en;q0.8, Accept-Encoding: gzip, deflate, br, Connection: keep-alive, Upgrade-Insecure-Requests: 1, Sec-Fetch-Dest: document, Sec-Fetch-Mode: navigate, Sec-Fetch-Site: none, Sec-Fetch-User: ?1, Cache-Control: max-age0, } async def init_session(self): 初始化aiohttp会话 timeout aiohttp.ClientTimeout(total30) connector aiohttp.TCPConnector( limitself.concurrency * 2, sslFalse, force_closeTrue, enable_cleanup_closedTrue ) self.session aiohttp.ClientSession( connectorconnector, timeouttimeout, headersself.default_headers ) abstractmethod async def start(self, **kwargs): 启动爬虫 pass async def fetch(self, url: str, method: str GET, **kwargs) - Optional[str]: 异步请求页面 async with self.semaphore: try: async with self.session.request( methodmethod, urlurl, **kwargs ) as response: if response.status 200: content await response.read() # 自动检测编码 encoding response.get_encoding() if not encoding: encoding utf-8 return content.decode(encoding, errorsignore) else: self.logger.warning( f请求失败: {url}, 状态码: {response.status} ) return None except Exception as e: self.logger.error(f请求异常 {url}: {str(e)}) return None async def close(self): 关闭会话 if self.session: await self.session.close() def generate_request_id(self, url: str, params: Dict None) - str: 生成请求ID用于去重 data url (json.dumps(params, sort_keysTrue) if params else ) return hashlib.md5(data.encode()).hexdigest() async def __aenter__(self): await self.init_session() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close()2. 美团商家数据爬虫pythonimport re import random import asyncio from datetime import datetime from typing import List, Dict, Any from parsel import Selector import pandas as pd from playwright.async_api import async_playwright class MeituanSpider(AsyncBaseSpider): 美团商家数据爬虫 def __init__(self, city: str 北京): super().__init__(fmeituan_{city}, concurrency5) self.city city self.base_url https://www.meituan.com self.api_url https://apimobile.meituan.com # 商家数据结构 self.shop_fields [ shop_id, shop_name, address, phone, latitude, longitude, avg_price, score, comment_count, category, business_hours, city, district, business_circle, has_coupon, discount_info, source, crawl_time ] async def start(self, categories: List[str] None, max_pages: int 50): 启动爬虫 if categories is None: categories [美食, 酒店, 休闲娱乐, 生活服务] tasks [] for category in categories: for page in range(1, max_pages 1): task self.crawl_category_page(category, page) tasks.append(task) # 并发执行所有任务 results await asyncio.gather(*tasks, return_exceptionsTrue) # 处理结果 all_shops [] for result in results: if isinstance(result, Exception): self.logger.error(f任务执行失败: {result}) continue if result: all_shops.extend(result) return all_shops async def crawl_category_page(self, category: str, page: int) - List[Dict]: 爬取分类页面 params { utm_source: shopList, ci: self._get_city_code(self.city), uuid: self._generate_uuid(), userid: , limit: 20, offset: str((page - 1) * 20), cateId: self._get_category_id(category), token: , partner: 126, platform: 3, riskLevel: 1, optimusCode: 10, _token: self._generate_token(), } url f{self.api_url}/group/v4/poi/pcsearch/1 headers self._get_api_headers() try: html await self.fetch(url, methodGET, paramsparams, headersheaders) if html: shops self.parse_search_result(html) return shops except Exception as e: self.logger.error(f爬取失败 {category} 第{page}页: {e}) return [] def parse_search_result(self, html: str) - List[Dict]: 解析搜索结果 data json.loads(html) shops [] if data.get(code) 0 and data in data: for item in data[data][searchResult]: shop { shop_id: item.get(id), shop_name: item.get(title, ).strip(), address: item.get(address, ).strip(), phone: item.get(phone, ), latitude: item.get(latitude), longitude: item.get(longitude), avg_price: item.get(avgprice), score: item.get(avgscore), comment_count: item.get(comments), category: item.get(backCateName, ), business_hours: item.get(openinfo, ), city: self.city, district: item.get(areaname, ), business_circle: item.get(frontPoiName, ), has_coupon: bool(item.get(deals)), discount_info: json.dumps(item.get(preferentialInfo, []), ensure_asciiFalse), source: meituan, crawl_time: datetime.now().strftime(%Y-%m-%d %H:%M:%S) } shops.append(shop) return shops async def crawl_shop_detail(self, shop_id: str) - Dict: 爬取商家详情页使用Playwright处理动态内容 detail_url f{self.base_url}/meishi/{shop_id}/ async with async_playwright() as p: # 启动浏览器可配置为无头模式 browser await p.chromium.launch( headlessTrue, args[--disable-blink-featuresAutomationControlled] ) # 创建上下文 context await browser.new_context( viewport{width: 1920, height: 1080}, user_agentself._get_random_user_agent() ) # 创建页面 page await context.new_page() try: # 导航到页面 await page.goto(detail_url, wait_untilnetworkidle, timeout30000) # 等待关键元素加载 await page.wait_for_selector(.dp-header, timeout10000) # 获取页面内容 content await page.content() # 解析详情数据 detail_data self.parse_shop_detail(content) detail_data[shop_id] shop_id return detail_data except Exception as e: self.logger.error(f爬取详情页失败 {shop_id}: {e}) return {} finally: await browser.close() def parse_shop_detail(self, html: str) - Dict: 解析商家详情页 selector Selector(texthtml) detail { shop_images: [], recommended_dishes: [], environment_score: 0, service_score: 0, taste_score: 0, feature_tags: [], facilities: [], parking_info: , reservation_info: , } # 解析图片 images selector.css(.shop-images img::attr(src)).getall() detail[shop_images] [urljoin(self.base_url, img) for img in images] # 解析推荐菜 dishes selector.css(.recommend-dish .dish-name::text).getall() detail[recommended_dishes] [d.strip() for d in dishes if d.strip()] # 解析评分 scores selector.css(.score-info span::text).getall() if len(scores) 3: detail[environment_score] float(scores[0] or 0) detail[service_score] float(scores[1] or 0) detail[taste_score] float(scores[2] or 0) # 解析特色标签 tags selector.css(.feature-tags span::text).getall() detail[feature_tags] [tag.strip() for tag in tags if tag.strip()] return detail def _get_city_code(self, city_name: str) - str: 获取城市代码 city_codes { 北京: 1, 上海: 10, 广州: 20, 深圳: 30, 杭州: 50, 成都: 60, 重庆: 70, 武汉: 80 } return city_codes.get(city_name, 1) def _get_category_id(self, category: str) - str: 获取分类ID category_ids { 美食: 1, 酒店: 12, 休闲娱乐: 5, 生活服务: 3, 购物: 4, 运动健身: 8 } return category_ids.get(category, 1) def _generate_uuid(self) - str: 生成UUID return .join(random.choices(0123456789abcdef, k32)) def _generate_token(self) - str: 生成访问令牌 import time timestamp int(time.time() * 1000) return hashlib.md5(f{timestamp}{random.random()}.encode()).hexdigest() def _get_api_headers(self) - Dict: 获取API请求头 headers { Accept: application/json, Content-Type: application/json, Origin: https://www.meituan.com, Referer: https://www.meituan.com/, User-Agent: self._get_random_user_agent(), X-Requested-With: XMLHttpRequest, } return headers def _get_random_user_agent(self) - str: 获取随机User-Agent user_agents [ Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36, Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36, Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/537.36, ] return random.choice(user_agents)3. 数据存储与处理pythonfrom sqlalchemy import create_engine, Column, String, Integer, Float, Text, DateTime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker import pandas as pd from contextlib import contextmanager import logging Base declarative_base() class LocalServiceShop(Base): 本地生活服务商家数据模型 __tablename__ local_service_shops id Column(Integer, primary_keyTrue, autoincrementTrue) shop_id Column(String(100), uniqueTrue, indexTrue, nullableFalse) shop_name Column(String(200), nullableFalse) address Column(Text) phone Column(String(50)) latitude Column(Float) longitude Column(Float) avg_price Column(Float) score Column(Float) comment_count Column(Integer) category Column(String(100)) business_hours Column(String(200)) city Column(String(50)) district Column(String(100)) business_circle Column(String(100)) has_coupon Column(Integer, default0) discount_info Column(Text) source Column(String(50)) environment_score Column(Float) service_score Column(Float) taste_score Column(Float) recommended_dishes Column(Text) feature_tags Column(Text) crawl_time Column(DateTime) create_time Column(DateTime, defaultdatetime.now) update_time Column(DateTime, defaultdatetime.now, onupdatedatetime.now) class DatabaseManager: 数据库管理类 def __init__(self, connection_string: str): self.engine create_engine(connection_string, pool_size20, max_overflow30) self.SessionLocal sessionmaker(bindself.engine, expire_on_commitFalse) self.logger logging.getLogger(database) # 创建表 Base.metadata.create_all(bindself.engine) contextmanager def get_session(self): 获取数据库会话 session self.SessionLocal() try: yield session session.commit() except Exception as e: session.rollback() self.logger.error(f数据库操作失败: {e}) raise finally: session.close() async def batch_insert_shops(self, shops: List[Dict]): 批量插入商家数据 if not shops: return with self.get_session() as session: # 批量插入或更新 for shop_data in shops: # 检查是否已存在 existing session.query(LocalServiceShop).filter_by( shop_idshop_data[shop_id], sourceshop_data[source] ).first() if existing: # 更新现有记录 for key, value in shop_data.items(): if hasattr(existing, key) and key ! id: setattr(existing, key, value) existing.update_time datetime.now() else: # 插入新记录 new_shop LocalServiceShop(**shop_data) session.add(new_shop) self.logger.info(f成功处理 {len(shops)} 条商家数据) def export_to_excel(self, filepath: str, city: str None): 导出数据到Excel with self.get_session() as session: query session.query(LocalServiceShop) if city: query query.filter_by(citycity) df pd.read_sql(query.statement, session.bind) # 数据清洗 df[discount_info] df[discount_info].apply( lambda x: json.loads(x) if x else [] ) # 导出Excel with pd.ExcelWriter(filepath, engineopenpyxl) as writer: df.to_excel(writer, sheet_name商家数据, indexFalse) # 添加统计信息 stats self._calculate_statistics(df) stats_df pd.DataFrame([stats]) stats_df.to_excel(writer, sheet_name统计信息, indexFalse) def _calculate_statistics(self, df: pd.DataFrame) - Dict: 计算统计信息 if df.empty: return {} return { 总商家数: len(df), 平均评分: df[score].mean(), 平均价格: df[avg_price].mean(), 总评论数: df[comment_count].sum(), 分类数量: df[category].nunique(), 有优惠商家数: df[has_coupon].sum(), }4. 分布式任务调度pythonfrom celery import Celery from celery.schedules import crontab import redis from typing import List, Dict class DistributedCrawler: 分布式爬虫调度器 def __init__(self, redis_url: str redis://localhost:6379/0): self.celery_app Celery( crawler_tasks, brokerredis_url, backendredis_url, include[crawler.tasks] ) # 配置Celery self.celery_app.conf.update( task_serializerjson, accept_content[json], result_serializerjson, timezoneAsia/Shanghai, enable_utcTrue, task_routes{ crawl_meituan: {queue: meituan}, crawl_dianping: {queue: dianping}, crawl_eleme: {queue: eleme}, }, beat_schedule{ daily-crawl-meituan: { task: crawl_meituan, schedule: crontab(hour2, minute0), # 每天凌晨2点 args: ([北京, 上海, 广州, 深圳],), }, weekly-full-crawl: { task: crawl_all_platforms, schedule: crontab(hour3, minute0, day_of_week0), # 每周日3点 }, } ) self.redis_client redis.from_url(redis_url) def start_crawler(self, platform: str, cities: List[str], categories: List[str] None): 启动爬虫任务 task_map { meituan: crawl_meituan, dianping: crawl_dianping, eleme: crawl_eleme, } if platform not in task_map: raise ValueError(f不支持的平台: {platform}) task_name task_map[platform] task self.celery_app.send_task( task_name, args[cities, categories], kwargs{} ) return task.id def monitor_progress(self, task_id: str) - Dict: 监控任务进度 task_result self.celery_app.AsyncResult(task_id) # 从Redis获取详细进度 progress_key fcrawler:progress:{task_id} progress self.redis_client.hgetall(progress_key) return { task_id: task_id, status: task_result.status, result: task_result.result if task_result.ready() else None, progress: progress, }5. 反爬虫对抗策略pythonimport random import time from typing import Optional from fake_useragent import UserAgent import asyncio class AntiAntiCrawler: 反反爬虫策略管理器 def __init__(self): self.ua UserAgent() self.proxy_pool [] self.request_history [] self.cookie_jars {} async def get_proxy(self) - Optional[str]: 获取代理IP if not self.proxy_pool: await self.refresh_proxy_pool() if self.proxy_pool: proxy random.choice(self.proxy_pool) # 验证代理可用性 if await self.validate_proxy(proxy): return proxy return None async def refresh_proxy_pool(self): 刷新代理池 # 从代理服务商获取代理 sources [ http://api.proxy.com/v1/proxies, http://proxy-pool.example.com/get, ] for source in sources: try: async with aiohttp.ClientSession() as session: async with session.get(source, timeout10) as response: if response.status 200: data await response.json() self.proxy_pool.extend(data.get(proxies, [])) except Exception as e: print(f获取代理失败 {source}: {e}) # 去重 self.proxy_pool list(set(self.proxy_pool)) async def validate_proxy(self, proxy: str) - bool: 验证代理可用性 try: async with aiohttp.ClientSession() as session: async with session.get( http://httpbin.org/ip, proxyfhttp://{proxy}, timeout5 ) as response: return response.status 200 except: return False def get_random_headers(self, referer: str None) - dict: 生成随机请求头 headers { User-Agent: self.ua.random, Accept: text/html,application/xhtmlxml,application/xml;q0.9,image/webp,*/*;q0.8, Accept-Language: zh-CN,zh;q0.9,en;q0.8, Accept-Encoding: gzip, deflate, Connection: keep-alive, Upgrade-Insecure-Requests: 1, } if referer: headers[Referer] referer # 随机添加更多头部信息 extra_headers { Sec-Fetch-Dest: document, Sec-Fetch-Mode: navigate, Sec-Fetch-Site: same-origin, Sec-Fetch-User: ?1, Cache-Control: max-age0, DNT: 1, } if random.random() 0.5: headers.update(extra_headers) return headers def get_random_delay(self, base_delay: float 1.0, random_range: float 2.0) - float: 生成随机延迟 return base_delay random.random() * random_range def rotate_cookie(self, domain: str) - dict: 轮换Cookie if domain not in self.cookie_jars: self.cookie_jars[domain] self._generate_cookies() return self.cookie_jars[domain] def _generate_cookies(self) - dict: 生成随机Cookie cookies { Hm_lvt_ .join(random.choices(abcdef0123456789, k32)): str(int(time.time())), Hm_lpvt_ .join(random.choices(abcdef0123456789, k32)): str(int(time.time())), __guid: .join(random.choices(0123456789abcdef, k32)), monitor_count: str(random.randint(1, 100)), } return cookies6. 主程序入口pythonimport asyncio import argparse import logging from typing import List import sys class MainCrawler: 主爬虫程序 def __init__(self): self.setup_logging() self.logger logging.getLogger(__name__) def setup_logging(self): 配置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(crawler.log, encodingutf-8), logging.StreamHandler(sys.stdout) ] ) async def run(self, platform: str, cities: List[str], categories: List[str], max_pages: int 10): 运行爬虫 self.logger.info(f开始爬取 {platform} 数据) # 根据平台选择爬虫 spider_map { meituan: MeituanSpider, dianping: DianpingSpider, eleme: ElemeSpider, } if platform not in spider_map: self.logger.error(f不支持的平台: {platform}) return SpiderClass spider_map[platform] all_results [] for city in cities: self.logger.info(f开始爬取城市: {city}) async with SpiderClass(city) as spider: results await spider.start( categoriescategories, max_pagesmax_pages ) all_results.extend(results) self.logger.info(f城市 {city} 爬取完成获取 {len(results)} 条数据) # 避免请求过快 await asyncio.sleep(2) # 保存数据 if all_results: db_manager DatabaseManager(postgresql://user:passwordlocalhost/local_service) await db_manager.batch_insert_shops(all_results) # 导出Excel export_file f本地生活数据_{platform}_{datetime.now().strftime(%Y%m%d_%H%M%S)}.xlsx db_manager.export_to_excel(export_file) self.logger.info(f数据保存完成共 {len(all_results)} 条记录) self.logger.info(f数据已导出到: {export_file}) return all_results def main(): 命令行入口 parser argparse.ArgumentParser(description本地生活服务数据爬虫) parser.add_argument(--platform, -p, requiredTrue, choices[meituan, dianping, eleme], help选择爬取平台) parser.add_argument(--cities, -c, nargs, default[北京], help选择城市列表) parser.add_argument(--categories, -cat, nargs, default[美食, 酒店, 休闲娱乐], help选择分类) parser.add_argument(--pages, typeint, default10, help每类爬取页数) parser.add_argument(--output, -o, defaultoutput.xlsx, help输出文件路径) args parser.parse_args() # 创建并运行爬虫 crawler MainCrawler() try: asyncio.run(crawler.run( platformargs.platform, citiesargs.cities, categoriesargs.categories, max_pagesargs.pages )) except KeyboardInterrupt: print(\n用户中断程序) except Exception as e: print(f程序运行错误: {e}) logging.exception(程序异常) if __name__ __main__: main()部署与优化建议1. Docker容器化部署dockerfile# Dockerfile FROM python:3.9-slim WORKDIR /app # 安装系统依赖 RUN apt-get update apt-get install -y \ gcc \ g \ libpq-dev \ wget \ gnupg \ wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \ sh -c echo deb [archamd64] http://dl.google.com/linux/chrome/deb/ stable main /etc/apt/sources.list.d/google.list \ apt-get update \ apt-get install -y google-chrome-stable \ rm -rf /var/lib/apt/lists/* # 安装Python依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 复制代码 COPY . . # 运行爬虫 CMD [python, main.py, --platform, meituan, --cities, 北京, 上海]2. 性能优化建议连接池优化: 调整aiohttp的连接池参数内存管理: 使用生成器处理大数据量缓存策略: Redis缓存已爬取数据错误重试: 实现指数退避重试机制监控告警: 集成Prometheus监控法律与道德声明遵守robots.txt: 尊重网站的爬虫协议限制爬取频率: 避免对目标网站造成压力数据使用规范: 仅用于学习和研究目的隐私保护: 不爬取用户个人信息版权尊重: 遵守数据版权规定
版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

如何设计好酒店网站模板在哪个网站做外贸生意好

如何快速优化Windows右键菜单:告别卡顿提升操作效率 【免费下载链接】ContextMenuManager 🖱️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 你是否曾经在整理文件时,被缓慢的右键…

张小明 2026/1/9 4:50:17 网站建设

如何让域名指向网站网站备案取名

大模型微调是让通用预训练模型适配特定任务的核心技术,分为全量微调与参数高效微调(PEFT)两大类。对零基础学习者而言,PEFT方法因低资源需求、易上手的优势成为首选。以下详细解析7种主流微调方法,并梳理极简入门流程&…

张小明 2026/1/10 16:16:57 网站建设

网页设计与网站建设设计报告厦门建设网站

FlutterFire推送通知合规性实战终极指南:从风险识别到持续优化的完整框架 【免费下载链接】flutterfire firebase/flutterfire: FlutterFire是一系列Firebase官方提供的Flutter插件集合,用于在Flutter应用程序中集成Firebase的服务,包括身份验…

张小明 2026/1/10 18:31:44 网站建设

阿里云最低服务器可以做几个网站微信wap网站开发

GovernmentPolicy政策文件归档:HunyuanOCR辅助建立知识库 在各级政府加速推进数字化转型的今天,一个现实而紧迫的问题摆在面前:成千上万份纸质或扫描版的政策文件沉睡在档案柜中,无法被有效检索、难以进行语义关联,更谈…

张小明 2026/1/10 7:51:07 网站建设

有关网站开发的文献综述建筑招投标信息网

手把手教你用Lora微调Qwen3-VL模型实现LaTeX公式OCR识别 【免费下载链接】self-llm 项目地址: https://gitcode.com/GitHub_Trending/se/self-llm 还在为复杂的数学公式识别而烦恼吗?今天我们就一起来探索如何通过Lora微调技术,让Qwen3-VL模型在…

张小明 2026/1/7 23:10:19 网站建设