--- /dev/null
+#!/usr/bin/python3
+
+# "manual" scheduler to spread out our backups better
+
+# We want to smear out backups over time, subject to the following constraints:
+# - no more than MAX_FULL full backups should run at any time,
+# - no more than MAX_FULL+MAX_DIFF full+diff backups should run at any time,
+# - no more than MAX_FULL+MAX_DIFF+MAX_INC full+diff+inc backups should run at any time,
+# - no more than MAX_TOTAL jobs (running or already created) at any time
+# - for each client:
+# - do not schedule a backup if one failed or was aborted recently (MIN_FAILED_TIMEOUT)
+# - do not schedule a backup if one for that host is already running
+# - do not schedule a backup if one for that host is already pending/scheduled
+# - do not schedule a backup if we had run one recently (MIN_AGE)
+# - consider scheduling a {full,diff,inc} backup with probability
+#   o 0 if the last backup of that type (or higher) is younger than the low
+#     end of that type's schedule window,
+#   o 1 if the last backup of that type (or higher) is older than the high
+#     end of that window,
+#   o linearly interpolated in between.
+# For each backup considered, choose the level from {full,diff,inc}, up to
+# the limits imposed by MAX_*.
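+#
+# Worked example: with the default full window of 40d:45d, a client whose
+# newest full backup is 42 days old gets frac = (42d-40d)/(45d-40d) = 0.4,
+# i.e. a 40% chance of a full backup being considered on this pass; at 40
+# days or younger the chance is 0, at 45 days or older it is 1.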
+
+# Copyright 2010, 2011, 2013, 2017, 2018 Peter Palfrader
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import argparse
+import datetime
+import hashlib
+import logging
+import logging.handlers
+import os.path
+import psycopg2
+import psycopg2.extras
+import random
+import re
+import subprocess
+import sys
+
+NAME = "dsa-bacula-scheduler"
+SYSLOG_ADDRESS = '/dev/log'
+
+DSA_BACULA_DB_CONNECT = '/etc/dsa/bacula-reader-database'
+DSA_CLIENT_LIST_FILE = '/etc/bacula/dsa-clients'
+
+def convert_timedelta(s, default_unit='h'):
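+    """Parse a duration string such as '20h', '45d' or '90m' into a
+    datetime.timedelta.  Valid suffixes are s/m/h/d/w; a bare number is
+    interpreted in default_unit (hours unless overridden)."""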
+    m = re.match(r'([0-9]+)([smhdw])?$', s)
+    if m is None: raise ValueError("cannot parse timedelta %r"%(s,))
+    ticks = int(m.group(1))
+    unit = m.group(2)
+    if unit is None: unit = default_unit
+
+    if unit == 's': pass
+    elif unit == 'm': ticks *= 60
+    elif unit == 'h': ticks *= 60*60
+    elif unit == 'd': ticks *= 60*60*24
+    elif unit == 'w': ticks *= 60*60*24*7
+    else: raise ValueError("unknown time unit %r"%(unit,))
+    return datetime.timedelta(seconds = ticks)
+
+MIN_AGE = '20h'
+MIN_FAILED_TIMEOUT = '6h'
+TYPESL = ['F', 'D', 'I']
+TYPES = { 'F': 'full',
+          'D': 'diff',
+          'I': 'inc',
+        }
+TYPES_COMPLETE_NAME = { 'F': 'full',
+                        'D': 'differential',
+                        'I': 'incremental',
+                      }
+SCHED = { 'F': '40d:45d',
+          'D': '7d:10d',
+          'I': '22h:26h',
+        }
+MAX_JOBS = { 'F': 2,
+             'D': 1,
+             'I': 2,
+             'total': 12,
+           }
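+
+# Each SCHED entry is a "low:high" age window (see convert_timedelta for the
+# syntax).  The MAX_JOBS limits cascade from full through diff to inc: a level
+# may also use slots left free by the levels above it, and 'total' caps all
+# running plus created/waiting jobs (see the slot accounting near the end).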
+
+parser = argparse.ArgumentParser()
+parser.add_argument("-d", "--db-connect-string", metavar="connect-string", dest="db",
+                    help="Database connect string")
+parser.add_argument("-D", "--db-connect-string-file", metavar="FILE", dest="dbfile",
+                    default=DSA_BACULA_DB_CONNECT,
+                    help="File to read database connect string from (%s)"%(DSA_BACULA_DB_CONNECT,))
+parser.add_argument("-c", "--client-list", metavar="FILE", dest="clientlist",
+                    default=DSA_CLIENT_LIST_FILE,
+                    help="File with a list of all clients (%s)"%(DSA_CLIENT_LIST_FILE,))
+parser.add_argument("-v", "--verbose", dest="verbose",
+                    action='count', default=0,
+                    help="Be more verbose (repeat for more).")
+parser.add_argument("-n", "--nodo", dest="nodo",
+                    default=False, action="store_true",
+                    help="Print commands to stdout instead of submitting them to bconsole.")
+parser.add_argument("--min-age", dest="min_age", metavar="TIME",
+                    default=MIN_AGE,
+                    help="Do not consider running a backup for a client if we ran one within TIME (%s)"%(MIN_AGE,))
+parser.add_argument("--min-failed-timeout", dest="min_failed_timeout", metavar="TIME",
+                    default=MIN_FAILED_TIMEOUT,
+                    help="Do not consider running a backup for a client if one failed or was aborted within TIME (%s)"%(MIN_FAILED_TIMEOUT,))
+for t in TYPESL:
+    parser.add_argument("--sched-%s"%(TYPES[t],), dest="sched_%s"%(t,), metavar="SCHED",
+                        default=SCHED[t],
+                        help="Run %s backups sometime within this age window (%s)"%(TYPES[t], SCHED[t]))
+    parser.add_argument("--max-%s"%(TYPES[t],), dest="max_%s"%(t,), metavar="MAX",
+                        default=MAX_JOBS[t], type=int,
+                        help="Do not run more than MAX jobs concurrently at level %s (%d); they may use free higher-level slots."%(TYPES[t], MAX_JOBS[t]))
+parser.add_argument("--max-total", dest="max_total", metavar="MAX",
+                    default=MAX_JOBS['total'], type=int,
+                    help="Do not run more than MAX jobs in total, also counting created, blocked, etc. (%s)"%(MAX_JOBS['total'],))
+args = parser.parse_args()
+
+def setup_logger(args):
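+    """Set up the shared logger: INFO and up goes to syslog; the stderr
+    level is controlled by the number of -v flags (WARN by default)."""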
+    logger = logging.getLogger(NAME)
+    logger.setLevel(logging.DEBUG)
+
+    # log to stderr
+    stream_handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
+    stream_handler.setFormatter(formatter)
+
+    if args.verbose >= 2:
+        stream_handler.setLevel(logging.DEBUG)
+    elif args.verbose >= 1:
+        stream_handler.setLevel(logging.INFO)
+    else:
+        stream_handler.setLevel(logging.WARN)
+
+    logger.addHandler(stream_handler)
+
+    # log to syslog
+    syslog_handler = logging.handlers.SysLogHandler(address = SYSLOG_ADDRESS)
+    syslog_handler.setLevel(logging.INFO)
+    logger.addHandler(syslog_handler)
+
+    return logger
+
+logger = setup_logger(args)
+
+if args.db is not None:
+    pass
+elif args.dbfile is not None:
+    args.db = open(args.dbfile).read().rstrip()
+else:
+    logger.error("Need one of -d or -D.")
+    sys.exit(1)
+
+# parse schedules
+now = datetime.datetime.now()
+min_age = convert_timedelta(args.min_age)
+max_jobs = {}
+sched = {}
+for t in TYPESL:
+    spec = vars(args)['sched_%s'%(t,)]
+    try:
+        (low, high) = spec.split(':', 1)
+    except ValueError:
+        logger.error("Invalid schedule spec (%s)", spec)
+        sys.exit(1)
+
+    sched[t] = {
+        'lo': convert_timedelta(low),
+        'hi': convert_timedelta(high),
+    }
+    if sched[t]['hi'] <= sched[t]['lo']:
+        logger.error("Scheduling error in %s: low needs to be smaller than high (but we have %s and %s)", spec, low, high)
+        sys.exit(1)
+
+    max_jobs[t] = vars(args)['max_%s'%(t,)]
+max_jobs['total'] = args.max_total
+min_failed_timeout = convert_timedelta(args.min_failed_timeout)
+
+
+# Get list of clients
+if not os.path.exists(args.clientlist):
+    logger.error("File %s does not exist", args.clientlist)
+    sys.exit(1)
+clients = set(open(args.clientlist).read().split())
+
+
+conn = psycopg2.connect(args.db)
+cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
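+# A DictCursor lets us address result columns by name (r['name'], r['level'])
+# instead of by position.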
+
+# Fetch backup jobs that are currently active.  Relevant Bacula job status
+# codes:
+#   R - running
+#   C - created, not yet running
+#   F - waiting for Client
+#   S - waiting for Storage daemon
+#   m - waiting for new media
+#   M - waiting for media mount
+#   s - waiting for storage resource
+#   j - waiting for job resource
+#   c - waiting for client resource
+#   d - waiting on maximum jobs
+#   t - waiting on start time
+#   p - waiting on higher priority jobs
+cursor.execute("""
+    SELECT name, level
+    FROM job
+    WHERE type='B' AND
+          jobstatus IN ('R')
+""", {})
+current_backups = {}
+for t in TYPESL:
+    current_backups[t] = set()
+for r in cursor.fetchall():
+    current_backups[r['level']].add(r['name'])
+
+cursor.execute("""
+    SELECT name, level
+    FROM job
+    WHERE type='B' AND
+          jobstatus IN ('C', 'F', 'S', 'm', 'M', 's', 'j', 'c', 'd', 't', 'p')
+""", {})
+pending_backups = {}
+for r in cursor.fetchall():
+    pending_backups[r['name']] = r['level']
+
+# jobs that failed or were aborted recently ('A' = canceled, 'f' = fatal error)
+cursor.execute("""
+    SELECT name, MAX(starttime) AS starttime
+    FROM job
+    WHERE type='B' AND
+          jobstatus IN ('A', 'f')
+    GROUP BY name
+""", {})
+failed_backups = set()
+for r in cursor.fetchall():
+    if now - r['starttime'] < min_failed_timeout:
+        failed_backups.add(r['name'])
+
+
+for t in TYPESL:
+    logger.debug("Num jobs currently running %s: %d; max (including higher pri): %d", t, len(current_backups[t]), max_jobs[t])
+logger.debug("Num jobs currently pending (created/waiting/..): %d; max (including running): %d", len(pending_backups), max_jobs['total'])
+
+most_recent = {}
+# get most recent backups of each type for all clients
+# ('T' = job terminated normally)
+cursor.execute("""
+    SELECT name, level, MAX(starttime) AS starttime
+    FROM job
+    WHERE type='B' AND
+          jobstatus='T'
+    GROUP BY name, level
+""", {})
+for r in cursor.fetchall():
+    if r['name'] not in most_recent:
+        most_recent[r['name']] = {}
+    most_recent[r['name']][r['level']] = r['starttime']
+
+candidates = {}
+for t in TYPESL:
+    candidates[t] = []
+
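+# Candidate selection: for every client that is idle, not pending, and did not
+# fail recently, walk the levels from full to diff to inc.  The first level
+# whose age-based probability wins its (deterministic) dice roll is queued as
+# a candidate; the break below ensures at most one level per client per pass.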
+for c in clients:
+    if c in failed_backups:
+        logger.debug("Not considering %s as backups failed or were aborted recently", c)
+        continue
+    if any(c in cb for cb in current_backups.values()):
+        logger.debug("Not considering %s as a backup is currently running", c)
+        continue
+    if c in pending_backups:
+        logger.debug("Not considering %s as a backup is currently created/waiting on things", c)
+        continue
+    if c in most_recent:
+        recent = max(most_recent[c].values())
+        if recent >= now - min_age:
+            logger.debug("Not considering %s as a backup ran recently (%s)", c, recent)
+            continue
+
+    recent = datetime.datetime.min
+    for t in TYPESL:
+        # accumulate across levels, so a recent full backup also counts
+        # as "that type or higher" for diff and inc
+        if c in most_recent and t in most_recent[c]:
+            recent = max(recent, most_recent[c][t])
+        logger.debug("Most recent backup for %s/%s ran %s", c, t, recent)
+        age = now - recent
+        frac = (age - sched[t]['lo'])/(sched[t]['hi'] - sched[t]['lo'])
+        if frac <= 0:
+            logger.debug("Most recent backup for %s/%s (%s) is new enough: %.2f <= 0", c, t, recent, frac)
+            runme = False
+        elif frac >= 1:
+            logger.debug("Most recent backup for %s/%s (%s) is old: %.2f >= 1", c, t, recent, frac)
+            runme = True
+        else:
+            logger.debug("Most recent backup for %s/%s (%s) is so-so: 0 < %.2f < 1", c, t, recent, frac)
+
+            # get a semi-random value that is consistent across multiple calls:
+            # hash the client name, level, and most recent backup time and use
+            # the first byte of the digest, scaled into [0, 1).
+            d = hashlib.sha256(("%s - %s - %s"%(c, t, recent)).encode()).digest()
+            prnd = d[0] / 256
+
+            runme = frac > prnd
+        if runme:
+            logger.debug("Considering running %s/%s", c, t)
+            candidates[t].append({'client': c, 'level': t})
+            break
+
+
+total_cnt = len(pending_backups) + sum([len(current_backups[t]) for t in TYPESL])
+total_free = max_jobs['total'] - total_cnt
+
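+# Slot accounting: walking from full to diff to inc, total_slots accumulates
+# the per-level maxima while run_cnt accumulates what is already running, so
+# each level can use slots left free by the levels before it; total_free caps
+# everything at --max-total, including created/waiting jobs.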
+run_cnt = 0
+total_slots = 0
+cmd = []
+for t in TYPESL:
+    run_cnt += len(current_backups[t])
+    total_slots += max_jobs[t]
+    if len(candidates[t]) == 0:
+        logger.info("No candidates for %s", t)
+        continue
+    free_slots = min(total_slots - run_cnt, total_free)
+    if free_slots <= 0:
+        logger.info("No free slots for %s but %d candidates", t, len(candidates[t]))
+        continue
+
+    logger.info("Have %d candidates and %d free slots for %s.", len(candidates[t]), free_slots, t)
+    picked = random.sample(candidates[t], min(free_slots, len(candidates[t])))
+    for p in picked:
+        logger.info(" Will run: %s/%s", p['client'], p['level'])
+
+    for p in picked:
+        cmd.append("run job=%s level=%s yes"%(p['client'], TYPES_COMPLETE_NAME[p['level']]))
+        total_free -= 1
+        run_cnt += 1
+
+if args.nodo:
+    print("\n".join(cmd))
+    sys.exit(0)
+
+if args.verbose:
+    for c in cmd:
+        print("Now going to run: %s"%(c,))
+
+if len(cmd) == 0:
+    sys.exit(0)
+
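+# Feed the accumulated "run job=... level=... yes" commands to bconsole on
+# stdin; the trailing "yes" makes bconsole start each job without asking for
+# confirmation.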
+p = subprocess.Popen(['/usr/sbin/bconsole'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+(out, err) = p.communicate("\n".join(cmd).encode())
+if p.returncode != 0:
+    raise Exception("bconsole failed. stdout:\n%s\nstderr:\n%s\n"%(out, err))
+
+if args.verbose:
+    print("stdout:\n")
+    sys.stdout.buffer.write(out)
+    print("\n")
+
+if err != b"":
+    print("bconsole said on stderr:\n", file=sys.stderr)
+    sys.stderr.buffer.write(err)
+    print("", file=sys.stderr)
+    sys.exit(1)