diff options
author | Patrick Ohly <patrick.ohly@intel.com> | 2013-06-10 15:56:36 +0200 |
---|---|---|
committer | Patrick Ohly <patrick.ohly@intel.com> | 2013-06-19 17:28:50 +0200 |
commit | a4f359be3c45bea3695dcb7ef6b119a4832b037c (patch) | |
tree | 3a385ce23307954b14636b368728dd9456036287 | |
parent | 1dd618fa165b0b66465f97eca8d7b31d88b3b52b (diff) |
asynchronous command execution
It can be useful to trigger add/update/delete requests and check
for completion later. For example, combining multiple updates
in one operation is more efficient in the Evolution Data Server
(used by SyncEvolution). Another advantage is the execution of
these changes in the background while already exchanging the
next message with the peer.
This patch implements asynchronous add/update/delete operations for
client and server. The functionality can be requested by data stores
by returning the new LOCERR_AGAIN status code.
The <strictexecordering> flag must be set to 'false' before multiple
pending operations are allowed. It is still 'true' by default, in
which case once the first operation returns LOCERR_AGAIN, all further
operations are queued immediately without submitting them to the store
first.
The flag used to allow processing further commands when the first Sync
command was queued, primarily (exclusively?) because of a slow data
store 'start data read' operation. Testing showed that allowing that
breaks syncing with the Synthesis engine, so instead of allowing
something of dubious value the code now always serializes commands
when anything more than complex than add/delete/replace/copy is
involved.
In that sense the commit reduces the semantic of <strictexecordering>
slightly from "allow broken SyncML behavior" to "enable asynchronous
command execution". Together with the next commit, that hopefully can
be done without violating the SyncML spec.
Regarding the implementation, When getting the LOCERR_AGAIN status
back, all levels involved in the processing of the change dump their
local state into some kind of ausiliary data structure and attach that
to the item. They check for that auxiliary data the next time they get
called and if found, know that they need to resume operation where
they left of before.
For small functions, this is done with changes the if/else code
branches. For large functions, the changes would have been too
intrusive and hard to verify for correctness, because the control flow
often depends on variables that may change as part of the
execution. An approach based on goto was used in those cases (kind of
like a reverse long jump).
TSyncItem stores the auxiliary data in a hash indexed with hard-coded
keys for each layer. Arbitrary string keys were considered, but
discarded because of the (unnecessary) performance overhead. The
SyncML toolkit's SmlItem has a new pointer that holds the additional
state. The toolkit itself only sets it and knows how to free it via
a callback.
There is no explicit "flush pending changes" or "complete changes"
operation on data stores, because that would require extending the
plugin API. Instead the data stores are expected to finish the item
change(s) when called a second time. Normally the second call happens
at the beginning of the next message processing, thus achieving the
goal of overlapping processing with network IO.
The exception is the last message in a session. Testing showed that
delaying item changes in the SyncML client's final message confused
Synthesis client and/or server so that they no longer agree on the end
of session. Now the pending commands are issued a second time at the
end of processing that final message, which solved that issue.
There is one caveat that needs to be addressed in the next commit:
statuses must be issued in the order of their commands. At the moment,
the engine in its default "strict exec ordering" mode will stop
issuing more commands after the initial one was queued. A different
way of ensuring correct order will be needed.
-rwxr-xr-x | src/DB_interfaces/api_db/pluginapids.cpp | 201 | ||||
-rwxr-xr-x | src/syncml_tk/src/sml/inc/smldtd.h | 11 | ||||
-rwxr-xr-x | src/syncml_tk/src/sml/mgr/all/mgrutil.c | 2 | ||||
-rwxr-xr-x | src/sysync/customimplds.cpp | 105 | ||||
-rw-r--r-- | src/sysync/localengineds.cpp | 369 | ||||
-rw-r--r-- | src/sysync/stdlogicds.cpp | 84 | ||||
-rwxr-xr-x | src/sysync/syncitem.cpp | 16 | ||||
-rwxr-xr-x | src/sysync/syncitem.h | 19 | ||||
-rw-r--r-- | src/sysync/syncsession.cpp | 60 | ||||
-rwxr-xr-x | src/sysync/syncsession.h | 1 | ||||
-rw-r--r-- | src/sysync_SDK/Sources/syerror.h | 8 | ||||
-rwxr-xr-x | src/sysync_SDK/Sources/sysync_utils.cpp | 2 |
12 files changed, 773 insertions, 105 deletions
diff --git a/src/DB_interfaces/api_db/pluginapids.cpp b/src/DB_interfaces/api_db/pluginapids.cpp index a912305..9a400b2 100755 --- a/src/DB_interfaces/api_db/pluginapids.cpp +++ b/src/DB_interfaces/api_db/pluginapids.cpp @@ -1321,7 +1321,15 @@ localstatus TPluginApiDS::apiStartDataWrite(void) return dberr; } // TPluginApiDS::apiStartDataWrite - +struct TPluginItemAux : public TSyncItemAux +{ +#if defined(DBAPI_ASKEYITEMS) && defined(ENGINEINTERFACE_SUPPORT) + TDBItemKey *fItemKeyP; +#endif +#ifdef DBAPI_TEXTITEMS + string fItemData; +#endif +}; // add new item to datastore, returns created localID localstatus TPluginApiDS::apiAddItem(TMultiFieldItem &aItem, string &aLocalID) @@ -1334,39 +1342,77 @@ localstatus TPluginApiDS::apiAddItem(TMultiFieldItem &aItem, string &aLocalID) TSyError dberr=LOCERR_OK; TDB_Api_ItemID itemAndParentID; - // prepare data from item #ifdef SCRIPT_SUPPORT fInserting=true; // flag for script, we are inserting new record #endif - // two API variants - #if defined(DBAPI_ASKEYITEMS) && defined(ENGINEINTERFACE_SUPPORT) - if (fPluginDSConfigP->fItemAsKey) { - // preprocess - if (!preWriteProcessItem(aItem)) return 510; // DB error - // get key - TDBItemKey *itemKeyP = newDBItemKey(&aItem); - // let plugin use it to obtain data to write - dberr=fDBApi_Data.InsertItemAsKey((KeyH)itemKeyP,"",itemAndParentID); - // done with the key - delete itemKeyP; - } - else - #endif - #ifdef DBAPI_TEXTITEMS - { - string itemData; - generateDBItemData( - false, // all fields, not only assigned ones - aItem, - 0, // we do not use different sets for now - itemData // here we'll get the data - ); - // now insert main record - dberr=fDBApi_Data.InsertItem(itemData.c_str(),"",itemAndParentID); + + TPluginItemAux *aux = static_cast<TPluginItemAux *>(aItem.getAux(TSyncItem::PLUGIN_API)); + if (aux) { + // Continue operation. + #if defined(DBAPI_ASKEYITEMS) && defined(ENGINEINTERFACE_SUPPORT) + if (fPluginDSConfigP->fItemAsKey) { + dberr=fDBApi_Data.InsertItemAsKey((KeyH)aux->fItemKeyP,"",itemAndParentID); + if (dberr == LOCERR_AGAIN) + return dberr; + // done with the key + delete aux->fItemKeyP; + aux->fItemKeyP=NULL; + } + else + #endif + #ifdef DBAPI_TEXTITEMS + { + dberr=fDBApi_Data.InsertItem(aux->fItemData.c_str(),"",itemAndParentID); + if (dberr == LOCERR_AGAIN) + return dberr; + } + #else + return LOCERR_WRONGUSAGE; + #endif + } else { + // Two API variants for starting the operation. + #if defined(DBAPI_ASKEYITEMS) && defined(ENGINEINTERFACE_SUPPORT) + if (fPluginDSConfigP->fItemAsKey) { + // preprocess + if (!preWriteProcessItem(aItem)) return 510; // DB error + // get key + TDBItemKey *itemKeyP = newDBItemKey(&aItem); + // let plugin use it to obtain data to write + dberr=fDBApi_Data.InsertItemAsKey((KeyH)itemKeyP,"",itemAndParentID); + if (dberr == LOCERR_AGAIN) { + TPluginItemAux *aux=new TPluginItemAux; + aux->fItemKeyP=itemKeyP; + aItem.setAux(TSyncItem::PLUGIN_API, aux); + return LOCERR_AGAIN; + } + // done with the key + delete itemKeyP; + } + else + #endif + #ifdef DBAPI_TEXTITEMS + { + string itemData; + generateDBItemData( + false, // all fields, not only assigned ones + aItem, + 0, // we do not use different sets for now + itemData // here we'll get the data + ); + // now insert main record + dberr=fDBApi_Data.InsertItem(itemData.c_str(),"",itemAndParentID); + if (dberr == LOCERR_AGAIN) { + TPluginItemAux *aux=new TPluginItemAux; + aux->fItemData=itemData; + aItem.setAux(TSyncItem::PLUGIN_API, aux); + return LOCERR_AGAIN; + } + } + #else + return LOCERR_WRONGUSAGE; // completely wrong usage - should never happen as compatibility is tested at module connect + #endif } - #else - return LOCERR_WRONGUSAGE; // completely wrong usage - should never happen as compatibility is tested at module connect - #endif + // now check result if (dberr==LOCERR_OK || dberr==DB_Conflict || @@ -1442,39 +1488,76 @@ localstatus TPluginApiDS::apiUpdateItem(TMultiFieldItem &aItem) itemAndParentID.item=(appCharP)aItem.getLocalID(); itemAndParentID.parent=const_cast<char *>(""); - // prepare data from item #ifdef SCRIPT_SUPPORT fInserting=false; // flag for script, we are updating, not inserting now #endif - // two API variants - #if defined(DBAPI_ASKEYITEMS) && defined(ENGINEINTERFACE_SUPPORT) - if (fPluginDSConfigP->fItemAsKey) { - // preprocess - if (!preWriteProcessItem(aItem)) return 510; // DB error - // get key - TDBItemKey *itemKeyP = newDBItemKey(&aItem); - // let plugin use it to obtain data to write - dberr=fDBApi_Data.UpdateItemAsKey((KeyH)itemKeyP,itemAndParentID,updItemAndParentID); - // done with the key - delete itemKeyP; - } - else - #endif - #ifdef DBAPI_TEXTITEMS - { - string itemData; - generateDBItemData( - true, // only assigned fields - aItem, - 0, // we do not use different sets for now - itemData // here we'll get the data - ); - // now update main record - dberr=fDBApi_Data.UpdateItem(itemData.c_str(),itemAndParentID,updItemAndParentID); + + TPluginItemAux *aux = static_cast<TPluginItemAux *>(aItem.getAux(TSyncItem::PLUGIN_API)); + if (aux) { + // Continue operation. + #if defined(DBAPI_ASKEYITEMS) && defined(ENGINEINTERFACE_SUPPORT) + if (fPluginDSConfigP->fItemAsKey) { + dberr=fDBApi_Data.UpdateItemAsKey((KeyH)aux->fItemKeyP,itemAndParentID,updItemAndParentID); + if (dberr == LOCERR_AGAIN) + return dberr; + // done with the key + delete aux->fItemKeyP; + aux->fItemKeyP=NULL; + } + else + #endif + #ifdef DBAPI_TEXTITEMS + { + dberr=fDBApi_Data.UpdateItem(aux->fItemData.c_str(),itemAndParentID,updItemAndParentID); + if (dberr == LOCERR_AGAIN) + return dberr; + } + #else + return LOCERR_WRONGUSAGE; // completely wrong usage - should never happen as compatibility is tested at module connect + #endif + } else { + // Two API variants for starting the operation. + #if defined(DBAPI_ASKEYITEMS) && defined(ENGINEINTERFACE_SUPPORT) + if (fPluginDSConfigP->fItemAsKey) { + // preprocess + if (!preWriteProcessItem(aItem)) return 510; // DB error + // get key + TDBItemKey *itemKeyP = newDBItemKey(&aItem); + // let plugin use it to obtain data to write + dberr=fDBApi_Data.UpdateItemAsKey((KeyH)itemKeyP,itemAndParentID,updItemAndParentID); + if (dberr == LOCERR_AGAIN) { + TPluginItemAux *aux=new TPluginItemAux; + aux->fItemKeyP=itemKeyP; + aItem.setAux(TSyncItem::PLUGIN_API, aux); + return LOCERR_AGAIN; + } + // done with the key + delete itemKeyP; + } + else + #endif + #ifdef DBAPI_TEXTITEMS + { + string itemData; + generateDBItemData( + true, // only assigned fields + aItem, + 0, // we do not use different sets for now + itemData // here we'll get the data + ); + // now update main record + dberr=fDBApi_Data.UpdateItem(itemData.c_str(),itemAndParentID,updItemAndParentID); + if (dberr == LOCERR_AGAIN) { + TPluginItemAux *aux=new TPluginItemAux; + aux->fItemData=itemData; + aItem.setAux(TSyncItem::PLUGIN_API, aux); + return LOCERR_AGAIN; + } + } + #else + return LOCERR_WRONGUSAGE; // completely wrong usage - should never happen as compatibility is tested at module connect + #endif } - #else - return LOCERR_WRONGUSAGE; // completely wrong usage - should never happen as compatibility is tested at module connect - #endif if (dberr==LOCERR_OK) { // check if ID has changed if (!updItemAndParentID.item.empty() && strcmp(updItemAndParentID.item.c_str(),aItem.getLocalID())!=0) { diff --git a/src/syncml_tk/src/sml/inc/smldtd.h b/src/syncml_tk/src/sml/inc/smldtd.h index a751ddc..2c2b27f 100755 --- a/src/syncml_tk/src/sml/inc/smldtd.h +++ b/src/syncml_tk/src/sml/inc/smldtd.h @@ -250,6 +250,15 @@ typedef struct sml_sync_hdr_s { /** + * Auxiliary data added by layer above SMLTK. + * Actual data will be in derived types that + * are not visible to SMLTK. + */ +typedef struct sml_item_aux_s { + void (*freeAux)(struct sml_item_aux_s *ptr); // Frees the aux instance. +} *SmlItemAuxPtr_t, SmlItemAux_t; + +/** * Data in SyncML is encapsulated in an "item" element. */ typedef struct sml_item_s { @@ -261,6 +270,8 @@ typedef struct sml_item_s { SmlPcdataPtr_t meta; // opt. SmlPcdataPtr_t data; // opt. Flag_t flags; // opt. for MoreData + + SmlItemAuxPtr_t aux; // for use by level above SMLTK } *SmlItemPtr_t, SmlItem_t; typedef struct sml_item_list_s { diff --git a/src/syncml_tk/src/sml/mgr/all/mgrutil.c b/src/syncml_tk/src/sml/mgr/all/mgrutil.c index 16b3a60..93633b0 100755 --- a/src/syncml_tk/src/sml/mgr/all/mgrutil.c +++ b/src/syncml_tk/src/sml/mgr/all/mgrutil.c @@ -816,6 +816,8 @@ SML_API void smlFreeItemPtr(SmlItemPtr_t pItem) if (! pItem) return; + if (pItem->aux) + pItem->aux->freeAux(pItem->aux); smlFreePcdata(pItem->meta); smlFreePcdata(pItem->data); diff --git a/src/sysync/customimplds.cpp b/src/sysync/customimplds.cpp index 988dd67..d0ee9d4 100755 --- a/src/sysync/customimplds.cpp +++ b/src/sysync/customimplds.cpp @@ -2731,7 +2731,24 @@ localstatus TCustomImplDS::implProcessMap(cAppCharP aRemoteID, cAppCharP aLocalI return sta; } // TCustomImplDS::implProcessMap +enum CustomItemOp +{ + CUSTOM_ITEM_ADD, + CUSTOM_ITEM_ADD_AUGMENTED, + CUSTOM_ITEM_UPDATE, + CUSTOM_ITEM_UPDATE_AUGMENTED, + CUSTOM_ITEM_DELETE +}; +struct TCustomItemAux : public TSyncItemAux +{ + string fLocalID; + string fRemoteID; // A copy of the original C string, to be on the safe side. + bool fRemoteIDSet; + TSyncOperation fSop; + bool fRemoteHasLatestData; + CustomItemOp fOp; +}; /// process item (according to operation: add/delete/replace - and for future: copy/move) /// @note data items will be sent only after StartWrite() @@ -2746,6 +2763,8 @@ bool TCustomImplDS::implProcessItem( } #endif // BASED_ON_BINFILE_CLIENT + // Same approach as in TLocalEngineDS::engProcessRemoteItem: + // backup local state and restore when called again. bool ok=true; localstatus sta=LOCERR_OK; string localID; @@ -2754,12 +2773,86 @@ bool TCustomImplDS::implProcessItem( TMapContainer::iterator mappos; TSyncOperation sop=sop_none; TMultiFieldItem *augmentedItemP = NULL; + bool remoteHasLatestData; + CustomItemOp op; TP_DEFIDX(li); TP_SWITCH(li,fSessionP->fTPInfo,TP_database); SYSYNC_TRY { // get casted item pointer TMultiFieldItem *myitemP = (TMultiFieldItem *)aItemP; + + TCustomItemAux *aux = static_cast<TCustomItemAux *>(myitemP->getAux(TSyncItem::CUSTOM_DS)); + DEBUGPRINTFX(DBG_DATA,( + "TCustomImplDS::implProcessItem %p %s, SyncOp=%s, RemoteID='%s', LocalID='%s'", + myitemP, + aux ? "resuming" : "starting", + SyncOpNames[myitemP->getSyncOp()], + myitemP->getRemoteID(), + myitemP->getLocalID() + )); + if (aux) { + // Resuming the function call: restore variables, jump to store + // method call. + localID = aux->fLocalID; + remoteID = aux->fRemoteIDSet ? aux->fRemoteID.c_str() : NULL; + sop = aux->fSop; + remoteHasLatestData = aux->fRemoteHasLatestData; + op = aux->fOp; + + // Stripped down logic from normal code path below. + // We can't save/restore mapppos because it points into + // a data structure which may change between calls, thus + // invalidating the old iterator. + if (IS_CLIENT) { + if (!localID.empty() && sop!=sop_add && sop!=sop_wants_add) + mappos=findMapByLocalID(localID.c_str(),mapentry_normal); + else + mappos=fMapTable.end(); + } + else { + mappos=findMapByRemoteID(remoteID); + if (mappos!=fMapTable.end()) { + localID = (*mappos).localid; + } + } + + aStatusCommand.setStatusCode(510); + switch (op) { + case CUSTOM_ITEM_ADD: goto do_add; + case CUSTOM_ITEM_ADD_AUGMENTED: goto do_add_augmented; + case CUSTOM_ITEM_UPDATE: goto do_update; + case CUSTOM_ITEM_UPDATE_AUGMENTED: goto do_update_augmented; + case CUSTOM_ITEM_DELETE: goto do_delete; + }; + } + if (false) { + // Prepare for resuming the function call. Will only be reached + // via goto with "op" set to something identifying the source of + // the jump. + again: +#define CHECK_FOR_AGAIN(_status, _op) \ + if (_status == LOCERR_AGAIN) { \ + op = _op; \ + goto again; \ + } + + if (!aux) { + aux = new TCustomItemAux; + myitemP->setAux(TSyncItem::CUSTOM_DS, aux); + } + + aux->fLocalID = localID; + aux->fRemoteID = remoteID ? remoteID : ""; + aux->fRemoteIDSet = remoteID != NULL; + aux->fSop = sop; + aux->fRemoteHasLatestData = remoteHasLatestData; + aux->fOp = op; + + aStatusCommand.setStatusCode(LOCERR_AGAIN); + goto error; + } + // - get op sop = myitemP->getSyncOp(); // - check IDs @@ -2790,7 +2883,7 @@ bool TCustomImplDS::implProcessItem( } // - now perform op aStatusCommand.setStatusCode(510); // default DB error - bool remoteHasLatestData = false; + remoteHasLatestData = false; switch (sop) { /// @todo sop_copy is now implemented by read/add sequence /// in localEngineDS, but will be moved here later possibly @@ -2809,7 +2902,9 @@ bool TCustomImplDS::implProcessItem( break; } // add item and retrieve new localID for it + do_add: sta = apiAddItem(*myitemP,localID); + CHECK_FOR_AGAIN(sta, CUSTOM_ITEM_ADD); myitemP->setLocalID(localID.c_str()); // possibly following operations need to be based on new localID returned by add // check for backend asking engine to do a merge if (sta==DB_Conflict) { @@ -2821,10 +2916,12 @@ bool TCustomImplDS::implProcessItem( sta = DB_Error; // no item found, DB error else { // store augmented version back to DB only if modified + do_add_augmented: if (changedDBVersion) sta = apiUpdateItem(*augmentedItemP); else sta = LOCERR_OK; + CHECK_FOR_AGAIN(sta, CUSTOM_ITEM_ADD_AUGMENTED); // in server case, further process like backend merge (but no need to fetch again, we just keep augmentedItemP) if (IS_SERVER && sta==LOCERR_OK) { // TLocalEngineDS::engProcessRemoteItemAsServer() in @@ -2941,7 +3038,9 @@ bool TCustomImplDS::implProcessItem( // - make sure item has local ID set myitemP->setLocalID(localID.c_str()); // update item + do_update: sta = apiUpdateItem(*myitemP); + CHECK_FOR_AGAIN(sta, CUSTOM_ITEM_UPDATE); if (sta==DB_Conflict) { // DB has detected item conflicts with data already stored in the database and // request merging current data from the backend with new data before storing. @@ -2951,10 +3050,12 @@ bool TCustomImplDS::implProcessItem( sta = DB_Error; // no item found, DB error else { // store augmented version back to DB only if modified + do_update_augmented: if (changedDBVersion) sta = apiUpdateItem(*augmentedItemP); else sta = LOCERR_OK; + CHECK_FOR_AGAIN(sta, CUSTOM_ITEM_UPDATE_AUGMENTED); delete augmentedItemP; // forget now } } @@ -2989,7 +3090,9 @@ bool TCustomImplDS::implProcessItem( // - make sure item has local ID set myitemP->setLocalID(localID.c_str()); // delete item + do_delete: sta = apiDeleteItem(*myitemP); + CHECK_FOR_AGAIN(sta, CUSTOM_ITEM_DELETE); if (sta!=LOCERR_OK) { // not found is reported as successful 211 status, because result is ok (item deleted, whatever reason) if (sta==404) diff --git a/src/sysync/localengineds.cpp b/src/sysync/localengineds.cpp index d70d8c3..1f36248 100644 --- a/src/sysync/localengineds.cpp +++ b/src/sysync/localengineds.cpp @@ -5058,6 +5058,26 @@ bool TLocalEngineDS::engProcessRemoteItem( return false; } // TLocalEngineDS::engProcessRemoteItem +class SyncOpItemAux : public SmlItemAux_t { + static void freeAuxImpl(SmlItemAuxPtr_t ptr); +public: + SyncOpItemAux(); + + TSyncItemType *remoteTypeP; + TSyncItemType *localTypeP; + TFmtTypes fmt; + TSyncItem *syncitemP; +}; + +SyncOpItemAux::SyncOpItemAux() +{ + freeAux = freeAuxImpl; +} + +void SyncOpItemAux::freeAuxImpl(SmlItemAuxPtr_t ptr) +{ + delete static_cast<SyncOpItemAux *>(ptr); +} // process SyncML SyncOp command for this datastore bool TLocalEngineDS::engProcessSyncOpItem( @@ -5073,22 +5093,59 @@ bool TLocalEngineDS::engProcessSyncOpItem( PDEBUGPRINTFX(DBG_ERROR,("engProcessSyncOpItem: Remote Datastore not known")); aStatusCommand.setStatusCode(500); } + + // We need the local and remote type plus format to + // process the item. + TSyncItemType *remoteTypeP; + TSyncItemType *localTypeP; + TFmtTypes fmt; + TSyncItem *syncitemP=NULL; + + string versstr; + + SyncOpItemAux *aux = aItemP->aux ? static_cast<SyncOpItemAux *>(aItemP->aux) : NULL; + if (aux) { + // Reuse the previously calculated + // values when being called again. + remoteTypeP = aux->remoteTypeP; + localTypeP = aux->remoteTypeP; + fmt = aux->fmt; + syncitemP = aux->syncitemP; + + goto process; + } + + if (false) { + again: + if (!aux) { + aux = new SyncOpItemAux; + } + aItemP->aux = aux; + aux->remoteTypeP = remoteTypeP; + aux->localTypeP = localTypeP; + aux->fmt = fmt; + aux->syncitemP = syncitemP; + return false; + } + + // - start with default - TSyncItemType *remoteTypeP=getRemoteSendType(); - TSyncItemType *localTypeP=getLocalReceiveType(); + remoteTypeP=getRemoteSendType(); + localTypeP=getLocalReceiveType(); // - see if command-wide meta plus item contents specify another type // (item meta, if present, overrides command wide meta) // see if item itself or command meta specify a type name or format - SmlMetInfMetInfPtr_t itemmetaP = smlPCDataToMetInfP(aItemP->meta); + SmlMetInfMetInfPtr_t itemmetaP; + itemmetaP = smlPCDataToMetInfP(aItemP->meta); // - format - TFmtTypes fmt=fmt_chr; + fmt=fmt_chr; if (itemmetaP && itemmetaP->format) smlPCDataToFormat(itemmetaP->format,fmt); // use type name from item's meta else if (aMetaP && aMetaP->format) smlPCDataToFormat(aMetaP->format,fmt); // use type name from command-wide meta // - type - string versstr; - const char *typestr = NULL; + const char *typestr; + typestr = NULL; if (itemmetaP && itemmetaP->type) typestr = smlPCDataToCharP(itemmetaP->type); // use type name from item's meta else if (aMetaP && aMetaP->type) @@ -5189,11 +5246,14 @@ bool TLocalEngineDS::engProcessSyncOpItem( } } - // now process + // Now process or resume. We cannot jump right into + // the block because of the try/catch, so some checks + // for resuming are necessary. + process: if (localTypeP && remoteTypeP) { - TSyncItem *syncitemP = NULL; // create the item (might have empty data in case of delete) - syncitemP=remoteTypeP->newSyncItem(aItemP,aSyncOp,fmt,localTypeP,this,aStatusCommand); + if (!syncitemP) + syncitemP=remoteTypeP->newSyncItem(aItemP,aSyncOp,fmt,localTypeP,this,aStatusCommand); if (!syncitemP) { // failed to create item return false; // irregular @@ -5210,9 +5270,12 @@ bool TLocalEngineDS::engProcessSyncOpItem( errctx.syncop = syncitemP->getSyncOp(); #endif SYSYNC_TRY { - // this call frees the item + // this call frees the item, unless it wants to be called again regular = engProcessRemoteItem(syncitemP,aStatusCommand); + if (aStatusCommand.getStatusCode() == LOCERR_AGAIN) { + goto again; + } syncitemP = NULL; PDEBUGENDBLOCK("Process_Item"); } @@ -5327,7 +5390,43 @@ localstatus TLocalEngineDS::engProcessMap(cAppCharP aRemoteID, cAppCharP aLocalI return logicProcessMap(aRemoteID, aLocalID); } // TLocalEngineDS::engProcessMap +enum LocalItemOp +{ + LOCAL_ITEM_DELETE, + LOCAL_ITEM_ADD_NORMAL, + LOCAL_ITEM_ADD_DELETED, + LOCAL_ITEM_ADD_DUPLICATE, + LOCAL_ITEM_REPLACE_MERGED, + LOCAL_ITEM_REPLACE_FROM_CLIENT, + LOCAL_ITEM_REPLACE, + LOCAL_ITEM_ADD_MERGED, + LOCAL_ITEM_REPLACE_MERGED2, + LOCAL_ITEM_ADD_SLOW +}; +struct TLocalSyncItemAux : public TSyncItemAux +{ + TSyncItem *fConflictingItemP; + TSyncItem *fEchoItemP; + TSyncItem *fDelItemP; + TSyncItem *fMatchingItemP; + bool fChangedIncoming; + bool fChangedExisting; + bool fRemainsVisible; + TSyncOperation fSyncOp; + uInt16 fItemTypeID; + string fRemoteID; + LocalItemOp fOp; + + TSyncOperation fCurrentSyncOp; + TSyncOperation fEchoItemOp; + TConflictResolution fItemConflictStrategy; + bool fForceConflict; + bool fDeleteWins; + bool fPreventAdd; + bool fIgnoreUpdate; + sInt16 fRejectStatus; +}; // process sync operation from client with specified sync item // (according to current sync mode of local datastore) @@ -5342,18 +5441,127 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( TStatusCommand &aStatusCommand // status, must be set to correct status code (ok / error) ) { - TSyncItem *conflictingItemP=NULL; - TSyncItem *echoItemP=NULL; - TSyncItem *delitemP=NULL; - bool changedincoming=false; - bool changedexisting=false; - bool remainsvisible=true; // usually, we want the item to remain visible in the sync set - TStatusCommand dummy(fSessionP); + // The logic of this function is pretty complex. Instead of trying + // to retrace our steps when called again after a LOCERR_AGAIN, let's + // jump to labels directly. For that to work, all local variables + // must be defined before the goto. + TSyncItem *conflictingItemP; + TSyncItem *echoItemP; + TSyncItem *delitemP; + TSyncItem *matchingItemP = NULL; + bool changedincoming; + bool changedexisting; + bool remainsvisible; + TSyncOperation syncop; + uInt16 itemtypeid; + string remoteid; + LocalItemOp op; + bool ok=false; + + TLocalSyncItemAux *aux = static_cast<TLocalSyncItemAux *>(aSyncItemP->getAux(TSyncItem::LOCAL_ENGINE)); + if (aux) { + // Resuming the function call: restore variables, jump to store + // method call. + // + // The compiler will tell us if we jump across a variable + // instantiation which initializes the variable ("jump bypasses + // variable initialization"). Variables which are not needed when + // resuming must be initialized with normal assignments to avoid + // this error. Variables which are needed, need to be moved to the + // section above and added to the store/restore. + conflictingItemP = aux->fConflictingItemP; + echoItemP = aux->fEchoItemP; + delitemP = aux->fDelItemP; + matchingItemP = aux->fMatchingItemP; + changedincoming = aux->fChangedIncoming; + changedexisting = aux->fChangedExisting; + remainsvisible = aux->fRemainsVisible; + syncop = aux->fSyncOp; + itemtypeid = aux->fItemTypeID; + remoteid = aux->fRemoteID; + op = aux->fOp; + + // Besides the local variables, we also need to set members + // to the value they had before leaving. See checkItem() below. + // Some of these will never be different from the default because + // if they got set in the first call, we don't queue and thus + // don't get here. But restore them anyway, just to be sure. + fCurrentSyncOp = aux->fCurrentSyncOp; + fEchoItemOp = aux->fEchoItemOp; + fItemConflictStrategy = aux->fItemConflictStrategy; + fForceConflict = aux->fForceConflict; + fDeleteWins = aux->fDeleteWins; + fPreventAdd = aux->fPreventAdd; + fIgnoreUpdate = aux->fIgnoreUpdate; + fRejectStatus = aux->fRejectStatus; + + PDEBUGPRINTFX(DBG_DATA,("%s item operation resumed",SyncOpNames[syncop])); + switch (op) { + case LOCAL_ITEM_DELETE: goto do_delete; + case LOCAL_ITEM_ADD_NORMAL: goto do_add_normal; + case LOCAL_ITEM_ADD_DELETED: goto do_add_deleted; + case LOCAL_ITEM_ADD_DUPLICATE: goto do_add_duplicate; + case LOCAL_ITEM_REPLACE_MERGED: goto do_replace_merged; + case LOCAL_ITEM_REPLACE_FROM_CLIENT: goto do_replace_from_client; + case LOCAL_ITEM_REPLACE: goto do_replace; + case LOCAL_ITEM_ADD_MERGED: goto do_add_merged; + case LOCAL_ITEM_REPLACE_MERGED2: goto do_replace_merged2; + case LOCAL_ITEM_ADD_SLOW: goto do_add_slow; + }; + } + if (false) { + // Prepare for resuming the function call. Will only be reached + // via goto with "op" set to something identifying the source of + // the jump. + again: +#define CHECK_FOR_AGAIN(_op) \ + if (aStatusCommand.getStatusCode() == LOCERR_AGAIN) { \ + op = _op; \ + goto again; \ + } + + if (!aux) { + aux = new TLocalSyncItemAux; + aSyncItemP->setAux(TSyncItem::LOCAL_ENGINE, aux); + } + + aux->fConflictingItemP = conflictingItemP; + aux->fEchoItemP = echoItemP; + aux->fDelItemP = delitemP; + aux->fMatchingItemP = matchingItemP; + aux->fChangedIncoming = changedincoming; + aux->fChangedExisting = changedexisting; + aux->fRemainsVisible = remainsvisible; + aux->fSyncOp = syncop; + aux->fItemTypeID = itemtypeid; + aux->fRemoteID = remoteid; + aux->fOp = op; + + aux->fCurrentSyncOp = fCurrentSyncOp; + aux->fEchoItemOp = fEchoItemOp; + aux->fItemConflictStrategy = fItemConflictStrategy; + aux->fForceConflict = fForceConflict; + aux->fDeleteWins = fDeleteWins; + aux->fPreventAdd = fPreventAdd; + aux->fIgnoreUpdate = fIgnoreUpdate; + aux->fRejectStatus = fRejectStatus; + + aStatusCommand.setStatusCode(LOCERR_AGAIN); + return false; + } + + // Normal control flow: initialize variables, then execute. + conflictingItemP=NULL; + echoItemP=NULL; + delitemP=NULL; + changedincoming=false; + changedexisting=false; + remainsvisible=true; // usually, we want the item to remain visible in the sync set // get some info out of item (we might need it after item is already consumed) - TSyncOperation syncop=aSyncItemP->getSyncOp(); - uInt16 itemtypeid=aSyncItemP->getTypeID(); - string remoteid=aSyncItemP->getRemoteID(); + syncop=aSyncItemP->getSyncOp(); + itemtypeid=aSyncItemP->getTypeID(); + remoteid=aSyncItemP->getRemoteID(); // check if datastore is aborted if(CheckAborted(aStatusCommand)) return false; @@ -5456,7 +5664,6 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( return true; } // now perform requested operation - bool ok=false; localstatus sta; switch (syncop) { readonly_delete: @@ -5566,7 +5773,9 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( // really delete fLocalItemsDeleted++; remainsvisible=false; // deleted not visible any more + do_delete: ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); // delete in local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_DELETE); break; case sop_copy: if (fReadOnly) { @@ -5605,8 +5814,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( // Note: making item to pass sync set filter is implemented in derived DB implementation // as criteria for passing might be in data that must first be read from the DB #endif + do_add_normal: remainsvisible=true; // should remain visible ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); // add to local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_ADD_NORMAL); if (!remainsvisible && fSessionP->getSyncMLVersion()>=syncml_vers_1_2) { PDEBUGPRINTFX(DBG_DATA+DBG_HOT,("Added item is not visible under current filters -> remove it on client")); goto removefromremoteandsyncset; @@ -5633,14 +5844,16 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( if (!conflictingItemP && fForceConflict) { conflictingItemP=SendDBVersionOfItemAsServer(aSyncItemP); } - bool deleteconflict=false; + bool deleteconflict; + deleteconflict=false; if (conflictingItemP) { // Note: if there is a conflict, this replace cannot be an // implicit add, so we don't need to check for fPreventAdd // here. // Note: if we are in ignoreUpdate mode, the only conflict resolution // possible is unconditional server win - sInt16 cmpRes = SYSYNC_NOT_COMPARABLE; + sInt16 cmpRes; + cmpRes = SYSYNC_NOT_COMPARABLE; // assume we can resolve the conflict aStatusCommand.setStatusCode(419); // default to server win ADDDEBUGITEM(aStatusCommand,"Conflict resolved by server"); @@ -5681,8 +5894,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( PDEBUGPRINTFX(DBG_PROTO+DBG_HOT,("Conflict of Client Replace with Server delete -> try to update already deleted item (as it might still exist in syncset)")); // apply replace (and in case of !fDeleteWins, possible implicit add) fPreventAdd=fDeleteWins; // we want implicit add only if delete cannot win + do_add_deleted: remainsvisible=!fDeleteWins; // we want to see the item in the sync set if delete does not win! ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); + CHECK_FOR_AGAIN(LOCAL_ITEM_ADD_DELETED); } if (fDeleteWins) { if (!ok) { @@ -5815,8 +6030,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( // - add client item as new item to server DB fLocalItemsAdded++; aSyncItemP->setSyncOp(sop_add); // set correct op + do_add_duplicate: remainsvisible=true; // should remain visible ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); // add to local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_ADD_DUPLICATE); break; } else if (crstrategy==cr_server_wins) { @@ -5843,8 +6060,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( // process update in local database fLocalItemsUpdated++; aSyncItemP->setSyncOp(sop_replace); // update + do_replace_merged: remainsvisible=true; // should remain visible ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); // update in local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_REPLACE_MERGED); break; } else { @@ -5885,8 +6104,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( fLocalItemsUpdated++; aSyncItemP->setSyncOp(sop_replace); } + do_replace_from_client: remainsvisible=true; // should remain visible ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); // replace in local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_REPLACE_FROM_CLIENT); break; } } // replace conflict @@ -5903,13 +6124,15 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( PDEBUGPRINTFX(DBG_DATA+DBG_CONFLICT,("No Conflict: client item replaces server item")); // - replace item in server (or add if item does not exist and not fPreventAdd) aSyncItemP->setSyncOp(sop_replace); + do_replace: remainsvisible=true; // should remain visible - if (!logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible)) { + ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); + CHECK_FOR_AGAIN(LOCAL_ITEM_REPLACE); + if (!ok) { // check if this is a 404 or 410 and fPreventAdd if (fPreventAdd && (aStatusCommand.getStatusCode()==404 || aStatusCommand.getStatusCode()==410)) goto preventadd2; // to-be-replaced item not found and implicit add prevented -> delete from remote // simply failed - ok=false; break; } // still visible in sync set? @@ -5933,7 +6156,7 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( // prevent that in case of multiple (loosely compared) matches we // catch the wrong item and cause a mess at slowsync // NOTE: we do compare only relevant fields (eqm_conflict) - TSyncItem *matchingItemP = getMatchingItem(aSyncItemP,eqm_conflict); + matchingItemP = getMatchingItem(aSyncItemP,eqm_conflict); // separate assignment, only done as part of normal control flow if (!matchingItemP) { // try again with less strict comparison (eqm_slowsync or eqm_always for firsttimesync) DEBUGPRINTFX(DBG_DATA+DBG_MATCH,("Strict search for matching item failed, try with configured EqMode now")); @@ -5947,21 +6170,26 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( )); fSlowSyncMatches++; aStatusCommand.setStatusCode(syncop==sop_add ? 201 : 200); // default is: simply ok. But if original op was Add, MUST return 201 status (SCTS requires it) - bool matchingok=false; + bool matchingok; + matchingok = false; // - do not update map yet, as we still don't know if client item will // possibly be added instead of mapped // Note: ONLY in case this is a reference-only item, the map is already updated! - bool mapupdated = syncop==sop_reference_only; + bool mapupdated; + mapupdated = syncop==sop_reference_only; // - determine which one is winning - bool needserverupdate=false; - bool needclientupdate=false; + bool needserverupdate; + bool needclientupdate; + needserverupdate = false; + needclientupdate = false; // if updates are ignored, we can short-cut here // Note: if this is a reference-only item, it was already updated (if needed) before last suspend // so skip updating now! if (syncop!=sop_reference_only && !fIgnoreUpdate) { // Not a reference-only and also updates not suppressed // - for a read-only datastore, this defaults to server always winning - TConflictResolution crstrategy = + TConflictResolution crstrategy; + crstrategy = fReadOnly ? cr_server_wins : // server always wins for read-only fItemConflictStrategy; // pre-set strategy for this item @@ -6025,7 +6253,6 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( #endif )!=0 ) { - string guid; // items are not really equal in content, so duplicate them on both sides PDEBUGPRINTFX(DBG_PROTO,("Matching items are not fully equal, duplicate them on both sides")); fConflictsDuplicated++; @@ -6035,8 +6262,11 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( fLocalItemsAdded++; aSyncItemP->setSyncOp(sop_add); aStatusCommand.setStatusCode(201); // item added (if no error occurs) + do_add_merged: + string guid; remainsvisible=true; // should remain visible matchingok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible,&guid); // add item in local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_ADD_MERGED); aSyncItemP=NULL; // is already deleted! if (matchingok) { // do it only if server add successful, because otherwise we don't have a GUID // - make sure same item is ADDED as new item to client @@ -6129,8 +6359,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( // - update server side (NOTE: processItemAsServer takes ownership, pointer gets invalid!) fLocalItemsUpdated++; aSyncItemP->setSyncOp(sop_replace); + do_replace_merged2: remainsvisible=true; // should remain visible matchingok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); // replace item in local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_REPLACE_MERGED2); PDEBUGPRINTFX(DBG_DATA+DBG_HOT,("Updated server item")); } else { @@ -6159,8 +6391,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsServer( if (fPreventAdd) goto preventadd; fLocalItemsAdded++; aSyncItemP->setSyncOp(sop_add); // set correct op + do_add_slow: remainsvisible=true; // should remain visible ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); // add to local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_ADD_SLOW); break; } } // slow sync @@ -6265,8 +6499,71 @@ bool TLocalEngineDS::engProcessRemoteItemAsClient( // but must be initialized correctly for client as well as descendants might test them fPreventAdd = false; fIgnoreUpdate = false; + + TSyncOperation syncop; + LocalItemOp op; + + TLocalSyncItemAux *aux = static_cast<TLocalSyncItemAux *>(aSyncItemP->getAux(TSyncItem::LOCAL_ENGINE)); + if (aux) { + // Resuming the function call: restore variables, jump to store + // method call. + remainsvisible = aux->fRemainsVisible; + syncop = aux->fSyncOp; + remoteid = aux->fRemoteID; + op = aux->fOp; + + fCurrentSyncOp = aux->fCurrentSyncOp; + fEchoItemOp = aux->fEchoItemOp; + fItemConflictStrategy = aux->fItemConflictStrategy; + fForceConflict = aux->fForceConflict; + fDeleteWins = aux->fDeleteWins; + fRejectStatus = aux->fRejectStatus; + + PDEBUGPRINTFX(DBG_DATA,("%s item operation resumed",SyncOpNames[syncop])); + switch (op) { + case LOCAL_ITEM_DELETE: goto do_delete; + case LOCAL_ITEM_ADD_NORMAL: goto do_add; + case LOCAL_ITEM_REPLACE: goto do_replace; + + // Not used in client: + case LOCAL_ITEM_ADD_DELETED: + case LOCAL_ITEM_ADD_DUPLICATE: + case LOCAL_ITEM_REPLACE_MERGED: + case LOCAL_ITEM_REPLACE_FROM_CLIENT: + case LOCAL_ITEM_ADD_MERGED: + case LOCAL_ITEM_REPLACE_MERGED2: + case LOCAL_ITEM_ADD_SLOW: + break; + }; + } + if (false) { + // Prepare for resuming the function call. Will only be reached + // via goto with "op" set to something identifying the source of + // the jump. + again: + if (!aux) { + aux = new TLocalSyncItemAux; + aSyncItemP->setAux(TSyncItem::LOCAL_ENGINE, aux); + } + + aux->fRemainsVisible = remainsvisible; + aux->fSyncOp = syncop; + aux->fRemoteID = remoteid; + aux->fOp = op; + + aux->fCurrentSyncOp = fCurrentSyncOp; + aux->fEchoItemOp = fEchoItemOp; + aux->fItemConflictStrategy = fItemConflictStrategy; + aux->fForceConflict = fForceConflict; + aux->fDeleteWins = fDeleteWins; + aux->fRejectStatus = fRejectStatus; + + aStatusCommand.setStatusCode(LOCERR_AGAIN); + return false; + } + // get operation out of item - TSyncOperation syncop=aSyncItemP->getSyncOp(); + syncop=aSyncItemP->getSyncOp(); // show DEBUGPRINTFX(DBG_DATA,("%s item operation received",SyncOpNames[syncop])); // check if receiving commands is allowed at all @@ -6339,8 +6636,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsClient( case sop_delete: // delete item fLocalItemsDeleted++; + do_delete: remainsvisible=false; // deleted not visible any more ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible); // delete in local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_DELETE); break; case sop_copy: // %%% note: this would belong into specific datastore implementation, but is here @@ -6374,8 +6673,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsClient( break; } #endif + do_add: remainsvisible=true; // should remain visible ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible,&localid); // add to local database NOW, get back local GUID + CHECK_FOR_AGAIN(LOCAL_ITEM_ADD_NORMAL); if (!ok) break; // if added (not replaced), we need to send map if (aStatusCommand.getStatusCode()==201) { @@ -6402,8 +6703,10 @@ bool TLocalEngineDS::engProcessRemoteItemAsClient( // - get remoteid BEFORE processing item (as logicProcessRemoteItem consumes the item!!), // in case replace is converted to add and we need to register a map entry. remoteid=aSyncItemP->getRemoteID(); // get remote ID + do_replace: remainsvisible=true; // should remain visible ok=logicProcessRemoteItem(aSyncItemP,aStatusCommand,remainsvisible,&localid); // replace in local database NOW + CHECK_FOR_AGAIN(LOCAL_ITEM_REPLACE); // if added (not replaced), we need to send map if (aStatusCommand.getStatusCode()==201) { // Note: logicProcessRemoteItem should NOT do an add if we have no remoteid, but return 404. diff --git a/src/sysync/stdlogicds.cpp b/src/sysync/stdlogicds.cpp index eaa527d..afa9077 100644 --- a/src/sysync/stdlogicds.cpp +++ b/src/sysync/stdlogicds.cpp @@ -1175,9 +1175,21 @@ TMultiFieldItem *TStdLogicDS::mergeWithDatabaseVersion(TSyncItem *aSyncItemP, bo return dbVersionItemP; } // TStdLogicDS::mergeWithDatabaseVersion +enum LogicItemOp +{ + LOGIC_ITEM_WRITE, + LOGIC_ITEM_REPLACE_FALLBACK, + LOGIC_ITEM_ADD_FALLBACK +}; - - +struct TLogicSyncItemAux : public TSyncItemAux +{ + bool fIrregular; + bool fShouldBeVisible; + bool fVisibleInSyncset; + TSyncItem *fSyncItemP; + LogicItemOp fOp; +}; // called to process incoming item operation // Method takes ownership of syncitemP in all cases @@ -1191,6 +1203,10 @@ bool TStdLogicDS::logicProcessRemoteItem( bool irregular=false; bool shouldbevisible=aVisibleInSyncset; string datatext; + LogicItemOp op; + // Must be kept alive for the duration of the entire call, + // in case that we need to resume. + TSyncItem *origSyncItemP=syncitemP; TP_DEFIDX(li); TP_SWITCH(li,fSessionP->fTPInfo,TP_database); @@ -1203,13 +1219,56 @@ bool TStdLogicDS::logicProcessRemoteItem( aStatusCommand.setStatusCode(sta); } else { + TLogicSyncItemAux *aux = static_cast<TLogicSyncItemAux *>(origSyncItemP->getAux(TSyncItem::STD_LOGIC_DS)); + // show DEBUGPRINTFX(DBG_DATA,( - "TStdLogicDS::logicProcessRemoteItem starting, SyncOp=%s, RemoteID='%s', LocalID='%s'", - SyncOpNames[syncitemP->getSyncOp()], - syncitemP->getRemoteID(), - syncitemP->getLocalID() + "TStdLogicDS::logicProcessRemoteItem %p %s, SyncOp=%s, RemoteID='%s', LocalID='%s'", + origSyncItemP, + aux ? "resuming" : "starting", + SyncOpNames[origSyncItemP->getSyncOp()], + origSyncItemP->getRemoteID(), + origSyncItemP->getLocalID() )); + + // Same approach as in TLocalEngineDS::engProcessRemoteItemAsServer(): + // jump directly to the point where we left the last time we where + // called for this item. + if (aux) { + irregular = aux->fIrregular; + shouldbevisible = aux->fShouldBeVisible; + aVisibleInSyncset = aux->fVisibleInSyncset; + syncitemP = aux->fSyncItemP; + op = aux->fOp; + + switch (op) { + case LOGIC_ITEM_WRITE: goto do_write; + case LOGIC_ITEM_REPLACE_FALLBACK: goto do_replace_fallback; + case LOGIC_ITEM_ADD_FALLBACK: goto do_add_fallback; + } + } + + if (false) { + again: +#define CHECK_FOR_AGAIN(_op) \ + if (aStatusCommand.getStatusCode() == LOCERR_AGAIN) { \ + op = _op; \ + goto again; \ + } + + if (!aux) { + aux = new TLogicSyncItemAux; + origSyncItemP->setAux(TSyncItem::STD_LOGIC_DS, aux); + } + aux->fIrregular = irregular; + aux->fShouldBeVisible = shouldbevisible; + aux->fVisibleInSyncset = aVisibleInSyncset; + aux->fSyncItemP = syncitemP; + aux->fOp = op; + aStatusCommand.setStatusCode(LOCERR_AGAIN); + return false; + } + // now perform action if (syncitemP->getSyncOp()==sop_replace || syncitemP->getSyncOp()==sop_wants_replace) { // check if we should read before writing @@ -1312,8 +1371,6 @@ bool TStdLogicDS::logicProcessRemoteItem( } #endif #endif - // - get rid of original - delete syncitemP; // - use new item for further processing syncitemP=refitemP; } @@ -1373,8 +1430,10 @@ bool TStdLogicDS::logicProcessRemoteItem( } #endif // Now let derived class process the item + do_write: if (!implProcessItem(syncitemP,aStatusCommand)) sta = aStatusCommand.getStatusCode(); // not successful, get error status code + CHECK_FOR_AGAIN(LOGIC_ITEM_WRITE); } // perform special case handling if (sta!=LOCERR_OK) { @@ -1400,10 +1459,12 @@ bool TStdLogicDS::logicProcessRemoteItem( syncitemP->setSyncOp(sop_replace); irregular=true; // - process again + do_replace_fallback: if (implProcessItem(syncitemP,aStatusCommand)) { aStatusCommand.setStatusCode(208); // client has won } else { + CHECK_FOR_AGAIN(LOGIC_ITEM_REPLACE_FALLBACK); // failed, return status sta = aStatusCommand.getStatusCode(); } @@ -1439,8 +1500,10 @@ bool TStdLogicDS::logicProcessRemoteItem( // - process again (note that we are re-using the status command that might // already have a text item with an OS errir if something failed before) sta=LOCERR_OK; // forget previous status + do_add_fallback: if (!implProcessItem(syncitemP,aStatusCommand)) sta=aStatusCommand.getStatusCode(); // not successful, get error status code + CHECK_FOR_AGAIN(LOGIC_ITEM_ADD_FALLBACK); } } } @@ -1478,6 +1541,7 @@ bool TStdLogicDS::logicProcessRemoteItem( (*aGUID)=syncitemP->getLocalID(); } } + /* LOCERR_AGAIN */ else { PDEBUGPRINTFX(DBG_ERROR,( "- Operation %s failed with SyncML status=%hd", @@ -1488,6 +1552,8 @@ bool TStdLogicDS::logicProcessRemoteItem( } // if startDataWrite ok // anyway, we are done with this item, delete it now delete syncitemP; + if (syncitemP != origSyncItemP) + delete origSyncItemP; TP_START(fSessionP->fTPInfo,li); // done, return regular/irregular status return (sta==LOCERR_OK) && !irregular; @@ -1495,6 +1561,8 @@ bool TStdLogicDS::logicProcessRemoteItem( SYSYNC_CATCH (...) // delete the item if (syncitemP) delete syncitemP; + if (syncitemP != origSyncItemP) + delete origSyncItemP; TP_START(fSessionP->fTPInfo,li); // re-throw SYSYNC_RETHROW; diff --git a/src/sysync/syncitem.cpp b/src/sysync/syncitem.cpp index 14a93f1..b41f5eb 100755 --- a/src/sysync/syncitem.cpp +++ b/src/sysync/syncitem.cpp @@ -65,9 +65,23 @@ TSyncItem::TSyncItem(TSyncItemType *aItemType) TSyncItem::~TSyncItem() { - // NOP + for (SyncItemAux_t::iterator it = fSyncItemAux.begin(); + it != fSyncItemAux.end(); + ++it) { + delete it->second; + } } // TSyncItem::~TSyncItem +TSyncItemAux *TSyncItem::getAux(EngineLevel level) const +{ + SyncItemAux_t::const_iterator it = fSyncItemAux.find(level); + return it == fSyncItemAux.end() ? NULL : it->second; +} + +void TSyncItem::setAux(EngineLevel level, TSyncItemAux *aux) +{ + fSyncItemAux[level] = aux; +} // assignment (IDs and contents) TSyncItem& TSyncItem::operator=(TSyncItem &aSyncItem) diff --git a/src/sysync/syncitem.h b/src/sysync/syncitem.h index 6d7dfbd..1d53b3b 100755 --- a/src/sysync/syncitem.h +++ b/src/sysync/syncitem.h @@ -61,6 +61,12 @@ extern const char * const comparisonModeNames[numEQmodes]; class TLocalEngineDS; class TSyncAppBase; +class TSyncItemAux +{ + public: + virtual ~TSyncItemAux() {} +}; + class TSyncItem : noncopyable { @@ -161,6 +167,17 @@ public: #endif // - get session owner (dispatcher/clientbase) TSyncAppBase *getSyncAppBase(void); + + // Opaque auxiliary data, for use in the different levels that the item passes through. + enum EngineLevel + { + LOCAL_ENGINE, + STD_LOGIC_DS, + CUSTOM_DS, + PLUGIN_API + }; + TSyncItemAux *getAux(EngineLevel level) const; + void setAux(EngineLevel level, TSyncItemAux *aux); protected: // operation to be performed with this item at its destination TSyncOperation fSyncOp; @@ -168,6 +185,8 @@ protected: private: // cast pointer to same type, returns NULL if incompatible TSyncItem *castToSameTypeP(TSyncItem *aItemP) { return aItemP; } // all are compatible TSyncItem + typedef std::map<EngineLevel, TSyncItemAux *> SyncItemAux_t; + SyncItemAux_t fSyncItemAux; }; // TSyncItem } // namespace sysync diff --git a/src/sysync/syncsession.cpp b/src/sysync/syncsession.cpp index 6f4ea16..e6ec1e0 100644 --- a/src/sysync/syncsession.cpp +++ b/src/sysync/syncsession.cpp @@ -2520,7 +2520,13 @@ Ret_t TSyncSession::process(TSmlCommand *aSyncCommandP) delete aSyncCommandP; } else if ( - fStrictExecOrdering && + // Be extra careful: even if fStrictExecOrdering=false, still delay + // further commands if they would overtake non-trivial commands in the + // delay queue. This solves a particular problem where the Sync command + // gets delayed due to slow TStdLogicDS::startDataAccessForServer(). + // The server then sees a second (fake?!) Sync command (incoming MsgID=2, CmdID=0) + // which it must delay instead of executing it. + (fStrictExecOrdering || !onlyItemChangesPending()) && fDelayedExecutionCommands.size()>0 && aSyncCommandP->getCmdType()!=scmd_status && aSyncCommandP->getCmdType()!=scmd_alert @@ -2730,6 +2736,27 @@ Ret_t TSyncSession::processHeader(TSyncHeader *aSyncHdrP) return SML_ERR_OK; } // TSyncSession::processHeader +bool TSyncSession::onlyItemChangesPending() +{ + TSmlCommandPContainer::iterator pos=fDelayedExecutionCommands.begin(); + while (pos!=fDelayedExecutionCommands.end()) { + TSmlCommand *cmdP = *pos; + switch (cmdP->getCmdType()) { + // A white-list of commands perform simple data changes. + // These commands can be overtaken by the execution of + // other commands, see TSyncSession::process(). + case scmd_add: + case scmd_delete: + case scmd_replace: + case scmd_copy: + break; + default: + return false; + } + ++pos; + } + return true; +} bool TSyncSession::tryDelayedExecutionCommands() { @@ -3856,10 +3883,16 @@ bool TSyncSession::processSyncOpItem( )); // now let datastore handle it bool regular = fLocalSyncDatastoreP->engProcessSyncOpItem(aSyncOp, aItemP, aMetaP, aStatusCommand); + TSyError statuscode = aStatusCommand.getStatusCode(); + if (statuscode == LOCERR_AGAIN) { + PDEBUGPRINTFX(DBG_DATA,("Queueing %s-operation for later.", SyncOpNames[aSyncOp])); + aQueueForLater=true; // re-execute later... + return true; // ...but otherwise ok + } #ifdef SCRIPT_SUPPORT - // let script check status code + // let script check status code if the operation completed TErrorFuncContext errctx; - errctx.statuscode = aStatusCommand.getStatusCode(); + errctx.statuscode = statuscode; errctx.newstatuscode = errctx.statuscode; errctx.syncop = aSyncOp; errctx.datastoreP = fLocalSyncDatastoreP; @@ -5385,6 +5418,27 @@ Ret_t TSyncSession::EndMessage(Boolean_t final) // Now dump XML translation of incoming message XMLTranslationIncomingEnd(); #endif + // Flush pending item change commands? + // + // Don't retry other commands here (like a pending Sync), because + // the whole purpose of delaying Sync is to give a preliminary + // answer to the peer before finishing the command. + // + // Delaying them although we are expected to finish (final set!) + // would have two drawbacks: + // - Requires another message roundtrip. + // - More complex state transitions which is known to not work: + // Synthesis<->Synthesis sync did not complete correctly when the + // server forced the client to send another message, because client + // and server did not agree on the end of the session. (see + // "[os-libsynthesis] temporary local ID + FinalizeLocalID"). + if (final && onlyItemChangesPending() && !fDelayedExecutionCommands.empty()) { + // TODO: tell stores explicitly that we really need the results now + // instead of relying on the indirect semantic of "second call must + // succeed". + tryDelayedExecutionCommands(); + } + // End of incoming message PDEBUGPRINTFX(DBG_HOT,( "=================> Finished processing incoming message #%ld (%sfinal), request=%ld", diff --git a/src/sysync/syncsession.h b/src/sysync/syncsession.h index 3a652c6..b110712 100755 --- a/src/sysync/syncsession.h +++ b/src/sysync/syncsession.h @@ -524,6 +524,7 @@ public: bool sessionMustContinue(void); virtual void essentialStatusReceived(void) { /* NOP here */ }; void delayExecUntilNextRequest(TSmlCommand *aCommand); + bool onlyItemChangesPending(); bool tryDelayedExecutionCommands(); // returns syncEndAfterSyncPackageEnd bool delayedSyncEndsPending(void) { return fDelayedExecSyncEnds>0; }; // - continue interrupted or prevented issue in next package diff --git a/src/sysync_SDK/Sources/syerror.h b/src/sysync_SDK/Sources/syerror.h index 84c68b6..772b39c 100644 --- a/src/sysync_SDK/Sources/syerror.h +++ b/src/sysync_SDK/Sources/syerror.h @@ -36,6 +36,14 @@ enum TSyErrorEnum { /** ok */ LOCERR_OK = 0, + /** + * May be returned by a store when an operation was started without + * completing it yet. Engine will call the same operation again + * later. When called a second time, the store must finish the started + * operation and return the final result. + */ + LOCERR_AGAIN = 1, + /** no content / end of file / end of iteration / empty/NULL value */ DB_NoContent = 204, diff --git a/src/sysync_SDK/Sources/sysync_utils.cpp b/src/sysync_SDK/Sources/sysync_utils.cpp index 980d453..6cf9c2c 100755 --- a/src/sysync_SDK/Sources/sysync_utils.cpp +++ b/src/sysync_SDK/Sources/sysync_utils.cpp @@ -2822,6 +2822,8 @@ SmlItemPtr_t newItem(void) // SyncML 1.2 itemP->targetParent=NULL; itemP->sourceParent=NULL; + // custom data of client + itemP->aux=NULL; return itemP; } // newItem |