获取
—————————依旧置顶行。
无框架;淘汰版;混乱式;搅屎棍。
不是给人家看的,这下可以乱写了吧!!!
。。其实一直都在乱写啊。
等结束了就可以把仓库删了。
嗯。碎贴符合笔记需求。
r = 读文本
rb = 读图片
w = 写文本(覆盖)
wb = 写图片(覆盖)
a = 追加文本
ab = 追加二进制
ctrl+Y 反撤回
save
CSV = Comma-Separated Values意思是:用逗号分隔数据的表格。可以用 Excel / WPS / 记事本 直接打开。
with open("菜价.csv", mode="w", encoding="utf-8", newline="") as f:
csvWriter = csv.writer(f)
csvWriter.writerow(["菜品", "价格", "市场"])
csvWriter.writerow(["白菜", 2.5, "朝阳市场"])
save
with open('high_like_comments.json', 'w', encoding='utf-8') as f:
json.dump(all_comments, f, ensure_ascii=False, indent=2)
save
os.makedirs(save_dir, exist_ok=True)
save
save_path = os.path.join(save_dir, decoded_soup_name)
if not os.path.exists(save_path):  # 没有才写入,有 说明已有
with open(save_path, "wb") as f:
f.write(grand_response.content)
save
with open(f"Imgs/NASA_imgs/{img_name}", "wb") as f:
f.write(img_response.content)
save
with open(DATA_SAVE_PATH, "w", encoding="utf-8") as f:
json.dump(jobs, f, ensure_ascii=False, indent=2)  # 显示中文,缩进两格
regular
^ $ 字符串
( ) 捕获组
(?: ) 非捕获分组
(?! ) 排除
(?= ) 包含
(?P<名字>内容) 命名分组 (?P<group_name>.*?)
^ 除了
\d 数字
\w 字母、数字、下划线
\d{3}-\d{4}
url\("([^"]+)"\)
\s 空白符
+ 至少有一个或多个
* 可以有零个或多个
{2,} 至少2个 [a-zA-Z]{2,}
? 可选 (?: )? 整个分组是可选的
.*? 非贪婪匹配(用 . 匹配除了换行符之外的任意字符),直到接下去的文本被匹配到。
re.S 或 re.DOTALL 让 . 可以匹配换行符
S = Short for DOTALL
re.MULTILINE 让 ^ 和 $ 能匹配【每一行】的开头和结尾,而不是只匹配【整个文本】的开头和结尾。
re.sub = 查找并替换
regular
pattern = r'^\d{3}-\d{4}$'
re.match(pattern, "123-4567")
current_url = re.sub(r"currentPage=\d+", f"currentPage={page_num}", TARGET_URL)
命名分组:当一个式子里面有多个捕获组时好用:
re.compile(r"(姓名)(.*?)(年龄)(.*?)(电话)(.*?)")
这时 group(1)、(2)、(3)各自是姓名、年龄、电话
就可以用命名分组。
obj = re.compile(r"(姓名)(?P<姓名>.*?)(年龄)(?P<年龄>.*?)(电话)(?P<电话>.*?)")
match = obj.search(html)
match.group('姓名')
match.group('年龄')
match.group('电话')
pattern = r"""
<li>
(?P<name>.*?) # 组1:电影名
.*?
(?P<url>http.*?) # 组2:链接
.*?
(?P<score>\d+\.\d) # 组3:评分
</li>
"""
^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9-]+\.[a-zA-Z]{2,}(?:\.[a-zA-Z]{2,})?$
^[a-zA-Z0-9._%+-]+@m\.gduf\.edu\.cn$
<a\s+.*?href="([^"]+)".*?>([^<]+)</a>
re.search(r'"token"\s*:\s*"([^"]+)"', html)
re.search(r'token\s*:\s*\'([^\']+)\'', html)
re.search(r'window\.token\s*=\s*"([^"]+)"', html)
visit
from urllib.request import urlopen
from urllib.parse import quote
query = quote("清华大学开源软件镜像站")
url = f"https://cn.bing.com/search?q={query}"
response = urlopen(url)
print(response.read())
response.close()
visit
dict = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0"
}
response = requests.get(url, headers=dict)
print(response)
print(response.text)
response.close()
visit; save cookie
import json
import os

