Logo

加载中

获取

—————————依旧置顶行。

无框架;淘汰版;混乱式;搅屎棍。

不是给人家看的,这下可以乱写了吧!!!

。。其实一直都在乱写啊。

等结束了就可以把仓库删了。

嗯。碎贴符合笔记需求。

r = 读文本

rb = 读图片

w = 写文本(覆盖)

wb = 写图片(覆盖)

a = 追加文本

ab = 追加二进制

ctrl+Y 反撤回

save

CSV = Comma-Separated Values意思是:用逗号分隔数据的表格。可以用 Excel / WPS / 记事本 直接打开。

with open("菜价.csv", mode="w", encoding="utf-8", newline="") as f:
    csvWriter = csv.writer(f)
    csvWriter.writerow(["菜品", "价格", "市场"])
    csvWriter.writerow(["白菜", 2.5, "朝阳市场"])

save

# Persist the collected comments as pretty-printed JSON.
# ensure_ascii=False keeps Chinese characters readable; indent=2 for humans.
with open('high_like_comments.json', 'w', encoding='utf-8') as f:
    json.dump(all_comments, f, ensure_ascii=False, indent=2)

save

os.makedirs(save_dir, exist_ok=True)

save

save_path = os.path.join(save_dir, decoded_soup_name)
# Write only when the file is absent (an existing file means it was already downloaded).
# The original had this note as bare inline Chinese text, which is a syntax error.
if not os.path.exists(save_path):
    with open(save_path, "wb") as f:
        f.write(grand_response.content)

save

with open(f"Imgs/NASA_imgs/{img_name}", "wb") as f:
   f.write(img_response.content)

save

with open(DATA_SAVE_PATH, "w", encoding="utf-8") as f:
    json.dump(jobs, f, ensure_ascii=False, indent=2) 显示中文,缩进两格

regular

^ $ 字符串

( ) 捕获组

(?: ) 非捕获分组

(?!  )  排除
(?=  )  包含

(?P<名字>内容) 命名分组   (?P<group_name>.*?)

^  除了

\d  数字

\w  字母、数字、下划线

\d{3}-\d{4}

url\("([^"]+)"\)

\s  空白符

+   至少有一个或多个
*   可以有零个或多个

{2,}    至少2个  [a-zA-Z]{2,}

?   可选  (?: )?  整个分组是可选的

.*?  非贪婪匹配(用 . 匹配除了换行符之外的任意字符),直到接下去的文本被匹配到。

re.S 或 re.DOTALL 让 . 可以匹配换行符
S = Short for DOTALL

re.MULTILINE  让 ^ 和 $ 能匹配【每一行】的开头和结尾,而不是只匹配【整个文本】的开头和结尾。

re.sub = 查找并替换

regular

# Full-string match of a phone-like "123-4567" pattern.
pattern = r'^\d{3}-\d{4}$'
re.match(pattern, "123-4567")
# Swap the currentPage query parameter inside TARGET_URL for pagination.
current_url = re.sub(r"currentPage=\d+", f"currentPage={page_num}", TARGET_URL)

命名分组:当一个式子里面有多个捕获组时好用:

re.compile(r"(姓名)(.*?)(年龄)(.*?)(电话)(.*?)")

这时 group(1)、(2)、(3)各自是姓名、年龄、电话
就可以用命名分组。
obj = re.compile(r"(姓名)(?P<姓名>.*?)(年龄)(?P<年龄>.*?)(电话)(?P<电话>.*?)")

match = obj.search(html)

match.group('名字')
match.group('年龄')
match.group('电话')
pattern = r"""
<li>
  (?P<name>.*?)   # 组1:电影名
  .*?
  (?P<url>http.*?) # 组2:链接
  .*?
  (?P<score>\d+\.\d) # 组3:评分
</li>
"""
^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9-]+\.[a-zA-Z]{2,}(?:\.[a-zA-Z]{2,})?$

^[a-zA-Z0-9._%+-]+@m\.gduf\.edu\.cn$

<a\s+.*?href="([^"]+)".*?>([^<]+)</a>
 
re.search(r'"token"\s*:\s*"([^"]+)"', html)
re.search(r'token\s*:\s*\'([^\']+)\'', html)
re.search(r'window\.token\s*=\s*"([^"]+)"', html)

visit

from urllib.request import urlopen
from urllib.parse import quote

# Percent-encode the Chinese query so it is URL-safe.
query = quote("清华大学开源软件镜像站")
url = f"https://cn.bing.com/search?q={query}"

# Context manager closes the connection even if read() raises
# (the original leaked the response on error).
with urlopen(url) as response:
    print(response.read())

visit

dict = {
     "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0"
 }
response = requests.get(url, headers=dict)
print(response)
print(response.text)
response.close()

visit; save cookie

import os
import json

with sync_playwright() as p:
    browser = p.chromium.launch(headless=False, slow_mo=300, executable_path=EDGE_PATH, args=["--disable-popup-blocking"])
    page = browser.new_page()
    page.set_viewport_size({"width": 400, "height": 300})
    page.goto(TARGET_URL, wait_until="domcontentloaded")

    if os.path.exists(COOKIE_PATH):
        # Reuse a previously saved login: inject cookies, then reload.
        # json.load replaces the original eval() — never eval file content.
        with open(COOKIE_PATH, 'r', encoding='utf-8') as f:
            cookies = json.load(f)
        page.context.add_cookies(cookies)
        page.reload(wait_until="domcontentloaded")
    else:
        # First run: persist this session's cookies for next time.
        # (The original `else:` dangled with no matching `if` — syntax error.)
        cookies = page.context.cookies()
        with open(COOKIE_PATH, 'w', encoding='utf-8') as f:
            json.dump(cookies, f, ensure_ascii=False)

    page.wait_for_load_state("networkidle")
    page_source = page.content()
    soup = BeautifulSoup(page_source, 'html.parser')

status

状态码:response.status_code

response.raise_for_status()  # 状态码非200时抛出异常

target

# /html/body/div/div/div[2]/div[1]/div[1]
#
# <div class="head-img"
# style=("background-image: "
#        "url(https://s3gw.cmbimg.com/sc/JonWtH3SmRv7WFuP1lJ3SUn0GCs=/SZ::"
#        "ZnQ3YzJfaM7bQJjuEjZjYRZz)
# ;")>
# </div>

visit

import playwright.sync_api as playwright

# Launch the system Edge (visible window) and open the target page.
with playwright.sync_playwright() as p:
    browser = p.chromium.launch(headless=False, channel="msedge")
    page = browser.new_page()
    page.goto(target_url)

visit

def fetch_page_html(target_url):
    """Render target_url in a visible Chrome window and return the page HTML.

    The original fragment had `return` at module level — a SyntaxError —
    so the snippet is wrapped into a function.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, channel="chrome")
        page = browser.new_page()
        page.set_viewport_size({"width": 400, "height": 300})
        # networkidle: wait until the page stops making network requests.
        page.goto(target_url, wait_until="networkidle")
        page.wait_for_timeout(1000)
        page_html = page.content()
        browser.close()
        return page_html

regular

// Extract the first url("...") argument from a CSS style string; null when absent.
(style) => {
  const m = /url\("([^"]+)"\)/.exec(style);
  return m === null ? null : m[1];
}

body

# Wait (up to 10 s) for the avatar div, read its inline style attribute,
# then let the browser's JS engine pull the url("...") value out of it.
head_img_div = page.wait_for_selector("div.head-img", timeout=10000)
style_attr = head_img_div.get_attribute("style")
match_quote = page.evaluate(r"""(style) => {const match = style.match(/url\("([^"]+)"\)/);return match ? match[1] : null;}""",style_attr )
print(f"URL:{match_quote}")
browser.close()

body

def parse_bank_info(page_html):
    """Parse the bank-ad container and return a list of dicts (序号/标题/链接/图片URL)."""
    soup = BeautifulSoup(page_html, "lxml")
    result_list = []
    target_container = soup.find("div", class_="yhksw_bankAd")
    items = target_container.find_all("div", class_="imagesdiv yhzpw_index_sec")

    for idx, item in enumerate(items, 1):
        a_tag = item.find("a")
        # Bug fix: img_tag was referenced below but never assigned (NameError).
        img_tag = item.find("img")
        href = a_tag.get("href", "") if a_tag else ""
        title = img_tag.get("alt", "") if img_tag else ""
        img_url = img_tag.get("src", "") if img_tag else ""

        result_list.append({
            "序号": idx,
            "标题": title,
            "链接": href,
            "图片URL": img_url
        })
    return result_list

token;远程百度翻译

visit

import requests
import re
import hashlib
import time
import json

def get_baidu_token():
    """Fetch the Baidu Translate page and return its anti-CSRF token, or None.

    The original body made the request but never extracted or returned
    anything, and response.close() dangled at module level.
    """
    url = ""
    headers = {
        "User-Agent": "",
        "Accept": "",
        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"
    }
    response = requests.get(url, headers=headers)
    try:
        # The token is embedded in inline JS as "token":"...".
        token_match = re.search(r'"token"\s*:\s*"([^"]+)"', response.text)
        return token_match.group(1) if token_match else None
    finally:
        response.close()

regular

# Pull the session token out of the page's inline JS ("token":"...").
# NOTE(review): this is a fragment cut from inside a function — the bare
# `return` is only valid once pasted back into its enclosing def.
token_match = re.search(r'"token"\s*:\s*"([^"]+)"', response.text)
if token_match:
    return token_match.group(1)

special; body; fail or i don’t know

def generate_baidu_sign(query, salt=None):
    """Return (sign, salt) for the Baidu Translate API.

    sign = md5("fanyideskweb" + query + salt + key). salt defaults to the
    current millisecond timestamp; passing it explicitly makes the result
    reproducible (backward-compatible generalization).
    """
    sign_key = "fanyideskweb"
    key = "Ygy_4c=r#e#4EX^NUGUc5"
    if salt is None:
        salt = str(int(time.time() * 1000))
    sign_str = f"{sign_key}{query}{salt}{key}".encode("utf-8")
    return hashlib.md5(sign_str).hexdigest(), salt

# Driver: read a query, fetch a token, sign the request, call the API.
query = input("请输入要翻译的内容:")
token = get_baidu_token()

if token:
    sign, salt = generate_baidu_sign(query)
    url = "https://fanyi.baidu.com/v2transapi"
    # Form fields expected by the v2transapi endpoint.
    data = {
        "from": "auto",
        "to": "en",
        "query": query,
        "transtype": "realtime",
        "simple_means_flag": "3",
        "sign": sign,
        "salt": salt,
        "token": token,
        "domain": "common"
    }
    # Referer + Cookie must look like a real browser session or the API refuses.
    headers = {
        "User-Agent": "",
        "Content-Type": "application/x-www-form-urlencoded",
        "Referer": "https://fanyi.baidu.com/mtpe-individual/transText",
        "Cookie": ""
    }
    response = requests.post(url, data=data, headers=headers)
    if response.status_code == 200:
        result = response.json()
        print("\n翻译结果:")
        print(json.dumps(result, ensure_ascii=False, indent=2))
        # trans_result.data[0].dst holds the plain translated string.
        if "trans_result" in result and result["trans_result"].get("data"):
            print("\n简洁结果:", result["trans_result"]["data"][0]["dst"])

regular;

finditer = find + iterator(一个迭代器,可以用 for 循环挨个拿结果。)

sentence = ""
results = re.finditer(r'\d+', sentence)
for match in results:
    print(match.group())

regular

obj1 = re.compile(r"热片.*?<ul>(.*?)</ul>", re.S)
命名分组
obj1 = re.compile(r"热片.*?<ul>(?P<ul>.*?)</ul>", re.S)
for match in obj1.finditer(html):
    ul = match.group('ul')
    print(ul)

visit

url里的参数可以重新封装;ElementTree对象

url = "https://movie.douban.com/top250"
param = {
     "start": 0,
     "filter": ""
}
headers={
	"User-Agent":""
}
response = requests.get(url, params=param, headers=headers)
print(response.request.headers)
print(response.request.url)
response.close()

第二种
# verify=False skips TLS certificate checks.
# Bug fix: the original spelled it "varify", which requests rejects.
resp = requests.get(url, verify=False)
resp.encoding = 'utf-8'
html = resp.text  # or: html = etree.HTML(resp.text)

visit; body; csv

import csv
import requests
from bs4 import BeautifulSoup

url = ""
response = requests.get(url)

# Bug fix: the open() result was never bound, so csv.writer(f) raised NameError.
# utf-8-sig writes a BOM so Excel opens the CSV with correct Chinese text.
f = open("菜价.csv", "w", encoding="utf-8-sig", newline="")
csvWriter = csv.writer(f)

page = BeautifulSoup(response.text, "html.parser")

# The price table on the page.
dataname = page.find("table", attrs={"class": "hq_table"})

# Skip the header row; each data row is name / average price / kind.
trs = dataname.find_all("tr")[1:]
for tr in trs:
    tds = tr.find_all("td")
    name = tds[0].text
    avg = tds[1].text
    kind = tds[2].text
    print(name, avg, kind)
    csvWriter.writerow([name, avg, kind])
f.close()

xml;xpath

xpath 是一门语言:在 xml文档中搜索内容的一门语言。xpath路径 可以在网页源代码 copy xpath,但是记得copy对,有些是hidden标签。

html是xml的一个子集,所以可以在 html 上使用 xpath。

先安装 lxml:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple lxml

target

# XML字符串,
xml = '''
<book>
<id>1</id>
<name>野花遍地香</name>
<price>1.23</price>
<nick>臭豆腐</nick>
<author>
    <nick id="10086">周大强</nick>
    <nick id="10010">周芷若</nick>
    <nick class="joy">周杰伦</nick>
    <nick class="jolin">蔡依林</nick>
    <div>
    	<nick>惹了</nick>
    </div>
</author>
</book>
'''
from lxml import etree

使用 xpath

第一个/是根节点。//是后代的意思。

写法 1:拿所有 nick

写法 2:拿 author 下面的所有 nick

写法 3:拿带 id=“10086” 的 nick

写法 4:拿 author 下面 div 里的 nick

tree = etree.XML(xml)
# ======================
result = tree.xpath("//nick/text()")  # every <nick> text node, at any depth
result = tree.xpath("/book/author/nick/text()")  # only author's direct <nick> children
result = tree.xpath("//nick[@id='10086']/text()")  # filter by attribute value
result = tree.xpath("/book/author/div/nick/text()")  # exact path through <div>


result = tree.xpath("/book") 只打印标签本身(是元素对象列表),
输出大概长这样:[<Element book at 0xxxxx>]

# 路径之间有 * 则是通配符
# . 是在该路径内包裹的东西中继续查找

例如,要在 author 里面 找 nick:
author = tree.xpath("//author")[0]   找到所有 <author> 标签,拿出第 1<author> 标签
author.xpath(".//nick")

对于 html 也是一样的:

# HTML parses with lxml too, so the same XPath syntax applies.
html = etree.HTML(response.text)
divs = html.xpath("......")
for div in divs:
     price = div.xpath(".//div[@class='price']")[0]  # relative search inside this div
     title = "saas".join(div.xpath("./div/div/a[2]/text()"))

visit;selenium;百度

from selenium import webdriver
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

def test_edge_browser():
    """Smoke-test: drive a local Edge browser to baidu.com via Selenium."""
    edge_driver_path = "msedgedriver.exe"
    driver = None  # so the finally-cleanup is safe if Edge fails to launch
    try:
        service = Service(edge_driver_path)
        driver = webdriver.Edge(service=service)
        driver.get("https://www.baidu.com")
        wait = WebDriverWait(driver, 10)

        # ... page interaction goes here (search box, click, scrape results) ...
        # The original had the bare placeholder name `body` here — a NameError.

    except Exception as e:
        print(f"测试过程中出错:{str(e)}")
    finally:
        if driver is not None:
            driver.quit()

body;do

By.ID, "kw" # 百度搜索框

By.ID, "su" # 百度搜索按钮

        # Wait until the search box exists, then type the query.
        search_box = wait.until(
            EC.presence_of_element_located((By.ID, "kw"))
        )
        search_box.send_keys("输入内容")

        # Wait until the search button is clickable, then click it.
        search_button = wait.until(
            EC.element_to_be_clickable((By.ID, "su"))
        )
        search_button.click()

        time.sleep(3)

        # Each result title renders as: <h3 class="t"><a>title</a></h3>
        # (that sample HTML sat as raw text inside the code — a syntax error).
        results = driver.find_elements(By.CSS_SELECTOR, "h3.t a")

        print(f"\n找到{len(results)}个搜索结果:")
        for i, result in enumerate(results[:5], 1):  # number from 1, first five only
            print(f"{i}. {result.text}")

随机延时代码

time.sleep(random.uniform(2, 4))
page.wait_for_timeout(random.randint(1000, 2000))

body

