jabber.c

Go to the documentation of this file.
00001 /*
00002  * $Id: jabber.c 5572 2009-02-09 16:49:27Z henningw $
00003  *
00004  * XJAB module
00005  *
00006  * Copyright (C) 2001-2003 FhG Fokus
00007  *
00008  * This file is part of Kamailio, a free SIP server.
00009  *
00010  * Kamailio is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2 of the License, or
00013  * (at your option) any later version
00014  *
00015  * Kamailio is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  * GNU General Public License for more details.
00019  *
00020  * You should have received a copy of the GNU General Public License 
00021  * along with this program; if not, write to the Free Software 
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023  *
00024  * ---
00025  *
00026  * History
00027  * -------
00028  * 2003-02-28 connection management with ihttp implemented (dcm)
00029  * 2003-02-24 first version of callback functions for ihttp (dcm)
00030  * 2003-02-13 lot of comments enclosed in #ifdef XJ_EXTRA_DEBUG (dcm)
00031  * 2003-03-11 New module interface (janakj)
00032  * 2003-03-16 flags export parameter added (janakj)
00033  * 2003-04-06 rank 0 changed to 1 in child_init (janakj)
00034  * 2003-06-19 fixed too many Jabber workers bug (mostly on RH9.0) (dcm)
00035  * 2003-08-05 adapted to the new parse_content_type_hdr function (bogdan)
00036  * 2004-06-07 db API update (andrei)
00037  */
00038 
00039 
00040 #include <stdio.h>
00041 #include <string.h>
00042 #include <stdlib.h>
00043 #include <sys/types.h>
00044 #include <sys/wait.h>
00045 #include <sys/ipc.h>
00046 #include <unistd.h>
00047 #include <fcntl.h>
00048 #include <errno.h>
00049 
00050 #include "../../sr_module.h"
00051 #include "../../error.h"
00052 #include "../../ut.h"
00053 #include "../../mem/shm_mem.h"
00054 #include "../../mem/mem.h"
00055 #include "../../globals.h"
00056 #include "../../timer.h"
00057 #include "../../parser/parse_uri.h"
00058 #include "../../parser/parse_content.h"
00059 #include "../../parser/parse_from.h"
00060 #include "../../db/db.h"
00061 
00062 #include "../tm/tm_load.h"
00063 
00064 #ifdef HAVE_IHTTP
00065 #include "../ihttp/ih_load.h"
00066 #endif
00067 
00068 #include "xjab_load.h"
00069 #include "xjab_worker.h"
00070 #include "xjab_util.h"
00071 
00072 
00073 MODULE_VERSION
00074 
00075 /** TM bind */
00076 struct tm_binds tmb;
00077 
00078 #ifdef HAVE_IHTTP
00079 /** iHTTP bind */
00080 struct ih_binds ihb;
00081 /** iHTTP callback functions */
00082 int xjab_mod_info(ih_req_p _irp, void *_p, char *_bb, int *_bl, 
00083       char *_hb, int *_hl);
00084 int xjab_connections(ih_req_p _irp, void *_p, char *_bb, int *_bl, 
00085       char *_hb, int *_hl);
00086 #endif
00087 
00088 /** workers list */
00089 xj_wlist jwl = NULL;
00090 
00091 /** Structure that represents database connection */
00092 static db_con_t** db_con;
00093 static db_func_t jabber_dbf;
00094 
00095 /** parameters */
00096 
00097 static str db_url   = str_init("mysql://root@127.0.0.1/sip_jab");
00098 static str db_table = str_init("jusers");
00099 char *registrar=NULL; /*"sip:registrar@example.org";*/
00100 
00101 int nrw = 2;
00102 int max_jobs = 10;
00103 
00104 char *jaddress = "127.0.0.1";
00105 int jport = 5222;
00106 
00107 char *jaliases = NULL;
00108 char *jdomain  = NULL;
00109 char *proxy    = NULL;
00110 
00111 char* priority = "9";
00112 
00113 int delay_time = 90;
00114 int sleep_time = 20;
00115 int cache_time = 600;
00116 int check_time = 20;
00117 
00118 int **pipes = NULL;
00119 
00120 static int mod_init(void);
00121 static int child_init(int rank);
00122 
00123 int xjab_manage_sipmsg(struct sip_msg *msg, int type);
00124 void xjab_check_workers(int mpid);
00125 
00126 static int xj_send_message(struct sip_msg*, char*, char*);
00127 static int xj_join_jconf(struct sip_msg*, char*, char*);
00128 static int xj_exit_jconf(struct sip_msg*, char*, char*);
00129 static int xj_go_online(struct sip_msg*, char*, char*);
00130 static int xj_go_offline(struct sip_msg*, char*, char*);
00131 
00132 void destroy(void);
00133 
00134 /*
00135  * Exported functions
00136  */
00137 static cmd_export_t cmds[] = {
00138    {"jab_send_message",       (cmd_function)xj_send_message,
00139          0, 0, 0, REQUEST_ROUTE},
00140    {"jab_join_jconf",         (cmd_function)xj_join_jconf,
00141          0, 0, 0, REQUEST_ROUTE},
00142    {"jab_exit_jconf",         (cmd_function)xj_exit_jconf,
00143          0, 0, 0, REQUEST_ROUTE},
00144    {"jab_go_online",          (cmd_function)xj_go_online,
00145          0, 0, 0, REQUEST_ROUTE},
00146    {"jab_go_offline",         (cmd_function)xj_go_offline,
00147          0, 0, 0, REQUEST_ROUTE},
00148    {"jab_register_watcher",   (cmd_function)xj_register_watcher,
00149          XJ_NO_SCRIPT_F, 0, 0, 0            },
00150    {"jab_unregister_watcher", (cmd_function)xj_unregister_watcher,
00151          XJ_NO_SCRIPT_F, 0, 0, 0            },
00152    {"load_xjab",              (cmd_function)load_xjab,
00153          XJ_NO_SCRIPT_F, 0, 0, 0            },
00154    {0, 0, 0, 0, 0, 0}
00155 };
00156 
00157 
00158 /*
00159  * Exported parameters 
00160  */
00161 static param_export_t params[] = {
00162    {"db_url",     STR_PARAM, &db_url.s  },
00163    {"jaddress",   STR_PARAM, &jaddress  },
00164    {"aliases",    STR_PARAM, &jaliases  },
00165    {"proxy",      STR_PARAM, &proxy     },
00166    {"jdomain",    STR_PARAM, &jdomain   },
00167    {"registrar",  STR_PARAM, &registrar },
00168    {"priority",   STR_PARAM, &priority  },
00169    {"jport",      INT_PARAM, &jport     },
00170    {"workers",    INT_PARAM, &nrw       },
00171    {"max_jobs",   INT_PARAM, &max_jobs  },
00172    {"cache_time", INT_PARAM, &cache_time},
00173    {"delay_time", INT_PARAM, &delay_time},
00174    {"sleep_time", INT_PARAM, &sleep_time},
00175    {"check_time", INT_PARAM, &check_time},
00176    {0, 0, 0}
00177 };
00178 
00179 
00180 struct module_exports exports= {
00181    "jabber",
00182    DEFAULT_DLFLAGS, /* dlopen flags */
00183    cmds,       /* Exported functions */
00184    params,     /* Exported parameters */
00185    0,          /* exported statistics */
00186    0,          /* exported MI functions */
00187    0,          /* exported pseudo-variables */
00188    0,          /* extra processes */
00189    mod_init,   /* module initialization function */
00190    0,
00191    (destroy_function) destroy,
00192    child_init  /* per-child init function */
00193 };
00194 
00195 /**
00196  * init module function
00197  */
00198 static int mod_init(void)
00199 {
00200    LM_WARN("This module is deprecated and will be removed in the next release. Use the <purple> module.");
00201 #ifdef HAVE_IHTTP
00202    load_ih_f load_ih;
00203 #endif
00204    int  i;
00205    db_url.len = strlen(db_url.s);
00206 
00207    if(!jdomain)
00208    {
00209       LM_ERR("jdomain is NULL\n");
00210       return -1;
00211    }
00212 
00213    /* import mysql functions */
00214    if (db_bind_mod(&db_url, &jabber_dbf)<0)
00215    {
00216       LM_ERR("database module not found\n");
00217       return -1;
00218    }
00219 
00220    if (!DB_CAPABILITY(jabber_dbf, DB_CAP_QUERY)) {
00221       LM_ERR("database module does not implement 'query' function\n");
00222       return -1;
00223    }
00224 
00225    db_con = (db_con_t**)shm_malloc(nrw*sizeof(db_con_t*));
00226    if (db_con == NULL)
00227    {
00228       LM_ERR("no more shm memory\n");
00229       return -1;
00230    }
00231 
00232    /* load the TM API */
00233    if (load_tm_api(&tmb)!=0) {
00234       LM_ERR("can't load TM API\n");
00235       return -1;
00236    }
00237 
00238 #ifdef HAVE_IHTTP
00239    /* import the iHTTP auto-loading function */
00240    if ( !(load_ih=(load_ih_f)find_export("load_ih", IH_NO_SCRIPT_F, 0))) {
00241       LM_ERR("can't import load_ih\n");
00242       return -1;
00243    }
00244    /* let the auto-loading function load all TM stuff */
00245    if (load_ih( &ihb )==-1)
00246       return -1;
00247 #endif
00248    
00249    pipes = (int**)pkg_malloc(nrw*sizeof(int*));
00250    if (pipes == NULL)
00251    {
00252       LM_ERR("no more pkg memory (pipes)\n");
00253       return -1;
00254    }
00255    
00256    for(i=0; i<nrw; i++)
00257    {
00258       pipes[i] = (int*)pkg_malloc(2*sizeof(int));
00259       if (!pipes[i])
00260       {
00261          LM_ERR("no more pkg memory (pipes)\n");
00262          return -1;
00263       }
00264    }
00265    
00266    for(i=0; i<nrw; i++)
00267    {  
00268       db_con[i] = jabber_dbf.init(&db_url);
00269       if (!db_con[i])
00270       {
00271          LM_ERR("failed to connect to the database\n");
00272          return -1;
00273       }
00274       else
00275       {
00276          if (jabber_dbf.use_table(db_con[i], &db_table) < 0) {
00277             LM_ERR("use_table failed\n");
00278             return -1;
00279          }
00280          LM_DBG("database connection opened successfully\n");
00281       }
00282    }
00283 
00284    
00285    /** creating the pipes */
00286    
00287    for(i=0;i<nrw;i++)
00288    {
00289       /* create the pipe*/
00290       if (pipe(pipes[i])==-1) {
00291          LM_ERR("cannot create pipe!\n");
00292          return -1;
00293       }
00294       LM_DBG("pipe[%d] = <%d>-<%d>\n", i, pipes[i][0], pipes[i][1]);
00295    }
00296    
00297    if((jwl = xj_wlist_init(pipes,nrw,max_jobs,cache_time,sleep_time,
00298             delay_time)) == NULL)
00299    {
00300       LM_ERR("failed to initialize workers list\n");
00301       return -1;
00302    }
00303    
00304    if(xj_wlist_set_aliases(jwl, jaliases, jdomain, proxy) < 0)
00305    {
00306       LM_ERR("failed to set aliases and outbound proxy\n");
00307       return -1;
00308    }
00309 
00310    LM_DBG("initialized ...\n");  
00311    return 0;
00312 }
00313 
00314 /*
00315  * Initialize children
00316  */
00317 static int child_init(int rank)
00318 {
00319    int i, j, mpid, cpid;
00320    
00321    LM_DBG("initializing child <%d>\n", rank);
00322         /* Rank 0 is main process now - 1 is the first child (janakj) */
00323    if(rank == 1)
00324    {
00325 #ifdef HAVE_IHTTP
00326       /** register iHTTP callbacks -- go forward in any case*/
00327       ihb.reg_f("xjab", "XMPP Gateway", IH_MENU_YES,
00328             xjab_mod_info, NULL);
00329       ihb.reg_f("xjabc", "XMPP connections", IH_MENU_YES,
00330             xjab_connections, NULL);
00331 #endif
00332       if((mpid=fork())<0 )
00333       {
00334          LM_ERR("cannot launch worker's manager\n");
00335          return -1;
00336       }
00337       if(mpid == 0)
00338       {
00339          /** launching the workers */
00340          for(i=0;i<nrw;i++)
00341          {
00342             if ( (cpid=fork())<0 )
00343             {
00344                LM_ERR("cannot launch worker\n");
00345                return -1;
00346             }
00347             if (cpid == 0)
00348             {
00349                for(j=0;j<nrw;j++)
00350                   if(j!=i) close(pipes[j][0]);
00351                close(pipes[i][1]);
00352                if(xj_wlist_set_pid(jwl, getpid(), i) < 0)
00353                {
00354                   LM_ERR("failed to set worker's pid\n");
00355                   return -1;
00356                }
00357                xj_worker_process(jwl,jaddress,jport, priority, i, 
00358                      db_con[i], &jabber_dbf);
00359                exit(0);
00360             }
00361          }
00362 
00363          mpid = getpid();
00364          while(1)
00365          {
00366             sleep(check_time);
00367             xjab_check_workers(mpid);
00368          }
00369       }
00370    }
00371    
00372    //if(pipes)
00373    //{
00374    // for(i=0;i<nrw;i++)
00375    //    close(pipes[i][0]);
00376    //}
00377    return 0;
00378 }
00379 
00380 /**
00381  * send the SIP MESSAGE through Jabber
00382  */
00383 static int xj_send_message(struct sip_msg *msg, char* foo1, char * foo2)
00384 {
00385    LM_DBG("processing SIP MESSAGE\n");
00386    return xjab_manage_sipmsg(msg, XJ_SEND_MESSAGE);
00387 }
00388 
00389 /**
00390  * join a Jabber conference
00391  */
00392 static int xj_join_jconf(struct sip_msg *msg, char* foo1, char * foo2)
00393 {
00394    LM_DBG("join a Jabber conference\n");
00395    return xjab_manage_sipmsg(msg, XJ_JOIN_JCONF);
00396 }
00397 
00398 /**
00399  * exit from Jabber conference
00400  */
00401 static int xj_exit_jconf(struct sip_msg *msg, char* foo1, char * foo2)
00402 {
00403    LM_DBG("exit from a Jabber conference\n");
00404    return xjab_manage_sipmsg(msg, XJ_EXIT_JCONF);
00405 }
00406 
00407 /**
00408  * go online in Jabber network
00409  */
00410 static int xj_go_online(struct sip_msg *msg, char* foo1, char * foo2)
00411 {
00412    LM_DBG("go online in Jabber network\n");
00413    return xjab_manage_sipmsg(msg, XJ_GO_ONLINE);
00414 }
00415 
00416 /**
00417  * go offline in Jabber network
00418  */
00419 static int xj_go_offline(struct sip_msg *msg, char* foo1, char * foo2)
00420 {
00421    LM_DBG("go offline in Jabber network\n");
00422    return xjab_manage_sipmsg(msg, XJ_GO_OFFLINE);
00423 }
00424 
00425 /**
00426  * manage SIP message
00427  */
00428 int xjab_manage_sipmsg(struct sip_msg *msg, int type)
00429 {
00430    str body, dst, from_uri;
00431    xj_sipmsg jsmsg;
00432    int pipe, fl;
00433    t_xj_jkey jkey, *p;
00434    int mime;
00435 
00436    body.len = 0;
00437    body.s = 0;
00438 
00439    // extract message body - after that whole SIP MESSAGE is parsed
00440    if (type==XJ_SEND_MESSAGE)
00441    {
00442       /* get the message's body */
00443       body.s = get_body( msg );
00444       if(body.s==0) 
00445       {
00446          LM_ERR("cannot extract body from msg\n");
00447          goto error;
00448       }
00449       
00450       /* content-length (if present) must be already parsed */
00451       if(!msg->content_length)
00452       {
00453          LM_ERR("no Content-Length header found!\n");
00454          goto error;
00455       }
00456       body.len = get_content_length(msg);
00457 
00458       /* parse the content-type header */
00459       if((mime=parse_content_type_hdr(msg))<1)
00460       {
00461          LM_ERR("cannot parse Content-Type header\n");
00462          goto error;
00463       }
00464 
00465       /* check the content-type value */
00466       if(mime!=(TYPE_TEXT<<16)+SUBTYPE_PLAIN
00467          && mime!=(TYPE_MESSAGE<<16)+SUBTYPE_CPIM)
00468       {
00469          LM_ERR("invalid content-type for"
00470             " a message request! type found=%d\n", mime);
00471          goto error;
00472       }
00473    }
00474    
00475    // check for TO and FROM headers - if is not SIP MESSAGE 
00476    if(parse_headers(msg,HDR_TO_F|HDR_FROM_F,0)==-1 || !msg->to || !msg->from)
00477    {
00478       LM_ERR("cannot find TO or FROM HEADERS!\n");
00479       goto error;
00480    }
00481    
00482    /* parsing from header */
00483    if ( parse_from_header( msg )<0 || msg->from->parsed==NULL) 
00484    {
00485       LM_DBG("cannot get FROM header\n");
00486       goto error;
00487    }
00488    from_uri.s = ((struct to_body*)msg->from->parsed)->uri.s;
00489    from_uri.len = ((struct to_body*)msg->from->parsed)->uri.len;
00490    if(xj_extract_aor(&from_uri, 0))
00491    {
00492       LM_DBG("cannot get AoR from FROM header\n");
00493       goto error;
00494    }
00495 
00496    jkey.hash = xj_get_hash(&from_uri, NULL);
00497    jkey.id = &from_uri;
00498    // get the communication pipe with the worker
00499    switch(type)
00500    {
00501       case XJ_SEND_MESSAGE:
00502       case XJ_JOIN_JCONF:
00503       case XJ_GO_ONLINE:
00504          if((pipe = xj_wlist_get(jwl, &jkey, &p)) < 0)
00505          {
00506             LM_DBG("cannot find pipe of the worker!\n");
00507             goto error;
00508          }
00509       break;
00510       case XJ_EXIT_JCONF:
00511       case XJ_GO_OFFLINE:
00512          if((pipe = xj_wlist_check(jwl, &jkey, &p)) < 0)
00513          {
00514             LM_DBG("no open Jabber session for"
00515                   " <%.*s>!\n", from_uri.len, from_uri.s);
00516             goto error;
00517          }
00518       break;
00519       default:
00520          LM_DBG("error, strange SIP msg type!\n");
00521          goto error;
00522    }
00523 
00524    // if is for going ONLINE/OFFLINE we do not need the destination
00525    if(type==XJ_GO_ONLINE || type==XJ_GO_OFFLINE)
00526       goto prepare_job;
00527    
00528    // determination of destination
00529    // - try to get it from new_uri, r-uri or to hdr, but check it against
00530    // jdomain and aliases
00531    dst.len = 0;
00532    if( msg->new_uri.len > 0)
00533    {
00534       dst.s = msg->new_uri.s;
00535       dst.len = msg->new_uri.len;
00536       if(xj_wlist_check_aliases(jwl, &dst))
00537          dst.len = 0;
00538 #ifdef XJ_EXTRA_DEBUG
00539       else
00540          LM_DBG("using NEW URI for destination\n");
00541 #endif
00542    }
00543    
00544    if (dst.len == 0 &&  msg->first_line.u.request.uri.s != NULL
00545          && msg->first_line.u.request.uri.len > 0 )
00546    {
00547       dst.s = msg->first_line.u.request.uri.s;
00548       dst.len = msg->first_line.u.request.uri.len;
00549       if(xj_wlist_check_aliases(jwl, &dst))
00550          dst.len = 0;
00551 #ifdef XJ_EXTRA_DEBUG
00552       else
00553          LM_DBG("using R-URI for destination\n");
00554 #endif
00555    }
00556 
00557    if(dst.len == 0 && msg->to->parsed)
00558    {
00559       dst.s = ((struct to_body*)msg->to->parsed)->uri.s;
00560       dst.len = ((struct to_body*)msg->to->parsed)->uri.len;
00561       if(dst.s == NULL || xj_wlist_check_aliases(jwl, &dst))
00562          dst.len = 0;
00563 #ifdef XJ_EXTRA_DEBUG
00564       else
00565          LM_DBG("using TO-URI for destination\n");
00566 #endif
00567    }
00568    
00569    if(dst.len == 0)
00570    {
00571       LM_DBG("destination not found in SIP message\n");
00572       goto error;
00573    }
00574    
00575    /** skip 'sip:' and parameters in destination address */
00576    if(xj_extract_aor(&dst, 1))
00577    {
00578       LM_ERR("cannot get AoR for destination\n");
00579       goto error;
00580    }
00581 #ifdef XJ_EXTRA_DEBUG
00582    LM_DBG("destination after correction [%.*s].\n", dst.len, dst.s);
00583 #endif
00584    
00585 prepare_job:
00586    //putting the SIP message parts in share memory to be accessible by workers
00587     jsmsg = (xj_sipmsg)shm_malloc(sizeof(t_xj_sipmsg));
00588    memset(jsmsg, 0, sizeof(t_xj_sipmsg));
00589     if(jsmsg == NULL)
00590       return -1;
00591    
00592    switch(type)
00593    {
00594       case XJ_SEND_MESSAGE:
00595          jsmsg->msg.len = body.len;
00596          if((jsmsg->msg.s = (char*)shm_malloc(jsmsg->msg.len+1)) == NULL)
00597          {
00598             shm_free(jsmsg);
00599             goto error;
00600          }
00601          strncpy(jsmsg->msg.s, body.s, jsmsg->msg.len);
00602       break;
00603       case XJ_GO_ONLINE:
00604       case XJ_GO_OFFLINE:
00605          dst.len = 0;
00606          dst.s = 0;
00607       case XJ_JOIN_JCONF:
00608       case XJ_EXIT_JCONF:
00609          jsmsg->msg.len = 0;
00610          jsmsg->msg.s = NULL;
00611       break;
00612       default:
00613          LM_DBG("this SHOULD NOT appear\n");
00614          shm_free(jsmsg);
00615          goto error;
00616    }
00617    if(dst.len>0)
00618    {
00619       jsmsg->to.len = dst.len;
00620       if((jsmsg->to.s = (char*)shm_malloc(jsmsg->to.len+1))==NULL)
00621       {
00622          if(type == XJ_SEND_MESSAGE)
00623             shm_free(jsmsg->msg.s);
00624          shm_free(jsmsg);
00625          goto error;
00626       }
00627       strncpy(jsmsg->to.s, dst.s, jsmsg->to.len);
00628    }
00629    else
00630    {
00631       jsmsg->to.len = 0;
00632       jsmsg->to.s   = 0;
00633    }
00634 
00635    jsmsg->jkey = p;
00636    jsmsg->type = type;
00637    //jsmsg->jkey->hash = jkey.hash;
00638 
00639    LM_DBG("sending <%p> to worker through <%d>\n", jsmsg, pipe);
00640    // sending the SHM pointer of SIP message to the worker
00641    fl = write(pipe, &jsmsg, sizeof(jsmsg));
00642    if(fl != sizeof(jsmsg))
00643    {
00644       LM_ERR("failed to write to worker pipe!\n");
00645       if(type == XJ_SEND_MESSAGE)
00646          shm_free(jsmsg->msg.s);
00647       shm_free(jsmsg->to.s);
00648       shm_free(jsmsg);
00649       goto error;
00650    }
00651    
00652    return 1;
00653 error:
00654    return -1;
00655 }
00656 
00657 /**
00658  * destroy function of module
00659  */
00660 void destroy(void)
00661 {
00662    int i;
00663 #ifdef XJ_EXTRA_DEBUG
00664    LM_DBG("unloading module ...\n");
00665 #endif
00666    if(pipes)
00667    { // close the pipes
00668       for(i = 0; i < nrw; i++)
00669       {
00670          if(pipes[i])
00671          {
00672             close(pipes[i][0]);
00673             close(pipes[i][1]);
00674          }
00675          pkg_free(pipes[i]);
00676       }
00677       pkg_free(pipes);
00678    }
00679    // cleaning MySQL connections
00680    if(db_con != NULL)
00681    {
00682       for(i = 0; i<nrw; i++)
00683          jabber_dbf.close(db_con[i]);
00684       shm_free(db_con);
00685    }
00686          
00687    xj_wlist_free(jwl);
00688    LM_DBG("unloaded ...\n");
00689 }
00690 
00691 /**
00692  * register a watcher function for a Jabber user' presence
00693  */
00694 void xj_register_watcher(str *from, str *to, void *cbf, void *pp)
00695 {
00696    xj_sipmsg jsmsg = NULL;
00697    t_xj_jkey jkey, *jp;
00698    int pipe, fl;
00699    str from_uri, to_uri;
00700 
00701    if(!to || !from || !cbf)
00702       return;
00703 
00704 #ifdef XJ_EXTRA_DEBUG
00705    LM_DBG("from=[%.*s] to=[%.*s]\n", from->len,
00706        from->s, to->len, to->s);
00707 #endif
00708    from_uri.s = from->s;
00709    from_uri.len = from->len;
00710    if(xj_extract_aor(&from_uri, 0))
00711    {
00712       LM_ERR("cannot get AoR from FROM header\n");
00713       goto error;
00714    }
00715 
00716    jkey.hash = xj_get_hash(&from_uri, NULL);
00717    jkey.id = &from_uri;
00718 
00719    if((pipe = xj_wlist_get(jwl, &jkey, &jp)) < 0)
00720    {
00721       LM_DBG("cannot find pipe of the worker!\n");
00722       goto error;
00723    }
00724    
00725    //putting the SIP message parts in share memory to be accessible by workers
00726    jsmsg = (xj_sipmsg)shm_malloc(sizeof(t_xj_sipmsg));
00727    memset(jsmsg, 0, sizeof(t_xj_sipmsg));
00728    if(jsmsg == NULL)
00729       goto error;
00730    
00731    jsmsg->msg.len = 0;
00732    jsmsg->msg.s = NULL;
00733    
00734    to_uri.s = to->s;
00735    to_uri.len = to->len;
00736    /** skip 'sip:' and parameters in destination address */
00737    if(xj_extract_aor(&to_uri, 1))
00738    {
00739       LM_ERR("cannot get AoR for destination\n");
00740       goto error;
00741    }
00742 #ifdef XJ_EXTRA_DEBUG
00743    LM_DBG("destination after correction [%.*s].\n", to_uri.len, to_uri.s);
00744 #endif
00745 
00746    jsmsg->to.len = to_uri.len;
00747    if((jsmsg->to.s = (char*)shm_malloc(jsmsg->to.len+1)) == NULL)
00748    {
00749       if(jsmsg->msg.s)
00750          shm_free(jsmsg->msg.s);
00751       shm_free(jsmsg);
00752       goto error;
00753    }
00754    strncpy(jsmsg->to.s, to_uri.s, jsmsg->to.len);
00755    jsmsg->to.s[jsmsg->to.len] = '\0';
00756 
00757    jsmsg->jkey = jp;
00758    jsmsg->type = XJ_REG_WATCHER;
00759    //jsmsg->jkey->hash = jkey.hash;
00760    
00761    jsmsg->cbf = (pa_callback_f)cbf;
00762    jsmsg->p = pp;
00763 
00764 #ifdef XJ_EXTRA_DEBUG
00765    LM_DBG("sending <%p> to worker through <%d>\n", jsmsg, pipe);
00766 #endif
00767    // sending the SHM pointer of SIP message to the worker
00768    fl = write(pipe, &jsmsg, sizeof(jsmsg));
00769    if(fl != sizeof(jsmsg))
00770    {
00771       LM_ERR("failed to write to worker pipe!\n");
00772       if(jsmsg->msg.s)
00773          shm_free(jsmsg->msg.s);
00774       shm_free(jsmsg->to.s);
00775       shm_free(jsmsg);
00776       goto error;
00777    }
00778    
00779  error:
00780    return;
00781 }
00782 
00783 /**
00784  * unregister a watcher for a Jabber user' presence
00785  */
00786 void xj_unregister_watcher(str *from, str *to, void *cbf, void *pp)
00787 {
00788    if(!to || !from)
00789       return;
00790 }
00791 
00792 /**
00793  * check if all SER2Jab workers are still alive
00794  * - if not, try to launch new ones
00795  */
00796 void xjab_check_workers(int mpid)
00797 {
00798    int i, n, stat;
00799    //LM_DBG("time=%d\n", get_ticks());
00800    if(!jwl || jwl->len <= 0)
00801       return;
00802    for(i=0; i < jwl->len; i++)
00803    {
00804       if(jwl->workers[i].pid > 0)
00805       {
00806          stat = 0;
00807          n = waitpid(jwl->workers[i].pid, &stat, WNOHANG);
00808          if(n == 0 || n!=jwl->workers[i].pid)
00809             continue;
00810       
00811          LM_ERR("worker[%d][pid=%d] has exited - status=%d err=%d"
00812                "errno=%d\n", i, jwl->workers[i].pid, stat, n, errno);
00813          xj_wlist_clean_jobs(jwl, i, 1);
00814          xj_wlist_set_pid(jwl, -1, i);
00815       }
00816       
00817 #ifdef XJ_EXTRA_DEBUG
00818       LM_DBG("create a new worker[%d]\n", i);
00819 #endif
00820       if ( (stat=fork())<0 )
00821       {
00822 #ifdef XJ_EXTRA_DEBUG
00823          LM_DBG("cannot launch new worker[%d]\n", i);
00824 #endif
00825          LM_ERR("worker[%d] lost forever \n", i);
00826          return;
00827       }
00828       if (stat == 0)
00829       {
00830          if(xj_wlist_set_pid(jwl, getpid(), i) < 0)
00831          {
00832             LM_ERR("failed to set new worker's pid - w[%d]\n", i);
00833             return;
00834          }
00835          xj_worker_process(jwl,jaddress,jport,priority, i,
00836                db_con[i], &jabber_dbf);
00837          exit(0);
00838       }
00839    }        
00840 }
00841 
00842 #ifdef HAVE_IHTTP
00843 /**
00844  * Module's information retrieval - function to use with iHttp module
00845  *
00846  */ 
00847 int xjab_mod_info(ih_req_p _irp, void *_p, char *_bb, int *_bl, 
00848       char *_hb, int *_hl)
00849 {
00850    if(!_irp || !_bb || !_bl || *_bl <= 0 || !_hb || !_hl || *_hl <= 0)
00851       return -1;
00852    *_hl = 0;
00853    *_hb = 0;
00854    
00855    strcpy(_bb, "<h4>SER2Jabber Gateway</h4>");
00856    strcat(_bb, "<br>Module parameters:<br>");
00857    strcat(_bb, "<br> -- db table = ");
00858    strcat(_bb, db_table);
00859    strcat(_bb, "<br> -- workers = ");
00860    strcat(_bb, int2str(nrw, NULL));
00861    strcat(_bb, "<br> -- max jobs per worker = ");
00862    strcat(_bb, int2str(max_jobs, NULL));
00863 
00864    strcat(_bb, "<br> -- jabber server address = ");
00865    strcat(_bb, jaddress);
00866    strcat(_bb, "<br> -- jabber server port = ");
00867    strcat(_bb, int2str(jport, NULL));
00868 
00869    strcat(_bb, "<br> -- aliases = ");
00870    strcat(_bb, (jaliases)?jaliases:"NULL");
00871    strcat(_bb, "<br> -- jabber domain = ");
00872    strcat(_bb, (jdomain)?jdomain:"NULL");
00873    strcat(_bb, "<br> -- proxy address = ");
00874    strcat(_bb, (proxy)?proxy:"NULL");
00875 
00876    strcat(_bb, "<br> -- delay time = ");
00877    strcat(_bb, int2str(delay_time, NULL));
00878    strcat(_bb, "<br> -- sleep time = ");
00879    strcat(_bb, int2str(sleep_time, NULL));
00880    strcat(_bb, "<br> -- cache time = ");
00881    strcat(_bb, int2str(cache_time, NULL));
00882    strcat(_bb, "<br> -- check time = ");
00883    strcat(_bb, int2str(check_time, NULL));
00884    
00885    *_bl = strlen(_bb);
00886 
00887    return 0;
00888 }
00889 
00890 /**
00891  * SER2Jab connection management - function to use with iHttp module
00892  * - be aware of who is able to use the ihttp because he can close any 
00893  *   open connection between SER and Jabber server
00894  */ 
00895 int xjab_connections(ih_req_p _irp, void *_p, char *_bb, int *_bl, 
00896       char *_hb, int *_hl)
00897 {
00898    t_xj_jkey jkey, *p;
00899    str _u;
00900    ih_param_p _ipp = NULL;
00901    int idx, i, maxcount;
00902    char *cp;
00903 
00904    if(!_irp || !_bb || !_bl || *_bl <= 0 || !_hb || !_hl || *_hl <= 0)
00905       return -1;
00906    
00907    *_hl = 0;
00908    *_hb = 0;
00909    idx = -1;
00910    strcpy(_bb, "<h4>Active XMPP connections</h4>");
00911    
00912    if(_irp->params)
00913    {
00914       strcat(_bb, "<br><b>Close action is alpha release!</b><br>");
00915       _ipp = _irp->params;
00916       i = 0;
00917       while(_ipp)
00918       {
00919          switch(_ipp->name[0])
00920          {
00921             case 'w':
00922                idx = 0;
00923                cp = _ipp->value;
00924                while(*cp && *cp>='0' && *cp<='9')
00925                {
00926                   idx = idx*10 + *cp-'0';
00927                   cp++;
00928                }
00929                i++;
00930             break;
00931             case 'u':
00932                _u.s = _ipp->value;
00933                _u.len = strlen(_ipp->value);
00934                jkey.id = &_u;
00935                i++;
00936             break;
00937             case 'i':
00938                jkey.hash = 0;
00939                cp = _ipp->value;
00940                while(*cp && *cp>='0' && *cp<='9')
00941                {
00942                   jkey.hash = jkey.hash*10 + *cp-'0';
00943                   cp++;
00944                }
00945                i++;
00946             break;
00947             
00948          }
00949          _ipp = _ipp->next;
00950       }
00951       if(i!=3 || idx < 0 || idx >= jwl->len)
00952       {
00953          strcat(_bb, "<br><b><i>Bad parameters!</i></b>\n");
00954       }
00955       else
00956       {
00957          strcat(_bb, "<br><b><i>The connection of [");
00958          strcat(_bb, _u.s);
00959 
00960          if(xj_wlist_set_flag(jwl, &jkey, XJ_FLAG_CLOSE) < 0)
00961             strcat(_bb, "] does not exist!</i></b>\n");
00962          else
00963             strcat(_bb, "] was scheduled for closing!</i></b>\n");
00964       }
00965       *_bl = strlen(_bb);
00966 
00967       return 0;
00968    }
00969    
00970    if(jwl!=NULL && jwl->len > 0 && jwl->workers!=NULL)
00971    {
00972       for(idx=0; idx<jwl->len; idx++)
00973       {
00974          strcat(_bb, "<br><b><i>Worker[");
00975          strcat(_bb, int2str(idx, NULL));
00976          strcat(_bb, "]</i></b> &nbsp;&nbsp;pid=");
00977          strcat(_bb, int2str(jwl->workers[idx].pid, NULL));
00978          strcat(_bb, " &nbsp;&nbsp;nr of jobs=");
00979          strcat(_bb, int2str(jwl->workers[idx].nr, NULL));
00980          if(!jwl->workers[idx].sip_ids)
00981             continue;
00982          lock_set_get(jwl->sems, idx);
00983          maxcount = count234(jwl->workers[idx].sip_ids);
00984          for (i = 0; i < maxcount; i++) 
00985          {
00986             p = (xj_jkey)index234(jwl->workers[idx].sip_ids, i);
00987             if(p == NULL)
00988                continue;
00989             strcat(_bb, "<br>&nbsp;&nbsp;&nbsp;");
00990             strcat(_bb, int2str(i, NULL));
00991             strcat(_bb, ".&nbsp;&nbsp;&nbsp;");
00992             strcat(_bb, "<a href=\"xjabc?w=");
00993             strcat(_bb, int2str(idx, NULL));
00994             strcat(_bb, "&i=");
00995             strcat(_bb, int2str(p->hash, NULL));
00996             strcat(_bb, "&u=");
00997             strncat(_bb, p->id->s, p->id->len);
00998             strcat(_bb, "\">close</a>");
00999             strcat(_bb, "&nbsp;&nbsp;&nbsp;");
01000             strcat(_bb, int2str(p->hash, NULL));
01001             strcat(_bb, "&nbsp;&nbsp;&nbsp;");
01002             strncat(_bb, p->id->s, p->id->len);
01003          }
01004          lock_set_release(jwl->sems, idx);
01005       }
01006    }
01007    
01008    *_bl = strlen(_bb);
01009 
01010    return 0;
01011 }
01012 
01013 #endif // HAVE_IHTTP

Generated on Wed May 23 08:00:57 2012 for Kamailio - The Open Source SIP Server by  doxygen 1.5.6