# Launch Edge via playwright, restore cookies from disk if we have them,
# otherwise save the fresh cookies for next time, then parse the page.
with sync_playwright() as p:
    browser = p.chromium.launch(headless=False, slow_mo=300, executable_path=EDGE_PATH, args=["--disable-popup-blocking"])
    page = browser.new_page()
    page.set_viewport_size({"width": 400, "height": 300})
    page.goto(TARGET_URL, wait_until="domcontentloaded")
    if os.path.exists(COOKIE_PATH):  # BUG FIX: the note had an orphan `else:` — this `if` was missing
        with open(COOKIE_PATH, 'r', encoding='utf-8') as f:
            # BUG FIX: was eval(f.read()) — eval executes arbitrary code from the file; JSON is safe
            cookies = json.load(f)
        page.context.add_cookies(cookies)
        page.reload(wait_until="domcontentloaded")
    else:
        cookies = page.context.cookies()
        with open(COOKIE_PATH, 'w', encoding='utf-8') as f:
            # was f.write(str(cookies)) — str() output is not valid JSON; dump it properly
            json.dump(cookies, f, ensure_ascii=False)
    page.wait_for_load_state("networkidle")
    page_source = page.content()
    soup = BeautifulSoup(page_source, 'html.parser')
status
状态码:response.status_code
response.raise_for_status() # 状态码非200时抛出异常
target
# /html/body/div/div/div[2]/div[1]/div[1]
#
# <div class="head-img"
# style=("background-image: "
# "url(https://s3gw.cmbimg.com/sc/JonWtH3SmRv7WFuP1lJ3SUn0GCs=/SZ::"
# "ZnQ3YzJfaM7bQJjuEjZjYRZz)
# ;")>
# </div>
visit
import playwright.sync_api as playwright
with playwright.sync_playwright() as p:
browser = p.chromium.launch(headless=False,channel="msedge")
page = browser.new_page()
page.goto(target_url)
visit
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, channel="chrome")
page = browser.new_page()
page.set_viewport_size({"width": 400, "height": 300})
page.goto(TARGET_URL, wait_until="networkidle")
page.wait_for_timeout(1000)
page_html = page.content()
browser.close()
return page_html
regular
(style) => {
const match = style.match(/url\("([^"]+)"\)/);
return match ? match[1] : null;
}
body
head_img_div = page.wait_for_selector("div.head-img", timeout=10000)
style_attr = head_img_div.get_attribute("style")
match_quote = page.evaluate(r"""(style) => {const match = style.match(/url\("([^"]+)"\)/);return match ? match[1] : null;}""",style_attr )
print(f"URL:{match_quote}")
browser.close()
body
def parse_bank_info(page_html):
    """Parse the bank-ad container of the page and return a list of dicts,
    one per item card, with keys 序号/标题/链接/图片URL.

    Returns an empty list when the container is not found.
    """
    soup = BeautifulSoup(page_html, "lxml")
    result_list = []
    target_container = soup.find("div", class_="yhksw_bankAd")
    if target_container is None:  # layout changed or request failed — fail soft
        return result_list
    items = target_container.find_all("div", class_="imagesdiv yhzpw_index_sec")
    for idx, item in enumerate(items, 1):
        # Hoist the lookups: find each tag once instead of three times per item.
        a_tag = item.find("a")
        img_tag = item.find("img")  # BUG FIX: img_tag was never assigned in the original
        href = a_tag.get("href", "") if a_tag else ""
        title = img_tag.get("alt", "") if img_tag else ""
        img_url = img_tag.get("src", "") if img_tag else ""
        result_list.append({
            "序号": idx,
            "标题": title,
            "链接": href,
            "图片URL": img_url
        })
    return result_list
token;远程百度翻译
visit
import requests
import re
import hashlib
import time
import json
def get_baidu_token():
    """Fetch the translate page and extract the anti-CSRF token from its HTML.

    Returns the token string, or None when the page did not contain one.
    (url and most headers were blanked out in these notes — fill in before use.)
    """
    url = ""
    headers = {
        "User-Agent": "",
        "Accept": "",
        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"
    }
    response = requests.get(url, headers=headers)
    # regular: pull the token out of the inline JS, e.g.  "token": "abc123"
    token_match = re.search(r'"token"\s*:\s*"([^"]+)"', response.text)
    response.close()  # body already downloaded; close just releases the connection
    if token_match:
        return token_match.group(1)
    return None  # explicit: original fell off the end and returned None implicitly
