Skip to content
Snippets Groups Projects
Commit c85e3ee2 authored by Frede H's avatar Frede H :speech_balloon:
Browse files

refactored all functions from main to separate files

parent 7ddd12e8
No related branches found
No related tags found
No related merge requests found
Showing
with 1328 additions and 756 deletions
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Interactive Fasttrack list Builder"""
import shutil
import sys
from pacman_mirrors.constants import txt
from pacman_mirrors.api import apifn
from pacman_mirrors.functions import fileFn
from pacman_mirrors.functions import util
def api_config(self, set_pfx=None, set_branch=None, re_branch=False, set_protocols=False, set_url=None):
"""
Api configuration function
:param self:
:param set_pfx: prefix to the config paths
:param set_branch: replace branch in pacman-mirrors.conf
:param re_branch: replace branch in mirrorlist
:param set_protocols: replace protocols in pacman-mirrors.conf
:param set_url: replace mirror url in mirrorlist
"""
if set_url is None:
set_url = ""
if set_pfx is None:
set_pfx = ""
"""
# apply api configuration to internal configuration object
# Apply prefix if present
"""
if set_pfx:
set_pfx = apifn.sanitize_prefix(set_pfx)
self.config["config_file"] = set_pfx + self.config["config_file"]
self.config["custom_file"] = set_pfx + self.config["custom_file"]
self.config["mirror_file"] = set_pfx + self.config["mirror_file"]
self.config["mirror_list"] = set_pfx + self.config["mirror_list"]
self.config["status_file"] = set_pfx + self.config["status_file"]
self.config["work_dir"] = set_pfx + self.config["work_dir"]
"""
# First API task: Set branch
"""
if set_branch:
# Apply branch to internal config
self.config["branch"] = set_branch
util.i686_check(self, write=False)
"""
# pacman-mirrors.conf could absent so check for it
"""
if not fileFn.check_existance_of(self.config["config_file"]):
"""
# Copy from host system
"""
fileFn.create_dir(set_pfx + "/etc")
shutil.copyfile("/etc/pacman-mirrors.conf",
self.config["config_file"])
"""
# Normalize config
"""
apifn.normalize_config(self.config["config_file"])
"""
# Write branch to config
"""
apifn.write_config_branch(self.config["branch"],
self.config["config_file"],
quiet=self.quiet)
"""
# Second API task: Create a mirror list
"""
if set_url:
"""
# mirror list dir could absent so check for it
"""
fileFn.create_dir(set_pfx + "/etc/pacman.d")
mirror = [
{
"url": apifn.sanitize_url(set_url),
"country": "BUILDMIRROR",
"protocols": [set_url[:set_url.find(":")]],
"resp_time": "00.00"
}
]
fileFn.write_mirror_list(self.config, mirror, quiet=self.quiet)
# exit gracefully
sys.exit(0)
"""
# Third API task: Write protocols to config
"""
if set_protocols:
apifn.write_protocols(self.config["protocols"],
self.config["config_file"],
quiet=self.quiet)
"""
# Fourth API task: Rebranch the mirrorlist
"""
if re_branch:
if not set_branch:
print(".: {} {}".format(txt.ERR_CLR, txt.API_ERROR_BRANCH))
sys.exit(1)
apifn.write_mirrorlist_branch(self.config["branch"],
self.config["config_file"],
quiet=self.quiet)
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Common Mirror list Builder"""
from operator import itemgetter
from random import shuffle
from pacman_mirrors.constants import txt
from pacman_mirrors.functions import filterFn
from pacman_mirrors.functions import outputFn
from pacman_mirrors.functions import testMirrorFn
def build_common_mirror_list(self):
"""
Generate common mirrorlist
"""
"""
Create a list based on the content of selected_countries
"""
mirror_selection = filterFn.filter_mirror_country(self.mirrors.mirror_pool,
self.selected_countries)
"""
Check the length of selected_countries against the full countrylist
If selected_countries is the lesser then we build a custom pool file
"""
if len(self.selected_countries) < len(self.mirrors.country_pool):
try:
_ = self.selected_countries[0]
outputFn.output_custom_mirror_pool_file(self, mirror_selection)
except IndexError:
pass
"""
Prototol filtering if applicable
"""
try:
_ = self.config["protocols"][0]
mirror_selection = filterFn.filter_mirror_protocols(
mirror_selection, self.config["protocols"])
except IndexError:
pass
"""
only list mirrors which are up-to-date for users selected branch
by removing not up-to-date mirrors from the list
UP-TO-DATE FILTERING NEXT
"""
mirror_selection = filterFn.filter_user_branch(mirror_selection, self.config)
if self.config["method"] == "rank":
mirror_selection = testMirrorFn.test_mirrors(self, mirror_selection)
mirror_selection = sorted(mirror_selection,
key=itemgetter("resp_time"))
else:
shuffle(mirror_selection)
"""
Try to write mirrorlist
"""
try:
_ = mirror_selection[0]
outputFn.output_mirror_list(self, mirror_selection)
if self.custom:
print(".: {} {} 'sudo {}'".format(txt.INF_CLR,
txt.REMOVE_CUSTOM_CONFIG,
txt.RESET_ALL))
except IndexError:
print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Interactive Fasttrack list Builder"""
import sys
from operator import itemgetter
from random import shuffle
from pacman_mirrors.constants import txt, colors as color
from pacman_mirrors.functions import filterFn
from pacman_mirrors.functions import httpFn
from pacman_mirrors.functions import outputFn
from pacman_mirrors.functions import util
def build_fasttrack_mirror_list(self, number):
"""
Fast-track the mirrorlist by filtering only up-to-date mirrors
The function takes into account the branch selected by the user
either on commandline or in pacman-mirrors.conf.
The function returns a filtered list consisting of a number of mirrors
Only mirrors from the active mirror file is used
either mirrors.json or custom-mirrors.json
"""
# randomize the load on up-to-date mirrors
worklist = self.mirrors.mirror_pool
shuffle(worklist)
if self.config["protocols"]:
worklist = filterFn.filter_mirror_protocols(
worklist, self.config["protocols"])
"""
Only pick mirrors which are up-to-date for users selected branch
by removing not up-to-date mirrors from the list
UP-TO-DATE FILTERING NEXT
"""
up_to_date_mirrors = filterFn.filter_user_branch(worklist, self.config)
worklist = []
print(".: {}: {} - {}".format(txt.INF_CLR,
txt.QUERY_MIRRORS,
txt.TAKES_TIME))
counter = 0
cols, lines = util.terminal_size()
for mirror in up_to_date_mirrors:
if not self.quiet:
message = " ..... {:<15}: {}: {}".format(
mirror["country"], mirror["last_sync"], mirror["url"])
print("{:.{}}".format(message, cols), end="")
sys.stdout.flush()
resp_time = httpFn.get_mirror_response(mirror["url"],
maxwait=self.max_wait_time,
quiet=self.quiet)
mirror["resp_time"] = resp_time
if float(resp_time) > self.max_wait_time:
if not self.quiet:
print("\r")
else:
if not self.quiet:
print("\r {:<5}{}{} ".format(color.GREEN,
resp_time,
color.ENDCOLOR))
worklist.append(mirror)
counter += 1
"""
Equality check will stop execution
when the desired number is reached.
In the possible event the first mirror's
response time exceeds the predefined response time,
the loop would stop execution if the check for zero is not present
"""
if counter is not 0 and counter == number:
break
worklist = sorted(worklist,
key=itemgetter("resp_time"))
"""
Try to write mirrorlist
"""
try:
_ = worklist[0]
outputFn.output_mirror_list(self, worklist)
except IndexError:
print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Interactive Mirror list Builder"""
from operator import itemgetter
from random import shuffle
from pacman_mirrors.constants import txt
from pacman_mirrors.functions import convertFn
from pacman_mirrors.functions import filterFn
from pacman_mirrors.functions import outputFn
from pacman_mirrors.functions import testMirrorFn
def build_interactive_mirror_list(self):
"""
Prompt the user to select the mirrors with a gui.
Outputs a "custom" mirror file
Modify the configuration file to use the "custom" file.
Outputs a pacman mirrorlist,
"""
"""
It would seem reasonable to implement a filter
based on the users branch and the mirrors update status
On the other hand, the interactive mode is for the user
to have total control over the mirror file.
So though it might seem prudent to only include updated mirrors,
we will not do it when user has selected interactive mode.
The final mirrorfile will include all mirrors selected by the user
The final mirrorlist will exclude (if possible) mirrors not up-to-date
"""
worklist = filterFn.filter_mirror_country(self.mirrors.mirror_pool,
self.selected_countries)
"""
If config.protols has content, that is a user decision and as such
it has nothing to do with the reasoning regarding mirrors
which might or might not be up-to-date
"""
try:
_ = self.config["protocols"][0]
worklist = filterFn.filter_mirror_protocols(
worklist, self.config["protocols"])
except IndexError:
pass
# rank or shuffle the mirrorlist before showing the ui
if not self.default:
if self.config["method"] == "rank":
worklist = testMirrorFn.test_mirrors(self, worklist)
worklist = sorted(worklist, key=itemgetter("resp_time"))
else:
shuffle(worklist)
"""
Create a list for display in ui.
The gui and the console ui expect the supplied list
to be in the old country dictionary format.
{
"country": "country_name",
"resp_time": "m.sss",
"last_sync": "HH:MM",
"url": "http://server/repo/"
}
Therefor we have to create a list in the old format,
thus avoiding rewrite of the ui and related functions.
We subseqently need to translate the result into:
a. a mirrorfile in the new json format,
b. a mirrorlist in pacman format.
"""
interactive_list = convertFn.translate_pool_to_interactive(worklist)
#
# import the right ui
if self.no_display:
# in console mode
from pacman_mirrors.dialogs import consoleui as ui
else:
# gobject introspection is present and accounted for
from pacman_mirrors.dialogs import graphicalui as ui
interactive = ui.run(interactive_list,
self.config["method"] == "random",
self.default)
# process user choices
if interactive.is_done:
custom_pool, mirror_list = convertFn.translate_interactive_to_pool(interactive.custom_list,
self.mirrors.mirror_pool,
self.config)
"""
Try selected method on the mirrorlist
"""
try:
_ = mirror_list[0]
if self.default:
if self.config["method"] == "rank":
mirror_list = testMirrorFn.test_mirrors(self, mirror_list)
mirror_list = sorted(mirror_list, key=itemgetter("resp_time"))
else:
shuffle(mirror_list)
except IndexError:
pass
"""
Try to write the mirrorfile and mirrorlist
"""
try:
_ = custom_pool[0]
self.custom = True
self.config["country_pool"] = ["Custom"]
outputFn.output_custom_mirror_pool_file(self, custom_pool)
"""
Writing the final mirrorlist
only write mirrors which are up-to-date for users selected branch
UP-TO-DATE FILTERING NEXT
"""
mirror_list = filterFn.filter_user_branch(mirror_list, self.config)
"""
Try writing mirrorlist
If no up-to-date mirrors exist for users branch
"""
try:
_ = mirror_list[0]
outputFn.output_mirror_list(self, mirror_list)
except IndexError:
raise IndexError
except IndexError:
print(".: {} {}".format(txt.WRN_CLR, txt.NO_SELECTION))
print(".: {} {}".format(txt.INF_CLR, txt.NO_CHANGE))
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Command Line Parser"""
import argparse
import os
import sys
from pacman_mirrors import __version__
from pacman_mirrors.api import api_handler
from pacman_mirrors.functions import customPoolFn
from pacman_mirrors.constants import txt
from pacman_mirrors.functions import outputFn
from pacman_mirrors.translation.custom_help_formatter \
import CustomHelpFormatter
def parse_command_line(self, gtk_available):
"""Read the arguments of the command line"""
args_summary = "[-h] [-f [{}]] [-i [-d]] [-m {}]\n" \
"\t\t[-c {} [{}...] | [--geoip]] [-l]\n" \
"\t\t[-q] [-t {}] [-v] [-n]\n" \
"\t\t[--api]\n" \
"\t\t\t[-S/-B {}] [-p {}]\n" \
"\t\t\t[-P {} [{}...]] [-R] [-U {}]\n".format(txt.NUMBER,
txt.METHOD,
txt.COUNTRY,
txt.COUNTRY,
txt.SECONDS,
txt.BRANCH,
txt.PREFIX,
txt.PROTO,
txt.PROTO,
txt.URL)
nusage = "\rVersion {}\n{}:\n pacman-mirrors".format(__version__, txt.USAGE)
usage = "{} {}".format(nusage, args_summary)
parser = argparse.ArgumentParser(formatter_class=CustomHelpFormatter,
add_help=False, usage=usage)
# Method arguments
methods = parser.add_argument_group(txt.METHODS)
methods.add_argument("-i", "--interactive",
action="store_true",
help=txt.HLP_ARG_INTERACTIVE)
methods_exclusive = methods.add_mutually_exclusive_group()
methods_exclusive.add_argument("-f", "--fasttrack",
nargs="?",
const=0,
type=int,
default=0,
metavar=txt.NUMBER,
help="{} {}".format(txt.HLP_ARG_FASTTRACK, txt.OVERRIDE_OPT))
methods_exclusive.add_argument("-c", "--country",
type=str,
nargs="+",
metavar=txt.COUNTRY,
help=txt.HLP_ARG_COUNTRY)
methods_exclusive.add_argument("-g", "--geoip",
action="store_true",
help=txt.HLP_ARG_GEOIP)
# Api arguments
api = parser.add_argument_group(txt.API)
api.add_argument("-a", "--api",
action="store_true",
help="[-p {}][-R][-S/-B|-G {}][-P {} [{} ...]]".format(
txt.PREFIX, txt.BRANCH, txt.PROTO, txt.PROTO))
api.add_argument("-S", "-B", "--set-branch",
choices=["stable", "testing", "unstable"],
help="{}: {}".format(
txt.API, txt.HLP_ARG_API_SET_BRANCH))
api.add_argument("-p", "--prefix",
type=str,
metavar=txt.PREFIX,
help="{}: {} {}".format(
txt.API, txt.HLP_ARG_API_PREFIX, txt.PREFIX_TIP))
api.add_argument("-P", "--proto", "--protocols",
choices=["all", "http", "https", "ftp", "ftps"],
type=str,
nargs="+",
help="{}: {}".format(
txt.API, txt.HLP_ARG_API_PROTOCOLS))
api.add_argument("-R", "--re-branch",
action="store_true",
help="{}: {}".format(
txt.API, txt.HLP_ARG_API_RE_BRANCH))
api.add_argument("-U", "--url",
type=str,
metavar=txt.URL,
help="{}: {}".format(
txt.API, txt.HLP_ARG_API_URL))
"""
Misc arguments
"""
misc = parser.add_argument_group(txt.MISC)
misc.add_argument("-G", "--get-branch",
action="store_true",
help="{}".format(txt.HLP_ARG_API_GET_BRANCH))
misc.add_argument("-d", "--default",
action="store_true",
help="INTERACTIVE: " + txt.HLP_ARG_DEFAULT)
misc.add_argument("-h", "--help",
action="store_true")
misc.add_argument("-l", "--list", "--country-list",
action="store_true",
help=txt.HLP_ARG_LIST)
misc.add_argument("-m", "--method",
type=str,
choices=["rank", "random"],
help=txt.HLP_ARG_METHOD)
misc.add_argument("-n", "--no-mirrorlist",
action="store_true",
help=txt.HLP_ARG_NO_MIRRORLIST)
misc.add_argument("-q", "--quiet",
action="store_true",
help=txt.HLP_ARG_QUIET)
misc.add_argument("-t", "--timeout",
type=int,
metavar=txt.SECONDS,
help=txt.HLP_ARG_TIMEOUT)
misc.add_argument("-v", "--version",
action="store_true",
help=txt.HLP_ARG_VERSION)
args = parser.parse_args()
"""
#############################################################
No root required
#############################################################
"""
if len(sys.argv) == 1 or args.help:
parser.print_help()
sys.exit(0)
if args.version:
print("Version {}".format(__version__))
sys.exit(0)
if args.list:
outputFn.output_country_pool_console(self)
sys.exit(0)
if args.get_branch:
print(self.config["branch"])
sys.exit(0)
"""
#############################################################
Validate arg combinations
#############################################################
"""
"""
If --set-branch, --protocols, --url, --prefix and not --api reject
"""
if args.set_branch or args.proto or args.url or args.prefix:
if not args.api:
print(".: {} {}".format(txt.ERR_CLR, txt.API_ARGUMENTS_ERROR))
sys.exit(1)
"""
If --default and not --interactive reject
"""
if args.default:
if not args.interactive:
print(".: {} {}".format(txt.ERR_CLR, txt.INTERACTIVE_ARGUMENTS_ERROR))
sys.exit(1)
"""
#############################################################
Root required
#############################################################
"""
if os.getuid() != 0:
print(".: {} {}".format(
txt.ERR_CLR, txt.MUST_BE_ROOT))
sys.exit(1)
if args.geoip:
self.geoip = True
if args.method:
self.config["method"] = args.method
if args.no_mirrorlist:
self.no_mirrorlist = True
if args.quiet:
self.quiet = True
if args.timeout:
self.max_wait_time = args.timeout
"""
Generation methods
"""
if args.country:
self.geoip = False
if "," in args.country[0]:
self.config["country_pool"] = args.country[0].split(",")
else:
self.config["country_pool"] = args.country
if self.config["country_pool"] == ["all"]:
customPoolFn.delete_custom_pool(self)
if args.fasttrack:
self.fasttrack = args.fasttrack
self.geoip = False
self.config["method"] = "rank"
if args.interactive:
self.interactive = True
if args.default:
self.default = True
if os.environ.get("XDG_SESSION_TYPE") == "wayland" or not os.environ.get("DISPLAY") or not gtk_available:
self.no_display = True
"""
API handling
Setup variables for passing to the api_config function
"""
if args.api:
rebranch = False
url = args.url
setbranch = args.set_branch
setprotocols = bool(args.proto)
if args.re_branch:
rebranch = True
if args.proto:
if "all" in args.proto:
self.config["protocols"] = []
else:
if "," in args.proto:
self.config["protocols"] = args.proto.split(",")
else:
self.config["protocols"] = args.proto
api_handler.api_config(self, set_pfx=args.prefix,
set_branch=setbranch, re_branch=rebranch,
set_protocols=setprotocols, set_url=url)
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Converter Functions"""
from pacman_mirrors.constants import txt
from pacman_mirrors.functions import util
def translate_interactive_to_pool(interactive_pool, mirror_pool, config):
"""
Translate mirror pool for interactive display
:param interactive_pool:
:param mirror_pool:
:param config:
:return:
"""
custom_pool = []
mirror_list = []
for custom in interactive_pool:
"""
url without protocol
"""
try:
custom_url = util.strip_protocol(custom["url"])
"""
locate mirror in the full mirror pool
"""
for mirror in mirror_pool:
try:
_ = mirror_pool[0]
mirror_url = util.strip_protocol(mirror["url"])
if custom_url == mirror_url:
custom_pool.append({
"country": mirror["country"],
"protocols": mirror["protocols"],
"url": mirror["url"]
})
try:
"""
Try to replace protocols with user selection
"""
_ = config["protocols"][0]
mirror["protocols"] = config["protocols"]
except IndexError:
pass
mirror_list.append(mirror)
except (KeyError, IndexError):
print("{} {}! The mirror pool is empty".format(txt.WRN_CLR, txt.HOUSTON))
break
except KeyError:
print("{} {}! The custom pool is empty".format(txt.WRN_CLR, txt.HOUSTON))
break
return custom_pool, mirror_list
def translate_pool_to_interactive(mirror_pool):
"""
Translate mirror pool for interactive display
:param mirror_pool:
:return: list of dictionaries
{
"country": "country_name",
"resp_time": "m.sss",
"last_sync": "HH:MM",
"url": "http://server/repo/"
}
"""
interactive_list = []
for mirror in mirror_pool:
try:
_ = mirror_pool[0]
ls = str(mirror["last_sync"]).split(":")
mirror_url = util.strip_protocol(mirror["url"])
for idx, protocol in enumerate(mirror["protocols"]):
interactive_list.append({
"country": mirror["country"],
"resp_time": mirror["resp_time"],
"last_sync": "{}h {}m".format(ls[0], ls[1]),
"url": "{}{}".format(protocol, mirror_url)
})
except (KeyError, IndexError):
print("{} {}! The mirror pool is empty".format(txt.WRN_CLR, txt.HOUSTON))
break
return interactive_list
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Default Mirror Functions"""
from pacman_mirrors.functions import jsonFn
def set_custom_mirror_status(config, custom_pool):
"""
Apply the current mirror status to the custom mirror file
:param config: config dictionary
:param custom_pool: the custom mirror pool
:return: custom mirror pool with status applied
"""
status_list = tuple(jsonFn.read_json_file(config["status_file"], dictionary=False))
custom_list = tuple(custom_pool)
try:
_ = status_list[0]
for custom in custom_list:
for status in status_list:
if custom["url"] in status["url"]:
custom["last_sync"] = status["last_sync"]
custom["branches"] = status["branches"]
return list(custom_list)
except (IndexError, KeyError):
return custom_pool
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Custom Pool Functions"""
from pacman_mirrors.functions import customMirrorFn
from pacman_mirrors.functions import defaultMirrorFn
from pacman_mirrors.functions import defaultPoolFn
from pacman_mirrors.functions import fileFn
from pacman_mirrors.functions import validFn
def check_custom_mirror_pool(self):
"""
Custom mirror pool or countries from CLI
:return: True/False
"""
if validFn.custom_config_is_valid():
self.custom = True
else:
self.selected_countries = self.config["country_pool"]
return self.custom
def delete_custom_pool(self):
"""
Delete custom mirror pool
"""
self.custom = False
self.config["country_pool"] = []
fileFn.delete_file(self.config["custom_file"])
def load_custom_mirror_pool(self):
"""
Load available custom mirrors and update their status from status.json
If user request the default mirror pool load the default pool
"""
if self.default:
defaultPoolFn.load_default_mirror_pool(self)
else:
defaultMirrorFn.seed_mirrors(self, self.config["custom_file"])
self.mirrors.mirror_pool = customMirrorFn.set_custom_mirror_status(
self.config, self.mirrors.mirror_pool)
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Default Mirror Functions"""
from operator import itemgetter
from pacman_mirrors.functions import fileFn
from pacman_mirrors.functions import httpFn
from pacman_mirrors.functions import validFn
def build_country_list(country_selection, country_pool, geoip=False):
"""
Do a check on the users country selection
:param country_selection:
:param country_pool:
:param geoip:
:return: list of valid countries
:rtype: list
"""
"""
Don't change this code
This works so please don't touch
"""
result = []
if country_selection:
if country_selection == ["all"]:
result = country_pool
else:
if validFn.country_list_is_valid(country_selection,
country_pool):
result = country_selection
if not result:
if geoip:
country = get_geoip_country(country_pool)
if country: # valid geoip
result = country
else:
result = country_pool
else:
result = country_pool
return result
def get_geoip_country(country_pool):
"""
Check if geoip is possible
:param country_pool:
:return: country name if found
"""
g_country = httpFn.get_geoip_country()
if g_country in country_pool:
return g_country
else:
return None
def seed_mirrors(self, file, status=False):
"""
Seed mirrors
"""
mirrors = fileFn.read_mirror_file(file)
if status:
self.mirrors.seed(mirrors, status=status)
else:
self.mirrors.seed(mirrors)
sort_mirror_countries(self)
def sort_mirror_countries(self):
self.mirrors.mirror_pool = sorted(self.mirrors.mirror_pool,
key=itemgetter("country"))
self.mirrors.country_pool = sorted(self.mirrors.country_pool)
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Default Pool Functions"""
from pacman_mirrors.functions import customPoolFn
from pacman_mirrors.functions import customPoolFn
from pacman_mirrors.functions import fileFn
from pacman_mirrors.functions import defaultMirrorFn
def load_all_mirrors(self):
"""
Load all mirrors from active mirror pool
"""
if customPoolFn.check_custom_mirror_pool(self) and not self.config["country_pool"]:
customPoolFn.load_custom_mirror_pool(self)
self.selected_countries = self.mirrors.country_pool
else:
if self.config["country_pool"]:
self.selected_countries = self.config["country_pool"]
load_default_mirror_pool(self)
"""
Validate the list of selected countries
"""
self.selected_countries = defaultMirrorFn.build_country_list(
self.selected_countries, self.mirrors.country_pool, self.geoip)
def load_default_mirror_pool(self):
"""
Load all available mirrors
"""
(file, status) = fileFn.return_mirror_filename(self.config)
defaultMirrorFn.seed_mirrors(self, file, status)
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Filter Functions"""
from pacman_mirrors.config import configuration as conf
def filter_mirror_country(mirror_pool, country_pool):
"""
Return new mirror pool with selected countries
:param mirror_pool:
:param country_pool:
:rtype: list
"""
result = []
for mirror in mirror_pool:
if mirror["country"] in country_pool:
result.append(mirror)
return result
def filter_mirror_protocols(mirror_pool, protocols=None):
"""
Return a new mirrorlist with protocols
:type mirror_pool: list
:type protocols: list
:rtype: list
"""
result = []
if not protocols:
return mirror_pool
for mirror in mirror_pool:
accepted = []
for idx, protocol in enumerate(protocols):
if protocol in mirror["protocols"]:
accepted.append(protocol)
if accepted:
mirror["protocols"] = accepted
result.append(mirror)
return result
def filter_user_branch(mirror_pool, config):
"""
Filter mirrorlist on users branch and branch sync state
"""
for idx, branch in enumerate(conf.BRANCHES):
if config["x32"]:
config_branch = config["branch"][4:]
else:
config_branch = config["branch"]
if branch == config_branch:
filtered = []
for mirror in mirror_pool:
try:
if mirror["branches"][idx] == 1:
filtered.append(mirror)
except IndexError:
pass
if len(filtered) > 0:
return filtered
return mirror_pool
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Outpout Functions"""
from pacman_mirrors.constants import txt
from pacman_mirrors.functions import fileFn
from pacman_mirrors.functions import jsonFn
from pacman_mirrors.functions import defaultPoolFn
def output_country_pool_console(self):
"""
List all available countries
"""
defaultPoolFn.load_default_mirror_pool(self)
print("{}".format("\n".join(self.mirrors.country_pool)))
def output_custom_mirror_pool_file(self, selected_mirrors):
"""
Output selected mirrors to custom mirror file
:param self:
:param selected_mirrors:
:return:
"""
print("\n.: {} {}".format(txt.INF_CLR,
txt.CUSTOM_MIRROR_LIST))
print("--------------------------")
# output mirror file
jsonFn.write_json_file(selected_mirrors,
self.config["custom_file"])
print(".: {} {}: {}".format(txt.INF_CLR,
txt.CUSTOM_MIRROR_FILE_SAVED,
self.config["custom_file"]))
def output_mirror_list(self, selected_servers):
"""
Outputs selected servers to mirrorlist
:param self:
:param selected_servers:
"""
if self.custom:
fileFn.write_mirror_list(self.config,
selected_servers,
custom=self.custom,
quiet=self.quiet,
interactive=True)
else:
fileFn.write_mirror_list(self.config,
selected_servers,
quiet=self.quiet)
#!/usr/bin/env python
#
# This file is part of pacman-mirrors.
#
# pacman-mirrors is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pacman-mirrors is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
"""Pacman-Mirrors Test Mirror Functions"""
import sys
from pacman_mirrors.constants import txt, colors as color
from pacman_mirrors.functions import httpFn
from pacman_mirrors.functions import util
def test_mirrors(self, worklist):
"""
Query server for response time
"""
if self.custom:
print(".: {} {}".format(txt.INF_CLR,
txt.USING_CUSTOM_FILE))
else:
print(".: {} {}".format(txt.INF_CLR,
txt.USING_DEFAULT_FILE))
print(".: {} {} - {}".format(txt.INF_CLR,
txt.QUERY_MIRRORS,
txt.TAKES_TIME))
cols, lines = util.terminal_size()
# set connection timeouts
http_wait = self.max_wait_time
ssl_wait = self.max_wait_time * 2
ssl_verify = self.config["ssl_verify"]
for mirror in worklist:
colon = mirror["url"].find(":")
url = mirror["url"][colon:]
for idx, proto in enumerate(mirror["protocols"]):
mirror["url"] = "{}{}".format(proto, url)
if not self.quiet:
message = " ..... {:<15}: {}".format(mirror["country"],
mirror["url"])
print("{:.{}}".format(message, cols), end="")
sys.stdout.flush()
# https sometimes takes longer for handshake
if proto == "https" or proto == "ftps":
self.max_wait_time = ssl_wait
else:
self.max_wait_time = http_wait
# let's see how responsive you are
mirror["resp_time"] = httpFn.get_mirror_response(
mirror["url"], maxwait=self.max_wait_time,
quiet=self.quiet, ssl_verify=ssl_verify)
if float(mirror["resp_time"]) >= self.max_wait_time:
if not self.quiet:
print("\r")
else:
if not self.quiet:
print("\r {:<5}{}{} ".format(color.GREEN,
mirror["resp_time"],
color.ENDCOLOR))
return worklist
......@@ -16,11 +16,24 @@
# along with pacman-mirrors. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Frede Hundewadt <echo ZmhAbWFuamFyby5vcmcK | base64 -d>
import platform
import shutil
from pacman_mirrors.api import apifn
from pacman_mirrors.constants import colors as color, txt
def i686_check(self, write=False):
if platform.machine() == "i686":
self.config["x32"] = True
if "x32" not in self.config["branch"]:
self.config["branch"] = "x32-{}".format(self.config["branch"])
if write:
apifn.write_config_branch(self.config["branch"], self.config["config_file"], quiet=True)
def strip_protocol(url):
"""
Splits an url
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment