#!/usr/bin/python
"""Push one static component to all mirror clients in two stages.

Usage: <script> <component>

Configuration (read by main()):
  /etc/staticsync.conf         name=value lines; 'base' is required.
  /etc/static-clients.conf     one client hostname per line.
  /etc/static-components.conf  whitespace-separated lines of
      master component sourcehost sourcedir [extrapushhosts [extraignoreclients]]
    where the two optional fields are comma-separated host lists.

Files under <base>/master for a component C:
  C/                source tree; C/.serial holds the push serial.
  C.lock            flock()ed for the whole run; must already exist.
  C-current-push    hard-link copy (cp -al) of C/ offered to the clients.
  C-current-live    the copy promoted by the last non-aborted run.
  C-failedmirrors   clients that failed the last push that went ahead despite
                    failures; removed again after a failure-free run.

Protocol: every client is reached with
`ssh -o BatchMode=yes <client> mirror <component> <serial>`.  Stage 1 reads
each client's output until "[MSM] STAGE1-DONE" (client waits for a command),
"[MSM] ALREADY-CURRENT" (nothing to do) or EOF (failure).  If fewer than
abort_failed_threshold clients failed, stage 2 sends "go" to the waiting
clients and C-current-push becomes C-current-live; otherwise "abort" is sent.

Written to run under both Python 2 and Python 3.
"""

import fcntl
import os
import shutil
import subprocess
import string
import sys
import tempfile
import time

serialname = '.serial'
had_warnings = False
conffile = '/etc/staticsync.conf'
clientsfile = '/etc/static-clients.conf'
componentsfile = '/etc/static-components.conf'

# Abort the whole push (send "abort" to every waiting client) once at least
# this many clients failed stage 1; fewer failures only produce a warning.
abort_failed_threshold = 2

# name -> value from conffile and the set of all client hostnames from
# clientsfile; both are filled in by main().
config = {}
allclients = set()

# Temporary directories created by run_mirror(); main() removes whichever of
# them still exist when the run ends, whether it succeeded or raised.
cleanup_dirs = []


def parse_config(lines):
  """Parse `name=value` lines into a dict.

  Blank lines and lines starting with '#' are skipped.  Only the first '='
  separates name from value, so values may contain '='; whitespace around
  name and value is stripped.  A non-comment line without '=' raises
  ValueError.
  """
  cfg = {}
  for line in lines:
    line = line.strip()
    if not line or line.startswith("#"):
      continue
    name, value = line.split("=", 1)
    cfg[name.strip()] = value.strip()
  return cfg


def parse_clients(lines):
  """Return the set of hostnames listed one per line, skipping blanks and '#' comments."""
  clients = set()
  for line in lines:
    line = line.strip()
    if line and not line.startswith('#'):
      clients.add(line)
  return clients


def log(m):
  """Write `m` to stdout prefixed with a UTC timestamp."""
  t = time.strftime("[%Y-%m-%d %H:%M:%S]", time.gmtime())
  # Same output as Python 2's `print t, m`, but valid on Python 3 as well.
  sys.stdout.write("%s %s\n" % (t, m))


def stage1(pipes, status, clients):
  """Run stage 1: read each client's output until it reports its state.

  For every client in `clients`, lines from its process in `pipes` are logged
  until one of these sets status[client]:
    EOF                    -> 'failed' (pipes closed, process reaped)
    [MSM] ALREADY-CURRENT  -> 'ok' if the process exited 0, else 'failed'
    [MSM] STAGE1-DONE      -> 'waiting' (pipes left open for stage2())
  Other lines, [MSM] STAGE1-START and unknown [MSM] keywords are only logged.
  """
  for c in clients:
    p = pipes[c]
    while True:
      line = p.stdout.readline()
      if line == '':
        # EOF before the client reported a result.
        status[c] = 'failed'
        p.stdout.close()
        p.stdin.close()
        p.wait()
        log("%s: failed with returncode %d" % (c, p.returncode))
        break
      line = line.strip()
      log("%s >> %s" % (c, line))
      if not line.startswith('[MSM]'):
        continue
      # "[MSM] KEYWORD ..." -- tolerate a bare "[MSM]" without a keyword.
      words = line.split(' ', 2)
      kw = words[1] if len(words) > 1 else ''
      if kw == 'ALREADY-CURRENT':
        p.stdout.close()
        p.stdin.close()
        p.wait()
        if p.returncode == 0:
          log("%s: already current" % (c,))
          status[c] = 'ok'
        else:
          log("%s: said ALREADY-CURRENT but returncode %d" % (c, p.returncode))
          status[c] = 'failed'
        break
      elif kw == 'STAGE1-DONE':
        log("%s: waiting" % (c,))
        status[c] = 'waiting'
        break
      elif kw == 'STAGE1-START':
        pass
      else:
        log("%s: ignoring unknown line" % (c,))


def count_statuses(status):
  """Return a dict mapping each status value in `status` to how many clients have it."""
  cnt = {}
  for v in status.values():
    cnt[v] = cnt.get(v, 0) + 1
  return cnt


def stage2(pipes, status, command, clients):
  """Send `command` to every 'waiting' client, then log their output and exit codes.

  The command is first written to all waiting clients, and only afterwards
  is each one's remaining output collected, so no client's command is held
  back while another client finishes.
  """
  waiting = [c for c in clients if status[c] == 'waiting']
  for c in waiting:
    log("%s << %s" % (c, command))
    pipes[c].stdin.write("%s\n" % (command,))
    # stdin is block-buffered on Python 3; push the command out now instead
    # of when communicate() is eventually called for this client.
    pipes[c].stdin.flush()
  for c in waiting:
    p = pipes[c]
    o, _ = p.communicate('')  # closes stdin and reads until EOF
    for l in o.split("\n"):
      log("%s >> %s" % (c, l))
    log("%s: returned %d" % (c, p.returncode))


def callout(component, serial, clients):
  """Run the two-stage push of `component` at `serial` against `clients`.

  Returns False if too many clients failed and the push was aborted, True
  otherwise (also when every client was already current).  Sets the global
  had_warnings and writes <base>/master/<component>-failedmirrors when the
  push goes ahead despite failures; removes that file when nothing failed.
  """
  global had_warnings
  log("Calling clients...")
  pipes = {}
  status = {}
  for c in clients:
    args = ['ssh', '-o', 'BatchMode=yes', c, 'mirror', component, "%d" % (serial,)]
    # Text-mode pipes, so reads return str (and '' at EOF) on Python 2 and 3.
    pipes[c] = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, universal_newlines=True)
    status[c] = 'in-progress'

  log("Stage 1...")
  stage1(pipes, status, clients)
  log("Stage 1 done.")
  cnt = count_statuses(status)
  nfailed = cnt.get('failed', 0)

  if nfailed >= abort_failed_threshold:
    log("%d clients failed, aborting..." % (nfailed,))
    stage2(pipes, status, 'abort', clients)
    return False

  failedmirrorsfile = os.path.join(config['base'], 'master', component + "-failedmirrors")
  if nfailed:
    log("WARNING: %d clients failed! Continuing anyway!" % (nfailed,))
    had_warnings = True
    with open(failedmirrorsfile, "w") as f:
      for c in status:
        if status[c] == 'failed':
          f.write(c + "\n")
  elif os.path.exists(failedmirrorsfile):
    os.unlink(failedmirrorsfile)

  if 'waiting' in cnt:
    log("Committing...")
    stage2(pipes, status, 'go', clients)
  else:
    log("All clients up to date.")
  return True


def load_component_info(component, path=componentsfile):
  """Return the metadata dict for `component` from the components file, or None.

  Lines starting with '#' and lines with fewer than four fields are skipped;
  the first line whose second field equals `component` wins.  The returned
  dict has 'master', 'sourcehost', 'sourcedir' and the sets
  'extrapushhosts' / 'extraignoreclients' (empty when the field is absent).
  """
  with open(path) as f:
    for line in f:
      if line.startswith('#'):
        continue
      field = line.split()
      if len(field) < 4 or field[1] != component:
        continue
      return {
        'master': field[0],
        'sourcehost': field[2],
        'sourcedir': field[3],
        'extrapushhosts': set(field[4].split(',')) if len(field) > 4 else set(),
        'extraignoreclients': set(field[5].split(',')) if len(field) > 5 else set(),
      }
  return None


def read_or_create_serial(serialfile):
  """Return the integer serial stored in `serialfile`.

  If the file cannot be read or does not hold an integer, the current Unix
  time is used as the serial and written to the file.
  """
  try:
    with open(serialfile) as f:
      return int(f.read())
  except (IOError, OSError, ValueError):
    serial = int(time.time())
    with open(serialfile, "w") as f:
      f.write("%d\n" % (serial,))
    return serial


def run_mirror(component):
  """Push `component` to all clients except its extraignoreclients.

  Takes the component lock, determines the serial, snapshots the component
  into <component>-current-push, runs callout() and, unless the push was
  aborted, swaps the snapshot into <component>-current-live.  Returns True
  on success, False if the component is unknown or the push was aborted.
  Temporary directories are registered in cleanup_dirs for main() to remove.
  """
  meta = load_component_info(component)
  if meta is None:
    log("Component %s not found." % (component,))
    return False
  clients = allclients - meta['extraignoreclients']

  basemaster = os.path.join(config['base'], 'master')
  componentdir = os.path.join(basemaster, component)
  cur = componentdir + '-current-push'
  live = componentdir + '-current-live'

  tmpdir_new = tempfile.mkdtemp(prefix=component + '-live.new-', dir=basemaster)
  cleanup_dirs.append(tmpdir_new)
  tmpdir_old = tempfile.mkdtemp(prefix=component + '-live.old-', dir=basemaster)
  cleanup_dirs.append(tmpdir_old)
  # mkdtemp() creates the directory with mode 0700; it becomes the published
  # snapshot, so open it up.
  os.chmod(tmpdir_new, 0o755)

  locks = []
  try:
    for p in [os.path.join(basemaster, component + ".lock")]:
      fd = os.open(p, os.O_RDONLY)
      locks.append(fd)
      log("Acquiring lock for %s(%d)." % (p, fd))
      fcntl.flock(fd, fcntl.LOCK_EX)
    log("All locks acquired.")

    if not os.path.exists(live):
      os.mkdir(live, 0o755)

    serial = read_or_create_serial(os.path.join(componentdir, serialname))
    log("Serial is %s." % (serial,))

    log("Populating %s." % (tmpdir_new,))
    subprocess.check_call(['cp', '-al', os.path.join(componentdir, '.'), tmpdir_new])
    if os.path.exists(cur):
      log("Removing existing %s." % (cur,))
      shutil.rmtree(cur)
    log("Renaming %s to %s." % (tmpdir_new, cur))
    os.rename(tmpdir_new, cur)

    if not callout(component, serial, clients):
      log("Aborted.")
      return False

    log("Moving %s aside." % (live,))
    os.rename(live, os.path.join(tmpdir_old, 'old'))
    log("Renaming %s to %s." % (cur, live))
    os.rename(cur, live)
    log("Cleaning up.")
    shutil.rmtree(tmpdir_old)
    if had_warnings:
      log("Done, with warnings.")
    else:
      log("Done.")
    return True
  finally:
    # Closing the descriptors releases the flock()s, also on errors.
    for fd in locks:
      os.close(fd)


def main():
  """Load configuration, parse argv and run one push; exit with status 1 on failure."""
  with open(conffile) as f:
    config.update(parse_config(f))
  for key in ('base',):
    if key not in config:
      raise Exception("Configuration element '%s' not found in config file %s" % (key, conffile))
  with open(clientsfile) as f:
    allclients.update(parse_clients(f))

  if len(sys.argv) != 2:
    sys.stderr.write("Usage: %s <component>\n" % (sys.argv[0],))
    sys.exit(1)
  component = sys.argv[1]

  ok = False
  try:
    ok = run_mirror(component)
  finally:
    # Remove leftover temporary directories even if run_mirror() raised.
    for p in cleanup_dirs:
      if os.path.exists(p):
        shutil.rmtree(p)
  if not ok:
    sys.exit(1)


if __name__ == '__main__':
  main()

# vim:set et:
# vim:set ts=2:
# vim:set shiftwidth=2: