Skip to content
Snippets Groups Projects
httpfn.py 7.21 KiB
Newer Older
Frede H's avatar
Frede H committed
#!/usr/bin/env python
#
FH's avatar
FH committed
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors.  If not, see <http://www.gnu.org/licenses/>.
#
FH's avatar
FH committed
# Authors: Frede Hundewadt <frede@hundewadt.dk>
FH's avatar
FH committed

"""Manjaro-Mirrors HTTP Functions"""
FH's avatar
FH committed
import collections
FH's avatar
FH committed
import json
import ssl
FH's avatar
FH committed
import time
from pacman_mirrors import __version__
FH's avatar
FH committed
from http.client import HTTPException
import os
FH's avatar
FH committed
from os import system as system_call
FH's avatar
FH committed
from socket import timeout
FH's avatar
FH committed
from urllib.error import URLError
from urllib.request import urlopen
import urllib.request
Frede H's avatar
Frede H committed
from . import configuration as conf
from . import filefn
from . import jsonfn
Frede H's avatar
Frede H committed
from . import miscfn
FH's avatar
FH committed
from . import txt
Frede H's avatar
Frede H committed
def download_mirrors(config):
    """Retrieve mirrors from manjaro.org
Frede H's avatar
Frede H committed
    :param config:
Frede H's avatar
Frede H committed
    :returns: tuple with success for mirrors.json and status.json
    :rtype: tuple
Frede H's avatar
Frede H committed
    fetchmirrors = False
    fetchstatus = False
    try:
Frede H's avatar
Frede H committed
        with urlopen(config["url_mirrors_json"]) as response:
Frede H's avatar
Frede H committed
            mirrorlist = json.loads(response.read().decode("utf8"), object_pairs_hook=collections.OrderedDict)
Frede H's avatar
Frede H committed
        fetchmirrors = True
Frede H's avatar
Frede H committed
        tempfile = config["work_dir"] + "/temp.file"
Frede H's avatar
Frede H committed
        jsonfn.json_dump_file(mirrorlist, tempfile)
Frede H's avatar
Frede H committed
        filecmp.clear_cache()
        if not filecmp.cmp(tempfile, config["mirror_file"]):
            jsonfn.json_dump_file(mirrorlist, config["mirror_file"])
        os.remove(tempfile)
Frede H's avatar
Frede H committed
    except (HTTPException, json.JSONDecodeError, URLError):
        pass
    try:
Frede H's avatar
Frede H committed
        with urlopen(config["url_status_json"]) as response:
Frede H's avatar
Frede H committed
            statuslist = json.loads(
                response.read().decode("utf8"),
                object_pairs_hook=collections.OrderedDict)
Frede H's avatar
Frede H committed
        fetchstatus = True
        jsonfn.write_json_file(statuslist, config["status_file"])
Frede H's avatar
Frede H committed
    except (HTTPException, json.JSONDecodeError, URLError):
        pass
    return fetchmirrors, fetchstatus
def get_geoip_country():
    """Try to get the user country via GeoIP
    :return: country name or nothing
    """
    country_name = None
    try:
        res = urlopen("http://freegeoip.net/json/")
        json_obj = json.loads(res.read().decode("utf8"))
    except (URLError, HTTPException, json.JSONDecodeError):
        pass
    else:
        if "country_name" in json_obj:
            country_name = json_obj["country_name"]
            country_fix = {
                "Brazil": "Brasil",
                "Costa Rica": "Costa_Rica",
                "Czech Republic": "Czech",
                "South Africa": "Africa",
                "United Kingdom": "United_Kingdom",
                "United States": "United_States",
            }
            if country_name in country_fix.keys():
                country_name = country_fix[country_name]
    return country_name
FH's avatar
FH committed

FH's avatar
FH committed

def get_mirror_response(url, maxwait=2, count=1, quiet=False, ssl_verify=True):
    """Query mirrors availability
    :param ssl_verify: 
    :param ssl_verify: 
    :param url:
    :param maxwait:
    :param count:
    :param quiet:
    :returns string with response time
    """
    probe_start = time.time()
    response_time = txt.SERVER_RES
    probe_stop = None
    message = ""
    # context = None
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    headers = {"User-Agent": "pacman-mirrors {}".format(__version__)}
    if not ssl_verify:
        # context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.verify_mode = ssl.CERT_NONE
    req = urllib.request.Request(
        url + "state", headers=headers
    )
Frede H's avatar
Frede H committed
    # noinspection PyBroadException
    try:
        for _ in range(count):
            urlopen(req, timeout=maxwait, context=context)
        probe_stop = time.time()
    except URLError as err:
        if hasattr(err, "reason"):
            message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
                                               err.reason,
                                               url)
        elif hasattr(err, "code"):
            message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
                                               err.reason,
                                               url)
    except timeout:
        message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
                                           txt.TIMEOUT,
                                           url)
    except HTTPException:
        message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
                                           txt.HTTP_EXCEPTION,
                                           url)
Frede H's avatar
Frede H committed
    except:
        message = "\n.: {} {} '{}'".format(txt.ERR_CLR,
                                           ssl.CertificateError,
                                           url)
    if message and not quiet:
        print(message)
    if probe_stop:
        calc = round((probe_stop - probe_start), 3)
        response_time = str(format(calc, ".3f"))
    return response_time
Frede H's avatar
Frede H committed

Frede H's avatar
Frede H committed
def inet_conn_check(maxwait=2):
Frede H's avatar
Frede H committed
    """Check for internet connection"""
Frede H's avatar
Frede H committed
    data = None
Frede H's avatar
Frede H committed
    hosts = conf.INET_CONN_CHECK_URLS
    for host in hosts:
        # noinspection PyBroadException
        try:
            data = urlopen(host, timeout=maxwait)
            break
        except:
            pass
Frede H's avatar
Frede H committed
    return bool(data)
Frede H's avatar
Frede H committed

def ping_host(host, count=1):
    """Check a hosts availability
    :param host:
    :param count:
    :rtype: boolean
    """
Frede H's avatar
Frede H committed
    print(".: {} ping {} x {}".format(txt.INF_CLR, host, count))
    return system_call("ping -c{} {} > /dev/null".format(count, host)) == 0


Frede H's avatar
Frede H committed
def update_mirrors(config):
    """Download updates from repo.manjaro.org
    :param config:
