3 # "manual" scheduler to spread out our backups better
5 # We want to smear out backups over time, subject to the following constraints:
6 # - no more than MAX_FULL full backups should run at any time,
7 # - no more than MAX_FULL+MAX_DIFF full+diff backups should run at any time,
8 # - no more than MAX_FULL+MAX_DIFF+MAX_INC full+diff+inc backups should run at any time,
9 # - no more than MAX_TOTAL jobs are running and already created at any time
11 # - do not schedule a backup if one failed or was aborted recently (MIN_FAILED_TIMEOUT)
12 # - do not schedule a backup if one for that host is already running
13 # - do not schedule a backup if one for that host is already pending/scheduled
14 # - do not schedule a backup if we had run one recently (MIN_AGE)
15 # - consider scheduling a {full,diff,inc} backup with propability
16 # o 0 if the last of that type (or higher) is younger than XXX,
17 # o 1 if the last of that type (or higher) is older than YYY,
18 # o linearly interpolated in between.
19 # For each scheduled backup considered, choose {full,diff,inc} up to the
20 # limits imposed by MAX_*.
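# Worked example (illustrative numbers): with a full-backup window of 40d:45d,
# a host whose last full backup ran 42 days ago gets
#   frac = (42d - 40d) / (45d - 40d) = 0.4,
# i.e. a 40% chance of being considered on this run.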
# Copyright 2010, 2011, 2013, 2017, 2018 Peter Palfrader
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import argparse
import datetime
import hashlib
import logging
import logging.handlers
import os
import psycopg2
import psycopg2.extras
import random
import re
import subprocess
import sys

NAME = "dsa-bacula-scheduler"
SYSLOG_ADDRESS = '/dev/log'

DSA_BACULA_DB_CONNECT = '/etc/dsa/bacula-reader-database'
DSA_CLIENT_LIST_FILE = '/etc/bacula/dsa-clients'
def convert_timedelta(s, default_unit='h'):
    m = re.match('([0-9]+)([smhdw])?$', s)
    if m is None: raise ValueError
    ticks = int(m.group(1))
    unit = m.group(2)
    if unit is None: unit = default_unit

    if   unit == 's': pass
    elif unit == 'm': ticks *= 60
    elif unit == 'h': ticks *= 60*60
    elif unit == 'd': ticks *= 60*60*24
    elif unit == 'w': ticks *= 60*60*24*7
    else: raise ValueError
    return datetime.timedelta(seconds = ticks)
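# Examples: convert_timedelta('90m') == datetime.timedelta(seconds=5400);
# a bare number takes the default unit, so convert_timedelta('12') is 12 hours.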
MIN_AGE = '20h'  # assumed default; overridable with --min-age
MIN_FAILED_TIMEOUT = '6h'
TYPESL = ['F', 'D', 'I']
TYPES = { 'F': 'full',
          'D': 'diff',
          'I': 'inc' }
TYPES_COMPLETE_NAME = { 'F': 'full',
                        'D': 'differential',
                        'I': 'incremental' }
SCHED = { 'F': '40d:45d',
          'D': '7d:10d',   # windows for D and I are assumed defaults
          'I': '20h:28h' }
MAX_JOBS = { 'F': 2,       # per-level and total caps; values are assumed defaults
             'D': 4,
             'I': 10,
             'total': 20 }
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--db-connect-string", metavar="connect-string", dest="db",
                    help="Database connect string")
parser.add_argument("-D", "--db-connect-string-file", metavar="FILE", dest="dbfile",
                    default=DSA_BACULA_DB_CONNECT,
                    help="File to read database connect string from (%s)"%(DSA_BACULA_DB_CONNECT,))
parser.add_argument("-c", "--client-list", metavar="FILE", dest="clientlist",
                    default=DSA_CLIENT_LIST_FILE,
                    help="File with a list of all clients (%s)"%(DSA_CLIENT_LIST_FILE,))
parser.add_argument("-v", "--verbose", dest="verbose",
                    action='count', default=0,
                    help="Be more verbose (repeat for more).")
parser.add_argument("-n", "--nodo", dest="nodo",
                    default=False, action="store_true",
                    help="Print the commands instead of feeding them to bconsole.")
parser.add_argument("--min-age", dest="min_age", metavar="TIME",
                    default=MIN_AGE,
                    help="Do not consider running a backup for a client if we ran one within TIME (%s)"%(MIN_AGE,))
parser.add_argument("--min-failed-timeout", dest="min_failed_timeout", metavar="TIME",
                    default=MIN_FAILED_TIMEOUT,
                    help="Do not consider running a backup for a client if a backup failed or was aborted within TIME (%s)"%(MIN_FAILED_TIMEOUT,))
for t in TYPESL:
    parser.add_argument("--sched-%s"%(TYPES[t],), dest="sched_%s"%(t,), metavar="SCHED",
                        default=SCHED[t],
                        help="Run %s backups some time in this age window (%s)"%(TYPES[t], SCHED[t]))
    parser.add_argument("--max-%s"%(TYPES[t],), dest="max_%s"%(t,), metavar="MAX",
                        default=MAX_JOBS[t], type=int,
                        help="Do not have more than MAX jobs running concurrently at level %s (%d) (can use free higher-level backup slots)"%(TYPES[t], MAX_JOBS[t]))
parser.add_argument("--max-total", dest="max_total", metavar="MAX",
                    default=MAX_JOBS['total'], type=int,
                    help="Do not have more than MAX jobs in total, also counting created, blocked, etc. (%s)"%(MAX_JOBS['total'],))
args = parser.parse_args()
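# Example invocation (hypothetical values): preview what would be scheduled,
# with a tighter full-backup window and a lower total cap, without running anything:
#   dsa-bacula-scheduler -v -n --sched-full 30d:35d --max-total 15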
def setup_logger(args):
    logger = logging.getLogger(NAME)
    logger.setLevel(logging.DEBUG)

    stream_handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
    stream_handler.setFormatter(formatter)

    if args.verbose >= 2:
        stream_handler.setLevel(logging.DEBUG)
    elif args.verbose >= 1:
        stream_handler.setLevel(logging.INFO)
    else:
        stream_handler.setLevel(logging.WARN)

    logger.addHandler(stream_handler)

    syslog_handler = logging.handlers.SysLogHandler(address = SYSLOG_ADDRESS)
    syslog_handler.setLevel(logging.INFO)
    logger.addHandler(syslog_handler)

    return logger

logger = setup_logger(args)
if args.db is not None:
    pass
elif args.dbfile is not None:
    args.db = open(args.dbfile).read().rstrip()
else:
    logger.error("Need one of -d or -D.")
    sys.exit(1)
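# args.db is a libpq/psycopg2 connect string, e.g. (illustrative)
# "dbname=bacula host=db.example.org user=bacula-reader".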
now = datetime.datetime.now()
min_age = convert_timedelta(args.min_age)

sched = {}
for t in TYPESL:
    spec = vars(args)['sched_%s'%(t,)]
    try:
        (low, high) = spec.split(':', 1)
    except ValueError:
        logger.error("Invalid schedule spec (%s)", spec)
        sys.exit(1)
    sched[t] = {
        'lo': convert_timedelta(low),
        'hi': convert_timedelta(high),
    }
    if sched[t]['hi'] <= sched[t]['lo']:
        logger.error("Scheduling error in %s: low needs to be smaller than high (but we have %s and %s)", spec, low, high)
        sys.exit(1)

max_jobs = {}
for t in TYPESL:
    max_jobs[t] = vars(args)['max_%s'%(t,)]
max_jobs['total'] = args.max_total
min_failed_timeout = convert_timedelta(args.min_failed_timeout)
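# e.g. with the default full window '40d:45d' the parsed config is
# sched['F'] == {'lo': datetime.timedelta(days=40), 'hi': datetime.timedelta(days=45)}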
# Get list of clients
if not os.path.exists(args.clientlist):
    logger.error("File %s does not exist", args.clientlist)
    sys.exit(1)
clients = set(open(args.clientlist).read().split())
conn = psycopg2.connect(args.db)
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
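# DictCursor lets the fetch loops below address columns by name
# (r['name'], r['level'], r['starttime']) instead of by position.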
# get backups currently running, and ones pending/created
#
# bacula job status codes for jobs that exist but have not finished:
# C | Created, not yet running
# F | Waiting for Client
# S | Waiting for Storage daemon
# m | Waiting for new media
# M | Waiting for media mount
# s | Waiting for storage resource
# j | Waiting for job resource
# c | Waiting for client resource
# d | Waiting on maximum jobs
# t | Waiting on start time
# p | Waiting on higher priority jobs
cursor.execute("""
    SELECT name, level FROM job
    WHERE type = 'B' AND jobstatus = 'R'
    """)  # query reconstructed; 'R' = running
current_backups = {}
for t in TYPESL:
    current_backups[t] = set()
for r in cursor.fetchall():
    current_backups[r['level']].add(r['name'])

cursor.execute("""
    SELECT name, level FROM job
    WHERE type = 'B' AND
          jobstatus IN ('C', 'F', 'S', 'm', 'M', 's', 'j', 'c', 'd', 't', 'p')
    """)  # all of the created/waiting statuses listed above
pending_backups = {}
for r in cursor.fetchall():
    pending_backups[r['name']] = r['level']
# get recently failed or aborted backup attempts
cursor.execute("""
    SELECT name, MAX(starttime) AS starttime
    FROM job
    WHERE type = 'B' AND
          jobstatus IN ('A', 'f')
    GROUP BY name
    """)  # 'A' = canceled, 'f' = fatal error
failed_backups = set()
for r in cursor.fetchall():
    if now - r['starttime'] < min_failed_timeout:
        failed_backups.add(r['name'])
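# Illustrative: with the default 6h window, a backup that was aborted two hours
# ago blocks its client for this run; one that failed yesterday does not.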
for t in TYPESL:
    logger.debug("Num jobs currently running %s: %d; max (including higher pri): %d", t, len(current_backups[t]), max_jobs[t])
logger.debug("Num jobs currently pending (created/waiting/..): %d; max (including running): %d", len(pending_backups), max_jobs['total'])
# get most recent backups of each type for all clients
cursor.execute("""
    SELECT name, level, MAX(starttime) AS starttime
    FROM job
    WHERE type = 'B' AND jobstatus = 'T'
    GROUP BY name, level
    """)  # query reconstructed; 'T' = terminated normally
most_recent = {}
for r in cursor.fetchall():
    if not r['name'] in most_recent:
        most_recent[r['name']] = {}
    most_recent[r['name']][r['level']] = r['starttime']
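# most_recent maps client name -> {level: last successful start time}, e.g.
# (hypothetical) most_recent['client.example.org'] == {'F': ..., 'D': ..., 'I': ...}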
candidates = {}
for t in TYPESL:
    candidates[t] = []

for c in clients:
    if c in failed_backups:
        logger.debug("Not considering %s as backups failed or were aborted recently", c)
        continue
    if any([c in cb for cb in current_backups.values()]):
        logger.debug("Not considering %s as a backup is currently running", c)
        continue
    if c in pending_backups:
        logger.debug("Not considering %s as a backup is currently created/waiting on things", c)
        continue
    if c in most_recent:
        recent = max(most_recent[c].values())
        if recent >= now - min_age:
            logger.debug("Not considering %s as a backup ran recently (%s)", c, recent)
            continue

    for t in TYPESL:
        recent = datetime.datetime.min
        # "of that type (or higher)": TYPESL is ordered F, D, I, so look at
        # level t itself and everything before it in the list
        for t2 in TYPESL[:TYPESL.index(t)+1]:
            if c in most_recent and t2 in most_recent[c]:
                recent = max(recent, most_recent[c][t2])
        logger.debug("Most recent backup for %s/%s ran %s", c, t, recent)

        age = now - recent
        frac = (age - sched[t]['lo'])/(sched[t]['hi'] - sched[t]['lo'])
        if frac < 0:
            logger.debug("Most recent backup for %s/%s (%s) is new enough: %.2f < 0", c, t, recent, frac)
            continue
        elif frac >= 1:
            logger.debug("Most recent backup for %s/%s (%s) is old: %.2f >= 1", c, t, recent, frac)
        else:
            logger.debug("Most recent backup for %s/%s (%s) is so so: 0 <= %.2f <= 1", c, t, recent, frac)
            # get a semi random value that is consistent across multiple calls.
            # we hash the client name and the most recent backup time and use the first byte of the digest.
            d = hashlib.sha256(("%s - %s - %s"%(c, t, recent)).encode()).digest()
            if d[0] >= frac * 256:  # keep with probability ~frac; the byte is uniform in [0,256)
                continue
        logger.debug("Considering running %s/%s", c, t)
        candidates[t].append( {'client': c, 'level': t} )
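# Note on the hash above: its input only changes when a newer backup of that
# level (or higher) completes, so a client's draw d[0] stays fixed while frac
# grows with age. Each client therefore becomes eligible at its own stable,
# pseudorandom point inside the window rather than being re-rolled every run.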
cmd = []
total_cnt = len(pending_backups) + sum([len(current_backups[t]) for t in TYPESL])
total_free = max_jobs['total'] - total_cnt

run_cnt = 0
total_slots = 0
for t in TYPESL:
    run_cnt += len(current_backups[t])
    total_slots += max_jobs[t]
    if len(candidates[t]) == 0:
        logger.info("No candidates for %s", t)
        continue
    free_slots = min(total_slots - run_cnt, total_free)
    if free_slots <= 0:
        logger.info("No free slots for %s but %d candidates", t, len(candidates[t]))
        continue
    logger.info("Have %d candidates and %d free slots for %s.", len(candidates[t]), free_slots, t)
    picked = random.sample(candidates[t], min(free_slots, len(candidates[t])))
    for p in picked:
        logger.info(" Will run: %s/%s", p['client'], p['level'])
        run_cnt += 1
        total_free -= 1
        cmd.append("run job=%s level=%s yes"%(p['client'], TYPES_COMPLETE_NAME[p['level']]))
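# cmd now holds one bconsole command per picked job, e.g.
# (hypothetical client name) "run job=client.example.org level=differential yes"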
if args.nodo:
    print("\n".join(cmd))
    sys.exit(0)

if len(cmd) > 0:
    for c in cmd:
        print("Now going to run: %s"%(c,))

    p = subprocess.Popen(['/usr/sbin/bconsole'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate("\n".join(cmd).encode())
    if p.returncode != 0:
        raise Exception("bconsole failed. stdout:\n%s\nstderr:%s\n"%(out, err))
    if args.verbose:
        sys.stdout.buffer.write(out)
    if err:
        print("bconsole said on stderr:\n", file=sys.stderr)
        sys.stderr.buffer.write(err)
        print("", file=sys.stderr)