[4de1c7]: / app / LeapRecord.py

Download this file

152 lines (125 with data), 5.3 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import sys
import os
from collections import deque
from threading import Thread
from config.Configuration import env
from resources.LeapSDK.v53_python39 import Leap
from LeapData import LeapData
from resources.pymo.pymo.writers import BVHWriter as Pymo_BVHWriter
# from resources.b3d.bvh_reader import BVH as B3D_BVHReader
# from resources.b3d.c3d_convertor import Convertor as B3D_C3DWriter
from AnyWriter import AnyWriter
from BVHAnimation import bvh_animation
# from resources.virb import Virb
# Module-level hand-off buffer for tracking frames: LeapRecord.on_frame appends
# each new frame on the right, and LeapRecord.process_frame (run in a worker
# thread started by on_init) pops them from the left, i.e. in arrival order.
_LEAP_QUEUE = deque()
class LeapRecord(Leap.Listener):
    """Leap listener that records tracking frames and exports them on exit.

    Frames delivered to ``on_frame`` are pushed onto the module-level
    ``_LEAP_QUEUE``. A worker thread (started in ``on_init``) pops them,
    throttles them to the configured frame rate and feeds them to a
    ``LeapData`` instance. ``exit`` stops the worker and writes the enabled
    output formats (BVH file and/or AnyBody files).
    """

    def __init__(self):
        """Read the recording settings from ``env`` and set up the parser."""
        super(LeapRecord, self).__init__()

        # Initialize Leap2DataFrame parser
        self.fps = int(env.config.frames_per_second)
        basis_setting = bool(env.args('anybody_basis'))
        self.leap2bvh = LeapData(channel_setting=env.config.channels,
                                 frame_rate=1 / self.fps,
                                 anybody_basis=basis_setting)

        self.bvh_write = env.config.bvh
        if self.bvh_write:
            directory, name = os.path.split(env.config.bvh_path)
            # Strip only a trailing ".bvh" before re-appending it, so a name
            # that merely contains ".bvh" somewhere else is not mangled.
            if name.endswith('.bvh'):
                name = name[:-len('.bvh')]
            self.bvh_filename = os.path.normpath(
                os.path.join(directory, name + '.bvh'))

        # self.c3d_write = env.config.c3d
        # if self.c3d_write:
        #     self.c3d_filename = env.config.c3d_path + '\\' + env.config.c3d_filename + '.c3d'

        self.anybody_write = env.config.anybody
        if self.anybody_write:
            # Directory paths are passed on with a trailing backslash
            # (Windows-style separator), exactly as configured.
            self.anybody_template_path = env.config.anybody_template_path + '\\'
            self.anybody_output_path = env.config.anybody_output_path + '\\'

        self.processing = True  # cleared by exit() to stop the worker thread
        self.t = None           # worker thread, created in on_init()
        self.last_time = 0      # timestamp of the last frame that was recorded
        # self.garmin = Virb(host=('192.168.137.34', 80))

    def on_init(self, controller):
        """Start the worker thread that consumes queued frames."""
        self.t = Thread(target=self.process_frame, args=(self,))
        # self.garmin.start_recording()
        self.t.start()
        print("Initialized")

    def on_connect(self, controller):
        """Report that the controller connected."""
        print("Connected")
        print("=====================")

    def on_disconnect(self, controller):
        """Report that the controller disconnected."""
        # Note: not dispatched when running in a debugger.
        print("Disconnected")

    def on_exit(self, controller):
        """Report that the listener was removed / the controller exited."""
        print("=====================")
        print("Exited")
        # self.garmin.stop_recording()

    def on_frame(self, controller):
        """Queue the most recent frame for the worker thread."""
        # Get the most recent frame
        frame = controller.frame()
        _LEAP_QUEUE.append(frame)
        # self.leap2bvh.add_frame(controller.frame())

    @staticmethod
    def process_frame(listener):
        """Worker loop: move frames from the queue into ``listener.leap2bvh``.

        A frame is only handed to ``add_frame`` when its timestamp is more
        than one frame period after the last recorded frame. The loop ends
        once ``listener.processing`` is False *and* the queue is empty, so
        frames that were already queued are not dropped on shutdown.

        :param listener: the ``LeapRecord`` instance whose state is updated
        """
        import time  # local import: only this worker loop needs it

        # Minimum timestamp distance between two recorded frames; the
        # 1000000 factor implies timestamps are in microseconds.
        min_interval = int(1000000 / listener.fps)

        while True:
            try:
                frame = _LEAP_QUEUE.popleft()
            except IndexError:
                if not listener.processing:
                    break
                # Queue is empty: sleep briefly instead of spinning at
                # full CPU load while waiting for the next frame.
                time.sleep(0.001)
                continue

            if frame.timestamp - listener.last_time > min_interval:
                added_frame = listener.leap2bvh.add_frame(frame)
                if added_frame:
                    # print(added_frame.timestamp - listener.last_time)
                    listener.last_time = added_frame.timestamp

    def exit(self):
        """Stop the worker thread, wait for it, then write the outputs."""
        self.processing = False
        # on_init() may never have run, in which case no thread exists.
        if self.t is not None:
            self.t.join()
        self.exit_actions()

    def exit_actions(self):
        """Parse the recorded frames and write every enabled output format."""
        bvh_data = self.leap2bvh.parse()

        if self.bvh_write:
            bvh_writer = Pymo_BVHWriter()
            # Context manager closes the file even if the writer raises.
            with open(self.bvh_filename, 'w') as bvh_file:
                bvh_writer.write(bvh_data, bvh_file)
            print('"{}" written'.format(bvh_file.name))

        # if self.c3d_write:
        #     # workaround, need bvh
        #     bvh_writer = Pymo_BVHWriter()
        #     bvh_file = open(self.c3d_filename.strip('.c3d') + '-tmp.bvh', 'w')
        #     bvh_writer.write(bvh_data, bvh_file)
        #     bvh_file.close()
        #     print('"{}" written'.format(bvh_file.name))
        #
        #     bvh_reader = B3D_BVHReader()
        #     if not bvh_reader.load_from_file(bvh_file.name):
        #         raise Exception('error: can not read "{}"'.format(bvh_file.name))
        #
        #     c3d_writer = B3D_C3DWriter()
        #     c3d_writer.convert(bvh_reader, self.c3d_filename)
        #     print('"{}" written from "{}"'.format(self.c3d_filename, bvh_file.name))
        #
        #     os.remove(bvh_file.name)
        #     print('"{}" deleted'.format(bvh_file.name))

        if self.anybody_write:
            AnyWriter(template_directory=self.anybody_template_path,
                      output_directory=self.anybody_output_path
                      ).write(bvh_data)
            print('Anybody files written to "{}"'.format(self.anybody_output_path))

        # NOTE(review): assumed to run regardless of the output switches
        # above (indentation was not recoverable) - confirm.
        bvh_animation.bvh_data = bvh_data
def start_recording():
    """Record Leap frames until Enter is pressed (or Ctrl+C), then export.

    Creates a ``LeapRecord`` listener, attaches it to a new
    ``Leap.Controller`` and blocks on standard input. Afterwards the
    listener is detached and its ``exit`` method is called.
    """
    # Set up the recorder and hook it into a fresh controller
    recorder = LeapRecord()
    leap_controller = Leap.Controller()
    leap_controller.add_listener(recorder)
    print("Listener added")

    try:
        # Block here until a line arrives on stdin; Ctrl+C also ends the wait
        try:
            sys.stdin.readline()
        except KeyboardInterrupt:
            pass
    finally:
        # Detach first so no further frames arrive, then finish up
        leap_controller.remove_listener(recorder)
        print("Listener removed")
        recorder.exit()