source: lliurex-store/trunk/fuentes/python3-lliurex-store.install/usr/share/lliurexstore/plugins/snapManager.py @ 7132

Last change on this file since 7132 was 7132, checked in by Juanma, 19 months ago

WIP on releases

File size: 9.4 KB
Line 
1#The name of the main class must match the file name in lowercase
2import os
3import urllib
4import shutil
5import gi
6from gi.repository import Gio
7gi.require_version ('Snapd', '1')
8from gi.repository import Snapd
9gi.require_version('AppStreamGlib', '1.0')
10from gi.repository import AppStreamGlib as appstream
11import time
12#Needed for async find method, perhaps only on xenial
13wrap=Gio.SimpleAsyncResult()
14class snapmanager:
15       
16        def __init__(self):
17                self.dbg=False
18                self.progress=0
19                self.partial_progress=0
20                self.plugin_actions={'install':'snap','remove':'snap','pkginfo':'snap','load':'snap'}
21                self.cache_dir=os.getenv("HOME")+"/.cache/lliurex-store"
22                self.icons_folder=self.cache_dir+"/icons"
23                self.images_folder=self.cache_dir+"/images"
24                self.result={}
25                self.result['data']={}
26                self.result['status']={}
27                self.disabled=False
28                self.icon_cache_enabled=True
29                self.image_cache_enabled=True
30                self.cli_mode=False
31                if not os.path.isdir(self.icons_folder):
32                        try:
33                                os.makedirs(self.icons_folder)
34                        except:
35                                self.icon_cache_enabled=False
36                                self._debug("Icon cache disabled")
37                if not os.path.isdir(self.images_folder):
38                        try:
39                                os.makedirs(self.images_folder)
40                        except:
41                                self.image_cache_enabled=False
42                                self._debug("Image cache disabled")
43        #def __init__
44
45        def set_debug(self,dbg=True):
46                self.dbg=dbg
47                self._debug ("Debug enabled")
48        #def set_debug
49
50        def _debug(self,msg=''):
51                if self.dbg:
52                        print ('DEBUG snap: %s'%msg)
53        #def debug
54
55        def register(self):
56                return(self.plugin_actions)
57
58        def enable(self,state=False):
59                self.disabled=state
60
61        def execute_action(self,action,applist=None,store=None,results=0):
62                if store:
63                        self.store=store
64                else:
65                        self.store=appstream.Store()
66                self.progress=0
67                self.result['status']={'status':-1,'msg':''}
68                self.result['data']=''
69               
70
71                if self.disabled==True:
72                        self._set_status(9)
73                        self.result['data']=self.store
74                else:
75                        self.snap_client=Snapd.Client()
76                        self.snap_client.connect_sync(None)
77                        dataList=[]
78                        if action=='load':
79                                self.result['data']=self._load_snap_store(self.store)
80                        else:
81                                for app_info in applist:
82                                        self.partial_progress=0
83                                        if action=='install':
84                                                dataList.append(self._install_snap(app_info))
85                                        if action=='remove':
86                                                dataList.append(self._remove_snap(app_info))
87                                        if action=='pkginfo':
88                                                dataList.append(self._get_info(app_info))
89                                        self.progress+=round(self.partial_progress/len(applist),1)
90                                        if self.progress>98:
91                                                self.progress=98
92                                self.result['data']=list(dataList)
93                self.progress=100
94                return(self.result)
95
96        def _set_status(self,status,msg=''):
97                self.result['status']={'status':status,'msg':msg}
98        #def _set_status
99
100        def _callback(self,client,change, _,user_data):
101            # Interate over tasks to determine the aggregate tasks for completion.
102            total = 0
103            done = 0
104            for task in change.get_tasks():
105                total += task.get_progress_total()
106                done += task.get_progress_done()
107            self.progress = round((done/total)*100)
108        #def _callback
109
110        def _load_snap_store(self,store):
111                pkgs=[]
112                if self.cli_mode:
113                        pkgs=self._search_snap("*")
114                else:
115                        pkgs=self._search_snap_async("*")
116                self._set_status(1)
117                for pkg in pkgs:
118                        app=self.store.get_app_by_pkgname(pkg.get_name())
119                        if not app:
120                                self._debug("Searching for %s"%pkg.get_name())
121                                app=self.store.get_app_by_id(pkg.get_name().lower()+".desktop")
122                        if app:
123                                bundle=appstream.Bundle()
124                                bundle.set_kind(bundle.kind_from_string('SNAP'))
125                                bundle.set_id(pkg.get_name()+'.snap')
126                                app.add_bundle(bundle)
127                                app.add_category("Snap")
128#                               store.add_app(self._generate_appstream_app_from_snap(pkg))
129                        else:
130                                store.add_app(self._generate_appstream_app_from_snap(pkg))
131                return(store)
132
133        def _generate_appstream_app_from_snap(self,pkg):
134                bundle=appstream.Bundle()
135                app=appstream.App()
136                icon=appstream.Icon()
137                screenshot=appstream.Screenshot()
138                #F*****g appstream have kinds undefined but kind_from_string works... wtf?
139#               bundle.set_kind(appstream.BundleKind.SNAP)
140                bundle.set_kind(bundle.kind_from_string('SNAP'))
141                bundle.set_id(pkg.get_name()+'.snap')
142                app.add_bundle(bundle)
143                app.set_name("C",pkg.get_name())
144#               app.add_pkgname(pkg.get_name()+'.snap')
145                app.add_pkgname(pkg.get_name())
146                app.add_category("Snap")
147                release=appstream.Release()
148                release.set_version(pkg.get_version())
149                app.add_release(release)
150                app.set_id("io.snapcraft.%s"%pkg.get_name()+'.snap')
151#               app.set_id(pkg.get_name()+'.snap')
152                app.set_id_kind=appstream.IdKind.DESKTOP
153                description=("This is an Snap bundle of app %s. It hasn't been tested by our developers and comes from a 3rd party dev team. Please use it carefully."%pkg.get_name())
154                app.set_description("C",description+'<br>'+pkg.get_description())
155                app.set_comment("C",pkg.get_summary())
156                app.add_keyword("C",pkg.get_name())
157                for word in pkg.get_summary().split(' '):
158                        app.add_keyword("C",word)
159
160                if pkg.get_icon():
161                        if self.icon_cache_enabled:
162                                icon.set_kind(appstream.IconKind.LOCAL)
163                                icon.set_name(self._download_file(pkg.get_icon(),pkg.get_name(),self.icons_folder))
164                        else:
165                                icon.set_kind(appstream.IconKind.REMOTE)
166                                icon.set_name(pkg.get_icon())
167                        app.add_icon(icon)
168
169                if pkg.get_license():
170                        app.set_project_license(pkg.get_license())
171
172                if pkg.get_screenshots():
173                        for snap_screen in pkg.get_screenshots():
174                                img=appstream.Image()
175#                               img.load_filename(self._download_file(snap_screen.get_url(),pkg.get_name(),self.images_folder))
176                                img.set_kind(appstream.ImageKind.SOURCE)
177                                img.set_url(snap_screen.get_url())
178                                break
179                        screenshot.add_image(img)
180                        app.add_screenshot(screenshot)
181                return(app)
182
183        def _search_cb(self,obj,request,*args):
184                global wrap
185                wrap=request
186
187        def _search_snap_async(self,tokens):
188                self._debug("Async Searching %s"%tokens)
189                pkgs=None
190                global wrap
191                self.snap_client.find_async(Snapd.FindFlags.MATCH_NAME,
192                                                        tokens,
193                                                        None,
194                                                        self._search_cb,(None,),None)
195                while 'Snapd' not in str(type(wrap)):
196                        time.sleep(0.1)
197                snaps=self.snap_client.find_finish(wrap)
198                if type(snaps)!=type([]):
199                        pkgs=[snaps]
200                else:
201                        pkgs=snaps
202                stable_pkgs=[]
203                for pkg in pkgs:
204                        if pkg.get_channel()=='stable':
205                                stable_pkgs.append(pkg)
206                return(stable_pkgs)
207
208        def _search_snap(self,tokens):
209                self._debug("Searching %s"%tokens)
210                pkg=None
211                pkgs=None
212                try:
213                        pkgs=self.snap_client.find_sync(Snapd.FindFlags.MATCH_NAME,tokens,None,None)
214                except Exception as e:
215                        print(e)
216                        self._set_status(1)
217                stable_pkgs=[]
218                for pkg in pkgs:
219                        if pkg.get_channel()=='stable':
220                                stable_pkgs.append(pkg)
221                        else:
222                                self._debug(pkg.get_channel())
223                self._debug("Done")
224                return(stable_pkgs)
225        #def _search_snap
226
227        def _download_file(self,url,app_name,dest_dir):
228#               target_file=self.icons_folder+'/'+app_name+".png"
229                target_file=dest_dir+'/'+app_name+".png"
230                if not os.path.isfile(target_file):
231#                       shutil.copy("/usr/share/icons/hicolor/128x128/apps/lliurex-store.png",target_file)
232#                       if not os.fork():
233                        if not os.path.isfile(target_file):
234                                self._debug("Downloading %s to %s"%(url,target_file))
235                                try:
236                                        with urllib.request.urlopen(url) as response, open(target_file, 'wb') as out_file:
237                                                bf=16*1024
238                                                acumbf=0
239                                                file_size=int(response.info()['Content-Length'])
240                                                while True:
241                                                        if acumbf>=file_size:
242                                                            break
243                                                        shutil.copyfileobj(response, out_file,bf)
244                                                        acumbf=acumbf+bf
245                                        st = os.stat(target_file)
246                                except Exception as e:
247                                        self._debug("Unable to download %s"%url)
248                                        self._debug("Reason: %s"%e)
249                                        target_file=''
250#                               os._exit(0)
251                return(target_file)
252        #def _download_file
253
254
255        def _get_info(self,app_info):
256                #switch to launch async method when running under a gui
257                #For an unknown reason request will block when sync mode under a gui and async blocks when on cli (really funny)
258                self._debug("Getting info for %s"%app_info)
259                pkg=None
260                try:
261                        pkg=self.snap_client.list_one_sync(app_info['package'])
262                        app_info['state']='installed'
263                        pkgs=[pkg]
264                except:
265                        app_info['state']='available'
266                        if self.cli_mode:
267                                pkgs=self._search_snap(app_info['package'])
268                        else:
269                                pkgs=self._search_snap_async(app_info['package'])
270                        self._debug("Getting extended info for %s %s"%(app_info['name'],pkgs))
271                if type(pkgs)==type([]):
272                        for pkg in pkgs:
273                                self._debug("Getting extended info for %s"%app_info['name'])
274                                if pkg.get_download_size():
275                                        app_info['size']=str(pkg.get_download_size())
276                                else:
277                                        app_info['size']=str(pkg.get_installed_size())
278                                break
279                else:
280                        app_info['size']='0'
281                self._debug("Info for %s"%app_info)
282                self.partial_progress=100
283                return(app_info)
284        #def _get_info
285
286        def _install_snap(self,app_info):
287                self._debug("Installing %s"%app_info['name'])
288                if app_info['state']=='installed':
289                        self._set_status(4)
290                else:
291                        try:
292                                self.snap_client.install_sync(app_info['name'],
293                        None, # channel
294                        self._callback, (None,),
295                        None) # cancellable
296                                app_info['state']='installed'
297                                self._set_status(0)
298                        except Exception as e:
299                                print("Install error %s"%e)
300                                self._set_status(5)
301
302                self._debug("Installed %s"%app_info)
303                return app_info
304        #def _install_snap
305
306        def _remove_snap(self,app_info):
307                if app_info['state']=='available':
308                        self._set_status(3)
309                else:
310                        try:
311                                self.snap_client.remove_sync(app_info['name'],
312                       self._callback, (None,),
313                                                None) # cancellable
314                                app_info['state']='available'
315                                self._set_status(0)
316                        except Exception as e:
317                                print("Remove error %s"%e)
318                                self._set_status(6)
319                return app_info
320        #def _remove_snap
Note: See TracBrowser for help on using the repository browser.