xjab_worker.c

Go to the documentation of this file.
00001 /*
00002  * $Id: xjab_worker.c 4518 2008-07-28 15:39:28Z henningw $
00003  *
00004  * eXtended JABber module - worker implementation
00005  *
00006  * Copyright (C) 2001-2003 FhG Fokus
00007  *
00008  * This file is part of Kamailio, a free SIP server.
00009  *
00010  * Kamailio is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2 of the License, or
00013  * (at your option) any later version
00014  *
00015  * Kamailio is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  * GNU General Public License for more details.
00019  *
00020  * You should have received a copy of the GNU General Public License 
00021  * along with this program; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023  *
00024  * History
00025  * -------
00026  * 2003-01-20  xj_worker_precess function cleaning - some part of it moved to
00027  *             xj_worker_check_jcons function, (dcm)
00028  * 2003-02-28  send NOTIFYs even the connection is closed by user, (dcm)
00029  * 2003-03-11  major locking changes - uses locking.h, (andrei)
00030  * 2003-05-07  added new presence status - 'terminated' - when connection
00031  *             with Jabber server is lost or closed, (dcm)
00032  * 2003-05-09  added new presence status - 'refused' - when the presence
00033  *             subscription request is refused by target, (dcm)
00034  * 2003-05-09  new xj_worker_precess function cleaning - some part of it moved
00035  *             to xj_worker_check_qmsg and xj_worker_check_watcher functions,
00036  *             (dcm)
00037  * 2004-06-07  new DB api => xj_worker_process takes another parameter: dbf
00038  *              (andrei)
00039  */
00040 
00041 #include <string.h>
00042 #include <unistd.h>
00043 #include <stdio.h>
00044 #include <sys/time.h>
00045 #include <sys/types.h>
00046 #include <fcntl.h>
00047 #include <errno.h>
00048 #include <signal.h>
00049 
00050 #include "../../dprint.h"
00051 #include "../../timer.h"
00052 #include "../../mem/mem.h"
00053 #include "../../mem/shm_mem.h"
00054 #include "../tm/tm_load.h"
00055 
00056 #include "xjab_worker.h"
00057 #include "xjab_util.h"
00058 #include "xjab_jcon.h"
00059 #include "xjab_dmsg.h"
00060 #include "xode.h"
00061 #include "xjab_presence.h"
00062 
00063 #include "mdefines.h"
00064 
00065 #define XJAB_RESOURCE "serXjab"
00066 
00067 #define XJ_ADDRTR_NUL   0
00068 #define XJ_ADDRTR_S2J   1
00069 #define XJ_ADDRTR_J2S   2
00070 #define XJ_ADDRTR_CON   4
00071 
00072 #define XJ_MSG_POOL_SIZE   10
00073 
00074 // proxy address
00075 #define _PADDR(a) ((a)->aliases->proxy)
00076 
00077 /** TM bind */
00078 extern struct tm_binds tmb;
00079 
00080 /** debug info */
00081 int _xj_pid = 0;
00082 int main_loop = 1;
00083 
00084 /** **/
00085 extern char *registrar;
00086 static str jab_gw_name = {"jabber_gateway@127.0.0.1", 24};
00087 
00088 /**
00089  * address correction
00090  * alias A~B: flag == 0 => A->B, otherwise B->A
00091  */
00092 int xj_address_translation(str *src, str *dst, xj_jalias als, int flag)
00093 {
00094    char *p, *p0;
00095    int i, ll;
00096    
00097    if(!src || !dst || !src->s || !dst->s )
00098       return -1; 
00099    
00100    if(!als || !als->jdm || !als->jdm->s || als->jdm->len <= 0)
00101       goto done;
00102    
00103    dst->len = 0;
00104 #ifdef XJ_EXTRA_DEBUG
00105    LM_DBG("%d: - checking aliases\n", _xj_pid);
00106 #endif
00107    p = src->s;
00108 
00109    while(p<(src->s + src->len)   && *p != '@') 
00110       p++;
00111    if(*p != '@')
00112       goto done;
00113 
00114    p++;
00115    ll = src->s + src->len - p;
00116 
00117 #ifdef XJ_EXTRA_DEBUG
00118    LM_DBG("%d: - domain is [%.*s]\n",_xj_pid,ll,p);
00119 #endif
00120    
00121    /*** checking aliases */
00122    if(als->size > 0)
00123    {
00124       for(i=0; i<als->size; i++)
00125          if(als->a[i].len == ll && 
00126             !strncasecmp(p, als->a[i].s, als->a[i].len))
00127          {
00128             if(als->d[i])
00129             {
00130                if(flag & XJ_ADDRTR_S2J)
00131                {
00132                   strncpy(dst->s, src->s, src->len);
00133                   p0 = dst->s;
00134                   while(p0 < dst->s + (p-src->s)) 
00135                   {
00136                      if(*p0 == als->dlm)
00137                         *p0 = als->d[i];
00138                      p0++;
00139                   }
00140                   return 0;
00141                }
00142                if(flag & XJ_ADDRTR_J2S)
00143                {
00144                   strncpy(dst->s, src->s, src->len);
00145                   p0 = dst->s;
00146                   while(p0 < dst->s + (p-src->s)) 
00147                   {
00148                      if(*p0 == als->d[i])
00149                         *p0 = als->dlm;
00150                      p0++;
00151                   }                 
00152                   return 0;
00153                }
00154             }
00155             goto done;
00156          }
00157    }
00158 
00159 #ifdef XJ_EXTRA_DEBUG
00160    LM_DBG("%d: - doing address correction\n",
00161          _xj_pid);   
00162 #endif
00163    
00164    if(flag & XJ_ADDRTR_S2J)
00165    {
00166       if(als->jdm->len != ll || strncasecmp(p, als->jdm->s, als->jdm->len))
00167       {
00168          LM_DBG("%d: - wrong Jabber"
00169             " destination <%.*s>!\n", _xj_pid, src->len, src->s);
00170          return -1;
00171       }
00172       if(flag & XJ_ADDRTR_CON)
00173       {
00174 #ifdef XJ_EXTRA_DEBUG
00175          LM_DBG("%d: - that is for"
00176             " Jabber conference\n", _xj_pid);
00177 #endif
00178          p0 = p-1;
00179          while(p0 > src->s && *p0 != als->dlm)
00180             p0--;
00181          if(p0 <= src->s)
00182             return -1;
00183          p0--;
00184          while(p0 > src->s && *p0 != als->dlm)
00185             p0--;
00186          if(*p0 != als->dlm)
00187             return -1;
00188          dst->len = p - p0 - 2;
00189          strncpy(dst->s, p0+1, dst->len);
00190          dst->s[dst->len]=0;
00191          p = dst->s;
00192          while(p < (dst->s + dst->len) && *p!=als->dlm)
00193             p++;
00194          if(*p==als->dlm)
00195             *p = '@';
00196          return 0;
00197       }
00198 #ifdef XJ_EXTRA_DEBUG
00199       LM_DBG("%d: - that is for Jabber network\n", _xj_pid);
00200 #endif
00201       dst->len = p - src->s - 1;
00202       strncpy(dst->s, src->s, dst->len);
00203       dst->s[dst->len]=0;
00204       if((p = strchr(dst->s, als->dlm)) != NULL)
00205          *p = '@';
00206       else
00207       {
00208          LM_DBG("%d: - wrong Jabber"
00209             " destination <%.*s>!!!\n", _xj_pid, src->len, src->s);
00210          return -1;
00211       }
00212       return 0;
00213    }
00214    if(flag & XJ_ADDRTR_J2S)
00215    {
00216       *(p-1) = als->dlm;
00217       p0 = src->s + src->len;
00218       while(p0 > p)
00219       {
00220          if(*p0 == '/')
00221          {
00222             src->len = p0 - src->s;
00223             *p0 = 0;
00224          }
00225          p0--;
00226       }
00227       strncpy(dst->s, src->s, src->len);
00228       dst->s[src->len] = '@';
00229       dst->s[src->len+1] = 0;
00230       strncat(dst->s, als->jdm->s, als->jdm->len);
00231       dst->len = strlen(dst->s);
00232       return 0;
00233    }
00234 
00235 done:
00236    dst->s = src->s;
00237    dst->len = src->len;
00238    return 0;   
00239 }
00240 
00241 /**
00242  * worker implementation 
00243  * - jwl : pointer to the workers list
00244  * - jaddress : address of the jabber server
00245  * - jport : port of the jabber server
00246  * - rank : worker's rank
00247  * - db_con : connection to database
00248  * - priority: jabber's priority
00249  *   dbf: database module callbacks structure
00250  * return : 0 on success or <0 on error
00251  */
00252 int xj_worker_process(xj_wlist jwl, char* jaddress, int jport, char* priority,
00253       int rank, db_con_t* db_con, db_func_t* dbf)
00254 {
00255    int pipe, ret, i, pos, maxfd, flag;
00256    xj_jcon_pool jcp;
00257    struct timeval tmv;
00258    fd_set set, mset;
00259    xj_sipmsg jsmsg;
00260    str sto;
00261    xj_jcon jbc = NULL;
00262    xj_jconf jcf = NULL;
00263    char *p, buff[1024], recv_buff[4096];
00264    int flags, nr, ltime = 0;
00265 
00266    static str tmp1 = str_init("sip_id");
00267    static str tmp2 = str_init("type");
00268    static str tmp3 = str_init("jab_id");
00269    static str tmp4 = str_init("jab_passwd");
00270    
00271    db_key_t keys[] = {&tmp1, &tmp2};
00272    db_val_t vals[2];
00273    db_key_t col[] = {&tmp3, &tmp4};
00274    db_res_t* res = NULL;
00275 
00276    vals[0].type=DB_STRING;
00277    vals[0].nul=0;
00278    vals[0].val.string_val=buff;
00279    vals[1].type=DB_INT;
00280    vals[1].nul=0;
00281    vals[1].val.int_val=0;
00282       
00283    _xj_pid = getpid();
00284    
00285    //signal(SIGTERM, xj_sig_handler);
00286    //signal(SIGINT, xj_sig_handler);
00287    //signal(SIGQUIT, xj_sig_handler);
00288    signal(SIGSEGV, xj_sig_handler);
00289 
00290    if(registrar)
00291    {
00292       jab_gw_name.s = registrar;
00293       jab_gw_name.len = strlen(registrar);
00294       if(registrar[0]== 's' && registrar[1]== 'i' &&
00295          registrar[2]== 'p' && registrar[3]== ':')
00296       {
00297          jab_gw_name.s += 4;
00298          jab_gw_name.len -= 4;
00299       }
00300    }
00301 
00302    if(!jwl || !jwl->aliases || !jwl->aliases->jdm 
00303          || !jaddress || rank >= jwl->len)
00304    {
00305       LM_DBG("[%d]:%d: exiting - wrong parameters\n", rank, _xj_pid);
00306       return -1;
00307    }
00308 
00309    pipe = jwl->workers[rank].rpipe;
00310    LM_DBG("[%d]:%d: started - pipe=<%d> : 1st message delay"
00311       " <%d>\n", rank, _xj_pid, pipe, jwl->delayt);
00312    if((jcp=xj_jcon_pool_init(jwl->maxj,XJ_MSG_POOL_SIZE,jwl->delayt))==NULL)
00313    {
00314       LM_DBG("cannot allocate the pool\n");
00315       return -1;
00316    }
00317 
00318    maxfd = pipe;
00319    tmv.tv_sec = jwl->sleept;
00320    tmv.tv_usec = 0;
00321 
00322    FD_ZERO(&set);
00323    FD_SET(pipe, &set);
00324    while(main_loop)
00325    {
00326       mset = set;
00327 
00328       tmv.tv_sec = (jcp->jmqueue.size == 0)?jwl->sleept:1;
00329 #ifdef XJ_EXTRA_DEBUG
00330       //LM_DBG("XJAB:xj_worker[%d]:%d: select waiting %ds - queue=%d\n",rank,
00331       //    _xj_pid, (int)tmv.tv_sec, jcp->jmqueue.size);
00332 #endif
00333       tmv.tv_usec = 0;
00334 
00335       ret = select(maxfd+1, &mset, NULL, NULL, &tmv);
00336       
00337       // check the msg queue
00338       xj_worker_check_qmsg(jwl, jcp);
00339       
00340       if(ret <= 0)
00341          goto step_x;
00342 
00343 #ifdef XJ_EXTRA_DEBUG
00344       LM_DBG("%d: something is coming\n", _xj_pid);
00345 #endif
00346       if(!FD_ISSET(pipe, &mset))
00347          goto step_y;
00348       
00349       if(read(pipe, &jsmsg, sizeof(jsmsg)) < (int)sizeof(jsmsg))
00350       {
00351          LM_DBG("%d: BROKEN PIPE - exiting\n", _xj_pid);
00352          break;
00353       }
00354 
00355 #ifdef XJ_EXTRA_DEBUG
00356       LM_DBG("%d: job <%p> from SER\n", _xj_pid, jsmsg);
00357 #endif
00358 
00359       if(jsmsg == NULL || jsmsg->jkey==NULL || jsmsg->jkey->id==NULL)
00360          goto step_w;
00361 
00362       strncpy(buff, jsmsg->jkey->id->s, jsmsg->jkey->id->len);
00363       buff[jsmsg->jkey->id->len] = 0;
00364 
00365       jbc = xj_jcon_pool_get(jcp, jsmsg->jkey);
00366       
00367       switch(jsmsg->type)
00368       {
00369          case XJ_SEND_MESSAGE:
00370             if(!xj_jconf_check_addr(&jsmsg->to, jwl->aliases->dlm) &&
00371             (!jbc||!xj_jcon_get_jconf(jbc,&jsmsg->to,jwl->aliases->dlm)))
00372             {
00373                xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to,
00374                   XJ_DMSG_ERR_NOTJCONF, NULL);
00375                goto step_w;
00376             }
00377             break;
00378          case XJ_REG_WATCHER:
00379          case XJ_JOIN_JCONF:
00380          case XJ_GO_ONLINE:
00381             break;
00382          case XJ_EXIT_JCONF:
00383             if(jbc == NULL)
00384                goto step_w;
00385             // close the conference session here
00386             if(jbc->nrjconf <= 0)
00387                goto step_w;
00388             if(!xj_jconf_check_addr(&jsmsg->to, jwl->aliases->dlm))
00389                xj_jcon_del_jconf(jbc, &jsmsg->to, jwl->aliases->dlm,
00390                   XJ_JCMD_UNSUBSCRIBE);
00391             xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to,
00392                XJ_DMSG_INF_JCONFEXIT, NULL);
00393             goto step_w;
00394          case XJ_GO_OFFLINE:
00395             if(jbc != NULL)
00396                jbc->expire = ltime = -1;
00397             goto step_w;
00398          case XJ_DEL_WATCHER:
00399          default:
00400             goto step_w;
00401       }
00402       
00403       if(jbc != NULL)
00404       {
00405 #ifdef XJ_EXTRA_DEBUG
00406          LM_DBG("%d: connection already exists"
00407             " for <%s> ...\n", _xj_pid, buff);
00408 #endif
00409          xj_jcon_update(jbc, jwl->cachet);
00410          goto step_z;
00411       }
00412       
00413       // NO OPEN CONNECTION FOR THIS SIP ID
00414 #ifdef XJ_EXTRA_DEBUG
00415       LM_DBG("%d: new connection for <%s>.\n", _xj_pid, buff);
00416 #endif      
00417       if(dbf->query(db_con, keys, 0, vals, col, 2, 2, NULL, &res) != 0 ||
00418          RES_ROW_N(res) <= 0)
00419       {
00420 #ifdef XJ_EXTRA_DEBUG
00421          LM_DBG("%d: no database result when looking"
00422             " for associated Jabber account\n", _xj_pid);
00423 #endif
00424          xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, 
00425             XJ_DMSG_ERR_JGWFORB, NULL);
00426          
00427          goto step_v;
00428       }
00429       
00430       jbc = xj_jcon_init(jaddress, jport);
00431       
00432       if(xj_jcon_connect(jbc))
00433       {
00434          LM_DBG("%d: Cannot connect"
00435             " to the Jabber server ...\n", _xj_pid);
00436          xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, 
00437             XJ_DMSG_ERR_NOJSRV, NULL);
00438 
00439          goto step_v;
00440       }
00441       
00442 #ifdef XJ_EXTRA_DEBUG
00443       LM_DBG("auth to jabber as: [%s] / [xxx]\n",
00444          (char*)(ROW_VALUES(RES_ROWS(res))[0].val.string_val));
00445 //       (char*)(ROW_VALUES(RES_ROWS(res))[1].val.string_val));
00446 #endif      
00447       if(xj_jcon_user_auth(jbc,
00448          (char*)(ROW_VALUES(RES_ROWS(res))[0].val.string_val),
00449          (char*)(ROW_VALUES(RES_ROWS(res))[1].val.string_val),
00450          XJAB_RESOURCE) < 0)
00451       {
00452          LM_DBG("athentication to the Jabber server failed ...\n");
00453          xj_jcon_disconnect(jbc);
00454          
00455          xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, 
00456                XJ_DMSG_ERR_JAUTH, NULL);
00457          
00458          xj_jcon_free(jbc);
00459          goto step_v;
00460       }
00461       
00462       if(xj_jcon_set_attrs(jbc, jsmsg->jkey, jwl->cachet, jwl->delayt)
00463          || xj_jcon_pool_add(jcp, jbc))
00464       {
00465          LM_DBG("keeping connection to Jabber server"
00466             " failed! Not enough memory ...\n");
00467          xj_jcon_disconnect(jbc);
00468          xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to,  
00469                XJ_DMSG_ERR_JGWFULL, NULL);
00470          xj_jcon_free(jbc);
00471          goto step_v;
00472       }
00473                         
00474       /** add socket descriptor to select */
00475 #ifdef XJ_EXTRA_DEBUG
00476       LM_DBG("add connection on <%d> \n",jbc->sock);
00477 #endif
00478       if(jbc->sock > maxfd)
00479          maxfd = jbc->sock;
00480       FD_SET(jbc->sock, &set);
00481                               
00482       xj_jcon_get_roster(jbc);
00483       xj_jcon_send_presence(jbc, NULL, NULL, "Online", priority);
00484       
00485       /** wait for a while - the worker is tired */
00486       //sleep(3);
00487       
00488       if ((res != NULL) && (dbf->free_result(db_con,res) < 0))
00489       {
00490          LM_DBG("failed to free SQL result - worker terminated\n");
00491          return -1;
00492       }
00493       else
00494          res = NULL;
00495 
00496 step_z:
00497       if(jsmsg->type == XJ_GO_ONLINE)
00498          goto step_w;
00499       
00500       if(jsmsg->type == XJ_REG_WATCHER)
00501       { // update or register a presence watcher
00502          xj_worker_check_watcher(jwl, jcp, jbc, jsmsg);
00503          goto step_w;
00504       }
00505       
00506       flag = 0;
00507       if(!xj_jconf_check_addr(&jsmsg->to, jwl->aliases->dlm))
00508       {
00509          if((jcf = xj_jcon_get_jconf(jbc, &jsmsg->to, jwl->aliases->dlm))
00510                != NULL)
00511          {
00512             if((jsmsg->type == XJ_JOIN_JCONF) &&
00513                !(jcf->status & XJ_JCONF_READY || 
00514                   jcf->status & XJ_JCONF_WAITING))
00515             {
00516                if(!xj_jcon_jconf_presence(jbc,jcf,NULL,"online"))
00517                   jcf->status = XJ_JCONF_WAITING;
00518                else
00519                {
00520                   // unable to join the conference 
00521                   // --- send back to SIP user a msg
00522                   xj_send_sip_msgz(_PADDR(jwl),jsmsg->jkey->id,&jsmsg->to,
00523                      XJ_DMSG_ERR_JOINJCONF, &jbc->jkey->flag);
00524                   goto step_w;
00525                }
00526             }
00527             flag |= XJ_ADDRTR_CON;
00528          }
00529          else
00530          {
00531             // unable to get the conference 
00532             // --- send back to SIP user a msg
00533             xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to,
00534                   XJ_DMSG_ERR_NEWJCONF, &jbc->jkey->flag);
00535             goto step_w;
00536          }
00537       }
00538       if(jsmsg->type != XJ_SEND_MESSAGE)
00539          goto step_w;
00540       
00541       // here will come only XJ_SEND_MESSAGE
00542       switch(xj_jcon_is_ready(jbc,jsmsg->to.s,jsmsg->to.len,jwl->aliases->dlm))
00543       {
00544          case 0:
00545 #ifdef XJ_EXTRA_DEBUG
00546             LM_DBG("sending the message to Jabber network ...\n");
00547 #endif
00548             /*** address correction ***/
00549             sto.s = buff; 
00550             sto.len = 0;
00551             flag |= XJ_ADDRTR_S2J;
00552             if(xj_address_translation(&jsmsg->to, &sto, jwl->aliases, 
00553                      flag) == 0)
00554             {
00555                if(xj_jcon_send_msg(jbc, sto.s, sto.len,
00556                   jsmsg->msg.s, jsmsg->msg.len,
00557                   (flag&XJ_ADDRTR_CON)?XJ_JMSG_GROUPCHAT:XJ_JMSG_CHAT)<0)
00558                      
00559                   xj_send_sip_msgz(_PADDR(jwl),jsmsg->jkey->id,&jsmsg->to,
00560                      XJ_DMSG_ERR_SENDJMSG, &jbc->jkey->flag);
00561             }
00562             else
00563                LM_ERR("sending as Jabber message ...\n");
00564                   
00565             goto step_w;
00566       
00567          case 1:
00568 #ifdef XJ_EXTRA_DEBUG
00569             LM_DBG("scheduling the message.\n");
00570 #endif
00571             if(xj_jcon_pool_add_jmsg(jcp, jsmsg, jbc) < 0)
00572             {
00573                LM_DBG("scheduling the message FAILED."
00574                      "Message was dropped.\n");
00575                xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to,
00576                   XJ_DMSG_ERR_STOREJMSG, &jbc->jkey->flag);
00577                goto step_w;
00578             }
00579             else // skip freeing the SIP message - now is in queue
00580                goto step_y;
00581    
00582          case 2:
00583             xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to,
00584                   XJ_DMSG_ERR_NOREGIM, &jbc->jkey->flag);
00585             goto step_w;
00586          case 3: // not joined to Jabber conference
00587             xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to,
00588                   XJ_DMSG_ERR_NOTJCONF, &jbc->jkey->flag);
00589             goto step_w;
00590             
00591          default:
00592             xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to,
00593                   XJ_DMSG_ERR_SENDJMSG, &jbc->jkey->flag);
00594             goto step_w;
00595       }
00596 
00597 step_v: // error connecting to Jabber server
00598       
00599       // cleaning jab_wlist
00600       xj_wlist_del(jwl, jsmsg->jkey, _xj_pid);
00601 
00602       // cleaning db_query
00603       if ((res != NULL) && (dbf->free_result(db_con,res) < 0))
00604       {
00605          LM_DBG("failed to free the SQL result - worker terminated\n");
00606          return -1;
00607       }
00608       else
00609          res = NULL;
00610 
00611 step_w:
00612       if(jsmsg!=NULL)
00613       {
00614          xj_sipmsg_free(jsmsg);
00615          jsmsg = NULL;
00616       }        
00617 
00618 step_y:         
00619       // check for new message from ... JABBER
00620       for(i = 0; i < jcp->len && main_loop; i++)
00621       {
00622          if(jcp->ojc[i] == NULL)
00623             continue;
00624 #ifdef XJ_EXTRA_DEBUG
00625          LM_DBG("checking socket <%d> ...\n", jcp->ojc[i]->sock);
00626 #endif
00627          if(!FD_ISSET(jcp->ojc[i]->sock, &mset))
00628             continue;
00629          pos = nr = 0;
00630          do
00631          {
00632             p = recv_buff;
00633             if(pos != 0)
00634             {
00635                while(pos < nr)
00636                {
00637                   *p = recv_buff[pos];
00638                   pos++;
00639                   p++;
00640                }
00641                *p = 0;
00642                /**
00643                 * flush out the socket - set it to nonblocking 
00644                 */
00645                flags = fcntl(jcp->ojc[i]->sock, F_GETFL, 0);
00646                if(flags!=-1 && !(flags & O_NONBLOCK))
00647                   {
00648                   /* set NONBLOCK bit to enable non-blocking */
00649                   fcntl(jcp->ojc[i]->sock, F_SETFL, flags|O_NONBLOCK);
00650                   }
00651             }
00652             
00653             if((nr = read(jcp->ojc[i]->sock, p, 
00654                   sizeof(recv_buff)-(p-recv_buff))) == 0
00655                ||(nr < 0 && errno != EAGAIN))
00656             {
00657                LM_DBG("connection to jabber lost on socket <%d> ...\n",
00658                   jcp->ojc[i]->sock);
00659                xj_send_sip_msgz(_PADDR(jwl), jcp->ojc[i]->jkey->id,
00660                   &jab_gw_name,XJ_DMSG_ERR_DISCONNECTED,&jbc->jkey->flag);
00661                // make sure that will ckeck expired connections
00662                ltime = jcp->ojc[i]->expire = -1;
00663                FD_CLR(jcp->ojc[i]->sock, &set);
00664                goto step_xx;
00665             }
00666 #ifdef XJ_EXTRA_DEBUG
00667             LM_DBG("received: %dbytes Err:%d/EA:%d\n", nr, errno, EAGAIN);
00668 #endif
00669             xj_jcon_update(jcp->ojc[i], jwl->cachet);
00670 
00671             if(nr>0)
00672                p[nr] = 0;
00673             nr = strlen(recv_buff);
00674             pos = 0;
00675 #ifdef XJ_EXTRA_DEBUG
00676             LM_DBG("JMSG START ----------\n%.*s\n"
00677                " JABBER: JMSGL:%d END ----------\n", nr, recv_buff, nr);
00678 #endif
00679          } while(xj_manage_jab(recv_buff, nr, &pos, jwl->aliases,
00680                      jcp->ojc[i]) == 9 && main_loop);
00681    
00682          /**
00683           * flush out the socket - set it back to blocking 
00684           */
00685          flags = fcntl(jcp->ojc[i]->sock, F_GETFL, 0);
00686          if(flags!=-1 && (flags & O_NONBLOCK))
00687             {
00688             /* reset NONBLOCK bit to enable blocking */
00689             fcntl(jcp->ojc[i]->sock, F_SETFL, flags & ~O_NONBLOCK);
00690             }
00691 #ifdef XJ_EXTRA_DEBUG
00692          LM_DBG("msgs from socket <%d> parsed ...\n", jcp->ojc[i]->sock);  
00693 #endif
00694       } // end FOR(i = 0; i < jcp->len; i++)
00695 
00696 step_x:
00697       if(ret < 0)
00698       {
00699          LM_DBG("signal received!!!!!!!!\n");
00700          maxfd = pipe;
00701          FD_ZERO(&set);
00702          FD_SET(pipe, &set);
00703          for(i = 0; i < jcp->len; i++)
00704          {
00705             if(jcp->ojc[i] != NULL)
00706             {
00707                FD_SET(jcp->ojc[i]->sock, &set);
00708                if( jcp->ojc[i]->sock > maxfd )
00709                   maxfd = jcp->ojc[i]->sock;
00710             }
00711          }
00712       }
00713 step_xx:
00714       if(ltime < 0 || ltime + jwl->sleept <= get_ticks())
00715       {
00716          ltime = get_ticks();
00717 #ifdef XJ_EXTRA_DEBUG
00718          //LM_DBG("XJAB:xj_worker:%d: scanning for expired connection\n",
00719          // _xj_pid);
00720 #endif
00721          xj_worker_check_jcons(jwl, jcp, ltime, &set);
00722       }
00723    } // END while
00724 
00725    LM_DBG("cleaning procedure\n");
00726 
00727    return 0;
00728 } // end xj_worker_process
00729 
00730 
00731 /**
00732  * parse incoming message from Jabber server
00733  */
00734 int xj_manage_jab(char *buf, int len, int *pos, xj_jalias als, xj_jcon jbc)
00735 {
00736    int j, err=0;
00737    char *p, *to, *from, *msg, *type, *emsg, *ecode, lbuf[4096], fbuf[128];
00738    xj_jconf jcf = NULL;
00739    str ts, tf;
00740    xode x, y, z;
00741    str *sid;
00742    xj_pres_cell prc = NULL;
00743 
00744    if(!jbc)
00745       return -1;
00746 
00747    sid = jbc->jkey->id; 
00748    x = xode_from_strx(buf, len, &err, &j);
00749 #ifdef XJ_EXTRA_DEBUG
00750    LM_DBG("xode ret:%d pos:%d\n", err, j);
00751 #endif   
00752    if(err && pos != NULL)
00753       *pos= j;
00754    if(x == NULL)
00755       return -1;
00756    lbuf[0] = 0;
00757    ecode = NULL;
00758 
00759 /******************** XMPP 'MESSAGE' HANDLING **********************/
00760    
00761    if(!strncasecmp(xode_get_name(x), "message", 7))
00762    {
00763 #ifdef XJ_EXTRA_DEBUG
00764       LM_DBG("jabber [message] received\n");
00765 #endif
00766       if((to = xode_get_attrib(x, "to")) == NULL)
00767       {
00768 #ifdef XJ_EXTRA_DEBUG
00769          LM_DBG("missing 'to' attribute\n");
00770 #endif
00771          err = -1;
00772          goto ready;
00773       }
00774       if((from = xode_get_attrib(x, "from")) == NULL)
00775       {
00776 #ifdef XJ_EXTRA_DEBUG
00777          LM_DBG("missing 'from' attribute\n");
00778 #endif
00779          err = -1;
00780          goto ready;
00781       }
00782       if((y = xode_get_tag(x, "body")) == NULL
00783             || (msg = xode_get_data(y)) == NULL)
00784       {
00785 #ifdef XJ_EXTRA_DEBUG
00786          LM_DBG("missing 'body' of message\n");
00787 #endif
00788          err = -1;
00789          goto ready;
00790       }
00791       type = xode_get_attrib(x, "type");
00792       if(type != NULL && !strncasecmp(type, "error", 5))
00793       {
00794          if((y = xode_get_tag(x, "error")) == NULL
00795                || (emsg = xode_get_data(y)) == NULL)
00796             strcpy(lbuf, "{Error sending following message} - ");
00797          else
00798          {
00799             ecode = xode_get_attrib(y, "code");
00800             strcpy(lbuf, "{Error (");
00801             if(ecode != NULL)
00802             {
00803                strcat(lbuf, ecode);
00804                strcat(lbuf, " - ");
00805             }
00806             strcat(lbuf, emsg);
00807             strcat(lbuf, ") when trying to send following message}");
00808          }
00809 
00810       }
00811 
00812       // is from a conference?!?!
00813       if((jcf=xj_jcon_check_jconf(jbc, from))!=NULL)
00814       {
00815          if(lbuf[0] == 0)
00816          {
00817             p = from + strlen(from);
00818             while(p>from && *p != '/')
00819                p--;
00820             if(*p == '/')
00821             {
00822                if(jcf->nick.len>0 
00823                   && strlen(p+1) == jcf->nick.len
00824                   && !strncasecmp(p+1, jcf->nick.s, jcf->nick.len))
00825                {
00826 #ifdef XJ_EXTRA_DEBUG
00827                   LM_DBG("message sent by myself\n");
00828 #endif
00829                   goto ready;
00830                }
00831                lbuf[0] = '[';
00832                lbuf[1] = 0;
00833                strcat(lbuf, p+1);
00834                strcat(lbuf, "] ");
00835             }
00836          }
00837          else
00838          {
00839             jcf->status = XJ_JCONF_NULL;
00840             xj_jcon_jconf_presence(jbc,jcf,NULL,"online");
00841          }
00842          strcat(lbuf, msg);
00843          ts.s = lbuf;
00844          ts.len = strlen(lbuf);
00845    
00846          if(xj_send_sip_msg(als->proxy, sid, &jcf->uri, &ts,
00847                   &jbc->jkey->flag)<0)
00848             LM_ERR("sip message was not sent!\n");
00849 #ifdef XJ_EXTRA_DEBUG
00850          else
00851             LM_DBG("sip message was sent!\n");
00852 #endif
00853          goto ready;
00854       }
00855 
00856       strcat(lbuf, msg);
00857       ts.s = from;
00858       ts.len = strlen(from);
00859       tf.s = fbuf;
00860       tf.len = 0;
00861       if(xj_address_translation(&ts, &tf, als, XJ_ADDRTR_J2S) == 0)
00862       {
00863          ts.s = lbuf;
00864          ts.len = strlen(lbuf);
00865    
00866          if(xj_send_sip_msg(als->proxy, sid, &tf, &ts, &jbc->jkey->flag)<0)
00867             LM_ERR("sip message was not sent!\n");
00868 #ifdef XJ_EXTRA_DEBUG
00869          else
00870             LM_DBG("sip message was sent!\n");
00871 #endif
00872       }
00873       goto ready;
00874    }
00875 /*------------------- END 'MESSAGE' HANDLING ----------------------*/
00876    
00877 /******************** XMPP 'PRESENCE' HANDLING *********************/
00878    if(!strncasecmp(xode_get_name(x), "presence", 8))
00879    {
00880 #ifdef XJ_EXTRA_DEBUG
00881       LM_DBG("jabber [presence] received\n");
00882 #endif
00883       type = xode_get_attrib(x, "type");
00884       from = xode_get_attrib(x, "from");
00885       if(from == NULL)
00886          goto ready;
00887       ts.s = from;
00888       p = from;
00889       while(p<from + strlen(from) && *p != '/')
00890                p++;
00891       if(*p == '/')
00892          ts.len = p - from;
00893       else
00894          ts.len = strlen(from);
00895 
00896       if(type == NULL || !strncasecmp(type, "online", 6)
00897          || !strncasecmp(type, "available", 9))
00898       {
00899          if(strchr(from, '@') == NULL)
00900          {
00901             if(!strncasecmp(from, XJ_AIM_NAME, XJ_AIM_LEN))
00902             {
00903                jbc->ready |= XJ_NET_AIM;
00904 #ifdef XJ_EXTRA_DEBUG
00905                LM_DBG("AIM network ready\n");
00906 #endif
00907             }
00908             else if(!strncasecmp(from, XJ_ICQ_NAME, XJ_ICQ_LEN))
00909             {
00910                jbc->ready |= XJ_NET_ICQ;
00911 #ifdef XJ_EXTRA_DEBUG
00912                LM_DBG("ICQ network ready\n");
00913 #endif
00914             }
00915             else if(!strncasecmp(from, XJ_MSN_NAME, XJ_MSN_LEN))
00916             {
00917                jbc->ready |= XJ_NET_MSN;
00918 #ifdef XJ_EXTRA_DEBUG
00919                LM_DBG("MSN network ready\n");
00920 #endif
00921             }
00922             else if(!strncasecmp(from, XJ_YAH_NAME, XJ_YAH_LEN))
00923             {
00924                jbc->ready |= XJ_NET_YAH;
00925 #ifdef XJ_EXTRA_DEBUG
00926                LM_DBG("YAHOO network ready\n");
00927 #endif
00928             }
00929          }
00930          else if((jcf=xj_jcon_check_jconf(jbc, from))!=NULL)
00931          {
00932             jcf->status = XJ_JCONF_READY;
00933 #ifdef XJ_EXTRA_DEBUG
00934             LM_DBG(" %s conference ready\n", from);
00935 #endif
00936          }
00937          else
00938          {
00939 #ifdef XJ_EXTRA_DEBUG
00940             LM_DBG("user <%.*s> is online\n",ts.len,ts.s);
00941 #endif
00942             prc = xj_pres_list_check(jbc->plist, &ts);
00943             if(prc)
00944             {
00945                if(prc->state != XJ_PS_ONLINE)
00946                {
00947                   prc->state = XJ_PS_ONLINE;
00948                   goto call_pa_cbf;
00949                }
00950             }
00951             else
00952             {
00953 #ifdef XJ_EXTRA_DEBUG
00954                LM_DBG("user state received - creating"
00955                   " presence cell for [%.*s]\n", ts.len, ts.s);
00956 #endif
00957                prc = xj_pres_cell_new();
00958                if(prc == NULL)
00959                {
00960                   LM_DBG("cannot create presence"
00961                      " cell for [%s]\n", from);
00962                   goto ready;
00963                }
00964                if(xj_pres_cell_init(prc, &ts, NULL, NULL)<0)
00965                {
00966                   LM_DBG("cannot init presence"
00967                      " cell for [%s]\n", from);
00968                   xj_pres_cell_free(prc);
00969                   goto ready;
00970                }
00971                prc = xj_pres_list_add(jbc->plist, prc);
00972                if(prc)
00973                {
00974                   prc->state = XJ_PS_ONLINE;
00975                   goto call_pa_cbf;
00976                }
00977             }
00978          }
00979          goto ready;
00980       }
00981       
00982       if(strchr(from, '@') == NULL)
00983          goto ready;
00984    
00985       
00986       if(!strncasecmp(type, "error", 5))
00987       {
00988          if((jcf=xj_jcon_check_jconf(jbc, from))!=NULL)
00989          {
00990             tf.s = from;
00991             tf.len = strlen(from);
00992             if((y = xode_get_tag(x, "error")) == NULL)
00993                goto ready;
00994             if ((p = xode_get_attrib(y, "code")) != NULL
00995                   && atoi(p) == 409)
00996             {
00997                xj_send_sip_msgz(als->proxy, sid, &tf,
00998                      XJ_DMSG_ERR_JCONFNICK, &jbc->jkey->flag);
00999                goto ready;
01000             }
01001             xj_send_sip_msgz(als->proxy,sid,&tf,XJ_DMSG_ERR_JCONFREFUSED,
01002                   &jbc->jkey->flag);
01003          }
01004          goto ready;
01005       }
01006       if(type!=NULL && !strncasecmp(type, "subscribe", 9))
01007       {
01008          xj_jcon_send_presence(jbc, from, "subscribed", NULL, NULL);
01009          goto ready;
01010       }
01011 
01012       prc = xj_pres_list_check(jbc->plist, &ts);
01013       if(!prc)
01014          goto ready;
01015 
01016       if(!strncasecmp(type, "unavailable", 11))
01017       {
01018 #ifdef XJ_EXTRA_DEBUG
01019          LM_DBG("user <%s> is offline\n", from);
01020 #endif
01021          if(prc->state != XJ_PS_OFFLINE)
01022          {
01023             prc->state = XJ_PS_OFFLINE;
01024             goto call_pa_cbf;
01025          }
01026          goto ready;
01027       }
01028       
01029       if(!strncasecmp(type, "unsubscribed", 12))
01030       {
01031 #ifdef XJ_EXTRA_DEBUG
01032          LM_DBG("user <%s> does not allow to see his"
01033             " presence status\n", from);
01034 #endif
01035          if(prc->state != XJ_PS_REFUSED)
01036          {
01037             prc->state = XJ_PS_REFUSED;
01038             goto call_pa_cbf;
01039          }
01040       }
01041    
01042       // ignoring unknown types
01043       goto ready;
01044    }
01045 /*------------------- END XMPP 'PRESENCE' HANDLING ----------------*/
01046    
01047 /******************** XMPP 'IQ' HANDLING ***************************/
01048    if(!strncasecmp(xode_get_name(x), "iq", 2))
01049    {
01050 #ifdef XJ_EXTRA_DEBUG
01051       LM_DBG("jabber [iq] received\n");
01052 #endif
01053       if(!strncasecmp(xode_get_attrib(x, "type"), "result", 6))
01054       {
01055          if((y = xode_get_tag(x, "query?xmlns=jabber:iq:roster")) == NULL)
01056             goto ready;
01057          z = xode_get_firstchild(y);
01058          while(z)
01059          {
01060             if(!strncasecmp(xode_get_name(z), "item", 5)
01061                && (from = xode_get_attrib(z, "jid")) != NULL)
01062             {
01063                if(strchr(from, '@') == NULL)
01064                { // transports
01065                   if(!strncasecmp(from, XJ_AIM_NAME, XJ_AIM_LEN))
01066                   {
01067                      jbc->allowed |= XJ_NET_AIM;
01068 #ifdef XJ_EXTRA_DEBUG
01069                      LM_DBG("AIM network available\n");
01070 #endif
01071                   }
01072                   else if(!strncasecmp(from, XJ_ICQ_NAME, XJ_ICQ_LEN))
01073                   {
01074                      jbc->allowed |= XJ_NET_ICQ;
01075 #ifdef XJ_EXTRA_DEBUG
01076                      LM_DBG("ICQ network available\n");
01077 #endif
01078                   }
01079                   else if(!strncasecmp(from, XJ_MSN_NAME, XJ_MSN_LEN))
01080                   {
01081                      jbc->allowed |= XJ_NET_MSN;
01082 #ifdef XJ_EXTRA_DEBUG
01083                      LM_DBG("MSN network available\n");
01084 #endif
01085                   }
01086                   else if(!strncasecmp(from, XJ_YAH_NAME, XJ_YAH_LEN))
01087                   {
01088                      jbc->allowed |= XJ_NET_YAH;
01089 #ifdef XJ_EXTRA_DEBUG
01090                      LM_DBG("YAHOO network available\n");
01091 #endif
01092                   }
01093                   goto next_sibling;
01094                }
01095             }
01096 next_sibling:
01097             z = xode_get_nextsibling(z);
01098          }
01099       }
01100       
01101       goto ready;
01102    }
01103 /*------------------- END XMPP 'IQ' HANDLING ----------------------*/
01104 
01105 call_pa_cbf:
01106    if(prc && prc->cbf)
01107    {
01108       // call the PA callback function
01109       tf.s = fbuf;
01110       tf.len = 0;
01111       if(xj_address_translation(&ts,&tf,als,XJ_ADDRTR_J2S)==0)
01112       {
01113 #ifdef XJ_EXTRA_DEBUG
01114          LM_DBG("calling CBF(%.*s,%d)\n", tf.len, tf.s, prc->state);
01115 #endif
01116          (*(prc->cbf))(&tf, &tf, prc->state, prc->cbp);
01117       }
01118    }
01119 ready:
01120    xode_free(x);
01121    return err;
01122 }
01123 
01124 /**
01125  *
01126  */
01127 void xj_sig_handler(int s) 
01128 {
01129    //signal(SIGTERM, xj_sig_handler);
01130    //signal(SIGINT, xj_sig_handler);
01131    //signal(SIGQUIT, xj_sig_handler);
01132    signal(SIGSEGV, xj_sig_handler);
01133    main_loop = 0;
01134    LM_DBG("%d: SIGNAL received=%d\n **************", _xj_pid, s);
01135 }
01136 
01137 /*****************************     ****************************************/
01138 
01139 /**
01140  * send a SIP MESSAGE message
01141  * - to : destination
01142  * - from : origin
01143  * - contact : contact header
01144  * - msg : body of the message
01145  * return : 0 on success or <0 on error
01146  */
01147 int xj_send_sip_msg(str *proxy, str *to, str *from, str *msg, int *cbp)
01148 {
01149    str  msg_type = { "MESSAGE", 7};
01150    char buf[512];
01151    str  tfrom;
01152    str  str_hdr;
01153    char buf1[1024];
01154 
01155    if( !to || !to->s || to->len <= 0 
01156          || !from || !from->s || from->len <= 0 
01157          || !msg || !msg->s || msg->len <= 0
01158          || (cbp && *cbp!=0) )
01159       return -1;
01160 
01161    // from correction
01162    tfrom.len = 0;
01163    strncpy(buf+tfrom.len, "<sip:", 5);
01164    tfrom.len += 5;
01165    strncpy(buf+tfrom.len, from->s, from->len);
01166    tfrom.len += from->len;
01167    buf[tfrom.len++] = '>';
01168       
01169    tfrom.s = buf;
01170    
01171    // building Contact and Content-Type
01172    strcpy(buf1,"Content-Type: text/plain"CRLF"Contact: ");
01173    str_hdr.len = 24 + CRLF_LEN + 9;
01174    
01175    strncat(buf1,tfrom.s,tfrom.len);
01176    str_hdr.len += tfrom.len;
01177    
01178    strcat(buf1, CRLF);
01179    str_hdr.len += CRLF_LEN;
01180    str_hdr.s = buf1;
01181    if(cbp)
01182    {
01183 #ifdef XJ_EXTRA_DEBUG
01184       LM_DBG("uac callback parameter [%p==%d]\n", cbp, *cbp);
01185 #endif
01186       return tmb.t_request(&msg_type, 0, to, &tfrom, &str_hdr, msg, 
01187                   0, xj_tuac_callback, (void*)cbp);
01188    }
01189    else
01190       return tmb.t_request(&msg_type, 0, to, &tfrom, &str_hdr, msg, 0, 0, 0);
01191 }
01192 
01193 /**
01194  * send a SIP MESSAGE message
01195  * - to : destination
01196  * - from : origin
01197  * - contact : contact header
01198  * - msg : body of the message, string terminated by zero
01199  * return : 0 on success or <0 on error
01200  */
01201 int xj_send_sip_msgz(str *proxy, str *to, str *from, char *msg, int *cbp)
01202 {
01203    str tstr;
01204    int n;
01205 
01206    if(!to || !from || !msg || (cbp && *cbp!=0))
01207       return -1;
01208 
01209    tstr.s = msg;
01210    tstr.len = strlen(msg);
01211    if((n = xj_send_sip_msg(proxy, to, from, &tstr, cbp)) < 0)
01212       LM_ERR("sip message wasn't sent to [%.*s]...\n", to->len, to->s);
01213 #ifdef XJ_EXTRA_DEBUG
01214    else
01215       LM_DBG("sip message was sent to [%.*s]...\n", to->len, to->s);
01216 #endif
01217    return n;
01218 }
01219 
01220 /**
01221  * send disconnected info to all SIP users associated with worker idx
01222  * and clean the entries from wlist
01223  */
01224 int xj_wlist_clean_jobs(xj_wlist jwl, int idx, int fl)
01225 {
01226    xj_jkey p;
01227    if(jwl==NULL || idx < 0 || idx >= jwl->len || !jwl->workers[idx].sip_ids)
01228       return -1;
01229    lock_set_get(jwl->sems, idx);
01230    while((p=(xj_jkey)delpos234(jwl->workers[idx].sip_ids, 0))!=NULL)
01231    {
01232       if(fl)
01233       {
01234 #ifdef XJ_EXTRA_DEBUG
01235          LM_DBG("sending disconnect message"
01236             " to <%.*s>\n",   p->id->len, p->id->s);
01237 #endif
01238          xj_send_sip_msgz(_PADDR(jwl), p->id, &jab_gw_name,
01239                XJ_DMSG_INF_DISCONNECTED, NULL);
01240       }
01241       jwl->workers[idx].nr--;
01242       xj_jkey_free_p(p);
01243    }
01244    lock_set_release(jwl->sems, idx);
01245    return 0;
01246 }
01247 
01248 
01249 /**
01250  * callback function for TM
01251  */
01252 void xj_tuac_callback( struct cell *t, int type, struct tmcb_params *ps)
01253 {
01254 #ifdef XJ_EXTRA_DEBUG
01255    LM_DBG("completed with status %d\n", ps->code);
01256 #endif
01257    if(!ps->param)
01258    {
01259       LM_DBG("parameter not received\n");
01260       return;
01261    }
01262 #ifdef XJ_EXTRA_DEBUG
01263    LM_DBG("parameter [%p : ex-value=%d]\n", ps->param,*((int*)ps->param) );
01264 #endif
01265    if(ps->code < 200 || ps->code >= 300)
01266    {
01267 #ifdef XJ_EXTRA_DEBUG
01268       LM_DBG("no 2XX return code - connection set as expired \n");
01269 #endif
01270       *((int*)ps->param) = XJ_FLAG_CLOSE;
01271    }
01272 }
01273 
01274 /**
01275  * check for expired connections
01276  */
01277 void xj_worker_check_jcons(xj_wlist jwl, xj_jcon_pool jcp, int ltime, fd_set *pset)
01278 {
01279    int i;
01280    xj_jconf jcf;
01281    
01282    for(i = 0; i < jcp->len && main_loop; i++)
01283    {
01284       if(jcp->ojc[i] == NULL)
01285          continue;
01286       if(jcp->ojc[i]->jkey->flag==XJ_FLAG_OPEN &&
01287          jcp->ojc[i]->expire > ltime)
01288          continue;
01289          
01290 #ifdef XJ_EXTRA_DEBUG
01291       LM_DBG("connection expired for <%.*s> \n",
01292          jcp->ojc[i]->jkey->id->len, jcp->ojc[i]->jkey->id->s);
01293 #endif
01294       xj_send_sip_msgz(_PADDR(jwl), jcp->ojc[i]->jkey->id, &jab_gw_name,
01295             XJ_DMSG_INF_JOFFLINE, NULL);
01296 #ifdef XJ_EXTRA_DEBUG
01297       LM_DBG("connection's close flag =%d\n",
01298          jcp->ojc[i]->jkey->flag);
01299 #endif
01300       // CLEAN JAB_WLIST
01301       xj_wlist_del(jwl, jcp->ojc[i]->jkey, _xj_pid);
01302 
01303       // looking for open conference rooms
01304 #ifdef XJ_EXTRA_DEBUG
01305       LM_DBG("having %d open conferences\n", 
01306             jcp->ojc[i]->nrjconf);
01307 #endif
01308       while(jcp->ojc[i]->nrjconf > 0)
01309       {
01310          if((jcf=delpos234(jcp->ojc[i]->jconf,0))!=NULL)
01311          {
01312             // get out of room
01313             xj_jcon_jconf_presence(jcp->ojc[i],jcf, "unavailable", NULL);
01314             xj_jconf_free(jcf);
01315          }
01316          jcp->ojc[i]->nrjconf--;
01317       }
01318 
01319       // send offline presence to all subscribers
01320       if(jcp->ojc[i]->plist)
01321       {
01322 #ifdef XJ_EXTRA_DEBUG
01323          LM_DBG("sending 'terminated' status to SIP subscriber\n");
01324 #endif
01325          xj_pres_list_notifyall(jcp->ojc[i]->plist,
01326                XJ_PS_TERMINATED);
01327       }
01328       FD_CLR(jcp->ojc[i]->sock, pset);
01329       xj_jcon_disconnect(jcp->ojc[i]);
01330       xj_jcon_free(jcp->ojc[i]);
01331       jcp->ojc[i] = NULL;
01332    }
01333 }
01334 
01335 /**
01336  * check if there are msg to send or delete from queue
01337  */
01338 void xj_worker_check_qmsg(xj_wlist jwl, xj_jcon_pool jcp)
01339 {
01340    int i, flag;
01341    str sto;
01342    char buff[1024];
01343 
01344    if(!jwl || !jcp)
01345       return;
01346 
01347    /** check the msg queue AND if the target connection is ready */
01348    for(i = 0; i<jcp->jmqueue.size && main_loop; i++)
01349    {
01350       if(jcp->jmqueue.jsm[i]==NULL || jcp->jmqueue.ojc[i]==NULL)
01351       {
01352          if(jcp->jmqueue.jsm[i]!=NULL)
01353          {
01354             xj_sipmsg_free(jcp->jmqueue.jsm[i]);
01355             jcp->jmqueue.jsm[i] = NULL;
01356             xj_jcon_pool_del_jmsg(jcp, i);
01357          }
01358          if(jcp->jmqueue.ojc[i]!=NULL)
01359             xj_jcon_pool_del_jmsg(jcp, i);
01360          continue;
01361       }
01362       if(jcp->jmqueue.expire[i] < get_ticks())
01363       {
01364 #ifdef XJ_EXTRA_DEBUG
01365          LM_DBG("message to %.*s is expired\n",
01366             jcp->jmqueue.jsm[i]->to.len, 
01367             jcp->jmqueue.jsm[i]->to.s);
01368 #endif
01369          xj_send_sip_msgz(_PADDR(jwl), jcp->jmqueue.jsm[i]->jkey->id, 
01370                &jcp->jmqueue.jsm[i]->to, XJ_DMSG_ERR_SENDIM,
01371                &jcp->jmqueue.ojc[i]->jkey->flag);
01372          if(jcp->jmqueue.jsm[i]!=NULL)
01373          {
01374             xj_sipmsg_free(jcp->jmqueue.jsm[i]);
01375             jcp->jmqueue.jsm[i] = NULL;
01376          }
01377          /** delete message from queue */
01378          xj_jcon_pool_del_jmsg(jcp, i);
01379          continue;
01380       }
01381 #ifdef XJ_EXTRA_DEBUG
01382       LM_DBG("%d: QUEUE: message[%d] from [%.*s]"
01383             "/to [%.*s]/body[%.*s] expires at %d\n",
01384             get_ticks(), i, 
01385             jcp->jmqueue.jsm[i]->jkey->id->len,
01386             jcp->jmqueue.jsm[i]->jkey->id->s,
01387             jcp->jmqueue.jsm[i]->to.len,jcp->jmqueue.jsm[i]->to.s,
01388             jcp->jmqueue.jsm[i]->msg.len,jcp->jmqueue.jsm[i]->msg.s,
01389             jcp->jmqueue.expire[i]);
01390 #endif
01391       if(xj_jcon_is_ready(jcp->jmqueue.ojc[i], jcp->jmqueue.jsm[i]->to.s,
01392             jcp->jmqueue.jsm[i]->to.len, jwl->aliases->dlm))
01393          continue;
01394       
01395       /*** address correction ***/
01396       flag = XJ_ADDRTR_S2J;
01397       if(!xj_jconf_check_addr(&jcp->jmqueue.jsm[i]->to,jwl->aliases->dlm))
01398       flag |= XJ_ADDRTR_CON;
01399       
01400       sto.s = buff; 
01401       sto.len = 0;
01402       if(xj_address_translation(&jcp->jmqueue.jsm[i]->to,
01403          &sto, jwl->aliases, flag) == 0)
01404       {
01405          /** send message from queue */
01406 #ifdef XJ_EXTRA_DEBUG
01407          LM_DBG("sending the message from"
01408             " local queue to Jabber network ...\n");
01409 #endif
01410          xj_jcon_send_msg(jcp->jmqueue.ojc[i],
01411             sto.s, sto.len,
01412             jcp->jmqueue.jsm[i]->msg.s,
01413             jcp->jmqueue.jsm[i]->msg.len,
01414             (flag&XJ_ADDRTR_CON)?XJ_JMSG_GROUPCHAT:XJ_JMSG_CHAT);
01415       }
01416       else
01417          LM_ERR("sending the message from"
01418             " local queue to Jabber network ...\n");
01419       
01420       if(jcp->jmqueue.jsm[i]!=NULL)
01421       {
01422          xj_sipmsg_free(jcp->jmqueue.jsm[i]);
01423          jcp->jmqueue.jsm[i] = NULL;
01424       }
01425       /** delete message from queue */
01426       xj_jcon_pool_del_jmsg(jcp, i);
01427    }
01428 }
01429 
01430 
01431 /**
01432  * update or register a presence watcher
01433  */
01434 void xj_worker_check_watcher(xj_wlist jwl, xj_jcon_pool jcp,
01435             xj_jcon jbc, xj_sipmsg jsmsg)
01436 {
01437    str sto;
01438    char buff[1024];
01439    xj_pres_cell prc = NULL;
01440 
01441    if(!jwl || !jcp || !jbc || !jsmsg)
01442       return;
01443 
01444    if(!jsmsg->cbf)
01445    {
01446 #ifdef XJ_EXTRA_DEBUG
01447       LM_DBG("null PA callback function\n");
01448 #endif
01449       return;
01450    }
01451 
01452    if(!xj_jconf_check_addr(&jsmsg->to, jwl->aliases->dlm))
01453    { // is for a conference - ignore?!?!
01454 #ifdef XJ_EXTRA_DEBUG
01455       LM_DBG("presence request for a conference.\n");
01456 #endif
01457       // set as offline
01458       (*(jsmsg->cbf))(&jsmsg->to, &jsmsg->to, XJ_PS_OFFLINE, jsmsg->p);
01459       return;
01460    }
01461          
01462    sto.s = buff; 
01463    sto.len = 0;
01464 
01465    if(xj_address_translation(&jsmsg->to, &sto, jwl->aliases, 
01466       XJ_ADDRTR_S2J) == 0)
01467    {
01468       prc = xj_pres_list_check(jbc->plist, &sto);
01469       if(!prc)
01470       {
01471 #ifdef XJ_EXTRA_DEBUG
01472          LM_DBG("new presence cell for %.*s.\n", sto.len, sto.s);
01473 #endif
01474          prc = xj_pres_cell_new();
01475          if(!prc)
01476          {
01477             LM_DBG("cannot create a presence cell for %.*s.\n",sto.len, sto.s);
01478             return;
01479          }
01480          if(xj_pres_cell_init(prc, &sto, jsmsg->cbf, jsmsg->p)<0)
01481          {
01482             LM_DBG("cannot init the presence"
01483                " cell for %.*s.\n", sto.len, sto.s);
01484             xj_pres_cell_free(prc);
01485             return;
01486          }
01487          if((prc = xj_pres_list_add(jbc->plist, prc))==NULL)
01488          {
01489             LM_DBG("cannot add the presence"
01490                " cell for %.*s.\n", sto.len, sto.s);
01491             return;
01492          }
01493          sto.s[sto.len] = 0;
01494          if(!xj_jcon_send_subscribe(jbc, sto.s, NULL, "subscribe"))
01495             prc->status = XJ_PRES_STATUS_WAIT; 
01496       }
01497       else
01498       {
01499          xj_pres_cell_update(prc, jsmsg->cbf, jsmsg->p);
01500 #ifdef XJ_EXTRA_DEBUG
01501          LM_DBG("calling CBF(%.*s,%d)\n",
01502             jsmsg->to.len, jsmsg->to.s, prc->state);
01503 #endif
01504          // send presence info to SIP subscriber
01505          (*(prc->cbf))(&jsmsg->to, &jsmsg->to, prc->state, prc->cbp);
01506       }
01507    }
01508 }
01509 
01510 /*****************************     ****************************************/
01511 

Generated on Fri May 25 00:00:35 2012 for Kamailio - The Open Source SIP Server by  doxygen 1.5.6