github.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. #!/usr/bin/env python
  2. # -*- encoding: utf-8 -*-
  3. """
  4. @Contact : liuyuqi.gov@msn.cn
  5. @Time : 2023/06/04 13:43:46
  6. @License : Copyright © 2017-2022 liuyuqi. All Rights Reserved.
  7. @Desc : fetch all open-source repositories of a GitHub user
  8. """
  9. import os
  10. import json
  11. import csv, subprocess
  12. from repo_sync.repo import Repo
  13. from .base_platform import BasePlatform
  14. class GithubIE(BasePlatform):
  15. """github util"""
  16. _host = 'https://api.github.com'
  17. def __init__(self, username:str, token:str,host:str =None ,params: dict = None) -> None:
  18. """init"""
  19. super().__init__(username=username, token=token)
  20. # 60 unauthenticated requests per hour
  21. if self.token:
  22. self.sess.headers.update({'Accept': 'application/vnd.github.v3+json'})
  23. self.repo_private = True if params.get('github_private', "true").lower() == 'true' else False
  24. def create_repo(self, repo_name: str):
  25. """create a repo"""
  26. url = f'{self._host}/user/repos'
  27. payload = {
  28. 'name': repo_name,
  29. 'private': self.repo_private,
  30. 'has_issues': True,
  31. 'has_projects': False,
  32. 'has_wiki': False,
  33. }
  34. r = self.sess.post(url, data=json.dumps(payload))
  35. if r.status_code != 201:
  36. print(
  37. 'create repo {} failed, status code {}'.format(repo_name, r.status_code)
  38. )
  39. return
  40. print('create repo {} success'.format(repo_name))
  41. def delete(self, repo_name: str):
  42. """delete a repo, maybe request a confirm by input"""
  43. # print("delete repo:"+repo_name)
  44. url = f'{self._host}/repos/{self.username}/{repo_name}'
  45. response = self.sess.delete(url)
  46. if response.status_code == 204:
  47. print(f'Repository:{repo_name} deleted from github successfully!')
  48. else:
  49. print(
  50. f'Failed to delete repository: {repo_name} from github. Error {response.status_code}: {response.text}'
  51. )
  52. print(f'delete repo:{repo_name} from github success')
  53. def repo_exists(self, repo_name: str):
  54. """check if a repo exists"""
  55. url = f'{self._host}/repos/{self.username}/{repo_name}'
  56. print('check repo:' + repo_name)
  57. try:
  58. response = self.sess.get(url)
  59. if response.status_code == 200:
  60. return True
  61. except Exception as e:
  62. return False
  63. def pull(self, local_repo_path: str):
  64. """pull a repo"""
  65. if local_repo_path[-1] == os.path.sep:
  66. local_repo_path = local_repo_path[:-1]
  67. repo_name = local_repo_path.split(os.path.sep)[-1]
  68. print(f'pull repo:{self.username}/{repo_name} from github')
  69. os.chdir(local_repo_path)
  70. os.system('git remote remove origin_github')
  71. os.system(
  72. f'git remote add origin_github https://{self.username}:{self.token}@github.com/{self.username}/{repo_name}.git')
  73. result = subprocess.run(
  74. ['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True)
  75. current_branch = result.stdout.strip()
  76. os.system(f'git pull origin_github {current_branch}')
  77. os.system('git remote remove origin_github')
  78. os.chdir('..')
  79. print('pull from github success')
  80. def clone(self, repo_name: str):
  81. pass
  82. def push(self, local_repo_path: str):
  83. """push a local repo to remote"""
  84. if local_repo_path[-1] == os.path.sep:
  85. local_repo_path = local_repo_path[:-1]
  86. repo_name = local_repo_path.split(os.path.sep)[-1]
  87. print(f'push repo:{self.username}/{repo_name} to github')
  88. if not self.repo_exists(repo_name):
  89. self.create_repo(repo_name)
  90. os.chdir(local_repo_path)
  91. os.system('git remote remove origin_github')
  92. os.system(
  93. f'git remote add origin_github https://{self.username}:{self.token}@github.com/{self.username}/{repo_name}.git'
  94. )
  95. result = subprocess.run(['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True)
  96. current_branch = result.stdout.strip()
  97. os.system(f'git pull origin_github {current_branch}')
  98. os.system(f'git push -u origin_github {current_branch}')
  99. os.system('git remote remove origin_github')
  100. os.chdir('..')
  101. def get_repo_list(self) -> list:
  102. """get all repo list of a user"""
  103. if os.path.exists(self.repo_list_path):
  104. print(
  105. 'repo is exist, please read from {} file'.format(self.repo_list_path)
  106. )
  107. with open(self.repo_list_path, 'r', newline='') as csvfile:
  108. reader = csv.reader(csvfile)
  109. for row in reader:
  110. repo = Repo()
  111. repo.__dict__ = row
  112. self.repos.append(repo)
  113. else:
  114. page_num = 1
  115. url = '{}/users/{}/repos'.format(self._host, self.username)
  116. while True:
  117. r = self.sess.get(url, params={'type': 'all', 'page': page_num})
  118. if r.status_code != 200:
  119. print(
  120. 'request url {} failed, status code {}'.format(
  121. url, r.status_code
  122. )
  123. )
  124. return
  125. # rate limit
  126. repo_list = r.json()
  127. for repo in repo_list:
  128. repo_obj = Repo()
  129. repo_obj.name = repo.get('name')
  130. repo_obj.url = repo.get('html_url')
  131. repo_obj.description = repo.get('description')
  132. repo_obj.language = repo.get('language')
  133. repo_obj.star = repo.get('stargazers_count')
  134. repo_obj.fork = repo.get('forks_count')
  135. repo_obj.watch = repo.get('watchers_count')
  136. self.repos.append(repo_obj)
  137. self.repos.sort(key=lambda x: x.star, reverse=True)
  138. # Link: <https://api.github.com/user/123456/repos?page=2>; rel="next", <https://api.github.com/user/123456/repos?page=3>; rel="last"
  139. links = r.headers.get('Link')
  140. if not links or 'rel="next"' not in links:
  141. break
  142. next_url = None
  143. for link in links.split(','):
  144. if 'rel="next"' in link:
  145. next_url = link.split(';')[0].strip('<>')
  146. break
  147. # Break loop if next URL is not valid
  148. if not next_url:
  149. break
  150. # Increment page number for next iteration
  151. page_num += 1
  152. self.save_csv()
  153. return self.repos
  154. def _clone_all_repo(self):
  155. """cloen all repo"""
  156. for repo in self.repos:
  157. os.system('git clone {}'.format(repo.url))
  158. def clone_user_repos(self):
  159. """clone all repo of a user"""
  160. # if github.csv exist, read it
  161. if os.path.exists(self.repo_list_path):
  162. with open(self.repo_list_path, 'r', newline='') as csvfile:
  163. reader = csv.reader(csvfile)
  164. for row in reader:
  165. if row[0] == 'name':
  166. continue
  167. repo = Repo()
  168. repo.__dict__ = row
  169. self.repos.append(repo)
  170. else:
  171. self.get_repo_list(self.username)
  172. self._clone_all_repo()
  173. @classmethod
  174. def suitable(cls, extractor: str) -> bool:
  175. """check if this extractor is suitable for this platform"""
  176. if extractor == 'github':
  177. return True
  178. else:
  179. return False