move things from modules/roles/static* to modules/static*
[mirror/dsa-puppet.git] / modules / staticsync / files / static-master-run
1 #!/usr/bin/python
2
3 import fcntl
4 import os
5 import shutil
6 import subprocess
7 import string
8 import sys
9 import tempfile
10 import time
11
12 serialname = '.serial'
13 had_warnings = False
14
15 conffile = '/etc/staticsync.conf'
16 config={}
17
18 with open(conffile) as f:
19   for line in f:
20     line = line.rstrip()
21     if not line or line.startswith("#"): continue
22     (name, value) = line.split("=")
23     config[name] = value
24
25 for key in ('base',):
26   if not key in config:
27     raise Exception("Configuration element '%s' not found in config file %s"%(key, conffile))
28
29 allclients = set()
30 with open('/etc/static-clients.conf') as f:
31   for line in f:
32     line = line.strip()
33     if line == "": continue
34     if line.startswith('#'): continue
35     allclients.add(line)
36
37 def log(m):
38   t = time.strftime("[%Y-%m-%d %H:%M:%S]", time.gmtime())
39   print t, m
40
41 def stage1(pipes, status, clients):
42   for c in clients:
43     p = pipes[c]
44     while 1:
45       line = p.stdout.readline()
46       if line == '':
47         status[c] = 'failed'
48         p.stdout.close()
49         p.stdin.close()
50         p.wait()
51         log("%s: failed with returncode %d"%(c,p.returncode))
52         break
53
54       line = line.strip()
55       log("%s >> %s"%(c, line))
56       if not line.startswith('[MSM]'): continue
57       kw = string.split(line, ' ', 2)[1]
58
59       if kw == 'ALREADY-CURRENT':
60         pipes[c].stdout.close()
61         pipes[c].stdin.close()
62         p.wait()
63         if p.returncode == 0:
64           log("%s: already current"%(c,))
65           status[c] = 'ok'
66         else:
67           log("%s: said ALREADY-CURRENT but returncode %d"%(c,p.returncode))
68           status[c] = 'failed'
69         break
70       elif kw == 'STAGE1-DONE':
71         log("%s: waiting"%(c,))
72         status[c] = 'waiting'
73         break
74       elif kw in ['STAGE1-START']:
75         pass
76       else:
77         log("%s: ignoring unknown line"%(c,))
78
79 def count_statuses(status):
80   cnt = {}
81   for k in status:
82     v = status[k]
83     if v not in cnt: cnt[v] = 1
84     else: cnt[v] += 1
85   return cnt
86
87 def stage2(pipes, status, command, clients):
88   for c in clients:
89     if status[c] != 'waiting': continue
90     log("%s << %s"%(c, command))
91     pipes[c].stdin.write("%s\n"%(command,))
92
93   for c in clients:
94     if status[c] != 'waiting': continue
95     p = pipes[c]
96
97     (o, dummy) = p.communicate('')
98     for l in string.split(o, "\n"):
99       log("%s >> %s"%(c, l))
100     log("%s: returned %d"%(c, p.returncode))
101
102 def callout(component, serial, clients):
103   log("Calling clients...")
104   pipes = {}
105   status = {}
106   for c in clients:
107     args = ['ssh', '-o', 'BatchMode=yes', c, 'mirror', component, "%d"%(serial,)]
108     p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
109     pipes[c] = p
110     status[c] = 'in-progress'
111
112   log("Stage 1...")
113   stage1(pipes, status, clients)
114   log("Stage 1 done.")
115   cnt = count_statuses(status)
116
117   if 'failed' in cnt and cnt['failed'] >= 2:
118     log("%d clients failed, aborting..."%(cnt['failed'],))
119     stage2(pipes, status, 'abort', clients)
120     return False
121
122   failedmirrorsfile = os.path.join(config['base'], 'master', component + "-failedmirrors")
123   if 'failed' in cnt:
124     log("WARNING: %d clients failed!  Continuing anyway!"%(cnt['failed'],))
125     global had_warnings
126     had_warnings = True
127     f = open(failedmirrorsfile, "w")
128     for c in status:
129       if status[c] == 'failed': f.write(c+"\n")
130     f.close()
131   else:
132     if os.path.exists(failedmirrorsfile): os.unlink(failedmirrorsfile)
133
134   if 'waiting' in cnt:
135     log("Committing...")
136     stage2(pipes, status, 'go', clients)
137     return True
138   else:
139     log("All clients up to date.")
140     return True
141
142 def load_component_info(component):
143   with open('/etc/static-components.conf') as f:
144     for line in f:
145       if line.startswith('#'): continue
146       field = line.strip().split()
147       if len(field) < 4: continue
148       if field[1] != component: continue
149       meta = {}
150       meta['master'] = field[0]
151       meta['sourcehost'] = field[2]
152       meta['sourcedir'] = field[3]
153       meta['extrapushhosts'] = set(field[4].split(',')) if len(field) > 4 else set()
154       meta['extraignoreclients'] = set(field[5].split(',')) if len(field) > 5 else set()
155       return meta
156     else:
157       return None
158
159 cleanup_dirs = []
160 def run_mirror(component):
161   meta = load_component_info(component)
162   if meta is None:
163     log("Component %s not found."%(component,))
164     return False
165   clients = allclients - meta['extraignoreclients']
166
167   # setup
168   basemaster = os.path.join(config['base'], 'master')
169   componentdir = os.path.join(basemaster, component)
170   cur = componentdir + '-current-push'
171   live = componentdir + '-current-live'
172   tmpdir_new = tempfile.mkdtemp(prefix=component+'-live.new-', dir=basemaster); cleanup_dirs.append(tmpdir_new);
173   tmpdir_old = tempfile.mkdtemp(prefix=component+'-live.old-', dir=basemaster); cleanup_dirs.append(tmpdir_old);
174   os.chmod(tmpdir_new, 0755)
175
176   locks = []
177   lockfiles = [ os.path.join(basemaster, component + ".lock") ]
178   for p in lockfiles:
179     fd = os.open(p, os.O_RDONLY)
180     log("Acquiring lock for %s(%d)."%(p,fd))
181     fcntl.flock(fd, fcntl.LOCK_EX)
182     locks.append(fd)
183   log("All locks acquired.")
184
185   for p in (live, ):
186     if not os.path.exists(p): os.mkdir(p, 0755)
187
188   #for p in (componentdir, live, tmpdir_new):
189   #  if not os.path.exists(p): os.mkdir(p, 0755)
190   #  fd = os.open(p, os.O_RDONLY)
191   #  log("Acquiring lock for %s(%d)."%(p,fd))
192   #  fcntl.flock(fd, fcntl.LOCK_EX)
193   #  locks.append(fd)
194   #log("All locks acquired.")
195
196   serialfile = os.path.join(componentdir, serialname)
197   try:
198     with open(serialfile) as f: serial = int(f.read())
199   except:
200     serial = int(time.time())
201     with open(serialfile, "w") as f: f.write("%d\n"%(serial,))
202   log("Serial is %s."%(serial,))
203
204   log("Populating %s."%(tmpdir_new,))
205   subprocess.check_call(['cp', '-al', os.path.join(componentdir, '.'), tmpdir_new])
206
207   if os.path.exists(cur):
208     log("Removing existing %s."%(cur,))
209     shutil.rmtree(cur)
210
211   log("Renaming %s to %s."%(tmpdir_new, cur))
212   os.rename(tmpdir_new, cur)
213
214   proceed = callout(component, serial, clients)
215
216   if proceed:
217     log("Moving %s aside."%(live,))
218     os.rename(live, os.path.join(tmpdir_old, 'old'))
219     log("Renaming %s to %s."%(cur, live))
220     os.rename(cur, live)
221     log("Cleaning up.")
222     shutil.rmtree(tmpdir_old)
223     if had_warnings: log("Done, with warnings.")
224     else: log("Done.")
225     ret = True
226   else:
227     log("Aborted.")
228     ret = False
229
230   for fd in locks:
231     os.close(fd)
232
233   return ret
234
235
236 if len(sys.argv) != 2:
237   print >> sys.stderr, "Usage: %s <component>"%(sys.argv[0],)
238   sys.exit(1)
239 component = sys.argv[1]
240
241 ok = False
242 try:
243   ok = run_mirror(component)
244 finally:
245   for p in cleanup_dirs:
246     if os.path.exists(p): shutil.rmtree(p)
247
248 if not ok:
249   sys.exit(1)
250 # vim:set et:
251 # vim:set ts=2:
252 # vim:set shiftwidth=2: