Initial commit
This commit is contained in:
243
mikrotik-adlist-builder.py
Normal file
243
mikrotik-adlist-builder.py
Normal file
@@ -0,0 +1,243 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import gzip
|
||||
import io
|
||||
import re
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
from urllib.parse import urlparse, unquote
|
||||
from pathlib import Path
|
||||
from typing import Iterable, Optional, Set
|
||||
|
||||
# Sources merged into the output when no -u/--url option is given.
# Each entry is either a URL or a local file path.
DEFAULT_URLS = [
    # Popular public blocklists (ABP-style rules or hosts-file syntax)
    "https://big.oisd.nl/",
    "https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts",

    # Custom blocklists. These may contain ABP rules,
    # but mainly plain domain syntax.
    "./custom-domains.txt",
]

# A bare ABP domain rule such as "||example.com^"; group 1 is the host part.
ABP_DOMAIN_RE = re.compile(r"^\|\|([A-Za-z0-9._-]+)\^$")

# Whitespace run used to split a hosts-file line into its fields.
HOSTS_SPLIT_RE = re.compile(r"\s+")

# Syntactic host-name check (case-insensitive, optional trailing dot):
#   * at most 253 characters overall;
#   * one or more dot-terminated labels of 1-63 characters that start and end
#     with a letter or digit;
#   * a final label of 2-63 characters that also starts and ends with a
#     letter or digit and is not purely numeric, so dotted-quad IPv4
#     addresses such as "192.168.1.10" are not mistaken for domains.
VALID_DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)(?!-)(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+"
    r"(?![0-9]+\.?$)[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])\.?$",
    re.IGNORECASE,
)
|
||||
|
||||
def _read_local_text(path: Path) -> str:
    """Read a local blocklist as UTF-8, replacing undecodable bytes."""
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        return f.read()


def download_text(url: str, timeout: int = 30) -> str:
    """Return the text of a blocklist given as a URL or a local path.

    Args:
        url: A ``file://`` URL, a plain filesystem path (relative, absolute,
            or starting with a Windows drive letter), or any URL that
            ``urllib.request.urlopen`` can fetch.
        timeout: Timeout in seconds passed to ``urlopen``; unused for local
            files.

    Returns:
        The decoded text. Bytes that cannot be decoded are replaced rather
        than raising.

    Raises:
        OSError: The local file cannot be read, or the request fails
            (``urllib.error.URLError`` is an ``OSError`` subclass).
    """
    parsed = urlparse(url)

    # Local file via file://. url2pathname undoes percent-encoding and, on
    # Windows, turns "/C:/dir/x" into "C:\dir\x".
    if parsed.scheme == "file":
        return _read_local_text(Path(urllib.request.url2pathname(parsed.path)))

    # Local file without scheme, e.g. ./list.txt or list.txt. A one-letter
    # "scheme" is a Windows drive letter (C:\lists\x.txt), not a URL scheme.
    if len(parsed.scheme) <= 1:
        return _read_local_text(Path(url))

    # HTTP/HTTPS
    req = urllib.request.Request(
        url,
        headers={
            "User-Agent": "mikrotik-adlist-builder/1.0",
            "Accept-Encoding": "gzip",
        },
    )

    with urllib.request.urlopen(req, timeout=timeout) as response:
        raw = response.read()
        encoding = response.headers.get("Content-Encoding", "").strip().lower()
        charset = response.headers.get_content_charset() or "utf-8"

    # Decompress when the server says so, and also when the payload starts
    # with the gzip magic bytes even though no Content-Encoding was declared.
    if encoding in ("gzip", "x-gzip") or raw[:2] == b"\x1f\x8b":
        raw = gzip.decompress(raw)

    try:
        return raw.decode(charset, errors="replace")
    except LookupError:
        # The server announced a charset Python does not know; fall back to
        # UTF-8 instead of failing the whole source.
        return raw.decode("utf-8", errors="replace")
|
||||
|
||||
def normalize_domain(domain: str) -> Optional[str]:
    """Return *domain* in canonical form, or ``None`` if it must be skipped.

    The value is trimmed, lower-cased and stripped of trailing dots. A
    leading ``*.`` wildcard label is dropped. ``None`` is returned for empty
    input, for the local names ``localhost``, ``local`` and
    ``broadcasthost``, for anything containing ``/``, ``\\`` or ``:``, and
    for anything that does not match ``VALID_DOMAIN_RE``.
    """
    name = domain.strip().lower().rstrip(".")

    # Empty entries and the machine-local names are never emitted.
    if not name or name in ("localhost", "local", "broadcasthost"):
        return None

    # Slashes indicate a path; a colon indicates IPv6, a port, or similar.
    if any(marker in name for marker in ("/", "\\", ":")):
        return None

    # Only the part after a leading wildcard label is validated and returned.
    if name[:2] == "*.":
        name = name[2:]

    return name if VALID_DOMAIN_RE.match(name) else None
|
||||
|
||||
def extract_from_abp_line(line: str) -> Optional[str]:
    """Extract the domain from an ABP rule that starts with ``||``.

    A line of the exact form ``||example.com^`` is handled through
    ``ABP_DOMAIN_RE``. Any other line starting with ``||`` is cut at the
    first ``^``, ``/`` or ``$`` and the remaining prefix is used. The
    candidate is passed through ``normalize_domain``, so the result may still
    be ``None``. Lines that do not start with ``||`` yield ``None``.
    """
    exact = ABP_DOMAIN_RE.match(line)
    if exact is not None:
        return normalize_domain(exact.group(1))

    if not line.startswith("||"):
        return None

    # Some variants omit the trailing ^ or append a path/options; keep only
    # the text in front of the earliest delimiter.
    host = line[2:]
    for delimiter in ("^", "/", "$"):
        host = host.partition(delimiter)[0]

    return normalize_domain(host)
|
||||
|
||||
def extract_from_hosts_line(line: str) -> Set[str]:
    """Collect the domains named on one hosts-file line.

    Text after the first ``#`` is discarded. The line contributes domains
    only when it has at least two whitespace-separated fields and the first
    field is one of the recognised sink/loopback addresses. Every further
    field is passed through ``normalize_domain`` and kept when that returns
    a value. In all other cases an empty set is returned.
    """
    content = line.partition("#")[0].strip()
    if not content:
        return set()

    fields = HOSTS_SPLIT_RE.split(content)
    if len(fields) < 2:
        return set()

    # Common hosts file IP prefixes
    address = fields[0].lower()
    if address not in {"0.0.0.0", "127.0.0.1", "::1", "::", "255.255.255.255"}:
        return set()

    normalized = (normalize_domain(field) for field in fields[1:])
    return {name for name in normalized if name}
|
||||
|
||||
def extract_plain_domain(line: str) -> Optional[str]:
    """Interpret *line* as a bare domain name, if it looks like one.

    Returns ``None`` for blank lines, for lines starting with ``!``, ``#``,
    ``[``, ``@@`` or ``|``, and for lines containing a space, a tab, ``/``,
    ``^`` or ``$``. Everything else is handed to ``normalize_domain``, whose
    result is returned.
    """
    entry = line.strip()
    if not entry:
        return None

    # Comments, section headers, whitelist rules and anchored ABP rules are
    # not plain domains.
    if entry.startswith(("!", "#", "[", "@@", "|")):
        return None

    # Whitespace or rule syntax characters mean this is not a single host name.
    if any(ch in entry for ch in " \t/^$"):
        return None

    return normalize_domain(entry)
|
||||
|
||||
def extract_domains(text: str) -> Set[str]:
    """Return every domain found in the blocklist *text*.

    The text is processed line by line. Blank lines, lines starting with
    ``!``, ``#`` or ``[`` (comments and metadata) and lines starting with
    ``@@`` (ABP whitelist rules) are skipped. Each remaining line is tried as
    an ABP rule, then as a hosts-file line, then as a plain domain; the first
    parser that yields a result wins.
    """
    found: Set[str] = set()

    for raw_line in io.StringIO(text):
        line = raw_line.strip()

        # Nothing to parse: empty, comment/metadata, or whitelist rule.
        if not line or line.startswith(("!", "#", "[", "@@")):
            continue

        # 1) ABP syntax
        abp_domain = extract_from_abp_line(line)
        if abp_domain:
            found.add(abp_domain)
            continue

        # 2) hosts file syntax
        hosts_domains = extract_from_hosts_line(line)
        if hosts_domains:
            found |= hosts_domains
            continue

        # 3) plain domain syntax
        plain_domain = extract_plain_domain(line)
        if plain_domain:
            found.add(plain_domain)

    return found
|
||||
|
||||
def build_output(urls: Iterable[str], output_file: str) -> int:
    """Merge all sources into one hosts-style list and write it to disk.

    Each source is fetched with ``download_text`` and parsed with
    ``extract_domains``. A source that raises is reported on stderr and
    skipped, so the remaining sources are still processed. The union of all
    domains is written to *output_file* in sorted order, one
    ``0.0.0.0 <domain>`` line per entry with ``\\n`` line endings.

    Returns:
        The number of unique domains written.
    """
    collected: Set[str] = set()

    for source in urls:
        print(f"[INFO] Downloading: {source}", file=sys.stderr)
        try:
            found = extract_domains(download_text(source))
            print(f"[INFO] Domains found: {len(found)}", file=sys.stderr)
            collected |= found
        except Exception as exc:
            # Best effort: one broken source must not abort the whole build.
            print(f"[ERROR] {source}: {exc}", file=sys.stderr)

    ordered = sorted(collected)

    with open(output_file, "w", encoding="utf-8", newline="\n") as out:
        out.writelines(f"0.0.0.0 {name}\n" for name in ordered)

    return len(ordered)
|
||||
|
||||
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the command-line options.

    Args:
        argv: Argument strings to parse. With the default ``None`` argparse
            reads ``sys.argv[1:]``, which is what happens when the function
            is called without arguments. Passing a list lets callers and
            tests drive the parser directly.

    Returns:
        A namespace with ``urls`` (the list of all ``-u``/``--url`` values,
        or ``None`` when the option was not given) and ``output`` (the output
        file name, ``"adlist.txt"`` by default).
    """
    parser = argparse.ArgumentParser(
        description="Download multiple blocklists from URLs and create one MikroTik adlist in '0.0.0.0 domain' format."
    )
    parser.add_argument(
        "-u",
        "--url",
        action="append",
        dest="urls",
        help="Blocklist URL. Can be used multiple times.",
    )
    parser.add_argument(
        "-o",
        "--output",
        default="adlist.txt",
        help="Output file.",
    )
    return parser.parse_args(argv)
|
||||
|
||||
def main() -> int:
    """Run the command-line tool and return the process exit code.

    The sources are the ``-u``/``--url`` values, or ``DEFAULT_URLS`` when
    none were given.

    Returns:
        ``0`` when at least one domain was written to the output file, and
        ``1`` when there is no source to process or when no domain could be
        collected (for example because every source failed).
    """
    args = parse_args()
    urls = args.urls or DEFAULT_URLS

    if not urls:
        print("Error: no URLs were provided. Use -u URL or edit DEFAULT_URLS.", file=sys.stderr)
        return 1

    count = build_output(urls, args.output)

    # An empty result means every source failed or contained nothing usable;
    # report that as a failure so callers such as cron jobs can notice.
    if count == 0:
        print("[ERROR] No domains were collected from any source.", file=sys.stderr)
        return 1

    print(f"[OK] Output written to: {args.output}", file=sys.stderr)
    print(f"[OK] Total unique domains: {count}", file=sys.stderr)
    return 0
|
||||
|
||||
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|
||||
Reference in New Issue
Block a user