t_sort_by_disk.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
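#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Because the full data set is very large, testing uses only part of the data!
Bin-pack in descending order of disk usage, i.e. assign instances to hosts
on the basis that disk is the resource that runs out first.
@Author :liuyuqi.gov@msn.cn
@Time :2018/7/7 0:43
@File :sort_by_disk.py
'''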
  10. import matplotlib
  11. matplotlib.use('Agg')
  12. import pandas as pd
  13. import time
  14. import libs.save_result
  15. df1 = pd.read_csv("../data/scheduling_preliminary_app_resources_20180606.csv", encoding="utf-8")
  16. df3 = pd.read_csv("../data/test-instance.csv")
  17. # print(df3["cpu"].value_counts())
  18. # print(df3.head())
  19. df3["cpu"] = df3["cpu"].astype("float")
  20. df3["disk"] = df3["disk"].astype("float")
  21. df3["mem"] = df3["mem"].astype("float")
  22. df3["M"] = df3["M"].astype("float")
  23. df3["P"] = df3["P"].astype("float")
  24. df3["PM"] = df3["PM"].astype("float")
  25. df3["isdeploy"] = False
  26. # machine
  27. # 其实就两类,所以就不需要导入数据了。
  28. # 限制表
  29. df4 = pd.read_csv("../data/scheduling_preliminary_app_interference_20180606.csv", header=None,
  30. names=list(["appid1", "appid2", "max_interference"]), encoding="utf-8")
  31. result = pd.DataFrame(columns=list(["instanceid", "machineid"]), data=list())
  32. tem_pre_disk = tem_pre_mem = tem_pre_cpu = tem_pre_P = tem_pre_M = tem_pre_PM = 0
  33. tem_disk = tem_mem = tem_cpu = tem_P = tem_M = tem_PM = 0
  34. tmp_stand_cpu1 = 32
  35. tmp_stand_mem1 = 64
  36. tmp_stand_disk1 = 600
  37. tmp_stand_cpu2 = 92
  38. tmp_stand_mem2 = 288
  39. tmp_stand_disk2 = 600
  40. tmp_stand_P = 7
  41. tmp_stand_M1 = 3
  42. tmp_stand_M2 = 7
  43. tmp_stand_PM1 = 7
  44. tmp_stand_PM2 = 9
  45. machine_count = 0 # 3000小机器,3000大机器。所以在小机器用完换大机器
  46. j = 1 # j表示主机序号,从1-3000,3001到6000
  47. is_deploy = False # 主机j是否部署了instance
  48. deploy_list = list() # 主机j部署的instanceid实例
  49. # 各app之间的限制
  50. def restrictApps(instance, deploy_list):
  51. len_list = len(deploy_list)
  52. if len_list == 0:
  53. return True
  54. else:
  55. ct = pd.Series(deploy_list).value_counts()
  56. for k, v in ct.items():
  57. tmp = df4.loc[(df4["appid1"] == k) & (df4["appid2"] == instance)]
  58. row, col = tmp.shape
  59. if row > 0:
  60. if ct[instance] + 1 > tmp["max_interference"]:
  61. return False
  62. else:
  63. # 在限制表中找不到限制条件
  64. return True
  65. # 执行部署方案
  66. def deploy():
  67. global j, is_deploy, tem_mem, tem_cpu, tem_disk, tem_P, tem_M, tem_PM, tem_pre_disk, tem_pre_mem, \
  68. tem_pre_cpu, tem_pre_P, tem_pre_M, tem_pre_PM, result, df3, deploy_list
  69. print("------------开始部署啦--------------")
  70. start = time.time()
  71. row, column = df3.shape
  72. while row > 0:
  73. deployInstance()
  74. # 整个instace都遍历了,第j主机无法再放入一个,所以添加j+1主机
  75. df3 = df3[df3["isdeploy"] == False]
  76. row, column = df3.shape
  77. df3 = df3.reset_index(drop=True)
  78. j = j + 1
  79. # j++之后表示新建主机,所以新主机没有部署任何实例,为false,然后初始化所有其他参数
  80. is_deploy = False
  81. tem_pre_disk = tem_pre_mem = tem_pre_cpu = tem_pre_P = tem_pre_M = tem_pre_PM = 0
  82. tem_disk = tem_mem = tem_cpu = tem_P = tem_M = tem_PM = 0
  83. deploy_list = list()
  84. # 部署完事
  85. print("------------部署完啦--------------")
  86. end = time.time()
  87. print("总共耗时:", end - start, "秒")
  88. print("总共需要主机数:", j)
  89. print("部署方案前几条示意:", result.head())
  90. libs.save_result.save_result(result)
  91. def deployInstance():
  92. '''
  93. 根据限制部署实例到主机上
  94. :param row: 根据剩余的instance数量循环
  95. :param j: 第j台主机
  96. :return: 暂未定返回值,None
  97. '''
  98. global is_deploy, tem_mem, tem_cpu, tem_disk, tem_P, tem_M, tem_PM, tem_pre_disk, tem_pre_mem, tem_pre_cpu, tem_pre_P, tem_pre_M, tem_pre_PM, result, j, df3, deploy_list
  99. for row in df3.itertuples():
  100. i = row.Index
  101. tem_pre_cpu = tem_cpu + row.cpu
  102. tem_pre_mem = tem_mem + row.mem
  103. tem_pre_disk = tem_disk + row.disk # 当前磁盘消耗
  104. tem_pre_P = tem_P + row.P
  105. tem_pre_M = tem_M + row.M
  106. tem_pre_PM = tem_PM + row.PM
  107. # if 满足限制表条件,则把当前实例部署到这台主机上。
  108. if j < 3000: # 使用小主机
  109. if is_deploy == True:
  110. if tem_pre_disk < tmp_stand_disk1: # 磁盘够
  111. if restrictApps(instance=row.instanceid, deploy_list=deploy_list):
  112. if tem_pre_mem < tmp_stand_mem1: # 内存够
  113. if tem_pre_cpu < tmp_stand_cpu1: # CPU够
  114. if tem_pre_M < tmp_stand_M1:
  115. if tem_pre_P < tmp_stand_P:
  116. if tem_pre_PM < tmp_stand_PM1:
  117. # 条件都满足,则把instance放入主机,同时df3表中去掉这个部署好的一行
  118. result = result.append(pd.DataFrame(
  119. [{"instanceid": row.instanceid,
  120. "machineid": "machine_" + str(j)}]))
  121. tem_disk = tem_disk + row.disk
  122. tem_mem = tem_mem + row.mem
  123. tem_cpu = tem_cpu + row.cpu
  124. tem_P = tem_P + row.P
  125. tem_M = tem_M + row.M
  126. tem_PM = tem_PM + row.PM
  127. df3.loc[i, "isdeploy"] = True
  128. deploy_list.append(row.instanceid)
  129. else:
  130. # 主机j没有部署实例,则先部署一个
  131. result = result.append(
  132. pd.DataFrame([{"instanceid": row.instanceid, "machineid": "machine_" + str(j)}]))
  133. tem_disk = tem_disk + row.disk
  134. tem_mem = tem_mem + row.mem
  135. tem_cpu = tem_cpu + row.cpu
  136. tem_P = tem_P + row.P
  137. tem_M = tem_M + row.M
  138. tem_PM = tem_PM + row.PM
  139. df3.loc[i, "isdeploy"] = True
  140. deploy_list.append(row.instanceid)
  141. # df3["isdeploy"][i] = True
  142. is_deploy = True
  143. else: # 使用大主机
  144. if is_deploy == True:
  145. if tem_pre_disk < tmp_stand_disk2: # 磁盘够
  146. if restrictApps(instance=row.instanceid, deploy_list=deploy_list):
  147. if tem_pre_mem < tmp_stand_mem2: # 内存够
  148. if tem_pre_cpu < tmp_stand_cpu2: # CPU够
  149. if tem_pre_M < tmp_stand_M2:
  150. if tem_pre_P < tmp_stand_P:
  151. if tem_pre_PM < tmp_stand_PM2:
  152. # 条件都满足,则把instance放入主机
  153. result = result.append(pd.DataFrame(
  154. [{"instanceid": row.instanceid,
  155. "machineid": "machine_" + str(j)}]))
  156. tem_disk = tem_disk + row.disk
  157. tem_mem = tem_mem + row.mem
  158. tem_cpu = tem_cpu + row.cpu
  159. tem_P = tem_P + row.P
  160. tem_M = tem_M + row.M
  161. tem_PM = tem_PM + row.PM
  162. df3.loc[i, "isdeploy"] = True
  163. deploy_list.append(row.instanceid)
  164. else:
  165. # 主机j没有部署实例,则先部署一个
  166. result = result.append(
  167. pd.DataFrame([{"instanceid": row.instanceid, "machineid": "machine_" + str(j)}]))
  168. tem_disk = tem_disk + row.disk
  169. tem_mem = tem_mem + row.mem
  170. tem_cpu = tem_cpu + row.cpu
  171. tem_P = tem_P + row.P
  172. tem_M = tem_M + row.M
  173. tem_PM = tem_PM + row.PM
  174. df3.loc[i, "isdeploy"] = True
  175. deploy_list.append(row.instanceid)
  176. is_deploy = True
  177. deploy()