Changeset 3547


Ignore:
Timestamp:
Feb 7, 2017, 2:44:17 PM (3 years ago)
Author:
hectorgh
Message:

VariablesManager? changes. Read changelog. startup thread is launched after service is up

Location:
n4d/trunk/fuentes
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • n4d/trunk/fuentes/debian/changelog

    r2427 r3547  
     1n4d (0.107) xenial; urgency=high
     2
     3  * Startups, one-shots and cron threads are executed after service is up
     4  * VariablesManager now supports server-client communication. If a variable
     5    is modified in the server, clients are notified of such changes.
     6  * VariablesManager as a Variables Server keeps track of a list of
     7    alive clients using their IP and MACs as identifiers.
     8  * VariablesManager now supports triggers executed when a certain variable is
     9    modified.
     10  * set_variable makes the distinction between setting a new value and
     11    trying to set a value that is already stored in the variable.
     12
     13 -- Hector Garcia Huerta <hectorgh@gmail.com>  Tue, 07 Feb 2017 14:36:19 +0100
     14
    115n4d (0.106) xenial; urgency=high
    216
  • n4d/trunk/fuentes/install-files/etc/n4d/conf.d/VariablesManager

    r103 r3547  
    1515backup=adm,admins,admin
    1616restore=adm,admins,admin
     17(ip)register_instance=anonymous
     18(ip)server_changed=anonymous
  • n4d/trunk/fuentes/install-files/usr/share/n4d/python-plugins/VariablesManager.py

    r1441 r3547  
    1111import tarfile
    1212import threading
     13import subprocess
     14import time
     15import string
    1316
    1417class VariablesManager:
     
    2427        def __init__(self):
    2528               
     29                self.instance_id="".join(random.sample(string.letters+string.digits, 50))
     30                self.server_instance_id=None
    2631                self.variables={}
    2732                self.variables_ok=False
     33                self.variables_clients={}
     34                self.variables_triggers={}
     35                t=threading.Thread(target=self.check_clients,args=())
     36                t.daemon=True
     37                t.start()
     38               
    2839                if os.path.exists(VariablesManager.LOCK_FILE):
    2940                        os.remove(VariablesManager.LOCK_FILE)
     
    5970        #def __init__
    6071       
    61         # DO NOT TOUCH THIS
    6272       
    6373        def startup(self,options):
    64                 pass
    65                
    66         # DONE ============
     74
     75                if "REMOTE_VARIABLES_SERVER" in self.variables:
     76                        self.register_n4d_instance_to_server()
     77                       
     78        #def startup
     79       
     80       
     81        def is_ip_in_range(self,ip,network):
     82               
     83                try:
     84                        return netaddr.ip.IPAddress(ip) in netaddr.IPNetwork(network).iter_hosts()
     85                except:
     86                        return False
     87                       
     88        #def is_ip_in_range
     89       
     90
     91        def get_net_size(self,netmask):
     92               
     93                netmask=netmask.split(".")
     94                binary_str=""
     95                for octet in netmask:
     96                        binary_str += bin(int(octet))[2:].zfill(8)
     97                       
     98                return str(len(binary_str.rstrip('0')))
     99               
     100        #def get_net_size
     101
     102
     103        def get_ip(self):
     104               
     105                for item in netifaces.interfaces():
     106                        tmp=netifaces.ifaddresses(item)
     107                        if tmp.has_key(netifaces.AF_INET):
     108                                if tmp[netifaces.AF_INET][0].has_key("broadcast") and tmp[netifaces.AF_INET][0]["broadcast"]=="10.0.2.255":
     109                                        return tmp[netifaces.AF_INET][0]["addr"]
     110                return None
     111               
     112        #def get_ip
     113       
     114
     115        def route_get_ip(self,ip):
     116               
     117                p=subprocess.Popen(["ip route get %s"%ip],shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()
     118                if "dev" in p[0]:
     119                        dev=p[0].split("dev ")[1].split(" ")[0]
     120                else:
     121                        dev=None
     122                return dev
     123               
     124        #def route_get_ip
     125               
     126
     127        def get_mac_from_device(self,dev):
     128
     129                for item in netifaces.interfaces():
     130                       
     131                        try:
     132                                i=netifaces.ifaddresses(item)
     133                                mac=i[17][0]["addr"]
     134                                broadcast=i[2][0]["broadcast"]
     135                                network=broadcast
     136                                netmask=i[2][0]["netmask"]
     137                                network+="/%s"%self.get_net_size(netmask)
     138                                ip=i[2][0]["addr"]
     139                        except Exception as e:
     140                                continue
     141                       
     142                        if dev=="lo":
     143                                return mac
     144                       
     145                        if item==dev:
     146                                return mac
     147                               
     148                return None
     149
     150        #def get_mac_from_device_in_server_network
     151       
     152       
     153        def register_instance(self,autocompleted_secured_ip,mac):
     154
     155                client={}
     156                client["last_check"]=int(time.time())
     157                client["missed_pings"]=0
     158                client["ip"]=autocompleted_secured_ip
     159                self.variables_clients[mac]=client
     160               
     161                return self.instance_id
     162
     163        #def register_instance
     164       
     165
     166        def register_n4d_instance_to_server(self):
     167               
     168                try:
     169                        server_ip=socket.gethostbyname(self.variables["REMOTE_VARIABLES_SERVER"][u"value"])
     170                        if self.get_ip()!=server_ip:
     171                       
     172                                c=xmlrpclib.ServerProxy("https://%s:9779"%server_ip)
     173                                mac=self.get_mac_from_device(self.route_get_ip(server_ip))
     174                                self.server_instance_id=c.register_instance("","VariablesManager","",mac)
     175                       
     176                except Exception as e:
     177
     178                        return None
     179
     180        #def register_n4d_instance_to_server   
     181       
     182       
     183        def check_clients(self):
     184               
     185                while True:
     186                       
     187                        for item in self.variables_clients:
     188                                ip=self.variables_clients[item]["ip"]
     189                                sys.stdout.write("[VariablesManager] Checking client { MAC:%s IP:%s } ... "%(item,ip))
     190                                c=xmlrpclib.ServerProxy("https://%s:9779"%ip)
     191                                try:
     192                                        c.get_methods()
     193                                        self.variables_clients[item]["last_check"]=time.time()
     194                                        self.variables_clients[item]["missed_pings"]=0
     195                                        print("OK")
     196                                except:
     197                                        self.variables_clients[item]["missed_pings"]+=1
     198                                        print("FAILED")
     199                                        if self.variables_clients[item]["missed_pings"] >=3:
     200                                                print "[VariablesManager] Removing client due to too many missed pings."
     201                                                self.variables_clients.pop(item)
     202                       
     203                        time.sleep(60*5)
     204               
     205               
     206        #def check_clients
     207       
     208        def notify_changes(self,variable):
     209               
     210                print "[VariablesManager] Notifying changes... "
     211               
     212                for mac in self.variables_clients:
     213                       
     214                        ip=self.variables_clients[mac]["ip"]
     215                        c=xmlrpclib.ServerProxy("https://%s:9779"%ip)
     216                        try:
     217                                c.server_changed("","VariablesManager","",self.instance_id,variable)
     218                               
     219                        except:
     220                                self.variables_clients[mac]["missed_pings"]+=1
     221                               
     222                        self.variables_clients[mac]["last_check"]=time.time()
     223               
     224        #def announce_changes
     225       
     226       
     227        def server_changed(self,autocompleted_server_ip,server_instance_id,variable_name):
     228
     229                if server_instance_id==self.server_instance_id:
     230
     231                        if self.variables["REMOTE_VARIABLES_SERVER"] [u'value']!=autocompleted_server_ip:
     232                                return False
     233
     234                        print "[VariablesManager] Server instance ID validated"
     235
     236                        t=threading.Thread(target=self.execute_trigger,args=(variable_name,))
     237                        t.daemon=True
     238                        t.start()
     239                       
     240                        return True
     241                       
     242                else:
     243                       
     244                        return False
     245               
     246        #def server_changed
     247       
     248       
     249        def execute_trigger(self,variable_name):
     250               
     251                if variable_name in self.variables_triggers:
     252                        for i in self.variables_triggers[variable_name]:
     253                                class_name,function=i
     254                                try:
     255                                        print "[VariablesManager] Executing %s.%s trigger..."%(class_name,function.im_func.func_name)
     256                                        function()
     257                                except Exception as e:
     258                                        print e
     259                                        pass
     260               
     261        #def execute_trigger
     262       
     263       
     264        def register_trigger(self,variable_name,class_name,function):
     265               
     266                if variable_name not in self.variables_triggers:
     267                        self.variables_triggers[variable_name]=[]
     268                       
     269                self.variables_triggers[variable_name].append((class_name,function))
     270               
     271        #def register_trigger
    67272       
    68273       
     
    86291               
    87292        #def backup
     293
    88294       
    89295        def restore(self,file_path=None):
     
    166372                ret=re.findall(pattern,value)
    167373               
    168                 print "hola",ret
    169                
    170374                for item in ret:
    171375                        tmp=item.replace("_@START@_","")
     
    178382                return value
    179383               
    180                
    181                
    182384        #def remove_calculated_chars
    183385       
     
    188390                        if not self.variables[item].has_key("volatile"):
    189391                                self.variables[item]["volatile"]=False
    190                        
    191                
    192392               
    193393        #def add_volatile_info
     
    215415                                               
    216416                return ret.strip("\n")
    217                                        
    218                
    219417               
    220418        #def  showvars
     419
    221420       
    222421        def get_variables(self):
     
    314513                return [True,""]
    315514                               
    316                
    317                        
    318                        
    319                 '''
    320                
    321                 if os.path.exists(VariablesManager.INBOX):
    322                        
    323                         for file in os.listdir(VariablesManager.INBOX):
    324                                 file_path=VariablesManager.INBOX+file
    325                                
    326                                 try:
    327                                         execfile(file_path)
    328                                         os.remove(file_path)
    329                                 except Exception as e:
    330                                         self.log(file_path + ": " + str(e))
    331                        
    332                 '''
    333                
    334515        #def read_inbox
     516
    335517       
    336518        def empty_trash(self,force_write=False):
     
    380562        #def empty_trash
    381563       
    382         def get_ip(self):
    383                
    384                 for item in netifaces.interfaces():
    385                         tmp=netifaces.ifaddresses(item)
    386                         if tmp.has_key(netifaces.AF_INET):
    387                                 if tmp[netifaces.AF_INET][0].has_key("broadcast") and tmp[netifaces.AF_INET][0]["broadcast"]=="10.0.2.255":
    388                                         return tmp[netifaces.AF_INET][0]["addr"]
    389                 return None
    390                
    391         #def get_ip
    392564
    393565        def get_variable_list(self,variable_list,store=False,full_info=False):
     
    415587                        return None
    416588                       
    417                
    418        
    419589                if name in self.variables and self.variables[name].has_key("function"):
    420590                        try:
     
    487657
    488658                if name in self.variables:
     659                       
     660                        if value == self.variables[name][u"value"]:
     661                                return [True,"Variable already contained that value"]
     662                       
    489663                        if type(value)==type(""):
    490664                                self.variables[name][u"value"]=unicode(value).encode("utf-8")
     
    505679                                if "description" not in self.variables["name"]:
    506680                                        self.variables[name]["description"]=""
    507                                        
     681                       
     682                        t=threading.Thread(target=self.notify_changes,args=(name,))
     683                        t.daemon=True
     684                        t.start()
     685                       
    508686                        return [True,""]
    509687                else:
     
    512690               
    513691        #def set_variable
     692
    514693       
    515694        def add_variable(self,name,value,function,description,depends,volatile=False,root_protected=False):
     
    537716                else:
    538717                        return [False,"Variable already exists. Use set_variable"]
    539                
     718                       
     719        #def add_variable
     720
     721
    540722        def write_file(self,fname=None):
    541                
    542                 '''
    543                
    544                 try:
    545                         while os.path.exists(VariablesManager.LOCK_FILE):
    546                                 time.sleep(2)
    547                         f=open(VariablesManager.LOCK_FILE,"w")
    548                         f.close()
    549                         tmp={}
    550                         for item in self.variables:
    551                                 if self.variables[item].has_key("volatile") and self.variables[item]["volatile"]==False:
    552                                         tmp[item]=self.variables[item]
    553 
    554                         if fname==None:
    555                                 f=open(VariablesManager.VARIABLES_FILE,"w")
    556                         else:
    557                                 f=open(fname,"w")
    558                                
    559                         data=unicode(json.dumps(tmp,indent=4,encoding="utf-8",ensure_ascii=False)).encode("utf-8")
    560                         f.write(data)
    561                         f.close()
    562                         os.remove(VariablesManager.LOCK_FILE)
    563                         return True
    564                        
    565                 except Exception as e:
    566                         os.remove(VariablesManager.LOCK_FILE)
    567                         print(e)
    568                         return False
    569                        
    570                 '''
    571723               
    572724                try:
     
    604756                        return False
    605757               
    606                        
    607                
    608758        #def write_file
     759
    609760
    610761        def chmod(self,file,mode):
     
    618769                        os.umask(prevmask)
    619770                        return False
     771                       
    620772        #def chmod
     773       
    621774       
    622775        def init_variable(self,variable,args={},force=False,full_info=False):
     
    638791                        return (False,e)
    639792               
    640                
    641793        #def init_variable
    642794       
  • n4d/trunk/fuentes/install-files/usr/share/n4d/xmlrpc-server/core.py

    r1200 r3547  
    274274                       
    275275
    276 t=threading.Thread(target=one_shot)
    277 t.daemon=True
    278 t.start()
     276n4d_core_one_shot_t=threading.Thread(target=one_shot)
     277n4d_core_one_shot_t.daemon=True
     278# Will be executed in server.py after socket is ready
     279#n4d_core_one_shot_t.start()
    279280
    280281
     
    497498                                count+=1
    498499
    499                         p=threading.Thread(target=startup_launcher,args=(objects,))
    500                         p.daemon=True
    501                         p.start()
     500                        load_t=threading.Thread(target=startup_launcher,args=(objects,))
     501                        load_t.daemon=True
     502                        load_t.start()
    502503                        #startup_launcher(objects)
    503504                       
     
    646647
    647648
    648 p=threading.Thread(target=startup_launcher,args=(objects,))
    649 p.daemon=True
    650 p.start()
     649n4d_core_startup_t=threading.Thread(target=startup_launcher,args=(objects,))
     650n4d_core_startup_t.daemon=True
     651# Will be executed in server.py after socket is ready
     652#n4d_core_startup_t.start()
    651653
    652654
     
    705707n4d_cron_thread=threading.Thread(target=n4d_cron)
    706708n4d_cron_thread.daemon=True
    707 n4d_cron_thread.start()
     709# Will be executed in server.py after socket is ready
     710#n4d_cron_thread.start()
    708711       
    709712# OLD FUNCTION LIST
     
    916919#def validate_user
    917920
     921def exec_threads():
     922       
     923        global n4d_core_startup_t, n4d_core_one_shot_t,n4d_cron_thread
     924
     925        n4d_core_one_shot_t.start()
     926        n4d_core_startup_t.start()
     927        n4d_cron_thread.start()
     928       
     929#def exec_threads
     930
    918931
    919932srv_logger.close()
     
    947960                self.run=True
    948961                import threading
    949                 t=threading.Thread(target=self.clear_credentials,args=())
    950                 t.daemon=True
    951                 t.start()
     962                n4d_core_clear_t=threading.Thread(target=self.clear_credentials,args=())
     963                n4d_core_clear_t.daemon=True
     964                n4d_core_clear_t.start()
    952965               
    953966        def launch_triggers(self):
  • n4d/trunk/fuentes/install-files/usr/share/n4d/xmlrpc-server/server.py

    r103 r3547  
    325325
    326326
    327         def startup(self):
     327        def startup(self,funct):
    328328                'run until quit signaled from keyboard...'
    329329                print '[SERVER] Starting; Press CTRL-C to quit ...'
     330                funct()
    330331                while True:
    331332                        try:
     
    393394        l=obj.Core()
    394395        server_address = ('', 9779) # (address, port)
    395         server = SecureDocXMLRPCServer(l, server_address, DEFAULTKEYFILE, DEFAULTCERTFILE)     
    396         #server.register_introspection_functions()
    397         '''
    398         for plugin in l.cm.plugins:
    399                 for method in plugin.function:
    400                         server.register_function(getattr(l.objects[plugin.class_name],method))
    401         '''
    402        
     396        tries=1
     397        max_tries=5
     398        while tries <= max_tries:
     399                try:
     400                        server = SecureDocXMLRPCServer(l, server_address, DEFAULTKEYFILE, DEFAULTCERTFILE)     
     401                        tries=max_tries+1
     402                except:
     403                        print "[SERVER] Failed to obtain socket. Trying again [%s/%s]..."%(tries,max_tries)
     404                        time.sleep(1)
     405                        tries+=1
     406                        if tries >= 5:
     407                                print "[SERVER] Unable to open socket. Exiting..."
     408                                sys.exit(1)
    403409                       
    404410        sa = server.socket.getsockname()
    405411        print "[SERVER] HTTPS on", sa[0], "port", sa[1]
    406         server.startup()
    407 
    408 
     412        server.startup(obj.exec_threads)
     413       
     414
     415
Note: See TracChangeset for help on using the changeset viewer.