soup = BeautifulSoup(response.text, 'html.parser')
rows = soup.find_all('tr')
for row_idx, row in enumerate(rows, 1):
    tds = row.find_all('td')
    for td_idx, td in enumerate(tds, 1):
        # Only cells styled with line-height:180% carry the category links.
        if 'style' in td.attrs and 'line-height:180%' in td['style']:
            a_tags = td.find_all('a')
            for a in a_tags:
                link = a.get('href')
                full_link = 'https://wiki.mbalib.com/' + link
                child_response = requests.get(full_link, headers=headers)
                child_soup = BeautifulSoup(child_response.text, 'html.parser')
                # Target block: <div class="boilerplate" id="catmore">...</div>
                # Bug fix: the original find(...) call was missing its closing ')'.
                catmore_div = child_soup.find('div', attrs={"class": "boilerplate", "id": "catmore"})

                href = catmore_div.find('a').get('href')

body

经典第一次,NASA

page = BeautifulSoup(response.text, "html.parser")
alist = page.find_all("a", attrs={"class": "elementor-post__thumbnail__link"})
for i, a_tag in enumerate(alist, 1):
    href = a_tag.get("href")
    if href and href.startswith(('http://', 'https://')):
        child_page_response = requests.get(href)
        child_page_response.encoding = 'utf-8'
        child_page = BeautifulSoup(child_page_response.text, "html.parser")
        # Per the note's own correction: instead of the high-res href target,
        # take the <img src> wrapped inside each <a> on the child page.
        # (The original mixed both variants with broken indentation.)
        for a_tag in child_page.find_all("a", href=True):
            img_tag = a_tag.find("img")
            if img_tag and img_tag.get("src", "").lower().endswith(".jpg"):
                target_img = img_tag

切片成列表

encoded_soup_name = grand_href.split("/")[-1]
text.split(",")

.startswith("/")

.endswith(".html")

target_a = None
target_img = None

target

有的网站直接把数据库数据用 var_dump 输出在页面上:PHP 的 var_dump () 打印出来的数组:

array(3) {
    [0]=> array(11) {
        ["id"]=> string(1) "1"
        ["name"]=> string(5) "小白"
        ["age"]=> string(2) "20"
    }
    [1]=> array(11) {
        ["id"]=> string(1) "2"
        ["name"]=> string(5) "小黄"
        ["age"]=> string(2) "22"
    }
    [2]=> array(11) {
        ["id"]=> string(1) "3"
        ["name"]=> string(5) "小红"
        ["age"]=> string(2) "25"
    }
}

body

ranking_data = []
# Outer blob: array(N) { [0]=> array(11) {...} ... } as printed by PHP var_dump.
data_pattern = re.compile(r"array\(\d+\) \{\s*((?:\[\d+\]=>\s*array\(11\) \{(?:.|\s)*?\}\s*)*)\}", re.MULTILINE)
# One [i]=> array(11) { ... } record.
item_pattern = re.compile(r"\[\d+\]=>\s*array\(11\) \{\s*(.*?)\s*\}", re.DOTALL)
# ["key"]=> string(len) "value" pairs inside a record.
key_value_pattern = re.compile(r'\["(\w+)"\]=>\s*string\(\d+\) "([^"]*)"')

data_match = data_pattern.search(response.text)
items = item_pattern.findall(data_match.group(1))
for item in items:
    job_info = {}  # was tab-indented in the original, mixing tabs and spaces
    for key, value in key_value_pattern.findall(item):
        job_info[key] = value
    # Bug fix: append once per record, AFTER all pairs are collected — the
    # original checked inside the inner loop and appended the same dict
    # repeatedly as soon as the required keys had appeared.
    if all(k in job_info for k in ["job_id", "job_name", "pub_nums", "salary"]):
        ranking_data.append(job_info)

visit

PAGE_RANGE = range(0, 1)  # crawl page 1 only
page_html_list = []
with sync_playwright() as p:
    browser = p.chromium.launch(
        headless=False, channel="chrome",
        args=["--no-sandbox", "--disable-blink-features=AutomationControlled"]
    )
    # new_page() opens a blank tab; `page` is the handle every later call uses
    # (page.goto / page.click / page.fill ...).
    page = browser.new_page()

    user_agents = [ "", "" ]
    page.set_extra_http_headers({"User-Agent": random.choice(user_agents)})
    for page_num in PAGE_RANGE:
        current_url = re.sub(r"currentPage=\d+", f"currentPage={page_num}", TARGET_URL)
        # "networkidle": wait until the network goes quiet
        # (Puppeteer calls the zero-requests variant "networkidle0").
        page.goto(current_url, wait_until="networkidle")

        # Job cards render as <a data-nick="job-detail-job-info">.
        page.wait_for_selector('a[data-nick="job-detail-job-info"]', timeout=10000)

        page_html = page.content()
        page_html_list.append(page_html)
    # Bug fix: close inside the with-block — after sync_playwright() exits,
    # the connection is gone and browser.close() would fail.
    browser.close()

