#!/usr/bin/env python3
"""Scrape Chinese gov sites -> RSS feeds + ntfy notifications."""
import json, os, ssl, subprocess, urllib.request, re, random, string, hashlib, hmac, time
from datetime import datetime, timezone
from xml.sax.saxutils import escape
from html.parser import HTMLParser
import html as htmlmod
# Directory holding this script; the per-site seen_<name>.json state files live here.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
# Directory the generated RSS files are written to.
STATIC_DIR = "/home/docker/static"
# ntfy endpoint that receives the JSON-published notifications.
NTFY_URL = "http://127.0.0.1:8091/"
# OpenSSL config file that enables unsafe legacy renegotiation (see make_legacy_conf).
LEGACY_CONF = "/tmp/openssl_legacy.cnf"
# Maps the topic_key argument of send_ntfy() to an ntfy topic name.
NTFY_TOPICS = {
    "mission": "china-eu-mission",
    "france": "france-edu",
    "study": "studyinchina",
    "bit": "bit-en-news",
    "bit-cn": "bit-cn-announcements",
    "bit-rss": "bit-all-notifications",
}
# Endpoint plus pageId/appId values sent by translate_title().
TRANSLATE_API = "https://api.chinese-learning.cn/transfer-api/translation/translate"
TRANSLATE_PAGE_ID = "f7adb5be8b83f712b3ea67a38a96a2d4"
TRANSLATE_APP_ID = "oosctl"
# Baidu translation credentials used by translate_body().
# SECURITY: secrets should not live in source control. The BAIDU_APPID and
# BAIDU_SECRET environment variables take precedence when set; the literals
# below are kept only as a backward-compatible fallback.
BAIDU_APPID = os.environ.get("BAIDU_APPID") or "20260614002631590"
BAIDU_SECRET = os.environ.get("BAIDU_SECRET") or "X_BcGA7wYbvYVwzkBirk"
BAIDU_API = "https://fanyi-api.baidu.com/api/trans/vip/translate"
def make_legacy_conf():
    """Write the legacy-renegotiation OpenSSL config to LEGACY_CONF if it is absent.

    An existing file at that path is left untouched.
    """
    if os.path.exists(LEGACY_CONF):
        return
    config_text = (
        "openssl_conf = openssl_init\n"
        "[openssl_init]\n"
        "ssl_conf = ssl_sect\n"
        "[ssl_sect]\n"
        "system_default = system_default_sect\n"
        "[system_default_sect]\n"
        "Options = UnsafeLegacyRenegotiation\n"
    )
    with open(LEGACY_CONF, "w") as conf_file:
        conf_file.write(config_text)
