Commit ce2a7839 authored by Tijmen de Mes's avatar Tijmen de Mes

Extend history manager to support storage of chat/SIP messages

parent 10e85b6e
......@@ -5,10 +5,11 @@ import re
from PyQt5.QtGui import QIcon
from application.notification import IObserver, NotificationCenter
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from datetime import date
from datetime import date, datetime, timezone
from dateutil.tz import tzlocal
from zope.interface import implementer
......@@ -18,9 +19,13 @@ from sipsimple.threading import run_in_thread
from sipsimple.util import ISOTimestamp
from blink.resources import ApplicationData, Resources
from blink.messages import BlinkMessage
from blink.util import run_in_gui_thread
import traceback
from sqlobject import SQLObject, StringCol, DateTimeCol, DateCol, IntCol, UnicodeCol, DatabaseIndex
from sqlobject import connectionForURI
from sqlobject import dberrors
__all__ = ['HistoryManager']
......@@ -29,6 +34,7 @@ __all__ = ['HistoryManager']
class HistoryManager(object, metaclass=Singleton):
history_size = 20
sip_prefix_re = re.compile('^sips?:')
def __init__(self):
try:
......@@ -40,15 +46,32 @@ class HistoryManager(object, metaclass=Singleton):
self.calls = []
else:
self.calls = data[-self.history_size:]
self.message_history = MessageHistory()
notification_center = NotificationCenter()
notification_center.add_observer(self, name='SIPSessionDidEnd')
notification_center.add_observer(self, name='SIPSessionDidFail')
notification_center.add_observer(self, name='ChatStreamGotMessage')
notification_center.add_observer(self, name='ChatStreamWillSendMessage')
notification_center.add_observer(self, name='ChatStreamDidSendMessage')
notification_center.add_observer(self, name='ChatStreamDidDeliverMessage')
notification_center.add_observer(self, name='ChatStreamDidNotDeliverMessage')
notification_center.add_observer(self, name='BlinkMessageIsParsed')
notification_center.add_observer(self, name='BlinkMessageIsPending')
notification_center.add_observer(self, name='BlinkMessageDidSucceed')
notification_center.add_observer(self, name='BlinkMessageDidFail')
notification_center.add_observer(self, name='BlinkGotDispositionNotification')
notification_center.add_observer(self, name='BlinkDidSendDispositionNotification')
@run_in_thread('file-io')
def save(self):
with open(ApplicationData.get('calls_history'), 'wb+') as history_file:
pickle.dump(self.calls, history_file)
def load(self, uri, session):
return self.message_history.load(uri, session)
@run_in_gui_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
......@@ -84,6 +107,258 @@ class HistoryManager(object, metaclass=Singleton):
self.calls = self.calls[-self.history_size:]
self.save()
def _NH_ChatStreamGotMessage(self, notification):
message = notification.data.message
if notification.sender.blink_session.remote_focus and self.sip_prefix_re.sub('',str(message.sender.uri)) not in notification.sender.blink_session.server_conference.participants:
return
is_status_message = any(h.name == 'Message-Type' and h.value == 'status' and h.namespace == 'urn:ag-projects:xml:ns:cpim' for h in message.additional_headers)
if not is_status_message:
blink_message = BlinkMessage(**{slot: getattr(message, slot) for slot in message.__slots__})
self.message_history.add_with_session(notification.sender.blink_session, blink_message, 'incoming', 'delivered')
def _NH_ChatStreamWillSendMessage(self, notification):
self.message_history.add_with_session(notification.sender, notification.data, 'outgoing')
def _NH_ChatStreamDidSendMessage(self, notification):
self.message_history.update(notification.data.message.message_id, 'accepted')
def _NH_ChatStreamDidDeliverMessage(self, notification):
self.message_history.update(notification.data.message.message_id, 'delivered')
def _NH_ChatStreamDidNotDeliverMessage(self, notification):
self.message_history.update(notification.data.message.message_id, 'failed')
def _NH_BlinkMessageIsParsed(self, notification):
session = notification.sender
message = notification.data
self.message_history.add_with_session(session, message, 'incoming')
def _NH_BlinkMessageIsPending(self, notification):
session = notification.sender
data = notification.data
self.message_history.add_with_session(session, data.message, 'outgoing')
def _NH_BlinkMessageDidSucceed(self, notification):
data = notification.data
self.message_history.update(data.id, 'accepted')
def _NH_BlinkMessageDidFail(self, notification):
data = notification.data
self.message_history.update(data.id, 'failed')
def _NH_BlinkGotDispositionNotification(self, notification):
data = notification.data
self.message_history.update(data.id, data.status)
def _NH_BlinkDidSendDispositionNotification(self, notification):
data = notification.data
self.message_history.update(data.id, data.status)
class TableVersion(SQLObject):
class sqlmeta:
table = 'table_versions'
table_name = StringCol(alternateID=True)
version = IntCol()
class Message(SQLObject):
class sqlmeta:
table = 'messages'
message_id = StringCol()
account_id = UnicodeCol(length=128)
remote_uri = UnicodeCol(length=128)
display_name = UnicodeCol(length=128)
uri = UnicodeCol(length=128, default='')
timestamp = DateTimeCol()
direction = StringCol()
content = UnicodeCol(sqlType='LONGTEXT')
content_type = StringCol(default='text')
state = StringCol(default='pending')
encryption_type = StringCol(default='')
disposition = StringCol(default='')
remote_idx = DatabaseIndex('remote_uri')
id_idx = DatabaseIndex('message_id')
class TableVersions(object, metaclass=Singleton):
__version__ = 1
__versions__ = {}
def __init__(self):
db_file = ApplicationData.get('message_history.db')
db_uri = f'sqlite://{db_file}'
self._initialize(db_uri)
@run_in_thread('db')
def _initialize(self, db_uri):
self.db = connectionForURI(db_uri)
TableVersion._connection = self.db
if not TableVersion.tableExists():
try:
TableVersion.createTable()
except Exception as e:
pass
else:
self.set_version(TableVersion.sqlmeta.table, self.__version__)
else:
self._load_versions()
@run_in_thread('db')
def _load_versions(self):
contents = TableVersion.select()
for table_version in list(contents):
self.__versions__[table_version.table_name] = table_version.version
def version(self, table):
try:
return self.__versions__[table]
except KeyError:
return None
@run_in_thread('db')
def set_version(self, table, version):
try:
TableVersion(table_name=table, version=version)
except (dberrors.DuplicateEntryError, dberrors.IntegrityError):
try:
record = TableVersion.selectBy(table_name=table).getOne()
record.version = version
except Exception as e:
pass
except Exception as e:
pass
self.__versions__[table] = version
class MessageHistory(object, metaclass=Singleton):
__version__ = 1
phone_number_re = re.compile(r'^(?P<number>(0|00|\+)[1-9]\d{7,14})@')
def __init__(self):
db_file = ApplicationData.get('message_history.db')
db_uri = f'sqlite://{db_file}'
self._initialize(db_uri)
@run_in_thread('db')
def _initialize(self, db_uri):
self.db = connectionForURI(db_uri)
Message._connection = self.db
self.table_versions = TableVersions()
if not Message.tableExists():
try:
Message.createTable()
except Exception as e:
pass
else:
self.table_versions.set_version(Message.sqlmeta.table, self.__version__)
else:
self._check_table_version()
def _check_table_version(self):
db_table_version = self.table_versions.version(Message.sqlmeta.table)
if self.__version__ != db_table_version:
pass
@classmethod
@run_in_thread('db')
def add_with_session(cls, session, message, direction, state=None):
if message.content.startswith('?OTRv'):
return
# print(f"-- Adding message to storage: {message.id}")
user = session.uri.user
domain = session.uri.host
user = user.decode() if isinstance(user, bytes) else user
domain = domain.decode() if isinstance(domain, bytes) else domain
remote_uri = '%s@%s' % (user, domain)
match = cls.phone_number_re.match(remote_uri)
if match:
remote_uri = match.group('number')
try:
contact = next(contact for contact in AddressbookManager().get_contacts() if remote_uri in (addr.uri for addr in contact.uris))
except StopIteration:
display_name = message.sender.display_name
else:
display_name = contact.name
timestamp_native = message.timestamp
timestamp_utc = timestamp_native.replace(tzinfo=timezone.utc)
message.timestamp = timestamp_utc - message.timestamp.utcoffset()
timestamp = datetime.fromisoformat(str(message.timestamp))
optional_fields = {}
if state is not None:
optional_fields['state'] = state
if session.chat_type is not None:
chat_info = session.info.streams.chat
if chat_info.encryption is not None and chat_info.transport == 'tls':
optional_fields['encryption_type'] = str(['TLS', '{0.encryption} ({0.encryption_cipher}'.format(chat_info)])
elif chat_info.encryption is not None:
optional_fields['encryption_type'] = str(['{0.encryption} ({0.encryption_cipher}'.format(chat_info)])
elif chat_info.transport == 'tls':
optional_fields['encryption_type'] = str(['TLS'])
db_message = Message(remote_uri=remote_uri,
display_name=display_name,
uri=str(message.sender.uri),
content=message.content,
content_type=message.content_type,
message_id=message.id,
account_id=str(session.account.id),
direction=direction,
timestamp=timestamp,
disposition=str(message.disposition),
**optional_fields)
@run_in_thread('db')
def update(self, id, state):
try:
message = Message.selectBy(message_id=id)[0]
except IndexError:
pass
else:
# print(f'-- Updating {id} {message.state} -> {state}')
message.state = state
@run_in_thread('db')
def load(self, uri, session):
# print('-- Loading messages')
notification_center = NotificationCenter()
try:
result = Message.selectBy(remote_uri=uri)[:100]
except Exception as e:
notification_center.post_notification('BlinkMessageHistoryLoadDidFail', sender=session, data=NotificationData(uri=uri))
return
# print(f"-- Messages loaded: {len(list(result))}")
notification_center.post_notification('BlinkMessageHistoryLoadDidSucceed', sender=session, data=NotificationData(messages=list(result),uri=uri))
@run_in_thread('db')
def remove(self, account):
Message.deleteBy(account=account)
@run_in_thread('db')
def remove_contact_messages(self, contact):
Message.deleteBy(remote_uri=contact)
@run_in_thread('db')
def remove_message(self, id):
try:
result = Message.selectBy(message_id=id)[0]
except IndexError:
return
result.destroySelf()
class IconDescriptor(object):
def __init__(self, filename):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment