########################
##
## Traffic Controller
##
########################

import os
import subprocess
from threading import Lock


class Traffic_Controller:
    """Line-oriented client for the ``./tc-launcher`` auxiliary process.

    Each public method writes one text command (terminated by a newline,
    UTF-8 encoded) to the child's stdin.  Methods that expect an answer then
    read exactly one line from the child's stdout, strip it and return it
    decoded as ASCII.  Every command (together with its reply read, when
    there is one) is performed while holding ``self.mutex``, so a request
    and its reply cannot interleave with another thread's request.

    The object can be used as a context manager; leaving the ``with`` block
    kills the child process.
    """

    # Class-level default; replaced by the Popen handle in __init__.
    process = None

    def __init__(self):
        """Start ``./tc-launcher`` with piped stdin/stdout and reset ``tc.log``."""
        self.process = subprocess.Popen(
            ['./tc-launcher'],
            stdout=subprocess.PIPE,
            stdin=subprocess.PIPE,
        )
        self.mutex = Lock()

        # Reset log file (EAFP: a missing file is simply nothing to reset).
        try:
            os.remove("tc.log")
        except FileNotFoundError:
            pass

    def __enter__(self):
        """Return the controller itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Kill the child process; exceptions from the ``with`` body propagate."""
        self.process.kill()
        print('tc-launcher auxiliary process killed')

    ##### Internal helpers

    def _write(self, name, *args):
        """Write ``name`` followed by the ``str()`` of each arg as one line.

        The caller must already hold ``self.mutex``.
        """
        line = " ".join([name] + [str(arg) for arg in args]) + "\n"
        self.process.stdin.write(bytes(line, 'utf-8'))
        self.process.stdin.flush()

    def _send(self, name, *args) -> None:
        """Send a command for which no reply line is read."""
        # ``with`` guarantees the lock is released even if the pipe write fails.
        with self.mutex:
            self._write(name, *args)

    def _query(self, name, *args) -> str:
        """Send a command, read one reply line and return it stripped.

        Returns:
            The reply line with surrounding whitespace removed, decoded as
            ASCII.
        """
        with self.mutex:
            self._write(name, *args)
            reply = self.process.stdout.readline().strip()
        return reply.decode('ascii')

    ##### TC

    def TC_Init(self) -> None:
        """Send ``TC_Init``; no reply is read."""
        self._send("TC_Init")

    def TC_Generation(self) -> str:
        """Send ``TC_Generation`` and return the one-line reply."""
        return self._query("TC_Generation")

    def TC_Build_Version(self) -> str:
        """Send ``TC_Build_Version`` and return the one-line reply."""
        return self._query("TC_Build_Version")

    def TC_Start_Mission(self) -> str:
        """Send ``TC_Start_Mission`` and return the one-line reply."""
        return self._query("TC_Start_Mission")

    def TC_Get_Num_Steps(self) -> str:
        """Send ``TC_Get_Num_Steps`` and return the one-line reply."""
        return self._query("TC_Get_Num_Steps")

    def TC_End_Mission(self) -> None:
        """Send ``TC_End_Mission``; no reply is read."""
        self._send("TC_End_Mission")

    ##### Vehicle

    def Vehicle_Set_Current_Pos(self, vehicle, pos) -> None:
        """Send ``Vehicle_Set_Current_Pos <vehicle> <pos>``; no reply is read."""
        self._send("Vehicle_Set_Current_Pos", vehicle, pos)

    def Vehicle_Get_Current_Pos(self, vehicle) -> str:
        """Send ``Vehicle_Get_Current_Pos <vehicle>`` and return the reply."""
        return self._query("Vehicle_Get_Current_Pos", vehicle)

    def Vehicle_Get_Dis_To_Pos(self, vehicle, pos) -> str:
        """Send ``Vehicle_Get_Dis_To_Pos <vehicle> <pos>`` and return the reply."""
        return self._query("Vehicle_Get_Dis_To_Pos", vehicle, pos)

    def Vehicle_Set_Goal_Pos(self, vehicle, pos) -> None:
        """Send ``Vehicle_Set_Goal_Pos <vehicle> <pos>``; no reply is read."""
        self._send("Vehicle_Set_Goal_Pos", vehicle, pos)

    def Vehicle_Get_Next_Pos(self, vehicle) -> str:
        """Send ``Vehicle_Get_Next_Pos <vehicle>`` and return the reply."""
        return self._query("Vehicle_Get_Next_Pos", vehicle)

    def Vehicle_Get_Step_Pos(self, vehicle, step) -> str:
        """Send ``Vehicle_Get_Step_Pos <vehicle> <step>`` and return the reply."""
        return self._query("Vehicle_Get_Step_Pos", vehicle, step)

    def Vehicle_Moved_Forward(self) -> str:
        """Send ``Vehicle_Moved_Forward`` and return the one-line reply."""
        return self._query("Vehicle_Moved_Forward")

    def Vehicle_Goal_Reached(self) -> str:
        """Send ``Vehicle_Goal_Reached`` and return the one-line reply."""
        return self._query("Vehicle_Goal_Reached")

    ##### Environment

    def Environment_Clean(self) -> None:
        """Send ``Environment_Clean``; no reply is read."""
        self._send("Environment_Clean")

    def Environment_Build(self) -> None:
        """Send ``Environment_Build``; no reply is read."""
        self._send("Environment_Build")

    def Environment_Get_Max_Pos(self) -> str:
        """Send ``Environment_Get_Max_Pos`` and return the one-line reply."""
        return self._query("Environment_Get_Max_Pos")

    def Environment_Get_Max_Roads(self) -> str:
        """Send ``Environment_Get_Max_Roads`` and return the one-line reply."""
        return self._query("Environment_Get_Max_Roads")

    def Environment_Add_One_Way_Road(self, pos1, pos2) -> None:
        """Send ``Environment_Add_One_Way_Road <pos1> <pos2>``; no reply is read."""
        self._send("Environment_Add_One_Way_Road", pos1, pos2)

    def Environment_Add_Two_Way_Road_One_Lane(self, pos1, pos2) -> None:
        """Send ``Environment_Add_Two_Way_Road_One_Lane <pos1> <pos2>``; no reply is read."""
        self._send("Environment_Add_Two_Way_Road_One_Lane", pos1, pos2)

    def Environment_Add_Two_Way_Road_Two_Lanes(self, pos1, pos2) -> None:
        """Send ``Environment_Add_Two_Way_Road_Two_Lanes <pos1> <pos2>``; no reply is read."""
        self._send("Environment_Add_Two_Way_Road_Two_Lanes", pos1, pos2)

    ##### Obstacles

    def Obstacles_Clean(self) -> None:
        """Send ``Obstacles_Clean``; no reply is read."""
        self._send("Obstacles_Clean")

    def Obstacles_Add_Obstacle(self, pos1, pos2) -> None:
        """Send ``Obstacles_Add_Obstacle <pos1> <pos2>``; no reply is read."""
        self._send("Obstacles_Add_Obstacle", pos1, pos2)

    def Obstacles_Get_Num(self) -> str:
        """Send ``Obstacles_Get_Num`` and return the one-line reply."""
        return self._query("Obstacles_Get_Num")

    def Obstacles_Present(self, pos1, pos2) -> str:
        """Send ``Obstacles_Present <pos1> <pos2>`` and return the reply."""
        return self._query("Obstacles_Present", pos1, pos2)

    def Obstacles_Confirm(self) -> str:
        """Send ``Obstacles_Confirm`` and return the one-line reply."""
        return self._query("Obstacles_Confirm")

    ##### Settings

    def Settings_Set_Max_RAM(self, memory_mb) -> None:
        """Send ``Settings_Set_Max_RAM <memory_mb>``; no reply is read."""
        self._send("Settings_Set_Max_RAM", memory_mb)

    def Settings_Set_Timeout(self, time_ms) -> None:
        """Send ``Settings_Set_Timeout <time_ms>``; no reply is read."""
        self._send("Settings_Set_Timeout", time_ms)

    def Settings_Enable_Logging(self) -> None:
        """Send ``Settings_Enable_Logging``; no reply is read."""
        self._send("Settings_Enable_Logging")

    def Settings_Disable_Logging(self) -> None:
        """Send ``Settings_Disable_Logging``; no reply is read."""
        self._send("Settings_Disable_Logging")

    ##### Monitoring

    def Monitoring_Get_Num_Vehicles(self) -> str:
        """Send ``Monitoring_Get_Num_Vehicles`` and return the one-line reply."""
        return self._query("Monitoring_Get_Num_Vehicles")

    def Monitoring_Get_CPU_Time(self) -> str:
        """Send ``Monitoring_Get_CPU_Time`` and return the one-line reply."""
        return self._query("Monitoring_Get_CPU_Time")

#
# EOF
#