special; body; fail or i don’t know
def generate_baidu_sign(query):
    """Build the Baidu-translate request signature.

    Returns a (sign, salt) pair: salt is the current time in milliseconds as a
    string, and sign is the MD5 hex digest of prefix + query + salt + secret.
    """
    prefix = "fanyideskweb"
    secret = "Ygy_4c=r#e#4EX^NUGUc5"
    millis = str(int(time.time() * 1000))
    raw = f"{prefix}{query}{millis}{secret}".encode("utf-8")
    return hashlib.md5(raw).hexdigest(), millis
query = input("请输入要翻译的内容:")
token = get_baidu_token()
if token:
sign, salt = generate_baidu_sign(query)
url = "https://fanyi.baidu.com/v2transapi"
data = {
"from": "auto",
"to": "en",
"query": query,
"transtype": "realtime",
"simple_means_flag": "3",
"sign": sign,
"salt": salt,
"token": token,
"domain": "common"
}
headers = {
"User-Agent": "",
"Content-Type": "application/x-www-form-urlencoded",
"Referer": "https://fanyi.baidu.com/mtpe-individual/transText",
"Cookie": ""
}
response = requests.post(url, data=data, headers=headers)
if response.status_code == 200:
result = response.json()
print("\n翻译结果:")
print(json.dumps(result, ensure_ascii=False, indent=2))
if "trans_result" in result and result["trans_result"].get("data"):
print("\n简洁结果:", result["trans_result"]["data"][0]["dst"])
regular;
finditer = find + iterator(一个迭代器,可以用 for 循环挨个拿结果。)
sentence = ""
results = re.finditer(r'\d+', sentence)
for match in results:
print(match.group())
regular
obj1 = re.compile(r"热片.*?<ul>(.*?)</ul>", re.S)
命名分组
obj1 = re.compile(r"热片.*?<ul>(?P<ul>.*?)</ul>", re.S)
for match in obj1.finditer(html):
ul = match.group('ul')
print(ul)
visit
url里的参数可以重新封装;ElementTree对象
url = "https://movie.douban.com/top250"
param = {
"start": 0,
"filter": ""
}
headers={
"User-Agent":""
}
response = requests.get(url, params=param, headers=headers)
print(response.request.headers)
print(response.request.url)
response.close()
第二种
resp = requests.get(url, verify=False)
resp.encoding = 'utf-8'
html = resp.text # 或者:html = etree.HTML(resp.text)
visit; body; csv
import csv
import requests
from bs4 import BeautifulSoup
url = ""
response = requests.get(url)
f = open("菜价.csv", "w", encoding="utf-8-sig", newline="")
csvWriter = csv.writer(f)
page = BeautifulSoup(response.text, "html.parser")
dataname = page.find("table", attrs={"class": "hq_table"})
trs = dataname.find_all("tr")[1:]
for tr in trs:
tds = tr.find_all("td")
name = tds[0].text
avg = tds[1].text
kind = tds[2].text
print(name, avg, kind)
csvWriter.writerow([name, avg, kind])
f.close()
xml;xpath
xpath 是一门语言:在 xml文档中搜索内容的一门语言。xpath路径 可以在网页源代码 copy xpath,但是记得copy对,有些是hidden标签。
html是xml的一个子集,所以可以在 html 上使用 xpath。
先安装 lxml:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple lxml
target
# XML字符串,
xml = '''
<book>
<id>1</id>
<name>野花遍地香</name>
<price>1.23</price>
<nick>臭豆腐</nick>
<author>
<nick id="10086">周大强</nick>
<nick id="10010">周芷若</nick>
<nick class="joy">周杰伦</nick>
<nick class="jolin">蔡依林</nick>
<div>
<nick>惹了</nick>
</div>
</author>
</book>
'''
from lxml import etree
使用 xpath
第一个/是根节点。//是后代的意思。
写法 1:拿所有 nick
写法 2:拿 author 下面的所有 nick
写法 3:拿带 id=“10086” 的 nick
写法 4:拿 author 下面 div 里的 nick
tree = etree.XML(xml)
# ======================
result = tree.xpath("//nick/text()")
result = tree.xpath("/book/author/nick/text()")
result = tree.xpath("//nick[@id='10086']/text()")
result = tree.xpath("/book/author/div/nick/text()")
result = tree.xpath("/book") 只打印标签本身(是元素对象列表),
输出大概长这样:[<Element book at 0xxxxx>]
# 路径之间有 * 则是通配符
# . 是在该路径内包裹的东西中继续查找
例如,要在 author 里面 找 nick:
author = tree.xpath("//author")[0] 找到所有 <author> 标签,拿出第 1 个 <author> 标签
author.xpath(".//nick")
对于 html 也是一样的:
html = etree.HTML(response.text)
divs = html.xpath("......")
for div in divs:
price = div.xpath(".//div[@class='price']")[0]
title = "saas".join(div.xpath("./div/div/a[2]/text()"))
visit;selenium;百度
from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
def test_edge_browser():
    """Smoke-test: launch Edge through Selenium and open baidu.com."""
    edge_driver_path = "msedgedriver.exe"  # driver binary expected next to the script
    try:
        service = Service(edge_driver_path)
        driver = webdriver.Edge(service=service)
        driver.get("https://www.baidu.com")
        wait = WebDriverWait(driver, 10)  # explicit-wait helper, 10 s timeout
