Commit dfd60bb2 authored by Saul Ibarra's avatar Saul Ibarra

Refactor file transfers for resume capability

parent 33fa0018
...@@ -5,6 +5,7 @@ __all__ = ['ClientConference', 'ConferenceDialog', 'AudioSessionModel', 'AudioSe ...@@ -5,6 +5,7 @@ __all__ = ['ClientConference', 'ConferenceDialog', 'AudioSessionModel', 'AudioSe
import bisect import bisect
import cPickle as pickle import cPickle as pickle
import contextlib
import os import os
import re import re
import string import string
...@@ -3546,17 +3547,17 @@ class BlinkFileTransfer(object): ...@@ -3546,17 +3547,17 @@ class BlinkFileTransfer(object):
self.sip_session = None self.sip_session = None
self.stream = None self.stream = None
self.filename = None self.file_selector = None
self.handler = None
self._file_selector = None # used for outgoing transfers
self._uri = None
self._error = False self._stat = None
self._reason = None
self._finished = False
self._stream_ended = False
def __getstate__(self): def __getstate__(self):
state = dict(id=self.id, direction=self.direction, state=self.state, filename=self.filename, _error=self._error, _reason=self._reason) # duplicate the selector, we cannot serialize the fd
file_selector = FileSelector(name=self.file_selector.name, type=self.file_selector.type, size=self.file_selector.size, hash=self.file_selector.hash)
state = dict(id=self.id, direction=self.direction, state=self.state, file_selector=file_selector)
return self.account.id, self.contact.name, self.contact_uri.uri, state return self.account.id, self.contact.name, self.contact_uri.uri, state
def __setstate__(self, state): def __setstate__(self, state):
...@@ -3573,6 +3574,14 @@ class BlinkFileTransfer(object): ...@@ -3573,6 +3574,14 @@ class BlinkFileTransfer(object):
if self.direction == 'outgoing': if self.direction == 'outgoing':
self._uri = self._normalize_uri(contact_uri) self._uri = self._normalize_uri(contact_uri)
@property
def filename(self):
return self.file_selector.name if self.file_selector is not None else None
@property
def hash(self):
return self.file_selector.hash if self.file_selector is not None else None
def init_incoming(self, contact, contact_uri, session, stream): def init_incoming(self, contact, contact_uri, session, stream):
assert self.state is None assert self.state is None
self.direction = 'incoming' self.direction = 'incoming'
...@@ -3585,8 +3594,8 @@ class BlinkFileTransfer(object): ...@@ -3585,8 +3594,8 @@ class BlinkFileTransfer(object):
self.sip_session = session self.sip_session = session
self.stream = stream self.stream = stream
self._file_selector = stream.file_selector self.file_selector = stream.file_selector
self.filename = self._file_selector.name self.handler = stream.handler
self.state = 'connecting' self.state = 'connecting'
notification_center = NotificationCenter() notification_center = NotificationCenter()
...@@ -3605,8 +3614,8 @@ class BlinkFileTransfer(object): ...@@ -3605,8 +3614,8 @@ class BlinkFileTransfer(object):
self._uri = self._normalize_uri(contact_uri.uri) self._uri = self._normalize_uri(contact_uri.uri)
self._file_selector = FileSelector.for_file(filename) self.file_selector = FileSelector.for_file(filename)
self.filename = filename self._stat = os.fstat(self.file_selector.fd.fileno())
self.state = 'initialized' self.state = 'initialized'
notification_center = NotificationCenter() notification_center = NotificationCenter()
...@@ -3619,12 +3628,12 @@ class BlinkFileTransfer(object): ...@@ -3619,12 +3628,12 @@ class BlinkFileTransfer(object):
if self.state == 'ended': if self.state == 'ended':
# Reinitialize to retry # Reinitialize to retry
self._error = False file_selector = FileSelector.for_file(self.filename)
self._reason = None stat = os.fstat(file_selector.fd.fileno())
self._finished = False if stat.st_mtime == self._stat.st_mtime:
self._stream_ended = False file_selector.hash = self.file_selector.hash
# TODO: remember hash and mtime to avoid re-computing hash -Saul self.file_selector = file_selector
self._file_selector = FileSelector.for_file(self.filename) self._stat = stat
self.state = 'initialized' self.state = 'initialized'
notification_center.post_notification('BlinkFileTransferWillRetry', self) notification_center.post_notification('BlinkFileTransferWillRetry', self)
...@@ -3654,9 +3663,7 @@ class BlinkFileTransfer(object): ...@@ -3654,9 +3663,7 @@ class BlinkFileTransfer(object):
self.sip_session.end() self.sip_session.end()
else: else:
assert self.direction == 'outgoing' assert self.direction == 'outgoing'
self._error = True self._terminate(failure_reason='Cancelled')
self._reason = 'Cancelled'
self._terminate()
def _get_state(self): def _get_state(self):
return self.__dict__['state'] return self.__dict__['state']
...@@ -3704,6 +3711,22 @@ class BlinkFileTransfer(object): ...@@ -3704,6 +3711,22 @@ class BlinkFileTransfer(object):
stream = property(_get_stream, _set_stream) stream = property(_get_stream, _set_stream)
del _get_stream, _set_stream del _get_stream, _set_stream
def _get_handler(self):
return self.__dict__['handler']
def _set_handler(self, value):
old_handler = self.__dict__.get('handler', None)
new_handler = self.__dict__['handler'] = value
if new_handler != old_handler:
notification_center = NotificationCenter()
if old_handler is not None:
notification_center.remove_observer(self, sender=old_handler)
if new_handler is not None:
notification_center.add_observer(self, sender=new_handler)
handler = property(_get_handler, _set_handler)
del _get_handler, _set_handler
def _normalize_uri(self, uri): def _normalize_uri(self, uri):
if '@' not in uri: if '@' not in uri:
uri += '@' + self.account.id.domain uri += '@' + self.account.id.domain
...@@ -3711,18 +3734,23 @@ class BlinkFileTransfer(object): ...@@ -3711,18 +3734,23 @@ class BlinkFileTransfer(object):
uri = 'sip:' + uri uri = 'sip:' + uri
return SIPURI.parse(str(uri).translate(None, ' \t')) return SIPURI.parse(str(uri).translate(None, ' \t'))
def _terminate(self): def _terminate(self, failure_reason=None):
self.state = 'ending' # if the state is not ending already, simulate it self.state = 'ending' # if the state is not ending already, simulate it
self.sip_session = None self.sip_session = None
self.stream = None self.stream = None
self.handler = None
if failure_reason is not None:
end_reason = failure_reason
else:
end_reason = 'Completed (%s)' % FileSizeFormatter.format(self.file_selector.size)
state = SessionState('ended') state = SessionState('ended')
state.reason = self._reason state.reason = end_reason
state.error = self._error state.error = failure_reason is not None
self.state = state self.state = state
notification_center = NotificationCenter() notification_center = NotificationCenter()
notification_center.post_notification('BlinkFileTransferDidEnd', sender=self, data=NotificationData(reason=self._reason, error=self._error)) notification_center.post_notification('BlinkFileTransferDidEnd', sender=self, data=NotificationData(reason=state.reason, error=state.error))
@run_in_gui_thread @run_in_gui_thread
def handle_notification(self, notification): def handle_notification(self, notification):
...@@ -3735,23 +3763,20 @@ class BlinkFileTransfer(object): ...@@ -3735,23 +3763,20 @@ class BlinkFileTransfer(object):
return return
routes = notification.data.result routes = notification.data.result
if not routes: if not routes:
self._error = True self._terminate(failure_reason='Destination not found')
self._reason = 'Destination not found'
self._terminate()
return return
self.sip_session = Session(self.account) self.sip_session = Session(self.account)
registry = MediaStreamRegistry() registry = MediaStreamRegistry()
cls = registry.get('file-transfer') cls = registry.get('file-transfer')
self.stream = cls(self._file_selector, 'sendonly', transfer_id=self.id) self.stream = cls(self.file_selector, 'sendonly', transfer_id=self.id)
self.handler = self.stream.handler
self.sip_session.connect(ToHeader(self._uri), routes, [self.stream]) self.sip_session.connect(ToHeader(self._uri), routes, [self.stream])
def _NH_DNSLookupDidFail(self, notification): def _NH_DNSLookupDidFail(self, notification):
notification.center.remove_observer(self, sender=notification.sender) notification.center.remove_observer(self, sender=notification.sender)
if self.state in ('ending', 'ended'): if self.state in ('ending', 'ended'):
return return
self._error = True self._terminate(failure_reason='DNS Lookup failed')
self._reason = 'DNS Lookup failed'
self._terminate()
def _NH_SIPSessionNewOutgoing(self, notification): def _NH_SIPSessionNewOutgoing(self, notification):
self.state = 'connecting' self.state = 'connecting'
...@@ -3766,63 +3791,22 @@ class BlinkFileTransfer(object): ...@@ -3766,63 +3791,22 @@ class BlinkFileTransfer(object):
def _NH_SIPSessionDidStart(self, notification): def _NH_SIPSessionDidStart(self, notification):
self.state = 'connected' self.state = 'connected'
def _NH_SIPSessionDidFail(self, notification):
if notification.data.failure_reason == 'user request':
if notification.data.code == 487:
reason = 'Cancelled'
else:
reason = notification.data.reason
else:
reason = notification.data.failure_reason
if self.state != 'ended':
self._error = True
self._reason = reason
self._terminate()
def _NH_MediaStreamDidInitialize(self, notification):
if self.direction == 'incoming':
self.filename = os.path.splitext(notification.sender.file_selector.name)[0]
def _NH_MediaStreamDidNotInitialize(self, notification): def _NH_MediaStreamDidNotInitialize(self, notification):
self._error = True self._terminate(failure_reason=notification.data.reason)
self._reason = notification.data.reason
self.end()
def _NH_MediaStreamDidEnd(self, notification): def _NH_FileTransferHandlerHashProgress(self, notification):
self._stream_ended = True
if not self._finished:
if self.direction == 'outgoing':
self._error = True
self._reason = 'Interrupted'
self._terminate()
else:
self._terminate()
def _NH_FileTransferHashProgress(self, notification):
progress = int(notification.data.processed * 100 / notification.data.total) progress = int(notification.data.processed * 100 / notification.data.total)
notification.center.post_notification('BlinkFileTransferHashProgress', sender=self, data=NotificationData(progress=progress)) notification.center.post_notification('BlinkFileTransferHashProgress', sender=self, data=NotificationData(progress=progress))
def _NH_FileTransferProgress(self, notification): def _NH_FileTransferHandlerProgress(self, notification):
notification.center.post_notification('BlinkFileTransferProgress', sender=self, data=NotificationData(bytes=notification.data.transferred_bytes, notification.center.post_notification('BlinkFileTransferProgress', sender=self, data=NotificationData(bytes=notification.data.transferred_bytes,
total_bytes=notification.data.total_bytes)) total_bytes=notification.data.total_bytes))
def _NH_FileTransferHandlerDidEnd(self, notification):
def _NH_FileTransferDidEnd(self, notification):
if self.direction == 'incoming':
# filename could have changed
self.filename = notification.sender.file_selector.name
self._finished = True
self._error = notification.data.error
if not self._error:
self._reason = 'Completed (%s)' % FileSizeFormatter.format(self._file_selector.size)
else:
self._reason = notification.data.reason
if self._stream_ended:
self._terminate()
else:
if self.direction == 'incoming': if self.direction == 'incoming':
call_later(3, self.end) call_later(3, self.sip_session.end)
else: else:
self.end() self.sip_session.end()
self._terminate(failure_reason=notification.data.reason)
class TransferStateLabel(QLabel, ColorHelperMixin): class TransferStateLabel(QLabel, ColorHelperMixin):
...@@ -4218,6 +4202,14 @@ class TransferHistory(object): ...@@ -4218,6 +4202,14 @@ class TransferHistory(object):
self._transaction_level = 0 self._transaction_level = 0
self._pending_items = [] self._pending_items = []
@contextlib.contextmanager
def transaction(self):
self.begin_transaction()
try:
yield
finally:
self.commit_transaction()
def begin_transaction(self): def begin_transaction(self):
self._transaction_level += 1 self._transaction_level += 1
...@@ -4260,8 +4252,8 @@ class FileTransferModel(QAbstractListModel): ...@@ -4260,8 +4252,8 @@ class FileTransferModel(QAbstractListModel):
notification_center = NotificationCenter() notification_center = NotificationCenter()
notification_center.add_observer(self, name='BlinkFileTransferNewIncoming') notification_center.add_observer(self, name='BlinkFileTransferNewIncoming')
notification_center.add_observer(self, name='BlinkFileTransferNewOutgoing') notification_center.add_observer(self, name='BlinkFileTransferNewOutgoing')
notification_center.add_observer(self, name='BlinkFileTransferItemDidChange')
notification_center.add_observer(self, name='BlinkFileTransferDidEnd') notification_center.add_observer(self, name='BlinkFileTransferDidEnd')
notification_center.add_observer(self, name='FileTransferItemDidChange')
notification_center.add_observer(self, name='SIPApplicationDidStart') notification_center.add_observer(self, name='SIPApplicationDidStart')
@property @property
...@@ -4269,12 +4261,9 @@ class FileTransferModel(QAbstractListModel): ...@@ -4269,12 +4261,9 @@ class FileTransferModel(QAbstractListModel):
return [item for item in self.items if item.ended] return [item for item in self.items if item.ended]
def clear_ended(self): def clear_ended(self):
self.history.begin_transaction() with self.history.transaction():
try:
for item in self.ended_items: for item in self.ended_items:
self.removeItem(item) self.removeItem(item)
finally:
self.history.commit_transaction()
def rowCount(self, parent=QModelIndex()): def rowCount(self, parent=QModelIndex()):
return len(self.items) return len(self.items)
...@@ -4317,18 +4306,24 @@ class FileTransferModel(QAbstractListModel): ...@@ -4317,18 +4306,24 @@ class FileTransferModel(QAbstractListModel):
handler(notification) handler(notification)
def _NH_BlinkFileTransferNewIncoming(self, notification): def _NH_BlinkFileTransferNewIncoming(self, notification):
self.addItem(FileTransferItem(notification.sender)) transfer = notification.sender
with self.history.transaction():
for item in (item for item in self.items if item.failed and item.direction=='incoming'):
if item.transfer.contact == transfer.contact and item.transfer.hash == transfer.hash:
self.removeItem(item)
break
self.addItem(FileTransferItem(transfer))
def _NH_BlinkFileTransferNewOutgoing(self, notification): def _NH_BlinkFileTransferNewOutgoing(self, notification):
self.addItem(FileTransferItem(notification.sender)) self.addItem(FileTransferItem(notification.sender))
def _NH_BlinkFileTransferItemDidChange(self, notification):
index = self.index(self.items.index(notification.sender))
self.dataChanged.emit(index, index)
def _NH_BlinkFileTransferDidEnd(self, notification): def _NH_BlinkFileTransferDidEnd(self, notification):
self.history.save(self.ended_items) self.history.save(self.ended_items)
def _NH_FileTransferItemDidChange(self, notification):
index = self.index(self.items.index(notification.sender))
self.dataChanged.emit(index, index)
def _NH_SIPApplicationDidStart(self, notification): def _NH_SIPApplicationDidStart(self, notification):
self.beginResetModel() self.beginResetModel()
self.items = self.history.load() self.items = self.history.load()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment