Revert "massage log messages"
[mirror/dsa-puppet.git] / modules / bacula / files / dsa-bacula-scheduler
1 #!/usr/bin/python3
2
3 # "manual" scheduler to spread out our backups better
4
5 # We want to smear out backups over time, subject to the following constraints:
6 #  - no more than MAX_FULL full backups should run at any time,
7 #  - no more than MAX_FULL+MAX_DIFF full+diff backups should run at any time,
8 #  - no more than MAX_FULL+MAX_DIFF+MAX_INC full+diff+inc backups should run at any time,
9 #  - no more than MAX_TOTAL jobs are running and already created at any time
10 #  - for each client:
11 #    - do not schedule a backup if one failed or was aborted recently (MIN_FAILED_TIMEOUT)
12 #    - do not schedule a backup if one for that host is already running
13 #    - do not schedule a backup if one for that host is already pending/scheduled
14 #    - do not schedule a backup if we had run one recently (MIN_AGE)
15 #    - consider scheduling a {full,diff,inc} backup with propability
16 #      o 0  if the last of that type (or higher) is younger than XXX,
17 #      o 1  if the last of that type (or higher) is older than YYY,
18 #      o linearly interpolated in between.
19 # For each scheduled backup considered, choose {full,diff,inc} up to the
20 # limits imposed by MAX_*.
21
22 # Copyright 2010, 2011, 2013, 2017, 2018 Peter Palfrader
23 #
24 # Permission is hereby granted, free of charge, to any person obtaining
25 # a copy of this software and associated documentation files (the
26 # "Software"), to deal in the Software without restriction, including
27 # without limitation the rights to use, copy, modify, merge, publish,
28 # distribute, sublicense, and/or sell copies of the Software, and to
29 # permit persons to whom the Software is furnished to do so, subject to
30 # the following conditions:
31 #
32 # The above copyright notice and this permission notice shall be
33 # included in all copies or substantial portions of the Software.
34 #
35 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
36 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
37 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
38 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
39 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
40 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
41 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
42
43 import argparse
44 import datetime
45 import hashlib
46 import logging
47 import logging.handlers
48 import os.path
49 import psycopg2
50 import psycopg2.extras
51 import random
52 import re
53 import subprocess
54 import sys
55
56 NAME = "dsa-bacula-scheduler"
57 SYSLOG_ADDRESS = '/dev/log'
58
59 DSA_BACULA_DB_CONNECT = '/etc/dsa/bacula-reader-database'
60 DSA_CLIENT_LIST_FILE = '/etc/bacula/dsa-clients'
61
62 def convert_timedelta(s, default_unit='h'):
63   m = re.match('([0-9]+)([smhdw])?$', s)
64   if m is None: raise ValueError
65   ticks = int(m.group(1))
66   unit = m.group(2)
67   if unit is None: unit = default_unit
68
69   if unit == 's': None
70   elif unit == 'm': ticks *= 60
71   elif unit == 'h': ticks *= 60*60
72   elif unit == 'd': ticks *= 60*60*24
73   elif unit == 'w': ticks *= 60*60*24*7
74   else: raise ValueError
75   return datetime.timedelta(seconds = ticks)
76
77 MIN_AGE = '20h'
78 MIN_FAILED_TIMEOUT = '6h'
79 TYPESL = ['F', 'D', 'I']
80 TYPES = { 'F': 'full',
81           'D': 'diff',
82           'I': 'inc',
83           }
84 TYPES_COMPLETE_NAME = { 'F': 'full',
85                         'D': 'differential',
86                         'I': 'incremental',
87           }
88 SCHED = { 'F': '40d:45d',
89           'D': '7d:10d',
90           'I': '22h:26h',
91           }
92 MAX_JOBS = { 'F': 2,
93              'D': 1,
94              'I': 2,
95              'total': 12,
96            }
97
98 parser = argparse.ArgumentParser()
99 parser.add_argument("-d", "--db-connect-string", metavar="connect-string", dest="db",
100   help="Database connect string")
101 parser.add_argument("-D", "--db-connect-string-file", metavar="FILE", dest="dbfile",
102   default=DSA_BACULA_DB_CONNECT,
103   help="File to read database connect string from (%s)"%(DSA_BACULA_DB_CONNECT,))
104 parser.add_argument("-c", "--client-list", metavar="FILE", dest="clientlist",
105   default=DSA_CLIENT_LIST_FILE,
106   help="File with a list of all clients (%s)"%(DSA_CLIENT_LIST_FILE,))
107 parser.add_argument("-v", "--verbose", dest="verbose",
108    action='count', default=0,
109   help="Be more verbose (repeat for more).")
110 parser.add_argument("-n", "--nodo", dest="nodo",
111   default=False, action="store_true",
112   help="Print to cat rather than bconsole.")
113 parser.add_argument("--min-age", dest="min_age", metavar="TIME",
114   default=MIN_AGE,
115   help="Do not consider running a backup for a client if we ran one within TIME (%s)"%(MIN_AGE,))
116 parser.add_argument("--min-failed-timeout", dest="min_failed_timeout", metavar="TIME",
117   default=MIN_FAILED_TIMEOUT,
118   help="Do not consider running a backup for a client if it had a backup failed or aborted within TIME (%s)"%(MIN_FAILED_TIMEOUT,))
119 for t in TYPESL:
120   parser.add_argument("--sched-%s"%(TYPES[t],), dest="sched_%s"%(t,), metavar="SCHED",
121     default=SCHED[t],
122     help="Run %s backups somewhen in this age window (%s)"%(TYPES[t], SCHED[t]))
123   parser.add_argument("--max-%s"%(TYPES[t],), dest="max_%s"%(t,), metavar="MAX",
124     default=MAX_JOBS[t], type=int,
125     help="Do not have more than MAX jobs run concurrently at level %s (%d) (can use free higher-level backup slots)"%(TYPES[t], MAX_JOBS[t]))
126 parser.add_argument("--max-total", dest="max_total", metavar="MAX",
127   default=MAX_JOBS['total'], type=int,
128   help="Do not have more than MAX_TOTAL also counting created, blocked, etc (%s)"%(MAX_JOBS['total']))
129 args = parser.parse_args()
130
131 def setup_logger(args):
132   logger = logging.getLogger(NAME)
133   logger.setLevel(logging.DEBUG)
134
135   # log to stderr
136   stream_handler = logging.StreamHandler()
137   formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
138   stream_handler.setFormatter(formatter)
139
140   if args.verbose >= 2:
141     stream_handler.setLevel(logging.DEBUG)
142   elif args.verbose >= 1:
143     stream_handler.setLevel(logging.INFO)
144   else:
145     stream_handler.setLevel(logging.WARN)
146
147   logger.addHandler(stream_handler)
148
149   # log to syslog
150   syslog_handler = logging.handlers.SysLogHandler(address = SYSLOG_ADDRESS)
151   syslog_handler.setLevel(logging.INFO)
152   logger.addHandler(syslog_handler)
153
154   return logger
155
156 logger = setup_logger(args)
157
158 if args.db is not None:
159   pass
160 elif args.dbfile is not None:
161   args.db = open(args.dbfile).read().rstrip()
162 else:
163   logger.error("Need one of -d or -D.")
164   sys.exit(1)
165
166 # parse schedules
167 now = datetime.datetime.now()
168 min_age = convert_timedelta(args.min_age)
169 max_jobs = {}
170 sched = {}
171 for t in TYPESL:
172   spec = vars(args)['sched_%s'%(t,)]
173   try:
174     (low, high) = spec.split(':', 1)
175   except ValueError:
176     logger.error("Invalid schedule spec (%s)", spec)
177     sys.exit(1)
178
179   sched[t] = {
180     'lo': convert_timedelta(low),
181     'hi': convert_timedelta(high),
182     }
183   if sched[t]['hi'] <= sched[t]['lo']:
184     logger.error("Scheduling error in %s: low needs to be smaller than high (but we have %s and %s)", spec, low, high)
185     sys.exit(1)
186
187   max_jobs[t] = vars(args)['max_%s'%(t,)]
188 max_jobs['total'] = args.max_total
189 min_failed_timeout = convert_timedelta(args.min_failed_timeout)
190
191
192 # Get list of clients
193 if not os.path.exists(args.clientlist):
194   logger.error("File %s does not exist", args.clientlist)
195   sys.exit(1)
196 clients = set(open(args.clientlist).read().split())
197
198
199 conn = psycopg2.connect(args.db)
200 cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
201
202 # get backups currently
203 #  R  running
204 #  C  created
205 #  F | Waiting for Client
206 #  S | Waiting for Storage daemon
207 #  m | Waiting for new media
208 #  M | Waiting for media mount
209 #  s | Waiting for storage resource
210 #  j | Waiting for job resource
211 #  c | Waiting for client resource
212 #  d | Waiting on maximum jobs
213 #  t | Waiting on start time
214 #  p | Waiting on higher priority jobs
215 cursor.execute("""
216   SELECT name, level
217   FROM job
218   WHERE type='B' AND
219         jobstatus IN ('R')
220 """, {})
221 current_backups = {}
222 for t in TYPESL:
223   current_backups[t] = set()
224 for r in cursor.fetchall():
225   current_backups[r['level']].add(r['name'])
226
227 cursor.execute("""
228   SELECT name, level
229   FROM job
230   WHERE type='B' AND
231         jobstatus IN ('C', 'F', 'S', 'm', 'M', 's', 'j', 'c', 'd', 't', 'p')
232 """, {})
233 pending_backups = {}
234 for r in cursor.fetchall():
235   pending_backups[r['name']] = r['level']
236
237 cursor.execute("""
238   SELECT name, MAX(starttime) AS starttime
239   FROM job
240   WHERE type='B' AND
241         jobstatus IN ('A', 'f')
242   GROUP BY name
243 """, {})
244 failed_backups = set()
245 for r in cursor.fetchall():
246   if now - r['starttime'] < min_failed_timeout:
247     failed_backups.add(r['name'])
248
249
250 for t in TYPESL:
251   logger.debug("Num jobs currently running %s: %d; max (including higher pri): %d", t, len(current_backups[t]), max_jobs[t])
252 logger.debug("Num jobs currently pending (created/waiting/..): %d; max (including running): %d", len(pending_backups), max_jobs['total'])
253
254 most_recent = {}
255 # get most recent backups of each type for all clients
256 cursor.execute("""
257   SELECT name, level, MAX(starttime) AS starttime
258   FROM job
259   WHERE type='B' AND
260         jobstatus='T'
261   GROUP BY name, level
262 """, {})
263 for r in cursor.fetchall():
264   if not r['name'] in most_recent:
265     most_recent[r['name']] = {}
266   most_recent[r['name']][r['level']] = r['starttime']
267
268 candidates = {}
269 for t in TYPESL:
270   candidates[t] = []
271
272 for c in clients:
273   if c in failed_backups:
274     logger.debug("Not considering %s as backups failed or were aborted recently", c)
275     continue
276   if any([c in cb for cb in current_backups.values()]):
277     logger.debug("Not considering %s as a backup is currently running", c)
278     continue
279   if c in pending_backups:
280     logger.debug("Not considering %s as a backup is currently created/waiting on things", c)
281     continue
282   if c in most_recent:
283     recent = max(most_recent[c].values())
284     if recent >= now - min_age:
285       logger.debug("Not considering %s as a backup ran recently (%s)", c, recent)
286       #continue
287   recent = datetime.datetime.min
288   for t in TYPESL:
289     if c in most_recent and t in most_recent[c]:
290       recent = max(recent, most_recent[c][t])
291     logger.debug("Most recent backup for %s/%s ran %s", c, t, recent)
292     age = (now - recent)
293     frac = (age - sched[t]['lo'])/(sched[t]['hi'] - sched[t]['lo'])
294     if frac <= 0:
295       logger.debug("Most recent backup for %s/%s (%s) is new enough: %.2f < 0", c, t, recent, frac)
296       runme = False
297     elif frac >= 1:
298       logger.debug("Most recent backup for %s/%s (%s) is old: %.2f >= 1", c, t, recent, frac)
299       runme = True
300     else:
301       logger.debug("Most recent backup for %s/%s (%s) is so so: 0 <= %.2f <= 1", c, t, recent, frac)
302
303       # get a semi random value that is consistent across multiple calls.
304       # we hash the client name and the most recent backup time and use the first byte of the digest.
305       d = hashlib.sha256(("%s - %s - %s"%(c, t, recent)).encode()).digest()
306       prnd = d[0] / 256
307
308       runme = frac > prnd
309     if runme:
310       logger.debug("Considering running %s/%s", c, t)
311       candidates[t].append( {'client': c, 'level': t} )
312       break
313
314
315 total_cnt = len(pending_backups) + sum([len(current_backups[t]) for t in TYPESL])
316 total_free = max_jobs['total'] - total_cnt
317
318 run_cnt = 0
319 total_slots = 0
320 cmd = []
321 for t in TYPESL:
322   run_cnt += len(current_backups[t])
323   total_slots += max_jobs[t]
324   if len(candidates[t]) == 0:
325     logger.info("No candidates for %s jobs", TYPES_COMPLETE_NAME[t])
326     continue
327   free_slots = min(total_slots - run_cnt, total_free)
328   if free_slots <= 0:
329     logger.info("No free slots for %s jobs but %d candidate(s)", TYPES_COMPLETE_NAME[t], len(candidates[t]))
330     continue
331
332   logger.info("Have %d candidate(s) and %d free slots for %s jobs.", len(candidates[t]), free_slots, TYPES_COMPLETE_NAME[t])
333   picked = random.sample(candidates[t], min(free_slots, len(candidates[t])))
334   for p in picked:
335     logger.info("  Will run: %s/%s", p['client'], p['level'])
336
337   for p in picked:
338     cmd.append("run job=%s level=%s yes"%(p['client'], TYPES_COMPLETE_NAME[p['level']]))
339     total_free -= 1
340     run_cnt += 1
341
342 if args.nodo:
343   print("\n".join(cmd))
344   sys.exit(0)
345
346 if args.verbose:
347     for c in cmd:
348       print("Now going to run: %s"%(c,))
349
350 if len(cmd) == 0:
351   sys.exit(0)
352
353 p = subprocess.Popen(['/usr/sbin/bconsole'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
354 (out, err) = p.communicate("\n".join(cmd).encode())
355 if p.returncode != 0:
356     raise Exception("bconsole failed.  stdout:\n%s\nstderr:%s\n"%(out, err))
357
358 if args.verbose:
359     print("stdout:\n")
360     sys.stdout.buffer.write(out)
361     print("\n")
362
363 if err != b"":
364   print("bconsole said on stderr:\n", file=sys.stderr)
365   sys.stderr.buffer.write(out)
366   print("", file=sys.stderr)
367   sys.exit(1)