aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBadlop <badlop@process-one.net>2013-03-19 13:29:15 +0100
committerBadlop <badlop@process-one.net>2013-03-19 13:30:17 +0100
commitf92a94a7378856c2ff3217152ee62a9d035aadc6 (patch)
tree119ef937e6ebc97790d655145bf29fa394bf1177 /src
parentCopied MySQL erlang library from ejabberd-modules SVN (diff)
Copied PostgreSQL erlang library from ejabberd-modules SVN
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.in2
-rwxr-xr-xsrc/configure3
-rw-r--r--src/configure.ac1
-rw-r--r--src/pgsql/EPLICENSE286
-rw-r--r--src/pgsql/Makefile.in38
-rw-r--r--src/pgsql/Makefile.win3218
-rw-r--r--src/pgsql/pgsql.erl96
-rw-r--r--src/pgsql/pgsql_proto.erl650
-rw-r--r--src/pgsql/pgsql_tcp.erl88
-rw-r--r--src/pgsql/pgsql_util.erl321
10 files changed, 1501 insertions, 2 deletions
diff --git a/src/Makefile.in b/src/Makefile.in
index cf26246f8..4c7c2501c 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -77,7 +77,7 @@ endif
prefix = @prefix@
exec_prefix = @exec_prefix@
-SUBDIRS = @mod_irc@ @mod_pubsub@ @mod_muc@ @mod_proxy65@ @eldap@ @pam@ @web@ mysql stringprep stun @tls@ @odbc@ @ejabberd_zlib@
+SUBDIRS = @mod_irc@ @mod_pubsub@ @mod_muc@ @mod_proxy65@ @eldap@ @pam@ @web@ mysql pgsql stringprep stun @tls@ @odbc@ @ejabberd_zlib@
ERLSHLIBS += expat_erl.so
ERLBEHAVS = cyrsasl.erl gen_mod.erl p1_fsm.erl ejabberd_auth.erl
SOURCES_ALL = $(wildcard *.erl)
diff --git a/src/configure b/src/configure
index 485527752..7b7066cb6 100755
--- a/src/configure
+++ b/src/configure
@@ -4696,7 +4696,7 @@ fi
-ac_config_files="$ac_config_files Makefile $make_mod_irc $make_mod_muc $make_mod_pubsub $make_mod_proxy65 $make_eldap $make_pam $make_web mysql/Makefile stringprep/Makefile stun/Makefile $make_tls $make_odbc $make_ejabberd_zlib"
+ac_config_files="$ac_config_files Makefile $make_mod_irc $make_mod_muc $make_mod_pubsub $make_mod_proxy65 $make_eldap $make_pam $make_web mysql/Makefile pgsql/Makefile stringprep/Makefile stun/Makefile $make_tls $make_odbc $make_ejabberd_zlib"
#openssl
@@ -5862,6 +5862,7 @@ do
"$make_pam") CONFIG_FILES="$CONFIG_FILES $make_pam" ;;
"$make_web") CONFIG_FILES="$CONFIG_FILES $make_web" ;;
"mysql/Makefile") CONFIG_FILES="$CONFIG_FILES mysql/Makefile" ;;
+ "pgsql/Makefile") CONFIG_FILES="$CONFIG_FILES pgsql/Makefile" ;;
"stringprep/Makefile") CONFIG_FILES="$CONFIG_FILES stringprep/Makefile" ;;
"stun/Makefile") CONFIG_FILES="$CONFIG_FILES stun/Makefile" ;;
"$make_tls") CONFIG_FILES="$CONFIG_FILES $make_tls" ;;
diff --git a/src/configure.ac b/src/configure.ac
index 13a3fec1d..b97ff1e2d 100644
--- a/src/configure.ac
+++ b/src/configure.ac
@@ -112,6 +112,7 @@ AC_CONFIG_FILES([Makefile
$make_pam
$make_web
mysql/Makefile
+ pgsql/Makefile
stringprep/Makefile
stun/Makefile
$make_tls
diff --git a/src/pgsql/EPLICENSE b/src/pgsql/EPLICENSE
new file mode 100644
index 000000000..36aa84e33
--- /dev/null
+++ b/src/pgsql/EPLICENSE
@@ -0,0 +1,286 @@
+ERLANG PUBLIC LICENSE
+Version 1.1
+
+1. Definitions.
+
+1.1. ``Contributor'' means each entity that creates or contributes to
+the creation of Modifications.
+
+1.2. ``Contributor Version'' means the combination of the Original
+Code, prior Modifications used by a Contributor, and the Modifications
+made by that particular Contributor.
+
+1.3. ``Covered Code'' means the Original Code or Modifications or the
+combination of the Original Code and Modifications, in each case
+including portions thereof.
+
+1.4. ``Electronic Distribution Mechanism'' means a mechanism generally
+accepted in the software development community for the electronic
+transfer of data.
+
+1.5. ``Executable'' means Covered Code in any form other than Source
+Code.
+
+1.6. ``Initial Developer'' means the individual or entity identified
+as the Initial Developer in the Source Code notice required by Exhibit
+A.
+
+1.7. ``Larger Work'' means a work which combines Covered Code or
+portions thereof with code not governed by the terms of this License.
+
+1.8. ``License'' means this document.
+
+1.9. ``Modifications'' means any addition to or deletion from the
+substance or structure of either the Original Code or any previous
+Modifications. When Covered Code is released as a series of files, a
+Modification is:
+
+A. Any addition to or deletion from the contents of a file containing
+ Original Code or previous Modifications.
+
+B. Any new file that contains any part of the Original Code or
+ previous Modifications.
+
+1.10. ``Original Code'' means Source Code of computer software code
+which is described in the Source Code notice required by Exhibit A as
+Original Code, and which, at the time of its release under this
+License is not already Covered Code governed by this License.
+
+1.11. ``Source Code'' means the preferred form of the Covered Code for
+making modifications to it, including all modules it contains, plus
+any associated interface definition files, scripts used to control
+compilation and installation of an Executable, or a list of source
+code differential comparisons against either the Original Code or
+another well known, available Covered Code of the Contributor's
+choice. The Source Code can be in a compressed or archival form,
+provided the appropriate decompression or de-archiving software is
+widely available for no charge.
+
+1.12. ``You'' means an individual or a legal entity exercising rights
+under, and complying with all of the terms of, this License. For legal
+entities,``You'' includes any entity which controls, is controlled by,
+or is under common control with You. For purposes of this definition,
+``control'' means (a) the power, direct or indirect, to cause the
+direction or management of such entity, whether by contract or
+otherwise, or (b) ownership of fifty percent (50%) or more of the
+outstanding shares or beneficial ownership of such entity.
+
+2. Source Code License.
+
+2.1. The Initial Developer Grant.
+The Initial Developer hereby grants You a world-wide, royalty-free,
+non-exclusive license, subject to third party intellectual property
+claims:
+
+(a) to use, reproduce, modify, display, perform, sublicense and
+ distribute the Original Code (or portions thereof) with or without
+ Modifications, or as part of a Larger Work; and
+
+(b) under patents now or hereafter owned or controlled by Initial
+ Developer, to make, have made, use and sell (``Utilize'') the
+ Original Code (or portions thereof), but solely to the extent that
+ any such patent is reasonably necessary to enable You to Utilize
+ the Original Code (or portions thereof) and not to any greater
+ extent that may be necessary to Utilize further Modifications or
+ combinations.
+
+2.2. Contributor Grant.
+Each Contributor hereby grants You a world-wide, royalty-free,
+non-exclusive license, subject to third party intellectual property
+claims:
+
+(a) to use, reproduce, modify, display, perform, sublicense and
+ distribute the Modifications created by such Contributor (or
+ portions thereof) either on an unmodified basis, with other
+ Modifications, as Covered Code or as part of a Larger Work; and
+
+(b) under patents now or hereafter owned or controlled by Contributor,
+ to Utilize the Contributor Version (or portions thereof), but
+ solely to the extent that any such patent is reasonably necessary
+ to enable You to Utilize the Contributor Version (or portions
+ thereof), and not to any greater extent that may be necessary to
+ Utilize further Modifications or combinations.
+
+3. Distribution Obligations.
+
+3.1. Application of License.
+The Modifications which You contribute are governed by the terms of
+this License, including without limitation Section 2.2. The Source
+Code version of Covered Code may be distributed only under the terms
+of this License, and You must include a copy of this License with
+every copy of the Source Code You distribute. You may not offer or
+impose any terms on any Source Code version that alters or restricts
+the applicable version of this License or the recipients' rights
+hereunder. However, You may include an additional document offering
+the additional rights described in Section 3.5.
+
+3.2. Availability of Source Code.
+Any Modification which You contribute must be made available in Source
+Code form under the terms of this License either on the same media as
+an Executable version or via an accepted Electronic Distribution
+Mechanism to anyone to whom you made an Executable version available;
+and if made available via Electronic Distribution Mechanism, must
+remain available for at least twelve (12) months after the date it
+initially became available, or at least six (6) months after a
+subsequent version of that particular Modification has been made
+available to such recipients. You are responsible for ensuring that
+the Source Code version remains available even if the Electronic
+Distribution Mechanism is maintained by a third party.
+
+3.3. Description of Modifications.
+You must cause all Covered Code to which you contribute to contain a
+file documenting the changes You made to create that Covered Code and
+the date of any change. You must include a prominent statement that
+the Modification is derived, directly or indirectly, from Original
+Code provided by the Initial Developer and including the name of the
+Initial Developer in (a) the Source Code, and (b) in any notice in an
+Executable version or related documentation in which You describe the
+origin or ownership of the Covered Code.
+
+3.4. Intellectual Property Matters
+
+(a) Third Party Claims.
+ If You have knowledge that a party claims an intellectual property
+ right in particular functionality or code (or its utilization
+ under this License), you must include a text file with the source
+ code distribution titled ``LEGAL'' which describes the claim and
+ the party making the claim in sufficient detail that a recipient
+ will know whom to contact. If you obtain such knowledge after You
+ make Your Modification available as described in Section 3.2, You
+ shall promptly modify the LEGAL file in all copies You make
+ available thereafter and shall take other steps (such as notifying
+ appropriate mailing lists or newsgroups) reasonably calculated to
+ inform those who received the Covered Code that new knowledge has
+ been obtained.
+
+(b) Contributor APIs.
+ If Your Modification is an application programming interface and
+ You own or control patents which are reasonably necessary to
+ implement that API, you must also include this information in the
+ LEGAL file.
+
+3.5. Required Notices.
+You must duplicate the notice in Exhibit A in each file of the Source
+Code, and this License in any documentation for the Source Code, where
+You describe recipients' rights relating to Covered Code. If You
+created one or more Modification(s), You may add your name as a
+Contributor to the notice described in Exhibit A. If it is not
+possible to put such notice in a particular Source Code file due to
+its structure, then you must include such notice in a location (such
+as a relevant directory file) where a user would be likely to look for
+such a notice. You may choose to offer, and to charge a fee for,
+warranty, support, indemnity or liability obligations to one or more
+recipients of Covered Code. However, You may do so only on Your own
+behalf, and not on behalf of the Initial Developer or any
+Contributor. You must make it absolutely clear than any such warranty,
+support, indemnity or liability obligation is offered by You alone,
+and You hereby agree to indemnify the Initial Developer and every
+Contributor for any liability incurred by the Initial Developer or
+such Contributor as a result of warranty, support, indemnity or
+liability terms You offer.
+
+3.6. Distribution of Executable Versions.
+You may distribute Covered Code in Executable form only if the
+requirements of Section 3.1-3.5 have been met for that Covered Code,
+and if You include a notice stating that the Source Code version of
+the Covered Code is available under the terms of this License,
+including a description of how and where You have fulfilled the
+obligations of Section 3.2. The notice must be conspicuously included
+in any notice in an Executable version, related documentation or
+collateral in which You describe recipients' rights relating to the
+Covered Code. You may distribute the Executable version of Covered
+Code under a license of Your choice, which may contain terms different
+from this License, provided that You are in compliance with the terms
+of this License and that the license for the Executable version does
+not attempt to limit or alter the recipient's rights in the Source
+Code version from the rights set forth in this License. If You
+distribute the Executable version under a different license You must
+make it absolutely clear that any terms which differ from this License
+are offered by You alone, not by the Initial Developer or any
+Contributor. You hereby agree to indemnify the Initial Developer and
+every Contributor for any liability incurred by the Initial Developer
+or such Contributor as a result of any such terms You offer.
+
+3.7. Larger Works.
+You may create a Larger Work by combining Covered Code with other code
+not governed by the terms of this License and distribute the Larger
+Work as a single product. In such a case, You must make sure the
+requirements of this License are fulfilled for the Covered Code.
+
+4. Inability to Comply Due to Statute or Regulation.
+If it is impossible for You to comply with any of the terms of this
+License with respect to some or all of the Covered Code due to statute
+or regulation then You must: (a) comply with the terms of this License
+to the maximum extent possible; and (b) describe the limitations and
+the code they affect. Such description must be included in the LEGAL
+file described in Section 3.4 and must be included with all
+distributions of the Source Code. Except to the extent prohibited by
+statute or regulation, such description must be sufficiently detailed
+for a recipient of ordinary skill to be able to understand it.
+
+5. Application of this License.
+
+This License applies to code to which the Initial Developer has
+attached the notice in Exhibit A, and to related Covered Code.
+
+6. CONNECTION TO MOZILLA PUBLIC LICENSE
+
+This Erlang License is a derivative work of the Mozilla Public
+License, Version 1.0. It contains terms which differ from the Mozilla
+Public License, Version 1.0.
+
+7. DISCLAIMER OF WARRANTY.
+
+COVERED CODE IS PROVIDED UNDER THIS LICENSE ON AN ``AS IS'' BASIS,
+WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+WITHOUT LIMITATION, WARRANTIES THAT THE COVERED CODE IS FREE OF
+DEFECTS, MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE OR
+NON-INFRINGING. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF
+THE COVERED CODE IS WITH YOU. SHOULD ANY COVERED CODE PROVE DEFECTIVE
+IN ANY RESPECT, YOU (NOT THE INITIAL DEVELOPER OR ANY OTHER
+CONTRIBUTOR) ASSUME THE COST OF ANY NECESSARY SERVICING, REPAIR OR
+CORRECTION. THIS DISCLAIMER OF WARRANTY CONSTITUTES AN ESSENTIAL PART
+OF THIS LICENSE. NO USE OF ANY COVERED CODE IS AUTHORIZED HEREUNDER
+EXCEPT UNDER THIS DISCLAIMER.
+
+8. TERMINATION.
+This License and the rights granted hereunder will terminate
+automatically if You fail to comply with terms herein and fail to cure
+such breach within 30 days of becoming aware of the breach. All
+sublicenses to the Covered Code which are properly granted shall
+survive any termination of this License. Provisions which, by their
+nature, must remain in effect beyond the termination of this License
+shall survive.
+
+9. DISCLAIMER OF LIABILITY
+Any utilization of Covered Code shall not cause the Initial Developer
+or any Contributor to be liable for any damages (neither direct nor
+indirect).
+
+10. MISCELLANEOUS
+This License represents the complete agreement concerning the subject
+matter hereof. If any provision is held to be unenforceable, such
+provision shall be reformed only to the extent necessary to make it
+enforceable. This License shall be construed by and in accordance with
+the substantive laws of Sweden. Any dispute, controversy or claim
+arising out of or relating to this License, or the breach, termination
+or invalidity thereof, shall be subject to the exclusive jurisdiction
+of Swedish courts, with the Stockholm City Court as the first
+instance.
+
+EXHIBIT A.
+
+``The contents of this file are subject to the Erlang Public License,
+Version 1.1, (the "License"); you may not use this file except in
+compliance with the License. You should have received a copy of the
+Erlang Public License along with this software. If not, it can be
+retrieved via the world wide web at http://www.erlang.org/.
+
+Software distributed under the License is distributed on an "AS IS"
+basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+the License for the specific language governing rights and limitations
+under the License.
+
+The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+AB. All Rights Reserved.''
diff --git a/src/pgsql/Makefile.in b/src/pgsql/Makefile.in
new file mode 100644
index 000000000..e77da8452
--- /dev/null
+++ b/src/pgsql/Makefile.in
@@ -0,0 +1,38 @@
+# $Id: Makefile.in 1453 2008-07-16 16:58:42Z badlop $
+
+CC = @CC@
+CFLAGS = @CFLAGS@
+CPPFLAGS = @CPPFLAGS@
+LDFLAGS = @LDFLAGS@
+LIBS = @LIBS@
+
+ERLANG_CFLAGS = @ERLANG_CFLAGS@
+ERLANG_LIBS = @ERLANG_LIBS@
+
+EFLAGS += -I ..
+EFLAGS += -pz ..
+
+# make debug=true to compile Erlang module with debug informations.
+ifdef debug
+ EFLAGS+=+debug_info
+endif
+
+OUTDIR = ..
+SOURCES = $(wildcard *.erl)
+BEAMS = $(addprefix $(OUTDIR)/,$(SOURCES:.erl=.beam))
+
+
+all: $(BEAMS)
+
+$(OUTDIR)/%.beam: %.erl
+ @ERLC@ -W $(EFLAGS) -o $(OUTDIR) $<
+
+clean:
+ rm -f $(BEAMS)
+
+distclean: clean
+ rm -f Makefile
+
+TAGS:
+ etags *.erl
+
diff --git a/src/pgsql/Makefile.win32 b/src/pgsql/Makefile.win32
new file mode 100644
index 000000000..e70aba9f1
--- /dev/null
+++ b/src/pgsql/Makefile.win32
@@ -0,0 +1,18 @@
+
+include ..\Makefile.inc
+
+EFLAGS = -I .. -pz ..
+
+OUTDIR = ..
+BEAMS = ..\stun_codec.beam ..\ejabberd_stun.beam
+
+ALL : $(BEAMS)
+
+CLEAN :
+ -@erase $(BEAMS)
+
+$(OUTDIR)\stun_codec.beam : stun_codec.erl
+ erlc -W $(EFLAGS) -o $(OUTDIR) stun_codec.erl
+
+$(OUTDIR)\ejabberd_stun.beam : ejabberd_stun.erl
+ erlc -W $(EFLAGS) -o $(OUTDIR) ejabberd_stun.erl
diff --git a/src/pgsql/pgsql.erl b/src/pgsql/pgsql.erl
new file mode 100644
index 000000000..3f993ecb6
--- /dev/null
+++ b/src/pgsql/pgsql.erl
@@ -0,0 +1,96 @@
+%%% File : pgsql.erl
+%%% Author : Christian Sunesson <chsu79@gmail.com>
+%%% Description : PostgresQL interface
+%%% Created : 11 May 2005
+
+%%
+%% API for accessing the postgres driver.
+%%
+
+-module(pgsql).
+-export([connect/1, connect/4, connect/5]).
+
+-export([squery/2,
+ pquery/3,
+ terminate/1,
+ prepare/3, unprepare/2,
+ execute/3]).
+
+
+connect(Host, Database, User, Password) ->
+ connect([{database, Database},
+ {host, Host},
+ {user, User},
+ {password, Password}]).
+
+connect(Host, Database, User, Password, Port) ->
+ connect([{database, Database},
+ {host, Host},
+ {user, User},
+ {port, Port},
+ {password, Password}]).
+
+connect(Options) ->
+ pgsql_proto:start(Options).
+
+%% Close a connection
+terminate(Db) ->
+ gen_server:call(Db, terminate).
+
+%%% In the "simple query" protocol, the frontend just sends a
+%%% textual query string, which is parsed and immediately
+%%% executed by the backend.
+
+%% A simple query can contain multiple statements (separated with a semi-colon),
+%% and each statement's response.
+
+%%% squery(Db, Query) -> {ok, Results} | ... no real error handling
+%%% Query = string()
+%%% Results = [Result]
+%%% Result = {"SELECT", RowDesc, ResultSet} | ...
+squery(Db, Query) ->
+ gen_server:call(Db, {squery, Query}, infinity).
+
+%%% In the "extended query" protocol, processing of queries is
+%%% separated into multiple steps: parsing, binding of parameter
+%%% values, and execution. This offers flexibility and performance
+%%% benefits, at the cost of extra complexity.
+
+%%% pquery(Db, Query, Params) -> {ok, Command, Status, NameTypes, Rows} | timeout | ...
+%%% Query = string()
+%%% Params = [term()]
+%%% Command = string()
+%%% Status = idle | transaction | failed_transaction
+%%% NameTypes = [{ColName, ColType}]
+%%% Rows = [list()]
+pquery(Db, Query, Params) ->
+ gen_server:call(Db, {equery, {Query, Params}}).
+
+%%% prepare(Db, Name, Query) -> {ok, Status, ParamTypes, ResultTypes}
+%%% Status = idle | transaction | failed_transaction
+%%% ParamTypes = [atom()]
+%%% ResultTypes = [{ColName, ColType}]
+prepare(Db, Name, Query) when is_atom(Name) ->
+ gen_server:call(Db, {prepare, {atom_to_list(Name), Query}}).
+
+%%% unprepare(Db, Name) -> ok | timeout | ...
+%%% Name = atom()
+unprepare(Db, Name) when is_atom(Name) ->
+ gen_server:call(Db, {unprepare, atom_to_list(Name)}).
+
+%%% execute(Db, Name, Params) -> {ok, Result} | timeout | ...
+%%% Result = {'INSERT', NRows} |
+%%% {'DELETE', NRows} |
+%%% {'SELECT', ResultSet} |
+%%% ...
+%%% ResultSet = [Row]
+%%% Row = list()
+execute(Db, Name, Params) when is_atom(Name), is_list(Params) ->
+ Ref = make_ref(),
+ Db ! {execute, Ref, self(), {atom_to_list(Name), Params}},
+ receive
+ {pgsql, Ref, Result} ->
+ {ok, Result}
+ after 5000 ->
+ timeout
+ end.
diff --git a/src/pgsql/pgsql_proto.erl b/src/pgsql/pgsql_proto.erl
new file mode 100644
index 000000000..fe49a4846
--- /dev/null
+++ b/src/pgsql/pgsql_proto.erl
@@ -0,0 +1,650 @@
+%%% File : pgsql_proto.erl
+%%% Author : Christian Sunesson <chrisu@kth.se>
+%%% Description : PostgreSQL protocol driver
+%%% Created : 9 May 2005
+
+%%% This is the protocol handling part of the PostgreSQL driver, it turns packages into
+%%% erlang term messages and back.
+
+-module(pgsql_proto).
+
+-behaviour(gen_server).
+
+%% TODO:
+%% When factorizing make clear distinction between message and packet.
+%% Packet == binary on-wire representation
+%% Message = parsed Packet as erlang terms.
+
+%%% Version 3.0 of the protocol.
+%%% Supported in postgres from version 7.4
+-define(PROTOCOL_MAJOR, 3).
+-define(PROTOCOL_MINOR, 0).
+
+%%% PostgreSQL protocol message codes
+-define(PG_BACKEND_KEY_DATA, $K).
+-define(PG_PARAMETER_STATUS, $S).
+-define(PG_ERROR_MESSAGE, $E).
+-define(PG_NOTICE_RESPONSE, $N).
+-define(PG_EMPTY_RESPONSE, $I).
+-define(PG_ROW_DESCRIPTION, $T).
+-define(PG_DATA_ROW, $D).
+-define(PG_READY_FOR_QUERY, $Z).
+-define(PG_AUTHENTICATE, $R).
+-define(PG_BIND, $B).
+-define(PG_PARSE, $P).
+-define(PG_COMMAND_COMPLETE, $C).
+-define(PG_PARSE_COMPLETE, $1).
+-define(PG_BIND_COMPLETE, $2).
+-define(PG_CLOSE_COMPLETE, $3).
+-define(PG_PORTAL_SUSPENDED, $s).
+-define(PG_NO_DATA, $n).
+
+-export([start/1, start_link/1]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ code_change/3,
+ handle_info/2,
+ terminate/2]).
+
+%% For protocol unwrapping, pgsql_tcp for example.
+-export([decode_packet/3]).
+-export([encode_message/2]).
+-export([encode/2]).
+
+-import(pgsql_util, [option/3]).
+-import(pgsql_util, [socket/1]).
+-import(pgsql_util, [send/2, send_int/2, send_msg/3]).
+-import(pgsql_util, [recv_msg/2, recv_msg/1, recv_byte/2, recv_byte/1]).
+-import(pgsql_util, [string/1, make_pair/2, split_pair/2]).
+-import(pgsql_util, [count_string/1, to_string/2]).
+-import(pgsql_util, [coldescs/3, datacoldescs/3]).
+-import(pgsql_util, [to_integer/1, to_atom/1]).
+
+-record(state, {options, driver, params, socket, oidmap, as_binary}).
+
+start(Options) ->
+ gen_server:start(?MODULE, [self(), Options], []).
+
+start_link(Options) ->
+ gen_server:start_link(?MODULE, [self(), Options], []).
+
+init([DriverPid, Options]) ->
+ %%io:format("Init~n", []),
+ %% Default values: We connect to localhost on the standard TCP/IP
+ %% port.
+ Host = option(Options, host, "localhost"),
+ Port = option(Options, port, 5432),
+ AsBinary = option(Options, as_binary, false),
+
+ case socket({tcp, Host, Port}) of
+ {ok, Sock} ->
+ connect(#state{options = Options,
+ driver = DriverPid,
+ as_binary = AsBinary,
+ socket = Sock});
+ Error ->
+ Reason = {init, Error},
+ {stop, Reason}
+ end.
+
+connect(StateData) ->
+ %%io:format("Connect~n", []),
+ %% Connection settings for database-login.
+ %% TODO: Check if the default values are relevant:
+ UserName = option(StateData#state.options, user, "cos"),
+ DatabaseName = option(StateData#state.options, database, "template1"),
+
+ %% Make protocol startup packet.
+ Version = <<?PROTOCOL_MAJOR:16/integer, ?PROTOCOL_MINOR:16/integer>>,
+ User = make_pair(user, UserName),
+ Database = make_pair(database, DatabaseName),
+ StartupPacket = <<Version/binary,
+ User/binary,
+ Database/binary,
+ 0>>,
+
+ %% Backend will continue with authentication after the startup packet
+ PacketSize = 4 + size(StartupPacket),
+ Sock = StateData#state.socket,
+ ok = gen_tcp:send(Sock, <<PacketSize:32/integer, StartupPacket/binary>>),
+ authenticate(StateData).
+
+
+authenticate(StateData) ->
+ %% Await authentication request from backend.
+ Sock = StateData#state.socket,
+ AsBin = StateData#state.as_binary,
+ {ok, Code, Packet} = recv_msg(Sock, 5000),
+ {ok, Value} = decode_packet(Code, Packet, AsBin),
+ case Value of
+ %% Error response
+ {error_message, Message} ->
+ {stop, {authentication, Message}};
+ {authenticate, {AuthMethod, Salt}} ->
+ case AuthMethod of
+ 0 -> % Auth ok
+ setup(StateData, []);
+ 1 -> % Kerberos 4
+ {stop, {nyi, auth_kerberos4}};
+ 2 -> % Kerberos 5
+ {stop, {nyi, auth_kerberos5}};
+ 3 -> % Plaintext password
+ Password = option(StateData#state.options, password, ""),
+ EncodedPass = encode_message(pass_plain, Password),
+ ok = send(Sock, EncodedPass),
+ authenticate(StateData);
+ 4 -> % Hashed password
+ {stop, {nyi, auth_crypt}};
+ 5 -> % MD5 password
+ Password = option(StateData#state.options, password, ""),
+ User = option(StateData#state.options, user, ""),
+ EncodedPass = encode_message(pass_md5,
+ {User, Password, Salt}),
+ ok = send(Sock, EncodedPass),
+ authenticate(StateData);
+ _ ->
+ {stop, {authentication, {unknown, AuthMethod}}}
+ end;
+ %% Unknown message received
+ Any ->
+ {stop, {protocol_error, Any}}
+ end.
+
+setup(StateData, Params) ->
+ %% Receive startup messages until ReadyForQuery
+ Sock = StateData#state.socket,
+ AsBin = StateData#state.as_binary,
+ {ok, Code, Package} = recv_msg(Sock, 5000),
+ {ok, Pair} = decode_packet(Code, Package, AsBin),
+ case Pair of
+ %% BackendKeyData, necessary for issuing cancel requests
+ {backend_key_data, {Pid, Secret}} ->
+ Params1 = [{secret, {Pid, Secret}} | Params],
+ setup(StateData, Params1);
+ %% ParameterStatus, a key-value pair.
+ {parameter_status, {Key, Value}} ->
+ Params1 = [{{parameter, Key}, Value} | Params],
+ setup(StateData, Params1);
+ %% Error message, with a sequence of <<Code:8/integer, String, 0>>
+ %% of error descriptions. Code==0 terminates the Reason.
+ {error_message, Message} ->
+ gen_tcp:close(Sock),
+ {stop, {error_response, Message}};
+ %% Notice Response, with a sequence of <<Code:8/integer, String,0>>
+ %% identified fields. Code==0 terminates the Notice.
+ {notice_response, Notice} ->
+ deliver(StateData, {pgsql_notice, Notice}),
+ setup(StateData, Params);
+ %% Ready for Query, backend is ready for a new query cycle
+ {ready_for_query, _Status} ->
+ connected(StateData#state{params = Params}, Sock);
+ Any ->
+ {stop, {unknown_setup, Any}}
+ end.
+
+%% Connected state. Can now start to push messages
+%% between frontend and backend. But first some setup.
+connected(StateData, Sock) ->
+ %% Protocol unwrapping process. Factored out to make future
+ %% SSL and unix domain support easier. Store process under
+ %% 'socket' in the process dictionary.
+ AsBin = StateData#state.as_binary,
+ {ok, Unwrapper} = pgsql_tcp:start_link(Sock, self(), AsBin),
+ ok = gen_tcp:controlling_process(Sock, Unwrapper),
+
+ %% Lookup oid to type names and store them in a dictionary under
+ %% 'oidmap' in the process dictionary.
+ Packet = encode_message(squery, "SELECT oid, typname FROM pg_type"),
+ ok = send(Sock, Packet),
+ {ok, [{_, _ColDesc, Rows}]} = process_squery([], AsBin),
+ Rows1 = lists:map(fun ([CodeS, NameS]) ->
+ Code = to_integer(CodeS),
+ Name = to_atom(NameS),
+ {Code, Name}
+ end,
+ Rows),
+ OidMap = dict:from_list(Rows1),
+
+ {ok, StateData#state{oidmap = OidMap}}.
+
+
+handle_call(terminate, _From, State) ->
+ Sock = State#state.socket,
+ Packet = encode_message(terminate, []),
+ ok = send(Sock, Packet),
+ gen_tcp:close(Sock),
+ Reply = ok,
+ {stop, normal, Reply, State};
+
+%% Simple query
+handle_call({squery, Query}, _From, State) ->
+ Sock = State#state.socket,
+ AsBin = State#state.as_binary,
+ Packet = encode_message(squery, Query),
+ ok = send(Sock, Packet),
+ {ok, Result} = process_squery([], AsBin),
+ case lists:keymember(error, 1, Result) of
+ true ->
+ RBPacket = encode_message(squery, "ROLLBACK"),
+ ok = send(Sock, RBPacket),
+ {ok, _RBResult} = process_squery([], AsBin);
+ _ ->
+ ok
+ end,
+ Reply = {ok, Result},
+ {reply, Reply, State};
+
+%% Extended query
+%% simplistic version using the unnammed prepared statement and portal.
+handle_call({equery, {Query, Params}}, _From, State) ->
+ Sock = State#state.socket,
+ ParseP = encode_message(parse, {"", Query, []}),
+ BindP = encode_message(bind, {"", "", Params, [binary]}),
+ DescribeP = encode_message(describe, {portal, ""}),
+ ExecuteP = encode_message(execute, {"", 0}),
+ SyncP = encode_message(sync, []),
+ ok = send(Sock, [ParseP, BindP, DescribeP, ExecuteP, SyncP]),
+
+ {ok, Command, Desc, Status, Logs} = process_equery(State, []),
+
+ OidMap = State#state.oidmap,
+ NameTypes = lists:map(fun({Name, _Format, _ColNo, Oid, _, _, _}) ->
+ {Name, dict:fetch(Oid, OidMap)}
+ end,
+ Desc),
+ Reply = {ok, Command, Status, NameTypes, Logs},
+ {reply, Reply, State};
+
+%% Prepare a statement, so it can be used for queries later on.
+handle_call({prepare, {Name, Query}}, _From, State) ->
+ Sock = State#state.socket,
+ send_message(Sock, parse, {Name, Query, []}),
+ send_message(Sock, describe, {prepared_statement, Name}),
+ send_message(Sock, sync, []),
+ {ok, State, ParamDesc, ResultDesc} = process_prepare({[], []}),
+ OidMap = State#state.oidmap,
+ ParamTypes =
+ lists:map(fun (Oid) -> dict:fetch(Oid, OidMap) end, ParamDesc),
+ ResultNameTypes = lists:map(fun ({ColName, _Format, _ColNo, Oid, _, _, _}) ->
+ {ColName, dict:fetch(Oid, OidMap)}
+ end,
+ ResultDesc),
+ Reply = {ok, State, ParamTypes, ResultNameTypes},
+ {reply, Reply, State};
+
+%% Close a prepared statement.
+handle_call({unprepare, Name}, _From, State) ->
+ Sock = State#state.socket,
+ send_message(Sock, close, {prepared_statement, Name}),
+ send_message(Sock, sync, []),
+ {ok, _Status} = process_unprepare(),
+ Reply = ok,
+ {reply, Reply, State};
+
+%% Execute a prepared statement
+handle_call({execute, {Name, Params}}, _From, State) ->
+ Sock = State#state.socket,
+ %%io:format("execute: ~p ~p ~n", [Name, Params]),
+ begin % Issue first requests for the prepared statement.
+ BindP = encode_message(bind, {"", Name, Params, [binary]}),
+ DescribeP = encode_message(describe, {portal, ""}),
+ ExecuteP = encode_message(execute, {"", 0}),
+ FlushP = encode_message(flush, []),
+ ok = send(Sock, [BindP, DescribeP, ExecuteP, FlushP])
+ end,
+ receive
+ {pgsql, {bind_complete, _}} -> % Bind reply first.
+ %% Collect response to describe message,
+ %% which gives a hint of the rest of the messages.
+ {ok, Command, Result} = process_execute(State, Sock),
+
+ begin % Close portal and end extended query.
+ CloseP = encode_message(close, {portal, ""}),
+ SyncP = encode_message(sync, []),
+ ok = send(Sock, [CloseP, SyncP])
+ end,
+ receive
+ %% Collect response to close message.
+ {pgsql, {close_complete, _}} ->
+ receive
+ %% Collect response to sync message.
+ {pgsql, {ready_for_query, _Status}} ->
+ %%io:format("execute: ~p ~p ~p~n",
+ %% [Status, Command, Result]),
+ Reply = {ok, {Command, Result}},
+ {reply, Reply, State};
+ {pgsql, Unknown} ->
+ {stop, Unknown, {error, Unknown}, State}
+ end;
+ {pgsql, Unknown} ->
+ {stop, Unknown, {error, Unknown}, State}
+ end;
+ {pgsql, Unknown} ->
+ {stop, Unknown, {error, Unknown}, State}
+ end;
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%% Socket closed or socket error messages.
+handle_info({socket, _Sock, Condition}, State) ->
+ {stop, {socket, Condition}, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+deliver(State, Message) ->
+ DriverPid = State#state.driver,
+ DriverPid ! Message.
+
+%% In the process_squery state we collect responses until the backend is
+%% done processing.
+process_squery(Log, AsBin) ->
+ receive
+ {pgsql, {row_description, Cols}} ->
+ {ok, Command, Rows} = process_squery_cols([], AsBin),
+ process_squery([{Command, Cols, Rows}|Log], AsBin);
+ {pgsql, {command_complete, Command}} ->
+ process_squery([Command|Log], AsBin);
+ {pgsql, {ready_for_query, _Status}} ->
+ {ok, lists:reverse(Log)};
+ {pgsql, {error_message, Error}} ->
+ process_squery([{error, Error}|Log], AsBin);
+ {pgsql, _Any} ->
+ process_squery(Log, AsBin)
+ end.
+process_squery_cols(Log, AsBin) ->
+ receive
+ {pgsql, {data_row, Row}} ->
+ process_squery_cols(
+ [lists:map(
+ fun(null) ->
+ null;
+ (R) when AsBin == true ->
+ R;
+ (R) ->
+ binary_to_list(R)
+ end, Row) | Log], AsBin);
+ {pgsql, {command_complete, Command}} ->
+ {ok, Command, lists:reverse(Log)}
+ end.
+
+process_equery(State, Log) ->
+ receive
+ %% Consume parse and bind complete messages when waiting for the first
+ %% first row_description message. What happens if the equery doesnt
+ %% return a result set?
+ {pgsql, {parse_complete, _}} ->
+ process_equery(State, Log);
+ {pgsql, {bind_complete, _}} ->
+ process_equery(State, Log);
+ {pgsql, {row_description, Descs}} ->
+ OidMap = State#state.oidmap,
+ {ok, Descs1} = pgsql_util:decode_descs(OidMap, Descs),
+ process_equery_datarow(Descs1, Log, {undefined, Descs, undefined});
+ {pgsql, Any} ->
+ process_equery(State, [Any|Log])
+ end.
+
+process_equery_datarow(Types, Log, Info={Command, Desc, Status}) ->
+ receive
+ %%
+ {pgsql, {command_complete, Command1}} ->
+ process_equery_datarow(Types, Log, {Command1, Desc, Status});
+ {pgsql, {ready_for_query, Status1}} ->
+ {ok, Command, Desc, Status1, lists:reverse(Log)};
+ {pgsql, {data_row, Row}} ->
+ {ok, DecodedRow} = pgsql_util:decode_row(Types, Row),
+ process_equery_datarow(Types, [DecodedRow|Log], Info);
+ {pgsql, Any} ->
+ process_equery_datarow(Types, [Any|Log], Info)
+ end.
+
+process_prepare(Info={ParamDesc, ResultDesc}) ->
+ receive
+ {pgsql, {no_data, _}} ->
+ process_prepare({ParamDesc, []});
+ {pgsql, {parse_complete, _}} ->
+ process_prepare(Info);
+ {pgsql, {parameter_description, Oids}} ->
+ process_prepare({Oids, ResultDesc});
+ {pgsql, {row_description, Desc}} ->
+ process_prepare({ParamDesc, Desc});
+ {pgsql, {ready_for_query, Status}} ->
+ {ok, Status, ParamDesc, ResultDesc};
+ {pgsql, Any} ->
+ io:format("process_prepare: ~p~n", [Any]),
+ process_prepare(Info)
+ end.
+
+process_unprepare() ->
+ receive
+ {pgsql, {ready_for_query, Status}} ->
+ {ok, Status};
+ {pgsql, {close_complate, []}} ->
+ process_unprepare();
+ {pgsql, Any} ->
+ io:format("process_unprepare: ~p~n", [Any]),
+ process_unprepare()
+ end.
+
+process_execute(State, Sock) ->
+ %% Either the response begins with a no_data or a row_description
+ %% Needs to return {ok, Status, Result}
+ %% where Result = {Command, ...}
+ receive
+ {pgsql, {no_data, _}} ->
+ {ok, _Command, _Result} = process_execute_nodata();
+ {pgsql, {row_description, Descs}} ->
+ OidMap = State#state.oidmap,
+ {ok, Types} = pgsql_util:decode_descs(OidMap, Descs),
+ {ok, _Command, _Result} =
+ process_execute_resultset(Sock, Types, []);
+ {pgsql, Unknown} ->
+ exit(Unknown)
+ end.
+
+process_execute_nodata() ->
+ receive
+ {pgsql, {command_complete, Cmd}} ->
+ Command = if is_binary(Cmd) ->
+ binary_to_list(Cmd);
+ true ->
+ Cmd
+ end,
+ case Command of
+ "INSERT "++Rest ->
+ {ok, [{integer, _, _Table},
+ {integer, _, NRows}], _} = erl_scan:string(Rest),
+ {ok, 'INSERT', NRows};
+ "SELECT" ->
+ {ok, 'SELECT', should_not_happen};
+ "DELETE "++Rest ->
+ {ok, [{integer, _, NRows}], _} =
+ erl_scan:string(Rest),
+ {ok, 'DELETE', NRows};
+ Any ->
+ {ok, nyi, Any}
+ end;
+
+ {pgsql, Unknown} ->
+ exit(Unknown)
+ end.
+process_execute_resultset(Sock, Types, Log) ->
+ receive
+ {pgsql, {command_complete, Command}} ->
+ {ok, to_atom(Command), lists:reverse(Log)};
+ {pgsql, {data_row, Row}} ->
+ {ok, DecodedRow} = pgsql_util:decode_row(Types, Row),
+ process_execute_resultset(Sock, Types, [DecodedRow|Log]);
+ {pgsql, {portal_suspended, _}} ->
+ throw(portal_suspended);
+ {pgsql, Any} ->
+ %%process_execute_resultset(Types, [Any|Log])
+ exit(Any)
+ end.
+
+%% With a message type Code and the payload Packet apropriate
+%% decoding procedure can proceed.
+decode_packet(Code, Packet, AsBin) ->
+ Ret = fun(CodeName, Values) -> {ok, {CodeName, Values}} end,
+ case Code of
+ ?PG_ERROR_MESSAGE ->
+ Message = pgsql_util:errordesc(Packet, AsBin),
+ Ret(error_message, Message);
+ ?PG_EMPTY_RESPONSE ->
+ Ret(empty_response, []);
+ ?PG_ROW_DESCRIPTION ->
+ <<_Columns:16/integer, ColDescs/binary>> = Packet,
+ Descs = coldescs(ColDescs, [], AsBin),
+ Ret(row_description, Descs);
+ ?PG_READY_FOR_QUERY ->
+ <<State:8/integer>> = Packet,
+ case State of
+ $I ->
+ Ret(ready_for_query, idle);
+ $T ->
+ Ret(ready_for_query, transaction);
+ $E ->
+ Ret(ready_for_query, failed_transaction)
+ end;
+ ?PG_COMMAND_COMPLETE ->
+ {Task, _} = to_string(Packet, AsBin),
+ Ret(command_complete, Task);
+ ?PG_DATA_ROW ->
+ <<NumberCol:16/integer, RowData/binary>> = Packet,
+ ColData = datacoldescs(NumberCol, RowData, []),
+ Ret(data_row, ColData);
+ ?PG_BACKEND_KEY_DATA ->
+ <<Pid:32/integer, Secret:32/integer>> = Packet,
+ Ret(backend_key_data, {Pid, Secret});
+ ?PG_PARAMETER_STATUS ->
+ {Key, Value} = split_pair(Packet, AsBin),
+ Ret(parameter_status, {Key, Value});
+ ?PG_NOTICE_RESPONSE ->
+ Ret(notice_response, []);
+ ?PG_AUTHENTICATE ->
+ <<AuthMethod:32/integer, Salt/binary>> = Packet,
+ Ret(authenticate, {AuthMethod, Salt});
+ ?PG_PARSE_COMPLETE ->
+ Ret(parse_complete, []);
+ ?PG_BIND_COMPLETE ->
+ Ret(bind_complete, []);
+ ?PG_PORTAL_SUSPENDED ->
+ Ret(portal_suspended, []);
+ ?PG_CLOSE_COMPLETE ->
+ Ret(close_complete, []);
+ $t ->
+ <<_NParams:16/integer, OidsP/binary>> = Packet,
+ Oids = pgsql_util:oids(OidsP, []),
+ Ret(parameter_description, Oids);
+ ?PG_NO_DATA ->
+ Ret(no_data, []);
+ _Any ->
+ Ret(unknown, [Code])
+ end.
+
+send_message(Sock, Type, Values) ->
+ %%io:format("send_message:~p~n", [{Type, Values}]),
+ Packet = encode_message(Type, Values),
+ ok = send(Sock, Packet).
+
+%% Add header to a message.
+encode(Code, Packet) ->
+ Len = size(Packet) + 4,
+ <<Code:8/integer, Len:4/integer-unit:8, Packet/binary>>.
+
+%% Encode a message of a given type.
+encode_message(pass_plain, Password) ->
+ Pass = pgsql_util:pass_plain(Password),
+ encode($p, Pass);
+encode_message(pass_md5, {User, Password, Salt}) ->
+ Pass = pgsql_util:pass_md5(User, Password, Salt),
+ encode($p, Pass);
+encode_message(terminate, _) ->
+ encode($X, <<>>);
+encode_message(squery, Query) -> % squery as in simple query.
+ encode($Q, string(Query));
+encode_message(close, {Object, Name}) ->
+ Type = case Object of prepared_statement -> $S; portal -> $P end,
+ String = string(Name),
+ encode($C, <<Type/integer, String/binary>>);
+encode_message(describe, {Object, Name}) ->
+ ObjectP = case Object of prepared_statement -> $S; portal -> $P end,
+ NameP = string(Name),
+ encode($D, <<ObjectP:8/integer, NameP/binary>>);
+encode_message(flush, _) ->
+ encode($H, <<>>);
+encode_message(parse, {Name, Query, _Oids}) ->
+ StringName = string(Name),
+ StringQuery = string(Query),
+ encode($P, <<StringName/binary, StringQuery/binary, 0:16/integer>>);
+encode_message(bind, {NamePortal, NamePrepared,
+ Parameters, ResultFormats}) ->
+ PortalP = string(NamePortal),
+ PreparedP = string(NamePrepared),
+
+ ParamFormatsList = lists:map(
+ fun (Bin) when is_binary(Bin) -> <<1:16/integer>>;
+ (_Text) -> <<0:16/integer>> end,
+ Parameters),
+ ParamFormatsP = erlang:list_to_binary(ParamFormatsList),
+
+ NParameters = length(Parameters),
+ ParametersList = lists:map(
+ fun (null) ->
+ Minus = -1,
+ <<Minus:32/integer>>;
+ (Bin) when is_binary(Bin) ->
+ Size = size(Bin),
+ <<Size:32/integer, Bin/binary>>;
+ (Integer) when is_integer(Integer) ->
+ List = integer_to_list(Integer),
+ Bin = list_to_binary(List),
+ Size = size(Bin),
+ <<Size:32/integer, Bin/binary>>;
+ (Text) ->
+ Bin = list_to_binary(Text),
+ Size = size(Bin),
+ <<Size:32/integer, Bin/binary>>
+ end,
+ Parameters),
+ ParametersP = erlang:list_to_binary(ParametersList),
+
+ NResultFormats = length(ResultFormats),
+ ResultFormatsList = lists:map(
+ fun (binary) -> <<1:16/integer>>;
+ (text) -> <<0:16/integer>> end,
+ ResultFormats),
+ ResultFormatsP = erlang:list_to_binary(ResultFormatsList),
+
+ %%io:format("encode bind: ~p~n", [{PortalP, PreparedP,
+ %% NParameters, ParamFormatsP,
+ %% NParameters, ParametersP,
+ %% NResultFormats, ResultFormatsP}]),
+ encode($B, <<PortalP/binary, PreparedP/binary,
+ NParameters:16/integer, ParamFormatsP/binary,
+ NParameters:16/integer, ParametersP/binary,
+ NResultFormats:16/integer, ResultFormatsP/binary>>);
+encode_message(execute, {Portal, Limit}) ->
+ String = string(Portal),
+ encode($E, <<String/binary, Limit:32/integer>>);
+encode_message(sync, _) ->
+ encode($S, <<>>).
diff --git a/src/pgsql/pgsql_tcp.erl b/src/pgsql/pgsql_tcp.erl
new file mode 100644
index 000000000..21740258c
--- /dev/null
+++ b/src/pgsql/pgsql_tcp.erl
@@ -0,0 +1,88 @@
+%%% File : pgsql_tcp.erl
+%%% Author : Blah <cos@local>
+%%% Description : Unwrapping of TCP line protocol packages to postgres messages.
+%%% Created : 22 Jul 2005
+
+-module(pgsql_tcp).
+
+-behaviour(gen_server).
+
+-export([start/3, start_link/3]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ code_change/3,
+ handle_info/2,
+ terminate/2]).
+
+-record(state, {socket, protopid, buffer, as_binary}).
+
+start(Sock, ProtoPid, AsBin) ->
+ gen_server:start(?MODULE, [Sock, ProtoPid, AsBin], []).
+
+start_link(Sock, ProtoPid, AsBin) ->
+ gen_server:start_link(?MODULE, [Sock, ProtoPid, AsBin], []).
+
+init([Sock, ProtoPid, AsBin]) ->
+ inet:setopts(Sock, [{active, once}]),
+ {ok, #state{socket = Sock, protopid = ProtoPid,
+ buffer = <<>>, as_binary = AsBin}}.
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info({tcp, Sock, Bin},
+ #state{socket = Sock,
+ protopid = ProtoPid,
+ as_binary = AsBin,
+ buffer = Buffer} = State) ->
+ {ok, Rest} = process_buffer(ProtoPid, AsBin, <<Buffer/binary, Bin/binary>>),
+ inet:setopts(Sock, [{active, once}]),
+ {noreply, State#state{buffer = Rest}};
+handle_info({tcp_closed, Sock},
+ #state{socket = Sock,
+ protopid = ProtoPid} = State) ->
+ io:format("Sock closed~n", []),
+ ProtoPid ! {socket, Sock, closed},
+ {stop, tcp_close, State};
+handle_info({tcp_error, Sock, Reason},
+ #state{socket = Sock,
+ protopid = ProtoPid} = State) ->
+ io:format("Sock error~n", []),
+ ProtoPid ! {socket, Sock, {error, Reason}},
+ {stop, tcp_error, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+%% Given a binary that begins with a proper message header the binary
+%% will be processed for each full message it contains, and it will
+%% return any trailing incomplete messages.
+process_buffer(ProtoPid, AsBin,
+ Bin = <<Code:8/integer, Size:4/integer-unit:8, Rest/binary>>) ->
+ Payload = Size - 4,
+ if
+ size(Rest) >= Payload ->
+ <<Packet:Payload/binary, Rest1/binary>> = Rest,
+ {ok, Message} = pgsql_proto:decode_packet(Code, Packet, AsBin),
+ ProtoPid ! {pgsql, Message},
+ process_buffer(ProtoPid, AsBin, Rest1);
+ true ->
+ {ok, Bin}
+ end;
+process_buffer(_ProtoPid, _AsBin, Bin) when is_binary(Bin) ->
+ {ok, Bin}.
+
diff --git a/src/pgsql/pgsql_util.erl b/src/pgsql/pgsql_util.erl
new file mode 100644
index 000000000..d562f2970
--- /dev/null
+++ b/src/pgsql/pgsql_util.erl
@@ -0,0 +1,321 @@
+%%% File : pgsql_util.erl
+%%% Author : Christian Sunesson
+%%% Description : utility functions used in implementation of
+%%% postgresql driver.
+%%% Created : 11 May 2005 by Blah <cos@local>
+
+-module(pgsql_util).
+
+%% Key-Value handling
+-export([option/3]).
+
+%% Networking
+-export([socket/1]).
+-export([send/2, send_int/2, send_msg/3]).
+-export([recv_msg/2, recv_msg/1, recv_byte/2, recv_byte/1]).
+
+%% Protocol packing
+-export([string/1, make_pair/2, split_pair/2]).
+-export([split_pair_rec/2]).
+-export([count_string/1, to_string/2]).
+-export([oids/2, coldescs/3, datacoldescs/3]).
+-export([decode_row/3, decode_descs/2]).
+-export([errordesc/2]).
+-export([to_integer/1, to_atom/1]).
+
+-export([zip/2]).
+
+%% Constructing authentication messages.
+-export([pass_plain/1, pass_md5/3]).
+-import(erlang, [md5/1]).
+-export([hexlist/2]).
+
+%% Lookup key in a plist stored in process dictionary under 'options'.
+%% Default is returned if there is no value for Key in the plist.
+option(Opts, Key, Default) ->
+ case proplists:get_value(Key, Opts, Default) of
+ Default ->
+ Default;
+ Value when is_binary(Value) ->
+ binary_to_list(Value);
+ Value ->
+ Value
+ end.
+
+
+%% Open a TCP connection
+socket({tcp, Host, Port}) ->
+ gen_tcp:connect(Host, Port, [{active, false}, binary, {packet, raw}], 5000).
+
+send(Sock, Packet) ->
+ gen_tcp:send(Sock, Packet).
+send_int(Sock, Int) ->
+ Packet = <<Int:32/integer>>,
+ gen_tcp:send(Sock, Packet).
+
+send_msg(Sock, Code, Packet) when is_binary(Packet) ->
+ Len = size(Packet) + 4,
+ Msg = <<Code:8/integer, Len:4/integer-unit:8, Packet/binary>>,
+ gen_tcp:send(Sock, Msg).
+
+recv_msg(Sock, Timeout) ->
+ {ok, Head} = gen_tcp:recv(Sock, 5, Timeout),
+ <<Code:8/integer, Size:4/integer-unit:8>> = Head,
+ %%io:format("Code: ~p, Size: ~p~n", [Code, Size]),
+ if
+ Size > 4 ->
+ {ok, Packet} = gen_tcp:recv(Sock, Size-4, Timeout),
+ {ok, Code, Packet};
+ true ->
+ {ok, Code, <<>>}
+ end.
+recv_msg(Sock) ->
+ recv_msg(Sock, infinity).
+
+
+recv_byte(Sock) ->
+ recv_byte(Sock, infinity).
+recv_byte(Sock, Timeout) ->
+ case gen_tcp:recv(Sock, 1, Timeout) of
+ {ok, <<Byte:1/integer-unit:8>>} ->
+ {ok, Byte};
+ E={error, _Reason} ->
+ throw(E)
+ end.
+
+%% Convert String to binary
+string(String) when is_list(String) ->
+ Bin = list_to_binary(String),
+ <<Bin/binary, 0/integer>>;
+string(Bin) when is_binary(Bin) ->
+ <<Bin/binary, 0/integer>>.
+
+%%% Two zero terminated strings.
+make_pair(Key, Value) when is_atom(Key) ->
+ make_pair(atom_to_list(Key), Value);
+make_pair(Key, Value) when is_atom(Value) ->
+ make_pair(Key, atom_to_list(Value));
+make_pair(Key, Value) when is_list(Key), is_list(Value) ->
+ BinKey = list_to_binary(Key),
+ BinValue = list_to_binary(Value),
+ make_pair(BinKey, BinValue);
+make_pair(Key, Value) when is_binary(Key), is_binary(Value) ->
+ <<Key/binary, 0/integer,
+ Value/binary, 0/integer>>.
+
+split_pair(Bin, AsBin) when is_binary(Bin) ->
+ split_pair(binary_to_list(Bin), AsBin);
+split_pair(Str, AsBin) ->
+ split_pair_rec(Str, norec, AsBin).
+
+split_pair_rec(Bin, AsBin) when is_binary(Bin) ->
+ split_pair_rec(binary_to_list(Bin), AsBin);
+split_pair_rec(Arg, AsBin) ->
+ split_pair_rec(Arg,[], AsBin).
+
+split_pair_rec([], Acc, _AsBin) ->
+ lists:reverse(Acc);
+split_pair_rec([0], Acc, _AsBin) ->
+ lists:reverse(Acc);
+split_pair_rec(S, Acc, AsBin) ->
+ Fun = fun(C) -> C /= 0 end,
+ {K, [0|S1]} = lists:splitwith(Fun, S),
+ {V, [0|Tail]} = lists:splitwith(Fun, S1),
+ {Key, Value} = if AsBin ->
+ {list_to_binary(K), list_to_binary(V)};
+ true ->
+ {K, V}
+ end,
+ case Acc of
+ norec -> {Key, Value};
+ _ ->
+ split_pair_rec(Tail, [{Key, Value}| Acc], AsBin)
+ end.
+
+
+count_string(Bin) when is_binary(Bin) ->
+ count_string(Bin, 0).
+
+count_string(<<>>, N) ->
+ {N, <<>>};
+count_string(<<0/integer, Rest/binary>>, N) ->
+ {N, Rest};
+count_string(<<_C/integer, Rest/binary>>, N) ->
+ count_string(Rest, N+1).
+
+to_string(Bin, AsBin) when is_binary(Bin) ->
+ {Count, _} = count_string(Bin, 0),
+ <<String:Count/binary, _/binary>> = Bin,
+ if AsBin ->
+ {String, Count};
+ true ->
+ {binary_to_list(String), Count}
+ end.
+
+oids(<<>>, Oids) ->
+ lists:reverse(Oids);
+oids(<<Oid:32/integer, Rest/binary>>, Oids) ->
+ oids(Rest, [Oid|Oids]).
+
+coldescs(<<>>, Descs, _AsBin) ->
+ lists:reverse(Descs);
+coldescs(Bin, Descs, AsBin) ->
+ {Name, Count} = to_string(Bin, AsBin),
+ <<_:Count/binary, 0/integer,
+ TableOID:32/integer,
+ ColumnNumber:16/integer,
+ TypeId:32/integer,
+ TypeSize:16/integer-signed,
+ TypeMod:32/integer-signed,
+ FormatCode:16/integer,
+ Rest/binary>> = Bin,
+ Format = case FormatCode of
+ 0 -> text;
+ 1 -> binary
+ end,
+ Desc = {Name, Format, ColumnNumber,
+ TypeId, TypeSize, TypeMod,
+ TableOID},
+ coldescs(Rest, [Desc|Descs], AsBin).
+
+datacoldescs(N, <<16#ffffffff:32, Rest/binary>>, Descs) when N >= 0 ->
+ datacoldescs(N-1, Rest, [null|Descs]);
+datacoldescs(N,
+ <<Len:32/integer, Data:Len/binary, Rest/binary>>,
+ Descs) when N >= 0 ->
+ datacoldescs(N-1, Rest, [Data|Descs]);
+datacoldescs(_N, _, Descs) ->
+ lists:reverse(Descs).
+
+decode_descs(OidMap, Cols) ->
+ decode_descs(OidMap, Cols, []).
+decode_descs(_OidMap, [], Descs) ->
+ {ok, lists:reverse(Descs)};
+decode_descs(OidMap, [Col|ColTail], Descs) ->
+ {Name, Format, ColNumber, Oid, _, _, _} = Col,
+ OidName = dict:fetch(Oid, OidMap),
+ decode_descs(OidMap, ColTail, [{Name, Format, ColNumber, OidName, [], [], []}|Descs]).
+
+decode_row(Types, Values, AsBin) ->
+ decode_row(Types, Values, [], AsBin).
+decode_row([], [], Out, _AsBin) ->
+ {ok, lists:reverse(Out)};
+decode_row([Type|TypeTail], [Value|ValueTail], Out0, AsBin) ->
+ Out1 = decode_col(Type, Value, AsBin),
+ decode_row(TypeTail, ValueTail, [Out1|Out0], AsBin).
+
+decode_col({_, text, _, _, _, _, _}, Value, AsBin) ->
+ if AsBin -> Value;
+ true -> binary_to_list(Value)
+ end;
+decode_col({_Name, _Format, _ColNumber, varchar, _Size, _Modifier, _TableOID}, Value, AsBin) ->
+ if AsBin -> Value;
+ true -> binary_to_list(Value)
+ end;
+decode_col({_Name, _Format, _ColNumber, int4, _Size, _Modifier, _TableOID}, Value, _AsBin) ->
+ <<Int4:32/integer>> = Value,
+ Int4;
+decode_col({_Name, _Format, _ColNumber, Oid, _Size, _Modifier, _TableOID}, Value, _AsBin) ->
+ {Oid, Value}.
+
+errordesc(Bin, AsBin) ->
+ errordesc(Bin, [], AsBin).
+
+errordesc(<<0/integer, _Rest/binary>>, Lines, _AsBin) ->
+ lists:reverse(Lines);
+errordesc(<<Code/integer, Rest/binary>>, Lines, AsBin) ->
+ {String, Count} = to_string(Rest, AsBin),
+ <<_:Count/binary, 0, Rest1/binary>> = Rest,
+ Msg = case Code of
+ $S ->
+ {severity, to_atom(String)};
+ $C ->
+ {code, String};
+ $M ->
+ {message, String};
+ $D ->
+ {detail, String};
+ $H ->
+ {hint, String};
+ $P ->
+ {position, to_integer(String)};
+ $p ->
+ {internal_position, to_integer(String)};
+ $W ->
+ {where, String};
+ $F ->
+ {file, String};
+ $L ->
+ {line, to_integer(String)};
+ $R ->
+ {routine, String};
+ Unknown ->
+ {Unknown, String}
+ end,
+ errordesc(Rest1, [Msg|Lines]).
+
+%%% Zip two lists together
+zip(List1, List2) ->
+ zip(List1, List2, []).
+zip(List1, List2, Result) when List1 =:= [];
+ List2 =:= [] ->
+ lists:reverse(Result);
+zip([H1|List1], [H2|List2], Result) ->
+ zip(List1, List2, [{H1, H2}|Result]).
+
+%%% Authentication utils
+
+pass_plain(Password) ->
+ Pass = [Password, 0],
+ list_to_binary(Pass).
+
+%% MD5 authentication patch from
+%% Juhani Rankimies <juhani@juranki.com>
+%% (patch slightly rewritten, new bugs are mine :] /Christian Sunesson)
+
+%%
+%% MD5(MD5(password + user) + salt)
+%%
+
+pass_md5(User, Password, Salt) ->
+ Digest = hex(md5([Password, User])),
+ Encrypt = hex(md5([Digest, Salt])),
+ Pass = ["md5", Encrypt, 0],
+ list_to_binary(Pass).
+
+to_integer(B) when is_binary(B) ->
+ to_integer(binary_to_list(B));
+to_integer(S) ->
+ list_to_integer(S).
+
+to_atom(B) when is_binary(B) ->
+ to_atom(binary_to_list(B));
+to_atom(S) ->
+ list_to_atom(S).
+
+hex(B) when is_binary(B) ->
+ hexlist(binary_to_list(B), []).
+
+hexlist([], Acc) ->
+ lists:reverse(Acc);
+hexlist([N|Rest], Acc) ->
+ HighNibble = (N band 16#f0) bsr 4,
+ LowNibble = (N band 16#0f),
+ hexlist(Rest, [hexdigit(LowNibble), hexdigit(HighNibble)|Acc]).
+
+hexdigit(0) -> $0;
+hexdigit(1) -> $1;
+hexdigit(2) -> $2;
+hexdigit(3) -> $3;
+hexdigit(4) -> $4;
+hexdigit(5) -> $5;
+hexdigit(6) -> $6;
+hexdigit(7) -> $7;
+hexdigit(8) -> $8;
+hexdigit(9) -> $9;
+hexdigit(10) -> $a;
+hexdigit(11) -> $b;
+hexdigit(12) -> $c;
+hexdigit(13) -> $d;
+hexdigit(14) -> $e;
+hexdigit(15) -> $f.