Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
pacman_mirrors.py 26.52 KiB
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Esclapion <esclapion@manjaro.org>
#          philm <philm@manjaro.org>
#          Ramon Buldó <rbuldo@gmail.com>
#          Hugo Posnic <huluti@manjaro.org>
#          Frede Hundewadt <frede@hundewadt.dk>

"""Pacman-Mirrors Main Module"""

import argparse
import importlib.util
import os
import shutil
import subprocess
import sys
from operator import itemgetter
from random import shuffle

from pacman_mirrors import __version__
from pacman_mirrors.config import configfn
from pacman_mirrors.config import configuration as conf
from pacman_mirrors.constants import colors as color
from pacman_mirrors.constants import txt
from pacman_mirrors.functions import apifn
from pacman_mirrors.functions import filefn
from pacman_mirrors.functions import httpfn
from pacman_mirrors.functions import jsonfn
from pacman_mirrors.functions import miscfn
from pacman_mirrors.functions import validfn
from pacman_mirrors.mirrors.mirror import Mirror
from pacman_mirrors.mirrors import mirrorfn
from pacman_mirrors.translation import i18n
from pacman_mirrors.translation.custom_help_formatter import CustomHelpFormatter

try:
    importlib.util.find_spec("gi.repository.Gtk")
except ImportError:
    GTK_AVAILABLE = False
else:
    GTK_AVAILABLE = True
_ = i18n.language.gettext


