Newer
Older
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
from pacman_mirrors import __version__
from pacman_mirrors.config import configuration as conf
from pacman_mirrors.constants import txt
from pacman_mirrors.functions import fileFn
from pacman_mirrors.functions import jsonFn
headers = {"User-Agent": "{}{}".format(conf.USER_AGENT, __version__)}
:returns: tuple with bool for mirrors.json and status.json
# mirrors.json
req = urllib.request.Request(url=config["url_mirrors_json"],
headers=headers)
with urllib.request.urlopen(req) as response:
mirrorlist = json.loads(response.read().decode("utf8"),
object_pairs_hook=collections.OrderedDict)
tempfile = config["work_dir"] + "/.temp.file"
if fileFn.check_existance_of(conf.USR_DIR, folder=True):
if not fileFn.check_existance_of(config["mirror_file"]):
jsonFn.json_dump_file(mirrorlist, config["mirror_file"])
elif not filecmp.cmp(tempfile, config["mirror_file"]):
jsonFn.json_dump_file(mirrorlist, config["mirror_file"])
except (HTTPException, json.JSONDecodeError, URLError):
pass
try:
# status.json
req = urllib.request.Request(url=config["url_status_json"],
headers=headers)
with urllib.request.urlopen(req) as response:
statuslist = json.loads(
response.read().decode("utf8"),
object_pairs_hook=collections.OrderedDict)
jsonFn.write_json_file(statuslist, config["status_file"])
def get_geoip_country():
"""Try to get the user country via GeoIP
:return: country name or nothing
"""
country_name = None
try:
req = urllib.request.Request(url="http://freegeoip.net/json/",
headers=headers)
res = urllib.request.urlopen(req)
json_obj = json.loads(res.read().decode("utf8"))
except (URLError, HTTPException, json.JSONDecodeError):
pass
else:
if "country_name" in json_obj:
country_name = json_obj["country_name"]
country_fix = {
"Brazil": "Brasil",
"Costa Rica": "Costa_Rica",
"Czech Republic": "Czech",
"South Africa": "Africa",
"United Kingdom": "United_Kingdom",
"United States": "United_States",
}
if country_name in country_fix.keys():
def get_mirror_response(url, config, maxwait=2, count=1, quiet=False, ssl_verify=True):
:param url:
:param maxwait:
:param count:
:param quiet:
:returns string with response time
"""
probe_start = time.time()
response_time = txt.SERVER_RES
probe_stop = None
message = ""
arch = "x86_64"
if config["x32"]:
arch = "i686"
probe_url = "{}{}/core/{}/{}".format(url, config["branch"], arch, conf.TEST_FILE)
# context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
req = urllib.request.Request(url=probe_url, headers=headers)
response = urllib.request.urlopen(req, timeout=maxwait, context=context)
_ = response.read()
probe_stop = time.time()
except URLError as err:
if hasattr(err, "reason"):
message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
err.reason,
url)
message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
err.reason,
url)
message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
txt.TIMEOUT,
url)
message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
txt.HTTP_EXCEPTION,
url)
message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
ssl.CertificateError,
url)
except Exception as e:
message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
e,
url)
if message and not quiet:
print(message)
if probe_stop:
calc = round((probe_stop - probe_start), 3)
response_time = str(format(calc, ".3f"))
return response_time
hosts = conf.INET_CONN_CHECK_URLS
for host in hosts:
# noinspection PyBroadException
try:
resp = urllib.request.urlopen(host, timeout=maxwait)
except Exception as e:
print(".: {} {} '{}'".format(txt.WRN_CLR, host, e))
def ping_host(host, count=1):
"""Check a hosts availability
:param host:
:param count:
:rtype: boolean
"""
print(".: {} ping {} x {}".format(txt.INF_CLR, host, count))
return system_call("ping -c{} {} > /dev/null".format(count, host)) == 0
"""Download updates from repo.manjaro.org
:param config:
:returns: tuple with result for mirrors.json and status.json
:rtype: tuple
if not quiet:
print(".: {} {} {}".format(txt.INF_CLR,
txt.DOWNLOADING_MIRROR_FILE,
txt.REPO_SERVER))
if not fileFn.check_existance_of(config["status_file"]):
if not quiet:
print(".: {} {} {} {}".format(txt.WRN_CLR,
txt.MIRROR_FILE,
config["status_file"],
txt.IS_MISSING))
print(".: {} {} {}".format(txt.WRN_CLR,
txt.FALLING_BACK,
conf.MIRROR_FILE))
if not fileFn.check_existance_of(config["mirror_file"]):
if not quiet:
print(".: {} {}".format(txt.ERR_CLR, txt.HOUSTON))