mirror of
https://github.com/Stichting-MINIX-Research-Foundation/netbsd.git
synced 2025-08-10 06:29:51 -04:00
1016 lines
22 KiB
C
1016 lines
22 KiB
C
/* $NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $ */
|
|
|
|
/*-
|
|
* Copyright (c)2010,2011 YAMAMOTO Takashi,
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
|
|
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
|
|
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
|
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
|
* SUCH DAMAGE.
|
|
*/
|
|
|
|
/*
|
|
* backend db operations
|
|
*/
|
|
|
|
#include <sys/cdefs.h>
|
|
#ifndef lint
|
|
__RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $");
|
|
#endif /* not lint */
|
|
|
|
#include <assert.h>
|
|
#include <err.h>
|
|
#include <errno.h>
|
|
#include <inttypes.h>
|
|
#include <puffs.h>
|
|
#include <stdbool.h>
|
|
#include <stdarg.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <util.h>
|
|
|
|
#include <libpq-fe.h>
|
|
|
|
#include "pgfs_db.h"
|
|
#include "pgfs_waitq.h"
|
|
#include "pgfs_debug.h"
|
|
|
|
bool pgfs_dosync = false;
|
|
|
|
struct Xconn {
|
|
TAILQ_ENTRY(Xconn) list;
|
|
PGconn *conn;
|
|
struct puffs_cc *blocker;
|
|
struct puffs_cc *owner;
|
|
bool in_trans;
|
|
int id;
|
|
const char *label;
|
|
};
|
|
|
|
static void
|
|
dumperror(struct Xconn *xc, const PGresult *res)
|
|
{
|
|
static const struct {
|
|
const char *name;
|
|
int code;
|
|
} fields[] = {
|
|
#define F(x) { .name = #x, .code = x, }
|
|
F(PG_DIAG_SEVERITY),
|
|
F(PG_DIAG_SQLSTATE),
|
|
F(PG_DIAG_MESSAGE_PRIMARY),
|
|
F(PG_DIAG_MESSAGE_DETAIL),
|
|
F(PG_DIAG_MESSAGE_HINT),
|
|
F(PG_DIAG_STATEMENT_POSITION),
|
|
F(PG_DIAG_INTERNAL_POSITION),
|
|
F(PG_DIAG_INTERNAL_QUERY),
|
|
F(PG_DIAG_CONTEXT),
|
|
F(PG_DIAG_SOURCE_FILE),
|
|
F(PG_DIAG_SOURCE_LINE),
|
|
F(PG_DIAG_SOURCE_FUNCTION),
|
|
#undef F
|
|
};
|
|
unsigned int i;
|
|
|
|
if (!pgfs_dodprintf) {
|
|
return;
|
|
}
|
|
assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
|
|
PQresultStatus(res) == PGRES_FATAL_ERROR);
|
|
for (i = 0; i < __arraycount(fields); i++) {
|
|
const char *val = PQresultErrorField(res, fields[i].code);
|
|
|
|
if (val == NULL) {
|
|
continue;
|
|
}
|
|
fprintf(stderr, "%s: %s\n", fields[i].name, val);
|
|
}
|
|
}
|
|
|
|
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
|
|
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
|
|
|
|
static struct Xconn *
|
|
getxc(struct puffs_cc *cc)
|
|
{
|
|
struct Xconn *xc;
|
|
|
|
assert(cc != NULL);
|
|
retry:
|
|
TAILQ_FOREACH(xc, &xclist, list) {
|
|
if (xc->blocker == NULL) {
|
|
assert(xc->owner == NULL);
|
|
xc->owner = cc;
|
|
DPRINTF("xc %p acquire %p\n", xc, cc);
|
|
return xc;
|
|
} else {
|
|
assert(xc->owner == xc->blocker);
|
|
}
|
|
}
|
|
DPRINTF("no free conn %p\n", cc);
|
|
waiton(&xcwaitq, cc);
|
|
goto retry;
|
|
}
|
|
|
|
static void
|
|
relxc(struct Xconn *xc)
|
|
{
|
|
|
|
assert(xc->in_trans);
|
|
assert(xc->owner != NULL);
|
|
xc->in_trans = false;
|
|
xc->owner = NULL;
|
|
wakeup_one(&xcwaitq);
|
|
}
|
|
|
|
static void
|
|
pqwait(struct Xconn *xc)
|
|
{
|
|
PGconn *conn = xc->conn;
|
|
struct puffs_cc *cc = xc->owner;
|
|
|
|
if (PQflush(conn)) {
|
|
errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
|
|
}
|
|
if (!PQisBusy(conn)) {
|
|
return;
|
|
}
|
|
assert(xc->blocker == NULL);
|
|
xc->blocker = cc;
|
|
DPRINTF("yielding %p\n", cc);
|
|
/* XXX is it safe to yield before entering mainloop? */
|
|
puffs_cc_yield(cc);
|
|
DPRINTF("yield returned %p\n", cc);
|
|
assert(xc->owner == cc);
|
|
assert(xc->blocker == cc);
|
|
xc->blocker = NULL;
|
|
}
|
|
|
|
static int
|
|
sqltoerrno(const char *sqlstate)
|
|
{
|
|
/*
|
|
* XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
|
|
* "tuple concurrently updated" errors for lowrite/lo_truncate.
|
|
*
|
|
* XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
|
|
*/
|
|
static const struct {
|
|
char sqlstate[5];
|
|
int error;
|
|
} map[] = {
|
|
{ "00000", 0, }, /* ERRCODE_SUCCESSFUL_COMPLETION */
|
|
{ "02000", ENOENT, }, /* ERRCODE_NO_DATA */
|
|
{ "23505", EEXIST, }, /* ERRCODE_UNIQUE_VIOLATION */
|
|
{ "23514", EINVAL, }, /* ERRCODE_CHECK_VIOLATION */
|
|
{ "40001", EAGAIN, }, /* ERRCODE_T_R_SERIALIZATION_FAILURE */
|
|
{ "40P01", EAGAIN, }, /* ERRCODE_T_R_DEADLOCK_DETECTED */
|
|
{ "42704", ENOENT, }, /* ERRCODE_UNDEFINED_OBJECT */
|
|
{ "53100", ENOSPC, }, /* ERRCODE_DISK_FULL */
|
|
{ "53200", ENOMEM, }, /* ERRCODE_OUT_OF_MEMORY */
|
|
{ "XX000", EAGAIN, }, /* ERRCODE_INTERNAL_ERROR */
|
|
};
|
|
unsigned int i;
|
|
|
|
for (i = 0; i < __arraycount(map); i++) {
|
|
if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
|
|
const int error = map[i].error;
|
|
|
|
if (error != 0) {
|
|
DPRINTF("sqlstate %5s mapped to error %d\n",
|
|
sqlstate, error);
|
|
}
|
|
if (error == EINVAL) {
|
|
/*
|
|
* sounds like a bug.
|
|
*/
|
|
abort();
|
|
}
|
|
return error;
|
|
}
|
|
}
|
|
DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
|
|
return EIO;
|
|
}
|
|
|
|
struct cmd {
|
|
char name[32]; /* name of prepared statement */
|
|
char *cmd; /* query string */
|
|
unsigned int nparams;
|
|
Oid *paramtypes;
|
|
uint32_t prepared_mask; /* for which connections this is prepared? */
|
|
unsigned int flags; /* CMD_ flags */
|
|
};
|
|
|
|
#define CMD_NOPREPARE 1 /* don't prepare this command */
|
|
|
|
struct cmd *
|
|
createcmd(const char *cmd, unsigned int flags, ...)
|
|
{
|
|
struct cmd *c;
|
|
va_list ap;
|
|
const char *cp;
|
|
unsigned int i;
|
|
static unsigned int cmdid;
|
|
|
|
c = emalloc(sizeof(*c));
|
|
c->cmd = estrdup(cmd);
|
|
c->nparams = 0;
|
|
va_start(ap, flags);
|
|
for (cp = cmd; *cp != 0; cp++) {
|
|
if (*cp == '$') { /* XXX */
|
|
c->nparams++;
|
|
}
|
|
}
|
|
c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
|
|
for (i = 0; i < c->nparams; i++) {
|
|
Oid type = va_arg(ap, Oid);
|
|
assert(type == BYTEA ||
|
|
type == INT4OID || type == INT8OID || type == OIDOID ||
|
|
type == TEXTOID || type == TIMESTAMPTZOID);
|
|
c->paramtypes[i] = type;
|
|
}
|
|
va_end(ap);
|
|
snprintf(c->name, sizeof(c->name), "%u", cmdid++);
|
|
if ((flags & CMD_NOPREPARE) != 0) {
|
|
c->prepared_mask = ~0;
|
|
} else {
|
|
c->prepared_mask = 0;
|
|
}
|
|
c->flags = flags;
|
|
return c;
|
|
}
|
|
|
|
static void
|
|
freecmd(struct cmd *c)
|
|
{
|
|
|
|
free(c->paramtypes);
|
|
free(c->cmd);
|
|
free(c);
|
|
}
|
|
|
|
static int
|
|
fetch_noresult(struct Xconn *xc)
|
|
{
|
|
PGresult *res;
|
|
ExecStatusType status;
|
|
PGconn *conn = xc->conn;
|
|
int error;
|
|
|
|
pqwait(xc);
|
|
res = PQgetResult(conn);
|
|
if (res == NULL) {
|
|
return ENOENT;
|
|
}
|
|
status = PQresultStatus(res);
|
|
if (status == PGRES_COMMAND_OK) {
|
|
assert(PQnfields(res) == 0);
|
|
assert(PQntuples(res) == 0);
|
|
if (!strcmp(PQcmdTuples(res), "0")) {
|
|
error = ENOENT;
|
|
} else {
|
|
error = 0;
|
|
}
|
|
} else if (status == PGRES_FATAL_ERROR) {
|
|
error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
|
|
assert(error != 0);
|
|
dumperror(xc, res);
|
|
} else {
|
|
errx(1, "%s not command_ok: %d: %s", __func__,
|
|
(int)status,
|
|
PQerrorMessage(conn));
|
|
}
|
|
PQclear(res);
|
|
res = PQgetResult(conn);
|
|
assert(res == NULL);
|
|
if (error != 0) {
|
|
DPRINTF("error %d\n", error);
|
|
}
|
|
return error;
|
|
}
|
|
|
|
static int
|
|
preparecmd(struct Xconn *xc, struct cmd *c)
|
|
{
|
|
PGconn *conn = xc->conn;
|
|
const uint32_t mask = 1 << xc->id;
|
|
int error;
|
|
int ret;
|
|
|
|
if ((c->prepared_mask & mask) != 0) {
|
|
return 0;
|
|
}
|
|
assert((c->flags & CMD_NOPREPARE) == 0);
|
|
DPRINTF("PREPARE: '%s'\n", c->cmd);
|
|
ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
|
|
if (!ret) {
|
|
errx(EXIT_FAILURE, "PQsendPrepare: %s",
|
|
PQerrorMessage(conn));
|
|
}
|
|
error = fetch_noresult(xc);
|
|
if (error != 0) {
|
|
return error;
|
|
}
|
|
c->prepared_mask |= mask;
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* vsendcmd:
|
|
*
|
|
* resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
|
|
* 0 for text and 1 for binary.
|
|
*/
|
|
|
|
static int
|
|
vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
|
|
{
|
|
PGconn *conn = xc->conn;
|
|
char **paramvalues;
|
|
int *paramlengths;
|
|
int *paramformats;
|
|
unsigned int i;
|
|
int error;
|
|
int ret;
|
|
|
|
assert(xc->owner != NULL);
|
|
assert(xc->blocker == NULL);
|
|
error = preparecmd(xc, c);
|
|
if (error != 0) {
|
|
return error;
|
|
}
|
|
paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
|
|
paramlengths = NULL;
|
|
paramformats = NULL;
|
|
DPRINTF("CMD: '%s'\n", c->cmd);
|
|
for (i = 0; i < c->nparams; i++) {
|
|
Oid type = c->paramtypes[i];
|
|
char tmpstore[1024];
|
|
const char *buf = NULL;
|
|
intmax_t v = 0; /* XXXgcc */
|
|
int sz;
|
|
bool binary = false;
|
|
|
|
switch (type) {
|
|
case BYTEA:
|
|
buf = va_arg(ap, const void *);
|
|
sz = (int)va_arg(ap, size_t);
|
|
binary = true;
|
|
break;
|
|
case INT8OID:
|
|
case OIDOID:
|
|
case INT4OID:
|
|
switch (type) {
|
|
case INT8OID:
|
|
v = (intmax_t)va_arg(ap, int64_t);
|
|
break;
|
|
case OIDOID:
|
|
v = (intmax_t)va_arg(ap, Oid);
|
|
break;
|
|
case INT4OID:
|
|
v = (intmax_t)va_arg(ap, int32_t);
|
|
break;
|
|
default:
|
|
errx(EXIT_FAILURE, "unknown integer oid %u",
|
|
type);
|
|
}
|
|
buf = tmpstore;
|
|
sz = snprintf(tmpstore, sizeof(tmpstore),
|
|
"%jd", v);
|
|
assert(sz != -1);
|
|
assert((size_t)sz < sizeof(tmpstore));
|
|
sz += 1;
|
|
break;
|
|
case TEXTOID:
|
|
case TIMESTAMPTZOID:
|
|
buf = va_arg(ap, char *);
|
|
sz = strlen(buf) + 1;
|
|
break;
|
|
default:
|
|
errx(EXIT_FAILURE, "%s: unknown param type %u",
|
|
__func__, type);
|
|
}
|
|
if (binary) {
|
|
if (paramlengths == NULL) {
|
|
paramlengths =
|
|
emalloc(c->nparams * sizeof(*paramformats));
|
|
}
|
|
if (paramformats == NULL) {
|
|
paramformats = ecalloc(1,
|
|
c->nparams * sizeof(*paramformats));
|
|
}
|
|
paramformats[i] = 1;
|
|
paramlengths[i] = sz;
|
|
}
|
|
paramvalues[i] = emalloc(sz);
|
|
memcpy(paramvalues[i], buf, sz);
|
|
if (binary) {
|
|
DPRINTF("\t[%u]=<BINARY>\n", i);
|
|
} else {
|
|
DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
|
|
}
|
|
}
|
|
if ((c->flags & CMD_NOPREPARE) != 0) {
|
|
ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
|
|
(const char * const *)paramvalues, paramlengths,
|
|
paramformats, resultmode);
|
|
} else {
|
|
ret = PQsendQueryPrepared(conn, c->name, c->nparams,
|
|
(const char * const *)paramvalues, paramlengths,
|
|
paramformats, resultmode);
|
|
}
|
|
for (i = 0; i < c->nparams; i++) {
|
|
free(paramvalues[i]);
|
|
}
|
|
free(paramvalues);
|
|
free(paramlengths);
|
|
free(paramformats);
|
|
if (!ret) {
|
|
errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
|
|
PQerrorMessage(conn));
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
sendcmd(struct Xconn *xc, struct cmd *c, ...)
|
|
{
|
|
va_list ap;
|
|
int error;
|
|
|
|
va_start(ap, c);
|
|
error = vsendcmd(xc, 0, c, ap);
|
|
va_end(ap);
|
|
return error;
|
|
}
|
|
|
|
int
|
|
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
|
|
{
|
|
va_list ap;
|
|
int error;
|
|
|
|
va_start(ap, c);
|
|
error = vsendcmd(xc, resultmode, c, ap);
|
|
va_end(ap);
|
|
return error;
|
|
}
|
|
|
|
/*
|
|
* simplecmd: a convenient routine to execute a command which returns
|
|
* no rows synchronously.
|
|
*/
|
|
|
|
int
|
|
simplecmd(struct Xconn *xc, struct cmd *c, ...)
|
|
{
|
|
va_list ap;
|
|
int error;
|
|
|
|
va_start(ap, c);
|
|
error = vsendcmd(xc, 0, c, ap);
|
|
va_end(ap);
|
|
if (error != 0) {
|
|
return error;
|
|
}
|
|
return fetch_noresult(xc);
|
|
}
|
|
|
|
void
|
|
fetchinit(struct fetchstatus *s, struct Xconn *xc)
|
|
{
|
|
s->xc = xc;
|
|
s->res = NULL;
|
|
s->cur = 0;
|
|
s->nrows = 0;
|
|
s->done = false;
|
|
}
|
|
|
|
static intmax_t
|
|
getint(const char *str)
|
|
{
|
|
intmax_t i;
|
|
char *ep;
|
|
|
|
errno = 0;
|
|
i = strtoimax(str, &ep, 10);
|
|
assert(errno == 0);
|
|
assert(str[0] != 0);
|
|
assert(*ep == 0);
|
|
return i;
|
|
}
|
|
|
|
static int
|
|
vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
|
|
{
|
|
PGconn *conn = s->xc->conn;
|
|
unsigned int i;
|
|
|
|
assert(conn != NULL);
|
|
if (s->res == NULL) {
|
|
ExecStatusType status;
|
|
int error;
|
|
|
|
pqwait(s->xc);
|
|
s->res = PQgetResult(conn);
|
|
if (s->res == NULL) {
|
|
s->done = true;
|
|
return ENOENT;
|
|
}
|
|
status = PQresultStatus(s->res);
|
|
if (status == PGRES_FATAL_ERROR) {
|
|
error = sqltoerrno(
|
|
PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
|
|
assert(error != 0);
|
|
dumperror(s->xc, s->res);
|
|
return error;
|
|
}
|
|
if (status != PGRES_TUPLES_OK) {
|
|
errx(1, "not tuples_ok: %s",
|
|
PQerrorMessage(conn));
|
|
}
|
|
assert((unsigned int)PQnfields(s->res) == n);
|
|
s->nrows = PQntuples(s->res);
|
|
if (s->nrows == 0) {
|
|
DPRINTF("no rows\n");
|
|
return ENOENT;
|
|
}
|
|
assert(s->nrows >= 1);
|
|
s->cur = 0;
|
|
}
|
|
for (i = 0; i < n; i++) {
|
|
size_t size;
|
|
|
|
assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
|
|
DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
|
|
i, PQftype(s->res, i), types[i],
|
|
PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
|
|
PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
|
|
"<BINARY>");
|
|
assert(PQftype(s->res, i) == types[i]);
|
|
assert(!PQgetisnull(s->res, s->cur, i));
|
|
switch(types[i]) {
|
|
case INT8OID:
|
|
*va_arg(ap, int64_t *) =
|
|
getint(PQgetvalue(s->res, s->cur, i));
|
|
break;
|
|
case OIDOID:
|
|
*va_arg(ap, Oid *) =
|
|
getint(PQgetvalue(s->res, s->cur, i));
|
|
break;
|
|
case INT4OID:
|
|
*va_arg(ap, int32_t *) =
|
|
getint(PQgetvalue(s->res, s->cur, i));
|
|
break;
|
|
case TEXTOID:
|
|
*va_arg(ap, char **) =
|
|
estrdup(PQgetvalue(s->res, s->cur, i));
|
|
break;
|
|
case BYTEA:
|
|
size = PQgetlength(s->res, s->cur, i);
|
|
memcpy(va_arg(ap, void *),
|
|
PQgetvalue(s->res, s->cur, i), size);
|
|
*va_arg(ap, size_t *) = size;
|
|
break;
|
|
default:
|
|
errx(EXIT_FAILURE, "%s unknown type %u", __func__,
|
|
types[i]);
|
|
}
|
|
}
|
|
s->cur++;
|
|
if (s->cur == s->nrows) {
|
|
PQclear(s->res);
|
|
s->res = NULL;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
|
|
{
|
|
va_list ap;
|
|
int error;
|
|
|
|
va_start(ap, types);
|
|
error = vfetchnext(s, n, types, ap);
|
|
va_end(ap);
|
|
return error;
|
|
}
|
|
|
|
void
|
|
fetchdone(struct fetchstatus *s)
|
|
{
|
|
|
|
if (s->res != NULL) {
|
|
PQclear(s->res);
|
|
s->res = NULL;
|
|
}
|
|
if (!s->done) {
|
|
PGresult *res;
|
|
unsigned int n;
|
|
|
|
n = 0;
|
|
while ((res = PQgetResult(s->xc->conn)) != NULL) {
|
|
PQclear(res);
|
|
n++;
|
|
}
|
|
if (n > 0) {
|
|
DPRINTF("%u rows dropped\n", n);
|
|
}
|
|
}
|
|
}
|
|
|
|
int
|
|
simplefetch(struct Xconn *xc, Oid type, ...)
|
|
{
|
|
struct fetchstatus s;
|
|
va_list ap;
|
|
int error;
|
|
|
|
fetchinit(&s, xc);
|
|
va_start(ap, type);
|
|
error = vfetchnext(&s, 1, &type, ap);
|
|
va_end(ap);
|
|
assert(error != 0 || s.res == NULL);
|
|
fetchdone(&s);
|
|
return error;
|
|
}
|
|
|
|
/*
|
|
* setlabel: set the descriptive label for the connection.
|
|
*
|
|
* we use simple pointer comparison for label equality check.
|
|
*/
|
|
static void
|
|
setlabel(struct Xconn *xc, const char *label)
|
|
{
|
|
int error;
|
|
|
|
/*
|
|
* put the label into application_name so that it's shown in
|
|
* pg_stat_activity. we are sure that our labels don't need
|
|
* PQescapeStringConn.
|
|
*
|
|
* example:
|
|
* SELECT pid,application_name,query FROM pg_stat_activity
|
|
* WHERE state <> 'idle'
|
|
*/
|
|
|
|
if (label != NULL && label != xc->label) {
|
|
struct cmd *c;
|
|
char cmd_str[1024];
|
|
|
|
snprintf(cmd_str, sizeof(cmd_str),
|
|
"SET application_name TO 'pgfs: %s'", label);
|
|
c = createcmd(cmd_str, CMD_NOPREPARE);
|
|
error = simplecmd(xc, c);
|
|
freecmd(c);
|
|
assert(error == 0);
|
|
xc->label = label;
|
|
} else {
|
|
#if 0 /* don't bother to clear label */
|
|
static struct cmd *c;
|
|
|
|
CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
#endif
|
|
}
|
|
}
|
|
|
|
struct Xconn *
|
|
begin(struct puffs_usermount *pu, const char *label)
|
|
{
|
|
struct Xconn *xc = getxc(puffs_cc_getcc(pu));
|
|
static struct cmd *c;
|
|
int error;
|
|
|
|
setlabel(xc, label);
|
|
CREATECMD_NOPARAM(c, "BEGIN");
|
|
assert(!xc->in_trans);
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
|
|
xc->in_trans = true;
|
|
return xc;
|
|
}
|
|
|
|
struct Xconn *
|
|
begin_readonly(struct puffs_usermount *pu, const char *label)
|
|
{
|
|
struct Xconn *xc = getxc(puffs_cc_getcc(pu));
|
|
static struct cmd *c;
|
|
int error;
|
|
|
|
setlabel(xc, label);
|
|
CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
|
|
assert(!xc->in_trans);
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
|
|
xc->in_trans = true;
|
|
return xc;
|
|
}
|
|
|
|
void
|
|
rollback(struct Xconn *xc)
|
|
{
|
|
PGTransactionStatusType status;
|
|
|
|
/*
|
|
* check the status as we are not sure the status of our transaction
|
|
* after a failed commit.
|
|
*/
|
|
status = PQtransactionStatus(xc->conn);
|
|
assert(status != PQTRANS_ACTIVE);
|
|
assert(status != PQTRANS_UNKNOWN);
|
|
if (status != PQTRANS_IDLE) {
|
|
static struct cmd *c;
|
|
int error;
|
|
|
|
assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
|
|
CREATECMD_NOPARAM(c, "ROLLBACK");
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
}
|
|
DPRINTF("xc %p rollback %p\n", xc, xc->owner);
|
|
setlabel(xc, NULL);
|
|
relxc(xc);
|
|
}
|
|
|
|
int
|
|
commit(struct Xconn *xc)
|
|
{
|
|
static struct cmd *c;
|
|
int error;
|
|
|
|
CREATECMD_NOPARAM(c, "COMMIT");
|
|
error = simplecmd(xc, c);
|
|
setlabel(xc, NULL);
|
|
if (error == 0) {
|
|
DPRINTF("xc %p commit %p\n", xc, xc->owner);
|
|
relxc(xc);
|
|
}
|
|
return error;
|
|
}
|
|
|
|
int
|
|
commit_sync(struct Xconn *xc)
|
|
{
|
|
static struct cmd *c;
|
|
int error;
|
|
|
|
assert(!pgfs_dosync);
|
|
CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
return commit(xc);
|
|
}
|
|
|
|
static void
|
|
pgfs_notice_receiver(void *vp, const PGresult *res)
|
|
{
|
|
struct Xconn *xc = vp;
|
|
|
|
assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
|
|
fprintf(stderr, "got a notice on %p\n", xc);
|
|
dumperror(xc, res);
|
|
}
|
|
|
|
static int
|
|
pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
|
|
int fd, int *done)
|
|
{
|
|
struct Xconn *xc;
|
|
PGconn *conn;
|
|
|
|
TAILQ_FOREACH(xc, &xclist, list) {
|
|
if (PQsocket(xc->conn) == fd) {
|
|
break;
|
|
}
|
|
}
|
|
assert(xc != NULL);
|
|
conn = xc->conn;
|
|
PQconsumeInput(conn);
|
|
if (!PQisBusy(conn)) {
|
|
if (xc->blocker != NULL) {
|
|
DPRINTF("schedule %p\n", xc->blocker);
|
|
puffs_cc_schedule(xc->blocker);
|
|
} else {
|
|
DPRINTF("no blockers\n");
|
|
}
|
|
}
|
|
*done = 0;
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
|
|
const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
|
|
{
|
|
const char *keywords[3+1];
|
|
const char *values[3];
|
|
unsigned int i;
|
|
|
|
if (nconn > 32) {
|
|
/*
|
|
* limit from sizeof(cmd->prepared_mask)
|
|
*/
|
|
return EINVAL;
|
|
}
|
|
if (debug) {
|
|
pgfs_dodprintf = true;
|
|
}
|
|
if (synchronous) {
|
|
pgfs_dosync = true;
|
|
}
|
|
i = 0;
|
|
if (dbname != NULL) {
|
|
keywords[i] = "dbname";
|
|
values[i] = dbname;
|
|
i++;
|
|
}
|
|
if (dbuser != NULL) {
|
|
keywords[i] = "user";
|
|
values[i] = dbuser;
|
|
i++;
|
|
}
|
|
keywords[i] = "application_name";
|
|
values[i] = "pgfs";
|
|
i++;
|
|
keywords[i] = NULL;
|
|
puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
|
|
for (i = 0; i < nconn; i++) {
|
|
struct Xconn *xc;
|
|
struct Xconn *xc2;
|
|
static int xcid;
|
|
PGconn *conn;
|
|
struct cmd *c;
|
|
int error;
|
|
|
|
conn = PQconnectdbParams(keywords, values, 0);
|
|
if (conn == NULL) {
|
|
errx(EXIT_FAILURE,
|
|
"PQconnectdbParams: unknown failure");
|
|
}
|
|
if (PQstatus(conn) != CONNECTION_OK) {
|
|
/*
|
|
* XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
|
|
*/
|
|
errx(EXIT_FAILURE, "PQconnectdbParams: %s",
|
|
PQerrorMessage(conn));
|
|
}
|
|
DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
|
|
puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
|
|
xc = emalloc(sizeof(*xc));
|
|
xc->conn = conn;
|
|
xc->blocker = NULL;
|
|
xc->owner = NULL;
|
|
xc->in_trans = false;
|
|
xc->id = xcid++;
|
|
xc->label = NULL;
|
|
assert(xc->id < 32);
|
|
PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
|
|
TAILQ_INSERT_HEAD(&xclist, xc, list);
|
|
xc2 = begin(pu, NULL);
|
|
assert(xc2 == xc);
|
|
c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
freecmd(c);
|
|
c = createcmd("SET SESSION CHARACTERISTICS AS "
|
|
"TRANSACTION ISOLATION LEVEL REPEATABLE READ",
|
|
CMD_NOPREPARE);
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
freecmd(c);
|
|
c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
freecmd(c);
|
|
if (!pgfs_dosync) {
|
|
c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
|
|
CMD_NOPREPARE);
|
|
error = simplecmd(xc, c);
|
|
assert(error == 0);
|
|
freecmd(c);
|
|
}
|
|
if (debug) {
|
|
struct fetchstatus s;
|
|
static const Oid types[] = { INT8OID, };
|
|
uint64_t pid;
|
|
|
|
c = createcmd("SELECT pg_backend_pid()::int8;",
|
|
CMD_NOPREPARE);
|
|
error = sendcmd(xc, c);
|
|
assert(error == 0);
|
|
fetchinit(&s, xc);
|
|
error = FETCHNEXT(&s, types, &pid);
|
|
fetchdone(&s);
|
|
assert(error == 0);
|
|
DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
|
|
}
|
|
error = commit(xc);
|
|
assert(error == 0);
|
|
assert(xc->owner == NULL);
|
|
}
|
|
/*
|
|
* XXX cleanup unlinked files here? what to do when the filesystem
|
|
* is shared?
|
|
*/
|
|
return 0;
|
|
}
|
|
|
|
struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
|
|
struct puffs_cc *flusher = NULL;
|
|
|
|
int
|
|
flush_xacts(struct puffs_usermount *pu)
|
|
{
|
|
struct puffs_cc *cc = puffs_cc_getcc(pu);
|
|
struct Xconn *xc;
|
|
static struct cmd *c;
|
|
uint64_t dummy;
|
|
int error;
|
|
|
|
/*
|
|
* flush all previously issued asynchronous transactions.
|
|
*
|
|
* XXX
|
|
* unfortunately it seems that there is no clean way to tell
|
|
* PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's
|
|
* too expensive and overkill for our purpose.
|
|
* besides, PostgreSQL has an optimization to skip XLOG flushing
|
|
* for transactions which didn't produce WAL records.
|
|
* (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
|
|
* it means that an empty transaction ("BEGIN; COMMIT;"), which
|
|
* doesn't produce any WAL records, doesn't flush the XLOG even if
|
|
* synchronous_commit=on. we issues a dummy setval() to avoid the
|
|
* optimization.
|
|
* on the other hand, we try to avoid creating unnecessary WAL activity
|
|
* by serializing flushing and checking XLOG locations.
|
|
*/
|
|
|
|
assert(!pgfs_dosync);
|
|
if (flusher != NULL) { /* serialize flushers */
|
|
DPRINTF("%p flush in progress %p\n", cc, flusher);
|
|
waiton(&flushwaitq, cc);
|
|
assert(flusher == NULL);
|
|
}
|
|
DPRINTF("%p start flushing\n", cc);
|
|
flusher = cc;
|
|
retry:
|
|
xc = begin(pu, "flush");
|
|
CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
|
|
"pg_current_xlog_insert_location() <> pg_current_xlog_location()");
|
|
error = sendcmd(xc, c);
|
|
if (error != 0) {
|
|
goto got_error;
|
|
}
|
|
error = simplefetch(xc, INT8OID, &dummy);
|
|
assert(error != 0 || dummy == 1);
|
|
if (error == ENOENT) {
|
|
/*
|
|
* there seems to be nothing to flush.
|
|
*/
|
|
DPRINTF("%p no sync\n", cc);
|
|
error = 0;
|
|
}
|
|
if (error != 0) {
|
|
goto got_error;
|
|
}
|
|
error = commit_sync(xc);
|
|
if (error != 0) {
|
|
goto got_error;
|
|
}
|
|
goto done;
|
|
got_error:
|
|
rollback(xc);
|
|
if (error == EAGAIN) {
|
|
goto retry;
|
|
}
|
|
done:
|
|
assert(flusher == cc);
|
|
flusher = NULL;
|
|
wakeup_one(&flushwaitq);
|
|
DPRINTF("%p end flushing error=%d\n", cc, error);
|
|
return error;
|
|
}
|