Prev: pgsql: Add a WHEN clause to CREATE TRIGGER, allowing a boolean
Next: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION
From: Daniel Farina on 23 Nov 2009 16:34 This relatively small change enables all sort of new and shiny evil by allowing specification of a function to COPY that accepts each produced row's content in turn. The function must accept a single INTERNAL-type argument, which is actually of the type StringInfo. Patch highlights: - Grammar production changes to enable "TO FUNCTION <qualified name>" - A new enumeration value in CopyDest to indicate this new mode called COPY_FN. - CopyStateData's "filename" field has been renamed "destination" and is now a Node type. Before it was either a string or NULL; now it may be a RangeVar, a string, or NULL. Some code now has to go through some minor strVal() unboxing for the regular TO '/file' behavior. - Due to the relatively restricted way this function can be called it was possible to reduce per-row overhead by computing the FunctionCallInfo once and then reusing it, as opposed to simply using one of the convenience functions in the fmgr. - Add and expose the makeNameListFromRangeVar procedure to src/catalog/namespace.c, the inverse of makeRangeVarFromNameList. Signed-off-by: Daniel Farina <dfarina(a)truviso.com> --- src/backend/catalog/namespace.c | 21 +++++ src/backend/commands/copy.c | 190 +++++++++++++++++++++++++++++++++----- src/backend/executor/spi.c | 2 +- src/backend/nodes/copyfuncs.c | 2 +- src/backend/nodes/equalfuncs.c | 2 +- src/backend/parser/gram.y | 30 ++++-- src/include/catalog/namespace.h | 1 + src/include/nodes/parsenodes.h | 3 +- 8 files changed, 212 insertions(+), 39 deletions(-) diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 99c9140..8911e29 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -2467,6 +2467,27 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p) } /* + * makeNameListFromRangeVar + * Utility routine to convert a qualified-name list into RangeVar form. + */ +List * +makeNameListFromRangeVar(RangeVar *rangevar) +{ + List *names = NIL; + + Assert(rangevar->relname != NULL); + names = lcons(makeString(rangevar->relname), names); + + if (rangevar->schemaname != NULL) + names = lcons(makeString(rangevar->schemaname), names); + + if (rangevar->catalogname != NULL) + names = lcons(makeString(rangevar->catalogname), names); + + return names; +} + +/* * makeRangeVarFromNameList * Utility routine to convert a qualified-name list into RangeVar form. */ diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 5e95fd7..985505a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -33,6 +33,7 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "optimizer/planner.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" @@ -55,7 +56,8 @@ typedef enum CopyDest { COPY_FILE, /* to/from file */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ - COPY_NEW_FE /* to/from frontend (3.0 protocol) */ + COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ + COPY_FN /* to function */ } CopyDest; /* @@ -104,7 +106,8 @@ typedef struct CopyStateData Relation rel; /* relation to copy to or from */ QueryDesc *queryDesc; /* executable query to copy from */ List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN/STDOUT */ + Node *destination; /* filename, or NULL for STDIN/STDOUT, or a + * function */ bool binary; /* binary format? */ bool oids; /* include OIDs? */ bool csv_mode; /* Comma Separated Value format? */ @@ -131,6 +134,13 @@ typedef struct CopyStateData MemoryContext rowcontext; /* per-row evaluation context */ /* + * For writing rows out to a function. Used if copy_dest == COPY_FN + * + * Avoids repeated use of DirectFunctionCall for efficiency. + */ + FunctionCallInfoData output_fcinfo; + + /* * These variables are used to reduce overhead in textual COPY FROM. * * attribute_buf holds the separated, de-escaped text for each field of @@ -425,9 +435,11 @@ CopySendEndOfRow(CopyState cstate) { StringInfo fe_msgbuf = cstate->fe_msgbuf; + /* Take care adding row delimiters*/ switch (cstate->copy_dest) { case COPY_FILE: + case COPY_FN: if (!cstate->binary) { /* Default line termination depends on platform */ @@ -437,6 +449,18 @@ CopySendEndOfRow(CopyState cstate) CopySendString(cstate, "\r\n"); #endif } + break; + case COPY_NEW_FE: + case COPY_OLD_FE: + /* The FE/BE protocol uses \n as newline for all platforms */ + if (!cstate->binary) + CopySendChar(cstate, '\n'); + break; + } + + switch (cstate->copy_dest) + { + case COPY_FILE: (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file); @@ -446,10 +470,6 @@ CopySendEndOfRow(CopyState cstate) errmsg("could not write to COPY file: %m"))); break; case COPY_OLD_FE: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->binary) - CopySendChar(cstate, '\n'); - if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)) { /* no hope of recovering connection sync, so FATAL */ @@ -459,13 +479,19 @@ CopySendEndOfRow(CopyState cstate) } break; case COPY_NEW_FE: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; + case COPY_FN: + FunctionCallInvoke(&cstate->output_fcinfo); + + /* + * These are set earlier and are not supposed to change row to row. + */ + Assert(cstate->output_fcinfo.arg[0] == + PointerGetDatum(cstate->fe_msgbuf)); + Assert(!cstate->output_fcinfo.argnull[0]); + break; } resetStringInfo(fe_msgbuf); @@ -577,6 +603,12 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; + case COPY_FN: + /* + * Should be disallowed by some prior step + */ + Assert(false); + break; } return bytesread; @@ -719,7 +751,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) { CopyState cstate; bool is_from = stmt->is_from; - bool pipe = (stmt->filename == NULL); + bool pipe = (stmt->destination == NULL); List *attnamelist = stmt->attlist; List *force_quote = NIL; List *force_notnull = NIL; @@ -986,6 +1018,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString) errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); + /* Disallow COPY ... FROM FUNCTION (only TO FUNCTION supported) */ + if (is_from && cstate->destination != NULL && + IsA(cstate->destination, RangeVar)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY FROM does not support functions as sources"))); + + if (stmt->relation) { Assert(!stmt->query); @@ -1183,7 +1223,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */ - cstate->filename = stmt->filename; + cstate->destination = stmt->destination; if (is_from) CopyFrom(cstate); /* copy from file to database */ @@ -1225,7 +1265,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) static void DoCopyTo(CopyState cstate) { - bool pipe = (cstate->filename == NULL); + bool pipe = (cstate->destination == NULL); if (cstate->rel) { @@ -1257,37 +1297,128 @@ DoCopyTo(CopyState cstate) else cstate->copy_file = stdout; } - else + else if (IsA(cstate->destination, String)) { mode_t oumask; /* Pre-existing umask value */ struct stat st; + char *dest_filename = strVal(cstate->destination); /* * Prevent write to relative path ... too easy to shoot oneself in the * foot by overwriting a database file ... */ - if (!is_absolute_path(cstate->filename)) + if (!is_absolute_path(dest_filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); oumask = umask((mode_t) 022); - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); + cstate->copy_file = AllocateFile(dest_filename, PG_BINARY_W); umask(oumask); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for writing: %m", - cstate->filename))); + dest_filename))); fstat(fileno(cstate->copy_file), &st); if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); + errmsg("\"%s\" is a directory", dest_filename))); } + /* Branch taken in the "COPY ... TO FUNCTION funcname" situation */ + else if (IsA(cstate->destination, RangeVar)) + { + List *names; + FmgrInfo *flinfo; + FuncDetailCode fdresult; + Oid funcid; + Oid rettype; + bool retset; + int nvargs; + Oid *true_typeids; + const int nargs = 1; + Oid argtypes[] = { INTERNALOID }; + + /* Flip copy-action dispatch flag */ + cstate->copy_dest = COPY_FN; + + /* Make an fcinfo that can be reused and is stored on the cstate. */ + names = makeNameListFromRangeVar((RangeVar *) cstate->destination); + flinfo = palloc0(sizeof *flinfo); + + + fdresult = func_get_detail(names, NIL, NIL, nargs, argtypes, false, + false, + + /* Begin out-arguments */ + &funcid, &rettype, &retset, &nvargs, + &true_typeids, NULL); + + /* + * Check to ensure that this is a "normal" function when looked up, + * otherwise error. + */ + switch (fdresult) + { + /* Normal function found; do nothing */ + case FUNCDETAIL_NORMAL: + break; + + case FUNCDETAIL_NOTFOUND: + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("function %s does not exist", + func_signature_string(names, nargs, NIL, + argtypes)))); + break; + + case FUNCDETAIL_AGGREGATE: + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("function %s must not be an aggregate", + func_signature_string(names, nargs, NIL, + argtypes)))); + break; + + case FUNCDETAIL_WINDOWFUNC: + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("function %s must not be a window function", + func_signature_string(names, nargs, NIL, + argtypes)))); + break; + + case FUNCDETAIL_COERCION: + /* + * Should never be yielded from func_get_detail if it is passed + * fargs == NIL, as it is previously. + */ + Assert(false); + break; + + case FUNCDETAIL_MULTIPLE: + /* + * Only support one signature, thus overloading of a name with + * different types should never occur. + */ + Assert(false); + break; + + } + + fmgr_info(funcid, flinfo); + InitFunctionCallInfoData(cstate->output_fcinfo, flinfo, + 1, NULL, NULL); + } + else + /* Unexpected type was found for cstate->destination. */ + Assert(false); + + PG_TRY(); { if (cstate->fe_copy) @@ -1310,13 +1441,13 @@ DoCopyTo(CopyState cstate) } PG_END_TRY(); - if (!pipe) + if (!pipe && cstate->copy_dest != COPY_FN) { if (FreeFile(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to file \"%s\": %m", - cstate->filename))); + strVal(cstate->destination)))); } } @@ -1342,6 +1473,13 @@ CopyTo(CopyState cstate) /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); + /* + * fe_msgbuf is never rebound, so there is only a need to set up the + * output_fcinfo once. + */ + cstate->output_fcinfo.arg[0] = PointerGetDatum(cstate->fe_msgbuf); + cstate->output_fcinfo.argnull[0] = false; + /* Get info about the columns we need to process. */ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); foreach(cur, cstate->attnumlist) @@ -1668,7 +1806,7 @@ limit_printout_length(const char *str) static void CopyFrom(CopyState cstate) { - bool pipe = (cstate->filename == NULL); + bool pipe = (cstate->destination == NULL); HeapTuple tuple; TupleDesc tupDesc; Form_pg_attribute *attr; @@ -1768,19 +1906,21 @@ CopyFrom(CopyState cstate) { struct stat st; - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); + cstate->copy_file = AllocateFile(strVal(cstate->destination), + PG_BINARY_R); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for reading: %m", - cstate->filename))); + strVal(cstate->destination)))); fstat(fileno(cstate->copy_file), &st); if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); + errmsg("\"%s\" is a directory", + strVal(cstate->destination)))); } tupDesc = RelationGetDescr(cstate->rel); @@ -2215,7 +2355,7 @@ CopyFrom(CopyState cstate) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from file \"%s\": %m", - cstate->filename))); + strVal(cstate->destination)))); } /* diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index f045f9c..0914dc9 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -1829,7 +1829,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, { CopyStmt *cstmt = (CopyStmt *) stmt; - if (cstmt->filename == NULL) + if (cstmt->destination == NULL) { my_res = SPI_ERROR_COPY; goto fail; diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 8bc72d1..9b39abe 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -2485,7 +2485,7 @@ _copyCopyStmt(CopyStmt *from) COPY_NODE_FIELD(query); COPY_NODE_FIELD(attlist); COPY_SCALAR_FIELD(is_from); - COPY_STRING_FIELD(filename); + COPY_NODE_FIELD(destination); COPY_NODE_FIELD(options); return newnode; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 3d65d8b..6ddf226 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1085,7 +1085,7 @@ _equalCopyStmt(CopyStmt *a, CopyStmt *b) COMPARE_NODE_FIELD(query); COMPARE_NODE_FIELD(attlist); COMPARE_SCALAR_FIELD(is_from); - COMPARE_STRING_FIELD(filename); + COMPARE_NODE_FIELD(destination); COMPARE_NODE_FIELD(options); return true; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 130e6f4..23331ee 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -251,8 +251,7 @@ static TypeName *TableFuncTypeName(List *columns); %type <value> TriggerFuncArg %type <node> TriggerWhen -%type <str> copy_file_name - database_name access_method_clause access_method attr_name +%type <str> database_name access_method_clause access_method attr_name index_name name cursor_name file_name cluster_index_specification %type <list> func_name handler_name qual_Op qual_all_Op subquery_Op @@ -433,6 +432,8 @@ static TypeName *TableFuncTypeName(List *columns); %type <ival> opt_frame_clause frame_extent frame_bound +%type <node> copy_file_or_function_name + /* * Non-keyword token types. These are hard-wired into the "flex" lexer. * They must be listed first so that their numeric codes do not depend on @@ -1977,14 +1978,15 @@ ClosePortalStmt: *****************************************************************************/ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids - copy_from copy_file_name copy_delimiter opt_with copy_options + copy_from copy_file_or_function_name copy_delimiter opt_with + copy_options { CopyStmt *n = makeNode(CopyStmt); n->relation = $3; n->query = NULL; n->attlist = $4; n->is_from = $6; - n->filename = $7; + n->destination = $7; n->options = NIL; /* Concatenate user-supplied flags */ @@ -1998,14 +2000,15 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids n->options = list_concat(n->options, $10); $$ = (Node *)n; } - | COPY select_with_parens TO copy_file_name opt_with copy_options + | COPY select_with_parens TO copy_file_or_function_name + opt_with copy_options { CopyStmt *n = makeNode(CopyStmt); n->relation = NULL; n->query = $2; n->attlist = NIL; n->is_from = false; - n->filename = $4; + n->destination = $4; n->options = $6; $$ = (Node *)n; } @@ -2021,10 +2024,17 @@ copy_from: * used depends on the direction. (It really doesn't make sense to copy from * stdout. We silently correct the "typo".) - AY 9/94 */ -copy_file_name: - Sconst { $$ = $1; } - | STDIN { $$ = NULL; } - | STDOUT { $$ = NULL; } +copy_file_or_function_name: + Sconst { $$ = (Node *) makeString($1); } + + /* + * Note that func_name is not used here because there is no need to + * accept the "funcname(TYPES)" construction, as there is only one + * valid signature. + */ + | FUNCTION qualified_name { $$ = (Node *) $2; } + | STDIN { $$ = NULL; } + | STDOUT { $$ = NULL; } ; copy_options: copy_opt_list { $$ = $1; } diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index d356635..1d801cd 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -94,6 +94,7 @@ extern Oid LookupExplicitNamespace(const char *nspname); extern Oid LookupCreationNamespace(const char *nspname); extern Oid QualifiedNameGetCreationNamespace(List *names, char **objname_p); +extern List *makeNameListFromRangeVar(RangeVar *rangevar); extern RangeVar *makeRangeVarFromNameList(List *names); extern char *NameListToString(List *names); extern char *NameListToQuotedString(List *names); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b34300f..203088c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1293,7 +1293,8 @@ typedef struct CopyStmt List *attlist; /* List of column names (as Strings), or NIL * for all columns */ bool is_from; /* TO or FROM */ - char *filename; /* filename, or NULL for STDIN/STDOUT */ + Node *destination; /* filename, or NULL for STDIN/STDOUT, or a + * function */ List *options; /* List of DefElem nodes */ } CopyStmt; -- 1.6.5.3 -- Sent via pgsql-hackers mailing list (pgsql-hackers(a)postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers |