for Robot Artificial Inteligence

5. Motion Planning

|

  • to understand TF look at the my post

Motion Planning

Motion Planning

#!/usr/bin/env python

from copy import deepcopy
import math
import numpy
import random
from threading import Thread, Lock
import sys

import actionlib
import control_msgs.msg
import geometry_msgs.msg
import moveit_commander
import moveit_msgs.msg
import moveit_msgs.srv
import rospy
import sensor_msgs.msg
import tf
import trajectory_msgs.msg

def convert_to_message(T):
    t = geometry_msgs.msg.Pose()
    position = tf.transformations.translation_from_matrix(T)
    orientation = tf.transformations.quaternion_from_matrix(T)
    t.position.x = position[0]
    t.position.y = position[1]
    t.position.z = position[2]
    t.orientation.x = orientation[0]
    t.orientation.y = orientation[1]
    t.orientation.z = orientation[2]
    t.orientation.w = orientation[3]        
    return t

def convert_from_message(msg):
    R = tf.transformations.quaternion_matrix((msg.orientation.x,
                                              msg.orientation.y,
                                              msg.orientation.z,
                                              msg.orientation.w))
    T = tf.transformations.translation_matrix((msg.position.x,
                                               msg.position.y,
                                               msg.position.z))
    return numpy.dot(T,R)

def convert_from_trans_message(msg):
    R = tf.transformations.quaternion_matrix((msg.rotation.x,
                                              msg.rotation.y,
                                              msg.rotation.z,
                                              msg.rotation.w))
    T = tf.transformations.translation_matrix((msg.translation.x,
                                               msg.translation.y,
                                               msg.translation.z))
    return numpy.dot(T,R)

class Node(object):

    def __init__(self,config):
        # config list
        self.config = config
        self.parent = None

    def SetParent(self,parent):
        # Parent Node type
        self.parent = parent

    def GetParent(self):
        return self.parent

    def GetConfig(self):
        return self.config