def fetch(url, timeout=20):
    """Download *url* and return the body decoded as UTF-8, or None on any failure.

    Args:
        url: Address to request (sent with a browser-like User-Agent).
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Returns:
        The response text (undecodable bytes are replaced), or None when the
        request fails for any reason; the error is printed, never raised.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # The context manager closes the connection even if read() fails.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except Exception as e:
        print(f" ERR {url[:60]}: {e}")
        return None
def fetch_legacy(url, timeout=20):
    """Download *url* in a child Python process that allows legacy TLS renegotiation.

    The child runs with OPENSSL_CONF pointing at LEGACY_CONF and with
    certificate verification disabled, so the setting applies only to this
    one request rather than to the whole scraper process.

    Args:
        url: Address to request.
        timeout: Used both as the child's socket timeout and as the hard
            wall-clock limit for the child process.

    Returns:
        The response text, or None when the child fails, times out, or
        prints nothing. Errors are printed, never raised.
    """
    make_legacy_conf()
    env = os.environ.copy()
    env["OPENSSL_CONF"] = LEGACY_CONF
    # Pin the pipe to UTF-8 on both ends so non-ASCII pages survive regardless
    # of the locale the scraper is started under.
    env["PYTHONIOENCODING"] = "utf-8"
    child_script = "\n".join([
        "import ssl, urllib.request",
        "ctx = ssl.create_default_context()",
        "ctx.check_hostname = False",
        "ctx.verify_mode = ssl.CERT_NONE",
        f'req = urllib.request.Request({url!r}, headers={{"User-Agent": "Mozilla/5.0"}})',
        f"raw = urllib.request.urlopen(req, timeout={timeout}, context=ctx).read()",
        'print(raw.decode("utf-8", errors="replace"), end="")',
    ])
    try:
        r = subprocess.run(
            ["python3", "-c", child_script],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            env=env,
            timeout=timeout,
        )
        if r.returncode == 0 and r.stdout:
            return r.stdout
        print(f" LEGACY FAIL: {r.stderr[:200]}")
        return None
    except Exception as e:
        print(f" LEGACY ERR: {e}")
        return None
def send_ntfy(title, url, tag, section="", topic_key=""):
    """Publish one notification to the ntfy server at NTFY_URL.

    Args:
        title: Notification title; truncated to 80 characters.
        url: Link opened when the notification is clicked. Values that do not
            start with "http" are resolved against the eu.china-mission.gov.cn
            site root.
        tag: ntfy tag; also used as the message body when *section* is empty.
        section: Message body text.
        topic_key: Key into NTFY_TOPICS; unknown keys fall back to
            "china-eu-mission".

    Failures are printed and swallowed so one failed push never stops a run.
    """
    from urllib.parse import urljoin

    topic = NTFY_TOPICS.get(topic_key, "china-eu-mission")
    if url.startswith("http"):
        full_url = url
    else:
        # urljoin handles "./x", "/x" and "x" alike; the previous url[1:]
        # slicing only produced a valid link for the "./x" form.
        full_url = urljoin("https://eu.china-mission.gov.cn/", url)
    message = {
        "topic": topic,
        "title": title[:80],
        "message": section or tag,
        "tags": [tag, "china"],
        "click": full_url,
        "priority": 5,
    }
    data = json.dumps(message, ensure_ascii=False).encode("utf-8")
    try:
        req = urllib.request.Request(
            NTFY_URL,
            data=data,
            headers={"Content-Type": "application/json; charset=utf-8"},
        )
        # Only the side effect matters; close the response right away.
        with urllib.request.urlopen(req, timeout=10):
            pass
        print(f" NTFY[{topic}]: {title[:50]}")
    except Exception as e:
        print(f" NTFY FAIL: {e}")
def translate_title(text, timeout=10):
    """Translate *text* from Chinese to English through TRANSLATE_API.

    Args:
        text: Title to translate. Empty values and single characters are
            returned unchanged without any network request.
        timeout: Socket timeout in seconds for the HTTP call.

    Returns:
        The translated string when the service returns a ``dst`` longer than
        two characters; otherwise the original *text*. Never raises.
    """
    if not text or len(text) < 2:
        return text
    nonce = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    ts = str(int(time.time() * 1000))
    # Request signature: HMAC-SHA256 of nonce+timestamp keyed with the app id.
    sign = hmac.new(TRANSLATE_APP_ID.encode(), (nonce + ts).encode(), hashlib.sha256).hexdigest()
    payload = json.dumps({
        "pageId": TRANSLATE_PAGE_ID,
        "from": "cn",
        "to": "en",
        "text": text,
        "appId": TRANSLATE_APP_ID,
    }).encode()
    headers = {
        "Content-Type": "application/json;charset=UTF-8",
        "Accept": "application/json, text/plain, */*",
        "User-Agent": "Mozilla/5.0",
        "Origin": "https://www.studyinchina.edu.cn",
        "Referer": "https://www.studyinchina.edu.cn/",
        "appid": TRANSLATE_APP_ID,
        "nonce": nonce,
        "timestamp": ts,
        "sign": sign,
    }
    req = urllib.request.Request(TRANSLATE_API, data=payload, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result = json.loads(resp.read().decode())
    except Exception as e:
        print(f" TRANSLATE FAIL for '{text[:30]}': {e}")
        return text
    # Read the payload defensively: a missing or non-dict "data" simply means
    # "no translation" rather than an error.
    data = result.get("data") if isinstance(result, dict) else None
    dst = data.get("dst", "") if isinstance(data, dict) else ""
    if isinstance(dst, str) and len(dst) > 2:
        return dst
    return text
def strip_html(s):
    """Return *s* as plain text.

    HTML entities are decoded first, then every ``<...>`` tag is removed and
    the surrounding whitespace is trimmed.
    """
    decoded = htmlmod.unescape(s)
    without_tags = re.sub(r'<[^>]+>', '', decoded)
    return without_tags.strip()
def html_content(raw_html, base_url=""):
    """Reduce an HTML fragment to a small feed-safe subset.

    Steps, in order: decode entities; drop ``<script>``/``<style>`` blocks and
    HTML comments together with their content; remove double-quoted ``style``,
    ``class``, ``width`` and ``height`` attributes; keep only ``img``, ``p``,
    ``br`` and ``a`` tags (other tags are removed, their text is kept);
    collapse blank lines.

    Args:
        raw_html: Fragment to clean; falsy input yields "".
        base_url: When given, site-relative ``<img src="/...">`` paths are
            prefixed with it. Protocol-relative ``//host/...`` sources are
            left untouched.

    Returns:
        The cleaned fragment, stripped of surrounding whitespace.
    """
    if not raw_html:
        return ""
    s = htmlmod.unescape(raw_html)
    # Remove whole blocks first so JS/CSS source never leaks into the feed text.
    s = re.sub(r'<script[^>]*>.*?</script>', '', s, flags=re.DOTALL | re.IGNORECASE)
    s = re.sub(r'<style[^>]*>.*?</style>', '', s, flags=re.DOTALL | re.IGNORECASE)
    s = re.sub(r'<!--.*?-->', '', s, flags=re.DOTALL)
    for attr in ("style", "class", "width", "height"):
        s = re.sub(rf'\s+{attr}="[^"]*"', '', s)

    def tag_filter(m):
        tag = m.group(2).lower()
        if tag not in ('img', 'p', 'br', 'a'):
            return ""
        kept = m.group(0)
        if tag == 'img' and base_url:
            # A callable replacement keeps base_url literal (no backslash
            # escapes); (?!/) skips protocol-relative URLs.
            kept = re.sub(
                r'src="(/(?!/)[^"]+)"',
                lambda src: f'src="{base_url}{src.group(1)}"',
                kept,
            )
        return kept

    s = re.sub(r'<(/?)(\w+)([^>]*)(/?)>', tag_filter, s)
    s = re.sub(r'\n\s*\n', '\n', s)
    return s.strip()
def is_chinese(text):
    """Tell whether *text* is predominantly Chinese.

    Counts characters in U+4E00..U+9FFF and compares that count with the
    length of the text after trimming and removing spaces. Returns True when
    the ratio exceeds 0.3, and False for empty or space-only input.
    """
    if not text:
        return False
    cjk_count = 0
    for ch in text:
        if '\u4e00' <= ch <= '\u9fff':
            cjk_count += 1
    visible_len = len(text.strip().replace(" ", ""))
    if visible_len <= 0:
        return False
    return cjk_count / visible_len > 0.3
def translate_body(text, timeout=10):
    """Translate Chinese *text* to English with the Baidu translation API.

    Args:
        text: Text to translate. Input that is empty, shorter than four
            characters, or not mostly Chinese is returned unchanged without a
            network request.
        timeout: Socket timeout in seconds for the HTTP call.

    Returns:
        The translated segments joined with newlines (every ``trans_result``
        entry is used, not only the first), or "" when the request fails or
        the API reports an error. Never raises.
    """
    from urllib.parse import urlencode

    if not text or len(text) < 4 or not is_chinese(text):
        return text
    salt = str(random.randint(10000, 99999))
    # Signature built as MD5(appid + query + salt + secret).
    sign = hashlib.md5((BAIDU_APPID + text + salt + BAIDU_SECRET).encode()).hexdigest()
    try:
        data = urlencode({
            "q": text,
            "from": "zh",
            "to": "en",
            "appid": BAIDU_APPID,
            "salt": salt,
            "sign": sign,
        }).encode()
        req = urllib.request.Request(
            BAIDU_API,
            data=data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result = json.loads(resp.read().decode())
        segments = result.get("trans_result")
        if segments:
            return "\n".join(seg.get("dst", text) for seg in segments)
        if "error_code" in result:
            print(f" BAIDU ERR {result.get('error_code')}: {result.get('error_msg','')}")
        return ""
    except Exception as e:
        print(f" BAIDU FAIL: {e}")
        return ""
def load_seen(name):
    """Load the persisted state dict for feed *name*.

    Reads ``seen_<name>.json`` from SCRIPTS_DIR as UTF-8.

    Returns:
        The decoded JSON object, or an empty dict when the file does not
        exist yet.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    path = os.path.join(SCRIPTS_DIR, f"seen_{name}.json")
    try:
        with open(path, encoding="utf-8") as fh:
            return json.load(fh)
    except FileNotFoundError:
        return {}
