Skip to content
Snippets Groups Projects
Commit d345c11a authored by Recolic's avatar Recolic :house_with_garden:
Browse files

rpi script

parent e30e58ce
No related branches found
No related tags found
No related merge requests found
# This module keeps target machine unlocked by one of two ways:
# 1. moving mouse by 1 pixel every 58 seconds.
# 2. press "PrintScreen" key every 58 seconds. (Pause/Break key is widely used in Powershell and Putty, it will break manything)
import time, threading
from hid import keyboard as fake_keyboard
from hid import keycodes as hid_keycodes
import os.path
import os
def wake_once_method_2(keyboard_path):
hid_keycode = hid_keycodes.KEYCODE_PRINT_SCREEN
hid_modifier_keycode = 0
failed = "success"
try:
fake_keyboard.send_keystroke(keyboard_path, hid_modifier_keycode, hid_keycode)
except Exception as e:
failed = "failed " + str(e)
pass
#with open('/tmp/rlog2', 'w') as f:
# f.write('DEBUG: triggered! status=' + str(failed))
def keep_awake_daemon_func(keyboard_path):
wake_once_method_2(keyboard_path)
while True:
time.sleep(60+58)
wake_once_method_2(keyboard_path)
def send_up_enter(keyboard_path):
failed = "success"
try:
fake_keyboard.send_keystroke(keyboard_path, 0, hid_keycodes.KEYCODE_UP_ARROW)
time.sleep(1)
fake_keyboard.send_keystroke(keyboard_path, 0, hid_keycodes.KEYCODE_ENTER)
except Exception as e:
failed = "failed " + str(e)
pass
def recv_restapi_func(keyboard_path):
prev_res = ""
while True:
time.sleep(1)
if os.path.isfile('/tmp/trigger'):
# os.remove('/tmp/trigger')
with open('/tmp/trigger') as f:
res = f.read()
if res != prev_res:
print("DEBUG: triggered")
send_up_enter(keyboard_path)
prev_res = res
def start_keep_awake_thread(keyboard_path):
thread = threading.Thread(target = keep_awake_daemon_func, args=(keyboard_path,))
thread.start()
thread = threading.Thread(target = recv_restapi_func, args=(keyboard_path,))
thread.start()
#with open('/tmp/rlog', 'w') as f:
# f.write('DEBUG: keep unlocked thread started')
# Never join
import http.server, socketserver
import hashlib
import random
import subprocess
listen_port = 30401
class my_handler(http.server.BaseHTTPRequestHandler):
def do_HEAD(self):
self.send_header("Content-type", "text/plain; charset=utf-8")
self.end_headers()
self.send_response(200)
def do_GET(self):
self.send_response(200)
if self.path.startswith('/trigger'):
self.send_header("Content-type", "text/plain; charset=utf-8")
self.end_headers()
with open('/tmp/trigger', 'w+') as f:
f.write(str(random.random()))
self.wfile.write('ok'.encode('utf-8'))
return
self.send_response(403)
self.send_header("Content-type", "text/plain; charset=utf-8")
self.end_headers()
self.wfile.write('invalid get query.'.encode('utf-8'))
try:
server = http.server.HTTPServer(('', listen_port), my_handler)
print('Listening *:' + str(listen_port))
server.serve_forever()
except KeyboardInterrupt:
server.socket.close()
function ping_test () {
timeout 1s ping -c 1 10.0.0.4 &&
timeout 1s ping -c 1 10.0.0.5 &&
timeout 1s ping -c 1 10.0.0.6 &&
timeout 1s ping -c 1 10.0.0.7 &&
echo "passed $(date)"
return $?
}
while true; do
# api: restart bp
echo -n "API restart bp: "
curl http://recolic-home.freemyip.com:30401/trigger
date >> tests.log
sleep 7
ping_test && continue
sleep 10 # wait for second try
ping_test && continue
sleep 10 # wait for second try
ping_test && continue
sleep 60 # wait for second try
ping_test && continue
sleep 10 # wait for second try
ping_test && continue
echo "REPRO!!"
break
done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment