Code Monkey home page Code Monkey logo

Comments (11)

swwgames avatar swwgames commented on June 29, 2024

I forgot this part:
The challenge I'm facing is that I'm unsure about how to implement a fix for this issue. I'm looking forward to a response from the community for guidance or assistance in resolving this.

If you have any questions or need further clarification, feel free to ask.

from tp-link-archer-c6u.

AlexandrErohin avatar AlexandrErohin commented on June 29, 2024

Hi!
Great that you have succeed with your router
Does methods get_firmware, get_status, set_wifi, reboot, work for your router after authorization changes?
Could you also try to run test.py? do all checks pass?

from tp-link-archer-c6u.

swwgames avatar swwgames commented on June 29, 2024

Thank you for your response. I have tested the methods you mentioned and here are my findings:

I started testing with get_status, that function works like it should.

Then i tested get_firmware function, this function returend an error unknown response.
After some debuging i found that the request url was http://192.168.0.1/cgi-bin/luci/;stok=stok/admin/firmware?form=upgrade.
This gave an error because the needed operation=read at the end. like this: http://192.168.0.1/cgi-bin/luci/;stok=stok/admin/firmware?form=upgrade&operation=read.
Now the code responds with the right data.
To make this change I adjusted this line in the client.py file:

    def _get_firmware(self) -> Firmware:
        data = self._get_data('admin/firmware?form=upgrade', 'operation=read')
        firmware = Firmware(data.get('hardware_version', ''), data.get('model', ''), data.get('firmware_version', ''))

        return firmware

To this:

    def _get_firmware(self) -> Firmware:
        data = self._get_data('admin/firmware?form=upgrade&operation=read')
        firmware = Firmware(data.get('hardware_version', ''), data.get('model', ''), data.get('firmware_version', ''))

        return firmware

I also tested the reboot function, that works as expected, just as set wifi, that also works as expected.

I also tested other functions and made necessary adjustments:

get_ipv4_status
this function does get the data (i added a line to print the data imidiatly after the response for debugging)
but ends in an error with key word remote. After checking the data this function returns, I found that it doesn't contain the remote data.
so i commanded it out, and now it doesn't return an error.

get_ipv4_reservations
this function has the same problem as get_firmware the url doesn't contain the &operation=load.
So I changed this:

    def _get_ipv4_reservations(self) -> [IPv4Reservation]:
        ipv4_reservations = []
        data = self._get_data('admin/dhcps?form=reservation', 'operation=load')

        for item in data:
            ipv4_reservations.append(
                IPv4Reservation(macaddress.EUI48(item['mac']), ipaddress.IPv4Address(item['ip']), item['comment'],
                                self._str2bool(item['enable'])))

        return ipv4_reservations

To this:

    def _get_ipv4_reservations(self) -> [IPv4Reservation]:
        ipv4_reservations = []
        data = self._get_data('admin/dhcps?form=reservation&operation=load')

        for item in data:
            ipv4_reservations.append(
                IPv4Reservation(macaddress.EUI48(item['mac']), ipaddress.IPv4Address(item['ip']), item['comment'],
                                self._str2bool(item['enable'])))

        return ipv4_reservations

function get_ipv4_dhcp_leases has the same problem.
I changed this:

    def _get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        dhcp_leases = []
        data = self._get_data('admin/dhcps?form=client', 'operation=load')

        for item in data:
            dhcp_leases.append(
                IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'],
                              item['leasetime']))

        return dhcp_leases

To this:

    def _get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        dhcp_leases = []
        data = self._get_data('admin/dhcps?form=client&operation=load')

        for item in data:
            dhcp_leases.append(
                IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'],
                              item['leasetime']))

        return dhcp_leases

The query function works as expected.

The get_full_info function also works as expected.

Here is the complete edited client.py file:

import hashlib
import re
from collections.abc import Callable
import json
import requests
import macaddress
import ipaddress
from logging import Logger
from tplinkrouterc6u.encryption import EncryptionWrapper
from tplinkrouterc6u.enum import Wifi
from tplinkrouterc6u.dataclass import Firmware, Status, Device, IPv4Reservation, IPv4DHCPLease, IPv4Status


class TplinkRouter:
    def __init__(self, host: str, password: str, username: str = 'admin', logger: Logger = None,
                 verify_ssl: bool = True, timeout: int = 10) -> None:
        self.host = host
        if not (self.host.startswith('http://') or self.host.startswith('https://')):
            self.host = "http://{}".format(self.host)
        self._verify_ssl = verify_ssl
        if self._verify_ssl is False:
            requests.packages.urllib3.disable_warnings()
        self.username = username
        self.password = password
        self.timeout = timeout
        self.single_request_mode = True
        self._logger = logger

        self._stok = ''
        self._sysauth = ''

        self._logged = False
        self._seq = ''
        self._hash = hashlib.md5((self.username + self.password).encode()).hexdigest()

        self.nn = ''
        self.ee = ''

        self._pwdNN = ''
        self._pwdEE = ''

        self._encryption = EncryptionWrapper()

    def get_firmware(self) -> Firmware | None:
        return self._request(self._get_firmware)

    def get_status(self) -> Status | None:
        return self._request(self._get_status)

    def get_ipv4_status(self) -> IPv4Status | None:
        return self._request(self._get_ipv4_status)

    def get_ipv4_reservations(self) -> [IPv4Reservation]:
        return self._request(self._get_ipv4_reservations)

    def get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        return self._request(self._get_ipv4_dhcp_leases)

    def query(self, query, operation='operation=read'):
        def callback():
            return self._get_data(query, operation)

        return self._request(callback)

    def get_full_info(self) -> tuple[Firmware, Status] | None:
        def callback():
            firmware = self._get_firmware()
            status = self._get_status()

            return firmware, status

        return self._request(callback)

    def set_wifi(self, wifi: Wifi, enable: bool) -> None:
        def callback():
            path = f"admin/wireless?&form=guest&form={wifi.value}"
            data = f"operation=write&{wifi.value}_enable={'on' if enable else 'off'}"
            self._send_data(path, data)

        self._request(callback)

    def reboot(self) -> None:
        def callback():
            self._send_data('admin/system?form=reboot', 'operation=write')

        self._request(callback)

    def authorize(self) -> bool:
        referer = '{}/webpages/login.html?t=1596185370610'.format(self.host)

        #if self._pwdNN == '':
        #    self._request_pwd(referer)

        #if self._seq == '':
        #    self._request_seq(referer)

        response = self._try_login(referer)

        #if 'text/plain' in response.headers.get('Content-Type'):
        #    self._request_pwd(referer)
        #    self._request_seq(referer)
        #    response = self._try_login(referer)

        try:
            #jsonData = response.json()

            #if 'data' not in jsonData or not jsonData['data']:
            #    raise Exception('No data in response: ' + response.text)

            #encryptedResponseData = jsonData['data']
            #responseData = self._encryption.aes_decrypt(encryptedResponseData)

            #responseDict = json.loads(responseData)

            #if 'success' not in responseDict or not responseDict['success']:
            #    raise Exception('No data in response: ' + responseData)

            self._stok = response.json().get('data').get('stok')
            regex_result = re.search('sysauth=(.*);', response.headers['set-cookie'])
            self._sysauth = regex_result.group(1)
            self._logged = True
            return True
        except (ValueError, KeyError, AttributeError) as e:
            if self._logger:
                self._logger.error("TplinkRouter Integration Exception - Couldn't fetch auth tokens! Response was: %s",
                                   response.text)

        return False

    def logout(self) -> None:
        if self._logged:
            self._send_data('admin/system?form=logout', 'operation=write')
        self.clear()

    def clear(self) -> None:
        self._stok = ''
        self._sysauth = ''
        self._logged = False

    def _get_firmware(self) -> Firmware:
        data = self._get_data('admin/firmware?form=upgrade&operation=read')
        firmware = Firmware(data.get('hardware_version', ''), data.get('model', ''), data.get('firmware_version', ''))

        return firmware

    def _get_status(self) -> Status:

        def _calc_cpu_usage(data: dict) -> float | None:
            cpu_usage = (data.get('cpu_usage', 0) + data.get('cpu1_usage', 0)
                         + data.get('cpu2_usage', 0) + data.get('cpu3_usage', 0))
            return cpu_usage / 4 if cpu_usage != 0 else None

        data = self._get_data('admin/status?form=all', 'operation=read')
        status = Status()
        status.devices = []
        status._wan_macaddr = macaddress.EUI48(data['wan_macaddr']) if 'wan_macaddr' in data else None
        status._lan_macaddr = macaddress.EUI48(data['lan_macaddr'])
        status._wan_ipv4_addr = ipaddress.IPv4Address(data['wan_ipv4_ipaddr']) if 'wan_ipv4_ipaddr' in data else None
        status._lan_ipv4_addr = ipaddress.IPv4Address(data['lan_ipv4_ipaddr']) if 'lan_ipv4_ipaddr' in data else None
        status._wan_ipv4_gateway = ipaddress.IPv4Address(
            data['wan_ipv4_gateway']) if 'wan_ipv4_gateway' in data else None
        status.wan_ipv4_uptime = data.get('wan_ipv4_uptime')
        status.mem_usage = data.get('mem_usage')
        status.cpu_usage = _calc_cpu_usage(data)
        status.wired_total = len(data.get('access_devices_wired', []))
        status.wifi_clients_total = len(data.get('access_devices_wireless_host', []))
        status.guest_clients_total = len(data.get('access_devices_wireless_guest', []))
        status.clients_total = status.wired_total + status.wifi_clients_total + status.guest_clients_total
        status.guest_2g_enable = data.get('guest_2g_enable') == 'on'
        status.guest_5g_enable = data.get('guest_5g_enable') == 'on'
        status.iot_2g_enable = data.get('iot_2g_enable') == 'on' if data.get('iot_2g_enable') is not None else None
        status.iot_5g_enable = data.get('iot_5g_enable') == 'on' if data.get('iot_5g_enable') is not None else None
        status.wifi_2g_enable = data.get('wireless_2g_enable') == 'on'
        status.wifi_5g_enable = data.get('wireless_5g_enable') == 'on'

        for item in data.get('access_devices_wireless_host', []):
            type = Wifi.WIFI_2G if '2.4G' == item['wire_type'] else Wifi.WIFI_5G
            status.devices.append(Device(type, macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']),
                                         item['hostname']))

        for item in data.get('access_devices_wireless_guest', []):
            type = Wifi.WIFI_GUEST_2G if '2.4G' == item['wire_type'] else Wifi.WIFI_GUEST_5G
            status.devices.append(Device(type, macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']),
                                         item['hostname']))

        return status

    def _get_ipv4_status(self) -> IPv4Status:
        ipv4_status = IPv4Status()
        data = self._get_data('admin/network?form=status_ipv4', 'operation=read')
        ipv4_status._wan_macaddr = macaddress.EUI48(data['wan_macaddr'])
        ipv4_status._wan_ipv4_ipaddr = ipaddress.IPv4Address(data['wan_ipv4_ipaddr'])
        ipv4_status._wan_ipv4_gateway = ipaddress.IPv4Address(data['wan_ipv4_gateway'])
        ipv4_status.wan_ipv4_conntype = data['wan_ipv4_conntype']
        ipv4_status._wan_ipv4_netmask = ipaddress.IPv4Address(data['wan_ipv4_netmask'])
        ipv4_status._wan_ipv4_pridns = ipaddress.IPv4Address(data['wan_ipv4_pridns'])
        ipv4_status._wan_ipv4_snddns = ipaddress.IPv4Address(data['wan_ipv4_snddns'])
        ipv4_status._lan_macaddr = macaddress.EUI48(data['lan_macaddr'])
        ipv4_status._lan_ipv4_ipaddr = ipaddress.IPv4Address(data['lan_ipv4_ipaddr'])
        ipv4_status.lan_ipv4_dhcp_enable = self._str2bool(data['lan_ipv4_dhcp_enable'])
        ipv4_status._lan_ipv4_netmask = ipaddress.IPv4Address(data['lan_ipv4_netmask'])
        #ipv4_status.remote = self._str2bool(data['remote'])

        return ipv4_status

    def _get_ipv4_reservations(self) -> [IPv4Reservation]:
        ipv4_reservations = []
        data = self._get_data('admin/dhcps?form=reservation&operation=load')

        for item in data:
            ipv4_reservations.append(
                IPv4Reservation(macaddress.EUI48(item['mac']), ipaddress.IPv4Address(item['ip']), item['comment'],
                                self._str2bool(item['enable'])))

        return ipv4_reservations

    def _get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        dhcp_leases = []
        data = self._get_data('admin/dhcps?form=client&operation=load')

        for item in data:
            dhcp_leases.append(
                IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'],
                              item['leasetime']))

        return dhcp_leases

    def _query(self, query, operation):
        data = self._get_data(query, operation)

        # for item in data:
        #    dhcp_leases.append(IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'], item['leasetime']))

        return data

    # TODO
    #        data2 = self._get_data('admin/dhcps?form=setting', 'operation=read')

    def _str2bool(self, v):
        return str(v).lower() in ("yes", "true", "on")

    def _request_pwd(self, referer: str) -> None:
        url = '{}/cgi-bin/luci/;stok=/login?form=keys'.format(self.host)

        # If possible implement RSA encryption of password here.
        response = requests.post(
            url, params={'operation': 'read'},
            headers={'Referer': referer},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        try:
            data = response.json()

            args = data['data']['password']

            self._pwdNN = args[0]
            self._pwdEE = args[1]
        except json.decoder.JSONDecodeError:
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - No pwd response - {}'.format(response.text))
            raise Exception('Unsupported router!')
        except Exception as error:
            raise Exception('Unknown error for pwd - {}; Response - {}'.format(error, response.text))

    def _request_seq(self, referer: str) -> None:
        url = '{}/cgi-bin/luci/;stok=/login?form=auth'.format(self.host)

        # If possible implement RSA encryption of password here.
        response = requests.post(
            url,
            params={'operation': 'read'},
            headers={'Referer': referer},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        try:
            data = response.json()

            self._seq = data['data']['seq']
            args = data['data']['key']

            self.nn = args[0]
            self.ee = args[1]
        except json.decoder.JSONDecodeError:
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - No seq response - {}'.format(response.text))
            raise Exception('Unsupported router!')
        except Exception as error:
            raise Exception('Unknown error for seq - {}; Response - {}'.format(error, response.text))

    def _try_login(self, referer: str) -> requests.Response:
        url = '{}/cgi-bin/luci/;stok=/login?form=login'.format(self.host)

        #cryptedPwd = self._encryption.rsa_encrypt(self.password, self._pwdNN, self._pwdEE)
        #data = 'operation=login&password={}&confirm=true'.format(cryptedPwd)

        #body = self._prepare_data(data)

        return requests.post(
            url,
            params={'operation': 'login', 'username': 'admin', 'password': password
            headers={'Referer': referer, 'Content-Type': 'application/x-www-form-urlencoded'},
            timeout=self.timeout,
            #verify=self._verify_ssl,
        )

    def _prepare_data(self, data) -> dict:
        encrypted_data = self._encryption.aes_encrypt(data)
        data_len = len(encrypted_data)

        sign = self._encryption.get_signature(int(self._seq) + data_len, self._logged == False, self._hash, self.nn,
                                              self.ee)

        return {'sign': sign, 'data': encrypted_data}

    def _request(self, callback: Callable):
        if not self.single_request_mode:
            return callback()

        try:
            if self.authorize():
                data = callback()
                self.logout()
                return data
        except Exception as error:
            self._seq = ''
            self._pwdNN = ''
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - {}'.format(error))
        finally:
            self.clear()

    def _get_data(self, path: str, data: str = 'operation=read') -> dict | None:
        if self._logged is False:
            raise Exception('Not authorised')
        url = '{}/cgi-bin/luci/;stok={}/{}'.format(self.host, self._stok, path)
        referer = '{}/webpages/index.html'.format(self.host)

        response = requests.post(
            url,
            data=data,
            headers={'Referer': referer},
            cookies={'sysauth': self._sysauth},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        data = response.text
        print(data)
        try:
            json_response = response.json()
            if 'data' not in json_response:
                raise Exception("Router didn't respond with JSON - " + data)
            #data = self._encryption.aes_decrypt(json_response['data'])

            json_response = json.loads(data)

            if 'success' in json_response and json_response['success']:
                return json_response['data']
            else:
                if 'errorcode' in json_response and json_response['errorcode'] == 'timeout':
                    if self._logger:
                        self._logger.info(
                            "TplinkRouter Integration Exception - Token timed out. Relogging on next scan")
                    self._stok = ''
                    self._sysauth = ''
                elif self._logger:
                    self._logger.error(
                        "TplinkRouter Integration Exception - An unknown error happened while fetching data %s", data)
        except ValueError:
            if self._logger:
                self._logger.error(
                    "TplinkRouter Integration Exception - Router didn't respond with JSON. Check if credentials are correct")

        raise Exception('An unknown response - ' + data)

    def _send_data(self, path: str, data: str) -> None:
        if self._logged is False:
            raise Exception('Not authorised')
        url = '{}/cgi-bin/luci/;stok={}/{}'.format(self.host, self._stok, path)
        referer = '{}/webpages/index.1596185370610.html'.format(self.host)

        body = data
        requests.post(
            url,
            data=body,
            headers={'Referer': referer, 'Content-Type': 'application/x-www-form-urlencoded'},
            cookies={'sysauth': self._sysauth},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

Another thing to note is that i changed the _try_login function.
I commanded out a few lines that are not needed for the login to this router.
And instead of the password being the normal password it is i think a hashed password.
To get this password I logged in via the webinterface with developers tools open on the network tab.
And i got the password located in the payload off login?form=login

    def _try_login(self, referer: str) -> requests.Response:
        url = '{}/cgi-bin/luci/;stok=/login?form=login'.format(self.host)

        #cryptedPwd = self._encryption.rsa_encrypt(self.password, self._pwdNN, self._pwdEE)
        #data = 'operation=login&password={}&confirm=true'.format(cryptedPwd)

        #body = self._prepare_data(data)

        return requests.post(
            url,
            params={'operation': 'login', 'username': 'admin', 'password': 'web password here'},
            headers={'Referer': referer, 'Content-Type': 'application/x-www-form-urlencoded'},
            timeout=self.timeout,
            #verify=self._verify_ssl,
        )

from tp-link-archer-c6u.

AlexandrErohin avatar AlexandrErohin commented on June 29, 2024

Wow. Thank you very much for your work!
It would be great to see you as a contributor to this repo! Could you make a PR with adding your edited code as a new class TplinkC1200Router to the end of file client.py ?
Then I try to combine them

from tp-link-archer-c6u.

swwgames avatar swwgames commented on June 29, 2024

I made the pull request, I hope you can merge it. If you find any problems in my code please let me know.

I can be a contributer, but I don't now how active I will be. But i will always be open for testing.

from tp-link-archer-c6u.

AlexandrErohin avatar AlexandrErohin commented on June 29, 2024

@swwgames Thank you very much! I have merged you PR and created TplinkRouterProvider which automatically get you the right client
Could you download my updates and run test.py again?
Would be great if you review my changes and let me know if find something to fix or change

from tp-link-archer-c6u.

swwgames avatar swwgames commented on June 29, 2024

I tested the new code with test.py I found that the login part worked great.
I also checked the output of the script, I missed some data, after some testing i figured out that the query send to the router is not correct. the "&operation=read" or "&operation=load" part is not part of the url. i modified the test.py file like this:

with open('C:/Users/Sam/Documents/Code/Tplink/TP-Link-Archer-C6U-main/queries.txt') as queries:
    for query in queries:
        query = query.strip()
        if query.startswith('#'):
            continue
        query = '{}&operation=read'.format(query)
        try:
            data = router.query(query)
            print(query)
            tokens = query.split('?')
            folder = "logs" + os.sep + tokens[0]
            Path(folder).mkdir(parents=True, exist_ok=True)
            with open(folder + os.sep + f"{tokens[1]}.log", "w") as log_file:
                pp = pprint.PrettyPrinter(indent=4, stream=log_file)
                pp.pprint(data)
        except Exception as ex:
            print(f"{query} exception {ex}")
            router = TplinkRouter('http://192.168.0.1', password, timeout=10)
        finally:
            pass

Also this router has no username, so that part of the code is not neccesary.

I also tested my router in AP mode (wireless accespoint) there are some differences in the data they give, but nothing major.

I hope I provided you with the right data, let me know if you need something from me.
TplinkC1200Test.zip

from tp-link-archer-c6u.

AlexandrErohin avatar AlexandrErohin commented on June 29, 2024

Thank you very much for help!

from tp-link-archer-c6u.

AlexandrErohin avatar AlexandrErohin commented on June 29, 2024

@swwgames Could you download my updates and run test.py again? I have simplified the client

from tp-link-archer-c6u.

swwgames avatar swwgames commented on June 29, 2024

I tested your update, and it works. The only thing is that the test file doesn't add the operation mode so some data is missing.
logs.zip

from tp-link-archer-c6u.

AlexandrErohin avatar AlexandrErohin commented on June 29, 2024

@swwgames Thank you!

from tp-link-archer-c6u.

Related Issues (12)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.