def save_seen(name, seen):
    """Persist the state dict *seen* for feed *name* as ``seen_<name>.json``.

    The JSON is written as UTF-8 to a temporary sibling file and then moved
    into place with ``os.replace``, so an interrupted run cannot leave a
    truncated state file behind.
    """
    path = os.path.join(SCRIPTS_DIR, f"seen_{name}.json")
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(seen, fh, indent=2, ensure_ascii=False)
    os.replace(tmp_path, path)
def _rss_pub_date(raw_date):
    """Format a scraped ``YYYY-MM-DD``-style date as RSS pubDate text, or return None.

    Chinese year/month/day markers and ``/`` separators are normalised to
    ``-`` before parsing; anything that still does not parse yields None.
    """
    normalized = (
        str(raw_date)
        .replace("\u5e74", "-")
        .replace("\u6708", "-")
        .replace("\u65e5", "")
        .replace("/", "-")
        .strip()
    )
    try:
        parsed = datetime.strptime(normalized, "%Y-%m-%d")
    except ValueError:
        return None
    return parsed.strftime("%a, %d %b %Y %H:%M:%S +0000")


def write_rss(filename, items, title, desc, link):
    """Write an RSS 2.0 document for *items* to STATIC_DIR/*filename*.

    Args:
        filename: Output file name inside STATIC_DIR (created if missing).
        items: Sequence of ``(url, entry)`` pairs, newest first; only the
            first 50 are emitted. ``entry`` is a dict with "title" and
            optional "description" and "date" keys. The url is used as both
            ``<link>`` and ``<guid>``.
        title, desc, link: Channel-level title, description and link.

    Descriptions that look like HTML are wrapped in CDATA; plain ones are
    XML-escaped and fall back to the item title when empty. Items whose date
    cannot be parsed are written without a ``<pubDate>``.
    """
    os.makedirs(STATIC_DIR, exist_ok=True)
    now = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S +0000")
    parts = [
        '<?xml version="1.0" encoding="UTF-8"?>\n<rss version="2.0">\n<channel>\n',
        f"  <title>{escape(title)}</title>\n",
        f"  <link>{escape(link)}</link>\n",
        f"  <description>{escape(desc)}</description>\n",
        f"  <lastBuildDate>{now}</lastBuildDate>\n",
    ]
    for key, val in items[:50]:
        t = escape(val.get("title", ""))
        d = val.get("description", "")
        parts.append(
            f"  <item>\n    <title>{t}</title>\n"
            f"    <link>{escape(key)}</link>\n    <guid>{escape(key)}</guid>\n"
        )
        if re.search(r'<[a-z]+[\s>]', d):
            # A literal "]]>" would end the CDATA section early; split it.
            cdata = d.replace("]]>", "]]]]><![CDATA[>")
            parts.append(f"    <description><![CDATA[{cdata}]]></description>\n")
        else:
            parts.append(f"    <description>{escape(d) or t}</description>\n")
        if val.get("date"):
            pub_date = _rss_pub_date(val["date"])
            if pub_date:
                parts.append(f"    <pubDate>{pub_date}</pubDate>\n")
        parts.append("  </item>\n")
    parts.append("</channel>\n</rss>\n")
    with open(os.path.join(STATIC_DIR, filename), "w", encoding="utf-8") as out:
        out.write("".join(parts))
    print(f" RSS: {filename} ({len(items)} items)")
# Site 1: eu.china-mission.gov.cn ---------------------------------------
MISSION = "https://eu.china-mission.gov.cn/eng"


class MissionHTML(HTMLParser):
    """Collect headline lists from the mission home page.

    Each ``<div class="info">`` starts a section whose title is the text of
    its ``<h4>``. Inside any element with ``class="list"``, every ``<li>``
    becomes an item: the ``<a href>`` gives the url and title text, a
    ``<span>`` gives the date (surrounding parentheses removed).

    After ``feed()``, completed sections are in ``sections`` and the section
    still being built is in ``cur``; the caller appends ``cur`` itself.
    """

    # Elements that never get an end tag; they must not occupy the open-tag stack.
    _VOID_TAGS = frozenset((
        "area", "base", "br", "col", "embed", "hr", "img", "input",
        "link", "meta", "param", "source", "track", "wbr",
    ))

    def __init__(self):
        super().__init__()
        self.sections = []    # finished sections: {"title": str, "items": [...]}
        self.cur = None       # section currently being filled
        self.in_list = False  # inside an element with class="list"
        self.in_span = False
        self.in_a = False
        self.item = {}        # item for the current <li>; empty outside <li>
        self.stack = []       # currently open (non-void) tags
        self.st = ""          # text collected for the current <h4>

    def handle_starttag(self, tag, attrs):
        if tag not in self._VOID_TAGS:
            self.stack.append(tag)
        a = dict(attrs)
        if tag == "h4":
            self.st = ""
        if tag == "div" and a.get("class") == "info":
            if self.cur:
                self.sections.append(self.cur)
            self.cur = {"title": "", "items": []}
        if self.cur and a.get("class") == "list":
            self.in_list = True
        if self.in_list and tag == "li":
            self.item = {"title": "", "url": "", "date": ""}
        # Only anchors inside an open <li> belong to an item; a stray link in
        # the list container (e.g. a "more" link) must not create a partial one.
        if self.in_list and tag == "a" and "href" in a and self.item:
            self.in_a = True
            self.item["url"] = a["href"] or ""
        if tag == "span" and self.in_list:
            self.in_span = True

    def handle_endtag(self, tag):
        # Unwind to the matching open tag; unmatched end tags are ignored
        # instead of raising on an empty stack.
        for idx in range(len(self.stack) - 1, -1, -1):
            if self.stack[idx] == tag:
                del self.stack[idx:]
                break
        if tag == "h4" and self.cur:
            self.cur["title"] = self.st.strip()
        if tag == "div":
            self.in_list = False
        if tag == "li":
            if self.item.get("title") and self.cur:
                self.cur["items"].append(self.item.copy())
            self.item = {}
        if tag == "a":
            self.in_a = False
        if tag == "span":
            self.in_span = False

    def handle_data(self, data):
        s = data.strip()
        if not s:
            return
        if "h4" in self.stack and self.cur:
            self.st += data
        if self.in_span and self.item:
            self.item["date"] = s.strip("()\uff08\uff09")
        if self.in_a and self.item:
            self.item["title"] = self.item.get("title", "") + s
def scrape_mission():
    """Scrape the mission home page, notify about new links and rebuild its feed.

    Every item whose absolute URL is not yet in the "mission" state is stored
    and pushed to ntfy; afterwards the feed file is rewritten (most recently
    stored first) and the state is saved. Returns early when the page cannot
    be fetched.
    """
    print("\n=== eu.china-mission.gov.cn ===")
    page_html = fetch(MISSION + "/")
    if not page_html:
        return
    parser = MissionHTML()
    parser.feed(page_html)
    # The parser leaves the last section open; close it here.
    if parser.cur:
        parser.sections.append(parser.cur)
    seen = load_seen("mission")
    new_count = 0
    for section in parser.sections:
        section_name = section.get("title", "Unknown")
        for entry in section["items"]:
            href = entry.get("url", "")
            if href.startswith("http"):
                url = href
            elif href.startswith("/"):
                url = MISSION + href
            else:
                url = MISSION + "/" + href
            headline = entry.get("title", "").strip()
            if not headline or not url:
                continue
            if url in seen:
                continue
            seen[url] = {"title": headline, "date": entry.get("date", "")}
            send_ntfy(headline, url, "newspaper", section_name, "mission")
            new_count += 1
    newest_first = list(seen.items())[::-1]
    write_rss("china-eu-mission.xml", newest_first, "China-EU Mission", "eu.china-mission.gov.cn", MISSION)
    save_seen("mission", seen)
    print(f" New: {new_count}, Total: {len(seen)}")
# Site 2: france.lxgz.org.cn -------------------------------------------
FRANCE = "https://france.lxgz.org.cn"


def parse_france(html_text):
    """Extract news entries from the ``<ul class="news_list">`` of the France site.

    For each ``<li>``: the first ``<a>`` supplies the url (made absolute
    against FRANCE) and the title (its ``title`` attribute, or the anchor's
    trailing text when the attribute is missing); the first ``<span>``
    supplies the date. A date of the form ``MM-DD`` is prefixed with the year
    taken from a ``/YYYY<10 digits>`` run in the href when one is present.

    NOTE(review): the patterns assume the date sits in a ``<span>`` inside
    each ``<li>`` - confirm against the live page markup.

    Returns:
        A list of ``{"title", "url", "date"}`` dicts; empty when the list is
        not found. Entries without a link or with a blank title are skipped.
    """
    items = []
    listing = re.search(r'<ul[^>]*class="news_list"[^>]*>(.*?)</ul>', html_text, re.DOTALL)
    if not listing:
        return items
    for li in re.findall(r'<li[^>]*>(.*?)</li>', listing.group(1), re.DOTALL):
        a = re.search(r'<a[^>]*href="([^"]+)"[^>]*\btitle="([^"]*)"', li)
        if a:
            t = a.group(2)
        else:
            a = re.search(r'<a[^>]*href="([^"]+)"[^>]*>', li)
            if not a:
                continue
            # No title attribute: use the text between the last ">" and "</a>".
            txt = li.split("</a>")[0]
            t = re.sub(r'<[^>]+>', "", txt[txt.rfind(">") + 1:]) if ">" in txt else ""
        h = a.group(1)
        url = h if h.startswith("http") else FRANCE + "/" + h.lstrip("/")
        dm = re.search(r'<span[^>]*>([^<]+)</span>', li)
        d = dm.group(1).strip() if dm else ""
        if re.match(r"^\d{2}-\d{2}$", d):
            y = re.search(r"/(\d{4})\d{10}", h)
            if y:
                d = y.group(1) + "-" + d
        # Decode entities and trim before the emptiness check so blank titles
        # are dropped instead of producing empty notifications.
        t = htmlmod.unescape(t).strip()
        if t:
            items.append({"title": t, "url": url, "date": d})
    return items
def scrape_france():
    """Scrape the France education site, notify about new links and rebuild its feed.

    The page is fetched through the legacy-TLS helper. Returns early when the
    download fails or no items are parsed; otherwise new URLs are recorded in
    the "france" state, pushed to ntfy, and the feed and state are rewritten.
    """
    print("\n=== france.lxgz.org.cn ===")
    page_html = fetch_legacy(FRANCE + "/")
    if not page_html:
        return
    entries = parse_france(page_html)
    if not entries:
        print(" No items found")
        return
    seen = load_seen("france")
    new_count = 0
    for entry in entries:
        link = entry["url"]
        if link in seen:
            continue
        seen[link] = {"title": entry["title"], "date": entry.get("date", "")}
        send_ntfy(entry["title"], link, "flag-france", "\u65b0\u95fb\u52a8\u6001", "france")
        new_count += 1
    newest_first = list(seen.items())[::-1]
    write_rss("france-edu.xml", newest_first, "France Education News", "france.lxgz.org.cn", FRANCE)
    save_seen("france", seen)
    print(f" New: {new_count}, Total: {len(seen)}")
# Site 3: studyinchina.edu.cn ------------------------------------------
STUDY = "https://www.studyinchina.edu.cn"
STUDY_API = "https://www.studyinchina.edu.cn/api/lxzgw/cms/GetArticleLst"
STUDY_TYPE = "2fe6c123b6924b01a6c7d653240c0dfb"


