filesystem interface代码定义在pgsql/src/backend/storage/file/filesystem.c文件中。首先介绍static HTAB *FsysInterfaceTable = NULL;
哈希表(filesystem hash table),其key为代表FsysName的字符串(filesystem name),value为FsysInterface结构体typedef struct FsysInterfaceData *FsysInterface;
。FsysInterface结构体包含了filesystem interface所抽象的函数不同文件系统的具体实现函数。
typedef struct FsysInterfaceData{
char host[MAXPGPATH + 1];
FmgrInfo fsysFuncs[FSYS_FUNC_TOTALNUM];
} FsysInterfaceData;
typedef enum FileSystemFuncType { // 定义在src/include/catalog/pg_filesystem.h文件中
FSYS_FUNC_CONNECT,
FSYS_FUNC_DISCONNECT, // nouse
FSYS_FUNC_OPEN,
FSYS_FUNC_CLOSE,
FSYS_FUNC_SEEK,
FSYS_FUNC_TELL,
FSYS_FUNC_READ,
FSYS_FUNC_WRITE,
FSYS_FUNC_SYNC, // reopen
FSYS_FUNC_DELETE,
FSYS_FUNC_CHMOD,
FSYS_FUNC_MKDIR,
FSYS_FUNC_TRUNCATE,
FSYS_FUNC_GETPATHINFO,
FSYS_FUNC_FREEFILEINFO,
FSYS_FUNC_TOTALNUM
} FileSystemFuncType;
FsysInterfaceGet函数实现了通过filesystem name查询FsysInterfaceTable获取FsysInterface结构体。如果未初始化FsysInterfaceTable,则初始化之。如果没有查找到需要调用InitFsysInterfaceFromFlatfile函数初始化已插入HASH表未初始化的FsysInterface结构体。
static FsysInterface FsysInterfaceGet(FsysName name) {
FsysInterface entry = NULL; HASHCTL hash_ctl; bool found = false;
do {
if(NULL == FsysInterfaceTable) { // 初始化FsysInterfaceTable
if(NULL == FsysGlobalContext) FsysGlobalContext = AllocSetContextCreate(TopMemoryContext,"Filesystem Global Context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
MemSet(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = MAXPGPATH; hash_ctl.entrysize = sizeof(*entry); hash_ctl.hash = string_hash; hash_ctl.hcxt = FsysGlobalContext;
FsysInterfaceTable = hash_create("filesystem hash table", EXPECTED_MAX_FSYS_ENTRIES, &hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
if(FsysInterfaceTable == NULL) { elog(WARNING, "failed to create hash table: FsysInterfaceTable."); break; }
}
entry = (FsysInterface) hash_search(FsysInterfaceTable, name, HASH_ENTER, &found);
if(!found) {
if(0 != InitFsysInterfaceFromFlatfile(name, entry)) {
hash_search(FsysInterfaceTable, name, HASH_REMOVE, &found);
entry = NULL; elog(WARNING, "fail to init filesystem: %s", name);break;
}
}
} while(0);
return entry;
}
FsysInterfaceGetFunc
FsysInterfaceGetFunc函数利用FsysInterfaceGet函数获取FsysInterface结构体,并利用FileSystemFuncType获取对应的实现函数。
static FmgrInfo *FsysInterfaceGetFunc(FsysName name, FileSystemFuncType funcType){
FsysInterface fsysInterface = FsysInterfaceGet(name);
return &(fsysInterface->fsysFuncs[funcType]);
}
实现函数封装
如下实现封装函数由src/backend/storage/file/fd.c文件调用。
hdfsFS HdfsConnect(FsysName protocol, char * host, uint16_t port, char *ccname, void *token)
int HdfsDisconnect(FsysName protocol, hdfsFS fileSystem)
hdfsFile HdfsOpenFile(FsysName protocol, hdfsFS fileSystem, char * path, int flags, int bufferSize, short replication, int64_t blocksize)
int HdfsCloseFile(FsysName protocol, hdfsFS fileSystem, hdfsFile file)
int HdfsSeek(FsysName protocol, hdfsFS fileSystem, hdfsFile file, int64_t desiredPos)
int64_t HdfsTell(FsysName protocol, hdfsFS fileSystem, hdfsFile file)
int HdfsRead(FsysName protocol, hdfsFS fileSystem, hdfsFile file, void * buffer, int length)
int HdfsWrite(FsysName protocol, hdfsFS fileSystem, hdfsFile file, const void * buffer, int length)
int HdfsSync(FsysName protocol, hdfsFS fileSystem, hdfsFile file)
int HdfsDelete(FsysName protocol, hdfsFS fileSystem, char * path, int recursive)
int HdfsChmod(FsysName protocol, hdfsFS fileSystem, char * path, short mode)
int HdfsCreateDirectory(FsysName protocol, hdfsFS fileSystem, char * path)
int HdfsTruncate(FsysName protocol, hdfsFS fileSystem, char * path, int64_t size)
hdfsFileInfo * HdfsGetPathInfo(FsysName protocol, hdfsFS fileSystem, char * path)
int HdfsFreeFileInfo(FsysName protocol, hdfsFileInfo * info, int numEntries)
以HdfsConnect函数为例,首先调用FsysInterfaceGetFunc函数获取FsysName对应的文件系统抽象api的实现,初始化FileSystemUdfData结构体的成员type、fsys_host、fsys_port、fsys_hdfs、fsys_ccname和fsys_token,将fsysUdf作为Call Context、fsysFunc作为FmgrInfo初始化FunctionCallInfoData,最后调用FunctionCallInvoke函数,最后返回api接口返回的结构体hdfsFS。
hdfsFS HdfsConnect(FsysName protocol, char * host, uint16_t port, char *ccname, void *token){
FmgrInfo *fsysFunc = FsysInterfaceGetFunc(protocol, FSYS_FUNC_CONNECT);
FileSystemUdfData fsysUdf; fsysUdf.type = T_FileSystemFunctionData; fsysUdf.fsys_host = host;
fsysUdf.fsys_port = port; fsysUdf.fsys_hdfs = NULL; fsysUdf.fsys_ccname = ccname; fsysUdf.fsys_token = token;
FunctionCallInfoData fcinfo;
InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, /* FmgrInfo */ fsysFunc, /* nArgs */ 0, /* Call Context */ (Node *) (&fsysUdf), /* ResultSetInfo */ NULL);
FunctionCallInvoke(&fcinfo);
return FSYS_UDF_GET_HDFS(&fcinfo);
}