2015-11-16 16:00:22 +00:00
/*
** 2011-08-18
**
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** The main interface to the LSM module.
*/
# include "lsmInt.h"
# ifdef LSM_DEBUG
/*
** Identity function on error codes: the argument is handed back unchanged.
**
** In LSM_DEBUG builds this routine is invoked at every point where an
** error code is generated (as opposed to merely propagated). When the
** library unexpectedly returns, say, LSM_IOERR, placing a breakpoint here
** reveals where the error originated.
*/
int lsmErrorBkpt(int rc){
  int rcOut = rc;           /* Set breakpoint here! */
  return rcOut;
}
/*
** Sanity-check the internal consistency of database handle pDb using
** assert() statements.
**
** The invariants checked are:
**
**   * The handle is "in use" (has an open cursor or an open write
**     transaction) if and only if it is marked as reading (iReader>=0
**     or bRoTrans set).
**
**   * Whenever the handle is marked as reading, it holds a client
**     snapshot pointer (pClient).
**
**   * The transaction nesting depth is never negative.
*/
static void assert_db_state(lsm_db *pDb){
  int bInUse = (pDb->pCsr!=0 || pDb->nTransOpen>0);
  int bReading = (pDb->iReader>=0 || pDb->bRoTrans);

  /* Avoid unused-variable warnings when assert() compiles to nothing. */
  (void)bInUse;
  (void)bReading;

  assert( bInUse==bReading );
  assert( !bReading || pDb->pClient!=0 );
  assert( pDb->nTransOpen>=0 );
}
# else
# define assert_db_state(x)
# endif
/*
** The default key-compare function.
**
** Keys are ordered by a bytewise comparison of their common prefix (the
** first min(n1,n2) bytes). If the common prefix is identical, the shorter
** key sorts first. The return value is negative, zero or positive if key
** (p1/n1) is respectively less than, equal to or greater than key (p2/n2).
**
** memcmp() is only invoked when the common prefix is non-empty. Passing a
** null pointer to memcmp() is undefined behaviour even for a length of
** zero, and a zero-length key may legitimately be supplied as a NULL
** pointer.
*/
static int xCmp(void *p1, int n1, void *p2, int n2){
  int nCmp = (n1<n2 ? n1 : n2);   /* Number of bytes both keys have */
  int res = 0;

  if( nCmp>0 ){
    res = memcmp(p1, p2, nCmp);
  }
  if( res==0 ){
    res = n1 - n2;
  }
  return res;
}
/*
** The default logging callback. The message z is written to stderr and
** the stream is flushed immediately. The context pointer and the error
** code are both ignored.
*/
static void xLog(void *pCtx, int rc, const char *z){
  (void)pCtx;
  (void)rc;
  fprintf(stderr, " %s \n ", z);
  fflush(stderr);
}
/*
** Allocate a new database handle and store it in *ppDb.
**
** If pEnv is NULL, the default environment (lsm_default_env()) is used.
** On success LSM_OK is returned and *ppDb points to a zeroed handle with
** all configuration parameters set to their compile-time defaults. If the
** allocation fails, *ppDb is set to NULL and LSM_NOMEM_BKPT is returned.
*/
int lsm_new(lsm_env *pEnv, lsm_db **ppDb){
  lsm_db *pNew;                   /* The new handle */

  /* Fall back to the default environment if none was supplied. */
  if( pEnv==0 ){
    pEnv = lsm_default_env();
  }
  assert( pEnv );

  /* Allocate the handle. The output is set even if this fails. */
  pNew = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db));
  *ppDb = pNew;
  if( pNew==0 ){
    return LSM_NOMEM_BKPT;
  }

  /* Environment and callbacks */
  pNew->pEnv = pEnv;
  pNew->xCmp = xCmp;
  pNew->xLog = xLog;

  /* Tunable parameters - see lsm_config() */
  pNew->nTreeLimit = LSM_DFLT_AUTOFLUSH;
  pNew->nAutockpt = LSM_DFLT_AUTOCHECKPOINT;
  pNew->bAutowork = LSM_DFLT_AUTOWORK;
  pNew->eSafety = LSM_DFLT_SAFETY;
  pNew->nDfltPgsz = LSM_DFLT_PAGE_SIZE;
  pNew->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
  pNew->nMerge = LSM_DFLT_AUTOMERGE;
  pNew->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
  pNew->bUseLog = LSM_DFLT_USE_LOG;
  pNew->bMultiProc = LSM_DFLT_MULTIPLE_PROCESSES;
  pNew->iMmap = LSM_DFLT_MMAP;
  pNew->compress.iId = LSM_COMPRESSION_NONE;

  /* A negative value means "no reader / rwclient slot held". */
  pNew->iReader = -1;
  pNew->iRwclient = -1;

  return LSM_OK;
}
/*
** Return the environment object used by database handle pDb.
*/
lsm_env *lsm_get_env(lsm_db *pDb){
  lsm_env *pEnv = pDb->pEnv;
  assert( pEnv );
  return pEnv;
}
/*
** End the current read transaction on pDb (via lsmFinishReadTrans()),
** but only if the handle has neither an open cursor nor an open write
** transaction. Otherwise this is a no-op.
*/
static void dbReleaseClientSnapshot(lsm_db *pDb){
  if( pDb->nTransOpen!=0 || pDb->pCsr!=0 ) return;
  lsmFinishReadTrans(pDb);
}
/*
** Translate path zRel into a full pathname using the xFullpath method of
** environment pEnv. The result is stored in a buffer allocated here.
**
** On success LSM_OK is returned and *pzAbs points to the buffer; the
** caller is responsible for releasing it with lsmFree(). On failure an
** LSM error code is returned and *pzAbs is set to NULL.
**
** xFullpath() reports the number of bytes it requires through its final
** argument. It is called repeatedly, growing the buffer each time, until
** the reported requirement no longer exceeds the space that was supplied.
*/
static int getFullpathname(
  lsm_env *pEnv,
  const char *zRel,
  char **pzAbs
){
  char *zBuf = 0;                 /* Buffer for the full pathname */
  int nNeed = 0;                  /* Bytes xFullpath() says it requires */
  int rc;

  for(;;){
    int nHave = nNeed;            /* Size of zBuf passed to this call */
    rc = pEnv->xFullpath(pEnv, zRel, zBuf, &nNeed);
    if( nNeed<=nHave ) break;

    /* Buffer was too small. Grow it and, unless that failed, retry. */
    zBuf = lsmReallocOrFreeRc(pEnv, zBuf, nNeed, &rc);
    if( rc!=LSM_OK ) break;
  }

  if( rc!=LSM_OK ){
    lsmFree(pEnv, zBuf);
    zBuf = 0;
  }
  *pzAbs = zBuf;
  return rc;
}
/*
** Assert that the RWCLIENT bits of the db->mLock mask agree with the
** value of db->iRwclient: if iRwclient is non-negative exactly the bit
** for that slot is set, otherwise none of the RWCLIENT bits are set.
**
** This is a no-op when NDEBUG is defined.
*/
static void assertRwclientLockValue(lsm_db *db){
#ifndef NDEBUG
  const u64 one = 1;
  u64 mExpect = 0;                /* Bit corresponding to db->iRwclient */
  u64 mAll;                       /* Mask of all RWCLIENT bits in mLock */

  /* All bits from RWCLIENT(0) up to, but not including, the bit for
  ** RWCLIENT(LSM_LOCK_NRWCLIENT). */
  mAll = (one << (LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT)-1))
       - (one << (LSM_LOCK_RWCLIENT(0)-1));

  if( db->iRwclient>=0 ){
    mExpect = one << (LSM_LOCK_RWCLIENT(db->iRwclient)-1);
  }
  assert( (db->mLock & mAll)==mExpect );
