source: lliurex-store/trunk/fuentes/python3-lliurex-store.install/usr/share/lliurexstore/plugins/snapManager.py @ 7069

Last change on this file since 7069 was 7069, checked in by Juanma, 2 years ago

Fixed snap channels

File size: 8.3 KB
Line 
1#The name of the main class must match the file name in lowercase
2import os
3import urllib
4import shutil
5import gi
6from gi.repository import Gio
7gi.require_version ('Snapd', '1')
8from gi.repository import Snapd
9gi.require_version('AppStreamGlib', '1.0')
10from gi.repository import AppStreamGlib as appstream
11import time
12#Needed for async find method, perhaps only on xenial
13wrap=Gio.SimpleAsyncResult()
14class snapmanager:
15       
16        def __init__(self):
17                self.dbg=True
18                self.progress=0
19                self.partial_progress=0
20                self.plugin_actions={'install':'snap','remove':'snap','pkginfo':'snap','load':'snap'}
21                self.cache_dir=os.getenv("HOME")+"/.cache/lliurex-store"
22                self.icons_folder=self.cache_dir+"/icons"
23                self.images_folder=self.cache_dir+"/images"
24                self.result={}
25                self.result['data']={}
26                self.result['status']={}
27                self.disabled=False
28                self.icon_cache_enabled=True
29                self.cli_mode=False
30                if not os.path.isdir(self.icons_folder):
31                        try:
32                                os.makedirs(self.icons_folder)
33                        except:
34                                self.icon_cache_enabled=False
35                                self._debug("Icon cache disabled")
36        #def __init__
37
38        def set_debug(self,dbg=True):
39                self.dbg=dbg
40                self._debug ("Debug enabled")
41        #def set_debug
42
43        def _debug(self,msg=''):
44                if self.dbg:
45                        print ('DEBUG snap: '+msg)
46        #def debug
47
48        def register(self):
49                return(self.plugin_actions)
50
51        def enable(self,state=False):
52                self.disabled=state
53
54        def execute_action(self,action,applist=None,store=None,results=0):
55                if store:
56                        self.store=store
57                else:
58                        self.store=appstream.Store()
59                self.progress=0
60                self.result['status']={'status':-1,'msg':''}
61                self.result['data']=''
62               
63
64                if self.disabled==True:
65                        self._set_status(9)
66                        self.result['data']=self.store
67                else:
68                        self.snap_client=Snapd.Client()
69                        self.snap_client.connect_sync(None)
70                        dataList=[]
71                        if action=='load':
72                                self.result['data']=self._load_snap_store(self.store)
73                        else:
74                                for app_info in applist:
75                                        self.partial_progress=0
76                                        if action=='install':
77                                                dataList.append(self._install_snap(app_info))
78                                        if action=='remove':
79                                                dataList.append(self._remove_snap(app_info))
80                                        if action=='pkginfo':
81                                                dataList.append(self._get_info(app_info))
82                                        self.progress+=round(self.partial_progress/len(applist),1)
83                                        if self.progress>98:
84                                                self.progress=98
85                                self.result['data']=list(dataList)
86                self.progress=100
87                return(self.result)
88
89        def _set_status(self,status,msg=''):
90                self.result['status']={'status':status,'msg':msg}
91        #def _set_status
92
93        def _callback(self,client,change, _,user_data):
94            # Interate over tasks to determine the aggregate tasks for completion.
95            total = 0
96            done = 0
97            for task in change.get_tasks():
98                total += task.get_progress_total()
99                done += task.get_progress_done()
100            self.progress = round((done/total)*100)
101        #def _callback
102
103        def _load_snap_store(self,store):
104                pkgs=[]
105                if self.cli_mode:
106                        pkgs=self._search_snap("*")
107                else:
108                        pkgs=self._search_snap_async("*")
109                self._set_status(1)
110                for pkg in pkgs:
111                        store.add_app(self._generate_appstream_app(pkg))
112                return(store)
113
114        def _generate_appstream_app(self,pkg):
115                bundle=appstream.Bundle()
116                app=appstream.App()
117                icon=appstream.Icon()
118                screenshot=appstream.Screenshot()
119                #F*****g appstream have kinds undefined but kind_from_string works... wtf?
120#               bundle.set_kind(appstream.BundleKind.SNAP)
121                bundle.set_kind(bundle.kind_from_string('SNAP'))
122                bundle.set_id(pkg.get_name()+'.snap')
123                app.add_bundle(bundle)
124                app.set_name("C",pkg.get_name())
125                app.add_pkgname(pkg.get_name()+'.snap')
126                app.add_category("Snap")
127                release=appstream.Release()
128                release.set_version(pkg.get_version())
129                app.add_release(release)
130                app.set_id("io.snapcraft.%s"%pkg.get_name()+'.snap')
131#               app.set_id(pkg.get_name()+'.snap')
132                app.set_id_kind=appstream.IdKind.DESKTOP
133                description=("This is an Snap bundle of app %s. It hasn't been tested by our developers and comes from a 3rd party dev team. Please use it carefully."%pkg.get_name())
134                app.set_description("C",description+'<br>'+pkg.get_description())
135                app.set_comment("C",pkg.get_summary())
136                app.add_keyword("C",pkg.get_name())
137                for word in pkg.get_summary().split(' '):
138                        app.add_keyword("C",word)
139
140                if pkg.get_icon():
141                        if self.icon_cache_enabled:
142                                icon.set_kind(appstream.IconKind.LOCAL)
143                                icon.set_name(self._download_file(pkg.get_icon(),pkg.get_name()))
144                        else:
145                                icon.set_kind(appstream.IconKind.REMOTE)
146                                icon.set_name(pkg.get_icon())
147                        app.add_icon(icon)
148
149                if pkg.get_license():
150                        app.set_project_license(pkg.get_license())
151
152#               if pkg.get_screenshots():
153#                       img=appstream.Image()
154#                       screenshot_list=[]
155#                               for screen in pkg.get_screenshots():
156#                                       screenshot_list.append(screen.get_url())
157
158#                               app_info["screenshots"]=screenshot_list
159                return(app)
160
161        def _search_cb(self,obj,request,*args):
162                global wrap
163                wrap=request
164
165        def _search_snap_async(self,tokens):
166                self._debug("Async Searching %s"%tokens)
167                pkgs=None
168                global wrap
169                self.snap_client.find_async(Snapd.FindFlags.MATCH_NAME,
170                                                        tokens,
171                                                        None,
172                                                        self._search_cb,(None,),None)
173                while 'Snapd' not in str(type(wrap)):
174                        time.sleep(0.1)
175                snaps=self.snap_client.find_finish(wrap)
176                if type(snaps)!=type([]):
177                        pkgs=[snaps]
178                else:
179                        pkgs=snaps
180                stable_pkgs=[]
181                for pkg in pkgs:
182                        if pkg.get_channel()=='stable':
183                                stable_pkgs.append(pkg)
184                return(stable_pkgs)
185
186        def _search_snap(self,tokens):
187                self._debug("Searching %s"%tokens)
188                pkg=None
189                pkgs=None
190                try:
191                        pkgs=self.snap_client.find_sync(Snapd.FindFlags.MATCH_NAME,tokens,None,None)
192                except Exception as e:
193                        print(e)
194                        self._set_status(1)
195                stable_pkgs=[]
196                for pkg in pkgs:
197                        if pkg.get_channel()=='stable':
198                                stable_pkgs.append(pkg)
199                        else:
200                                self._debug(pkg.get_channel())
201                self._debug("Done")
202                return(stable_pkgs)
203        #def _search_snap
204
205        def _download_file(self,url,app_name):
206                target_file=self.icons_folder+'/'+app_name+".png"
207                if not os.path.isfile(target_file):
208                        self._debug("Downloading %s to %s"%(url,target_file))
209                        try:
210                                with urllib.request.urlopen(url) as response, open(target_file, 'wb') as out_file:
211                                        bf=16*1024
212                                        acumbf=0
213                                        file_size=int(response.info()['Content-Length'])
214                                        while True:
215                                                if acumbf>=file_size:
216                                                    break
217                                                shutil.copyfileobj(response, out_file,bf)
218                                                acumbf=acumbf+bf
219                                st = os.stat(target_file)
220                                os.chmod(target_file, st.st_mode | 0o111)
221                        except Exception as e:
222                                self._debug("Unable to download %s"%url)
223                                self._debug("Reason: %s"%e)
224                return(target_file)
225        #def _download_file
226
227
228        def _get_info(self,app_info):
229                #switch to launch async method when running under a gui
230                #For an unknown reason request will block when sync mode under a gui and async blocks when on cli (really funny)
231                self._debug("Getting info for %s"%app_info)
232                pkg=None
233                try:
234                        pkg=self.snap_client.list_one_sync(app_info['name'])
235                        app_info['state']='installed'
236                        pkgs=[pkg]
237                except:
238                        app_info['state']='available'
239                        if self.cli_mode:
240                                pkgs=self._search_snap(app_info['name'])
241                        else:
242                                pkgs=self._search_snap_async(app_info['name'])
243                        self._debug("Getting extended info for %s %s"%(app_info['name'],pkgs))
244                if type(pkgs)==type([]):
245                        for pkg in pkgs:
246                                self._debug("Getting extended info for %s"%app_info['name'])
247                                if pkg.get_download_size():
248                                        app_info['size']=str(pkg.get_download_size())
249                                else:
250                                        app_info['size']=str(pkg.get_installed_size())
251                                break
252                else:
253                        app_info['size']='0'
254                self._debug("Info for %s"%app_info)
255                self.partial_progress=100
256                return(app_info)
257        #def _get_info
258
259        def _install_snap(self,app_info):
260                self._debug("Installing %s"%app_info['name'])
261                if app_info['state']=='installed':
262                        self._set_status(4)
263                else:
264                        try:
265                                self.snap_client.install_sync(app_info['name'],
266                        None, # channel
267                        self._callback, (None,),
268                        None) # cancellable
269                                app_info['state']='installed'
270                                self._set_status(0)
271                        except Exception as e:
272                                print("Install error %s"%e)
273                                self._set_status(5)
274
275                self._debug("Installed %s"%app_info)
276                return app_info
277        #def _install_snap
278
279        def _remove_snap(self,app_info):
280                if app_info['state']=='available':
281                        self._set_status(3)
282                else:
283                        try:
284                                self.snap_client.remove_sync(app_info['name'],
285                       self._callback, (None,),
286                                                None) # cancellable
287                                app_info['state']='available'
288                                self._set_status(0)
289                        except Exception as e:
290                                print("Remove error %s"%e)
291                                self._set_status(6)
292                return app_info
293        #def _remove_snap
Note: See TracBrowser for help on using the repository browser.