#!/usr/bin/env python3
"""Scrape Chinese gov sites -> RSS feeds + ntfy notifications."""
import hashlib
import hmac
import html as htmlmod
import json
import os
import random
import re
import ssl
import string
import subprocess
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from html.parser import HTMLParser
from xml.sax.saxutils import escape

SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = "/home/docker/static"
NTFY_URL = "http://127.0.0.1:8091/"
LEGACY_CONF = "/tmp/openssl_legacy.cnf"
# Maps the short topic key passed to send_ntfy() to the ntfy topic name.
NTFY_TOPICS = {
    "mission": "china-eu-mission",
    "france": "france-edu",
    "study": "studyinchina",
    "bit": "bit-en-news",
    "bit-cn": "bit-cn-announcements",
    "bit-rss": "bit-all-notifications",
}
# Site that non-absolute notification URLs are resolved against.
NTFY_CLICK_BASE = "https://eu.china-mission.gov.cn"
TRANSLATE_API = "https://api.chinese-learning.cn/transfer-api/translation/translate"
TRANSLATE_PAGE_ID = "f7adb5be8b83f712b3ea67a38a96a2d4"
TRANSLATE_APP_ID = "oosctl"
# NOTE(review): API credentials are hard-coded in the source file; consider
# loading them from the environment or a config file kept out of version control.
BAIDU_APPID = "20260614002631590"
BAIDU_SECRET = "X_BcGA7wYbvYVwzkBirk"
BAIDU_API = "https://fanyi-api.baidu.com/api/trans/vip/translate"


def make_legacy_conf():
    """Create the OpenSSL config enabling UnsafeLegacyRenegotiation, if missing.

    The file is written once to LEGACY_CONF and reused on later calls.
    """
    # NOTE(review): LEGACY_CONF is a fixed path under /tmp and an existing file
    # is trusted as-is; on a shared host another user could pre-create it.
    if not os.path.exists(LEGACY_CONF):
        with open(LEGACY_CONF, "w") as f:
            f.write("openssl_conf = openssl_init\n[openssl_init]\nssl_conf = ssl_sect\n[ssl_sect]\nsystem_default = system_default_sect\n[system_default_sect]\nOptions = UnsafeLegacyRenegotiation\n")


def fetch(url, timeout=20):
    """GET *url* and return the body decoded as UTF-8, or None on any error.

    Errors are printed, never raised, so one failing site does not abort a run.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # Context manager closes the connection even if read()/decode() fails.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except Exception as e:
        print(f" ERR {url[:60]}: {e}")
        return None


def fetch_legacy(url, timeout=20):
    """GET *url* from a child interpreter that uses the legacy OpenSSL config.

    The child runs with OPENSSL_CONF pointing at LEGACY_CONF and with
    certificate verification disabled. Returns the page text, or None when the
    child fails, prints nothing, or times out.
    """
    make_legacy_conf()
    env = os.environ.copy()
    env["OPENSSL_CONF"] = LEGACY_CONF
    # Force UTF-8 on the pipe so non-ASCII pages survive a C/POSIX locale.
    env["PYTHONIOENCODING"] = "utf-8"
    code = (
        "import ssl, urllib.request\n"
        "ctx = ssl.create_default_context()\n"
        "ctx.check_hostname = False\n"
        "ctx.verify_mode = ssl.CERT_NONE\n"
        f'req = urllib.request.Request({url!r}, headers={{"User-Agent": "Mozilla/5.0"}})\n'
        f'print(urllib.request.urlopen(req, timeout={timeout}, context=ctx)'
        '.read().decode("utf-8", errors="replace"), end="")'
    )
    try:
        # A few seconds of slack lets the child's own urlopen timeout fire (and
        # its traceback reach stderr) before the process is killed.
        r = subprocess.run(
            ["python3", "-c", code],
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            env=env,
            timeout=timeout + 5,
        )
        if r.returncode == 0 and r.stdout:
            return r.stdout
        print(f" LEGACY FAIL: {r.stderr[:200]}")
        return None
    except Exception as e:
        print(f" LEGACY ERR: {e}")
        return None


def absolute_url(url, base=NTFY_CLICK_BASE):
    """Return *url* unchanged if it starts with "http", else resolve it against *base*.

    Handles "./x", "/x" and "x" alike; the previous ``base + url[1:]`` form was
    only correct for the "./x" shape.
    """
    if url.startswith("http"):
        return url
    return urllib.parse.urljoin(base.rstrip("/") + "/", url)


def send_ntfy(title, url, tag, section="", topic_key=""):
    """POST one notification to the ntfy server at NTFY_URL.

    Args:
        title: Notification title (truncated to 80 characters).
        url: Link opened on click; relative values are resolved by absolute_url().
        tag: ntfy tag, sent together with the fixed "china" tag.
        section: Message body; falls back to *tag* when empty.
        topic_key: Key into NTFY_TOPICS; unknown keys use "china-eu-mission".

    Failures are printed and swallowed.
    """
    topic = NTFY_TOPICS.get(topic_key, "china-eu-mission")
    payload = {
        "topic": topic,
        "title": title[:80],
        "message": section or tag,
        "tags": [tag, "china"],
        "click": absolute_url(url),
        "priority": 5,
    }
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    try:
        req = urllib.request.Request(NTFY_URL, data=data, headers={"Content-Type": "application/json; charset=utf-8"})
        with urllib.request.urlopen(req, timeout=10):
            pass
        print(f" NTFY[{topic}]: {title[:50]}")
    except Exception as e:
        print(f" NTFY FAIL: {e}")


def translate_title(text, timeout=10):
    """Translate *text* ("cn" -> "en") through TRANSLATE_API.

    Returns the translation when the service yields one longer than two
    characters; otherwise returns *text* unchanged. Inputs that are empty or
    shorter than two characters are returned as-is without a network call.
    """
    if not text or len(text) < 2:
        return text
    nonce = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    ts = str(int(time.time() * 1000))
    # Request signature: HMAC-SHA256 over nonce+timestamp, keyed with the app id.
    sign = hmac.new(TRANSLATE_APP_ID.encode(), (nonce + ts).encode(), hashlib.sha256).hexdigest()
    payload = json.dumps({"pageId": TRANSLATE_PAGE_ID, "from": "cn", "to": "en", "text": text, "appId": TRANSLATE_APP_ID}).encode()
    headers = {
        "Content-Type": "application/json;charset=UTF-8",
        "Accept": "application/json, text/plain, */*",
        "User-Agent": "Mozilla/5.0",
        "Origin": "https://www.studyinchina.edu.cn",
        "Referer": "https://www.studyinchina.edu.cn/",
        "appid": TRANSLATE_APP_ID,
        "nonce": nonce,
        "timestamp": ts,
        "sign": sign,
    }
    req = urllib.request.Request(TRANSLATE_API, data=payload, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result = json.loads(resp.read().decode())
        if result.get("code") == 0 or result.get("data"):
            # "data" can be absent or null even when code == 0.
            dst = (result.get("data") or {}).get("dst", "")
            if dst and len(dst) > 2:
                return dst
        return text
    except Exception as e:
        print(f" TRANSLATE FAIL for '{text[:30]}': {e}")
        return text
# NOTE(review): in the reviewed copy of this file, text that looked like an
# HTML/XML tag had been stripped out of several string literals (regexes and the
# RSS skeleton). Every literal carrying a NOTE(review) below was reconstructed
# and should be compared with the deployed script.

# Maximum number of items written to each feed.
RSS_MAX_ITEMS = 50


def strip_html(s):
    """Return *s* with HTML entities decoded and all tags removed, stripped."""
    s = htmlmod.unescape(s)
    return re.sub(r'<[^>]+>', '', s).strip()


def html_content(raw_html, base_url=""):
    """Reduce an HTML fragment to the small tag set used in feed descriptions.

    Drops script/style elements and comments, removes style/class/width/height
    attributes, and keeps only img, p, br and a tags (other tags are removed,
    their text stays). When *base_url* is given, root-relative img ``src``
    values are prefixed with it. Returns "" for empty input.
    """
    if not raw_html:
        return ""
    s = htmlmod.unescape(raw_html)
    # NOTE(review): the next three patterns are reconstructed (script, style, comments).
    s = re.sub(r'<script[^>]*>.*?</script>', '', s, flags=re.DOTALL | re.IGNORECASE)
    s = re.sub(r'<style[^>]*>.*?</style>', '', s, flags=re.DOTALL | re.IGNORECASE)
    s = re.sub(r'<!--.*?-->', '', s, flags=re.DOTALL)
    s = re.sub(r'\s+style="[^"]*"', '', s)
    s = re.sub(r'\s+class="[^"]*"', '', s)
    s = re.sub(r'\s+width="[^"]*"', '', s)
    s = re.sub(r'\s+height="[^"]*"', '', s)

    def tag_filter(m):
        tag = m.group(2).lower()
        rest = m.group(0)
        if tag in ('img', 'p', 'br', 'a'):
            if tag == 'img' and base_url:
                # Callable replacement: base_url is inserted literally.
                rest = re.sub(r'src="(/[^"]+)"', lambda sm: f'src="{base_url}{sm.group(1)}"', rest)
            return rest
        return ""

    s = re.sub(r'<(/?)(\w+)([^>]*)(/?)>', tag_filter, s)
    s = re.sub(r'\n\s*\n', '\n', s)
    return s.strip()


def is_chinese(text):
    """Return True when over 30% of the non-space characters are in U+4E00..U+9FFF."""
    if not text:
        return False
    cn = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
    total = len(text.strip().replace(" ", ""))
    return total > 0 and cn / total > 0.3


def translate_body(text, timeout=10):
    """Translate *text* (zh -> en) with the Baidu API at BAIDU_API.

    Returns *text* unchanged when it is empty, shorter than four characters or
    not Chinese per is_chinese(); returns "" when the request fails or the
    response carries no translation.
    """
    from urllib.parse import urlencode  # explicit: do not rely on urllib.request importing it

    if not text or len(text) < 4 or not is_chinese(text):
        return text
    salt = str(random.randint(10000, 99999))
    # Signature layout used by this script: md5(appid + query + salt + secret).
    sign = hashlib.md5((BAIDU_APPID + text + salt + BAIDU_SECRET).encode()).hexdigest()
    try:
        data = urlencode({"q": text, "from": "zh", "to": "en", "appid": BAIDU_APPID, "salt": salt, "sign": sign}).encode()
        req = urllib.request.Request(BAIDU_API, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            result = json.loads(resp.read().decode())
        if "trans_result" in result and result["trans_result"]:
            return result["trans_result"][0].get("dst", text)
        if "error_code" in result:
            print(f" BAIDU ERR {result.get('error_code')}: {result.get('error_msg','')}")
        return ""
    except Exception as e:
        print(f" BAIDU FAIL: {e}")
        return ""


def load_seen(name):
    """Load the seen-items dict for *name* from seen_<name>.json in SCRIPTS_DIR.

    Returns {} when the file does not exist. A malformed file still raises, so a
    damaged state file is never silently replaced by an empty one.
    """
    p = os.path.join(SCRIPTS_DIR, f"seen_{name}.json")
    if not os.path.exists(p):
        return {}
    with open(p, encoding="utf-8") as f:
        return json.load(f)


def save_seen(name, seen):
    """Write *seen* to seen_<name>.json in SCRIPTS_DIR as UTF-8 JSON."""
    path = os.path.join(SCRIPTS_DIR, f"seen_{name}.json")
    tmp = path + ".tmp"
    # Write to a sibling file and rename, so an interrupted run cannot leave a
    # truncated state file behind.
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(seen, f, indent=2, ensure_ascii=False)
    os.replace(tmp, path)


def build_rss(items, title, desc, link):
    """Return an RSS 2.0 document for the first RSS_MAX_ITEMS of *items*.

    Args:
        items: Sequence of ``(url, info)`` pairs; *info* may carry "title",
            "description" and "date". The URL is used for both link and guid.
        title, desc, link: Channel-level metadata.

    Descriptions that look like HTML are wrapped in CDATA, others are escaped;
    an empty description falls back to the item title. pubDate is emitted only
    when the date parses as year-month-day (CJK 年/月/日 markers accepted).
    """
    # NOTE(review): element names are reconstructed from the RSS 2.0 layout.
    now = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S +0000")
    parts = [
        '<?xml version="1.0" encoding="UTF-8"?>\n<rss version="2.0">\n<channel>\n',
        f'  <title>{escape(title)}</title>\n  <link>{escape(link)}</link>\n'
        f'  <description>{escape(desc)}</description>\n  <lastBuildDate>{now}</lastBuildDate>\n',
    ]
    for key, val in items[:RSS_MAX_ITEMS]:
        t = escape(val.get("title") or "")
        d = val.get("description") or ""
        is_html = bool(re.search(r'<[a-z]+[\s>]', d))
        parts.append(f"  <item>\n    <title>{t}</title>\n    <link>{escape(key)}</link>\n    <guid>{escape(key)}</guid>\n")
        if is_html:
            # "]]>" would end the CDATA section early; split it across two sections.
            safe = d.replace("]]>", "]]]]><![CDATA[>")
            parts.append(f"    <description><![CDATA[{safe}]]></description>\n")
        else:
            parts.append(f"    <description>{escape(d) or t}</description>\n")
        if val.get("date"):
            dd = val["date"].replace("\u5e74", "-").replace("\u6708", "-").replace("\u65e5", "")
            try:
                stamp = datetime.strptime(dd, "%Y-%m-%d").strftime("%a, %d %b %Y %H:%M:%S +0000")
            except ValueError:
                stamp = ""  # unparseable date: omit pubDate
            if stamp:
                parts.append(f"    <pubDate>{stamp}</pubDate>\n")
        parts.append("  </item>\n")
    parts.append("</channel>\n</rss>\n")
    return "".join(parts)


def write_rss(filename, items, title, desc, link):
    """Render *items* with build_rss() and write the feed to STATIC_DIR/filename."""
    os.makedirs(STATIC_DIR, exist_ok=True)
    rss = build_rss(items, title, desc, link)
    with open(os.path.join(STATIC_DIR, filename), "w", encoding="utf-8") as f:
        f.write(rss)
    print(f" RSS: {filename} ({len(items)} items)")


def _inner_html_by_class(page, class_pattern):
    """Return the inner HTML of the first element whose class attribute matches.

    *class_pattern* is a regex fragment for the whole attribute value. The match
    ends at the first closing tag of the same name, so nested same-name elements
    cut it short. Returns None when nothing matches.
    """
    # NOTE(review): the original patterns named a specific tag that was lost;
    # this one accepts any tag name.
    m = re.search(r'<(\w+)[^>]*class="' + class_pattern + r'"[^>]*>(.*?)</\1>', page, re.DOTALL)
    return m.group(2) if m else None


# Site 1: eu.china-mission.gov.cn ---------------------------------------
MISSION = "https://eu.china-mission.gov.cn/eng"


class MissionHTML(HTMLParser):
    """Collect titled link lists from the mission landing page.

    A ``<div class="info">`` opens a section, the text of an ``<h4>`` names it,
    and inside any element with ``class="list"`` each ``<li>`` yields one item
    (link text, href, and the text of a ``<span>`` as date). Finished sections
    accumulate in ``self.sections``; the section still open at the end of input
    stays in ``self.cur``.
    """

    def __init__(self):
        super().__init__()
        self.sections = []     # completed sections
        self.cur = None        # section being filled
        self.in_list = False   # inside an element with class="list"
        self.in_span = False   # inside a <span> of a list
        self.in_a = False      # inside an <a href> of a list item
        self.item = {}         # item being filled; empty when not inside <li>
        self.stack = []        # currently open tags
        self.st = ""           # text gathered for the current <h4>

    def handle_starttag(self, tag, attrs):
        self.stack.append(tag)
        a = dict(attrs)
        if tag == "h4":
            self.st = ""
        if tag == "div" and a.get("class") == "info":
            if self.cur:
                self.sections.append(self.cur)
            self.cur = {"title": "", "items": []}
        if self.cur and a.get("class") == "list":
            self.in_list = True
        if self.in_list and tag == "li":
            self.item = {"title": "", "url": "", "date": ""}
        # Only track anchors that belong to an open <li>; an anchor elsewhere in
        # the list used to leave an item without a "title" key (KeyError later).
        if self.in_list and tag == "a" and "href" in a and self.item:
            self.in_a = True
            self.item["url"] = a["href"] or ""
        if tag == "span" and self.in_list:
            self.in_span = True

    def handle_endtag(self, tag):
        # Unwind to the matching open tag: tolerates stray end tags (empty
        # stack) and unclosed void elements such as <br> or <img>.
        if tag in self.stack:
            while self.stack.pop() != tag:
                pass
        if tag == "h4" and self.cur:
            self.cur["title"] = self.st.strip()
        if tag == "div":
            self.in_list = False
        if tag == "li":
            if self.item.get("title") and self.cur:
                self.cur["items"].append(self.item.copy())
            self.item = {}
        if tag == "a":
            self.in_a = False
        if tag == "span":
            self.in_span = False

    def handle_data(self, data):
        s = data.strip()
        if not s:
            return
        if "h4" in self.stack and self.cur:
            self.st += data
        if self.in_span and self.item:
            # Dates may be wrapped in ASCII or full-width parentheses.
            self.item["date"] = s.strip("()\uff08\uff09")
        if self.in_a and self.item:
            self.item["title"] = self.item.get("title", "") + s


def scrape_mission():
    """Scrape the mission landing page, notify on new links, rewrite its feed."""
    print("\n=== eu.china-mission.gov.cn ===")
    page_html = fetch(MISSION + "/")
    if not page_html:
        return
    p = MissionHTML()
    p.feed(page_html)
    if p.cur:
        p.sections.append(p.cur)
    seen = load_seen("mission")
    n = 0
    for sec in p.sections:
        sn = sec.get("title", "Unknown")
        for it in sec["items"]:
            url = it.get("url", "")
            # The resulting URL is the key of the seen-state, so the joining
            # rule is kept exactly as it was.
            url = url if url.startswith("http") else MISSION + (url if url.startswith("/") else "/" + url)
            t = it.get("title", "").strip()
            if not t or not url:
                continue
            if url not in seen:
                seen[url] = {"title": t, "date": it.get("date", "")}
                send_ntfy(t, url, "newspaper", sn, "mission")
                n += 1
    write_rss("china-eu-mission.xml", list(seen.items())[::-1], "China-EU Mission", "eu.china-mission.gov.cn", MISSION)
    save_seen("mission", seen)
    print(f" New: {n}, Total: {len(seen)}")


# Site 2: france.lxgz.org.cn -------------------------------------------
FRANCE = "https://france.lxgz.org.cn"


def parse_france(html_text):
    """Extract news items from the element with class "news_list".

    Returns a list of ``{"title", "url", "date"}`` dicts. The title comes from
    the anchor's ``title`` attribute when present, else from the anchor text.
    A date given as MM-DD is prefixed with the four-digit year that starts a
    14-digit run in the href, when there is one.
    """
    items = []
    block = _inner_html_by_class(html_text, "news_list")
    if block is None:
        return items
    # NOTE(review): the <li>, <a> and "</a>" literals below are reconstructed.
    for li in re.findall(r'<li[^>]*>(.*?)</li>', block, re.DOTALL):
        a = re.search(r'<a[^>]*href="([^"]+)"[^>]*\btitle="([^"]*)"', li)
        if a:
            t = a.group(2)
        else:
            a = re.search(r'<a[^>]*href="([^"]+)"[^>]*>', li)
            if not a:
                continue
            # No title attribute: take the text between the last ">" and "</a>".
            txt = li.split("</a>")[0]
            t = re.sub(r'<[^>]+>', "", txt[txt.rfind(">") + 1:]) if ">" in txt else ""
        h = a.group(1)
        url = h if h.startswith("http") else FRANCE + ("/" + h.lstrip("/") if h.startswith("/") else "/" + h)
        # NOTE(review): the date element's tag name was lost; this takes the
        # first attribute-less, text-only element of the item.
        dm = re.search(r'<(\w+)>([^<]+)</\1>', li)
        d = dm.group(2) if dm else ""
        if re.match(r"^\d{2}-\d{2}$", d):
            y = re.search(r"/(\d{4})\d{10}", h)
            if y:
                d = y.group(1) + "-" + d
        if t:
            items.append({"title": t.strip(), "url": url, "date": d})
    return items


def scrape_france():
    """Scrape france.lxgz.org.cn, notify on new links, rewrite its feed."""
    print("\n=== france.lxgz.org.cn ===")
    page_html = fetch_legacy(FRANCE + "/")
    if not page_html:
        return
    items = parse_france(page_html)
    if not items:
        print(" No items found")
        return
    seen = load_seen("france")
    n = 0
    for it in items:
        if it["url"] not in seen:
            seen[it["url"]] = {"title": it["title"], "date": it.get("date", "")}
            send_ntfy(it["title"], it["url"], "flag-france", "\u65b0\u95fb\u52a8\u6001", "france")
            n += 1
    write_rss("france-edu.xml", list(seen.items())[::-1], "France Education News", "france.lxgz.org.cn", FRANCE)
    save_seen("france", seen)
    print(f" New: {n}, Total: {len(seen)}")


# Site 3: studyinchina.edu.cn ------------------------------------------
STUDY = "https://www.studyinchina.edu.cn"
STUDY_API = "https://www.studyinchina.edu.cn/api/lxzgw/cms/GetArticleLst"
STUDY_TYPE = "2fe6c123b6924b01a6c7d653240c0dfb"


def api_post_legacy(url, data, timeout=20):
    """POST *data* as JSON from a child interpreter using the legacy OpenSSL config.

    The child runs with OPENSSL_CONF set to LEGACY_CONF and certificate
    verification disabled. Returns the decoded JSON response, or None when the
    child fails, returns nothing, reports "ERROR:...", or the output is not JSON.
    """
    make_legacy_conf()
    env = os.environ.copy()
    env["OPENSSL_CONF"] = LEGACY_CONF
    # Force UTF-8 on the pipe so non-ASCII JSON survives a C/POSIX locale.
    env["PYTHONIOENCODING"] = "utf-8"
    body = json.dumps(data, ensure_ascii=False)
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Content-Type": "application/json;charset=UTF-8",
        "Accept": "application/json",
        "Referer": "https://www.studyinchina.edu.cn/",
        "Origin": "https://www.studyinchina.edu.cn",
    }
    # repr() of str/dict values yields valid Python literals for the child.
    code = (
        "import ssl, urllib.request, json\n"
        "ctx = ssl.create_default_context()\n"
        "ctx.check_hostname = False\n"
        "ctx.verify_mode = ssl.CERT_NONE\n"
        f"body = {body!r}\n"
        f"req = urllib.request.Request({url!r}, data=body.encode(), headers={headers!r})\n"
        "try:\n"
        f"    resp = urllib.request.urlopen(req, timeout={timeout}, context=ctx)\n"
        '    print(resp.read().decode(), end="")\n'
        "except Exception as e:\n"
        '    print("ERROR:" + str(e), end="")'
    )
    try:
        # Slack on the outer timeout lets the child report its own timeout.
        r = subprocess.run(
            ["python3", "-c", code],
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            env=env,
            timeout=timeout + 5,
        )
        out = r.stdout.strip()
        if r.returncode != 0 or not out or out.startswith("ERROR:"):
            if out:
                print(f" API ERR: {out[:200]}")
            return None
        return json.loads(out)
    except Exception as e:
        print(f" API POST ERR: {e}")
        return None


def scrape_study():
    """Run scrape_bit_rss(), then scrape the studyinchina article API.

    New articles get a translated title and body excerpt, trigger a
    notification, and are stored; the feed is rewritten from the stored items.
    """
    # This function has always triggered the BIT aggregate feed first; callers
    # must not run scrape_bit_rss() a second time in the same pass.
    scrape_bit_rss()
    print("\n=== studyinchina.edu.cn ===")
    payload = {"type": STUDY_TYPE, "pageNo": 1, "pageSize": 50, "isSort": 1}
    result = api_post_legacy(STUDY_API, payload)
    if not result:
        print(" No response")
        return
    code = result.get("code", -1)
    if code != 0:
        print(f" API error: code={code}")
        return
    # "data" may be JSON null.
    records = (result.get("data") or {}).get("records", [])
    if not records:
        print(" No records found")
        return
    print(f" API: {len(records)} records")
    seen = load_seen("study")
    n = 0
    translated = 0
    for r in records:
        art_id = r.get("id", "")
        # JSON null fields arrive as None; normalise before string methods.
        title = (r.get("title") or "").strip()
        date = (r.get("showDate") or "")[:10]
        content = r.get("content") or ""
        url = f"https://www.studyinchina.edu.cn/articleDetail?arcId={art_id}"
        if not title or not art_id:
            continue
        if url not in seen:
            en_title = translate_title(title)
            body_text = strip_html(content)
            body_en = translate_body(body_text[:300]) if is_chinese(body_text) else ""
            seen[url] = {"title": title, "en": en_title, "date": date.replace(" ", ""), "content": content, "body_en": body_en}
            send_ntfy(title, url, "mortar-board", "Latest News", "study")
            n += 1
            translated += 1
            time.sleep(1.0)  # pause between translation requests
        elif not seen[url].get("content"):
            seen[url]["content"] = content

    # Backfill body_en: at most 3 translations per run, taken from the first 5
    # stored entries.
    # NOTE(review): the original comment said "latest 3", but [:5] selects the
    # earliest-inserted entries; confirm which was intended.
    backfilled = 0
    for url, val in list(seen.items())[:5]:
        if backfilled >= 3:
            break
        if not val.get("body_en"):
            bt = strip_html(val.get("content") or "")
            if bt and is_chinese(bt) and len(bt) > 10:
                val["body_en"] = translate_body(bt[:300])
                backfilled += 1
                time.sleep(1.0)
    if translated or backfilled:
        print(f" Translated: {translated + backfilled}")

    rss_items = []
    for url, val in list(seen.items())[::-1]:
        v = val.copy()
        if "en" in v and v["en"] != v.get("title", ""):
            v["title"] = v["en"]
        c = val.get("content", "")
        if c:
            body_html = html_content(c, STUDY)
            if body_html:
                v["description"] = body_html[:2500]
        if not v.get("description"):
            v["description"] = val.get("title", "")
        rss_items.append((url, v))
    write_rss("studyinchina.xml", rss_items, "Study in China News", "From studyinchina.edu.cn", STUDY)
    save_seen("study", seen)
    print(f" New: {n}, Total: {len(seen)}")


# Site 4: english.bit.edu.cn ------------------------------------------
BIT = "https://english.bit.edu.cn"


def _bit_article_body(url):
    """Fetch one BIT English article and return its cleaned body HTML, or ""."""
    detail = fetch(url)
    if not detail:
        return ""
    inner = _inner_html_by_class(detail, "Artical_Content_Text")
    return html_content(inner, BIT) if inner else ""


def scrape_bit():
    """Scrape english.bit.edu.cn/latest.html, notify on new articles, rewrite its feed."""
    print("\n=== english.bit.edu.cn ===")
    page_html = fetch(BIT + "/latest.html")
    if not page_html:
        return
    # NOTE(review): the "<a" / "</a>" parts of this pattern are reconstructed.
    articles = re.findall(r'<a[^>]*href="(\d{4}-\d{2}/\d{2}/c_\d+\.htm)"[^>]*>(.*?)</a>', page_html, re.DOTALL)
    seen = load_seen("bit")
    n = 0
    for href, text in articles:
        clean = re.sub(r'<[^>]+>', '', text).strip()
        if not clean:
            continue
        url = BIT + "/" + href
        dm = re.search(r"(\d{4}-\d{2}/\d{2})", href)
        date = dm.group(1) if dm else ""
        if url not in seen:
            seen[url] = {"title": clean, "date": date, "content": _bit_article_body(url)}
            send_ntfy(clean, url, "university", "BIT News", "bit")
            n += 1
        elif not seen[url].get("content"):
            # Earlier run stored the item without a body: try again.
            body = _bit_article_body(url)
            if body:
                seen[url]["content"] = body
    rss_items = []
    for url, val in list(seen.items())[::-1]:
        v = val.copy()
        c = val.get("content", "")
        if c:
            v["description"] = c[:800]
        if not v.get("description"):
            v["description"] = val.get("title", "")
        rss_items.append((url, v))
    write_rss("bit-en-news.xml", rss_items, "BIT English News", "english.bit.edu.cn", BIT)
    save_seen("bit", seen)
    print(f" New: {n}, Total: {len(seen)}")


# Site 5: www.bit.edu.cn (student affairs section) ---------------------
BIT_CN = "https://www.bit.edu.cn"


def _bit_cn_article_body(url):
    """Fetch one www.bit.edu.cn announcement and return its cleaned body HTML, or ""."""
    detail = fetch(url)
    if not detail:
        return ""
    inner = _inner_html_by_class(detail, r'[^"]*v_news_content[^"]*')
    if inner is None:
        inner = _inner_html_by_class(detail, "content")
    return html_content(inner, BIT_CN) if inner else ""


def scrape_bit_cn():
    """Scrape the www.bit.edu.cn announcement list, notify on new items, rewrite its feed.

    Chinese bodies of the items that end up in the feed get an English excerpt
    in front of the original; the translation is cached in the seen-state as
    "body_en" so each item is translated once rather than on every run.
    """
    print("\n=== www.bit.edu.cn (学工天地) ===")
    page_html = fetch(BIT_CN + "/tzgg17/wthd132/")
    if not page_html:
        return
    # NOTE(review): the "<a" / "</a>" parts of this pattern are reconstructed.
    articles = re.findall(r'<a[^>]*href="([a-f0-9]{32}\.htm)"[^>]*>(.*?)</a>', page_html, re.DOTALL)
    seen = load_seen("bit_cn")
    n = 0
    for href, text in articles:
        clean = re.sub(r'<[^>]+>', '', text).strip()
        if not clean:
            continue
        url = f"{BIT_CN}/tzgg17/wthd132/{href}"
        if url not in seen:
            seen[url] = {"title": clean, "content": _bit_cn_article_body(url)}
            send_ntfy(clean, url, "bell", "BIT Announcements", "bit-cn")
            n += 1
        elif not seen[url].get("content"):
            body = _bit_cn_article_body(url)
            if body:
                seen[url]["content"] = body
    rss_items = []
    for idx, (url, val) in enumerate(list(seen.items())[::-1]):
        v = val.copy()
        c = val.get("content", "")
        if c:
            body_text = strip_html(c)
            en_desc = ""
            # Only the first RSS_MAX_ITEMS entries are written to the feed, so
            # only those are worth a translation request.
            if idx < RSS_MAX_ITEMS and is_chinese(body_text) and len(body_text) > 10:
                en_desc = val.get("body_en") or translate_body(body_text[:300])
                if en_desc:
                    val["body_en"] = en_desc  # val is the stored entry: cached on save
            if en_desc:
                # NOTE(review): the markup joining the two parts was lost in the
                # reviewed copy; paragraph tags and a rule are used here.
                v["description"] = "<p>" + htmlmod.escape(en_desc[:400]) + "</p><hr/><p>" + c[:400] + "</p>"
            else:
                v["description"] = c[:800]
        if not v.get("description"):
            v["description"] = val.get("title", "")
        rss_items.append((url, v))
    write_rss("bit-cn-announcements.xml", rss_items, "BIT 学工天地 Announcements", "www.bit.edu.cn", BIT_CN)
    save_seen("bit_cn", seen)
    print(f" New: {n}, Total: {len(seen)}")


# Site 6: BIT all-in-one RSS feed -------------------------------------
BIT_RSS_URL = "https://haobit.top/feed.rss"


def scrape_bit_rss():
    """Mirror the feed at BIT_RSS_URL: notify on new items, rewrite a local feed.

    Titles detected as Chinese are translated for the local feed; the
    notification carries the original title.
    """
    print(chr(10) + '=== BIT all-in-one RSS ===')
    import xml.etree.ElementTree as ET

    page_html = fetch(BIT_RSS_URL)
    if not page_html:
        return
    try:
        root = ET.fromstring(page_html)
    except ET.ParseError as e:
        print(f' XML parse error: {e}')
        return
    seen = load_seen('bit_rss')
    n = 0
    for item in root.findall('.//item'):
        title = (item.findtext('title') or '').strip()
        link = (item.findtext('link') or '').strip()
        pub = (item.findtext('pubDate') or '').strip()
        cat = (item.findtext('category') or '').strip()
        if not title or not link:
            continue
        if link not in seen:
            en_title = translate_title(title) if is_chinese(title) else title
            seen[link] = {'title': title, 'en': en_title, 'date': pub, 'category': cat}
            send_ntfy(title, link, 'school', cat or 'BIT News', 'bit-rss')
            n += 1
            time.sleep(1.0)  # pause between translation requests
    rss_items = []
    for link, val in list(seen.items())[::-1]:
        v = val.copy()
        if 'en' in v and v['en'] != v.get('title', ''):
            v['title'] = v['en']
        cat = val.get('category', '')
        v['description'] = f'来自{cat}' if cat else 'BIT Notification'
        rss_items.append((link, v))
    write_rss('bit-all-notifications.xml', rss_items, 'BIT All Notifications', 'haobit.top', BIT_RSS_URL)
    save_seen('bit_rss', seen)
    print(f' New: {n}, Total: {len(seen)}')


# Main ------------------------------------------------------------------
def main():
    """Run every scraper once, in a fixed order."""
    t = datetime.now().strftime("%Y-%m-%d %H:%M")
    print(f"[{t}] Scraper run")
    scrape_mission()
    scrape_france()
    scrape_bit()
    scrape_bit_cn()
    # scrape_study() runs scrape_bit_rss() itself; the separate trailing call
    # fetched the same feed a second time in every run and was dropped.
    scrape_study()
    print("Done")


if __name__ == "__main__":
    main()