#!/usr/bin/python3

# "manual" scheduler to spread out our backups better
#
# We want to smear out backups over time, subject to the following constraints:
# - no more than MAX_FULL full backups should run at any time,
# - no more than MAX_FULL+MAX_DIFF full+diff backups should run at any time,
# - no more than MAX_FULL+MAX_DIFF+MAX_INC full+diff+inc backups should run at any time,
# - no more than MAX_TOTAL jobs should be running or already created at any time,
# - for each client:
#   - do not schedule a backup if one failed or was aborted recently (MIN_FAILED_TIMEOUT),
#   - do not schedule a backup if one for that host is already running,
#   - do not schedule a backup if one for that host is already pending/scheduled,
#   - do not schedule a backup if we ran one recently (MIN_AGE),
#   - consider scheduling a {full,diff,inc} backup with probability
#     o 0 if the last backup of that type (or a higher one) is younger than the
#       low bound of that type's scheduling window,
#     o 1 if it is older than the high bound of that type's scheduling window,
#     o linearly interpolated in between.
# For each scheduled backup considered, choose {full,diff,inc} up to the
# limits imposed by MAX_*.

# Copyright 2010, 2011, 2013, 2017, 2018 Peter Palfrader
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
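
# A worked example of the scheduling window described above (illustrative,
# using the default incremental window of 22h:26h defined below): a most
# recent incremental backup that is 24 hours old yields
# frac = (24h - 22h) / (26h - 22h) = 0.5, i.e. a 50% chance of the client
# becoming an incremental candidate on any given scheduler run.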
import argparse
import datetime
import hashlib
import logging
import logging.handlers
import os.path
import psycopg2
import psycopg2.extras
import random
import re
import subprocess
import sys

NAME = "dsa-bacula-scheduler"
SYSLOG_ADDRESS = '/dev/log'

DSA_BACULA_DB_CONNECT = '/etc/dsa/bacula-reader-database'
DSA_CLIENT_LIST_FILE = '/etc/bacula/dsa-clients'

def convert_timedelta(s, default_unit='h'):
    """Parse a time spec such as '90m' or '2d' into a datetime.timedelta."""
    m = re.match('([0-9]+)([smhdw])?$', s)
    if m is None:
        raise ValueError("Cannot parse time spec %r" % (s,))
    ticks = int(m.group(1))
    unit = m.group(2)
    if unit is None:
        unit = default_unit

    if unit == 's':
        pass
    elif unit == 'm':
        ticks *= 60
    elif unit == 'h':
        ticks *= 60*60
    elif unit == 'd':
        ticks *= 60*60*24
    elif unit == 'w':
        ticks *= 60*60*24*7
    else:
        raise ValueError("Unknown time unit %r" % (unit,))

    return datetime.timedelta(seconds=ticks)
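
# Illustrative examples of the time specs accepted above:
#   convert_timedelta('90m')  == datetime.timedelta(minutes=90)
#   convert_timedelta('2d')   == datetime.timedelta(days=2)
#   convert_timedelta('20')   == datetime.timedelta(hours=20)   # default unit is hours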
MIN_AGE = '20h'
MIN_FAILED_TIMEOUT = '6h'
TYPESL = ['F', 'D', 'I']
TYPES = {
    'F': 'full',
    'D': 'diff',
    'I': 'inc',
}
TYPES_COMPLETE_NAME = {
    'F': 'full',
    'D': 'differential',
    'I': 'incremental',
}
SCHED = {
    'F': '40d:45d',
    'D': '7d:10d',
    'I': '22h:26h',
}
MAX_JOBS = {
    'F': 2,
    'D': 1,
    'I': 2,
    'total': 12,
}

parser = argparse.ArgumentParser()
parser.add_argument("-d", "--db-connect-string", metavar="connect-string", dest="db",
                    help="Database connect string")
parser.add_argument("-D", "--db-connect-string-file", metavar="FILE", dest="dbfile",
                    default=DSA_BACULA_DB_CONNECT,
                    help="File to read database connect string from (%s)" % (DSA_BACULA_DB_CONNECT,))
parser.add_argument("-c", "--client-list", metavar="FILE", dest="clientlist",
                    default=DSA_CLIENT_LIST_FILE,
                    help="File with a list of all clients (%s)" % (DSA_CLIENT_LIST_FILE,))
parser.add_argument("-v", "--verbose", dest="verbose", action='count', default=0,
                    help="Be more verbose (repeat for more).")
parser.add_argument("-n", "--nodo", dest="nodo", default=False, action="store_true",
                    help="Print the run commands to stdout rather than feeding them to bconsole.")
parser.add_argument("--min-age", dest="min_age", metavar="TIME", default=MIN_AGE,
                    help="Do not consider running a backup for a client if we ran one within TIME (%s)" % (MIN_AGE,))
parser.add_argument("--min-failed-timeout", dest="min_failed_timeout", metavar="TIME", default=MIN_FAILED_TIMEOUT,
                    help="Do not consider running a backup for a client if a backup for it failed or was aborted within TIME (%s)" % (MIN_FAILED_TIMEOUT,))
for t in TYPESL:
    parser.add_argument("--sched-%s" % (TYPES[t],), dest="sched_%s" % (t,), metavar="SCHED", default=SCHED[t],
                        help="Run %s backups at some point within this age window (%s)" % (TYPES[t], SCHED[t]))
    parser.add_argument("--max-%s" % (TYPES[t],), dest="max_%s" % (t,), metavar="MAX", default=MAX_JOBS[t], type=int,
                        help="Do not have more than MAX jobs run concurrently at level %s (%d) (can use free higher-level backup slots)" % (TYPES[t], MAX_JOBS[t]))
parser.add_argument("--max-total", dest="max_total", metavar="MAX", default=MAX_JOBS['total'], type=int,
                    help="Do not have more than MAX jobs in total, also counting created, blocked, etc. (%s)" % (MAX_JOBS['total'],))
args = parser.parse_args()

def setup_logger(args):
    logger = logging.getLogger(NAME)
    logger.setLevel(logging.DEBUG)

    # log to stderr
    stream_handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
    stream_handler.setFormatter(formatter)
    if args.verbose >= 2:
        stream_handler.setLevel(logging.DEBUG)
    elif args.verbose >= 1:
        stream_handler.setLevel(logging.INFO)
    else:
        stream_handler.setLevel(logging.WARN)
    logger.addHandler(stream_handler)

    # log to syslog
    syslog_handler = logging.handlers.SysLogHandler(address=SYSLOG_ADDRESS)
    syslog_handler.setLevel(logging.INFO)
    logger.addHandler(syslog_handler)

    return logger

logger = setup_logger(args)

if args.db is not None:
    pass
elif args.dbfile is not None:
    args.db = open(args.dbfile).read().rstrip()
else:
    logger.error("Need one of -d or -D.")
    sys.exit(1)

# parse schedules
now = datetime.datetime.now()
min_age = convert_timedelta(args.min_age)
max_jobs = {}
sched = {}
for t in TYPESL:
    spec = vars(args)['sched_%s' % (t,)]
    try:
        (low, high) = spec.split(':', 1)
    except ValueError:
        logger.error("Invalid schedule spec (%s)", spec)
        sys.exit(1)
    sched[t] = {
        'lo': convert_timedelta(low),
        'hi': convert_timedelta(high),
    }
    if sched[t]['hi'] <= sched[t]['lo']:
        logger.error("Scheduling error in %s: low needs to be smaller than high (but we have %s and %s)", spec, low, high)
        sys.exit(1)
    max_jobs[t] = vars(args)['max_%s' % (t,)]
max_jobs['total'] = args.max_total
min_failed_timeout = convert_timedelta(args.min_failed_timeout)

# Get the list of clients
if not os.path.exists(args.clientlist):
    logger.error("File %s does not exist", args.clientlist)
    sys.exit(1)
clients = set(open(args.clientlist).read().split())

conn = psycopg2.connect(args.db)
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

# Bacula job status codes of interest:
#   R  running
#   C  created
#   F  waiting for client
#   S  waiting for storage daemon
#   m  waiting for new media
#   M  waiting for media mount
#   s  waiting for storage resource
#   j  waiting for job resource
#   c  waiting for client resource
#   d  waiting on maximum jobs
#   t  waiting on start time
#   p  waiting on higher priority jobs

# get backups currently running
cursor.execute("""
    SELECT name, level
    FROM job
    WHERE type='B' AND
          jobstatus IN ('R')
    """, {})
current_backups = {}
for t in TYPESL:
    current_backups[t] = set()
for r in cursor.fetchall():
    current_backups[r['level']].add(r['name'])

# get backups currently created or waiting on some resource
cursor.execute("""
    SELECT name, level
    FROM job
    WHERE type='B' AND
          jobstatus IN ('C', 'F', 'S', 'm', 'M', 's', 'j', 'c', 'd', 't', 'p')
    """, {})
pending_backups = {}
for r in cursor.fetchall():
    pending_backups[r['name']] = r['level']

# get clients whose most recent failed/aborted backup is too fresh
cursor.execute("""
    SELECT name, MAX(starttime) AS starttime
    FROM job
    WHERE type='B' AND
          jobstatus IN ('A', 'f')
    GROUP BY name
    """, {})
failed_backups = set()
for r in cursor.fetchall():
    if now - r['starttime'] < min_failed_timeout:
        failed_backups.add(r['name'])

for t in TYPESL:
    logger.debug("Num jobs currently running %s: %d; max (including higher pri): %d", t, len(current_backups[t]), max_jobs[t])
logger.debug("Num jobs currently pending (created/waiting/..): %d; max (including running): %d", len(pending_backups), max_jobs['total'])

# get most recent successful backups of each type for all clients
most_recent = {}
cursor.execute("""
    SELECT name, level, MAX(starttime) AS starttime
    FROM job
    WHERE type='B' AND
          jobstatus='T'
    GROUP BY name, level
    """, {})
for r in cursor.fetchall():
    if not r['name'] in most_recent:
        most_recent[r['name']] = {}
    most_recent[r['name']][r['level']] = r['starttime']

candidates = {}
for t in TYPESL:
    candidates[t] = []
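
# Decide, per client and per level (walking from full down to incremental),
# whether the client becomes a scheduling candidate.  "recent" accumulates
# the newest start time over the levels seen so far, which implements
# "last backup of that type or higher"; the break below ensures a client is
# a candidate at most once per run, at the highest level that is due.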
for c in clients:
    if c in failed_backups:
        logger.debug("Not considering %s as backups failed or were aborted recently", c)
        continue
    if any([c in cb for cb in current_backups.values()]):
        logger.debug("Not considering %s as a backup is currently running", c)
        continue
    if c in pending_backups:
        logger.debug("Not considering %s as a backup is currently created/waiting on things", c)
        continue
    if c in most_recent:
        recent = max(most_recent[c].values())
        if recent >= now - min_age:
            logger.debug("Not considering %s as a backup ran recently (%s)", c, recent)
            continue

    recent = datetime.datetime.min
    for t in TYPESL:
        if c in most_recent and t in most_recent[c]:
            recent = max(recent, most_recent[c][t])
        logger.debug("Most recent backup for %s/%s ran %s", c, t, recent)

        age = (now - recent)
        frac = (age - sched[t]['lo']) / (sched[t]['hi'] - sched[t]['lo'])
        if frac <= 0:
            logger.debug("Most recent backup for %s/%s (%s) is new enough: %.2f < 0", c, t, recent, frac)
            runme = False
        elif frac >= 1:
            logger.debug("Most recent backup for %s/%s (%s) is old: %.2f >= 1", c, t, recent, frac)
            runme = True
        else:
            logger.debug("Most recent backup for %s/%s (%s) is so so: 0 <= %.2f <= 1", c, t, recent, frac)
            # get a semi-random value that is consistent across multiple calls:
            # we hash the client name, the level, and the most recent backup
            # time, and use the first byte of the digest.
            d = hashlib.sha256(("%s - %s - %s" % (c, t, recent)).encode()).digest()
            prnd = d[0] / 256
            runme = frac > prnd

        if runme:
            logger.debug("Considering running %s/%s", c, t)
            candidates[t].append({'client': c, 'level': t})
            break

total_cnt = len(pending_backups) + sum([len(current_backups[t]) for t in TYPESL])
total_free = max_jobs['total'] - total_cnt

run_cnt = 0
total_slots = 0
cmd = []
for t in TYPESL:
    run_cnt += len(current_backups[t])
    total_slots += max_jobs[t]
    if len(candidates[t]) == 0:
        logger.info("No candidates for %s(%s) jobs", t, TYPES_COMPLETE_NAME[t])
        continue

    free_slots = min(total_slots - run_cnt, total_free)
    if free_slots <= 0:
        logger.info("No free slots for %s(%s) jobs but %d candidate(s)", t, TYPES_COMPLETE_NAME[t], len(candidates[t]))
        continue

    logger.info("Have %d candidate(s) and %d free slots for %s(%s) jobs.", len(candidates[t]), free_slots, t, TYPES_COMPLETE_NAME[t])
    picked = random.sample(candidates[t], min(free_slots, len(candidates[t])))
    for p in picked:
        logger.info("  Will run: %s/%s", p['client'], p['level'])
    for p in picked:
        cmd.append("run job=%s level=%s yes" % (p['client'], TYPES_COMPLETE_NAME[p['level']]))
        total_free -= 1
        run_cnt += 1

if args.nodo:
    print("\n".join(cmd))
    sys.exit(0)

if args.verbose:
    for c in cmd:
        print("Now going to run: %s" % (c,))

if len(cmd) == 0:
    sys.exit(0)

p = subprocess.Popen(['/usr/sbin/bconsole'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = p.communicate("\n".join(cmd).encode())
if p.returncode != 0:
    raise Exception("bconsole failed. stdout:\n%s\nstderr:%s\n" % (out, err))

if args.verbose:
    print("stdout:\n")
    sys.stdout.buffer.write(out)
    print("\n")

if err != b"":
    print("bconsole said on stderr:\n", file=sys.stderr)
    sys.stderr.buffer.write(err)
    print("", file=sys.stderr)
    sys.exit(1)
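
# Example invocation (illustrative): show what this run would schedule, with
# full debug output, without feeding anything to bconsole:
#   ./dsa-bacula-scheduler -v -v -n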