File scripts/builder.php changed (mode: 100644) (index 30dc40b..05761ec) |
... |
... |
require_once($INC . '/mime.inc.php'); |
33 |
33 |
if ($rg_builder_port == 0) |
if ($rg_builder_port == 0) |
34 |
34 |
exit(0); |
exit(0); |
35 |
35 |
|
|
|
36 |
|
function rg_clean_job(int $jid) |
|
37 |
|
{ |
|
38 |
|
global $state_dir; |
|
39 |
|
global $jobs; |
|
40 |
|
|
|
41 |
|
rg_log($jid . ': removing .ser file and unsetting jobs[jid]'); |
|
42 |
|
@unlink($state_dir . '/job-' . $jid . '.ser'); |
|
43 |
|
unset($jobs[$jid]); |
|
44 |
|
} |
|
45 |
|
|
36 |
46 |
function job_save($job) |
function job_save($job) |
37 |
47 |
{ |
{ |
38 |
48 |
global $state_dir; |
global $state_dir; |
|
... |
... |
function worker_active_jobs($worker_id) |
89 |
99 |
{ |
{ |
90 |
100 |
global $jobs; |
global $jobs; |
91 |
101 |
|
|
92 |
|
$ret = 0; |
|
|
102 |
|
$ret = array('no_of_jobs' => 0, 'jobs' => array()); |
93 |
103 |
foreach ($jobs as $jid => $job) { |
foreach ($jobs as $jid => $job) { |
94 |
104 |
if (!isset($job['worker_id'])) { |
if (!isset($job['worker_id'])) { |
95 |
105 |
rg_log($jid . ': DEBUG: worker_id is not defined!'); |
rg_log($jid . ': DEBUG: worker_id is not defined!'); |
96 |
106 |
continue; |
continue; |
97 |
107 |
} |
} |
98 |
108 |
|
|
99 |
|
if ($job['worker_id'] == $worker_id) |
|
100 |
|
$ret++; |
|
|
109 |
|
if (array_key_exists('delete_me', $job)) { |
|
110 |
|
rg_clean_job($jid); |
|
111 |
|
continue; |
|
112 |
|
} |
|
113 |
|
|
|
114 |
|
if ($job['worker_id'] == $worker_id) { |
|
115 |
|
$ret['jobs'][] = $jid; |
|
116 |
|
$ret['no_of_jobs']++; |
|
117 |
|
} |
101 |
118 |
} |
} |
102 |
119 |
|
|
103 |
120 |
return $ret; |
return $ret; |
|
... |
... |
function xdispatch_one($key, $data) |
175 |
192 |
global $features; |
global $features; |
176 |
193 |
global $state_dir; |
global $state_dir; |
177 |
194 |
|
|
|
195 |
|
rg_log_enter('xdispatch_one: key=' . $key); |
|
196 |
|
|
178 |
197 |
$now = time(); |
$now = time(); |
179 |
198 |
$s = &$rg_conns[$key]; |
$s = &$rg_conns[$key]; |
180 |
199 |
//rg_log_debug('data=' . $data); |
//rg_log_debug('data=' . $data); |
|
... |
... |
function xdispatch_one($key, $data) |
184 |
203 |
if ($u === NULL) { |
if ($u === NULL) { |
185 |
204 |
$m = 'cannot decode JSON: ' . json_last_error_msg(); |
$m = 'cannot decode JSON: ' . json_last_error_msg(); |
186 |
205 |
$err = array('errstr' => $m); |
$err = array('errstr' => $m); |
187 |
|
rg_log_ml($key . ': data=[' . $data . ']: ' . $m); |
|
|
206 |
|
rg_log_ml('data=[' . $data . ']: ' . $m); |
188 |
207 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
189 |
208 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
190 |
209 |
break; |
break; |
|
... |
... |
function xdispatch_one($key, $data) |
194 |
213 |
|
|
195 |
214 |
if (!isset($u['op'])) { |
if (!isset($u['op'])) { |
196 |
215 |
$err = array('errstr' => 'op parameter is missing'); |
$err = array('errstr' => 'op parameter is missing'); |
197 |
|
rg_log_ml($key . ': DEBUG: op parameter is missing; u: ' . rg_array2string($u)); |
|
|
216 |
|
rg_log_ml('op parameter is missing; u: ' . rg_array2string($u)); |
198 |
217 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
199 |
218 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
200 |
219 |
break; |
break; |
|
... |
... |
function xdispatch_one($key, $data) |
203 |
222 |
$op = $u['op']; unset($u['op']); |
$op = $u['op']; unset($u['op']); |
204 |
223 |
$ignore_for_log = array('pong' => 1, 'WORKER_STATS' => 1); |
$ignore_for_log = array('pong' => 1, 'WORKER_STATS' => 1); |
205 |
224 |
if (!array_key_exists($op, $ignore_for_log)) |
if (!array_key_exists($op, $ignore_for_log)) |
206 |
|
rg_log_debug($key . ': op=[' . $op . ']'); |
|
|
225 |
|
rg_log('op=[' . $op . ']'); |
207 |
226 |
|
|
208 |
227 |
if (strcmp($op, 'features') == 0) { |
if (strcmp($op, 'features') == 0) { |
209 |
228 |
$rg_conns[$key]['feat'] = $u['features']; |
$rg_conns[$key]['feat'] = $u['features']; |
|
... |
... |
function xdispatch_one($key, $data) |
217 |
236 |
} |
} |
218 |
237 |
|
|
219 |
238 |
if (strcmp($op, 'ANN') == 0) { |
if (strcmp($op, 'ANN') == 0) { |
220 |
|
rg_log($key . ': ANN received'); |
|
|
239 |
|
rg_log('ANN received'); |
221 |
240 |
|
|
222 |
241 |
if (($u['boot_time'] < $now - 30) || ($u['boot_time'] > $now + 30)) { |
if (($u['boot_time'] < $now - 30) || ($u['boot_time'] > $now + 30)) { |
223 |
242 |
$err = array( |
$err = array( |
224 |
243 |
'op' => $op, |
'op' => $op, |
225 |
244 |
'errstr' => 'boot_time is too old; time desync or replay attack?' |
'errstr' => 'boot_time is too old; time desync or replay attack?' |
226 |
245 |
); |
); |
227 |
|
rg_log($key . ': boot_time is too old; abort'); |
|
|
246 |
|
rg_log('boot_time is too old; abort'); |
228 |
247 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
229 |
248 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
230 |
249 |
break; |
break; |
|
... |
... |
function xdispatch_one($key, $data) |
236 |
255 |
if (strcasecmp($u['type'], 'global') == 0) { |
if (strcasecmp($u['type'], 'global') == 0) { |
237 |
256 |
$worker_uid = 0; |
$worker_uid = 0; |
238 |
257 |
} else if (!isset($u['user'])) { |
} else if (!isset($u['user'])) { |
239 |
|
rg_log($key . ': user field is not present; abort'); |
|
|
258 |
|
rg_log('user field is not present; abort'); |
240 |
259 |
$err = array('op' => $op, 'errstr' => 'user not defined in conf file'); |
$err = array('op' => $op, 'errstr' => 'user not defined in conf file'); |
241 |
260 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
242 |
261 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
244 |
263 |
} else { |
} else { |
245 |
264 |
$w_ui = rg_user_info($s['db'], 0, $u['user'], ''); |
$w_ui = rg_user_info($s['db'], 0, $u['user'], ''); |
246 |
265 |
if ($w_ui['exists'] !== 1) { |
if ($w_ui['exists'] !== 1) { |
247 |
|
rg_log($key . ': invalid user; abort'); |
|
|
266 |
|
rg_log('invalid user; abort'); |
248 |
267 |
$err = array('op' => $op, 'errstr' => 'invalid user'); |
$err = array('op' => $op, 'errstr' => 'invalid user'); |
249 |
268 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
250 |
269 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
257 |
276 |
// Check if worker is registered |
// Check if worker is registered |
258 |
277 |
$wi = rg_worker_find_by_name($s['db'], $worker_uid, $u['name']); |
$wi = rg_worker_find_by_name($s['db'], $worker_uid, $u['name']); |
259 |
278 |
if ($wi === -1) { |
if ($wi === -1) { |
260 |
|
rg_log($key . ': cannot load worker info: ' |
|
|
279 |
|
rg_log('cannot load worker info: ' |
261 |
280 |
. rg_worker_error() . '; abort'); |
. rg_worker_error() . '; abort'); |
262 |
281 |
$err = array('op' => $op, 'errstr' => 'internal error'); |
$err = array('op' => $op, 'errstr' => 'internal error'); |
263 |
282 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
|
... |
... |
function xdispatch_one($key, $data) |
265 |
284 |
break; |
break; |
266 |
285 |
} |
} |
267 |
286 |
if ($wi === 0) { |
if ($wi === 0) { |
268 |
|
rg_log($key . ': name [' . $u['name'] . '] not found; abort'); |
|
|
287 |
|
rg_log('name [' . $u['name'] . '] not found; abort'); |
269 |
288 |
$err = array('op' >= $op, |
$err = array('op' >= $op, |
270 |
289 |
'errstr' => 'builder name not found, add it' |
'errstr' => 'builder name not found, add it' |
271 |
290 |
. ' in the web interface'); |
. ' in the web interface'); |
|
... |
... |
function xdispatch_one($key, $data) |
276 |
295 |
|
|
277 |
296 |
$sign = hash_hmac('sha512', $u['boot_time'], $wi['key']); |
$sign = hash_hmac('sha512', $u['boot_time'], $wi['key']); |
278 |
297 |
if (strcmp($sign, $u['sign']) != 0) { |
if (strcmp($sign, $u['sign']) != 0) { |
279 |
|
rg_log($key . ': signature is not ok [' . $sign . ']' |
|
|
298 |
|
rg_log('signature is not ok [' . $sign . ']' |
280 |
299 |
. ' != [' . $u['sign'] . ']'); |
. ' != [' . $u['sign'] . ']'); |
281 |
300 |
$err = array('op' => $op, 'errstr' => 'wrong signature'); |
$err = array('op' => $op, 'errstr' => 'wrong signature'); |
282 |
301 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
|
... |
... |
function xdispatch_one($key, $data) |
300 |
319 |
|
|
301 |
320 |
$r = rg_worker_update($s['db'], $worker_uid, $wi['id'], $a); |
$r = rg_worker_update($s['db'], $worker_uid, $wi['id'], $a); |
302 |
321 |
if ($r !== TRUE) { |
if ($r !== TRUE) { |
303 |
|
rg_log($key . ': cannot update worker: ' . rg_worker_error()); |
|
|
322 |
|
rg_log('cannot update worker: ' . rg_worker_error()); |
304 |
323 |
$err = array('op' => $op, 'errstr' => rg_worker_error()); |
$err = array('op' => $op, 'errstr' => rg_worker_error()); |
305 |
324 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
306 |
325 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
307 |
326 |
break; |
break; |
308 |
327 |
} |
} |
309 |
328 |
|
|
310 |
|
rg_log($key . ': peer [' . $u['name'] . '] announce processed.'); |
|
|
329 |
|
rg_log('peer [' . $u['name'] . '] announce processed.'); |
311 |
330 |
break; |
break; |
312 |
331 |
} |
} |
313 |
332 |
|
|
314 |
333 |
if ($s['auth'] != 1) { |
if ($s['auth'] != 1) { |
315 |
|
rg_log($key . ': Client not authenticated!'); |
|
|
334 |
|
rg_log('Client not authenticated!'); |
316 |
335 |
$err = array('op' => $op, 'errstr' => 'client not authenticated'); |
$err = array('op' => $op, 'errstr' => 'client not authenticated'); |
317 |
336 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
318 |
337 |
break; |
break; |
|
... |
... |
function xdispatch_one($key, $data) |
328 |
347 |
// The rest of the commands need a job id |
// The rest of the commands need a job id |
329 |
348 |
if (!isset($u['id'])) { |
if (!isset($u['id'])) { |
330 |
349 |
$m = 'protocol error: field \'id\' not found'; |
$m = 'protocol error: field \'id\' not found'; |
331 |
|
rg_log($key . ': ' . $m); |
|
|
350 |
|
rg_log($m); |
332 |
351 |
$err = array('op' => $op, 'errstr' => $m); |
$err = array('op' => $op, 'errstr' => $m); |
333 |
352 |
rg_conn_enq($key, @json_encode($err) . "\n"); |
rg_conn_enq($key, @json_encode($err) . "\n"); |
334 |
353 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
335 |
354 |
break; |
break; |
336 |
355 |
} |
} |
337 |
356 |
$jid = $u['id']; |
$jid = $u['id']; |
|
357 |
|
rg_log('jid=' . $jid); |
338 |
358 |
|
|
339 |
359 |
if (!isset($jobs[$jid])) { |
if (!isset($jobs[$jid])) { |
340 |
|
rg_log($jid . ': job not found. Probably an old one. Sending DRE.'); |
|
|
360 |
|
rg_log('Job not found. Probably an old one. Sending DRE.'); |
341 |
361 |
$a = array('op' => 'DRE', 'id' => $jid); |
$a = array('op' => 'DRE', 'id' => $jid); |
342 |
362 |
rg_conn_enq($key, @json_encode($a) . "\n"); |
rg_conn_enq($key, @json_encode($a) . "\n"); |
343 |
363 |
break; |
break; |
|
... |
... |
function xdispatch_one($key, $data) |
346 |
366 |
|
|
347 |
367 |
// security check |
// security check |
348 |
368 |
if (!isset($job['worker_id'])) |
if (!isset($job['worker_id'])) |
349 |
|
rg_log_ml($jid . ': DEBUG: NO WORKER_ID: job: ' . rg_array2string($job)); |
|
|
369 |
|
rg_log_ml('NO WORKER_ID: job: ' . rg_array2string($job)); |
350 |
370 |
if (!isset($job['worker_id']) |
if (!isset($job['worker_id']) |
351 |
371 |
|| ($job['worker_id'] != $s['worker_id'])) { |
|| ($job['worker_id'] != $s['worker_id'])) { |
352 |
372 |
$m = 'job not associated with worker'; |
$m = 'job not associated with worker'; |
353 |
|
rg_log($key . ': ' . $jid . ': error: ' . $m); |
|
354 |
|
rg_log($key . ': ' . $jid . ': job[worker_id]=' |
|
|
373 |
|
rg_log('error: ' . $m); |
|
374 |
|
rg_log('job[worker_id]=' |
355 |
375 |
. (isset($job['worker_id']) ? $job['worker_id'] : '-')); |
. (isset($job['worker_id']) ? $job['worker_id'] : '-')); |
356 |
|
rg_log($key . ': ' . $jid . ': s[worker_id]=' . $s['worker_id']); |
|
|
376 |
|
rg_log('s[worker_id]=' . $s['worker_id']); |
357 |
377 |
$err = array('op' => 'abort_job', 'jid' => $jid, 'errstr' => $m); |
$err = array('op' => 'abort_job', 'jid' => $jid, 'errstr' => $m); |
358 |
378 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
359 |
379 |
break; |
break; |
|
... |
... |
function xdispatch_one($key, $data) |
361 |
381 |
|
|
362 |
382 |
if (strcmp($op, 'rg_notify') == 0) { |
if (strcmp($op, 'rg_notify') == 0) { |
363 |
383 |
$m = isset($u['m']) ? $u['m'] : 'MISSING'; |
$m = isset($u['m']) ? $u['m'] : 'MISSING'; |
364 |
|
rg_log($key . ': ' . $jid . ': rg_notify: ' . $m); |
|
|
384 |
|
rg_log('rg_notify: ' . $m); |
365 |
385 |
break; |
break; |
366 |
386 |
} |
} |
367 |
387 |
|
|
368 |
388 |
if (strcmp($op, 'vm_stats') == 0) { |
if (strcmp($op, 'vm_stats') == 0) { |
369 |
|
//rg_log_debug($key . ': ' . $jid . ': stats: ' . rg_array2string_short($u)); |
|
|
389 |
|
//rg_log($key . ': ' . $jid . ': stats: ' . rg_array2string_short($u)); |
370 |
390 |
rg_build_stats_add($s['db'], $jid, $u['stats']); |
rg_build_stats_add($s['db'], $jid, $u['stats']); |
371 |
391 |
break; |
break; |
372 |
392 |
} |
} |
|
... |
... |
function xdispatch_one($key, $data) |
374 |
394 |
if (strcmp($op, 'STA') == 0) { |
if (strcmp($op, 'STA') == 0) { |
375 |
395 |
$job['worker_started'] = $now; |
$job['worker_started'] = $now; |
376 |
396 |
job_save($job); |
job_save($job); |
377 |
|
rg_log($key . ': ' . $jid . ': worker started work on job'); |
|
|
397 |
|
rg_log('worker started work on job'); |
378 |
398 |
break; |
break; |
379 |
399 |
} |
} |
380 |
400 |
|
|
381 |
401 |
if (strcmp($op, 'DON') == 0) { |
if (strcmp($op, 'DON') == 0) { |
382 |
|
rg_log_ml($key . ': ' . $jid . ': DEBUG: DON u:' . rg_array2string($u)); |
|
|
402 |
|
rg_log_ml('DON u:' . rg_array2string($u)); |
|
403 |
|
|
|
404 |
|
$send_confirmation = TRUE; // to worker |
|
405 |
|
|
|
406 |
|
if (!isset($job['tries'])) |
|
407 |
|
$job['tries'] = 0; |
|
408 |
|
$job['tries']++; |
|
409 |
|
|
|
410 |
|
$retry = TRUE; |
|
411 |
|
if (empty($u['error'])) { |
|
412 |
|
$retry = FALSE; |
|
413 |
|
} else { |
|
414 |
|
rg_log('job tries: ' . $job['tries']); |
|
415 |
|
if ($job['tries'] > 3) |
|
416 |
|
$retry = FALSE; |
|
417 |
|
} |
|
418 |
|
|
|
419 |
|
$build_error = ''; |
|
420 |
|
if (array_key_exists('build_error', $u)) |
|
421 |
|
$build_error = $u['build_error']; |
|
422 |
|
rg_log_ml('build_error=[' . $build_error . ']'); |
|
423 |
|
if (!empty($build_error)) |
|
424 |
|
$retry = FALSE; |
383 |
425 |
|
|
384 |
|
$send_confirmation = TRUE; |
|
385 |
426 |
if (!empty($u['error'])) { |
if (!empty($u['error'])) { |
386 |
|
// TODO: we need to distinguish between fatal errors and transient errors |
|
387 |
|
if (!isset($job['tries'])) |
|
388 |
|
$job['tries'] = 0; |
|
389 |
|
$job['tries']++; |
|
390 |
|
rg_log($key . ': ' . $jid . ': job tries: ' . $job['tries']); |
|
391 |
|
|
|
392 |
|
rg_log($key . ': ' . $jid . ': job failed with error: ' . $u['error']); |
|
393 |
|
rg_log($key . ': ' . $jid . ': job failed with error2: ' . $u['error2']); |
|
|
427 |
|
rg_log('job failed with error: ' . $u['error']); |
|
428 |
|
rg_log('job failed with error2: ' . $u['error2']); |
|
429 |
|
} |
|
430 |
|
|
|
431 |
|
if ($retry) { |
394 |
432 |
// Delay job and retry (on another worker) |
// Delay job and retry (on another worker) |
395 |
433 |
$job['next_try'] = $now + $job['tries'] * 600; |
$job['next_try'] = $now + $job['tries'] * 600; |
396 |
434 |
$k = $s['worker_id']; |
$k = $s['worker_id']; |
397 |
435 |
$job['avoid'][$k] = time() + 12 * 3600; |
$job['avoid'][$k] = time() + 12 * 3600; |
398 |
436 |
$job['worker_id'] = 0; |
$job['worker_id'] = 0; |
399 |
437 |
job_save($job); |
job_save($job); |
|
438 |
|
rg_log('We will retry the job at' |
|
439 |
|
. ' ' . date('Y-m-d H:i:s', $job['next_try']) |
|
440 |
|
. ' and we will avoid worker [' . $s['worker_name'] . ']'); |
400 |
441 |
} else { |
} else { |
401 |
442 |
$r = rg_builder_done($s['db'], $job, $u['status']); |
$r = rg_builder_done($s['db'], $job, $u['status']); |
402 |
|
if ($r === TRUE) { |
|
403 |
|
@unlink($state_dir . '/job-' . $jid . '.ser'); |
|
404 |
|
unset($jobs[$jid]); |
|
405 |
|
unset($s['artifacts'][$jid]); |
|
406 |
|
} else { |
|
|
443 |
|
if ($r['ok'] === 1) |
|
444 |
|
$job['delete_me'] = 1; |
|
445 |
|
else |
407 |
446 |
$send_confirmation = FALSE; // we expect the client to send it again |
$send_confirmation = FALSE; // we expect the client to send it again |
408 |
|
} |
|
409 |
447 |
} |
} |
410 |
448 |
|
|
411 |
449 |
if ($send_confirmation) { |
if ($send_confirmation) { |
412 |
450 |
// Send DoneREceived - so client will delete the job |
// Send DoneREceived - so client will delete the job |
413 |
451 |
$a = array('op' => 'DRE', 'id' => $jid); |
$a = array('op' => 'DRE', 'id' => $jid); |
414 |
|
rg_log_ml($key . ': ' . $jid . ': DEBUG: Sending DRE: ' . rg_array2string($a)); |
|
|
452 |
|
rg_log_ml('Sending DRE: ' . rg_array2string($a)); |
415 |
453 |
rg_conn_enq($key, @json_encode($a) . "\n"); |
rg_conn_enq($key, @json_encode($a) . "\n"); |
416 |
454 |
} |
} |
417 |
455 |
break; |
break; |
418 |
456 |
} |
} |
419 |
457 |
|
|
420 |
458 |
if (strcmp($op, 'artifact_upload_info') == 0) { |
if (strcmp($op, 'artifact_upload_info') == 0) { |
421 |
|
rg_log_ml($key . ': ' . $jid . ': ' . $op . ': u: ' . rg_array2string($u)); |
|
|
459 |
|
rg_log_ml($op . ': u: ' . rg_array2string($u)); |
422 |
460 |
if (!isset($job['artifacts'])) |
if (!isset($job['artifacts'])) |
423 |
461 |
$job['artifacts'] = array(); |
$job['artifacts'] = array(); |
424 |
462 |
|
|
|
... |
... |
function xdispatch_one($key, $data) |
443 |
481 |
} |
} |
444 |
482 |
$aid = $u['aid']; |
$aid = $u['aid']; |
445 |
483 |
$base['aid'] = $aid; |
$base['aid'] = $aid; |
|
484 |
|
rg_log('aid=' . $aid); |
446 |
485 |
|
|
447 |
|
rg_log_ml($key . ': ' . $jid . ': ' . $aid . ': ' |
|
448 |
|
. $op . ': u: ' . rg_array2string($u)); |
|
|
486 |
|
rg_log_ml($op . ': u: ' . rg_array2string($u)); |
449 |
487 |
|
|
450 |
488 |
// TODO: Move this where the job is prepared - where?! |
// TODO: Move this where the job is prepared - where?! |
451 |
489 |
if (!isset($job['repo_path'])) |
if (!isset($job['repo_path'])) |
|
... |
... |
function xdispatch_one($key, $data) |
461 |
499 |
|
|
462 |
500 |
if (strcmp($op, 'artifact_upload_status') == 0) { |
if (strcmp($op, 'artifact_upload_status') == 0) { |
463 |
501 |
$base['status'] = isset($job['artifacts'][$aid]['done']) ? $job['artifacts'][$aid]['done'] : 0; |
$base['status'] = isset($job['artifacts'][$aid]['done']) ? $job['artifacts'][$aid]['done'] : 0; |
464 |
|
rg_log_ml($key . ': ' . $jid . ': ' . ': ' . $aid . ': ' . $op . ': sending status ' . $base['status']); |
|
|
502 |
|
rg_log_ml($op . ': sending status ' . $base['status']); |
465 |
503 |
rg_conn_enq($key, @json_encode($base) . "\n"); |
rg_conn_enq($key, @json_encode($base) . "\n"); |
466 |
504 |
break; |
break; |
467 |
505 |
} |
} |
|
... |
... |
function xdispatch_one($key, $data) |
479 |
517 |
$s['artifacts'][$jid][$aid]['fd'] = @fopen($job['artifacts'][$aid]['tpath'], 'wb'); |
$s['artifacts'][$jid][$aid]['fd'] = @fopen($job['artifacts'][$aid]['tpath'], 'wb'); |
480 |
518 |
if ($s['artifacts'][$jid][$aid]['fd'] === FALSE) { |
if ($s['artifacts'][$jid][$aid]['fd'] === FALSE) { |
481 |
519 |
$m = 'cannot open temp artifacts file: ' . rg_php_err(); |
$m = 'cannot open temp artifacts file: ' . rg_php_err(); |
482 |
|
rg_log($key . ': ' . $jid . ': error: ' . $m); |
|
|
520 |
|
rg_log('error: ' . $m); |
483 |
521 |
break; |
break; |
484 |
522 |
} |
} |
485 |
523 |
|
|
|
... |
... |
function xdispatch_one($key, $data) |
492 |
530 |
if (strncmp($op, 'artifact_upload_', 16) == 0) { |
if (strncmp($op, 'artifact_upload_', 16) == 0) { |
493 |
531 |
if (!isset($job['artifacts'][$aid])) { |
if (!isset($job['artifacts'][$aid])) { |
494 |
532 |
$m = 'aid not found'; |
$m = 'aid not found'; |
495 |
|
rg_log($key . ': ' . $jid . ': error: ' . $m); |
|
|
533 |
|
rg_log('error: ' . $m); |
496 |
534 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
497 |
535 |
rg_conn_enq($key, json_encode($base) . "\n"); |
rg_conn_enq($key, json_encode($base) . "\n"); |
498 |
536 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
501 |
539 |
|
|
502 |
540 |
if ($s['artifacts'][$jid][$aid]['ready_to_write'] != 1) { |
if ($s['artifacts'][$jid][$aid]['ready_to_write'] != 1) { |
503 |
541 |
$m = 'artifact chunk/done op before start'; |
$m = 'artifact chunk/done op before start'; |
504 |
|
rg_log($key . ': ' . $jid . ': error: ' . $m); |
|
|
542 |
|
rg_log('error: ' . $m); |
505 |
543 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
506 |
544 |
rg_conn_enq($key, json_encode($base) . "\n"); |
rg_conn_enq($key, json_encode($base) . "\n"); |
507 |
545 |
rg_conn_shutdown($key, 2); // TODO: we should not close the connection so aggressively (everyehere) |
rg_conn_shutdown($key, 2); // TODO: we should not close the connection so aggressively (everyehere) |
|
... |
... |
function xdispatch_one($key, $data) |
510 |
548 |
} |
} |
511 |
549 |
|
|
512 |
550 |
if (strcmp($op, 'artifact_upload_chunk') == 0) { |
if (strcmp($op, 'artifact_upload_chunk') == 0) { |
513 |
|
//rg_log($key . ': ' . $jid . ': ' . $aid |
|
514 |
|
// . ': DEBUG: seeking to ' . $u['pos'] . '...'); |
|
|
551 |
|
//rg_log('DEBUG: seeking to ' . $u['pos'] . '...'); |
515 |
552 |
$r = @fseek($s['artifacts'][$jid][$aid]['fd'], $u['pos'], SEEK_SET); |
$r = @fseek($s['artifacts'][$jid][$aid]['fd'], $u['pos'], SEEK_SET); |
516 |
553 |
if ($r === -1) { |
if ($r === -1) { |
517 |
554 |
$m = 'cannot seek: ' . rg_php_err(); |
$m = 'cannot seek: ' . rg_php_err(); |
518 |
|
rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); |
|
|
555 |
|
rg_log('error: ' . $m); |
519 |
556 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
520 |
557 |
rg_conn_enq($key, json_encode($base) . "\n"); |
rg_conn_enq($key, json_encode($base) . "\n"); |
521 |
558 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
533 |
570 |
$r = @fclose($s['artifacts'][$jid][$aid]['fd']); |
$r = @fclose($s['artifacts'][$jid][$aid]['fd']); |
534 |
571 |
if ($r === FALSE) { |
if ($r === FALSE) { |
535 |
572 |
$m = 'cannot close file: ' . rg_php_err(); |
$m = 'cannot close file: ' . rg_php_err(); |
536 |
|
rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); |
|
|
573 |
|
rg_log('error: ' . $m); |
537 |
574 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
538 |
575 |
rg_conn_enq($key, json_encode($base) . "\n"); |
rg_conn_enq($key, json_encode($base) . "\n"); |
539 |
576 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
546 |
583 |
$r = @mkdir($adir, 0770, TRUE); |
$r = @mkdir($adir, 0770, TRUE); |
547 |
584 |
if ($r === FALSE) { |
if ($r === FALSE) { |
548 |
585 |
$m = 'cannot create artifacts dir: ' . rg_php_err(); |
$m = 'cannot create artifacts dir: ' . rg_php_err(); |
549 |
|
rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); |
|
|
586 |
|
rg_log('error: ' . $m); |
550 |
587 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
551 |
588 |
rg_conn_enq($key, json_encode($base) . "\n"); |
rg_conn_enq($key, json_encode($base) . "\n"); |
552 |
589 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
553 |
590 |
} |
} |
554 |
591 |
} |
} |
555 |
|
rg_log_ml($key . ': ' . $jid . ': ' . $aid |
|
556 |
|
. ': DEBUG: adir=' . $adir); |
|
|
592 |
|
rg_log_ml('DEBUG: adir=' . $adir); |
557 |
593 |
|
|
558 |
594 |
// Prepare the replace of special strings |
// Prepare the replace of special strings |
559 |
595 |
$a = array(); $b = array(); |
$a = array(); $b = array(); |
|
... |
... |
function xdispatch_one($key, $data) |
596 |
632 |
$meta['map_into_source'][] = $path; |
$meta['map_into_source'][] = $path; |
597 |
633 |
} |
} |
598 |
634 |
} else { |
} else { |
599 |
|
rg_log_debug('para does not have map_into_source: ' . rg_array2string($para)); |
|
|
635 |
|
rg_log_debug(' para does not have map_into_source: ' |
|
636 |
|
. rg_array2string($para)); |
600 |
637 |
} |
} |
601 |
638 |
|
|
602 |
639 |
foreach ($para['map'] as $path) { |
foreach ($para['map'] as $path) { |
603 |
|
rg_log_ml($key . ': ' . $jid . ': ' . $aid . ': DEBUG map ' |
|
|
640 |
|
rg_log_ml('DEBUG map ' |
604 |
641 |
. $job['artifacts'][$aid]['tpath'] . ' to ' |
. $job['artifacts'][$aid]['tpath'] . ' to ' |
605 |
642 |
. $path); |
. $path); |
606 |
643 |
|
|
|
... |
... |
function xdispatch_one($key, $data) |
612 |
649 |
|
|
613 |
650 |
if (strstr($path, '..')) { |
if (strstr($path, '..')) { |
614 |
651 |
$m = 'path [' . $path . '] is trying to escape the artifacts dir; ignore it'; |
$m = 'path [' . $path . '] is trying to escape the artifacts dir; ignore it'; |
615 |
|
rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); |
|
|
652 |
|
rg_log('error: ' . $m); |
616 |
653 |
continue; |
continue; |
617 |
654 |
} |
} |
618 |
655 |
|
|
619 |
656 |
$apath = $adir . '/' . $path; |
$apath = $adir . '/' . $path; |
620 |
|
rg_log($key . ': ' . $jid . ': ' . $aid |
|
621 |
|
. ': DEBUG: apath=' . $apath); |
|
|
657 |
|
rg_log('DEBUG: apath=' . $apath); |
622 |
658 |
$d = dirname($apath); |
$d = dirname($apath); |
623 |
659 |
if (!is_dir($d)) { |
if (!is_dir($d)) { |
624 |
660 |
$r = @mkdir($d, 0770, TRUE); |
$r = @mkdir($d, 0770, TRUE); |
625 |
661 |
if ($r === FALSE) { |
if ($r === FALSE) { |
626 |
662 |
$m = 'cannot create artifacts dir: ' . rg_php_err(); |
$m = 'cannot create artifacts dir: ' . rg_php_err(); |
627 |
|
rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); |
|
|
663 |
|
rg_log('error: ' . $m); |
628 |
664 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
629 |
665 |
rg_conn_enq($key, json_encode($base) . "\n"); |
rg_conn_enq($key, json_encode($base) . "\n"); |
630 |
666 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
637 |
673 |
if ($r === FALSE) { |
if ($r === FALSE) { |
638 |
674 |
$m = 'cannot link file ' . $job['artifacts'][$aid]['tpath'] |
$m = 'cannot link file ' . $job['artifacts'][$aid]['tpath'] |
639 |
675 |
. ' to ' . $apath . '.tmp: ' . rg_php_err(); |
. ' to ' . $apath . '.tmp: ' . rg_php_err(); |
640 |
|
rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); |
|
|
676 |
|
rg_log('error: ' . $m); |
641 |
677 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
642 |
678 |
rg_conn_enq($key, json_encode($base) . "\n"); |
rg_conn_enq($key, json_encode($base) . "\n"); |
643 |
679 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
648 |
684 |
$r = @rename($apath . '.tmp', $apath); |
$r = @rename($apath . '.tmp', $apath); |
649 |
685 |
if ($r === FALSE) { |
if ($r === FALSE) { |
650 |
686 |
$m = 'cannot rename: ' . rg_php_err(); |
$m = 'cannot rename: ' . rg_php_err(); |
651 |
|
rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); |
|
|
687 |
|
rg_log('error: ' . $m); |
652 |
688 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
653 |
689 |
rg_conn_enq($key, json_encode($base) . "\n"); |
rg_conn_enq($key, json_encode($base) . "\n"); |
654 |
690 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
657 |
693 |
} |
} |
658 |
694 |
$meta['upload_ts'] = time(); |
$meta['upload_ts'] = time(); |
659 |
695 |
|
|
660 |
|
rg_log_ml($key . ': ' . $jid . ': ' . $aid . ': meta: ' . rg_array2string($meta)); |
|
|
696 |
|
rg_log_ml('meta: ' . rg_array2string($meta)); |
661 |
697 |
$r = @file_put_contents($apath . '.rg.meta', |
$r = @file_put_contents($apath . '.rg.meta', |
662 |
698 |
rg_serialize($meta)); |
rg_serialize($meta)); |
663 |
699 |
if ($r === FALSE) { |
if ($r === FALSE) { |
664 |
700 |
$m = 'cannot save metadata: ' . rg_php_err(); |
$m = 'cannot save metadata: ' . rg_php_err(); |
665 |
|
rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); |
|
|
701 |
|
rg_log('error: ' . $m); |
666 |
702 |
$base['errstr'] = $m; |
$base['errstr'] = $m; |
667 |
703 |
rg_conn_enq($key, @json_encode($base) . "\n"); |
rg_conn_enq($key, @json_encode($base) . "\n"); |
668 |
704 |
rg_conn_shutdown($key, 2); |
rg_conn_shutdown($key, 2); |
|
... |
... |
function xdispatch_one($key, $data) |
683 |
719 |
break; |
break; |
684 |
720 |
} |
} |
685 |
721 |
|
|
686 |
|
rg_log_ml($key . ': Unknown operation [' . $op . ']! u: ' . rg_array2string($u)); |
|
|
722 |
|
rg_log_ml('Unknown operation [' . $op . ']! u: ' . rg_array2string($u)); |
687 |
723 |
$err = array('op' => $op, 'errstr' => 'unknown operation'); |
$err = array('op' => $op, 'errstr' => 'unknown operation'); |
688 |
724 |
rg_conn_enq($key, json_encode($err) . "\n"); |
rg_conn_enq($key, json_encode($err) . "\n"); |
689 |
725 |
break; |
break; |
690 |
726 |
} |
} |
691 |
|
unset($job); |
|
692 |
727 |
|
|
693 |
|
//rg_log_debug('s: ' . rg_array2string($s)); |
|
694 |
|
unset($s); |
|
|
728 |
|
rg_log_exit(); |
695 |
729 |
} |
} |
696 |
730 |
|
|
697 |
731 |
/* |
/* |
|
... |
... |
function rg_process_job($db, &$job) |
734 |
768 |
global $rg_conns; |
global $rg_conns; |
735 |
769 |
|
|
736 |
770 |
$jid = $job['id']; |
$jid = $job['id']; |
737 |
|
|
|
738 |
|
// Job is already in progress? |
|
739 |
|
if ($job['worker_id'] > 0) { |
|
740 |
|
//rg_log($jid . ': job is already assigned to' |
|
741 |
|
// . ' worker [' . $job['worker_name'] . ']' |
|
742 |
|
// . ' (id ' . $job['worker_id'] . '); skip it'); |
|
743 |
|
return TRUE; |
|
744 |
|
} |
|
745 |
|
|
|
746 |
|
// Should we delay because of a previous fail? |
|
747 |
|
if (isset($job['next_try']) && ($job['next_try'] > time())) { |
|
748 |
|
//rg_log_debug($jid . ': job is suspended till ' |
|
749 |
|
// . date('Y-m-d H:i:s', $job['next_try']) |
|
750 |
|
// . ' because [' . $job['next_try_reason'] . ']'); |
|
751 |
|
return TRUE; |
|
752 |
|
} |
|
753 |
|
|
|
754 |
|
rg_log_ml($jid . ': processing job...'); |
|
755 |
|
|
|
756 |
|
// 'request' is the new way |
|
757 |
|
if (!isset($job['request'])) |
|
758 |
|
$req = $job; |
|
759 |
|
else |
|
760 |
|
$req = $job['request']; |
|
761 |
|
|
|
762 |
|
// Get the worker list, so we can sort it |
|
763 |
|
$workers_list = rg_worker_list_all($db, $req['uid']); |
|
764 |
|
if ($workers_list === FALSE) { |
|
765 |
|
rg_log($jid . ': cannot load workers list: ' . rg_worker_error()); |
|
766 |
|
$job['next_try'] = time() + 30; |
|
767 |
|
return FALSE; |
|
768 |
|
} |
|
769 |
|
//rg_log_debug('workers list: ' . rg_array2string($workers_list)); |
|
770 |
|
|
|
771 |
|
// Trying to find a worker in the list of connections |
|
772 |
|
$reason = array(); |
|
773 |
|
$delta = 30; // how many seconds to postpone the job |
|
774 |
|
$total_time_allowed = 0; // unlimited |
|
775 |
|
foreach ($rg_conns as $key => &$i) { |
|
776 |
|
if (strcmp($key, 'master') == 0) |
|
777 |
|
continue; |
|
778 |
|
|
|
779 |
|
if (!isset($i['ann'])) { |
|
780 |
|
rg_log($key . ': ' . $jid . ': conn has no announce.'); |
|
781 |
|
// TODO: close after some time? |
|
782 |
|
$reason[] = 'no_announce'; |
|
783 |
|
continue; |
|
|
771 |
|
rg_log_enter('rg_process_job: jid=' . $jid); |
|
772 |
|
|
|
773 |
|
$ret = TRUE; |
|
774 |
|
do { |
|
775 |
|
// Job is already in progress? |
|
776 |
|
if ($job['worker_id'] > 0) { |
|
777 |
|
//rg_log($jid . ': job is already assigned to' |
|
778 |
|
// . ' worker [' . $job['worker_name'] . ']' |
|
779 |
|
// . ' (id ' . $job['worker_id'] . '); skip it'); |
|
780 |
|
$ret = TRUE; |
|
781 |
|
break; |
784 |
782 |
} |
} |
785 |
783 |
|
|
786 |
|
if (empty($i['ann']['env'])) { |
|
787 |
|
//rg_log($key . ': ' . $jid . ': conn has no environments.'); |
|
788 |
|
$reason[] = 'no env'; |
|
789 |
|
continue; |
|
|
784 |
|
// Should we delay because of a previous fail? |
|
785 |
|
if (isset($job['next_try']) && ($job['next_try'] > time())) { |
|
786 |
|
//rg_log_debug($jid . ': job is suspended till ' |
|
787 |
|
// . date('Y-m-d H:i:s', $job['next_try']) |
|
788 |
|
// . ' because [' . $job['next_try_reason'] . ']'); |
|
789 |
|
$ret = TRUE; |
|
790 |
|
break; |
790 |
791 |
} |
} |
791 |
792 |
|
|
792 |
|
if (($i['worker_uid'] > 0) && ($i['worker_uid'] != $req['uid'])) { |
|
793 |
|
rg_log($key . ': ' . $jid . ': uids do not match, try next'); |
|
794 |
|
$reason[] = 'uids_mismatch'; |
|
795 |
|
continue; |
|
|
793 |
|
// 'request' is the new way |
|
794 |
|
if (!isset($job['request'])) |
|
795 |
|
$req = $job; |
|
796 |
|
else |
|
797 |
|
$req = $job['request']; |
|
798 |
|
|
|
799 |
|
// Get the worker list, so we can sort it |
|
800 |
|
$workers_list = rg_worker_list_all($db, $req['uid']); |
|
801 |
|
if ($workers_list === FALSE) { |
|
802 |
|
rg_log($jid . ': cannot load workers list: ' . rg_worker_error()); |
|
803 |
|
$job['next_try'] = time() + 30; |
|
804 |
|
$ret = FALSE; |
|
805 |
|
break; |
796 |
806 |
} |
} |
|
807 |
|
//rg_log_debug('workers list: ' . rg_array2string($workers_list)); |
|
808 |
|
|
|
809 |
|
// Trying to find a worker in the list of connections |
|
810 |
|
$reason = array(); |
|
811 |
|
$delta = 30; // how many seconds to postpone the job |
|
812 |
|
$total_time_allowed = 0; // unlimited |
|
813 |
|
foreach ($rg_conns as $key => &$i) { |
|
814 |
|
if (strcmp($key, 'master') == 0) |
|
815 |
|
continue; |
797 |
816 |
|
|
798 |
|
$k = $i['worker_id']; |
|
799 |
|
$name = $i['worker_name']; |
|
800 |
|
if (!isset($workers_list[$k])) { |
|
801 |
|
rg_internal_error('Worker ' . $name . ' not found' |
|
802 |
|
. ' in workers_list! Strange!'); |
|
803 |
|
$reason[] = 'worker_not_found'; |
|
804 |
|
continue; |
|
805 |
|
} |
|
806 |
|
$wi = $workers_list[$k]; |
|
|
817 |
|
rg_log('key ' . $key); |
|
818 |
|
if (!isset($i['ann'])) { |
|
819 |
|
rg_log('conn has no announce.'); |
|
820 |
|
// TODO: close after some time? |
|
821 |
|
$reason[] = 'no_announce'; |
|
822 |
|
continue; |
|
823 |
|
} |
807 |
824 |
|
|
808 |
|
if (isset($job['avoid'][$k]) && ($job['avoid'][$k] > time())) { |
|
809 |
|
rg_log($key . ': ' . $jid . ': we must avoid worker ' . $name); |
|
810 |
|
$reason[] = 'avoid'; |
|
811 |
|
continue; |
|
812 |
|
} |
|
|
825 |
|
if (empty($i['ann']['env'])) { |
|
826 |
|
//rg_log('conn has no environments.'); |
|
827 |
|
$reason[] = 'no env'; |
|
828 |
|
continue; |
|
829 |
|
} |
813 |
830 |
|
|
814 |
|
// If number of active jobs is == max workers, skip it |
|
815 |
|
if ($wi['workers']) { |
|
816 |
|
$aj = worker_active_jobs($i['worker_id']); |
|
817 |
|
if ($aj >= $wi['workers']) { |
|
818 |
|
rg_log($key . ': ' . $jid |
|
819 |
|
. ': DEBUG: skip worker ' . $name |
|
820 |
|
. ' because sent jobs(' . $aj . ')' |
|
821 |
|
. ' >= workers(' . $wi['workers'] . ')'); |
|
822 |
|
$reason[] = 'jobs/worker'; |
|
|
831 |
|
if (($i['worker_uid'] > 0) && ($i['worker_uid'] != $req['uid'])) { |
|
832 |
|
rg_log('uids do not match, try next'); |
|
833 |
|
$reason[] = 'uids_mismatch'; |
823 |
834 |
continue; |
continue; |
824 |
835 |
} |
} |
825 |
|
} |
|
826 |
836 |
|
|
827 |
|
$env_found = FALSE; |
|
828 |
|
foreach ($i['ann']['env'] as $env => $junk) { |
|
829 |
|
if (strcasecmp($req['env'], $env) != 0) { |
|
830 |
|
//rg_log($key . ': ' . $jid . ': DEBUG: worker ' . $name |
|
831 |
|
// . ': job env [' . $req['env'] . ']' |
|
832 |
|
// . ' != worker [' . $env . ']'); |
|
|
837 |
|
$k = $i['worker_id']; |
|
838 |
|
$name = $i['worker_name']; |
|
839 |
|
if (!isset($workers_list[$k])) { |
|
840 |
|
rg_internal_error('Worker ' . $name . ' not found' |
|
841 |
|
. ' in workers_list! Strange!'); |
|
842 |
|
$reason[] = 'worker_not_found'; |
833 |
843 |
continue; |
continue; |
834 |
844 |
} |
} |
|
845 |
|
$wi = $workers_list[$k]; |
835 |
846 |
|
|
836 |
|
$env_found = TRUE; |
|
|
847 |
|
if (isset($job['avoid'][$k]) && ($job['avoid'][$k] > time())) { |
|
848 |
|
rg_log('we must avoid worker ' . $name); |
|
849 |
|
$reason[] = 'avoid'; |
|
850 |
|
continue; |
|
851 |
|
} |
|
852 |
|
|
|
853 |
|
// If number of active jobs is == max workers, skip it |
|
854 |
|
// 'workers' is the number of allowed jobs for this worker |
|
855 |
|
if ($wi['workers'] > 0) { |
|
856 |
|
$aj = worker_active_jobs($i['worker_id']); |
|
857 |
|
if ($aj['no_of_jobs'] >= $wi['workers']) { |
|
858 |
|
rg_log('Skip worker ' . $name |
|
859 |
|
. ' because sent jobs(' . implode(',', $aj['jobs']) . ')' |
|
860 |
|
. ' >= workers(' . $wi['workers'] . ')'); |
|
861 |
|
$reason[] = 'jobs-per-worker'; |
|
862 |
|
$delta = 0; |
|
863 |
|
continue; |
|
864 |
|
} |
|
865 |
|
} |
|
866 |
|
|
|
867 |
|
$env_found = FALSE; |
|
868 |
|
foreach ($i['ann']['env'] as $env => $junk) { |
|
869 |
|
if (strcasecmp($req['env'], $env) != 0) { |
|
870 |
|
//rg_log('worker ' . $name |
|
871 |
|
// . ': job env [' . $req['env'] . ']' |
|
872 |
|
// . ' != worker [' . $env . ']'); |
|
873 |
|
continue; |
|
874 |
|
} |
|
875 |
|
|
|
876 |
|
$env_found = TRUE; |
|
877 |
|
break; |
|
878 |
|
} |
|
879 |
|
if (!$env_found) { |
|
880 |
|
$reason[] = 'env_not_found[' . $req['env'] . ']'; |
|
881 |
|
$total_time_allowed = 24 * 3600; |
|
882 |
|
$delta = 3600; |
|
883 |
|
continue; |
|
884 |
|
} |
|
885 |
|
|
|
886 |
|
// Send only what is really needed |
|
887 |
|
$job2 = $req; |
|
888 |
|
|
|
889 |
|
$job2['worker_id'] = $i['worker_id']; |
|
890 |
|
$job2['worker_name'] = $name; |
|
891 |
|
|
|
892 |
|
$job2['op'] = 'BLD'; |
|
893 |
|
$job2['id'] = $jid; |
|
894 |
|
rg_conn_enq($key, json_encode($job2) . "\n"); |
|
895 |
|
|
|
896 |
|
$job['worker_id'] = $i['worker_id']; |
|
897 |
|
$job['worker_name'] = $name; |
|
898 |
|
$job['worker_started'] = 0; |
|
899 |
|
$job['worker_sent'] = time(); |
|
900 |
|
|
|
901 |
|
//rg_log_debug('After sending BLD: job: ' . rg_array2string($job)); |
|
902 |
|
// TODO: after some time, if worker_started is still 0, |
|
903 |
|
// set the 'worker_id' to 0 to be able to go in other place |
|
904 |
|
// TODO: maybe the client must resync with server to |
|
905 |
|
// abort jobs already done on another host, to not |
|
906 |
|
// duplicate work |
|
907 |
|
rg_log('job sent to worker [' . $name . ']'); |
|
908 |
|
rg_log_debug('sent job: ' . rg_array2string($job2)); |
|
909 |
|
$ret = TRUE; |
837 |
910 |
break; |
break; |
838 |
911 |
} |
} |
839 |
|
if (!$env_found) { |
|
840 |
|
$reason[] = 'env_not_found[' . $req['env'] . ']'; |
|
841 |
|
$total_time_allowed = 24 * 3600; |
|
842 |
|
$delta = 3600; |
|
843 |
|
continue; |
|
|
912 |
|
unset($i); |
|
913 |
|
|
|
914 |
|
rg_log('No workers found [' . implode(' ', $reason) . ']!'); |
|
915 |
|
|
|
916 |
|
if (($total_time_allowed > 0) |
|
917 |
|
&& array_key_exists('start', $job) |
|
918 |
|
&& ($job['start'] + $total_time_allowed < time())) { |
|
919 |
|
rg_log('cancel job because' |
|
920 |
|
. ' start + total_time_allowed < now'); |
|
921 |
|
$status = array(); |
|
922 |
|
$status['error'] = 'Env not found after a lot of time'; |
|
923 |
|
$r = rg_builder_done($db, $job, $status); |
|
924 |
|
if ($r === TRUE) |
|
925 |
|
$job['delete_me'] = 1; |
|
926 |
|
|
|
927 |
|
$ret = TRUE; |
|
928 |
|
break; |
844 |
929 |
} |
} |
845 |
930 |
|
|
846 |
|
// Send only what is really needed |
|
847 |
|
$job2 = $req; |
|
848 |
|
|
|
849 |
|
$job2['worker_id'] = $i['worker_id']; |
|
850 |
|
$job2['worker_name'] = $name; |
|
851 |
|
|
|
852 |
|
$job2['op'] = 'BLD'; |
|
853 |
|
$job2['id'] = $jid; |
|
854 |
|
rg_conn_enq($key, json_encode($job2) . "\n"); |
|
855 |
|
|
|
856 |
|
$job['worker_id'] = $i['worker_id']; |
|
857 |
|
$job['worker_name'] = $name; |
|
858 |
|
$job['worker_started'] = 0; |
|
859 |
|
$job['worker_sent'] = time(); |
|
860 |
|
|
|
861 |
|
//rg_log_debug('After sending BLD: job: ' . rg_array2string($job)); |
|
862 |
|
// TODO: after some time, if worker_started is still 0, |
|
863 |
|
// set the 'worker_id' to 0 to be able to go in other place |
|
864 |
|
// TODO: maybe the client must resync with server to |
|
865 |
|
// abort jobs already done on another host, to not |
|
866 |
|
// duplicate work |
|
867 |
|
rg_log($key . ': ' . $jid . ': job sent to worker [' . $name . ']'); |
|
868 |
|
rg_log_debug('sent job: ' . rg_array2string($job2)); |
|
869 |
|
return TRUE; |
|
870 |
|
} |
|
871 |
|
unset($i); |
|
872 |
|
|
|
873 |
|
rg_log($key . ': ' . $jid . ': No workers found [' |
|
874 |
|
. implode(' ', $reason) . ']!'); |
|
875 |
|
|
|
876 |
|
if (($total_time_allowed > 0) |
|
877 |
|
&& array_key_exists('start', $job) |
|
878 |
|
&& ($job['start'] + $total_time_allowed < time())) { |
|
879 |
|
rg_log($key . ': ' . $jid . ': cancel job because' |
|
880 |
|
. ' start + total_time_allowed < now'); |
|
881 |
|
$status = array(); |
|
882 |
|
$status['error'] = 'Env not found after a lot of time'; |
|
883 |
|
$r = rg_builder_done($db, $job, $status); |
|
884 |
|
if ($r === TRUE) { |
|
885 |
|
@unlink($state_dir . '/job-' . $jid . '.ser'); |
|
886 |
|
$job['delete_me'] = 1; |
|
|
931 |
|
// TODO: we should signal this and prevent the call if the list of |
|
932 |
|
// workers does not change. |
|
933 |
|
if ($delta > 0) { |
|
934 |
|
rg_log('Suspending job for ' . $delta . 's'); |
|
935 |
|
$job['next_try'] = time() + $delta; |
|
936 |
|
$job['next_try_reason'] = implode(',', $reason); |
887 |
937 |
} |
} |
888 |
|
return TRUE; |
|
889 |
|
} |
|
890 |
938 |
|
|
891 |
|
// TODO: we should signal this and prevent the call if the list of |
|
892 |
|
// workers does not change. |
|
893 |
|
rg_log($key . ': ' . $jid . ': Suspend job for ' . $delta . 's'); |
|
894 |
|
$job['next_try'] = time() + $delta; |
|
895 |
|
$job['next_try_reason'] = implode(',', $reason); |
|
896 |
|
return TRUE; |
|
|
939 |
|
$ret = TRUE; |
|
940 |
|
} while (0); |
|
941 |
|
|
|
942 |
|
rg_log_exit(); |
|
943 |
|
return $ret; |
897 |
944 |
} |
} |
898 |
945 |
|
|
899 |
946 |
|
|
|
... |
... |
do { |
987 |
1034 |
continue; |
continue; |
988 |
1035 |
|
|
989 |
1036 |
if ($_i['last_ping'] + 10 < $now) { |
if ($_i['last_ping'] + 10 < $now) { |
990 |
|
//rg_log($_k . ': Sending ping'); |
|
|
1037 |
|
rg_log($_k . ': Sending ping'); |
991 |
1038 |
$j = array('op' => 'ping'); |
$j = array('op' => 'ping'); |
992 |
1039 |
rg_conn_enq($_k, json_encode($j) . "\n"); |
rg_conn_enq($_k, json_encode($j) . "\n"); |
993 |
1040 |
$rg_conns[$_k]['last_ping'] = $now; |
$rg_conns[$_k]['last_ping'] = $now; |
|
... |
... |
do { |
1042 |
1089 |
|
|
1043 |
1090 |
$_r = rg_process_job($db, $jobs[$jid]); |
$_r = rg_process_job($db, $jobs[$jid]); |
1044 |
1091 |
if (array_key_exists('delete_me', $jobs[$jid])) |
if (array_key_exists('delete_me', $jobs[$jid])) |
1045 |
|
unset($jobs[$jid]); |
|
|
1092 |
|
rg_clean_job($jid); |
1046 |
1093 |
if ($_r === FALSE) |
if ($_r === FALSE) |
1047 |
1094 |
break; |
break; |
1048 |
1095 |
} |
} |