X-Git-Url: https://git.adam-barratt.org.uk/?a=blobdiff_plain;f=modules%2Fbacula%2Ffiles%2Fdsa-bacula-scheduler;fp=modules%2Fbacula%2Ffiles%2Fdsa-bacula-scheduler;h=cefcb2b1889ed72b7b3abdcc3c4cde63a93c556e;hb=76ca91bce24ecbcbcc4e62a37aa06fd0fb9f96c7;hp=0000000000000000000000000000000000000000;hpb=46cee04ab06b23ab6e9e4baba655cf470d10cfc4;p=mirror%2Fdsa-puppet.git

diff --git a/modules/bacula/files/dsa-bacula-scheduler b/modules/bacula/files/dsa-bacula-scheduler
new file mode 100755
index 000000000..cefcb2b18
--- /dev/null
+++ b/modules/bacula/files/dsa-bacula-scheduler
@@ -0,0 +1,367 @@
+#!/usr/bin/python3
+
+# "manual" scheduler to spread out our backups better
+
+# We want to smear out backups over time, subject to the following constraints:
+# - no more than MAX_FULL full backups should run at any time,
+# - no more than MAX_FULL+MAX_DIFF full+diff backups should run at any time,
+# - no more than MAX_FULL+MAX_DIFF+MAX_INC full+diff+inc backups should run at any time,
+# - no more than MAX_TOTAL jobs should be running or already created at any time,
+# - for each client:
+#   - do not schedule a backup if one failed or was aborted recently (MIN_FAILED_TIMEOUT),
+#   - do not schedule a backup if one for that host is already running,
+#   - do not schedule a backup if one for that host is already pending/scheduled,
+#   - do not schedule a backup if we ran one recently (MIN_AGE),
+#   - consider scheduling a {full,diff,inc} backup with probability
+#     o 0 if the last backup of that type (or a higher type) is younger than
+#       the low bound of that type's schedule window,
+#     o 1 if it is older than the high bound of that window,
+#     o linearly interpolated in between.
+# For each backup considered for scheduling, choose {full,diff,inc} up to the
+# limits imposed by MAX_*.
+
+# Copyright 2010, 2011, 2013, 2017, 2018 Peter Palfrader
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
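+
+# A worked example of the interpolation above (illustrative only, using the
+# default full-backup window of 40d:45d defined below): a full backup that is
+# 42 days old gives frac = (42d - 40d) / (45d - 40d) = 0.4, so it is started
+# with probability 0.4; at 40 days or younger the probability is 0, at 45
+# days or older it is 1.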
+
+import argparse
+import datetime
+import hashlib
+import logging
+import logging.handlers
+import os.path
+import psycopg2
+import psycopg2.extras
+import random
+import re
+import subprocess
+import sys
+
+NAME = "dsa-bacula-scheduler"
+SYSLOG_ADDRESS = '/dev/log'
+
+DSA_BACULA_DB_CONNECT = '/etc/dsa/bacula-reader-database'
+DSA_CLIENT_LIST_FILE = '/etc/bacula/dsa-clients'
+
+def convert_timedelta(s, default_unit='h'):
+    m = re.match('([0-9]+)([smhdw])?$', s)
+    if m is None: raise ValueError("invalid time spec %r"%(s,))
+    ticks = int(m.group(1))
+    unit = m.group(2)
+    if unit is None: unit = default_unit
+
+    if unit == 's': pass
+    elif unit == 'm': ticks *= 60
+    elif unit == 'h': ticks *= 60*60
+    elif unit == 'd': ticks *= 60*60*24
+    elif unit == 'w': ticks *= 60*60*24*7
+    else: raise ValueError
+    return datetime.timedelta(seconds = ticks)
+
+MIN_AGE = '20h'
+MIN_FAILED_TIMEOUT = '6h'
+TYPESL = ['F', 'D', 'I']
+TYPES = { 'F': 'full',
+          'D': 'diff',
+          'I': 'inc',
+        }
+TYPES_COMPLETE_NAME = { 'F': 'full',
+                        'D': 'differential',
+                        'I': 'incremental',
+                      }
+SCHED = { 'F': '40d:45d',
+          'D': '7d:10d',
+          'I': '22h:26h',
+        }
+MAX_JOBS = { 'F': 2,
+             'D': 1,
+             'I': 2,
+             'total': 12,
+           }
+
+parser = argparse.ArgumentParser()
+parser.add_argument("-d", "--db-connect-string", metavar="connect-string", dest="db",
+                    help="Database connect string")
+parser.add_argument("-D", "--db-connect-string-file", metavar="FILE", dest="dbfile",
+                    default=DSA_BACULA_DB_CONNECT,
+                    help="File to read database connect string from (%s)"%(DSA_BACULA_DB_CONNECT,))
+parser.add_argument("-c", "--client-list", metavar="FILE", dest="clientlist",
+                    default=DSA_CLIENT_LIST_FILE,
+                    help="File with a list of all clients (%s)"%(DSA_CLIENT_LIST_FILE,))
+parser.add_argument("-v", "--verbose", dest="verbose",
+                    action='count', default=0,
+                    help="Be more verbose (repeat for more).")
+parser.add_argument("-n", "--nodo", dest="nodo",
+                    default=False, action="store_true",
+                    help="Print the bconsole commands to stdout rather than running them.")
+parser.add_argument("--min-age", dest="min_age", metavar="TIME",
+                    default=MIN_AGE,
+                    help="Do not consider running a backup for a client if we ran one within TIME (%s)"%(MIN_AGE,))
+parser.add_argument("--min-failed-timeout", dest="min_failed_timeout", metavar="TIME",
+                    default=MIN_FAILED_TIMEOUT,
+                    help="Do not consider running a backup for a client if one failed or was aborted within TIME (%s)"%(MIN_FAILED_TIMEOUT,))
+for t in TYPESL:
+    parser.add_argument("--sched-%s"%(TYPES[t],), dest="sched_%s"%(t,), metavar="SCHED",
+                        default=SCHED[t],
+                        help="Run %s backups at some point within this age window (%s)"%(TYPES[t], SCHED[t]))
+    parser.add_argument("--max-%s"%(TYPES[t],), dest="max_%s"%(t,), metavar="MAX",
+                        default=MAX_JOBS[t], type=int,
+                        help="Do not have more than MAX jobs running concurrently at level %s (%d) (free higher-level slots may be used)"%(TYPES[t], MAX_JOBS[t]))
+parser.add_argument("--max-total", dest="max_total", metavar="MAX",
+                    default=MAX_JOBS['total'], type=int,
+                    help="Do not have more than MAX jobs in total, also counting created, blocked, etc. (%s)"%(MAX_JOBS['total'],))
+args = parser.parse_args()
+
+def setup_logger(args):
+    logger = logging.getLogger(NAME)
+    logger.setLevel(logging.DEBUG)
+
+    # log to stderr
+    stream_handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
+    stream_handler.setFormatter(formatter)
+
+    if args.verbose >= 2:
+        stream_handler.setLevel(logging.DEBUG)
+    elif args.verbose >= 1:
+        stream_handler.setLevel(logging.INFO)
+    else:
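+        # default: log only warnings and errors to stderr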
+        stream_handler.setLevel(logging.WARNING)
+
+    logger.addHandler(stream_handler)
+
+    # log to syslog
+    syslog_handler = logging.handlers.SysLogHandler(address = SYSLOG_ADDRESS)
+    syslog_handler.setLevel(logging.INFO)
+    logger.addHandler(syslog_handler)
+
+    return logger
+
+logger = setup_logger(args)
+
+if args.db is not None:
+    pass
+elif args.dbfile is not None:
+    args.db = open(args.dbfile).read().rstrip()
+else:
+    logger.error("Need one of -d or -D.")
+    sys.exit(1)
+
+# parse schedules
+now = datetime.datetime.now()
+min_age = convert_timedelta(args.min_age)
+max_jobs = {}
+sched = {}
+for t in TYPESL:
+    spec = vars(args)['sched_%s'%(t,)]
+    try:
+        (low, high) = spec.split(':', 1)
+    except ValueError:
+        logger.error("Invalid schedule spec (%s)", spec)
+        sys.exit(1)
+
+    sched[t] = {
+        'lo': convert_timedelta(low),
+        'hi': convert_timedelta(high),
+    }
+    if sched[t]['hi'] <= sched[t]['lo']:
+        logger.error("Scheduling error in %s: low needs to be smaller than high (but we have %s and %s)", spec, low, high)
+        sys.exit(1)
+
+    max_jobs[t] = vars(args)['max_%s'%(t,)]
+max_jobs['total'] = args.max_total
+min_failed_timeout = convert_timedelta(args.min_failed_timeout)
+
+
+# Get list of clients
+if not os.path.exists(args.clientlist):
+    logger.error("File %s does not exist", args.clientlist)
+    sys.exit(1)
+clients = set(open(args.clientlist).read().split())
+
+
+conn = psycopg2.connect(args.db)
+cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+# get current and pending backup jobs.  Bacula job status codes:
+#   R  running
+#   C  created
+#   F  waiting for client
+#   S  waiting for storage daemon
+#   m  waiting for new media
+#   M  waiting for media mount
+#   s  waiting for storage resource
+#   j  waiting for job resource
+#   c  waiting for client resource
+#   d  waiting on maximum jobs
+#   t  waiting on start time
+#   p  waiting on higher priority jobs
+cursor.execute("""
+    SELECT name, level
+    FROM job
+    WHERE type='B' AND
+          jobstatus IN ('R')
+""", {})
+current_backups = {}
+for t in TYPESL:
+    current_backups[t] = set()
+for r in cursor.fetchall():
+    current_backups[r['level']].add(r['name'])
+
+cursor.execute("""
+    SELECT name, level
+    FROM job
+    WHERE type='B' AND
+          jobstatus IN ('C', 'F', 'S', 'm', 'M', 's', 'j', 'c', 'd', 't', 'p')
+""", {})
+pending_backups = {}
+for r in cursor.fetchall():
+    pending_backups[r['name']] = r['level']
+
+cursor.execute("""
+    SELECT name, MAX(starttime) AS starttime
+    FROM job
+    WHERE type='B' AND
+          jobstatus IN ('A', 'f')
+    GROUP BY name
+""", {})
+failed_backups = set()
+for r in cursor.fetchall():
+    if now - r['starttime'] < min_failed_timeout:
+        failed_backups.add(r['name'])
+
+
+for t in TYPESL:
+    logger.debug("Num jobs currently running %s: %d; max (including higher pri): %d", t, len(current_backups[t]), max_jobs[t])
+logger.debug("Num jobs currently pending (created/waiting/..): %d; max (including running): %d", len(pending_backups), max_jobs['total'])
+
+most_recent = {}
+# get the most recent backup of each type for all clients
+cursor.execute("""
+    SELECT name, level, MAX(starttime) AS starttime
+    FROM job
+    WHERE type='B' AND
+          jobstatus='T'
+    GROUP BY name, level
+""", {})
+for r in cursor.fetchall():
+    if not r['name'] in most_recent:
+        most_recent[r['name']] = {}
+    most_recent[r['name']][r['level']] = r['starttime']
+
+candidates = {}
+for t in TYPESL:
+    candidates[t] = []
+
+for c in clients:
+    if c in failed_backups:
+        logger.debug("Not considering %s as backups failed or were aborted recently", c)
+        continue
+    if any(c in cb for cb in current_backups.values()):
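+        # a running job at any level blocks scheduling another backup for this client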
+        logger.debug("Not considering %s as a backup is currently running", c)
+        continue
+    if c in pending_backups:
+        logger.debug("Not considering %s as a backup is currently created/waiting on things", c)
+        continue
+    if c in most_recent:
+        recent = max(most_recent[c].values())
+        if recent >= now - min_age:
+            logger.debug("Not considering %s as a backup ran recently (%s)", c, recent)
+            continue
+
+    # 'recent' accumulates over the F, D, I loop, so for each level it holds
+    # the time of the most recent backup of that level or higher.
+    recent = datetime.datetime.min
+    for t in TYPESL:
+        if c in most_recent and t in most_recent[c]:
+            recent = max(recent, most_recent[c][t])
+        logger.debug("Most recent backup for %s/%s ran %s", c, t, recent)
+        age = (now - recent)
+        frac = (age - sched[t]['lo'])/(sched[t]['hi'] - sched[t]['lo'])
+        if frac <= 0:
+            logger.debug("Most recent backup for %s/%s (%s) is new enough: %.2f < 0", c, t, recent, frac)
+            runme = False
+        elif frac >= 1:
+            logger.debug("Most recent backup for %s/%s (%s) is old: %.2f >= 1", c, t, recent, frac)
+            runme = True
+        else:
+            logger.debug("Most recent backup for %s/%s (%s) is so so: 0 <= %.2f <= 1", c, t, recent, frac)
+
+            # get a semi random value that is consistent across multiple calls:
+            # we hash the client name, level, and most recent backup time and
+            # use the first byte of the digest.
+            d = hashlib.sha256(("%s - %s - %s"%(c, t, recent)).encode()).digest()
+            prnd = d[0] / 256
+
+            runme = frac > prnd
+
+        if runme:
+            logger.debug("Considering running %s/%s", c, t)
+            candidates[t].append( {'client': c, 'level': t} )
+            break
+
+
+total_cnt = len(pending_backups) + sum(len(current_backups[t]) for t in TYPESL)
+total_free = max_jobs['total'] - total_cnt
+
+run_cnt = 0
+total_slots = 0
+cmd = []
+for t in TYPESL:
+    run_cnt += len(current_backups[t])
+    total_slots += max_jobs[t]
+    if len(candidates[t]) == 0:
+        logger.info("No candidates for %s (%s) jobs", t, TYPES_COMPLETE_NAME[t])
+        continue
+    free_slots = min(total_slots - run_cnt, total_free)
+    if free_slots <= 0:
+        logger.info("No free slots for %s (%s) jobs but %d candidate(s)", t, TYPES_COMPLETE_NAME[t], len(candidates[t]))
+        continue
+
+    logger.info("Have %d candidate(s) and %d free slots for %s (%s) jobs.", len(candidates[t]), free_slots, t, TYPES_COMPLETE_NAME[t])
+    picked = random.sample(candidates[t], min(free_slots, len(candidates[t])))
+    for p in picked:
+        logger.info(" Will run: %s/%s", p['client'], p['level'])
+
+    for p in picked:
+        cmd.append("run job=%s level=%s yes"%(p['client'], TYPES_COMPLETE_NAME[p['level']]))
+        total_free -= 1
+        run_cnt += 1
+
+if args.nodo:
+    print("\n".join(cmd))
+    sys.exit(0)
+
+if args.verbose:
+    for c in cmd:
+        print("Now going to run: %s"%(c,))
+
+if len(cmd) == 0:
+    sys.exit(0)
+
+p = subprocess.Popen(['/usr/sbin/bconsole'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+(out, err) = p.communicate("\n".join(cmd).encode())
+if p.returncode != 0:
+    raise Exception("bconsole failed. stdout:\n%s\nstderr:\n%s\n"%(out, err))
+
+if args.verbose:
+    print("stdout:")
+    sys.stdout.buffer.write(out)
+    print("")
+
+if err != b"":
+    print("bconsole said on stderr:", file=sys.stderr)
+    sys.stderr.buffer.write(err)
+    print("", file=sys.stderr)
+    sys.exit(1)
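+
+# Usage sketch (illustrative, not part of the original script):
+#   dsa-bacula-scheduler -v -n
+# prints the generated bconsole commands, e.g.
+#   run job=some-client.debian.org level=full yes
+# (for a hypothetical client "some-client.debian.org") without executing
+# them; without -n the commands are piped into /usr/sbin/bconsole as above.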