Frede H's avatar
Frede H committed
    :returns: tuple with result for mirrors.json and status.json
    :rtype: tuple
Frede H's avatar
Frede H committed
    """
    # one fallback file in /usr/share/pacman-mirrors should be enough.
    # the mirrors.json in /var/lib/pacman-mirrors is confusing.
    # so remove it
    if filefn.check_file(config["to_be_removed"]):
        os.remove(config["to_be_removed"])
Frede H's avatar
Frede H committed
    result = None
Frede H's avatar
Frede H committed
    connected = inet_conn_check()
Frede H's avatar
Frede H committed
    if connected:
Frede H's avatar
Frede H committed
        print(".: {} {} {}".format(txt.INF_CLR,
                                   txt.DOWNLOADING_MIRROR_FILE,
                                   txt.REPO_SERVER))
Frede H's avatar
Frede H committed
        result = download_mirrors(config)
    else:
        if not filefn.check_file(config["status_file"]):
            print(".: {} {} {} {}".format(txt.WRN_CLR,
                                          txt.MIRROR_FILE,
                                          config["status_file"],
                                          txt.IS_MISSING))
            print(".: {} {} {}".format(txt.WRN_CLR,
                                       txt.FALLING_BACK,
                                       conf.MIRROR_FILE))
Frede H's avatar
Frede H committed
            result = (True, False)
        if not filefn.check_file(config["mirror_file"]):
            print(".: {} {}".format(txt.ERR_CLR, txt.HOUSTON))
Frede H's avatar
Frede H committed
            result = (False, False)
    return result