class PacmanMirrors:
    """Class PacmanMirrors"""

    def __init__(self):
        """Init"""
        self.config = {
            "config_file": conf.CONFIG_FILE  # purpose - testability
        }
        self.country_list = False
        self.custom = False
        self.default = False
        self.fasttrack = None
        self.geoip = False
        self.interactive = False
        self.max_wait_time = 2
        self.mirrors = Mirror()
        self.network = True
        self.no_mirrorlist = False
        self.no_display = False
        self.quiet = False
        self.selected_countries = []  # users selected countries
        self.sync = True

    def command_line_parse(self):
        """Read the arguments of the command line"""
        parser = argparse.ArgumentParser(formatter_class=CustomHelpFormatter)
        # Method arguments
        methods = parser.add_argument_group("METHODS")
        methods.add_argument("-g", "--generate",
                             action="store_true",
                             help=txt.HLP_ARG_GENERATE)
        methods.add_argument("-f", "--fasttrack",
                             type=int,
                             metavar=txt.NUMBER,
                             help="{} {}".format(txt.HLP_ARG_FASTTRACK,
                                                 txt.OVERRIDE_OPT))
        methods.add_argument("-i", "--interactive",
                             action="store_true",
                             help=txt.HLP_ARG_INTERACTIVE)
        methods.add_argument("-d", "--default",
                             action="store_true",
                             help="Interactive: " + txt.HLP_ARG_DEFAULT)
        methods.add_argument("-m", "--method",
                             type=str,
                             choices=["rank", "random"],
                             help=txt.HLP_ARG_METHOD)
        country = parser.add_argument_group("COUNTRY")
        country.add_argument("-c", "--country",
                             type=str,
                             nargs="+",
                             help=txt.HLP_ARG_COUNTRY)
        country.add_argument("--geoip",
                             action="store_true",
                             help=txt.HLP_ARG_GEOIP)
        country.add_argument("-l", "--country-list", "--list",
                             action="store_true",
                             help=txt.HLP_ARG_LIST)
        # Branch arguments
        branch = parser.add_argument_group("BRANCH")
        only_one = branch.add_mutually_exclusive_group()
        only_one.add_argument("-b", "--branch",
                              type=str,
                              choices=["stable", "testing", "unstable"],
                              help=txt.HLP_ARG_BRANCH)
        only_one.add_argument("-G", "--get-branch",
                              action="store_true",
                              help="API: " + txt.HLP_ARG_API_GET_BRANCH)
        only_one.add_argument("-S", "--set-branch",
                              choices=["stable", "testing", "unstable"],
                              help="API: " + txt.HLP_ARG_API_SET_BRANCH)
        # Api arguments
        api = parser.add_argument_group("API")
        api.add_argument("-a", "--api",
                         action="store_true",
                         help="[-p PREFIX][-R][-S|-G BRANCH][-P PROTO [PROTO ...]]")
        api.add_argument("-p", "--prefix",
                         type=str,
                         help="API: " + txt.HLP_ARG_API_PREFIX + txt.PREFIX_TIP)
        api.add_argument("-P", "--proto", "--protocols",
                         choices=["all", "http", "https", "ftp", "ftps"],
                         type=str,
                         nargs="+",
                         help="API: " + txt.HLP_ARG_API_PROTOCOLS)
        api.add_argument("-R", "--re-branch",
                         action="store_true",
                         help="API: " + txt.HLP_ARG_API_RE_BRANCH)
        api.add_argument("-U", "--url",
                         type=str,
                         help="API: " + txt.HLP_ARG_API_URL)
        # Misc arguments
        misc = parser.add_argument_group("MISC")
        misc.add_argument("-q", "--quiet",
                          action="store_true",
                          help=txt.HLP_ARG_QUIET)
        misc.add_argument("-t", "--timeout",
                          type=int,
                          metavar=txt.SECONDS,
                          help=txt.HLP_ARG_TIMEOUT)
        misc.add_argument("-v", "--version",
                          action="store_true",
                          help=txt.HLP_ARG_VERSION)
        sync = misc.add_mutually_exclusive_group()
        sync.add_argument("-n", "--no-mirrorlist",
                          action="store_true",
                          help=txt.HLP_ARG_NO_MIRRORLIST)
        sync.add_argument("-y", "--sync",
                          action="store_true",
                          help=txt.HLP_ARG_SYNC)

        args = parser.parse_args()
        if len(sys.argv) == 1:
            print("{}pacman-mirrors {}{}".format(color.GREEN, __version__, color.ENDCOLOR))
            parser.print_help()
            sys.exit(0)

        if args.version:
            print("{}pacman-mirrors {}{}".format(color.GREEN, __version__, color.ENDCOLOR))
            sys.exit(0)

        if args.country_list:
            self.country_list = True

        if os.getuid() != 0:
            print(".: {} {}".format(txt.ERR_CLR, txt.MUST_BE_ROOT))
            sys.exit(1)

        if args.method:
            self.config["method"] = args.method

        if args.branch:
            self.config["branch"] = args.branch

        if args.timeout:
            self.max_wait_time = args.timeout

        if args.quiet:
            self.quiet = True

        if args.sync:
            self.sync = True

        if args.interactive:
            self.interactive = True
            if not os.environ.get("DISPLAY") or not GTK_AVAILABLE:
                self.no_display = True

        if args.interactive and args.default:
            self.default = True

        # geoip and country are mutually exclusive
        if args.geoip:
            self.geoip = True
        if args.country and not args.geoip:
            self.custom = True
            if "," in args.country[0]:
                self.config["only_country"] = args.country[0].split(",")
            else:
                self.config["only_country"] = args.country

        if args.fasttrack:
            self.fasttrack = args.fasttrack
            self.geoip = False
            self.custom = False
            self.config["only_country"] = []

        if args.no_mirrorlist:
            self.no_mirrorlist = True

        if args.api:
            getbranch = False
            rebranch = False
            url = args.url
            setbranch = args.set_branch
            setprotocols = bool(args.proto)
            if args.get_branch:
                getbranch = True
            if args.re_branch:
                rebranch = True
            if args.proto:
                if "all" in args.proto:
                    self.config["protocols"] = []
                else:
                    if "," in args.proto:
                        self.config["protocols"] = args.proto.split(",")
                    else:
                        self.config["protocols"] = args.proto

            self.api_config(set_prefix=args.prefix, set_branch=setbranch, re_branch=rebranch,
                            get_branch=getbranch, set_protocols=setprotocols, set_url=url)

    def api_config(self, set_prefix=None, set_branch=None, re_branch=False,
                   get_branch=False, set_protocols=False, set_url=None):
        """Api functions
        :param set_prefix: prefix to the config paths
        :param set_branch: replace branch in pacman-mirrors.conf
        :param re_branch: replace branch in mirrorlist
        :param get_branch: sys.exit with branch
        :param set_protocols: replace protocols in pacman-mirrors.conf
        :param set_url: replace mirror url in mirrorlist
        """
        if set_url is None:
            set_url = ""

        if set_prefix is None:
            set_prefix = ""

        # Order of API tasks does matter
        # First API task
        if get_branch:
            print(self.config["branch"])
            sys.exit(0)
            # sys.exit(self.config["branch"])

        # apply api configuration to internal configuration object
        # Apply prefix if present
        if set_prefix:
            set_prefix = apifn.sanitize_prefix(set_prefix)
            self.config["config_file"] = set_prefix + self.config["config_file"]
            self.config["custom_file"] = set_prefix + self.config["custom_file"]
            self.config["mirror_file"] = set_prefix + self.config["mirror_file"]
            self.config["mirror_list"] = set_prefix + self.config["mirror_list"]
            self.config["status_file"] = set_prefix + self.config["status_file"]
            self.config["work_dir"] = set_prefix + self.config["work_dir"]
            # to be removed long time after 2017-04-18
            self.config["to_be_removed"] = set_prefix + self.config["to_be_removed"]
            # end removal
        # api tasks
        # Second API task: Set branch
        if set_branch:
            # Apply branch to internal config
            self.config["branch"] = set_branch
            # pacman-mirrors.conf could absent so check for it
            if not filefn.check_file(self.config["config_file"]):
                # Copy from host system
                filefn.create_dir(set_prefix + "/etc")
                shutil.copyfile("/etc/pacman-mirrors.conf", self.config["config_file"])
                # Normalize config
                apifn.normalize_config(self.config["config_file"])
            # Write branch to config
            apifn.write_config_branch(self.config["branch"],
                                      self.config["config_file"],
                                      quiet=self.quiet)
        # Third API task: Create a mirror list
        if set_url:
            # mirror list dir could absent so check for it
            filefn.create_dir(set_prefix + "/etc/pacman.d")
            mirror = [
                {
                    "url": apifn.sanitize_url(set_url),
                    "country": "BUILDMIRROR",
                    "protocols": [set_url[:set_url.find(":")]],
                    "resp_time": "00.00"
                }
            ]
            filefn.output_mirror_list(self.config, mirror, quiet=self.quiet)
            sys.exit(0)
        # Fourth API task: Write protocols to config
        if set_protocols:
            apifn.write_protocols(self.config["protocols"],
                                  self.config["config_file"],
                                  quiet=self.quiet)
        # Fifth API taks: Rebranch the mirrorlist
        if re_branch:
            if not set_branch:
                print(".: {} {}".format(txt.ERR_CLR, txt.API_ERROR_BRANCH))
                sys.exit(1)
            apifn.write_mirrorlist_branch(self.config["branch"],
                                          self.config["config_file"],
                                          quiet=self.quiet)

    def build_common_mirror_list(self):
        """Generate common mirrorlist"""
        worklist = mirrorfn.filter_mirror_country(self.mirrors.mirrorlist,
                                                  self.selected_countries)
        if self.config["protocols"]:
            worklist = mirrorfn.filter_mirror_protocols(
                worklist, self.config["protocols"])
        # worklist = self.filter_user_branch(worklist)
        if self.config["method"] == "rank":
            worklist = self.test_mirrors(worklist)
            worklist = sorted(worklist, key=itemgetter("resp_time"))
        else:
            shuffle(worklist)
        if worklist:
            filefn.output_mirror_list(self.config, worklist, quiet=self.quiet)
            if self.custom:
                configfn.modify_config(self.config, custom=self.custom)
            else:
                configfn.modify_config(self.config, custom=self.custom)
        else:
            print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
            print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))

    def build_fasttrack_mirror_list(self, number):
        """Fast-track the mirrorlist by filtering only up2date mirrors on the selected branch"""
        # randomize the load on up2date mirrors
        worklist = self.mirrors.mirrorlist
        shuffle(worklist)
        if self.config["protocols"]:
            worklist = mirrorfn.filter_mirror_protocols(
                worklist, self.config["protocols"])
        # filter not up-to-date mirrors for users selected branch not all branches
        up2date = self.filter_user_branch(worklist)
        worklist = []
        print(".: {}: {} - {}".format(txt.INF_CLR,
                                      txt.QUERY_MIRRORS,
                                      txt.TAKES_TIME))
        counter = 0
        cols, lines = miscfn.terminal_size()
        for mirror in up2date:
            if not self.quiet:
                message = "   ..... {:<15}: {}: {}".format(
                    mirror["country"], mirror["last_sync"], mirror["url"])
                print("{:.{}}".format(message, cols), end="")
                sys.stdout.flush()
            resp_time = httpfn.get_mirror_response(mirror["url"],
                                                   maxwait=self.max_wait_time,
                                                   quiet=self.quiet)
            mirror["resp_time"] = resp_time
            if float(resp_time) > self.max_wait_time:
                if not self.quiet:
                    print("\r")
            else:
                if not self.quiet:
                    print("\r   {:<5}{}{} ".format(color.GREEN, resp_time, color.ENDCOLOR))
                worklist.append(mirror)
                counter += 1
            if counter == number:
                break
        worklist = sorted(worklist, key=itemgetter("resp_time"))
        if worklist:
            filefn.output_mirror_list(self.config, worklist, quiet=self.quiet)
        else:
            print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
            print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))

    def build_interactive_mirror_list(self):
        """Prompt the user to select the mirrors with a gui.
        * Outputs a "custom" mirror file
        * Modify the configuration file to use the "custom" file.
        * Outputs a pacman mirrorlist,
        """
        worklist = mirrorfn.filter_mirror_country(self.mirrors.mirrorlist,
                                                  self.selected_countries)
        if self.config["protocols"]:
            worklist = mirrorfn.filter_mirror_protocols(
                worklist, self.config["protocols"])
        if not self.default:
            # filter not up-to-date mirrors for selected branch
            # worklist = self.filter_user_branch(worklist)
            if self.config["method"] == "rank":
                worklist = self.test_mirrors(worklist)
                worklist = sorted(worklist, key=itemgetter("resp_time"))
            else:
                shuffle(worklist)
        interactive_list = []
        for mirror in worklist:
            for protocol in enumerate(mirror["protocols"]):
                pos = mirror["url"].find(":")
                interactive_list.append({
                    "country": mirror["country"],
                    "resp_time": mirror["resp_time"],
                    "last_sync": mirror["last_sync"],
                    "url": "{}{}".format(protocol[1], mirror["url"][pos:])
                })
        if self.no_display:
            from . import consoleui as ui
        else:
            from . import graphicalui as ui
        interactive = ui.run(interactive_list,
                             self.config["method"] == "random",
                             self.default)
        if interactive.is_done:
            mirror_list = []  # written to mirrorlist
            mirror_file = []  # written to custom-mirror.json
            custom_list = interactive.custom_list
            for item in custom_list:
                ipos = item["url"].find(":")
                iurl = item["url"][ipos:]
                for server in self.mirrors.mirrorlist:
                    spos = server["url"].find(":")
                    surl = server["url"][spos:]
                    if iurl == surl:
                        mirror_file.append({
                            "country": server["country"],
                            "protocols": server["protocols"],
                            "url": server["url"]
                        })
                        server["protocols"] = self.config["protocols"]
                        mirror_list.append(server)
            if self.default and mirror_list:
                if self.config["method"] == "rank":
                    mirror_list = self.test_mirrors(mirror_list)
                    mirror_list = sorted(mirror_list,
                                         key=itemgetter("resp_time"))
                else:
                    shuffle(mirror_list)
            if mirror_file:
                print("\n.: {} {}".format(txt.INF_CLR,
                                          txt.CUSTOM_MIRROR_LIST))
                print("--------------------------")
                # output mirror file
                jsonfn.write_json_file(mirror_file, self.config["custom_file"])
                print(".: {} {}: {}".format(txt.INF_CLR,
                                            txt.CUSTOM_MIRROR_FILE_SAVED,
                                            self.config["custom_file"]))
                # output pacman mirrorlist
                filefn.output_mirror_list(self.config,
                                          mirror_list,
                                          custom=True,
                                          quiet=self.quiet,
                                          interactive=True)
                # always use "Custom" from interactive
                self.config["only_country"] = ["Custom"]
                configfn.modify_config(self.config, custom=True)
                print(".: {} {} {}".format(txt.INF_CLR,
                                           txt.RESET_CUSTOM_CONFIG,
                                           txt.RESET_TIP))
            else:
                print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
                print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))

    def disable_custom_config(self):
        """Perform reset of custom configuration"""
        self.config["only_country"] = []
        self.custom = False

    def filter_user_branch(self, mirrorlist):
        """Filter mirrorlist on users branch and branch sync state"""
        if self.config["branch"] == "stable":
            selected_branch = 0
        elif self.config["branch"] == "testing":
            selected_branch = 1
        else:
            selected_branch = 2
        filtered = []
        for mirror in mirrorlist:
            if mirror["branches"][selected_branch] == 1:
                filtered.append(mirror)
        if len(filtered) > 0:
            return filtered
        return mirrorlist

    def output_country_list(self):
        """List all available countries"""
        print("{}".format("\n".join(self.mirrors.countrylist)))

    def load_all_mirrors(self):
        """Load mirrors"""
        # decision on disable custom config
        if self.config["only_country"] == ["all"]:
            self.disable_custom_config()

        # decision on custom or default
        if self.config["only_country"] == ["Custom"]:
            # check if custom config is valid
            if validfn.custom_config_is_valid():
                self.custom = True
            else:
                self.disable_custom_config()
        else:
            self.selected_countries = self.config["only_country"]

        # decision on custom vs countries from conf or argument
        if self.custom and not self.selected_countries:
            self.load_custom_mirrors()
            self.selected_countries = self.mirrors.countrylist
        else:
            self.load_default_mirrors()
        # validate selection and build country list
        self.selected_countries = mirrorfn.build_country_list(self.selected_countries,
                                                              self.mirrors.countrylist,
                                                              self.geoip)

    def load_custom_mirrors(self):
        """Load available custom mirrors"""
        if self.default:
            self.load_default_mirrors()
        else:
            self.seed_mirrors(self.config["custom_file"])

    def load_default_mirrors(self):
        """Load all available mirrors"""
        (file, status) = filefn.return_mirror_filename(self.config)
        self.seed_mirrors(file, status)

    def sort_mirror_countries(self):
        self.mirrors.mirrorlist = sorted(self.mirrors.mirrorlist,
                                         key=itemgetter("country"))
        self.mirrors.countrylist = sorted(self.mirrors.countrylist)

    def seed_mirrors(self, file, status=False):
        """Seed mirrors"""
        mirrors = filefn.read_mirror_file(file)
        # seed mirror object
        if status:
            self.mirrors.seed(mirrors, status=status)
        else:
            self.mirrors.seed(mirrors)
        # sort mirrors countrywise
        self.sort_mirror_countries()

    def test_mirrors(self, worklist):
        """Query server for response time"""
        if self.custom:
            print(".: {} {}".format(txt.INF_CLR, txt.USING_CUSTOM_FILE))
        else:
            print(".: {} {}".format(txt.INF_CLR, txt.USING_DEFAULT_FILE))
        print(".: {} {} - {}".format(txt.INF_CLR,
                                     txt.QUERY_MIRRORS,
                                     txt.TAKES_TIME))
        cols, lines = miscfn.terminal_size()
        http_wait = self.max_wait_time
        ssl_wait = self.max_wait_time * 2
        ssl_verify = self.config["ssl_verify"]
        for mirror in worklist:
            pos = mirror["url"].find(":")
            url = mirror["url"][pos:]
            for idx, proto in enumerate(mirror["protocols"]):
                mirror["url"] = "{}{}".format(proto, url)
                if not self.quiet:
                    message = "   ..... {:<15}: {}".format(mirror["country"],
                                                           mirror["url"])
                    print("{:.{}}".format(message, cols), end="")
                    sys.stdout.flush()
                # https sometimes takes a short while for handshake
                if proto == "https" or proto == "ftps":
                    self.max_wait_time = ssl_wait
                else:
                    self.max_wait_time = http_wait
                # let's see how responsive you are
                resp_time = httpfn.get_mirror_response(mirror["url"],
                                                       maxwait=self.max_wait_time,
                                                       quiet=self.quiet,
                                                       ssl_verify=ssl_verify)
                mirror["resp_time"] = resp_time
                if float(resp_time) >= self.max_wait_time:
                    if not self.quiet:
                        print("\r")
                else:
                    if not self.quiet:
                        print("\r   {:<5}{}{} ".format(color.GREEN, resp_time, color.ENDCOLOR))
        return worklist

    def run(self):
        """Run"""
        (self.config, self.custom) = configfn.build_config()  # build config by parsing p-m.conf
        filefn.create_dir(self.config["work_dir"])  # ensure /var/lib/pacman-mirrors exist
        self.command_line_parse()  # parse command line
        self.network = httpfn.inet_conn_check()  # net check
        if self.network:  # update data files
            httpfn.update_mirrors(self.config, quiet=self.quiet)
        if self.no_mirrorlist:
            sys.exit(0)  # exit
        if not self.network:
            if not self.quiet:
                miscfn.internet_message()  # console message
            self.config["method"] = "random"  # use random instead of rank
            self.fasttrack = False  # using fasttrack is not possible
        self.load_all_mirrors()
        if self.country_list:
            self.output_country_list()  # print country list
            sys.exit(0)
        if self.fasttrack:
            self.build_fasttrack_mirror_list(self.fasttrack)  # fasttrack argument
        elif self.interactive:
            self.build_interactive_mirror_list()  # interactive argument
        else:
            self.build_common_mirror_list()  # default
        if self.network and self.sync:
            subprocess.call(["pacman", "-Syy"])  # sync pacman db


if __name__ == "__main__":
    app = PacmanMirrors()
    app.run()