# Source code for shinken.daemons.schedulerdaemon

#!/usr/bin/python

# -*- coding: utf-8 -*-

# Copyright (C) 2009-2012:
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com
#    Hartmut Goebel, h.goebel@goebel-consult.de
#
# This file is part of Shinken.
#
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Shinken.  If not, see <http://www.gnu.org/licenses/>.

import os
import time
import traceback
import cPickle
import zlib
import base64

from shinken.scheduler import Scheduler
from shinken.macroresolver import MacroResolver
from shinken.external_command import ExternalCommandManager
from shinken.daemon import Daemon
from shinken.property import PathProp, IntegerProp
from shinken.log import logger
from shinken.satellite import BaseSatellite, IForArbiter as IArb, Interface

# Interface for Workers

class IChecks(Interface):
    """Interface for Workers (pollers/reactionners).

    They connect here and see if they are still OK with our running_id;
    if not, they must drop their checks.
    """

    def get_checks(self, do_checks=False, do_actions=False, poller_tags=None,
                   reactionner_tags=None, worker_name='none',
                   module_types=None):
        """Return the checks/actions ready to be run by this worker.

        do_checks/do_actions arrive over the wire as the strings
        'True'/'False', so they are coerced to real booleans here.
        The result is a base64-encoded, zlib-compressed pickle of the
        list of checks/actions.
        """
        # Resolve the historical list defaults at call time instead of
        # using shared mutable default arguments (classic Python pitfall).
        if poller_tags is None:
            poller_tags = ['None']
        if reactionner_tags is None:
            reactionner_tags = ['None']
        if module_types is None:
            module_types = ['fork']
        do_checks = (do_checks == 'True')
        do_actions = (do_actions == 'True')
        res = self.app.get_to_run_checks(do_checks, do_actions, poller_tags,
                                         reactionner_tags, worker_name,
                                         module_types)
        self.app.nb_checks_send += len(res)
        return base64.b64encode(zlib.compress(cPickle.dumps(res), 2))
    # Payload is already base64 text: tell the HTTP layer not to re-encode it.
    get_checks.encode = 'raw'

    def put_results(self, results):
        """Accept results computed by a worker and queue them for the scheduler."""
        nb_received = len(results)
        self.app.nb_check_received += nb_received
        if nb_received != 0:
            logger.debug("Received %d results" % nb_received)
        for result in results:
            result.set_type_active()
        # The scheduler loop consumes waiting_results asynchronously.
        self.app.waiting_results.extend(results)
        return True
    put_results.method = 'post'
class IBroks(Interface):
    """Interface for Brokers:
    They connect here and get all broks (data for brokers).
    Data must be ORDERED! (initial status BEFORE update...)
    """

    def get_broks(self, bname):
        """Hand the pending broks for broker *bname*, serialized.

        Returns a base64-encoded, zlib-compressed pickle of the broks.
        """
        # A broker we have never seen must first be registered and
        # primed with the initial broks.
        if bname not in self.app.brokers:
            self.fill_initial_broks(bname)
        # Fetch what is queued for this specific broker.
        pending = self.app.get_broks(bname)
        # There is only one global counter for sent broks.
        self.app.nb_broks_send += len(pending)
        # The full-broks snapshot has now been drained from the queue.
        self.app.brokers[bname]['has_full_broks'] = False
        return base64.b64encode(zlib.compress(cPickle.dumps(pending), 2))
    # Payload is already base64 text: no further encoding by the HTTP layer.
    get_broks.encode = 'raw'

    def fill_initial_broks(self, bname):
        """Register broker *bname* if new, and (re)build its initial broks.

        If we do not hold a full snapshot for it, its queued broks are
        cleared and replaced by a fresh initial status dump.
        """
        if bname not in self.app.brokers:
            logger.info("A new broker just connected : %s" % bname)
            self.app.brokers[bname] = {'broks': {}, 'has_full_broks': False}
        entry = self.app.brokers[bname]
        if not entry['has_full_broks']:
            entry['broks'].clear()
            self.app.fill_initial_broks(bname, with_logs=True)
class IForArbiter(IArb):
    """Interface for the Arbiter.

    We ask him for a conf, then listen for instructions from the arbiter.
    The arbiter is the interface to the administrator, so we must listen
    carefully and give him the information he wants — which could be for
    another scheduler.
    """

    def run_external_commands(self, cmds):
        """Forward external commands (global or specific) to our scheduler."""
        self.app.sched.run_external_commands(cmds)
    run_external_commands.method = 'POST'

    def put_conf(self, conf):
        """Receive a new configuration: stop the current scheduling run first."""
        self.app.sched.die()
        super(IForArbiter, self).put_conf(conf)
    put_conf.method = 'POST'

    def wait_new_conf(self):
        """Drop our configuration and wait for a new one.

        Called by the arbiter if it thinks we are running but we must not
        (e.g. a spare that took a conf while the master came back).
        """
        logger.debug("Arbiter wants me to wait for a new configuration")
        self.app.sched.die()
        super(IForArbiter, self).wait_new_conf()
