# Filesystem layout of the static master. `base` is the root directory; each
# value in `subdirs` is a directory name that is joined onto `base`, and
# `serialname` is the file name joined onto the master directory to locate
# the serial file.
11 base='/home/staticsync/static-master'
12 subdirs = { 'master': 'master', # where updates from off-site end up going, the source of everything we do here
13 'cur': 'current-push', # where clients rsync from during a mirror push
14 'live': 'current-live'} # what is currently on the mirrors, and what they rsync from when they come back from being down
15 serialname = '.serial'
# Load the client list: each line taken from the static-clients file is
# stripped of surrounding whitespace and appended to `clients`.
# NOTE(review): the loop header and the initialisation of `clients` are not
# visible here; a blank line in the file would be appended as an empty
# string unless it is filtered elsewhere -- confirm.
18 with open('/home/staticsync/etc/static-clients') as f:
20 clients.append(line.strip())
# Current time in UTC (time.gmtime), rendered as "[YYYY-MM-DD HH:MM:SS]".
# Presumably the prefix that log() puts in front of every message -- confirm.
23 t = time.strftime("[%Y-%m-%d %H:%M:%S]", time.gmtime())
# stage1(pipes, status): follow the output of the client processes during the
# first phase of a push. Each line read from a client's stdout is logged;
# lines that start with '[MSM]' are protocol messages whose second
# space-separated field is a keyword:
#   ALREADY-CURRENT - both pipes to the client are closed and the outcome logged
#   STAGE1-DONE     - the client is logged as waiting
#   STAGE1-START    - recognised explicitly (handling not visible here)
# Lines without the '[MSM]' prefix are logged and otherwise skipped.
# NOTE(review): presumably `pipes` maps client name -> Popen object and
# `status` maps client name -> state string; confirm against the caller.
26 def stage1(pipes, status):
30 line = p.stdout.readline()
36 log("%s: failed with returncode %d"%(c,p.returncode))
40 log("%s >> %s"%(c, line))
41 if not line.startswith('[MSM]'): continue
# NOTE(review): string.split() is the Python 2 `string` module function.
# The [1] index raises IndexError when the line contains no space after
# '[MSM]'. If `line` still carries its trailing newline at this point, `kw`
# for a two-field line would include it -- confirm the line is stripped first.
42 kw = string.split(line, ' ', 2)[1]
44 if kw == 'ALREADY-CURRENT':
# Nothing more will be exchanged with this client: close both ends.
45 pipes[c].stdout.close()
46 pipes[c].stdin.close()
49 log("%s: already current"%(c,))
# Presumably the branch taken when the exit status contradicts the
# ALREADY-CURRENT claim (the condition itself is not visible here).
52 log("%s: said ALREADY-CURRENT but returncode %d"%(c,p.returncode))
55 elif kw == 'STAGE1-DONE':
56 log("%s: waiting"%(c,))
59 elif kw in ['STAGE1-START']:
62 log("%s: ignoring unknown line"%(c,))
# count_statuses(status): tally how often each status value occurs. A value
# seen for the first time starts at 1.
# NOTE(review): the increment branch and the return are not visible here;
# callers treat the result as a container keyed by status value.
64 def count_statuses(status):
68 if v not in cnt: cnt[v] = 1
# stage2(pipes, status, command): second phase of a push. Clients whose
# status is not 'waiting' are skipped. Every waiting client gets `command`
# plus a newline written to its stdin; afterwards the remaining output of
# each waiting client is collected with communicate(), logged line by line,
# and the client's return code is logged.
72 def stage2(pipes, status, command):
74 if status[c] != 'waiting': continue
75 log("%s << %s"%(c, command))
76 pipes[c].stdin.write("%s\n"%(command,))
79 if status[c] != 'waiting': continue
# communicate() reads until EOF and waits for the process to exit, which is
# what makes p.returncode available below.
82 (o, dummy) = p.communicate('')
# NOTE(review): splitting on "\n" yields a trailing empty string whenever the
# output ends with a newline, so an empty line is logged at the end.
83 for l in string.split(o, "\n"):
84 log("%s >> %s"%(c, l))
85 log("%s: returned %d"%(c, p.returncode))
# Contact every client: start `ssh -o BatchMode=yes <client> mirror <serial>`
# with stdin and stdout connected to pipes and stderr merged into stdout, and
# mark the client 'in-progress'. Once the statuses are tallied, any failure
# sends 'abort' to the waiting clients, otherwise 'go' is sent if any client
# is waiting; if neither applies, all clients are reported up to date.
88 log("Calling clients...")
# BatchMode=yes makes ssh fail instead of prompting for a password.
92 args = ['ssh', '-o', 'BatchMode=yes', c, 'mirror', "%d"%(serial,)]
93 p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
95 status[c] = 'in-progress'
100 cnt = count_statuses(status)
# NOTE(review): `'failed' in cnt > 0` is a chained comparison, i.e.
# `('failed' in cnt) and (cnt > 0)`. The second half compares the tally
# container itself with 0, which only works through Python 2's mixed-type
# ordering and would raise TypeError on Python 3 if `cnt` is a dict.
# The intended test is most likely just `'failed' in cnt`. The same applies
# to the 'waiting' test below.
102 if 'failed' in cnt > 0:
103 log("Some clients failed, aborting...")
104 stage2(pipes, status, 'abort')
106 elif 'waiting' in cnt > 0:
108 stage2(pipes, status, 'go')
111 log("All clients up to date.")
# Main update sequence: resolve the working directories under `base`, take
# exclusive locks, determine the serial, stage a hard-linked copy of the
# master tree as the push directory, call out to the clients, and finally
# rotate the push directory into place as the live tree.
118 master = os.path.join(base, subdirs['master'])
119 cur = os.path.join(base, subdirs['cur'])
120 live = os.path.join(base, subdirs['live'])
# Both scratch directories are created inside `base` and registered in
# cleanup_dirs so the cleanup loop at the end can remove leftovers.
# NOTE(review): tmpdir_old reuses the 'live.new-' prefix; 'live.old-' was
# probably intended. Cosmetic only, since mkdtemp names are unique either way.
121 tmpdir_new = tempfile.mkdtemp(prefix='live.new-', dir=base); cleanup_dirs.append(tmpdir_new);
122 tmpdir_old = tempfile.mkdtemp(prefix='live.new-', dir=base); cleanup_dirs.append(tmpdir_old);
# mkdtemp creates the directory with mode 0700; widen it to 0755.
# NOTE(review): `0755` is a Python 2 octal literal (Python 3 needs `0o755`).
123 os.chmod(tmpdir_new, 0755)
# Create each directory if it is missing, open it read-only and take an
# exclusive flock on it. Without LOCK_NB the call blocks until the lock is
# granted.
126 for p in (master, live, tmpdir_new):
127 if not os.path.exists(p): os.mkdir(p, 0755)
128 fd = os.open(p, os.O_RDONLY)
129 log("Acquiring lock for %s(%d)."%(p,fd))
130 fcntl.flock(fd, fcntl.LOCK_EX)
132 log("All locks acquired.")
# The serial is an integer stored in the serial file inside the master tree.
134 serialfile = os.path.join(master, serialname)
136 with open(serialfile) as f: serial = int(f.read())
# Presumably the fallback for a missing or unparsable serial file (the
# surrounding condition is not visible here): use the current epoch time and
# write it back to the file.
138 serial = int(time.time())
139 with open(serialfile, "w") as f: f.write("%d\n"%(serial,))
140 log("Serial is %s."%(serial,))
142 log("Populating %s."%(tmpdir_new,))
# `cp -al` reproduces the master tree with hard links instead of copying
# file contents; the trailing '.' copies the directory's contents into the
# existing tmpdir_new.
143 subprocess.check_call(['cp', '-al', os.path.join(master, '.'), tmpdir_new])
145 if os.path.exists(cur):
146 log("Removing existing %s."%(cur,))
149 log("Renaming %s to %s."%(tmpdir_new, cur))
150 os.rename(tmpdir_new, cur)
# Run the client call-out with the serial determined above.
# NOTE(review): `proceed` presumably gates the rotation below -- confirm.
152 proceed = callout(serial)
# Rotate: the old live tree is parked inside tmpdir_old, the push directory
# is renamed to live, and the parked tree is deleted along with tmpdir_old.
155 log("Moving %s aside."%(live,))
156 os.rename(live, os.path.join(tmpdir_old, 'old'))
157 log("Renaming %s to %s."%(cur, live))
160 shutil.rmtree(tmpdir_old)
# Remove whatever registered scratch directories still exist.
169 for p in cleanup_dirs:
170 if os.path.exists(p): shutil.rmtree(p)
173 # vim:set shiftwidth=2: