Merge pull request #64 from Aerglonus/dev

fix(proxies): Fixes WindscribeVPN server authentication
This commit is contained in:
Sp5rky
2026-01-30 08:19:27 -07:00
committed by GitHub
4 changed files with 128 additions and 64 deletions

View File

@@ -58,11 +58,25 @@ from unshackle.core.titles.episode import Episode
from unshackle.core.tracks import Audio, Subtitle, Tracks, Video
from unshackle.core.tracks.attachment import Attachment
from unshackle.core.tracks.hybrid import Hybrid
from unshackle.core.utilities import (find_font_with_fallbacks, get_debug_logger, get_system_fonts, init_debug_logger,
is_close_match, suggest_font_packages, time_elapsed_since)
from unshackle.core.utilities import (
find_font_with_fallbacks,
get_debug_logger,
get_system_fonts,
init_debug_logger,
is_close_match,
suggest_font_packages,
time_elapsed_since,
)
from unshackle.core.utils import tags
from unshackle.core.utils.click_types import (LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE, ContextData, MultipleChoice,
SubtitleCodecChoice, VideoCodecChoice)
from unshackle.core.utils.click_types import (
LANGUAGE_RANGE,
QUALITY_LIST,
SEASON_RANGE,
ContextData,
MultipleChoice,
SubtitleCodecChoice,
VideoCodecChoice,
)
from unshackle.core.utils.collections import merge_dict
from unshackle.core.utils.subprocess import ffprobe
from unshackle.core.vaults import Vaults
@@ -673,8 +687,10 @@ class dl:
# requesting proxy from a specific proxy provider
requested_provider, proxy = proxy.split(":", maxsplit=1)
# Match simple region codes (us, ca, uk1) or provider:region format (nordvpn:ca, windscribe:us)
if re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE) or re.match(
r"^[a-z]+:[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE
if (
requested_provider
or re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE)
or re.match(r"^[a-z]+:[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE)
):
proxy = proxy.lower()
status_msg = (

View File

@@ -1,8 +1,10 @@
import base64
import json
import random
import re
from typing import Optional
from urllib.parse import quote
import requests
from unshackle.core.proxies.proxy import Proxy
@@ -14,8 +16,10 @@ class WindscribeVPN(Proxy):
Proxy Service using WindscribeVPN Service Credentials.
A username and password must be provided. These are Service Credentials, not your Login Credentials.
The Service Credentials can be found here: https://windscribe.com/getconfig/openvpn
The Service Credentials can be found login in through the Windscribe Extension.
Both username and password are Base64 encoded.
"""
if not username:
raise ValueError("No Username was provided to the WindscribeVPN Proxy Service.")
if not password:
@@ -24,12 +28,22 @@ class WindscribeVPN(Proxy):
if server_map is not None and not isinstance(server_map, dict):
raise TypeError(f"Expected server_map to be a dict mapping a region to a hostname, not '{server_map!r}'.")
self.username = username
self.password = password
self.username = self._try_decode(username)
self.password = self._try_decode(password)
self.server_map = server_map or {}
self.countries = self.get_countries()
@staticmethod
def _try_decode(value: str) -> str:
"""
Attempt to decode a Base64 string, returning original if failed.
"""
try:
return base64.b64decode(value).decode("utf-8")
except Exception:
return value
def __repr__(self) -> str:
countries = len(set(x.get("country_code") for x in self.countries if x.get("country_code")))
servers = sum(
@@ -44,10 +58,11 @@ class WindscribeVPN(Proxy):
def get_proxy(self, query: str) -> Optional[str]:
"""
Get an HTTPS proxy URI for a WindscribeVPN server.
Supports:
- Country code: "us", "ca", "gb"
- City selection: "us:seattle", "ca:toronto"
- Country code: "us", "ca", "gb"
- City selection: "us:seattle", "ca:toronto"
- Server code: "us-central-096", "uk-london-055"
Note: Windscribes static OpenVPN credentials from the configurator are per server use the extension credentials.
"""
query = query.lower()
city = None
@@ -57,69 +72,94 @@ class WindscribeVPN(Proxy):
query, city = query.split(":", maxsplit=1)
city = city.strip()
# Check server_map for pinned servers (can include city)
safe_username = quote(self.username, safe="")
safe_password = quote(self.password, safe="")
proxy = f"https://{safe_username}:{safe_password}@"
server_map_key = f"{query}:{city}" if city else query
if server_map_key in self.server_map:
hostname = self.server_map[server_map_key]
elif query in self.server_map and not city:
hostname = self.server_map[query]
else:
if re.match(r"^[a-z]+$", query):
hostname = self.get_random_server(query, city)
try:
if server_map_key in self.server_map:
# Use a forced server from server_map if provided
hostname = f"{self.server_map[server_map_key]}.totallyacdn.com"
elif "-" in query and not city:
# Supports server codes like "windscribe:us-central-096"
hostname = f"{query}.totallyacdn.com"
else:
raise ValueError(f"The query provided is unsupported and unrecognized: {query}")
# Query is likely a country code (e.g., "us") or country+city (e.g., "us:seattle") and not in server_map
if re.match(r"^[a-z]+$", query):
hostname = self.get_random_server(query, city)
else:
raise ValueError(f"The query provided is unsupported and unrecognized: {query}")
except ValueError as e:
raise Exception(f"Windscribe Proxy Error: {e}")
if not hostname:
raise Exception(f"Windscribe has no servers for {query!r}")
if not hostname:
return None
return f"{proxy}{hostname}:443"
hostname = hostname.split(':')[0]
return f"https://{self.username}:{self.password}@{hostname}:443"
def get_random_server(self, country_code: str, city: Optional[str] = None) -> Optional[str]:
def get_random_server(self, country_code: str, city: Optional[str]) -> Optional[str]:
"""
Get a random server hostname for a country, optionally filtered by city.
Get a random server hostname for a country.
Args:
country_code: The country code (e.g., "us", "ca")
city: Optional city name to filter by (case-insensitive)
Returns:
A random hostname from matching servers, or None if none available.
The hostname of a server in the specified country (and city if provided).
- If city is provided but not found, falls back to any server in the country.
Raise error if no servers are available for the country.
"""
for location in self.countries:
if location.get("country_code", "").lower() == country_code.lower():
hostnames = []
for group in location.get("groups", []):
# Filter by city if specified
if city:
group_city = group.get("city", "")
if group_city.lower() != city.lower():
continue
# Collect hostnames from this group
for host in group.get("hosts", []):
if hostname := host.get("hostname"):
hostnames.append(hostname)
country_code = country_code.lower()
if hostnames:
return random.choice(hostnames)
elif city:
# No servers found for the specified city
raise ValueError(
f"No servers found in city '{city}' for country code '{country_code}'. "
"Try a different city or check the city name spelling."
)
# Find the country entry
location = next(
(c for c in self.countries if c.get("country_code", "").lower() == country_code),
None,
)
return None
if not location:
raise ValueError(f"No servers found for country code '{country_code}'.")
all_hosts = []
city_hosts = []
for group in location.get("groups", []):
group_city = group.get("city", "").lower()
for host in group.get("hosts", []):
entry = {
"hostname": host["hostname"],
"health": host.get("health", float("inf")),
}
all_hosts.append(entry)
if city and group_city == city.lower():
city_hosts.append(entry)
# Prefer city-specific servers if available and select the healthiest
if city_hosts:
return min(city_hosts, key=lambda x: x["health"])["hostname"]
# Fallback to country-level servers and select the healthiest
if all_hosts:
return min(all_hosts, key=lambda x: x["health"])["hostname"]
# Country exists but has zero servers
raise ValueError(
f"No servers found in city '{city}' for country code '{country_code}'. Try a different city or check the city name spelling."
)
@staticmethod
def get_countries() -> list[dict]:
"""Get a list of available Countries and their metadata."""
res = requests.get(
url="https://assets.windscribe.com/serverlist/firefox/1/1",
url="https://assets.windscribe.com/serverlist/chrome/1/937dd9fcfba6925d7a9253ab34e655a453719e02",
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
"Content-Type": "application/json",
"Host": "assets.windscribe.com",
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
},
)
if not res.ok:

View File

@@ -89,7 +89,9 @@ class Service(metaclass=ABCMeta):
proxy = mapped_proxy_uri
self.log.info(f"Using mapped proxy from {proxy_provider.__class__.__name__}: {proxy}")
else:
self.log.warning(f"Failed to get proxy for mapped value '{mapped_value}', using default")
self.log.warning(
f"Failed to get proxy for mapped value '{mapped_value}', using default"
)
else:
self.log.warning(f"Proxy provider '{proxy_provider_name}' not found, using default proxy")
else:
@@ -140,11 +142,11 @@ class Service(metaclass=ABCMeta):
}
)
# Always verify proxy IP - proxies can change exit nodes
try:
proxy_ip_info = get_ip_info(self.session)
self.current_region = proxy_ip_info.get("country", "").lower() if proxy_ip_info else None
except Exception as e:
self.log.warning(f"Failed to verify proxy IP: {e}")
proxy_ip_info = get_ip_info(self.session)
if proxy_ip_info:
self.current_region = proxy_ip_info.get("country", "").lower()
else:
self.log.warning("Failed to verify proxy IP, falling back to proxy config for region")
# Fallback to extracting region from proxy config
self.current_region = get_region_from_proxy(proxy)
else:

View File

@@ -359,7 +359,13 @@ def get_ip_info(session: Optional[requests.Session] = None) -> dict:
If you provide a Requests Session with a Proxy, that proxies IP information
is what will be returned.
"""
return (session or requests.Session()).get("https://ipinfo.io/json").json()
try:
response = (session or requests.Session()).get("https://ipinfo.io/json", timeout=10)
if response.ok:
return response.json()
except (requests.RequestException, json.JSONDecodeError):
pass
return None
def get_cached_ip_info(session: Optional[requests.Session] = None) -> Optional[dict]: