Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
V
vmj-qt
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Kulya
vmj-qt
Commits
bd962709
Commit
bd962709
authored
Jun 26, 2023
by
Tijmen de Mes
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Added support for file encryption/decryption
parent
58d5bb4c
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
69 additions
and
2 deletions
+69
-2
message.py
blink/streams/message.py
+69
-2
No files found.
blink/streams/message.py
View file @
bd962709
...
...
@@ -2,7 +2,7 @@ import os
from
application.notification
import
IObserver
,
NotificationCenter
,
NotificationData
from
application.python
import
Null
from
application.system
import
makedirs
from
application.system
import
makedirs
,
unlink
,
openfile
,
FileExistsError
from
otr
import
OTRTransport
from
otr.exceptions
import
IgnoreMessage
,
UnencryptedMessage
,
EncryptedMessageError
,
OTRError
,
OTRFinishedError
...
...
@@ -20,7 +20,7 @@ from pgpy.errors import PGPError, PGPDecryptionError
from
pgpy.constants
import
PubKeyAlgorithm
,
KeyFlags
,
HashAlgorithm
,
SymmetricKeyAlgorithm
,
CompressionAlgorithm
from
blink.logging
import
MessagingTrace
as
log
from
blink.util
import
run_in_gui_thread
from
blink.util
import
run_in_gui_thread
,
UniqueFilenameGenerator
__all__
=
[
'MessageStream'
]
...
...
@@ -274,6 +274,73 @@ class MessageStream(object, metaclass=MediaStreamType):
log
.
warning
(
f
'-- Decryption failed for {msg_id}, error: {error}'
)
notification_center
.
post_notification
(
'PGPMessageDidNotDecrypt'
,
sender
=
session
,
data
=
NotificationData
(
message
=
message
,
error
=
error
))
@
run_in_thread
(
'pgp'
)
def
encrypt_file
(
self
,
filename
,
transfer_session
):
session
=
self
.
blink_session
notification_center
=
NotificationCenter
()
pgp_message
=
PGPMessage
.
new
(
filename
,
file
=
True
,
compression
=
CompressionAlgorithm
.
Uncompressed
)
cipher
=
SymmetricKeyAlgorithm
.
AES256
sessionkey
=
cipher
.
gen_key
()
encrypted_content
=
self
.
public_key
.
encrypt
(
pgp_message
,
cipher
=
cipher
,
sessionkey
=
sessionkey
)
encrypted_content
=
self
.
remote_public_key
.
encrypt
(
encrypted_content
,
cipher
=
cipher
,
sessionkey
=
sessionkey
)
del
sessionkey
notification_center
.
post_notification
(
'PGPFileDidEncrypt'
,
sender
=
session
,
data
=
NotificationData
(
filename
=
f
'{filename}.asc'
,
contents
=
encrypted_content
))
@
run_in_thread
(
'pgp'
)
def
decrypt_file
(
self
,
filename
,
transfer_session
):
session
=
self
.
blink_session
notification_center
=
NotificationCenter
()
if
self
.
private_key
is
None
and
len
(
self
.
other_private_keys
)
==
0
:
notification_center
.
post_notification
(
'PGPFileDidNotDecrypt'
,
sender
=
session
,
data
=
NotificationData
(
filename
=
filename
,
error
=
"No private keys found"
))
return
log
.
info
(
f
'Trying to decrypt file {filename}'
)
try
:
pgpMessage
=
PGPMessage
.
from_file
(
filename
)
except
(
ValueError
)
as
e
:
log
.
warning
(
f
'Decryption failed for {filename}, this is not a PGP File, error: {e}'
)
return
key_list
=
[(
session
.
account
,
self
.
private_key
)]
if
self
.
private_key
is
not
None
else
[]
key_list
.
extend
(
self
.
other_private_keys
)
error
=
None
for
(
account
,
key
)
in
key_list
:
try
:
decrypted_message
=
key
.
decrypt
(
pgpMessage
)
except
(
PGPDecryptionError
,
PGPError
)
as
e
:
error
=
e
log
.
debug
(
f
'-- Decryption failed for {filename} with account key {account.id}, error: {error}'
)
continue
else
:
log
.
info
(
f
'File decrypted: {decrypted_message.filename}'
)
dir
=
os
.
path
.
dirname
(
filename
)
full_decrypted_filepath
=
os
.
path
.
join
(
dir
,
decrypted_message
.
filename
)
file_contents
=
decrypted_message
.
message
if
isinstance
(
decrypted_message
.
message
,
bytearray
)
else
decrypted_message
.
message
.
encode
()
for
name
in
UniqueFilenameGenerator
.
generate
(
full_decrypted_filepath
):
try
:
openfile
(
name
,
'xb'
)
except
FileExistsError
:
continue
else
:
full_decrypted_filepath
=
name
break
with
open
(
full_decrypted_filepath
,
'wb+'
)
as
output_file
:
output_file
.
write
(
file_contents
)
log
.
info
(
f
'File saved: {full_decrypted_filepath}'
)
unlink
(
filename
)
notification_center
.
post_notification
(
'PGPFileDidDecrypt'
,
sender
=
session
,
data
=
NotificationData
(
filename
=
full_decrypted_filepath
,
account
=
account
,
id
=
transfer_session
.
id
))
return
log
.
warning
(
f
'-- Decryption failed for {filename}, error: {error}'
)
notification_center
.
post_notification
(
'PGPFileDidNotDecrypt'
,
sender
=
transfer_session
,
data
=
NotificationData
(
filename
=
filename
,
error
=
error
))
@
run_in_gui_thread
def
handle_notification
(
self
,
notification
):
handler
=
getattr
(
self
,
'_NH_
%
s'
%
notification
.
name
,
Null
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment