diff options
author | Badlop <badlop@process-one.net> | 2013-03-19 13:29:15 +0100 |
---|---|---|
committer | Badlop <badlop@process-one.net> | 2013-03-19 13:30:17 +0100 |
commit | f92a94a7378856c2ff3217152ee62a9d035aadc6 (patch) | |
tree | 119ef937e6ebc97790d655145bf29fa394bf1177 /src | |
parent | Copied MySQL erlang library from ejabberd-modules SVN (diff) |
Copied PostgreSQL erlang library from ejabberd-modules SVN
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.in | 2 | ||||
-rwxr-xr-x | src/configure | 3 | ||||
-rw-r--r-- | src/configure.ac | 1 | ||||
-rw-r--r-- | src/pgsql/EPLICENSE | 286 | ||||
-rw-r--r-- | src/pgsql/Makefile.in | 38 | ||||
-rw-r--r-- | src/pgsql/Makefile.win32 | 18 | ||||
-rw-r--r-- | src/pgsql/pgsql.erl | 96 | ||||
-rw-r--r-- | src/pgsql/pgsql_proto.erl | 650 | ||||
-rw-r--r-- | src/pgsql/pgsql_tcp.erl | 88 | ||||
-rw-r--r-- | src/pgsql/pgsql_util.erl | 321 |
10 files changed, 1501 insertions, 2 deletions
diff --git a/src/Makefile.in b/src/Makefile.in index cf26246f8..4c7c2501c 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -77,7 +77,7 @@ endif prefix = @prefix@ exec_prefix = @exec_prefix@ -SUBDIRS = @mod_irc@ @mod_pubsub@ @mod_muc@ @mod_proxy65@ @eldap@ @pam@ @web@ mysql stringprep stun @tls@ @odbc@ @ejabberd_zlib@ +SUBDIRS = @mod_irc@ @mod_pubsub@ @mod_muc@ @mod_proxy65@ @eldap@ @pam@ @web@ mysql pgsql stringprep stun @tls@ @odbc@ @ejabberd_zlib@ ERLSHLIBS += expat_erl.so ERLBEHAVS = cyrsasl.erl gen_mod.erl p1_fsm.erl ejabberd_auth.erl SOURCES_ALL = $(wildcard *.erl) diff --git a/src/configure b/src/configure index 485527752..7b7066cb6 100755 --- a/src/configure +++ b/src/configure @@ -4696,7 +4696,7 @@ fi -ac_config_files="$ac_config_files Makefile $make_mod_irc $make_mod_muc $make_mod_pubsub $make_mod_proxy65 $make_eldap $make_pam $make_web mysql/Makefile stringprep/Makefile stun/Makefile $make_tls $make_odbc $make_ejabberd_zlib" +ac_config_files="$ac_config_files Makefile $make_mod_irc $make_mod_muc $make_mod_pubsub $make_mod_proxy65 $make_eldap $make_pam $make_web mysql/Makefile pgsql/Makefile stringprep/Makefile stun/Makefile $make_tls $make_odbc $make_ejabberd_zlib" #openssl @@ -5862,6 +5862,7 @@ do "$make_pam") CONFIG_FILES="$CONFIG_FILES $make_pam" ;; "$make_web") CONFIG_FILES="$CONFIG_FILES $make_web" ;; "mysql/Makefile") CONFIG_FILES="$CONFIG_FILES mysql/Makefile" ;; + "pgsql/Makefile") CONFIG_FILES="$CONFIG_FILES pgsql/Makefile" ;; "stringprep/Makefile") CONFIG_FILES="$CONFIG_FILES stringprep/Makefile" ;; "stun/Makefile") CONFIG_FILES="$CONFIG_FILES stun/Makefile" ;; "$make_tls") CONFIG_FILES="$CONFIG_FILES $make_tls" ;; diff --git a/src/configure.ac b/src/configure.ac index 13a3fec1d..b97ff1e2d 100644 --- a/src/configure.ac +++ b/src/configure.ac @@ -112,6 +112,7 @@ AC_CONFIG_FILES([Makefile $make_pam $make_web mysql/Makefile + pgsql/Makefile stringprep/Makefile stun/Makefile $make_tls diff --git a/src/pgsql/EPLICENSE b/src/pgsql/EPLICENSE new file mode 100644 index 000000000..36aa84e33 --- /dev/null +++ b/src/pgsql/EPLICENSE @@ -0,0 +1,286 @@ +ERLANG PUBLIC LICENSE +Version 1.1 + +1. Definitions. + +1.1. ``Contributor'' means each entity that creates or contributes to +the creation of Modifications. + +1.2. ``Contributor Version'' means the combination of the Original +Code, prior Modifications used by a Contributor, and the Modifications +made by that particular Contributor. + +1.3. ``Covered Code'' means the Original Code or Modifications or the +combination of the Original Code and Modifications, in each case +including portions thereof. + +1.4. ``Electronic Distribution Mechanism'' means a mechanism generally +accepted in the software development community for the electronic +transfer of data. + +1.5. ``Executable'' means Covered Code in any form other than Source +Code. + +1.6. ``Initial Developer'' means the individual or entity identified +as the Initial Developer in the Source Code notice required by Exhibit +A. + +1.7. ``Larger Work'' means a work which combines Covered Code or +portions thereof with code not governed by the terms of this License. + +1.8. ``License'' means this document. + +1.9. ``Modifications'' means any addition to or deletion from the +substance or structure of either the Original Code or any previous +Modifications. When Covered Code is released as a series of files, a +Modification is: + +A. Any addition to or deletion from the contents of a file containing + Original Code or previous Modifications. + +B. Any new file that contains any part of the Original Code or + previous Modifications. + +1.10. ``Original Code'' means Source Code of computer software code +which is described in the Source Code notice required by Exhibit A as +Original Code, and which, at the time of its release under this +License is not already Covered Code governed by this License. + +1.11. ``Source Code'' means the preferred form of the Covered Code for +making modifications to it, including all modules it contains, plus +any associated interface definition files, scripts used to control +compilation and installation of an Executable, or a list of source +code differential comparisons against either the Original Code or +another well known, available Covered Code of the Contributor's +choice. The Source Code can be in a compressed or archival form, +provided the appropriate decompression or de-archiving software is +widely available for no charge. + +1.12. ``You'' means an individual or a legal entity exercising rights +under, and complying with all of the terms of, this License. For legal +entities,``You'' includes any entity which controls, is controlled by, +or is under common control with You. For purposes of this definition, +``control'' means (a) the power, direct or indirect, to cause the +direction or management of such entity, whether by contract or +otherwise, or (b) ownership of fifty percent (50%) or more of the +outstanding shares or beneficial ownership of such entity. + +2. Source Code License. + +2.1. The Initial Developer Grant. +The Initial Developer hereby grants You a world-wide, royalty-free, +non-exclusive license, subject to third party intellectual property +claims: + +(a) to use, reproduce, modify, display, perform, sublicense and + distribute the Original Code (or portions thereof) with or without + Modifications, or as part of a Larger Work; and + +(b) under patents now or hereafter owned or controlled by Initial + Developer, to make, have made, use and sell (``Utilize'') the + Original Code (or portions thereof), but solely to the extent that + any such patent is reasonably necessary to enable You to Utilize + the Original Code (or portions thereof) and not to any greater + extent that may be necessary to Utilize further Modifications or + combinations. + +2.2. Contributor Grant. +Each Contributor hereby grants You a world-wide, royalty-free, +non-exclusive license, subject to third party intellectual property +claims: + +(a) to use, reproduce, modify, display, perform, sublicense and + distribute the Modifications created by such Contributor (or + portions thereof) either on an unmodified basis, with other + Modifications, as Covered Code or as part of a Larger Work; and + +(b) under patents now or hereafter owned or controlled by Contributor, + to Utilize the Contributor Version (or portions thereof), but + solely to the extent that any such patent is reasonably necessary + to enable You to Utilize the Contributor Version (or portions + thereof), and not to any greater extent that may be necessary to + Utilize further Modifications or combinations. + +3. Distribution Obligations. + +3.1. Application of License. +The Modifications which You contribute are governed by the terms of +this License, including without limitation Section 2.2. The Source +Code version of Covered Code may be distributed only under the terms +of this License, and You must include a copy of this License with +every copy of the Source Code You distribute. You may not offer or +impose any terms on any Source Code version that alters or restricts +the applicable version of this License or the recipients' rights +hereunder. However, You may include an additional document offering +the additional rights described in Section 3.5. + +3.2. Availability of Source Code. +Any Modification which You contribute must be made available in Source +Code form under the terms of this License either on the same media as +an Executable version or via an accepted Electronic Distribution +Mechanism to anyone to whom you made an Executable version available; +and if made available via Electronic Distribution Mechanism, must +remain available for at least twelve (12) months after the date it +initially became available, or at least six (6) months after a +subsequent version of that particular Modification has been made +available to such recipients. You are responsible for ensuring that +the Source Code version remains available even if the Electronic +Distribution Mechanism is maintained by a third party. + +3.3. Description of Modifications. +You must cause all Covered Code to which you contribute to contain a +file documenting the changes You made to create that Covered Code and +the date of any change. You must include a prominent statement that +the Modification is derived, directly or indirectly, from Original +Code provided by the Initial Developer and including the name of the +Initial Developer in (a) the Source Code, and (b) in any notice in an +Executable version or related documentation in which You describe the +origin or ownership of the Covered Code. + +3.4. Intellectual Property Matters + +(a) Third Party Claims. + If You have knowledge that a party claims an intellectual property + right in particular functionality or code (or its utilization + under this License), you must include a text file with the source + code distribution titled ``LEGAL'' which describes the claim and + the party making the claim in sufficient detail that a recipient + will know whom to contact. If you obtain such knowledge after You + make Your Modification available as described in Section 3.2, You + shall promptly modify the LEGAL file in all copies You make + available thereafter and shall take other steps (such as notifying + appropriate mailing lists or newsgroups) reasonably calculated to + inform those who received the Covered Code that new knowledge has + been obtained. + +(b) Contributor APIs. + If Your Modification is an application programming interface and + You own or control patents which are reasonably necessary to + implement that API, you must also include this information in the + LEGAL file. + +3.5. Required Notices. +You must duplicate the notice in Exhibit A in each file of the Source +Code, and this License in any documentation for the Source Code, where +You describe recipients' rights relating to Covered Code. If You +created one or more Modification(s), You may add your name as a +Contributor to the notice described in Exhibit A. If it is not +possible to put such notice in a particular Source Code file due to +its structure, then you must include such notice in a location (such +as a relevant directory file) where a user would be likely to look for +such a notice. You may choose to offer, and to charge a fee for, +warranty, support, indemnity or liability obligations to one or more +recipients of Covered Code. However, You may do so only on Your own +behalf, and not on behalf of the Initial Developer or any +Contributor. You must make it absolutely clear than any such warranty, +support, indemnity or liability obligation is offered by You alone, +and You hereby agree to indemnify the Initial Developer and every +Contributor for any liability incurred by the Initial Developer or +such Contributor as a result of warranty, support, indemnity or +liability terms You offer. + +3.6. Distribution of Executable Versions. +You may distribute Covered Code in Executable form only if the +requirements of Section 3.1-3.5 have been met for that Covered Code, +and if You include a notice stating that the Source Code version of +the Covered Code is available under the terms of this License, +including a description of how and where You have fulfilled the +obligations of Section 3.2. The notice must be conspicuously included +in any notice in an Executable version, related documentation or +collateral in which You describe recipients' rights relating to the +Covered Code. You may distribute the Executable version of Covered +Code under a license of Your choice, which may contain terms different +from this License, provided that You are in compliance with the terms +of this License and that the license for the Executable version does +not attempt to limit or alter the recipient's rights in the Source +Code version from the rights set forth in this License. If You +distribute the Executable version under a different license You must +make it absolutely clear that any terms which differ from this License +are offered by You alone, not by the Initial Developer or any +Contributor. You hereby agree to indemnify the Initial Developer and +every Contributor for any liability incurred by the Initial Developer +or such Contributor as a result of any such terms You offer. + +3.7. Larger Works. +You may create a Larger Work by combining Covered Code with other code +not governed by the terms of this License and distribute the Larger +Work as a single product. In such a case, You must make sure the +requirements of this License are fulfilled for the Covered Code. + +4. Inability to Comply Due to Statute or Regulation. +If it is impossible for You to comply with any of the terms of this +License with respect to some or all of the Covered Code due to statute +or regulation then You must: (a) comply with the terms of this License +to the maximum extent possible; and (b) describe the limitations and +the code they affect. Such description must be included in the LEGAL +file described in Section 3.4 and must be included with all +distributions of the Source Code. Except to the extent prohibited by +statute or regulation, such description must be sufficiently detailed +for a recipient of ordinary skill to be able to understand it. + +5. Application of this License. + +This License applies to code to which the Initial Developer has +attached the notice in Exhibit A, and to related Covered Code. + +6. CONNECTION TO MOZILLA PUBLIC LICENSE + +This Erlang License is a derivative work of the Mozilla Public +License, Version 1.0. It contains terms which differ from the Mozilla +Public License, Version 1.0. + +7. DISCLAIMER OF WARRANTY. + +COVERED CODE IS PROVIDED UNDER THIS LICENSE ON AN ``AS IS'' BASIS, +WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, +WITHOUT LIMITATION, WARRANTIES THAT THE COVERED CODE IS FREE OF +DEFECTS, MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE OR +NON-INFRINGING. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF +THE COVERED CODE IS WITH YOU. SHOULD ANY COVERED CODE PROVE DEFECTIVE +IN ANY RESPECT, YOU (NOT THE INITIAL DEVELOPER OR ANY OTHER +CONTRIBUTOR) ASSUME THE COST OF ANY NECESSARY SERVICING, REPAIR OR +CORRECTION. THIS DISCLAIMER OF WARRANTY CONSTITUTES AN ESSENTIAL PART +OF THIS LICENSE. NO USE OF ANY COVERED CODE IS AUTHORIZED HEREUNDER +EXCEPT UNDER THIS DISCLAIMER. + +8. TERMINATION. +This License and the rights granted hereunder will terminate +automatically if You fail to comply with terms herein and fail to cure +such breach within 30 days of becoming aware of the breach. All +sublicenses to the Covered Code which are properly granted shall +survive any termination of this License. Provisions which, by their +nature, must remain in effect beyond the termination of this License +shall survive. + +9. DISCLAIMER OF LIABILITY +Any utilization of Covered Code shall not cause the Initial Developer +or any Contributor to be liable for any damages (neither direct nor +indirect). + +10. MISCELLANEOUS +This License represents the complete agreement concerning the subject +matter hereof. If any provision is held to be unenforceable, such +provision shall be reformed only to the extent necessary to make it +enforceable. This License shall be construed by and in accordance with +the substantive laws of Sweden. Any dispute, controversy or claim +arising out of or relating to this License, or the breach, termination +or invalidity thereof, shall be subject to the exclusive jurisdiction +of Swedish courts, with the Stockholm City Court as the first +instance. + +EXHIBIT A. + +``The contents of this file are subject to the Erlang Public License, +Version 1.1, (the "License"); you may not use this file except in +compliance with the License. You should have received a copy of the +Erlang Public License along with this software. If not, it can be +retrieved via the world wide web at http://www.erlang.org/. + +Software distributed under the License is distributed on an "AS IS" +basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +the License for the specific language governing rights and limitations +under the License. + +The Initial Developer of the Original Code is Ericsson Utvecklings AB. +Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings +AB. All Rights Reserved.'' diff --git a/src/pgsql/Makefile.in b/src/pgsql/Makefile.in new file mode 100644 index 000000000..e77da8452 --- /dev/null +++ b/src/pgsql/Makefile.in @@ -0,0 +1,38 @@ +# $Id: Makefile.in 1453 2008-07-16 16:58:42Z badlop $ + +CC = @CC@ +CFLAGS = @CFLAGS@ +CPPFLAGS = @CPPFLAGS@ +LDFLAGS = @LDFLAGS@ +LIBS = @LIBS@ + +ERLANG_CFLAGS = @ERLANG_CFLAGS@ +ERLANG_LIBS = @ERLANG_LIBS@ + +EFLAGS += -I .. +EFLAGS += -pz .. + +# make debug=true to compile Erlang module with debug informations. +ifdef debug + EFLAGS+=+debug_info +endif + +OUTDIR = .. +SOURCES = $(wildcard *.erl) +BEAMS = $(addprefix $(OUTDIR)/,$(SOURCES:.erl=.beam)) + + +all: $(BEAMS) + +$(OUTDIR)/%.beam: %.erl + @ERLC@ -W $(EFLAGS) -o $(OUTDIR) $< + +clean: + rm -f $(BEAMS) + +distclean: clean + rm -f Makefile + +TAGS: + etags *.erl + diff --git a/src/pgsql/Makefile.win32 b/src/pgsql/Makefile.win32 new file mode 100644 index 000000000..e70aba9f1 --- /dev/null +++ b/src/pgsql/Makefile.win32 @@ -0,0 +1,18 @@ + +include ..\Makefile.inc + +EFLAGS = -I .. -pz .. + +OUTDIR = .. +BEAMS = ..\stun_codec.beam ..\ejabberd_stun.beam + +ALL : $(BEAMS) + +CLEAN : + -@erase $(BEAMS) + +$(OUTDIR)\stun_codec.beam : stun_codec.erl + erlc -W $(EFLAGS) -o $(OUTDIR) stun_codec.erl + +$(OUTDIR)\ejabberd_stun.beam : ejabberd_stun.erl + erlc -W $(EFLAGS) -o $(OUTDIR) ejabberd_stun.erl diff --git a/src/pgsql/pgsql.erl b/src/pgsql/pgsql.erl new file mode 100644 index 000000000..3f993ecb6 --- /dev/null +++ b/src/pgsql/pgsql.erl @@ -0,0 +1,96 @@ +%%% File : pgsql.erl +%%% Author : Christian Sunesson <chsu79@gmail.com> +%%% Description : PostgresQL interface +%%% Created : 11 May 2005 + +%% +%% API for accessing the postgres driver. +%% + +-module(pgsql). +-export([connect/1, connect/4, connect/5]). + +-export([squery/2, + pquery/3, + terminate/1, + prepare/3, unprepare/2, + execute/3]). + + +connect(Host, Database, User, Password) -> + connect([{database, Database}, + {host, Host}, + {user, User}, + {password, Password}]). + +connect(Host, Database, User, Password, Port) -> + connect([{database, Database}, + {host, Host}, + {user, User}, + {port, Port}, + {password, Password}]). + +connect(Options) -> + pgsql_proto:start(Options). + +%% Close a connection +terminate(Db) -> + gen_server:call(Db, terminate). + +%%% In the "simple query" protocol, the frontend just sends a +%%% textual query string, which is parsed and immediately +%%% executed by the backend. + +%% A simple query can contain multiple statements (separated with a semi-colon), +%% and each statement's response. + +%%% squery(Db, Query) -> {ok, Results} | ... no real error handling +%%% Query = string() +%%% Results = [Result] +%%% Result = {"SELECT", RowDesc, ResultSet} | ... +squery(Db, Query) -> + gen_server:call(Db, {squery, Query}, infinity). + +%%% In the "extended query" protocol, processing of queries is +%%% separated into multiple steps: parsing, binding of parameter +%%% values, and execution. This offers flexibility and performance +%%% benefits, at the cost of extra complexity. + +%%% pquery(Db, Query, Params) -> {ok, Command, Status, NameTypes, Rows} | timeout | ... +%%% Query = string() +%%% Params = [term()] +%%% Command = string() +%%% Status = idle | transaction | failed_transaction +%%% NameTypes = [{ColName, ColType}] +%%% Rows = [list()] +pquery(Db, Query, Params) -> + gen_server:call(Db, {equery, {Query, Params}}). + +%%% prepare(Db, Name, Query) -> {ok, Status, ParamTypes, ResultTypes} +%%% Status = idle | transaction | failed_transaction +%%% ParamTypes = [atom()] +%%% ResultTypes = [{ColName, ColType}] +prepare(Db, Name, Query) when is_atom(Name) -> + gen_server:call(Db, {prepare, {atom_to_list(Name), Query}}). + +%%% unprepare(Db, Name) -> ok | timeout | ... +%%% Name = atom() +unprepare(Db, Name) when is_atom(Name) -> + gen_server:call(Db, {unprepare, atom_to_list(Name)}). + +%%% execute(Db, Name, Params) -> {ok, Result} | timeout | ... +%%% Result = {'INSERT', NRows} | +%%% {'DELETE', NRows} | +%%% {'SELECT', ResultSet} | +%%% ... +%%% ResultSet = [Row] +%%% Row = list() +execute(Db, Name, Params) when is_atom(Name), is_list(Params) -> + Ref = make_ref(), + Db ! {execute, Ref, self(), {atom_to_list(Name), Params}}, + receive + {pgsql, Ref, Result} -> + {ok, Result} + after 5000 -> + timeout + end. diff --git a/src/pgsql/pgsql_proto.erl b/src/pgsql/pgsql_proto.erl new file mode 100644 index 000000000..fe49a4846 --- /dev/null +++ b/src/pgsql/pgsql_proto.erl @@ -0,0 +1,650 @@ +%%% File : pgsql_proto.erl +%%% Author : Christian Sunesson <chrisu@kth.se> +%%% Description : PostgreSQL protocol driver +%%% Created : 9 May 2005 + +%%% This is the protocol handling part of the PostgreSQL driver, it turns packages into +%%% erlang term messages and back. + +-module(pgsql_proto). + +-behaviour(gen_server). + +%% TODO: +%% When factorizing make clear distinction between message and packet. +%% Packet == binary on-wire representation +%% Message = parsed Packet as erlang terms. + +%%% Version 3.0 of the protocol. +%%% Supported in postgres from version 7.4 +-define(PROTOCOL_MAJOR, 3). +-define(PROTOCOL_MINOR, 0). + +%%% PostgreSQL protocol message codes +-define(PG_BACKEND_KEY_DATA, $K). +-define(PG_PARAMETER_STATUS, $S). +-define(PG_ERROR_MESSAGE, $E). +-define(PG_NOTICE_RESPONSE, $N). +-define(PG_EMPTY_RESPONSE, $I). +-define(PG_ROW_DESCRIPTION, $T). +-define(PG_DATA_ROW, $D). +-define(PG_READY_FOR_QUERY, $Z). +-define(PG_AUTHENTICATE, $R). +-define(PG_BIND, $B). +-define(PG_PARSE, $P). +-define(PG_COMMAND_COMPLETE, $C). +-define(PG_PARSE_COMPLETE, $1). +-define(PG_BIND_COMPLETE, $2). +-define(PG_CLOSE_COMPLETE, $3). +-define(PG_PORTAL_SUSPENDED, $s). +-define(PG_NO_DATA, $n). + +-export([start/1, start_link/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + code_change/3, + handle_info/2, + terminate/2]). + +%% For protocol unwrapping, pgsql_tcp for example. +-export([decode_packet/3]). +-export([encode_message/2]). +-export([encode/2]). + +-import(pgsql_util, [option/3]). +-import(pgsql_util, [socket/1]). +-import(pgsql_util, [send/2, send_int/2, send_msg/3]). +-import(pgsql_util, [recv_msg/2, recv_msg/1, recv_byte/2, recv_byte/1]). +-import(pgsql_util, [string/1, make_pair/2, split_pair/2]). +-import(pgsql_util, [count_string/1, to_string/2]). +-import(pgsql_util, [coldescs/3, datacoldescs/3]). +-import(pgsql_util, [to_integer/1, to_atom/1]). + +-record(state, {options, driver, params, socket, oidmap, as_binary}). + +start(Options) -> + gen_server:start(?MODULE, [self(), Options], []). + +start_link(Options) -> + gen_server:start_link(?MODULE, [self(), Options], []). + +init([DriverPid, Options]) -> + %%io:format("Init~n", []), + %% Default values: We connect to localhost on the standard TCP/IP + %% port. + Host = option(Options, host, "localhost"), + Port = option(Options, port, 5432), + AsBinary = option(Options, as_binary, false), + + case socket({tcp, Host, Port}) of + {ok, Sock} -> + connect(#state{options = Options, + driver = DriverPid, + as_binary = AsBinary, + socket = Sock}); + Error -> + Reason = {init, Error}, + {stop, Reason} + end. + +connect(StateData) -> + %%io:format("Connect~n", []), + %% Connection settings for database-login. + %% TODO: Check if the default values are relevant: + UserName = option(StateData#state.options, user, "cos"), + DatabaseName = option(StateData#state.options, database, "template1"), + + %% Make protocol startup packet. + Version = <<?PROTOCOL_MAJOR:16/integer, ?PROTOCOL_MINOR:16/integer>>, + User = make_pair(user, UserName), + Database = make_pair(database, DatabaseName), + StartupPacket = <<Version/binary, + User/binary, + Database/binary, + 0>>, + + %% Backend will continue with authentication after the startup packet + PacketSize = 4 + size(StartupPacket), + Sock = StateData#state.socket, + ok = gen_tcp:send(Sock, <<PacketSize:32/integer, StartupPacket/binary>>), + authenticate(StateData). + + +authenticate(StateData) -> + %% Await authentication request from backend. + Sock = StateData#state.socket, + AsBin = StateData#state.as_binary, + {ok, Code, Packet} = recv_msg(Sock, 5000), + {ok, Value} = decode_packet(Code, Packet, AsBin), + case Value of + %% Error response + {error_message, Message} -> + {stop, {authentication, Message}}; + {authenticate, {AuthMethod, Salt}} -> + case AuthMethod of + 0 -> % Auth ok + setup(StateData, []); + 1 -> % Kerberos 4 + {stop, {nyi, auth_kerberos4}}; + 2 -> % Kerberos 5 + {stop, {nyi, auth_kerberos5}}; + 3 -> % Plaintext password + Password = option(StateData#state.options, password, ""), + EncodedPass = encode_message(pass_plain, Password), + ok = send(Sock, EncodedPass), + authenticate(StateData); + 4 -> % Hashed password + {stop, {nyi, auth_crypt}}; + 5 -> % MD5 password + Password = option(StateData#state.options, password, ""), + User = option(StateData#state.options, user, ""), + EncodedPass = encode_message(pass_md5, + {User, Password, Salt}), + ok = send(Sock, EncodedPass), + authenticate(StateData); + _ -> + {stop, {authentication, {unknown, AuthMethod}}} + end; + %% Unknown message received + Any -> + {stop, {protocol_error, Any}} + end. + +setup(StateData, Params) -> + %% Receive startup messages until ReadyForQuery + Sock = StateData#state.socket, + AsBin = StateData#state.as_binary, + {ok, Code, Package} = recv_msg(Sock, 5000), + {ok, Pair} = decode_packet(Code, Package, AsBin), + case Pair of + %% BackendKeyData, necessary for issuing cancel requests + {backend_key_data, {Pid, Secret}} -> + Params1 = [{secret, {Pid, Secret}} | Params], + setup(StateData, Params1); + %% ParameterStatus, a key-value pair. + {parameter_status, {Key, Value}} -> + Params1 = [{{parameter, Key}, Value} | Params], + setup(StateData, Params1); + %% Error message, with a sequence of <<Code:8/integer, String, 0>> + %% of error descriptions. Code==0 terminates the Reason. + {error_message, Message} -> + gen_tcp:close(Sock), + {stop, {error_response, Message}}; + %% Notice Response, with a sequence of <<Code:8/integer, String,0>> + %% identified fields. Code==0 terminates the Notice. + {notice_response, Notice} -> + deliver(StateData, {pgsql_notice, Notice}), + setup(StateData, Params); + %% Ready for Query, backend is ready for a new query cycle + {ready_for_query, _Status} -> + connected(StateData#state{params = Params}, Sock); + Any -> + {stop, {unknown_setup, Any}} + end. + +%% Connected state. Can now start to push messages +%% between frontend and backend. But first some setup. +connected(StateData, Sock) -> + %% Protocol unwrapping process. Factored out to make future + %% SSL and unix domain support easier. Store process under + %% 'socket' in the process dictionary. + AsBin = StateData#state.as_binary, + {ok, Unwrapper} = pgsql_tcp:start_link(Sock, self(), AsBin), + ok = gen_tcp:controlling_process(Sock, Unwrapper), + + %% Lookup oid to type names and store them in a dictionary under + %% 'oidmap' in the process dictionary. + Packet = encode_message(squery, "SELECT oid, typname FROM pg_type"), + ok = send(Sock, Packet), + {ok, [{_, _ColDesc, Rows}]} = process_squery([], AsBin), + Rows1 = lists:map(fun ([CodeS, NameS]) -> + Code = to_integer(CodeS), + Name = to_atom(NameS), + {Code, Name} + end, + Rows), + OidMap = dict:from_list(Rows1), + + {ok, StateData#state{oidmap = OidMap}}. + + +handle_call(terminate, _From, State) -> + Sock = State#state.socket, + Packet = encode_message(terminate, []), + ok = send(Sock, Packet), + gen_tcp:close(Sock), + Reply = ok, + {stop, normal, Reply, State}; + +%% Simple query +handle_call({squery, Query}, _From, State) -> + Sock = State#state.socket, + AsBin = State#state.as_binary, + Packet = encode_message(squery, Query), + ok = send(Sock, Packet), + {ok, Result} = process_squery([], AsBin), + case lists:keymember(error, 1, Result) of + true -> + RBPacket = encode_message(squery, "ROLLBACK"), + ok = send(Sock, RBPacket), + {ok, _RBResult} = process_squery([], AsBin); + _ -> + ok + end, + Reply = {ok, Result}, + {reply, Reply, State}; + +%% Extended query +%% simplistic version using the unnammed prepared statement and portal. +handle_call({equery, {Query, Params}}, _From, State) -> + Sock = State#state.socket, + ParseP = encode_message(parse, {"", Query, []}), + BindP = encode_message(bind, {"", "", Params, [binary]}), + DescribeP = encode_message(describe, {portal, ""}), + ExecuteP = encode_message(execute, {"", 0}), + SyncP = encode_message(sync, []), + ok = send(Sock, [ParseP, BindP, DescribeP, ExecuteP, SyncP]), + + {ok, Command, Desc, Status, Logs} = process_equery(State, []), + + OidMap = State#state.oidmap, + NameTypes = lists:map(fun({Name, _Format, _ColNo, Oid, _, _, _}) -> + {Name, dict:fetch(Oid, OidMap)} + end, + Desc), + Reply = {ok, Command, Status, NameTypes, Logs}, + {reply, Reply, State}; + +%% Prepare a statement, so it can be used for queries later on. +handle_call({prepare, {Name, Query}}, _From, State) -> + Sock = State#state.socket, + send_message(Sock, parse, {Name, Query, []}), + send_message(Sock, describe, {prepared_statement, Name}), + send_message(Sock, sync, []), + {ok, State, ParamDesc, ResultDesc} = process_prepare({[], []}), + OidMap = State#state.oidmap, + ParamTypes = + lists:map(fun (Oid) -> dict:fetch(Oid, OidMap) end, ParamDesc), + ResultNameTypes = lists:map(fun ({ColName, _Format, _ColNo, Oid, _, _, _}) -> + {ColName, dict:fetch(Oid, OidMap)} + end, + ResultDesc), + Reply = {ok, State, ParamTypes, ResultNameTypes}, + {reply, Reply, State}; + +%% Close a prepared statement. +handle_call({unprepare, Name}, _From, State) -> + Sock = State#state.socket, + send_message(Sock, close, {prepared_statement, Name}), + send_message(Sock, sync, []), + {ok, _Status} = process_unprepare(), + Reply = ok, + {reply, Reply, State}; + +%% Execute a prepared statement +handle_call({execute, {Name, Params}}, _From, State) -> + Sock = State#state.socket, + %%io:format("execute: ~p ~p ~n", [Name, Params]), + begin % Issue first requests for the prepared statement. + BindP = encode_message(bind, {"", Name, Params, [binary]}), + DescribeP = encode_message(describe, {portal, ""}), + ExecuteP = encode_message(execute, {"", 0}), + FlushP = encode_message(flush, []), + ok = send(Sock, [BindP, DescribeP, ExecuteP, FlushP]) + end, + receive + {pgsql, {bind_complete, _}} -> % Bind reply first. + %% Collect response to describe message, + %% which gives a hint of the rest of the messages. + {ok, Command, Result} = process_execute(State, Sock), + + begin % Close portal and end extended query. + CloseP = encode_message(close, {portal, ""}), + SyncP = encode_message(sync, []), + ok = send(Sock, [CloseP, SyncP]) + end, + receive + %% Collect response to close message. + {pgsql, {close_complete, _}} -> + receive + %% Collect response to sync message. + {pgsql, {ready_for_query, _Status}} -> + %%io:format("execute: ~p ~p ~p~n", + %% [Status, Command, Result]), + Reply = {ok, {Command, Result}}, + {reply, Reply, State}; + {pgsql, Unknown} -> + {stop, Unknown, {error, Unknown}, State} + end; + {pgsql, Unknown} -> + {stop, Unknown, {error, Unknown}, State} + end; + {pgsql, Unknown} -> + {stop, Unknown, {error, Unknown}, State} + end; + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + + +handle_cast(_Msg, State) -> + {noreply, State}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%% Socket closed or socket error messages. +handle_info({socket, _Sock, Condition}, State) -> + {stop, {socket, Condition}, State}; +handle_info(_Info, State) -> + {noreply, State}. + + +terminate(_Reason, _State) -> + ok. + + +deliver(State, Message) -> + DriverPid = State#state.driver, + DriverPid ! Message. + +%% In the process_squery state we collect responses until the backend is +%% done processing. +process_squery(Log, AsBin) -> + receive + {pgsql, {row_description, Cols}} -> + {ok, Command, Rows} = process_squery_cols([], AsBin), + process_squery([{Command, Cols, Rows}|Log], AsBin); + {pgsql, {command_complete, Command}} -> + process_squery([Command|Log], AsBin); + {pgsql, {ready_for_query, _Status}} -> + {ok, lists:reverse(Log)}; + {pgsql, {error_message, Error}} -> + process_squery([{error, Error}|Log], AsBin); + {pgsql, _Any} -> + process_squery(Log, AsBin) + end. +process_squery_cols(Log, AsBin) -> + receive + {pgsql, {data_row, Row}} -> + process_squery_cols( + [lists:map( + fun(null) -> + null; + (R) when AsBin == true -> + R; + (R) -> + binary_to_list(R) + end, Row) | Log], AsBin); + {pgsql, {command_complete, Command}} -> + {ok, Command, lists:reverse(Log)} + end. + +process_equery(State, Log) -> + receive + %% Consume parse and bind complete messages when waiting for the first + %% first row_description message. What happens if the equery doesnt + %% return a result set? + {pgsql, {parse_complete, _}} -> + process_equery(State, Log); + {pgsql, {bind_complete, _}} -> + process_equery(State, Log); + {pgsql, {row_description, Descs}} -> + OidMap = State#state.oidmap, + {ok, Descs1} = pgsql_util:decode_descs(OidMap, Descs), + process_equery_datarow(Descs1, Log, {undefined, Descs, undefined}); + {pgsql, Any} -> + process_equery(State, [Any|Log]) + end. + +process_equery_datarow(Types, Log, Info={Command, Desc, Status}) -> + receive + %% + {pgsql, {command_complete, Command1}} -> + process_equery_datarow(Types, Log, {Command1, Desc, Status}); + {pgsql, {ready_for_query, Status1}} -> + {ok, Command, Desc, Status1, lists:reverse(Log)}; + {pgsql, {data_row, Row}} -> + {ok, DecodedRow} = pgsql_util:decode_row(Types, Row), + process_equery_datarow(Types, [DecodedRow|Log], Info); + {pgsql, Any} -> + process_equery_datarow(Types, [Any|Log], Info) + end. + +process_prepare(Info={ParamDesc, ResultDesc}) -> + receive + {pgsql, {no_data, _}} -> + process_prepare({ParamDesc, []}); + {pgsql, {parse_complete, _}} -> + process_prepare(Info); + {pgsql, {parameter_description, Oids}} -> + process_prepare({Oids, ResultDesc}); + {pgsql, {row_description, Desc}} -> + process_prepare({ParamDesc, Desc}); + {pgsql, {ready_for_query, Status}} -> + {ok, Status, ParamDesc, ResultDesc}; + {pgsql, Any} -> + io:format("process_prepare: ~p~n", [Any]), + process_prepare(Info) + end. + +process_unprepare() -> + receive + {pgsql, {ready_for_query, Status}} -> + {ok, Status}; + {pgsql, {close_complate, []}} -> + process_unprepare(); + {pgsql, Any} -> + io:format("process_unprepare: ~p~n", [Any]), + process_unprepare() + end. + +process_execute(State, Sock) -> + %% Either the response begins with a no_data or a row_description + %% Needs to return {ok, Status, Result} + %% where Result = {Command, ...} + receive + {pgsql, {no_data, _}} -> + {ok, _Command, _Result} = process_execute_nodata(); + {pgsql, {row_description, Descs}} -> + OidMap = State#state.oidmap, + {ok, Types} = pgsql_util:decode_descs(OidMap, Descs), + {ok, _Command, _Result} = + process_execute_resultset(Sock, Types, []); + {pgsql, Unknown} -> + exit(Unknown) + end. + +process_execute_nodata() -> + receive + {pgsql, {command_complete, Cmd}} -> + Command = if is_binary(Cmd) -> + binary_to_list(Cmd); + true -> + Cmd + end, + case Command of + "INSERT "++Rest -> + {ok, [{integer, _, _Table}, + {integer, _, NRows}], _} = erl_scan:string(Rest), + {ok, 'INSERT', NRows}; + "SELECT" -> + {ok, 'SELECT', should_not_happen}; + "DELETE "++Rest -> + {ok, [{integer, _, NRows}], _} = + erl_scan:string(Rest), + {ok, 'DELETE', NRows}; + Any -> + {ok, nyi, Any} + end; + + {pgsql, Unknown} -> + exit(Unknown) + end. +process_execute_resultset(Sock, Types, Log) -> + receive + {pgsql, {command_complete, Command}} -> + {ok, to_atom(Command), lists:reverse(Log)}; + {pgsql, {data_row, Row}} -> + {ok, DecodedRow} = pgsql_util:decode_row(Types, Row), + process_execute_resultset(Sock, Types, [DecodedRow|Log]); + {pgsql, {portal_suspended, _}} -> + throw(portal_suspended); + {pgsql, Any} -> + %%process_execute_resultset(Types, [Any|Log]) + exit(Any) + end. + +%% With a message type Code and the payload Packet apropriate +%% decoding procedure can proceed. +decode_packet(Code, Packet, AsBin) -> + Ret = fun(CodeName, Values) -> {ok, {CodeName, Values}} end, + case Code of + ?PG_ERROR_MESSAGE -> + Message = pgsql_util:errordesc(Packet, AsBin), + Ret(error_message, Message); + ?PG_EMPTY_RESPONSE -> + Ret(empty_response, []); + ?PG_ROW_DESCRIPTION -> + <<_Columns:16/integer, ColDescs/binary>> = Packet, + Descs = coldescs(ColDescs, [], AsBin), + Ret(row_description, Descs); + ?PG_READY_FOR_QUERY -> + <<State:8/integer>> = Packet, + case State of + $I -> + Ret(ready_for_query, idle); + $T -> + Ret(ready_for_query, transaction); + $E -> + Ret(ready_for_query, failed_transaction) + end; + ?PG_COMMAND_COMPLETE -> + {Task, _} = to_string(Packet, AsBin), + Ret(command_complete, Task); + ?PG_DATA_ROW -> + <<NumberCol:16/integer, RowData/binary>> = Packet, + ColData = datacoldescs(NumberCol, RowData, []), + Ret(data_row, ColData); + ?PG_BACKEND_KEY_DATA -> + <<Pid:32/integer, Secret:32/integer>> = Packet, + Ret(backend_key_data, {Pid, Secret}); + ?PG_PARAMETER_STATUS -> + {Key, Value} = split_pair(Packet, AsBin), + Ret(parameter_status, {Key, Value}); + ?PG_NOTICE_RESPONSE -> + Ret(notice_response, []); + ?PG_AUTHENTICATE -> + <<AuthMethod:32/integer, Salt/binary>> = Packet, + Ret(authenticate, {AuthMethod, Salt}); + ?PG_PARSE_COMPLETE -> + Ret(parse_complete, []); + ?PG_BIND_COMPLETE -> + Ret(bind_complete, []); + ?PG_PORTAL_SUSPENDED -> + Ret(portal_suspended, []); + ?PG_CLOSE_COMPLETE -> + Ret(close_complete, []); + $t -> + <<_NParams:16/integer, OidsP/binary>> = Packet, + Oids = pgsql_util:oids(OidsP, []), + Ret(parameter_description, Oids); + ?PG_NO_DATA -> + Ret(no_data, []); + _Any -> + Ret(unknown, [Code]) + end. + +send_message(Sock, Type, Values) -> + %%io:format("send_message:~p~n", [{Type, Values}]), + Packet = encode_message(Type, Values), + ok = send(Sock, Packet). + +%% Add header to a message. +encode(Code, Packet) -> + Len = size(Packet) + 4, + <<Code:8/integer, Len:4/integer-unit:8, Packet/binary>>. + +%% Encode a message of a given type. +encode_message(pass_plain, Password) -> + Pass = pgsql_util:pass_plain(Password), + encode($p, Pass); +encode_message(pass_md5, {User, Password, Salt}) -> + Pass = pgsql_util:pass_md5(User, Password, Salt), + encode($p, Pass); +encode_message(terminate, _) -> + encode($X, <<>>); +encode_message(squery, Query) -> % squery as in simple query. + encode($Q, string(Query)); +encode_message(close, {Object, Name}) -> + Type = case Object of prepared_statement -> $S; portal -> $P end, + String = string(Name), + encode($C, <<Type/integer, String/binary>>); +encode_message(describe, {Object, Name}) -> + ObjectP = case Object of prepared_statement -> $S; portal -> $P end, + NameP = string(Name), + encode($D, <<ObjectP:8/integer, NameP/binary>>); +encode_message(flush, _) -> + encode($H, <<>>); +encode_message(parse, {Name, Query, _Oids}) -> + StringName = string(Name), + StringQuery = string(Query), + encode($P, <<StringName/binary, StringQuery/binary, 0:16/integer>>); +encode_message(bind, {NamePortal, NamePrepared, + Parameters, ResultFormats}) -> + PortalP = string(NamePortal), + PreparedP = string(NamePrepared), + + ParamFormatsList = lists:map( + fun (Bin) when is_binary(Bin) -> <<1:16/integer>>; + (_Text) -> <<0:16/integer>> end, + Parameters), + ParamFormatsP = erlang:list_to_binary(ParamFormatsList), + + NParameters = length(Parameters), + ParametersList = lists:map( + fun (null) -> + Minus = -1, + <<Minus:32/integer>>; + (Bin) when is_binary(Bin) -> + Size = size(Bin), + <<Size:32/integer, Bin/binary>>; + (Integer) when is_integer(Integer) -> + List = integer_to_list(Integer), + Bin = list_to_binary(List), + Size = size(Bin), + <<Size:32/integer, Bin/binary>>; + (Text) -> + Bin = list_to_binary(Text), + Size = size(Bin), + <<Size:32/integer, Bin/binary>> + end, + Parameters), + ParametersP = erlang:list_to_binary(ParametersList), + + NResultFormats = length(ResultFormats), + ResultFormatsList = lists:map( + fun (binary) -> <<1:16/integer>>; + (text) -> <<0:16/integer>> end, + ResultFormats), + ResultFormatsP = erlang:list_to_binary(ResultFormatsList), + + %%io:format("encode bind: ~p~n", [{PortalP, PreparedP, + %% NParameters, ParamFormatsP, + %% NParameters, ParametersP, + %% NResultFormats, ResultFormatsP}]), + encode($B, <<PortalP/binary, PreparedP/binary, + NParameters:16/integer, ParamFormatsP/binary, + NParameters:16/integer, ParametersP/binary, + NResultFormats:16/integer, ResultFormatsP/binary>>); +encode_message(execute, {Portal, Limit}) -> + String = string(Portal), + encode($E, <<String/binary, Limit:32/integer>>); +encode_message(sync, _) -> + encode($S, <<>>). diff --git a/src/pgsql/pgsql_tcp.erl b/src/pgsql/pgsql_tcp.erl new file mode 100644 index 000000000..21740258c --- /dev/null +++ b/src/pgsql/pgsql_tcp.erl @@ -0,0 +1,88 @@ +%%% File : pgsql_tcp.erl +%%% Author : Blah <cos@local> +%%% Description : Unwrapping of TCP line protocol packages to postgres messages. +%%% Created : 22 Jul 2005 + +-module(pgsql_tcp). + +-behaviour(gen_server). + +-export([start/3, start_link/3]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + code_change/3, + handle_info/2, + terminate/2]). + +-record(state, {socket, protopid, buffer, as_binary}). + +start(Sock, ProtoPid, AsBin) -> + gen_server:start(?MODULE, [Sock, ProtoPid, AsBin], []). + +start_link(Sock, ProtoPid, AsBin) -> + gen_server:start_link(?MODULE, [Sock, ProtoPid, AsBin], []). + +init([Sock, ProtoPid, AsBin]) -> + inet:setopts(Sock, [{active, once}]), + {ok, #state{socket = Sock, protopid = ProtoPid, + buffer = <<>>, as_binary = AsBin}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info({tcp, Sock, Bin}, + #state{socket = Sock, + protopid = ProtoPid, + as_binary = AsBin, + buffer = Buffer} = State) -> + {ok, Rest} = process_buffer(ProtoPid, AsBin, <<Buffer/binary, Bin/binary>>), + inet:setopts(Sock, [{active, once}]), + {noreply, State#state{buffer = Rest}}; +handle_info({tcp_closed, Sock}, + #state{socket = Sock, + protopid = ProtoPid} = State) -> + io:format("Sock closed~n", []), + ProtoPid ! {socket, Sock, closed}, + {stop, tcp_close, State}; +handle_info({tcp_error, Sock, Reason}, + #state{socket = Sock, + protopid = ProtoPid} = State) -> + io:format("Sock error~n", []), + ProtoPid ! {socket, Sock, {error, Reason}}, + {stop, tcp_error, State}; +handle_info(_Info, State) -> + {noreply, State}. + + +terminate(_Reason, _State) -> + ok. + + +%% Given a binary that begins with a proper message header the binary +%% will be processed for each full message it contains, and it will +%% return any trailing incomplete messages. +process_buffer(ProtoPid, AsBin, + Bin = <<Code:8/integer, Size:4/integer-unit:8, Rest/binary>>) -> + Payload = Size - 4, + if + size(Rest) >= Payload -> + <<Packet:Payload/binary, Rest1/binary>> = Rest, + {ok, Message} = pgsql_proto:decode_packet(Code, Packet, AsBin), + ProtoPid ! {pgsql, Message}, + process_buffer(ProtoPid, AsBin, Rest1); + true -> + {ok, Bin} + end; +process_buffer(_ProtoPid, _AsBin, Bin) when is_binary(Bin) -> + {ok, Bin}. + diff --git a/src/pgsql/pgsql_util.erl b/src/pgsql/pgsql_util.erl new file mode 100644 index 000000000..d562f2970 --- /dev/null +++ b/src/pgsql/pgsql_util.erl @@ -0,0 +1,321 @@ +%%% File : pgsql_util.erl +%%% Author : Christian Sunesson +%%% Description : utility functions used in implementation of +%%% postgresql driver. +%%% Created : 11 May 2005 by Blah <cos@local> + +-module(pgsql_util). + +%% Key-Value handling +-export([option/3]). + +%% Networking +-export([socket/1]). +-export([send/2, send_int/2, send_msg/3]). +-export([recv_msg/2, recv_msg/1, recv_byte/2, recv_byte/1]). + +%% Protocol packing +-export([string/1, make_pair/2, split_pair/2]). +-export([split_pair_rec/2]). +-export([count_string/1, to_string/2]). +-export([oids/2, coldescs/3, datacoldescs/3]). +-export([decode_row/3, decode_descs/2]). +-export([errordesc/2]). +-export([to_integer/1, to_atom/1]). + +-export([zip/2]). + +%% Constructing authentication messages. +-export([pass_plain/1, pass_md5/3]). +-import(erlang, [md5/1]). +-export([hexlist/2]). + +%% Lookup key in a plist stored in process dictionary under 'options'. +%% Default is returned if there is no value for Key in the plist. +option(Opts, Key, Default) -> + case proplists:get_value(Key, Opts, Default) of + Default -> + Default; + Value when is_binary(Value) -> + binary_to_list(Value); + Value -> + Value + end. + + +%% Open a TCP connection +socket({tcp, Host, Port}) -> + gen_tcp:connect(Host, Port, [{active, false}, binary, {packet, raw}], 5000). + +send(Sock, Packet) -> + gen_tcp:send(Sock, Packet). +send_int(Sock, Int) -> + Packet = <<Int:32/integer>>, + gen_tcp:send(Sock, Packet). + +send_msg(Sock, Code, Packet) when is_binary(Packet) -> + Len = size(Packet) + 4, + Msg = <<Code:8/integer, Len:4/integer-unit:8, Packet/binary>>, + gen_tcp:send(Sock, Msg). + +recv_msg(Sock, Timeout) -> + {ok, Head} = gen_tcp:recv(Sock, 5, Timeout), + <<Code:8/integer, Size:4/integer-unit:8>> = Head, + %%io:format("Code: ~p, Size: ~p~n", [Code, Size]), + if + Size > 4 -> + {ok, Packet} = gen_tcp:recv(Sock, Size-4, Timeout), + {ok, Code, Packet}; + true -> + {ok, Code, <<>>} + end. +recv_msg(Sock) -> + recv_msg(Sock, infinity). + + +recv_byte(Sock) -> + recv_byte(Sock, infinity). +recv_byte(Sock, Timeout) -> + case gen_tcp:recv(Sock, 1, Timeout) of + {ok, <<Byte:1/integer-unit:8>>} -> + {ok, Byte}; + E={error, _Reason} -> + throw(E) + end. + +%% Convert String to binary +string(String) when is_list(String) -> + Bin = list_to_binary(String), + <<Bin/binary, 0/integer>>; +string(Bin) when is_binary(Bin) -> + <<Bin/binary, 0/integer>>. + +%%% Two zero terminated strings. +make_pair(Key, Value) when is_atom(Key) -> + make_pair(atom_to_list(Key), Value); +make_pair(Key, Value) when is_atom(Value) -> + make_pair(Key, atom_to_list(Value)); +make_pair(Key, Value) when is_list(Key), is_list(Value) -> + BinKey = list_to_binary(Key), + BinValue = list_to_binary(Value), + make_pair(BinKey, BinValue); +make_pair(Key, Value) when is_binary(Key), is_binary(Value) -> + <<Key/binary, 0/integer, + Value/binary, 0/integer>>. + +split_pair(Bin, AsBin) when is_binary(Bin) -> + split_pair(binary_to_list(Bin), AsBin); +split_pair(Str, AsBin) -> + split_pair_rec(Str, norec, AsBin). + +split_pair_rec(Bin, AsBin) when is_binary(Bin) -> + split_pair_rec(binary_to_list(Bin), AsBin); +split_pair_rec(Arg, AsBin) -> + split_pair_rec(Arg,[], AsBin). + +split_pair_rec([], Acc, _AsBin) -> + lists:reverse(Acc); +split_pair_rec([0], Acc, _AsBin) -> + lists:reverse(Acc); +split_pair_rec(S, Acc, AsBin) -> + Fun = fun(C) -> C /= 0 end, + {K, [0|S1]} = lists:splitwith(Fun, S), + {V, [0|Tail]} = lists:splitwith(Fun, S1), + {Key, Value} = if AsBin -> + {list_to_binary(K), list_to_binary(V)}; + true -> + {K, V} + end, + case Acc of + norec -> {Key, Value}; + _ -> + split_pair_rec(Tail, [{Key, Value}| Acc], AsBin) + end. + + +count_string(Bin) when is_binary(Bin) -> + count_string(Bin, 0). + +count_string(<<>>, N) -> + {N, <<>>}; +count_string(<<0/integer, Rest/binary>>, N) -> + {N, Rest}; +count_string(<<_C/integer, Rest/binary>>, N) -> + count_string(Rest, N+1). + +to_string(Bin, AsBin) when is_binary(Bin) -> + {Count, _} = count_string(Bin, 0), + <<String:Count/binary, _/binary>> = Bin, + if AsBin -> + {String, Count}; + true -> + {binary_to_list(String), Count} + end. + +oids(<<>>, Oids) -> + lists:reverse(Oids); +oids(<<Oid:32/integer, Rest/binary>>, Oids) -> + oids(Rest, [Oid|Oids]). + +coldescs(<<>>, Descs, _AsBin) -> + lists:reverse(Descs); +coldescs(Bin, Descs, AsBin) -> + {Name, Count} = to_string(Bin, AsBin), + <<_:Count/binary, 0/integer, + TableOID:32/integer, + ColumnNumber:16/integer, + TypeId:32/integer, + TypeSize:16/integer-signed, + TypeMod:32/integer-signed, + FormatCode:16/integer, + Rest/binary>> = Bin, + Format = case FormatCode of + 0 -> text; + 1 -> binary + end, + Desc = {Name, Format, ColumnNumber, + TypeId, TypeSize, TypeMod, + TableOID}, + coldescs(Rest, [Desc|Descs], AsBin). + +datacoldescs(N, <<16#ffffffff:32, Rest/binary>>, Descs) when N >= 0 -> + datacoldescs(N-1, Rest, [null|Descs]); +datacoldescs(N, + <<Len:32/integer, Data:Len/binary, Rest/binary>>, + Descs) when N >= 0 -> + datacoldescs(N-1, Rest, [Data|Descs]); +datacoldescs(_N, _, Descs) -> + lists:reverse(Descs). + +decode_descs(OidMap, Cols) -> + decode_descs(OidMap, Cols, []). +decode_descs(_OidMap, [], Descs) -> + {ok, lists:reverse(Descs)}; +decode_descs(OidMap, [Col|ColTail], Descs) -> + {Name, Format, ColNumber, Oid, _, _, _} = Col, + OidName = dict:fetch(Oid, OidMap), + decode_descs(OidMap, ColTail, [{Name, Format, ColNumber, OidName, [], [], []}|Descs]). + +decode_row(Types, Values, AsBin) -> + decode_row(Types, Values, [], AsBin). +decode_row([], [], Out, _AsBin) -> + {ok, lists:reverse(Out)}; +decode_row([Type|TypeTail], [Value|ValueTail], Out0, AsBin) -> + Out1 = decode_col(Type, Value, AsBin), + decode_row(TypeTail, ValueTail, [Out1|Out0], AsBin). + +decode_col({_, text, _, _, _, _, _}, Value, AsBin) -> + if AsBin -> Value; + true -> binary_to_list(Value) + end; +decode_col({_Name, _Format, _ColNumber, varchar, _Size, _Modifier, _TableOID}, Value, AsBin) -> + if AsBin -> Value; + true -> binary_to_list(Value) + end; +decode_col({_Name, _Format, _ColNumber, int4, _Size, _Modifier, _TableOID}, Value, _AsBin) -> + <<Int4:32/integer>> = Value, + Int4; +decode_col({_Name, _Format, _ColNumber, Oid, _Size, _Modifier, _TableOID}, Value, _AsBin) -> + {Oid, Value}. + +errordesc(Bin, AsBin) -> + errordesc(Bin, [], AsBin). + +errordesc(<<0/integer, _Rest/binary>>, Lines, _AsBin) -> + lists:reverse(Lines); +errordesc(<<Code/integer, Rest/binary>>, Lines, AsBin) -> + {String, Count} = to_string(Rest, AsBin), + <<_:Count/binary, 0, Rest1/binary>> = Rest, + Msg = case Code of + $S -> + {severity, to_atom(String)}; + $C -> + {code, String}; + $M -> + {message, String}; + $D -> + {detail, String}; + $H -> + {hint, String}; + $P -> + {position, to_integer(String)}; + $p -> + {internal_position, to_integer(String)}; + $W -> + {where, String}; + $F -> + {file, String}; + $L -> + {line, to_integer(String)}; + $R -> + {routine, String}; + Unknown -> + {Unknown, String} + end, + errordesc(Rest1, [Msg|Lines]). + +%%% Zip two lists together +zip(List1, List2) -> + zip(List1, List2, []). +zip(List1, List2, Result) when List1 =:= []; + List2 =:= [] -> + lists:reverse(Result); +zip([H1|List1], [H2|List2], Result) -> + zip(List1, List2, [{H1, H2}|Result]). + +%%% Authentication utils + +pass_plain(Password) -> + Pass = [Password, 0], + list_to_binary(Pass). + +%% MD5 authentication patch from +%% Juhani Rankimies <juhani@juranki.com> +%% (patch slightly rewritten, new bugs are mine :] /Christian Sunesson) + +%% +%% MD5(MD5(password + user) + salt) +%% + +pass_md5(User, Password, Salt) -> + Digest = hex(md5([Password, User])), + Encrypt = hex(md5([Digest, Salt])), + Pass = ["md5", Encrypt, 0], + list_to_binary(Pass). + +to_integer(B) when is_binary(B) -> + to_integer(binary_to_list(B)); +to_integer(S) -> + list_to_integer(S). + +to_atom(B) when is_binary(B) -> + to_atom(binary_to_list(B)); +to_atom(S) -> + list_to_atom(S). + +hex(B) when is_binary(B) -> + hexlist(binary_to_list(B), []). + +hexlist([], Acc) -> + lists:reverse(Acc); +hexlist([N|Rest], Acc) -> + HighNibble = (N band 16#f0) bsr 4, + LowNibble = (N band 16#0f), + hexlist(Rest, [hexdigit(LowNibble), hexdigit(HighNibble)|Acc]). + +hexdigit(0) -> $0; +hexdigit(1) -> $1; +hexdigit(2) -> $2; +hexdigit(3) -> $3; +hexdigit(4) -> $4; +hexdigit(5) -> $5; +hexdigit(6) -> $6; +hexdigit(7) -> $7; +hexdigit(8) -> $8; +hexdigit(9) -> $9; +hexdigit(10) -> $a; +hexdigit(11) -> $b; +hexdigit(12) -> $c; +hexdigit(13) -> $d; +hexdigit(14) -> $e; +hexdigit(15) -> $f. |