def api_post_legacy(url, data, timeout=20):
    """POST *data* as JSON to *url* from a child process that allows legacy TLS.

    The child runs with OPENSSL_CONF pointing at LEGACY_CONF and with
    certificate verification disabled. It prints either the response body or
    a line starting with ``ERROR:``.

    Args:
        url: Endpoint to post to.
        data: JSON-serialisable request body.
        timeout: Used both as the child's socket timeout and as the hard
            wall-clock limit for the child process.

    Returns:
        The decoded JSON response, or None on any failure (printed, never
        raised).
    """
    make_legacy_conf()
    env = os.environ.copy()
    env["OPENSSL_CONF"] = LEGACY_CONF
    # Pin the pipe to UTF-8 on both ends so non-ASCII JSON survives regardless
    # of the locale the scraper is started under.
    env["PYTHONIOENCODING"] = "utf-8"
    body = json.dumps(data, ensure_ascii=False)
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Content-Type": "application/json;charset=UTF-8",
        "Accept": "application/json",
        "Referer": "https://www.studyinchina.edu.cn/",
        "Origin": "https://www.studyinchina.edu.cn",
    }
    # repr() turns the values into valid Python literals inside the child script.
    code = "\n".join([
        "import ssl, urllib.request, json",
        "ctx = ssl.create_default_context()",
        "ctx.check_hostname = False",
        "ctx.verify_mode = ssl.CERT_NONE",
        f"body = {body!r}",
        f"req = urllib.request.Request({url!r}, data=body.encode(), headers={headers!r})",
        "try:",
        f"    resp = urllib.request.urlopen(req, timeout={timeout}, context=ctx)",
        '    print(resp.read().decode(), end="")',
        "except Exception as e:",
        '    print("ERROR:" + str(e), end="")',
    ])
    try:
        r = subprocess.run(
            ["python3", "-c", code],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            env=env,
            timeout=timeout,
        )
        out = r.stdout.strip()
        if r.returncode != 0 or not out or out.startswith("ERROR:"):
            if out:
                print(f" API ERR: {out[:200]}")
            return None
        return json.loads(out)
    except Exception as e:
        print(f" API POST ERR: {e}")
        return None
def scrape_study():
    """Pull the latest studyinchina.edu.cn articles, notify, translate and rebuild the feed.

    Runs scrape_bit_rss() first, then queries STUDY_API for up to 50 records.
    New articles get a translated title and a translation of the first 300
    characters of body text, are stored in the "study" state and pushed to
    ntfy. Stored articles without content are completed from the API record.
    Returns early when the API gives no usable answer.
    """
    scrape_bit_rss()
    print("\n=== studyinchina.edu.cn ===")
    payload = {"type": STUDY_TYPE, "pageNo": 1, "pageSize": 50, "isSort": 1}
    result = api_post_legacy(STUDY_API, payload)
    if not result:
        print(" No response")
        return
    code = result.get("code", -1)
    if code != 0:
        print(f" API error: code={code}")
        return
    # "data"/"records" may be JSON null; treat that the same as missing.
    records = (result.get("data") or {}).get("records") or []
    if not records:
        print(" No records found")
        return
    print(f" API: {len(records)} records")
    seen = load_seen("study")
    n = 0
    translated = 0
    for r in records:
        art_id = r.get("id", "")
        # Null fields are normalised to "" so .strip()/slicing cannot fail.
        title = (r.get("title") or "").strip()
        date = (r.get("showDate") or "")[:10]
        content = r.get("content") or ""
        if not title or not art_id:
            continue
        url = f"https://www.studyinchina.edu.cn/articleDetail?arcId={art_id}"
        if url not in seen:
            en_title = translate_title(title)
            body_text = strip_html(content)
            body_en = translate_body(body_text[:300]) if is_chinese(body_text) else ""
            seen[url] = {
                "title": title,
                "en": en_title,
                "date": date.replace(" ", ""),
                "content": content,
                "body_en": body_en,
            }
            send_ntfy(title, url, "mortar-board", "Latest News", "study")
            n += 1
            translated += 1
            time.sleep(1.0)  # pause between translation requests
        elif not seen[url].get("content"):
            seen[url]["content"] = content
    # Backfill: look at the first five stored entries and translate at most
    # three that still lack body_en.
    # NOTE(review): the feed below treats the END of `seen` as newest, while
    # this slice takes the start - confirm which entries were intended.
    backfilled = 0
    for url, val in list(seen.items())[:5]:
        if backfilled >= 3:
            break
        if not val.get("body_en"):
            bt = strip_html(val.get("content") or "")
            if bt and is_chinese(bt) and len(bt) > 10:
                val["body_en"] = translate_body(bt[:300])
                backfilled += 1
                time.sleep(1.0)
    if translated or backfilled:
        print(f" Translated: {translated + backfilled}")
    rss_items = []
    for url, val in list(seen.items())[::-1]:
        v = val.copy()
        # Prefer the English title when it differs from the original.
        if "en" in v and v["en"] != v.get("title", ""):
            v["title"] = v["en"]
        c = val.get("content", "")
        if c:
            body_html = html_content(c, STUDY)
            if body_html:
                v["description"] = body_html[:2500]
        if not v.get("description"):
            v["description"] = val.get("title", "")
        rss_items.append((url, v))
    write_rss("studyinchina.xml", rss_items, "Study in China News", "From studyinchina.edu.cn", STUDY)
    save_seen("study", seen)
    print(f" New: {n}, Total: {len(seen)}")
