0
点赞
收藏
分享

微信扫一扫

HAWQ数据库技术解析——HDFS filesystem interface


src/bin/gpfilesystem/hdfs/gpfshdfs.c文件为GPDB提供HDFS filesystem interface,其使用libhdfs库与HDFS通信。如下是提供的接口和其调用的关键的libhdfs库函数。

/*
 * Entry points that gpfshdfs.c provides as the HDFS filesystem interface.
 * Every entry point takes PG_FUNCTION_ARGS and returns a Datum.  The trailing
 * comment on each declaration shows the key libhdfs function it calls.
 */

/* Connection management. */
Datum gpfs_hdfs_connect(PG_FUNCTION_ARGS);          // hdfsFS hdfsConnect(const char * host, uint16_t port);
Datum gpfs_hdfs_disconnect(PG_FUNCTION_ARGS); // int hdfsDisconnect(hdfsFS fileSystem);

/* Opening, syncing and closing a file. */
Datum gpfs_hdfs_openfile(PG_FUNCTION_ARGS); // hdfsFile hdfsOpenFile(hdfsFS fileSystem, const char * path, int flags, int bufferSize, short replication, int64_t blocksize);
Datum gpfs_hdfs_sync(PG_FUNCTION_ARGS); // NOTE(review): no libhdfs call is listed for this one; presumably hdfsSync - confirm
Datum gpfs_hdfs_closefile(PG_FUNCTION_ARGS); // int hdfsCloseFile(hdfsFS fileSystem, hdfsFile file);

/* Path-level operations. */
Datum gpfs_hdfs_createdirectory(PG_FUNCTION_ARGS); // int hdfsCreateDirectory(hdfsFS fileSystem, const char * path);
Datum gpfs_hdfs_delete(PG_FUNCTION_ARGS); // int hdfsDelete(hdfsFS fileSystem, const char * path, int recursive);
Datum gpfs_hdfs_chmod(PG_FUNCTION_ARGS); // int hdfsChmod(hdfsFS fileSystem, const char * path, short mode);

/* Reading, writing and positioning within an open file. */
Datum gpfs_hdfs_read(PG_FUNCTION_ARGS); // int hdfsRead(hdfsFS fileSystem, hdfsFile file, void * buffer, int length);
Datum gpfs_hdfs_write(PG_FUNCTION_ARGS); // int hdfsWrite(hdfsFS fileSystem, hdfsFile file, const void * buffer, int length);
Datum gpfs_hdfs_seek(PG_FUNCTION_ARGS); // int hdfsSeek(hdfsFS fileSystem, hdfsFile file, int64_t desiredPos);
Datum gpfs_hdfs_tell(PG_FUNCTION_ARGS); // int64_t hdfsTell(hdfsFS fileSystem, hdfsFile file);

Datum gpfs_hdfs_truncate(PG_FUNCTION_ARGS); // int hdfsTruncate(hdfsFS fileSystem, const char * path, int64_t size);

/* Path metadata lookup and release of the returned info. */
Datum gpfs_hdfs_getpathinfo(PG_FUNCTION_ARGS); // hdfsFileInfo * hdfsGetPathInfo(hdfsFS fileSystem, const char * path);
Datum gpfs_hdfs_freefileinfo(PG_FUNCTION_ARGS); // NOTE(review): no libhdfs call is listed for this one; presumably hdfsFreeFileInfo - confirm

如下为编译gpfshdfs.c文件的makefile

# Makefile for the gpfshdfs module, built from gpfshdfs.c.

# Relative path from this directory back to the top of the build tree.
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global

# Name of the module and the object file it is built from.
MODULE_big = gpfshdfs
OBJS = gpfshdfs.o

# Put the libpq source directory on the include path.
PG_CPPFLAGS = -I$(libpq_srcdir)
PG_LIBS = $(libpq_pgport)
# Libraries the module is linked against: protobuf, two Boost libraries
# (system, date_time) and libhdfs3.
SHLIB_LINK = -lprotobuf -lboost_system -lboost_date_time -lhdfs3

# When USE_PGXS is defined, take the build rules from the PGXS makefile that
# pg_config reports; otherwise build in-tree through contrib-global.mk.
ifdef USE_PGXS
PGXS := $(shell pg_config --pgxs)
include $(PGXS)
else
subdir = src/bin/gpfilesystem/hdfs
include $(top_srcdir)/contrib/contrib-global.mk
endif

src/backend/catalog/filesystem.sql使用CREATE FILESYSTEM语句向pg_filesystem添加HDFS相关记录,其语法解析定义和DefineStmt节点处理流程如下所示。

-- Defines the filesystem named HDFS.
-- gpfs_libfile names the library file; every other attribute names the
-- function to use for the corresponding filesystem operation.
CREATE FILESYSTEM HDFS
(
gpfs_libfile = "$libdir/gpfshdfs.so",
gpfs_connect = "gpfs_hdfs_connect",
gpfs_disconnect = "gpfs_hdfs_disconnect",
gpfs_open = "gpfs_hdfs_openfile",
gpfs_close = "gpfs_hdfs_closefile",
gpfs_seek = "gpfs_hdfs_seek",
gpfs_tell = "gpfs_hdfs_tell",
gpfs_read = "gpfs_hdfs_read",
gpfs_write = "gpfs_hdfs_write",
gpfs_flush = "gpfs_hdfs_sync",
gpfs_delete = "gpfs_hdfs_delete",
gpfs_chmod = "gpfs_hdfs_chmod",
gpfs_mkdir = "gpfs_hdfs_createdirectory",
gpfs_truncate = "gpfs_hdfs_truncate",
gpfs_getpathinfo = "gpfs_hdfs_getpathinfo",
gpfs_freefileinfo = "gpfs_hdfs_freefileinfo"
);

HAWQ数据库技术解析——HDFS filesystem interface_大数据


src/backend/tcop/utility.c中的ProcessUtility函数在T_DefineStmt分支中,当stmt->kind为OBJECT_FILESYSTEM时,会调用src/backend/commands/filesystemcmds.c中的DefineFileSystem函数,即DefineFileSystem(stmt->defnames, stmt->definition, stmt->newOid, stmt->trusted)。

/* Attribute name under which a filesystem definition supplies its library file. */
static const char *fsysLibFileName = "gpfs_libfile";

/*
 * DefineFileSystem
 *
 * Validate the attribute list of a filesystem definition and create the
 * filesystem through FileSystemCreateWithOid.
 *
 * name       - list of names; resolved into a namespace and a filesystem name
 * parameters - list of DefElem; must hold the "gpfs_libfile" attribute exactly
 *              once and each of the FSYS_FUNC_TOTALNUM function attributes
 *              exactly once
 * newOid     - passed through to FileSystemCreateWithOid
 * trusted    - passed through to FileSystemCreateWithOid
 *
 * Reports an ERROR when the caller lacks ACL_CREATE on the target namespace,
 * when an attribute is unknown or duplicated, when the library file is
 * missing, or when not all function attributes were given.
 */
void
DefineFileSystem(List *name, List *parameters, Oid newOid, bool trusted)
{
    char       *fsysName;
    AclResult   aclresult;
    List       *funcNames[FSYS_FUNC_TOTALNUM];
    char       *fsysLibFile = NULL;
    int         nfuncs = 0;
    Oid         fsysOid;

    /*
     * Convert the list of names to a name and namespace: the namespace OID
     * (given in "name" or the default one) is returned and the filesystem
     * name parsed out of "name" is stored in fsysName.
     */
    Oid         fsysNamespace = QualifiedNameGetCreationNamespace(name, &fsysName);

    /*
     * The filesystem is created inside that namespace, so check that we have
     * creation rights there.
     */
    aclresult = pg_namespace_aclcheck(fsysNamespace, GetUserId(), ACL_CREATE);
    if (aclresult != ACLCHECK_OK)
    {
        aclcheck_error(aclresult, ACL_KIND_NAMESPACE, get_namespace_name(fsysNamespace));
    }

    ListCell   *pl;
    int         k = 0;

    /* Start with every function slot empty. */
    while (k < FSYS_FUNC_TOTALNUM)
    {
        funcNames[k] = NIL;
        k++;
    }

    /* Walk the attribute list. */
    foreach(pl, parameters)
    {
        DefElem    *defel = (DefElem *) lfirst(pl);
        int         slot = 0;

        /* The library-file attribute ("gpfs_libfile") may appear only once. */
        if (pg_strcasecmp(defel->defname, fsysLibFileName) == 0)
        {
            if (fsysLibFile != NULL)
            {
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("filesystem attribute \"%s\" duplicated", defel->defname)));
            }
            else
            {
                fsysLibFile = strVal(linitial(defGetQualifiedName(defel)));
            }
            continue;
        }

        /*
         * Any other attribute has to match one of the function names returned
         * by fsys_func_type_to_name (gpfs_connect, gpfs_disconnect, gpfs_open,
         * gpfs_close, gpfs_seek, gpfs_tell, gpfs_read, gpfs_write, gpfs_flush,
         * gpfs_delete, gpfs_chmod, gpfs_mkdir, gpfs_truncate,
         * gpfs_getpathinfo, gpfs_freefileinfo).  Find its slot.
         */
        while (slot < FSYS_FUNC_TOTALNUM &&
               pg_strcasecmp(defel->defname, fsys_func_type_to_name(slot)) != 0)
        {
            slot++;
        }

        if (slot >= FSYS_FUNC_TOTALNUM)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("filesystem attribute \"%s\" not recognized", defel->defname)));
        }

        if (funcNames[slot] != NIL)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("filesystem function \"%s\" duplicated", defel->defname)));
        }
        else
        {
            funcNames[slot] = defGetQualifiedName(defel);
        }
        nfuncs++;
    }

    /* Make sure we have our required definitions. */
    if (fsysLibFile == NULL)
    {
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg("filesystem need %s but not specified", fsysLibFileName)));
    }
    if (nfuncs != FSYS_FUNC_TOTALNUM)
    {
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
                 errmsg("filesystem need %d funcs but only get %d", FSYS_FUNC_TOTALNUM, nfuncs)));
    }

    /*
     * Most of the argument checking is done inside FileSystemCreateWithOid
     * (src/backend/catalog/pg_filesystem.c).
     */
    fsysOid = FileSystemCreateWithOid(fsysName,         /* filesystem name */
                                      fsysNamespace,    /* namespace */
                                      funcNames,        /* functions' names */
                                      nfuncs,
                                      fsysLibFile,
                                      newOid,
                                      trusted);

    /* Set flag to update flat filesystem at commit. */
    filesystem_file_update_needed();

    /*
     * In the dispatch role a DefineStmt mirroring this request is filled in,
     * after which an ERRCODE_CDB_FEATURE_NOT_YET error is reported.
     */
    if (Gp_role == GP_ROLE_DISPATCH)
    {
        DefineStmt *stmt = makeNode(DefineStmt);

        stmt->kind = OBJECT_FILESYSTEM;
        stmt->oldstyle = false;
        stmt->defnames = name;
        stmt->args = NIL;
        stmt->definition = parameters;
        stmt->newOid = fsysOid;
        stmt->shadowOid = 0;
        stmt->ordered = false;
        stmt->trusted = trusted;
        ereport(ERROR,
                (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
                 errmsg("Cannot support DefineFileSystem")));
    }
}

HAWQ数据库技术解析——HDFS filesystem interface_大数据_02

HAWQ数据库技术解析——HDFS filesystem interface_数据库_03


HAWQ数据库技术解析——HDFS filesystem interface_数据库_04


举报

相关推荐

0 条评论