aboutsummaryrefslogtreecommitdiff
path: root/src/mysql/mysql_recv.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mysql/mysql_recv.erl')
-rw-r--r--src/mysql/mysql_recv.erl165
1 files changed, 165 insertions, 0 deletions
diff --git a/src/mysql/mysql_recv.erl b/src/mysql/mysql_recv.erl
new file mode 100644
index 000000000..1d24ded7f
--- /dev/null
+++ b/src/mysql/mysql_recv.erl
@@ -0,0 +1,165 @@
+%%%-------------------------------------------------------------------
+%%% File : mysql_recv.erl
+%%% Author : Fredrik Thulin <ft@it.su.se>
+%%% Descrip.: Handles data being received on a MySQL socket. Decodes
+%%% per-row framing and sends each row to parent.
+%%%
+%%% Created : 4 Aug 2005 by Fredrik Thulin <ft@it.su.se>
+%%%
+%%% Note : All MySQL code was written by Magnus Ahltorp, originally
+%%% in the file mysql.erl - I just moved it here.
+%%%
+%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
+%%% See the file COPYING
+%%%
+%%% Signals this receiver process can send to it's parent
+%%% (the parent is a mysql_conn connection handler) :
+%%%
+%%% {mysql_recv, self(), data, Packet, Num}
+%%% {mysql_recv, self(), closed, {error, Reason}}
+%%% {mysql_recv, self(), closed, normal}
+%%%
+%%% Internally (from inside init/4 to start_link/4) the
+%%% following signals may be sent to the parent process :
+%%%
+%%% {mysql_recv, self(), init, {ok, Sock}}
+%%% {mysql_recv, self(), init, {error, E}}
+%%%
+%%%-------------------------------------------------------------------
+-module(mysql_recv).
+
+%%--------------------------------------------------------------------
+%% External exports (should only be used by the 'mysql_conn' module)
+%%--------------------------------------------------------------------
+-export([start_link/4
+ ]).
+
+-record(state, {
+ socket,
+ parent,
+ log_fun,
+ data
+ }).
+
+-define(SECURE_CONNECTION, 32768).
+-define(CONNECT_TIMEOUT, 5000).
+
+%%====================================================================
+%% External functions
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: start_link(Host, Port, LogFun, Parent)
+%% Host = string()
+%% Port = integer()
+%% LogFun = undefined | function() of arity 3
+%% Parent = pid(), process that should get received frames
+%% Descrip.: Start a process that connects to Host:Port and waits for
+%% data. When it has received a MySQL frame, it sends it to
+%% Parent and waits for the next frame.
+%% Returns : {ok, RecvPid, Socket} |
+%% {error, Reason}
+%% RecvPid = pid(), receiver process pid
+%% Socket = term(), gen_tcp socket
+%% Reason = atom() | string()
+%%--------------------------------------------------------------------
+start_link(Host, Port, LogFun, Parent) when is_list(Host), is_integer(Port) ->
+ RecvPid =
+ spawn_link(fun () ->
+ init(Host, Port, LogFun, Parent)
+ end),
+ %% wait for the socket from the spawned pid
+ receive
+ {mysql_recv, RecvPid, init, {error, E}} ->
+ {error, E};
+ {mysql_recv, RecvPid, init, {ok, Socket}} ->
+ {ok, RecvPid, Socket}
+ after ?CONNECT_TIMEOUT ->
+ catch exit(RecvPid, kill),
+ {error, "timeout"}
+ end.
+
+
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init((Host, Port, LogFun, Parent)
+%% Host = string()
+%% Port = integer()
+%% LogFun = undefined | function() of arity 3
+%% Parent = pid(), process that should get received frames
+%% Descrip.: Connect to Host:Port and then enter receive-loop.
+%% Returns : error | never returns
+%%--------------------------------------------------------------------
+init(Host, Port, LogFun, Parent) ->
+ case gen_tcp:connect(Host, Port, [binary, {packet, 0}]) of
+ {ok, Sock} ->
+ Parent ! {mysql_recv, self(), init, {ok, Sock}},
+ State = #state{socket = Sock,
+ parent = Parent,
+ log_fun = LogFun,
+ data = <<>>
+ },
+ loop(State);
+ E ->
+ mysql:log(LogFun, error,
+ "mysql_recv: Failed connecting to ~p:~p : ~p",
+ [Host, Port, E]),
+ Msg = lists:flatten(io_lib:format("connect failed : ~p", [E])),
+ Parent ! {mysql_recv, self(), init, {error, Msg}}
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: loop(State)
+%% State = state record()
+%% Descrip.: The main loop. Wait for data from our TCP socket and act
+%% on received data or signals that our socket was closed.
+%% Returns : error | never returns
+%%--------------------------------------------------------------------
+loop(State) ->
+ Sock = State#state.socket,
+ receive
+ {tcp, Sock, InData} ->
+ NewData = list_to_binary([State#state.data, InData]),
+ %% send data to parent if we have enough data
+ Rest = sendpacket(State#state.parent, NewData),
+ loop(State#state{data = Rest});
+ {tcp_error, Sock, Reason} ->
+ mysql:log(State#state.log_fun, error, "mysql_recv: "
+ "Socket ~p closed : ~p", [Sock, Reason]),
+ State#state.parent ! {mysql_recv, self(), closed,
+ {error, Reason}},
+ error;
+ {tcp_closed, Sock} ->
+ mysql:log(State#state.log_fun, debug, "mysql_recv: "
+ "Socket ~p closed", [Sock]),
+ State#state.parent ! {mysql_recv, self(), closed, normal},
+ error
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: sendpacket(Parent, Data)
+%% Parent = pid()
+%% Data = binary()
+%% Descrip.: Check if we have received one or more complete frames by
+%% now, and if so - send them to Parent.
+%% Returns : Rest = binary()
+%%--------------------------------------------------------------------
+%% send data to parent if we have enough data
+sendpacket(Parent, Data) ->
+ case Data of
+ <<Length:24/little, Num:8, D/binary>> ->
+ if
+ Length =< size(D) ->
+ {Packet, Rest} = split_binary(D, Length),
+ Parent ! {mysql_recv, self(), data, Packet, Num},
+ sendpacket(Parent, Rest);
+ true ->
+ Data
+ end;
+ _ ->
+ Data
+ end.