Newer
Older
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
from pacman_mirrors import __version__
from pacman_mirrors.config import configuration as conf
from pacman_mirrors.functions import fileFn
from pacman_mirrors.functions import jsonFn
headers = {"User-Agent": "{}{}".format(conf.USER_AGENT, __version__)}
def download_mirrors(config: object) -> tuple:
:returns: tuple with bool for mirrors.json and status.json
# mirrors.json
req = urllib.request.Request(url=config["url_mirrors_json"],
headers=headers)
with urllib.request.urlopen(req) as response:
mirrorlist = json.loads(response.read().decode("utf8"),
object_pairs_hook=collections.OrderedDict)
tempfile = config["work_dir"] + "/.temp.file"
if fileFn.check_existance_of(conf.USR_DIR, folder=True):
if not fileFn.check_existance_of(config["mirror_file"]):
jsonFn.json_dump_file(mirrorlist, config["mirror_file"])
elif not filecmp.cmp(tempfile, config["mirror_file"]):
jsonFn.json_dump_file(mirrorlist, config["mirror_file"])
except (HTTPException, json.JSONDecodeError, URLError):
pass
try:
# status.json
req = urllib.request.Request(url=config["url_status_json"],
headers=headers)
with urllib.request.urlopen(req) as response:
statuslist = json.loads(
response.read().decode("utf8"),
object_pairs_hook=collections.OrderedDict)
jsonFn.write_json_file(statuslist, config["status_file"])
"""Try to get the user country via GeoIP
:return: country name or nothing
"""
req = urllib.request.Request(url="https://get.geojs.io/v1/ip/geo.json")
json_obj = json.loads(res.read().decode("utf8"))
for country in timezones.countries:
if tz in country["timezones"]:
country_name = country["name"]
country_fix = {
"Brazil": "Brasil",
"Costa Rica": "Costa_Rica",
"Czech Republic": "Czech",
"South Africa": "Africa",
"United Kingdom": "United_Kingdom",
"United States": "United_States",
}
if country_name in country_fix.keys():
country_name = country_fix[country_name]
except (URLError, HTTPException, json.JSONDecodeError):
pass
def get_mirror_response(url: str, config: object, tty: bool = False, maxwait: int = 2,
count: int = 1, quiet: bool = False, ssl_verify: bool = True) -> float:
:param url:
:param maxwait:
:param count:
:param quiet:
:returns string with response time
"""
probe_start = time.time()
response_time = txt.SERVER_RES
probe_stop = None
message = ""
arch = "x86_64"
if config["x32"]:
arch = "i686"
probe_url = "{}{}/core/{}/{}".format(url, config["branch"], arch, config["test_file"])
# context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
req = urllib.request.Request(url=probe_url, headers=headers)
response = urllib.request.urlopen(req, timeout=maxwait, context=context)
_ = response.read()
probe_stop = time.time()
except URLError as err:
if hasattr(err, "reason"):
message = f"{ssl.CertificateError} '{url}'"
util.msg(message=message, urgency=txt.ERR_CLR, tty=tty, newline=True)
# calc = round((probe_stop - probe_start), 3)
response_time = round((probe_stop - probe_start), 3)
def inet_conn_check(tty: bool = False, maxwait: int = 2) -> bool:
"""Check for internet connection
:param maxwait:
:param tty:
"""
hosts = conf.INET_CONN_CHECK_URLS
for host in hosts:
# noinspection PyBroadException
try:
resp = urllib.request.urlopen(host, timeout=maxwait)
util.msg(f"{host} '{e}'", urgency=txt.WRN_CLR, tty=tty)
def ping_host(host: str, tty: bool=False, count=1) -> bool:
"""Check a hosts availability
:param host:
:param count:
util.msg(f"ping {host} x {count}", urgency=txt.INF_CLR, tty=tty)
return system_call("ping -c{} {} > /dev/null".format(count, host)) == 0
def update_mirror_pool(config: object, tty: bool = False, quiet: bool = False) -> tuple:
"""Download updates from repo.manjaro.org
:param config:
:param tty:
:returns: tuple with True/False for mirrors.json and status.json
util.msg(message=f"{txt.DOWNLOADING_MIRROR_FILE} {txt.REPO_SERVER}",
urgency=txt.INF_CLR,
tty=tty)
if not fileFn.check_existance_of(config["status_file"]):
util.msg(message="{} {} {)".format(txt.MIRROR_FILE,
config["status_file"],
txt.IS_MISSING),
urgency=txt.WRN_CLR,
tty=tty)
util.msg(message=f"{txt.FALLING_BACK} {conf.MIRROR_FILE}",
urgency=txt.WRN_CLR,
tty=tty)
if not fileFn.check_existance_of(config["mirror_file"]):
util.msg(message=f"{txt.HOUSTON}",
urgency=txt.HOUSTON,
tty=tty)