class MoveArm(object):

    def __init__(self):
        print "Motion Planning Initializing..."
        # Prepare the mutex for synchronization
        self.mutex = Lock()

        # Some info and conventions about the robot that we hard-code in here
        # min and max joint values are not read in Python urdf, so we must hard-code them here
        self.num_joints = 7
        self.q_min = []
        self.q_max = []
        self.q_min.append(-3.1459);self.q_max.append(3.1459)
        self.q_min.append(-3.1459);self.q_max.append(3.1459)
        self.q_min.append(-3.1459);self.q_max.append(3.1459)
        self.q_min.append(-3.1459);self.q_max.append(3.1459)
        self.q_min.append(-3.1459);self.q_max.append(3.1459)
        self.q_min.append(-3.1459);self.q_max.append(3.1459)
        self.q_min.append(-3.1459);self.q_max.append(3.1459)
        # How finely to sample each joint
        self.q_sample = [0.05, 0.05, 0.05, 0.1, 0.1, 0.1, 0.1]
        self.joint_names = ["lwr_arm_0_joint",
                            "lwr_arm_1_joint",
                            "lwr_arm_2_joint",
                            "lwr_arm_3_joint",
                            "lwr_arm_4_joint",
                            "lwr_arm_5_joint",
                            "lwr_arm_6_joint"]

        # Subscribes to information about what the current joint values are.
        rospy.Subscriber("/joint_states", sensor_msgs.msg.JointState,
                         self.joint_states_callback)

        # Subscribe to command for motion planning goal
        rospy.Subscriber("/motion_planning_goal", geometry_msgs.msg.Transform,
                         self.move_arm_cb)

        # Publish trajectory command
        self.pub_trajectory = rospy.Publisher("/joint_trajectory", trajectory_msgs.msg.JointTrajectory,
                                              queue_size=1)        

        # Initialize variables
        self.joint_state = sensor_msgs.msg.JointState()

        # Wait for moveit IK service
        rospy.wait_for_service("compute_ik")
        self.ik_service = rospy.ServiceProxy('compute_ik',  moveit_msgs.srv.GetPositionIK)
        print "IK service ready"

        # Wait for validity check service
        rospy.wait_for_service("check_state_validity")
        self.state_valid_service = rospy.ServiceProxy('check_state_validity',  
                                                      moveit_msgs.srv.GetStateValidity)
        print "State validity service ready"

        # Initialize MoveIt
        self.robot = moveit_commander.RobotCommander()
        self.scene = moveit_commander.PlanningSceneInterface()
        self.group_name = "lwr_arm"
        self.group = moveit_commander.MoveGroupCommander(self.group_name)
        print "MoveIt! interface ready"

        # Options
        self.subsample_trajectory = True
        print "Initialization done."

    def get_joint_val(self, joint_state, name):
        if name not in joint_state.name:
            print "ERROR: joint name not found"
            return 0
        i = joint_state.name.index(name)
        return joint_state.position[i]

    def set_joint_val(self, joint_state, q, name):
        if name not in joint_state.name:
            print "ERROR: joint name not found"
        i = joint_state.name.index(name)
        joint_state.position[i] = q

    """ Given a complete joint_state data structure, this function finds the values for
    our arm's set of joints in a particular order and returns a list q[] containing just
    those values.
    """
    def q_from_joint_state(self, joint_state):
        q = []
        for i in range(0,self.num_joints):
            q.append(self.get_joint_val(joint_state, self.joint_names[i]))
        return q

    """ Given a list q[] of joint values and an already populated joint_state, this
    function assumes that the passed in values are for a our arm's set of joints in
    a particular order and edits the joint_state data structure to set the values
    to the ones passed in.
    """
    def joint_state_from_q(self, joint_state, q):
        for i in range(0,self.num_joints):
            self.set_joint_val(joint_state, q[i], self.joint_names[i])

    """ This function will perform IK for a given transform T of the end-effector. It
    returns a list q[] of 7 values, which are the result positions for the 7 joints of
    the left arm, ordered from proximal to distal. If no IK solution is found, it
    returns an empy list.
    """
    def IK(self, T_goal):
        req = moveit_msgs.srv.GetPositionIKRequest()
        req.ik_request.group_name = self.group_name
        req.ik_request.robot_state = moveit_msgs.msg.RobotState()
        req.ik_request.robot_state.joint_state = self.joint_state
        req.ik_request.avoid_collisions = True
        req.ik_request.pose_stamped = geometry_msgs.msg.PoseStamped()
        req.ik_request.pose_stamped.header.frame_id = "world_link"
        req.ik_request.pose_stamped.header.stamp = rospy.get_rostime()
        req.ik_request.pose_stamped.pose = convert_to_message(T_goal)
        req.ik_request.timeout = rospy.Duration(3.0)
        res = self.ik_service(req)
        q = []
        if res.error_code.val == res.error_code.SUCCESS:
            q = self.q_from_joint_state(res.solution.joint_state)
        return q

    """ This function checks if a set of joint angles q[] creates a valid state, or
    one that is free of collisions. The values in q[] are assumed to be values for
    the joints of the left arm, ordered from proximal to distal.
    """
    def is_state_valid(self, q):
        req = moveit_msgs.srv.GetStateValidityRequest()
        req.group_name = self.group_name
        current_joint_state = deepcopy(self.joint_state)
        current_joint_state.position = list(current_joint_state.position)
        self.joint_state_from_q(current_joint_state, q)
        req.robot_state = moveit_msgs.msg.RobotState()
        req.robot_state.joint_state = current_joint_state
        res = self.state_valid_service(req)
        return res.valid

    def valid_point_func(self,q_start,q_goal):
        ## inputs : q_start,q_goal Node
        ## outputs : point without colision Node

        step = 0.2
        #### descritize the path
        start_to_goal = q_goal - q_start
        norm_stg = numpy.linalg.norm(start_to_goal)
        unit_stg = start_to_goal/norm_stg
        #n = numpy.max(numpy.abs(unit_stg/self.q_sample))
        #step = (1/n)*unit_stg
        #num_of_points = norm_stg/numpy.linalg.norm(step)
        disc_points = numpy.outer(numpy.arange(step,norm_stg,step),unit_stg) + q_start
        #disc_points = numpy.outer(numpy.arange(1,num_of_points+ 1),step) + q_start
        #print(disc_points)
        past_point = [q_start]
        for disc_point in disc_points:
            #print(self.is_state_valid(disc_point))
            #print(past_point)
            if self.is_state_valid(disc_point) == False:
                return past_point
            past_point = numpy.array([disc_point])

        return q_goal
####################################################################################
    def motion_plan(self, q_start, q_goal, q_min, q_max):
        #print(q_goal)
        # Replace this with your code
        q_space = numpy.array([q_start],dtype=numpy.float64)
        q_index = []
        # start from the second point
        #V = []
        #E = []
        step = 0.5

        #V.append(Node(q_start.tolist()))

        #for itr in range(100):
        while True:
            #print(itr)
            if numpy.allclose(self.valid_point_func(q_space[-1],q_goal),numpy.array([q_goal])):
                #print(self.valid_point_func(q_space[-1],q_goal))
                #print(numpy.array(q_goal))
                nearest_point = q_space[-1]
                q_index.append(len(q_space)-1)
                q_space = numpy.append(q_space,numpy.array([q_goal]),axis=0)

                #node = Node(q_goal)
                # for i in range(len(V)):
                #     if numpy.all(V[i].GetConfig() == nearest_point.tolist()):
                #         parent_node = V[i]
                #         break
                #node.SetParent(-1)
                #V.append(node)
                break

            #### generate random sample
            q_random = []
            for i in range(self.num_joints):                
                #q_random.append(random.uniform(-1,1)*3.1459)
                q_random.append(random.uniform(-3.1459,3.1459))

            #### find nearest point

            q_random_np = numpy.array([q_random])
            vec_to_points = q_space - q_random_np

            index_to_nearest = numpy.argmin(numpy.sqrt(numpy.sum((vec_to_points)**2,axis=1)))
            #print(numpy.sqrt(numpy.sum((vec_to_points)**2)))
            #print(len(vec_to_points))
            nearest_point = q_space[index_to_nearest]

            #### point lies in 0.25 distance from point to the generated point
            nearest_to_random = q_random_np - nearest_point
            norm_ntr = numpy.linalg.norm(nearest_to_random)
            unit_ntr = nearest_to_random/norm_ntr
            # point in directed direction
            next_point = nearest_point + unit_ntr*step

            #### return a valid point
            valid_point = self.valid_point_func(nearest_point,next_point)

            #########################################################################################################


            #### append it to the array
            q_space = numpy.append(q_space,valid_point,axis=0)
            q_index.append(index_to_nearest)
            #node = Node(valid_point[0].tolist())
            #for i in range(len(V)):
            #    if numpy.all(V[i].GetConfig() == nearest_point.tolist()):
            #        parent_node = V[i]
            #        break
            #node.SetParent(parent_node)
            #V.append(node)
            #print(q_space)

        #print('X'*50)
        #node = V[-1]
        #print(q_index)
        #print(q_space)
        q_list = []
        cur_index = q_index[-1]
        q_list.append(q_goal)
        for i in range(len(q_space)):
            if  numpy.all(q_space[cur_index] == q_start):        
                q_list.append(q_start.tolist())
                break
            q_list.append(q_space[cur_index].tolist())
            cur_index = q_index[cur_index - 1]

        #    q_list.append(node.GetConfig())
        #    node = node.GetParent()

        #print(q_start)
        #print(q_goal)
        q_list.reverse()
        #print(q_list)
        return q_list


    def create_trajectory(self, q_list, v_list, a_list, t):
        joint_trajectory = trajectory_msgs.msg.JointTrajectory()
        for i in range(0, len(q_list)):
            point = trajectory_msgs.msg.JointTrajectoryPoint()
            point.positions = list(q_list[i])
            point.velocities = list(v_list[i])
            point.accelerations = list(a_list[i])
            point.time_from_start = rospy.Duration(t[i])
            joint_trajectory.points.append(point)
        joint_trajectory.joint_names = self.joint_names
        return joint_trajectory

    def create_trajectory(self, q_list):
        joint_trajectory = trajectory_msgs.msg.JointTrajectory()
        for i in range(0, len(q_list)):
            point = trajectory_msgs.msg.JointTrajectoryPoint()
            point.positions = list(q_list[i])
            joint_trajectory.points.append(point)
        joint_trajectory.joint_names = self.joint_names
        return joint_trajectory

    def project_plan(self, q_start, q_goal, q_min, q_max):
        q_list = self.motion_plan(q_start, q_goal, q_min, q_max)
        joint_trajectory = self.create_trajectory(q_list)
        return joint_trajectory

    def move_arm_cb(self, msg):
        T = convert_from_trans_message(msg)
        self.mutex.acquire()
        q_start = self.q_from_joint_state(self.joint_state)
        #print(self.is_state_valid([0.9332970208293161, 0.7256278570990718, 2.5594325471094663, 1.7652295464395606, 0.4708529716243601, -0.17818698208150993, 0.5357455372788732]))
        print "Solving IK"
        q_goal = self.IK(T)
        if len(q_goal)==0:
            print "IK failed, aborting"
            self.mutex.release()
            return
        print "IK solved, planning"
        trajectory = self.project_plan(numpy.array(q_start), q_goal, self.q_min, self.q_max)
        if not trajectory.points:
            print "Motion plan failed, aborting"
        else:
            print "Trajectory received with " + str(len(trajectory.points)) + " points"
            self.execute(trajectory)
        self.mutex.release()

    def joint_states_callback(self, joint_state):
        self.mutex.acquire()
        self.joint_state = joint_state
        self.mutex.release()

    def execute(self, joint_trajectory):
        self.pub_trajectory.publish(joint_trajectory)

