Browse Source

feat(hdfs): implement file operations for WebHDFS backend

Add methods for directory creation, deletion, file copying between local and HDFS, and file read/write operations. Includes proper error handling and logging for all operations. The implementation follows WebHDFS REST API specifications for reliable file operations.
liuyuqi-cnb 1 month ago
parent
commit
6dc398cc8d
1 changed file with 114 additions and 0 deletions
  1. 114 0
      python/hdfs_operations.py

+ 114 - 0
python/hdfs_operations.py

@@ -498,3 +498,117 @@ class WebHDFSBackend(HDFSBackend):
     def is_available(self) -> bool:
     def is_available(self) -> bool:
         """Check whether the backend is available."""
         """Check whether the backend is available."""
         try:
         try:
+            # NOTE(review): `requests` is not referenced below; presumably it is
+            # imported here so that a missing dependency raises inside the try
+            # block and is reported as "unavailable" -- confirm.
+            import requests
+            # Try to connect to the NameNode
+            response = self._get_session().get(
+                f"{self.base_url}/?op=LISTSTATUS",
+                params={'user.name': self.user},
+                timeout=5
+            )
+            # 401 and 403 count as available as well as 200; presumably because
+            # an authentication/authorization error still proves the server
+            # answered -- confirm this is the intended meaning of "available".
+            return response.status_code in [200, 401, 403]
+        except Exception:
+            # Any failure (import error, connection error, timeout) is
+            # reported as "not available" without logging.
+            return False
+    
+    def make_dir(self, path: str) -> bool:
+        """Create a directory on HDFS with the WebHDFS ``MKDIRS`` operation.
+
+        Args:
+            path: HDFS path to create; it is appended verbatim to
+                ``self.base_url``.
+
+        Returns:
+            True when the server answers HTTP 200. False on any other status
+            or when the request raises. The response body is not inspected.
+        """
+        try:
+            session = self._get_session()
+            response = session.put(
+                f"{self.base_url}{path}",
+                params={'op': 'MKDIRS', 'user.name': self.user},
+                timeout=self.config.connect_timeout
+            )
+            if response.status_code == 200:
+                return True
+            # Log the rejected request the same way read_file/write_file do,
+            # so a False result is never silent.
+            self.logger.error(f"Failed to create directory: {response.status_code}")
+            return False
+        except Exception as e:
+            self.logger.error(f"Failed to create directory: {e}")
+            return False
+    
+    def delete(self, path: str, recursive: bool = True) -> bool:
+        """Delete a file or directory with the WebHDFS ``DELETE`` operation.
+
+        Args:
+            path: HDFS path to delete; it is appended verbatim to
+                ``self.base_url``.
+            recursive: Sent to the server as the lowercase ``recursive``
+                query parameter (``"true"`` / ``"false"``).
+
+        Returns:
+            True when the server answers HTTP 200. False on any other status
+            or when the request raises. The response body is not inspected.
+        """
+        try:
+            session = self._get_session()
+            response = session.delete(
+                f"{self.base_url}{path}",
+                params={
+                    'op': 'DELETE',
+                    'recursive': str(recursive).lower(),
+                    'user.name': self.user
+                },
+                timeout=self.config.connect_timeout
+            )
+            if response.status_code == 200:
+                return True
+            # Log the rejected request the same way read_file/write_file do,
+            # so a False result is never silent.
+            self.logger.error(f"Failed to delete: {response.status_code}")
+            return False
+        except Exception as e:
+            self.logger.error(f"Failed to delete: {e}")
+            return False
+    
+    def copy_from_local(self, src: str, dst: str) -> bool:
+        """Upload a local file to HDFS byte-for-byte.
+
+        The file is sent as raw bytes using the same two-step WebHDFS
+        ``CREATE`` exchange that ``write_file`` performs. Round-tripping the
+        content through ``str`` (decode with ``errors='ignore'``, then
+        re-encode) silently dropped every byte that was not valid UTF-8, so
+        binary or non-UTF-8 files were corrupted while the call still
+        reported success.
+
+        Args:
+            src: Path of the local file to read.
+            dst: HDFS destination path; an existing file is overwritten.
+
+        Returns:
+            True when the final write answers HTTP 201, False otherwise
+            (including local read errors and request failures).
+        """
+        try:
+            with open(src, 'rb') as f:
+                content = f.read()
+
+            session = self._get_session()
+
+            # Step 1: ask for the write location without following the
+            # redirect, so the payload is only sent once.
+            response = session.put(
+                f"{self.base_url}{dst}",
+                params={
+                    'op': 'CREATE',
+                    'overwrite': 'true',
+                    'user.name': self.user
+                },
+                allow_redirects=False,
+                timeout=self.config.connect_timeout
+            )
+            if response.status_code != 307:
+                self.logger.error(f"Failed to get write location: {response.status_code}")
+                return False
+
+            location = response.headers.get('Location')
+            if not location:
+                self.logger.error("No Location header in response")
+                return False
+
+            # Step 2: send the untouched bytes to the redirect target.
+            response = session.put(
+                location,
+                data=content,
+                timeout=self.config.write_timeout
+            )
+            if response.status_code != 201:
+                self.logger.error(f"Failed to copy from local: {response.status_code}")
+                return False
+            return True
+        except Exception as e:
+            self.logger.error(f"Failed to copy from local: {e}")
+            return False
+    
+    def copy_to_local(self, src: str, dst: str) -> bool:
+        """Download an HDFS file to the local filesystem byte-for-byte.
+
+        The response body is written in binary mode. Going through decoded
+        text and a text-mode file re-encoded the data (and applied platform
+        newline translation), so the local copy could differ from the HDFS
+        original.
+
+        Args:
+            src: HDFS path to read with the WebHDFS ``OPEN`` operation.
+            dst: Local path to write; it is only opened after the download
+                succeeded.
+
+        Returns:
+            True when the file was downloaded and written, False on a
+            non-200 response, a request failure or a local write error.
+        """
+        try:
+            session = self._get_session()
+            response = session.get(
+                f"{self.base_url}{src}",
+                params={'op': 'OPEN', 'user.name': self.user},
+                timeout=self.config.read_timeout
+            )
+            if response.status_code != 200:
+                self.logger.error(f"Failed to read file: {response.status_code}")
+                return False
+
+            # Raw bytes, binary mode: no charset guessing, no newline rewriting.
+            with open(dst, 'wb') as f:
+                f.write(response.content)
+            return True
+        except Exception as e:
+            self.logger.error(f"Failed to copy to local: {e}")
+            return False
+    
+    def read_file(self, path: str) -> Optional[str]:
+        """Read an HDFS file as text with the WebHDFS ``OPEN`` operation.
+
+        Args:
+            path: HDFS path to read; it is appended verbatim to
+                ``self.base_url``.
+
+        Returns:
+            The decoded file content when the server answers HTTP 200, or
+            None on any other status or when the request raises.
+        """
+        try:
+            session = self._get_session()
+            response = session.get(
+                f"{self.base_url}{path}",
+                params={'op': 'OPEN', 'user.name': self.user},
+                timeout=self.config.read_timeout
+            )
+            if response.status_code != 200:
+                self.logger.error(f"Failed to read file: {response.status_code}")
+                return None
+
+            # When the response declares no charset, ``response.text`` falls
+            # back to guessing one from the bytes, which can mis-decode short
+            # or non-ASCII content. Default to UTF-8, the encoding write_file
+            # uses, and leave an explicitly declared charset untouched.
+            if response.encoding is None:
+                response.encoding = 'utf-8'
+            return response.text
+        except Exception as e:
+            self.logger.error(f"Failed to read file: {e}")
+            return None
+    
+    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
+        try:
+            session = self._get_session()
+            
+            # 第一步:获取写入位置
+            response = session.put(
+                f"{self.base_url}{path}",
+                params={
+                    'op': 'CREATE',
+                    'overwrite': str(overwrite).lower(),
+                    'user.name': self.user
+                },
+                allow_redirects=False,
+                timeout=self.config.connect_timeout
+            )
+            
+            if response.status_code != 307:
+                self.logger.error(f"Failed to get write location: {response.status_code}")
+                return False
+            
+            # 第二步:写入数据到 DataNode
+            location = response.headers.get('Location')
+            if not location:
+                self.logger.error("No Location header in response")
+                return False
+            
+            response = session.put(
+                location,
+                data=content.encode('utf-8'),
+                timeout=self.config.write_timeout
+            )
+            
+            return response.status_code == 201
+        except Exception as