source: lliurex-store/trunk/fuentes/python3-lliurex-store.install/usr/share/lliurexstore/plugins/snapManager.py @ 6645

Last change on this file since 6645 was 6645, checked in by Juanma, 2 years ago

Added snap support. Minor changes

File size: 7.8 KB
Line 
1#The name of the main class must match the file name in lowercase
2import os
3import urllib
4import shutil
5import gi
6from gi.repository import Gio
7gi.require_version ('Snapd', '1')
8from gi.repository import Snapd
9gi.require_version('AppStreamGlib', '1.0')
10from gi.repository import AppStreamGlib as appstream
11import time
12#Needed for async find method, perhaps only on xenial
13wrap=Gio.SimpleAsyncResult()
14class snapmanager:
15       
16        def __init__(self):
17                self.dbg=False
18                self.progress=0
19                self.partial_progress=0
20                self.plugin_actions={'install':'snap','remove':'snap','pkginfo':'snap','load':'snap'}
21                self.cache_dir="/var/lib/lliurexstore/bundles/cache"
22                self.icons_folder=self.cache_dir+"/icons"
23                self.result={}
24                self.result['data']={}
25                self.result['status']={}
26                self.disabled=False
27                self.icon_cache_enabled=True
28                self.cli_mode=False
29                if not os.path.isdir(self.icons_folder):
30                        try:
31                                os.makedirs(self.icons_folder)
32                        except:
33                                self.icon_cache_enabled=False
34                                self._debug("Icon cache disabled")
35        #def __init__
36
37        def set_debug(self,dbg=True):
38                self.dbg=dbg
39                self._debug ("Debug enabled")
40        #def set_debug
41
42        def _debug(self,msg=''):
43                if self.dbg:
44                        print ('DEBUG snap: '+msg)
45        #def debug
46
47        def register(self):
48                return(self.plugin_actions)
49
50        def enable(self,state=False):
51                self.disabled=state
52
53        def execute_action(self,action,applist=None,store=None,results=0):
54                if store:
55                        self.store=store
56                else:
57                        self.store=appstream.Store()
58                self.progress=0
59                self.result['status']={'status':-1,'msg':''}
60                self.result['data']=''
61               
62
63                if self.disabled==True:
64                        self._set_status(9)
65                        self.result['data']=self.store
66                else:
67                        self.snap_client=Snapd.Client()
68                        self.snap_client.connect_sync(None)
69                        dataList=[]
70                        if action=='load':
71                                self.result['data']=self._load_snap_store(self.store)
72                        else:
73                                for app_info in applist:
74                                        self.partial_progress=0
75                                        if action=='install':
76                                                dataList.append(self._install_snap(app_info))
77                                        if action=='remove':
78                                                dataList.append(self._remove_snap(app_info))
79                                        if action=='pkginfo':
80                                                dataList.append(self._get_info(app_info))
81                                        self.progress+=round(self.partial_progress/len(applist),1)
82                                        if self.progress>98:
83                                                self.progress=98
84                                self.result['data']=list(dataList)
85                self.progress=100
86                return(self.result)
87
88        def _set_status(self,status,msg=''):
89                self.result['status']={'status':status,'msg':msg}
90        #def _set_status
91
92        def _callback(self,client,change, _,user_data):
93            # Interate over tasks to determine the aggregate tasks for completion.
94            total = 0
95            done = 0
96            for task in change.get_tasks():
97                total += task.get_progress_total()
98                done += task.get_progress_done()
99            self.progress = round((done/total)*100)
100        #def _callback
101
102        def _load_snap_store(self,store):
103                try:
104                        for pkg in self._search_snap("*"):
105                                store.add_app(self._generate_appstream_app(pkg))
106                except:
107                        self._set_status(1)
108                return(store)
109
110        def _generate_appstream_app(self,pkg):
111                bundle=appstream.Bundle()
112                app=appstream.App()
113                #F*****g appstream have kinds undefined but kind_from_string works... wtf?
114#               bundle.set_kind(appstream.BundleKind.SNAP)
115                bundle.set_kind(bundle.kind_from_string('SNAP'))
116                bundle.set_id(pkg.get_name()+'.snap')
117                app.add_bundle(bundle)
118                app.set_name("C",pkg.get_name())
119                app.add_pkgname(pkg.get_name()+'.snap')
120                app.add_category("Snap")
121                release=appstream.Release()
122                release.set_version(pkg.get_version())
123                app.add_release(release)
124                app.set_id("io.snapcraft.%s"%pkg.get_name()+'.snap')
125#               app.set_id(pkg.get_name()+'.snap')
126                app.set_id_kind=appstream.IdKind.DESKTOP
127                description=("This is an Snap bundle of app %s. It hasn't been tested by our developers and comes from a 3rd party dev team. Please use it carefully."%pkg.get_name())
128                app.set_description("C",description+'<br>'+pkg.get_description())
129                app.set_comment("C",pkg.get_summary())
130                app.add_keyword("C",pkg.get_name())
131                for word in pkg.get_summary().split(' '):
132                        app.add_keyword("C",word)
133
134                if pkg.get_icon():
135                        app.set_icon_path(pkg.get_icon())
136                return(app)
137
138        def _search_cb(self,obj,request,*args):
139                global wrap
140                wrap=request
141
142        def _search_snap_async(self,tokens):
143                self._debug("Async Searching %s"%tokens)
144                pkgs=None
145                global wrap
146                self.snap_client.find_async(Snapd.FindFlags.MATCH_NAME,
147                                                        tokens,
148                                                        None,
149                                                        self._search_cb,(None,),None)
150                time.sleep(0.5)
151                snaps=self.snap_client.find_finish(wrap)
152                if type(snaps)!=type([]):
153                        pkgs=[snaps]
154                else:
155                        pkgs=snaps
156                return(pkgs)
157
158        def _search_snap(self,tokens):
159                self._debug("Searching %s"%tokens)
160                pkg=None
161                try:
162                        pkg=self.snap_client.find_sync(Snapd.FindFlags.MATCH_NAME,tokens,None,None)
163                except Exception as e:
164                        print(e)
165                        self._set_status(1)
166                self._debug("Done")
167                return(pkg)
168        #def _search_snap
169
170        def _download_file(self,url,app_name):
171                target_file=self.icons_folder+'/'+app_name+".png"
172                if not os.path.isfile(target_file):
173                        self._debug("Downloading %s to %s"%(url,target_file))
174                        try:
175                                with urllib.request.urlopen(url) as response, open(target_file, 'wb') as out_file:
176                                        bf=16*1024
177                                        acumbf=0
178                                        file_size=int(response.info()['Content-Length'])
179                                        while True:
180                                                if acumbf>=file_size:
181                                                    break
182                                                shutil.copyfileobj(response, out_file,bf)
183                                                acumbf=acumbf+bf
184                                st = os.stat(target_file)
185                                os.chmod(target_file, st.st_mode | 0o111)
186                        except Exception as e:
187                                self._debug("Unable to download %s"%url)
188                                self._debug("Reason: %s"%e)
189                return(target_file)
190        #def _download_file
191
192
193        def _get_info(self,app_info):
194                #switch to launch async method when running under a gui
195                #For an unknown request will block when sync mode under a gui
196                if self.cli_mode:
197                        pkgs=self._search_snap(app_info['name'])
198                else:
199                        pkgs=self._search_snap_async(app_info['name'])
200                if type(pkgs)==type([]):
201                        for pkg in pkgs:
202                                if pkg.get_download_size():
203                                        app_info['size']=str(pkg.get_download_size())
204                                else:
205                                        app_info['size']=str(pkg.get_installed_size())
206                                if pkg.get_icon():
207                                        if self.icon_cache_enabled:
208                                                app_info['icon']=self._download_file(pkg.get_icon(),app_info['name'])
209                                        else:
210                                                app_info['icon']=pkg.get_icon()
211                                if pkg.get_status():
212                                        #Not working on xenial
213#                                       if pkg.get_status()==Snapd.SnapStatus.INSTALLED or pkg.get_status()==3:
214#                                               app_info['state']='installed'
215                                        try:
216                                                self.snap_client.list_one_sync(pkg.get_name())
217                                                app_info['state']='installed'
218                                        except:
219                                                app_info['state']='available'
220                                if pkg.get_screenshots():
221                                        screenshot_list=[]
222                                        for screen in pkg.get_screenshots():
223                                                screenshot_list.append(screen.get_url())
224                                        app_info["screenshots"]=screenshot_list
225                                print (pkg.get_channel())
226                                #Method not working in xenial, license default type assigned in infoManager
227#                               if pkg.get_license():
228#                                       app_info["license"]=pkg.get_license()
229                                break
230                else:
231                        app_info['size']='0'
232                self._debug("Info for %s"%app_info)
233                self.partial_progress=100
234                return(app_info)
235        #def _get_info
236
237        def _install_snap(self,app_info):
238                self._debug("Installing %s"%app_info['name'])
239                if app_info['state']=='installed':
240                        self._set_status(4)
241                else:
242                        try:
243                                self.snap_client.install_sync(app_info['name'],
244                        None, # channel
245                        self._callback, (None,),
246                        None) # cancellable
247                                app_info['state']='installed'
248                                self._set_status(0)
249                        except Exception as e:
250                                print("Install error %s"%e)
251                                self._set_status(5)
252
253                self._debug("Installed %s"%app_info)
254                return app_info
255        #def _install_snap
256
257        def _remove_snap(self,app_info):
258                if app_info['state']=='available':
259                        self._set_status(3)
260                else:
261                        try:
262                                self.snap_client.remove_sync(app_info['name'],
263                       self._callback, (None,),
264                                                None) # cancellable
265                                app_info['state']='available'
266                                self._set_status(0)
267                        except Exception as e:
268                                print("Remove error %s"%e)
269                                self._set_status(6)
270                return app_info
271        #def _remove_snap
Note: See TracBrowser for help on using the repository browser.