diff --git a/CHANGES b/CHANGES index bdb5c381ac6f5b7af945499832ee7cec5df331a2..758387e612fbf4b3aac9e873c7386b878177947c 100755 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,21 @@ = Changes from previous Cobalt Versions = +== Changes to 1.0.14 == + * Fix for cluster systems where nodes that weren't numbered could cause + showres/qstat to fail to error out when displaying locations. + * TS has location added, TE records have location, and seconds from epoch UTC + start and end for tasks + * Memory management scripts have been updated so that they can handle larger + Cray XC40 systems. + * Memory management boot records have been enhanced with start/end seconds from + epoch UTC times and blocked locations on the boot-start record. + * Fix for a potential memory mode script switch crash when rebooting from certain + memory mode mixes. + * Group membership checks for queues are now being handled more efficiently. + * Boot start and boot end records from the memory mode management scripts now have + a corrected date format in the message timestamp that is consistient with other + accounting records. + == Changes to 1.0.13 == * Added a web-service interface "cweb" client. This presents job, reservation and node data over a REST API. This is based on the gronkd daemon from diff --git a/misc/cobalt.sles.spec b/misc/cobalt.sles.spec index edd66464bcb9eed315075455b9f9f69604d74376..356ba3eb421545a12a227d3f259af119a7690ab3 100644 --- a/misc/cobalt.sles.spec +++ b/misc/cobalt.sles.spec @@ -11,8 +11,6 @@ Source0: %{name}-%{version}.tar.gz BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root BuildRequires: systemd-rpm-macros Requires: python >= 2.7 -Requires: python-lockfile -Requires: python-python-daemon %package -n cobalt-clients Version: %{version} @@ -33,7 +31,7 @@ The Cobalt Resource Management System %{python} setup.py build %define client_wrapper_dir /usr/libexec/cobalt %define python_wrapper_dir %{client_wrapper_dir}/bin -%define python_site_packages %{_libdir}%{python}/site-packages +%define python_site_packages %{_libdir}/%{python}/site-packages %build cd src/clients && make PROGPREFIX=%{client_wrapper_dir} @@ -54,6 +52,8 @@ install -m 755 src/clients/cobalt-admin ${RPM_BUILD_ROOT}%{_sbindir$} %{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/bg_mpirun_forker.py ${RPM_BUILD_ROOT}%{_sbindir} %{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/bg_runjob_forker.py ${RPM_BUILD_ROOT}%{_sbindir} %{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/system_script_forker.py ${RPM_BUILD_ROOT}%{_sbindir} +%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/alps_script_forker.py ${RPM_BUILD_ROOT}%{_sbindir} +%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/alpssystem.py ${RPM_BUILD_ROOT}%{_sbindir} %{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/gravina.py ${RPM_BUILD_ROOT}%{_sbindir} %{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/partadm.py ${RPM_BUILD_ROOT}%{_sbindir} %{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/setres.py ${RPM_BUILD_ROOT}%{_sbindir} diff --git a/src/lib/Components/cqm.py b/src/lib/Components/cqm.py index 9e43a3b82df480c31a1acc82d8444dd83cf37d78..68d07c0f0873daeecf9154ceca947403967621aa 100644 --- a/src/lib/Components/cqm.py +++ b/src/lib/Components/cqm.py @@ -705,6 +705,7 @@ class Job (StateMachine): if self.user_hold: dbwriter.log_to_db(self.user, "user_hold", "job_prog", JobProgMsg(self)) + self.current_task_start = time.time() self.initializing = False @@ -786,6 +787,7 @@ class Job (StateMachine): if self.ion_kerneloptions == False: self.ion_kerneloptions = None self.runid = state.get("runid", None) + self.current_task_start = state.get("current_task_start", None) #for old statefiles, make sure to update the dependency state on restart: self.__dep_hold = False self.initializing = False @@ -885,7 +887,8 @@ class Job (StateMachine): else: self.__max_job_timer = Timer() self.__max_job_timer.start() - task_start = accounting.task_start(self.jobid, self.runid) + self.current_task_start = time.time() + task_start = accounting.task_start(self.jobid, self.runid, self.current_task_start, self.location) accounting_logger.info(task_start) logger.info(task_start) return Job.__rc_success @@ -894,7 +897,9 @@ class Job (StateMachine): '''get exit code from system component''' def end_time_and_log(): self.__max_job_timer.stop() - task_end = accounting.task_end(self.jobid, self.runid, self.__max_job_timer.elapsed_times[-1]) + task_end = accounting.task_end(self.jobid, self.runid, self.__max_job_timer.elapsed_times[-1], self.current_task_start, + time.time(), self.location) + self.current_task_start = None accounting_logger.info(task_end) logger.info(task_end) try: @@ -3260,21 +3265,18 @@ class Restriction (Data): retval = False retstr = "You are not allowed to submit to the '%s' queue (group restriction)" % self.queue.name queue_groups = self.value.split(':') - try: - if '*' in queue_groups: - retval = True - retstr = "" - elif grp.getgrgid(pwd.getpwnam(job['user']).pw_gid).gr_name in queue_groups: - retval = True - retstr = "" - else: - all_groups = grp.getgrall() - for group in all_groups: - if group.gr_name in queue_groups and job['user'] in group.gr_mem: - retval = True - retstr = "" - except KeyError: - retstr = "Group could not be verified for queue restriction." + if '*' in queue_groups: + return(True,"") + + for group_name in queue_groups: + try: + if job['user'] in grp.getgrnam(group_name).gr_mem: + retval = True + retstr = "" + break + except KeyError: + retstr = "Group could not be verified for queue restriction." + return retval, retstr def maxuserjobs(self, job, queuestate=None): diff --git a/src/lib/accounting.py b/src/lib/accounting.py index 4b263da5a4c8f372745f8c0ce09b001c9d24611c..208442bea380bea6d2ecfcd3a623bfcb53829d3b 100644 --- a/src/lib/accounting.py +++ b/src/lib/accounting.py @@ -255,26 +255,31 @@ def confirmed (reservation_id, requester): return entry("Y", reservation_id, {'requester':requester}) -def task_start(job_id, task_id): +def task_start(job_id, task_id, start_time, location): '''Indicate a task has started. Typically this would indicate that add_process_groups has been called successfully. Args: job_id - id of job that this task belongs to task_id - id of the task launched + start_time - time when the task started as seconds from epoch + location - a list of locations that this task is running on ''' - return entry("TS", job_id, {'task_id': task_id}) + return entry("TS", job_id, {'task_id': task_id, 'start': start_time, 'location': location}) -def task_end(job_id, task_id, task_runtime): +def task_end(job_id, task_id, task_runtime, start_time, end_time, location): '''Indicate a task has started. Typically this would indicate that add_process_groups has been called successfully. Args: job_id - id of job that this task belongs to task_id - id of the task launched task_runtime - The running time of the task in seconds. Start time for this is the task_start record timestamp. + start_time - time when the task started as seconds from epoch + end_time - tme when the task ended as seconds from epoch + location - a list of locations that this task is running on ''' - return entry("TE", job_id, {'task_id': task_id, 'task_runtime': task_runtime}) + return entry("TE", job_id, {'task_id': task_id, 'task_runtime': task_runtime, 'start': start_time, 'end': end_time, 'location': location}) class DatetimeFileHandler (BaseRotatingHandler): diff --git a/tools/memory_management/reset_memory_mode.py b/tools/memory_management/reset_memory_mode.py index fb483fcb33bfa23538367e0cac1c85f8b3a2e419..c442a8dd30f657f4d2b7920d99ccf0620071fcee 100755 --- a/tools/memory_management/reset_memory_mode.py +++ b/tools/memory_management/reset_memory_mode.py @@ -50,7 +50,7 @@ logger.addHandler(syslog) ACCOUNTING_LOG_PATH = '/var/log/pbs/boot' ACCOUNTING_MSG_FMT = "%s;%s;%s;%s" # Date, Type, Jobid, keyvals -ACCOUNTING_DATE_FMT = "%d/%m/%Y %H:%M:%S" +ACCOUNTING_DATE_FMT = "%m/%d/%Y %H:%M:%S" def dict_to_keyval_str(dct): '''put a record keyval dict into a string format for pbs logging''' @@ -104,7 +104,12 @@ def exec_fetch_output(cmd, args, timeout=None): cmd_list = [cmd] cmd_list.extend(args) proc = Popen(cmd_list, stdout=PIPE, stderr=PIPE) + stdout = "" + stderr = "" while(True): + curr_stdout, curr_stderr = proc.communicate() + stdout += curr_stdout + stderr += curr_stderr if endtime is not None and int(time.time()) >= endtime: #signal and kill timeout_trip @@ -114,8 +119,12 @@ def exec_fetch_output(cmd, args, timeout=None): if proc.poll() is not None: break time.sleep(POLL_INT) - - stdout, stderr = proc.communicate() + try: + curr_stdout, curr_stderr = proc.communicate() + stdout += curr_stdout + stderr += curr_stderr + except ValueError: + pass # Everything is closed and terminated. if timeout_trip: raise RuntimeError("%s timed out!" % cmd) if proc.returncode != 0: @@ -225,7 +234,6 @@ def main(): label = "%s/%s/%s" % (user, jobid, bootid) accounting_log_filename = '%s-%s' % (time.strftime('%Y%m%d-boot', time.gmtime()), bootid) - current_node_cfg = get_current_modes(node_list) nodes_to_modify = [] initial_modes = {} @@ -240,8 +248,8 @@ def main(): initial_modes[mode] = [int(nid)] nodes_to_modify.append(int(nid)) initial_mode_list = [] - for mode, node_list in initial_modes.items(): - initial_mode_list.append('%s:%s' % (mode, compact_num_list(node_list))) + for mode, mod_node_list in initial_modes.items(): + initial_mode_list.append('%s:%s' % (mode, compact_num_list(mod_node_list))) # assuming that mode change is immediately followed by reboot. Modify when # current setting inspection available. @@ -250,18 +258,21 @@ def main(): reboot_info = {'bootid': bootid, 'boot_time': 'N/A', 'rebooted': compact_num_list(nodes_to_modify), - 'blocked': compact_num_list(node_list), + 'blocked': node_list, 'from_mode': ','.join(initial_mode_list), - 'to_mode': '%s:%s:%s' %(mcdram_mode, numa_mode, compact_num_list(node_list)), + 'to_mode': '%s:%s:%s' % (mcdram_mode, numa_mode, node_list), 'successful': False, + 'start': None, + 'end': None, } if len(nodes_to_modify) != 0: #if we don't have to reboot, don't go through this. accounting_start_msg = ACCOUNTING_MSG_FMT % (time.strftime(ACCOUNTING_DATE_FMT, time.gmtime()), 'BS', jobid, - "bootid=%s" % bootid) + "bootid=%s blocked=%s" % (bootid, reboot_info['blocked'])) with open(os.path.join(ACCOUNTING_LOG_PATH, accounting_log_filename), "a+") as acc_file: acc_file.write(accounting_start_msg + '\n') logger.info("%s", accounting_start_msg) start = time.time() + reboot_info['start'] = start compact_nodes_to_modify = compact_num_list(nodes_to_modify) try: if not reset_modes(compact_nodes_to_modify, mcdram_mode, numa_mode, @@ -285,7 +296,9 @@ def main(): reboot_info['successful'] = success finally: - reboot_info['boot_time'] = int(time.time() - start) + end = time.time() + reboot_info['end'] = end + reboot_info['boot_time'] = int(end - start) accounting_end_msg = ACCOUNTING_MSG_FMT % (time.strftime(ACCOUNTING_DATE_FMT, time.gmtime()), 'BE', jobid, dict_to_keyval_str(reboot_info)) with open(os.path.join(ACCOUNTING_LOG_PATH, accounting_log_filename), "a+") as acc_file: