Source code for pyminitouch.connection

import subprocess
import socket
import time
import os
import random
from contextlib import contextmanager

from pyminitouch.logger import logger
from pyminitouch import config
from pyminitouch.utils import (
    str2byte,
    download_file,
    is_port_using,
    is_device_connected,
)

_ADB = config.ADB_EXECUTOR


class MNTInstaller(object):
    """ install minitouch for android devices """

    def __init__(self, device_id):
        self.device_id = device_id
        self.abi = self.get_abi()
        if self.is_mnt_existed():
            logger.info("minitouch already existed in {}".format(device_id))
        else:
            self.download_target_mnt()

    def get_abi(self):
        abi = subprocess.getoutput(
            "{} -s {} shell getprop ro.product.cpu.abi".format(_ADB, self.device_id)
        )
        logger.info("device {} is {}".format(self.device_id, abi))
        return abi

    def download_target_mnt(self):
        abi = self.get_abi()
        target_url = "{}/{}/bin/minitouch".format(config.MNT_PREBUILT_URL, abi)
        logger.info("target minitouch url: " + target_url)
        mnt_path = download_file(target_url)

        # push and grant
        subprocess.check_call(
            [_ADB, "-s", self.device_id, "push", mnt_path, config.MNT_HOME]
        )
        subprocess.check_call(
            [_ADB, "-s", self.device_id, "shell", "chmod", "777", config.MNT_HOME]
        )
        logger.info("minitouch already installed in {}".format(config.MNT_HOME))

        # remove temp
        os.remove(mnt_path)

    def is_mnt_existed(self):
        file_list = subprocess.check_output(
            [_ADB, "-s", self.device_id, "shell", "ls", "/data/local/tmp"]
        )
        return "minitouch" in file_list.decode(config.DEFAULT_CHARSET)


class MNTServer(object):
    """
    manage connection to minitouch.
    before connection, you should execute minitouch with adb shell.

    command eg::

        adb forward tcp:{some_port} localabstract:minitouch
        adb shell /data/local/tmp/minitouch

    you would better use it via safe_connection ::

        _DEVICE_ID = '123456F'

        with safe_connection(_DEVICE_ID) as conn:
            conn.send('d 0 500 500 50\nc\nd 1 500 600 50\nw 5000\nc\nu 0\nu 1\nc\n')
    """

    _PORT_SET = config.PORT_SET

    def __init__(self, device_id):
        assert is_device_connected(device_id)

        self.device_id = device_id
        logger.info("searching a usable port ...")
        self.port = self._get_port()
        logger.info("device {} bind to port {}".format(device_id, self.port))

        # check minitouch
        self.installer = MNTInstaller(device_id)

        # keep minitouch alive
        self._forward_port()
        self.mnt_process = None
        self._start_mnt()

        # make sure it's up
        time.sleep(1)
        assert (
            self.heartbeat()
        ), "minitouch did not work. see https://github.com/williamfzc/pyminitouch/issues/11"

    def stop(self):
        self.mnt_process and self.mnt_process.kill()
        self._PORT_SET.add(self.port)
        logger.info("device {} unbind to {}".format(self.device_id, self.port))

    @classmethod
    def _get_port(cls):
        """ get a random port from port set """
        new_port = random.choice(list(cls._PORT_SET))
        if is_port_using(new_port):
            return cls._get_port()
        return new_port

    def _forward_port(self):
        """ allow pc access minitouch with port """
        command_list = [
            _ADB,
            "-s",
            self.device_id,
            "forward",
            "tcp:{}".format(self.port),
            "localabstract:minitouch",
        ]
        logger.debug("forward command: {}".format(" ".join(command_list)))
        output = subprocess.check_output(command_list)
        logger.debug("output: {}".format(output))

    def _start_mnt(self):
        """ fork a process to start minitouch on android """
        command_list = [
            _ADB,
            "-s",
            self.device_id,
            "shell",
            "/data/local/tmp/minitouch",
        ]
        logger.info("start minitouch: {}".format(" ".join(command_list)))
        self.mnt_process = subprocess.Popen(command_list, stdout=subprocess.DEVNULL)

    def heartbeat(self):
        """ check if minitouch process alive """
        return self.mnt_process.poll() is None


[docs]class MNTConnection(object): """ manage socket connection between pc and android """ _DEFAULT_HOST = config.DEFAULT_HOST _DEFAULT_BUFFER_SIZE = config.DEFAULT_BUFFER_SIZE def __init__(self, port): self.port = port # build connection client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((self._DEFAULT_HOST, self.port)) self.client = client # get minitouch server info socket_out = client.makefile() # v <version> # protocol version, usually it is 1. needn't use this socket_out.readline() # ^ <max-contacts> <max-x> <max-y> <max-pressure> _, max_contacts, max_x, max_y, max_pressure, *_ = ( socket_out.readline().replace("\n", "").replace("\r", "").split(" ") ) self.max_contacts = max_contacts self.max_x = max_x self.max_y = max_y self.max_pressure = max_pressure # $ <pid> _, pid = socket_out.readline().replace("\n", "").replace("\r", "").split(" ") self.pid = pid logger.info( "minitouch running on port: {}, pid: {}".format(self.port, self.pid) ) logger.info( "max_contact: {}; max_x: {}; max_y: {}; max_pressure: {}".format( max_contacts, max_x, max_y, max_pressure ) )
[docs] def disconnect(self): self.client and self.client.close() self.client = None logger.info("minitouch disconnected")
[docs] def send(self, content): """ send message and get its response """ byte_content = str2byte(content) self.client.sendall(byte_content) return self.client.recv(self._DEFAULT_BUFFER_SIZE)
@contextmanager def safe_connection(device_id): """ safe connection runtime to use """ # prepare for connection server = MNTServer(device_id) # real connection connection = MNTConnection(server.port) try: yield connection finally: # disconnect connection.disconnect() server.stop() if __name__ == "__main__": _DEVICE_ID = "123456F" with safe_connection(_DEVICE_ID) as conn: # conn.send('d 0 150 150 50\nc\nu 0\nc\n') conn.send("d 0 500 500 50\nc\nd 1 500 600 50\nw 5000\nc\nu 0\nu 1\nc\n")