00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include <stdio.h>
00031 #include <stdlib.h>
00032 #include <string.h>
00033 #include <libxml/parser.h>
00034 #include <time.h>
00035
00036 #include "../../sr_module.h"
00037 #include "../../parser/parse_expires.h"
00038 #include "../../dprint.h"
00039 #include "../../mem/shm_mem.h"
00040 #include "../../parser/msg_parser.h"
00041 #include "../../str.h"
00042 #include "../../mem/mem.h"
00043 #include "../../pt.h"
00044 #include "../../db/db.h"
00045 #include "../tm/tm_load.h"
00046 #include "pua.h"
00047 #include "send_publish.h"
00048 #include "send_subscribe.h"
00049 #include "pua_bind.h"
00050 #include "pua_callback.h"
00051 #include "event_list.h"
00052 #include "add_events.h"
00053 #include "pidf.h"
00054
00055 MODULE_VERSION
00056 #define PUA_TABLE_VERSION 6
00057
00058 struct tm_binds tmb;
00059 htable_t* HashT= NULL;
00060 int HASH_SIZE= -1;
00061 extern int bind_pua(pua_api_t* api);
00062 int min_expires= 0;
00063 int default_expires=3600;
00064 static str db_url = str_init(DEFAULT_DB_URL);
00065 static str db_table= str_init("pua");
00066 int update_period= 100;
00067 str outbound_proxy = {0, 0};
00068 int startup_time = 0;
00069 int dlginfo_increase_version = 0;
00070 pua_event_t* pua_evlist= NULL;
00071
00072
00073 db_con_t *pua_db = NULL;
00074 db_func_t pua_dbf;
00075
00076
00077 static str str_pres_uri_col = str_init("pres_uri");
00078 static str str_pres_id_col = str_init("pres_id");
00079 static str str_expires_col= str_init("expires");
00080 static str str_flag_col= str_init("flag");
00081 static str str_etag_col= str_init("etag");
00082 static str str_tuple_id_col= str_init("tuple_id");
00083 static str str_watcher_uri_col= str_init("watcher_uri");
00084 static str str_call_id_col= str_init("call_id");
00085 static str str_to_tag_col= str_init("to_tag");
00086 static str str_from_tag_col= str_init("from_tag");
00087 static str str_cseq_col= str_init("cseq");
00088 static str str_event_col= str_init("event");
00089 static str str_record_route_col= str_init("record_route");
00090 static str str_contact_col= str_init("contact");
00091 static str str_remote_contact_col= str_init("remote_contact");
00092 static str str_extra_headers_col= str_init("extra_headers");
00093 static str str_desired_expires_col= str_init("desired_expires");
00094 static str str_version_col = str_init("version");
00095
00096
00097
00098 static int mod_init(void);
00099 static int child_init(int);
00100 static void destroy(void);
00101
00102 static int update_pua(ua_pres_t* p, unsigned int hash_code);
00103 static ua_pres_t* build_uppubl_cbparam(ua_pres_t* p);
00104
00105 static int db_restore(void);
00106 static void db_update(unsigned int ticks,void *param);
00107 static void hashT_clean(unsigned int ticks,void *param);
00108
00109 static cmd_export_t cmds[]=
00110 {
00111 {"bind_libxml_api", (cmd_function)bind_libxml_api, 1, 0, 0, 0},
00112 {"bind_pua", (cmd_function)bind_pua, 1, 0, 0, 0},
00113 {"pua_update_contact", (cmd_function)update_contact, 0, 0, 0, REQUEST_ROUTE},
00114 {0, 0, 0, 0, 0, 0}
00115 };
00116
00117 static param_export_t params[]={
00118 {"hash_size" , INT_PARAM, &HASH_SIZE },
00119 {"db_url" , STR_PARAM, &db_url.s },
00120 {"db_table" , STR_PARAM, &db_table.s },
00121 {"min_expires", INT_PARAM, &min_expires },
00122 {"default_expires", INT_PARAM, &default_expires },
00123 {"update_period", INT_PARAM, &update_period },
00124 {"outbound_proxy", STR_PARAM, &outbound_proxy.s },
00125 {"dlginfo_increase_version", INT_PARAM, &dlginfo_increase_version},
00126 {0, 0, 0 }
00127 };
00128
00129
00130 struct module_exports exports= {
00131 "pua",
00132 DEFAULT_DLFLAGS,
00133 cmds,
00134 params,
00135 0,
00136 0,
00137 0,
00138 0,
00139 mod_init,
00140 0,
00141 destroy,
00142 child_init
00143 };
00144
00145
00146
00147
00148 static int mod_init(void)
00149 {
00150 load_tm_f load_tm;
00151
00152 LM_DBG("...\n");
00153
00154 if(min_expires< 0)
00155 min_expires= 0;
00156
00157 if(default_expires< 600)
00158 default_expires= 3600;
00159
00160
00161 if((load_tm=(load_tm_f)find_export("load_tm", 0, 0))==NULL)
00162 {
00163 LM_ERR("can't import load_tm\n");
00164 return -1;
00165 }
00166
00167
00168 if(load_tm(&tmb)==-1)
00169 {
00170 LM_ERR("can't load tm functions\n");
00171 return -1;
00172 }
00173
00174 db_url.len = db_url.s ? strlen(db_url.s) : 0;
00175 LM_DBG("db_url=%s/%d/%p\n", ZSW(db_url.s), db_url.len, db_url.s);
00176 db_table.len = db_table.s ? strlen(db_table.s) : 0;
00177
00178
00179 if (db_bind_mod(&db_url, &pua_dbf))
00180 {
00181 LM_ERR("Database module not found\n");
00182 return -1;
00183 }
00184 if (!DB_CAPABILITY(pua_dbf, DB_CAP_ALL)) {
00185 LM_ERR("Database module does not implement all functions needed"
00186 " by the module\n");
00187 return -1;
00188 }
00189
00190 pua_db = pua_dbf.init(&db_url);
00191 if (!pua_db)
00192 {
00193 LM_ERR("while connecting database\n");
00194 return -1;
00195 }
00196
00197 if(db_check_table_version(&pua_dbf, pua_db, &db_table, PUA_TABLE_VERSION) < 0) {
00198 LM_ERR("error during table version check.\n");
00199 return -1;
00200 }
00201
00202 if(HASH_SIZE<=1)
00203 HASH_SIZE= 512;
00204 else
00205 HASH_SIZE = 1<<HASH_SIZE;
00206
00207 HashT= new_htable();
00208 if(HashT== NULL)
00209 {
00210 LM_ERR("while creating new hash table\n");
00211 return -1;
00212 }
00213 if(db_restore()< 0)
00214 {
00215 LM_ERR("while restoring hash_table\n");
00216 return -1;
00217 }
00218
00219 if(update_period<=0)
00220 {
00221 LM_ERR("wrong clean_period\n");
00222 return -1;
00223 }
00224 if ( init_puacb_list() < 0)
00225 {
00226 LM_ERR("callbacks initialization failed\n");
00227 return -1;
00228 }
00229 pua_evlist= init_pua_evlist();
00230 if(pua_evlist==0)
00231 {
00232 LM_ERR("when initializing pua_evlist\n");
00233 return -1;
00234 }
00235 if(pua_add_events()< 0)
00236 {
00237 LM_ERR("while adding events\n");
00238 return -1;
00239 }
00240
00241 startup_time = (int) time(NULL);
00242
00243 register_timer(hashT_clean, 0, update_period- 5);
00244
00245 register_timer(db_update, 0, update_period);
00246
00247
00248 if(pua_db)
00249 pua_dbf.close(pua_db);
00250 pua_db = NULL;
00251
00252 outbound_proxy.len = outbound_proxy.s ? strlen(outbound_proxy.s) : 0;
00253
00254 return 0;
00255 }
00256
00257 static int child_init(int rank)
00258 {
00259 if (pua_dbf.init==0)
00260 {
00261 LM_CRIT("database not bound\n");
00262 return -1;
00263 }
00264 pua_db = pua_dbf.init(&db_url);
00265 if (!pua_db)
00266 {
00267 LM_ERR("Child %d: connecting to database failed\n", rank);
00268 return -1;
00269 }
00270
00271 if (pua_dbf.use_table(pua_db, &db_table) < 0)
00272 {
00273 LM_ERR("child %d: Error in use_table pua\n", rank);
00274 return -1;
00275 }
00276
00277 LM_DBG("child %d: Database connection opened successfully\n", rank);
00278
00279 return 0;
00280 }
00281
00282 static void destroy(void)
00283 {
00284 if (puacb_list)
00285 destroy_puacb_list();
00286
00287 if(pua_db && HashT)
00288 db_update(0,0);
00289
00290 if(HashT)
00291 destroy_htable();
00292
00293 if(pua_db)
00294 pua_dbf.close(pua_db);
00295 if(pua_evlist)
00296 destroy_pua_evlist();
00297
00298 return ;
00299 }
00300
00301 static int db_restore(void)
00302 {
00303 ua_pres_t* p= NULL;
00304 db_key_t result_cols[19];
00305 db_res_t *res= NULL;
00306 db_row_t *row = NULL;
00307 db_val_t *row_vals= NULL;
00308 str pres_uri, pres_id;
00309 str etag, tuple_id;
00310 str watcher_uri, call_id;
00311 str to_tag, from_tag, remote_contact;
00312 str record_route, contact, extra_headers;
00313 int size= 0, i;
00314 int n_result_cols= 0;
00315 int puri_col,pid_col,expires_col,flag_col,etag_col, desired_expires_col;
00316 int watcher_col,callid_col,totag_col,fromtag_col,cseq_col,remote_contact_col;
00317 int event_col,contact_col,tuple_col,record_route_col, extra_headers_col;
00318 int version_col;
00319
00320 result_cols[puri_col=n_result_cols++] = &str_pres_uri_col;
00321 result_cols[pid_col=n_result_cols++] = &str_pres_id_col;
00322 result_cols[expires_col=n_result_cols++]= &str_expires_col;
00323 result_cols[flag_col=n_result_cols++] = &str_flag_col;
00324 result_cols[etag_col=n_result_cols++] = &str_etag_col;
00325 result_cols[tuple_col=n_result_cols++] = &str_tuple_id_col;
00326 result_cols[watcher_col=n_result_cols++]= &str_watcher_uri_col;
00327 result_cols[callid_col=n_result_cols++] = &str_call_id_col;
00328 result_cols[totag_col=n_result_cols++] = &str_to_tag_col;
00329 result_cols[fromtag_col=n_result_cols++]= &str_from_tag_col;
00330 result_cols[cseq_col= n_result_cols++] = &str_cseq_col;
00331 result_cols[event_col= n_result_cols++] = &str_event_col;
00332 result_cols[record_route_col= n_result_cols++] = &str_record_route_col;
00333 result_cols[contact_col= n_result_cols++] = &str_contact_col;
00334 result_cols[remote_contact_col= n_result_cols++] = &str_remote_contact_col;
00335 result_cols[extra_headers_col= n_result_cols++] = &str_extra_headers_col;
00336 result_cols[desired_expires_col= n_result_cols++] = &str_desired_expires_col;
00337 result_cols[version_col= n_result_cols++] = &str_version_col;
00338
00339 if(!pua_db)
00340 {
00341 LM_ERR("null database connection\n");
00342 return -1;
00343 }
00344
00345 if(pua_dbf.use_table(pua_db, &db_table)< 0)
00346 {
00347 LM_ERR("in use table\n");
00348 return -1;
00349 }
00350
00351 if(pua_dbf.query(pua_db,0, 0, 0, result_cols,0, n_result_cols, 0,&res)< 0)
00352 {
00353 LM_ERR("while querrying table\n");
00354 if(res)
00355 {
00356 pua_dbf.free_result(pua_db, res);
00357 res = NULL;
00358 }
00359 return -1;
00360 }
00361 if(res== NULL)
00362 return -1;
00363
00364 if(res->n<=0)
00365 {
00366 LM_INFO("the query returned no result\n");
00367 pua_dbf.free_result(pua_db, res);
00368 res = NULL;
00369 return 0;
00370 }
00371
00372 LM_DBG("found %d db entries\n", res->n);
00373
00374 for(i =0 ; i< res->n ; i++)
00375 {
00376 row = &res->rows[i];
00377 row_vals = ROW_VALUES(row);
00378
00379 pres_uri.s= (char*)row_vals[puri_col].val.string_val;
00380 pres_uri.len = strlen(pres_uri.s);
00381
00382 LM_DBG("pres_uri= %.*s\n", pres_uri.len, pres_uri.s);
00383
00384 memset(&etag, 0, sizeof(str));
00385 memset(&tuple_id, 0, sizeof(str));
00386 memset(&watcher_uri, 0, sizeof(str));
00387 memset(&call_id, 0, sizeof(str));
00388 memset(&to_tag, 0, sizeof(str));
00389 memset(&from_tag, 0, sizeof(str));
00390 memset(&record_route, 0, sizeof(str));
00391 memset(&pres_id, 0, sizeof(str));
00392 memset(&contact, 0, sizeof(str));
00393 memset(&remote_contact, 0, sizeof(str));
00394 memset(&extra_headers, 0, sizeof(str));
00395
00396 pres_id.s= (char*)row_vals[pid_col].val.string_val;
00397 if(pres_id.s)
00398 pres_id.len = strlen(pres_id.s);
00399
00400 if(row_vals[etag_col].val.string_val)
00401 {
00402 etag.s= (char*)row_vals[etag_col].val.string_val;
00403 etag.len = strlen(etag.s);
00404
00405 tuple_id.s= (char*)row_vals[tuple_col].val.string_val;
00406 tuple_id.len = strlen(tuple_id.s);
00407 }
00408
00409 if(row_vals[watcher_col].val.string_val)
00410 {
00411 watcher_uri.s= (char*)row_vals[watcher_col].val.string_val;
00412 watcher_uri.len = strlen(watcher_uri.s);
00413
00414 call_id.s= (char*)row_vals[callid_col].val.string_val;
00415 call_id.len = strlen(call_id.s);
00416
00417 to_tag.s= (char*)row_vals[totag_col].val.string_val;
00418 to_tag.len = strlen(to_tag.s);
00419
00420 from_tag.s= (char*)row_vals[fromtag_col].val.string_val;
00421 from_tag.len = strlen(from_tag.s);
00422
00423 if(row_vals[record_route_col].val.string_val)
00424 {
00425 record_route.s= (char*)row_vals[record_route_col].val.string_val;
00426 record_route.len= strlen(record_route.s);
00427 }
00428
00429 contact.s= (char*)row_vals[contact_col].val.string_val;
00430 contact.len = strlen(contact.s);
00431
00432 remote_contact.s= (char*)row_vals[remote_contact_col].val.string_val;
00433 remote_contact.len = strlen(remote_contact.s);
00434
00435 }
00436 extra_headers.s= (char*)row_vals[extra_headers_col].val.string_val;
00437 if(extra_headers.s)
00438 extra_headers.len= strlen(extra_headers.s);
00439 else
00440 extra_headers.len= 0;
00441
00442 size= sizeof(ua_pres_t)+ sizeof(str)+ (pres_uri.len+ pres_id.len+
00443 tuple_id.len)* sizeof(char);
00444 if(extra_headers.s)
00445 size+= sizeof(str)+ extra_headers.len* sizeof(char);
00446
00447 if(watcher_uri.s)
00448 size+= sizeof(str)+ (watcher_uri.len+ call_id.len+ to_tag.len+
00449 from_tag.len+ record_route.len+ contact.len)* sizeof(char);
00450
00451 p= (ua_pres_t*)shm_malloc(size);
00452 if(p== NULL)
00453 {
00454 LM_ERR("no more share memmory");
00455 goto error;
00456 }
00457 memset(p, 0, size);
00458 size= sizeof(ua_pres_t);
00459
00460 p->pres_uri= (str*)((char*)p+ size);
00461 size+= sizeof(str);
00462 p->pres_uri->s= (char*)p + size;
00463 memcpy(p->pres_uri->s, pres_uri.s, pres_uri.len);
00464 p->pres_uri->len= pres_uri.len;
00465 size+= pres_uri.len;
00466
00467 if(pres_id.s)
00468 {
00469 p->id.s= (char*)p + size;
00470 memcpy(p->id.s, pres_id.s, pres_id.len);
00471 p->id.len= pres_id.len;
00472 size+= pres_id.len;
00473 }
00474 if(tuple_id.s && tuple_id.len)
00475 {
00476 p->tuple_id.s= (char*)p + size;
00477 memcpy(p->tuple_id.s, tuple_id.s, tuple_id.len);
00478 p->tuple_id.len= tuple_id.len;
00479 size+= tuple_id.len;
00480 }
00481
00482 if(watcher_uri.s && watcher_uri.len)
00483 {
00484 p->watcher_uri= (str*)((char*)p+ size);
00485 size+= sizeof(str);
00486
00487 p->watcher_uri->s= (char*)p+ size;
00488 memcpy(p->watcher_uri->s, watcher_uri.s, watcher_uri.len);
00489 p->watcher_uri->len= watcher_uri.len;
00490 size+= watcher_uri.len;
00491
00492 p->to_tag.s= (char*)p+ size;
00493 memcpy(p->to_tag.s, to_tag.s, to_tag.len);
00494 p->to_tag.len= to_tag.len;
00495 size+= to_tag.len;
00496
00497 p->from_tag.s= (char*)p+ size;
00498 memcpy(p->from_tag.s, from_tag.s, from_tag.len);
00499 p->from_tag.len= from_tag.len;
00500 size+= from_tag.len;
00501
00502 p->call_id.s= (char*)p + size;
00503 memcpy(p->call_id.s, call_id.s, call_id.len);
00504 p->call_id.len= call_id.len;
00505 size+= call_id.len;
00506
00507 if(record_route.s && record_route.len)
00508 {
00509 p->record_route.s= (char*)p + size;
00510 memcpy(p->record_route.s, record_route.s, record_route.len);
00511 p->record_route.len= record_route.len;
00512 size+= record_route.len;
00513 }
00514 p->contact.s= (char*)p + size;
00515 memcpy(p->contact.s, contact.s, contact.len);
00516 p->contact.len= contact.len;
00517 size+= contact.len;
00518
00519 p->cseq= row_vals[cseq_col].val.int_val;
00520
00521 p->remote_contact.s= (char*)shm_malloc(remote_contact.len* sizeof(char));
00522 if(p->remote_contact.s== NULL)
00523 {
00524 LM_ERR("No more shared memory\n");
00525 goto error;
00526 }
00527 memcpy(p->remote_contact.s, remote_contact.s, remote_contact.len);
00528 p->remote_contact.len= remote_contact.len;
00529
00530 p->version= row_vals[version_col].val.int_val;
00531
00532 }
00533
00534 if(extra_headers.s)
00535 {
00536 p->extra_headers= (str*)((char*)p+ size);
00537 size+= sizeof(str);
00538 p->extra_headers->s= (char*)p+ size;
00539 memcpy(p->extra_headers->s, extra_headers.s, extra_headers.len);
00540 p->extra_headers->len= extra_headers.len;
00541 size+= extra_headers.len;
00542 }
00543
00544 LM_DBG("size= %d\n", size);
00545 p->event= row_vals[event_col].val.int_val;
00546 p->expires= row_vals[expires_col].val.int_val;
00547 p->desired_expires= row_vals[desired_expires_col].val.int_val;
00548 p->flag|= row_vals[flag_col].val.int_val;
00549
00550 memset(&p->etag, 0, sizeof(str));
00551 if(etag.s && etag.len)
00552 {
00553
00554 p->etag.s= (char*)shm_malloc(etag.len* sizeof(char));
00555 if(p->etag.s== NULL)
00556 {
00557 LM_ERR("no more share memory\n");
00558 goto error;
00559 }
00560 memcpy(p->etag.s, etag.s, etag.len);
00561 p->etag.len= etag.len;
00562 }
00563
00564 print_ua_pres(p);
00565 insert_htable(p);
00566 }
00567
00568 pua_dbf.free_result(pua_db, res);
00569 res = NULL;
00570
00571 if(pua_dbf.delete(pua_db, 0, 0 , 0, 0) < 0)
00572 {
00573 LM_ERR("while deleting information from db\n");
00574 goto error;
00575 }
00576
00577 return 0;
00578
00579 error:
00580 if(res)
00581 pua_dbf.free_result(pua_db, res);
00582
00583 if(p)
00584 shm_free(p);
00585 return -1;
00586 }
00587
00588 static void hashT_clean(unsigned int ticks,void *param)
00589 {
00590 int i;
00591 time_t now;
00592 ua_pres_t* p= NULL, *q= NULL;
00593
00594 now = time(NULL);
00595 for(i= 0;i< HASH_SIZE; i++)
00596 {
00597 lock_get(&HashT->p_records[i].lock);
00598 p= HashT->p_records[i].entity->next;
00599 while(p)
00600 {
00601 print_ua_pres(p);
00602 if(p->expires- update_period < now )
00603 {
00604 if((p->desired_expires> p->expires + min_expires) ||
00605 (p->desired_expires== 0 ))
00606 {
00607 if(update_pua(p, i)< 0)
00608 {
00609 LM_ERR("while updating record\n");
00610 lock_release(&HashT->p_records[i].lock);
00611 return;
00612 }
00613 p= p->next;
00614 continue;
00615 }
00616 if(p->expires < now - 10)
00617 {
00618 q= p->next;
00619 LM_DBG("Found expired: uri= %.*s\n", p->pres_uri->len,
00620 p->pres_uri->s);
00621 delete_htable(p, i);
00622 p= q;
00623 }
00624 else
00625 p= p->next;
00626 }
00627 else
00628 p= p->next;
00629 }
00630 lock_release(&HashT->p_records[i].lock);
00631 }
00632
00633
00634 }
00635
00636 int update_pua(ua_pres_t* p, unsigned int hash_code)
00637 {
00638 str* str_hdr= NULL;
00639 int expires;
00640 int result;
00641
00642 if(p->desired_expires== 0)
00643 expires= 3600;
00644 else
00645 expires= p->desired_expires- (int)time(NULL);
00646
00647 if(p->watcher_uri== NULL)
00648 {
00649 str met= {"PUBLISH", 7};
00650 ua_pres_t* cb_param;
00651
00652 str_hdr = publ_build_hdr(expires, get_event(p->event), NULL,
00653 &p->etag, p->extra_headers, 0);
00654 if(str_hdr == NULL)
00655 {
00656 LM_ERR("while building extra_headers\n");
00657 goto error;
00658 }
00659 LM_DBG("str_hdr:\n%.*s\n ", str_hdr->len, str_hdr->s);
00660
00661 cb_param= build_uppubl_cbparam(p);
00662 if(cb_param== NULL)
00663 {
00664 LM_ERR("while constructing publ callback param\n");
00665 goto error;
00666 }
00667 result= tmb.t_request(&met,
00668 p->pres_uri,
00669 p->pres_uri,
00670 p->pres_uri,
00671 str_hdr,
00672 0,
00673 0,
00674 publ_cback_func,
00675 (void*)cb_param
00676 );
00677 if(result< 0)
00678 {
00679 LM_ERR("in t_request function\n");
00680 shm_free(cb_param);
00681 goto error;
00682 }
00683
00684 }
00685 else
00686 {
00687 str met= {"SUBSCRIBE", 9};
00688 dlg_t* td= NULL;
00689 ua_pres_t* cb_param= NULL;
00690
00691 td= pua_build_dlg_t(p);
00692 if(td== NULL)
00693 {
00694 LM_ERR("while building tm dlg_t structure");
00695 goto error;
00696 };
00697
00698 str_hdr= subs_build_hdr(&p->contact, expires,p->event,p->extra_headers);
00699 if(str_hdr== NULL || str_hdr->s== NULL)
00700 {
00701 LM_ERR("while building extra headers\n");
00702 pkg_free(td);
00703 return -1;
00704 }
00705 cb_param= subs_cbparam_indlg(p, expires, REQ_ME);
00706 if(cb_param== NULL)
00707 {
00708 LM_ERR("while constructing subs callback param\n");
00709 goto error;
00710
00711 }
00712 result= tmb.t_request_within
00713 (&met,
00714 str_hdr,
00715 0,
00716 td,
00717 subs_cback_func,
00718 (void*)cb_param
00719 );
00720 if(result< 0)
00721 {
00722 LM_ERR("in t_request function\n");
00723 shm_free(cb_param);
00724 pkg_free(td);
00725 goto error;
00726 }
00727
00728 pkg_free(td);
00729 td= NULL;
00730 }
00731
00732 pkg_free(str_hdr);
00733 return 0;
00734
00735 error:
00736 if(str_hdr)
00737 pkg_free(str_hdr);
00738 return -1;
00739
00740 }
00741
00742 static void db_update(unsigned int ticks,void *param)
00743 {
00744 ua_pres_t* p= NULL;
00745 db_key_t q_cols[20], result_cols[1];
00746 db_res_t *res= NULL;
00747 db_key_t db_cols[5];
00748 db_val_t q_vals[20], db_vals[5];
00749 db_op_t db_ops[1] ;
00750 int n_query_cols= 0, n_query_update= 0;
00751 int n_update_cols= 0;
00752 int i;
00753 int puri_col,pid_col,expires_col,flag_col,etag_col,tuple_col,event_col;
00754 int watcher_col,callid_col,totag_col,fromtag_col,record_route_col,cseq_col;
00755 int no_lock= 0, contact_col, desired_expires_col, extra_headers_col;
00756 int remote_contact_col, version_col;
00757
00758 if(ticks== 0 && param == NULL)
00759 no_lock= 1;
00760
00761
00762 q_cols[puri_col= n_query_cols] = &str_pres_uri_col;
00763 q_vals[puri_col].type = DB_STR;
00764 q_vals[puri_col].nul = 0;
00765 n_query_cols++;
00766
00767 q_cols[pid_col= n_query_cols] = &str_pres_id_col;
00768 q_vals[pid_col].type = DB_STR;
00769 q_vals[pid_col].nul = 0;
00770 n_query_cols++;
00771
00772 q_cols[flag_col= n_query_cols] = &str_flag_col;
00773 q_vals[flag_col].type = DB_INT;
00774 q_vals[flag_col].nul = 0;
00775 n_query_cols++;
00776
00777 q_cols[event_col= n_query_cols] = &str_event_col;
00778 q_vals[event_col].type = DB_INT;
00779 q_vals[event_col].nul = 0;
00780 n_query_cols++;
00781
00782 q_cols[watcher_col= n_query_cols] = &str_watcher_uri_col;
00783 q_vals[watcher_col].type = DB_STR;
00784 q_vals[watcher_col].nul = 0;
00785 n_query_cols++;
00786
00787 q_cols[callid_col= n_query_cols] = &str_call_id_col;
00788 q_vals[callid_col].type = DB_STR;
00789 q_vals[callid_col].nul = 0;
00790 n_query_cols++;
00791
00792 q_cols[totag_col= n_query_cols] = &str_to_tag_col;
00793 q_vals[totag_col].type = DB_STR;
00794 q_vals[totag_col].nul = 0;
00795 n_query_cols++;
00796
00797 q_cols[fromtag_col= n_query_cols] = &str_from_tag_col;
00798 q_vals[fromtag_col].type = DB_STR;
00799 q_vals[fromtag_col].nul = 0;
00800 n_query_cols++;
00801
00802 q_cols[etag_col= n_query_cols] = &str_etag_col;
00803 q_vals[etag_col].type = DB_STR;
00804 q_vals[etag_col].nul = 0;
00805 n_query_cols++;
00806
00807 q_cols[tuple_col= n_query_cols] = &str_tuple_id_col;
00808 q_vals[tuple_col].type = DB_STR;
00809 q_vals[tuple_col].nul = 0;
00810 n_query_cols++;
00811
00812 q_cols[cseq_col= n_query_cols]= &str_cseq_col;
00813 q_vals[cseq_col].type = DB_INT;
00814 q_vals[cseq_col].nul = 0;
00815 n_query_cols++;
00816
00817 q_cols[expires_col= n_query_cols] = &str_expires_col;
00818 q_vals[expires_col].type = DB_INT;
00819 q_vals[expires_col].nul = 0;
00820 n_query_cols++;
00821
00822 q_cols[desired_expires_col= n_query_cols] = &str_desired_expires_col;
00823 q_vals[desired_expires_col].type = DB_INT;
00824 q_vals[desired_expires_col].nul = 0;
00825 n_query_cols++;
00826
00827 q_cols[record_route_col= n_query_cols] = &str_record_route_col;
00828 q_vals[record_route_col].type = DB_STR;
00829 q_vals[record_route_col].nul = 0;
00830 n_query_cols++;
00831
00832 q_cols[contact_col= n_query_cols] = &str_contact_col;
00833 q_vals[contact_col].type = DB_STR;
00834 q_vals[contact_col].nul = 0;
00835 n_query_cols++;
00836
00837 q_cols[remote_contact_col= n_query_cols] = &str_remote_contact_col;
00838 q_vals[remote_contact_col].type = DB_STR;
00839 q_vals[remote_contact_col].nul = 0;
00840 n_query_cols++;
00841
00842 q_cols[version_col= n_query_cols] = &str_version_col;
00843 q_vals[version_col].type = DB_INT;
00844 q_vals[version_col].nul = 0;
00845 n_query_cols++;
00846
00847
00848 q_cols[extra_headers_col= n_query_cols] = &str_extra_headers_col;
00849 q_vals[extra_headers_col].type = DB_STR;
00850 q_vals[extra_headers_col].nul = 0;
00851 n_query_cols++;
00852
00853
00854 db_cols[0]= &str_expires_col;
00855 db_vals[0].type = DB_INT;
00856 db_vals[0].nul = 0;
00857
00858 db_cols[1]= &str_cseq_col;
00859 db_vals[1].type = DB_INT;
00860 db_vals[1].nul = 0;
00861
00862 db_cols[2]= &str_etag_col;
00863 db_vals[2].type = DB_STR;
00864 db_vals[2].nul = 0;
00865
00866 db_cols[3]= &str_desired_expires_col;
00867 db_vals[3].type = DB_INT;
00868 db_vals[3].nul = 0;
00869
00870 db_cols[4]= &str_version_col;
00871 db_vals[4].type = DB_INT;
00872 db_vals[4].nul = 0;
00873
00874 result_cols[0]= &str_expires_col;
00875
00876 if(pua_db== NULL)
00877 {
00878 LM_ERR("null database connection\n");
00879 return;
00880 }
00881
00882 if(pua_dbf.use_table(pua_db, &db_table)< 0)
00883 {
00884 LM_ERR("in use table\n");
00885 return ;
00886 }
00887
00888 for(i=0; i<HASH_SIZE; i++)
00889 {
00890 if(!no_lock)
00891 lock_get(&HashT->p_records[i].lock);
00892
00893 p = HashT->p_records[i].entity->next;
00894 while(p)
00895 {
00896 if(p->expires - (int)time(NULL)< 0)
00897 {
00898 p= p->next;
00899 continue;
00900 }
00901
00902 switch(p->db_flag)
00903 {
00904 case NO_UPDATEDB_FLAG:
00905 {
00906 LM_DBG("NO_UPDATEDB_FLAG\n");
00907 break;
00908 }
00909
00910 case UPDATEDB_FLAG:
00911 {
00912 LM_DBG("UPDATEDB_FLAG\n");
00913 n_update_cols= 0;
00914 n_query_update= 0;
00915
00916 q_vals[puri_col].val.str_val = *(p->pres_uri);
00917 n_query_update++;
00918
00919 q_vals[pid_col].val.str_val = p->id;
00920 n_query_update++;
00921
00922 q_vals[flag_col].val.int_val = p->flag;
00923 n_query_update++;
00924
00925 q_vals[event_col].val.int_val = p->event;
00926 n_query_update++;
00927
00928 if(p->watcher_uri)
00929 {
00930 q_vals[watcher_col].val.str_val = *(p->watcher_uri);
00931 n_query_update++;
00932
00933 q_vals[callid_col].val.str_val = p->call_id;
00934 n_query_update++;
00935
00936 q_vals[totag_col].val.str_val = p->to_tag;
00937 n_query_update++;
00938
00939 q_vals[fromtag_col].val.str_val = p->from_tag;
00940 n_query_update++;
00941 }
00942
00943 db_vals[0].val.int_val= p->expires;
00944 n_update_cols++;
00945
00946 db_vals[1].val.int_val= p->cseq ;
00947 n_update_cols++;
00948
00949 db_vals[2].val.str_val= p->etag ;
00950 n_update_cols++;
00951
00952 db_vals[3].val.int_val= p->desired_expires;
00953 n_update_cols++;
00954
00955 db_vals[4].val.int_val= p->version;
00956 n_update_cols++;
00957
00958 LM_DBG("Updating:n_query_update= %d\tn_update_cols= %d\n",
00959 n_query_update, n_update_cols);
00960
00961 if(pua_dbf.query(pua_db, q_cols, 0, q_vals,
00962 result_cols, n_query_update, 1, 0, &res)< 0)
00963 {
00964 LM_ERR("while querying db table pua\n");
00965 if(!no_lock)
00966 lock_release(&HashT->p_records[i].lock);
00967 if(res)
00968 pua_dbf.free_result(pua_db, res);
00969 return ;
00970 }
00971 if(res && res->n> 0)
00972 {
00973 if(pua_dbf.update(pua_db, q_cols, 0, q_vals, db_cols,
00974 db_vals, n_query_update, n_update_cols)<0)
00975 {
00976 LM_ERR("while updating in database\n");
00977 if(!no_lock)
00978 lock_release(&HashT->p_records[i].lock);
00979 pua_dbf.free_result(pua_db, res);
00980 res= NULL;
00981 return ;
00982 }
00983 pua_dbf.free_result(pua_db, res);
00984 res= NULL;
00985 }
00986 else
00987 {
00988 if(res)
00989 {
00990 pua_dbf.free_result(pua_db, res);
00991 res= NULL;
00992 }
00993 LM_DBG("UPDATEDB_FLAG and no record found\n");
00994
00995 }
00996 break;
00997 }
00998 case INSERTDB_FLAG:
00999 {
01000 LM_DBG("INSERTDB_FLAG\n");
01001 q_vals[puri_col].val.str_val = *(p->pres_uri);
01002 q_vals[pid_col].val.str_val = p->id;
01003 q_vals[flag_col].val.int_val = p->flag;
01004 if((p->watcher_uri))
01005 q_vals[watcher_col].val.str_val = *(p->watcher_uri);
01006 else
01007 memset(& q_vals[watcher_col].val.str_val ,0, sizeof(str));
01008 q_vals[tuple_col].val.str_val = p->tuple_id;
01009 q_vals[etag_col].val.str_val = p->etag;
01010 q_vals[callid_col].val.str_val = p->call_id;
01011 q_vals[totag_col].val.str_val = p->to_tag;
01012 q_vals[fromtag_col].val.str_val = p->from_tag;
01013 q_vals[cseq_col].val.int_val= p->cseq;
01014 q_vals[expires_col].val.int_val = p->expires;
01015 q_vals[desired_expires_col].val.int_val = p->desired_expires;
01016 q_vals[event_col].val.int_val = p->event;
01017 q_vals[version_col].val.int_val = p->version;
01018
01019 if(p->record_route.s== NULL)
01020 {
01021 q_vals[record_route_col].val.str_val.s= "";
01022 q_vals[record_route_col].val.str_val.len = 0;
01023 }
01024 else
01025 q_vals[record_route_col].val.str_val = p->record_route;
01026
01027 q_vals[contact_col].val.str_val = p->contact;
01028 if(p->remote_contact.s)
01029 {
01030 q_vals[remote_contact_col].val.str_val = p->remote_contact;
01031 LM_DBG("p->remote_contact = %.*s\n", p->remote_contact.len, p->remote_contact.s);
01032 }
01033 else
01034 {
01035 q_vals[remote_contact_col].val.str_val.s = "";
01036 q_vals[remote_contact_col].val.str_val.len = 0;
01037 }
01038
01039 if(p->extra_headers)
01040 q_vals[extra_headers_col].val.str_val = *(p->extra_headers);
01041 else
01042 n_query_cols--;
01043
01044 if(pua_dbf.insert(pua_db, q_cols, q_vals,n_query_cols )<0)
01045 {
01046 LM_ERR("while inserting in db table pua\n");
01047 if(!no_lock)
01048 lock_release(&HashT->p_records[i].lock);
01049 return ;
01050 }
01051 break;
01052 }
01053
01054 }
01055 p->db_flag= NO_UPDATEDB_FLAG;
01056 p= p->next;
01057 }
01058 if(!no_lock)
01059 lock_release(&HashT->p_records[i].lock);
01060 }
01061
01062 db_vals[0].val.int_val= (int)time(NULL)- 10;
01063 db_ops[0]= OP_LT;
01064 if(pua_dbf.delete(pua_db, db_cols, db_ops, db_vals, 1) < 0)
01065 {
01066 LM_ERR("while deleting from db table pua\n");
01067 }
01068
01069 return ;
01070 }
01071
01072 static ua_pres_t* build_uppubl_cbparam(ua_pres_t* p)
01073 {
01074 publ_info_t publ;
01075 ua_pres_t* cb_param= NULL;
01076
01077 memset(&publ, 0, sizeof(publ_info_t));
01078 publ.pres_uri= p->pres_uri;
01079 publ.content_type= p->content_type;
01080 publ.id= p->id;
01081 publ.expires= (p->desired_expires== 0) ?-1:p->desired_expires- (int)time(NULL);
01082 publ.flag= UPDATE_TYPE;
01083 publ.source_flag= p->flag;
01084 publ.event= p->event;
01085 publ.etag= &p->etag;
01086 publ.extra_headers= p->extra_headers;
01087
01088 cb_param= publish_cbparam(&publ, NULL, &p->tuple_id, REQ_ME);
01089 if(cb_param== NULL)
01090 {
01091 LM_ERR("constructing callback parameter\n");
01092 return NULL;
01093 }
01094 return cb_param;
01095 }
01096