#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Esclapion <esclapion@manjaro.org>
#          philm <philm@manjaro.org>
#          Ramon Buldó <rbuldo@gmail.com>
#          Hugo Posnic <huluti@manjaro.org>
#          Frede Hundewadt <frede@hundewadt.dk>

"""Pacman-Mirrors Main Module"""

import argparse
import importlib.util
import os
import shutil
import subprocess
import sys
from operator import itemgetter
from random import shuffle

from pacman_mirrors import __version__
from pacman_mirrors.config import configfn
from pacman_mirrors.config import configuration as conf
from pacman_mirrors.constants import colors as color
from pacman_mirrors.constants import txt
from pacman_mirrors.functions import apifn
from pacman_mirrors.functions import filefn
from pacman_mirrors.functions import httpfn
from pacman_mirrors.functions import jsonfn
from pacman_mirrors.functions import miscfn
from pacman_mirrors.functions import validfn
from pacman_mirrors.mirrors.mirror import Mirror
from pacman_mirrors.mirrors import mirrorfn
from pacman_mirrors.translation import i18n
from pacman_mirrors.translation.custom_help_formatter import CustomHelpFormatter

try:
    # find_spec() signals "module not found" by returning None; ImportError is
    # only raised when a parent package cannot be imported.  Either outcome
    # means the GTK frontend cannot be used.
    GTK_AVAILABLE = importlib.util.find_spec("gi.repository.Gtk") is not None
except ImportError:
    GTK_AVAILABLE = False

_ = i18n.language.gettext


class PacmanMirrors:
    """Command line application state and workflow of pacman-mirrors.

    The instance collects the options given on the command line, loads the
    mirror pool and writes a mirrorlist using one of three strategies
    (fasttrack, interactive or common).  ``run`` is the entry point.
    """

    def __init__(self):
        """Initialise every option with its default value."""
        self.config = {
            "config_file": conf.CONFIG_FILE  # purpose - testability
        }
        self.country_list = False
        self.custom = False
        self.default = False
        self.fasttrack = None
        self.geoip = False
        self.interactive = False
        self.max_wait_time = 2
        self.mirrors = Mirror()
        self.network = True
        self.no_mirrorlist = False
        self.no_display = False
        self.quiet = False
        self.selected_countries = []  # users selected countries
        # NOTE(review): sync defaults to True and "-y/--sync" can only set it
        # to True again, so the flag currently has no effect - confirm intent.
        self.sync = True

    def command_line_parse(self):
        """Read the arguments of the command line and apply them.

        Exits with status 0 after printing the help (no arguments given) or
        the version, and with status 1 when not executed as root.  When
        ``--api`` is given the collected API options are handed to
        ``api_config``.
        """
        parser = argparse.ArgumentParser(formatter_class=CustomHelpFormatter)
        # Method arguments
        methods = parser.add_argument_group("METHODS")
        methods.add_argument("-g", "--generate",
                             action="store_true",
                             help=txt.HLP_ARG_GENERATE)
        methods.add_argument("-f", "--fasttrack",
                             type=int,
                             metavar=txt.NUMBER,
                             help="{} {}".format(txt.HLP_ARG_FASTTRACK,
                                                 txt.OVERRIDE_OPT))
        methods.add_argument("-i", "--interactive",
                             action="store_true",
                             help=txt.HLP_ARG_INTERACTIVE)
        methods.add_argument("-d", "--default",
                             action="store_true",
                             help="Interactive: " + txt.HLP_ARG_DEFAULT)
        methods.add_argument("-m", "--method",
                             type=str,
                             choices=["rank", "random"],
                             help=txt.HLP_ARG_METHOD)
        # Country arguments
        country = parser.add_argument_group("COUNTRY")
        country.add_argument("-c", "--country",
                             type=str,
                             nargs="+",
                             help=txt.HLP_ARG_COUNTRY)
        country.add_argument("--geoip",
                             action="store_true",
                             help=txt.HLP_ARG_GEOIP)
        country.add_argument("-l", "--country-list", "--list",
                             action="store_true",
                             help=txt.HLP_ARG_LIST)
        # Branch arguments
        branch = parser.add_argument_group("BRANCH")
        only_one = branch.add_mutually_exclusive_group()
        only_one.add_argument("-b", "--branch",
                              type=str,
                              choices=["stable", "testing", "unstable"],
                              help=txt.HLP_ARG_BRANCH)
        only_one.add_argument("-G", "--get-branch",
                              action="store_true",
                              help="API: " + txt.HLP_ARG_API_GET_BRANCH)
        only_one.add_argument("-S", "--set-branch",
                              choices=["stable", "testing", "unstable"],
                              help="API: " + txt.HLP_ARG_API_SET_BRANCH)
        # Api arguments
        api = parser.add_argument_group("API")
        api.add_argument("-a", "--api",
                         action="store_true",
                         help="[-p PREFIX][-R][-S|-G BRANCH][-P PROTO [PROTO ...]]")
        api.add_argument("-p", "--prefix",
                         type=str,
                         help="API: " + txt.HLP_ARG_API_PREFIX + txt.PREFIX_TIP)
        api.add_argument("-P", "--proto", "--protocols",
                         choices=["all", "http", "https", "ftp", "ftps"],
                         type=str,
                         nargs="+",
                         help="API: " + txt.HLP_ARG_API_PROTOCOLS)
        api.add_argument("-R", "--re-branch",
                         action="store_true",
                         help="API: " + txt.HLP_ARG_API_RE_BRANCH)
        api.add_argument("-U", "--url",
                         type=str,
                         help="API: " + txt.HLP_ARG_API_URL)
        # Misc arguments
        misc = parser.add_argument_group("MISC")
        misc.add_argument("-q", "--quiet",
                          action="store_true",
                          help=txt.HLP_ARG_QUIET)
        misc.add_argument("-t", "--timeout",
                          type=int,
                          metavar=txt.SECONDS,
                          help=txt.HLP_ARG_TIMEOUT)
        misc.add_argument("-v", "--version",
                          action="store_true",
                          help=txt.HLP_ARG_VERSION)
        sync = misc.add_mutually_exclusive_group()
        sync.add_argument("-n", "--no-mirrorlist",
                          action="store_true",
                          help=txt.HLP_ARG_NO_MIRRORLIST)
        sync.add_argument("-y", "--sync",
                          action="store_true",
                          help=txt.HLP_ARG_SYNC)

        args = parser.parse_args()

        banner = "{}pacman-mirrors {}{}".format(color.GREEN,
                                                __version__,
                                                color.ENDCOLOR)
        if len(sys.argv) == 1:
            print(banner)
            parser.print_help()
            sys.exit(0)

        if args.version:
            print(banner)
            sys.exit(0)

        if args.country_list:
            self.country_list = True

        # everything below this point requires root
        if os.getuid() != 0:
            print(".: {} {}".format(txt.ERR_CLR, txt.MUST_BE_ROOT))
            sys.exit(1)

        if args.method:
            self.config["method"] = args.method

        if args.branch:
            self.config["branch"] = args.branch

        if args.timeout:
            self.max_wait_time = args.timeout

        if args.quiet:
            self.quiet = True

        if args.sync:
            self.sync = True

        if args.interactive:
            self.interactive = True
            # fall back to the console ui without a display or without Gtk
            if not os.environ.get("DISPLAY") or not GTK_AVAILABLE:
                self.no_display = True

        if args.interactive and args.default:
            self.default = True

        # geoip and country are mutually exclusive
        if args.geoip:
            self.geoip = True

        if args.country and not args.geoip:
            self.custom = True
            # accept "-c A,B" as well as "-c A B"
            if "," in args.country[0]:
                self.config["only_country"] = args.country[0].split(",")
            else:
                self.config["only_country"] = args.country

        # fasttrack overrides any country selection
        if args.fasttrack:
            self.fasttrack = args.fasttrack
            self.geoip = False
            self.custom = False
            self.config["only_country"] = []

        if args.no_mirrorlist:
            self.no_mirrorlist = True

        if args.api:
            if args.proto:
                # args.proto is a list (nargs="+") whose items are limited by
                # ``choices``; "all" means no protocol restriction.
                if "all" in args.proto:
                    self.config["protocols"] = []
                else:
                    self.config["protocols"] = args.proto

            self.api_config(set_prefix=args.prefix,
                            set_branch=args.set_branch,
                            re_branch=bool(args.re_branch),
                            get_branch=bool(args.get_branch),
                            set_protocols=bool(args.proto),
                            set_url=args.url)

    def api_config(self, set_prefix=None, set_branch=None, re_branch=False,
                   get_branch=False, set_protocols=False, set_url=None):
        """Api functions

        The tasks are executed in a fixed order; ``get_branch`` and
        ``set_url`` terminate the program with status 0 when they are done.

        :param set_prefix: prefix to the config paths
        :param set_branch: replace branch in pacman-mirrors.conf
        :param re_branch: replace branch in mirrorlist
        :param get_branch: sys.exit with branch
        :param set_protocols: replace protocols in pacman-mirrors.conf
        :param set_url: replace mirror url in mirrorlist
        """
        if set_url is None:
            set_url = ""

        if set_prefix is None:
            set_prefix = ""

        # Order of API tasks does matter
        # First API task: print the branch and leave
        if get_branch:
            print(self.config["branch"])
            sys.exit(0)

        # apply api configuration to internal configuration object
        # Apply prefix if present
        if set_prefix:
            set_prefix = apifn.sanitize_prefix(set_prefix)
            prefixed_keys = (
                "config_file",
                "custom_file",
                "mirror_file",
                "mirror_list",
                "status_file",
                "work_dir",
                # to be removed long time after 2017-04-18
                "to_be_removed",
            )
            for key in prefixed_keys:
                self.config[key] = set_prefix + self.config[key]

        # api tasks
        # Second API task: Set branch
        if set_branch:
            # Apply branch to internal config
            self.config["branch"] = set_branch
            # pacman-mirrors.conf could be absent so check for it
            if not filefn.check_file(self.config["config_file"]):
                # Copy from host system
                filefn.create_dir(set_prefix + "/etc")
                shutil.copyfile("/etc/pacman-mirrors.conf",
                                self.config["config_file"])
                # Normalize config
                apifn.normalize_config(self.config["config_file"])
            # Write branch to config
            apifn.write_config_branch(self.config["branch"],
                                      self.config["config_file"],
                                      quiet=self.quiet)

        # Third API task: Create a mirror list
        if set_url:
            # mirror list dir could be absent so check for it
            filefn.create_dir(set_prefix + "/etc/pacman.d")
            mirror = [
                {
                    "url": apifn.sanitize_url(set_url),
                    "country": "BUILDMIRROR",
                    # the scheme is everything in front of the first ":"
                    "protocols": [set_url[:set_url.find(":")]],
                    "resp_time": "00.00"
                }
            ]
            filefn.output_mirror_list(self.config, mirror, quiet=self.quiet)
            sys.exit(0)

        # Fourth API task: Write protocols to config
        if set_protocols:
            apifn.write_protocols(self.config["protocols"],
                                  self.config["config_file"],
                                  quiet=self.quiet)

        # Fifth API task: Rebranch the mirrorlist
        if re_branch:
            # rebranching needs the branch given by set_branch
            if not set_branch:
                print(".: {} {}".format(txt.ERR_CLR, txt.API_ERROR_BRANCH))
                sys.exit(1)
            apifn.write_mirrorlist_branch(self.config["branch"],
                                          self.config["config_file"],
                                          quiet=self.quiet)

    def build_common_mirror_list(self):
        """Generate common mirrorlist

        Filters the mirror pool on the selected countries and configured
        protocols, orders it by response time ("rank") or at random, then
        writes the mirrorlist and updates the configuration.
        """
        worklist = mirrorfn.filter_mirror_country(self.mirrors.mirrorlist,
                                                  self.selected_countries)
        if self.config["protocols"]:
            worklist = mirrorfn.filter_mirror_protocols(
                worklist, self.config["protocols"])

        # worklist = self.filter_user_branch(worklist)

        if self.config["method"] == "rank":
            worklist = self.test_mirrors(worklist)
            worklist = sorted(worklist, key=itemgetter("resp_time"))
        else:
            shuffle(worklist)

        if worklist:
            filefn.output_mirror_list(self.config, worklist, quiet=self.quiet)
            configfn.modify_config(self.config, custom=self.custom)
        else:
            print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
            print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))

    def build_fasttrack_mirror_list(self, number):
        """Fast-track the mirrorlist by filtering only up2date mirrors
        on the selected branch

        Mirrors are probed in random order until ``number`` of them answered
        within ``self.max_wait_time``; the responsive ones are written to the
        mirrorlist sorted by response time.

        :param number: how many responsive mirrors to collect
        """
        # randomize the load on up2date mirrors
        worklist = self.mirrors.mirrorlist
        shuffle(worklist)
        if self.config["protocols"]:
            worklist = mirrorfn.filter_mirror_protocols(
                worklist, self.config["protocols"])

        # filter not up-to-date mirrors for users selected branch
        # not all branches
        up2date = self.filter_user_branch(worklist)
        worklist = []
        print(".: {}: {} - {}".format(txt.INF_CLR,
                                      txt.QUERY_MIRRORS,
                                      txt.TAKES_TIME))
        counter = 0
        cols, _ = miscfn.terminal_size()
        for mirror in up2date:
            if not self.quiet:
                message = " ..... {:<15}: {}: {}".format(
                    mirror["country"], mirror["last_sync"], mirror["url"])
                # truncate the progress line to the terminal width
                print("{:.{}}".format(message, cols), end="")
                sys.stdout.flush()
            resp_time = httpfn.get_mirror_response(mirror["url"],
                                                   maxwait=self.max_wait_time,
                                                   quiet=self.quiet)
            mirror["resp_time"] = resp_time
            if float(resp_time) > self.max_wait_time:
                # too slow - skip this mirror
                if not self.quiet:
                    print("\r")
            else:
                if not self.quiet:
                    print("\r {:<5}{}{} ".format(color.GREEN,
                                                 resp_time,
                                                 color.ENDCOLOR))
                worklist.append(mirror)
                counter += 1
            if counter == number:
                break

        worklist = sorted(worklist, key=itemgetter("resp_time"))
        if worklist:
            filefn.output_mirror_list(self.config, worklist, quiet=self.quiet)
        else:
            print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
            print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))

    def build_interactive_mirror_list(self):
        """Prompt the user to select the mirrors with a gui.

        * Outputs a "custom" mirror file
        * Modify the configuration file to use the "custom" file.
        * Outputs a pacman mirrorlist,
        """
        worklist = mirrorfn.filter_mirror_country(self.mirrors.mirrorlist,
                                                  self.selected_countries)
        if self.config["protocols"]:
            worklist = mirrorfn.filter_mirror_protocols(
                worklist, self.config["protocols"])

        if not self.default:
            # filter not up-to-date mirrors for selected branch
            # worklist = self.filter_user_branch(worklist)
            if self.config["method"] == "rank":
                worklist = self.test_mirrors(worklist)
                worklist = sorted(worklist, key=itemgetter("resp_time"))
            else:
                shuffle(worklist)

        # one selectable entry per protocol a mirror offers
        interactive_list = []
        for mirror in worklist:
            pos = mirror["url"].find(":")
            for protocol in mirror["protocols"]:
                interactive_list.append({
                    "country": mirror["country"],
                    "resp_time": mirror["resp_time"],
                    "last_sync": mirror["last_sync"],
                    "url": "{}{}".format(protocol, mirror["url"][pos:])
                })

        if self.no_display:
            from . import consoleui as ui
        else:
            from . import graphicalui as ui
        interactive = ui.run(interactive_list,
                             self.config["method"] == "random",
                             self.default)

        if interactive.is_done:
            mirror_list = []  # written to mirrorlist
            mirror_file = []  # written to custom-mirror.json
            custom_list = interactive.custom_list
            for item in custom_list:
                # compare the urls without their scheme
                ipos = item["url"].find(":")
                iurl = item["url"][ipos:]
                for server in self.mirrors.mirrorlist:
                    spos = server["url"].find(":")
                    surl = server["url"][spos:]
                    if iurl == surl:
                        mirror_file.append({
                            "country": server["country"],
                            "protocols": server["protocols"],
                            "url": server["url"]
                        })
                        server["protocols"] = self.config["protocols"]
                        mirror_list.append(server)

            # with --default the ordering happens after the selection
            if self.default and mirror_list:
                if self.config["method"] == "rank":
                    mirror_list = self.test_mirrors(mirror_list)
                    mirror_list = sorted(mirror_list,
                                         key=itemgetter("resp_time"))
                else:
                    shuffle(mirror_list)

            if mirror_file:
                print("\n.: {} {}".format(txt.INF_CLR,
                                          txt.CUSTOM_MIRROR_LIST))
                print("--------------------------")
                # output mirror file
                jsonfn.write_json_file(mirror_file,
                                       self.config["custom_file"])
                print(".: {} {}: {}".format(txt.INF_CLR,
                                            txt.CUSTOM_MIRROR_FILE_SAVED,
                                            self.config["custom_file"]))
                # output pacman mirrorlist
                filefn.output_mirror_list(self.config,
                                          mirror_list,
                                          custom=True,
                                          quiet=self.quiet,
                                          interactive=True)
                # always use "Custom" from interactive
                self.config["only_country"] = ["Custom"]
                configfn.modify_config(self.config, custom=True)
                print(".: {} {} {}".format(txt.INF_CLR,
                                           txt.RESET_CUSTOM_CONFIG,
                                           txt.RESET_TIP))
            else:
                print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
                print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))

    def disable_custom_config(self):
        """Perform reset of custom configuration"""
        self.config["only_country"] = []
        self.custom = False

    def filter_user_branch(self, mirrorlist):
        """Filter mirrorlist on users branch and branch sync state

        :param mirrorlist: mirrors carrying a "branches" sequence indexed
            0 = stable, 1 = testing, 2 = unstable
        :return: the mirrors flagged with 1 for the configured branch, or the
            unfiltered ``mirrorlist`` when no mirror qualifies
        """
        if self.config["branch"] == "stable":
            selected_branch = 0
        elif self.config["branch"] == "testing":
            selected_branch = 1
        else:
            selected_branch = 2

        filtered = [mirror for mirror in mirrorlist
                    if mirror["branches"][selected_branch] == 1]
        # never hand back an empty list - fall back to all mirrors
        if filtered:
            return filtered
        return mirrorlist

    def output_country_list(self):
        """List all available countries"""
        print("{}".format("\n".join(self.mirrors.countrylist)))

    def load_all_mirrors(self):
        """Load mirrors

        Decides between the custom mirror file and the default mirror pool
        based on ``only_country`` and then validates the selected countries.
        """
        # decision on disable custom config
        if self.config["only_country"] == ["all"]:
            self.disable_custom_config()

        # decision on custom or default
        if self.config["only_country"] == ["Custom"]:
            # check if custom config is valid
            if validfn.custom_config_is_valid():
                self.custom = True
            else:
                self.disable_custom_config()
        else:
            self.selected_countries = self.config["only_country"]

        # decision on custom vs countries from conf or argument
        if self.custom and not self.selected_countries:
            self.load_custom_mirrors()
            self.selected_countries = self.mirrors.countrylist
        else:
            self.load_default_mirrors()

        # validate selection and build country list
        self.selected_countries = mirrorfn.build_country_list(
            self.selected_countries, self.mirrors.countrylist, self.geoip)

    def load_custom_mirrors(self):
        """Load available custom mirrors

        With ``self.default`` set the default mirror pool is loaded instead.
        """
        if self.default:
            self.load_default_mirrors()
        else:
            self.seed_mirrors(self.config["custom_file"])

    def load_default_mirrors(self):
        """Load all available mirrors"""
        (file, status) = filefn.return_mirror_filename(self.config)
        self.seed_mirrors(file, status)

    def sort_mirror_countries(self):
        """Sort the mirror list by country and the country list by name"""
        self.mirrors.mirrorlist = sorted(self.mirrors.mirrorlist,
                                         key=itemgetter("country"))
        self.mirrors.countrylist = sorted(self.mirrors.countrylist)

    def seed_mirrors(self, file, status=False):
        """Seed mirrors

        :param file: mirror file to read
        :param status: passed on to ``Mirror.seed`` only when truthy
        """
        mirrors = filefn.read_mirror_file(file)
        # seed mirror object
        if status:
            self.mirrors.seed(mirrors, status=status)
        else:
            self.mirrors.seed(mirrors)
        # sort mirrors countrywise
        self.sort_mirror_countries()

    def test_mirrors(self, worklist):
        """Query server for response time

        Every protocol of every mirror is probed.  A mirror's "url" and
        "resp_time" are overwritten for each protocol, so an entry ends up
        holding the values of the last protocol tested.

        :param worklist: mirrors to probe (modified in place)
        :return: the same ``worklist``
        """
        if self.custom:
            print(".: {} {}".format(txt.INF_CLR, txt.USING_CUSTOM_FILE))
        else:
            print(".: {} {}".format(txt.INF_CLR, txt.USING_DEFAULT_FILE))
        print(".: {} {} - {}".format(txt.INF_CLR,
                                     txt.QUERY_MIRRORS,
                                     txt.TAKES_TIME))
        cols, _ = miscfn.terminal_size()
        http_wait = self.max_wait_time
        ssl_wait = self.max_wait_time * 2
        ssl_verify = self.config["ssl_verify"]
        for mirror in worklist:
            pos = mirror["url"].find(":")
            url = mirror["url"][pos:]
            for proto in mirror["protocols"]:
                mirror["url"] = "{}{}".format(proto, url)
                if not self.quiet:
                    message = " ..... {:<15}: {}".format(mirror["country"],
                                                         mirror["url"])
                    # truncate the progress line to the terminal width
                    print("{:.{}}".format(message, cols), end="")
                    sys.stdout.flush()
                # https sometimes takes a short while for handshake;
                # keep the timeout local so self.max_wait_time is untouched
                max_wait = ssl_wait if proto in ("https", "ftps") else http_wait
                # let's see how responsive you are
                resp_time = httpfn.get_mirror_response(mirror["url"],
                                                       maxwait=max_wait,
                                                       quiet=self.quiet,
                                                       ssl_verify=ssl_verify)
                mirror["resp_time"] = resp_time
                if float(resp_time) >= max_wait:
                    if not self.quiet:
                        print("\r")
                else:
                    if not self.quiet:
                        print("\r {:<5}{}{} ".format(color.GREEN,
                                                     resp_time,
                                                     color.ENDCOLOR))
        return worklist

    def run(self):
        """Run

        Builds the configuration, parses the command line, refreshes the
        mirror data when online, writes the mirrorlist with the requested
        strategy and finally syncs the pacman database.
        """
        # build config by parsing p-m.conf
        (self.config, self.custom) = configfn.build_config()
        # ensure /var/lib/pacman-mirrors exist
        filefn.create_dir(self.config["work_dir"])
        self.command_line_parse()
        self.network = httpfn.inet_conn_check()

        if self.network:
            # update data files
            httpfn.update_mirrors(self.config, quiet=self.quiet)

        if self.no_mirrorlist:
            sys.exit(0)

        if not self.network:
            if not self.quiet:
                miscfn.internet_message()
            self.config["method"] = "random"  # use random instead of rank
            self.fasttrack = False  # using fasttrack is not possible

        self.load_all_mirrors()

        if self.country_list:
            self.output_country_list()
            sys.exit(0)

        if self.fasttrack:
            self.build_fasttrack_mirror_list(self.fasttrack)
        elif self.interactive:
            self.build_interactive_mirror_list()
        else:
            self.build_common_mirror_list()

        if self.network and self.sync:
            subprocess.call(["pacman", "-Syy"])  # sync pacman db


if __name__ == "__main__":
    app = PacmanMirrors()
    app.run()