diff --git a/Makefile b/Makefile index c09b4b7..b6ff0c2 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,8 @@ MODULES = wal2json # message test will fail for <= 9.5 REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ delete3 delete4 savepoint specialvalue toast bytea message typmod \ - filtertable selecttable include_timestamp include_lsn include_xids + filtertable selecttable include_timestamp include_lsn include_xids \ + key_value PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/expected/key_value.out b/expected/key_value.out new file mode 100644 index 0000000..d16b782 --- /dev/null +++ b/expected/key_value.out @@ -0,0 +1,366 @@ +\set VERBOSITY terse +-- predictability +SET synchronous_commit = on; +CREATE TABLE table_with_pk ( +a smallserial, +b smallint, +c int, +d bigint, +e numeric(5,3), +f real not null, +g double precision, +h char(10), +i varchar(30), +j text, +k bit varying(20), +l timestamp, +m date, +n boolean not null, +o json, +p tsvector, +PRIMARY KEY(b, c, d) +); +ERROR: relation "table_with_pk" already exists +CREATE TABLE table_without_pk ( +a smallserial, +b smallint, +c int, +d bigint, +e numeric(5,3), +f real not null, +g double precision, +h char(10), +i varchar(30), +j text, +k bit varying(20), +l timestamp, +m date, +n boolean not null, +o json, +p tsvector +); +ERROR: relation "table_without_pk" already exists +CREATE TABLE table_with_unique ( +a smallserial, +b smallint, +c int, +d bigint, +e numeric(5,3) not null, +f real not null, +g double precision not null, +h char(10), +i varchar(30), +j text, +k bit varying(20), +l timestamp, +m date, +n boolean not null, +o json, +p tsvector, +UNIQUE(g, n) +); +ERROR: relation "table_with_unique" already exists +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +-- INSERT +BEGIN; +INSERT INTO table_with_pk (b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) VALUES(1, 2, 3, 3.54, 876.563452345, 1.23, 'teste', 'testando', 'um texto longo', B'001110010101010', '2013-11-02 17:30:52', '2013-02-04', true, '{ "a": 123 }', 'Old Old Parr'::tsvector); +INSERT INTO table_without_pk (b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) VALUES(1, 2, 3, 3.54, 876.563452345, 1.23, 'teste', 'testando', 'um texto longo', B'001110010101010', '2013-11-02 17:30:52', '2013-02-04', true, '{ "a": 123 }', 'Old Old Parr'::tsvector); +INSERT INTO table_with_unique (b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) VALUES(1, 2, 3, 3.54, 876.563452345, 1.23, 'teste', 'testando', 'um texto longo', B'001110010101010', '2013-11-02 17:30:52', '2013-02-04', true, '{ "a": 123 }', 'Old Old Parr'::tsvector); +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'use-key-value-hash', '1'); + data +---------------------------------------------------------------------- + { + + "change": [ + + { + + "kind": "insert", + + "schema": "public", + + "table": "table_with_pk", + + "changes": { + + "a": 2, + + "b": 1, + + "c": 2, + + "d": 3, + + "e": 3.540, + + "f": 876.563, + + "g": 1.23, + + "h": "teste ", + + "i": "testando", + + "j": "um texto longo", + + "k": "001110010101010", + + "l": "Sat Nov 02 17:30:52 2013", + + "m": "02-04-2013", + + "n": true, + + "o": "{ \"a\": 123 }", + + "p": "'Old' 'Parr'" + + }, + + "columntypes": { + + "a": "smallint", + + "b": "smallint", + + "c": "integer", + + "d": "bigint", + + "e": "numeric(5,3)", + + "f": "real", + + "g": "double precision", + + "h": "character(10)", + + "i": "character varying(30)", + + "j": "text", + + "k": "bit varying(20)", + + "l": "timestamp without time zone", + + "m": "date", + + "n": "boolean", + + "o": "json", + + "p": "tsvector" + + } + + } + + ,{ + + "kind": "insert", + + "schema": "public", + + "table": "table_without_pk", + + "changes": { + + "a": 2, + + "b": 1, + + "c": 2, + + "d": 3, + + "e": 3.540, + + "f": 876.563, + + "g": 1.23, + + "h": "teste ", + + "i": "testando", + + "j": "um texto longo", + + "k": "001110010101010", + + "l": "Sat Nov 02 17:30:52 2013", + + "m": "02-04-2013", + + "n": true, + + "o": "{ \"a\": 123 }", + + "p": "'Old' 'Parr'" + + }, + + "columntypes": { + + "a": "smallint", + + "b": "smallint", + + "c": "integer", + + "d": "bigint", + + "e": "numeric(5,3)", + + "f": "real", + + "g": "double precision", + + "h": "character(10)", + + "i": "character varying(30)", + + "j": "text", + + "k": "bit varying(20)", + + "l": "timestamp without time zone", + + "m": "date", + + "n": "boolean", + + "o": "json", + + "p": "tsvector" + + } + + } + + ,{ + + "kind": "insert", + + "schema": "public", + + "table": "table_with_unique", + + "changes": { + + "a": 3, + + "b": 1, + + "c": 2, + + "d": 3, + + "e": 3.540, + + "f": 876.563, + + "g": 1.23, + + "h": "teste ", + + "i": "testando", + + "j": "um texto longo", + + "k": "001110010101010", + + "l": "Sat Nov 02 17:30:52 2013", + + "m": "02-04-2013", + + "n": true, + + "o": "{ \"a\": 123 }", + + "p": "'Old' 'Parr'" + + }, + + "columntypes": { + + "a": "smallint", + + "b": "smallint", + + "c": "integer", + + "d": "bigint", + + "e": "numeric(5,3)", + + "f": "real", + + "g": "double precision", + + "h": "character(10)", + + "i": "character varying(30)", + + "j": "text", + + "k": "bit varying(20)", + + "l": "timestamp without time zone", + + "m": "date", + + "n": "boolean", + + "o": "json", + + "p": "tsvector" + + } + + } + + ] + + } +(1 row) + +-- UPDATE +UPDATE table_without_pk SET f = -f WHERE b = 1; +-- UPDATE: no pk change +UPDATE table_with_pk SET f = -f WHERE b = 1; +-- UPDATE: pk change +UPDATE table_with_pk SET b = -b WHERE b = 1; +-- UPDATE: unique +UPDATE table_with_unique SET n = false WHERE b = 1; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'use-key-value-hash', '1'); +WARNING: table "table_without_pk" without primary key or replica identity is nothing +WARNING: table "table_with_unique" without primary key or replica identity is nothing + data +---------------------------------------------------------------------- + { + + "change": [ + + ] + + } + { + + "change": [ + + { + + "kind": "update", + + "schema": "public", + + "table": "table_with_pk", + + "changes": { + + "a": 2, + + "b": 1, + + "c": 2, + + "d": 3, + + "e": 3.540, + + "f": -876.563, + + "g": 1.23, + + "h": "teste ", + + "i": "testando", + + "j": "um texto longo", + + "k": "001110010101010", + + "l": "Sat Nov 02 17:30:52 2013", + + "m": "02-04-2013", + + "n": true, + + "o": "{ \"a\": 123 }", + + "p": "'Old' 'Parr'" + + }, + + "columntypes": { + + "a": "smallint", + + "b": "smallint", + + "c": "integer", + + "d": "bigint", + + "e": "numeric(5,3)", + + "f": "real", + + "g": "double precision", + + "h": "character(10)", + + "i": "character varying(30)", + + "j": "text", + + "k": "bit varying(20)", + + "l": "timestamp without time zone", + + "m": "date", + + "n": "boolean", + + "o": "json", + + "p": "tsvector" + + }, + + "oldkeys": { + + "b": 1, + + "c": 2, + + "d": 3 + + }, + + "keytypes": { + + "b": "smallint", + + "c": "integer", + + "d": "bigint" + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "update", + + "schema": "public", + + "table": "table_with_pk", + + "changes": { + + "a": 2, + + "b": -1, + + "c": 2, + + "d": 3, + + "e": 3.540, + + "f": -876.563, + + "g": 1.23, + + "h": "teste ", + + "i": "testando", + + "j": "um texto longo", + + "k": "001110010101010", + + "l": "Sat Nov 02 17:30:52 2013", + + "m": "02-04-2013", + + "n": true, + + "o": "{ \"a\": 123 }", + + "p": "'Old' 'Parr'" + + }, + + "columntypes": { + + "a": "smallint", + + "b": "smallint", + + "c": "integer", + + "d": "bigint", + + "e": "numeric(5,3)", + + "f": "real", + + "g": "double precision", + + "h": "character(10)", + + "i": "character varying(30)", + + "j": "text", + + "k": "bit varying(20)", + + "l": "timestamp without time zone", + + "m": "date", + + "n": "boolean", + + "o": "json", + + "p": "tsvector" + + }, + + "oldkeys": { + + "b": 1, + + "c": 2, + + "d": 3 + + }, + + "keytypes": { + + "b": "smallint", + + "c": "integer", + + "d": "bigint" + + } + + } + + ] + + } + { + + "change": [ + + ] + + } +(4 rows) + +-- DELETE: no pk +DELETE FROM table_without_pk WHERE b = 1; +-- DELETE: pk +DELETE FROM table_with_pk WHERE b = 1; +-- DELETE: unique +DELETE FROM table_with_unique WHERE b = 1; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'use-key-value-hash', '1'); +WARNING: table "table_without_pk" without primary key or replica identity is nothing +WARNING: table "table_with_unique" without primary key or replica identity is nothing + data +--------------------- + { + + "change": [+ + ] + + } + { + + "change": [+ + ] + + } +(2 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + diff --git a/sql/key_value.sql b/sql/key_value.sql new file mode 100644 index 0000000..397e2fb --- /dev/null +++ b/sql/key_value.sql @@ -0,0 +1,101 @@ +\set VERBOSITY terse + +-- predictability +SET synchronous_commit = on; + +CREATE TABLE table_with_pk ( +a smallserial, +b smallint, +c int, +d bigint, +e numeric(5,3), +f real not null, +g double precision, +h char(10), +i varchar(30), +j text, +k bit varying(20), +l timestamp, +m date, +n boolean not null, +o json, +p tsvector, +PRIMARY KEY(b, c, d) +); + +CREATE TABLE table_without_pk ( +a smallserial, +b smallint, +c int, +d bigint, +e numeric(5,3), +f real not null, +g double precision, +h char(10), +i varchar(30), +j text, +k bit varying(20), +l timestamp, +m date, +n boolean not null, +o json, +p tsvector +); + +CREATE TABLE table_with_unique ( +a smallserial, +b smallint, +c int, +d bigint, +e numeric(5,3) not null, +f real not null, +g double precision not null, +h char(10), +i varchar(30), +j text, +k bit varying(20), +l timestamp, +m date, +n boolean not null, +o json, +p tsvector, +UNIQUE(g, n) +); + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +-- INSERT +BEGIN; +INSERT INTO table_with_pk (b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) VALUES(1, 2, 3, 3.54, 876.563452345, 1.23, 'teste', 'testando', 'um texto longo', B'001110010101010', '2013-11-02 17:30:52', '2013-02-04', true, '{ "a": 123 }', 'Old Old Parr'::tsvector); +INSERT INTO table_without_pk (b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) VALUES(1, 2, 3, 3.54, 876.563452345, 1.23, 'teste', 'testando', 'um texto longo', B'001110010101010', '2013-11-02 17:30:52', '2013-02-04', true, '{ "a": 123 }', 'Old Old Parr'::tsvector); +INSERT INTO table_with_unique (b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) VALUES(1, 2, 3, 3.54, 876.563452345, 1.23, 'teste', 'testando', 'um texto longo', B'001110010101010', '2013-11-02 17:30:52', '2013-02-04', true, '{ "a": 123 }', 'Old Old Parr'::tsvector); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'use-key-value-hash', '1'); + +-- UPDATE +UPDATE table_without_pk SET f = -f WHERE b = 1; + +-- UPDATE: no pk change +UPDATE table_with_pk SET f = -f WHERE b = 1; + +-- UPDATE: pk change +UPDATE table_with_pk SET b = -b WHERE b = 1; + +-- UPDATE: unique +UPDATE table_with_unique SET n = false WHERE b = 1; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'use-key-value-hash', '1'); + +-- DELETE: no pk +DELETE FROM table_without_pk WHERE b = 1; + +-- DELETE: pk +DELETE FROM table_with_pk WHERE b = 1; + +-- DELETE: unique +DELETE FROM table_with_unique WHERE b = 1; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'use-key-value-hash', '1'); + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/wal2json.c b/wal2json.c index 35f32bc..4c896eb 100644 --- a/wal2json.c +++ b/wal2json.c @@ -30,6 +30,17 @@ PG_MODULE_MAGIC; + +/* + * Define a similar macro for older versions of Postgres + */ +#if (PG_VERSION_NUM >= 90600 && PG_VERSION_NUM < 90605) \ + || (PG_VERSION_NUM >= 90500 && PG_VERSION_NUM < 90509) || (PG_VERSION_NUM >= 90400 && PG_VERSION_NUM < 90414) + +#define TupleDescAttr(TUPLEDESC, IDX) TUPLEDESC->attrs[IDX] + +#endif + extern void _PG_init(void); extern void PGDLLEXPORT _PG_output_plugin_init(OutputPluginCallbacks *cb); @@ -43,6 +54,7 @@ typedef struct bool include_type_oids; /* include data type oids */ bool include_typmod; /* include typmod in types */ bool include_not_null; /* include not-null constraints */ + bool use_key_value_hash; /* Output in column:value format */ bool pretty_print; /* pretty-print JSON? */ bool write_in_chunks; /* write in chunks? */ @@ -149,6 +161,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is data->format_version = WAL2JSON_FORMAT_VERSION; + data->use_key_value_hash = false; + /* pretty print */ strcpy(data->ht, ""); strcpy(data->nl, ""); @@ -264,6 +278,21 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "use-key-value-hash") == 0) + { + if (elem->arg == NULL) + { + elog(DEBUG1, "use-key-value-hash is null"); + data->use_key_value_hash = false; + } + else if (!parse_bool(strVal(elem->arg), &data->use_key_value_hash)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value for \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } + } else if (strcmp(elem->defname, "pretty-print") == 0) { if (elem->arg == NULL) @@ -761,19 +790,335 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu pfree(colvalues.data); } +/* Parse tuple information into a hashmap + * + * replident: is this tuple a replica identity? + * hasreplident: does this tuple has an associated replica identity? + */ +static void +tuple_to_hashmap(LogicalDecodingContext *ctx, TupleDesc tupdesc, + HeapTuple tuple, TupleDesc indexdesc, bool replident, + bool hasreplident) +{ + + JsonDecodingData *data; + int natt; + StringInfoData coldata; + StringInfoData coltypedata; + StringInfoData coltypeoids; + StringInfoData colnotnulls; + + char comma[3] = ""; + + data = ctx->output_plugin_private; + initStringInfo(&coldata); + initStringInfo(&coltypedata); + + if (data->include_type_oids) + initStringInfo(&coltypeoids); + if(data->include_not_null) + initStringInfo(&colnotnulls); + + /* If replident is true, output replica identify information */ + if (replident) + { + appendStringInfo( + &coldata, "%s%s%s\"oldkeys\":%s{", + data->ht, data->ht, data->ht, data->sp); + appendStringInfo( + &coltypedata, "%s%s%s\"keytypes\":%s{", + data->ht, data->ht, data->ht, data->sp); + + if (data->include_type_oids) + appendStringInfo( + &coltypeoids, "%s%s%s\"keytypeoids\":%s{", + data->ht, data->ht, data->ht, data->sp); + } + else + { + appendStringInfo( + &coldata, "%s%s%s\"changes\":%s{", + data->ht, data->ht, data->ht, data->sp); + appendStringInfo( + &coltypedata, "%s%s%s\"columntypes\":%s{", + data->ht, data->ht, data->ht, data->sp); + + if (data->include_type_oids) + appendStringInfo( + &coltypeoids, "%s%s%s\"coltypeoids\":%s{", + data->ht, data->ht, data->ht, data->sp); + + if (data->include_not_null) + appendStringInfo( + &colnotnulls, "%s%s%s\"columnoptionals\":%s{", + data->ht, data->ht, data->ht, data->sp); + } + + /* Render column information */ + for (natt = 0; natt < tupdesc->natts; natt++) + { + Form_pg_attribute attr; /* the attribute itself */ + Oid typid; /* type of current attribute */ + HeapTuple type_tuple; /* information about a type */ + Oid typoutput; /* output function */ + bool typisvarlena; + Datum origval; /* possibly toasted Datum */ + Datum val; /* definitely detoasted Datum */ + char *outputstr = NULL; + bool isnull; /* column is null? */ + + attr = TupleDescAttr(tupdesc, natt); + + elog(DEBUG1, "attribute \"%s\" (%d/%d)", NameStr(attr->attname), natt, tupdesc->natts); + + /* Do not print dropped or system columns */ + if (attr->attisdropped || attr->attnum < 0) + continue; + + /* Search indexed columns in whole heap tuple */ + if (indexdesc != NULL) + { + int j; + bool found_col = false; + + for (j = 0; j < indexdesc->natts; j++) + { + Form_pg_attribute iattr; + + iattr = TupleDescAttr(indexdesc, j); + + if (strcmp(NameStr(attr->attname), NameStr(iattr->attname)) == 0) + { + found_col = true; + } + } + if (!found_col) + { + continue; + } + } + + /* Get Datum from the tuple */ + origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull); + if (isnull && replident) + continue; + + typid = attr->atttypid; + + /* Determine the type name */ + type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid)); + if (!HeapTupleIsValid(type_tuple)) + { + elog(ERROR, "cache lookup failed for type %u", typid); + } + + /* Get information needed for printing values of a type */ + getTypeOutputInfo(typid, &typoutput, &typisvarlena); + + /* If the TOAST is unchanged, do not output */ + if (!isnull && typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) + { + elog(DEBUG1, "column \"%s\" has an unchanged TOAST", NameStr(attr->attname)); + continue; + } + + /* Begin bulding out the hashmaps for display */ + appendStringInfo( + &coldata, "%s%s%s%s%s%s", + comma, data->nl, data->ht, data->ht, data->ht, data->ht); + escape_json(&coldata, NameStr(attr->attname)); + appendStringInfo(&coldata, ":%s", data->sp); + + if (data->include_types) + { + appendStringInfo( + &coltypedata, "%s%s%s%s%s%s", + comma, data->nl, data->ht, data->ht, data->ht, data->ht); + escape_json(&coltypedata, NameStr(attr->attname)); + appendStringInfo(&coltypedata, ":%s", data->sp); + + if (data->include_typmod) + { + char *type_str; + + type_str = TextDatumGetCString(DirectFunctionCall2( + format_type, + attr->atttypid, + attr->atttypmod)); + escape_json(&coltypedata, type_str); + pfree(type_str); + } + else + { + Form_pg_type type_form = (Form_pg_type) GETSTRUCT(type_tuple); + escape_json(&coltypedata, NameStr(type_form->typname)); + } + + if (data->include_type_oids) + { + appendStringInfo( + &coltypeoids, "%s%s%s%s%s%s", + comma, data->nl, data->ht, data->ht, data->ht, data->ht); + escape_json(&coltypeoids, NameStr(attr->attname)); + appendStringInfo(&coltypeoids, ":%s%u", data->sp, typid); + } + + if (!replident && data->include_not_null) + { + + appendStringInfo( + &colnotnulls, "%s%s%s%s%s%s", + comma, data->nl, data->ht, data->ht, data->ht, data->ht); + escape_json(&colnotnulls, NameStr(attr->attname)); + appendStringInfo(&colnotnulls, ":%s", data->sp); + + if (attr->attnotnull) + appendStringInfo(&colnotnulls, "false"); + else + appendStringInfo(&colnotnulls, "true"); + + } + + } + + ReleaseSysCache(type_tuple); + if (isnull) + { + appendStringInfo(&coldata, "null"); + if (strcmp(comma, "") == 0) + snprintf(comma, 3, ",%s", data->sp); + continue; + } + + if (typisvarlena) + val = PointerGetDatum(PG_DETOAST_DATUM(origval)); + else + val = origval; + + /* Finally got the value */ + outputstr = OidOutputFunctionCall(typoutput, val); + + /* + * Data types are printed with quotes unless they are number, true, + * false, null, an array or an object. + * + * The NaN and Infinity are not valid JSON symbols. Hence, + * regardless of sign they are represented as the string null. + */ + switch (typid) + { + case INT2OID: + case INT4OID: + case INT8OID: + case OIDOID: + case FLOAT4OID: + case FLOAT8OID: + case NUMERICOID: + if (pg_strncasecmp(outputstr, "NaN", 3) == 0 || + pg_strncasecmp(outputstr, "Infinity", 8) == 0 || + pg_strncasecmp(outputstr, "-Infinity", 9) == 0) + { + appendStringInfo(&coldata, "null"); + elog(DEBUG1, "attribute \"%s\" is special: %s", + NameStr(attr->attname), outputstr); + } + else if (strspn(outputstr, "0123456789+-eE.") == strlen(outputstr)) + appendStringInfo(&coldata, "%s", outputstr); + else + elog(ERROR, "%s is not a number", outputstr); + break; + case BOOLOID: + if (strcmp(outputstr, "t") == 0) + appendStringInfo(&coldata, "true"); + else + appendStringInfo(&coldata, "false"); + break; + case BYTEAOID: + /* string is "\x54617069727573", start after "\x" */ + escape_json(&coldata, (outputstr + 2)); + break; + default: + escape_json(&coldata, outputstr); + break; + } + + /* The first column does not have comma */ + if (strcmp(comma, "") == 0) + snprintf(comma, 3, ",%s", data->sp); + } + + appendStringInfo( + &coldata, "%s%s%s%s}", data->nl, data->ht, data->ht, data->ht); + + if (data->include_types) + { + appendStringInfo(&coldata, "%s%s", comma, data->nl); + appendStringInfo( + &coltypedata, "%s%s%s%s}", data->nl, data->ht, data->ht, data->ht); + } + + if (data->include_type_oids) + { + appendStringInfo(&coltypedata, "%s%s", comma, data->nl); + appendStringInfo( + &coltypeoids, "%s%s%s%s}", data->nl, data->ht, data->ht, data->ht); + } + + if (!replident && data->include_not_null) + { + appendStringInfo(&coltypeoids, "%s%s", comma, data->nl); + appendStringInfo( + &colnotnulls, "%s%s%s%s}", data->nl, data->ht, data->ht, data->ht); + } + + appendStringInfoString(ctx->out, coldata.data); + if (data->include_types) + appendStringInfoString(ctx->out, coltypedata.data); + if (data->include_type_oids) + appendStringInfoString(ctx->out, coltypeoids.data); + if (!replident && data->include_not_null) + appendStringInfoString(ctx->out, colnotnulls.data); + + if (hasreplident) + appendStringInfoString(ctx->out, ","); + + appendStringInfoString(ctx->out, data->nl); + + pfree(coldata.data); + pfree(coltypedata.data); + + if (data->include_type_oids) + pfree(coltypeoids.data); + if (data->include_not_null) + pfree(colnotnulls.data); +} + /* Print columns information */ static void columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool hasreplident) { - tuple_to_stringinfo(ctx, tupdesc, tuple, NULL, false, hasreplident); + JsonDecodingData *data; + data = ctx->output_plugin_private; + + if (!data->use_key_value_hash) + tuple_to_stringinfo(ctx, tupdesc, tuple, NULL, false, hasreplident); + else + tuple_to_hashmap(ctx, tupdesc, tuple, NULL, false, hasreplident); } /* Print replica identity information */ static void identity_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc) { - /* Last parameter does not matter */ - tuple_to_stringinfo(ctx, tupdesc, tuple, indexdesc, true, false); + JsonDecodingData *data; + data = ctx->output_plugin_private; + + /* Last parameter does not matter */ + if (!data->use_key_value_hash) + tuple_to_stringinfo(ctx, tupdesc, tuple, indexdesc, true, false); + else + tuple_to_hashmap(ctx, tupdesc, tuple, indexdesc, true, false); + } /* Callback for individual changed tuples */