source: lliurex-store/trunk/fuentes/python3-lliurex-store.install/usr/share/lliurexstore/plugins/snapManager.py @ 7721

Last change on this file since 7721 was 7721, checked in by Juanma, 21 months ago

Availabe snaps are loaded from available sections

File size: 11.6 KB
Line 
1#The name of the main class must match the file name in lowercase
2import os
3import urllib
4import shutil
5import gi
6from gi.repository import Gio
7gi.require_version ('Snapd', '1')
8from gi.repository import Snapd
9gi.require_version('AppStreamGlib', '1.0')
10from gi.repository import AppStreamGlib as appstream
11import time
12import html
13#Needed for async find method, perhaps only on xenial
14wrap=Gio.SimpleAsyncResult()
15class snapmanager:
16       
17        def __init__(self):
18                self.dbg=False
19                self.progress=0
20                self.partial_progress=0
21                self.plugin_actions={'install':'snap','remove':'snap','pkginfo':'snap','load':'snap'}
22                self.cache_dir=os.getenv("HOME")+"/.cache/lliurex-store"
23                self.cache_xmls=self.cache_dir+'/xmls/snap'
24                self.cache_last_update=self.cache_xmls+'/.snap.lu'
25                self.icons_folder=self.cache_dir+"/icons"
26                self.images_folder=self.cache_dir+"/images"
27                self.result={}
28                self.result['data']={}
29                self.result['status']={}
30                self.disabled=False
31                self.icon_cache_enabled=True
32                self.image_cache_enabled=True
33                self.cli_mode=False
34                if not os.path.isdir(self.icons_folder):
35                        try:
36                                os.makedirs(self.icons_folder)
37                        except:
38                                self.icon_cache_enabled=False
39                                self._debug("Icon cache disabled")
40                if not os.path.isdir(self.images_folder):
41                        try:
42                                os.makedirs(self.images_folder)
43                        except:
44                                self.image_cache_enabled=False
45                                self._debug("Image cache disabled")
46        #def __init__
47
48        def set_debug(self,dbg=True):
49                self.dbg=dbg
50                self._debug ("Debug enabled")
51        #def set_debug
52
53        def _debug(self,msg=''):
54                if self.dbg:
55                        print ('DEBUG snap: %s'%msg)
56        #def debug
57
58        def register(self):
59                return(self.plugin_actions)
60
61        def enable(self,state=False):
62                self.disabled=state
63
64        def execute_action(self,action,applist=None,store=None,results=0):
65                if store:
66                        self.store=store
67                else:
68                        self.store=appstream.Store()
69                self.progress=0
70                self.result['status']={'status':-1,'msg':''}
71                self.result['data']=''
72               
73
74                if self.disabled==True:
75                        self._set_status(9)
76                        self.result['data']=self.store
77                else:
78                        self._check_dirs()
79                        self.snap_client=Snapd.Client()
80                        self.snap_client.connect_sync(None)
81                        dataList=[]
82                        if action=='load':
83                                self.result['data']=self._load_snap_store(self.store)
84                        else:
85                                for app_info in applist:
86                                        self.partial_progress=0
87                                        if action=='install':
88                                                dataList.append(self._install_snap(app_info))
89                                        if action=='remove':
90                                                dataList.append(self._remove_snap(app_info))
91                                        if action=='pkginfo':
92                                                dataList.append(self._get_info(app_info))
93                                        self.progress+=round(self.partial_progress/len(applist),1)
94                                        if self.progress>98:
95                                                self.progress=98
96                                self.result['data']=list(dataList)
97                self.progress=100
98                return(self.result)
99
100        def _set_status(self,status,msg=''):
101                self.result['status']={'status':status,'msg':msg}
102        #def _set_status
103
104        def _callback(self,client,change, _,user_data):
105            # Interate over tasks to determine the aggregate tasks for completion.
106            total = 0
107            done = 0
108            for task in change.get_tasks():
109                total += task.get_progress_total()
110                done += task.get_progress_done()
111            self.progress = round((done/total)*100)
112        #def _callback
113
114        def _check_dirs(self):
115                if not os.path.isdir(self.cache_xmls):
116                        os.makedirs(self.cache_xmls)
117
118        def _load_snap_store(self,store):
119                pkgs=[]
120                #Look if cache is up-to-date
121                if os.path.isfile(self.cache_last_update):
122                        epoch_time=time.time()
123                        fcache=open(self.cache_last_update,'r')
124                        fcache_update=fcache.read()
125                        if not fcache_update:
126                                fcache_update=0
127                        if int(epoch_time)-int(fcache_update)<86400:
128                                if not os.listdir(os.path.dirname(self.cache_xmls)):
129                                        self._debug("Loading snap from cache")
130                                        store=self._load_from_cache(store)
131                                        return store
132
133                fcache=open(self.cache_last_update,'w')
134                fcache.write(str(int(time.time())))
135                if self.cli_mode:
136#                       pkgs=self._search_snap("*")
137                        pkgs=self._load_sections()
138                else:
139                        pkgs=self._load_sections()
140#                       pkgs=self._search_snap_async("*")
141                self._set_status(1)
142                for pkg in pkgs:
143                        app=self.store.get_app_by_pkgname(pkg.get_name())
144                        if not app:
145                                self._debug("Searching for %s"%pkg.get_name())
146                                app=self.store.get_app_by_id(pkg.get_name().lower()+".desktop")
147                        if app:
148                                bundle=appstream.Bundle()
149                                bundle.set_kind(bundle.kind_from_string('SNAP'))
150                                bundle.set_id(pkg.get_name()+'.snap')
151                                app.add_bundle(bundle)
152                                app.add_category("Snap")
153                                store.add_app(self._generate_appstream_app_from_snap(pkg))
154                        else:
155                                store.add_app(self._generate_appstream_app_from_snap(pkg))
156                return(store)
157
158        def _load_from_cache(self,store):
159                for target_file in os.listdir(self.cache_xmls):
160                        if target_file.endswith('.xml'):
161                                store_file=Gio.File.new_for_path(self.cache_xmls+'/'+target_file)
162                                self._debug("Adding file %s/%s"%(self.cache_xmls,target_file))
163                                try:
164                                        store.from_file(store_file,None,None)
165                                except Exception as e:
166                                        self._debug("Couldn't add file %s to store"%target_file)
167                                        self._debug("Reason: %s"%e)
168                return store   
169
170        def _generate_appstream_app_from_snap(self,pkg):
171                bundle=appstream.Bundle()
172                app=appstream.App()
173                icon=appstream.Icon()
174                screenshot=appstream.Screenshot()
175#               bundle.set_kind(appstream.BundleKind.SNAP)
176                bundle.set_kind(bundle.kind_from_string('SNAP'))
177                bundle.set_id(pkg.get_name()+'.snap')
178                app.add_bundle(bundle)
179                app.set_name("C",pkg.get_name()+'.snap')
180                app.add_pkgname(pkg.get_name()+'.snap')
181                app.add_category("Snap")
182                release=appstream.Release()
183                release.set_version(pkg.get_version())
184                app.add_release(release)
185                app.set_id("io.snapcraft.%s"%pkg.get_name()+'.snap')
186                app.set_id_kind=appstream.IdKind.DESKTOP
187                app.set_metadata_license("CC0-1.0")
188                description="This is an Snap bundle. It hasn't been tested by our developers and comes from a 3rd party dev team. Please use it carefully."
189                pkg_description=pkg.get_description()
190                pkg_description=html.escape(pkg_description,quote=True)
191                pkg_description=pkg_description.replace("<","&gt;")
192                app.set_description("C","<p>%s</p><p>%s</p>"%(description,pkg_description))
193                app.set_comment("C",pkg.get_summary().rstrip('.'))
194
195                app.add_keyword("C",pkg.get_name())
196                for word in pkg.get_summary().split(' '):
197                        if len(word)>3:
198                                app.add_keyword("C",word)
199
200                if pkg.get_icon():
201                        if self.icon_cache_enabled:
202                                icon.set_kind(appstream.IconKind.LOCAL)
203                                icon.set_name(self._download_file(pkg.get_icon(),pkg.get_name(),self.icons_folder))
204                        else:
205                                icon.set_kind(appstream.IconKind.REMOTE)
206                                icon.set_name(pkg.get_icon())
207                                icon.set_url(pkg.get_icon())
208                        app.add_icon(icon)
209
210                if pkg.get_license():
211                        app.set_project_license(pkg.get_license())
212
213                if pkg.get_screenshots():
214                        for snap_screen in pkg.get_screenshots():
215                                img=appstream.Image()
216                                img.set_kind(appstream.ImageKind.SOURCE)
217                                img.set_url(snap_screen.get_url())
218                                break
219                        screenshot.add_image(img)
220                        app.add_screenshot(screenshot)
221
222                if not os.path.isfile(self.cache_xmls+'/'+app.get_id_filename()):
223                        xml_path='%s/%s.xml'%(self.cache_xmls,app.get_id_filename())
224                        gioFile=Gio.File.new_for_path(xml_path)
225                        app.to_file(gioFile)
226                        #Fix some things in app_file...
227                        xml_file=open(xml_path,'r')
228                        xml_data=xml_file.readlines()
229                        xml_file.close()
230                        count=0
231                        xml_data[0]=xml_data[0]+"<components>\n"
232                        xml_data[-1]=xml_data[-1]+"\n"+"</components>"
233                        xml_file=open(xml_path,'w')
234                        xml_file.writelines(xml_data)
235                        xml_file.close()
236                return(app)
237
238        def _search_cb(self,obj,request,*args):
239                global wrap
240                wrap=request
241
242        def _load_sections(self):
243                sections=self.snap_client.get_sections_sync()
244                stable_pkgs=[]
245                for section in sections:
246                        apps=self.snap_client.find_section_sync(Snapd.FindFlags.MATCH_NAME,section,None)
247                        for pkg in apps:
248                                stable_pkgs.append(pkg)
249                return(stable_pkgs)
250
251        def _search_snap_async(self,tokens):
252                self._debug("Async Searching %s"%tokens)
253                pkgs=None
254                global wrap
255                self.snap_client.find_async(Snapd.FindFlags.MATCH_NAME,
256                                                        tokens,
257                                                        None,
258                                                        self._search_cb,(None,),None)
259                while 'Snapd' not in str(type(wrap)):
260                        time.sleep(0.1)
261#               snaps,curr=self.snap_client.find_finish(wrap)
262                snaps=self.snap_client.find_finish(wrap)
263                if type(snaps)!=type([]):
264                        pkgs=[snaps]
265                else:
266                        pkgs=snaps
267                stable_pkgs=[]
268                for pkg in pkgs:
269                        if pkg.get_channel()=='stable':
270                                stable_pkgs.append(pkg)
271                return(stable_pkgs)
272
273        def _search_snap(self,tokens):
274                self._debug("Searching %s"%tokens)
275                pkg=None
276                pkgs=None
277                try:
278#                       pkgs,curr=self.snap_client.find_sync(Snapd.FindFlags.MATCH_NAME,tokens,None)
279                        pkgs=self.snap_client.find_sync(Snapd.FindFlags.MATCH_NAME,tokens,None)
280                except Exception as e:
281                        print("ERR: %s"%e)
282                        self._set_status(1)
283                stable_pkgs=[]
284                for pkg in pkgs:
285                        if pkg.get_channel()=='stable':
286                                stable_pkgs.append(pkg)
287                        else:
288                                self._debug(pkg.get_channel())
289                self._debug("Done")
290                return(stable_pkgs)
291        #def _search_snap
292
293        def _download_file(self,url,app_name,dest_dir):
294#               target_file=self.icons_folder+'/'+app_name+".png"
295                target_file=dest_dir+'/'+app_name+".png"
296                if not os.path.isfile(target_file):
297#                       shutil.copy("/usr/share/icons/hicolor/128x128/apps/lliurex-store.png",target_file)
298#                       if not os.fork():
299                        if not os.path.isfile(target_file):
300                                self._debug("Downloading %s to %s"%(url,target_file))
301                                try:
302                                        with urllib.request.urlopen(url) as response, open(target_file, 'wb') as out_file:
303                                                bf=16*1024
304                                                acumbf=0
305                                                file_size=int(response.info()['Content-Length'])
306                                                while True:
307                                                        if acumbf>=file_size:
308                                                            break
309                                                        shutil.copyfileobj(response, out_file,bf)
310                                                        acumbf=acumbf+bf
311                                        st = os.stat(target_file)
312                                except Exception as e:
313                                        self._debug("Unable to download %s"%url)
314                                        self._debug("Reason: %s"%e)
315                                        target_file=''
316#                               os._exit(0)
317                return(target_file)
318        #def _download_file
319
320
321        def _get_info(self,app_info):
322                #switch to launch async method when running under a gui
323                #For an unknown reason request will block when sync mode under a gui and async blocks when on cli (really funny)
324                self._debug("Getting info for %s"%app_info)
325                pkg=None
326                try:
327                        pkg=self.snap_client.list_one_sync(app_info['package'].replace('.snap',''))
328                        app_info['state']='installed'
329                        pkgs=[pkg]
330                except:
331                        app_info['state']='available'
332                        if self.cli_mode:
333                                pkgs=self._search_snap(app_info['package'].replace('.snap',''))
334                        else:
335                                pkgs=self._search_snap_async(app_info['package'].replace('.snap',''))
336                        self._debug("Getting extended info for %s %s"%(app_info['name'],pkgs))
337                if type(pkgs)==type([]):
338                        for pkg in pkgs:
339                                self._debug("Getting extended info for %s"%app_info['name'])
340                                if pkg.get_download_size():
341                                        app_info['size']=str(pkg.get_download_size())
342                                else:
343                                        app_info['size']=str(pkg.get_installed_size())
344                                break
345                else:
346                        app_info['size']='0'
347                self._debug("Info for %s"%app_info)
348                self.partial_progress=100
349                return(app_info)
350        #def _get_info
351
352        def _install_snap(self,app_info):
353                self._debug("Installing %s"%app_info['name'])
354                if app_info['state']=='installed':
355                        self._set_status(4)
356                else:
357                        try:
358                                self.snap_client.install_sync(app_info['name'].replace('.snap',''),
359                        None, # channel
360                        self._callback, (None,),
361                        None) # cancellable
362                                app_info['state']='installed'
363                                self._set_status(0)
364                        except Exception as e:
365                                print("Install error %s"%e)
366                                self._set_status(5)
367
368                self._debug("Installed %s"%app_info)
369                return app_info
370        #def _install_snap
371
372        def _remove_snap(self,app_info):
373                if app_info['state']=='available':
374                        self._set_status(3)
375                else:
376                        try:
377                                self.snap_client.remove_sync(app_info['name'].replace('.snap',''),
378                       self._callback, (None,),
379                                                None) # cancellable
380                                app_info['state']='available'
381                                self._set_status(0)
382                        except Exception as e:
383                                print("Remove error %s"%e)
384                                self._set_status(6)
385                return app_info
386        #def _remove_snap
Note: See TracBrowser for help on using the repository browser.