body
    except Exception as e:
        print(f"测试过程中出错:{str(e)}")
    finally:
        # NOTE(review): if webdriver.Edge() itself raises, `driver` is unbound
        # here and quit() raises NameError — confirm and guard if needed.
        driver.quit()
body;do
By.ID, "kw" # 百度搜索框
By.ID, "su" # 百度搜索按钮
search_box = wait.until(
EC.presence_of_element_located((By.ID, "kw"))
)
search_box.send_keys("输入内容")
search_button = wait.until(
EC.element_to_be_clickable((By.ID, "su"))
)
search_button.click()
time.sleep(3)
results = driver.find_elements(By.CSS_SELECTOR, "h3.t a")
<h3 class="t">
<a>我是搜索结果标题</a>
</h3>
print(f"\n找到{len(results)}个搜索结果:")
for i, result in enumerate(results[:5], 1): # 编号设为从一开始,但一直是取前五条
print(f"{i}. {result.text}")
随机延时代码
time.sleep(random.uniform(2, 4))
page.wait_for_timeout(random.randint(1000, 2000))
body
soup = BeautifulSoup(response.text, 'html.parser')
rows = soup.find_all('tr')
for row_idx, row in enumerate(rows, 1):
tds = row.find_all('td')
for td_idx, td in enumerate(tds, 1):
if 'style' in td.attrs and 'line-height:180%' in td['style']:
a_tags = td.find_all('a')
for a in a_tags:
link = a.get('href')
full_link = 'https://wiki.mbalib.com/' + link
child_response = requests.get(full_link, headers=headers)
child_soup = BeautifulSoup(child_response.text, 'html.parser')
catmore_div = child_soup.find('div', attrs={"class": "boilerplate", "id": "catmore"}
<div class="boilerplate" id="catmore">
我就是要找的那个区块!
</div>
href = catmore_div.find('a').get('href')
body
经典第一次,NASA
page = BeautifulSoup(response.text, "html.parser")
alist = page.find_all("a", attrs={"class": "elementor-post__thumbnail__link"})
for i, a_tag in enumerate(alist, 1):
href = a_tag.get("href")
if href and href.startswith(('http://', 'https://')):
child_page_response = requests.get(href)
child_page_response.encoding = 'utf-8'
child_page = BeautifulSoup(child_page_response.text, "html.parser")
for a_tag in child_page.find_all("a", href=True):
# 算了,我妥协了,我不拿那个高清的href的图片地址了,我拿那个a包裹的 < img src = > 的 src的地址
if a_tag.find("img") and a_tag.get("href", "").lower().endswith(".jpg"):
print(a_tag.get("href"))
改为!!!
img_tag = a_tag.find("img")
if img_tag and img_tag.get("src", "").lower().endswith(".jpg"):
target_img = img_tag
切片成列表
encoded_soup_name = grand_href.split("/")[-1]
text.split(",")
.startswith("/")
.endswith(".html")
target_a = None
target_img = None
target
有的网站直接把数据库数据用 var_dump 输出在页面上:PHP 的 var_dump () 打印出来的数组:
array(3) {
[0]=> array(11) {
["id"]=> string(1) "1"
["name"]=> string(5) "小白"
["age"]=> string(2) "20"
}
[1]=> array(11) {
["id"]=> string(1) "2"
["name"]=> string(5) "小黄"
["age"]=> string(2) "22"
}
[2]=> array(11) {
["id"]=> string(1) "3"
["name"]=> string(5) "小红"
["age"]=> string(2) "25"
}
}
body
ranking_data = []
# Outer pattern: grabs the body of the whole top-level array(N) { ... } dump.
data_pattern = re.compile( r"array\(\d+\) \{\s*((?:\[\d+\]=>\s*array\(11\) \{(?:.|\s)*?\}\s*)*)\}", re.MULTILINE)
# One [N]=> array(11) { ... } record at a time.
item_pattern = re.compile(r"\[\d+\]=>\s*array\(11\) \{\s*(.*?)\s*\}", re.DOTALL)
# ["key"]=> string(len) "value" pairs inside one record.
key_value_pattern = re.compile(r'\["(\w+)"\]=>\s*string\(\d+\) "([^"]*)"')
# NOTE(review): data_match is None when the page layout changes — the next
# line would then raise AttributeError; consider guarding before .group(1).
data_match = data_pattern.search(response.text)
items = item_pattern.findall(data_match.group(1))
for item in items:
    job_info = {}
    key_values = key_value_pattern.findall(item)
    for key, value in key_values:
        job_info[key] = value
    # Keep only complete records that carry every required field.
    if all(k in job_info for k in ["job_id", "job_name", "pub_nums", "salary"]):
        ranking_data.append(job_info)
visit
PAGE_RANGE = range(0, 1) # 抓取第1页
page_html_list = []
with sync_playwright() as p:
browser = p.chromium.launch(
headless=False, channel="chrome",
args=["--no-sandbox", "--disable-blink-features=AutomationControlled"]
)
page = browser.new_page()
这豆包怎么如此平易近人:
超级大白话:
新建一个空白标签页!
page = 这个标签页的操作对象
你之后所有操作都是用这个 page:
page.goto("https://www.baidu.com")
page.click("按钮")
page.fill("输入框", "内容")
user_agents = [ "", "" ]
page.set_extra_http_headers({"User-Agent": random.choice(user_agents)})
for page_num in PAGE_RANGE:
current_url = re.sub(r"currentPage=\d+", f"currentPage={page_num}", TARGET_URL)
page.goto(current_url, wait_until="networkidle")
idle→ 空闲,"networkidle0"完全网络空闲,0个请求
page.wait_for_selector('a[data-nick="job-detail-job-info"]', timeout=10000)
<a data-nick="job-detail-job-info">
page_html = page.content()
page_html_list.append(page_html)
browser.close()
body
for page_idx, page_html in enumerate(page_html_list, 1):
soup = BeautifulSoup(page_html, "lxml")
job_links = soup.find_all('a', attrs={"data-nick": "job-detail-job-info"})
for idx, job_link in enumerate(job_links, 1):
salary_elem = job_link.find("span", class_=re.compile(r"job-salary"))
salary = salary_elem.get_text().strip()
location = ""
location_box = job_link.find("div", class_=re.compile(r"job-dq-box"))
if location_box:
location_text = location_box.get_text().strip()
location_match = re.search(r"【([^】]+)】", location_text)
location = location_match.group(1).strip()
target
<div
class="joblist-item-job"
sensorsdata='{
"jobId": "123456",
"jobTitle": "Python开发工程师",
"jobSalary": "15-25K·13薪",
"jobArea": "北京·朝阳区",
"jobYear": "3-5年",
"jobDegree": "本科"
}'
>
</div>
<div class="joblist-item-job"> <!-- ← y 就是这个 -->
<a class="cname" href="https://xxx">腾讯科技</a>
<span class="dc">互联网</span>
<span class="dc">上市公司</span>
<span class="dc">1000-9999人</span>
<div class="tags">
<div class="tag">五险一金</div>
<div class="tag">带薪年假</div>
<div class="tag">双休</div>
</div>
<button class="apply" onclick="goApply('https://url……')">申请</button>
</div>
body
# Parse every job card: structured data lives in the sensorsdata JSON attribute,
# company info in child tags.
soup = BeautifulSoup(page_html, "lxml")
recruit_list = []
job_items = soup.find_all("div", class_="joblist-item-job")
for idx, y in enumerate(job_items, 1):
    sensors_data = y.get("sensorsdata")
    # json.loads: JSON string -> python dict. BUG FIX: the quote argument of
    # replace() was garbled in the original note; presumably the attribute
    # carries &quot; entities when the parser has not unescaped them —
    # TODO confirm against the live page.
    i_json = json.loads(sensors_data.replace("&quot;", "\""))
    i_id = i_json.get("jobId", "")
    i_title = i_json.get("jobTitle", "")
    i_salary = i_json.get("jobSalary", "")
    i_area = i_json.get("jobArea", "")
    i_year = i_json.get("jobYear", "")
    i_degree = i_json.get("jobDegree", "")
    # Hoist the repeated find()/find_all() calls: each lookup happens once.
    cname_a = y.find("a", class_="cname")
    company_name = cname_a.get_text(strip=True) if cname_a else ""
    company_url = cname_a["href"] if (cname_a and "href" in cname_a.attrs) else ""
    dc_spans = y.find_all("span", class_="dc")
    company_industry = dc_spans[0].get_text(strip=True) if len(dc_spans) > 0 else ""
    company_type = dc_spans[1].get_text(strip=True) if len(dc_spans) > 1 else ""
    tags = [tag.get_text(strip=True) + "," for tag in y.find_all("div", class_="tag")] if y.find("div", class_="tags") else []
    apply_btn = y.find("button", class_="apply")
    # onclick looks like goApply('https://…') — the URL is the 2nd '-delimited piece.
    apply_url = apply_btn["onclick"].split("'")[1] if (apply_btn and "onclick" in apply_btn.attrs) else ""
target
<li> ← 父级(这就是 finance_elem 拿到的)
└── <a>金融</a> ← 这是 finance_a
body; save
# Find the <li> that wraps the 金融 link, extract every non-javascript href
# inside it, URL-decode both href and text, and write them out as simple HTML.
finance_elem = None
finance_a = soup.find('a', string=lambda t: t and '金融' in t.strip())
finance_elem = finance_a.find_parent('li')
# (?!java) rejects hrefs starting with "java" (filters javascript: pseudo-links)
pattern = re.compile(r'<a\s+.*?href="((?!java).+?)".*?>([^<]+)</a>', re.DOTALL)
matches = pattern.findall(str(finance_elem))
y_set = set()
with open(save_path, 'w', encoding='utf-8') as f:
    for x, y in matches:
        decoded_x = unquote(x.strip(), encoding='utf-8')
        decoded_y = unquote(y.strip(), encoding='utf-8')  # BUG FIX: decoded_y was never assigned
        y_set.add(decoded_y)
        f.write(f'<a x="{decoded_x}">{decoded_y}</a><br>\n')
browser.close()
visit; session
session = requests.session()
data = {
"loginName":"",
"password":""
}
url = ""
session.post(url, data = data)
resp = session.get("https://user.17k.com/......", headers={ "Cookie":""})
print(resp.text)
print(resp.cookies)
visit; change proxies
proxies = {
"http": "http://127.0.0.1:1080",
}
resp = requests.get("url", proxies=proxies)
多协程:
import asyncio
import time
async def func1():
    # Demo coroutine #1: print, yield to the event loop for 2 s, print again.
    print("helloTask111.")
    # Never put blocking (sync) calls here — they would stall the whole loop.
    # await asyncio.sleep() yields control so other coroutines keep running.
    await asyncio.sleep(2)
    print("helloTask111.")
