source: syncer/trunk/fuentes/syncer.install/usr/bin/syncer @ 4039

Last change on this file since 4039 was 4039, checked in by mabarracus, 3 years ago

First release

  • Property svn:executable set to *
File size: 9.1 KB
Line 
1#!/usr/bin/python3
2import sys
3import argparse
4import time
5from functools import reduce
6sys.path.append('/usr/lib/syncer')
7
8from lib.SyncerTypes import *
9
10def init_parser(s=None):
11    if s == None:
12        return
13    parser = argparse.ArgumentParser(description='Some kind of syncronizer')
14    parser.add_argument('-w','--workdir',help="Set a workdir")
15    group = parser.add_mutually_exclusive_group()
16    group.add_argument('-i','--init',metavar='',action='store_const',help='Initialize session',const=True)
17    group.add_argument('-d', '--destroy', metavar='', action='store_const', help='Destroy current session', const=True)
18    group.add_argument('-di','--destroy-init', metavar='',action='store_const', help="Recreate current session",const=True)
19    group.add_argument('-s', '--print-session', metavar='', action='store_const', help='Destroy current session', const=True)
20    group.add_argument('-a', '--add', action='store_true', help='Add plugin processing')
21    group.add_argument('-r', '--read', metavar='[-|file]', nargs='?', help='Read sync settings')
22    group.add_argument('-p', '--process',metavar='',action='store_const',help='Process current session',const=True)
23    group.add_argument('-pp', '--process-parallel', metavar='', action='store_const', help='Process current session', const=True)
24    group.add_argument('-pr', '--process-destroy',metavar='',action='store_const',help='Process current session',const=True)
25    group.add_argument('-ppr', '--process-parallel-destroy', metavar='', action='store_const', help='Process current session', const=True)
26
27    dinfo_parser=s.get_info_parser()
28
29    subparser=parser.add_subparsers(title='Plugins',description='Plugins Available',help='plugin name',dest='typeplug')
30    for plugin in dinfo_parser['choices']:
31        parser_plug=subparser.add_parser(plugin,help='select %s plugin'% plugin)
32        for param in dinfo_parser['parser_info'][plugin]:
33            parser_plug.add_argument(param,help='help '+param)
34
35    return parser.parse_args()
36
37import ctypes
38
39def terminate_thread(thread):
40    """Terminates a python thread from another thread.
41
42    :param thread: a threading.Thread instance
43    """
44    if not thread.isAlive():
45        return
46
47    exc = ctypes.py_object(SystemExit)
48    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
49        ctypes.c_long(thread.ident), exc)
50    if res == 0:
51        raise ValueError("nonexistent thread id")
52    elif res > 1:
53        # """if it returns a number greater than one, you're in trouble,
54        # and you should call it again with exc=NULL to revert the effect"""
55        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
56        raise SystemError("PyThreadState_SetAsyncExc failed")
57
58if __name__ == '__main__':
59    localpath=os.path.dirname(os.path.realpath(__file__))
60    s = SyncerClass(plugin_path=['/usr/lib/syncer/plugins'])
61    args = init_parser(s)
62    MAX_PLUG_TIME = 5
63    if args.workdir:
64        syncpath = args.workdir.strip()
65    else:
66        syncpath = '/var/tmp/syncer'
67    synclockfile = syncpath+'/.lock'
68    syncsessionfile = syncpath+'/session.sync'
69
70    sess = SyncerSession(syncpath,synclockfile,syncsessionfile)
71
72
73    #debug(args)
74    try:
75        if args.destroy or args.destroy_init:
76            sess.destroy()
77
78        if args.init or args.destroy_init:
79            sess.init()
80
81        out = []
82
83        if (args.add):
84            dparam=[]
85            #Get parameters from parser
86            for paramname in s.get_parameters_from_plugin(args.typeplug):
87                dparam.append(getattr(args,paramname))
88
89            #Put data in session
90            sess.put_in_session({'plugin':args.typeplug,'content':dparam})
91
92        if (args.read):
93            if args.read == '-':
94                sess.put_file_in_session(sys.stdin.buffer)
95            else:
96                sess.put_file_in_session(args.read.strip())
97
98        if (args.print_session):
99            for line in sess.get_session_data():
100                sys.stdout.write(line)
101
102        if (args.process or args.process_destroy):
103            session_data=sess.load_session()
104            for plugin,params in (action for action in session_data):
105                s.plugin_input(plugin,params)
106            #Get data from plugin
107            outputs={}
108            for i in range(1,s.sid+1):
109                pluginname=s.get_plugin_from_transid(i)
110                plugin=s.get_plugin(pluginname)
111                plugin.init_process(transid=i)
112                outputs[i] = plugin.output(transid=i)
113                if outputs[i]['status'] != True:
114                    break
115
116            debug(str(outputs))
117
118        if (args.process_parallel or args.process_parallel_destroy):
119            session_data = sess.load_session()
120            for plugin,params in (action for action in session_data):
121                s.plugin_input(plugin,params)
122            import threading
123            outputs={}
124            threads=[]
125            threads_plugin_list=[]
126            for i in range(1, s.sid + 1):
127                pluginname=s.get_plugin_from_transid(i)
128                plugin=s.get_plugin(pluginname)
129                if plugin.is_blocker:
130                    transid_plug_nameparam=list(plugin.data_input[i].keys())[0]
131                    transid_plug_valueparam=plugin.data_input[i][transid_plug_nameparam]
132                    if (transid_plug_valueparam == 'one'):
133                        threads_plugin_list.append('_block_')
134                    else:
135                        threads_plugin_list.append('_blockall_')
136                else:
137                    threads_plugin_list.append(pluginname)
138                t=threading.Thread(target=plugin.init_process,kwargs={'transid':i})
139                t.setDaemon(True)
140                threads.append(t)
141            wait_queue=list(range(0,s.sid))
142            any_alive = lambda x : reduce(lambda a,b : a or b ,x)
143            finished_threads = lambda x : [ x.index(t) for t in x if t._started._flag and not t.isAlive() ]
144            k=0
145            times=[None for x in range(0,s.sid)]
146            while(any_alive(wait_queue)):
147                i = wait_queue[k]
148                if k + 1 == len(wait_queue):
149                    time.sleep(0.1)
150                    for idx in range(0,s.sid):
151                        if threads[idx].isAlive() and times[idx] != None and time.time() > times[idx] + MAX_PLUG_TIME:
152                            for x in threads:
153                                terminate_thread(x)
154                            sys.exit(69)
155                            ##pass
156                k = (k + 1) % len(wait_queue)
157                finished=finished_threads(threads)
158                for x in finished:
159                    pluginname=s.get_plugin_from_transid(x+1)
160                    plugin=s.get_plugin(pluginname)
161                    out=plugin.output(transid=x+1)
162                    if (out['status']!=True):
163                        for i in range(0,s.sid):
164                            if i not in finished:
165                                terminate_thread(threads[i])
166                        sys.exit(66)
167                while i == None and any_alive(wait_queue):
168                    i=wait_queue[k]
169                    k=(k+1)%len(wait_queue)
170                if threads_plugin_list[i] != "_block_" and threads_plugin_list[i] != "_blockall_":
171                    if i == 0:
172                        threads[i].start()
173                        wait_queue[i]=None
174                        times[i]=time.time()
175                    else:
176                        if threads_plugin_list[i-1] == "_blockall_":
177                            threads[i-2].join()
178
179                            threads[i-1].start()
180                            wait_queue[i-1]=None
181                            times[i-1] = time.time()
182                            threads[i-1].join()
183                            threads[i].start()
184                            times[i] = time.time()
185                            wait_queue[i]=None
186
187                            threads[i].join()
188
189                        elif threads_plugin_list[i-1] == "_block_":
190                            if threads[i-2]._started._flag and not threads[i - 2].isAlive():
191                                threads[i - 1].start()
192                                times[i-1] = time.time()
193                                wait_queue[i-1]=None
194                                threads[i - 1].join()
195                                threads[i].start()
196                                times[i] = time.time()
197                                wait_queue[i]=None
198                        else:
199                            threads[i].start()
200                            times[i] = time.time()
201                            wait_queue[i]=None
202
203            for t in threads:
204                t.join()
205
206            for i in range(1, s.sid + 1):
207                pluginname=s.get_plugin_from_transid(i)
208                plugin=s.get_plugin(pluginname)
209                outputs[i]=plugin.output(transid=i)
210
211            for k,v in outputs.items():
212                debug(str(k) +' '+ str(v))
213
214        if args.process_destroy or args.process_parallel_destroy:
215            sess.destroy()
216
217        sys.exit(0)
218    except Exception as e:
219        print('Error %s' % str(e),file=sys.stderr)
220        sys.exit(1)
Note: See TracBrowser for help on using the repository browser.