[87e8bf]: / docs / source / tutorials / 6_Inverse_Dynamics.ipynb

Download this file

525 lines (524 with data), 21.8 kB

{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "6YC339udeSMh"
      },
      "source": [
        "# Imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "HnDtiLaLUDOQ"
      },
      "outputs": [],
      "source": [
        "from myosuite.simhive.myo_sim.test_sims import TestSims as loader\n",
        "from IPython.display import HTML\n",
        "import matplotlib.pyplot as plt\n",
        "from base64 import b64encode\n",
        "import scipy.sparse as spa\n",
        "from copy import deepcopy\n",
        "from tqdm import tqdm\n",
        "import pandas as pd\n",
        "import numpy as np\n",
        "import skvideo.io\n",
        "import mujoco\n",
        "import osqp\n",
        "import os"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Utils functions"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def show_video(video_path, video_width = 400):\n",
        "    \"\"\"\n",
        "    Display a video within the notebook.\n",
        "    \"\"\"\n",
        "    video_file = open(video_path, \"r+b\").read()\n",
        "    video_url = f\"data:video/mp4;base64,{b64encode(video_file).decode()}\"\n",
        "    return HTML(f\"\"\"<video autoplay width={video_width} controls><source src=\"{video_url}\"></video>\"\"\")\n",
        "\n",
        "def solve_qp(P, q, lb, ub, x0):\n",
        "    \"\"\"\n",
        "    Solve a quadratic program.\n",
        "    \"\"\"\n",
        "    P = spa.csc_matrix(P)\n",
        "    A = spa.csc_matrix(spa.eye(q.shape[0]))\n",
        "    m = osqp.OSQP()\n",
        "    m.setup(P=P, q=q, A=A, l=lb, u=ub, verbose=False)\n",
        "    m.warm_start(x=x0)\n",
        "    res = m.solve()\n",
        "    return res.x\n",
        "\n",
        "def plot_qxxx(qxxx, joint_names, labels):\n",
        "    \"\"\"\n",
        "    Plot generalized variables to be compared.\n",
        "    qxxx[:,0,-1] = time axis\n",
        "    qxxx[:,1:,n] = n-th sequence\n",
        "    qxxx[:,1:,-1] = reference sequence\n",
        "    \"\"\"\n",
        "    fig, axs = plt.subplots(4, 6, figsize=(12, 8))\n",
        "    axs = axs.flatten()\n",
        "    line_objects = []\n",
        "    linestyle = ['-'] * qxxx.shape[2]\n",
        "    linestyle[-1] = '--'\n",
        "    for j in range(1, len(joint_names)+1):\n",
        "        ax = axs[j-1]\n",
        "        for i in range(qxxx.shape[2]):\n",
        "            line, = ax.plot(qxxx[:, 0, -1], qxxx[:, j, i], linestyle[i])\n",
        "            if j == 1: # add only one set of lines to the legend\n",
        "                line_objects.append(line)\n",
        "        ax.set_xlim([qxxx[:, 0].min(), qxxx[:, 0].max()])\n",
        "        ax.set_ylim([qxxx[:, 1:, :].min(), qxxx[:, 1:, :].max()])\n",
        "        ax.set_title(joint_names[j-1])\n",
        "    legend_ax = axs[len(joint_names)] # create legend in the 24th subplot area\n",
        "    legend_ax.axis('off')\n",
        "    legend_ax.legend(line_objects, labels, loc='center')\n",
        "    plt.tight_layout()\n",
        "    plt.show()\n",
        "\n",
        "def plot_uxxx(uxxx, muscle_names, labels):\n",
        "    \"\"\"\n",
        "    Plot actuator variables to be compared.\n",
        "    uxxx[:,0,-1] = time axis\n",
        "    uxxx[:,1:,n] = n-th sequence\n",
        "    \"\"\"\n",
        "    fig, axs = plt.subplots(5, 8, figsize=(12, 8))\n",
        "    axs = axs.flatten()\n",
        "    line_objects = []\n",
        "    for j in range(1, len(muscle_names)+1):\n",
        "        ax = axs[j-1]\n",
        "        for i in range(uxxx.shape[2]):\n",
        "            line, = ax.plot(uxxx[:, 0, -1], uxxx[:, j, i])\n",
        "            if j == 1: # add only one set of lines to the legend\n",
        "                line_objects.append(line)\n",
        "        ax.set_xlim([uxxx[:, 0].min(), uxxx[:, 0].max()])\n",
        "        ax.set_ylim([uxxx[:, 1:, :].min(), uxxx[:, 1:, :].max()])\n",
        "        ax.set_title(muscle_names[j-1])\n",
        "    legend_ax = axs[len(muscle_names)] # create legend in the 40th subplot area\n",
        "    legend_ax.axis('off')\n",
        "    legend_ax.legend(line_objects, labels, loc='center')\n",
        "    plt.tight_layout()\n",
        "    plt.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5rC2_iseUDOT"
      },
      "source": [
        "# Introduction\n",
        "In this tutorial a target trajectory will be replicated by MyoHand using MuJoCo inverse dynamics, i.e., given a sequence of joint angles *qpos*, a sequence of control *ctrl* will be generated."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "traj = pd.read_csv('data/6_trajectory.csv').values"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Computation of the generalized force\n",
        "The computation of *ctrl* is dependent on *qfrc*, which can be obtained using inverse dynamics. Disabling the constraint solver during this phase avoids simulation divergence."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def get_qfrc(model, data, target_qpos):\n",
        "    \"\"\"\n",
        "    Compute the generalized force needed to reach the target position in the next mujoco step.\n",
        "    \"\"\"\n",
        "    data_copy = deepcopy(data)\n",
        "    data_copy.qacc = (((target_qpos - data.qpos) / model.opt.timestep) - data.qvel) / model.opt.timestep\n",
        "    model.opt.disableflags += mujoco.mjtDisableBit.mjDSBL_CONSTRAINT\n",
        "    mujoco.mj_inverse(model, data_copy)\n",
        "    model.opt.disableflags -= mujoco.mjtDisableBit.mjDSBL_CONSTRAINT\n",
        "    return data_copy.qfrc_inverse"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The effectiveness of the computed *qfrc* can be tested by applying it directly as shown below."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "model0 = loader.get_sim(None, 'hand/myohand.xml')\n",
        "data0 = mujoco.MjData(model0)\n",
        "qpos_eval = np.zeros((traj.shape[0], traj.shape[1], 2))\n",
        "qpos_eval[:,:,-1] = traj\n",
        "for idx in tqdm(range(traj.shape[0])):\n",
        "    target_qpos = traj[idx, 1:]\n",
        "    qfrc = get_qfrc(model0, data0, target_qpos)\n",
        "    data0.qfrc_applied = qfrc\n",
        "    mujoco.mj_step(model0, data0)\n",
        "    qpos_eval[idx,:,0] = np.hstack((data0.time, data0.qpos))\n",
        "error = ((qpos_eval[:,1:,0] - qpos_eval[:,1:,-1])**2).mean(axis=0)\n",
        "print(f'error max (rad): {error.max()}')\n",
        "joint_names = [model0.joint(i).name for i in range(model0.nq)]\n",
        "plot_qxxx(qpos_eval, joint_names, ['Achieved qpos', 'Reference qpos'])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The difference between the reference trajectory and the achieved one is practically zero. It was observed, however, that by scaling the computed *qfrc* it is possible to achieve an equally valid replication with a larger but still negligible error. Below are examples of the result that can be obtained by dividing the computed *qfrc* by 10, 100, and 1000. Among the three, using a scaler up to 100 allows good replication, while 1000 does not. The advantage in using lower *qfrc* is the easier solution of the optimization problem during the computation of *ctrl* in the next phase."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "all_qfrc_scaler = [10, 100, 1000]\n",
        "qpos_eval = np.zeros((traj.shape[0], traj.shape[1], len(all_qfrc_scaler)+1))\n",
        "qpos_eval[:,:,-1] = traj\n",
        "labels = []\n",
        "for i_scaler, scaler in enumerate(all_qfrc_scaler):\n",
        "    data0 = mujoco.MjData(model0)\n",
        "    for idx in tqdm(range(traj.shape[0])):\n",
        "        target_qpos = traj[idx, 1:]\n",
        "        qfrc = get_qfrc(model0, data0, target_qpos)\n",
        "        data0.qfrc_applied = qfrc/scaler\n",
        "        mujoco.mj_step(model0, data0)\n",
        "        qpos_eval[idx,:,i_scaler] = np.hstack((data0.time, data0.qpos))\n",
        "    error = ((qpos_eval[:,1:,i_scaler] - qpos_eval[:,1:,-1])**2).mean(axis=0)\n",
        "    print(f'qfrc scaler: {scaler} - error max (rad): {error.max()}')\n",
        "    labels.append(f'Achieved qpos\\nscaler:{scaler}')\n",
        "labels.append('Reference qpos')\n",
        "plot_qxxx(qpos_eval, joint_names, labels)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Compuation of the control\n",
        "The following describes how the ***get_ctrl*** function works.\n",
        "The equation to solve for *ctrl*, accordingly to muscle actuators dynamics (https://mujoco.readthedocs.io/en/latest/modeling.html#muscles), is:\n",
        "$$\n",
        "AM\\cdot\\left(gain\\odot\\left(act+timestep\\cdot\\frac{ctrl-act}{\\tau}\\right)+bias\\right)-qfrc=0\\tag{1}\n",
        "$$\n",
        "where\n",
        "$$\n",
        "\\tau=\\tau_D+(\\tau_A-\\tau_D)\\cdot sigmoid\\left(\\frac{ctrl-act}{tausmooth}+0.5\\right)\n",
        "$$\n",
        "To find a solution quickly, it is better to reformulate the equation and solve a quadratic program (QP), i.e.:\n",
        "$$\n",
        "\\min_{x} \\frac{1}{2}x^TPx+q^Tx~~~s.t.~~~lb\\leq x\\leq ub\\tag{2}\n",
        "$$\n",
        "The major obstacle to this formulation is the sigmoid in the calculation of $\\tau$. MuJoCo implements the sigmoid using the polynomial $6x^5-15x^4+10x^3$ clipped between 0 and 1. To solve the QP, the sigmoid is approximated here with $1.875x-0.4375$ and, to limit its range for a good approximation, *tausmooth* is set to 5. The target equation can then be rewritten as:\n",
        "$$\n",
        "AM\\cdot\\left(gain\\odot\\left(act+timestep\\cdot\\frac{ctrl-act}{(ctrl-act)\\cdot \\tau_1+\\tau_2}\\right)+bias\\right)-qfrc=0\\tag{3}\n",
        "$$\n",
        "where\n",
        "$$\n",
        "\\tau_1=\\frac{\\tau_A-\\tau_D}{tausmooth}\\cdot 1.875,~~~\\tau_2=(\\tau_A+\\tau_D)\\cdot 0.5\n",
        "$$\n",
        "and consequently reformulated as:\n",
        "$$\n",
        "AM\\cdot x+k=0\\tag{4}\n",
        "$$\n",
        "where\n",
        "$$\n",
        "x=\\left(timestep\\cdot gain\\odot\\frac{ctrl-act}{(ctrl-act)\\cdot \\tau_1+\\tau_2}\\right),~~~k=AM\\cdot(gain\\odot act)+AM\\cdot bias-qfrc\n",
        "$$\n",
        "Referring to equation $(2)$ then:\n",
        "$$\n",
        "P=2\\cdot AM^T\\cdot AM\\tag{5}\n",
        "$$\n",
        "$$\n",
        "q=2\\cdot AM^T\\cdot k\\tag{6}\n",
        "$$\n",
        "$$\n",
        "lb=timestep\\cdot gain\\odot\\frac{1-act}{(1-act)\\cdot \\tau_1 + \\tau_2}\\tag{7}\n",
        "$$\n",
        "$$\n",
        "ub=timestep\\cdot gain\\odot\\frac{-act}{-act\\cdot \\tau_1 + \\tau_2}\\tag{8}\n",
        "$$\n",
        "After solving the QP for *x*, *ctrl* is then calculated as:\n",
        "$$\n",
        "ctrl = act + \\frac{x\\cdot\\tau_2}{timestep\\cdot gain-x\\cdot\\tau_1}\\tag{9}\n",
        "$$"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "gqTH6NTJUDOV"
      },
      "outputs": [],
      "source": [
        "def get_ctrl(model, data, target_qpos, qfrc, qfrc_scaler, qvel_scaler):\n",
        "    \"\"\"\n",
        "    Compute the control needed to reach the target position in the next mujoco step.\n",
        "    qfrc: generalized force resulting from inverse dynamics.\n",
        "    \"\"\"\n",
        "    act = data.act\n",
        "    ctrl0 = data.ctrl\n",
        "    ts = model.opt.timestep\n",
        "    tA = model.actuator_dynprm[:,0] * (0.5 + 1.5 * act)\n",
        "    tD = model.actuator_dynprm[:,1] / (0.5 + 1.5 * act)\n",
        "    tausmooth = model.actuator_dynprm[:,2]\n",
        "    t1 = (tA - tD) * 1.875 / tausmooth\n",
        "    t2 = (tA + tD) * 0.5\n",
        "    # ---- gain, bias, and moment computation\n",
        "    data_copy = deepcopy(data)\n",
        "    data_copy.qpos = target_qpos\n",
        "    data_copy.qvel = ((target_qpos - data.qpos) / model.opt.timestep) / qvel_scaler\n",
        "    mujoco.mj_step1(model, data_copy) # gain, bias, and moment depend on qpos and qvel\n",
        "    gain = np.zeros(model.nu)\n",
        "    bias = np.zeros(model.nu)\n",
        "    for idx_actuator in range(model.nu):\n",
        "        length = data_copy.actuator_length[idx_actuator]\n",
        "        lengthrange = model.actuator_lengthrange[idx_actuator]\n",
        "        velocity = data_copy.actuator_velocity[idx_actuator]\n",
        "        acc0 = model.actuator_acc0[idx_actuator]\n",
        "        prmb = model.actuator_biasprm[idx_actuator,:9]\n",
        "        prmg = model.actuator_gainprm[idx_actuator,:9]\n",
        "        bias[idx_actuator] = mujoco.mju_muscleBias(length, lengthrange, acc0, prmb)\n",
        "        gain[idx_actuator] = min(-1, mujoco.mju_muscleGain(length, velocity, lengthrange, acc0, prmg))\n",
        "    AM = data_copy.actuator_moment.T\n",
        "    # ---- ctrl computation\n",
        "    P = 2 * AM.T @ AM\n",
        "    k = AM @ (gain * act) + AM @ bias - (qfrc / qfrc_scaler)\n",
        "    q = 2 * k @ AM\n",
        "    lb = gain * (1 - act) * ts / (t2 + t1 * (1 - act))\n",
        "    ub = - gain * act * ts / (t2 - t1 * act)\n",
        "    x0 = (gain * (ctrl0 - act) * ts) / ((ctrl0 - act) * t1 + t2)\n",
        "    x = solve_qp(P, q, lb, ub, x0)\n",
        "    ctrl = act + x * t2 / (gain * ts - x * t1)\n",
        "    return np.clip(ctrl,0,1)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "1RVZF2gYW5DN"
      },
      "source": [
        "The *ctrl* sequence to be applied to MyoHand in order to replicate the reference trajectory can thus be achieved."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "mHhY8syiUDOW"
      },
      "outputs": [],
      "source": [
        "model1 = loader.get_sim(None, 'hand/myohand.xml')\n",
        "tausmooth = 5\n",
        "model1.actuator_dynprm[:,2] = tausmooth\n",
        "data1 = mujoco.MjData(model1)\n",
        "all_ctrl = np.zeros((traj.shape[0], 1+model1.nu))\n",
        "for idx in tqdm(range(traj.shape[0])):\n",
        "    target_qpos = traj[idx, 1:]\n",
        "    qfrc = get_qfrc(model1, data1, target_qpos)\n",
        "    ctrl = get_ctrl(model1, data1, target_qpos, qfrc, 100, 5)\n",
        "    data1.ctrl = ctrl\n",
        "    mujoco.mj_step(model1, data1)\n",
        "    all_ctrl[idx,:] = np.hstack((data1.time, ctrl))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The use of a new scaler is also included in this phase. Indeed, it was observed that by reducing the velocity set for *gain* computation, the obtained *ctrl* is more stable. Below is an example to compare the results using a scaler equal to 5."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "all_qvel_scaler = [1, 5]\n",
        "qpos_eval = np.zeros((traj.shape[0], traj.shape[1], len(all_qvel_scaler)+1))\n",
        "qpos_eval[:,:,-1] = traj\n",
        "labels_qxxx = []\n",
        "ctrl_eval = np.zeros((traj.shape[0], 1+model1.nu, len(all_qvel_scaler)))\n",
        "labels_uxxx = []\n",
        "for i_scaler, scaler in enumerate(all_qvel_scaler):\n",
        "    data1 = mujoco.MjData(model1)\n",
        "    for idx in tqdm(range(traj.shape[0])):\n",
        "        target_qpos = traj[idx, 1:]\n",
        "        qfrc = get_qfrc(model1, data1, target_qpos)\n",
        "        ctrl = get_ctrl(model1, data1, target_qpos, qfrc, 100, scaler)\n",
        "        data1.ctrl = ctrl\n",
        "        mujoco.mj_step(model1, data1)\n",
        "        qpos_eval[idx,:,i_scaler] = np.hstack((data0.time, data1.qpos))\n",
        "        ctrl_eval[idx,:,i_scaler] = np.hstack((data1.time, ctrl))\n",
        "    error = ((qpos_eval[:,1:,i_scaler] - qpos_eval[:,1:,-1])**2).mean(axis=0)\n",
        "    print(f'qvel scaler: {scaler} - error max (rad): {error.max()}')\n",
        "    labels_qxxx.append(f'Achieved qpos\\nscaler:{scaler}')\n",
        "    labels_uxxx.append(f'Achieved ctrl\\nscaler:{scaler}')\n",
        "labels_qxxx.append('Reference qpos')\n",
        "joint_names = [model1.joint(i).name for i in range(model0.nq)]\n",
        "plot_qxxx(qpos_eval, joint_names, labels_qxxx)\n",
        "muscle_names = [model1.actuator(i).name for i in range(model0.nu)]\n",
        "plot_uxxx(ctrl_eval, muscle_names, labels_uxxx)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FPMdswPtUDOY"
      },
      "source": [
        "# Results\n",
        "The trajectory achievable by applying the *ctrl* sequence resulting from the previous block is compared below with the reference trajectory."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "gx541FM0UDOZ",
        "outputId": "76bd2952-763f-4df5-ced7-da1cb691d326"
      },
      "outputs": [],
      "source": [
        "# ---- initializations\n",
        "model_ref = loader.get_sim(None, 'hand/myohand.xml')\n",
        "model_ref.actuator_dynprm[:,2] = tausmooth\n",
        "data_ref = mujoco.MjData(model_ref) # data for reference trajectory\n",
        "model_test = loader.get_sim(None, 'hand/myohand.xml')\n",
        "model_test.actuator_dynprm[:,2] = tausmooth\n",
        "data_test = mujoco.MjData(model_test) # test data for achieved trajectory\n",
        "# ---- camera settings\n",
        "camera = mujoco.MjvCamera()\n",
        "camera.azimuth = 166.553\n",
        "camera.distance = 1.178\n",
        "camera.elevation = -36.793\n",
        "camera.lookat = np.array([-0.93762553, -0.34088276, 0.85067529])\n",
        "options_ref = mujoco.MjvOption()\n",
        "options_ref.flags[:] = 0\n",
        "options_ref.geomgroup[1:] = 0\n",
        "options_test = mujoco.MjvOption()\n",
        "options_test.flags[:] = 0\n",
        "options_test.flags[4] = 1 # actuator ON\n",
        "options_test.geomgroup[1:] = 0\n",
        "renderer_ref = mujoco.Renderer(model_ref)\n",
        "renderer_ref.scene.flags[:] = 0\n",
        "renderer_test = mujoco.Renderer(model_test)\n",
        "renderer_test.scene.flags[:] = 0\n",
        "# ---- generation loop\n",
        "frames = []\n",
        "for idx in tqdm(range(traj.shape[0])):\n",
        "    # -- reference trajectory\n",
        "    data_ref.qpos = traj[idx, 1:]\n",
        "    mujoco.mj_step1(model_ref, data_ref)\n",
        "    # -- achieved trajectory\n",
        "    data_test.ctrl = all_ctrl[idx, 1:]\n",
        "    mujoco.mj_step(model_test, data_test)\n",
        "    # -- frames generation\n",
        "    if not idx % round(0.3/(model_test.opt.timestep*25)):\n",
        "        renderer_ref.update_scene(data_ref, camera=camera, scene_option=options_ref)\n",
        "        frame_ref = renderer_ref.render()\n",
        "        renderer_test.update_scene(data_test, camera=camera, scene_option=options_test)\n",
        "        frame_test = renderer_test.render()\n",
        "        frame_merged = np.append(frame_ref, frame_test, axis=1)\n",
        "        frames.append(frame_merged)\n",
        "# -- frames writing\n",
        "os.makedirs('videos', exist_ok = True)\n",
        "output_name = 'videos/myohand_freemovement.mp4'\n",
        "skvideo.io.vwrite(output_name, np.asarray(frames),outputdict={\"-pix_fmt\": \"yuv420p\"})"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "background_save": true,
          "base_uri": "https://localhost:8080/",
          "height": 172
        },
        "id": "18T5eIELWL0D",
        "outputId": "d4d432a8-6bb8-491a-bf97-c315ba9514b6"
      },
      "outputs": [],
      "source": [
        "show_video(output_name)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "90P5pWpD0lYV"
      },
      "source": [
        "Left, reference trajectory. Right, achieved trajectory."
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [
        "aWZ_2IAdeMbW",
        "6YC339udeSMh"
      ],
      "provenance": []
    },
    "kernelspec": {
      "display_name": "myosuite_venv",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.8.7"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}