body

for page_idx, page_html in enumerate(page_html_list, 1):
    soup = BeautifulSoup(page_html, "lxml")
    job_links = soup.find_all('a', attrs={"data-nick": "job-detail-job-info"})
    for idx, job_link in enumerate(job_links, 1):
        # Guard the salary node like location is guarded below — the original
        # called .get_text() unchecked and crashed when the span was missing.
        salary = ""
        salary_elem = job_link.find("span", class_=re.compile(r"job-salary"))
        if salary_elem:
            salary = salary_elem.get_text().strip()
        location = ""
        location_box = job_link.find("div", class_=re.compile(r"job-dq-box"))
        if location_box:
            location_text = location_box.get_text().strip()
            # Text before the first 】 bracket is the city/district part.
            location_match = re.search(r"([^】]+)", location_text)
            if location_match:
                location = location_match.group(1).strip()

target

<div 
    class="joblist-item-job"
    sensorsdata='{
        "jobId": "123456",
        "jobTitle": "Python开发工程师",
        "jobSalary": "15-25K·13薪",
        "jobArea": "北京·朝阳区",
        "jobYear": "3-5年",
        "jobDegree": "本科"
    }'
>
</div>
<div class="joblist-item-job">  <!-- ← y 就是这个 -->
    <a class="cname" href="https://xxx">腾讯科技</a>
    <span class="dc">互联网</span>
    <span class="dc">上市公司</span>
    <span class="dc">1000-9999人</span>
	<div class="tags">
    	<div class="tag">五险一金</div>
    	<div class="tag">带薪年假</div>
    	<div class="tag">双休</div>
	</div>
    <button class="apply" onclick="goApply('https://url……')">申请</button>
</div>

body

soup = BeautifulSoup(page_html, "lxml")
recruit_list = []
job_items = soup.find_all("div", class_="joblist-item-job")
for idx, y in enumerate(job_items, 1):
    sensors_data = y.get("sensorsdata")
    # json.loads turns the JSON string into a Python dict; the attribute may
    # arrive HTML-escaped, so restore the quotes first. (This explanation was
    # bare inline Chinese in the original — a syntax error.)
    i_json = json.loads(sensors_data.replace("&quot;", "\""))
    i_id = i_json.get("jobId", "")
    i_title = i_json.get("jobTitle", "")
    i_salary = i_json.get("jobSalary", "")
    i_area = i_json.get("jobArea", "")
    i_year = i_json.get("jobYear", "")
    i_degree = i_json.get("jobDegree", "")
    company_name = y.find("a", class_="cname").get_text(strip=True) if y.find("a", class_="cname") else ""
    company_url = y.find("a", class_="cname")["href"] if (y.find("a", class_="cname") and "href" in y.find("a", class_="cname").attrs) else ""
    company_industry = y.find_all("span", class_="dc")[0].get_text(strip=True) if len( y.find_all("span", class_="dc")) > 0 else ""
    company_type = y.find_all("span", class_="dc")[1].get_text(strip=True) if len( y.find_all("span", class_="dc")) > 1 else ""
    tags = [tag.get_text(strip=True)+"," for tag in y.find_all("div", class_="tag")] if y.find("div", class_="tags") else []
    apply_url = y.find("button", class_="apply")["onclick"].split("'")[1] if (y.find("button", class_="apply") and "onclick" in y.find("button", class_="apply").attrs) else ""

target

<li>   ← 父级(这就是 finance_elem 拿到的)
 └── <a>金融</a>  ← 这是 finance_a

body; save

finance_elem = None
# Find the <a> whose text contains 金融, then take its parent <li>.
finance_a = soup.find('a', string=lambda t: t and '金融' in t.strip())
finance_elem = finance_a.find_parent('li')

# href values that do not start with "java" (filters javascript: pseudo-links).
pattern = re.compile(r'<a\s+.*?href="((?!java).+?)".*?>([^<]+)</a>', re.DOTALL)
matches = pattern.findall(str(finance_elem))
y_set = set()
with open(save_path, 'w', encoding='utf-8') as f:
    for x, y in matches:
        decoded_x = unquote(x.strip(), encoding='utf-8')
        decoded_y = y.strip()  # bug fix: decoded_y was used but never assigned
        y_set.add(decoded_y)
        f.write(f'<a x="{decoded_x}">{decoded_y}</a><br>\n')
    browser.close()
    browser.close()

visit; session

# A Session keeps cookies between requests: POST the login form once,
# then GET a logged-in page using the same session object.
session = requests.session()
login_payload = {
    "loginName": "",
    "password": ""
}
url = ""
session.post(url, data=login_payload)
resp = session.get("https://user.17k.com/......", headers={"Cookie": ""})
print(resp.text)
print(resp.cookies)

visit; change proxies

proxies = {
    "http": "http://127.0.0.1:1080",
}
resp = requests.get("url", proxies=proxies)

多协程:

import asyncio
import time
async def func1():
    print("helloTask111.")
    # No blocking (sync) calls here — they would stall the whole event loop.
    # await asyncio.sleep hands control to whichever coroutine is ready to run.
    await asyncio.sleep(2) 
    print("helloTask111.")
async def func2():
    print("helloTask222.")
    await asyncio.sleep(2)
    print("helloTask222.")
async def func3():
    print("helloTask333.")
    await asyncio.sleep(2)
    print("helloTask333.")
async def main():
    # gather runs the three coroutines concurrently: ~2 s total, not 6 s.
    t1 = time.time()
    await asyncio.gather(
        func1(),
        func2(),
        func3()
    )
    t2 = time.time()
    print(f"总耗时: {t2 - t1:.2f}秒")
if __name__ == '__main__':
    asyncio.run(main())

多协程:

task = asyncio.create_task(函数)
tasks.append(task)
await asyncio.gather(*tasks)

多线程:

from threading import Thread
def func1():
    # Worker 1: counts 0..10.
    for i in range(0, 11):
        print("func1: helloWorld.", i)
def func2():
    # Worker 2: same loop, different label.
    for i in range(0, 11):
        print("func2: helloWorld.", i)
if __name__ == '__main__':
    # start() returns immediately; main keeps printing while both threads run,
    # so the three output streams interleave.
    t1 = Thread(target=func1)
    t1.start()
    t2 = Thread(target=func2)
    t2.start()
    for i in range(0,11):
        print("Main: helloWorld.",i)
可以传参:
def func1(name):
    # Parameterized worker; the argument is supplied at Thread construction.
    for i in range(0, 11):
        print(f"func1: {name}", i)

if __name__ == '__main__':
    t1 = Thread(target=func1, args=("周杰伦",))  # args must be a tuple
    t1.start()

线程锁:

import threading
counter = 0
lock = threading.Lock()

def increment():
    """Add 1 to the shared counter 100 times, guarded by the lock."""
    global counter
    for _ in range(100):
        lock.acquire()
        counter += 1
        lock.release()
        # prefer:  with lock: counter += 1   (releases even if an exception hits)
        # (this recommendation was a bare parenthesized line — a syntax error)

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()  # join waits for the thread to finish; a timeout is allowed: t.join(timeout=2)

print(f"期望值: 200, 实际值: {counter}")  # prints 200

线程锁在爬虫的应用; csv; save

import threading
import csv

file_lock = threading.Lock()
f = open("data.csv", "w")
csvwriter = csv.writer(f)

def save_data(data):
    # Lock so only one thread writes a row at a time; flush so the row hits
    # disk immediately even if the program is killed mid-crawl.
    with file_lock:
        csvwriter.writerow(data)
        f.flush()

多线程之队列、消费者模式:(没仔细看)

import threading
import queue
import time
import random

class CrawlerProducerConsumer:
    def __init__(self):
        self.data_queue = queue.Queue(maxsize=100)
        self.stop_flag = False
        
    def producer(self, producer_id):
        for i in range(5):
            time.sleep(random.random())
            data = f"生产者{producer_id}爬取的数据-{i}"
            self.data_queue.put(data)
            print(f"[生产者{producer_id}] 生产: {data},队列大小: {self.data_queue.qsize()}")
        print(f"[生产者{producer_id}] 完成生产")
    
    def consumer(self):
        while not (self.stop_flag and self.data_queue.empty()):
            try:
                data = self.data_queue.get(timeout=1)
                time.sleep(0.5)
                print(f"[消费者] 消费: {data},剩余: {self.data_queue.qsize()}")
                self.data_queue.task_done()
            except queue.Empty:
                continue
        print("[消费者] 完成消费")
    
    def run(self):
        producers = []
        for i in range(3):  # 3个生产者同时爬取
            t = threading.Thread(target=self.producer, args=(i,))
            t.start()
            producers.append(t)
        
        consumer_thread = threading.Thread(target=self.consumer)
        consumer_thread.start()
        for p in producers:
            p.join()
        self.stop_flag = True
        consumer_thread.join()
        
        print("所有任务完成!")

if __name__ == '__main__':
    crawler = CrawlerProducerConsumer()
    crawler.run()

多线程的队列、消费者模式在爬虫的应用:(没仔细看)


import threading
import queue
import requests
from lxml import etree
import csv
import time

class XinfadiCrawler:
    """URL producer -> N page crawlers -> CSV consumer, wired with two queues."""

    def __init__(self, max_workers=5):
        # url_queue feeds the crawler threads; data_queue feeds the CSV writer.
        self.url_queue = queue.Queue()
        self.data_queue = queue.Queue()
        self.max_workers = max_workers
        
    def url_producer(self):
        """Enqueue the listing-page URLs."""
        for page in range(1, 101):  # URLs for pages 1-100
            url = f"新发地网址/list/{page}.shtml"
            self.url_queue.put(url)
            print(f"生成URL: {url}")
    
    def page_crawler(self):
        """Worker: pull a URL, scrape its table rows, push each row to data_queue."""
        while True:
            try:
                # 3 s timeout doubles as the shutdown signal: queue stays
                # empty once the producer is done, so Empty ends the worker.
                url = self.url_queue.get(timeout=3)
                resp = requests.get(url)
                html = etree.HTML(resp.text)
                table = html.xpath("/html/body/div[2]/div[4]/div[1]/table")[0]
                # position()>1 skips the header row.
                trs = table.xpath("./tr[position()>1]")
                
                for tr in trs:
                    data = tr.xpath("./td/text()")
                    data = [item.strip() for item in data]
                    self.data_queue.put(data)
                
                print(f"爬取完成: {url}, 获取{len(trs)}条数据")
                self.url_queue.task_done()
                
            except queue.Empty:
                break
    
    def data_consumer(self):
        """Single writer thread: drain data_queue into data.csv."""
        with open("data.csv", "w", newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(["品名", "最低价", "最高价", "平均价", "规格", "单位", "日期"])
            
            while True:
                try:
                    data = self.data_queue.get(timeout=3)
                    writer.writerow(data)
                    f.flush()

                    self.data_queue.task_done()
                    
                except queue.Empty:
                    # Only stop once no rows AND no pending URLs remain.
                    if self.url_queue.empty():
                        break
    
    def run(self):
        """Start producer, crawler pool, and consumer; join them in order."""
        url_producer_thread = threading.Thread(target=self.url_producer)
        url_producer_thread.start()
        crawler_threads = []
        for i in range(self.max_workers):
            t = threading.Thread(target=self.page_crawler)
            t.start()
            crawler_threads.append(t)
        data_consumer_thread = threading.Thread(target=self.data_consumer)
        data_consumer_thread.start()
        url_producer_thread.join()
        for t in crawler_threads:
            t.join()
        # join() blocks until every queued row has been task_done()'d.
        self.data_queue.join()
        data_consumer_thread.join()
        
        print("所有任务完成!")

# 运行
crawler = XinfadiCrawler(max_workers=5)
crawler.run()
标签.get("href"):BeautifulSoup 标签,拿 HTML 属性
字典.get("name"):Python 字典,拿字典里的键值
if item["pub_nums"].isdigit()
processed_list = []
for item in raw_data:
    pub_num = int(item["pub_nums"]) if item["pub_nums"].isdigit() else 0
    processed_list.append({
        "rank": len(processed_list) + 1,
        "job_name": item["job_name"],
        "job_url": f"https://www.bankhr.com/so/{item['job_id']}.html",
        "pub_nums": pub_num,
        "pub_percent": min(int((pub_num / max_pub_num) * 100), 100),
        "salary": format_salary(item["salary"]),
        "city": item.get("city_name", "全国")
    })
return sorted(processed_list, key=lambda x: x["pub_nums"], reverse=True)倒序就是从大到小

友情提示

哎呀,主包的博客还没有很多朋友呢,快来加入我的友链队伍吧!