main.py 11 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Auther :liuyuqi.gov@msn.cn
  5. @Time :2018/7/4 15:55
  6. @File :main.py
  7. '''
  8. import time
  9. from configparser import ConfigParser
  10. import pandas as pd
  11. import utils.save_conf
  12. import utils.save_result
  13. class Scheduling():
  14. '''
  15. 调度
  16. '''
  17. # 参数
  18. alpha = 10
  19. beta = 0.5
  20. T = 98
  21. EXEC_LIMIT = 100000
  22. # 静态数据 n:app数 N:inst数 m:machine数 k:资源种类
  23. num_app = num_inst = num_mac = 0
  24. num_k = 200
  25. cpuIter = list()
  26. appIndex = {}
  27. machineIndex = {}
  28. inst2AppIndex = {}
  29. # apps = list()
  30. # machines = list()
  31. appResources = list() # app表,
  32. machineResources = list() # machine表
  33. instanceDeploy = list() # instance表
  34. appInterference = list() # app_interfence冲突表
  35. # 动态数据
  36. inst2MachineRemine = {}
  37. machineResourcesUsed = list()
  38. machineHasApp = list() # 6000 [{}, {},{6004=1, 9126=1, 1598=1}, {}, {}, {},
  39. inst2Machine = list()
  40. result = pd.DataFrame(columns=list(["instanceid", "machineid"]), data=list())
  41. def __init__(self, **kw):
  42. '''
  43. 初始化参数
  44. :param kw:
  45. '''
  46. for k, v in kw.items():
  47. setattr(self, k, v)
  48. def loadData(self):
  49. for i in range(self.T):
  50. self.cpuIter.append(i)
  51. app_interference, app_resources, instance_deploy, machine_resources = self.getConfig()
  52. # 1.app_resources 9338*201
  53. self.appResources = pd.read_csv(app_resources, header=None,
  54. names=list(["appid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
  55. tmp_cpu = self.appResources["cpu"].str.split('|', expand=True).astype('float')
  56. tmp_mem = self.appResources["mem"].str.split('|', expand=True).astype('float')
  57. for i in range(self.T):
  58. self.appResources["cpu_" + str(i)] = tmp_cpu[i]
  59. self.appResources["mem_" + str(i)] = tmp_mem[i]
  60. # 去掉cpu/men两列
  61. self.appResources.pop("cpu")
  62. self.appResources.pop("mem")
  63. self.num_app, col = self.appResources.shape # 9338*201 201列:appid,cpu_1,cpu_2,...mem_1,men_2....,P,M,PM
  64. self.appResources["appid"] = pd.to_numeric(self.appResources["appid"].str.split("_", expand=True)[1].values)
  65. # 2.machine_resources 6000*201
  66. self.machineResources = pd.read_csv(machine_resources, header=None, names=list(
  67. ["machineid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
  68. self.num_mac, col = self.machineResources.shape
  69. for i in range(self.T):
  70. self.machineResources["cpu_" + str(i)] = self.machineResources["cpu"]
  71. self.machineResources["mem_" + str(i)] = self.machineResources["mem"]
  72. self.machineResources.pop("cpu")
  73. self.machineResources.pop("mem")
  74. self.machineResources["machineid"] = pd.to_numeric(
  75. self.machineResources["machineid"].str.split("_", expand=True)[1].values)
  76. self.machineResourcesUsed = self.machineResources.copy()
  77. for i in range(200):
  78. self.machineResourcesUsed.iloc[:, i + 1] = 0
  79. # 初始化 6000个空字典组成的list[{},{}....]
  80. for i in range(self.num_mac):
  81. self.machineHasApp.append({})
  82. # 3.instance_deploy
  83. self.instanceDeploy = pd.read_csv(instance_deploy, header=None,
  84. names=list(["instanceid", "appid", "machineid"]), encoding="utf-8")
  85. # 增加一个字段标注是否部署
  86. self.instanceDeploy["isdeploy"] = False
  87. self.instanceDeploy["instanceid"] = pd.to_numeric(
  88. self.instanceDeploy["instanceid"].str.split("_", expand=True)[1].values)
  89. self.instanceDeploy["appid"] = pd.to_numeric(self.instanceDeploy["appid"].str.split("_", expand=True)[1].values)
  90. self.instanceDeploy["machineid"] = pd.to_numeric(
  91. self.instanceDeploy["machineid"].str.split("_", expand=True)[1].values)
  92. # 4.app_interference 冲突表
  93. self.appInterference = pd.read_csv(app_interference, header=None,
  94. names=list(["appid1", "appid2", "max_interference"]), encoding="utf-8")
  95. self.appInterference["appid1"] = pd.to_numeric(
  96. self.appInterference["appid1"].str.split("_", expand=True)[1].values)
  97. self.appInterference["appid2"] = pd.to_numeric(
  98. self.appInterference["appid2"].str.split("_", expand=True)[1].values)
  99. # instance按照磁盘消耗排序
  100. self.num_app, col = self.appResources.shape
  101. self.num_inst, col = self.instanceDeploy.shape
  102. def getConfig(self):
  103. '''
  104. step1: 数据参数初始化
  105. :return:
  106. '''
  107. # 生成配置文件
  108. # self.init_conf()
  109. # 读取配置文件
  110. cf = ConfigParser()
  111. config_path = "../conf/config.ini"
  112. section_name = "data_file_name"
  113. cf.read(config_path)
  114. app_interference = cf.get(section_name, "app_interference")
  115. app_resources = cf.get(section_name, "app_resources")
  116. instance_deploy = cf.get(section_name, "instance_deploy")
  117. machine_resources = cf.get(section_name, "machine_resources")
  118. return app_interference, app_resources, instance_deploy, machine_resources
  119. def init_conf(self):
  120. '''
  121. 初始化配置文件
  122. :retur
  123. '''
  124. utils.save_conf.write()
  125. def sort_dynamic(self):
  126. print("ss")
  127. def pickInstance(self, instanceid):
  128. '''
  129. 先将instance从部署的主机中删除,删除一行,释放资源
  130. :return:
  131. '''
  132. if instanceid not in self.inst2Machine["instance"]:
  133. return
  134. appid = self.inst2Machine[self.inst2Machine["instanceid"] == instanceid]["appid"].values[0]
  135. # 更新machineResourcesUsed
  136. for i in range(self.num_k):
  137. machineResourcesUsed[fromMachine][j] -= appResources[appIt][i]
  138. fromMachine = self.inst2AppIndex
  139. self.inst2Machine.pop(instanceid)
  140. def toMachine(self, instanceid, machineid, doCheck=False):
  141. '''
  142. 检查互斥条件,然后把instance放入主机
  143. :rtype: object
  144. :param instanceid: 实例id
  145. :param machineid: 主机id
  146. :param doCheck: 是否检测资源限制
  147. :return: True和False
  148. '''
  149. # instanceid所属的appid
  150. appid = self.instanceDeploy[self.instanceDeploy["instanceid"] == instanceid]["appid"].values[0]
  151. # machineid从1开始,而index从0开始
  152. hasApp = self.machineHasApp[int(machineid - 1)]
  153. if doCheck:
  154. # 检查互斥
  155. # 检查资源限制
  156. for i in range(self.num_k):
  157. if (
  158. self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:,
  159. i + 1].values[0] +
  160. self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0]
  161. >
  162. self.machineResources[self.machineResources["machineid"] == machineid].iloc[:,
  163. i + 1].values[0]):
  164. print("Resource Limit: instance: ", instanceid, ",", "machine:", machineid,
  165. self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:,
  166. i + 1].values[0], "+",
  167. self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0], " >",
  168. self.machineResources[self.machineResources["machineid"] == machineid].iloc[:,
  169. i + 1].values[0])
  170. # 如果不符合则 return False
  171. return False
  172. # 将inst放入新的machine,占用资源
  173. self.inst2Machine.append([{"instanceid": instanceid, "machineid": machineid}])
  174. if appid not in hasApp:
  175. hasApp.update({appid: 1})
  176. else:
  177. hasApp.update({appid: hasApp.get(appid) + 1})
  178. for i in range(self.num_k):
  179. self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:, i + 1].values[
  180. 0] += \
  181. self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0]
  182. return True
  183. def run(self, start):
  184. '''
  185. 执行部署
  186. :return:
  187. '''
  188. # 已经部署的instance
  189. deployed_Instance = self.instanceDeploy.loc[pd.isna(self.instanceDeploy["machineid"]) == False]
  190. count_deployed_Instance, col = deployed_Instance.shape
  191. deployed_Instance.reset_index(drop=True, inplace=True)
  192. # 将已经部署的instance放置到对应主机中,占用相应资源,这一块代码比java慢了太多
  193. for i in range(count_deployed_Instance):
  194. instanceid = deployed_Instance["instanceid"][i]
  195. machineid = deployed_Instance["machineid"][i]
  196. self.toMachine(instanceid, machineid, doCheck=False)
  197. print("初始部署第", i, "个,持续耗时", time.time() - start, "秒")
  198. # 对instance同样按照disk消耗排序
  199. self.instanceDeploy = self.instanceDeploy.sort_values(ascending=False, by="disk")
  200. # 然后通过ff方法,把instance放入machine中。每次放入instance后,队列删除,每次消耗主机i,删除主机i.先使用大主机,磁盘优先计算限制条件
  201. row1, col = self.instanceDeploy.shape
  202. while row1 > 0:
  203. # 先对主机列表按照disk剩余进行排序,降序
  204. self.machineResourcesUsed = self.machineResourcesUsed.sort_values(ascending=False, by="disk")
  205. # 每部署一次,消耗一个主机
  206. self.deployInstance()
  207. # 筛选未部署的
  208. self.instanceDeploy = self.instanceDeploy[self.instanceDeploy["isdeploy"] == False]
  209. row, col = self.instanceDeploy.shape
  210. self.instanceDeploy.reset_index(drop=True, inplace=True)
  211. j = j + 1
  212. print("已经部署:", 68219 - row, "剩余部署Instance数据:", row)
  213. print("已经消耗Machine主机数据:", j)
  214. print("部署方案前几条示意:", self.result.head())
  215. utils.save_result.save_result(self.result)
  216. def dcmp(self, x):
  217. '''
  218. 将结果映射到-1,0,1
  219. :param x:
  220. :return:
  221. '''
  222. if abs(x) < 1e-9:
  223. return 0
  224. elif x > 0:
  225. return 1
  226. else:
  227. return -1
  228. def deployInstance(self):
  229. '''
  230. 部署逻辑
  231. :return:
  232. '''
  233. # 主机:self.machineResourcesUsed["machineid"][0]
  234. for row in self.instanceDeploy.itertuples():
  235. i = row.Index
  236. # 当前row实例尝试部署到新主机,如果可以部署则部署,如果初始已经部署,则算迁移,释放原来主机资源
  237. if self.toMachine(row, machineid="", doCheck=True):
  238. machineHasApp = machineHasApp.append(pd.DataFrame(
  239. [{"instanceid": row.instanceid,
  240. "machineid": "machine_" + str(j)}]))
  241. if __name__ == '__main__':
  242. print("------------开始部署啦--------------")
  243. start = time.time()
  244. scheduling = Scheduling()
  245. # 加载数据
  246. scheduling.loadData()
  247. print("加载数据耗时:", time.time() - start)
  248. # 开始调度
  249. scheduling.run(start)
  250. # 部署完事
  251. print("------------部署完啦--------------")
  252. end = time.time()
  253. print("总共耗时:", end - start, "秒")