Merge remote-tracking branch 'origin/master' into staging
[mirror/dsa-puppet.git] / modules / bacula / files / dsa-bacula-scheduler
diff --git a/modules/bacula/files/dsa-bacula-scheduler b/modules/bacula/files/dsa-bacula-scheduler
new file mode 100755 (executable)
index 0000000..cefcb2b
--- /dev/null
@@ -0,0 +1,367 @@
+#!/usr/bin/python3
+
+# "manual" scheduler to spread out our backups better
+
+# We want to smear out backups over time, subject to the following constraints:
+#  - no more than MAX_FULL full backups should run at any time,
+#  - no more than MAX_FULL+MAX_DIFF full+diff backups should run at any time,
+#  - no more than MAX_FULL+MAX_DIFF+MAX_INC full+diff+inc backups should run at any time,
+#  - no more than MAX_TOTAL jobs are running and already created at any time
+#  - for each client:
+#    - do not schedule a backup if one failed or was aborted recently (MIN_FAILED_TIMEOUT)
+#    - do not schedule a backup if one for that host is already running
+#    - do not schedule a backup if one for that host is already pending/scheduled
+#    - do not schedule a backup if we had run one recently (MIN_AGE)
+#    - consider scheduling a {full,diff,inc} backup with propability
+#      o 0  if the last of that type (or higher) is younger than XXX,
+#      o 1  if the last of that type (or higher) is older than YYY,
+#      o linearly interpolated in between.
+# For each scheduled backup considered, choose {full,diff,inc} up to the
+# limits imposed by MAX_*.
+
+# Copyright 2010, 2011, 2013, 2017, 2018 Peter Palfrader
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import argparse
+import datetime
+import hashlib
+import logging
+import logging.handlers
+import os.path
+import psycopg2
+import psycopg2.extras
+import random
+import re
+import subprocess
+import sys
+
+NAME = "dsa-bacula-scheduler"
+SYSLOG_ADDRESS = '/dev/log'
+
+DSA_BACULA_DB_CONNECT = '/etc/dsa/bacula-reader-database'
+DSA_CLIENT_LIST_FILE = '/etc/bacula/dsa-clients'
+
+def convert_timedelta(s, default_unit='h'):
+  m = re.match('([0-9]+)([smhdw])?$', s)
+  if m is None: raise ValueError
+  ticks = int(m.group(1))
+  unit = m.group(2)
+  if unit is None: unit = default_unit
+
+  if unit == 's': None
+  elif unit == 'm': ticks *= 60
+  elif unit == 'h': ticks *= 60*60
+  elif unit == 'd': ticks *= 60*60*24
+  elif unit == 'w': ticks *= 60*60*24*7
+  else: raise ValueError
+  return datetime.timedelta(seconds = ticks)
+
+MIN_AGE = '20h'
+MIN_FAILED_TIMEOUT = '6h'
+TYPESL = ['F', 'D', 'I']
+TYPES = { 'F': 'full',
+          'D': 'diff',
+          'I': 'inc',
+          }
+TYPES_COMPLETE_NAME = { 'F': 'full',
+                        'D': 'differential',
+                        'I': 'incremental',
+          }
+SCHED = { 'F': '40d:45d',
+          'D': '7d:10d',
+          'I': '22h:26h',
+          }
+MAX_JOBS = { 'F': 2,
+             'D': 1,
+             'I': 2,
+             'total': 12,
+           }
+
+parser = argparse.ArgumentParser()
+parser.add_argument("-d", "--db-connect-string", metavar="connect-string", dest="db",
+  help="Database connect string")
+parser.add_argument("-D", "--db-connect-string-file", metavar="FILE", dest="dbfile",
+  default=DSA_BACULA_DB_CONNECT,
+  help="File to read database connect string from (%s)"%(DSA_BACULA_DB_CONNECT,))
+parser.add_argument("-c", "--client-list", metavar="FILE", dest="clientlist",
+  default=DSA_CLIENT_LIST_FILE,
+  help="File with a list of all clients (%s)"%(DSA_CLIENT_LIST_FILE,))
+parser.add_argument("-v", "--verbose", dest="verbose",
+   action='count', default=0,
+  help="Be more verbose (repeat for more).")
+parser.add_argument("-n", "--nodo", dest="nodo",
+  default=False, action="store_true",
+  help="Print to cat rather than bconsole.")
+parser.add_argument("--min-age", dest="min_age", metavar="TIME",
+  default=MIN_AGE,
+  help="Do not consider running a backup for a client if we ran one within TIME (%s)"%(MIN_AGE,))
+parser.add_argument("--min-failed-timeout", dest="min_failed_timeout", metavar="TIME",
+  default=MIN_FAILED_TIMEOUT,
+  help="Do not consider running a backup for a client if it had a backup failed or aborted within TIME (%s)"%(MIN_FAILED_TIMEOUT,))
+for t in TYPESL:
+  parser.add_argument("--sched-%s"%(TYPES[t],), dest="sched_%s"%(t,), metavar="SCHED",
+    default=SCHED[t],
+    help="Run %s backups somewhen in this age window (%s)"%(TYPES[t], SCHED[t]))
+  parser.add_argument("--max-%s"%(TYPES[t],), dest="max_%s"%(t,), metavar="MAX",
+    default=MAX_JOBS[t], type=int,
+    help="Do not have more than MAX jobs run concurrently at level %s (%d) (can use free higher-level backup slots)"%(TYPES[t], MAX_JOBS[t]))
+parser.add_argument("--max-total", dest="max_total", metavar="MAX",
+  default=MAX_JOBS['total'], type=int,
+  help="Do not have more than MAX_TOTAL also counting created, blocked, etc (%s)"%(MAX_JOBS['total']))
+args = parser.parse_args()
+
+def setup_logger(args):
+  logger = logging.getLogger(NAME)
+  logger.setLevel(logging.DEBUG)
+
+  # log to stderr
+  stream_handler = logging.StreamHandler()
+  formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
+  stream_handler.setFormatter(formatter)
+
+  if args.verbose >= 2:
+    stream_handler.setLevel(logging.DEBUG)
+  elif args.verbose >= 1:
+    stream_handler.setLevel(logging.INFO)
+  else:
+    stream_handler.setLevel(logging.WARN)
+
+  logger.addHandler(stream_handler)
+
+  # log to syslog
+  syslog_handler = logging.handlers.SysLogHandler(address = SYSLOG_ADDRESS)
+  syslog_handler.setLevel(logging.INFO)
+  logger.addHandler(syslog_handler)
+
+  return logger
+
+logger = setup_logger(args)
+
+if args.db is not None:
+  pass
+elif args.dbfile is not None:
+  args.db = open(args.dbfile).read().rstrip()
+else:
+  logger.error("Need one of -d or -D.")
+  sys.exit(1)
+
+# parse schedules
+now = datetime.datetime.now()
+min_age = convert_timedelta(args.min_age)
+max_jobs = {}
+sched = {}
+for t in TYPESL:
+  spec = vars(args)['sched_%s'%(t,)]
+  try:
+    (low, high) = spec.split(':', 1)
+  except ValueError:
+    logger.error("Invalid schedule spec (%s)", spec)
+    sys.exit(1)
+
+  sched[t] = {
+    'lo': convert_timedelta(low),
+    'hi': convert_timedelta(high),
+    }
+  if sched[t]['hi'] <= sched[t]['lo']:
+    logger.error("Scheduling error in %s: low needs to be smaller than high (but we have %s and %s)", spec, low, high)
+    sys.exit(1)
+
+  max_jobs[t] = vars(args)['max_%s'%(t,)]
+max_jobs['total'] = args.max_total
+min_failed_timeout = convert_timedelta(args.min_failed_timeout)
+
+
+# Get list of clients
+if not os.path.exists(args.clientlist):
+  logger.error("File %s does not exist", args.clientlist)
+  sys.exit(1)
+clients = set(open(args.clientlist).read().split())
+
+
+conn = psycopg2.connect(args.db)
+cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+# get backups currently
+#  R  running
+#  C  created
+#  F | Waiting for Client
+#  S | Waiting for Storage daemon
+#  m | Waiting for new media
+#  M | Waiting for media mount
+#  s | Waiting for storage resource
+#  j | Waiting for job resource
+#  c | Waiting for client resource
+#  d | Waiting on maximum jobs
+#  t | Waiting on start time
+#  p | Waiting on higher priority jobs
+cursor.execute("""
+  SELECT name, level
+  FROM job
+  WHERE type='B' AND
+        jobstatus IN ('R')
+""", {})
+current_backups = {}
+for t in TYPESL:
+  current_backups[t] = set()
+for r in cursor.fetchall():
+  current_backups[r['level']].add(r['name'])
+
+cursor.execute("""
+  SELECT name, level
+  FROM job
+  WHERE type='B' AND
+        jobstatus IN ('C', 'F', 'S', 'm', 'M', 's', 'j', 'c', 'd', 't', 'p')
+""", {})
+pending_backups = {}
+for r in cursor.fetchall():
+  pending_backups[r['name']] = r['level']
+
+cursor.execute("""
+  SELECT name, MAX(starttime) AS starttime
+  FROM job
+  WHERE type='B' AND
+        jobstatus IN ('A', 'f')
+  GROUP BY name
+""", {})
+failed_backups = set()
+for r in cursor.fetchall():
+  if now - r['starttime'] < min_failed_timeout:
+    failed_backups.add(r['name'])
+
+
+for t in TYPESL:
+  logger.debug("Num jobs currently running %s: %d; max (including higher pri): %d", t, len(current_backups[t]), max_jobs[t])
+logger.debug("Num jobs currently pending (created/waiting/..): %d; max (including running): %d", len(pending_backups), max_jobs['total'])
+
+most_recent = {}
+# get most recent backups of each type for all clients
+cursor.execute("""
+  SELECT name, level, MAX(starttime) AS starttime
+  FROM job
+  WHERE type='B' AND
+        jobstatus='T'
+  GROUP BY name, level
+""", {})
+for r in cursor.fetchall():
+  if not r['name'] in most_recent:
+    most_recent[r['name']] = {}
+  most_recent[r['name']][r['level']] = r['starttime']
+
+candidates = {}
+for t in TYPESL:
+  candidates[t] = []
+
+for c in clients:
+  if c in failed_backups:
+    logger.debug("Not considering %s as backups failed or were aborted recently", c)
+    continue
+  if any([c in cb for cb in current_backups.values()]):
+    logger.debug("Not considering %s as a backup is currently running", c)
+    continue
+  if c in pending_backups:
+    logger.debug("Not considering %s as a backup is currently created/waiting on things", c)
+    continue
+  if c in most_recent:
+    recent = max(most_recent[c].values())
+    if recent >= now - min_age:
+      logger.debug("Not considering %s as a backup ran recently (%s)", c, recent)
+      #continue
+  recent = datetime.datetime.min
+  for t in TYPESL:
+    if c in most_recent and t in most_recent[c]:
+      recent = max(recent, most_recent[c][t])
+    logger.debug("Most recent backup for %s/%s ran %s", c, t, recent)
+    age = (now - recent)
+    frac = (age - sched[t]['lo'])/(sched[t]['hi'] - sched[t]['lo'])
+    if frac <= 0:
+      logger.debug("Most recent backup for %s/%s (%s) is new enough: %.2f < 0", c, t, recent, frac)
+      runme = False
+    elif frac >= 1:
+      logger.debug("Most recent backup for %s/%s (%s) is old: %.2f >= 1", c, t, recent, frac)
+      runme = True
+    else:
+      logger.debug("Most recent backup for %s/%s (%s) is so so: 0 <= %.2f <= 1", c, t, recent, frac)
+
+      # get a semi random value that is consistent across multiple calls.
+      # we hash the client name and the most recent backup time and use the first byte of the digest.
+      d = hashlib.sha256(("%s - %s - %s"%(c, t, recent)).encode()).digest()
+      prnd = d[0] / 256
+
+      runme = frac > prnd
+    if runme:
+      logger.debug("Considering running %s/%s", c, t)
+      candidates[t].append( {'client': c, 'level': t} )
+      break
+
+
+total_cnt = len(pending_backups) + sum([len(current_backups[t]) for t in TYPESL])
+total_free = max_jobs['total'] - total_cnt
+
+run_cnt = 0
+total_slots = 0
+cmd = []
+for t in TYPESL:
+  run_cnt += len(current_backups[t])
+  total_slots += max_jobs[t]
+  if len(candidates[t]) == 0:
+    logger.info("No candidates for %s(%s) jobs", t, TYPES_COMPLETE_NAME[t])
+    continue
+  free_slots = min(total_slots - run_cnt, total_free)
+  if free_slots <= 0:
+    logger.info("No free slots for %s(%s) jobs but %d candidate(s)", t, TYPES_COMPLETE_NAME[t], len(candidates[t]))
+    continue
+
+  logger.info("Have %d candidate(s) and %d free slots for %s(%s) jobs.", len(candidates[t]), free_slots, t, TYPES_COMPLETE_NAME[t])
+  picked = random.sample(candidates[t], min(free_slots, len(candidates[t])))
+  for p in picked:
+    logger.info("  Will run: %s/%s", p['client'], p['level'])
+
+  for p in picked:
+    cmd.append("run job=%s level=%s yes"%(p['client'], TYPES_COMPLETE_NAME[p['level']]))
+    total_free -= 1
+    run_cnt += 1
+
+if args.nodo:
+  print("\n".join(cmd))
+  sys.exit(0)
+
+if args.verbose:
+    for c in cmd:
+      print("Now going to run: %s"%(c,))
+
+if len(cmd) == 0:
+  sys.exit(0)
+
+p = subprocess.Popen(['/usr/sbin/bconsole'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+(out, err) = p.communicate("\n".join(cmd).encode())
+if p.returncode != 0:
+    raise Exception("bconsole failed.  stdout:\n%s\nstderr:%s\n"%(out, err))
+
+if args.verbose:
+    print("stdout:\n")
+    sys.stdout.buffer.write(out)
+    print("\n")
+
+if err != b"":
+  print("bconsole said on stderr:\n", file=sys.stderr)
+  sys.stderr.buffer.write(out)
+  print("", file=sys.stderr)
+  sys.exit(1)