async def func2():
    # Demo coroutine #2 — identical shape to func1.
    print("helloTask222.")
    await asyncio.sleep(2)
    print("helloTask222.")
async def func3():
    # Demo coroutine #3 — identical shape to func1.
    print("helloTask333.")
    await asyncio.sleep(2)
    print("helloTask333.")
async def main():
    # gather() runs all three concurrently: total ≈ 2 s, not 3 × 2 s.
    t1 = time.time()
    await asyncio.gather(
        func1(),
        func2(),
        func3()
    )
    t2 = time.time()
    print(f"总耗时: {t2 - t1:.2f}秒")
if __name__ == '__main__':
    asyncio.run(main())
多协程:
task = asyncio.create_task(函数())  # 注意:create_task 接收协程对象,要先调用 函数()
tasks.append(task)
await asyncio.gather(*tasks)
多线程:
from threading import Thread
def func1():
    # Worker 1: prints 0..10; its output interleaves with func2 and main.
    for i in range(0, 11):
        print("func1: helloWorld.", i)
def func2():
    # Worker 2: same loop, runs in its own thread.
    for i in range(0, 11):
        print("func2: helloWorld.", i)
if __name__ == '__main__':
    t1 = Thread(target=func1)
    t1.start()  # start() returns immediately; the thread runs concurrently
    t2 = Thread(target=func2)
    t2.start()
    # The main thread keeps printing too — all three outputs interleave.
    for i in range(0,11):
        print("Main: helloWorld.",i)
可以传参:
def func1(name):
    # Thread target that takes an argument, passed via Thread(args=...).
    for i in range(0, 11):
        print(f"func1: {name}", i)
if __name__ == '__main__':
    t1 = Thread(target=func1, args=("周杰伦",))# args must be a tuple — hence the trailing comma
    t1.start()
线程锁:
import threading

# Two threads increment a shared counter; the lock makes += atomic so the
# final value is deterministic (200), not a lost-update race.
counter = 0
lock = threading.Lock()

def increment():
    """Add 1 to the shared counter 100 times, each increment under the lock."""
    global counter
    for _ in range(100):
        # `with lock:` (recommended by the original note) acquires and
        # guarantees release even if the body raises — same behavior as the
        # manual acquire()/release() pair it replaces.
        with lock:
            counter += 1

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()  # join() waits for the thread; accepts a limit, e.g. t.join(timeout=2)
print(f"期望值: 200, 实际值: {counter}")
线程锁在爬虫的应用; csv; save
import threading
import csv
file_lock = threading.Lock()
f = open("data.csv", "w")
csvwriter = csv.writer(f)
def save_data(data):
with file_lock: # 加锁保证一次只有一个线程写入
csvwriter.writerow(data)
f.flush()
多线程之队列、消费者模式:(没仔细看)
import threading
import queue
import time
import random
class CrawlerProducerConsumer:
    """Toy producer/consumer demo: 3 producer threads push fake crawl results
    into a bounded queue while one consumer thread drains it."""
    def __init__(self):
        # maxsize=100 bounds the queue: put() blocks when it is full
        self.data_queue = queue.Queue(maxsize=100)
        # set to True by run() once all producers finished
        self.stop_flag = False
    def producer(self, producer_id):
        # Produce 5 fake items with a random delay to simulate crawling.
        for i in range(5):
            time.sleep(random.random())
            data = f"生产者{producer_id}爬取的数据-{i}"
            self.data_queue.put(data)
            print(f"[生产者{producer_id}] 生产: {data},队列大小: {self.data_queue.qsize()}")
        print(f"[生产者{producer_id}] 完成生产")
    def consumer(self):
        # Keep draining until the producers are done AND the queue is empty.
        while not (self.stop_flag and self.data_queue.empty()):
            try:
                data = self.data_queue.get(timeout=1)
                time.sleep(0.5)  # simulate processing work
                print(f"[消费者] 消费: {data},剩余: {self.data_queue.qsize()}")
                self.data_queue.task_done()
            except queue.Empty:
                continue  # timeout: re-check the stop condition
        print("[消费者] 完成消费")
    def run(self):
        """Start 3 producers + 1 consumer, wait for everything to finish."""
        producers = []
        for i in range(3):  # 3 producers crawl concurrently
            t = threading.Thread(target=self.producer, args=(i,))
            t.start()
            producers.append(t)
        consumer_thread = threading.Thread(target=self.consumer)
        consumer_thread.start()
        for p in producers:
            p.join()
        # Producers done — let the consumer drain the rest and exit.
        self.stop_flag = True
        consumer_thread.join()
        print("所有任务完成!")