#endif
}
/*
** Open a new connection to database zFilename using handle pDb.
**
** Returns LSM_MISUSE if the handle is already connected to a database.
** Otherwise LSM_OK is returned on success, or an LSM error code if the
** pathname cannot be resolved, the connection fails or (for read/write
** connections) the checkpoint cannot be loaded.
*/
int lsm_open(lsm_db *pDb, const char *zFilename){
  int rc = LSM_MISUSE;

  if( pDb->pDatabase==0 ){
    char *zAbs;                   /* Absolute version of zFilename */

    /* The path supplied by the user may be relative. It is used (either
    ** directly or with "-log" appended) for several purposes - opening
    ** the database file, opening the log file and perhaps unlinking the
    ** log file on disconnect. Resolving it to an absolute path now
    ** ensures the right files are operated on even if the application
    ** later changes its working directory. */
    rc = getFullpathname(pDb->pEnv, zFilename, &zAbs);
    assert( rc==LSM_OK || zAbs==0 );

    /* Connect to the database. */
    if( rc==LSM_OK ){
      rc = lsmDbDatabaseConnect(pDb, zAbs);
    }

    /* For a read/write connection, configure the file-system object with
    ** the page-size and block-size of this database. Even if the database
    ** file is zero bytes in size on disk, these values have been set in
    ** shared-memory by now, and so are guaranteed not to change during
    ** the lifetime of this connection. */
    if( rc==LSM_OK && pDb->bReadonly==0 ){
      rc = lsmCheckpointLoad(pDb, 0);
      if( rc==LSM_OK ){
        lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
        lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
      }
    }

    lsmFree(pDb->pEnv, zAbs);
    assertRwclientLockValue(pDb);
  }

  assert( pDb->bReadonly==0 || pDb->bReadonly==1 );
  assert( rc!=LSM_OK || (pDb->pShmhdr==0)==(pDb->bReadonly==1) );
  return rc;
}
/*
** Close database handle pDb and release all resources it holds.
**
** Passing NULL is a harmless no-op that returns LSM_OK. If the handle
** still has an open cursor or an open transaction, nothing is released
** and LSM_MISUSE_BKPT is returned. Otherwise the handle is disconnected,
** any registered compression destructors are invoked, all memory is
** freed and LSM_OK is returned.
*/
int lsm_close(lsm_db *pDb){
  lsm_env *pEnv;

  if( pDb==0 ) return LSM_OK;

  assert_db_state(pDb);
  if( pDb->pCsr || pDb->nTransOpen ){
    return LSM_MISUSE_BKPT;
  }
  pEnv = pDb->pEnv;

  /* Drop cached cursors and the client snapshot, then disconnect. */
  lsmMCursorFreeCache(pDb);
  lsmFreeSnapshot(pEnv, pDb->pClient);
  pDb->pClient = 0;

  assertRwclientLockValue(pDb);

  lsmDbDatabaseRelease(pDb);
  lsmLogClose(pDb);
  lsmFsClose(pDb->pFS);

  /* Invoke any destructors registered for the compression or
  ** compression factory callbacks. */
  if( pDb->factory.xFree ){
    pDb->factory.xFree(pDb->factory.pCtx);
  }
  if( pDb->compress.xFree ){
    pDb->compress.xFree(pDb->compress.pCtx);
  }

  /* Free the arrays owned by the handle, then the handle itself. */
  lsmFree(pEnv, pDb->rollback.aArray);
  lsmFree(pEnv, pDb->aTrans);
  lsmFree(pEnv, pDb->apShm);
  lsmFree(pEnv, pDb);

  return LSM_OK;
}
/*
** Query and/or modify a configuration parameter of database handle pDb.
**
** eParam selects the parameter (an LSM_CONFIG_XXX value). For the integer
** parameters the single variadic argument is an (int *): on entry it
** holds the requested new value, on exit the value actually in effect. A
** requested value that is out of range (often any negative value) leaves
** the setting unchanged, which makes this a pure query. The compression
** related parameters take a pointer to an lsm_compress or
** lsm_compress_factory structure instead.
**
** Returns LSM_OK on success, LSM_MISUSE for an unrecognised eParam, or
** another LSM error code.
*/
int lsm_config(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;
  va_list ap;

  va_start(ap, eParam);
  switch( eParam ){

    case LSM_CONFIG_AUTOFLUSH: {
      /* Exchanged with the caller in KB, stored internally in bytes. */
      int *pnKB = va_arg(ap, int *);
      int nKB = *pnKB;
      if( nKB>=0 && nKB<=(1024*1024) ){
        pDb->nTreeLimit = nKB * 1024;
      }
      *pnKB = (pDb->nTreeLimit / 1024);
      break;
    }

    case LSM_CONFIG_AUTOWORK: {
      int *pbVal = va_arg(ap, int *);
      if( *pbVal>=0 ){
        pDb->bAutowork = *pbVal;
      }
      *pbVal = pDb->bAutowork;
      break;
    }

    case LSM_CONFIG_AUTOCHECKPOINT: {
      /* Exchanged with the caller in KB. lsm_db.nAutockpt is in bytes. */
      int *pnKB = va_arg(ap, int *);
      if( *pnKB>=0 ){
        int nKB = *pnKB;
        pDb->nAutockpt = (i64)nKB * 1024;
      }
      *pnKB = (int)(pDb->nAutockpt / 1024);
      break;
    }

    case LSM_CONFIG_PAGE_SIZE: {
      int *pnVal = va_arg(ap, int *);
      if( pDb->pDatabase ){
        /* Read-only once lsm_open() has been called. Report the page
        ** size according to the FileSystem object. */
        *pnVal = lsmFsPageSize(pDb->pFS);
      }else{
        /* Accept powers of two between 256 and 65536 inclusive. */
        int bValid = (*pnVal>=256 && *pnVal<=65536);
        if( bValid && ((*pnVal-1) & *pnVal)==0 ){
          pDb->nDfltPgsz = *pnVal;
        }else{
          *pnVal = pDb->nDfltPgsz;
        }
      }
      break;
    }

    case LSM_CONFIG_BLOCK_SIZE: {
      /* Exchanged with the caller in KB, stored internally in bytes. */
      int *pnKB = va_arg(ap, int *);
      if( pDb->pDatabase ){
        /* Read-only once lsm_open() has been called. Report the block
        ** size, in KB, according to the FileSystem object. */
        *pnKB = lsmFsBlockSize(pDb->pFS) / 1024;
      }else{
        /* Accept powers of two between 64 and 65536 (KB) inclusive. */
        int nKB = *pnKB;
        if( nKB>=64 && nKB<=65536 && ((nKB-1) & nKB)==0 ){
          pDb->nDfltBlksz = nKB * 1024;
        }else{
          *pnKB = pDb->nDfltBlksz / 1024;
        }
      }
      break;
    }

    case LSM_CONFIG_SAFETY: {
      int *peVal = va_arg(ap, int *);
      if( *peVal>=0 && *peVal<=2 ){
        pDb->eSafety = *peVal;
      }
      *peVal = pDb->eSafety;
      break;
    }

    case LSM_CONFIG_MMAP: {
      /* May only be changed while iReader is negative. */
      int *piVal = va_arg(ap, int *);
      if( pDb->iReader<0 && *piVal>=0 ){
        pDb->iMmap = *piVal;
        rc = lsmFsConfigure(pDb);
      }
      *piVal = pDb->iMmap;
      break;
    }

    case LSM_CONFIG_USE_LOG: {
      /* May only be changed while no transaction is open. */
      int *pbVal = va_arg(ap, int *);
      if( pDb->nTransOpen==0 && (*pbVal==0 || *pbVal==1) ){
        pDb->bUseLog = *pbVal;
      }
      *pbVal = pDb->bUseLog;
      break;
    }

    case LSM_CONFIG_AUTOMERGE: {
      int *pnVal = va_arg(ap, int *);
      if( *pnVal>1 ){
        pDb->nMerge = *pnVal;
      }
      *pnVal = pDb->nMerge;
      break;
    }

    case LSM_CONFIG_MAX_FREELIST: {
      int *pnVal = va_arg(ap, int *);
      if( *pnVal>=2 && *pnVal<=LSM_MAX_FREELIST_ENTRIES ){
        pDb->nMaxFreelist = *pnVal;
      }
      *pnVal = pDb->nMaxFreelist;
      break;
    }

    case LSM_CONFIG_MULTIPLE_PROCESSES: {
      int *pbVal = va_arg(ap, int *);
      if( pDb->pDatabase ){
        /* Read-only once lsm_open() has been called. Report whether or
        ** not this connection is currently in multi-process mode. */
        *pbVal = lsmDbMultiProc(pDb);
      }else{
        /* Normalize the requested value to 0 or 1 before storing it. */
        *pbVal = (*pbVal!=0);
        pDb->bMultiProc = *pbVal;
      }
      break;
    }

    case LSM_CONFIG_READONLY: {
      /* Read-only once lsm_open() has been called. */
      int *pbVal = va_arg(ap, int *);
      if( pDb->pDatabase==0 && *pbVal>=0 ){
        *pbVal = (*pbVal!=0);
        pDb->bReadonly = *pbVal;
      }
      *pbVal = pDb->bReadonly;
      break;
    }

    case LSM_CONFIG_SET_COMPRESSION: {
      lsm_compress *pNew = va_arg(ap, lsm_compress *);
      if( pDb->iReader>=0 && pDb->bInFactory==0 ){
        /* May not change compression schemes with an open transaction */
        rc = LSM_MISUSE_BKPT;
      }else{
        /* Invoke any destructor belonging to the current compression. */
        if( pDb->compress.xFree ){
          pDb->compress.xFree(pDb->compress.pCtx);
        }
        if( pNew->xBound ){
          memcpy(&pDb->compress, pNew, sizeof(lsm_compress));
        }else{
          /* No xBound method: revert to "no compression". */
          memset(&pDb->compress, 0, sizeof(lsm_compress));
          pDb->compress.iId = LSM_COMPRESSION_NONE;
        }
        rc = lsmFsConfigure(pDb);
      }
      break;
    }

    case LSM_CONFIG_SET_COMPRESSION_FACTORY: {
      lsm_compress_factory *pNew = va_arg(ap, lsm_compress_factory *);
      /* Invoke any destructor belonging to the current factory. */
      if( pDb->factory.xFree ){
        pDb->factory.xFree(pDb->factory.pCtx);
      }
      memcpy(&pDb->factory, pNew, sizeof(lsm_compress_factory));
      break;
    }

    case LSM_CONFIG_GET_COMPRESSION: {
      lsm_compress *pOut = va_arg(ap, lsm_compress *);
      memcpy(pOut, &pDb->compress, sizeof(lsm_compress));
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }
  va_end(ap);

  return rc;
}
/*
** Append a textual description of segment pSeg to string pStr. The text
** consists of prefix zPre followed by the iFirst, iLastPg, iRoot and nSize
** fields of the segment enclosed in braces.
*/
void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
  lsmStringAppendf(
      pStr, " %s{%lld %lld %lld %lld} ",
      zPre, pSeg->iFirst, pSeg->iLastPg, pSeg->iRoot, pSeg->nSize
  );
}
/*
** Make sure database handle pDb has a worker snapshot.
**
** If the handle does not already have one, lsmBeginWork() is called to
** obtain it and *pbUnlock is set to 1 so that the caller knows to pass
** it to infoFreeWorker() later. *pbUnlock must be 0 on entry. If pp is
** not NULL, *pp is set to point to the worker snapshot.
**
** Returns LSM_OK, or the error code returned by lsmBeginWork().
*/
static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
  assert( *pbUnlock==0 );

  if( pDb->pWorker==0 ){
    int rc = lsmBeginWork(pDb);
    if( rc!=LSM_OK ){
      return rc;
    }
    *pbUnlock = 1;
  }

  if( pp!=0 ){
    *pp = pDb->pWorker;
  }
  return LSM_OK;
}
/*
** Counterpart to infoGetWorker(). If bUnlock is true, lsmFinishWork() is
** called on pDb; otherwise this is a no-op. The error code slot passed to
** lsmFinishWork() is a throw-away variable initialized to LSM_BUSY.
*/
static void infoFreeWorker(lsm_db *pDb, int bUnlock){
  int rcIgnored = LSM_BUSY;
  if( !bUnlock ) return;
  lsmFinishWork(pDb, 0, &rcIgnored);
}
/*
** Format the level/segment structure of the worker snapshot of pDb as
** text and store a pointer to the result in *pzOut.
**
** For each level, the text holds the level age followed by a description
** of the left-hand segment and of each right-hand segment, as produced by
** lsmAppendSegmentList().
**
** Returns LSM_OK on success, LSM_NOMEM if the string length ends up
** negative, or the error code from infoGetWorker() if the worker snapshot
** cannot be obtained (in which case *pzOut is not written).
*/
int lsmStructList(
  lsm_db *pDb,                    /* Database handle */
  char **pzOut                    /* OUT: Nul-terminated string (tcl list) */
){
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;                /* True if infoFreeWorker() must unlock */
  Level *pLvl;                    /* Level currently being formatted */
  LsmString s;                    /* Accumulated output */
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  /* Format the contents of the snapshot as text */
  lsmStringInit(&s, pDb->pEnv);
  pLvl = lsmDbSnapshotLevel(pWorker);
  while( pLvl ){
    int iRhs;
    lsmStringAppendf(&s, " %s{%d ", (s.n ? " " : " "), (int)pLvl->iAge);
    lsmAppendSegmentList(&s, " ", &pLvl->lhs);
    for(iRhs=0; iRhs<pLvl->nRight; iRhs++){
      lsmAppendSegmentList(&s, " ", &pLvl->aRhs[iRhs]);
    }
    lsmStringAppend(&s, " } ", 1);
    pLvl = pLvl->pNext;
  }
  if( s.n>=0 ){
    rc = LSM_OK;
  }else{
    rc = LSM_NOMEM;
  }

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = s.z;
  return rc;
}
/*
** Callback passed to lsmWalkFreelist() by lsmInfoFreelist(). pCtx points
** to an LsmString; one entry containing the block number and snapshot id
** is appended to it. Always returns 0.
*/
static int infoFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  LsmString *pOut = (LsmString *)pCtx;
  const char *zSep = (pOut->n ? " " : " ");
  lsmStringAppendf(pOut, " %s{%d %lld} ", zSep, iBlk, iSnapshot);
  return 0;
}
/*
** Format the free-block list of database pDb as text and store a pointer
** to the result in *pzOut.
**
** The worker snapshot is obtained for the duration of the call (and
** released again if it was not already held). Each free-list entry is
** formatted by infoFreelistCb().
**
** Returns LSM_OK on success, in which case *pzOut is set to the formatted
** string. On any failure an LSM error code is returned and *pzOut is set
** to NULL, so that a caller who releases the output unconditionally never
** touches an uninitialized pointer.
*/
int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  int bUnlock = 0;                /* True if infoFreeWorker() must unlock */
  LsmString s;                    /* Accumulated output */
  int rc;

  /* Give the output a defined value on every path. */
  *pzOut = 0;

  /* Obtain the worker snapshot. The snapshot pointer itself is not
  ** needed here, only the guarantee that it is held. */
  rc = infoGetWorker(pDb, 0, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &s);
  if( rc!=LSM_OK ){
    lsmFree(pDb->pEnv, s.z);
  }else{
    *pzOut = s.z;
  }

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  return rc;
}
/*
** Report the approximate sizes, in KB rounded up, of the "old" and
** current in-memory trees of database db in *pnOldKB and *pnNewKB.
**
** The values are read from the shared-memory header. If the connection
** has no shared-memory header mapped (db->pShmhdr is NULL - as is the case
** for a handle that has not been opened, or one opened read-only), both
** outputs are set to 0 and LSM_MISUSE_BKPT is returned rather than
** dereferencing a NULL pointer. Otherwise LSM_OK is returned.
*/
static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){
  ShmHeader *pShm = db->pShmhdr;
  TreeHeader *p;

  if( pShm==0 ){
    *pnOldKB = 0;
    *pnNewKB = 0;
    return LSM_MISUSE_BKPT;
  }
  p = &pShm->hdr1;

  /* The following code suffers from two race conditions, as it accesses and
  ** trusts the contents of shared memory without verifying checksums:
  **
  **   * The two values read - TreeHeader.root.nByte and oldroot.nByte - are
  **     32-bit fields. It is assumed that reading from one of these
  **     is atomic - that it is not possible to read a partially written
  **     garbage value. However the two values may be mutually inconsistent.
  **
  **   * TreeHeader.iLogOff is a 64-bit value. And lsmCheckpointLogOffset()
  **     reads a 64-bit value from a snapshot stored in shared memory. It
  **     is assumed that in each case it is possible to read a partially
  **     written garbage value. If this occurs, then the value returned
  **     for the size of the "old" tree may reflect the size of an "old"
  **     tree that was recently flushed to disk.
  **
  ** Given the context in which this function is called (as a result of an
  ** lsm_info(LSM_INFO_TREE_SIZE) request), neither of these are considered to
  ** be problems.
  */
  *pnNewKB = ((int)p->root.nByte + 1023) / 1024;
  if( p->iOldShmid ){
    if( p->iOldLog==lsmCheckpointLogOffset(pShm->aSnap1) ){
      /* The old tree matches the checkpointed log offset: report 0. */
      *pnOldKB = 0;
    }else{
      *pnOldKB = ((int)p->oldroot.nByte + 1023) / 1024;
    }
  }else{
    *pnOldKB = 0;
  }

  return LSM_OK;
}
/*
** Query database handle pDb for information.
**
** eParam selects what is reported (an LSM_INFO_XXX value) and determines
** the types of the variadic arguments, all of which are output pointers
** except for the leading page number (LsmPgno) taken by the
** ARRAY_STRUCTURE, ARRAY_PAGES and PAGE_*_DUMP requests.
**
** Returns LSM_OK on success, LSM_MISUSE for an unrecognised eParam, or
** whatever error code the underlying helper returned.
*/
int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;
  va_list ap;

  va_start(ap, eParam);
  switch( eParam ){

    case LSM_INFO_NWRITE: {
      int *pnOut = va_arg(ap, int *);
      *pnOut = lsmFsNWrite(pDb->pFS);
      break;
    }

    case LSM_INFO_NREAD: {
      int *pnOut = va_arg(ap, int *);
      *pnOut = lsmFsNRead(pDb->pFS);
      break;
    }

    case LSM_INFO_DB_STRUCTURE: {
      char **pzOut = va_arg(ap, char **);
      rc = lsmStructList(pDb, pzOut);
      break;
    }

    case LSM_INFO_ARRAY_STRUCTURE: {
      LsmPgno iPg = va_arg(ap, LsmPgno);
      char **pzOut = va_arg(ap, char **);
      rc = lsmInfoArrayStructure(pDb, 0, iPg, pzOut);
      break;
    }

    case LSM_INFO_ARRAY_PAGES: {
      LsmPgno iPg = va_arg(ap, LsmPgno);
      char **pzOut = va_arg(ap, char **);
      rc = lsmInfoArrayPages(pDb, iPg, pzOut);
      break;
    }

    case LSM_INFO_PAGE_HEX_DUMP:
    case LSM_INFO_PAGE_ASCII_DUMP: {
      LsmPgno iPg = va_arg(ap, LsmPgno);
      char **pzOut = va_arg(ap, char **);
      int bUnlock = 0;

      /* The dump is made while the worker snapshot is held. */
      rc = infoGetWorker(pDb, 0, &bUnlock);
      if( rc==LSM_OK ){
        int bHex = (eParam==LSM_INFO_PAGE_HEX_DUMP);
        rc = lsmInfoPageDump(pDb, iPg, bHex, pzOut);
      }
      infoFreeWorker(pDb, bUnlock);
      break;
    }

    case LSM_INFO_LOG_STRUCTURE: {
      char **pzOut = va_arg(ap, char **);
      rc = lsmInfoLogStructure(pDb, pzOut);
      break;
    }

    case LSM_INFO_FREELIST: {
      char **pzOut = va_arg(ap, char **);
      rc = lsmInfoFreelist(pDb, pzOut);
      break;
    }

    case LSM_INFO_CHECKPOINT_SIZE: {
      int *pnOutKB = va_arg(ap, int *);
      rc = lsmCheckpointSize(pDb, pnOutKB);
      break;
    }

    case LSM_INFO_TREE_SIZE: {
      /* First output receives the "old" tree size, second the new. */
      int *pnOldKB = va_arg(ap, int *);
      int *pnNewKB = va_arg(ap, int *);
      rc = infoTreeSize(pDb, pnOldKB, pnNewKB);
      break;
    }

    case LSM_INFO_COMPRESSION_ID: {
      unsigned int *piId = va_arg(ap, unsigned int *);
      if( pDb->pClient==0 ){
        rc = lsmInfoCompressionId(pDb, piId);
      }else{
        /* A client snapshot is held: take the id straight from it. */
        *piId = pDb->pClient->iCmpId;
      }
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }
  va_end(ap);

  return rc;
}
/*
** Common implementation of lsm_insert(), lsm_delete() and
** lsm_delete_range().
**
** If bDeleteRange is true, the keys between (pKey/nKey) and (pVal/nVal)
** are deleted. Otherwise, if nVal is non-negative the value (pVal/nVal)
** is written for key (pKey/nKey), and if nVal is negative the key is
** deleted.
**
** If no write transaction is open when this function is called, one is
** opened here and either committed (on success) or rolled back (on
** error) before returning. Returns LSM_OK or an LSM error code.
*/
static int doWriteOp(
  lsm_db *pDb,
  int bDeleteRange,
  const void *pKey, int nKey,     /* Key to write or delete */
  const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
){
  int rc = LSM_OK;                          /* Return code */
  const int bOwnTrans = (pDb->nTransOpen==0); /* True if trans opened here */

  if( bOwnTrans ){
    rc = lsm_begin(pDb, 1);
  }

  /* Write the operation to the log. */
  if( rc==LSM_OK ){
    int eType;
    if( bDeleteRange ){
      eType = LSM_DRANGE;
    }else if( nVal>=0 ){
      eType = LSM_WRITE;
    }else{
      eType = LSM_DELETE;
    }
    rc = lsmLogWrite(pDb, eType, (void *)pKey, nKey, (void *)pVal, nVal);
  }

  /* This is done whether or not an error has occurred so far. */
  lsmSortedSaveTreeCursors(pDb);

  /* Update the in-memory tree and perhaps do some auto-work. */
  if( rc==LSM_OK ){
    int pgsz = lsmFsPageSize(pDb->pFS);
    int nQuant = LSM_AUTOWORK_QUANT * pgsz;
    int nSzBefore;
    int nSzAfter;
    int nCrossed;                 /* Number of nQuant boundaries crossed */

    /* Clamp the quantum to the tree limit, but never below one page so
    ** that the divisions below are well defined. */
    if( nQuant>pDb->nTreeLimit ){
      nQuant = LSM_MAX(pDb->nTreeLimit, pgsz);
    }

    nSzBefore = lsmTreeSize(pDb);
    if( bDeleteRange ){
      rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    }else{
      rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    }
    nSzAfter = lsmTreeSize(pDb);

    nCrossed = (nSzAfter / nQuant) - (nSzBefore / nQuant);
    if( rc==LSM_OK && pDb->bAutowork && nCrossed!=0 ){
      rc = lsmSortedAutoWork(pDb, nCrossed * LSM_AUTOWORK_QUANT);
    }
  }

  /* If a transaction was opened at the start of this function, commit it.
  ** Or, if an error has occurred, roll it back. */
  if( bOwnTrans ){
    if( rc!=LSM_OK ){
      lsm_rollback(pDb, 0);
    }else{
      rc = lsm_commit(pDb, 0);
    }
  }

  return rc;
}
/*
** Write value (pVal/nVal) for key (pKey/nKey) into the database. A
** negative nVal causes the key to be deleted instead. Returns LSM_OK or
** an LSM error code.
*/
int lsm_insert(
  lsm_db *db,                     /* Database connection */
  const void *pKey, int nKey,     /* Key to write or delete */
  const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
){
  const int bDeleteRange = 0;
  return doWriteOp(db, bDeleteRange, pKey, nKey, pVal, nVal);
}
/*
** Delete the entry with key (pKey/nKey) from the database. This is
** implemented as a write with a NULL value of length -1. Returns LSM_OK
** or an LSM error code.
*/
int lsm_delete(lsm_db *db, const void *pKey, int nKey){
  const void *pNoVal = 0;
  const int nNoVal = -1;
  return doWriteOp(db, 0, pKey, nKey, pNoVal, nNoVal);
}
/*
** Delete a range of database keys bounded by (pKey1/nKey1) and
** (pKey2/nKey2).
**
** The two keys are compared using the key-compare function of the
** handle. Unless the first is strictly less than the second, the range
** is considered empty and LSM_OK is returned without doing anything.
*/
int lsm_delete_range(
  lsm_db *db,                     /* Database handle */
  const void *pKey1, int nKey1,   /* Lower bound of range to delete */
  const void *pKey2, int nKey2    /* Upper bound of range to delete */
){
  int cmp = db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2);
  if( cmp>=0 ){
    return LSM_OK;
  }
  return doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2);
}
/*
** Open a new cursor handle on database pDb and store it in *ppCsr.
**
** If the connection has no shared-memory header (a read-only connection),
** a read-only transaction is started. Otherwise, if no read transaction
** is currently open, one is opened here.
**
** Returns LSM_OK on success. If an error occurs, any partially built
** cursor is closed and, if that leaves the handle with no open cursors or
** write transaction, the client snapshot is released.
*/
int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr){
  MultiCursor *pNew = 0;          /* New cursor object */
  int rc = LSM_OK;                /* Return code */

  assert_db_state(pDb);

  /* Open a read transaction if one is not already open. */
  if( pDb->pShmhdr==0 ){
    assert( pDb->bReadonly );
    rc = lsmBeginRoTrans(pDb);
  }else if( pDb->iReader<0 ){
    rc = lsmBeginReadTrans(pDb);
  }

  /* Allocate the multi-cursor. */
  if( rc==LSM_OK ){
    rc = lsmMCursorNew(pDb, &pNew);
  }

  /* Clean up after an error. */
  if( rc!=LSM_OK ){
    lsmMCursorClose(pNew, 0);
    dbReleaseClientSnapshot(pDb);
  }

  assert_db_state(pDb);
  *ppCsr = (lsm_cursor *)pNew;
  return rc;
}
/*
** Close a cursor opened using lsm_csr_open(). Passing NULL is a no-op.
** After the cursor is closed the client snapshot is released if the
** handle no longer needs it. Always returns LSM_OK.
*/
int lsm_csr_close(lsm_cursor *p){
  MultiCursor *pCsr = (MultiCursor *)p;
  if( pCsr!=0 ){
    lsm_db *pDb = lsmMCursorDb(pCsr);
    assert_db_state(pDb);
    lsmMCursorClose(pCsr, 1);
    dbReleaseClientSnapshot(pDb);
    assert_db_state(pDb);
  }
  return LSM_OK;
}
/*
** Attempt to seek the cursor to the database entry specified by
** (pKey/nKey), using seek mode eSeek. This is a thin wrapper around
** lsmMCursorSeek() and returns its result: LSM_OK, or an LSM error code
** if an error (e.g. an OOM or IO error) occurs.
*/
int lsm_csr_seek(lsm_cursor *pCsr, const void *pKey, int nKey, int eSeek){
  MultiCursor *pMC = (MultiCursor *)pCsr;
  return lsmMCursorSeek(pMC, 0, (void *)pKey, nKey, eSeek);
}
/*
** Public wrapper around lsmMCursorNext(). Returns its result.
*/
int lsm_csr_next(lsm_cursor *pCsr){
  MultiCursor *pMC = (MultiCursor *)pCsr;
  return lsmMCursorNext(pMC);
}
/*
** Public wrapper around lsmMCursorPrev(). Returns its result.
*/
int lsm_csr_prev(lsm_cursor *pCsr){
  MultiCursor *pMC = (MultiCursor *)pCsr;
  return lsmMCursorPrev(pMC);
}
/*
** Public wrapper around lsmMCursorFirst(). Returns its result.
*/
int lsm_csr_first(lsm_cursor *pCsr){
  MultiCursor *pMC = (MultiCursor *)pCsr;
  return lsmMCursorFirst(pMC);
}
/*
** Public wrapper around lsmMCursorLast(). Returns its result.
*/
int lsm_csr_last(lsm_cursor *pCsr){
  MultiCursor *pMC = (MultiCursor *)pCsr;
  return lsmMCursorLast(pMC);
}
/*
** Public wrapper around lsmMCursorValid(). Returns its result.
*/
int lsm_csr_valid(lsm_cursor *pCsr){
  MultiCursor *pMC = (MultiCursor *)pCsr;
  return lsmMCursorValid(pMC);
}
/*
** Public wrapper around lsmMCursorKey(). The key pointer and size are
** written to *ppKey and *pnKey by that function; its result is returned.
*/
int lsm_csr_key(lsm_cursor *pCsr, const void **ppKey, int *pnKey){
  MultiCursor *pMC = (MultiCursor *)pCsr;
  return lsmMCursorKey(pMC, (void **)ppKey, pnKey);
}
/*
** Public wrapper around lsmMCursorValue(). The value pointer and size
** are written to *ppVal and *pnVal by that function; its result is
** returned.
*/
int lsm_csr_value(lsm_cursor *pCsr, const void **ppVal, int *pnVal){
  MultiCursor *pMC = (MultiCursor *)pCsr;
  return lsmMCursorValue(pMC, (void **)ppVal, pnVal);
}
/*
** Install logging callback xLog, together with the context pointer pCtx
** that is passed as its first argument, on database handle pDb. See
** lsmLogMessage() for how the callback is invoked.
*/
void lsm_config_log(
  lsm_db *pDb,
  void (*xLog)(void *, int, const char *),
  void *pCtx
){
  pDb->pLogCtx = pCtx;
  pDb->xLog = xLog;
}
/*
** Store work-hook callback xWork and its context pointer pCtx in
** database handle pDb.
*/
void lsm_config_work_hook(
  lsm_db *pDb,
  void (*xWork)(lsm_db *, void *),
  void *pCtx
){
  pDb->pWorkCtx = pCtx;
  pDb->xWork = xWork;
}
/*
** Format a printf-style message and pass it, along with error code rc,
** to the logging callback configured on pDb. If no logging callback is
** configured this function does nothing.
**
** Two independent va_list objects over the same arguments are handed to
** lsmStringVAppendf().
*/
void lsmLogMessage(lsm_db *pDb, int rc, const char *zFormat, ...){
  LsmString msg;
  va_list ap;
  va_list ap2;

  if( pDb->xLog==0 ) return;

  lsmStringInit(&msg, pDb->pEnv);
  va_start(ap, zFormat);
  va_start(ap2, zFormat);
  lsmStringVAppendf(&msg, zFormat, ap, ap2);
  va_end(ap);
  va_end(ap2);

  pDb->xLog(pDb->pLogCtx, rc, msg.z);
  lsmStringClear(&msg);
}
/*
** Open transactions on pDb until the nesting depth is iLevel. A negative
** iLevel means "open one more (nested) transaction". If the requested
** depth does not exceed the current depth nothing is opened.
**
** Returns LSM_READONLY if the connection is read-only, LSM_NOMEM if the
** transaction-mark array cannot be grown, the error from
** lsmBeginWriteTrans() if the outermost write transaction cannot be
** started, and LSM_OK otherwise.
*/
int lsm_begin(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;

  assert_db_state( pDb );
  if( pDb->bReadonly ){
    rc = LSM_READONLY;
  }

  /* A value less than zero means open one more transaction. */
  if( iLevel<0 ){
    iLevel = pDb->nTransOpen + 1;
  }
  if( iLevel<=pDb->nTransOpen ){
    return rc;
  }

  /* Extend the pDb->aTrans[] array if required. New slots are zeroed. */
  if( rc==LSM_OK && pDb->nTransAlloc<iLevel ){
    int nByte = sizeof(TransMark) * (iLevel+1);
    TransMark *aGrown = (TransMark *)lsmRealloc(pDb->pEnv, pDb->aTrans, nByte);
    if( aGrown ){
      nByte = sizeof(TransMark) * (iLevel+1 - pDb->nTransAlloc);
      memset(&aGrown[pDb->nTransAlloc], 0, nByte);
      pDb->nTransAlloc = iLevel+1;
      pDb->aTrans = aGrown;
    }else{
      rc = LSM_NOMEM;
    }
  }

  /* Start the outermost write transaction, if there is not one already. */
  if( rc==LSM_OK && pDb->nTransOpen==0 ){
    rc = lsmBeginWriteTrans(pDb);
  }

  /* Record the tree and log positions for each newly opened level. */
  if( rc==LSM_OK ){
    int iTrans;
    for(iTrans=pDb->nTransOpen; iTrans<iLevel; iTrans++){
      TransMark *pMark = &pDb->aTrans[iTrans];
      lsmTreeMark(pDb, &pMark->tree);
      lsmLogTell(pDb, &pMark->log);
    }
    pDb->nTransOpen = iLevel;
  }

  return rc;
}
/*
** Commit transactions on pDb until the nesting depth is iLevel. A
** negative iLevel means "close the innermost nested transaction". If the
** current depth is already iLevel or less, no transaction is closed.
**
** When iLevel is 0 the outermost transaction is committed: the log is
** committed, synced if the safety level is LSM_SAFETY_FULL, and the write
** transaction is finished. The first error encountered is returned;
** lsmFinishWriteTrans() is called even if an earlier step failed.
*/
int lsm_commit(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;

  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ){
    iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
  }

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){
      int rcFinish;

      /* Commit the transaction to disk. */
      rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }

      /* Always finish the write transaction, but do not let its return
      ** code mask an earlier error. */
      rcFinish = lsmFinishWriteTrans(pDb, (rc==LSM_OK));
      if( rc==LSM_OK ){
        rc = rcFinish;
      }
    }
    pDb->nTransOpen = iLevel;
  }

  dbReleaseClientSnapshot(pDb);
  return rc;
}
/*
** Roll back transactions on pDb to nesting depth iLevel. A negative
** iLevel means "roll back and close the innermost nested transaction".
** If no transaction is open this is a no-op.
**
** The in-memory tree is restored to the mark recorded for the target
** level and, unless iLevel is 0, the log is repositioned to match. If
** this closes the outermost transaction the write transaction is
** finished without committing. Always returns LSM_OK.
*/
int lsm_rollback(lsm_db *pDb, int iLevel){
  assert_db_state( pDb );

  if( pDb->nTransOpen==0 ){
    return LSM_OK;
  }

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ){
    iLevel = LSM_MAX(0, pDb->nTransOpen - 1);
  }

  if( iLevel<=pDb->nTransOpen ){
    int iMark = (iLevel>0 ? iLevel-1 : 0);
    TransMark *pMark = &pDb->aTrans[iMark];
    lsmTreeRollback(pDb, &pMark->tree);
    if( iLevel ){
      lsmLogSeek(pDb, &pMark->log);
    }
    pDb->nTransOpen = iLevel;
  }

  if( pDb->nTransOpen==0 ){
    lsmFinishWriteTrans(pDb, 0);
  }
  dbReleaseClientSnapshot(pDb);

  return LSM_OK;
}
/*
** Read the user-version value of the database into *piUsr.
**
** A read transaction (or, for a connection without a shared-memory
** header, a read-only transaction) is opened if one is not already open,
** and released again afterwards if the handle has no other use for it.
** *piUsr is only written if LSM_OK is returned.
*/
int lsm_get_user_version(lsm_db *pDb, unsigned int *piUsr){
  int rc = LSM_OK;                /* Return code */

  assert_db_state(pDb);

  /* Open a read transaction if one is not already open. */
  if( pDb->pShmhdr==0 ){
    assert( pDb->bReadonly );
    rc = lsmBeginRoTrans(pDb);
  }else if( pDb->iReader<0 ){
    rc = lsmBeginReadTrans(pDb);
  }

  /* Copy the value out of the tree header held by the handle. */
  if( rc==LSM_OK ){
    *piUsr = pDb->treehdr.iUsrVersion;
  }

  dbReleaseClientSnapshot(pDb);
  assert_db_state(pDb);
  return rc;
}
/*
** Set the user-version value of the database to iUsr.
**
** If no write transaction is open when this function is called, one is
** opened here and either committed (on success) or rolled back (on
** error) before returning. Returns LSM_OK or an LSM error code.
*/
int lsm_set_user_version(lsm_db *pDb, unsigned int iUsr){
  int rc = LSM_OK;                            /* Return code */
  const int bOwnTrans = (pDb->nTransOpen==0); /* True if trans opened here */

  if( bOwnTrans ){
    rc = lsm_begin(pDb, 1);
  }

  if( rc==LSM_OK ){
    pDb->treehdr.iUsrVersion = iUsr;
  }

  /* If a transaction was opened at the start of this function, commit it.
  ** Or, if an error has occurred, roll it back. */
  if( bOwnTrans ){
    if( rc!=LSM_OK ){
      lsm_rollback(pDb, 0);
    }else{
      rc = lsm_commit(pDb, 0);
    }
  }

  return rc;
}