# Site 4: english.bit.edu.cn ------------------------------------------
BIT = "https://english.bit.edu.cn"


def _bit_article_body(url):
    """Fetch one BIT article page and return its cleaned body HTML ("" if unavailable)."""
    detail = fetch(url)
    if not detail:
        return ""
    # Non-greedy: the capture ends at the first closing </div> after the opener.
    m = re.search(r'<div[^>]*class="Artical_Content_Text"[^>]*>(.*?)</div>', detail, re.DOTALL)
    return html_content(m.group(1), BIT) if m else ""


def scrape_bit():
    """Scrape the BIT English "latest" page, notify about new articles and rebuild the feed.

    Article links are recognised by their ``YYYY-MM/DD/c_<id>.htm`` href. New
    articles are stored in the "bit" state with their cleaned body and pushed
    to ntfy; stored articles without a body are fetched again. Returns early
    when the listing page cannot be fetched.
    """
    print("\n=== english.bit.edu.cn ===")
    page_html = fetch(BIT + "/latest.html")
    if not page_html:
        return
    articles = re.findall(
        r'<a[^>]*href="(\d{4}-\d{2}/\d{2}/c_\d+\.htm)"[^>]*>(.*?)</a>',
        page_html,
        re.DOTALL,
    )
    seen = load_seen("bit")
    n = 0
    for href, text in articles:
        clean = re.sub(r'<[^>]+>', '', text).strip()
        if not clean:
            continue
        url = BIT + "/" + href
        dm = re.search(r"(\d{4}-\d{2}/\d{2})", href)
        # Store YYYY-MM-DD so the feed writer can parse it into a pubDate.
        date = dm.group(1).replace("/", "-") if dm else ""
        if url not in seen:
            seen[url] = {"title": clean, "date": date, "content": _bit_article_body(url)}
            send_ntfy(clean, url, "university", "BIT News", "bit")
            n += 1
        elif not seen[url].get("content"):
            # Retry entries whose body could not be fetched earlier.
            body = _bit_article_body(url)
            if body:
                seen[url]["content"] = body
    rss_items = []
    for url, val in list(seen.items())[::-1]:
        v = val.copy()
        c = val.get("content", "")
        if c:
            v["description"] = c[:800]
        if not v.get("description"):
            v["description"] = val.get("title", "")
        rss_items.append((url, v))
    write_rss("bit-en-news.xml", rss_items, "BIT English News", "english.bit.edu.cn", BIT)
    save_seen("bit", seen)
    print(f" New: {n}, Total: {len(seen)}")
# Site 5: www.bit.edu.cn (学工天地) ------------------------------------
BIT_CN = "https://www.bit.edu.cn"


def _bit_cn_article_body(url):
    """Fetch one announcement page and return its cleaned body HTML ("" if unavailable)."""
    detail = fetch(url)
    if not detail:
        return ""
    # Non-greedy: each capture ends at the first closing </div> after the opener.
    m = re.search(r'<div[^>]*class="[^"]*v_news_content[^"]*"[^>]*>(.*?)</div>', detail, re.DOTALL)
    if not m:
        m = re.search(r'<div[^>]*class="content"[^>]*>(.*?)</div>', detail, re.DOTALL)
    return html_content(m.group(1), BIT_CN) if m else ""