if __name__ == '__main__':
    crawler = CrawlerProducerConsumer()
    crawler.run()
多线程的队列、消费者模式在爬虫的应用:(没仔细看)
import threading
import queue
import requests
from lxml import etree
import csv
import time
class XinfadiCrawler:
    """Producer/consumer crawler for the Xinfadi price list:
    one thread generates URLs, max_workers threads fetch+parse pages,
    one thread writes the rows to data.csv."""
    def __init__(self, max_workers=5):
        self.url_queue = queue.Queue()    # page URLs waiting to be fetched
        self.data_queue = queue.Queue()   # parsed rows waiting to be written
        self.max_workers = max_workers    # number of crawler threads
    def url_producer(self):
        # Generate the URLs for pages 1-100.
        for page in range(1, 101):
            url = f"新发地网址/list/{page}.shtml"
            self.url_queue.put(url)
            print(f"生成URL: {url}")
    def page_crawler(self):
        # Worker: pull a URL, fetch it, xpath out the table rows, queue them.
        while True:
            try:
                url = self.url_queue.get(timeout=3)
                resp = requests.get(url)
                html = etree.HTML(resp.text)
                # NOTE(review): absolute xpath copied from devtools — brittle
                # if the page layout changes; [0] raises IndexError on a miss.
                table = html.xpath("/html/body/div[2]/div[4]/div[1]/table")[0]
                trs = table.xpath("./tr[position()>1]")  # skip the header row
                for tr in trs:
                    data = tr.xpath("./td/text()")
                    data = [item.strip() for item in data]
                    self.data_queue.put(data)
                print(f"爬取完成: {url}, 获取{len(trs)}条数据")
                self.url_queue.task_done()
            except queue.Empty:
                break  # no URL for 3 s — assume the producer is done
    def data_consumer(self):
        # Single writer thread: keeps the CSV file handle open for all rows.
        with open("data.csv", "w", newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(["品名", "最低价", "最高价", "平均价", "规格", "单位", "日期"])
            while True:
                try:
                    data = self.data_queue.get(timeout=3)
                    writer.writerow(data)
                    f.flush()  # flush per row so a crash loses little
                    self.data_queue.task_done()
                except queue.Empty:
                    if self.url_queue.empty():
                        break  # nothing left to crawl or to write
    def run(self):
        """Wire up producer, crawler pool and consumer; wait for completion."""
        url_producer_thread = threading.Thread(target=self.url_producer)
        url_producer_thread.start()
        crawler_threads = []
        for i in range(self.max_workers):
            t = threading.Thread(target=self.page_crawler)
            t.start()
            crawler_threads.append(t)
        data_consumer_thread = threading.Thread(target=self.data_consumer)
        data_consumer_thread.start()
        url_producer_thread.join()
        for t in crawler_threads:
            t.join()
        # join() on the queue waits until every put() got a task_done().
        self.data_queue.join()
        data_consumer_thread.join()
        print("所有任务完成!")
# run it
crawler = XinfadiCrawler(max_workers=5)
crawler.run()
| 标签.get (“href”) | BeautifulSoup | 标签拿 HTML 属性 |
| 字典.get (“name”) | Python 字典 | 拿字典里的键值 |
if item["pub_nums"].isdigit()
processed_list = []
for item in raw_data:
pub_num = int(item["pub_nums"]) if item["pub_nums"].isdigit() else 0
processed_list.append({
"rank": len(processed_list) + 1,
"job_name": item["job_name"],
"job_url": f"https://www.bankhr.com/so/{item['job_id']}.html",
"pub_nums": pub_num,
"pub_percent": min(int((pub_num / max_pub_num) * 100), 100),
"salary": format_salary(item["salary"]),
"city": item.get("city_name", "全国")
})
return sorted(processed_list, key=lambda x: x["pub_nums"], reverse=True)  # 倒序就是从大到小