if __name__ == '__main__':
    moveit_commander.roscpp_initialize(sys.argv)
    rospy.init_node('move_arm', anonymous=True)
    ma = MoveArm()
    rospy.spin()


Comment  Read more

21. RBF Neural Network with cartpole

|

Modifications for CartPole

  • Can we use the RBF Network + Q learning script with CartPole?
  • Yes But with some modifications

SGDRegressor

  • we are going to build our own
  • Practice building a linear gradient descent model
  • We’ll use the sampe API as scikit learn’s SGDRegressor
  • Can’t use Optimistic initial value

RBF Example

  • for mountain car, we sampled from the state space to get exemplars
  • mountain car state space is bounded in a small region
  • Cartpole state space is not
  • Unfortunately the sample() method samples uniformly from all possible values, not proportional to how likely they are
  • therefore, these would be poor exemplars
  • instead, just “guess” a plausible range( or collect data to find it)
  • use different sacles too
No <= observation_examples = np.array([env.observation_space.sample() for in range(20000)])
YES <= observation_examples = np.random.random((2000,4))*2 -2

Import Library

# https://deeplearningcourses.com/c/deep-reinforcement-learning-in-python
# https://www.udemy.com/deep-reinforcement-learning-in-python
from __future__ import print_function, division
from builtins import range
# Note: you may need to update your version of future
# sudo pip install -U future
# Inspired by https://github.com/dennybritz/reinforcement-learning

# Works best w/ multiply RBF kernels at var=0.05, 0.1, 0.5, 1.0

import gym
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from gym import wrappers
from datetime import datetime
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import StandardScaler
from sklearn.kernel_approximation import RBFSampler
from q_learning_bins import plot_running_avg


SGDRegressor

class SGDRegressor:
  def __init__(self, D):
    self.w = np.random.randn(D) / np.sqrt(D)
    self.lr = 0.1

  def partial_fit(self, X, Y):
    self.w += self.lr*(Y - X.dot(self.w)).dot(X) # w is weight

  def predict(self, X):
    return X.dot(self.w) # w is weight

FeatureTransformer

class FeatureTransformer:
  def __init__(self, env):
    # observation_examples = np.array([env.observation_space.sample() for x in range(10000)])
    # NOTE!! state samples are poor, b/c you get velocities --> infinity
    observation_examples = np.random.random((20000, 4))*2 - 1
    # 20000만개 4 value in list
    scaler = StandardScaler()
    scaler.fit(observation_examples)

    # Used to converte a state to a featurizes represenation.
    # We use RBF kernels with different variances to cover different parts of the space
    featurizer = FeatureUnion([
            ("rbf1", RBFSampler(gamma=0.05, n_components=1000)),
            ("rbf2", RBFSampler(gamma=1.0, n_components=1000)),
            ("rbf3", RBFSampler(gamma=0.5, n_components=1000)),
            ("rbf4", RBFSampler(gamma=0.1, n_components=1000))
            ])
    feature_examples = featurizer.fit_transform(scaler.transform(observation_examples))

    self.dimensions = feature_examples.shape[1]
    self.scaler = scaler
    self.featurizer = featurizer

  def transform(self, observations):
    scaled = self.scaler.transform(observations)
    return self.featurizer.transform(scaled)

Model