def scrape_bit_cn():
    """Scrape the BIT announcement list, notify about new posts and rebuild the feed.

    Links are recognised by a 32-hex-digit ``.htm`` href. New posts are stored
    in the "bit_cn" state with their cleaned body and pushed to ntfy; stored
    posts without a body are fetched again. For Chinese bodies the feed
    description is an English translation followed by the original excerpt.
    Translations are cached in the state under "body_en", so each post is
    translated once rather than on every run. Returns early when the listing
    page cannot be fetched.
    """
    print("\n=== www.bit.edu.cn (学工天地) ===")
    page_html = fetch(BIT_CN + "/tzgg17/wthd132/")
    if not page_html:
        return
    articles = re.findall(r'<a[^>]*href="([a-f0-9]{32}\.htm)"[^>]*>(.*?)</a>', page_html, re.DOTALL)
    seen = load_seen("bit_cn")
    n = 0
    for href, text in articles:
        clean = re.sub(r'<[^>]+>', '', text).strip()
        if not clean:
            continue
        url = f"{BIT_CN}/tzgg17/wthd132/{href}"
        if url not in seen:
            seen[url] = {"title": clean, "content": _bit_cn_article_body(url)}
            send_ntfy(clean, url, "bell", "BIT Announcements", "bit-cn")
            n += 1
        elif not seen[url].get("content"):
            # Retry entries whose body could not be fetched earlier.
            body = _bit_cn_article_body(url)
            if body:
                seen[url]["content"] = body
    rss_items = []
    for rank, (url, val) in enumerate(list(seen.items())[::-1]):
        v = val.copy()
        c = val.get("content", "")
        if c:
            v["description"] = c[:800]
            # write_rss emits only the first 50 items, so older entries are
            # never translated.
            if rank < 50:
                body_text = strip_html(c)
                if is_chinese(body_text) and len(body_text) > 10:
                    en_desc = val.get("body_en") or ""
                    if not en_desc:
                        en_desc = translate_body(body_text[:300])
                        if en_desc:
                            # Written into `seen`, so save_seen() persists it.
                            val["body_en"] = en_desc
                        time.sleep(1.0)  # pause between translation requests
                    if en_desc:
                        v["description"] = (
                            "<p>" + htmlmod.escape(en_desc[:400]) + "</p>\n<div>"
                            + c[:400] + "</div>"
                        )
        if not v.get("description"):
            v["description"] = val.get("title", "")
        rss_items.append((url, v))
    write_rss("bit-cn-announcements.xml", rss_items, "BIT 学工天地 Announcements", "www.bit.edu.cn", BIT_CN)
    save_seen("bit_cn", seen)
    print(f" New: {n}, Total: {len(seen)}")
# Site 6: BIT all-in-one RSS feed -------------------------------------
BIT_RSS_URL = "https://haobit.top/feed.rss"


def _bit_rss_feed_date(pub):
    """Convert an RFC 2822 date string to ``YYYY-MM-DD``; return *pub* unchanged if it does not parse."""
    from email.utils import parsedate_to_datetime

    if not pub:
        return pub
    try:
        return parsedate_to_datetime(pub).strftime("%Y-%m-%d")
    except (TypeError, ValueError, IndexError, OverflowError):
        return pub


def scrape_bit_rss():
    """Mirror the upstream BIT RSS feed: notify about new items and rebuild a local feed.

    Every ``<item>`` with a title and link that is not yet in the "bit_rss"
    state is stored (Chinese titles also get an English translation) and
    pushed to ntfy. Returns early when the feed cannot be fetched or parsed.
    """
    print(chr(10) + '=== BIT all-in-one RSS ===')
    import xml.etree.ElementTree as ET
    page_html = fetch(BIT_RSS_URL)
    if not page_html:
        return
    try:
        # NOTE(security): ElementTree is not hardened against maliciously
        # constructed XML; acceptable only while the upstream feed is trusted.
        root = ET.fromstring(page_html)
    except ET.ParseError as e:
        print(f' XML parse error: {e}')
        return
    seen = load_seen('bit_rss')
    n = 0
    for item in root.findall('.//item'):
        title = (item.findtext('title') or '').strip()
        link = (item.findtext('link') or '').strip()
        pub = (item.findtext('pubDate') or '').strip()
        cat = (item.findtext('category') or '').strip()
        if not title or not link:
            continue
        if link not in seen:
            en_title = translate_title(title) if is_chinese(title) else title
            seen[link] = {'title': title, 'en': en_title, 'date': pub, 'category': cat}
            send_ntfy(title, link, 'school', cat or 'BIT News', 'bit-rss')
            n += 1
            time.sleep(1.0)
    rss_items = []
    for link, val in list(seen.items())[::-1]:
        v = val.copy()
        # Prefer the English title when it differs from the original.
        if 'en' in v and v['en'] != v.get('title', ''):
            v['title'] = v['en']
        cat = val.get('category', '')
        v['description'] = f'来自{cat}' if cat else 'BIT Notification'
        # The state keeps the upstream pubDate verbatim; the feed writer
        # parses YYYY-MM-DD, so convert on the copy used for output only.
        v['date'] = _bit_rss_feed_date(val.get('date', ''))
        rss_items.append((link, v))
    write_rss('bit-all-notifications.xml', rss_items, 'BIT All Notifications', 'haobit.top', BIT_RSS_URL)
    save_seen('bit_rss', seen)
    print(f' New: {n}, Total: {len(seen)}')
# Main ------------------------------------------------------------------
if __name__ == "__main__":
    import sys
    import traceback

    t = datetime.now().strftime("%Y-%m-%d %H:%M")
    print(f"[{t}] Scraper run")
    # scrape_bit_rss is intentionally not listed: scrape_study() already calls
    # it as its first step, so listing it here would fetch the feed twice.
    scrapers = (scrape_mission, scrape_france, scrape_bit, scrape_bit_cn, scrape_study)
    failed = []
    for scraper in scrapers:
        # Isolate each site so one failing scraper cannot block the others.
        try:
            scraper()
        except Exception:
            traceback.print_exc()
            failed.append(scraper.__name__)
    if failed:
        print(f"Done with errors: {', '.join(failed)}")
        sys.exit(1)  # keep a non-zero exit status so schedulers notice
    print("Done")