class Shinken(BaseSatellite):
    """The scheduler daemon main class.

    Creates the HTTP server, exposes the arbiter interface, then waits
    for a first configuration before entering the scheduling loop.
    """

    properties = BaseSatellite.properties.copy()
    properties.update({
        'pidfile': PathProp(default='schedulerd.pid'),
        'port': IntegerProp(default='7768'),
        'local_log': PathProp(default='schedulerd.log'),
    })

    def __init__(self, config_file, is_daemon, do_replace, debug, debug_file,
                 profile=''):
        BaseSatellite.__init__(self, 'scheduler', config_file, is_daemon,
                               do_replace, debug, debug_file)

        self.interface = IForArbiter(self)
        self.sched = Scheduler(self)

        # Worker/broker interfaces are created lazily in setup_new_conf().
        self.ichecks = None
        self.ibroks = None
        self.must_run = True

        # Now the interface
        self.uri = None
        self.uri2 = None

        # And possible links for satellites; from now only pollers
        self.pollers = {}
        self.reactionners = {}
        self.brokers = {}

    def do_stop(self):
        """Unregister our HTTP interfaces and stop like the parent daemon."""
        if self.http_daemon:
            if self.ibroks:
                self.http_daemon.unregister(self.ibroks)
            if self.ichecks:
                self.http_daemon.unregister(self.ichecks)
        super(Shinken, self).do_stop()

    def compensate_system_time_change(self, difference):
        """Compensate a system time change of `difference` seconds for all
        hosts/services/checks/notifications."""
        logger.warning("A system time change of %d has been detected. "
                       "Compensating..." % difference)
        # We only need to change some value
        self.program_start = max(0, self.program_start + difference)

        # Then we compensate all host/services
        for h in self.sched.hosts:
            h.compensate_system_time_change(difference)
        for s in self.sched.services:
            s.compensate_system_time_change(difference)

        # Now all checks
        for c in self.sched.checks.values():
            # Already launched checks should not be touched
            if c.status == 'scheduled' and c.t_to_go is not None:
                t_to_go = c.t_to_go
                ref = c.ref
                new_t = max(0, t_to_go + difference)
                if ref.check_period is not None:
                    # It's not so simple: we must match the timeperiod
                    new_t = ref.check_period.get_next_valid_time_from_t(new_t)
                # Maybe there is no more valid time at all! Not good :(
                # Flag the check as an error, with error output
                if new_t is None:
                    c.state = 'waitconsume'
                    c.exit_status = 2
                    c.output = '(Error: there is no available check time after time change!)'
                    c.check_time = time.time()
                    c.execution_time = 0
                else:
                    c.t_to_go = new_t
                    ref.next_chk = new_t

        # Now all actions (notifications, event handlers)
        for c in self.sched.actions.values():
            # Already launched actions should not be touched
            if c.status == 'scheduled':
                t_to_go = c.t_to_go
                # Event handlers do not have a ref
                ref = getattr(c, 'ref', None)
                new_t = max(0, t_to_go + difference)
                # Notifications should be checked against notification_period
                if c.is_a == 'notification':
                    if ref.notification_period:
                        # It's not so simple: we must match the timeperiod
                        new_t = ref.notification_period.get_next_valid_time_from_t(new_t)
                    # They also carry a creation_time to shift
                    c.creation_time = c.creation_time + difference
                # Maybe there is no more valid time at all! Not good :(
                # Flag the action as an error, with error output
                if new_t is None:
                    c.state = 'waitconsume'
                    c.exit_status = 2
                    c.output = '(Error: there is no available check time after time change!)'
                    c.check_time = time.time()
                    c.execution_time = 0
                else:
                    c.t_to_go = new_t

    def manage_signal(self, sig, frame):
        """Handle signals: USR1 -> memory dump, USR2 -> objects dump,
        anything else -> die."""
        logger.warning("Received a SIGNAL %s" % sig)
        # If we got USR1, just dump memory
        if sig == 10:
            self.sched.need_dump_memory = True
        elif sig == 12:  # usr2, dump objects
            self.sched.need_objects_dump = True
        else:  # if not, die :)
            self.sched.die()
            self.must_run = False
            Daemon.manage_signal(self, sig, frame)

    def do_loop_turn(self):
        """One daemon loop turn: wait for a conf, load it, run the scheduler."""
        # Ok, now the conf
        self.wait_for_initial_conf()
        if not self.new_conf:
            return
        logger.info("New configuration received")
        self.setup_new_conf()
        logger.info("New configuration loaded")
        self.sched.run()

    def setup_new_conf(self):
        """Unserialize the configuration pushed by the arbiter and wire
        everything up: pollers, modules, HTTP interfaces, macro resolver
        and the external command manager."""
        pk = self.new_conf
        conf_raw = pk['conf']
        override_conf = pk['override_conf']
        modules = pk['modules']
        satellites = pk['satellites']
        instance_name = pk['instance_name']
        push_flavor = pk['push_flavor']
        skip_initial_broks = pk['skip_initial_broks']

        t0 = time.time()
        conf = cPickle.loads(conf_raw)
        logger.debug("Conf received at %d. Unserialized in %d secs"
                     % (t0, time.time() - t0))
        self.new_conf = None

        # Tag the conf with our data
        self.conf = conf
        self.conf.push_flavor = push_flavor
        self.conf.instance_name = instance_name
        self.conf.skip_initial_broks = skip_initial_broks

        self.cur_conf = conf
        self.override_conf = override_conf
        self.modules = modules
        self.satellites = satellites

        if self.conf.human_timestamp_log:
            logger.set_human_format()

        # Now we create our pollers
        for pol_id in satellites['pollers']:
            p = satellites['pollers'][pol_id]
            self.pollers[pol_id] = p
            # The satellitemap can override a poller's address/port
            if p['name'] in override_conf['satellitemap']:
                p = dict(p)  # make a copy
                p.update(override_conf['satellitemap'][p['name']])
            uri = 'http://%s:%s/' % (p['address'], p['port'])
            self.pollers[pol_id]['uri'] = uri
            self.pollers[pol_id]['last_connection'] = 0

        # First mix conf and override_conf to have our definitive conf
        for prop in self.override_conf:
            val = self.override_conf[prop]
            setattr(self.conf, prop, val)

        if self.conf.use_timezone != '':
            logger.debug("Setting our timezone to %s"
                         % str(self.conf.use_timezone))
            os.environ['TZ'] = self.conf.use_timezone
            time.tzset()

        if len(self.modules) != 0:
            logger.debug("I've got %s modules" % str(self.modules))

        # TODO: if scheduler had previous modules instanciated it must clean them!
        self.modules_manager.set_modules(self.modules)
        self.do_load_modules()

        # Give it an interface, but first remove the previous one if it exists
        if self.ichecks is not None:
            logger.debug("Deconnecting previous Check Interface")
            self.http_daemon.unregister(self.ichecks)
        # Now create and connect it
        self.ichecks = IChecks(self.sched)
        self.http_daemon.register(self.ichecks)
        logger.debug("The Scheduler Interface uri is: %s" % self.uri)

        # Same for Broks
        if self.ibroks is not None:
            logger.debug("Deconnecting previous Broks Interface")
            self.http_daemon.unregister(self.ibroks)
        # Create and connect it
        self.ibroks = IBroks(self.sched)
        self.http_daemon.register(self.ibroks)

        logger.info("Loading configuration.")
        self.conf.explode_global_conf()

        # We give sched its conf
        self.sched.reset()
        self.sched.load_conf(self.conf)
        self.sched.load_satellites(self.pollers, self.reactionners)

        # We must update our Config dict macro with good value
        # from the config parameters
        self.sched.conf.fill_resource_macros_names_macros()

        # Creating the Macroresolver Class & unique instance
        m = MacroResolver()
        m.init(self.conf)

        # Now create the external commander. It's an applyer: its role is
        # not to dispatch commands, but to apply them.
        e = ExternalCommandManager(self.conf, 'applyer')

        # Scheduler needs to know about external commands to activate them
        # if necessary
        self.sched.load_external_command(e)

        # External command manager needs the sched because it can raise checks
        e.load_scheduler(self.sched)

        # We clear our schedulers managed (it's us :) )
        # and set ourself in it
        self.schedulers = {self.conf.instance_id: self.sched}

    def what_i_managed(self):
        """Give the arbiter the data about what I manage: for me it's just
        my instance_id and my push flavor."""
        if hasattr(self, 'conf'):
            return {self.conf.instance_id: self.conf.push_flavor}
        else:
            return {}

    def main(self):
        """Our main function, launched after the init: start the daemon,
        register the arbiter interface and enter the main loop."""
        try:
            self.load_config_file()
            self.look_for_early_exit()
            self.do_daemon_init_and_start()
            self.load_modules_manager()
            # Keep the arbiter interface registered: unregistering it here
            # (as the previous code did right after registering) would make
            # the daemon unreachable for configuration pushes.
            self.http_daemon.register(self.interface)
            self.uri = self.http_daemon.uri
            logger.info("[scheduler] General interface is at: %s" % self.uri)
            self.do_mainloop()
        except Exception as exp:
            logger.critical("I got an unrecoverable error. I have to exit")
            logger.critical("You can log a bug ticket at "
                            "https://github.com/naparuba/shinken/issues/new to get help")
            logger.critical("Back trace of it: %s" % (traceback.format_exc()))
            raise
# --- Read the Docs page footer (HTML-scraping residue, not part of the module) ---
# Read the Docs v: documentation
# Versions: latest, documentation
# Downloads: PDF, HTML, Epub
# On Read the Docs: Project Home, Builds
#
# Free document hosting provided by Read the Docs.