# Holds one SGDRegressor for each action
class Model:
  def __init__(self, env, feature_transformer):
    self.env = env
    self.models = []
    self.feature_transformer = feature_transformer
    for i in range(env.action_space.n):
      model = SGDRegressor(feature_transformer.dimensions)
      self.models.append(model)

  def predict(self, s):
    X = self.feature_transformer.transform(np.atleast_2d(s))
    result = np.stack([m.predict(X) for m in self.models]).T
    return result

  def update(self, s, a, G):
    X = self.feature_transformer.transform(np.atleast_2d(s))
    self.models[a].partial_fit(X, [G])

  def sample_action(self, s, eps):
    if np.random.random() < eps:
      return self.env.action_space.sample()
    else:
      return np.argmax(self.predict(s))

play

def play_one(env, model, eps, gamma):
  observation = env.reset()
  done = False
  totalreward = 0
  iters = 0
  while not done and iters < 2000:
    # if we reach 2000, just quit, don't want this going forever
    # the 200 limit seems a bit early
    action = model.sample_action(observation, eps)
    prev_observation = observation
    observation, reward, done, info = env.step(action)

    if done:
      reward = -200

    # update the model
    next = model.predict(observation)
    # print(next.shape)
    assert(next.shape == (1, env.action_space.n))
    G = reward + gamma*np.max(next)
    model.update(prev_observation, action, G)

    if reward == 1: # if we changed the reward to -200
      totalreward += reward
    iters += 1

  return totalreward

Main


def main():
  env = gym.make('CartPole-v0')
  ft = FeatureTransformer(env)
  model = Model(env, ft)
  gamma = 0.99

  if 'monitor' in sys.argv:
    filename = os.path.basename(__file__).split('.')[0]
    monitor_dir = './' + filename + '_' + str(datetime.now())
    env = wrappers.Monitor(env, monitor_dir)


  N = 500
  totalrewards = np.empty(N)
  costs = np.empty(N)
  for n in range(N):
    eps = 1.0/np.sqrt(n+1)
    totalreward = play_one(env, model, eps, gamma)
    totalrewards[n] = totalreward
    if n % 100 == 0:
      print("episode:", n, "total reward:", totalreward, "eps:", eps, "avg reward (last 100):", totalrewards[max(0, n-100):(n+1)].mean())

  print("avg reward for last 100 episodes:", totalrewards[-100:].mean())
  print("total steps:", totalrewards.sum())

  plt.plot(totalrewards)
  plt.title("Rewards")
  plt.show()

  plot_running_avg(totalrewards)


if __name__ == '__main__':
  main()

Reference:

Artificial Intelligence Reinforcement Learning

Advance AI : Deep-Reinforcement Learning

Cutting-Edge Deep-Reinforcement Learning

Comment  Read more

5. Optimization basics

|

Comment  Read more

5. Inverse Kinematics

|

Inverse Kinematics

1. General Derivation(起源)

  • Not possible for serial Kinematics
  • Must be derived(获得) with geometrical intuition
  • Not always possible to find in closed-form
  • Numerical solutions sometimes required

2. Non-Linear Porblem

  • Example: TCP moves linearly along axis z
  • Joints movements are highly non-linear

2.1 Non-unique solution

  • Same TCP pose can be reached with different joints configurations
    • to solve this, we give singularities
  • infinite joints solution exist for one single TCP pose to reduce
  • Dependent linear value

3. Inverse Kinematics Steps

  1. Decouple wrist from arm
    • find out that the position of the wrist point only depends on the first three joint axes of the robot
    • moving the fourth, fifth, sixth joint will not modify the position of the wrist point
    • so we can split into two part (wrist, arm) it called Decoupling

3. Base Frame and Tool

4. Adding a tool

5. Couple

  • output of inverse kinematics is position of real joints

  • They must be adjusted to calculate correct motor on target positions

Comment  Read more

4. Cartesian Control

|

  • to understand TF look at the my post

Cartesian Control

Cartesian Control

#!/usr/bin/env python

import math
import numpy
import time
from threading import Thread, Lock

import rospy
import tf
from geometry_msgs.msg import Transform
from sensor_msgs.msg import JointState
from std_msgs.msg import Float32
from urdf_parser_py.urdf import URDF

def S_matrix(w):
    S = numpy.zeros((3,3))
    S[0,1] = -w[2]
    S[0,2] =  w[1]
    S[1,0] =  w[2]
    S[1,2] = -w[0]
    S[2,0] = -w[1]
    S[2,1] =  w[0]
    return S

# This is the function that must be filled in as part of the Project.
def cartesian_control(joint_transforms, b_T_ee_current, b_T_ee_desired,
                      red_control, q_current, q0_desired):
    num_joints = len(joint_transforms)
    dq = numpy.zeros(num_joints)
    #-------------------- Fill in your code here ---------------------------
    J = numpy.zeros((6,num_joints))
    ee_V_ee = numpy.zeros(6) # array([0., 0., 0., 0., 0., 0.])

    T_to_desired = numpy.dot(numpy.linalg.inv(b_T_ee_current),b_T_ee_desired) # symmetry make

    angle, axis = rotation_from_matrix(T_to_desired)
    ## in base frame
    delta_p = T_to_desired[:3,3] #
    delta_theta = numpy.dot(axis,angle)

    p_dot = delta_p*1
    theta_dot = delta_theta*1

    #ee_T_b = numpy.linalg.inv(b_T_ee_current)
    ee_R_b = T_to_desired[:3,:3]

    ee_V_ee[:3] = numpy.dot(ee_R_b,p_dot)
    ee_V_ee[3:] = numpy.dot(ee_R_b,theta_dot)

    for i in range(num_joints):
        J[:,i] = adjoint_matrix(numpy.dot(numpy.linalg.inv(b_T_ee_current),joint_transforms[i]))[:,5]
    dq = numpy.dot(numpy.linalg.pinv(J,0.01),ee_V_ee)
    #----------------------------------------------------------------------
    return dq

def convert_from_message(t):
    trans = tf.transformations.translation_matrix((t.translation.x,
                                                  t.translation.y,
                                                  t.translation.z))
    rot = tf.transformations.quaternion_matrix((t.rotation.x,
                                                t.rotation.y,
                                                t.rotation.z,
                                                t.rotation.w))
    T = numpy.dot(trans,rot)
    return T

def adjoint_matrix(i_T_j):

    i_p_j = numpy.linalg.inv(i_T_j)[:3, 3] # raw column
    i_R_j = i_T_j[:3, :3]
    Ad = numpy.zeros((6,6))
    Ad[:3,:3] = i_R_j
    Ad[3:,3:] = i_R_j
    Ad[:3,3:] = -numpy.dot(i_R_j,S_matrix(i_p_j))

    return Ad

# Returns the angle-axis representation of the rotation contained in the input matrix
# Use like this:
# angle, axis = rotation_from_matrix(R)
def rotation_from_matrix(matrix):
    R = numpy.array(matrix, dtype=numpy.float64, copy=False)
    R33 = R[:3, :3]
    # axis: unit eigenvector of R33 corresponding to eigenvalue of 1
    l, W = numpy.linalg.eig(R33.T)
    i = numpy.where(abs(numpy.real(l) - 1.0) < 1e-8)[0]
    if not len(i):
        raise ValueError("no unit eigenvector corresponding to eigenvalue 1")
    axis = numpy.real(W[:, i[-1]]).squeeze()
    # point: unit eigenvector of R33 corresponding to eigenvalue of 1
    l, Q = numpy.linalg.eig(R)
    i = numpy.where(abs(numpy.real(l) - 1.0) < 1e-8)[0]
    if not len(i):
        raise ValueError("no unit eigenvector corresponding to eigenvalue 1")
    # rotation angle depending on axis
    cosa = (numpy.trace(R33) - 1.0) / 2.0
    if abs(axis[2]) > 1e-8:
        sina = (R[1, 0] + (cosa-1.0)*axis[0]*axis[1]) / axis[2]
    elif abs(axis[1]) > 1e-8:
        sina = (R[0, 2] + (cosa-1.0)*axis[0]*axis[2]) / axis[1]
    else:
        sina = (R[2, 1] + (cosa-1.0)*axis[1]*axis[2]) / axis[0]
    angle = math.atan2(sina, cosa)
    return angle, axis

class CartesianControl(object):

    #Initialization
    def __init__(self):
        #Loads the robot model, which contains the robot's kinematics information
        self.robot = URDF.from_parameter_server()

        #Subscribes to information about what the current joint values are.
        rospy.Subscriber("/joint_states", JointState, self.joint_callback)

        #Subscribes to command for end-effector pose
        rospy.Subscriber("/cartesian_command", Transform, self.command_callback)

        #Subscribes to command for redundant dof
        rospy.Subscriber("/redundancy_command", Float32, self.redundancy_callback)

        # Publishes desired joint velocities
        self.pub_vel = rospy.Publisher("/joint_velocities", JointState, queue_size=1)

        #This is where we hold the most recent joint transforms
        self.joint_transforms = []
        self.q_current = []
        self.x_current = tf.transformations.identity_matrix()
        self.R_base = tf.transformations.identity_matrix()
        self.x_target = tf.transformations.identity_matrix()
        self.q0_desired = 0
        self.last_command_time = 0
        self.last_red_command_time = 0

        # Initialize timer that will trigger callbacks
        self.mutex = Lock()
        self.timer = rospy.Timer(rospy.Duration(0.1), self.timer_callback)

    def command_callback(self, command):
        self.mutex.acquire()
        self.x_target = convert_from_message(command)
        self.last_command_time = time.time()
        self.mutex.release()

    def redundancy_callback(self, command):
        self.mutex.acquire()
        self.q0_desired = command.data
        self.last_red_command_time = time.time()
        self.mutex.release()        

    def timer_callback(self, event):
        msg = JointState()
        self.mutex.acquire()
        if time.time() - self.last_command_time < 0.5:
            dq = cartesian_control(self.joint_transforms,
                                   self.x_current, self.x_target,
                                   False, self.q_current, self.q0_desired)
            msg.velocity = dq
        elif time.time() - self.last_red_command_time < 0.5:
            dq = cartesian_control(self.joint_transforms,
                                   self.x_current, self.x_current,
                                   True, self.q_current, self.q0_desired)
            msg.velocity = dq
        else:            
            msg.velocity = numpy.zeros(7)
        self.mutex.release()
        self.pub_vel.publish(msg)

    def joint_callback(self, joint_values):
        root = self.robot.get_root()
        T = tf.transformations.identity_matrix()
        self.mutex.acquire()
        self.joint_transforms = []
        self.q_current = joint_values.position
        self.process_link_recursive(root, T, joint_values)
        self.mutex.release()

    def align_with_z(self, axis):
        T = tf.transformations.identity_matrix()
        z = numpy.array([0,0,1])
        x = numpy.array([1,0,0])
        dot = numpy.dot(z,axis)
        if dot == 1: return T
        if dot == -1: return tf.transformation.rotation_matrix(math.pi, x)
        rot_axis = numpy.cross(z, axis)
        angle = math.acos(dot)
        return tf.transformations.rotation_matrix(angle, rot_axis)

    def process_link_recursive(self, link, T, joint_values):
        if link not in self.robot.child_map:
            self.x_current = T
            return
        for i in range(0,len(self.robot.child_map[link])):
            (joint_name, next_link) = self.robot.child_map[link][i]
            if joint_name not in self.robot.joint_map:
                rospy.logerror("Joint not found in map")
                continue
            current_joint = self.robot.joint_map[joint_name]        

            trans_matrix = tf.transformations.translation_matrix((current_joint.origin.xyz[0],
                                                                  current_joint.origin.xyz[1],
                                                                  current_joint.origin.xyz[2]))
            rot_matrix = tf.transformations.euler_matrix(current_joint.origin.rpy[0],
                                                         current_joint.origin.rpy[1],
                                                         current_joint.origin.rpy[2], 'rxyz')
            origin_T = numpy.dot(trans_matrix, rot_matrix)
            current_joint_T = numpy.dot(T, origin_T)
            if current_joint.type != 'fixed':
                if current_joint.name not in joint_values.name:
                    rospy.logerror("Joint not found in list")
                    continue
                # compute transform that aligns rotation axis with z
                aligned_joint_T = numpy.dot(current_joint_T, self.align_with_z(current_joint.axis))
                self.joint_transforms.append(aligned_joint_T)
                index = joint_values.name.index(current_joint.name)
                angle = joint_values.position[index]
                joint_rot_T = tf.transformations.rotation_matrix(angle,
                                                                 numpy.asarray(current_joint.axis))
                next_link_T = numpy.dot(current_joint_T, joint_rot_T)
            else:
                next_link_T = current_joint_T

            self.process_link_recursive(next_link, next_link_T, joint_values)

if __name__ == '__main__':
    rospy.init_node('cartesian_control', anonymous=True)
    cc = CartesianControl()
    rospy.spin()



Comment  Read more