adds all initial communication functionality
- adds rest server wraper - adds secret generation - adds secret visualization
This commit is contained in:
commit
424812432c
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
__pycache__/*
|
||||
.bump*
|
22
README.md
Normal file
22
README.md
Normal file
@ -0,0 +1,22 @@
|
||||
## Bump Python Client
|
||||
|
||||
This Bump client enables easy access to the bump service through a python api.
|
||||
Bump any other client anywhere through an easy to use API.
|
||||
|
||||
## working functions
|
||||
|
||||
- peek
|
||||
- pop
|
||||
- push
|
||||
- list
|
||||
- clear
|
||||
- generate new cryptographicaly safe secret
|
||||
- visualize with rudimentary qr code to share with other clients
|
||||
|
||||
## missing
|
||||
|
||||
- add an existing secret
|
||||
- scan existing qr code
|
||||
- sync 'read' to other clients
|
||||
- alert function that shows any new notifications
|
||||
|
144
main.py
Normal file
144
main.py
Normal file
@ -0,0 +1,144 @@
|
||||
import random
|
||||
import qrcode
|
||||
import requests
|
||||
import base64
|
||||
import re
|
||||
|
||||
SENDER_LENGTH = 4
|
||||
PASSWORD_LENGTH = 8
|
||||
|
||||
from random_word import RandomWords
|
||||
|
||||
import secrets
|
||||
from base64 import urlsafe_b64encode as b64e, urlsafe_b64decode as b64d
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
|
||||
r = RandomWords()
|
||||
|
||||
backend = default_backend()
|
||||
iterations = 100_000
|
||||
|
||||
def _derive_key(password: bytes, salt: bytes, iterations: int = iterations) -> bytes:
|
||||
"""Derive a secret key from a given password and salt"""
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(), length=32, salt=salt,
|
||||
iterations=iterations, backend=backend)
|
||||
return b64e(kdf.derive(password))
|
||||
|
||||
def password_encrypt(message: bytes, password: str, iterations: int = iterations) -> bytes:
|
||||
salt = secrets.token_bytes(16)
|
||||
key = _derive_key(password.encode(), salt, iterations)
|
||||
return b64e(
|
||||
b'%b%b%b' % (
|
||||
salt,
|
||||
iterations.to_bytes(4, 'big'),
|
||||
b64d(Fernet(key).encrypt(message)),
|
||||
)
|
||||
)
|
||||
|
||||
def password_decrypt(token: bytes, password: str) -> bytes:
|
||||
decoded = b64d(token)
|
||||
salt, iter, token = decoded[:16], decoded[16:20], b64e(decoded[20:])
|
||||
iterations = int.from_bytes(iter, 'big')
|
||||
key = _derive_key(password.encode(), salt, iterations)
|
||||
return Fernet(key).decrypt(token)
|
||||
|
||||
class Bump:
|
||||
def __init__(self, secret=None, secret_file='.bump_secrets'):
|
||||
self.secrets = self.load_secrets(secret, secret_file)
|
||||
if self.secrets == []:
|
||||
self.generate_secret(secret_file)
|
||||
self.show_secrets()
|
||||
|
||||
self.URL = "http://0.0.0.0:4000/api/"
|
||||
|
||||
|
||||
|
||||
def load_secrets(self, secret, secrets_file='.bump_secrets'):
|
||||
try:
|
||||
with open(secrets_file, 'r') as f:
|
||||
secrets = f.read().splitlines()
|
||||
except FileNotFoundError:
|
||||
secrets = []
|
||||
if secret is not None and secret not in self.secrets:
|
||||
secrets.append(secret)
|
||||
|
||||
return secrets
|
||||
|
||||
def generate_secret(self, secret_file):
|
||||
WORDS = r.get_random_words()
|
||||
secret = ""
|
||||
for _ in range(SENDER_LENGTH + PASSWORD_LENGTH):
|
||||
secret += random.choice(WORDS) + "-"
|
||||
while len(secret[SENDER_LENGTH:]) < 32:
|
||||
secret += random.choice(WORDS) + "-"
|
||||
secret = secret[:-1]
|
||||
|
||||
self.secrets.append(secret)
|
||||
|
||||
with open(secret_file, 'a+') as f:
|
||||
f.write(secret + '\n')
|
||||
|
||||
|
||||
def show_secrets(self):
|
||||
for secret in self.secrets:
|
||||
qrcode.make(secret).show()
|
||||
print(secret)
|
||||
|
||||
def get_password(self, index=0):
|
||||
secret = self.secrets[index].split('-')
|
||||
return "-".join(secret[SENDER_LENGTH:])
|
||||
|
||||
|
||||
def get_sender(self, index=0):
|
||||
words = self.secrets[index].split('-')
|
||||
return "-".join(words[0:SENDER_LENGTH])
|
||||
|
||||
def encrypt(self, data):
|
||||
password = self.get_password()
|
||||
return password_encrypt(data.encode(), password)
|
||||
|
||||
def decrypt(self, data):
|
||||
password = self.get_password()
|
||||
return password_decrypt(data, password).decode()
|
||||
|
||||
def push(self, data):
|
||||
url = self.URL + "push"
|
||||
params = {
|
||||
'sender': self.get_sender(),
|
||||
'data': self.encrypt(data)
|
||||
}
|
||||
|
||||
print(url)
|
||||
r = requests.post(url, params)
|
||||
print(r.json())
|
||||
|
||||
|
||||
def peak(self):
|
||||
return self.get_post("peak")
|
||||
|
||||
def pop(self):
|
||||
return self.get_post("pop")
|
||||
|
||||
def list(self):
|
||||
return self.get_post("list")
|
||||
|
||||
def clear(self):
|
||||
return self.get_post("clear")
|
||||
|
||||
def get_post(self, mechanism):
|
||||
url = self.URL + mechanism
|
||||
params = {
|
||||
'sender': self.get_sender()
|
||||
}
|
||||
r = requests.post(url, params)
|
||||
if r.json() == {}:
|
||||
return {}
|
||||
return self.decrypt(r.json().get('data'))
|
||||
|
||||
if __name__ == '__main__':
|
||||
b = Bump()
|
Loading…
x
Reference in New Issue
Block a user