xjab_wlist.c

Go to the documentation of this file.
00001 /*
00002  * $Id: xjab_wlist.c 4518 2008-07-28 15:39:28Z henningw $
00003  *
00004  * eXtended JABber module - worker implementation
00005  *
00006  * Copyright (C) 2001-2003 FhG Fokus
00007  *
00008  * This file is part of Kamailio, a free SIP server.
00009  *
00010  * Kamailio is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2 of the License, or
00013  * (at your option) any later version
00014  *
00015  * Kamailio is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  * GNU General Public License for more details.
00019  *
00020  * You should have received a copy of the GNU General Public License 
00021  * along with this program; if not, write to the Free Software
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023  *
00024  * ---
00025  *
00026  * History
00027  * -------
00028  * 2003-02-24  added 'xj_wlist_set_flag' function (dcm)
00029  * 2003-03-11  major locking changes - uses locking.h (andrei)
00030  *
00031  */
00032 
00033 
00034 #include <string.h>
00035 #include <unistd.h>
00036 #include <stdio.h>
00037 
00038 #include "../../dprint.h"
00039 #include "../../mem/mem.h"
00040 #include "../../mem/shm_mem.h"
00041 
00042 #include "xjab_worker.h"
00043 #include "mdefines.h"
00044 
00045 #define XJ_DEF_JDELIM '*' /* default user part delimiter, used when the jdomain parameter does not give one */
00046 
00047 /**
00048  * init a workers list
00049  * - pipes : communication pipes
00050  * - size : size of list - number of workers
00051  * - max : maximum number of jobs per worker
00052  * return : pointer to workers list or NULL on error
00053  */
00054 xj_wlist xj_wlist_init(int **pipes, int size, int max, int cache_time,
00055       int sleep_time, int delay_time)
00056 {
00057    int i;
00058    xj_wlist jwl = NULL;
00059 
00060    if(pipes == NULL || size <= 0 || max <= 0)
00061       return NULL;
00062 #ifdef XJ_EXTRA_DEBUG
00063    LM_DBG("-----START-----\n");
00064 #endif   
00065    jwl = (xj_wlist)_M_SHM_MALLOC(sizeof(t_xj_wlist));
00066    if(jwl == NULL)
00067       return NULL;
00068    jwl->len = size;
00069    jwl->maxj = max;
00070    
00071    jwl->cachet = cache_time;
00072    jwl->delayt = delay_time;
00073    jwl->sleept = sleep_time;
00074 
00075    jwl->aliases = NULL;
00076    jwl->sems = NULL;
00077    i = 0;
00078    /* alloc locks*/
00079    if((jwl->sems = lock_set_alloc(size)) == NULL){
00080       LM_CRIT("failed to alloc lock set\n");
00081       goto clean;
00082    };
00083    /* init the locks*/
00084    if (lock_set_init(jwl->sems)==0){
00085       LM_CRIT("failed to initialize the locks\n");
00086       goto clean;
00087    };
00088    jwl->workers = (xj_worker)_M_SHM_MALLOC(size*sizeof(t_xj_worker));
00089    if(jwl->workers == NULL){
00090       lock_set_destroy(jwl->sems);
00091       goto clean;
00092    }
00093 
00094    for(i = 0; i < size; i++)
00095    {
00096       jwl->workers[i].nr = 0;
00097       jwl->workers[i].pid = 0;
00098       jwl->workers[i].wpipe = pipes[i][1];
00099       jwl->workers[i].rpipe = pipes[i][0];
00100       if((jwl->workers[i].sip_ids = newtree234(xj_jkey_cmp)) == NULL){
00101          lock_set_destroy(jwl->sems);
00102          goto clean;
00103       }
00104    }  
00105 
00106    return jwl;
00107 
00108 clean:
00109    LM_DBG("error occurred -> cleaning\n");
00110    if(jwl->sems != NULL)
00111       lock_set_dealloc(jwl->sems);
00112    if(jwl->workers != NULL)
00113    {
00114       while(i>=0)
00115       {
00116          if(jwl->workers[i].sip_ids == NULL)
00117             free2tree234(jwl->workers[i].sip_ids, xj_jkey_free_p);
00118          i--;
00119       }
00120       _M_SHM_FREE(jwl->workers);
00121    }
00122    _M_SHM_FREE(jwl);
00123    return NULL;
00124 
00125 }
00126 
00127 /**
00128  * set the p.id's of the workers
00129  * - jwl : pointer to the workers list
00130  * - pids : p.id's array
00131  * - size : number of pids
00132  * return : 0 on success or <0 on error
00133  */
00134 int xj_wlist_set_pid(xj_wlist jwl, int pid, int idx)
00135 {
00136    if(jwl == NULL || pid <= 0 || idx < 0 || idx >= jwl->len)
00137       return -1;
00138    lock_set_get(jwl->sems, idx);
00139    jwl->workers[idx].pid = pid;
00140    lock_set_release(jwl->sems, idx);
00141    return 0;
00142 }
00143 
00144 /**
00145  * free jab_wlist
00146  * - jwl : pointer to the workers list
00147  */
00148 void xj_wlist_free(xj_wlist jwl)
00149 {
00150    int i;
00151 #ifdef XJ_EXTRA_DEBUG
00152    LM_DBG("freeing 'xj_wlist' memory ...\n");
00153 #endif
00154    if(jwl == NULL)
00155       return;
00156 
00157    if(jwl->workers != NULL)
00158    {
00159       for(i=0; i<jwl->len; i++)
00160          free2tree234(jwl->workers[i].sip_ids, xj_jkey_free_p);
00161       _M_SHM_FREE(jwl->workers);
00162    }
00163 
00164    if(jwl->aliases != NULL)
00165    {
00166       if(jwl->aliases->d)
00167          _M_SHM_FREE(jwl->aliases->d);
00168 
00169       if(jwl->aliases->jdm != NULL)
00170       {
00171          _M_SHM_FREE(jwl->aliases->jdm->s);
00172          _M_SHM_FREE(jwl->aliases->jdm);
00173       }
00174       if(jwl->aliases->proxy != NULL)
00175       {
00176          _M_SHM_FREE(jwl->aliases->proxy->s);
00177          _M_SHM_FREE(jwl->aliases->proxy);
00178       }
00179       if(jwl->aliases->size > 0)
00180       {
00181          for(i=0; i<jwl->aliases->size; i++)
00182             _M_SHM_FREE(jwl->aliases->a[i].s);
00183          _M_SHM_FREE(jwl->aliases->a);
00184       }
00185       _M_SHM_FREE(jwl->aliases);
00186       jwl->aliases = NULL;
00187    }
00188    
00189    if(jwl->sems != NULL){
00190       lock_set_destroy(jwl->sems);
00191       lock_set_dealloc(jwl->sems);
00192    }
00193    
00194    _M_SHM_FREE(jwl);
00195 }
00196 
00197 /**
00198  * return communication pipe with the worker that will process the message for
00199  *       the id 'sid' only if it exists, or -1 if error
00200  * - jwl : pointer to the workers list
00201  * - sid : id of the entity (connection to Jabber - usually SHOULD be FROM
00202  *   header of the incoming SIP message)
00203  * - p : will point to the SHM location of the 'sid' in jwl
00204  */
00205 int xj_wlist_check(xj_wlist jwl, xj_jkey jkey, xj_jkey *p)
00206 {
00207    int i;
00208    if(jwl==NULL || jkey==NULL || jkey->id==NULL || jkey->id->s==NULL)
00209       return -1;
00210    
00211    i = 0;
00212    *p = NULL;
00213    while(i < jwl->len)
00214    {
00215       lock_set_get(jwl->sems, i);
00216       if(jwl->workers[i].pid <= 0)
00217       {
00218          lock_set_release(jwl->sems, i);
00219          i++;
00220          continue;
00221       }
00222       if((*p = find234(jwl->workers[i].sip_ids, (void*)jkey, NULL)) != NULL)
00223       {
00224          lock_set_release(jwl->sems, i);
00225 #ifdef XJ_EXTRA_DEBUG
00226          LM_DBG("entry exists for <%.*s> in the"
00227             " pool of <%d> [%d]\n",jkey->id->len, jkey->id->s,
00228             jwl->workers[i].pid,i);
00229 #endif
00230          return jwl->workers[i].wpipe;
00231       }
00232       lock_set_release(jwl->sems, i);
00233       i++;
00234    }
00235 #ifdef XJ_EXTRA_DEBUG
00236    LM_DBG("entry does not exist for <%.*s>\n",
00237          jkey->id->len, jkey->id->s);
00238 #endif
00239    return -1;
00240 }
00241 
00242 /**
00243  * return communication pipe with the worker that will process the message for
00244  *       the id 'sid', or -1 if error
00245  * - jwl : pointer to the workers list
00246  * - sid : id of the entity (connection to Jabber - usually SHOULD be FROM
00247  *   header of the incoming SIP message)
00248  * - p : will point to the SHM location of the 'sid' in jwl
00249  */
00250 int xj_wlist_get(xj_wlist jwl, xj_jkey jkey, xj_jkey *p)
00251 {
00252    int i = 0, pos = -1, min = 100000;
00253    xj_jkey msid = NULL;
00254    
00255    if(jwl==NULL || jkey==NULL || jkey->id==NULL || jkey->id->s==NULL)
00256       return -1;
00257 
00258    *p = NULL;
00259    while(i < jwl->len)
00260    {
00261       lock_set_get(jwl->sems, i);
00262       if(jwl->workers[i].pid <= 0)
00263       {
00264          lock_set_release(jwl->sems, i);
00265          i++;
00266          continue;
00267       }
00268       if((*p = find234(jwl->workers[i].sip_ids, (void*)jkey, NULL))!=NULL)
00269       {
00270          if(pos >= 0)
00271             lock_set_release(jwl->sems, pos);
00272             lock_set_release(jwl->sems, i);
00273 #ifdef XJ_EXTRA_DEBUG
00274          LM_DBG("entry already exists for <%.*s> in the"
00275             " pool of <%d> [%d]\n",jkey->id->len, jkey->id->s,
00276             jwl->workers[i].pid,i);
00277 #endif
00278          return jwl->workers[i].wpipe;
00279       }
00280       if(min > jwl->workers[i].nr)
00281       {
00282          if(pos >= 0)
00283             lock_set_release(jwl->sems, pos);
00284          pos = i;
00285          min = jwl->workers[i].nr;
00286       }
00287       else
00288          lock_set_release(jwl->sems, i);
00289       i++;
00290    }
00291    if(pos >= 0 && jwl->workers[pos].nr < jwl->maxj)
00292    {
00293       jwl->workers[pos].nr++;
00294 
00295       msid = (xj_jkey)_M_SHM_MALLOC(sizeof(t_xj_jkey));
00296       if(msid == NULL)
00297          goto error;
00298       msid->id = (str*)_M_SHM_MALLOC(sizeof(str));
00299       if(msid->id == NULL)
00300       {
00301          _M_SHM_FREE(msid);
00302          goto error;
00303       }
00304       
00305       msid->id->s = (char*)_M_SHM_MALLOC(jkey->id->len);
00306       if(msid->id == NULL)
00307       {
00308          _M_SHM_FREE(msid->id);
00309          _M_SHM_FREE(msid);
00310          goto error;
00311       }
00312       
00313       if((*p = add234(jwl->workers[pos].sip_ids, msid)) != NULL)
00314       {
00315          msid->id->len = jkey->id->len;
00316          memcpy(msid->id->s, jkey->id->s, jkey->id->len);
00317          msid->hash = jkey->hash;
00318          msid->flag = XJ_FLAG_OPEN;
00319          lock_set_release(jwl->sems, pos);
00320 #ifdef XJ_EXTRA_DEBUG
00321          LM_DBG("new entry for <%.*s> in the pool of"
00322             " <%d> - [%d]\n", jkey->id->len, jkey->id->s,
00323             jwl->workers[pos].pid, pos);
00324 #endif
00325          return jwl->workers[pos].wpipe;
00326       }
00327       _M_SHM_FREE(msid->id->s);
00328       _M_SHM_FREE(msid->id);
00329       _M_SHM_FREE(msid);
00330    }
00331 
00332 error:
00333    if(pos >= 0)
00334       lock_set_release(jwl->sems, pos);
00335    LM_DBG("cannot create a new entry for <%.*s>\n",
00336             jkey->id->len, jkey->id->s);
00337    return -1;
00338 }
00339 
00340 /**
00341  * set the flag of the connection identified by 'jkey'
00342  *
00343  */
00344 int xj_wlist_set_flag(xj_wlist jwl, xj_jkey jkey, int fl)
00345 {
00346    int i;
00347    xj_jkey p = NULL;
00348    if(jwl==NULL || jkey==NULL || jkey->id==NULL || jkey->id->s==NULL)
00349       return -1;
00350    
00351 #ifdef XJ_EXTRA_DEBUG
00352    LM_DBG("looking for <%.*s>"
00353       " having id=%d\n", jkey->id->len, jkey->id->s, jkey->hash);
00354 #endif
00355          
00356    i = 0;
00357    while(i < jwl->len)
00358    {
00359       lock_set_get(jwl->sems, i);
00360       if(jwl->workers[i].pid <= 0)
00361       {
00362          lock_set_release(jwl->sems, i);
00363          i++;
00364          continue;
00365       }
00366       if((p=find234(jwl->workers[i].sip_ids, (void*)jkey, NULL)) != NULL)
00367       {
00368          p->flag = fl;
00369          lock_set_release(jwl->sems, i);
00370 #ifdef XJ_EXTRA_DEBUG
00371          LM_DBG("the connection for <%.*s>"
00372             " marked with flag=%d", jkey->id->len, jkey->id->s, fl);
00373 #endif
00374          return jwl->workers[i].wpipe;
00375       }
00376       lock_set_release(jwl->sems, i);
00377       i++;
00378    }
00379 #ifdef XJ_EXTRA_DEBUG
00380    LM_DBG("entry does not exist for <%.*s>\n",
00381          jkey->id->len, jkey->id->s);
00382 #endif
00383    return -1;
00384 }
00385 
00386 
00387 /**
00388  * set IM aliases, jdomain and outbound proxy
00389  *
00390  * return 0 if OK
00391  */
00392 int  xj_wlist_set_aliases(xj_wlist jwl, char *als, char *jd, char *pa)
00393 {
00394    char *p, *p0, *p1;
00395    int i, n;
00396    
00397    if(jwl == NULL)
00398       return -1;
00399    if(!jd) // || !als || strlen(als)<2)
00400       return 0;
00401    
00402    if((jwl->aliases = (xj_jalias)_M_SHM_MALLOC(sizeof(t_xj_jalias)))==NULL)
00403    {
00404       LM_DBG("not enough SHMemory.\n");
00405       return -1;
00406    }
00407    
00408    jwl->aliases->jdm = NULL;
00409    jwl->aliases->proxy = NULL;
00410    jwl->aliases->dlm = XJ_DEF_JDELIM; // default user part delimiter
00411    jwl->aliases->size = 0;
00412    jwl->aliases->a = NULL;
00413    jwl->aliases->d = NULL;
00414 
00415    // set the jdomain
00416    if(jd != NULL && (n=strlen(jd))>2)
00417    {
00418       p = jd;
00419       while(p < jd+n && *p!='=')
00420          p++;
00421       if(p<jd+n-1)
00422       {
00423          jwl->aliases->dlm = *(p+1);
00424          n = p - jd;
00425       }
00426       if((jwl->aliases->jdm = (str*)_M_SHM_MALLOC(sizeof(str)))== NULL)
00427       {
00428          LM_DBG("not enough SHMemory!?\n");
00429          _M_SHM_FREE(jwl->aliases);
00430          jwl->aliases = NULL;
00431          return -1;     
00432       }
00433       jwl->aliases->jdm->len = n;
00434       if((jwl->aliases->jdm->s=(char*)_M_SHM_MALLOC(jwl->aliases->jdm->len))
00435             == NULL)
00436       {
00437          LM_DBG("not enough SHMemory!?!\n");
00438          _M_SHM_FREE(jwl->aliases->jdm);
00439          _M_SHM_FREE(jwl->aliases);
00440          jwl->aliases = NULL;
00441       }
00442       strncpy(jwl->aliases->jdm->s, jd, jwl->aliases->jdm->len);
00443 #ifdef XJ_EXTRA_DEBUG
00444       LM_DBG("jdomain=%.*s delim=%c\n",
00445          jwl->aliases->jdm->len, jwl->aliases->jdm->s, jwl->aliases->dlm);
00446 #endif
00447    }
00448    
00449    // set the proxy address
00450    if(pa && strlen(pa)>0)
00451    {
00452       if((jwl->aliases->proxy = (str*)_M_SHM_MALLOC(sizeof(str)))==NULL)
00453       {
00454          LM_DBG(" not enough SHMemory!!\n");
00455          goto clean3;      
00456       }
00457       i = jwl->aliases->proxy->len = strlen(pa);
00458       // check if proxy address has sip: prefix
00459       if(i < 4 || pa[0]!='s' || pa[1]!='i' || pa[2]!='p' || pa[3]!=':')
00460          jwl->aliases->proxy->len += 4;
00461       if((jwl->aliases->proxy->s=
00462                (char*)_M_SHM_MALLOC(jwl->aliases->proxy->len))
00463             == NULL)
00464       {
00465          LM_DBG("not enough SHMemory!!!\n");
00466          _M_SHM_FREE(jwl->aliases->proxy);
00467          goto clean3;
00468       }
00469       p0 = jwl->aliases->proxy->s;
00470       if(jwl->aliases->proxy->len != i)
00471       {
00472          strncpy(p0, "sip:", 4);
00473          p0 += 4;
00474       }
00475       strncpy(p0, pa, i);
00476 #ifdef XJ_EXTRA_DEBUG
00477       LM_DBG("outbound proxy=[%.*s]\n",
00478          jwl->aliases->proxy->len, jwl->aliases->proxy->s);
00479 #endif
00480    }
00481    
00482    // set the IM aliases
00483    if(!als || strlen(als)<2)
00484       return 0;
00485    
00486    if((p = strchr(als, ';')) == NULL)
00487    {
00488       LM_DBG("bad parameter value\n");
00489       return -1;
00490    }
00491    
00492    if((jwl->aliases->size = atoi(als)) <= 0)
00493    {
00494       LM_DBG("wrong number of aliases\n");
00495       return 0;
00496    }
00497    
00498    jwl->aliases->d = (char*)_M_SHM_MALLOC(jwl->aliases->size*sizeof(char));
00499    if(jwl->aliases->d == NULL)
00500    {
00501       LM_DBG("not enough SHMemory..\n");
00502       goto clean2;
00503    }
00504    memset(jwl->aliases->d, 0, jwl->aliases->size);
00505    
00506    jwl->aliases->a = (str*)_M_SHM_MALLOC(jwl->aliases->size*sizeof(str));
00507    if(jwl->aliases->a == NULL)
00508    {
00509       LM_DBG("not enough SHMemory..\n");
00510       goto clean1;
00511    }
00512    
00513    p++;
00514    for(i=0; i<jwl->aliases->size; i++)
00515    {
00516       if((p0 = strchr(p, ';'))==NULL)
00517       {
00518          LM_DBG("bad parameter value format\n");
00519          goto clean;
00520       }
00521       n = p0 - p;
00522       p1 = strchr(p, '=');
00523       if(p1 && p1<p0-1)
00524       {
00525          jwl->aliases->d[i] = *(p1+1);
00526          n = p1 - p;
00527       }
00528       jwl->aliases->a[i].len = n;
00529       if((jwl->aliases->a[i].s = (char*)_M_SHM_MALLOC(jwl->aliases->a[i].len))
00530             == NULL)
00531       {
00532          LM_DBG("not enough SHMemory!\n");
00533          goto clean;
00534       }
00535          
00536       strncpy(jwl->aliases->a[i].s, p, jwl->aliases->a[i].len);
00537 #ifdef XJ_EXTRA_DEBUG
00538       LM_DBG("alias[%d/%d]=%.*s delim=%c\n", 
00539          i+1, jwl->aliases->size, jwl->aliases->a[i].len, 
00540          jwl->aliases->a[i].s, jwl->aliases->d[i]?jwl->aliases->d[i]:'X');
00541 #endif
00542       p = p0 + 1;
00543    }
00544    return 0;
00545 
00546 clean:
00547    while(i>0)
00548    {
00549       _M_SHM_FREE(jwl->aliases->a[i-1].s);
00550       i--;
00551    }
00552    _M_SHM_FREE(jwl->aliases->a);
00553 
00554 clean1:
00555    if(jwl->aliases->d)
00556       _M_SHM_FREE(jwl->aliases->d);
00557 
00558 clean2:
00559    if(jwl->aliases->proxy)
00560    {
00561       _M_SHM_FREE(jwl->aliases->proxy->s);
00562       _M_SHM_FREE(jwl->aliases->proxy);
00563    }
00564 clean3:
00565    if(jwl->aliases->jdm)
00566    {
00567       _M_SHM_FREE(jwl->aliases->jdm->s);
00568       _M_SHM_FREE(jwl->aliases->jdm);
00569    }
00570    _M_SHM_FREE(jwl->aliases);
00571    jwl->aliases = NULL;
00572    return -1;
00573 }
00574 
00575 
00576 /**
00577  * check if the addr contains jdomain or an alias
00578  * - jwl : pointer to the workers list
00579  * - addr: the address to check against jdomain and aliases
00580  * returns 0 - if contains or !=0 if not
00581  */
00582 int  xj_wlist_check_aliases(xj_wlist jwl, str *addr)
00583 {
00584    char *p, *p0;
00585    int ll, i;
00586    if(!jwl || !jwl->aliases || !addr || !addr->s || addr->len<=0)
00587       return -1;
00588 
00589    // find '@'
00590    p = addr->s;
00591    while(p < addr->s + addr->len && *p != '@')
00592       p++;
00593    if(p >= addr->s + addr->len)
00594       return -1;
00595    
00596    p++;
00597    ll = addr->s + addr->len - p;
00598    
00599    // check parameters
00600    p0 = p;
00601    while(p0 < p + ll && *p0 != ';')
00602       p0++;
00603    if(p0 < p + ll)
00604       ll = p0 - p;
00605    
00606    ll = addr->s + addr->len - p;
00607    if(jwl->aliases->jdm && jwl->aliases->jdm->len == ll && 
00608          !strncasecmp(jwl->aliases->jdm->s, p, ll))
00609       return 0;
00610 
00611    if(jwl->aliases->size <= 0)
00612       return 1;
00613    
00614    for(i = 0; i < jwl->aliases->size; i++)
00615       if(jwl->aliases->a[i].len == ll && 
00616          !strncasecmp(p, jwl->aliases->a[i].s, ll))
00617             return 0;
00618    return 1;
00619 }
00620 
00621 /**
00622  * delete an entity from working list of a worker
00623  * - jwl : pointer to the workers list
00624  * - sid : id of the entity (connection to Jabber - usually SHOULD be FROM
00625  *   header of the incoming SIP message
00626  * - _pid : process id of the worker
00627  */
00628 void xj_wlist_del(xj_wlist jwl, xj_jkey jkey, int _pid)
00629 {
00630    int i;
00631    void *p;
00632    if(jwl==NULL || jkey==NULL || jkey->id==NULL || jkey->id->s==NULL)
00633       return;
00634    for(i=0; i < jwl->len; i++)
00635       if(jwl->workers[i].pid == _pid)
00636          break;
00637    if(i >= jwl->len)
00638    {
00639       LM_DBG("%d: key <%.*s> not found in [%d]...\n",
00640          _pid, jkey->id->len, jkey->id->s, i);
00641       return;
00642    }
00643 #ifdef XJ_EXTRA_DEBUG
00644    LM_DBG("%d: trying to delete entry for <%.*s>...\n",
00645       _pid, jkey->id->len, jkey->id->s);
00646 #endif
00647    lock_set_get(jwl->sems, i);
00648    p = del234(jwl->workers[i].sip_ids, (void*)jkey);
00649 
00650    if(p != NULL)
00651    {
00652       jwl->workers[i].nr--;
00653 #ifdef XJ_EXTRA_DEBUG
00654       LM_DBG("%d: sip id <%.*s> deleted\n", _pid,
00655          jkey->id->len, jkey->id->s);
00656 #endif
00657       xj_jkey_free_p(p);
00658    }
00659 
00660    lock_set_release(jwl->sems, i);
00661 }
00662 

Generated on Fri May 25 00:00:35 2012 for Kamailio - The Open Source SIP Server by  doxygen 1.5.6