Commit b62860d4 authored by Saul Ibarra's avatar Saul Ibarra

Refactor file transfers


They are now handled in the middleware.
parent d1add4a7
......@@ -5,7 +5,6 @@ __all__ = ['ClientConference', 'ConferenceDialog', 'AudioSessionModel', 'AudioSe
import bisect
import cPickle as pickle
import hashlib
import os
import re
import string
......@@ -17,7 +16,6 @@ from collections import defaultdict, deque
from datetime import datetime, timedelta
from itertools import chain, count
from operator import attrgetter
from threading import Event
from PyQt4 import uic
from PyQt4.QtCore import Qt, QAbstractListModel, QByteArray, QEasingCurve, QEvent, QMimeData, QModelIndex, QObject, QPointF, QProcess, QPropertyAnimation, QRect, QRectF, QSize, QTimer, QUrl, pyqtSignal
......@@ -28,7 +26,6 @@ from application.notification import IObserver, NotificationCenter, Notification
from application.python import Null, limit
from application.python.types import MarkerType, Singleton
from application.python.weakref import weakobjectmap
from application.system import makedirs, unlink
from eventlib.proc import spawn
from zope.interface import implements
......@@ -47,7 +44,7 @@ from sipsimple.threading import run_in_thread, run_in_twisted_thread
from blink.configuration.settings import BlinkSettings
from blink.resources import ApplicationData, Resources
from blink.screensharing import ScreensharingWindow, VNCClient, ServerDefault
from blink.util import call_later, call_in_gui_thread, run_in_gui_thread
from blink.util import call_later, run_in_gui_thread
from blink.widgets.buttons import LeftSegment, MiddleSegment, RightSegment
from blink.widgets.labels import Status
from blink.widgets.color import ColorHelperMixin, ColorUtils, cache_result, background_color_key
......@@ -3556,12 +3553,13 @@ class FileTransfer(object):
self._file_selector = None
self._error = False
self._finished = False
self._reason = None
self._finished = False
self._stream_ended = False
def __getstate__(self):
state = dict(id=self.id, direction=self.direction, state=self.state, filename=self.filename, _error=self._error, _finished=self._finished, _reason=self._reason)
return (self.account.id, self.contact.name, self.contact_uri.uri, state)
state = dict(id=self.id, direction=self.direction, state=self.state, filename=self.filename, _error=self._error, _reason=self._reason)
return self.account.id, self.contact.name, self.contact_uri.uri, state
def __setstate__(self, state):
from blink.contacts import URIUtils
......@@ -3575,10 +3573,7 @@ class FileTransfer(object):
self.account = account_manager.default_account
self.contact, self.contact_uri = URIUtils.find_contact(contact_uri, display_name=contact_name)
if self.direction == 'outgoing':
self._stop_event = Event()
self._uri = self._normalize_uri(contact_uri)
else:
self._local_hash = hashlib.sha1()
def init_incoming(self, contact, contact_uri, session, stream):
assert self.state is None
......@@ -3593,17 +3588,7 @@ class FileTransfer(object):
self.stream = stream
self._file_selector = stream.file_selector
self._local_hash = hashlib.sha1()
settings = BlinkSettings()
directory = settings.transfers_directory.normalized
makedirs(directory)
filename = os.path.basename(self._file_selector.name)
for name in UniqueFilenameGenerator.generate(os.path.join(directory, filename)):
if not os.path.exists(name) and not os.path.exists(name + self.tmp_file_suffix):
self.filename = name
break
self._file_selector.fd = open(self.filename+self.tmp_file_suffix, 'wb+')
self.filename = self._file_selector.name
self.state = 'connecting'
notification_center = NotificationCenter()
......@@ -3620,11 +3605,9 @@ class FileTransfer(object):
self.contact = contact
self.contact_uri = contact_uri
self._stop_event = Event()
self._uri = self._normalize_uri(contact_uri.uri)
self._file_selector = FileSelector.for_file(filename.encode(sys.getfilesystemencoding()), hash=None)
self._file_selector = FileSelector.for_file(filename)
self.filename = filename
self.state = 'initialized'
......@@ -3633,18 +3616,36 @@ class FileTransfer(object):
def connect(self):
assert self.direction == 'outgoing' and self.state in ('initialized', 'ended')
notification_center = NotificationCenter()
if self.state == 'ended':
# Reinitialize to retry
self._error = False
self._finished = False
self._reason = None
self._stop_event.clear()
self._file_selector = FileSelector.for_file(self.filename.encode(sys.getfilesystemencoding()), hash=None)
self._finished = False
self._stream_ended = False
# TODO: remember hash and mtime to avoid re-computing hash -Saul
self._file_selector = FileSelector.for_file(self.filename)
self.state = 'initialized'
notification_center = NotificationCenter()
notification_center.post_notification('FileTransferWillRetry', self)
# TODO: use a pool of threads -Saul
self._calculate_hash()
settings = SIPSimpleSettings()
if isinstance(self.account, Account):
if self.account.sip.outbound_proxy is not None:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
elif self.account.sip.always_use_my_proxy:
uri = SIPURI(host=self.account.id.domain)
else:
uri = self._uri
else:
uri = self._uri
lookup = DNSLookup()
notification_center.add_observer(self, sender=lookup)
lookup.lookup_sip_proxy(uri, settings.sip.transport_list)
self.state = 'connecting/dns_lookup'
def end(self):
assert self.state is not None
......@@ -3657,7 +3658,7 @@ class FileTransfer(object):
assert self.direction == 'outgoing'
self._error = True
self._reason = 'Cancelled'
self._stop_event.set()
self._terminate()
def _get_state(self):
return self.__dict__['state']
......@@ -3705,90 +3706,6 @@ class FileTransfer(object):
stream = property(_get_stream, _set_stream)
del _get_stream, _set_stream
@run_in_thread('file-transfer')
def _process_received_chunk(self, data):
if data is not None:
try:
self._file_selector.fd.write(data)
except EnvironmentError, e:
self._error = True
self._reason = str(e)
call_in_gui_thread(self.end)
else:
self._local_hash.update(data)
else:
if not self._finished and not self._error:
self._error = True
self._reason = 'Cancelled'
self._terminate()
@run_in_thread('file-hash')
def _calculate_hash(self):
hash = hashlib.sha1()
pos = 0
progress = 0
size = self._file_selector.size
if size == 0:
self._error = True
self._reason = 'Empty file'
self._terminate()
return
chunk_size = limit(size/100, min=65536, max=1048576)
self.state = 'connecting/hashing'
notification_center = NotificationCenter()
notification_center.post_notification('FileTransferHashProgress', sender=self, data=NotificationData(progress=0))
while not self._stop_event.is_set():
try:
content = self._file_selector.fd.read(chunk_size)
except EnvironmentError, e:
self._error = True
self._reason = str(e)
self._terminate()
return
if not content:
break
hash.update(content)
pos += len(content)
progress = int(pos * 100 / size)
notification_center.post_notification('FileTransferHashProgress', sender=self, data=NotificationData(progress=progress))
else:
self._terminate()
return
self._file_selector.fd.seek(0)
self._file_selector.hash = hash
self._start_outgoing_session()
@run_in_gui_thread
def _start_outgoing_session(self):
if self._stop_event.is_set():
self._terminate()
return
settings = SIPSimpleSettings()
if isinstance(self.account, Account):
if self.account.sip.outbound_proxy is not None:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
elif self.account.sip.always_use_my_proxy:
uri = SIPURI(host=self.account.id.domain)
else:
uri = self._uri
else:
uri = self._uri
lookup = DNSLookup()
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=lookup)
lookup.lookup_sip_proxy(uri, settings.sip.transport_list)
self.state = 'connecting/dns_lookup'
def _normalize_uri(self, uri):
if '@' not in uri:
uri += '@' + self.account.id.domain
......@@ -3796,43 +3713,18 @@ class FileTransfer(object):
uri = 'sip:' + uri
return SIPURI.parse(str(uri).translate(None, ' \t'))
@run_in_gui_thread
def _terminate(self):
if self.state != 'ending':
self.state = 'ending'
reason, error = self._reason, self._error
if self._file_selector is not None and self._file_selector.fd is not None:
self._file_selector.fd.close()
self._file_selector.fd = None
if self.direction == 'incoming':
filename = self.filename+self.tmp_file_suffix
if error:
unlink(filename)
else:
local_hash = 'sha1:' + ':'.join(re.findall(r'..', self._local_hash.hexdigest()))
remote_hash = self._file_selector.hash.lower()
if local_hash == remote_hash:
tmp_name = filename
os.rename(tmp_name, self.filename)
reason = 'Completed (%s)' % FileSizeFormatter.format(self._file_selector.size)
else:
error = True
reason = 'File hash mismatch'
unlink(filename)
self.state = 'ending' # if the state is not ending already, simulate it
self.sip_session = None
self.stream = None
state = SessionState('ended')
state.reason = reason
state.error = error
state.reason = self._reason
state.error = self._error
self.state = state
notification_center = NotificationCenter()
notification_center.post_notification('FileTransferDidEnd', sender=self, data=NotificationData(reason=reason, error=error))
notification_center.post_notification('FileTransferDidEnd', sender=self, data=NotificationData(reason=self._reason, error=self._error))
@run_in_gui_thread
def handle_notification(self, notification):
......@@ -3841,17 +3733,14 @@ class FileTransfer(object):
def _NH_DNSLookupDidSucceed(self, notification):
notification.center.remove_observer(self, sender=notification.sender)
if self._stop_event.is_set():
self._terminate()
if self.state in ('ending', 'ended'):
return
routes = notification.data.result
if not routes:
self._error = True
self._reason = 'Destination not found'
self._terminate()
return
self.sip_session = Session(self.account)
registry = MediaStreamRegistry()
cls = registry.get('file-transfer')
......@@ -3860,8 +3749,7 @@ class FileTransfer(object):
def _NH_DNSLookupDidFail(self, notification):
notification.center.remove_observer(self, sender=notification.sender)
if self._stop_event.is_set():
self._terminate()
if self.state in ('ending', 'ended'):
return
self._error = True
self._reason = 'DNS Lookup failed'
......@@ -3893,44 +3781,46 @@ class FileTransfer(object):
self._reason = reason
self._terminate()
def _NH_MediaStreamDidFail(self, notification):
# In principle the Session will end itself because this is the only stream,
# but lets be explicit about it -Saul
def _NH_MediaStreamDidInitialize(self, notification):
if self.direction == 'incoming':
self.filename = os.path.splitext(notification.sender.file_selector.name)[0]
def _NH_MediaStreamDidNotInitialize(self, notification):
self._error = True
self._reason = notification.data.reason
self.end()
def _NH_MediaStreamDidEnd(self, notification):
if self.direction == 'incoming':
# Mark end of write operations
self._process_received_chunk(None)
elif self.state != 'ended':
# In case of SIPSessionDidFail, _terminate() was already called -Saul
if self._finished:
self._error = False
self._reason = 'Completed (%s)' % FileSizeFormatter.format(self._file_selector.size)
else:
self._stream_ended = True
if not self._finished:
if self.direction == 'outgoing':
self._error = True
self._reason = 'Cancelled'
self._reason = 'Interrupted'
self._terminate()
else:
self._terminate()
def _NH_FileTransferStreamGotChunk(self, notification):
if not self._error:
self._file_selector.size = notification.data.file_size
self._process_received_chunk(notification.data.content)
notification.center.post_notification('FileTransferProgress', sender=self, data=NotificationData(bytes=notification.data.transferred_bytes,
total_bytes=notification.data.file_size))
def _NH_FileTransferStreamHashProgress(self, notification):
progress = int(notification.data.processed * 100 / notification.data.total)
notification.center.post_notification('FileTransferHashProgress', sender=self, data=NotificationData(progress=progress))
def _NH_FileTransferStreamDidDeliverChunk(self, notification):
def _NH_FileTransferStreamProgress(self, notification):
notification.center.post_notification('FileTransferProgress', sender=self, data=NotificationData(bytes=notification.data.transferred_bytes,
total_bytes=notification.data.file_size))
def _NH_FileTransferStreamDidNotDeliverChunk(self, notification):
if notification.data.chunk.size > 0:
self._error = True
self._reason = notification.data.reason
self.end()
total_bytes=notification.data.total_bytes))
def _NH_FileTransferStreamDidFinish(self, notification):
def _NH_FileTransferStreamDidFinishTransfer(self, notification):
if self.direction == 'incoming':
# filename could have changed
self.filename = os.path.splitext(notification.sender.file_selector.name)[0]
self._finished = True
self._error = notification.data.error
if not self._error:
self._reason = 'Completed (%s)' % FileSizeFormatter.format(self._file_selector.size)
else:
self._reason = notification.data.reason
if self._stream_ended:
self._terminate()
else:
if self.direction == 'incoming':
call_later(3, self.end)
else:
......@@ -4123,13 +4013,13 @@ class FileTransferItemWidget(base_class, ui_class):
def update_content(self, item, initial=False):
if initial:
self.filename_label.setText(os.path.basename(item.filename))
if item.direction == 'outgoing':
self.name_label.setText(u'To: ' + item.name)
self.icon_label.setPixmap(self.pixmaps.outgoing_transfer)
else:
self.name_label.setText(u'From: ' + item.name)
self.icon_label.setPixmap(self.pixmaps.incoming_transfer)
self.filename_label.setText(os.path.basename(item.filename))
self.status_label.setText(item.status)
if item.ended:
self.state_indicator.display_mode = self.state_indicator.InactiveDisplayMode
......@@ -4213,10 +4103,7 @@ class FileTransferItem(object):
state = notification.data.new_state
if state == 'connecting/dns_lookup':
self.status = u'Looking up destination...'
elif state == 'connecting/hashing':
self.status = u'Computing hash...'
elif state == 'connecting':
self.progress = None
self.status = u'Connecting...'
elif state == 'connecting/ringing':
self.status = u'Ringing...'
......@@ -4228,6 +4115,7 @@ class FileTransferItem(object):
self.status = u'Ending...'
else:
self.status = None
self.progress = None
self.widget.update_content(self)
notification.center.post_notification('FileTransferItemDidChange', sender=self)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment