Browse Source

console add color

liuyuqi-dellpc 8 months ago
parent
commit
0efd281945

+ 3 - 1
README.md

@@ -29,9 +29,11 @@ python main.py create --platform gitlab --repo_path F:\workspace\python\repo_syn
 python main.py clone --platform coding --repo_path F:\workspace\python\repo_sync
 ```
 
+
 ## License
 
-Apache License 2.0
+Licensed under the [Apache 2.0](LICENSE) © [liuyuqi.gov@msn.cn](https://github.com/jianboy)
+
 
 ## Reference
 

+ 1 - 1
main.py

@@ -9,4 +9,4 @@
 from repo_sync import main
 
 if __name__=='__main__':
-    main()
+    main()

+ 180 - 0
repo_sync/platform/aliyun.py

@@ -0,0 +1,180 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+'''
+@Contact :   liuyuqi.gov@msn.cn
+@Time    :   2024/07/31 01:47:38
+@License :   Copyright © 2017-2022 liuyuqi. All Rights Reserved.
+@Desc    :   aliyun devops
+read docs:
+https://help.aliyun.com/document_detail/460450.html?spm=a2c4g.460449.0.0.4cc62367VCclNI
+'''
+
+from repo_sync.platform.base_platform import BasePlatform
+import csv,subprocess
+import os
+from repo_sync.repo import Repo
+from repo_sync.utils.colors import bcolors
+
+
+class AliyunDevOps(BasePlatform):
+    """aliyun devops"""
+
+    _host = 'https://devops.cn-hangzhou.aliyuncs.com'
+
+    def __init__(self, username:str, token:str,host:str =None, params: dict = None) -> None:
+        super().__init__(username=username, token=token)
+        self.sess.headers.update({'Content-Type': 'multipart/form-data'})
+        self.repo_private = True if params.get('aliyun_private', "true").lower()  == 'true' else False
+        self.companyId = 1411978
+        self.groupId = 1411978
+
+    def create_project(self, project_name: str):
+        """create a project
+        这里概念是:代码组
+        """        
+        url = f'{self._api}/repository/groups/create'
+        form_data = {
+            "accessToken":"",
+            'name': project_name, 
+            'path': project_name,
+            'visibilityLevel': self.repo_private==True ? 10 : 0, 
+            'parentId': 1411978,
+            'private': self.repo_private,
+        }
+        r = self.sess.post(url, params=form_data)
+        if r.status_code != 200:
+            print(f'{bcolors.FAIL}create project {project_name} failed, status code {r.status_code}{bcolors.ENDC}')
+            return
+        else:
+            print(r.json())
+            print(f'{bcolors.OKGREEN}create project {project_name} success{bcolors.ENDC}')
+
+    def get_project_info(self, project_name: str):
+        """get project info"""
+        url = f'{self._api}/api/4/groups/find_by_path'
+        form_data = {
+            "organizationId": self.companyId,
+            "identity": "%s/%s" % (self.companyId, project_name)
+        }
+        r = self.sess.get(url, params=form_data)
+        if r.status_code != 200:
+            print(f"{bcolors.FAIL}get project info failed{bcolors.ENDC}")
+            return
+        else:
+            print(r.json())
+            return r.json()
+
+    def delete_project(self, project_name: str):
+        """delete a project"""
+        # get group id
+        url = f'{self._api}/api/4/groups/find_by_path'
+        form_data = {
+            "organizationId": self.companyId,
+            "identity": "%s/%s" % (self.companyId, project_name)
+        }
+        r = self.sess.get(url, params=form_data)
+        if r.status_code != 200:
+            print(f"{bcolors.FAIL}get group id failed{bcolors.ENDC}")
+            return
+        else:
+            # delete the project
+            group = r.json()
+            groupId = group['result']['id']
+            url = f'{self._api}/repository/groups/{groupId}/remove'
+            form_data = {
+                "accessToken":"",
+                "organizationId": self.companyId,
+                "groupId": groupId,
+                "reason":"not need"
+                }
+            response = self.sess.post(url)
+            if response.status_code == 204:
+                print(f'{bcolors.OKGREEN}Project: {project_name} deleted from aliyun successfully!{bcolors.ENDC}')
+            else:
+                print(f'{bcolors.FAIL}Failed to delete project: {project_name} from aliyun. Error {response.status_code}: {response.text}{bcolors.ENDC}')
+    def get_project_list(self) -> list:
+        pass
+    
+    def get_repo_list(self) -> list:
+        """ get repo list"""
+        url = f'{self._api}/repository/list'
+        form_data = {
+            "organizationId": self.companyId,
+            "page": 1,
+            "pageSize": 100,
+            "orderBy":"last_activity_at",
+            # "search":"",
+            "archived":"true",
+        }
+        r = self.sess.get(url, params=form_data)
+        if r.status_code != 200:
+            print(f"{bcolors.FAIL}get repo list failed, status code {r.status_code}{bcolors.ENDC}")
+            return
+        repo_list = r.json()
+        return repo_list
+    
+    def _get_repo_info(self, repo_name: str):
+        """get repo info"""
+        url = f'{self._api}/repository/get'
+        form_data = {
+            "accessToken":"",
+            "organizationId": self.companyId,
+            "name": repo_name
+        }
+        r = self.sess.get(url, params=form_data)
+        if r.status_code != 200:
+            print(f"{bcolors.FAIL}get repo info failed, status code {r.status_code}{bcolors.ENDC}")
+            return
+        return r.json()
+
+
+    def create_repo(self, repo_name: str):
+        """create a repo"""        
+        url = f'{self._api}/repository/create'
+        form_data = {
+        "accessToken":"",
+        'name': repo_name, 
+        'private': self.repo_private,
+        }
+        r = self.sess.post(url, params=form_data)
+        if r.status_code != 201:
+            print(f"{bcolors.FAIL}create repo {repo_name} failed, status code {r.status_code}{bcolors.ENDC}")
+            return
+
+    def delete(self, repo_name: str):
+            """delete a project"""
+            url = f'{self._api}/project/{repo_name}'
+
+            response = self.sess.delete(url)
+            if response.status_code == 204:
+                print(f"{bcolors.OKGREEN}Project: {repo_name} deleted from aliyun successfully!{bcolors.ENDC}")
+            else:
+                print(f'{bcolors.FAIL}Failed to delete project: {repo_name} from aliyun. Error {response.status_code}: {response.text}{bcolors.ENDC}')
+    def pull(self, local_repo_path: str):
+        if local_repo_path[-1] == os.path.sep:
+            local_repo_path = local_repo_path[:-1]
+        repo_name = local_repo_path.split(os.path.sep)[-1]
+        print(f"{bcolors.OKGREEN}pull repo:{self.username}/{repo_name} from aliyun{bcolors.ENDC}")
+        os.chdir(local_repo_path)
+        result = subprocess.run(['git', 'pull', 'https://codeup.aliyun.com/api/v1/repository/get?accessToken=&organizationId=1411978&name='+repo_name])
+    
+    def push(self, local_repo_path: str):
+        """ push local repo to aliyun"""
+        if local_repo_path[-1] == os.path.sep:
+            local_repo_path = local_repo_path[:-1]
+        repo_name = local_repo_path.split(os.path.sep)[-1]
+        print(f"{bcolors.OKGREEN}push repo:{self.username}/{repo_name} to aliyun{bcolors.ENDC}")
+        os.chdir(local_repo_path)
+        result = subprocess.run(['git', 'push', 'https://codeup.aliyun.com/api/v1/repository/create?accessToken=&organizationId=1411978&name='+repo_name])
+            
+    def clone(self):
+        """ clone repo from aliyun"""
+        pass
+
+    @classmethod
+    def suitable(cls, extractor: str) -> bool:
+        """check if this extractor is suitable for this platform"""
+        if extractor == 'aliyun_devops':
+            return True
+        else:
+            return False

+ 2 - 1
repo_sync/platform/base_platform.py

@@ -1,5 +1,6 @@
 import requests,csv,os
 from repo_sync.repo import Repo
+from repo_sync.utils.colors import bcolors
 
 class BasePlatform(object):
     """base platform"""
@@ -56,7 +57,7 @@ class BasePlatform(object):
     def save_csv(self):
         with open(self.repo_list_path, 'w', newline='') as f:
             if len(self.repos) == 0:
-                print('repo list is empty, please delete repo_list.csv and try again')
+                print(f"{bcolors.WARNING}repo list is empty, please delete repo_list.csv and try again{bcolors.ENDC}")
                 return
             writer = csv.DictWriter(f, fieldnames=self.repos[0].__dict__.keys(), lineterminator='\n')
             writer.writeheader()

+ 71 - 65
repo_sync/platform/coding/coding.py

@@ -13,22 +13,23 @@ import os,subprocess,sys
 from repo_sync.platform.base_platform import BasePlatform
 from .project import Project
 from .repo import Repo
+from repo_sync.utils.colors import bcolors
 
 class CodingIE(BasePlatform):
     """coding util"""
     client_id = ''
     client_serect = ''
     _host = 'https://e.coding.net'  # 新版接口统一地址
-
-    def __init__(self, username:str, token:str, host:str =None , params: dict = None) -> None:
+    
+    def __init__(self, username: str, token: str, host: str = None, params: dict = None) -> None:
         """init"""
-        super().__init__(username , token)
+        super().__init__(username, token)
         self.project_name = params.get('coding_project', '')
-        self.repo_shared = False if params.get('coding_private', "true").lower()  == 'true' else True
+        self.repo_shared = False if params.get('coding_private', "true").lower() == 'true' else True
         self.url = f'{self._host}/open-api'
-
+    
     def create_project(self):
-        ''' createt a project '''
+        ''' create a project '''
         data = {
             'Action': 'CreateCodingProject',
             'Name': '',
@@ -43,10 +44,12 @@ class CodingIE(BasePlatform):
         r = self.sess.post(self.url, json=data)
         if r.status_code == 200:
             res_data = r.json()
+            print(bcolors.OKGREEN + 'Create project success' + bcolors.ENDC)
             return True
         else:
+            print(bcolors.FAIL + 'Failed to create project' + bcolors.ENDC)
             return False
-
+    
     def delete_project(self):
         data = {
             'Action': 'DeleteOneProject',
@@ -55,15 +58,17 @@ class CodingIE(BasePlatform):
         r = self.sess.post(self.url, json=data)
         if r.status_code == 200:
             res_data = r.json()
+            print(bcolors.OKGREEN + 'Delete project success' + bcolors.ENDC)
             return True
         else:
+            print(bcolors.FAIL + 'Failed to delete project' + bcolors.ENDC)
             return False
-
+    
     def get_project_list(self):
         pass
-
+    
     def get_repo_list(self, username: str = None) -> list:
-        """ get repo list  from a project
+        """ get repo list from a project
             Args: username: the target username may not self.username
             return: repo list
         """
@@ -74,7 +79,6 @@ class CodingIE(BasePlatform):
             'PageSize': 50
         }
         r = self.sess.post(self.url, json=data)
-    
         if r.status_code == 200:
             res_data = r.json()
             try:
@@ -84,20 +88,21 @@ class CodingIE(BasePlatform):
                     DepotList = []
                     # the first page
                     DepotList = res_data['Response']["DepotData"]["Depots"]
-                    repo_model = Repo(
-                        Id=DepotList[0]['Id'],
-                        Name=DepotList[0]['Name'],
-                        HttpsUrl=DepotList[0]['HttpsUrl'],
-                        ProjectId=DepotList[0]['ProjectId'],
-                        SshUrl=DepotList[0]['SshUrl'],
-                        WebUrl=DepotList[0]['WebUrl'],
-                        ProjectName=DepotList[0]['ProjectName'],
-                        Description=DepotList[0]['Description'],
-                        CreatedAt=DepotList[0]['CreatedAt'],
-                        GroupId=DepotList[0]['GroupId'],
-                        GroupName=DepotList[0]['GroupName']
-                    )
-                    DepotList.append(repo_model)
+                    for repo in DepotList:
+                        repo_model = Repo(
+                            Id=repo['Id'],
+                            Name=repo['Name'],
+                            HttpsUrl=repo['HttpsUrl'],
+                            ProjectId=repo['ProjectId'],
+                            SshUrl=repo['SshUrl'],
+                            WebUrl=repo['WebUrl'],
+                            ProjectName=repo['ProjectName'],
+                            Description=repo['Description'],
+                            CreatedAt=repo['CreatedAt'],
+                            GroupId=repo['GroupId'],
+                            GroupName=repo['GroupName']
+                        )
+                        DepotList.append(repo_model)
                     
                     currentPage += 1
                     # the other pages
@@ -110,10 +115,9 @@ class CodingIE(BasePlatform):
                         }
                         r = self.sess.post(self.url, json=data)
                         res_data = r.json()
-
                         DepotList = res_data['Response']["DepotData"]["Depots"]
                         for repo in DepotList:
-                            repo_model= Repo(
+                            repo_model = Repo(
                                 Id=repo['Id'],
                                 Name=repo['Name'],
                                 HttpsUrl=repo['HttpsUrl'],
@@ -130,9 +134,9 @@ class CodingIE(BasePlatform):
                         currentPage += 1
                     return DepotList
                 else:
-                    print(f'can not find repo in project {self.project_name}')
+                    print(bcolors.WARNING + f'Cannot find repo in project {self.project_name}' + bcolors.ENDC)
             except Exception as e:
-                raise Exception(e)
+                print(bcolors.FAIL + str(e) + bcolors.ENDC)
         
     def _get_repo_info(self, repo_name: str):
         """get repo list"""
@@ -164,25 +168,25 @@ class CodingIE(BasePlatform):
                     )
                     return depot
                 else:
-                    print(f'can not find repo {repo_name} in project {self.project_name}')
+                    print(bcolors.WARNING + f'Cannot find repo {repo_name} in project {self.project_name}' + bcolors.ENDC)
             except Exception as e:
-                raise Exception(f'can not find repo {repo_name} in project {self.project_name}')
-
-    def get_project_info(self)->Project:
+                print(bcolors.FAIL + f'Cannot find repo {repo_name} in project {self.project_name}: {str(e)}' + bcolors.ENDC)
+    
+    def get_project_info(self) -> Project:
         data = {
             "Action": "DescribeCodingProjects",
             "ProjectName": self.project_name,
             "DepotName": "",
             "PageNumber": 1,
             "PageSize": 50
-            }
+        }
         r = self.sess.post(self.url, json=data)
         if r.status_code == 200:
             res_data = r.json()
             try:
                 if res_data['Response']["Data"]["TotalCount"] > 0:
                     ProjectList = res_data['Response']["Data"]["ProjectList"]
-                    projet = Project(
+                    project = Project(
                         Id=ProjectList[0]['Id'],
                         Name=ProjectList[0]['Name'],
                         DisplayName=ProjectList[0]['DisplayName'],
@@ -190,11 +194,11 @@ class CodingIE(BasePlatform):
                         TeamOwnerId=ProjectList[0]['TeamOwnerId'],
                         TeamId=ProjectList[0]['TeamId']
                     )
-                    return projet
+                    return project
             except Exception as e:
-                print(res_data)
-                print(e)
-
+                print(bcolors.FAIL + 'Error retrieving project info: ' + str(e) + bcolors.ENDC)
+                print(bcolors.FAIL + str(res_data) + bcolors.ENDC)
+    
     def create_repo(self, repo_name: str):
         """create a repo"""
         # get project id
@@ -203,42 +207,43 @@ class CodingIE(BasePlatform):
             repo = self._get_repo_info(repo_name=repo_name)
             if repo is None:
                 data = {
-                        "Action": "CreateGitDepot",
-                        "ProjectId": project.Id, 
-                        "DepotName": repo_name,
-                        "Shared": self.repo_shared,
-                        "Description": "this is your first depot"
-                    }
+                    "Action": "CreateGitDepot",
+                    "ProjectId": project.Id, 
+                    "DepotName": repo_name,
+                    "Shared": self.repo_shared,
+                    "Description": "this is your first depot"
+                }
                 r = self.sess.post(self.url, json=data)
                 if r.status_code == 200:
-                    print(f'create repo {repo_name} success', data,r.json())
+                    print(bcolors.OKGREEN + f'Create repo {repo_name} success' + bcolors.ENDC)
                     return True
                 else:
+                    print(bcolors.FAIL + 'Failed to create repo' + bcolors.ENDC)
                     return False
             else:
-                print("repo: %s is exist" % repo_name)
+                print(bcolors.WARNING + f"Repo: {repo_name} already exists" + bcolors.ENDC)
         else:
-            print("project: %s is not exist, cannot create repo in it." % self.project_name)
-
+            print(bcolors.FAIL + f"Project: {self.project_name} does not exist, cannot create repo in it." + bcolors.ENDC)
+    
     def delete(self, repo_name: str):
-        """delete a repo"""
-        repo = self._get_repo_info(repo_name=repo_name)
-        if repo is not None:
-            data = {
-                "Action": "DeleteGitDepot",
-                "DepotId": repo.Id
-                }
-            r = self.sess.post(self.url, json=data)
-            if r.status_code == 200:
-                print(f'delete repo {repo_name} success', data,r.json())
-                return True
-            else:
-                return False
+            """delete a repo"""
+            repo = self._get_repo_info(repo_name=repo_name)
+            if repo is not None:
+                data = {
+                    "Action": "DeleteGitDepot",
+                    "DepotId": repo.Id
+                    }
+                r = self.sess.post(self.url, json=data)
+                if r.status_code == 200:
+                    print(f'delete repo {repo_name} success', data,r.json())
+                    return True
+                else:
+                    return False
 
     def pull(self, local_repo_path: str):
         ''' pull a repo from remote
             Args: local_repo_path: local repo path
-         '''
+            '''
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
@@ -262,7 +267,7 @@ class CodingIE(BasePlatform):
     def push(self, local_repo_path: str):
         ''' push a local repo to remote
             Args: local_repo_path: local repo path
-         '''
+            '''
         # check if remote repo is exist
 
         if local_repo_path[-1] == os.path.sep:
@@ -288,7 +293,7 @@ class CodingIE(BasePlatform):
     def clone(self, repo_path: str):
         ''' clone all repo from remote
             Args: repo_name: repo name
-         '''
+            '''
         repos = self.get_repo_list()
         for repo in repos:
             try:
@@ -306,3 +311,4 @@ class CodingIE(BasePlatform):
             return True
         else:
             return False
+

+ 35 - 44
repo_sync/platform/gitee.py

@@ -11,49 +11,42 @@ from .base_platform import BasePlatform
 import csv, subprocess
 import os
 from repo_sync.repo import Repo
+from repo_sync.utils.colors import bcolors
 
 class GiteeIE(BasePlatform):
     """gitee async"""
-
     _host = 'https://gitee.com'
     _api = _host + '/api/v5'
-
-    def __init__(self, username:str, token:str,host:str =None, params: dict = None) -> None:
+    
+    def __init__(self, username:str, token:str, host:str=None, params: dict=None) -> None:
         super().__init__(username=username, token=token)
         self.sess.headers.update({'Content-Type': 'multipart/form-data'})
-        self.repo_private = True if params.get('gitee_private', "true").lower()  == 'true' else False
-
+        self.repo_private = True if params.get('gitee_private', "true").lower() == 'true' else False
+    
     def create_repo(self, repo_name: str):
         """create a repo"""        
         url = f'{self._api}/user/repos'
         form_data = {
-        'name': repo_name, 
-        'private': self.repo_private,
+            'name': repo_name, 
+            'private': self.repo_private,
         }
         r = self.sess.post(url, params=form_data)
         if r.status_code != 201:
-            print(
-                'create repo {} failed, status code {}'.format(repo_name, r.status_code)
-            )
+            print(bcolors.FAIL + f'create repo {repo_name} failed, status code {r.status_code}' + bcolors.ENDC)
             return
-        print('create repo {} success'.format(repo_name))
-
+        print(bcolors.OKGREEN + f'create repo {repo_name} success' + bcolors.ENDC)
+    
     def delete(self, repo_name: str):
         """delete a repo"""
-        # print("delete repo:"+repo_name)
         url = f'{self._api}/repos/{self.username}/{repo_name}'
-
         response = self.sess.delete(url)
         if response.status_code == 204:
-            print(f'Repository: {repo_name} deleted from gitee successfully!')
+            print(bcolors.OKBLUE + f'Repository: {repo_name} deleted from gitee successfully!' + bcolors.ENDC)
         else:
-            print(
-                f'Failed to delete repository: {repo_name} from gitee. Error {response.status_code}: {response.text}'
-            )
-
+            print(bcolors.FAIL + f'Failed to delete repository: {repo_name} from gitee. Error {response.status_code}: {response.text}' + bcolors.ENDC)
+    
     def get_repo_list(self) -> list:
         """get repo list"""
-
         if os.path.exists(self.repo_list_path):
             with open(self.repo_list_path, 'r', encoding='utf8') as f:
                 reader = csv.reader(f)
@@ -62,16 +55,17 @@ class GiteeIE(BasePlatform):
                     repo.__dict__ = row
                     self.repos.append(repo)
             return self.repos
-
+        
         url = f'{self._api}/user/repos'
         r = self.sess.get(url)
         if r.status_code != 200:
-            print('get repo list failed, status code {}'.format(r.status_code))
+            print(bcolors.FAIL + f'get repo list failed, status code {r.status_code}' + bcolors.ENDC)
             return
+        
         repo_list = r.json()
         self.save_csv()
         return repo_list
-
+    
     def clone(self):
         pass
     
@@ -79,44 +73,41 @@ class GiteeIE(BasePlatform):
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
-        print(f'pull repo:{self.username}/{repo_name} from gitee')
+        print(bcolors.WARNING + f'pull repo:{self.username}/{repo_name} from gitee' + bcolors.ENDC)
+        
         os.chdir(local_repo_path)
         os.system('git remote remove origin_gitee')
-        os.system(
-            f'git remote add origin_gitee https://{self.username}:{self.token}@gitee.com/{self.username}/{repo_name}.git'
-        )
-        result = subprocess.run(
-            ['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8')
+        os.system(f'git remote add origin_gitee https://{self.username}:{self.token}@gitee.com/{self.username}/{repo_name}.git')
+        
+        result = subprocess.run(['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8')
         current_branch = result.stdout.strip()
         os.system(f'git pull origin_gitee {current_branch}')
         os.system('git remote remove origin_gitee')
         os.chdir('..')
-        print('pull from gitee success')
-                 
+        
+        print(bcolors.OKGREEN + 'pull from gitee success' + bcolors.ENDC)
+    
     def push(self, local_repo_path: str):
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
-        print(f'push repo:{self.username}/{repo_name} to gitee')
+        print(bcolors.WARNING + f'push repo:{self.username}/{repo_name} to gitee' + bcolors.ENDC)
+        
         os.chdir(local_repo_path)
         os.system('git remote remove origin_gitee')
-        os.system(
-            f'git remote add origin_gitee https://{self.username}:{self.token}@gitee.com/{self.username}/{repo_name}.git'
-        )
-        result = subprocess.run(
-            ['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8')
+        os.system(f'git remote add origin_gitee https://{self.username}:{self.token}@gitee.com/{self.username}/{repo_name}.git')
+        
+        result = subprocess.run(['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8')
         current_branch = result.stdout.strip()
-        os.system(f'git pull origin_gitee {current_branch}')
         
+        os.system(f'git pull origin_gitee {current_branch}')
         os.system(f'git push -u origin_gitee {current_branch}')
         os.system('git remote remove origin_gitee')
         os.chdir('..')
-        print('push to gitee success')
-
+        
+        print(bcolors.OKGREEN + 'push to gitee success' + bcolors.ENDC)
+    
     @classmethod
     def suitable(cls, extractor: str) -> bool:
         """check if this extractor is suitable for this platform"""
-        if extractor == 'gitee':
-            return True
-        else:
-            return False
+        return extractor == 'gitee'

+ 26 - 51
repo_sync/platform/github.py

@@ -11,19 +11,18 @@ import json
 import csv, subprocess
 from repo_sync.repo import Repo
 from .base_platform import BasePlatform
+from repo_sync.utils.colors import bcolors
 
 class GithubIE(BasePlatform):
     """github util"""
-
     _host = 'https://api.github.com'
-
-    def __init__(self, username:str, token:str,host:str =None ,params: dict = None) -> None:
+    
+    def __init__(self, username: str, token: str, host: str = None, params: dict = None) -> None:
         """init"""
         super().__init__(username=username, token=token)
-        # 60 unauthenticated requests per hour
         if self.token:
             self.sess.headers.update({'Accept': 'application/vnd.github.v3+json'})
-        self.repo_private = True if params.get('github_private', "true").lower()  == 'true' else False
+        self.repo_private = True if params.get('github_private', "true").lower() == 'true' else False
 
     def create_repo(self, repo_name: str):
         """create a repo"""
@@ -37,30 +36,24 @@ class GithubIE(BasePlatform):
         }
         r = self.sess.post(url, data=json.dumps(payload))
         if r.status_code != 201:
-            print(
-                'create repo {} failed, status code {}'.format(repo_name, r.status_code)
-            )
+            print(f'{bcolors.FAIL}create repo {repo_name} failed, status code {r.status_code}{bcolors.ENDC}')
             return
-        print('create repo {} success'.format(repo_name))
+        print(f'{bcolors.OKGREEN}create repo {repo_name} success{bcolors.ENDC}')
 
     def delete(self, repo_name: str):
         """delete a repo, maybe request a confirm by input"""
-        # print("delete repo:"+repo_name)
         url = f'{self._host}/repos/{self.username}/{repo_name}'
-
         response = self.sess.delete(url)
         if response.status_code == 204:
-            print(f'Repository:{repo_name} deleted from github successfully!')
+            print(f'{bcolors.OKGREEN}Repository: {repo_name} deleted from github successfully!{bcolors.ENDC}')
         else:
-            print(
-                f'Failed to delete repository: {repo_name} from github. Error {response.status_code}: {response.text}'
-            )
-        print(f'delete repo:{repo_name} from github success')
+            print(f'{bcolors.FAIL}Failed to delete repository: {repo_name} from github. Error {response.status_code}: {response.text}{bcolors.ENDC}')
+        print(f'{bcolors.WARNING}delete repo: {repo_name} from github success{bcolors.ENDC}')
 
     def repo_exists(self, repo_name: str):
         """check if a repo exists"""
         url = f'{self._host}/repos/{self.username}/{repo_name}'
-        print('check repo:' + repo_name)
+        print(f'{bcolors.INFO}check repo: {repo_name}{bcolors.ENDC}')
         try:
             response = self.sess.get(url)
             if response.status_code == 200:
@@ -73,52 +66,50 @@ class GithubIE(BasePlatform):
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
-        print(f'pull repo:{self.username}/{repo_name} from github')
+        print(f'{bcolors.INFO}pull repo: {self.username}/{repo_name} from github{bcolors.ENDC}')
         
         os.chdir(local_repo_path)
         os.system('git remote remove origin_github')
         os.system(
-            f'git remote add origin_github  https://{self.username}:{self.token}@github.com/{self.username}/{repo_name}.git')
+            f'git remote add origin_github https://{self.username}:{self.token}@github.com/{self.username}/{repo_name}.git'
+        )
         result = subprocess.run(
-            ['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8')
+            ['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8'
+        )
         current_branch = result.stdout.strip()
         os.system(f'git pull origin_github {current_branch}')
         os.system('git remote remove origin_github')
         os.chdir('..')
-        print('pull from github success')
+        print(f'{bcolors.SUCCESS}pull from github success{bcolors.ENDC}')
 
-    def clone(self, repo_name: str):
-        pass
-    
     def push(self, local_repo_path: str):
         """push a local repo to remote"""
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
-        print(f'push repo:{self.username}/{repo_name} to github')
+        print(f'{bcolors.INFO}push repo: {self.username}/{repo_name} to github{bcolors.ENDC}')
         if not self.repo_exists(repo_name):
             self.create_repo(repo_name)
         os.chdir(local_repo_path)
-
         os.system('git remote remove origin_github')
         os.system(
             f'git remote add origin_github https://{self.username}:{self.token}@github.com/{self.username}/{repo_name}.git'
         )
         result = subprocess.run(
-            ['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8')
+            ['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8'
+        )
         current_branch = result.stdout.strip()
         os.system(f'git pull origin_github {current_branch}')
         
         os.system(f'git push -u origin_github {current_branch}')
         os.system('git remote remove origin_github')
         os.chdir('..')
+        print(f'{bcolors.SUCCESS}push to github success{bcolors.ENDC}')
 
     def get_repo_list(self) -> list:
         """get all repo list of a user"""
         if os.path.exists(self.repo_list_path):
-            print(
-                'repo is exist, please read from {} file'.format(self.repo_list_path)
-            )
+            print(f'{bcolors.INFO}repo is exist, please read from {self.repo_list_path} file{bcolors.ENDC}')
             with open(self.repo_list_path, 'r', newline='') as csvfile:
                 reader = csv.reader(csvfile)
                 for row in reader:
@@ -131,13 +122,8 @@ class GithubIE(BasePlatform):
             while True:
                 r = self.sess.get(url, params={'type': 'all', 'page': page_num})
                 if r.status_code != 200:
-                    print(
-                        'request url {} failed, status code {}'.format(
-                            url, r.status_code
-                        )
-                    )
+                    print(f'{bcolors.FAIL}request url {url} failed, status code {r.status_code}{bcolors.ENDC}')
                     return
-                # rate limit
                 repo_list = r.json()
                 for repo in repo_list:
                     repo_obj = Repo()
@@ -150,35 +136,27 @@ class GithubIE(BasePlatform):
                     repo_obj.watch = repo.get('watchers_count')
                     self.repos.append(repo_obj)
                 self.repos.sort(key=lambda x: x.star, reverse=True)
-                # Link: <https://api.github.com/user/123456/repos?page=2>; rel="next", <https://api.github.com/user/123456/repos?page=3>; rel="last"
                 links = r.headers.get('Link')
-
                 if not links or 'rel="next"' not in links:
                     break
-
                 next_url = None
                 for link in links.split(','):
                     if 'rel="next"' in link:
                         next_url = link.split(';')[0].strip('<>')
                         break
-
-                # Break loop if next URL is not valid
                 if not next_url:
                     break
-
-                # Increment page number for next iteration
                 page_num += 1
             self.save_csv()
         return self.repos
 
     def _clone_all_repo(self):
-        """cloen all repo"""
+        """clone all repo"""
         for repo in self.repos:
-            os.system('git clone {}'.format(repo.url))
+            os.system(f'git clone {repo.url}')
 
     def clone_user_repos(self):
         """clone all repo of a user"""
-        # if github.csv exist, read it
         if os.path.exists(self.repo_list_path):
             with open(self.repo_list_path, 'r', newline='') as csvfile:
                 reader = csv.reader(csvfile)
@@ -189,13 +167,10 @@ class GithubIE(BasePlatform):
                     repo.__dict__ = row
                     self.repos.append(repo)
         else:
-            self.get_repo_list(self.username)
+            self.get_repo_list()
         self._clone_all_repo()
 
     @classmethod
     def suitable(cls, extractor: str) -> bool:
         """check if this extractor is suitable for this platform"""
-        if extractor == 'github':
-            return True
-        else:
-            return False
+        return extractor == 'github'

+ 30 - 38
repo_sync/platform/gitlab.py

@@ -7,23 +7,25 @@
 @Desc    :   gitlab async
 """
 import os
-import json,re 
-import csv, subprocess
+import json
+import re
+import csv
+import subprocess
 from .base_platform import BasePlatform
 from repo_sync.repo import Repo
+from repo_sync.utils.colors import bcolors
 
 class GitlabIE(BasePlatform):
     """gitlab async"""
-
-    def __init__(self, username:str, token:str, host:str =None, params: dict = None) -> None:
+    def __init__(self, username: str, token: str, host: str = None, params: dict = None) -> None:
         super().__init__(username=username, token=token)
         self.host = host or 'https://gitlab.com'
         self.sess.headers.update({"Authorization": f"Bearer {self.token}"})
-        self.repo_private = 'private' if params.get('gitlab_private', "true").lower()  == 'true' else 'public'
-
+        self.repo_private = 'private' if params.get('gitlab_private', "true").lower() == 'true' else 'public'
+    
     def create_repo(self, repo_name: str):
         """create a repo
-            and save project id to csv: gitlab_repo_list.csv
+        and save project id to csv: gitlab_repo_list.csv
         """
         url = f"{self.host}/api/v4/projects"
         payload = {
@@ -32,44 +34,42 @@ class GitlabIE(BasePlatform):
         }
         r = self.sess.post(url, data=json.dumps(payload))
         if r.status_code != 201:
-            print(
-                "create repo {} failed, status code {}".format(repo_name, r.status_code)
-            )
+            print(f"{bcolors.FAIL}create repo {repo_name} failed, status code {r.status_code}{bcolors.ENDC}")
             return
-        print("create repo {} success".format(repo_name))
+        print(f"{bcolors.OKGREEN}create repo {repo_name} success{bcolors.ENDC}")
         for repo in self.repos:
             if repo.name == repo_name:
                 repo.url = r.json()["web_url"]
                 repo.id = r.json()["id"]
                 break
         self.save_csv()
-        
+    
     def delete(self, repo_name: str):
         """delete a repo,
-            find project id from csv: gitlab_repo_list.csv
+        find project id from csv: gitlab_repo_list.csv
         """
-        project_id=""
+        project_id = ""
         r = self.sess.get(f"{self.host}/api/v4/users/{self.username}/projects?search={repo_name}")
         if r.status_code == 200:
             project_id = r.json()[0]["id"]
             url = f"{self.host}/api/v4/projects/{project_id}"
             response = self.sess.delete(url)
             if response.status_code == 202:
-                print(f"Repository: {repo_name} deleted from gitlab successfully!")
+                print(f"{bcolors.OKGREEN}Repository: {repo_name} deleted from gitlab successfully!{bcolors.ENDC}")
             else:
                 print(
-                    f"Failed to delete repository: {repo_name} from gitlab. Error {response.status_code}: {response.text}"
+                    f"{bcolors.FAIL}Failed to delete repository: {repo_name} from gitlab. Error {response.status_code}: {response.text}{bcolors.ENDC}"
                 )
         else:
-            print(f"Failed to delete repository: {repo_name} from gitlab. Error {r.status_code}: {r.text}")
-       
-    def get_repo_list(self, username: str)->list:
+            print(f"{bcolors.FAIL}Failed to delete repository: {repo_name} from gitlab. Error {r.status_code}: {r.text}{bcolors.ENDC}")
+    
+    def get_repo_list(self, username: str) -> list:
         """get repo list"""
         url = f"{self.host}/api/v4/users/{username}/projects"
         r = self.sess.get(url)
         if r.status_code != 200:
-            print("get repo list failed, status code {}".format(r.status_code))
-            return
+            print(f"{bcolors.FAIL}get repo list failed, status code {r.status_code}{bcolors.ENDC}")
+            return []
         repo_list = []
         for res in r.json():
             repo = Repo()
@@ -80,7 +80,7 @@ class GitlabIE(BasePlatform):
             repo_list.append(repo)
         self.save_csv()
         return repo_list
-
+    
     def pull(self, local_repo_path: str):
         """push a local repo to remote
         Args:
@@ -89,12 +89,10 @@ class GitlabIE(BasePlatform):
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
-        print(f"push repo:{self.username}/{repo_name} to gitlab")
+        print(f"{bcolors.OKGREEN}push repo:{self.username}/{repo_name} to gitlab{bcolors.ENDC}")
         self.create_repo(repo_name)
         pur_host = re.search(r'(?<=//)[^/]+', self.host).group()
-
         os.chdir(local_repo_path)
-
         os.system("git remote remove origin_gitlab")
         os.system(
             f"git remote add origin_gitlab https://{self.username}:{self.token}@{pur_host}/{self.username}/{repo_name}.git"
@@ -105,8 +103,8 @@ class GitlabIE(BasePlatform):
         os.system(f'git pull origin_gitlab {current_branch}')
         os.system("git remote remove origin_gitlab")
         os.chdir("..")
-        print(f"pull repo:{self.username}/{repo_name} from gitlab success")
-
+        print(f"{bcolors.OKGREEN}pull repo:{self.username}/{repo_name} from gitlab success{bcolors.ENDC}")
+    
     def push(self, local_repo_path: str):
         """push a local repo to remote
         Args:
@@ -115,12 +113,10 @@ class GitlabIE(BasePlatform):
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
-        print(f"push repo:{self.username}/{repo_name} to gitlab")
+        print(f"{bcolors.OKGREEN}push repo:{self.username}/{repo_name} to gitlab{bcolors.ENDC}")
         self.create_repo(repo_name)
         pur_host = re.search(r'(?<=//)[^/]+', self.host).group()
-
         os.chdir(local_repo_path)
-
         os.system("git remote remove origin_gitlab")
         os.system(
             f"git remote add origin_gitlab https://{self.username}:{self.token}@{pur_host}/{self.username}/{repo_name}.git"
@@ -129,19 +125,15 @@ class GitlabIE(BasePlatform):
             ['git', 'symbolic-ref', '--short', 'HEAD'], capture_output=True, text=True, encoding='utf-8')
         current_branch = result.stdout.strip()
         os.system(f'git pull origin_gitlab {current_branch}')
-        
         os.system(f"git push -u origin_gitlab {current_branch}")
         os.system("git remote remove origin_gitlab")
         os.chdir("..")
-        print(f"push repo:{self.username}/{repo_name} to gitlab success")
-        
+        print(f"{bcolors.OKGREEN}push repo:{self.username}/{repo_name} to gitlab success{bcolors.ENDC}")
+    
     def clone(self):
         pass
-
+    
     @classmethod
     def suitable(cls, extractor: str) -> bool:
         """check if this extractor is suitable for this platform"""
-        if extractor == 'gitlab':
-            return True
-        else:
-            return False
+        return extractor == 'gitlab'

+ 17 - 30
repo_sync/platform/gogs.py

@@ -9,17 +9,15 @@
 from .base_platform import BasePlatform
 import csv, re, subprocess
 import json, os
-
+from repo_sync.utils.colors import bcolors
 
 class GogsIE(BasePlatform):
-    """ """
-
+    """ gogs plotform class """
     gityoqi_repo_list = 'gityoqi_repo_list.csv'
-
-    def __init__(self, username:str, token:str, host:str =None ,params: dict = None) -> None:
-        super().__init__(username=username,token=token)
+    def __init__(self, username: str, token: str, host: str = None, params: dict = None) -> None:
+        super().__init__(username=username, token=token)
         self._host = 'https://git.yoqi.me' if host is None else host
-        self.repo_private = True if params.get('gogs_private', "true").lower()  == 'true' else False
+        self.repo_private: bool = True if params.get('gogs_private', "true").lower() == 'true' else False
 
     def create_org_repo(self, org_name: str, repo_name: str):
         """create org repo"""
@@ -35,13 +33,9 @@ class GogsIE(BasePlatform):
         }
         r = self.sess.post(url, data=json.dumps(payload))
         if r.status_code != 201:
-            print(
-                'create org repo {} failed, status code {}'.format(
-                    repo_name, r.status_code
-                )
-            )
+            print(bcolors.FAIL + f'create org repo {repo_name} failed, status code {r.status_code}' + bcolors.ENDC)
             return
-        print('create org repo {} success'.format(repo_name))
+        print(bcolors.OKGREEN + f'create org repo {repo_name} success' + bcolors.ENDC)
 
     def create_repo(self, repo_name: str):
         """create a repo"""
@@ -57,43 +51,38 @@ class GogsIE(BasePlatform):
         }
         r = self.sess.post(url, data=json.dumps(payload))
         if r.status_code != 201:
-            print(
-                'create repo {} failed, status code {}'.format(repo_name, r.status_code)
-            )
+            print(bcolors.FAIL + f'create repo {repo_name} failed, status code {r.status_code}' + bcolors.ENDC)
             return
-        print('create repo {} success'.format(repo_name))
+        print(bcolors.OKGREEN + f'create repo {repo_name} success' + bcolors.ENDC)
 
     def delete(self, repo_name: str):
         """delete a repo, maybe request a confirm by input"""
         url = f'{self._host}/api/v1/repos/{self.username}/{repo_name}'
         r = self.sess.delete(url)
         if r.status_code != 204:
-            print(
-                'delete repo {} failed, status code {}'.format(repo_name, r.status_code)
-            )
+            print(bcolors.FAIL + f'delete repo {repo_name} failed, status code {r.status_code}' + bcolors.ENDC)
             return
-        print('delete repo {} success'.format(repo_name))
+        print(bcolors.OKGREEN + f'delete repo {repo_name} success' + bcolors.ENDC)
 
     def get_repo_list(self, repo_name: str):
         """get repo list"""
         url = f'{self._host}/api/v1/users/{self.username}/repos'
         r = self.sess.get(url)
         if r.status_code != 200:
-            print('get repo list failed, status code {}'.format(r.status_code))
+            print(bcolors.FAIL + f'get repo list failed, status code {r.status_code}' + bcolors.ENDC)
             return
         self.repos = r.json()
-        print('get repo list success')
+        print(bcolors.OKGREEN + 'get repo list success' + bcolors.ENDC)
 
     def clone(self):
         pass
-    
+
     def pull(self, local_repo_path: str):
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
-        print(f'pull repo:{self.username}/{repo_name} from {self._host}')
+        print(f'{bcolors.WARNING}pull repo:{self.username}/{repo_name} from {self._host}{bcolors.ENDC}')
         pur_host = re.search(r'(?<=//)[^/]+', self._host).group()
-
         os.chdir(local_repo_path)
         os.system('git remote remove origin_gogs')
         os.system(
@@ -105,15 +94,13 @@ class GogsIE(BasePlatform):
         os.system(f'git pull origin_gogs {current_branch}')
         os.system('git remote remove origin_gogs')
         os.chdir('..')
-    
+
     def push(self, local_repo_path: str):
         if local_repo_path[-1] == os.path.sep:
             local_repo_path = local_repo_path[:-1]
         repo_name = local_repo_path.split(os.path.sep)[-1]
-        print(f'push repo:{self.username}/{repo_name} to {self._host}')
-
+        print(f'{bcolors.WARNING}push repo:{self.username}/{repo_name} to {self._host}{bcolors.ENDC}')
         pur_host = re.search(r'(?<=//)[^/]+', self._host).group()
-
         os.chdir(local_repo_path)
         os.system('git remote remove origin_gogs')
         os.system(

+ 28 - 0
repo_sync/platform/huawei.py

@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+'''
+@Contact :   liuyuqi.gov@msn.cn
+@Time    :   2024/08/07 19:14:51
+@License :   Copyright © 2017-2022 liuyuqi. All Rights Reserved.
+@Desc    :   huawei platform
+'''
+
+from repo_sync.platform.base_platform import BasePlatform
+import csv, subprocess,os,json
+from repo_sync.utils.colors import bcolors
+
+class HuaweiIE(BasePlatform):
+    """ huawei platform class """
+    def __init__(self, username: str, token: str, host: str = None, params: dict = None) -> None:
+        super().__init__(username=username, token=token)
+        self._host = 'https://git.huawei.com' if host is None else host
+
+
+    def clone(self, repo_name: str, repo_url: str, branch: str = None) -> bool:
+        pass
+    def pull(self, repo_name: str, branch: str = None) -> bool:
+        pass
+    def push(self, repo_name: str, branch: str = None) -> bool:
+        pass
+    def delete(self, repo_name: str) -> bool:
+        pass

+ 3 - 4
repo_sync/repo_sync.py

@@ -10,6 +10,7 @@ import os,csv,re
 import logging
 from .platform import gen_extractor_classes
 from .repo import Repo
+from repo_sync.utils.colors import bcolors
 
 class RepoSync(object):
     '''
@@ -44,7 +45,7 @@ class RepoSync(object):
                     self._find_git_repo(path=current_path, repo_name=dir)
         with open(self.repo_list_path, 'w') as f:
             if len(self.repos) == 0:
-                print('repo list is empty, please delete repo_list.csv and try again')
+                print(f"{bcolors.WARNING}repo list is empty, please delete repo_list.csv and try again{bcolors.ENDC}")
                 return
             writer = csv.DictWriter(
                 f, fieldnames=self.repos[0].__dict__.keys(), lineterminator='\n'
@@ -60,7 +61,6 @@ class RepoSync(object):
                 repo =Repo()
                 try:
                     url = re.findall(r'url\s+=\ (.*)', f.read())[0]
-                    # print(url)
                     repo.name = repo_name # url.split('/')[-1].replace('.git', '')
                     repo.url = url
                 except Exception as e:
@@ -68,8 +68,7 @@ class RepoSync(object):
                 repo.local_path = path
                 self.repos.append(repo)
         except Exception as e:
-            print("skip {} because of {}".format(path, e))
-
+            print(f"{bcolors.OKGREEN}skip {path} because of {e}{bcolors.ENDC}")
     def run(self):
         '''
         run repo

+ 19 - 0
repo_sync/utils/colors.py

@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+"""
+@Contact :   liuyuqi.gov@msn.cn
+@Time    :   2024/07/22
+@License :   Copyright © 2017-2022 liuyuqi. All Rights Reserved.
+@Desc    :   控制台颜色
+"""
+
+class bcolors:
+    HEADER = '\033[95m'
+    OKBLUE = '\033[94m'
+    OKCYAN = '\033[96m'
+    OKGREEN = '\033[92m'
+    WARNING = '\033[93m'
+    FAIL = '\033[91m'
+    ENDC = '\033[0m'
+    BOLD = '\033[1m'
+    UNDERLINE = '\033[4m'